#include "ovasCPluginExternalStimulations.h" #include #include #include #include #include "../ovasCSettingsHelper.h" #include "../ovasCSettingsHelperOperators.h" namespace OpenViBE { namespace AcquisitionServer { namespace Plugins { CPluginExternalStimulations::CPluginExternalStimulations(const Kernel::IKernelContext& ctx) : IAcquisitionServerPlugin(ctx, CString("AcquisitionServer_Plugin_ExternalStimulations")), m_ExternalStimulationsQueueName("openvibeExternalStimulations") { m_kernelCtx.getLogManager() << Kernel::LogLevel_Info << "Loading plugin: ExternalStimulations (deprecated)\n"; m_settings.add("EnableExternalStimulations", &m_IsExternalStimulationsEnabled); m_settings.add("ExternalStimulationQueueName", &m_ExternalStimulationsQueueName); m_settings.load(); } // Hooks bool CPluginExternalStimulations::startHook(const std::vector& /*selectedChannelNames*/, const size_t /*sampling*/, const size_t /*nChannel*/, const size_t /*nSamplePerSentBlock*/) { if (m_IsExternalStimulationsEnabled) { ftime(&m_CTStartTime); m_IsESThreadRunning = true; m_ESthreadPtr.reset(new std::thread(std::bind(&CPluginExternalStimulations::readExternalStimulations, this))); m_kernelCtx.getLogManager() << Kernel::LogLevel_Info << "External stimulations (deprecated) activated...\n"; } m_ExternalStimulations.clear(); m_DebugExternalStimulationsSent = 0; m_DebugCurrentReadIPCStimulations = 0; m_DebugStimulationsLost = 0; m_DebugStimulationsReceivedEarlier = 0; m_DebugStimulationsReceivedLate = 0; m_DebugStimulationsReceivedWrongSize = 0; m_DebugStimulationsBuffered = 0; return true; } void CPluginExternalStimulations::loopHook(std::deque>& /* vPendingBuffer */, CStimulationSet& stimulationSet, const uint64_t start, const uint64_t end, const uint64_t /* sampleTime */) { if (m_IsExternalStimulationsEnabled) { //m_kernelCtx.getLogManager() << Kernel::LogLevel_Error << "Checking for external stimulations:" << p << "\n"; addExternalStimulations(&stimulationSet, m_kernelCtx.getLogManager(), start, end); } } void CPluginExternalStimulations::stopHook() { if (m_IsExternalStimulationsEnabled) { m_IsESThreadRunning = false; if (m_ESthreadPtr) { m_ESthreadPtr->join(); } else { m_kernelCtx.getLogManager() << Kernel::LogLevel_Warning << "Warning: External Stims plugin stopHook() tried to join a NULL thread\n"; } } //software tagging diagnosting m_kernelCtx.getLogManager() << Kernel::LogLevel_Debug << " Total external ones received through IPC: " << m_DebugCurrentReadIPCStimulations << "\n"; m_kernelCtx.getLogManager() << Kernel::LogLevel_Debug << " Sent to Designer: " << m_DebugExternalStimulationsSent << "\n"; m_kernelCtx.getLogManager() << Kernel::LogLevel_Debug << " Lost because of invalid timestamp: " << m_DebugStimulationsLost << "\n"; m_kernelCtx.getLogManager() << Kernel::LogLevel_Debug << " Stimulations that came earlier: " << m_DebugStimulationsReceivedEarlier << "\n"; m_kernelCtx.getLogManager() << Kernel::LogLevel_Debug << " Stimulations that came later: " << m_DebugStimulationsReceivedLate << "\n"; m_kernelCtx.getLogManager() << Kernel::LogLevel_Debug << " Stimulations that had wrong size: " << m_DebugStimulationsReceivedWrongSize << "\n"; m_kernelCtx.getLogManager() << Kernel::LogLevel_Debug << " Buffered: " << m_DebugStimulationsBuffered << "\n"; //end software tagging diagnosting } // Plugin specific methods void CPluginExternalStimulations::readExternalStimulations() { using namespace boost::interprocess; //std::cout << "Creating External Stimulations thread" << std::endl; //std::cout << "Queue Name : " << m_ExternalStimulationsQueueName << std::endl; //char mq_name[255]; //std::strcpy(mq_name, m_ExternalStimulationsQueueName.toASCIIString()); const int chunkLength = 3; const int pauseTime = 5; uint32_t priority; size_t recvdSize; uint64_t chunk[chunkLength]; while (m_IsESThreadRunning) { bool success; try { //Open a message queue. message_queue mq(open_only //only open , m_ExternalStimulationsQueueName.toASCIIString() //name //,mq_name //name ); success = mq.try_receive(&chunk, sizeof(chunk), recvdSize, priority); } catch (interprocess_exception& /* ex */) { //m_IsESThreadRunning = false; //m_kernelCtx.getLogManager() << Kernel::LogLevel_Error << "Problem with message queue in external stimulations:" << ex.what() << "\n"; System::Time::sleep(pauseTime); continue; } if (!success) { System::Time::sleep(pauseTime); continue; } m_DebugCurrentReadIPCStimulations++; if (recvdSize != sizeof(chunk)) { //m_kernelCtx.getLogManager() << Kernel::LogLevel_Error << "Problem with type of received data when reqding external stimulation!\n"; m_DebugStimulationsReceivedWrongSize++; } else { //m_kernelCtx.getLogManager() << Kernel::LogLevel_Warning << "received\n"; SExternalStimulation stim; stim.identifier = chunk[1]; const uint64_t receivedTime = chunk[2]; //1. calculate time const uint64_t ctStartTimeMs = (m_CTStartTime.time * 1000 + m_CTStartTime.millitm); const int64_t timeTest = receivedTime - ctStartTimeMs; if (timeTest < 0) { m_DebugStimulationsLost++; //m_kernelCtx.getLogManager() << Kernel::LogLevel_Warning << "AS: external stimulation time is invalid, probably stimulation is before reference point, total invalid so far: " << m_FlashesLost << "\n"; System::Time::sleep(pauseTime); continue; //we skip this stimulation } //2. Convert to OpenVibe time const uint64_t ctEventTime = receivedTime - ctStartTimeMs; const double time = double(ctEventTime) / double(1000); const uint64_t ovTime = CTime(time).time(); stim.timestamp = ovTime; //3. Store, the main thread will process it { //lock std::lock_guard lock(m_es_mutex); m_ExternalStimulations.push_back(stim); m_DebugStimulationsBuffered++; m_esAvailable.notify_one(); //unlock } System::Time::sleep(pauseTime); } } } void CPluginExternalStimulations::addExternalStimulations(CStimulationSet* ss, Kernel::ILogManager& /*logm*/, const uint64_t start, const uint64_t /*end*/) { const uint64_t durationMs = 40; { //lock std::lock_guard lock(m_es_mutex); for (auto i = m_ExternalStimulations.begin(); i != m_ExternalStimulations.end(); ++i) { // if time is current or any time in the future - send it (AS will buffer it) if (i->timestamp >= start) { //flashes_in_this_time_chunk++; //logm << Kernel::LogLevel_Error << "Stimulation added." << "\n"; ss->appendStimulation(i->identifier, i->timestamp, durationMs); } else { //the stimulation is coming too late - after the current block being processed //we correct the timestamp to the current block and we send it m_DebugStimulationsReceivedLate++; ss->appendStimulation(i->identifier, start, durationMs); } m_DebugExternalStimulationsSent++; } // Since we processed all stimulations, we can clear the queue m_ExternalStimulations.clear(); m_esAvailable.notify_one(); //unlock } } bool CPluginExternalStimulations::setExternalStimulationsEnabled(const bool active) { m_IsExternalStimulationsEnabled = active; return true; } } // namespace Plugins } // namespace AcquisitionServer } // namespace OpenViBE