Compare commits

...

2 commits

3 changed files with 74 additions and 97 deletions

7
Cargo.lock generated
View file

@ -2,6 +2,12 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "anyhow"
version = "1.0.51"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b26702f315f53b6071259e15dd9d64528213b44d61de1ec926eca7715d62203"
[[package]]
name = "autocfg"
version = "1.0.1"
@ -102,6 +108,7 @@ dependencies = [
name = "quiver"
version = "0.1.0"
dependencies = [
"anyhow",
"tokio",
"uuid",
]

View file

@ -3,7 +3,8 @@ name = "quiver"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1"
[dependencies.tokio]
version = "1.14.0"

View file

@ -1,126 +1,95 @@
use std::io;
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::TcpListener;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::broadcast::{self, Receiver, Sender};
use tokio::sync::Mutex;
use tokio::{select, spawn};
use uuid::Uuid;
// TODO:
// * use the log crate
#[derive(Clone, Debug)]
struct Message {
msg: String,
sender: String,
}
#[derive(Clone, Debug)]
enum BroadcastMessage {
Message(Message),
// we can add more types here, for example a graceful shutdown message
}
#[tokio::main(flavor = "current_thread")]
async fn main() -> io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:9090").await?;
let (sender, _) = broadcast::channel(150);
let (br_send, _) = broadcast::channel::<BroadcastMessage>(1024);
loop {
let (socket, _) = listener.accept().await?;
let disconnected = Arc::new(Mutex::new(false));
let current_client_id = Arc::new(Mutex::new(None::<String>));
let (sockread, sockwrite) = socket.into_split();
spawn(connection_read(
sockread,
disconnected.clone(),
current_client_id.clone(),
sender.clone(),
));
spawn(connection_write(
sockwrite,
disconnected,
current_client_id,
sender.clone().subscribe(),
));
let (tx, rx) = (br_send.clone(), br_send.subscribe());
spawn(async move {
if let Err(e) = connection_handler(socket, tx, rx).await {
println!("{:?}", e);
}
});
}
}
async fn connection_write(
mut sockwrite: OwnedWriteHalf,
disconnected: Arc<Mutex<bool>>,
current_client_id: Arc<Mutex<Option<String>>>,
mut receiver: Receiver<Message>,
) {
async fn connection_handler(
mut sock: TcpStream,
br_sender: Sender<BroadcastMessage>,
mut br_receiver: Receiver<BroadcastMessage>,
) -> anyhow::Result<()> {
println!("new client joined");
let id = Uuid::new_v4().to_hyphenated().to_string();
println!("ID: {}", id);
let mut nick = String::from(id.split("-").collect::<Vec<&str>>()[0]);
// todo: change this to a broadcast message, see line 22
br_sender.send(BroadcastMessage::Message(Message {
msg: format!("{} has joined the chat\n", nick),
sender: id.clone(),
}))?;
let (rx, mut tx) = sock.split();
let mut bufreader = BufReader::new(rx);
loop {
let mut buf = String::new();
select! {
disconed = disconnected.lock() => {
if *disconed {
break;
}
},
ido = current_client_id.lock() => {
if let Some(id) = ido.clone() {
if let Ok(msg) = receiver.recv().await {
if msg.sender != id {
if sockwrite.write_all(msg.msg.as_bytes()).await.is_err() {
*disconnected.lock().await = true;
}
o = bufreader.read_line(&mut buf) => {
let bytes = o?;
if bytes == 0 {
break;
}
if buf.starts_with("MSG") {
// todo: change this to a broadcast message, see line 22
br_sender.send(BroadcastMessage::Message(Message { msg: format!("{} {}", nick, buf.clone()) , sender: id.clone() }))?;
} else if buf.starts_with("NICK") {
let old_nick = nick.clone();
nick = buf[5..].trim().to_string();
// todo: change this to a broadcast message, see line 22
br_sender.send(BroadcastMessage::Message(Message { msg: format!("{} {}", old_nick, buf.clone()), sender: id.clone()}))?;
}
},
o = br_receiver.recv() => {
let msg = o?;
match msg {
BroadcastMessage::Message(m) => {
if m.sender != id {
tx.write_all(m.msg.as_bytes()).await?;
}
}
}
}
}
}
}
async fn connection_read(
mut sockread: OwnedReadHalf,
disconnected: Arc<Mutex<bool>>,
current_client_id: Arc<Mutex<Option<String>>>,
sender: Sender<Message>,
) {
println!("new client joined");
let id = tokio::task::spawn_blocking(|| Uuid::new_v4().to_hyphenated().to_string())
.await
.unwrap();
println!("ID: {}", id);
let mut nick = String::from(id.split("-").collect::<Vec<&str>>()[0]);
sender
.send(Message {
msg: format!("{} has joined the chat\n", nick),
sender: id.clone(),
})
.unwrap();
*current_client_id.lock().await = Some(id.clone());
loop {
let mut buf = [0u8; 1024];
select! {
disconed = disconnected.lock() => {
if *disconed {
break;
}
},
resp = sockread.read(&mut buf) => {
let bytes = match resp {
Ok(0) => {
*disconnected.lock().await = true;
continue;
}, // eof
Ok(n) => n,
Err(e) => panic!("error occurred: {}", e)
};
let res = String::from_utf8_lossy(&buf[..bytes]).to_string();
print!("{}", res);
if res.starts_with("MSG") {
sender.send(Message { msg: format!("{} {}", nick, res.clone()) , sender: id.clone() }).unwrap();
} else if res.starts_with("NICK") {
let old_nick = nick.clone();
nick = res[5..].trim().to_string();
sender.send(Message { msg: format!("{} {}", old_nick, res.clone()), sender: id.clone()}).unwrap();
}
}
}
}
sender
.send(Message {
msg: format!("{} has disconnected\n", nick),
sender: id.clone(),
})
.unwrap();
// todo: change this to a broadcast message, see line 22
br_sender.send(BroadcastMessage::Message(Message {
msg: format!("{} has disconnected\n", nick),
sender: id.clone(),
}))?;
println!("Client {} disconnected", id);
Ok(())
}