diff --git a/src/main.rs b/src/main.rs index 6e2a4d9..18627b3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,11 +6,14 @@ use std::{env, fs, thread}; use crate::database::{DbExecutor, ExecutorConnection}; use log::{debug, error, info, warn}; use serde::Deserialize; -use tokio::sync::oneshot; +use tokio::sync::broadcast; +use tokio::sync::broadcast::Receiver; use warp::http::{Response, StatusCode}; use warp::hyper::body::Bytes; use warp::path::FullPath; use warp::{any, body, header, path, query, Filter, Reply}; +use tokio::time::sleep; +use tokio::time::Duration; mod database; mod discord; @@ -41,6 +44,7 @@ struct Config { webhook: String, secret: String, dbpath: String, + cleanup: Option } #[derive(Debug, Deserialize)] @@ -56,7 +60,7 @@ async fn main() -> anyhow::Result<()> { let config_path = config_var.as_deref().unwrap_or("xuproxy.toml"); info!("Loading config '{}'", config_path); let config_str = fs::read_to_string(config_path)?; - let config: Config = toml::from_str(&config_str)?; + let config: Arc = Arc::new(toml::from_str(&config_str)?); info!("Initializing database..."); let (mut db_exec, db_conn) = DbExecutor::create(&config.dbpath)?; let executor_thread = thread::spawn(move || { @@ -64,14 +68,16 @@ async fn main() -> anyhow::Result<()> { log::info!("Database executor shutting down"); }); - let (ctx, crx) = oneshot::channel(); - let server_task = tokio::spawn(run_server(config, db_conn, crx)); + let (ctx, _) = broadcast::channel(1); + let server_task = tokio::spawn(run_server(config.clone(), db_conn.clone(), ctx.subscribe())); + let cleanup_task = tokio::spawn(cleanup_task(db_conn, config, ctx.subscribe())); terminate_signal().await; info!("Shutdown signal received, powering down"); let _ = ctx.send(()); - server_task - .await + server_task.await .unwrap_or_else(|e| error!("Couldn't await the server task: {}", e)); + cleanup_task.await + .unwrap_or_else(|e| error!("Couldn't await the cleanup task: {}", e)); executor_thread.join().unwrap(); Ok(()) } @@ -109,13 +115,14 @@ async fn handle_put( debug!("Token '{}' doesn't match HMAC secret", query.v); return StatusCode::FORBIDDEN; } + match discord::upload_webhook(&config.webhook, body, filename_str).await { Err(e) => { warn!("Could not upload file to Discord: {}", e); StatusCode::FORBIDDEN } - Ok(url) => { - if db.add_file(filename_str.to_string(), url).await { + Ok(o) => { + if db.add_file(filename_str.to_string(), o.0, o.1).await { StatusCode::CREATED } else { StatusCode::FORBIDDEN @@ -124,6 +131,27 @@ async fn handle_put( } } +async fn cleanup_task(db: ExecutorConnection, conf: Arc, mut cancel: Receiver<()>) { + let older_than = conf.cleanup.unwrap()*3600; + loop { + debug!("Starting daily cleanup..."); + if let Some(mids) = db.cleanup(older_than).await { + let midslen = mids.len(); + for mid in mids { + if let Err(e) = discord::delete(&conf.webhook, mid).await { + warn!("Couldn't delete message {}: {}", mid, e); + } + } + info!("Daily cleanup complete! Removed {} entries", midslen); + } + tokio::select! { + _ = sleep(Duration::from_secs(86400)) => {}, + _ = cancel.recv() => break + } + } + info!("Daily cleanup task has been shut down"); +} + async fn handle_get_n_head(filename: FullPath, db: ExecutorConnection, head: bool) -> impl Reply { let filename_str = &filename.as_str()[1..]; debug!("Received GET request, name({})", filename_str); @@ -139,9 +167,10 @@ async fn handle_get_n_head(filename: FullPath, db: ExecutorConnection, head: boo return Response::builder() .header("Content-Length", o.0) .header("Content-Type", o.1) - .body("").into_response() + .body("") + .into_response() } - Err(e) => e + Err(e) => e, } } else { match discord::get(&url).await { @@ -153,7 +182,7 @@ async fn handle_get_n_head(filename: FullPath, db: ExecutorConnection, head: boo .unwrap() .into_response(); } - Err(e) => e + Err(e) => e, } }; warn!("Could not retrieve '{}' from Discord: {}", url, err); @@ -163,9 +192,7 @@ async fn handle_get_n_head(filename: FullPath, db: ExecutorConnection, head: boo } } -async fn run_server(conf: Config, db: ExecutorConnection, cancel: oneshot::Receiver<()>) { - let conf = Arc::new(conf); - +async fn run_server(conf: Arc, db: ExecutorConnection, mut cancel: Receiver<()>) { let put_route = warp::put() .and(path::full()) .and(header::("content-length")) @@ -206,18 +233,14 @@ async fn run_server(conf: Config, db: ExecutorConnection, cancel: oneshot::Recei .tls() .cert_path(&tls.cert) .key_path(&tls.key) - .bind_with_graceful_shutdown(conf.address, async { - let _ = cancel.await; - }) - .1 - .await; + .bind_with_graceful_shutdown(conf.address, async move { + let _ = cancel.recv().await; + }).1.await; } else { warp::serve(routes) - .bind_with_graceful_shutdown(conf.address, async { - let _ = cancel.await; - }) - .1 - .await; + .bind_with_graceful_shutdown(conf.address, async move { + let _ = cancel.recv().await; + }).1.await; }; info!("Webserver shutting down"); }