use bb8::Pool; /// SQLite连接池类型 pub type SqlitePool = bb8::Pool; /// SQLite连接管理器 #[derive(Clone)] pub struct SqliteConnectionManager { url: String, } impl SqliteConnectionManager { /// 创建新的SQLite连接管理器 pub fn new(url: String) -> Self { Self { url } } } impl bb8::ManageConnection for SqliteConnectionManager { type Connection = rusqlite::Connection; type Error = rusqlite::Error; fn connect(&self) -> impl Future> + Send { let url = self.url.clone(); async move { let conn = tokio::task::spawn_blocking(move || rusqlite::Connection::open(&url)) .await .unwrap(); conn } } fn is_valid(&self, _conn: &mut Self::Connection) -> impl Future> + Send { // 对于SQLite,我们暂时跳过验证 async move { Ok(()) } } fn has_broken(&self, _conn: &mut Self::Connection) -> bool { // 暂时假设连接未损坏 false } } /// 初始化SQLite连接池 pub async fn init_sqlite_pool(url: &str, max_size: u32) -> Result> { let manager = SqliteConnectionManager::new(url.to_string()); let pool = Pool::builder() .max_size(max_size) .build(manager) .await?; Ok(pool) } impl crate::datasource::Datasource for SqlitePool{ async fn query(&self, sql: &str, params: P,f: F) -> Result where P: rusqlite::Params, F: FnOnce(&rusqlite::Row<'_>) -> rusqlite::Result, { match tokio::time::timeout(std::time::Duration::from_secs(5), self.get()).await { Ok(Ok(conn)) => conn, Ok(Err(e)) => return Err(format!("connection err: {}",e.to_string())), Err(_) => return Err("Timeout".to_string()) }.query_row(sql, params, f).map_err(|e| if e == rusqlite::Error::QueryReturnedNoRows { String::new() } else{ e.to_string()}) } async fn query_rows(&self, sql: &str, params: P, mut f: F) -> Result, String> where P: rusqlite::Params, F: FnMut(&rusqlite::Row<'_>) -> rusqlite::Result, { match tokio::time::timeout(std::time::Duration::from_secs(5), self.get()).await { Ok(Ok(conn)) => conn, Ok(Err(e)) => return Err(format!("connection err: {}",e.to_string())), Err(_) => return Err("Timeout".to_string()) }.prepare(sql) .map_err(|e| e.to_string()) .and_then(|mut stmt| { let mut results = Vec::new(); let mut rows = stmt.query(params).map_err(|e| e.to_string())?; while let Some(row) = rows.next().map_err(|e| e.to_string())? { match f(row) { Ok(item) => results.push(item), Err(e) => return Err(e.to_string()), } } Ok(results) }) } async fn execute

(&self, sql: &str, params:P) -> Result where P: rusqlite::Params { match tokio::time::timeout(std::time::Duration::from_secs(5), self.get()).await { Ok(Ok(conn)) => conn, Ok(Err(e)) => return Err(format!("connection err: {}",e.to_string())), Err(_) => return Err("Timeout".to_string()) }.execute(sql, params).map_err(|e| if e.sqlite_error_code()==Some(rusqlite::ErrorCode::ConstraintViolation) { println!("{e}");String::new() } else{ e.to_string()}) } } #[cfg(test)] mod tests { use super::*; use crate::datasource::Datasource; #[tokio::test] async fn test_sqlite_pool() { let pool = init_sqlite_pool("./db.sqlite", 10).await.unwrap(); match pool.execute("insert into flow_task_share(did,typo,ticket)values(?,?,?)", (1,1,1)).await{ Ok(n) => println!("inserted {} rows", n), Err(e) => println!("Err {}", e), }; } }