Browse Source

async socket

master
Tim Zeuner 2 years ago
parent
commit
b1a8673617
2 changed files with 226 additions and 11 deletions
  1. 173
    10
      Socket/lfr_socket.cpp
  2. 53
    1
      Socket/lfr_socket.h

+ 173
- 10
Socket/lfr_socket.cpp View File

@@ -1,18 +1,181 @@
#include "lfr_socket.h"
#include <boost/regex.hpp>
#include <iostream>
#include <string>

LFR_Socket::LFR_Socket(ExceptionCallback cb): cb(cb)
{
if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
{
throw std::runtime_error("Failed opening the socket");
}

if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)) != 0)
{
throw std::runtime_error("Failed setting the socket options");
}

address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(8080);

if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) != 0)
{
throw std::runtime_error("Failed binding a name to the socket");
}

if (listen(server_fd, 3) != 0)
{
throw std::runtime_error("Failed listening for connections");
}
}

LFR_Socket::~LFR_Socket()
{
endLoop();
thread->join();
}

void LFR_Socket::removeListener(LFR_Socket::ListenerKey key)
{
std::lock_guard<std::mutex> lock(mutex);
auto it = std::find_if(std::begin(listeners), std::end(listeners), [&](auto const &val){
return val.first == key;
});
if(it != std::end(listeners))
{
listeners.erase(it);
}
}

void LFR_Socket::addListener(LFR_Socket::ListenerCallback cb, LFR_Socket::ListenerKey key)
{
std::lock_guard<std::mutex> lock(mutex);
listeners.emplace_back(key, std::move(cb));
}

void LFR_Socket::setStop(bool val)
{
std::lock_guard<std::mutex> lock(mutex);
stop = val;
}

void LFR_Socket::createThread()
{
thread = std::make_unique<std::thread>([this](){
bool connected = false;
while(true)
{
LFR_Socket::LFR_Telegram telegram;
bool received = false;
if(!stop && !listeners.empty())
{
try
{
std::lock_guard<std::mutex> lock(mutex);

struct pollfd pollfds[2];
pollfds[0].fd = server_fd;
pollfds[0].events = POLLIN;
pollfds[1].events = POLLIN;
if (poll(pollfds, 2, 200) > 0)
{
if(pollfds[0].revents & POLLIN || pollfds[1].revents & POLLIN)
{
if (!connected && pollfds[0].revents & POLLIN)
{
if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t *)&addrlen)) == -1)
{
throw std::runtime_error("failed accepting connection");
}
else
{
pollfds[1].fd = new_socket;
connected = true;
}
}
else if (pollfds[1].revents & POLLIN)
{
bytes_received = recv(new_socket, buffer, sizeof(buffer), 0);
if (bytes_received > 0)
{
received = true;
std::copy(std::begin(buffer), std::end(buffer), std::begin(telegram));
}
}
}
}
}
catch(std::exception const &ex)
{
if(!cb(ex))
{
//callback returned false -> exception not handled -> exit
exit(EXIT_FAILURE);
}
}

//Invoke the callback method (ListenerPair second -> ListenerCallback)
if (received)
{
for(auto &val : listeners)
{
val.second(telegram);
}
}
}
else
{
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(150));
}
});
}

void LFR_Socket::startLoop()
{
if(thread)
{
//Restart thread if it is running
setStop(true);
thread->join();
setStop(false);
}
createThread();
}

void LFR_Socket::endLoop()
{
setStop(true);
}

int main()
{
std::string line;
boost::regex pat( "^Subject: (Re: |Aw: )*(.*)" );
std::mutex mutex;
LFR_Socket socket([&](std::exception const &ex)
{
std::unique_lock<std::mutex> lock(mutex);
std::cerr<<"socket exception:"<<ex.what()<<std::endl;
return false;
});

socket.addListener([&](LFR_Socket::LFR_Telegram telegram)
{
std::unique_lock<std::mutex> lock(mutex);
std::cout << telegram;
}, &mutex);

socket.startLoop();

while (std::cin)
//send(new_socket, "Hello from the server", sizeof("Hello from the server"), 0);
char input;
std::cout << "press q to quit" << std::endl;
std::cin >> input;
std::cout << "cinned" << std::endl;
while (input != 'q')
{
std::getline(std::cin, line);
boost::smatch matches;
if (boost::regex_match(line, matches, pat))
std::cout << matches[2] << std::endl;
std::cin >> input;
std::cout << "cinned" << std::endl;
}
std::cout << "im out" << std::endl;
return 0;
}

+ 53
- 1
Socket/lfr_socket.h View File

@@ -1,3 +1,55 @@
#pragma once

#include <iostream>
#include <iostream>
#include <functional>
#include <vector>
#include <memory>
#include <mutex>
#include <thread>
#include <sys/socket.h>
#include <netinet/in.h>
#include <poll.h>
#include <string>
#include <algorithm>
#include <exception>

class LFR_Socket
{
public:
using LFR_Telegram = char[1024];
using ListenerKey = void const*;
using ExceptionCallback = std::function<bool(std::exception const &ex)>;
using ListenerCallback = std::function<void(LFR_Telegram)>;
private:
using ListenerPair = std::pair<ListenerKey, ListenerCallback>;
using ListenerVector = std::vector<ListenerPair>;

ListenerVector listeners;
ExceptionCallback cb;
bool stop;
std::unique_ptr<std::thread> thread;
mutable std::mutex mutex;

int server_fd, new_socket, bytes_received;
struct sockaddr_in address;
int opt = 1;
int addrlen = sizeof(address);
char buffer[1024] = {0};

//void provideOutput(Mat originalImage, Mat processedImage, const FrameData& frameData, const Rect& roi);
void createThread();
void setStop(bool val);

public:
LFR_Socket() = delete;
LFR_Socket(ExceptionCallback cb);
~LFR_Socket();

void startLoop();
void endLoop();
void addListener(ListenerCallback cv, ListenerKey key);
void removeListener(ListenerKey key);
void isStopped() const noexcept;
//const bool interpretMessage(co)
};

Loading…
Cancel
Save