From 2ee738b355f7bfc5b896600e9815f1d4f4bde0d3 Mon Sep 17 00:00:00 2001 From: Andrew Onuchowski Date: Thu, 15 Dec 2022 12:21:10 +0100 Subject: [PATCH] connection_pool; unencrypted text group chat functional --- src/bin/connection_pool.rs | 64 ++++++++++++++++++++++++++++++++++++++ src/bin/server.rs | 29 +++++++++++++---- 2 files changed, 87 insertions(+), 6 deletions(-) create mode 100644 src/bin/connection_pool.rs diff --git a/src/bin/connection_pool.rs b/src/bin/connection_pool.rs new file mode 100644 index 0000000..41d8c65 --- /dev/null +++ b/src/bin/connection_pool.rs @@ -0,0 +1,64 @@ +use std::{ + sync::{Arc, Mutex}, + collections::HashMap, + net::{TcpStream}, + io::{Write}, +}; + +pub struct ConnectionPool{ + conns: Arc>>, +} + +impl ConnectionPool{ + + pub fn new() -> Self { + ConnectionPool{ + conns: Arc::new(Mutex::new(HashMap::new())), + } + } + + pub fn add_connection(&self, iid: u32, conn: TcpStream) { + self.conns.lock().unwrap().insert(iid, conn); + } + + pub fn del_connection(&self, iid: u32) { + self.conns.lock().unwrap().remove(&iid); + } + + pub fn unicast(&self, recv_iid: &u32, msg: &String) { + let mut del_conn = false; + + match self.conns.lock().unwrap().get(recv_iid) { + Some(mut conn) => { + if conn.write_all(msg.as_bytes()).is_err() { + del_conn = true; + } + }, + None => { + println!("Unknown recv_iid: {:#?}", recv_iid); + return; + } + } + if del_conn{ + self.del_connection(*recv_iid); + } + } + + pub fn broadcast(&self, sender_iid: &u32, msg: &String) { + let mut del_conns: Vec = Vec::new(); + for (iid, mut conn) in self.conns.lock().unwrap().iter() { + if sender_iid.ne(iid){ + if conn.write_all(msg.as_bytes()).is_err() { + del_conns.push(*iid); + continue; + } + else if conn.write_all(b"\n").is_err() { + del_conns.push(*iid); + } + } + } + for iid in del_conns { + self.del_connection(iid); + } + } +} \ No newline at end of file diff --git a/src/bin/server.rs b/src/bin/server.rs index ffb2155..6c77232 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -2,10 +2,13 @@ use std::{ io::{prelude::*, BufReader, Lines}, net::{TcpListener, TcpStream}, }; +use std::sync::{Arc, Mutex}; use std::thread::available_parallelism; +use crate::connection_pool::ConnectionPool; use crate::thread_pool::ThreadPool; mod thread_pool; +mod connection_pool; fn main() { @@ -14,28 +17,40 @@ fn main() { // create thread pool (size = no. of cpu cores) let pool = ThreadPool::new(available_parallelism().unwrap().get()); + //let pool = ThreadPool::new(2); + + let conn_pool = Arc::new(Mutex::new(ConnectionPool::new())); + let mut cc: u32 = 0; // handling function for incoming requests for stream in listener.incoming() { let stream = stream.unwrap(); + conn_pool.lock().unwrap().add_connection(cc, stream.try_clone().unwrap()); + + let ccc = cc; + let cconn_pool = conn_pool.clone(); + // pass request to thread pool - pool.execute(|| { - handle_connection(stream); + pool.execute(move || { + handle_connection(stream, ccc, cconn_pool); }); + + cc += 1; } println!("Shutting down."); } -fn handle_connection(mut stream: TcpStream) { +fn handle_connection(mut stream: TcpStream, iid: u32, conn_pool: Arc>) { // clone TcpStream for simultaneous receiving and sending - let mut stream2: TcpStream = stream.try_clone().unwrap(); + //let mut stream2 = stream.try_clone().unwrap(); // initialize reading buffer (for better performance!) let buf_reader: BufReader<&mut TcpStream> = BufReader::new(&mut stream); // request: read received text line by line (continuous until connection closed) let request: Lines> = buf_reader.lines(); + // let request: Bytes> = buf_reader.bytes(); println!("Request: {:#?}", request); @@ -46,8 +61,10 @@ fn handle_connection(mut stream: TcpStream) { let s = elem.unwrap(); println!("{:?}", s); + conn_pool.lock().unwrap().broadcast(&iid, &s); + // send received line back to sender (append LF) - stream2.write_all(s.as_bytes()).unwrap(); - stream2.write_all(b"\n").unwrap(); + //stream2.write_all(s.as_bytes()).unwrap(); + //stream2.write_all(b"\n").unwrap(); } }