123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- /*********************************************************************
- * Software License Agreement (AGPL-3 License)
- *
- * OpenViBE SDK Test Software
- * Based on OpenViBE V1.1.0, Copyright (C) Inria, 2006-2015
- * Copyright (C) Inria, 2015-2017,V1.0
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program.
- * If not, see <http://www.gnu.org/licenses/>.
- */
-
- #include <string>
- #include <cstring>
- #include <thread>
- #include <condition_variable>
- #include <vector>
-
- #include "socket/IConnection.h"
- #include "socket/IConnectionClient.h"
- #include "socket/IConnectionServer.h"
-
- #include "ovtAssert.h"
-
- namespace {
- std::condition_variable gServerStartedCondVar;
- std::mutex gServerStartedMutex;
- std::vector<std::string> gReceivedData;
- bool gServerStarted = false;
-
- // server callback run from a child thread
- void onServerListening(const int port, const size_t expectedPacketCount)
- {
- gReceivedData.clear();
-
- Socket::IConnection* clientConnection = nullptr;
-
- // create server
- Socket::IConnectionServer* server = Socket::createConnectionServer();
- server->listen(port);
-
- // keep the scope braces here, as it ensures mutex is released
- {
- std::lock_guard<std::mutex> lockOnServerStart(gServerStartedMutex);
- gServerStarted = true;
- }
-
- // notify main thread that the server is created so that it can connect a single client
- gServerStartedCondVar.notify_one();
-
- // loop until all packet are received
- while (gReceivedData.size() < expectedPacketCount)
- {
- if (server->isReadyToReceive()) { clientConnection = server->accept(); }
-
- if (clientConnection && clientConnection->isReadyToReceive())
- {
- size_t dataSize = 0;
- size_t bytesToReceive = sizeof(dataSize);
- size_t bytesReceived = 0;
- char dataBuffer[32];
-
- // first receive data size
- while (bytesReceived < bytesToReceive) { bytesReceived += clientConnection->receiveBuffer(&dataSize, bytesToReceive - bytesReceived); }
-
- // then receive data
- bytesToReceive = dataSize;
- bytesReceived = 0;
-
- while (bytesReceived < bytesToReceive) { bytesReceived += clientConnection->receiveBuffer(dataBuffer, bytesToReceive - bytesReceived); }
-
- gReceivedData.push_back(std::string(dataBuffer, dataSize));
- }
- }
-
- server->release();
- }
-
- void sendData(Socket::IConnectionClient* client, void* data, const size_t size)
- {
- const size_t bytesToSend = size;
- size_t bytesSent = 0;
-
- while (bytesSent < bytesToSend) { bytesSent += client->sendBuffer(data, bytesToSend - bytesSent); }
- }
- } // namespace
-
- int uoSocketClientServerASyncCommunicationTest(int argc, char* argv[])
- {
- OVT_ASSERT(argc == 4, "Failure to retrieve tests arguments. Expecting: server_name port_number packet_count");
-
- const std::string serverName = argv[1];
- int portNumber = std::atoi(argv[2]);
- size_t packetCount = size_t(std::atoi(argv[3]));
-
- // test asynchronous data transmission from a single client to server:
- // - launch a server on a background thread
- // - connect a single client
- // - make client sending data
- // - marke server receiving and storing data
- // - join the thread and do the assertions on received data when no data race is possible
-
- // create client
- Socket::IConnectionClient* client = Socket::createConnectionClient();
-
- // launch server on background thread
- std::thread serverThread(onServerListening, portNumber, packetCount);
-
- // wait until the server is started to connect clients
- std::unique_lock<std::mutex> lock(gServerStartedMutex);
- gServerStartedCondVar.wait(lock, []() { return gServerStarted; });
-
- client->connect(serverName.c_str(), portNumber);
-
- // transmit data
- // transmission follows the protocol: data size transmission + data transmission
- const std::string baseData = "Data packet index: ";
-
- for (size_t sendIndex = 0; sendIndex < packetCount; ++sendIndex)
- {
- std::string tmp = baseData + std::to_string(sendIndex);
- size_t size = tmp.size();
- sendData(client, &size, sizeof(size));
- sendData(client, const_cast<char*>(tmp.c_str()), size);
- }
-
- serverThread.join(); // wait until the end of the thread
-
- // release resources
- client->close();
- client->release();
-
- // do the assertion on the main thread
- OVT_ASSERT(gReceivedData.size() == packetCount, "Failure to retrieve packet count");
-
- for (size_t receivedIndex = 0; receivedIndex < packetCount; ++receivedIndex)
- {
- OVT_ASSERT_STREQ(gReceivedData[receivedIndex], (baseData + std::to_string(receivedIndex)), "Failure to retrieve packet");
- }
-
- return EXIT_SUCCESS;
- }
|