123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- #include "DataAcquisition.h"
- #include "easylogging++.h"
- #include <iomanip>
- #include <condition_variable>
- #include <mutex>
-
- std::atomic_bool modbusThreadRunning = false;
- std::atomic_bool modbusThreadCancelled = false;
-
- //Modbus worker thread, lock and notification variable
- std::condition_variable cvar_modbus_queue;
- std::mutex mux_modbus_queue;
-
- DataAcquisition::DataAcquisition() : dataModel(DataModel::Instance())
- {
- }
-
- DataAcquisition::~DataAcquisition()
- {
- LOG(INFO) << "Wait until modbus worker queue finishes...";
- modbusThreadCancelled = true;
- cvar_modbus_queue.notify_one();
- if(modbusThread.size() != 0 && modbusThread[0].joinable()){
- modbusThread[0].join();
- modbusThread.clear();
- }
- }
-
-
- //Modbus thread
- //Execute the enqueued modbus register requests
- void modbusWorkerThread(DataAcquisition* this_p) {
- LOG(INFO) << "Modbus Worker Thread started";
- auto& queue = this_p->getModbusQueue();
- auto& publishers = this_p->getPublishers();
- bool first = true;
- try {
- //vector of connections which failed to open
- std::vector<u_int> failedConnections;
-
- while (1) {
-
- if (modbusThreadCancelled)
- return;
-
-
- //If this is the first execution or the thread has
- //been notified after being in sleep mode
- if (queue.size() == 0 || first){
- first = false;
- failedConnections.clear();
- //No work to do, wait for something to be enqueued
- std::unique_lock<std::mutex> threadLock(mux_modbus_queue);
-
- //Wait till notification (happens when some registers are enqueued
- cvar_modbus_queue.wait(threadLock);
-
- if (modbusThreadCancelled)
- return;
-
- //open all of the registered connections
- for(unsigned int i = 0; i < publishers.size(); i++){
- if(!publishers[i]->open()){
- //write id to list if connection failed
- failedConnections.push_back(publishers[i]->getID());
- }
- }
-
- }
-
- if (modbusThreadCancelled)
- return;
-
- //Get next parameter from the queue
- ParameterSpecification modbusParameter = queue.pop_front();
-
- //Look if the connection is in the list of the failed connections, if so , skip this parameter
- if(std::find(failedConnections.begin(), failedConnections.end(), modbusParameter.connection->getID()) != failedConnections.end()){
- continue;
- }
-
- //Skip parameter if not a reading parameter or connection is not open
- if(!modbusParameter.isReadable())
- continue;
-
- switch (modbusParameter.length) {
- case 1:
- modbusParameter.connection->readBit(modbusParameter);
- break;
- case 8:
- modbusParameter.connection->readByte(modbusParameter);
- break;
- case 16:
- modbusParameter.connection->readRegister(modbusParameter);
- break;
- case 32:
- modbusParameter.connection->readDRegister(modbusParameter);
- break;
- case 64:
- modbusParameter.connection->readQRegister(modbusParameter);
- break;
- default:
- modbusParameter.connection->readBits(modbusParameter);
- }
-
- if(modbusParameter.error){
- //Rading of Modbus Parameter was not successful
- LOG(WARNING) << std::dec
- << "[Modbus " << modbusParameter.connection->getID() << ": " << modbusParameter.connection->getConnectionType() << "] "
- << "Failed reading parameter "<< (int)modbusParameter.description
- << " at 0x"
- << std::hex << std::setfill('0') << std::setw(4)
- << modbusParameter.address;
- }
- else{
-
- //LOG(INFO) << std::dec
- // << "[Modbus " << modbusParameter.connection->getConnectionType() << " "<<
- // modbusParameter.connection->getID() << "]" << " Readed param " << (int)modbusParameter.description
- // << " value: " << value;
-
- DataModel::Instance()->saveModbusParameter(std::move(modbusParameter));
-
- }
-
- }
-
- }
- catch (std::exception& e) {
- LOG(FATAL) << "Error in modbus access, shutting down modbus thread: " << e.what();
- }
- modbusThreadRunning = false;
- }
-
-
-
-
- void DataAcquisition::startModbusThread() {
- if (modbusThreadRunning == false)
- {
- modbusThreadRunning = true;
- modbusThread.push_back(std::thread(modbusWorkerThread, this));
- }
- }
-
-
- //Registers publisher and moves ownership to DataAcquisition class
- void DataAcquisition::registerPublisher(std::unique_ptr<PublisherInterface> publisher) {
- publishers.push_back(std::move(publisher));
- }
-
- void DataAcquisition::enqueuePublisherRegister(){
- for (size_t i = 0; i < publishers.size(); i++) {
- //Collects the alert modbus registers and enqueue them
- publishers[i]->enqueueReadingRegisters(modbusAccessQueue, Category::Alert);
- }
- for (size_t i = 0; i < publishers.size(); i++) {
- //Collects the condition modbus registers and enqueue them
- publishers[i]->enqueueReadingRegisters(modbusAccessQueue, Category::Condition);
- }
- cvar_modbus_queue.notify_one();
- }
|