#include "DataAcquisition.h" #include "easylogging++.h" #include #include #include std::atomic_bool modbusThreadRunning = false; std::atomic_bool modbusThreadCancelled = false; //Modbus worker thread, lock and notification variable std::condition_variable cvar_modbus_queue; std::mutex mux_modbus_queue; DataAcquisition::DataAcquisition() : dataModel(DataModel::Instance()) { } DataAcquisition::~DataAcquisition() { LOG(INFO) << "Wait until modbus worker queue finishes..."; modbusThreadCancelled = true; cvar_modbus_queue.notify_one(); if(modbusThread.size() != 0 && modbusThread[0].joinable()){ modbusThread[0].join(); modbusThread.clear(); } } //Modbus thread //Execute the enqueued modbus register requests void modbusWorkerThread(DataAcquisition* this_p) { LOG(INFO) << "Modbus Worker Thread started"; auto& queue = this_p->getModbusQueue(); auto& publishers = this_p->getPublishers(); bool first = true; try { //vector of connections which failed to open std::vector failedConnections; while (1) { if (modbusThreadCancelled) return; //If this is the first execution or the thread has //been notified after being in sleep mode if (queue.size() == 0 || first){ first = false; failedConnections.clear(); //No work to do, wait for something to be enqueued std::unique_lock threadLock(mux_modbus_queue); //Wait till notification (happens when some registers are enqueued cvar_modbus_queue.wait(threadLock); if (modbusThreadCancelled) return; //open all of the registered connections for(unsigned int i = 0; i < publishers.size(); i++){ if(!publishers[i]->open()){ //write id to list if connection failed failedConnections.push_back(publishers[i]->getID()); } } } if (modbusThreadCancelled) return; //Get next parameter from the queue ParameterSpecification modbusParameter = queue.pop_front(); //Look if the connection is in the list of the failed connections, if so , skip this parameter if(std::find(failedConnections.begin(), failedConnections.end(), modbusParameter.connection->getID()) != failedConnections.end()){ continue; } //Skip parameter if not a reading parameter or connection is not open if(!modbusParameter.isReadable()) continue; switch (modbusParameter.length) { case 1: modbusParameter.connection->readBit(modbusParameter); break; case 8: modbusParameter.connection->readByte(modbusParameter); break; case 16: modbusParameter.connection->readRegister(modbusParameter); break; case 32: modbusParameter.connection->readDRegister(modbusParameter); break; case 64: modbusParameter.connection->readQRegister(modbusParameter); break; default: modbusParameter.connection->readBits(modbusParameter); } if(modbusParameter.error){ //Rading of Modbus Parameter was not successful LOG(WARNING) << std::dec << "[Modbus " << modbusParameter.connection->getID() << ": " << modbusParameter.connection->getConnectionType() << "] " << "Failed reading parameter "<< (int)modbusParameter.description << " at 0x" << std::hex << std::setfill('0') << std::setw(4) << modbusParameter.address; } else{ //LOG(INFO) << std::dec // << "[Modbus " << modbusParameter.connection->getConnectionType() << " "<< // modbusParameter.connection->getID() << "]" << " Readed param " << (int)modbusParameter.description // << " value: " << value; DataModel::Instance()->saveModbusParameter(std::move(modbusParameter)); } } } catch (std::exception& e) { LOG(FATAL) << "Error in modbus access, shutting down modbus thread: " << e.what(); } modbusThreadRunning = false; } void DataAcquisition::startModbusThread() { if (modbusThreadRunning == false) { modbusThreadRunning = true; modbusThread.push_back(std::thread(modbusWorkerThread, this)); } } //Registers publisher and moves ownership to DataAcquisition class void DataAcquisition::registerPublisher(std::unique_ptr publisher) { publishers.push_back(std::move(publisher)); } void DataAcquisition::enqueuePublisherRegister(){ for (size_t i = 0; i < publishers.size(); i++) { //Collects the alert modbus registers and enqueue them publishers[i]->enqueueReadingRegisters(modbusAccessQueue, Category::Alert); } for (size_t i = 0; i < publishers.size(); i++) { //Collects the condition modbus registers and enqueue them publishers[i]->enqueueReadingRegisters(modbusAccessQueue, Category::Condition); } cvar_modbus_queue.notify_one(); }