Implement the daily cleanup task
This commit is contained in:
parent
c58db54696
commit
608a2e8797
71
src/main.rs
71
src/main.rs
|
@ -6,11 +6,14 @@ use std::{env, fs, thread};
|
|||
use crate::database::{DbExecutor, ExecutorConnection};
|
||||
use log::{debug, error, info, warn};
|
||||
use serde::Deserialize;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::broadcast::Receiver;
|
||||
use warp::http::{Response, StatusCode};
|
||||
use warp::hyper::body::Bytes;
|
||||
use warp::path::FullPath;
|
||||
use warp::{any, body, header, path, query, Filter, Reply};
|
||||
use tokio::time::sleep;
|
||||
use tokio::time::Duration;
|
||||
|
||||
mod database;
|
||||
mod discord;
|
||||
|
@ -41,6 +44,7 @@ struct Config {
|
|||
webhook: String,
|
||||
secret: String,
|
||||
dbpath: String,
|
||||
cleanup: Option<u64>
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
|
@ -56,7 +60,7 @@ async fn main() -> anyhow::Result<()> {
|
|||
let config_path = config_var.as_deref().unwrap_or("xuproxy.toml");
|
||||
info!("Loading config '{}'", config_path);
|
||||
let config_str = fs::read_to_string(config_path)?;
|
||||
let config: Config = toml::from_str(&config_str)?;
|
||||
let config: Arc<Config> = Arc::new(toml::from_str(&config_str)?);
|
||||
info!("Initializing database...");
|
||||
let (mut db_exec, db_conn) = DbExecutor::create(&config.dbpath)?;
|
||||
let executor_thread = thread::spawn(move || {
|
||||
|
@ -64,14 +68,16 @@ async fn main() -> anyhow::Result<()> {
|
|||
log::info!("Database executor shutting down");
|
||||
});
|
||||
|
||||
let (ctx, crx) = oneshot::channel();
|
||||
let server_task = tokio::spawn(run_server(config, db_conn, crx));
|
||||
let (ctx, _) = broadcast::channel(1);
|
||||
let server_task = tokio::spawn(run_server(config.clone(), db_conn.clone(), ctx.subscribe()));
|
||||
let cleanup_task = tokio::spawn(cleanup_task(db_conn, config, ctx.subscribe()));
|
||||
terminate_signal().await;
|
||||
info!("Shutdown signal received, powering down");
|
||||
let _ = ctx.send(());
|
||||
server_task
|
||||
.await
|
||||
server_task.await
|
||||
.unwrap_or_else(|e| error!("Couldn't await the server task: {}", e));
|
||||
cleanup_task.await
|
||||
.unwrap_or_else(|e| error!("Couldn't await the cleanup task: {}", e));
|
||||
executor_thread.join().unwrap();
|
||||
Ok(())
|
||||
}
|
||||
|
@ -109,13 +115,14 @@ async fn handle_put(
|
|||
debug!("Token '{}' doesn't match HMAC secret", query.v);
|
||||
return StatusCode::FORBIDDEN;
|
||||
}
|
||||
|
||||
match discord::upload_webhook(&config.webhook, body, filename_str).await {
|
||||
Err(e) => {
|
||||
warn!("Could not upload file to Discord: {}", e);
|
||||
StatusCode::FORBIDDEN
|
||||
}
|
||||
Ok(url) => {
|
||||
if db.add_file(filename_str.to_string(), url).await {
|
||||
Ok(o) => {
|
||||
if db.add_file(filename_str.to_string(), o.0, o.1).await {
|
||||
StatusCode::CREATED
|
||||
} else {
|
||||
StatusCode::FORBIDDEN
|
||||
|
@ -124,6 +131,27 @@ async fn handle_put(
|
|||
}
|
||||
}
|
||||
|
||||
async fn cleanup_task(db: ExecutorConnection, conf: Arc<Config>, mut cancel: Receiver<()>) {
|
||||
let older_than = conf.cleanup.unwrap()*3600;
|
||||
loop {
|
||||
debug!("Starting daily cleanup...");
|
||||
if let Some(mids) = db.cleanup(older_than).await {
|
||||
let midslen = mids.len();
|
||||
for mid in mids {
|
||||
if let Err(e) = discord::delete(&conf.webhook, mid).await {
|
||||
warn!("Couldn't delete message {}: {}", mid, e);
|
||||
}
|
||||
}
|
||||
info!("Daily cleanup complete! Removed {} entries", midslen);
|
||||
}
|
||||
tokio::select! {
|
||||
_ = sleep(Duration::from_secs(86400)) => {},
|
||||
_ = cancel.recv() => break
|
||||
}
|
||||
}
|
||||
info!("Daily cleanup task has been shut down");
|
||||
}
|
||||
|
||||
async fn handle_get_n_head(filename: FullPath, db: ExecutorConnection, head: bool) -> impl Reply {
|
||||
let filename_str = &filename.as_str()[1..];
|
||||
debug!("Received GET request, name({})", filename_str);
|
||||
|
@ -139,9 +167,10 @@ async fn handle_get_n_head(filename: FullPath, db: ExecutorConnection, head: boo
|
|||
return Response::builder()
|
||||
.header("Content-Length", o.0)
|
||||
.header("Content-Type", o.1)
|
||||
.body("").into_response()
|
||||
.body("")
|
||||
.into_response()
|
||||
}
|
||||
Err(e) => e
|
||||
Err(e) => e,
|
||||
}
|
||||
} else {
|
||||
match discord::get(&url).await {
|
||||
|
@ -153,7 +182,7 @@ async fn handle_get_n_head(filename: FullPath, db: ExecutorConnection, head: boo
|
|||
.unwrap()
|
||||
.into_response();
|
||||
}
|
||||
Err(e) => e
|
||||
Err(e) => e,
|
||||
}
|
||||
};
|
||||
warn!("Could not retrieve '{}' from Discord: {}", url, err);
|
||||
|
@ -163,9 +192,7 @@ async fn handle_get_n_head(filename: FullPath, db: ExecutorConnection, head: boo
|
|||
}
|
||||
}
|
||||
|
||||
async fn run_server(conf: Config, db: ExecutorConnection, cancel: oneshot::Receiver<()>) {
|
||||
let conf = Arc::new(conf);
|
||||
|
||||
async fn run_server(conf: Arc<Config>, db: ExecutorConnection, mut cancel: Receiver<()>) {
|
||||
let put_route = warp::put()
|
||||
.and(path::full())
|
||||
.and(header::<u64>("content-length"))
|
||||
|
@ -206,18 +233,14 @@ async fn run_server(conf: Config, db: ExecutorConnection, cancel: oneshot::Recei
|
|||
.tls()
|
||||
.cert_path(&tls.cert)
|
||||
.key_path(&tls.key)
|
||||
.bind_with_graceful_shutdown(conf.address, async {
|
||||
let _ = cancel.await;
|
||||
})
|
||||
.1
|
||||
.await;
|
||||
.bind_with_graceful_shutdown(conf.address, async move {
|
||||
let _ = cancel.recv().await;
|
||||
}).1.await;
|
||||
} else {
|
||||
warp::serve(routes)
|
||||
.bind_with_graceful_shutdown(conf.address, async {
|
||||
let _ = cancel.await;
|
||||
})
|
||||
.1
|
||||
.await;
|
||||
.bind_with_graceful_shutdown(conf.address, async move {
|
||||
let _ = cancel.recv().await;
|
||||
}).1.await;
|
||||
};
|
||||
info!("Webserver shutting down");
|
||||
}
|
||||
|
|
Reference in a new issue