146 lines
7.0 KiB
Python
146 lines
7.0 KiB
Python
import numpy as np
|
|
import multiprocessing
|
|
import ast
|
|
import os
|
|
import time
|
|
from PIL import Image, ImageTk
|
|
import time
|
|
from copy import deepcopy
|
|
|
|
|
|
class ObjectRecognizer(multiprocessing.Process):
    """Worker process that matches measured sweeps against stored object fingerprints.

    The process talks to two peers through pipes:

    * ``pipe_from_gui`` -- receives commands of the form ``[command, value]``
      (``'set_objects_database_path'`` and ``'set_threshold'``) and is also
      used to send status, progress, error and result messages back.
    * ``pipe_from_frequency_sweep`` -- receives sweep updates. The code reads
      element ``[0]`` (compared against ``100``), element ``[1]`` (indexed by
      antenna) and element ``[2]`` (a 2-D array indexed ``[:, antenna + 1]``).
      NOTE(review): element ``[0]`` is presumably the sweep progress in
      percent -- confirm against the sender.

    ``objects_database`` maps an object name to ``[fingerprint_array, image]``
    where ``image`` is a resized PIL image or ``None``.
    """

    # Lower-case file extensions (without the dot) accepted as images.
    IMAGE_EXTENSIONS = ('jpg', 'jpeg', 'png')

    # Pause between polls when neither pipe had data, so that the endless
    # loop in run() does not keep a CPU core fully busy.
    IDLE_SLEEP_SECONDS = 0.001

    def __init__(self, pipe_from_gui=None, pipe_from_frequency_sweep=None, antenna_index=0, threshold=0.1):
        """Store the pipes and settings; nothing is loaded until a GUI command arrives.

        Args:
            pipe_from_gui: duplex connection to the GUI.
            pipe_from_frequency_sweep: connection delivering sweep updates.
            antenna_index: index of the antenna whose signal is evaluated.
            threshold: magnitude above which an object is considered present.
        """
        multiprocessing.Process.__init__(self)
        self.pipe_from_gui = pipe_from_gui
        self.pipe_from_frequency_sweep = pipe_from_frequency_sweep
        self.antenna_index = antenna_index
        self.threshold = threshold
        self.objects_database = {}
        self.objects_database_loaded = False
        # Set by load_objects_database(); initialised here so that
        # load_helper_images() cannot hit an AttributeError.
        self.database_path = None
        self.helper_images = {}
        self.data_from_gui = None
        self.data_from_frequency_sweep = None
        self.above_threshold = False
        self.recognized_object = [None, None]
        self.image_dimensions = [600, 600]

    def run(self):
        """Process entry point: serve GUI commands and sweep updates forever."""
        while True:
            handled_message = False

            if self.pipe_from_gui.poll():
                handled_message = True
                self.data_from_gui = self.pipe_from_gui.recv()
                self._handle_gui_command()

            if self.pipe_from_frequency_sweep.poll():
                handled_message = True
                self.data_from_frequency_sweep = self.pipe_from_frequency_sweep.recv()
                self._handle_frequency_sweep()

            if not handled_message:
                # Nothing to do right now: yield the CPU instead of spinning.
                time.sleep(self.IDLE_SLEEP_SECONDS)

    def _handle_gui_command(self):
        """Execute the command currently stored in ``data_from_gui``."""
        command = self.data_from_gui[0]
        if command == 'set_objects_database_path':
            self.load_objects_database()
            self.pipe_from_gui.send(['database_loaded', self.objects_database_loaded])
        elif command == 'set_threshold':
            self.threshold = self.data_from_gui[1]

    def _handle_frequency_sweep(self):
        """React to the sweep update currently stored in ``data_from_frequency_sweep``."""
        if not self.objects_database_loaded:
            return
        self.check_threshold()
        if self.data_from_frequency_sweep[0] == 100 and self.above_threshold:
            self.recognize_object()
            self.send_recognized_object()

    def send_recognized_object(self):
        """Send the current ``[name, image]`` recognition result to the GUI."""
        self.pipe_from_gui.send(self.recognized_object)

    def load_objects_database(self):
        """(Re)load fingerprints and images from the path given by the GUI.

        The path in ``data_from_gui[1]`` is joined onto the current working
        directory. Fingerprints are read from ``objects_fingerprints/*.npy``
        and images from ``objects_images``. ``objects_database_loaded`` ends
        up ``True`` only when at least one fingerprint was loaded.
        """
        self.objects_database = {}
        self.objects_database_loaded = False
        self.database_path = os.path.join(os.getcwd(), self.data_from_gui[1])
        if not os.path.isdir(self.database_path):
            return
        self._load_fingerprints(os.path.join(self.database_path, 'objects_fingerprints'))
        self._load_object_images(os.path.join(self.database_path, 'objects_images'))

    @staticmethod
    def _split_file_name(file_name):
        """Return ``(stem, extension_without_dot)``; the extension may be empty."""
        stem, extension = os.path.splitext(file_name)
        return stem, extension[1:]

    def _send_progress(self, tag, index, total):
        """Send ``[tag, percent]`` to the GUI; nothing is sent for a single file."""
        if total > 1:
            self.pipe_from_gui.send([tag, (index / (total - 1)) * 100])

    def _load_fingerprints(self, fingerprints_directory):
        """Load every ``.npy`` file of the directory into ``objects_database``."""
        if not os.path.isdir(fingerprints_directory):
            return
        files = os.listdir(fingerprints_directory)
        for i, file_name in enumerate(files):
            stem, extension = self._split_file_name(file_name)
            if extension == 'npy':
                fingerprint = np.load(os.path.join(fingerprints_directory, file_name))
                self.objects_database[stem] = [fingerprint, None]
            self._send_progress('sweep_database_loading', i, len(files))
        # A directory without any usable fingerprint is not a loaded database.
        self.objects_database_loaded = bool(self.objects_database)

    def _load_resized_image(self, image_file_path):
        """Open an image, resize it to ``image_dimensions`` and close the file."""
        size = (self.image_dimensions[0], self.image_dimensions[1])
        # resize() returns a new in-memory image, so the file handle can be
        # released immediately. Image.LANCZOS is the filter formerly named
        # Image.ANTIALIAS (removed in Pillow 10).
        with Image.open(image_file_path) as image_file:
            return image_file.resize(size, Image.LANCZOS)

    def _load_object_images(self, images_directory):
        """Attach an image to every database entry that has a same-named image file."""
        if not os.path.isdir(images_directory):
            return
        files = os.listdir(images_directory)
        for i, file_name in enumerate(files):
            stem, extension = self._split_file_name(file_name)
            if extension.lower() in self.IMAGE_EXTENSIONS and stem in self.objects_database:
                image_file_path = os.path.join(images_directory, file_name)
                self.objects_database[stem][1] = self._load_resized_image(image_file_path)
            self._send_progress('images_database_loading', i, len(files))

    def load_helper_images(self):
        """Load the GUI helper images from ``<database_path>/helper_images``.

        Only files named ``place_an_object``, ``blank`` or
        ``identifying_object`` (with an image extension) are used. Does
        nothing beyond resetting ``helper_images`` when no database path has
        been set yet.
        """
        self.helper_images = {
            'place_an_object': ['place_an_object', None],
            'blank': ['blank', None],
            'identifying_object': ['identifying_object', None],
        }
        if self.database_path is None:
            return
        helper_images_directory = os.path.join(self.database_path, 'helper_images')
        if not os.path.isdir(helper_images_directory):
            return
        files = os.listdir(helper_images_directory)
        for i, file_name in enumerate(files):
            stem, extension = self._split_file_name(file_name)
            if extension.lower() in self.IMAGE_EXTENSIONS and stem in self.helper_images:
                image_file_path = os.path.join(helper_images_directory, file_name)
                self.helper_images[stem][1] = self._load_resized_image(image_file_path)
            if len(files) > 1:
                # Unlike the other loaders this one reports a stringified
                # fraction (0..1); kept as-is for the receiving side.
                self.pipe_from_gui.send(['helper_images_loading', str(i / (len(files) - 1))])

    def recognize_object(self):
        """Pick the database entry whose phase response is closest to the measurement.

        The distance is the sum of absolute phase differences, with each
        difference wrapped into [-pi, pi] so that phases on either side of
        the +/-pi boundary count as close. Entries whose sweep length differs
        from the measurement are reported to the GUI as an error and skipped.
        ``recognized_object`` is ``[None, None]`` when nothing matched.
        """
        self.recognized_object = [None, None]
        min_distance = float('inf')
        measured_sweep = self.data_from_frequency_sweep[2][:, self.antenna_index + 1]
        phase_meas = np.arctan2(np.imag(measured_sweep), np.real(measured_sweep))

        for key, (fingerprint, image) in self.objects_database.items():
            # NOTE(review): the reference always uses column 1 while the
            # measurement uses antenna_index + 1 -- confirm this is intended.
            ref_sweep = fingerprint[:, 0 + 1]
            if len(ref_sweep) != len(measured_sweep):
                self.pipe_from_gui.send(['error', "Measured and reference data dimensions don't match!"])
                continue

            phase_ref = np.arctan2(np.imag(ref_sweep), np.real(ref_sweep))
            difference = phase_meas - phase_ref
            # Wrap into [-pi, pi]: a raw difference of ~2*pi is really ~0.
            wrapped = np.arctan2(np.sin(difference), np.cos(difference))
            dist = np.sum(np.abs(wrapped))
            if dist < min_distance:
                min_distance = dist
                self.recognized_object = [key, image]

    def check_threshold(self):
        """Update ``above_threshold`` from the current antenna signal and tell the GUI."""
        antenna_signal = self.data_from_frequency_sweep[1][self.antenna_index]
        if np.abs(antenna_signal) > self.threshold:
            self.pipe_from_gui.send(['identifying_object', None])
            self.above_threshold = True
        else:
            self.pipe_from_gui.send(['place_an_object', None])
            self.above_threshold = False
|
|
|
|
|
|
|
|
|
|
|
|
|