#include "DataModel.h" #include #include #include #include "SystemConfig.h" #include #include #include "PublisherPowercenter.h" #include "PublisherBenderRcm.h" std::chrono::milliseconds timeoutMS = std::chrono::milliseconds(10'000); long DataModel::permanentParamHistory = 0; int DataModel::narrowBlock = 0; //Initialization of static member std::unique_ptr DataModel::instance = nullptr; //Read file locker std::timed_mutex accessFilesMTX; constexpr unsigned long long DataModel::timepoint_to_sec_long(const std::chrono::system_clock::time_point& t) { return duration_to_sec_long(t.time_since_epoch()); } const ts_map &DataModel::getPublisherData() const { return temporaryStorage; } CappedStorage* DataModel::getPermanentData(ModbusRegister reg){ auto it = permanentStorage.find(reg); if(it != permanentStorage.end()){ return &(it->second); } else return nullptr; } constexpr unsigned long long DataModel::duration_to_sec_long(const std::chrono::system_clock::duration& t) { return std::chrono::duration_cast(t).count(); } inline void DataModel::Create() { instance = std::make_unique(); permanentParamHistory = SystemConfig::getIntConfigParameter("permanent_param_history"); narrowBlock = SystemConfig::getIntConfigParameter("narrow_block"); } DataModel::~DataModel() { LOG(INFO) << "Save locally stored permanent data to permanent storage..."; //Save permanent data to storage long long time = std::chrono::system_clock::now().time_since_epoch().count(); std::fstream file; std::stringstream ss; ss << dataDir << "data_" << time; std::string filename = ss.str(); file.open(filename, std::ios::out); if (file.is_open()) { for(auto &storage : permanentStorage){ if(storage.second.size() == 0) continue; file << (int)storage.first << ":" << std::endl; file << storage.second << std::endl; } file.close(); } else LOG(ERROR) << "Could not access file to store permanent data..."; } std::unique_ptr& DataModel::Instance() { if (!instance) Create(); return instance; } void DataModel::Destroy() { if (instance) instance.reset(); } void DataModel::makePermanent(ModbusRegister reg, bool biased) { permanentStorage.emplace(reg, CappedStorage(biased)); } void DataModel::saveModbusParameter(ParameterSpecification param) { if(param.cat == Category::Alert){ short bitmask = 0; std::memcpy(&bitmask, param.readedBytes.data(), param.length); //Found an alert here if(bitmask != 0) LOG(WARNING) << "Received an alert from Modbus " << param.connection->getConnectionType() << ", ID " << param.connection->getID(); alerts.push(param); return; } //Temporary storage of last value temporaryStorage.emplaceOrOverwrite(param.description, SavedData(param.cat, param.readedBytes)); //If parameter is listet to be stored permanently auto res = permanentStorage.find(param.description); if(res != permanentStorage.end()){ checkForFlushData(); res->second.store(param.readedBytes); } } void DataModel::checkForFlushData() { while(1){ auto permStorageIter = permanentStorage.begin(); auto storageIter = permanentStorage.find(permStorageIter->first); auto& X = storageIter->second.getX(); auto& y = storageIter->second.getY(); //Rescale Matrix if data exceeds time limit storageIter->second.lock(); if(X.size() > 1 && (X(0,1) <= (X(X.rows()-1, 1) - DataModel::permanentParamHistory))){ if(X.size() <= narrowBlock) return; LOG(INFO) << "Shrink permanent storage of parameter " << std::dec << (int)permStorageIter->first; std::stringstream ss; ss << (int)storageIter->first << "(X);"; Eigen::Matrix temp(X.rows()-narrowBlock, X.cols()); temp = X.block(narrowBlock, 0, X.rows() - narrowBlock, X.cols()); //backup capped data to flush to file for(int i = 0; i < narrowBlock-1; i++){ ss << std::fixed << std::setprecision(0) << X(i, X.cols()-1) << ";"; } X.resize(X.rows() - narrowBlock, X.cols()); X = std::move(temp); ss << std::fixed << std::setprecision(0) << std::endl << (int)storageIter->first << "(y);"; temp.resize(y.rows()-narrowBlock, 1); temp = y.block(narrowBlock, 0, y.rows() - narrowBlock, 1); //backup capped data to flush to file for(int i = 0; i < narrowBlock-1; i++){ ss << std::fixed << std::setprecision(5) << y(i) << ";"; } ss << std::endl; y.resize(y.rows() - narrowBlock, 1); y = std::move(temp); flush(ss); } storageIter->second.unlock(); if((++permStorageIter) == permanentStorage.end()) break; } } bool DataModel::flush(std::stringstream& ss) { //Data file lock condition if (!accessFilesMTX.try_lock_for(timeoutMS)) { LOG(ERROR) << "Timeout after waiting " << (long long)timeoutMS.count() << " for reading data files"; return false; } LOG(INFO) << "Flush data to local file"; std::fstream file; auto now = std::chrono::system_clock::now(); std::stringstream filename; filename << dataDir << "data_" << now.time_since_epoch().count(); file.open(filename.str(), std::ios_base::out); if (file.is_open()) { file << ss.str(); file.close(); } else LOG(ERROR) << "Could not open file for writing"; accessFilesMTX.unlock(); return true; } //reads in the log file to the supplied stringstream //define fromTime to get only logs older than fromTime (seconds after epoch) uintmax_t DataModel::readLogFile(std::stringstream& ss, long long fromTime) { el::Loggers::flushAll(); uintmax_t sizeRead = 0; std::filesystem::path p{ logDir }; std::ifstream file; std::string lineBuffer; std::filesystem::directory_iterator iterator(p); for (auto& currentFile : std::filesystem::directory_iterator(p)) if (currentFile.is_regular_file()) { if (logFileName.compare(currentFile.path().filename().string()) == 0) { file.open(currentFile.path()); if (file.is_open()) { sizeRead += currentFile.file_size(); bool foundTimePoint = false; while (std::getline(file, lineBuffer)) { if (lineBuffer.size() == 0) continue; if (fromTime != 0 && foundTimePoint == false) { std::stringstream temp; temp << lineBuffer; std::tm tm{}; temp >> std::get_time(&tm, dateFormatLogger.c_str()); auto timePoint = std::chrono::system_clock::from_time_t(std::mktime(&tm)); if (timePoint.time_since_epoch().count() >= fromTime) { foundTimePoint = true; } } else ss << lineBuffer << std::endl; } file.close(); LOG(INFO) << "Readed log file"; } else LOG(WARNING) << "Couldn't open LOG file for writing"; break; } } //If size read is 0, no tile at specified index found or file was empty return sizeRead; } uintmax_t DataModel::readAllDataFiles(std::stringstream& ss) { uintmax_t sizeRead = 0; if (!accessFilesMTX.try_lock_for(timeoutMS)) { LOG(ERROR) << "Timeout after waiting " << (long long)timeoutMS.count() << " for reading data files - blocked by another thread"; return 0; } std::filesystem::path p{ dataDir }; std::ifstream file; std::string lineBuffer; std::filesystem::directory_iterator iterator(p); for (auto& currentFile : std::filesystem::directory_iterator(p)){ if (currentFile.is_regular_file() && std::regex_match(currentFile.path().filename().string(), regexPatternFile)) { //look for a valid file with the specified index file.open(currentFile.path()); if (file.is_open()) { sizeRead += currentFile.file_size(); while (std::getline(file, lineBuffer)) { ss << lineBuffer << std::endl; } file.close(); std::filesystem::remove(currentFile.path()); } } } accessFilesMTX.unlock(); //If size read is 0, no tile at specified index found or file was empty return sizeRead; } std::stringstream& DataModel::readPermanentData(std::stringstream& buffer, bool retain){ for(auto &e: permanentStorage){ buffer << (int)e.first << ":" << std::endl; buffer << e.second; } if(!retain) permanentStorage.clear(); return buffer; } ts_queue& DataModel::getAlerts() { return alerts; } std::stringstream& DataModel::readTemporaryData(std::stringstream& buffer){ for(auto &e: temporaryStorage){ buffer << (int)e.first << ":"; buffer << e.second; } return buffer; } //olderThan: files which are older than this specified time are deleted unsigned long DataModel::removeStoredData(seconds olderThan) { using namespace std::filesystem; auto timeNow = duration_cast(system_clock::now().time_since_epoch()); u_int filesDeleted = 0; for (auto& file : directory_iterator(path("data/"))) { if (file.is_regular_file()) { if (std::regex_match(file.path().stem().string(), regexPatternFile)) { std::string str = file.path().stem().string(); str = str.substr(str.find_first_of("0123456789", 0)); //time of file in seconds after 01/01/1970 long long timeOfFile = 0; //if (auto [p, ec] = std::from_chars(str.data(), str.data() + str.length(), timeOfFile); ec == std::errc()) timeOfFile = std::stoll(str, 0); if ((timeOfFile + olderThan.count()) < timeNow.count()) if(remove(file)) filesDeleted++; } } } LOG(INFO) << "Deleted data files (" << filesDeleted << ") that were older than " << (long long)olderThan.count() <<"seconds on local storage"; return filesDeleted; } //remove every file that are stored locally unsigned long DataModel::removeStoredData() { using namespace std::filesystem; create_directory("data/"); unsigned long filesDeleted = 0; try{ for (auto& file : directory_iterator(path("data/"))) { if (file.is_regular_file()) { if (std::regex_match(file.path().stem().string(), regexPatternFile)) { std::string str = file.path().stem().string(); str = str.substr(str.find_first_of("0123456789", 0)); filesDeleted++; } } } } catch(std::exception& e){ LOG(ERROR) << "Can't access data directory"; } LOG(INFO) << "Deleted all data files (" << filesDeleted << ") on local storage"; return filesDeleted; } std::ostream& operator<<(std::ostream& os, const PublisherType type) { switch (type) { case PublisherType::RCMS_BENDER: os << "Bender RCMS"; break; case PublisherType::POWERCENTER: os << "Siemens Powercenter"; break; default: os << ""; break; } return os; } std::ostream& operator<<(std::ostream& os, const SavedData& savedData){ float value = 0; std::memcpy(&value, savedData.data.data(), 4); os << value << std::endl; return os; }