// NOTE(review): In the reviewed copy of this file the text inside every pair
// of angle brackets was missing (system #include targets and all template
// argument lists). Where the argument is determined by the code itself it has
// been restored (std::atomic<bool>). Where it names a project type that this
// file does not otherwise reveal, the token sequence was left untouched and is
// marked "NOTE(review): template argument missing" -- restore those from
// SystemControler.h before building.
#include "SystemControler.h"
#include "SystemConfig.h"

// NOTE(review): the two original system includes were not legible; these are
// the standard headers the code below visibly depends on.
#include <atomic>
#include <chrono>
#include <cstring>
#include <memory>
#include <sstream>
#include <thread>

using namespace std::chrono;

// Flags shared between the controller and its worker threads.
std::atomic<bool> dataReadingCancelled{false};   // request: stop the data reading thread
std::atomic<bool> updateServerCancelled{false};  // request: stop the server update thread
std::atomic<bool> threadRunning{false};          // true while the data reading thread is alive
std::thread* readDataThread = nullptr;           // data reading thread, owned by this file

/*
Reads "alert_read_interval" and "tcp_server_port" from SystemConfig, creates
the data acquisition object and initializes the permanent storage/evaluator.
*/
SystemControler::SystemControler()
    : dataModel(DataModel::Instance()),
      // NOTE(review): template argument missing
      dataAcquire(std::make_shared())
{
    this->alertReadInterval = SystemConfig::getIntConfigParameter("alert_read_interval");
    this->serverPort = SystemConfig::getIntConfigParameter("tcp_server_port");
    initializePermanentStorage();
}

/*
Stops and joins both worker threads, releases the server and destroys the
DataModel singleton.
*/
SystemControler::~SystemControler()
{
    LOG(INFO) << "Wait until continous modbus reading thread finishes...";
    if (readDataThread != nullptr) {
        // Join whenever the thread is joinable, not only when threadRunning is
        // already set: destroying a joinable std::thread calls std::terminate.
        if (readDataThread->joinable()) {
            dataReadingCancelled = true;
            readDataThread->join();
        }
        delete readDataThread;
        // The pointer is a global; null it so that a later
        // startEnqueuingRegister() does not delete it a second time.
        readDataThread = nullptr;
    }

    LOG(INFO) << "Wait until server update thread finishes...";
    if (serverUpdateThread != nullptr) {
        updateServerCancelled = true;
        if (serverUpdateThread->joinable())
            serverUpdateThread->join();
    }
    server.reset();

    delete serverUpdateThread;
    serverUpdateThread = nullptr;

    DataModel::Destroy();
}

/*
Thread body of the server update loop.
->server: server to service; the loop ends when it is null or reports isStopped()
->ms:     tick length; one pass over alerts and incoming messages per tick
Also ends when updateServerCancelled is set.
*/
// NOTE(review): template argument missing (type of server)
void updateServer(std::shared_ptr server, std::chrono::milliseconds ms)
{
    if (server == nullptr)
        return;

    constexpr std::size_t kStatusBytes = 2;

    //Run as long as the server is not stopped
    while (!server->isStopped()) {
        if (updateServerCancelled)
            return;

        // Take the tick start on every pass. A single time point captured
        // before the loop lies in the past after the first tick, which turns
        // sleep_until() into a no-op and the loop into a busy spin.
        const auto tickStart = steady_clock::now();

        //Forward every pending alert to all connected clients
        auto& alerts = DataModel::Instance()->getAlerts();
        while (alerts.size() > 0) {
            net::Message msg;
            auto a = alerts.pop_front();
            msg.header.id = net::MessageTypes::ServerAlert;

            // NOTE(review): assumes readedBytes holds at least 2 bytes -- confirm
            // against the producer of the alerts.
            short bitmask = 0;
            std::memcpy(&bitmask, a.readedBytes.data(), kStatusBytes);

            std::ostringstream text;
            text << "Modbus " << a.connection->getConnectionType()
                 << " alert: " << ModbusDataPOC::getStatusMessage(bitmask);
            msg << text.str();
            server->MessageAllClients(msg);
        }

        //check for messages, specify no max message count
        //return if no message is available
        server->Update(-1, false);

        std::this_thread::sleep_until(tickStart + ms);
    }
}

/*
Creates the TCP server on first use, starts it and launches the update thread.
Logs a warning and does nothing if the server is already running.
*/
void SystemControler::startIpTcpServer()
{
    try {
        if (server == nullptr) {
            // NOTE(review): template argument missing
            server = std::make_shared(serverPort);
        }

        if (server->isRunning()) {
            LOG(WARNING) << "Invoked server start, but server is already running";
            return;
        }

        // Reap an update thread left over from an earlier run; overwriting the
        // pointer would leak the std::thread object.
        if (serverUpdateThread != nullptr) {
            updateServerCancelled = true;
            if (serverUpdateThread->joinable())
                serverUpdateThread->join();
            delete serverUpdateThread;
            serverUpdateThread = nullptr;
        }

        server->Start();

        // Clear a stale cancel request, otherwise the new thread would return
        // on its first iteration.
        updateServerCancelled = false;

        //Continuously running update thread that checks for incoming messages
        serverUpdateThread = new std::thread(updateServer, server, serverTickRateMs);
    }
    catch (const asio::system_error& e) {
        LOG(FATAL) << "[SERVER]: Error " << e.code() << " >> " << e.what();
    }
}

/*
Thread which acquires data from all registered publishers.
->dataAcquisition: handles the reading requests
->controler:       its evaluate() is called after every read
->intervalSec:     # of seconds between two reads; the cancel flag is polled
                   once per second while waiting
The cancel flag is NOT cleared on entry: the spawner does that before creating
the thread, so a cancellation requested right after the spawn is not lost.
*/
// NOTE(review): template argument missing (type of dataAcquisition)
void enqueueRegisterThread(std::shared_ptr dataAcquisition, SystemControler* controler, const u_int intervalSec)
{
    threadRunning = true;
    LOG(INFO) << "Started continous data reading";

    // Single exit path: acknowledge the cancel request first, then publish
    // "not running".
    const auto markStopped = [] {
        LOG(INFO) << "Stopped continous data reading";
        dataReadingCancelled = false;
        threadRunning = false;
    };

    while (!dataReadingCancelled) {
        const auto cycleStart = steady_clock::now();

        //read in the data from the publishers
        dataAcquisition->enqueuePublisherRegister();
        controler->evaluate();

        for (u_int i = 1; i <= intervalSec; i++) {
            if (dataReadingCancelled) {
                markStopped();
                return;
            }
            //Wait for 1 additional second and then check for cancelled thread
            std::this_thread::sleep_until(cycleStart + std::chrono::seconds(1) * i);
        }
    }
    markStopped();
}

//Invokes continuous modbus alert reading
void SystemControler::startEnqueuingRegister()
{
    if (threadRunning && !dataReadingCancelled) {
        LOG(ERROR) << "ILLEGAL STATE: Invoked starting readDataThread while thread is already running (Thread ID: " << readDataThread->get_id() << ")";
        return;
    }

    //Continue thread if it is already cancelled but still running
    if (threadRunning && dataReadingCancelled) {
        dataReadingCancelled = false;
        return;
    }

    // No thread is running. A previous thread object may still be joinable;
    // join it before deleting, because destroying a joinable std::thread
    // calls std::terminate.
    if (readDataThread != nullptr) {
        if (readDataThread->joinable())
            readDataThread->join();
        delete readDataThread;
        readDataThread = nullptr;
    }

    // Publish the state before spawning so that an immediate second start or
    // cancel request sees a consistent picture.
    dataReadingCancelled = false;
    threadRunning = true;
    try {
        readDataThread = new std::thread(enqueueRegisterThread, dataAcquire, this, alertReadInterval);
    }
    catch (...) {
        threadRunning = false;
        throw;
    }
}

//Cancels continuous modbus alert reading and waits for the thread to finish
void SystemControler::cancelReadingModbusAlerts()
{
    if (dataReadingCancelled) {
        LOG(INFO) << "Thread is already cancelled, waiting for thread to stop";
        return;
    }

    if (!threadRunning || readDataThread == nullptr) {
        LOG(ERROR) << "ILLEGAL STATE: Invoke cancelling of readDataThread while thread is not running";
        return;
    }

    dataReadingCancelled = true;
    if (readDataThread->joinable())
        readDataThread->join();
}

//Delegates the flush decision to the data model
void SystemControler::flushAfterNData()
{
    dataModel->checkForFlushData();
}

//delegate register Publisher
// NOTE(review): template argument missing (type of publisher)
void SystemControler::registerPublisher(std::unique_ptr publisher)
{
    dataAcquire->registerPublisher(std::move(publisher));
}

/*
Runs the evaluator, if one exists, and sends every resulting alert message to
all connected clients.
*/
void SystemControler::evaluate()
{
    if (evaluator == nullptr)
        return;

    auto alerts = evaluator->evaluate();

    // The reader thread may call this before startIpTcpServer() has created
    // the server; without a server there is nobody to notify.
    if (server == nullptr)
        return;

    for (auto& a : alerts) {
        net::Message msg;
        msg.header.id = net::MessageTypes::ServerAlert;
        msg << a.message;
        server->MessageAllClients(msg);
    }
}

/*
Marks the residual current register as permanent in the data model and creates
the evaluator.
*/
void SystemControler::initializePermanentStorage()
{
    //register Residual Current (dummy) to perform linear regression
    //set biased for ML algorithm
    dataModel->makePermanent(ModbusRegister::BENDER_Residual_current, true);
    // NOTE(review): template argument missing
    evaluator = std::make_unique();
}