Merge pull request 'Webhook server, DB server rewrite, timeout fix' (#8) from famfo/uberbot:master into master

Reviewed-on: lemonsh/uberbot#8
This commit is contained in:
lemonsh 2022-01-15 12:39:04 -06:00
commit 307918c108
6 changed files with 97 additions and 23 deletions

View file

@ -21,11 +21,11 @@ serde = "1.0"
arrayvec = "0.7"
rand = "0.8"
meval = "0.2"
async-circe = { git = "https://git.karx.xyz/circe/async-circe", default-features = false }
async-circe = { git = "https://git.karx.xyz/circe/async-circe" }
lazy_static = "1.4"
sedregex = "0.2"
rusqlite = { version = "0.26", features = ["bundled"] }
hyper = { version = "0.14", features = ["server"] }
warp = "0.3"
[features]
tls = ["async-circe/tls"]

View file

@ -14,3 +14,6 @@ prefix = "!"
# Web service config
http_listen = "127.0.0.1:8080"
# Git webhook config
git_channel = "#main"

35
src/bots/git.rs Normal file
View file

@ -0,0 +1,35 @@
use serde_json::Value::Null;
use tokio::sync::mpsc::Sender;
pub async fn handle_post(
json: serde_json::Value,
tx: Sender<String>,
) -> Result<impl warp::Reply, warp::Rejection> {
if json["commits"] != Null {
let commits = json["commits"].as_array().unwrap();
let repo = &json["repository"]["full_name"].as_str().unwrap().trim();
if commits.len() != 1 {
tx.send(format!("{} new commits on {}:", commits.len(), repo))
.await
.expect("Failed to send string to main thread");
for commit in commits {
let author = &commit["author"]["name"].as_str().unwrap().trim();
let message = &commit["message"].as_str().unwrap().trim();
tx.send(format!("{} - {}", author, message))
.await
.expect("Failed to send string to main thread");
}
} else {
let author = &json["commits"][0]["author"]["name"]
.as_str()
.unwrap()
.trim();
let message = &json["commits"][0]["message"].as_str().unwrap().trim();
tx.send(format!("New commit on {}: {} - {}", repo, message, author))
.await
.expect("Failed to send string to main thread");
}
}
Ok(warp::reply::with_status("Ok", warp::http::StatusCode::OK))
}

View file

@ -1,3 +1,4 @@
pub mod git;
pub mod leek;
pub mod misc;
pub mod sed;

View file

@ -12,10 +12,11 @@ use serde::Deserialize;
use std::fmt::Write;
use std::fs::File;
use std::io::Read;
use std::net::SocketAddr;
use std::thread;
use std::{collections::HashMap, env};
use std::net::SocketAddr;
use tokio::select;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tracing_subscriber::EnvFilter;
// this will be displayed when the help command is used
@ -54,6 +55,7 @@ pub struct AppState {
last_eval: HashMap<String, f64>,
titlebot: Titlebot,
db: ExecutorConnection,
git_channel: String,
}
#[derive(Deserialize)]
@ -68,7 +70,8 @@ struct ClientConf {
spotify_client_secret: String,
prefix: String,
db_path: Option<String>,
http_listen: Option<SocketAddr>
http_listen: Option<SocketAddr>,
git_channel: String,
}
#[tokio::main(flavor = "current_thread")]
@ -96,6 +99,10 @@ async fn main() -> anyhow::Result<()> {
&client_config.spotify_client_secret,
);
let http_listen = client_config
.http_listen
.unwrap_or_else(|| SocketAddr::from(([127, 0, 0, 1], 5000)));
let config = Config::runtime_config(
client_config.channels,
client_config.host,
@ -115,10 +122,12 @@ async fn main() -> anyhow::Result<()> {
last_eval: HashMap::new(),
titlebot: Titlebot::create(spotify_creds).await?,
db: db_conn,
git_channel: client_config.git_channel,
};
let http_listen = client_config.http_listen.unwrap_or_else(|| SocketAddr::from(([127, 0, 0, 1], 5000)));
if let Err(e) = executor(state, http_listen).await {
let (git_tx, git_recv) = channel(512);
if let Err(e) = executor(state, git_tx, git_recv, http_listen).await {
tracing::error!("Error in message loop: {}", e);
}
@ -130,11 +139,22 @@ async fn main() -> anyhow::Result<()> {
Ok(())
}
async fn executor(mut state: AppState, http_listen: SocketAddr) -> anyhow::Result<()> {
async fn executor(
mut state: AppState,
git_tx: Sender<String>,
mut git_recv: Receiver<String>,
http_listen: SocketAddr,
) -> anyhow::Result<()> {
let web_db = state.db.clone();
let git_channel = state.git_channel.clone();
select! {
r = web_service::run(web_db, http_listen) => r?,
r = web_service::run(web_db, git_tx, http_listen) => r?,
r = message_loop(&mut state) => r?,
r = git_recv.recv() => {
if let Some(message) = r {
state.client.privmsg(&git_channel, &message).await?;
}
}
_ = terminate_signal() => {
tracing::info!("Sending QUIT message");
state.client.quit(Some("überbot shutting down")).await?;

View file

@ -1,23 +1,38 @@
use std::net::SocketAddr;
use crate::ExecutorConnection;
use std::convert::Infallible;
use std::sync::Arc;
use hyper::{Body, Request, Response, Server};
use hyper::service::{make_service_fn, service_fn};
use std::net::SocketAddr;
use tokio::sync::mpsc::Sender;
use warp::Filter;
pub async fn run(db: ExecutorConnection, listen: SocketAddr) -> anyhow::Result<()> {
let db = Arc::new(db);
pub async fn run(
db: ExecutorConnection,
tx: Sender<String>,
listen: SocketAddr,
) -> anyhow::Result<()> {
let db_filter = warp::any().map(move || db.clone());
let db_filter = warp::get().and(db_filter).and_then(handle);
Server::bind(&listen).serve(make_service_fn(|_| {
let db = Arc::clone(&db);
async move {
Ok::<_, Infallible>(service_fn(move |r| handle(r, Arc::clone(&db))))
}
})).await?;
let tx_filter = warp::any().map(move || tx.clone());
let tx_filter = warp::path("webhook")
.and(warp::post())
.and(warp::body::json())
.and(tx_filter)
.and_then(crate::bots::git::handle_post);
let filter = db_filter.or(tx_filter);
warp::serve(filter).run(listen).await;
Ok(())
}
async fn handle(req: Request<Body>, db: Arc<ExecutorConnection>) -> Result<Response<Body>, Infallible> {
Ok(Response::new(Body::from(format!("{:?}", db.get_quote(None).await))))
async fn handle(db: ExecutorConnection) -> Result<impl warp::Reply, warp::Rejection> {
if let Some((a, b)) = db.get_quote(None).await {
Ok(warp::reply::with_status(
format!("{} {}", a, b),
warp::http::StatusCode::OK,
))
} else {
Ok(warp::reply::with_status(
"None".into(),
warp::http::StatusCode::NO_CONTENT,
))
}
}