123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220 |
- #include "ovasCPluginExternalStimulations.h"
-
- #include <boost/interprocess/ipc/message_queue.hpp>
-
- #include <vector>
- #include <ctime>
-
- #include <system/ovCTime.h>
-
- #include "../ovasCSettingsHelper.h"
- #include "../ovasCSettingsHelperOperators.h"
-
- namespace OpenViBE {
- namespace AcquisitionServer {
- namespace Plugins {
-
- // Builds the (deprecated) ExternalStimulations plugin.
- // The queue name starts as "openvibeExternalStimulations"; both the enable flag and
- // the queue name are then registered with m_settings and m_settings.load() is called.
- CPluginExternalStimulations::CPluginExternalStimulations(const Kernel::IKernelContext& ctx)
-     : IAcquisitionServerPlugin(ctx, CString("AcquisitionServer_Plugin_ExternalStimulations")),
-       m_ExternalStimulationsQueueName("openvibeExternalStimulations")
- {
-     Kernel::ILogManager& logManager = m_kernelCtx.getLogManager();
-     logManager << Kernel::LogLevel_Info << "Loading plugin: ExternalStimulations (deprecated)\n";
-
-     // Register the two configurable values under their setting keys, then load them.
-     m_settings.add("EnableExternalStimulations", &m_IsExternalStimulationsEnabled);
-     m_settings.add("ExternalStimulationQueueName", &m_ExternalStimulationsQueueName);
-     m_settings.load();
- }
-
- // Hooks
-
-
- // Called when acquisition starts. Resets the stimulation buffer and the diagnostic
- // counters and, if external stimulations are enabled, records the wall-clock start
- // time and launches the thread running readExternalStimulations().
- // All parameters are unused. Always returns true.
- bool CPluginExternalStimulations::startHook(const std::vector<CString>& /*selectedChannelNames*/, const size_t /*sampling*/, const size_t /*nChannel*/, const size_t /*nSamplePerSentBlock*/)
- {
-     // Reset all shared state BEFORE the reader thread exists. Doing it after the
-     // thread was started let this thread clear the buffer and zero the counters
-     // (without holding m_es_mutex) while the reader could already be pushing into
-     // them, which is a data race and could drop an early stimulation.
-     m_ExternalStimulations.clear();
-
-     m_DebugExternalStimulationsSent      = 0;
-     m_DebugCurrentReadIPCStimulations    = 0;
-     m_DebugStimulationsLost              = 0;
-     m_DebugStimulationsReceivedEarlier   = 0;
-     m_DebugStimulationsReceivedLate      = 0;
-     m_DebugStimulationsReceivedWrongSize = 0;
-     m_DebugStimulationsBuffered          = 0;
-
-     if (m_IsExternalStimulationsEnabled)
-     {
-         // Reference point used by the reader to turn received wall-clock
-         // timestamps into times relative to the start of acquisition.
-         ftime(&m_CTStartTime);
-         m_IsESThreadRunning = true;
-         m_ESthreadPtr.reset(new std::thread([this]() { readExternalStimulations(); }));
-         m_kernelCtx.getLogManager() << Kernel::LogLevel_Info << "External stimulations (deprecated) activated...\n";
-     }
-
-     return true;
- }
-
- // Per-block hook: when external stimulations are enabled, forwards the stimulation
- // set and the block's start/end values to addExternalStimulations(). The pending
- // sample buffer and the sample time are not used.
- void CPluginExternalStimulations::loopHook(std::deque<std::vector<float>>& /* vPendingBuffer */, CStimulationSet& stimulationSet, const uint64_t start, const uint64_t end, const uint64_t /* sampleTime */)
- {
-     if (!m_IsExternalStimulationsEnabled) { return; }
-
-     addExternalStimulations(&stimulationSet, m_kernelCtx.getLogManager(), start, end);
- }
-
- // Called when acquisition stops. Asks the reader thread to finish, waits for it,
- // and then prints the software-tagging diagnostic counters at Debug level.
- void CPluginExternalStimulations::stopHook()
- {
-     Kernel::ILogManager& logManager = m_kernelCtx.getLogManager();
-
-     // Always clear the run flag: a thread started while the plugin was enabled must
-     // still be stopped if the enable flag was switched off in the meantime.
-     m_IsESThreadRunning = false;
-
-     if (m_ESthreadPtr)
-     {
-         // join() on a thread that has already been joined (e.g. stopHook() called
-         // twice) throws std::system_error, so only join when it is joinable.
-         if (m_ESthreadPtr->joinable()) { m_ESthreadPtr->join(); }
-     }
-     else if (m_IsExternalStimulationsEnabled)
-     {
-         logManager << Kernel::LogLevel_Warning << "Warning: External Stims plugin stopHook() tried to join a NULL thread\n";
-     }
-
-     // software tagging diagnostics
-     logManager << Kernel::LogLevel_Debug << " Total external ones received through IPC: " << m_DebugCurrentReadIPCStimulations << "\n";
-     logManager << Kernel::LogLevel_Debug << " Sent to Designer: " << m_DebugExternalStimulationsSent << "\n";
-     logManager << Kernel::LogLevel_Debug << " Lost because of invalid timestamp: " << m_DebugStimulationsLost << "\n";
-     logManager << Kernel::LogLevel_Debug << " Stimulations that came earlier: " << m_DebugStimulationsReceivedEarlier << "\n";
-     logManager << Kernel::LogLevel_Debug << " Stimulations that came later: " << m_DebugStimulationsReceivedLate << "\n";
-     logManager << Kernel::LogLevel_Debug << " Stimulations that had wrong size: " << m_DebugStimulationsReceivedWrongSize << "\n";
-     logManager << Kernel::LogLevel_Debug << " Buffered: " << m_DebugStimulationsBuffered << "\n";
-     // end software tagging diagnostics
- }
-
- // Plugin specific methods
-
- // Body of the reader thread started by startHook(). While m_IsESThreadRunning is set,
- // it repeatedly opens the message queue named by m_ExternalStimulationsQueueName,
- // tries a non-blocking receive of one message made of three uint64_t values, and
- // converts it into an SExternalStimulation pushed to m_ExternalStimulations under
- // m_es_mutex.
- //
- // Message layout as used here: [1] = stimulation identifier, [2] = timestamp in
- // milliseconds on the same wall-clock scale as m_CTStartTime; [0] is not read.
- // NOTE(review): the meaning of element [0] is defined by the sender - confirm.
- void CPluginExternalStimulations::readExternalStimulations()
- {
-     using namespace boost::interprocess;
-
-     const int chunkLength = 3;
-     const int pauseTime   = 5; // pause between polls, passed to System::Time::sleep()
-
-     uint64_t chunk[chunkLength] = {};
-
-     while (m_IsESThreadRunning)
-     {
-         uint32_t priority = 0;
-         size_t recvdSize  = 0;
-         bool success      = false;
-
-         try
-         {
-             // The queue is (re)opened on every poll; if it does not exist yet the
-             // constructor throws and we simply retry after a short pause.
-             message_queue mq(open_only, m_ExternalStimulationsQueueName.toASCIIString());
-
-             success = mq.try_receive(&chunk, sizeof(chunk), recvdSize, priority);
-         }
-         catch (const interprocess_exception& /* ex */)
-         {
-             System::Time::sleep(pauseTime);
-             continue;
-         }
-
-         if (!success)
-         {
-             // Nothing pending in the queue.
-             System::Time::sleep(pauseTime);
-             continue;
-         }
-
-         m_DebugCurrentReadIPCStimulations++;
-
-         if (recvdSize != sizeof(chunk))
-         {
-             // Unexpected message size: count it and poll again immediately.
-             m_DebugStimulationsReceivedWrongSize++;
-             continue;
-         }
-
-         SExternalStimulation stim;
-         stim.identifier = chunk[1];
-         const uint64_t receivedTime = chunk[2];
-
-         // 1. Start of acquisition in milliseconds. Widen before multiplying so the
-         //    product cannot overflow on platforms where time_t is only 32 bits.
-         const uint64_t ctStartTimeMs = static_cast<uint64_t>(m_CTStartTime.time) * 1000 + m_CTStartTime.millitm;
-
-         // Compare the unsigned values directly rather than relying on the wrapped
-         // difference being reinterpreted as a negative signed number.
-         if (receivedTime < ctStartTimeMs)
-         {
-             // Stimulation is dated before the reference point: drop it.
-             m_DebugStimulationsLost++;
-             System::Time::sleep(pauseTime);
-             continue;
-         }
-
-         // 2. Convert the elapsed milliseconds to seconds and then to OpenViBE time.
-         const uint64_t ctEventTime = receivedTime - ctStartTimeMs;
-         const double seconds       = static_cast<double>(ctEventTime) / 1000.0;
-         stim.timestamp             = CTime(seconds).time();
-
-         // 3. Store it; the main thread picks it up in addExternalStimulations().
-         {
-             std::lock_guard<std::mutex> lock(m_es_mutex);
-
-             m_ExternalStimulations.push_back(stim);
-             m_DebugStimulationsBuffered++;
-             m_esAvailable.notify_one();
-         }
-
-         System::Time::sleep(pauseTime);
-     }
- }
-
- // Moves every buffered external stimulation into the given stimulation set and
- // empties the buffer, all while holding m_es_mutex.
- //   ss    : stimulation set receiving the stimulations (dereferenced unconditionally)
- //   start : start of the block being processed; stimulations stamped earlier than
- //           this are re-stamped to 'start' and counted as received late
- // The log manager and 'end' arguments are unused.
- void CPluginExternalStimulations::addExternalStimulations(CStimulationSet* ss, Kernel::ILogManager& /*logm*/, const uint64_t start, const uint64_t /*end*/)
- {
-     const uint64_t durationMs = 40;
-
-     std::lock_guard<std::mutex> guard(m_es_mutex);
-
-     for (const auto& stim : m_ExternalStimulations)
-     {
-         // Current or future timestamps are sent unchanged (AS will buffer them).
-         uint64_t date = stim.timestamp;
-         if (date < start)
-         {
-             // Arrived after its block was already processed: move it to the
-             // start of the current block instead of dropping it.
-             m_DebugStimulationsReceivedLate++;
-             date = start;
-         }
-
-         ss->appendStimulation(stim.identifier, date, durationMs);
-         m_DebugExternalStimulationsSent++;
-     }
-
-     // Everything has been forwarded, so the buffer can be emptied.
-     m_ExternalStimulations.clear();
-
-     m_esAvailable.notify_one();
- }
-
- // Stores the requested enabled state in m_IsExternalStimulationsEnabled.
- // Only the flag is changed here; no thread is started or stopped. Always returns true.
- bool CPluginExternalStimulations::setExternalStimulationsEnabled(const bool active)
- {
-     m_IsExternalStimulationsEnabled = active;
-
-     return true;
- }
-
- } // namespace Plugins
- } // namespace AcquisitionServer
- } // namespace OpenViBE
|