From d8e30fcf5555f2fc5e06da5d553f9418fba0c134 Mon Sep 17 00:00:00 2001 From: RafayAhmad7548 Date: Thu, 10 Apr 2025 11:28:15 +0500 Subject: [PATCH] first try babbyyyy --- Cargo.lock | 137 +++++++++++++++++++++++++++++++++++++++++++++++++++- Cargo.toml | 3 +- src/main.rs | 86 +++++++++++++++------------------ 3 files changed, 177 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 366b242..d7ecd1e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -69,8 +69,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" name = "clipsync" version = "0.1.0" dependencies = [ + "futures", + "tokio", "tokio-tungstenite", - "tungstenite", ] [[package]] @@ -114,12 +115,65 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" +dependencies = [ + "futures-core", + "futures-sink", +] + [[package]] name = "futures-core" version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.31" @@ -138,9 +192,13 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "slab", @@ -203,6 +261,16 @@ version = "0.2.171" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6" +[[package]] +name = "lock_api" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" +dependencies = [ + "autocfg", + "scopeguard", +] + [[package]] name = "log" version = "0.4.27" @@ -244,6 +312,29 @@ dependencies = [ "memchr", ] +[[package]] +name = "parking_lot" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -319,12 +410,27 @@ dependencies = [ "getrandom", ] +[[package]] +name = "redox_syscall" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03a862b389f93e68874fbf580b9de08dd02facb9a788ebadaf4a3fd33cf58834" +dependencies = [ + "bitflags", +] + [[package]] name = "rustc-demangle" version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + [[package]] name = "sha1" version = "0.10.6" @@ -336,6 +442,15 @@ dependencies = [ "digest", ] +[[package]] +name = "signal-hook-registry" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" +dependencies = [ + "libc", +] + [[package]] name = "slab" version = "0.4.9" @@ -345,6 +460,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "smallvec" +version = "1.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" + [[package]] name = "socket2" version = "0.5.9" @@ -396,11 +517,25 @@ dependencies = [ "bytes", "libc", "mio", + "parking_lot", "pin-project-lite", + "signal-hook-registry", "socket2", + "tokio-macros", "windows-sys", ] +[[package]] +name = "tokio-macros" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tokio-tungstenite" version = "0.26.2" diff --git a/Cargo.toml b/Cargo.toml index 20bb145..ce56a77 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,5 +4,6 @@ version = "0.1.0" edition = "2021" [dependencies] +futures = "0.3.31" +tokio = { version = "1.44.2", features = ["full"] } tokio-tungstenite = "0.26.2" -tungstenite = "0.26.2" diff --git a/src/main.rs b/src/main.rs index 2e0d809..08bf61e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,62 +1,54 @@ -use std::{collections::HashMap, net::TcpListener, sync::{mpsc::{channel, Receiver, Sender}, Arc, Mutex}, thread::spawn}; -use tungstenite::{accept, Message}; +use std::{collections::HashMap, sync::Arc}; -fn main() { +use futures::{SinkExt, StreamExt}; +use tokio::{net::TcpListener, spawn, sync::{mpsc::{channel, Receiver, Sender}, Mutex}}; +use tokio_tungstenite::{accept_async, tungstenite::Message}; + +#[tokio::main] +async fn main() { let mut connection_count: usize = 0; let senders = Arc::new(Mutex::new(HashMap::>>::new())); - let server = TcpListener::bind("0.0.0.0:9001").unwrap(); - for stream in server.incoming() { + let server = TcpListener::bind("0.0.0.0:9001").await.unwrap(); + + while let Ok((stream, socket)) = server.accept().await { + + println!("Connection Received from ip: {}, port: {}", socket.ip(), socket.port()); let t_senders = Arc::clone(&senders); - let (sender, receiver): (Sender>, Receiver>) = channel(); + let (sender, mut receiver): (Sender>, Receiver>) = channel(100); { - let mut sender_lock = t_senders.lock().unwrap(); + let mut sender_lock = t_senders.lock().await; connection_count += 1; (*sender_lock).insert(connection_count, sender); } - let websocket = Arc::new(Mutex::new(accept(stream.unwrap()).unwrap())); - let r_websocket = Arc::clone(&websocket); - spawn(move || { + let websocket = accept_async(stream).await.unwrap(); + let (mut websocket_send, mut websocket_read) = websocket.split(); + + spawn(async move { let id = connection_count; println!("Connection Received: {}", id); - loop { - let res = { - let mut wssocket_lock = r_websocket.lock().unwrap(); - (*wssocket_lock).read() - }; + + while let Some(res) = websocket_read.next().await { match res { - // Ok(Message::Close(_)) => { - // let mut t_sender_lock = t_senders.lock().unwrap(); - // (*t_sender_lock).remove(&id); - // match (*wssocket_lock).send(Message::Close(Some(CloseFrame{ - // code: CloseCode::Normal, - // reason: "Connection Closed :)".into() - // }))) { - // Ok(_) => println!("Close frame sent succesfully"), - // Err(e) => println!("Failed to send close frame (connection may already be closed): {}", e) - // }; - // println!("closing connection"); - // break; - // } Ok(msg) => { let arc_msg = Arc::new(msg); if arc_msg.is_close() { - let t_sender_lock = t_senders.lock().unwrap(); + let sender_lock = t_senders.lock().await; let arc_msg_clone = Arc::clone(&arc_msg); - match (*t_sender_lock).get(&id).unwrap().send(arc_msg_clone) { + match (*sender_lock).get(&id).unwrap().send(arc_msg_clone).await { Ok(_) => println!("close message sent to thread no {}", id), Err(e) => println!("My Error: {}", e) } break; } else if arc_msg.is_text() || arc_msg.is_binary() { - let t_sender_lock = t_senders.lock().unwrap(); - for (iter_id, sender) in (*t_sender_lock).iter() { + let sender_lock = t_senders.lock().await; + for (iter_id, sender) in (*sender_lock).iter() { let arc_msg_clone = Arc::clone(&arc_msg); if *iter_id != id { - match (*sender).send(arc_msg_clone) { + match (*sender).send(arc_msg_clone).await { Ok(_) => println!("message sent to thread no {}", *iter_id), Err(e) => println!("My Error: {}", e) } @@ -65,36 +57,36 @@ fn main() { } } Err(e) => { - let mut t_sender_lock = t_senders.lock().unwrap(); - (*t_sender_lock).remove(&id); + let mut sender_lock = t_senders.lock().await; + (*sender_lock).remove(&id); println!("MyError Occured: {}", e); println!("Connection Closed"); break; } - } } }); - let s_websocket = Arc::clone(&websocket); - spawn(move || { + spawn(async move { let id = connection_count; loop { - match receiver.recv() { - Ok(arc_msg) => { + match receiver.recv().await { + Some(arc_msg) => { println!("Message received by thread {}", id); println!("The message is {}", arc_msg); - let mut wssocket_lock = s_websocket.lock().unwrap(); - println!("Lock acquired"); let msg = (*arc_msg).clone(); - //TODO add match for this - let _ = (*wssocket_lock).send(msg); - println!("message sent to client {}", id); + match websocket_send.send(msg).await { + Ok(_) => println!("message sent to client {}", id), + Err(e) => { + println!("error in sending to client {}: {}", id, e); + break; + } + } } - Err(e) => { - println!("Error in receiving from other threads: {}", e); + None => { + println!("Error in receiving from other threads"); break; } }