Compare commits

..

No commits in common. "d06aead691705d7cd3cf95d6f72ad0012b9c01e4" and "2c2b55f77693450688140531c11bb0e3979b18ee" have entirely different histories.

2 changed files with 6 additions and 87 deletions

View File

@ -1,64 +0,0 @@
use std::{
sync::{Arc, Mutex},
collections::HashMap,
net::{TcpStream},
io::{Write},
};
pub struct ConnectionPool{
conns: Arc<Mutex<HashMap<u32, TcpStream>>>,
}
impl ConnectionPool{
pub fn new() -> Self {
ConnectionPool{
conns: Arc::new(Mutex::new(HashMap::new())),
}
}
pub fn add_connection(&self, iid: u32, conn: TcpStream) {
self.conns.lock().unwrap().insert(iid, conn);
}
pub fn del_connection(&self, iid: u32) {
self.conns.lock().unwrap().remove(&iid);
}
pub fn unicast(&self, recv_iid: &u32, msg: &String) {
let mut del_conn = false;
match self.conns.lock().unwrap().get(recv_iid) {
Some(mut conn) => {
if conn.write_all(msg.as_bytes()).is_err() {
del_conn = true;
}
},
None => {
println!("Unknown recv_iid: {:#?}", recv_iid);
return;
}
}
if del_conn{
self.del_connection(*recv_iid);
}
}
pub fn broadcast(&self, sender_iid: &u32, msg: &String) {
let mut del_conns: Vec<u32> = Vec::new();
for (iid, mut conn) in self.conns.lock().unwrap().iter() {
if sender_iid.ne(iid){
if conn.write_all(msg.as_bytes()).is_err() {
del_conns.push(*iid);
continue;
}
else if conn.write_all(b"\n").is_err() {
del_conns.push(*iid);
}
}
}
for iid in del_conns {
self.del_connection(iid);
}
}
}

View File

@ -2,13 +2,10 @@ use std::{
io::{prelude::*, BufReader, Lines}, io::{prelude::*, BufReader, Lines},
net::{TcpListener, TcpStream}, net::{TcpListener, TcpStream},
}; };
use std::sync::{Arc, Mutex};
use std::thread::available_parallelism; use std::thread::available_parallelism;
use crate::connection_pool::ConnectionPool;
use crate::thread_pool::ThreadPool; use crate::thread_pool::ThreadPool;
mod thread_pool; mod thread_pool;
mod connection_pool;
fn main() { fn main() {
@ -17,40 +14,28 @@ fn main() {
// create thread pool (size = no. of cpu cores) // create thread pool (size = no. of cpu cores)
let pool = ThreadPool::new(available_parallelism().unwrap().get()); let pool = ThreadPool::new(available_parallelism().unwrap().get());
//let pool = ThreadPool::new(2);
let conn_pool = Arc::new(Mutex::new(ConnectionPool::new()));
let mut cc: u32 = 0;
// handling function for incoming requests // handling function for incoming requests
for stream in listener.incoming() { for stream in listener.incoming() {
let stream = stream.unwrap(); let stream = stream.unwrap();
conn_pool.lock().unwrap().add_connection(cc, stream.try_clone().unwrap());
let ccc = cc;
let cconn_pool = conn_pool.clone();
// pass request to thread pool // pass request to thread pool
pool.execute(move || { pool.execute(|| {
handle_connection(stream, ccc, cconn_pool); handle_connection(stream);
}); });
cc += 1;
} }
println!("Shutting down."); println!("Shutting down.");
} }
fn handle_connection(mut stream: TcpStream, iid: u32, conn_pool: Arc<Mutex<ConnectionPool>>) { fn handle_connection(mut stream: TcpStream) {
// clone TcpStream for simultaneous receiving and sending // clone TcpStream for simultaneous receiving and sending
//let mut stream2 = stream.try_clone().unwrap(); let mut stream2: TcpStream = stream.try_clone().unwrap();
// initialize reading buffer (for better performance!) // initialize reading buffer (for better performance!)
let buf_reader: BufReader<&mut TcpStream> = BufReader::new(&mut stream); let buf_reader: BufReader<&mut TcpStream> = BufReader::new(&mut stream);
// request: read received text line by line (continuous until connection closed) // request: read received text line by line (continuous until connection closed)
let request: Lines<BufReader<&mut TcpStream>> = buf_reader.lines(); let request: Lines<BufReader<&mut TcpStream>> = buf_reader.lines();
// let request: Bytes<BufReader<&mut TcpStream>> = buf_reader.bytes();
println!("Request: {:#?}", request); println!("Request: {:#?}", request);
@ -61,10 +46,8 @@ fn handle_connection(mut stream: TcpStream, iid: u32, conn_pool: Arc<Mutex<Conne
let s = elem.unwrap(); let s = elem.unwrap();
println!("{:?}", s); println!("{:?}", s);
conn_pool.lock().unwrap().broadcast(&iid, &s);
// send received line back to sender (append LF) // send received line back to sender (append LF)
//stream2.write_all(s.as_bytes()).unwrap(); stream2.write_all(s.as_bytes()).unwrap();
//stream2.write_all(b"\n").unwrap(); stream2.write_all(b"\n").unwrap();
} }
} }