123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244 |
- from reader_interface import ReaderInterface
- import numpy as np
- from calibration import Calibration
- from localization import Localization
- from plot_results import Plotting
- from moving_average import MovingAverage
- from tkinter import *
- import time
- import socket
-
-
- """-----------------------------------GUI Code-------------------------------------------------------------------- """
-
- def calibrate():
- global calibration
- print("-----Started calibration...")
- calibration.new_calibration()
-
-
- def start_recording():
- global record, recording_array
- recording_array = np.zeros((record_length.get(), 19), dtype=np.float)
- record = True
- print("-----Started recording...")
-
-
- def calculate_positions():
- localization.scale_factor = float(scale_factor.get())
- localization.k_nearest = k_nearest.get()
-
- localization.localize_all_samples(filename_localize_in.get(), filename_localize_out.get())
-
- def plot_antennas():
- antenna_list_formatted = antenna_list.get().split(', ')
- print("Antenna plots were made from : recorded_data/", filename_record_out.get(), "| it uses filename_record_out")
- plotting.plot_antennas(filename_record_out.get(), antenna_list_formatted)
-
- def plot_positions():
-
- plotting.plot_positions(filename_plot_in.get())
-
- # Set up Labelframes
- root = Tk()
-
- record = LabelFrame(root, text="Record")
- record.grid(row=0, column=0, sticky=N+E+S+W)
-
- localize = LabelFrame(root, text="Localize")
- localize.grid(row=0, column=1, sticky=N+E+S+W)
-
- plot = LabelFrame(root, text="Plot")
- plot.grid(row=1, column=1)
-
- # Record Labelframe
- filename_record_out = StringVar(record)
- filename_record_out.set('current_recording')
-
- record_length = IntVar(record)
- record_length.set(1000)
-
- label_record_length = Label(record, text="Number of samples:")
- label_record_length.grid(row=0, column=0, sticky=W)
- entry_record_length = Entry(record, textvariable=record_length)
- entry_record_length.grid(row=0, column=1, sticky=N + E + S + W)
-
- label_filename_record_out = Label(record, text="Filename (Out):")
- label_filename_record_out.grid(row=1, column=0, sticky=W)
- entry_filename_record_out = Entry(record, textvariable=filename_record_out)
- entry_filename_record_out.grid(row=1, column=1, sticky=N + E + S + W)
-
- button_calibrate = Button(record, text="Calibrate", command=calibrate)
- button_calibrate.grid(row=2, column=0, sticky=N + E + S + W)
-
- button_record = Button(record, text="Record", command=start_recording)
- button_record.grid(row=2, column=1, sticky=N + E + S + W)
-
- # Localize Labelframe
- filename_localize_in = StringVar(root)
- filename_localize_in.set('current_recording')
-
- scale_factor = StringVar(plot)
- scale_factor.set("0.003")
-
- k_nearest = IntVar(plot)
- k_nearest.set(10)
-
- filename_localize_out = StringVar(root)
- filename_localize_out.set('current_positions')
-
-
- label_filename_localize_in = Label(localize, text="Filename (In):")
- label_filename_localize_in.grid(row=0, column=0, sticky=W)
- emtry_filename_localize_in = Entry(localize, textvariable=filename_localize_in)
- emtry_filename_localize_in.grid(row=0, column=1, sticky=N + E + S + W)
-
- label_scale_factor = Label(localize, text="Scale factor:")
- label_scale_factor.grid(row=1, column=0, sticky=W)
- entry_scale_factor = Entry(localize, textvariable=scale_factor)
- entry_scale_factor.grid(row=1, column=1, sticky=N + E + S + W)
-
- label_k_nearest = Label(localize, text="k_nearest:")
- label_k_nearest.grid(row=2, column=0, sticky=W)
- entry_k_nearest = Entry(localize, textvariable=k_nearest)
- entry_k_nearest.grid(row=2, column=1, sticky=N + E + S + W)
-
- label_filename_localize_out = Label(localize, text="Filename (Out):")
- label_filename_localize_out.grid(row=3, column=0, sticky=W)
- entry_filename_localize_out = Entry(localize, textvariable=filename_localize_out)
- entry_filename_localize_out.grid(row=3, column=1, sticky=N + E + S + W)
-
- button_calculate_positions = Button(localize, text="Calculate positions", command=calculate_positions)
- button_calculate_positions.grid(row=4, column=0)
-
-
- # Plot Labelframe
- filename_plot_in = StringVar(plot)
- filename_plot_in.set('current_positions')
-
- antenna_list = StringVar(plot)
- antenna_list.set("f1, f2, f3, f4")
-
- label_filename_plot_in = Label(plot, text="Filename (In):")
- label_filename_plot_in.grid(row=0, column=0, sticky=W)
- entry_filename_plot_in = Entry(plot, textvariable=filename_plot_in)
- entry_filename_plot_in.grid(row=0, column=1, sticky=N + E + S + W)
-
- label_antennas = Label(plot, text="Antenna(s): [f1, f2, m1, m3, ...]")
- label_antennas.grid(row=1, column=0)
- entry_antennas = Entry(plot, textvariable=antenna_list)
- entry_antennas.grid(row=1, column=1)
- button_plot_antennas = Button(plot, text="Plot Antennas", command=plot_antennas)
- button_plot_antennas.grid(row=1, column=2)
-
- button_plot_positons = Button(plot, text="Plot Positions", command=plot_positions)
- button_plot_positons.grid(row=2, column=0)
-
-
- """ --------------------- Initializing Parameters--------------------------------------------------------------------"""
- # Localization Parameters
- moving_average_length = 100
- moving_avg = MovingAverage(moving_average_length)
- localization_method = 'k-nearest' # full_table
- #scale_factor = 1
- table = np.load('fingerprinting_tables/Julian_BThesis_table2.npy') # load fingerprinting table
- #k_nearest = 15
- timer_stream_positions_to_mqtt_client = time.time()
- # Recording parameters
- recording_index = 0
- record = False
- recording_array = recording_array = np.zeros((1000, 19))
- counter = 0
- startTime = time.time()
- samples_counter = 1
- socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-
- # fingerprinting_table = np.loadtxt("fingerprinting_tables/Julian_BThesis_table2.csv", delimiter=';')
- # np.save("fingerprinting_tables/Julian_BThesis_table2", fingerprinting_table)
-
-
- def set_reader():
- config = {
- "samples_block_size": 10,
- "resistance": .55,
- "inductance": 10e-6,
- "loadFactor": 1.0,
- "channels": [
- (119000, 1.1),
- (61500, 0.0),
- (50000, 0.0),
- (142000, 0.0)],
- # "channelPermutation": [0, 1, 3, 2, 4, 5, 6, 7, 8, 9,
- # 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
- # 20],
- "channelPermutation" :[1, 0, 3, 2, 4, 5, 7, 6, 9, 8, #config for hmi shelf
- 11, 10, 13, 12, 15, 14, 17, 16, 18, 19,
- 20],
- "numAntennas": 8
-
- }
- reader = ReaderInterface(resistance=config["resistance"],
- inductance=config["inductance"],
- load_factor=config["loadFactor"],
- num_antennas=config["numAntennas"],
- channel_permutation=config["channelPermutation"])
- for idx, c in enumerate(config["channels"]):
- reader.setFrequency(idx, c[0])
- if c[1] > 0: # CHANGED c[1] -> c[0]
- reader.setExciterCurrent(idx, c[1]) # CHANGED
- reader.setExciterEnabled(idx, True)
- reader.setChannel20Switch(3) # Set channel 20 to current feedback
- reader.enableConfiguration()
- return reader
-
-
- def process_sample(samples):
- global record, recording_index, recording_array, record_length, filename, counter, \
- startTime, samples_counter, timer_stream_positions_to_mqtt_client
- if time.time() - startTime > 1:
- counter = 0
- startTime = time.time()
- else:
- counter += 1
- for sample in samples:
- rawdata = sample.getRawData()
- rawdata = rawdata[0, :]
- calibrated_sample = calibration.process_sample(rawdata)
-
- if calibrated_sample is not None:
- averaged_sample = moving_avg.processSample(calibrated_sample)
- frame_antennas = np.imag(averaged_sample[0:16:2])
- main_antennas = np.imag(averaged_sample[1:16:2])
- frames_list = list(frame_antennas)
- mains_list = list(main_antennas)
- fv = np.append(frame_antennas, main_antennas)
- pos = [0, 0, 0]
-
- if record:
- if recording_index < (record_length.get()):
- # pos = localize(fv)
-
- recording_array[recording_index, 0:3] = pos
- recording_array[recording_index, 3:] = fv
- recording_index += 1
- print("recording progress=", recording_index)
- # print("Recording progress : ", round((recording_index / record_length.get()) * 100, 3), "%")
- else:
- np.save("recorded_data/" + entry_filename_record_out.get(), recording_array)
- print('-----Finished recording. Saved in: recorded_data/' + entry_filename_record_out.get() + ".npy ( shape=",
- np.shape(recording_array), ")")
- recording_index = 0
- record = False
- else:
- samples_counter += 1
-
-
- """ ------------------------ Setting up reader and starting request Data Thread-----------------------------------"""
-
- calibration = Calibration()
- reader = set_reader()
- request = reader.requestData(process_sample, blockSize=100, blocks=-1)
- localization = Localization()
- plotting = Plotting()
-
- root.mainloop()
|