#include "lfr_socket.h" LFR_Socket::LFR_Socket(ExceptionCallback cb): cb(cb) { if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { throw std::runtime_error("Failed opening the socket"); } if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)) != 0) { throw std::runtime_error("Failed setting the socket options"); } address.sin_family = AF_INET; address.sin_addr.s_addr = INADDR_ANY; address.sin_port = htons(8080); if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) != 0) { throw std::runtime_error("Failed binding a name to the socket"); } if (listen(server_fd, 0) != 0) { throw std::runtime_error("Failed listening for connections"); } } LFR_Socket::~LFR_Socket() { endLoop(); if (thread) { thread->join(); } } void LFR_Socket::removeListener(LFR_Socket::ListenerKey key) { std::lock_guard lock(mutex); auto it = std::find_if(std::begin(listeners), std::end(listeners), [&](auto const &val){ return val.first == key; }); if(it != std::end(listeners)) { listeners.erase(it); } } void LFR_Socket::addListener(LFR_Socket::ListenerCallback cb, LFR_Socket::ListenerKey key) { std::lock_guard lock(mutex); listeners.emplace_back(key, std::move(cb)); } void LFR_Socket::setStop(bool val) { std::lock_guard lock(mutex); stop = val; } void LFR_Socket::createThread() { thread = std::make_unique([this](){ bool connected = false; while(true) { LFR_Socket::LFR_Telegram telegram; bool received = false; if(!stop && !listeners.empty()) { try { std::lock_guard lock(mutex); struct pollfd pollfds[2]; pollfds[0].fd = server_fd; pollfds[0].events = POLLIN; pollfds[1].events = POLLIN; if (poll(pollfds, 2, 200) > 0) { if(pollfds[0].revents & POLLIN || pollfds[1].revents & POLLIN) { if (!connected && pollfds[0].revents & POLLIN) { if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t *)&addrlen)) == -1) { throw std::runtime_error("failed accepting connection"); } else { //std::cout << "accepted connection" << std::endl; pollfds[1].fd = new_socket; connected = true; } } else if (connected && pollfds[0].revents & POLLIN) { //std::cout << "second connection incoming" << std::endl; int tmp_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t *)&addrlen); close(tmp_socket); //std::cout << "second connection closed" << std::endl; } else if (pollfds[1].revents & POLLIN) { bytes_received = recv(new_socket, buffer, sizeof(buffer), 0); if (bytes_received > 0) { received = true; std::copy(std::begin(buffer), std::end(buffer), std::begin(telegram)); //std::cout << "message received" << std::endl; } else if(bytes_received == 0) { connected = false; close(pollfds[1].fd); //std::cout << "connection closed by peer" << std::endl; } } } } } catch(std::exception const &ex) { if(!cb(ex)) { //callback returned false -> exception not handled -> exit exit(EXIT_FAILURE); } } //Invoke the callback method (ListenerPair second -> ListenerCallback) if (received) { for(auto &val : listeners) { val.second(telegram); } } } else { break; } std::this_thread::sleep_for(std::chrono::milliseconds(150)); } }); } void LFR_Socket::startLoop() { if(thread) { //Restart thread if it is running setStop(true); thread->join(); setStop(false); } createThread(); } void LFR_Socket::endLoop() { setStop(true); }