Compare commits

...

5 commits

2 changed files with 55 additions and 49 deletions

5
README.md Normal file
View file

@ -0,0 +1,5 @@
# Quiver
Quiver is an experimental chat protocol written in Rust.
You can run it using `cargo run` on `rustc 1.57.0`. The default port is 9000.

View file

@ -3,62 +3,62 @@ use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::TcpListener;
use tokio::sync::RwLock;
use tokio::sync::broadcast::{self, Receiver, Sender};
use tokio::sync::Mutex;
use tokio::{select, spawn};
use uuid::Uuid;
#[derive(Clone, Debug, Default)]
struct Client {
uuid: String,
queue: Vec<String>,
#[derive(Clone, Debug)]
struct Message {
msg: String,
sender: String,
}
#[tokio::main(flavor = "current_thread")]
async fn main() -> io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:9090").await?;
let clients: Arc<RwLock<Vec<Client>>> = Arc::new(RwLock::new(Vec::new()));
let (sender, _) = broadcast::channel(150);
loop {
let (socket, _) = listener.accept().await?;
let disconnected = Arc::new(RwLock::new(false));
let current_client_id = Arc::new(RwLock::new(None::<String>));
let disconnected = Arc::new(Mutex::new(false));
let current_client_id = Arc::new(Mutex::new(None::<String>));
let (sockread, sockwrite) = socket.into_split();
spawn(connection_read(
sockread,
clients.clone(),
disconnected.clone(),
current_client_id.clone(),
sender.clone(),
));
spawn(connection_write(
sockwrite,
disconnected,
current_client_id,
clients.clone(),
sender.clone().subscribe(),
));
}
}
async fn connection_write(
mut sockwrite: OwnedWriteHalf,
disconnected: Arc<RwLock<bool>>,
current_client_id: Arc<RwLock<Option<String>>>,
clients: Arc<RwLock<Vec<Client>>>,
disconnected: Arc<Mutex<bool>>,
current_client_id: Arc<Mutex<Option<String>>>,
mut receiver: Receiver<Message>,
) {
loop {
select! {
disconed = disconnected.read() => {
if *disconed {
break;
}
},
cliento = current_client_id.read() => {
if let Some(id) = &*cliento {
let clients = clients.read().await;
let client = clients.iter().find(|x| x.uuid == *id).expect("Congratulations, you broke it.");
for msg in &client.queue {
println!("msg {}", msg);
if sockwrite.write_all(msg.as_bytes()).await.is_err() {
*disconnected.write().await = true;
disconed = disconnected.lock() => {
if *disconed {
break;
}
},
ido = current_client_id.lock() => {
if let Some(id) = ido.clone() {
if let Ok(msg) = receiver.recv().await {
if msg.sender != id {
if sockwrite.write_all(msg.msg.as_bytes()).await.is_err() {
*disconnected.lock().await = true;
}
}
}
}
@ -69,26 +69,28 @@ async fn connection_write(
async fn connection_read(
mut sockread: OwnedReadHalf,
clients: Arc<RwLock<Vec<Client>>>,
disconnected: Arc<RwLock<bool>>,
current_client_id: Arc<RwLock<Option<String>>>,
disconnected: Arc<Mutex<bool>>,
current_client_id: Arc<Mutex<Option<String>>>,
sender: Sender<Message>,
) {
println!("new client joined");
let id = tokio::task::spawn_blocking(|| Uuid::new_v4().to_hyphenated().to_string())
.await
.unwrap();
println!("ID: {}", id);
let client = Client {
uuid: id.clone(),
queue: Vec::new(),
};
*current_client_id.write().await = Some(client.clone().uuid);
clients.write().await.push(client);
let mut nick = String::from(id.split("-").collect::<Vec<&str>>()[0]);
sender
.send(Message {
msg: format!("{} has joined the chat\n", nick),
sender: id.clone(),
})
.unwrap();
*current_client_id.lock().await = Some(id.clone());
loop {
let mut buf = [0u8; 1024];
select! {
disconed = disconnected.read() => {
disconed = disconnected.lock() => {
if *disconed {
break;
}
@ -96,7 +98,7 @@ async fn connection_read(
resp = sockread.read(&mut buf) => {
let bytes = match resp {
Ok(0) => {
*disconnected.write().await = true;
*disconnected.lock().await = true;
continue;
}, // eof
Ok(n) => n,
@ -105,21 +107,20 @@ async fn connection_read(
let res = String::from_utf8_lossy(&buf[..bytes]).to_string();
print!("{}", res);
if res.starts_with("MSG") {
broadcast(clients.clone(), id.clone(), res).await;
sender.send(Message { msg: format!("{} {}", nick, res.clone()) , sender: id.clone() }).unwrap();
} else if res.starts_with("NICK") {
let old_nick = nick.clone();
nick = res[5..].trim().to_string();
sender.send(Message { msg: format!("{} {}", old_nick, res.clone()), sender: id.clone()}).unwrap();
}
}
}
}
sender
.send(Message {
msg: format!("{} has disconnected\n", nick),
sender: id.clone(),
})
.unwrap();
println!("Client {} disconnected", id);
}
async fn broadcast(clients: Arc<RwLock<Vec<Client>>>, id: String, msg: String) {
for mut client in clients.write().await.clone() {
println!("{} {}", client.uuid, id);
if client.uuid != id {
println!("pushing...");
client.queue.push(msg.clone());
println!("pushed");
}
}
}