/********************************************************************* * Software License Agreement (AGPL-3 License) * * OpenViBE SDK Test Software * Based on OpenViBE V1.1.0, Copyright (C) Inria, 2006-2015 * Copyright (C) Inria, 2015-2017,V1.0 * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. * If not, see . */ #include #include #include #include #include #include "socket/IConnection.h" #include "socket/IConnectionClient.h" #include "socket/IConnectionServer.h" #include "ovtAssert.h" namespace { std::condition_variable gServerStartedCondVar; std::mutex gServerStartedMutex; std::vector gReceivedData; bool gServerStarted = false; // server callback run from a child thread void onServerListening(const int port, const size_t expectedPacketCount) { gReceivedData.clear(); Socket::IConnection* clientConnection = nullptr; // create server Socket::IConnectionServer* server = Socket::createConnectionServer(); server->listen(port); // keep the scope braces here, as it ensures mutex is released { std::lock_guard lockOnServerStart(gServerStartedMutex); gServerStarted = true; } // notify main thread that the server is created so that it can connect a single client gServerStartedCondVar.notify_one(); // loop until all packet are received while (gReceivedData.size() < expectedPacketCount) { if (server->isReadyToReceive()) { clientConnection = server->accept(); } if (clientConnection && clientConnection->isReadyToReceive()) { size_t dataSize = 0; size_t bytesToReceive = sizeof(dataSize); size_t bytesReceived = 0; char dataBuffer[32]; // first receive data size while (bytesReceived < bytesToReceive) { bytesReceived += clientConnection->receiveBuffer(&dataSize, bytesToReceive - bytesReceived); } // then receive data bytesToReceive = dataSize; bytesReceived = 0; while (bytesReceived < bytesToReceive) { bytesReceived += clientConnection->receiveBuffer(dataBuffer, bytesToReceive - bytesReceived); } gReceivedData.push_back(std::string(dataBuffer, dataSize)); } } server->release(); } void sendData(Socket::IConnectionClient* client, void* data, const size_t size) { const size_t bytesToSend = size; size_t bytesSent = 0; while (bytesSent < bytesToSend) { bytesSent += client->sendBuffer(data, bytesToSend - bytesSent); } } } // namespace int uoSocketClientServerASyncCommunicationTest(int argc, char* argv[]) { OVT_ASSERT(argc == 4, "Failure to retrieve tests arguments. Expecting: server_name port_number packet_count"); const std::string serverName = argv[1]; int portNumber = std::atoi(argv[2]); size_t packetCount = size_t(std::atoi(argv[3])); // test asynchronous data transmission from a single client to server: // - launch a server on a background thread // - connect a single client // - make client sending data // - marke server receiving and storing data // - join the thread and do the assertions on received data when no data race is possible // create client Socket::IConnectionClient* client = Socket::createConnectionClient(); // launch server on background thread std::thread serverThread(onServerListening, portNumber, packetCount); // wait until the server is started to connect clients std::unique_lock lock(gServerStartedMutex); gServerStartedCondVar.wait(lock, []() { return gServerStarted; }); client->connect(serverName.c_str(), portNumber); // transmit data // transmission follows the protocol: data size transmission + data transmission const std::string baseData = "Data packet index: "; for (size_t sendIndex = 0; sendIndex < packetCount; ++sendIndex) { std::string tmp = baseData + std::to_string(sendIndex); size_t size = tmp.size(); sendData(client, &size, sizeof(size)); sendData(client, const_cast(tmp.c_str()), size); } serverThread.join(); // wait until the end of the thread // release resources client->close(); client->release(); // do the assertion on the main thread OVT_ASSERT(gReceivedData.size() == packetCount, "Failure to retrieve packet count"); for (size_t receivedIndex = 0; receivedIndex < packetCount; ++receivedIndex) { OVT_ASSERT_STREQ(gReceivedData[receivedIndex], (baseData + std::to_string(receivedIndex)), "Failure to retrieve packet"); } return EXIT_SUCCESS; }