123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208 |
- #include "lfr_socket.h"
-
- LFR_Socket::LFR_Socket(ExceptionCallback cb): cb(cb)
- {
- bufferIterator = std::begin(buffer);
- if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
- {
- throw std::runtime_error("Failed opening the socket");
- }
-
- if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)) != 0)
- {
- throw std::runtime_error("Failed setting the socket options");
- }
-
- address.sin_family = AF_INET;
- address.sin_addr.s_addr = INADDR_ANY;
- address.sin_port = htons(8080);
-
- if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) != 0)
- {
- throw std::runtime_error("Failed binding a name to the socket");
- }
-
- if (listen(server_fd, 0) != 0)
- {
- throw std::runtime_error("Failed listening for connections");
- }
- }
-
- LFR_Socket::~LFR_Socket()
- {
- endLoop();
- if (thread)
- {
- thread->join();
- }
- }
-
- void LFR_Socket::removeListener(LFR_Socket::ListenerKey key)
- {
- std::lock_guard<std::mutex> lock(mutex);
- auto it = std::find_if(std::begin(listeners), std::end(listeners), [&](auto const &val){
- return val.first == key;
- });
- if(it != std::end(listeners))
- {
- listeners.erase(it);
- }
- }
-
- void LFR_Socket::addListener(LFR_Socket::ListenerCallback cb, LFR_Socket::ListenerKey key)
- {
- std::lock_guard<std::mutex> lock(mutex);
- listeners.emplace_back(key, std::move(cb));
- }
-
- void LFR_Socket::setStop(bool val)
- {
- std::lock_guard<std::mutex> lock(mutex);
- stop = val;
- }
-
- void LFR_Socket::createThread()
- {
- thread = std::make_unique<std::thread>([this](){
- bool connected = false;
- while(true)
- {
- LFR_Socket::LFR_Telegram telegram = {};
- bool received = false;
- if(!stop && !listeners.empty())
- {
- try
- {
- std::lock_guard<std::mutex> lock(mutex);
-
- struct pollfd pollfds[2];
- pollfds[0].fd = server_fd;
- pollfds[0].events = POLLIN;
- pollfds[1].events = POLLIN;
-
- if (poll(pollfds, 2, 200) > 0)
- {
- if(pollfds[0].revents & POLLIN || pollfds[1].revents & POLLIN)
- {
- if (!connected && pollfds[0].revents & POLLIN)
- {
- if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t *)&addrlen)) == -1)
- {
- throw std::runtime_error("failed accepting connection");
- }
- else
- {
- std::cout << "accepted connection" << std::endl;
- pollfds[1].fd = new_socket;
- connected = true;
- }
- }
- else if (connected && pollfds[0].revents & POLLIN)
- {
- std::cout << "second connection incoming" << std::endl;
- int tmp_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t *)&addrlen);
- close(tmp_socket);
- std::cout << "second connection closed" << std::endl;
- }
- else if (pollfds[1].revents & POLLIN)
- {
- LFR_Telegram tmpBuffer = {0};
- bytesReceived = recv(new_socket, tmpBuffer, sizeof(tmpBuffer), 0);
- if (bytesReceived > 0)
- {
- if(bufferIterator - std::begin(buffer) + bytesReceived > 1024)
- {
- //Too long for buffer. Clear it.
- std::fill(std::begin(buffer), std::end(buffer), 0);
- bufferIterator = std::begin(buffer);
- }
- bufferIterator = std::copy_if(
- std::begin(tmpBuffer),
- std::begin(tmpBuffer)+bytesReceived,
- bufferIterator,
- [](char c){
- return(c != '\n' && c != '\r');
- });
-
- //std::cout << "bytes received: " << bytesReceived << std::endl;
- //received = true;
- }
- else if(bytesReceived == 0)
- {
- connected = false;
- close(pollfds[1].fd);
- std::cout << "connection closed by peer" << std::endl;
- }
- }
- }
- }
- }
- catch(std::exception const &ex)
- {
- if(!cb(ex))
- {
- //callback returned false -> exception not handled -> exit
- exit(EXIT_FAILURE);
- }
- }
-
- // Search the buffer for the start of a telegram
- char *telegramStart = NULL;
- telegramStart = strstr(buffer, "aa");
- if(telegramStart && telegramStart != std::begin(buffer)) {
- //Move the content of the buffer
- auto delta = bufferIterator - telegramStart;
- std::copy(telegramStart, std::end(buffer), std::begin(buffer));
- bufferIterator = std::begin(buffer) + delta;
- }
-
- // Search the buffer for the end of a telegram
- char *telegramEnd = NULL;
- telegramEnd = strstr(buffer, "zz");
- if(telegramEnd && telegramStart && telegramEnd-telegramStart>2) {
- std::copy(telegramStart+2, telegramEnd, telegram);
- received = true;
- std::copy(telegramEnd+2, std::end(buffer), std::begin(buffer));
- bufferIterator = std::begin(buffer);
- }
- else if(telegramEnd && telegramStart && telegramEnd-telegramStart<=2)
- {
- //Got an empty telegram
- std::copy(telegramEnd, std::end(buffer), std::begin(buffer));
- bufferIterator = std::begin(buffer);
- }
-
- //Invoke the callback method (ListenerPair second -> ListenerCallback)
- if (received)
- {
- for(auto &val : listeners)
- {
- val.second(telegram);
- }
- }
- }
- else
- {
- break;
- }
- std::this_thread::sleep_for(std::chrono::milliseconds(150));
- }
- });
- }
-
- void LFR_Socket::startLoop()
- {
- if(thread)
- {
- //Restart thread if it is running
- setStop(true);
- thread->join();
- setStop(false);
- }
- createThread();
- }
-
- void LFR_Socket::endLoop()
- {
- setStop(true);
- }
|