Explorar o código

mqtt侧转sqlite调整

zii hai 1 mes
pai
achega
6d4c17849e
Modificáronse 9 ficheiros con 470 adicións e 193 borrados
  1. 107 0
      docs/mqtt插件侧README.md
  2. 117 142
      src/api/ability.rs
  3. 4 4
      src/api/area.rs
  4. 35 5
      src/api/auth.rs
  5. 146 21
      src/api/device.rs
  6. 36 11
      src/api/flow_task.rs
  7. 6 6
      src/api/mod.rs
  8. 2 2
      src/api/user.rs
  9. 17 2
      src/main.rs

+ 107 - 0
docs/mqtt插件侧README.md

@@ -0,0 +1,107 @@
+# 库介绍
+
+这里用到了3个库,库文件的路径为  
+/var/lib/mosquitto/db/user.db  
+/var/lib/mosquitto/db/acl.db  
+/var/lib/mosquitto/db/dev.db  
+  
+/var/lib/mosquitto/db/user.db下只有一个表  
+表名为user,是用户表,建表语句如下
+
+```sql
+CREATE TABLE `user` (
+  `uid` INTEGER PRIMARY KEY AUTOINCREMENT,
+  `clientid` TEXT NOT NULL,
+  `user` TEXT NOT NULL,
+  `pass` TEXT NOT NULL,
+  `mode` INTEGER NOT NULL DEFAULT 0,
+  `deadline` INTEGER NOT NULL DEFAULT 9223372036854775807,
+  UNIQUE(`clientid`, `user`, `pass`)
+);
+```
+
+其中clientid、user、pass均为登录参数,  
+mode代表是多次有效还是一次有效,0为多次有效,1为一次有效。  
+deadline代表这条记录的有效期,默认为无穷大,如果想创建有有效期的账号,可以利用这个字段。  
+  
+/var/lib/mosquitto/db/acl.db下有两个表  
+分别是pub表与sub表,是发送与订阅控制表,建表语句如下
+
+```sql
+CREATE TABLE `pub` (
+  `pid` INTEGER PRIMARY KEY AUTOINCREMENT,
+  `clientid` TEXT NOT NULL,
+  `topic` TEXT NOT NULL,
+  `deadline` INTEGER NOT NULL DEFAULT 9223372036854775807,
+  UNIQUE(`clientid`, `topic`)
+);
+CREATE TABLE `sub` (
+  `sid` INTEGER PRIMARY KEY AUTOINCREMENT,
+  `clientid` TEXT NOT NULL,
+  `topic` TEXT NOT NULL,
+  `deadline` INTEGER NOT NULL DEFAULT 9223372036854775807,
+  UNIQUE(`clientid`, `topic`)
+);
+```
+
+其中clientid为登录时参数,topic为topic的前缀,不是完整topic哦。  
+deadline代表这条记录的有效期,默认为无穷大,如果想创建有有效期的账号,可以利用这个字段。  
+  
+/var/lib/mosquitto/db/dev.db
+表名为dev表,是设备状态表,建表语句如下
+
+```sql
+CREATE TABLE `dev` (
+  `did` INTEGER PRIMARY KEY AUTOINCREMENT,
+  `clientid` TEXT NOT NULL,
+  `DI0` INTEGER NOT NULL DEFAULT 0,
+  `DI1` INTEGER NOT NULL DEFAULT 0,
+  `DI2` INTEGER NOT NULL DEFAULT 0,
+  `DI3` INTEGER NOT NULL DEFAULT 0,
+  `DI4` INTEGER NOT NULL DEFAULT 0,
+  `DI5` INTEGER NOT NULL DEFAULT 0,
+  `DI6` INTEGER NOT NULL DEFAULT 0,
+  `DI7` INTEGER NOT NULL DEFAULT 0,
+  `DO0` INTEGER NOT NULL DEFAULT 0,
+  `DO1` INTEGER NOT NULL DEFAULT 0,
+  `DO2` INTEGER NOT NULL DEFAULT 0,
+  `DO3` INTEGER NOT NULL DEFAULT 0,
+  `DO4` INTEGER NOT NULL DEFAULT 0,
+  `DO5` INTEGER NOT NULL DEFAULT 0,
+  `DO6` INTEGER NOT NULL DEFAULT 0,
+  `DO7` INTEGER NOT NULL DEFAULT 0,
+  UNIQUE(`clientid`)
+);
+```
+
+其中clientid为设备的ID,其他为设备状态,如果收到了下线包(avty:offline),会删除那一行设备。  
+也就是说,如果没查到,就说明设备离线,查到东西说明设备在线。  
+
+## 编译方法
+
+需要先安装mosquitto的库与开发库,方法如下
+
+```bash
+apt install libmosquitto-dev mosquitto-dev -y
+```
+
+然后直接执行make即可。
+
+## 使用方法
+
+将生成的mosquitto_plugin.so放到特定目录下,如/etc/mosquitto  
+修改mosquitto的配置文件,在最后面添加  
+plugin [目录]/mosquitto_plugin.so  
+如`plugin /etc/mosquitto/mosquitto_plugin.so`  
+完整mosquitto配置文件参考
+
+```yaml
+listener 1883
+protocol mqtt
+
+listener 9001
+protocol websockets
+
+allow_anonymous false
+plugin /etc/mosquitto/mosquitto_plugin.so
+```

+ 117 - 142
src/api/ability.rs

@@ -1,7 +1,6 @@
 use axum::{extract::{State},Json};
-use tokio::io::{AsyncWriteExt};
 use super::{Ident,check_token};
-use crate::{AppState, datasource::Datasource, log, LogLevel::*};
+use crate::{AppState, LogLevel::*, datasource::Datasource, log};
 
 #[derive(serde::Serialize)]
 pub struct MqttBack{
@@ -17,170 +16,146 @@ pub struct MqttBack{
 
 }
 
-pub async fn get_mqtt(
-    State(state): State<AppState>,
-    Json(u): axum::extract::Json<Ident>
-) -> Json<MqttBack> {
-    let uid=match check_token(&state, u.token).await {
-        Ok(id) => {id},
-        Err(_) => {
-            return Json(MqttBack{errcode: 2000, errmsg: Some("鉴权失败: token无效".to_string()),mqttp:None,clientid:None,mqttu:None})
-        }
-    };
-    let mut mqid:String;
-    match state.db_lite.query("select mqid from user where id=?", [uid], |r|{r.get::<usize,String>(0)}).await{
-        Ok(clientid)=>{
-            mqid=clientid;
-            // if clientid.is_empty(){
-            //     mqid = format!("webu{}",super::token(5));
-            // }else{
-            //     if let Err(e)=tokio::fs::remove_file(format!("/var/lib/mosquitto/{}",clientid)).await{
-            //         log(Warning, format!("fail to remove old mqtt user file {e}"))
-            //     };
-            //     mqid=clientid;
-            // }
-        }
-        Err(e) => {
-            if !e.is_empty(){
-                log(Error, format!("query mqid failed: {e}"));
-                return Json(MqttBack{errcode: 3000, errmsg: Some("数据库异常: 获取用户信息失败".to_string()),mqttp:None,clientid:None,mqttu:None});
-            }
-            mqid="".to_string();
-        }
-    }
-    if mqid.is_empty(){ 
-        
-    let mut mq: String;
-    let mut try_time=0;
-        loop{// 确保获取不重复的clientid,mqid设定为unique
-            if try_time>5{
-                return Json(MqttBack{errcode: 3000, errmsg: Some("数据库异常: 无法为用户绑定新clientid, 因为当前已经存在太多用户".to_string()),mqttp:None,clientid:None,mqttu:None});
-            }
-            match state.db_lite.execute("update user set mqid=? where id=?", ({mq=format!("webu{}",super::token(5));mq.clone()},uid)).await{
-                Ok(_) => {mqid=mq;break;},
-                Err(e) => {
-                    if e.is_empty(){
-                        try_time+=1;
-                        continue;
-                    }
-                    log(Error, format!("update mqid failed: {e}"));
-                    return Json(MqttBack{errcode: 3000, errmsg: Some("数据库异常: 未能绑定mqtt-clientid到用户".to_string()),mqttp:None,clientid:None,mqttu:None})
-                }
-            };
-        }
-    }
-    let (mqtp, mqtu): (String,String);
-    match tokio::fs::File::create(format!("/var/lib/mosquitto/user/{mqid}")).await{
-        Ok(mut f) => {
-            (mqtu,mqtp) = (super::token(12),super::token(12));
-            if let Err(e)=f.write(
-                format!("{}\n{}\n1\n{}",mqtu.clone(),mqtp.clone(),
-                    if let Ok(t)=(std::time::SystemTime::now()+std::time::Duration::from_mins(10)).duration_since(std::time::UNIX_EPOCH){
-                        t.as_secs()
-                    }else{0}
-                ).as_bytes()
-            ).await{
-                log(Warning, format!("mqtt user file write failed {e}"));
-            };
-        },
-        Err(e) => {
-            log(Warning, format!("mqtt user file create failed {e}"));
-            return  Json(MqttBack{errcode: 3000, errmsg: Some("数据库异常: 创建用户文件失败".to_string()),mqttp:None,clientid:None,mqttu:None});
-        }
-    };
-    match state.db_lite.query_rows("select d.sn from device d left join map_user_device m on d.id=m.did where m.uid=?", [uid], |r|{r.get::<usize,String>(0)}).await{
-        Ok(r)=>{
-            // log(Debug, format!("get mqtt sub list{:?}",r));
-            let mut fsub = if let Ok(f)=tokio::fs::File::create(format!("/var/lib/mosquitto/sub/{}",mqid)).await{f}else{
-                log(Warning, format!("fail at mqtt sub create"));
-                return Json(MqttBack{errcode: 3000, errmsg: Some("mqtt鉴权失败: 创建用户文件失败".to_string()),mqttp:None,clientid:None,mqttu:None});
-            };
-            let mut fpub = if let Ok(f)=tokio::fs::File::create(format!("/var/lib/mosquitto/pub/{}",mqid)).await{f}else{
-                log(Warning, format!("fail at mqtt pub create"));
-                return Json(MqttBack{errcode: 3000, errmsg: Some("mqtt鉴权失败: 创建用户文件失败".to_string()),mqttp:None,clientid:None,mqttu:None});
-            };
-            if !r.is_empty(){
-                if let Err(e)=fpub.write(format!("/wf/Iot/device/{}\n{}", r.join("\n/wf/Iot/device/"),r.join("\n")).as_bytes()).await{
-                // if let Err(e)=fpub.write(format!("/wf/Iot/device/{}\n{}/+/cmd", r.join("\n/wf/Iot/device/"),r.join("/+/cmd\n")).as_bytes()).await{
-                    log(Warning, format!("fail to write pub list to file {e}"));
-                };
-                if let Err(e)=fsub.write(format!("/wf/Iot/client/{}\n{}", r.join("\n/wf/Iot/client/"),r.join("\n")).as_bytes()).await{
-                // if let Err(e)=fsub.write(format!("/wf/Iot/client/{}\n{}/+/state", r.join("\n/wf/Iot/client/"),r.join("/+/state\n")).as_bytes()).await{
-                    log(Warning, format!("fail to write sub list to file {e}"));
-                };
-            } else {
-                if let Err(e)=fpub.write(String::new().as_bytes()).await{
-                    log(Warning, format!("fail to write pub list to file {e}"));
-                };
-                if let Err(e)=fsub.write(String::new().as_bytes()).await{
-                    log(Warning, format!("fail to write sub list to file {e}"));
-                };
-            }
-        }
-        Err(e) => {
-            if !e.is_empty(){
-                return Json(MqttBack{errcode: 3000, errmsg: Some("mqtt鉴权失败: 获取用户下属清单失败".to_string()),mqttp:None,clientid:None,mqttu:None});
-            }
-        }
-    }
-    return Json(MqttBack{errcode: 0, errmsg: None,mqttp:Some(mqtp),clientid:Some(mqid),mqttu:Some(mqtu)})
-}
-
-
 // pub async fn get_mqtt(
 //     State(state): State<AppState>,
 //     Json(u): axum::extract::Json<Ident>
 // ) -> Json<MqttBack> {
-//     let uid=match check_token(&state, u.token).await {
+//     let (uid,_) = match check_token(&state, u.token).await {
 //         Ok(id) => {id},
 //         Err(_) => {
 //             return Json(MqttBack{errcode: 2000, errmsg: Some("鉴权失败: token无效".to_string()),mqttp:None,clientid:None,mqttu:None})
 //         }
 //     };
-
+//     let mut mqid:String;
 //     match state.db_lite.query("select mqid from user where id=?", [uid], |r|{r.get::<usize,String>(0)}).await{
-//         Ok(mut mqid) => {
-//             if mqid.is_empty(){
-//                 loop{ // 确保获取不重复的clientid,mqid设定为unique
-//                     match state.db_lite.execute("update user set mqid=? where id=?", ({mqid=super::token(12);mqid.clone()},uid)).await{
-//                         Ok(_) => {break;},
-//                         Err(e) => {
-//                             if e.is_empty(){
-//                                 continue;
-//                             }
-//                             log(Error, format!("update mqid failed: {e}"));
-//                             break;
-//                         }
-//                     };
-//                 }
+//         Ok(clientid)=>{
+//             mqid=clientid;
+//             // if clientid.is_empty(){
+//             //     mqid = format!("webu{}",super::token(5));
+//             // }else{
+//             //     if let Err(e)=tokio::fs::remove_file(format!("/var/lib/mosquitto/{}",clientid)).await{
+//             //         log(Warning, format!("fail to remove old mqtt user file {e}"))
+//             //     };
+//             //     mqid=clientid;
+//             // }
+//         }
+//         Err(e) => {
+//             if !e.is_empty(){
+//                 log(Error, format!("query mqid failed: {e}"));
+//                 return Json(MqttBack{errcode: 3000, errmsg: Some("数据库异常: 获取用户信息失败".to_string()),mqttp:None,clientid:None,mqttu:None});
 //             }
-//             let (mqu,mqp) :(String,String);
-//             match state.db_mqtt.query("select user,pass from user where clientid=?", [mqid.clone()], |r|{Ok((r.get::<usize,String>(1)?,r.get::<usize,String>(2)?))}).await{
-//                 Ok((u,p)) => {mqu=u;mqp=p;},
+//             mqid="".to_string();
+//         }
+//     }
+//     if mqid.is_empty(){ 
+        
+//     let mut mq: String;
+//     let mut try_time=0;
+//         loop{// 确保获取不重复的clientid,mqid设定为unique
+//             if try_time>5{
+//                 return Json(MqttBack{errcode: 3000, errmsg: Some("数据库异常: 无法为用户绑定新clientid, 因为当前已经存在太多用户".to_string()),mqttp:None,clientid:None,mqttu:None});
+//             }
+//             match state.db_lite.execute("update user set mqid=? where id=?", ({mq=format!("webu{}",super::token(5));mq.clone()},uid)).await{
+//                 Ok(_) => {mqid=mq;break;},
 //                 Err(e) => {
-//                     log(Warning,format!("{e}"));
-//                     (mqu,mqp)=(super::token(12),super::token(12));
-//                     match state.db_mqtt.execute("insert into user(clientid,user,pass) values(?,?,?)", (mqid.clone(),mqu.clone(),mqp.clone())).await{
-//                         Ok(_) => {},
-//                         Err(e) => {log(Error, format!("insert mqtt user failed: {e}"));}
+//                     if e.is_empty(){
+//                         try_time+=1;
+//                         continue;
 //                     }
+//                     log(Error, format!("update mqid failed: {e}"));
+//                     return Json(MqttBack{errcode: 3000, errmsg: Some("数据库异常: 未能绑定mqtt-clientid到用户".to_string()),mqttp:None,clientid:None,mqttu:None})
 //                 }
 //             };
-//             return Json(MqttBack{
-//                 errcode: 0,
-//                 errmsg: None,
-//                 mqttp:Some(mqp),clientid:Some(mqid),mqttu:Some(mqu)
-//             })
 //         }
+//     }
+//     let (mqtp, mqtu): (String,String);
+//     match tokio::fs::File::create(format!("/var/lib/mosquitto/user/{mqid}")).await{
+//         Ok(mut f) => {
+//             (mqtu,mqtp) = (super::token(12),super::token(12));
+//             if let Err(e)=f.write(
+//                 format!("{}\n{}\n1\n{}",mqtu.clone(),mqtp.clone(),
+//                     if let Ok(t)=(std::time::SystemTime::now()+std::time::Duration::from_mins(10)).duration_since(std::time::UNIX_EPOCH){
+//                         t.as_secs()
+//                     }else{0}
+//                 ).as_bytes()
+//             ).await{
+//                 log(Warning, format!("mqtt user file write failed {e}"));
+//             };
+//         },
 //         Err(e) => {
-//             return Json(MqttBack{errcode: 3000, errmsg: Some(format!("查询用户失败: {e}")),mqttp:None,clientid:None,mqttu:None})
+//             log(Warning, format!("mqtt user file create failed {e}"));
+//             return  Json(MqttBack{errcode: 3000, errmsg: Some("数据库异常: 创建用户文件失败".to_string()),mqttp:None,clientid:None,mqttu:None});
+//         }
+//     };
+//     match state.db_lite.query_rows("select d.sn from device d left join map_user_device m on d.id=m.did where m.uid=?", [uid], |r|{r.get::<usize,String>(0)}).await{
+//         Ok(r)=>{
+//             // log(Debug, format!("get mqtt sub list{:?}",r));
+//             let mut fsub = if let Ok(f)=tokio::fs::File::create(format!("/var/lib/mosquitto/sub/{}",mqid)).await{f}else{
+//                 log(Warning, format!("fail at mqtt sub create"));
+//                 return Json(MqttBack{errcode: 3000, errmsg: Some("mqtt鉴权失败: 创建用户文件失败".to_string()),mqttp:None,clientid:None,mqttu:None});
+//             };
+//             let mut fpub = if let Ok(f)=tokio::fs::File::create(format!("/var/lib/mosquitto/pub/{}",mqid)).await{f}else{
+//                 log(Warning, format!("fail at mqtt pub create"));
+//                 return Json(MqttBack{errcode: 3000, errmsg: Some("mqtt鉴权失败: 创建用户文件失败".to_string()),mqttp:None,clientid:None,mqttu:None});
+//             };
+//             if !r.is_empty(){
+//                 if let Err(e)=fpub.write(format!("/wf/Iot/device/{}\n{}", r.join("\n/wf/Iot/device/"),r.join("\n")).as_bytes()).await{
+//                 // if let Err(e)=fpub.write(format!("/wf/Iot/device/{}\n{}/+/cmd", r.join("\n/wf/Iot/device/"),r.join("/+/cmd\n")).as_bytes()).await{
+//                     log(Warning, format!("fail to write pub list to file {e}"));
+//                 };
+//                 if let Err(e)=fsub.write(format!("/wf/Iot/client/{}\n{}", r.join("\n/wf/Iot/client/"),r.join("\n")).as_bytes()).await{
+//                 // if let Err(e)=fsub.write(format!("/wf/Iot/client/{}\n{}/+/state", r.join("\n/wf/Iot/client/"),r.join("/+/state\n")).as_bytes()).await{
+//                     log(Warning, format!("fail to write sub list to file {e}"));
+//                 };
+//             } else {
+//                 if let Err(e)=fpub.write(String::new().as_bytes()).await{
+//                     log(Warning, format!("fail to write pub list to file {e}"));
+//                 };
+//                 if let Err(e)=fsub.write(String::new().as_bytes()).await{
+//                     log(Warning, format!("fail to write sub list to file {e}"));
+//                 };
+//             }
+//         }
+//         Err(e) => {
+//             if !e.is_empty(){
+//                 return Json(MqttBack{errcode: 3000, errmsg: Some("mqtt鉴权失败: 获取用户下属清单失败".to_string()),mqttp:None,clientid:None,mqttu:None});
+//             }
 //         }
 //     }
+//     return Json(MqttBack{errcode: 0, errmsg: None,mqttp:Some(mqtp),clientid:Some(mqid),mqttu:Some(mqtu)})
 // }
+
+
+pub async fn get_mqtt(
+    State(state): State<AppState>,
+    Json(u): axum::extract::Json<Ident>
+) -> Json<MqttBack> {
+    let (uid,_) = match check_token(&state, u.token).await {
+        Ok(id) => {id},
+        Err(_) => {
+            return Json(MqttBack{errcode: 2000, errmsg: Some("鉴权失败: token无效".to_string()),mqttp:None,clientid:None,mqttu:None})
+        }
+    };
+    let (mqtp, mqtu): (String,String) = (super::token(12),super::token(12));
+    let mqid = if let Ok(s) = state.db_lite.query("select mqid from user where id=?", [uid], |r|{r.get::<usize,String>(0)}).await{
+        s
+    }else{
+        return Json(MqttBack{errcode: 3000, errmsg: Some("数据库异常: 获取用户mqtt-clientid失败".to_string()),mqttp:None,clientid:None,mqttu:None});
+    };
+    if let Some(d) = state.mq_detail{
+        if let Err(e)=d.db_mq_user.execute("update user set user=?, pass=? where clientid=?", (mqtu.clone(),mqtp.clone(),mqid.clone())).await{
+            log(Warning, format!("fail to update mqttuser: {e}"));
+        }
+    }
+
+    return Json(MqttBack{errcode: 0, errmsg: None,mqttp:Some(mqtp),clientid:Some(mqid),mqttu:Some(mqtu)});
+}
+
 #[test]
 fn test_f(){
 use tokio::io::{AsyncReadExt};
     tokio::runtime::Runtime::new().unwrap().block_on(async{
+use tokio::io::{AsyncWriteExt};
         match tokio::fs::File::create("testf").await{
             Ok(mut f) =>{
             let buf: &mut [u8];

+ 4 - 4
src/api/area.rs

@@ -20,7 +20,7 @@ pub async fn a_new(
     State(state): State<AppState>,
     Json(u): axum::extract::Json<ANew>
 ) -> Json<IdBack> {
-    let uid=match check_token(&state, u.token).await {
+    let (uid,_) = match check_token(&state, u.token).await {
         Ok(id) => {id},
         Err(_) => {
             return Json(IdBack{
@@ -68,7 +68,7 @@ pub async fn a_edit(
     State(state): State<AppState>,
     Json(u): axum::extract::Json<EditIdName>
 ) -> Json<JsonBack> {
-    let uid = match check_token(&state, u.token).await {
+    let (uid,_) = match check_token(&state, u.token).await {
         Ok(id) => id,
         Err(_) => {
                         return token_fail();
@@ -100,7 +100,7 @@ pub async fn a_remove(
     State(state): State<AppState>,
     Json(a): axum::extract::Json<EditIdName>
 ) -> Json<JsonBack> {
-    let uid=match check_token(&state, a.token).await {
+    let (uid,_) = match check_token(&state, a.token).await {
         Ok(id) => {id},
         Err(_) => {
             return token_fail();
@@ -124,7 +124,7 @@ pub async fn a_del(
     State(state): State<AppState>,
     Json(a): axum::extract::Json<EditIdName>
 ) -> Json<JsonBack> {
-    let uid=match check_token(&state, a.token).await {
+    let (uid,_) = match check_token(&state, a.token).await {
         Ok(id) => {id},
         Err(_) => {
             return token_fail();

+ 35 - 5
src/api/auth.rs

@@ -102,7 +102,7 @@ pub async fn auth(
     log(Debuging, format!("in authing"));
     let token=super::token(32);
     let res = super::check_openid(&state, a.openid.clone()).await;
-    if let Ok(id) = res{
+    if let Ok((id,mqid)) = res{
         // 用户已经注册 函数末尾发生跳转到主页
         if let Err(e) = state.db_lite.execute("update user set token=? where id=?", (token.clone(), id)).await{
             println!("{e}");
@@ -116,18 +116,32 @@ pub async fn auth(
             //  if let Some(did)=state.db_lite.last_insert_rowid().await.ok(){  // 错误位置:update不会给last_insert_rowid
                 if let Err(e) = state.db_lite.execute("insert into map_user_device (uid, did)values(?,?)", (id,did)).await{
                     println!("{e}");
+                } else{
+                    if let Some(mq) = state.mq_detail{
+                        if let Err(e)=mq.db_mq_map.execute(
+                            "insert into pub (clientid,topic,deadline)values(?,?,unixepoch()+300),(?,?,unixepoch()+300)", 
+                            (mqid.clone(),format!("{}/cmd",sn),mqid.clone(),format!("/wf/Iot/device/{sn}"))).await{
+                            log(Warning, format!("on register with device: {e}"));
+                        };
+                        
+                        if let Err(e)=mq.db_mq_map.execute(
+                            "insert into sub (clientid,topic,deadline)values(?,?,unixepoch()+300)", 
+                            (mqid.clone(),format!("{}/state",sn),mqid.clone(),format!("/wf/Iot/client/{sn}"))).await{
+                            log(Warning, format!("on register with device: {e}"));
+                        };
+                    }
                 }
             };  
         }
     } else if a.scope !="snsapi_userinfo"{
-        // 
+        // 用户未注册,跳转获取用户详细信息
         log(Debug, "send redirect for get uinfo".to_string());
         return Redirect::to(
             format!("https://open.weixin.qq.com/connect/oauth2/authorize?appid=wx664469117500d259&redirect_uri=https%3A%2F%2Fwww.worldflying.cn%2Ftools%2Fwxredirect%2Fplc.html&response_type=code&scope=snsapi_userinfo&state={}#wechat_redirect",q.state).as_str()
             // format!("https://open.weixin.qq.com/connect/oauth2/authorize?appid=wx664469117500d259&redirect_uri=https%3A%2F%2Fwww.worldflying.cn%2Ftools%2Fwxredirect%2Fiot.html&response_type=code&scope=snsapi_userinfo&state={}#wechat_redirect",q.state).as_str()
         )
     } else {
-        //
+        // 带用户信息的重定向收取目标分支,保存用户数据并创建mqtt相关信息
         let b:WxUserInfo = if let Ok(b) = {if let Ok(b) = reqwest::get(format!(
             "https://api.weixin.qq.com/sns/userinfo?access_token={}&openid={}&lang=zh_CN",
             a.access_token,
@@ -137,7 +151,10 @@ pub async fn auth(
             return Redirect::to(format!("{}/null",crate::WEBP).as_str())}}{b} else {
             log(Info, "redirect 5".to_string());
                 return Redirect::to(format!("{}/null",crate::WEBP).as_str())};
-        if let Err(e) = state.db_lite.execute("insert into user (uname,nickname,passwd,headimgurl,openid,sex,token,mqid) values(?,?,?,?,?,?,?,?)", (b.nickname.clone(),b.nickname.clone(),"",b.headimgurl,b.openid,b.sex,token.clone(),format!("webu{}",super::token(5)))).await{
+        let mqid = format!("webu{}",super::token(5));
+        if let Err(e) = state.db_lite.execute(
+            "insert into user (uname,nickname,passwd,headimgurl,openid,sex,token,mqid) values(?,?,?,?,?,?,?,?)", 
+        (b.nickname.clone(),b.nickname.clone(),"",b.headimgurl,b.openid,b.sex,token.clone(),mqid.clone())).await{
             println!("{e}");
         } 
         let uid = match state.db_lite.last_insert_rowid().await{
@@ -170,7 +187,20 @@ pub async fn auth(
                 if let Err(e) = state.db_lite.execute("insert into map_user_device (uid, did)values(?,?)", (uid,did)).await{
                     println!("{e}");
                 }
-            };  
+            };
+            if let Some(mq) = state.mq_detail{
+                if let Err(e)=mq.db_mq_map.execute(
+                    "insert into pub (clientid,topic,deadline)values(?,?,unixepoch()+300),(?,?,unixepoch()+300)", 
+                    (mqid.clone(),format!("{}/cmd",sn),mqid.clone(),format!("/wf/Iot/device/{sn}"))).await{
+                    log(Warning, format!("on register with device: {e}"));
+                };
+                
+                if let Err(e)=mq.db_mq_map.execute(
+                    "insert into sub (clientid,topic,deadline)values(?,?,unixepoch()+300)", 
+                    (mqid.clone(),format!("{}/state",sn),mqid.clone(),format!("/wf/Iot/client/{sn}"))).await{
+                    log(Warning, format!("on register with device: {e}"));
+                };
+            }
         }
         log(Debug, "register success".to_string());
         // 注册成功 函数末尾发生跳转到主页

+ 146 - 21
src/api/device.rs

@@ -1,4 +1,4 @@
-use crate::{AppState};
+use crate::{AppState, log,LogLevel::*};
 use super::{JsonBack};
 use super::{token_fail,check_token, errcode0};
 use crate::datasource::{Datasource, SqliteParams};
@@ -53,7 +53,7 @@ pub async fn d_edit(
     State(state): State<AppState>,
     Json(u): axum::extract::Json<Dedit>
 ) -> Json<JsonBack> {
-    let uid =match check_token(&state, u.token).await {
+    let (uid,_) = match check_token(&state, u.token).await {
         Ok(id) => {id},
         Err(_) => {
             return token_fail();
@@ -93,7 +93,7 @@ pub async fn d_cancel_share(
     State(state): State<AppState>,
     Json(d): axum::extract::Json<Dshare>
 ) -> Json<JsonBack> {
-    let uid=match check_token(&state, d.token).await {
+    let (uid,_) = match check_token(&state, d.token).await {
         Ok(id) => {id},
         Err(_) => {
             return token_fail();
@@ -112,26 +112,56 @@ pub async fn d_cancel_share(
     errcode0()
 }
 
+// 设备转移::旧用户主动转移设备给新用户
 pub async fn d_remove(
     State(state): State<AppState>,
-    Json(u): axum::extract::Json<Dedit>
+    Json(d): axum::extract::Json<Dedit>
 ) -> Json<JsonBack> {
-    let uid=match check_token(&state, u.token).await {
+    let (uid, mqid) = match check_token(&state, d.token).await {
         Ok(id) => {id},
         Err(_) => {
             return token_fail();
         }
     };
+    let sn = if let Ok(sn)= state.db_lite.query(
+        "select sn from device where id=? and belongto=?", [d.id,uid], |r|{r.get::<usize,String>(0)}).await{
+            sn
+    }else{
+        return Json(JsonBack{errcode: 3000, errmsg: Some(format!("删除设备失败: 设备不存在"))});
+    };
+    
+    let mqnew = if let Ok(sn)= state.db_lite.query(
+        "select mqid from user where id=?", [d.id], |r|{r.get::<usize,String>(0)}).await{
+            sn
+    }else{
+        return Json(JsonBack{errcode: 3000, errmsg: Some(format!("删除设备失败: 设备不存在"))});
+    };
     if let Err(e) = state.db_lite.execute(
         "update device set belongto=0 where id=? and belongto=?",
-        [u.id, uid]).await {
+        [d.id, uid]).await {
         return Json(JsonBack{errcode: 3000, errmsg: Some(format!("更改设备从属失败: {e}"))});
     }
     if let Err(e) = state.db_lite.execute(
         "delete from map_user_device where did=? and uid=?",
-        [u.id, uid]).await {
+        [d.id, uid]).await {
         return Json(JsonBack{errcode: 3000, errmsg: Some(format!("删除用户设备关系失败: {e}"))});
     }
+    if let Some(s)=state.mq_detail{
+        // 移除旧关系
+        if let Err(e)=s.db_mq_map.execute("delete from pub where clientid=? and topic in (?,?)", (mqid.clone(),format!("{sn}/cmd",),format!("/wf/Iot/device/{sn}",))).await{
+            log(Error, format!("删除设备失败: {e}"));
+        }
+        if let Err(e)=s.db_mq_map.execute("delete from sub where clientid=? and topic in (?,?)", (mqid,format!("{sn}/state",),format!("/wf/Iot/client/{sn}",))).await{
+            log(Error, format!("删除设备失败: {e}"));
+        } 
+        // 插入新关系
+        if let Err(e)=s.db_mq_map.execute("insert into pub (clientid,topic)values(?,?,9223372036854775807),(?,?,9223372036854775807)", (mqnew.clone(),format!("{sn}/cmd",),mqnew.clone(),format!("/wf/Iot/device/{sn}",))).await{
+            log(Error, format!("添加设备失败: {e}"));
+        }
+        if let Err(e)=s.db_mq_map.execute("insert into sub (clientid,topic)values(?,?,9223372036854775807),(?,?,9223372036854775807)", (mqnew.clone(),format!("{sn}/state",),mqnew,format!("/wf/Iot/client/{sn}",))).await{
+            log(Error, format!("添加设备失败: {e}"));
+        }
+    }
 
     errcode0()
 }
@@ -139,15 +169,21 @@ pub async fn d_del(
     State(state): State<AppState>,
     Json(d): axum::extract::Json<Dedit>
 ) -> Json<JsonBack> {
-    let uid=match check_token(&state, d.token).await {
+    let (uid,mqid) = match check_token(&state, d.token).await {
         Ok(id) => {id},
         Err(_) => {
             return token_fail();
         }
     };
+    let sn = if let Ok(sn)= state.db_lite.query(
+        "select sn from device where id=? and belongto=?", [d.id,uid], |r|{r.get::<usize,String>(0)}).await{
+            sn
+    }else{
+        return Json(JsonBack{errcode: 3000, errmsg: Some(format!("删除设备失败: 设备不存在"))});
+    };
     if let Err(e) = state.db_lite.execute(
-        "update device set belongto=0 where belongto=? and id=?",
-        [uid,d.id]).await {
+        "update device set belongto=0 where id=?",
+        [d.id]).await {
         return Json(JsonBack{errcode: 3000, errmsg: Some(format!("删除设备失败: {e}"))});
     }
     if let Err(e) = state.db_lite.execute(
@@ -155,6 +191,14 @@ pub async fn d_del(
         [d.id,uid]).await {
         return Json(JsonBack{errcode: 3000, errmsg: Some(format!("删除设备失败: {e}"))});
     }
+    if let Some(s)=state.mq_detail{
+        if let Err(e)=s.db_mq_map.execute("delete from pub where clientid=?, topic in (?,?)", (mqid.clone(),format!("{sn}/cmd",),format!("/wf/Iot/device/{sn}",))).await{
+            log(Error, format!("删除设备失败: {e}"));
+        }
+        if let Err(e)=s.db_mq_map.execute("delete from sub where clientid=?, topic in (?,?)", (mqid,format!("{sn}/state",),format!("/wf/Iot/client/{sn}",))).await{
+            log(Error, format!("删除设备失败: {e}"));
+        }
+    }
     errcode0()
 }
 #[derive(Deserialize)]
@@ -166,7 +210,7 @@ pub async fn burn(
     State(state): State<AppState>,
     Json(u): axum::extract::Json<TokenPath>
 ) -> Json<JsonBack> {
-    let uid=match check_token(&state, u.token).await {
+    let (uid,_) = match check_token(&state, u.token).await {
         Ok(id) => {id},
         Err(_) => {
             return token_fail();
@@ -198,6 +242,45 @@ pub struct Area{
     pub sup: u64,
     createby: bool,
 }
+#[derive(serde::Serialize)]
+pub struct DV{
+    pub clientid: String,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub di0: Option<u8>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub di1: Option<u8>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub di2: Option<u8>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub di3: Option<u8>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub di4: Option<u8>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub di5: Option<u8>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub di6: Option<u8>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub di7: Option<u8>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub do0: Option<u8>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub do1: Option<u8>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub do2: Option<u8>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub do3: Option<u8>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub do4: Option<u8>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub do5: Option<u8>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub do6: Option<u8>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub do7: Option<u8>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub beep: Option<u8>,
+}
+
 
 #[derive(serde::Serialize)]
 pub struct UTokenBack{
@@ -207,7 +290,9 @@ pub struct UTokenBack{
     #[serde(skip_serializing_if = "Option::is_none")]
     pub devices: Option<Vec<Devices>>,
     #[serde(skip_serializing_if = "Option::is_none")]
-    pub area: Option<Vec<Area>>
+    pub area: Option<Vec<Area>>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub latest: Option<Vec<DV>>
 }
 
 // #[derive(serde::Deserialize)]
@@ -253,11 +338,13 @@ pub async fn d_all(
     State(state): State<AppState>,
     Json(u): axum::extract::Json<super::Ident>
 ) -> Json<UTokenBack> {
-    let uid = if let Ok( id) = check_token(&state, u.token).await {
+    let uid = if let Ok( (id,_)) = check_token(&state, u.token).await {
         id
     } else {
-        return Json(UTokenBack{errcode: 2000, errmsg: Some("鉴权失败: token无效".to_string()),devices: None,area: None})
+        return Json(UTokenBack{errcode: 2000, errmsg: Some("鉴权失败: token无效".to_string()),devices: None,area: None,latest:None})
     };
+    let mut sqls = Vec::new();
+    let mut devices: Vec<String> = Vec::new();
     // 区域是否为自己创建的
     Json(UTokenBack{
         errcode: 0,
@@ -266,10 +353,14 @@ pub async fn d_all(
             match state.db_lite.query_rows(
                 "select d.id,d.name,d.sn,d.area,d.belongto,d.info from device d left join map_user_device m on d.id=m.did where d.isdelete=0 and m.uid=?", 
                 [uid], 
-                |r|{Ok(Devices{
+                |r|{
+                    let sn = r.get::<usize,String>(2)?;
+                    sqls.push("?");
+                    devices.push(sn.clone());
+                    Ok(Devices{
                 id: r.get::<usize,u64>(0)?,
                 name: r.get::<usize,String>(1)?,
-                sn: r.get::<usize,String>(2)?,
+                sn: sn,
                 area: r.get::<usize,u64>(3)?,
                 belongto: r.get::<usize,u64>(4)?==uid,
                 info:  r.get::<usize,String>(5)?
@@ -281,12 +372,13 @@ pub async fn d_all(
                         errcode: 3000,
                         errmsg: Some(format!("设备信息获取失败")),
                         devices: None,
-                        area: None
+                        area: None,
+                        latest: None
                     })
                 }
             }
-            ),
-            area: Some(
+        ),
+        area: Some(
             match state.db_lite.query_rows(
                 "select a.id,a.name,a.sup,a.createby from area a left join map_user_area m on a.id=m.aid where a.isdelete=0 and m.uid=?", 
                 [uid], 
@@ -303,11 +395,44 @@ pub async fn d_all(
                         errcode: 3000,
                         errmsg: Some(format!("设备信息获取失败")),
                         devices: None,
-                        area: None
+                        area: None,
+                        latest: None
                     })
                 }
             }
-            )
+        ),
+        latest: match state.mq_detail{
+            Some(s)=>{
+                s.db_mq_latest.query_rows(
+                    format!("select clientid, 
+                    DI0,DI1,DI2,DI3,DI4,DI5,DI6,DI7,
+                    DO0,DO1,DO2,DO3,DO4,DO5,DO6,DO7,BEEP 
+                    from dev where clientid in ({})", sqls.join(",")).as_str(), 
+                    rusqlite::params_from_iter(devices),|r|{
+                    Ok(DV{
+                        clientid: r.get(0)?,
+                        di0: r.get(1)?,
+                        di1: r.get(2)?,
+                        di2: r.get(3)?,
+                        di3: r.get(4)?,
+                        di4: r.get(5)?,
+                        di5: r.get(6)?,
+                        di6: r.get(7)?,
+                        di7: r.get(8)?,
+                        do0: r.get(9)?,
+                        do1: r.get(10)?,
+                        do2: r.get(11)?,
+                        do3: r.get(12)?,
+                        do4: r.get(13)?,
+                        do5: r.get(14)?,
+                        do6: r.get(15)?,
+                        do7: r.get(16)?,
+                        beep: r.get(17)?,
+                    })
+                }).await.ok()
+        },
+        None =>None
+        }
         }
     )
 }

+ 36 - 11
src/api/flow_task.rs

@@ -1,4 +1,4 @@
-use crate::{AppState,Debug, log};
+use crate::{AppState, log, LogLevel::*};
 use super::{JsonBack, errcode0, token};
 use super::{token_fail,check_token};
 use crate::datasource::Datasource;
@@ -26,7 +26,7 @@ pub async fn new_flow_task_share_device(
     State(state): State<AppState>,
     Json(u): axum::extract::Json<TaskOfDeviceShareOrTransfer>
 ) -> Json<UrlBack> {
-    let uid=match check_token(&state, u.token).await {
+    let (uid,_) = match check_token(&state, u.token).await {
         Ok(id) => {id},
         Err(_) => {
             return Json(UrlBack{errcode: 3000, errmsg: Some(format!("鉴权失败: token失效")),url:None});
@@ -80,18 +80,18 @@ pub async fn checkout_flow_task_of_share_device(
     // Query(params): Query<QueryParams>,
     Json(u): Json<CheckoutShare>
 ) -> Json<JsonBack> {
-    let uid = match check_token(&state, u.token).await {
+    let (uid, mqid) = match check_token(&state, u.token).await {
         Ok(uid) => uid,
         Err(_) => {
             return token_fail();
         }
     };
-    let (the_id, typo,interval, createby) = match state.db_lite.query(
-        "select did,typo,strftime('%s','now')-strftime('%s',createtime),createby from flow_task_share where ticket=? and isdelete=0",
+    let (the_id,sn, typo,interval, createby) = match state.db_lite.query(
+        "select did,sn,typo,strftime('%s','now')-strftime('%s',createtime),createby from flow_task_share where ticket=? and isdelete=0",
         [
             // params.linkrand.clone()
             u.linkrand.clone()
-        ], |r|Ok((r.get::<usize, u64>(0)?,r.get::<usize, u8>(1)?,r.get::<usize, u64>(2)?,r.get::<usize, u64>(3)?))).await{
+        ], |r|Ok((r.get::<usize, u64>(0)?,r.get::<usize, String>(1)?,r.get::<usize, u8>(2)?,r.get::<usize, u64>(3)?,r.get::<usize, u64>(4)?))).await{
         Ok(a) => a,
         Err(_) => return Json(JsonBack{errcode: 3000, errmsg: Some(
             "未找到分享流程".to_string()
@@ -140,7 +140,17 @@ pub async fn checkout_flow_task_of_share_device(
     // 为1时分享设备,为2时转让设备,此处添加设备与新用户的关系
     else if typo == 1 || typo == 2 {
         match state.db_lite.execute("insert into map_user_device(uid,did)values(?,?)", [uid, the_id]).await{
-            Ok(_) => {},
+            Ok(_) => {
+                if let Some(ref s)=state.mq_detail{
+                    // 插入新关系
+                    if let Err(e)=s.db_mq_map.execute("insert into pub (clientid,topic)values(?,?,9223372036854775807),(?,?,9223372036854775807)", (mqid.clone(),format!("{sn}/cmd",),mqid.clone(),format!("/wf/Iot/device/{sn}",))).await{
+                        log(Error, format!("添加设备失败: {e}"));
+                    }
+                    if let Err(e)=s.db_mq_map.execute("insert into sub (clientid,topic)values(?,?,9223372036854775807),(?,?,9223372036854775807)", (mqid.clone(),format!("{sn}/state",),mqid,format!("/wf/Iot/client/{sn}",))).await{
+                        log(Error, format!("添加设备失败: {e}"));
+                    }
+                }
+            },
             Err(e) => if e==""{/*Unique异常跳过*/}else{ return Json(JsonBack{errcode: 3000, errmsg: Some(format!("添加设备分享关系失败: {e}"))});}
         };    
     } else {
@@ -149,6 +159,11 @@ pub async fn checkout_flow_task_of_share_device(
 
     // 转让设备需要将belongto更新
     if typo == 2{
+        let mqold = if let Ok(mqid) = state.db_lite.query("select u.mqid from device d left join user u on u.id=d.belongto where id=?", [the_id], |r|Ok(r.get::<usize, String>(0)?)).await{
+            mqid
+        }else{
+            String::new()
+        };
         match state.db_lite.execute("update device set belongto=? where id=?", [uid, the_id]).await{
             Ok(_) => {},
             Err(e) => return Json(JsonBack{errcode: 3000, errmsg: Some(format!("更新设备分享关系失败: {e}"))}),
@@ -157,11 +172,21 @@ pub async fn checkout_flow_task_of_share_device(
             Ok(_) => {},
             Err(e) => {println!("when delete transform flow got error: {e}")}
         }
+        if let Some(s)=state.mq_detail{
+            // 移除旧关系
+            if let Err(e)=s.db_mq_map.execute("delete from pub where clientid=? and topic in (?,?)", (mqold.clone(),format!("{sn}/cmd",),format!("/wf/Iot/device/{sn}",))).await{
+                log(Error, format!("删除设备失败: {e}"));
+            }
+            if let Err(e)=s.db_mq_map.execute("delete from sub where clientid=? and topic in (?,?)", (mqold,format!("{sn}/state",),format!("/wf/Iot/client/{sn}",))).await{
+                log(Error, format!("删除设备失败: {e}"));
+            } 
+        }
     } else {
 
-    match state.db_lite.execute("delete from flow_task_share where strftime('%s','now')-strftime('%s',createtime)>30*60", []).await{
-        Ok(_) => {},
-        Err(e) => {println!("when delete extra flow got error: {e}")}
-    }}
+        match state.db_lite.execute("delete from flow_task_share where strftime('%s','now')-strftime('%s',createtime)>30*60", []).await{
+            Ok(_) => {},
+            Err(e) => {println!("when delete extra flow got error: {e}")}
+        }
+    }
     errcode0()
 }

+ 6 - 6
src/api/mod.rs

@@ -56,18 +56,18 @@ pub async fn example(axum::extract::State(_): axum::extract::State<crate::AppSta
     })
 }
 
-pub async fn check_token(state: &crate::AppState, token: String) -> Result<u64,()>{
+pub async fn check_token(state: &crate::AppState, token: String) -> Result<(u64,String),()>{
     state.db_lite.query(
-        "select id from user where token=? and isdelete=0", 
+        "select id,mqid from user where token=? and isdelete=0", 
         [token],  // 这里不能写作 rusqlite::params![token] 
-        |r|{r.get::<usize,u64>(0)}).await.map_err(|e| println!("{e}"))
+        |r|{Ok((r.get::<usize,u64>(0)?,r.get::<usize,String>(1)?))}).await.map_err(|e| println!("{e}"))
 }
 
-pub async fn check_openid(state: &crate::AppState, openid: String) -> Result<u64,()>{
+pub async fn check_openid(state: &crate::AppState, openid: String) -> Result<(u64,String),()>{
     state.db_lite.query(
-        "select id from user where openid=? and isdelete=0", 
+        "select id,mqid from user where openid=? and isdelete=0", 
         [openid],  // 这里不能写作 rusqlite::params![token] 
-        |r|{r.get::<usize,u64>(0)}).await.map_err(|e| println!("{e}"))
+        |r|{Ok((r.get::<usize,u64>(0)?,r.get::<usize,String>(1)?))}).await.map_err(|e| println!("{e}"))
 }
 
 pub fn token(s: usize) -> String{

+ 2 - 2
src/api/user.rs

@@ -156,7 +156,7 @@ pub async fn u_edit(
         errmsg: Some(format!("不允许创建用户"))
     });
     }
-    let uid = match check_token(&state, u.token.unwrap()).await{
+    let (uid,_) = match check_token(&state, u.token.unwrap()).await{
         Ok(uid)=> uid,
         Err(()) => return token_fail()
     };
@@ -282,7 +282,7 @@ pub async fn u_info(
     State(state): State<AppState>,
     Json(u): axum::extract::Json<super::Ident>
 ) -> Json<UInfo>{
-    let uid = match check_token(&state, u.token).await{
+    let (uid,_) = match check_token(&state, u.token).await{
         Ok(uid) => uid,
         Err(_) => return Json(UInfo { errcode: 2000, errmsg: Some("鉴权失败: token失效".to_string()), nickname: None, headimgurl: None })
     };

+ 17 - 2
src/main.rs

@@ -39,10 +39,16 @@ pub fn log(level: LogLevel, msg: String) {
     }
     println!("[{:?}] {} {}", level, now(), msg);
 }
-
+#[derive(Clone)]
+struct MqDetail{
+    db_mq_user: sqlite::SqlitePool,
+    db_mq_latest: sqlite::SqlitePool,
+    db_mq_map: sqlite::SqlitePool,
+}
 #[derive(Clone)]
 struct AppState{
     db_lite: sqlite::SqlitePool,
+    mq_detail: Option<MqDetail>,
 }
 
 #[derive(Clone)]
@@ -110,7 +116,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
              }},
     };
 
-    let appstat = AppState{db_lite: sqlite::init_sqlite_pool(&conf.db, 10).await?};
+    let appstat = AppState{
+        db_lite: sqlite::init_sqlite_pool(&conf.db, 10).await?,
+        mq_detail: if   
+            let (Ok(a),Ok(b),Ok(c)) = 
+            (sqlite::init_sqlite_pool("/var/lib/mosquitto/db/user.db", 10).await,
+             sqlite::init_sqlite_pool("/var/lib/mosquitto/db/acl.db", 10).await,
+             sqlite::init_sqlite_pool("/var/lib/mosquitto/db/dev.db", 10).await){
+                Some(MqDetail { db_mq_user: a, db_mq_latest: b, db_mq_map: c })
+             } else{None}
+    };
 
     let monitor = Counter::new();