| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- use bb8::Pool;
- /// SQLite连接池类型
- pub type SqlitePool = bb8::Pool<SqliteConnectionManager>;
- /// SQLite连接管理器
- #[derive(Clone)]
- pub struct SqliteConnectionManager {
- url: String,
- }
- impl SqliteConnectionManager {
- /// 创建新的SQLite连接管理器
- pub fn new(url: String) -> Self {
- Self { url }
- }
- }
- impl bb8::ManageConnection for SqliteConnectionManager {
- type Connection = rusqlite::Connection;
- type Error = rusqlite::Error;
- fn connect(&self) -> impl Future<Output = Result<Self::Connection, Self::Error>> + Send {
- let url = self.url.clone();
- async move {
- let conn = tokio::task::spawn_blocking(move || rusqlite::Connection::open(&url))
- .await
- .unwrap();
- conn
- }
- }
- fn is_valid(&self, _conn: &mut Self::Connection) -> impl Future<Output = Result<(), Self::Error>> + Send {
- // 对于SQLite,我们暂时跳过验证
- async move {
- Ok(())
- }
- }
- fn has_broken(&self, _conn: &mut Self::Connection) -> bool {
- // 暂时假设连接未损坏
- false
- }
- }
- /// 初始化SQLite连接池
- pub async fn init_sqlite_pool(url: &str, max_size: u32) -> Result<SqlitePool, Box<dyn std::error::Error>> {
- let manager = SqliteConnectionManager::new(url.to_string());
- let pool = Pool::builder()
- .max_size(max_size)
- .build(manager)
- .await?;
- Ok(pool)
- }
- impl crate::datasource::Datasource for SqlitePool{
- async fn query<P, T, F>(&self, sql: &str, params: P,f: F) -> Result<T, String>
- where
- P: rusqlite::Params,
- F: FnOnce(&rusqlite::Row<'_>) -> rusqlite::Result<T>,
- {
- match tokio::time::timeout(std::time::Duration::from_secs(5), self.get()).await {
- Ok(Ok(conn)) => conn,
- Ok(Err(e)) => return Err(format!("connection err: {}",e.to_string())),
- Err(_) => return Err("Timeout".to_string())
- }.query_row(sql, params, f).map_err(|e| if e == rusqlite::Error::QueryReturnedNoRows { String::new() } else{ e.to_string()})
- }
-
- async fn query_rows<P, T, F>(&self, sql: &str, params: P, mut f: F) -> Result<Vec<T>, String>
- where
- P: rusqlite::Params,
- F: FnMut(&rusqlite::Row<'_>) -> rusqlite::Result<T>,
- {
- match tokio::time::timeout(std::time::Duration::from_secs(5), self.get()).await {
- Ok(Ok(conn)) => conn,
- Ok(Err(e)) => return Err(format!("connection err: {}",e.to_string())),
- Err(_) => return Err("Timeout".to_string())
- }.prepare(sql)
- .map_err(|e| e.to_string())
- .and_then(|mut stmt| {
- let mut results = Vec::new();
- let mut rows = stmt.query(params).map_err(|e| e.to_string())?;
-
- while let Some(row) = rows.next().map_err(|e| e.to_string())? {
- match f(row) {
- Ok(item) => results.push(item),
- Err(e) => return Err(e.to_string()),
- }
- }
-
- Ok(results)
- })
- }
-
- async fn execute<P>(&self, sql: &str, params:P) -> Result<usize, String>
- where
- P: rusqlite::Params {
- match tokio::time::timeout(std::time::Duration::from_secs(5), self.get()).await {
- Ok(Ok(conn)) => conn,
- Ok(Err(e)) => return Err(format!("connection err: {}",e.to_string())),
- Err(_) => return Err("Timeout".to_string())
- }.execute(sql, params).map_err(|e| if e.sqlite_error_code()==Some(rusqlite::ErrorCode::ConstraintViolation) { println!("{e}");String::new() } else{ e.to_string()})
- }
- }
- #[cfg(test)]
- mod tests {
- use super::*;
- use crate::datasource::Datasource;
- #[tokio::test]
- async fn test_sqlite_pool() {
- let pool = init_sqlite_pool("./db.sqlite", 10).await.unwrap();
- match pool.execute("insert into flow_task_share(did,typo,ticket)values(?,?,?)", (1,1,1)).await{
- Ok(n) => println!("inserted {} rows", n),
- Err(e) => println!("Err {}", e),
- };
- }
- }
|