connection_pool;
unencrypted text group chat functional
This commit is contained in:
parent
e943e4cbb5
commit
2ee738b355
64
src/bin/connection_pool.rs
Normal file
64
src/bin/connection_pool.rs
Normal file
@ -0,0 +1,64 @@
|
|||||||
|
use std::{
|
||||||
|
sync::{Arc, Mutex},
|
||||||
|
collections::HashMap,
|
||||||
|
net::{TcpStream},
|
||||||
|
io::{Write},
|
||||||
|
};
|
||||||
|
|
||||||
|
pub struct ConnectionPool{
|
||||||
|
conns: Arc<Mutex<HashMap<u32, TcpStream>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ConnectionPool{
|
||||||
|
|
||||||
|
pub fn new() -> Self {
|
||||||
|
ConnectionPool{
|
||||||
|
conns: Arc::new(Mutex::new(HashMap::new())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn add_connection(&self, iid: u32, conn: TcpStream) {
|
||||||
|
self.conns.lock().unwrap().insert(iid, conn);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn del_connection(&self, iid: u32) {
|
||||||
|
self.conns.lock().unwrap().remove(&iid);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn unicast(&self, recv_iid: &u32, msg: &String) {
|
||||||
|
let mut del_conn = false;
|
||||||
|
|
||||||
|
match self.conns.lock().unwrap().get(recv_iid) {
|
||||||
|
Some(mut conn) => {
|
||||||
|
if conn.write_all(msg.as_bytes()).is_err() {
|
||||||
|
del_conn = true;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
None => {
|
||||||
|
println!("Unknown recv_iid: {:#?}", recv_iid);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if del_conn{
|
||||||
|
self.del_connection(*recv_iid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn broadcast(&self, sender_iid: &u32, msg: &String) {
|
||||||
|
let mut del_conns: Vec<u32> = Vec::new();
|
||||||
|
for (iid, mut conn) in self.conns.lock().unwrap().iter() {
|
||||||
|
if sender_iid.ne(iid){
|
||||||
|
if conn.write_all(msg.as_bytes()).is_err() {
|
||||||
|
del_conns.push(*iid);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
else if conn.write_all(b"\n").is_err() {
|
||||||
|
del_conns.push(*iid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for iid in del_conns {
|
||||||
|
self.del_connection(iid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -2,10 +2,13 @@ use std::{
|
|||||||
io::{prelude::*, BufReader, Lines},
|
io::{prelude::*, BufReader, Lines},
|
||||||
net::{TcpListener, TcpStream},
|
net::{TcpListener, TcpStream},
|
||||||
};
|
};
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
use std::thread::available_parallelism;
|
use std::thread::available_parallelism;
|
||||||
|
use crate::connection_pool::ConnectionPool;
|
||||||
use crate::thread_pool::ThreadPool;
|
use crate::thread_pool::ThreadPool;
|
||||||
|
|
||||||
mod thread_pool;
|
mod thread_pool;
|
||||||
|
mod connection_pool;
|
||||||
|
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
@ -14,28 +17,40 @@ fn main() {
|
|||||||
|
|
||||||
// create thread pool (size = no. of cpu cores)
|
// create thread pool (size = no. of cpu cores)
|
||||||
let pool = ThreadPool::new(available_parallelism().unwrap().get());
|
let pool = ThreadPool::new(available_parallelism().unwrap().get());
|
||||||
|
//let pool = ThreadPool::new(2);
|
||||||
|
|
||||||
|
let conn_pool = Arc::new(Mutex::new(ConnectionPool::new()));
|
||||||
|
let mut cc: u32 = 0;
|
||||||
|
|
||||||
// handling function for incoming requests
|
// handling function for incoming requests
|
||||||
for stream in listener.incoming() {
|
for stream in listener.incoming() {
|
||||||
let stream = stream.unwrap();
|
let stream = stream.unwrap();
|
||||||
|
|
||||||
|
conn_pool.lock().unwrap().add_connection(cc, stream.try_clone().unwrap());
|
||||||
|
|
||||||
|
let ccc = cc;
|
||||||
|
let cconn_pool = conn_pool.clone();
|
||||||
|
|
||||||
// pass request to thread pool
|
// pass request to thread pool
|
||||||
pool.execute(|| {
|
pool.execute(move || {
|
||||||
handle_connection(stream);
|
handle_connection(stream, ccc, cconn_pool);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
cc += 1;
|
||||||
}
|
}
|
||||||
println!("Shutting down.");
|
println!("Shutting down.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
fn handle_connection(mut stream: TcpStream) {
|
fn handle_connection(mut stream: TcpStream, iid: u32, conn_pool: Arc<Mutex<ConnectionPool>>) {
|
||||||
// clone TcpStream for simultaneous receiving and sending
|
// clone TcpStream for simultaneous receiving and sending
|
||||||
let mut stream2: TcpStream = stream.try_clone().unwrap();
|
//let mut stream2 = stream.try_clone().unwrap();
|
||||||
|
|
||||||
// initialize reading buffer (for better performance!)
|
// initialize reading buffer (for better performance!)
|
||||||
let buf_reader: BufReader<&mut TcpStream> = BufReader::new(&mut stream);
|
let buf_reader: BufReader<&mut TcpStream> = BufReader::new(&mut stream);
|
||||||
// request: read received text line by line (continuous until connection closed)
|
// request: read received text line by line (continuous until connection closed)
|
||||||
let request: Lines<BufReader<&mut TcpStream>> = buf_reader.lines();
|
let request: Lines<BufReader<&mut TcpStream>> = buf_reader.lines();
|
||||||
|
// let request: Bytes<BufReader<&mut TcpStream>> = buf_reader.bytes();
|
||||||
|
|
||||||
println!("Request: {:#?}", request);
|
println!("Request: {:#?}", request);
|
||||||
|
|
||||||
@ -46,8 +61,10 @@ fn handle_connection(mut stream: TcpStream) {
|
|||||||
let s = elem.unwrap();
|
let s = elem.unwrap();
|
||||||
println!("{:?}", s);
|
println!("{:?}", s);
|
||||||
|
|
||||||
|
conn_pool.lock().unwrap().broadcast(&iid, &s);
|
||||||
|
|
||||||
// send received line back to sender (append LF)
|
// send received line back to sender (append LF)
|
||||||
stream2.write_all(s.as_bytes()).unwrap();
|
//stream2.write_all(s.as_bytes()).unwrap();
|
||||||
stream2.write_all(b"\n").unwrap();
|
//stream2.write_all(b"\n").unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user