| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183 |
- use axum::{extract::{State},Json};
- use super::{Ident,check_token};
- use crate::{AppState, LogLevel::*, datasource::Datasource, log};
- #[derive(serde::Serialize)]
- pub struct MqttBack{
- pub errcode: i16,
- #[serde(skip_serializing_if = "Option::is_none")]
- pub errmsg: Option<String>,
- #[serde(skip_serializing_if = "Option::is_none")]
- pub clientid: Option<String>,
- #[serde(skip_serializing_if = "Option::is_none")]
- pub mqttu: Option<String>,
- #[serde(skip_serializing_if = "Option::is_none")]
- pub mqttp: Option<String>
- }
- // pub async fn get_mqtt(
- // State(state): State<AppState>,
- // Json(u): axum::extract::Json<Ident>
- // ) -> Json<MqttBack> {
- // let (uid,_) = match check_token(&state, u.token).await {
- // Ok(id) => {id},
- // Err(_) => {
- // return Json(MqttBack{errcode: 2000, errmsg: Some("鉴权失败: token无效".to_string()),mqttp:None,clientid:None,mqttu:None})
- // }
- // };
- // let mut mqid:String;
- // match state.db_lite.query("select mqid from user where id=?", [uid], |r|{r.get::<usize,String>(0)}).await{
- // Ok(clientid)=>{
- // mqid=clientid;
- // // if clientid.is_empty(){
- // // mqid = format!("webu{}",super::token(5));
- // // }else{
- // // if let Err(e)=tokio::fs::remove_file(format!("/var/lib/mosquitto/{}",clientid)).await{
- // // log(Warning, format!("fail to remove old mqtt user file {e}"))
- // // };
- // // mqid=clientid;
- // // }
- // }
- // Err(e) => {
- // if !e.is_empty(){
- // log(Error, format!("query mqid failed: {e}"));
- // return Json(MqttBack{errcode: 3000, errmsg: Some("数据库异常: 获取用户信息失败".to_string()),mqttp:None,clientid:None,mqttu:None});
- // }
- // mqid="".to_string();
- // }
- // }
- // if mqid.is_empty(){
-
- // let mut mq: String;
- // let mut try_time=0;
- // loop{// 确保获取不重复的clientid,mqid设定为unique
- // if try_time>5{
- // return Json(MqttBack{errcode: 3000, errmsg: Some("数据库异常: 无法为用户绑定新clientid, 因为当前已经存在太多用户".to_string()),mqttp:None,clientid:None,mqttu:None});
- // }
- // match state.db_lite.execute("update user set mqid=? where id=?", ({mq=format!("webu{}",super::token(5));mq.clone()},uid)).await{
- // Ok(_) => {mqid=mq;break;},
- // Err(e) => {
- // if e.is_empty(){
- // try_time+=1;
- // continue;
- // }
- // log(Error, format!("update mqid failed: {e}"));
- // return Json(MqttBack{errcode: 3000, errmsg: Some("数据库异常: 未能绑定mqtt-clientid到用户".to_string()),mqttp:None,clientid:None,mqttu:None})
- // }
- // };
- // }
- // }
- // let (mqtp, mqtu): (String,String);
- // match tokio::fs::File::create(format!("/var/lib/mosquitto/user/{mqid}")).await{
- // Ok(mut f) => {
- // (mqtu,mqtp) = (super::token(12),super::token(12));
- // if let Err(e)=f.write(
- // format!("{}\n{}\n1\n{}",mqtu.clone(),mqtp.clone(),
- // if let Ok(t)=(std::time::SystemTime::now()+std::time::Duration::from_mins(10)).duration_since(std::time::UNIX_EPOCH){
- // t.as_secs()
- // }else{0}
- // ).as_bytes()
- // ).await{
- // log(Warning, format!("mqtt user file write failed {e}"));
- // };
- // },
- // Err(e) => {
- // log(Warning, format!("mqtt user file create failed {e}"));
- // return Json(MqttBack{errcode: 3000, errmsg: Some("数据库异常: 创建用户文件失败".to_string()),mqttp:None,clientid:None,mqttu:None});
- // }
- // };
- // match state.db_lite.query_rows("select d.sn from device d left join map_user_device m on d.id=m.did where m.uid=?", [uid], |r|{r.get::<usize,String>(0)}).await{
- // Ok(r)=>{
- // // log(Debug, format!("get mqtt sub list{:?}",r));
- // let mut fsub = if let Ok(f)=tokio::fs::File::create(format!("/var/lib/mosquitto/sub/{}",mqid)).await{f}else{
- // log(Warning, format!("fail at mqtt sub create"));
- // return Json(MqttBack{errcode: 3000, errmsg: Some("mqtt鉴权失败: 创建用户文件失败".to_string()),mqttp:None,clientid:None,mqttu:None});
- // };
- // let mut fpub = if let Ok(f)=tokio::fs::File::create(format!("/var/lib/mosquitto/pub/{}",mqid)).await{f}else{
- // log(Warning, format!("fail at mqtt pub create"));
- // return Json(MqttBack{errcode: 3000, errmsg: Some("mqtt鉴权失败: 创建用户文件失败".to_string()),mqttp:None,clientid:None,mqttu:None});
- // };
- // if !r.is_empty(){
- // if let Err(e)=fpub.write(format!("/wf/Iot/device/{}\n{}", r.join("\n/wf/Iot/device/"),r.join("\n")).as_bytes()).await{
- // // if let Err(e)=fpub.write(format!("/wf/Iot/device/{}\n{}/+/cmd", r.join("\n/wf/Iot/device/"),r.join("/+/cmd\n")).as_bytes()).await{
- // log(Warning, format!("fail to write pub list to file {e}"));
- // };
- // if let Err(e)=fsub.write(format!("/wf/Iot/client/{}\n{}", r.join("\n/wf/Iot/client/"),r.join("\n")).as_bytes()).await{
- // // if let Err(e)=fsub.write(format!("/wf/Iot/client/{}\n{}/+/state", r.join("\n/wf/Iot/client/"),r.join("/+/state\n")).as_bytes()).await{
- // log(Warning, format!("fail to write sub list to file {e}"));
- // };
- // } else {
- // if let Err(e)=fpub.write(String::new().as_bytes()).await{
- // log(Warning, format!("fail to write pub list to file {e}"));
- // };
- // if let Err(e)=fsub.write(String::new().as_bytes()).await{
- // log(Warning, format!("fail to write sub list to file {e}"));
- // };
- // }
- // }
- // Err(e) => {
- // if !e.is_empty(){
- // return Json(MqttBack{errcode: 3000, errmsg: Some("mqtt鉴权失败: 获取用户下属清单失败".to_string()),mqttp:None,clientid:None,mqttu:None});
- // }
- // }
- // }
- // return Json(MqttBack{errcode: 0, errmsg: None,mqttp:Some(mqtp),clientid:Some(mqid),mqttu:Some(mqtu)})
- // }
- pub async fn get_mqtt(
- State(state): State<AppState>,
- Json(u): axum::extract::Json<Ident>
- ) -> Json<MqttBack> {
- let (uid,_) = match check_token(&state, u.token).await {
- Ok(id) => {id},
- Err(_) => {
- return Json(MqttBack{errcode: 2000, errmsg: Some("鉴权失败: token无效".to_string()),mqttp:None,clientid:None,mqttu:None})
- }
- };
- let (mqtp, mqtu): (String,String) = (super::token(12),super::token(12));
- let mqid = if let Ok(s) = state.db_lite.query("select mqid from user where id=?", [uid], |r|{r.get::<usize,String>(0)}).await{
- s
- }else{
- return Json(MqttBack{errcode: 3000, errmsg: Some("数据库异常: 获取用户mqtt-clientid失败".to_string()),mqttp:None,clientid:None,mqttu:None});
- };
- if let Some(d) = state.mq_detail{
- match d.db_mq_user.execute("INSERT INTO user(clientid,user,pass,mode,deadline)VALUES(?,?,?,1,unixepoch()+300)ON CONFLICT(clientid)DO UPDATE SET user=excluded.user,pass=excluded.pass,mode=excluded.mode,deadline=excluded.deadline", (mqid.clone(),mqtu.clone(),mqtp.clone())).await{
- Ok(_) => {}
- Err(e)=>{log(Warning, format!("fail to update mqttuser: {e}"));}
- }
- }
- return Json(MqttBack{errcode: 0, errmsg: None,mqttp:Some(mqtp),clientid:Some(mqid),mqttu:Some(mqtu)});
- }
- #[test]
- fn test_f(){
- use tokio::io::{AsyncReadExt};
- tokio::runtime::Runtime::new().unwrap().block_on(async{
- use tokio::io::{AsyncWriteExt};
- match tokio::fs::File::create("testf").await{
- Ok(mut f) =>{
- let buf: &mut [u8];
- let mut b = [0;1024];
- buf=b.as_mut_slice();
- if let Ok(n)=f.read(buf).await{println!("file read {}",n)};
- if let Ok(n)=f.write(format!("asdf").as_bytes()).await{println!("file write {}",n);
- if let Err(e)=f.flush().await{ println!("file flush failed: {}",e)};};
- }
- Err(e) => {
- // match tokio::fs::File::create("testf").await{
- // Ok(mut f) => {
- // println!("file create");
- // if let Err(e)=f.write(format!("creatt").as_bytes()).await{
- // println!("file write failed: {}",e);
- // };
- // }
- // Err(e) => {println!("file create failed: {}",e)}
- // }
- println!("file create failed: {}",e);
- }
- }
- })
- }
|