221 lines
7.4 KiB
C++
Raw Permalink Normal View History

2021-10-14 13:47:35 +02:00
#include "ovasCPluginExternalStimulations.h"
#include <boost/interprocess/ipc/message_queue.hpp>
#include <vector>
#include <ctime>
#include <system/ovCTime.h>
#include "../ovasCSettingsHelper.h"
#include "../ovasCSettingsHelperOperators.h"
namespace OpenViBE {
namespace AcquisitionServer {
namespace Plugins {
CPluginExternalStimulations::CPluginExternalStimulations(const Kernel::IKernelContext& ctx)
: IAcquisitionServerPlugin(ctx, CString("AcquisitionServer_Plugin_ExternalStimulations")), m_ExternalStimulationsQueueName("openvibeExternalStimulations")
{
m_kernelCtx.getLogManager() << Kernel::LogLevel_Info << "Loading plugin: ExternalStimulations (deprecated)\n";
m_settings.add("EnableExternalStimulations", &m_IsExternalStimulationsEnabled);
m_settings.add("ExternalStimulationQueueName", &m_ExternalStimulationsQueueName);
m_settings.load();
}
// Hooks
bool CPluginExternalStimulations::startHook(const std::vector<CString>& /*selectedChannelNames*/, const size_t /*sampling*/, const size_t /*nChannel*/, const size_t /*nSamplePerSentBlock*/)
{
if (m_IsExternalStimulationsEnabled)
{
ftime(&m_CTStartTime);
m_IsESThreadRunning = true;
m_ESthreadPtr.reset(new std::thread(std::bind(&CPluginExternalStimulations::readExternalStimulations, this)));
m_kernelCtx.getLogManager() << Kernel::LogLevel_Info << "External stimulations (deprecated) activated...\n";
}
m_ExternalStimulations.clear();
m_DebugExternalStimulationsSent = 0;
m_DebugCurrentReadIPCStimulations = 0;
m_DebugStimulationsLost = 0;
m_DebugStimulationsReceivedEarlier = 0;
m_DebugStimulationsReceivedLate = 0;
m_DebugStimulationsReceivedWrongSize = 0;
m_DebugStimulationsBuffered = 0;
return true;
}
void CPluginExternalStimulations::loopHook(std::deque<std::vector<float>>& /* vPendingBuffer */, CStimulationSet& stimulationSet, const uint64_t start, const uint64_t end, const uint64_t /* sampleTime */)
{
if (m_IsExternalStimulationsEnabled)
{
//m_kernelCtx.getLogManager() << Kernel::LogLevel_Error << "Checking for external stimulations:" << p << "\n";
addExternalStimulations(&stimulationSet, m_kernelCtx.getLogManager(), start, end);
}
}
void CPluginExternalStimulations::stopHook()
{
if (m_IsExternalStimulationsEnabled)
{
m_IsESThreadRunning = false;
if (m_ESthreadPtr) { m_ESthreadPtr->join(); }
else { m_kernelCtx.getLogManager() << Kernel::LogLevel_Warning << "Warning: External Stims plugin stopHook() tried to join a NULL thread\n"; }
}
//software tagging diagnosting
m_kernelCtx.getLogManager() << Kernel::LogLevel_Debug << " Total external ones received through IPC: " << m_DebugCurrentReadIPCStimulations << "\n";
m_kernelCtx.getLogManager() << Kernel::LogLevel_Debug << " Sent to Designer: " << m_DebugExternalStimulationsSent << "\n";
m_kernelCtx.getLogManager() << Kernel::LogLevel_Debug << " Lost because of invalid timestamp: " << m_DebugStimulationsLost << "\n";
m_kernelCtx.getLogManager() << Kernel::LogLevel_Debug << " Stimulations that came earlier: " << m_DebugStimulationsReceivedEarlier << "\n";
m_kernelCtx.getLogManager() << Kernel::LogLevel_Debug << " Stimulations that came later: " << m_DebugStimulationsReceivedLate << "\n";
m_kernelCtx.getLogManager() << Kernel::LogLevel_Debug << " Stimulations that had wrong size: " << m_DebugStimulationsReceivedWrongSize << "\n";
m_kernelCtx.getLogManager() << Kernel::LogLevel_Debug << " Buffered: " << m_DebugStimulationsBuffered << "\n";
//end software tagging diagnosting
}
// Plugin specific methods
void CPluginExternalStimulations::readExternalStimulations()
{
using namespace boost::interprocess;
//std::cout << "Creating External Stimulations thread" << std::endl;
//std::cout << "Queue Name : " << m_ExternalStimulationsQueueName << std::endl;
//char mq_name[255];
//std::strcpy(mq_name, m_ExternalStimulationsQueueName.toASCIIString());
const int chunkLength = 3;
const int pauseTime = 5;
uint32_t priority;
size_t recvdSize;
uint64_t chunk[chunkLength];
while (m_IsESThreadRunning)
{
bool success;
try
{
//Open a message queue.
message_queue mq(open_only //only open
, m_ExternalStimulationsQueueName.toASCIIString() //name
//,mq_name //name
);
success = mq.try_receive(&chunk, sizeof(chunk), recvdSize, priority);
}
catch (interprocess_exception& /* ex */)
{
//m_IsESThreadRunning = false;
//m_kernelCtx.getLogManager() << Kernel::LogLevel_Error << "Problem with message queue in external stimulations:" << ex.what() << "\n";
System::Time::sleep(pauseTime);
continue;
}
if (!success)
{
System::Time::sleep(pauseTime);
continue;
}
m_DebugCurrentReadIPCStimulations++;
if (recvdSize != sizeof(chunk))
{
//m_kernelCtx.getLogManager() << Kernel::LogLevel_Error << "Problem with type of received data when reqding external stimulation!\n";
m_DebugStimulationsReceivedWrongSize++;
}
else
{
//m_kernelCtx.getLogManager() << Kernel::LogLevel_Warning << "received\n";
SExternalStimulation stim;
stim.identifier = chunk[1];
const uint64_t receivedTime = chunk[2];
//1. calculate time
const uint64_t ctStartTimeMs = (m_CTStartTime.time * 1000 + m_CTStartTime.millitm);
const int64_t timeTest = receivedTime - ctStartTimeMs;
if (timeTest < 0)
{
m_DebugStimulationsLost++;
//m_kernelCtx.getLogManager() << Kernel::LogLevel_Warning << "AS: external stimulation time is invalid, probably stimulation is before reference point, total invalid so far: " << m_FlashesLost << "\n";
System::Time::sleep(pauseTime);
continue; //we skip this stimulation
}
//2. Convert to OpenVibe time
const uint64_t ctEventTime = receivedTime - ctStartTimeMs;
const double time = double(ctEventTime) / double(1000);
const uint64_t ovTime = CTime(time).time();
stim.timestamp = ovTime;
//3. Store, the main thread will process it
{
//lock
std::lock_guard<std::mutex> lock(m_es_mutex);
m_ExternalStimulations.push_back(stim);
m_DebugStimulationsBuffered++;
m_esAvailable.notify_one();
//unlock
}
System::Time::sleep(pauseTime);
}
}
}
void CPluginExternalStimulations::addExternalStimulations(CStimulationSet* ss, Kernel::ILogManager& /*logm*/, const uint64_t start, const uint64_t /*end*/)
{
const uint64_t durationMs = 40;
{
//lock
std::lock_guard<std::mutex> lock(m_es_mutex);
for (auto i = m_ExternalStimulations.begin(); i != m_ExternalStimulations.end(); ++i)
{
// if time is current or any time in the future - send it (AS will buffer it)
if (i->timestamp >= start)
{
//flashes_in_this_time_chunk++;
//logm << Kernel::LogLevel_Error << "Stimulation added." << "\n";
ss->appendStimulation(i->identifier, i->timestamp, durationMs);
}
else
{
//the stimulation is coming too late - after the current block being processed
//we correct the timestamp to the current block and we send it
m_DebugStimulationsReceivedLate++;
ss->appendStimulation(i->identifier, start, durationMs);
}
m_DebugExternalStimulationsSent++;
}
// Since we processed all stimulations, we can clear the queue
m_ExternalStimulations.clear();
m_esAvailable.notify_one();
//unlock
}
}
bool CPluginExternalStimulations::setExternalStimulationsEnabled(const bool active)
{
m_IsExternalStimulationsEnabled = active;
return true;
}
} // namespace Plugins
} // namespace AcquisitionServer
} // namespace OpenViBE