This repository has been archived on 2022-03-12. You can view files and clone it, but cannot push or open issues or pull requests.
xuproxy/src/database.rs

118 lines
4 KiB
Rust

use log::{error, info};
use rusqlite::{params, OptionalExtension};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
oneshot,
};
/// Work items sent from an `ExecutorConnection` to the `DbExecutor` loop.
///
/// Every variant carries, first, the `oneshot::Sender` on which the result is
/// sent back, followed by the parameters of the task.
#[derive(Debug)]
enum Task {
// Variant layout: TaskName(oneshot::Sender<ReturnType>, /* param_name */ ParamType),
// The reply sender must stay the first field: `executor_wrapper!` builds
// variants as `$task(reply_sender, args...)`.
/// Look up the `url` stored for `path`. The reply is `None` when no row
/// matches or when the query fails.
GetFile(oneshot::Sender<Option<String>>, /* path */ String),
/// Insert a `(path, url, mid)` row. The reply is `true` when the insert
/// succeeded and `false` when the database reported an error.
AddFile(oneshot::Sender<bool>, /* path */ String, /* url */ String, /* mid */ u64),
/// Delete rows whose `timestamp` is below `strftime('%s','now') - older_than`
/// and reply with the `mid` of every deleted row, or `None` on a database error.
Cleanup(oneshot::Sender<Option<Vec<u64>>>, /* older_than */ u64),
}
/// Owns the SQLite connection together with the receiving end of the task
/// channel.
///
/// Instances are built by `DbExecutor::create`; `DbExecutor::run` then
/// processes the tasks that arrive on the channel one at a time.
pub struct DbExecutor {
/// Receiving half of the channel on which `ExecutorConnection` sends tasks.
rx: UnboundedReceiver<Task>,
/// SQLite connection every task is executed against.
db: rusqlite::Connection,
}
impl DbExecutor {
    /// Opens the SQLite database at `dbpath`, makes sure the `files` table
    /// exists and returns the executor together with a connection handle that
    /// feeds it tasks.
    ///
    /// # Errors
    /// Returns the `rusqlite` error if the database cannot be opened or the
    /// `create table` statement fails.
    pub fn create(dbpath: &str) -> rusqlite::Result<(Self, ExecutorConnection)> {
        let (tx, rx) = unbounded_channel();
        let db = rusqlite::Connection::open(dbpath)?;
        // `timestamp` is filled in by SQLite on insert; `Cleanup` compares
        // against it later.
        db.execute(
            "create table if not exists files(id integer primary key,\
            path text not null, url text not null, mid integer not null, \
            timestamp integer default (strftime('%s','now')))",
            [],
        )?;
        info!("Database executor for '{}' created!", dbpath);
        Ok((Self { rx, db }, ExecutorConnection { tx }))
    }

    /// Processes tasks one by one until the task channel yields `None`.
    ///
    /// Database errors are logged and reported to the requester as `None` /
    /// `false`; they never stop the loop.
    ///
    /// NOTE(review): this uses `blocking_recv`, so it is presumably meant to
    /// run on a dedicated thread rather than inside an async task — confirm
    /// against the caller.
    pub fn run(&mut self) {
        while let Some(task) = self.rx.blocking_recv() {
            // The result of `send` is ignored on purpose: a failure only means
            // the requester is no longer waiting for the reply.
            match task {
                Task::GetFile(tx, path) => {
                    let url = Self::ok_or_log(self.lookup_url(&path)).flatten();
                    let _ = tx.send(url);
                }
                Task::AddFile(tx, path, url, mid) => {
                    let inserted = Self::ok_or_log(self.insert_file(&path, &url, mid)).is_some();
                    let _ = tx.send(inserted);
                }
                Task::Cleanup(tx, older_than) => {
                    let mids = Self::ok_or_log(self.delete_older_than(older_than));
                    let _ = tx.send(mids);
                }
            }
        }
    }

    /// Converts a database result into an `Option`, logging the error if any.
    fn ok_or_log<T>(result: rusqlite::Result<T>) -> Option<T> {
        match result {
            Ok(value) => Some(value),
            Err(e) => {
                error!("A database error has occurred: {}", e);
                None
            }
        }
    }

    /// Returns the `url` of a row stored under `path`, or `None` if there is none.
    fn lookup_url(&self, path: &str) -> rusqlite::Result<Option<String>> {
        // Cached so the statement is compiled once instead of once per task.
        let mut stmt = self
            .db
            .prepare_cached("select url from files where path=? limit 1")?;
        stmt.query_row(params![path], |row| row.get::<_, String>(0))
            .optional()
    }

    /// Inserts a `(path, url, mid)` row and returns the number of changed rows.
    fn insert_file(&self, path: &str, url: &str, mid: u64) -> rusqlite::Result<usize> {
        let mut stmt = self
            .db
            .prepare_cached("insert into files(path,url,mid) values(?,?,?)")?;
        stmt.execute(params![path, url, mid])
    }

    /// Deletes rows whose timestamp is below `now - older_than` and returns
    /// the `mid` of every deleted row.
    fn delete_older_than(&self, older_than: u64) -> rusqlite::Result<Vec<u64>> {
        let mut stmt = self.db.prepare_cached(
            "delete from files where timestamp < strftime('%s','now')-? returning mid",
        )?;
        // Collected into a local first so the row iterator (which borrows
        // `stmt`) is dropped before `stmt` itself goes out of scope.
        let mids = stmt
            .query_map(params![older_than], |row| row.get::<_, u64>(0))?
            .collect::<rusqlite::Result<Vec<u64>>>()?;
        Ok(mids)
    }
}
/// Handle used to submit tasks to a `DbExecutor`.
///
/// Cloning the handle clones the underlying channel sender, so every clone
/// feeds the same executor.
#[derive(Clone)]
pub struct ExecutorConnection {
    /// Sending half of the task channel created in `DbExecutor::create`.
    tx: UnboundedSender<Task>,
}
/// Generates a `pub async fn` that forwards one request to the executor.
///
/// `executor_wrapper!(name, Task::Variant, Ret, arg: Ty, ...)` expands to
/// `pub async fn name(&self, arg: Ty, ...) -> Ret`. The generated method
/// creates a oneshot reply channel, sends `Task::Variant(reply_sender, arg, ...)`
/// over `self.tx` and awaits the reply.
///
/// # Panics
/// The generated method panics if the task cannot be sent or if the reply
/// sender is dropped without a reply; both mean the executor is gone.
macro_rules! executor_wrapper {
    ($name:ident, $task:expr, $ret:ty, $($arg:ident: $ty:ty),*) => {
        pub async fn $name(&self, $($arg: $ty),*) -> $ret {
            let (reply_tx, reply_rx) = oneshot::channel();
            self.tx
                .send($task(reply_tx, $($arg),*))
                .expect("database executor is not running: task channel is closed");
            reply_rx
                .await
                .expect("database executor dropped the task without sending a reply")
        }
    };
}
// Async front-end of the executor. Each `executor_wrapper!` line expands to
// `pub async fn <name>(&self, <args>) -> <ret>`, which sends the named `Task`
// variant and awaits the reply.
impl ExecutorConnection {
// Inserts a (path, url, mid) row; resolves to `true` on success and `false`
// when the database reported an error.
executor_wrapper!(add_file, Task::AddFile, bool, path: String, url: String, mid: u64);
// Deletes rows older than `older_than` and resolves to the `mid` of every
// deleted row, or `None` when the database reported an error.
executor_wrapper!(cleanup, Task::Cleanup, Option<Vec<u64>>, older_than: u64);
// Resolves to the url stored for `path`, or `None` when there is no such row
// or the query failed.
executor_wrapper!(get_file, Task::GetFile, Option<String>, path: String);
}