flow_task.rs 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. use crate::{AppState, log, LogLevel::*};
  2. use super::{JsonBack, errcode0, token};
  3. use super::{token_fail,check_token};
  4. use crate::datasource::Datasource;
  5. use serde::{Serialize, Deserialize};
  6. use axum::{Json,extract::{State}, http::HeaderMap};
  7. #[derive(Deserialize)]
  8. pub struct TaskOfDeviceShareOrTransfer{
  9. token: String,
  10. id: u64,
  11. r#type: u8,
  12. }
  13. #[derive(Serialize)]
  14. pub struct UrlBack{
  15. errcode: u16,
  16. errmsg: Option<String>,
  17. url: Option<String>,
  18. }
  19. #[allow(unused_variables)]
  20. pub async fn new_flow_task_share_device(
  21. headers: HeaderMap,
  22. State(state): State<AppState>,
  23. Json(u): axum::extract::Json<TaskOfDeviceShareOrTransfer>
  24. ) -> Json<UrlBack> {
  25. let (uid,_) = match check_token(&state, u.token).await {
  26. Ok(id) => {id},
  27. Err(_) => {
  28. return Json(UrlBack{errcode: 3000, errmsg: Some(format!("鉴权失败: token失效")),url:None});
  29. }
  30. };
  31. match u.r#type{
  32. 0|1|2 => {},
  33. _ => {
  34. return Json(UrlBack{errcode: 3000, errmsg: Some(format!("参数错误: type应为0、1、2")),url:None});
  35. }
  36. }
  37. let mut tryno=0;
  38. let linkrand = token(32);
  39. while tryno<5 && match state.db_lite.execute(
  40. "insert into flow_task_share (did, typo, ticket,createby)values (?,?,?,?)",
  41. (u.id,u.r#type,linkrand.clone(),uid)).await{
  42. Ok(_) => false,
  43. Err(e) =>
  44. if e==""{true} // execute时如果遇到Unique异常时会将异常处理为空字符串
  45. else{return Json(UrlBack{errcode: 3000, errmsg: Some(format!("分享流程创建失败: {e}")),url:None})},
  46. }
  47. {
  48. tryno+=1;
  49. };
  50. if tryno==5{
  51. return Json(UrlBack{errcode: 3000, errmsg: Some(format!("分享流程创建失败: 反复重试失败")),url: None});
  52. }
  53. // 获取主机名
  54. // let host = format!("https://{}",headers.get("host")
  55. // .and_then(|hv| hv.to_str().ok())
  56. // .unwrap_or("localhost:3000"));
  57. let host = crate::WEBP;
  58. Json(UrlBack{errcode: 0, errmsg: None,url:Some(format!("{host}?linkrand={linkrand}"))})
  59. }
  60. // #[derive(Deserialize)]
  61. // pub struct QueryParams {
  62. // linkrand: String,
  63. // }
  64. #[derive(Deserialize)]
  65. pub struct CheckoutShare {
  66. token: String,
  67. linkrand: String,
  68. }
  69. pub async fn checkout_flow_task_of_share_device(
  70. State(state): State<AppState>,
  71. // Query(params): Query<QueryParams>,
  72. Json(u): Json<CheckoutShare>
  73. ) -> Json<JsonBack> {
  74. let (uid, mqid) = match check_token(&state, u.token).await {
  75. Ok(uid) => uid,
  76. Err(_) => {
  77. return token_fail();
  78. }
  79. };
  80. let (the_id,sn, typo,interval, createby) = match state.db_lite.query(
  81. "select f.did,d.sn,f.typo,strftime('%s','now')-strftime('%s',createtime),f.createby from flow_task_share f left join device d on f.did=d.id where f.ticket=? and f.isdelete=0",
  82. [
  83. // params.linkrand.clone()
  84. u.linkrand.clone()
  85. ], |r|Ok((r.get::<usize, u64>(0)?,r.get::<usize, String>(1)?,r.get::<usize, u8>(2)?,r.get::<usize, u64>(3)?,r.get::<usize, u64>(4)?))).await{
  86. Ok(a) => a,
  87. Err(_) => return Json(JsonBack{errcode: 3000, errmsg: Some(
  88. "未找到分享流程".to_string()
  89. // format!("未找到分享流程 {} {e}",params.ticket.clone())
  90. )}),
  91. };
  92. if interval > 30*60 /* interval为当前时间减去createtime的秒数 */ {
  93. match state.db_lite.execute(
  94. "delete from flow_task_share where ticket=?",
  95. // "update flow_task_share set isdelete=1 where ticket=?",
  96. [u.linkrand.clone()]).await{
  97. Ok(_) => {},
  98. Err(e) => return Json(JsonBack{errcode: 3000, errmsg: Some(format!("删除分享流程失败: {e}"))}),
  99. };
  100. return Json(JsonBack{errcode: 3000, errmsg: Some("分享流程已过期".to_string())});
  101. }
  102. // 为2时转让设备,需要移除原用户与设备的关系
  103. if typo == 2{
  104. if let Err(e) = state.db_lite.execute("delete from map_user_device where uid=? and did=?", (createby, the_id)).await{
  105. return Json(JsonBack{errcode: 3000, errmsg: Some(format!("转让时删除设备关系失败: {e}"))});
  106. }
  107. }
  108. // 为0时分享区域
  109. if typo == 0{
  110. match state.db_lite.execute("insert into map_user_area(uid,aid)values(?,?)", [uid, the_id]).await{
  111. Ok(i) => {
  112. log(Debug, format!("影响一级区域数量{i}个"));
  113. },
  114. Err(e) => if e==""{/*Unique异常跳过 */} else {return Json(JsonBack{errcode: 3000, errmsg: Some(format!("添加区域分享关系失败: {e}"))});},
  115. };
  116. match state.db_lite.execute("insert into map_user_area(uid,aid) select ?,id from area where sup=? and id not in (select aid from map_user_area where uid=?)", [uid, the_id,uid]).await{
  117. Ok(i) => {
  118. log(Debug, format!("影响二级区域数量{i}个"));
  119. },
  120. Err(e) => if e==""{/*Unique异常跳过 */} else {return Json(JsonBack{errcode: 3000, errmsg: Some(format!("添加区域下级分享关系失败: {e}"))})},
  121. };
  122. match state.db_lite.execute("insert into map_user_device(uid,did) select ?,device.id from device where area in (select id from area where area.sup=?) and device.id not in (select did from map_user_device where uid=?)", [uid, the_id,uid]).await{
  123. Ok(i) => {
  124. log(Debug, format!("影响设备数量{i}个"));
  125. },
  126. Err(e) =>if e==""{/*Unique异常跳过 */} else { return Json(JsonBack{errcode: 3000, errmsg: Some(format!("添加区域下全部设备关系失败: {e}"))})},
  127. };
  128. }
  129. // 为1时分享设备,为2时转让设备,此处添加设备与新用户的关系
  130. else if typo == 1 || typo == 2 {
  131. match state.db_lite.execute("insert into map_user_device(uid,did)values(?,?)", [uid, the_id]).await{
  132. Ok(_) => {/*正常插入,可以跳过*/},
  133. Err(e) => if e==""{/*Unique异常跳过*/}else{ return Json(JsonBack{errcode: 3000, errmsg: Some(format!("添加设备分享关系失败: {e}"))});}
  134. };
  135. match state.db_lite.execute("delete from flow_task_share where ticket=?", [u.linkrand.clone()]).await{
  136. Ok(_) => {},
  137. Err(e) => {println!("when delete transform flow got error: {e}")}
  138. }
  139. // 添加监听
  140. if let Some(ref s)=state.mq_detail{
  141. // 插入新关系
  142. if let Err(e)=s.db_mq_map.execute("insert into pub (clientid,topic)values(?,?),(?,?)", (mqid.clone(),format!("{sn}/cmd",),mqid.clone(),format!("/wf/Iot/device/{sn}",))).await{
  143. log(Error, format!("添加设备失败: {e}"));
  144. }
  145. if let Err(e)=s.db_mq_map.execute("insert into sub (clientid,topic)values(?,?),(?,?)", (mqid.clone(),format!("{sn}/state",),mqid,format!("/wf/Iot/client/{sn}",))).await{
  146. log(Error, format!("添加设备失败: {e}"));
  147. }
  148. }
  149. } else {
  150. return Json(JsonBack{errcode: 3000, errmsg: Some("分享流程类型错误".to_string())});
  151. }
  152. // 转让设备需要将belongto更新
  153. if typo == 2{
  154. let (mqold, effected) = if let Ok(ans) = state.db_lite.query(
  155. "select d.mqid, count(*) from (select u.mqid mqid, d.id id from device d left join user u on u.id=d.belongto where d.id=?)d left join map_user_device m on d.id=m.did",
  156. [the_id], |r|Ok((r.get::<usize, String>(0)?,r.get::<usize, u64>(1)?,))).await{
  157. ans
  158. }else{
  159. (String::new(),0)
  160. };
  161. if effected!=1{
  162. return Json(JsonBack { errcode: 3000, errmsg: Some(format!("该设备被分享状态占用,目前有{effected}条分享")) });
  163. }
  164. match state.db_lite.execute("update device set belongto=? where id=?", [uid, the_id]).await{
  165. Ok(_) => {},
  166. Err(e) => return Json(JsonBack{errcode: 3000, errmsg: Some(format!("更新设备分享关系失败: {e}"))}),
  167. }
  168. if let Some(s)=state.mq_detail{
  169. // 移除旧关系
  170. if let Err(e)=s.db_mq_map.execute("delete from pub where clientid=? and topic in (?,?)", (mqold.clone(),format!("{sn}/cmd",),format!("/wf/Iot/device/{sn}",))).await{
  171. log(Error, format!("删除设备失败: {e}"));
  172. }
  173. if let Err(e)=s.db_mq_map.execute("delete from sub where clientid=? and topic in (?,?)", (mqold,format!("{sn}/state",),format!("/wf/Iot/client/{sn}",))).await{
  174. log(Error, format!("删除设备失败: {e}"));
  175. }
  176. }
  177. } else {
  178. match state.db_lite.execute("delete from flow_task_share where strftime('%s','now')-strftime('%s',createtime)>30*60", []).await{
  179. Ok(_) => {},
  180. Err(e) => {println!("when delete extra flow got error: {e}")}
  181. }
  182. }
  183. errcode0()
  184. }