use log::{error, info}; use rusqlite::{params, OptionalExtension}; use tokio::sync::{ mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, oneshot, }; #[derive(Debug)] enum Task { // syntax: TaskName(oneshot::Sender, /* param_name */ ParamType), GetFile(oneshot::Sender>, /* path */ String), AddFile(oneshot::Sender, /* path */ String, /* url */ String, /* mid */ u64), Cleanup(oneshot::Sender>>, /* older_than */ u64), } pub struct DbExecutor { rx: UnboundedReceiver, db: rusqlite::Connection, } impl DbExecutor { pub fn create(dbpath: &str) -> rusqlite::Result<(Self, ExecutorConnection)> { let (tx, rx) = unbounded_channel(); let db = rusqlite::Connection::open(dbpath)?; db.execute( "create table if not exists files(id integer primary key,\ path text not null, url text not null, mid integer not null, \ timestamp integer default (strftime('%s','now')))", [], )?; info!("Database executor for '{}' created!", dbpath); Ok((Self { rx, db }, ExecutorConnection { tx })) } pub fn run(&mut self) { while let Some(task) = self.rx.blocking_recv() { match task { Task::GetFile(tx, path) => { let paste = self .db .query_row( "select url from files where path=? limit 1", params![path], |r| r.get(0), ) .optional() .unwrap_or_else(|v| { error!("A database error has occurred: {}", v); None }); let _ = tx.send(paste); } Task::AddFile(tx, path, url, mid) => { let _ = if let Err(e) = self .db .execute("insert into files(path,url,mid) values(?,?,?)", params![path, url, mid]) { error!("A database error has occurred: {}", e); tx.send(false) } else { tx.send(true) }; } Task::Cleanup(tx, older_than) => { let _ = tx.send(match self.db .prepare("delete from files where timestamp < strftime('%s','now')-? returning mid") .and_then(|mut v| v.query(params![older_than]) .and_then(|mut v| { let mut mids: Vec = Vec::new(); while let Some(row) = v.next()? { mids.push(row.get(0)?); } Ok(mids) })) { Ok(o) => { Some(o) } Err(e) => { error!("A database error has occurred: {}", e); None } }); } } } } } pub struct ExecutorConnection { tx: UnboundedSender, } impl Clone for ExecutorConnection { fn clone(&self) -> Self { Self { tx: self.tx.clone(), } } } macro_rules! executor_wrapper { ($name:ident, $task:expr, $ret:ty, $($arg:ident: $ty:ty),*) => { pub async fn $name(&self, $($arg: $ty),*) -> $ret { let (otx, orx) = oneshot::channel(); self.tx.send($task(otx, $($arg),*)).unwrap(); orx.await.unwrap() } } } impl ExecutorConnection { executor_wrapper!(add_file, Task::AddFile, bool, path: String, url: String, mid: u64); executor_wrapper!(cleanup, Task::Cleanup, Option>, older_than: u64); executor_wrapper!(get_file, Task::GetFile, Option, path: String); }