Implement cleanup DB operation
This commit is contained in:
parent
8728b95e5f
commit
d7e68e827e
|
@ -7,8 +7,11 @@ use tokio::sync::{
|
|||
|
||||
#[derive(Debug)]
|
||||
enum Task {
|
||||
GetFile(oneshot::Sender<Option<String>>, String),
|
||||
AddFile(oneshot::Sender<bool>, String, String),
|
||||
// syntax: TaskName(oneshot::Sender<ReturnType>, /* param_name */ ParamType),
|
||||
|
||||
GetFile(oneshot::Sender<Option<String>>, /* path */ String),
|
||||
AddFile(oneshot::Sender<bool>, /* path */ String, /* url */ String, /* mid */ u64),
|
||||
Cleanup(oneshot::Sender<Option<Vec<u64>>>, /* older_than */ u64),
|
||||
}
|
||||
|
||||
pub struct DbExecutor {
|
||||
|
@ -21,8 +24,8 @@ impl DbExecutor {
|
|||
let (tx, rx) = unbounded_channel();
|
||||
let db = rusqlite::Connection::open(dbpath)?;
|
||||
db.execute(
|
||||
"create table if not exists files(\
|
||||
id integer primary key, path text not null, url text not null, \
|
||||
"create table if not exists files(id integer primary key,\
|
||||
path text not null, url text not null, mid integer not null, \
|
||||
timestamp integer default (strftime('%s','now')))",
|
||||
[],
|
||||
)?;
|
||||
|
@ -33,12 +36,12 @@ impl DbExecutor {
|
|||
pub fn run(&mut self) {
|
||||
while let Some(task) = self.rx.blocking_recv() {
|
||||
match task {
|
||||
Task::GetFile(tx, p) => {
|
||||
Task::GetFile(tx, path) => {
|
||||
let paste = self
|
||||
.db
|
||||
.query_row(
|
||||
"select url from files where path=? limit 1",
|
||||
params![p],
|
||||
params![path],
|
||||
|r| r.get(0),
|
||||
)
|
||||
.optional()
|
||||
|
@ -46,18 +49,39 @@ impl DbExecutor {
|
|||
error!("A database error has occurred: {}", v);
|
||||
None
|
||||
});
|
||||
tx.send(paste).unwrap();
|
||||
let _ = tx.send(paste);
|
||||
}
|
||||
Task::AddFile(tx, p, u) => {
|
||||
if let Err(e) = self
|
||||
Task::AddFile(tx, path, url, mid) => {
|
||||
let _ = if let Err(e) = self
|
||||
.db
|
||||
.execute("insert into files(path,url) values(?,?)", params![p, u])
|
||||
.execute("insert into files(path,url,mid) values(?,?,?)", params![path, url, mid])
|
||||
{
|
||||
error!("A database error has occurred: {}", e);
|
||||
tx.send(false).unwrap();
|
||||
tx.send(false)
|
||||
} else {
|
||||
tx.send(true).unwrap();
|
||||
}
|
||||
tx.send(true)
|
||||
};
|
||||
}
|
||||
Task::Cleanup(tx, older_than) => {
|
||||
let _ = tx.send(match self.db
|
||||
.prepare("delete from files where timestamp < strftime('%s','now')-? returning mid")
|
||||
.and_then(|mut v| v.query(params![older_than])
|
||||
.and_then(|mut v| {
|
||||
let mut mids: Vec<u64> = Vec::new();
|
||||
while let Some(row) = v.next()? {
|
||||
mids.push(row.get(0)?);
|
||||
}
|
||||
Ok(mids)
|
||||
}))
|
||||
{
|
||||
Ok(o) => {
|
||||
Some(o)
|
||||
}
|
||||
Err(e) => {
|
||||
error!("A database error has occurred: {}", e);
|
||||
None
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -76,15 +100,18 @@ impl Clone for ExecutorConnection {
|
|||
}
|
||||
}
|
||||
|
||||
impl ExecutorConnection {
|
||||
pub async fn add_file(&self, path: String, url: String) -> bool {
|
||||
let (otx, orx) = oneshot::channel();
|
||||
self.tx.send(Task::AddFile(otx, path, url)).unwrap();
|
||||
orx.await.unwrap()
|
||||
}
|
||||
pub async fn get_file(&self, path: String) -> Option<String> {
|
||||
let (otx, orx) = oneshot::channel();
|
||||
self.tx.send(Task::GetFile(otx, path)).unwrap();
|
||||
orx.await.unwrap()
|
||||
macro_rules! executor_wrapper {
|
||||
($name:ident, $task:expr, $ret:ty, $($arg:ident: $ty:ty),*) => {
|
||||
pub async fn $name(&self, $($arg: $ty),*) -> $ret {
|
||||
let (otx, orx) = oneshot::channel();
|
||||
self.tx.send($task(otx, $($arg),*)).unwrap();
|
||||
orx.await.unwrap()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ExecutorConnection {
|
||||
executor_wrapper!(add_file, Task::AddFile, bool, path: String, url: String, mid: u64);
|
||||
executor_wrapper!(cleanup, Task::Cleanup, Option<Vec<u64>>, older_than: u64);
|
||||
executor_wrapper!(get_file, Task::GetFile, Option<String>, path: String);
|
||||
}
|
||||
|
|
Reference in a new issue