use std::{collections::HashMap, sync::Arc}; use futures::{SinkExt, StreamExt}; use tokio::{net::TcpListener, spawn, sync::{mpsc::{channel, Receiver, Sender}, Mutex}}; use tokio_tungstenite::{accept_async, tungstenite::Message}; #[tokio::main] async fn main() { let mut connection_count: usize = 0; let senders = Arc::new(Mutex::new(HashMap::>>::new())); let server = TcpListener::bind("0.0.0.0:9001").await.unwrap(); while let Ok((stream, socket)) = server.accept().await { println!("Connection Received from ip: {}, port: {}", socket.ip(), socket.port()); let t_senders = Arc::clone(&senders); let (sender, mut receiver): (Sender>, Receiver>) = channel(100); { let mut sender_lock = t_senders.lock().await; connection_count += 1; (*sender_lock).insert(connection_count, sender); } let websocket = accept_async(stream).await.unwrap(); let (mut websocket_send, mut websocket_read) = websocket.split(); spawn(async move { let id = connection_count; println!("Connection Received: {}", id); while let Some(res) = websocket_read.next().await { match res { Ok(msg) => { let arc_msg = Arc::new(msg); if arc_msg.is_close() { let sender_lock = t_senders.lock().await; let arc_msg_clone = Arc::clone(&arc_msg); match (*sender_lock).get(&id).unwrap().send(arc_msg_clone).await { Ok(_) => println!("close message sent to thread no {}", id), Err(e) => println!("My Error: {}", e) } break; } else if arc_msg.is_text() || arc_msg.is_binary() { let sender_lock = t_senders.lock().await; for (iter_id, sender) in (*sender_lock).iter() { let arc_msg_clone = Arc::clone(&arc_msg); if *iter_id != id { match (*sender).send(arc_msg_clone).await { Ok(_) => println!("message sent to thread no {}", *iter_id), Err(e) => println!("My Error: {}", e) } } } } } Err(e) => { let mut sender_lock = t_senders.lock().await; (*sender_lock).remove(&id); println!("MyError Occured: {}", e); println!("Connection Closed"); break; } } } }); spawn(async move { let id = connection_count; loop { match receiver.recv().await { Some(arc_msg) => { println!("Message received by thread {}", id); println!("The message is {}", arc_msg); let msg = (*arc_msg).clone(); match websocket_send.send(msg).await { Ok(_) => println!("message sent to client {}", id), Err(e) => { println!("error in sending to client {}: {}", id, e); break; } } } None => { println!("Error in receiving from other threads"); break; } } } }); } }