|
|
@@ -0,0 +1,205 @@
|
|
|
+use axum::{extract::{State},Json};
|
|
|
+use tokio::io::{AsyncWriteExt};
|
|
|
+use super::{Ident,check_token};
|
|
|
+use crate::{AppState, datasource::Datasource, log, LogLevel::*};
|
|
|
+
|
|
|
+#[derive(serde::Serialize)]
|
|
|
+pub struct MqttBack{
|
|
|
+ pub errcode: i16,
|
|
|
+ #[serde(skip_serializing_if = "Option::is_none")]
|
|
|
+ pub errmsg: Option<String>,
|
|
|
+ #[serde(skip_serializing_if = "Option::is_none")]
|
|
|
+ pub clientid: Option<String>,
|
|
|
+ #[serde(skip_serializing_if = "Option::is_none")]
|
|
|
+ pub mqttu: Option<String>,
|
|
|
+ #[serde(skip_serializing_if = "Option::is_none")]
|
|
|
+ pub mqttp: Option<String>
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+pub async fn get_mqtt(
|
|
|
+ State(state): State<AppState>,
|
|
|
+ Json(u): axum::extract::Json<Ident>
|
|
|
+) -> Json<MqttBack> {
|
|
|
+ let uid=match check_token(&state, u.token).await {
|
|
|
+ Ok(id) => {id},
|
|
|
+ Err(_) => {
|
|
|
+ return Json(MqttBack{errcode: 2000, errmsg: Some("鉴权失败: token无效".to_string()),mqttp:None,clientid:None,mqttu:None})
|
|
|
+ }
|
|
|
+ };
|
|
|
+ let mut mqid:String;
|
|
|
+ match state.db_lite.query("select mqid from user where id=?", [uid], |r|{r.get::<usize,String>(0)}).await{
|
|
|
+ Ok(clientid)=>{
|
|
|
+ mqid=clientid;
|
|
|
+ // if clientid.is_empty(){
|
|
|
+ // mqid = format!("webu{}",super::token(5));
|
|
|
+ // }else{
|
|
|
+ // if let Err(e)=tokio::fs::remove_file(format!("/var/lib/mosquitto/{}",clientid)).await{
|
|
|
+ // log(Warning, format!("fail to remove old mqtt user file {e}"))
|
|
|
+ // };
|
|
|
+ // mqid=clientid;
|
|
|
+ // }
|
|
|
+ }
|
|
|
+ Err(e) => {
|
|
|
+ if !e.is_empty(){
|
|
|
+ log(Error, format!("query mqid failed: {e}"));
|
|
|
+ return Json(MqttBack{errcode: 3000, errmsg: Some("数据库异常: 获取用户信息失败".to_string()),mqttp:None,clientid:None,mqttu:None});
|
|
|
+ }
|
|
|
+ mqid="".to_string();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if mqid.is_empty(){
|
|
|
+
|
|
|
+ let mut mq: String;
|
|
|
+ let mut try_time=0;
|
|
|
+ loop{// 确保获取不重复的clientid,mqid设定为unique
|
|
|
+ if try_time>5{
|
|
|
+ return Json(MqttBack{errcode: 3000, errmsg: Some("数据库异常: 无法为用户绑定新clientid, 因为当前已经存在太多用户".to_string()),mqttp:None,clientid:None,mqttu:None});
|
|
|
+ }
|
|
|
+ match state.db_lite.execute("update user set mqid=? where id=?", ({mq=format!("webu{}",super::token(5));mq.clone()},uid)).await{
|
|
|
+ Ok(_) => {mqid=mq;break;},
|
|
|
+ Err(e) => {
|
|
|
+ if e.is_empty(){
|
|
|
+ try_time+=1;
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ log(Error, format!("update mqid failed: {e}"));
|
|
|
+ return Json(MqttBack{errcode: 3000, errmsg: Some("数据库异常: 未能绑定mqtt-clientid到用户".to_string()),mqttp:None,clientid:None,mqttu:None})
|
|
|
+ }
|
|
|
+ };
|
|
|
+ }
|
|
|
+ }
|
|
|
+ let (mqtp, mqtu): (String,String);
|
|
|
+ match tokio::fs::File::create(format!("/var/lib/mosquitto/user/{mqid}")).await{
|
|
|
+ Ok(mut f) => {
|
|
|
+ (mqtu,mqtp) = (super::token(12),super::token(12));
|
|
|
+ if let Err(e)=f.write(
|
|
|
+ format!("{}\n{}\n1\n{}",mqtu.clone(),mqtp.clone(),
|
|
|
+ if let Ok(t)=(std::time::SystemTime::now()+std::time::Duration::from_mins(10)).duration_since(std::time::UNIX_EPOCH){
|
|
|
+ t.as_secs()
|
|
|
+ }else{0}
|
|
|
+ ).as_bytes()
|
|
|
+ ).await{
|
|
|
+ log(Warning, format!("mqtt user file write failed {e}"));
|
|
|
+ };
|
|
|
+ },
|
|
|
+ Err(e) => {
|
|
|
+ log(Warning, format!("mqtt user file create failed {e}"));
|
|
|
+ return Json(MqttBack{errcode: 3000, errmsg: Some("数据库异常: 创建用户文件失败".to_string()),mqttp:None,clientid:None,mqttu:None});
|
|
|
+ }
|
|
|
+ };
|
|
|
+ match state.db_lite.query_rows("select d.sn from device d left join map_user_device m on d.id=m.did where m.uid=?", [uid], |r|{r.get::<usize,String>(0)}).await{
|
|
|
+ Ok(r)=>{
|
|
|
+ log(Debug, format!("get mqtt sub list{:?}",r));
|
|
|
+ let mut fsub = if let Ok(f)=tokio::fs::File::create(format!("/var/lib/mosquitto/sub/{}",mqid)).await{f}else{
|
|
|
+ log(Warning, format!("fail at mqtt sub create"));
|
|
|
+ return Json(MqttBack{errcode: 3000, errmsg: Some("mqtt鉴权失败: 创建用户文件失败".to_string()),mqttp:None,clientid:None,mqttu:None});
|
|
|
+ };
|
|
|
+ let mut fpub = if let Ok(f)=tokio::fs::File::create(format!("/var/lib/mosquitto/pub/{}",mqid)).await{f}else{
|
|
|
+ log(Warning, format!("fail at mqtt pub create"));
|
|
|
+ return Json(MqttBack{errcode: 3000, errmsg: Some("mqtt鉴权失败: 创建用户文件失败".to_string()),mqttp:None,clientid:None,mqttu:None});
|
|
|
+ };
|
|
|
+ if !r.is_empty(){
|
|
|
+ if let Err(e)=fpub.write(format!("/wf/Iot/device/{}", r.join("\n/wf/Iot/device/")).as_bytes()).await{
|
|
|
+ log(Warning, format!("fail to write pub lit to file {e}"));
|
|
|
+ };
|
|
|
+ if let Err(e)=fsub.write(format!("/wf/Iot/client/{}", r.join("\n/wf/Iot/client/")).as_bytes()).await{
|
|
|
+ log(Warning, format!("fail to write sub lit to file {e}"));
|
|
|
+ };
|
|
|
+ } else {
|
|
|
+ if let Err(e)=fpub.write(String::new().as_bytes()).await{
|
|
|
+ log(Warning, format!("fail to write pub lit to file {e}"));
|
|
|
+ };
|
|
|
+ if let Err(e)=fsub.write(String::new().as_bytes()).await{
|
|
|
+ log(Warning, format!("fail to write sub lit to file {e}"));
|
|
|
+ };
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Err(e) => {
|
|
|
+ if !e.is_empty(){
|
|
|
+ return Json(MqttBack{errcode: 3000, errmsg: Some("mqtt鉴权失败: 获取用户下属清单失败".to_string()),mqttp:None,clientid:None,mqttu:None});
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return Json(MqttBack{errcode: 0, errmsg: None,mqttp:Some(mqtp),clientid:Some(mqid),mqttu:Some(mqtu)})
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+// pub async fn get_mqtt(
|
|
|
+// State(state): State<AppState>,
|
|
|
+// Json(u): axum::extract::Json<Ident>
|
|
|
+// ) -> Json<MqttBack> {
|
|
|
+// let uid=match check_token(&state, u.token).await {
|
|
|
+// Ok(id) => {id},
|
|
|
+// Err(_) => {
|
|
|
+// return Json(MqttBack{errcode: 2000, errmsg: Some("鉴权失败: token无效".to_string()),mqttp:None,clientid:None,mqttu:None})
|
|
|
+// }
|
|
|
+// };
|
|
|
+
|
|
|
+// match state.db_lite.query("select mqid from user where id=?", [uid], |r|{r.get::<usize,String>(0)}).await{
|
|
|
+// Ok(mut mqid) => {
|
|
|
+// if mqid.is_empty(){
|
|
|
+// loop{ // 确保获取不重复的clientid,mqid设定为unique
|
|
|
+// match state.db_lite.execute("update user set mqid=? where id=?", ({mqid=super::token(12);mqid.clone()},uid)).await{
|
|
|
+// Ok(_) => {break;},
|
|
|
+// Err(e) => {
|
|
|
+// if e.is_empty(){
|
|
|
+// continue;
|
|
|
+// }
|
|
|
+// log(Error, format!("update mqid failed: {e}"));
|
|
|
+// break;
|
|
|
+// }
|
|
|
+// };
|
|
|
+// }
|
|
|
+// }
|
|
|
+// let (mqu,mqp) :(String,String);
|
|
|
+// match state.db_mqtt.query("select user,pass from user where clientid=?", [mqid.clone()], |r|{Ok((r.get::<usize,String>(1)?,r.get::<usize,String>(2)?))}).await{
|
|
|
+// Ok((u,p)) => {mqu=u;mqp=p;},
|
|
|
+// Err(e) => {
|
|
|
+// log(Warning,format!("{e}"));
|
|
|
+// (mqu,mqp)=(super::token(12),super::token(12));
|
|
|
+// match state.db_mqtt.execute("insert into user(clientid,user,pass) values(?,?,?)", (mqid.clone(),mqu.clone(),mqp.clone())).await{
|
|
|
+// Ok(_) => {},
|
|
|
+// Err(e) => {log(Error, format!("insert mqtt user failed: {e}"));}
|
|
|
+// }
|
|
|
+// }
|
|
|
+// };
|
|
|
+// return Json(MqttBack{
|
|
|
+// errcode: 0,
|
|
|
+// errmsg: None,
|
|
|
+// mqttp:Some(mqp),clientid:Some(mqid),mqttu:Some(mqu)
|
|
|
+// })
|
|
|
+// }
|
|
|
+// Err(e) => {
|
|
|
+// return Json(MqttBack{errcode: 3000, errmsg: Some(format!("查询用户失败: {e}")),mqttp:None,clientid:None,mqttu:None})
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }
|
|
|
+#[test]
|
|
|
+fn test_f(){
|
|
|
+use tokio::io::{AsyncReadExt};
|
|
|
+ tokio::runtime::Runtime::new().unwrap().block_on(async{
|
|
|
+ match tokio::fs::File::create("testf").await{
|
|
|
+ Ok(mut f) =>{
|
|
|
+ let buf: &mut [u8];
|
|
|
+ let mut b = [0;1024];
|
|
|
+ buf=b.as_mut_slice();
|
|
|
+ if let Ok(n)=f.read(buf).await{println!("file read {}",n)};
|
|
|
+ if let Ok(n)=f.write(format!("asdf").as_bytes()).await{println!("file write {}",n);
|
|
|
+ if let Err(e)=f.flush().await{ println!("file flush failed: {}",e)};};
|
|
|
+ }
|
|
|
+ Err(e) => {
|
|
|
+ // match tokio::fs::File::create("testf").await{
|
|
|
+ // Ok(mut f) => {
|
|
|
+ // println!("file create");
|
|
|
+ // if let Err(e)=f.write(format!("creatt").as_bytes()).await{
|
|
|
+ // println!("file write failed: {}",e);
|
|
|
+ // };
|
|
|
+ // }
|
|
|
+ // Err(e) => {println!("file create failed: {}",e)}
|
|
|
+ // }
|
|
|
+ println!("file create failed: {}",e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ })
|
|
|
+}
|