Brain_Computer_Interface/Code/UIModellVisuell.py
2022-07-12 08:01:52 +02:00

230 lines
9.2 KiB
Python

'''--------------------------------------------------------------------------------------------------
Das Visuelle Model steuert den Ablauf der aufrufe der Openvibe Funktionen fuer das visuelle BCI
'''
from subprocess import *
from threading import Thread
import time
import re
from UIModell import Modell
class ModellVisuell(Modell):
def __init__(self,c, dll):
Thread.__init__(self)
Modell.__init__(self, c, dll)
self.aktiv= True
self.openVibeAktiv = False
def stop(self):
'''--------------------------------------------------------------------------------------------------
Die Funktion stop stopt den ablaufenden Thread
'''
print("stop thread")
self.aktiv = False
def setFunktion(self, func, args=None, kwargs=None):
'''--------------------------------------------------------------------------------------------------
Mittels dieser Funktion kann die im Thread ablaufende Funktion und deren Argumente eingestellt werden
'''
self.func = func
self.args = args or []
self.kwargs = kwargs or {}
def run(self):
'''--------------------------------------------------------------------------------------------------
Wird bei Thread.start() aus dem Controller aufgerufen und laeuft ab sobald der Thread destartet wurde
'''
print("start thread")
#self.aktiv = True
t = Thread(target=self.func, args=self.args, kwargs=self.kwargs)
t.setDaemon(True)
t.start()
while self.aktiv:
time.sleep(0.1)
def startCopySpelling(self):
'''--------------------------------------------------------------------------------------------------
Holt aus der shared Librarz die Befehle und startet das Copyspelling. Reagiert auf Error oder
beenden. Bei Beenden startet es das XDawnTraining, bei Error wird der Vorgang gestoppt.
Positioniert die visuelle Matrix.
'''
print("start copySpelling")
self.controller.resetInfo()
self.controller.addInfoText('Starten des gefuerhten Buchstabierens -- ')
command = self.dll.getCommandCopyspellingVisuell().split(" ")
process = Popen(command, stdout=PIPE, universal_newlines=True)
self.openVibeAktiv = True
while True:
output = process.stdout.readline()
print(output.strip())
x = output.find("Application terminated")
y = output.find("Error")
z = output.find("Initialization")
if(x != -1):
print("Training finished")
process.terminate()
self.controller.addInfoText('Buchstabieren beendet\n')
self.controller.stop("filterXdawn")
break
elif(y != -1 ):
print("Error occured")
self.controller.changeScreen("StartPage")
self.controller.addInfoText("Fehler beim Buchstabieren aufgetaucht\n")
process.terminate()
self.controller.stop()
break
elif(z != -1):
print("Initiation finisched")
time.sleep(2)
self.positionWindow("p300")
if not self.aktiv:
print("stop")
break
#self.controller.stop()
def trainXDawn(self):
'''--------------------------------------------------------------------------------------------------
Holt aus der shared Library die Befehle und startet das XDawnTraining. Reagiert auf Error oder
die Ausgabe das das Training beendet wurde. Bei Beenden startet es das ClassifierTraining, bei Error wird der Vorgang gestoppt.
'''
print("start training Xdawn")
self.controller.addInfoText('Starten des XDawn/Trainings -- ')
command = self.dll.getCommandXDawn_visuell().split(" ")
process = Popen(command, stdout=PIPE, universal_newlines=True)
self.openVibeAktiv = True
while self.aktiv:
output = process.stdout.readline()
print(output.strip())
x = output.find("Training finished and saved")
y = output.find("Error")
if(x != -1):
print("Training finished")
process.terminate()
self.controller.addInfoText('Beenden des Trainings\n')
self.controller.stop("filterClassic")
break
elif(y != -1 ):
print("Error occured")
self.controller.changeScreen("StartPage")
self.controller.addInfoText("Fehler beim XDawn Training aufgetaucht\n")
self.controller.stop()
process.terminate()
break
if not self.aktiv:
print("stop")
break
#self.controller.stop()
def trainClassifier(self):
'''--------------------------------------------------------------------------------------------------
Holt aus der shared Library die Befehle und startet das ClassifierTraining. Reagiert auf Error oder
die Ausgabe das das Training beendet wurde. Bei Beenden startet es das speichern der Daten, bei Error wird der Vorgang gestoppt.
'''
print("start training Classifier")
self.controller.addInfoText("Starten Classifier-Trainings -- ")
command = self.dll.getCommandClassifier_visuell().split(" ")
process = Popen(command, stdout=PIPE, universal_newlines=True)
self.openVibeAktiv = True
counter = 0
acc = 0
while True:
output = process.stdout.readline()
print(output.strip())
x = output.find("aka Classifier trainer")
accuracy = output.find("Cross-validation test accuracy is ")
y = output.find("Error")
if(x != -1):
counter = counter +1
#counter = 18
if(counter >= 17):
print("Training finished")
self.controller.addInfoText('Beenden des Training (' + acc + ')\n')
process.terminate()
self.controller.stop("save")
break
elif(y != -1 ):
print("Error occured")
self.controller.changeScreen("StartPage")
self.controller.addInfoText("Fehler beim Classifier Training aufgetaucht\n")
process.terminate()
break
if(accuracy != -1):
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
test = output.encode("windows-1252").decode("utf-8")
i = len("Cross-validation test")
acc_s = test[accuracy+i:accuracy+i+41].strip()
acc = ansi_escape.sub('', acc_s)
print(acc)
if not self.aktiv:
print("stop")
break
self.controller.changeScreen("StartPage")
self.controller.stop()
def freeSpelling(self):
'''--------------------------------------------------------------------------------------------------
Holt aus der shared Library die Befehle und startet das Freespelling. Reagiert auf Error oder
beenden. Daraufhin wird der Vorgang gestoppt. Positioniert die visuelle Matrix.
'''
print("start freeSpelling")
self.controller.resetInfo()
self.controller.addInfoText('Starten des freien Buchstabierens -- ')
command = self.dll.getCommandFreespellingVisuell().split(" ")
process = Popen(command, stdout=PIPE, universal_newlines=True)
self.openVibeAktiv = True
while True:
output = process.stdout.readline()
print(output.strip())
y = output.find("Error")
x = output.find("Schlagwort")
z = output.find("Initialization")
if(x != -1):
print("End Spelling")
process.terminate()
self.controller.addInfoText('Buchstabieren beendet\n')
self.controller.changeScreen("StartPage")
self.controller.stop()
break
elif(y != -1 ):
print("Error occured")
self.controller.changeScreen("StartPage")
self.controller.addInfoText("Fehler beim Buchstabieren aufgetaucht\n")
process.terminate()
self.controller.stop()
break
elif(z != -1):
print("Initiation finisched")
time.sleep(2)
self.positionWindow("p300")
if not self.aktiv:
print("stop")
break
self.controller.changeScreen("StartPage")
self.controller.stop()
def killProzess(self):
'''--------------------------------------------------------------------------------------------------
Ruft den Parent auf, dass Openvibe falls es noch laeuft geschlossen wird
'''
if(self.openVibeAktiv):
self.openVibeAktiv = False
self.killProzessParent()