From 67500f8e988de7195b190824a7ce07a6a8e9df98 Mon Sep 17 00:00:00 2001 From: Yash Karandikar Date: Thu, 7 Jul 2022 11:52:39 +0530 Subject: [PATCH] Attempt to figure out why it crashes --- Cargo.lock | 5 +- Cargo.toml | 1 + src/irc_discord.rs | 118 ++++++++++++++++++++++++++++++++++----------- src/main.rs | 6 +-- 4 files changed, 96 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3eb1429..02eb1c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -206,6 +206,7 @@ dependencies = [ "serde", "serenity", "tokio", + "tokio-stream", "toml", "vergen", ] @@ -1476,9 +1477,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.8" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3" +checksum = "df54d54117d6fdc4e4fea40fe1e4e566b3505700e148a6827e59b34b0d2600d9" dependencies = [ "futures-core", "pin-project-lite", diff --git a/Cargo.toml b/Cargo.toml index 0e94380..3df5251 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ serde = "1.0" lazy_static = "1.4" pulldown-cmark = "0.9.1" fancy-regex = "0.10.0" +tokio-stream = "0.1.9" [dependencies.tokio] version = "1.15.0" diff --git a/src/irc_discord.rs b/src/irc_discord.rs index 6438d04..d40e23d 100644 --- a/src/irc_discord.rs +++ b/src/irc_discord.rs @@ -2,7 +2,9 @@ use irc::{client::Client as IrcClient, proto::Command}; use std::{collections::HashMap, sync::Arc}; -use tokio::sync::Mutex; +use tokio::sync::{mpsc::unbounded_channel, Mutex}; + +use tokio_stream::wrappers::UnboundedReceiverStream; use serenity::{ futures::StreamExt, @@ -27,6 +29,7 @@ macro_rules! unwrap_or_continue { }; } +#[allow(clippy::too_many_lines)] // missing, fight me pub async fn irc_loop( mut client: IrcClient, http: Arc, @@ -34,6 +37,9 @@ pub async fn irc_loop( webhooks: HashMap, members: Arc>>, ) -> anyhow::Result<()> { + let (send, recv) = unbounded_channel(); + tokio::spawn(msg_task(UnboundedReceiverStream::new(recv))); + let mut avatar_cache: HashMap> = HashMap::new(); let mut id_cache: HashMap> = HashMap::new(); let mut channel_users: HashMap> = HashMap::new(); @@ -63,7 +69,6 @@ pub async fn irc_loop( }; let nickname = unwrap_or_continue!(orig_message.source_nickname()); - if let Command::PRIVMSG(ref channel, ref message) = orig_message.command { let channel_id = ChannelId::from(*unwrap_or_continue!(mapping.get(channel))); @@ -90,19 +95,20 @@ pub async fn irc_loop( }) }); - webhook - .execute(&http, false, |w| { - if let Some(ref url) = avatar { - w.avatar_url(url); - } - - w.username(nickname).content(computed) - }) - .await?; + let m = QueuedMessage::Webhook { + webhook: webhook.clone(), + http: http.clone(), + avatar_url: avatar.clone(), + content: computed, + nickname: nickname.to_string(), + }; + send.send(m)?; } else { - channel_id - .say(&http, format!("<{}> {}", nickname, computed)) - .await?; + send.send(QueuedMessage::Raw { + channel_id, + http: http.clone(), + message: format!("<{}>, {}", nickname, computed), + })?; } } else if let Command::JOIN(ref channel, _, _) = orig_message.command { let channel_id = ChannelId::from(*unwrap_or_continue!(mapping.get(channel))); @@ -110,9 +116,11 @@ pub async fn irc_loop( users.push(nickname.to_string()); - channel_id - .say(&http, format!("*{}* has joined the channel", nickname)) - .await?; + send.send(QueuedMessage::Raw { + channel_id, + http: http.clone(), + message: format!("*{}* has joined the channel", nickname), + })?; } else if let Command::PART(ref channel, ref reason) = orig_message.command { let users = unwrap_or_continue!(channel_users.get_mut(channel)); let channel_id = ChannelId::from(*unwrap_or_continue!(mapping.get(channel))); @@ -122,9 +130,11 @@ pub async fn irc_loop( let reason = reason.as_deref().unwrap_or("Connection closed"); - channel_id - .say(&http, format!("*{}* has quit ({})", nickname, reason)) - .await?; + send.send(QueuedMessage::Raw { + channel_id, + http: http.clone(), + message: format!("*{}* has quit ({})", nickname, reason), + })?; } else if let Command::QUIT(ref reason) = orig_message.command { for (channel, users) in &mut channel_users { let channel_id = ChannelId::from(*unwrap_or_continue!(mapping.get(channel))); @@ -134,9 +144,11 @@ pub async fn irc_loop( let reason = reason.as_deref().unwrap_or("Connection closed"); - channel_id - .say(&http, format!("*{}* has quit ({})", nickname, reason)) - .await?; + send.send(QueuedMessage::Raw { + channel_id, + http: http.clone(), + message: format!("*{}* has quit ({})", nickname, reason), + })?; } } else if let Command::NICK(ref new_nick) = orig_message.command { for (channel, users) in &mut channel_users { @@ -145,12 +157,11 @@ pub async fn irc_loop( users[pos] = new_nick.to_string(); - channel_id - .say( - &http, - format!("*{}* is now known as *{}*", nickname, new_nick), - ) - .await?; + send.send(QueuedMessage::Raw { + channel_id, + http: http.clone(), + message: format!("*{}* is now known as *{}*", nickname, new_nick), + })?; } } } @@ -263,3 +274,52 @@ fn irc_to_discord_processing( computed } + +#[allow(clippy::large_enum_variant)] // lmao +#[derive(Debug)] +enum QueuedMessage { + Webhook { + webhook: Webhook, + http: Arc, + avatar_url: Option, + content: String, + nickname: String, + }, + Raw { + channel_id: ChannelId, + http: Arc, + message: String, + }, +} + +async fn msg_task(mut recv: UnboundedReceiverStream) -> anyhow::Result<()> { + while let Some(msg) = recv.next().await { + match msg { + QueuedMessage::Webhook { + webhook, + http, + avatar_url, + content, + nickname, + } => { + webhook + .execute(&http, true, |w| { + if let Some(ref url) = avatar_url { + w.avatar_url(url); + } + + w.username(nickname).content(content) + }) + .await?; + } + QueuedMessage::Raw { + channel_id, + http, + message, + } => { + channel_id.say(&http, message).await?; + } + } + } + Ok(()) +} diff --git a/src/main.rs b/src/main.rs index c916b33..88d6de2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -144,12 +144,12 @@ async fn main() -> anyhow::Result<()> { } select! { - r = irc_loop(irc_client, http.clone(), channels.clone(), webhooks_transformed, members) => r?, - r = discord_client.start() => r?, + r = irc_loop(irc_client, http.clone(), channels.clone(), webhooks_transformed, members) => r.unwrap(), + r = discord_client.start() => r.unwrap(), _ = terminate_signal() => { for (_, &v) in channels.iter() { let channel_id = ChannelId::from(v); - channel_id.say(&http, format!("dircord shutting down! (dircord {}-{})", env!("VERGEN_GIT_BRANCH"), &env!("VERGEN_GIT_SHA")[..7])).await?; + channel_id.say(&http, format!("dircord shutting down! (dircord {}-{})", env!("VERGEN_GIT_BRANCH"), &env!("VERGEN_GIT_SHA")[..7])).await.unwrap(); } }, }