Digitalisierte Elektroverteilung zur permanenten Verbraucherüberwachung
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

SystemControler.cpp 6.2KB


  1. #include "SystemControler.h"
  2. #include <easylogging++.h>
  3. #include "SystemConfig.h"
  4. #include <optional>
  5. using namespace std::chrono;
  6. std::atomic<bool> dataReadingCancelled = false;
  7. std::atomic<bool> updateServerCancelled = false;
  8. std::atomic<bool> threadRunning = false;
  9. std::thread* readDataThread = nullptr;
  10. SystemControler::SystemControler() :
  11. dataModel(DataModel::Instance()), dataAcquire(std::make_shared<DataAcquisition>())
  12. {
  13. this->alertReadInterval = SystemConfig::getIntConfigParameter("alert_read_interval");
  14. this->serverPort = SystemConfig::getIntConfigParameter("tcp_server_port");
  15. initializePermanentStorage();
  16. }
  17. SystemControler::~SystemControler()
  18. {
  19. LOG(INFO) << "Wait until continous modbus reading thread finishes...";
  20. if(readDataThread != nullptr && threadRunning){
  21. dataReadingCancelled = true;
  22. if(readDataThread->joinable())
  23. readDataThread->join();
  24. }
  25. LOG(INFO) << "Wait until server update thread finishes...";
  26. if(serverUpdateThread != nullptr){
  27. updateServerCancelled = true;
  28. if(serverUpdateThread->joinable())
  29. serverUpdateThread->join();
  30. }
  31. server.reset();
  32. if(readDataThread != nullptr)
  33. delete readDataThread;
  34. if(serverUpdateThread != nullptr)
  35. delete serverUpdateThread;
  36. DataModel::Destroy();
  37. }
  38. void updateServer(std::shared_ptr<NetServer> server, std::chrono::milliseconds ms) {
  39. auto now = std::chrono::system_clock::now();
  40. //Run as long as server exists and is not stopped
  41. while (server != nullptr && !server->isStopped()) {
  42. if(updateServerCancelled)
  43. return;
  44. //Check for alerts
  45. auto& alerts = DataModel::Instance()->getAlerts();
  46. while(alerts.size() > 0){
  47. net::Message<net::MessageTypes> msg;
  48. auto a = alerts.pop_front();
  49. msg.header.id = net::MessageTypes::ServerAlert;
  50. short bitmask = 0;
  51. std::memcpy(&bitmask, a.readedBytes.data(), 2);
  52. std::stringstream tmp;
  53. tmp << "Modbus " << a.connection->getConnectionType() << " alert: " << ModbusDataPOC::getStatusMessage(bitmask);
  54. msg << tmp.str();
  55. server->MessageAllClients(msg);
  56. }
  57. //check for messages, specify no max message count
  58. //return if no message is available
  59. server->Update(-1, false);
  60. std::this_thread::sleep_until(now + ms);
  61. }
  62. return;
  63. }
  64. void SystemControler::startIpTcpServer() {
  65. try {
  66. //start server
  67. if(server == nullptr)
  68. server = std::make_shared<NetServer>(serverPort);
  69. if(server->isRunning()){
  70. LOG(WARNING) << "Invoked server start, but server is already running";
  71. return;
  72. }
  73. server->Start();
  74. //Continous running update thread, that checks for incomming messages
  75. serverUpdateThread = new std::thread(updateServer, server, serverTickRateMs);
  76. }
  77. catch (asio::system_error& e) {
  78. LOG(FATAL) << "[SERVER]: Error "
  79. << e.code() << " >> "
  80. << e.what();
  81. }
  82. }
  83. /*
  84. thread whhich acquires data from all registered publishers
  85. ->pointer to dataAcquisition, which handles the readiong requests
  86. ->intervalSec: # of seconds
  87. */
  88. void enqueueRegisterThread(std::shared_ptr<DataAcquisition> dataAcquisition, SystemControler* controler, const u_int intervalSec)
  89. {
  90. threadRunning = true;
  91. dataReadingCancelled = false;
  92. LOG(INFO) << "Started continous data reading";
  93. while (!dataReadingCancelled) {
  94. const auto currentTime = system_clock::now();
  95. //read in the data from the publishers
  96. dataAcquisition->enqueuePublisherRegister();
  97. controler->evaluate();
  98. for (u_int i = 1; i <= intervalSec; i++)
  99. {
  100. if (dataReadingCancelled) {
  101. LOG(INFO) << "Stopped continous data reading";
  102. threadRunning = false;
  103. dataReadingCancelled = false;
  104. return;
  105. }
  106. //Wait for 1 additional second and then check for cancelled thread
  107. std::this_thread::sleep_until(currentTime + std::chrono::seconds(1) * i);
  108. }
  109. }
  110. LOG(INFO) << "Stopped continous data reading";
  111. dataReadingCancelled = false;
  112. threadRunning = false;
  113. }
  114. //Invokes continous modbus alert reading
  115. void SystemControler::startEnqueuingRegister()
  116. {
  117. if (threadRunning && !dataReadingCancelled) {
  118. LOG(ERROR) << "ILLEGAL STATE: Invoked starting readDataThread while thread is already running (Thread ID: " << readDataThread->get_id() << ")";
  119. return;
  120. }
  121. //Continue thread if it is already cancelled but still running
  122. else if (threadRunning && dataReadingCancelled) {
  123. dataReadingCancelled = false;
  124. }
  125. else {
  126. if(readDataThread != nullptr)
  127. delete readDataThread;
  128. readDataThread = new std::thread(enqueueRegisterThread, dataAcquire, this, alertReadInterval);
  129. }
  130. }
  131. //Cancels continous modbus alert reading
  132. void SystemControler::cancelReadingModbusAlerts()
  133. {
  134. if (dataReadingCancelled){
  135. LOG(INFO) << "Thread is already cancelled, waiting for thread to stop";
  136. }
  137. else if (!threadRunning || readDataThread == nullptr){
  138. LOG(ERROR) << "ILLEGAL STATE: Invoke cancelling of readDataThread while thread is not running";
  139. }
  140. else{
  141. dataReadingCancelled = true;
  142. if(readDataThread->joinable())
  143. readDataThread->join();
  144. }
  145. }
  146. //Check wether to flush or not, else counts up
  147. void SystemControler::flushAfterNData()
  148. {
  149. dataModel->checkForFlushData();
  150. }
  151. //delegate register Publisher
  152. void SystemControler::registerPublisher(std::unique_ptr<PublisherInterface> publisher)
  153. {
  154. dataAcquire->registerPublisher(std::move(publisher));
  155. }
  156. void SystemControler::evaluate(){
  157. if(evaluator == nullptr)
  158. return;
  159. auto alerts = evaluator->evaluate();
  160. for(auto &a : alerts){
  161. net::Message<net::MessageTypes> msg;
  162. msg.header.id = net::MessageTypes::ServerAlert;
  163. msg << a.message;
  164. server->MessageAllClients(msg);
  165. }
  166. }
  167. void SystemControler::initializePermanentStorage(){
  168. //register Residual Current (dummy) to perform linear regression
  169. //set biased fo ML Algorithm
  170. dataModel->makePermanent(ModbusRegister::BENDER_Residual_current, true);
  171. evaluator = std::make_unique<Evaluator>();
  172. }