Compare commits
2 commits
b4975ea05c
...
cf6c4c3d82
Author | SHA1 | Date | |
---|---|---|---|
Yash Karandikar | cf6c4c3d82 | ||
ee15094425 |
7
Cargo.lock
generated
7
Cargo.lock
generated
|
@ -2,6 +2,12 @@
|
|||
# It is not intended for manual editing.
|
||||
version = 3
|
||||
|
||||
[[package]]
|
||||
name = "anyhow"
|
||||
version = "1.0.51"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8b26702f315f53b6071259e15dd9d64528213b44d61de1ec926eca7715d62203"
|
||||
|
||||
[[package]]
|
||||
name = "autocfg"
|
||||
version = "1.0.1"
|
||||
|
@ -102,6 +108,7 @@ dependencies = [
|
|||
name = "quiver"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"tokio",
|
||||
"uuid",
|
||||
]
|
||||
|
|
|
@ -3,7 +3,8 @@ name = "quiver"
|
|||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
[dependencies]
|
||||
anyhow = "1"
|
||||
|
||||
[dependencies.tokio]
|
||||
version = "1.14.0"
|
||||
|
|
161
src/main.rs
161
src/main.rs
|
@ -1,126 +1,95 @@
|
|||
use std::io;
|
||||
use std::sync::Arc;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio::sync::broadcast::{self, Receiver, Sender};
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::{select, spawn};
|
||||
use uuid::Uuid;
|
||||
|
||||
// TODO:
|
||||
// * use the log crate
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct Message {
|
||||
msg: String,
|
||||
sender: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
enum BroadcastMessage {
|
||||
Message(Message),
|
||||
// we can add more types here, for example a graceful shutdown message
|
||||
}
|
||||
|
||||
#[tokio::main(flavor = "current_thread")]
|
||||
async fn main() -> io::Result<()> {
|
||||
let listener = TcpListener::bind("127.0.0.1:9090").await?;
|
||||
let (sender, _) = broadcast::channel(150);
|
||||
let (br_send, _) = broadcast::channel::<BroadcastMessage>(1024);
|
||||
|
||||
loop {
|
||||
let (socket, _) = listener.accept().await?;
|
||||
let disconnected = Arc::new(Mutex::new(false));
|
||||
let current_client_id = Arc::new(Mutex::new(None::<String>));
|
||||
let (sockread, sockwrite) = socket.into_split();
|
||||
spawn(connection_read(
|
||||
sockread,
|
||||
disconnected.clone(),
|
||||
current_client_id.clone(),
|
||||
sender.clone(),
|
||||
));
|
||||
spawn(connection_write(
|
||||
sockwrite,
|
||||
disconnected,
|
||||
current_client_id,
|
||||
sender.clone().subscribe(),
|
||||
));
|
||||
let (tx, rx) = (br_send.clone(), br_send.subscribe());
|
||||
spawn(async move {
|
||||
if let Err(e) = connection_handler(socket, tx, rx).await {
|
||||
println!("{:?}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async fn connection_write(
|
||||
mut sockwrite: OwnedWriteHalf,
|
||||
disconnected: Arc<Mutex<bool>>,
|
||||
current_client_id: Arc<Mutex<Option<String>>>,
|
||||
mut receiver: Receiver<Message>,
|
||||
) {
|
||||
async fn connection_handler(
|
||||
mut sock: TcpStream,
|
||||
br_sender: Sender<BroadcastMessage>,
|
||||
mut br_receiver: Receiver<BroadcastMessage>,
|
||||
) -> anyhow::Result<()> {
|
||||
println!("new client joined");
|
||||
let id = Uuid::new_v4().to_hyphenated().to_string();
|
||||
println!("ID: {}", id);
|
||||
let mut nick = String::from(id.split("-").collect::<Vec<&str>>()[0]);
|
||||
// todo: change this to a broadcast message, see line 22
|
||||
br_sender.send(BroadcastMessage::Message(Message {
|
||||
msg: format!("{} has joined the chat\n", nick),
|
||||
sender: id.clone(),
|
||||
}))?;
|
||||
let (rx, mut tx) = sock.split();
|
||||
let mut bufreader = BufReader::new(rx);
|
||||
loop {
|
||||
let mut buf = String::new();
|
||||
|
||||
select! {
|
||||
disconed = disconnected.lock() => {
|
||||
if *disconed {
|
||||
break;
|
||||
}
|
||||
},
|
||||
ido = current_client_id.lock() => {
|
||||
if let Some(id) = ido.clone() {
|
||||
if let Ok(msg) = receiver.recv().await {
|
||||
if msg.sender != id {
|
||||
if sockwrite.write_all(msg.msg.as_bytes()).await.is_err() {
|
||||
*disconnected.lock().await = true;
|
||||
}
|
||||
o = bufreader.read_line(&mut buf) => {
|
||||
let bytes = o?;
|
||||
if bytes == 0 {
|
||||
break;
|
||||
}
|
||||
if buf.starts_with("MSG") {
|
||||
// todo: change this to a broadcast message, see line 22
|
||||
br_sender.send(BroadcastMessage::Message(Message { msg: format!("{} {}", nick, buf.clone()) , sender: id.clone() }))?;
|
||||
} else if buf.starts_with("NICK") {
|
||||
let old_nick = nick.clone();
|
||||
nick = buf[5..].trim().to_string();
|
||||
// todo: change this to a broadcast message, see line 22
|
||||
br_sender.send(BroadcastMessage::Message(Message { msg: format!("{} {}", old_nick, buf.clone()), sender: id.clone()}))?;
|
||||
}
|
||||
},
|
||||
o = br_receiver.recv() => {
|
||||
let msg = o?;
|
||||
match msg {
|
||||
BroadcastMessage::Message(m) => {
|
||||
if m.sender != id {
|
||||
tx.write_all(m.msg.as_bytes()).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn connection_read(
|
||||
mut sockread: OwnedReadHalf,
|
||||
disconnected: Arc<Mutex<bool>>,
|
||||
current_client_id: Arc<Mutex<Option<String>>>,
|
||||
sender: Sender<Message>,
|
||||
) {
|
||||
println!("new client joined");
|
||||
let id = tokio::task::spawn_blocking(|| Uuid::new_v4().to_hyphenated().to_string())
|
||||
.await
|
||||
.unwrap();
|
||||
println!("ID: {}", id);
|
||||
let mut nick = String::from(id.split("-").collect::<Vec<&str>>()[0]);
|
||||
sender
|
||||
.send(Message {
|
||||
msg: format!("{} has joined the chat\n", nick),
|
||||
sender: id.clone(),
|
||||
})
|
||||
.unwrap();
|
||||
*current_client_id.lock().await = Some(id.clone());
|
||||
loop {
|
||||
let mut buf = [0u8; 1024];
|
||||
|
||||
select! {
|
||||
disconed = disconnected.lock() => {
|
||||
if *disconed {
|
||||
break;
|
||||
}
|
||||
},
|
||||
resp = sockread.read(&mut buf) => {
|
||||
let bytes = match resp {
|
||||
Ok(0) => {
|
||||
*disconnected.lock().await = true;
|
||||
continue;
|
||||
}, // eof
|
||||
Ok(n) => n,
|
||||
Err(e) => panic!("error occurred: {}", e)
|
||||
};
|
||||
let res = String::from_utf8_lossy(&buf[..bytes]).to_string();
|
||||
print!("{}", res);
|
||||
if res.starts_with("MSG") {
|
||||
sender.send(Message { msg: format!("{} {}", nick, res.clone()) , sender: id.clone() }).unwrap();
|
||||
} else if res.starts_with("NICK") {
|
||||
let old_nick = nick.clone();
|
||||
nick = res[5..].trim().to_string();
|
||||
sender.send(Message { msg: format!("{} {}", old_nick, res.clone()), sender: id.clone()}).unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
sender
|
||||
.send(Message {
|
||||
msg: format!("{} has disconnected\n", nick),
|
||||
sender: id.clone(),
|
||||
})
|
||||
.unwrap();
|
||||
// todo: change this to a broadcast message, see line 22
|
||||
br_sender.send(BroadcastMessage::Message(Message {
|
||||
msg: format!("{} has disconnected\n", nick),
|
||||
sender: id.clone(),
|
||||
}))?;
|
||||
println!("Client {} disconnected", id);
|
||||
Ok(())
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue