use axum::{extract::{State},Json}; use super::{Ident,check_token}; use crate::{AppState, LogLevel::*, datasource::Datasource, log}; #[derive(serde::Serialize)] pub struct MqttBack{ pub errcode: i16, #[serde(skip_serializing_if = "Option::is_none")] pub errmsg: Option, #[serde(skip_serializing_if = "Option::is_none")] pub clientid: Option, #[serde(skip_serializing_if = "Option::is_none")] pub mqttu: Option, #[serde(skip_serializing_if = "Option::is_none")] pub mqttp: Option } // pub async fn get_mqtt( // State(state): State, // Json(u): axum::extract::Json // ) -> Json { // let (uid,_) = match check_token(&state, u.token).await { // Ok(id) => {id}, // Err(_) => { // return Json(MqttBack{errcode: 2000, errmsg: Some("鉴权失败: token无效".to_string()),mqttp:None,clientid:None,mqttu:None}) // } // }; // let mut mqid:String; // match state.db_lite.query("select mqid from user where id=?", [uid], |r|{r.get::(0)}).await{ // Ok(clientid)=>{ // mqid=clientid; // // if clientid.is_empty(){ // // mqid = format!("webu{}",super::token(5)); // // }else{ // // if let Err(e)=tokio::fs::remove_file(format!("/var/lib/mosquitto/{}",clientid)).await{ // // log(Warning, format!("fail to remove old mqtt user file {e}")) // // }; // // mqid=clientid; // // } // } // Err(e) => { // if !e.is_empty(){ // log(Error, format!("query mqid failed: {e}")); // return Json(MqttBack{errcode: 3000, errmsg: Some("数据库异常: 获取用户信息失败".to_string()),mqttp:None,clientid:None,mqttu:None}); // } // mqid="".to_string(); // } // } // if mqid.is_empty(){ // let mut mq: String; // let mut try_time=0; // loop{// 确保获取不重复的clientid,mqid设定为unique // if try_time>5{ // return Json(MqttBack{errcode: 3000, errmsg: Some("数据库异常: 无法为用户绑定新clientid, 因为当前已经存在太多用户".to_string()),mqttp:None,clientid:None,mqttu:None}); // } // match state.db_lite.execute("update user set mqid=? where id=?", ({mq=format!("webu{}",super::token(5));mq.clone()},uid)).await{ // Ok(_) => {mqid=mq;break;}, // Err(e) => { // if e.is_empty(){ // try_time+=1; // continue; // } // log(Error, format!("update mqid failed: {e}")); // return Json(MqttBack{errcode: 3000, errmsg: Some("数据库异常: 未能绑定mqtt-clientid到用户".to_string()),mqttp:None,clientid:None,mqttu:None}) // } // }; // } // } // let (mqtp, mqtu): (String,String); // match tokio::fs::File::create(format!("/var/lib/mosquitto/user/{mqid}")).await{ // Ok(mut f) => { // (mqtu,mqtp) = (super::token(12),super::token(12)); // if let Err(e)=f.write( // format!("{}\n{}\n1\n{}",mqtu.clone(),mqtp.clone(), // if let Ok(t)=(std::time::SystemTime::now()+std::time::Duration::from_mins(10)).duration_since(std::time::UNIX_EPOCH){ // t.as_secs() // }else{0} // ).as_bytes() // ).await{ // log(Warning, format!("mqtt user file write failed {e}")); // }; // }, // Err(e) => { // log(Warning, format!("mqtt user file create failed {e}")); // return Json(MqttBack{errcode: 3000, errmsg: Some("数据库异常: 创建用户文件失败".to_string()),mqttp:None,clientid:None,mqttu:None}); // } // }; // match state.db_lite.query_rows("select d.sn from device d left join map_user_device m on d.id=m.did where m.uid=?", [uid], |r|{r.get::(0)}).await{ // Ok(r)=>{ // // log(Debug, format!("get mqtt sub list{:?}",r)); // let mut fsub = if let Ok(f)=tokio::fs::File::create(format!("/var/lib/mosquitto/sub/{}",mqid)).await{f}else{ // log(Warning, format!("fail at mqtt sub create")); // return Json(MqttBack{errcode: 3000, errmsg: Some("mqtt鉴权失败: 创建用户文件失败".to_string()),mqttp:None,clientid:None,mqttu:None}); // }; // let mut fpub = if let Ok(f)=tokio::fs::File::create(format!("/var/lib/mosquitto/pub/{}",mqid)).await{f}else{ // log(Warning, format!("fail at mqtt pub create")); // return Json(MqttBack{errcode: 3000, errmsg: Some("mqtt鉴权失败: 创建用户文件失败".to_string()),mqttp:None,clientid:None,mqttu:None}); // }; // if !r.is_empty(){ // if let Err(e)=fpub.write(format!("/wf/Iot/device/{}\n{}", r.join("\n/wf/Iot/device/"),r.join("\n")).as_bytes()).await{ // // if let Err(e)=fpub.write(format!("/wf/Iot/device/{}\n{}/+/cmd", r.join("\n/wf/Iot/device/"),r.join("/+/cmd\n")).as_bytes()).await{ // log(Warning, format!("fail to write pub list to file {e}")); // }; // if let Err(e)=fsub.write(format!("/wf/Iot/client/{}\n{}", r.join("\n/wf/Iot/client/"),r.join("\n")).as_bytes()).await{ // // if let Err(e)=fsub.write(format!("/wf/Iot/client/{}\n{}/+/state", r.join("\n/wf/Iot/client/"),r.join("/+/state\n")).as_bytes()).await{ // log(Warning, format!("fail to write sub list to file {e}")); // }; // } else { // if let Err(e)=fpub.write(String::new().as_bytes()).await{ // log(Warning, format!("fail to write pub list to file {e}")); // }; // if let Err(e)=fsub.write(String::new().as_bytes()).await{ // log(Warning, format!("fail to write sub list to file {e}")); // }; // } // } // Err(e) => { // if !e.is_empty(){ // return Json(MqttBack{errcode: 3000, errmsg: Some("mqtt鉴权失败: 获取用户下属清单失败".to_string()),mqttp:None,clientid:None,mqttu:None}); // } // } // } // return Json(MqttBack{errcode: 0, errmsg: None,mqttp:Some(mqtp),clientid:Some(mqid),mqttu:Some(mqtu)}) // } pub async fn get_mqtt( State(state): State, Json(u): axum::extract::Json ) -> Json { let (uid,_) = match check_token(&state, u.token).await { Ok(id) => {id}, Err(_) => { return Json(MqttBack{errcode: 2000, errmsg: Some("鉴权失败: token无效".to_string()),mqttp:None,clientid:None,mqttu:None}) } }; let (mqtp, mqtu): (String,String) = (super::token(12),super::token(12)); let mqid = if let Ok(s) = state.db_lite.query("select mqid from user where id=?", [uid], |r|{r.get::(0)}).await{ s }else{ return Json(MqttBack{errcode: 3000, errmsg: Some("数据库异常: 获取用户mqtt-clientid失败".to_string()),mqttp:None,clientid:None,mqttu:None}); }; if let Some(d) = state.mq_detail{ match d.db_mq_user.execute("INSERT INTO user(clientid,user,pass,mode,deadline)VALUES(?,?,?,1,unixepoch()+300)ON CONFLICT(clientid)DO UPDATE SET user=excluded.user,pass=excluded.pass,mode=excluded.mode,deadline=excluded.deadline", (mqid.clone(),mqtu.clone(),mqtp.clone())).await{ Ok(_) => {} Err(e)=>{log(Warning, format!("fail to update mqttuser: {e}"));} } } return Json(MqttBack{errcode: 0, errmsg: None,mqttp:Some(mqtp),clientid:Some(mqid),mqttu:Some(mqtu)}); } #[test] fn test_f(){ use tokio::io::{AsyncReadExt}; tokio::runtime::Runtime::new().unwrap().block_on(async{ use tokio::io::{AsyncWriteExt}; match tokio::fs::File::create("testf").await{ Ok(mut f) =>{ let buf: &mut [u8]; let mut b = [0;1024]; buf=b.as_mut_slice(); if let Ok(n)=f.read(buf).await{println!("file read {}",n)}; if let Ok(n)=f.write(format!("asdf").as_bytes()).await{println!("file write {}",n); if let Err(e)=f.flush().await{ println!("file flush failed: {}",e)};}; } Err(e) => { // match tokio::fs::File::create("testf").await{ // Ok(mut f) => { // println!("file create"); // if let Err(e)=f.write(format!("creatt").as_bytes()).await{ // println!("file write failed: {}",e); // }; // } // Err(e) => {println!("file create failed: {}",e)} // } println!("file create failed: {}",e); } } }) }