Digitalisierte Elektroverteilung zur permanenten Verbraucherüberwachung
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

DataAcquisition.cpp 5.5KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. #include "DataAcquisition.h"
  2. #include "easylogging++.h"
  3. #include <iomanip>
  4. #include <condition_variable>
  5. #include <mutex>
  6. std::atomic_bool modbusThreadRunning = false;
  7. std::atomic_bool modbusThreadCancelled = false;
  8. //Modbus worker thread, lock and notification variable
  9. std::condition_variable cvar_modbus_queue;
  10. std::mutex mux_modbus_queue;
  11. DataAcquisition::DataAcquisition() : dataModel(DataModel::Instance())
  12. {
  13. }
  14. DataAcquisition::~DataAcquisition()
  15. {
  16. LOG(INFO) << "Wait until modbus worker queue finishes...";
  17. modbusThreadCancelled = true;
  18. cvar_modbus_queue.notify_one();
  19. if(modbusThread.size() != 0 && modbusThread[0].joinable()){
  20. modbusThread[0].join();
  21. modbusThread.clear();
  22. }
  23. }
  24. //Modbus thread
  25. //Execute the enqueued modbus register requests
  26. void modbusWorkerThread(DataAcquisition* this_p) {
  27. LOG(INFO) << "Modbus Worker Thread started";
  28. auto& queue = this_p->getModbusQueue();
  29. auto& publishers = this_p->getPublishers();
  30. bool first = true;
  31. try {
  32. //vector of connections which failed to open
  33. std::vector<u_int> failedConnections;
  34. while (1) {
  35. if (modbusThreadCancelled)
  36. return;
  37. //If this is the first execution or the thread has
  38. //been notified after being in sleep mode
  39. if (queue.size() == 0 || first){
  40. first = false;
  41. failedConnections.clear();
  42. //No work to do, wait for something to be enqueued
  43. std::unique_lock<std::mutex> threadLock(mux_modbus_queue);
  44. //Wait till notification (happens when some registers are enqueued
  45. cvar_modbus_queue.wait(threadLock);
  46. if (modbusThreadCancelled)
  47. return;
  48. //open all of the registered connections
  49. for(unsigned int i = 0; i < publishers.size(); i++){
  50. if(!publishers[i]->open()){
  51. //write id to list if connection failed
  52. failedConnections.push_back(publishers[i]->getID());
  53. }
  54. }
  55. }
  56. if (modbusThreadCancelled)
  57. return;
  58. //Get next parameter from the queue
  59. ParameterSpecification modbusParameter = queue.pop_front();
  60. //Look if the connection is in the list of the failed connections, if so , skip this parameter
  61. if(std::find(failedConnections.begin(), failedConnections.end(), modbusParameter.connection->getID()) != failedConnections.end()){
  62. continue;
  63. }
  64. //Skip parameter if not a reading parameter or connection is not open
  65. if(!modbusParameter.isReadable())
  66. continue;
  67. switch (modbusParameter.length) {
  68. case 1:
  69. modbusParameter.connection->readBit(modbusParameter);
  70. break;
  71. case 8:
  72. modbusParameter.connection->readByte(modbusParameter);
  73. break;
  74. case 16:
  75. modbusParameter.connection->readRegister(modbusParameter);
  76. break;
  77. case 32:
  78. modbusParameter.connection->readDRegister(modbusParameter);
  79. break;
  80. case 64:
  81. modbusParameter.connection->readQRegister(modbusParameter);
  82. break;
  83. default:
  84. modbusParameter.connection->readBits(modbusParameter);
  85. }
  86. if(modbusParameter.error){
  87. //Rading of Modbus Parameter was not successful
  88. LOG(WARNING) << std::dec
  89. << "[Modbus " << modbusParameter.connection->getID() << ": " << modbusParameter.connection->getConnectionType() << "] "
  90. << "Failed reading parameter "<< (int)modbusParameter.description
  91. << " at 0x"
  92. << std::hex << std::setfill('0') << std::setw(4)
  93. << modbusParameter.address;
  94. }
  95. else{
  96. //LOG(INFO) << std::dec
  97. // << "[Modbus " << modbusParameter.connection->getConnectionType() << " "<<
  98. // modbusParameter.connection->getID() << "]" << " Readed param " << (int)modbusParameter.description
  99. // << " value: " << value;
  100. DataModel::Instance()->saveModbusParameter(std::move(modbusParameter));
  101. }
  102. }
  103. }
  104. catch (std::exception& e) {
  105. LOG(FATAL) << "Error in modbus access, shutting down modbus thread: " << e.what();
  106. }
  107. modbusThreadRunning = false;
  108. }
  109. void DataAcquisition::startModbusThread() {
  110. if (modbusThreadRunning == false)
  111. {
  112. modbusThreadRunning = true;
  113. modbusThread.push_back(std::thread(modbusWorkerThread, this));
  114. }
  115. }
  116. //Registers publisher and moves ownership to DataAcquisition class
  117. void DataAcquisition::registerPublisher(std::unique_ptr<PublisherInterface> publisher) {
  118. publishers.push_back(std::move(publisher));
  119. }
  120. void DataAcquisition::enqueuePublisherRegister(){
  121. for (size_t i = 0; i < publishers.size(); i++) {
  122. //Collects the alert modbus registers and enqueue them
  123. publishers[i]->enqueueReadingRegisters(modbusAccessQueue, Category::Alert);
  124. }
  125. for (size_t i = 0; i < publishers.size(); i++) {
  126. //Collects the condition modbus registers and enqueue them
  127. publishers[i]->enqueueReadingRegisters(modbusAccessQueue, Category::Condition);
  128. }
  129. cvar_modbus_queue.notify_one();
  130. }