Digitalisierte Elektroverteilung zur permanenten Verbraucherüberwachung
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

DataModel.cpp 12KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. #include "DataModel.h"
  2. #include <easylogging++.h>
  3. #include <fstream>
  4. #include <filesystem>
  5. #include "SystemConfig.h"
  6. #include <charconv>
  7. #include <mutex>
  8. #include "PublisherPowercenter.h"
  9. #include "PublisherBenderRcm.h"
  10. std::chrono::milliseconds timeoutMS = std::chrono::milliseconds(10'000);
  11. long DataModel::permanentParamHistory = 0;
  12. int DataModel::narrowBlock = 0;
  13. //Initialization of static member
  14. std::unique_ptr<DataModel> DataModel::instance = nullptr;
  15. //Read file locker
  16. std::timed_mutex accessFilesMTX;
  17. constexpr unsigned long long DataModel::timepoint_to_sec_long(const std::chrono::system_clock::time_point& t) {
  18. return duration_to_sec_long(t.time_since_epoch());
  19. }
  20. const ts_map<ModbusRegister, SavedData> &DataModel::getPublisherData() const
  21. {
  22. return temporaryStorage;
  23. }
  24. CappedStorage* DataModel::getPermanentData(ModbusRegister reg){
  25. auto it = permanentStorage.find(reg);
  26. if(it != permanentStorage.end()){
  27. return &(it->second);
  28. }
  29. else
  30. return nullptr;
  31. }
  32. constexpr unsigned long long DataModel::duration_to_sec_long(const std::chrono::system_clock::duration& t) {
  33. return std::chrono::duration_cast<std::chrono::seconds>(t).count();
  34. }
  35. inline void DataModel::Create() {
  36. instance = std::make_unique<DataModel>();
  37. permanentParamHistory = SystemConfig::getIntConfigParameter("permanent_param_history");
  38. narrowBlock = SystemConfig::getIntConfigParameter("narrow_block");
  39. }
  40. DataModel::~DataModel()
  41. {
  42. LOG(INFO) << "Save locally stored permanent data to permanent storage...";
  43. //Save permanent data to storage
  44. long long time = std::chrono::system_clock::now().time_since_epoch().count();
  45. std::fstream file;
  46. std::stringstream ss;
  47. ss << dataDir << "data_" << time;
  48. std::string filename = ss.str();
  49. file.open(filename, std::ios::out);
  50. if (file.is_open()) {
  51. for(auto &storage : permanentStorage){
  52. if(storage.second.size() == 0)
  53. continue;
  54. file << (int)storage.first << ":" << std::endl;
  55. file << storage.second << std::endl;
  56. }
  57. file.close();
  58. }
  59. else
  60. LOG(ERROR) << "Could not access file to store permanent data...";
  61. }
  62. std::unique_ptr<DataModel>& DataModel::Instance() {
  63. if (!instance)
  64. Create();
  65. return instance;
  66. }
  67. void DataModel::Destroy() {
  68. if (instance)
  69. instance.reset();
  70. }
  71. void DataModel::makePermanent(ModbusRegister reg, bool biased)
  72. {
  73. permanentStorage.emplace(reg, CappedStorage(biased));
  74. }
  75. void DataModel::saveModbusParameter(ParameterSpecification param)
  76. {
  77. if(param.cat == Category::Alert){
  78. short bitmask = 0;
  79. std::memcpy(&bitmask, param.readedBytes.data(), param.length);
  80. //Found an alert here
  81. if(bitmask != 0)
  82. LOG(WARNING) << "Received an alert from Modbus " << param.connection->getConnectionType() << ", ID " << param.connection->getID();
  83. alerts.push(param);
  84. return;
  85. }
  86. //Temporary storage of last value
  87. temporaryStorage.emplaceOrOverwrite(param.description, SavedData(param.cat, param.readedBytes));
  88. //If parameter is listet to be stored permanently
  89. auto res = permanentStorage.find(param.description);
  90. if(res != permanentStorage.end()){
  91. checkForFlushData();
  92. res->second.store(param.readedBytes);
  93. }
  94. }
  95. void DataModel::checkForFlushData() {
  96. while(1){
  97. auto permStorageIter = permanentStorage.begin();
  98. auto storageIter = permanentStorage.find(permStorageIter->first);
  99. auto& X = storageIter->second.getX();
  100. auto& y = storageIter->second.getY();
  101. //Rescale Matrix if data exceeds time limit
  102. storageIter->second.lock();
  103. if(X.size() > 1 && (X(0,1) <= (X(X.rows()-1, 1) - DataModel::permanentParamHistory))){
  104. if(X.size() <= narrowBlock)
  105. return;
  106. LOG(INFO) << "Shrink permanent storage of parameter " << std::dec << (int)permStorageIter->first;
  107. std::stringstream ss;
  108. ss << (int)storageIter->first << "(X);";
  109. Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic> temp(X.rows()-narrowBlock, X.cols());
  110. temp = X.block(narrowBlock, 0, X.rows() - narrowBlock, X.cols());
  111. //backup capped data to flush to file
  112. for(int i = 0; i < narrowBlock-1; i++){
  113. ss << std::fixed << std::setprecision(0) << X(i, X.cols()-1) << ";";
  114. }
  115. X.resize(X.rows() - narrowBlock, X.cols());
  116. X = std::move(temp);
  117. ss << std::fixed << std::setprecision(0) << std::endl << (int)storageIter->first << "(y);";
  118. temp.resize(y.rows()-narrowBlock, 1);
  119. temp = y.block(narrowBlock, 0, y.rows() - narrowBlock, 1);
  120. //backup capped data to flush to file
  121. for(int i = 0; i < narrowBlock-1; i++){
  122. ss << std::fixed << std::setprecision(5) << y(i) << ";";
  123. }
  124. ss << std::endl;
  125. y.resize(y.rows() - narrowBlock, 1);
  126. y = std::move(temp);
  127. flush(ss);
  128. }
  129. storageIter->second.unlock();
  130. if((++permStorageIter) == permanentStorage.end())
  131. break;
  132. }
  133. }
  134. bool DataModel::flush(std::stringstream& ss)
  135. {
  136. //Data file lock condition
  137. if (!accessFilesMTX.try_lock_for(timeoutMS)) {
  138. LOG(ERROR) << "Timeout after waiting " << (long long)timeoutMS.count() << " for reading data files";
  139. return false;
  140. }
  141. LOG(INFO) << "Flush data to local file";
  142. std::fstream file;
  143. auto now = std::chrono::system_clock::now();
  144. std::stringstream filename;
  145. filename << dataDir << "data_" << now.time_since_epoch().count();
  146. file.open(filename.str(), std::ios_base::out);
  147. if (file.is_open()) {
  148. file << ss.str();
  149. file.close();
  150. }
  151. else
  152. LOG(ERROR) << "Could not open file for writing";
  153. accessFilesMTX.unlock();
  154. return true;
  155. }
  156. //reads in the log file to the supplied stringstream
  157. //define fromTime to get only logs older than fromTime (seconds after epoch)
  158. uintmax_t DataModel::readLogFile(std::stringstream& ss, long long fromTime) {
  159. el::Loggers::flushAll();
  160. uintmax_t sizeRead = 0;
  161. std::filesystem::path p{ logDir };
  162. std::ifstream file;
  163. std::string lineBuffer;
  164. std::filesystem::directory_iterator iterator(p);
  165. for (auto& currentFile : std::filesystem::directory_iterator(p))
  166. if (currentFile.is_regular_file()) {
  167. if (logFileName.compare(currentFile.path().filename().string()) == 0) {
  168. file.open(currentFile.path());
  169. if (file.is_open()) {
  170. sizeRead += currentFile.file_size();
  171. bool foundTimePoint = false;
  172. while (std::getline(file, lineBuffer)) {
  173. if (lineBuffer.size() == 0)
  174. continue;
  175. if (fromTime != 0 && foundTimePoint == false) {
  176. std::stringstream temp;
  177. temp << lineBuffer;
  178. std::tm tm{};
  179. temp >> std::get_time(&tm, dateFormatLogger.c_str());
  180. auto timePoint = std::chrono::system_clock::from_time_t(std::mktime(&tm));
  181. if (timePoint.time_since_epoch().count() >= fromTime) {
  182. foundTimePoint = true;
  183. }
  184. }
  185. else
  186. ss << lineBuffer << std::endl;
  187. }
  188. file.close();
  189. LOG(INFO) << "Readed log file";
  190. }
  191. else
  192. LOG(WARNING) << "Couldn't open LOG file for writing";
  193. break;
  194. }
  195. }
  196. //If size read is 0, no tile at specified index found or file was empty
  197. return sizeRead;
  198. }
  199. uintmax_t DataModel::readAllDataFiles(std::stringstream& ss) {
  200. uintmax_t sizeRead = 0;
  201. if (!accessFilesMTX.try_lock_for(timeoutMS)) {
  202. LOG(ERROR) << "Timeout after waiting " << (long long)timeoutMS.count() << " for reading data files - blocked by another thread";
  203. return 0;
  204. }
  205. std::filesystem::path p{ dataDir };
  206. std::ifstream file;
  207. std::string lineBuffer;
  208. std::filesystem::directory_iterator iterator(p);
  209. for (auto& currentFile : std::filesystem::directory_iterator(p)){
  210. if (currentFile.is_regular_file() && std::regex_match(currentFile.path().filename().string(), regexPatternFile)) {
  211. //look for a valid file with the specified index
  212. file.open(currentFile.path());
  213. if (file.is_open()) {
  214. sizeRead += currentFile.file_size();
  215. while (std::getline(file, lineBuffer)) {
  216. ss << lineBuffer << std::endl;
  217. }
  218. file.close();
  219. std::filesystem::remove(currentFile.path());
  220. }
  221. }
  222. }
  223. accessFilesMTX.unlock();
  224. //If size read is 0, no tile at specified index found or file was empty
  225. return sizeRead;
  226. }
  227. std::stringstream& DataModel::readPermanentData(std::stringstream& buffer, bool retain){
  228. for(auto &e: permanentStorage){
  229. buffer << (int)e.first << ":" << std::endl;
  230. buffer << e.second;
  231. }
  232. if(!retain)
  233. permanentStorage.clear();
  234. return buffer;
  235. }
  236. ts_queue<ParameterSpecification>& DataModel::getAlerts()
  237. {
  238. return alerts;
  239. }
  240. std::stringstream& DataModel::readTemporaryData(std::stringstream& buffer){
  241. for(auto &e: temporaryStorage){
  242. buffer << (int)e.first << ":";
  243. buffer << e.second;
  244. }
  245. return buffer;
  246. }
  247. //olderThan: files which are older than this specified time are deleted
  248. unsigned long DataModel::removeStoredData(seconds olderThan) {
  249. using namespace std::filesystem;
  250. auto timeNow = duration_cast<seconds>(system_clock::now().time_since_epoch());
  251. u_int filesDeleted = 0;
  252. for (auto& file : directory_iterator(path("data/"))) {
  253. if (file.is_regular_file()) {
  254. if (std::regex_match(file.path().stem().string(), regexPatternFile)) {
  255. std::string str = file.path().stem().string();
  256. str = str.substr(str.find_first_of("0123456789", 0));
  257. //time of file in seconds after 01/01/1970
  258. long long timeOfFile = 0;
  259. //if (auto [p, ec] = std::from_chars(str.data(), str.data() + str.length(), timeOfFile); ec == std::errc())
  260. timeOfFile = std::stoll(str, 0);
  261. if ((timeOfFile + olderThan.count()) < timeNow.count())
  262. if(remove(file))
  263. filesDeleted++;
  264. }
  265. }
  266. }
  267. LOG(INFO) << "Deleted data files (" << filesDeleted << ") that were older than " << (long long)olderThan.count() <<"seconds on local storage";
  268. return filesDeleted;
  269. }
  270. //remove every file that are stored locally
  271. unsigned long DataModel::removeStoredData() {
  272. using namespace std::filesystem;
  273. create_directory("data/");
  274. unsigned long filesDeleted = 0;
  275. try{
  276. for (auto& file : directory_iterator(path("data/"))) {
  277. if (file.is_regular_file()) {
  278. if (std::regex_match(file.path().stem().string(), regexPatternFile)) {
  279. std::string str = file.path().stem().string();
  280. str = str.substr(str.find_first_of("0123456789", 0));
  281. filesDeleted++;
  282. }
  283. }
  284. }
  285. }
  286. catch(std::exception& e){
  287. LOG(ERROR) << "Can't access data directory";
  288. }
  289. LOG(INFO) << "Deleted all data files (" << filesDeleted << ") on local storage";
  290. return filesDeleted;
  291. }
  292. std::ostream& operator<<(std::ostream& os, const PublisherType type) {
  293. switch (type) {
  294. case PublisherType::RCMS_BENDER:
  295. os << "Bender RCMS";
  296. break;
  297. case PublisherType::POWERCENTER:
  298. os << "Siemens Powercenter";
  299. break;
  300. default:
  301. os << "<Unknown Publisher Type>";
  302. break;
  303. }
  304. return os;
  305. }
  306. std::ostream& operator<<(std::ostream& os, const SavedData& savedData){
  307. float value = 0;
  308. std::memcpy(&value, savedData.data.data(), 4);
  309. os << value << std::endl;
  310. return os;
  311. }