Parallel message handling

This commit is contained in:
lemonsh 2022-07-28 13:18:40 +02:00
parent ceb64afd26
commit 47c15d8d43
2 changed files with 17 additions and 10 deletions

View file

@ -3,7 +3,7 @@ use crate::ExecutorConnection;
use async_trait::async_trait; use async_trait::async_trait;
use fancy_regex::{Captures, Regex}; use fancy_regex::{Captures, Regex};
use std::collections::HashMap; use std::collections::HashMap;
use tokio::sync::Mutex; use tokio::sync::{Mutex, mpsc};
fn dissect<'a>(prefixes: &[String], str: &'a str) -> Option<(&'a str, Option<&'a str>)> { fn dissect<'a>(prefixes: &[String], str: &'a str) -> Option<(&'a str, Option<&'a str>)> {
for prefix in prefixes { for prefix in prefixes {
@ -108,8 +108,8 @@ impl<SF: Fn(String, String) -> anyhow::Result<()>> Bot<SF> {
Ok(()) Ok(())
} }
pub async fn handle_message(&self, origin: &str, author: &str, content: &str) { pub async fn handle_message(&self, origin: String, author: String, content: String, _cancel_handle: mpsc::Sender<()>) {
if let Err(e) = self.handle_message_inner(origin, author, content).await { if let Err(e) = self.handle_message_inner(&origin, &author, &content).await {
let _err = (self.sendmsg)(origin.into(), format!("Error: {}", e)); let _err = (self.sendmsg)(origin.into(), format!("Error: {}", e));
} }
} }

View file

@ -22,7 +22,7 @@ use irc::client::{Client, ClientStream};
use irc::proto::{ChannelExt, Command, Prefix}; use irc::proto::{ChannelExt, Command, Prefix};
use rspotify::Credentials; use rspotify::Credentials;
use tokio::select; use tokio::select;
use tokio::sync::broadcast; use tokio::sync::{broadcast, mpsc};
use tokio::sync::mpsc::unbounded_channel; use tokio::sync::mpsc::unbounded_channel;
use tracing::Level; use tracing::Level;
@ -185,23 +185,30 @@ async fn main() -> anyhow::Result<()> {
Ok(()) Ok(())
} }
async fn message_loop<SF: Fn(String, String) -> anyhow::Result<()>>( async fn message_loop<SF>(
mut stream: ClientStream, mut stream: ClientStream,
bot: Bot<SF>, bot: Bot<SF>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> where SF: Fn(String, String) -> anyhow::Result<()> + Send + Sync + 'static {
let bot = Arc::new(bot);
let (cancelled_send, mut cancelled_recv) = mpsc::channel::<()>(1);
while let Some(message) = stream.next().await.transpose()? { while let Some(message) = stream.next().await.transpose()? {
if let Command::PRIVMSG(ref origin, content) = message.command { if let Command::PRIVMSG(origin, content) = message.command {
if origin.is_channel_name() { if origin.is_channel_name() {
if let Some(author) = message.prefix.as_ref().and_then(|p| match p { if let Some(author) = message.prefix.and_then(|p| match p {
Prefix::Nickname(name, _, _) => Some(&name[..]), Prefix::Nickname(name, _, _) => Some(name),
Prefix::ServerName(_) => None, Prefix::ServerName(_) => None,
}) { }) {
bot.handle_message(origin, author, &content).await; let bot = bot.clone();
let cancelled_send = cancelled_send.clone();
tokio::spawn(async move {
bot.handle_message(origin, author, content, cancelled_send).await;
});
} else { } else {
tracing::warn!("Couldn't get the author for a message"); tracing::warn!("Couldn't get the author for a message");
} }
} }
} }
} }
let _ = cancelled_recv.recv().await;
Ok(()) Ok(())
} }