123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154 |
import ast
import multiprocessing
import os
import time

import numpy as np

from modules.reader_interface import ReaderInterface
-
-
# Seconds FrequencySweep.run() sleeps after retuning the reader and before it
# consumes the collected samples (presumably settling time -- confirm).
FREQ_SWITCH_TIME = 0.05

# NOTE(review): not referenced by the code visible in this module; kept
# because other modules may import it.
LEN_COLLECTED_SAMPLES = 10
-
-
class FrequencySweep(multiprocessing.Process):
    """Worker process that steps one reader channel through a frequency list.

    Commands are received on ``pipe`` as sequences whose first element is the
    command name (``'recalibrate'``, ``'sweep'``, ``'continuous_sweep'`` or
    ``'stop'``).  For every command except ``'stop'`` the second element is
    the list of frequencies to visit.  After each frequency step a
    ``[percentage, current_row, full_array]`` message is sent back.

    Result arrays have 22 complex columns: column 0 holds the frequency,
    columns 1-20 the channel samples and column 21 the current-feedback
    sample.
    """

    # Layout of one row of ``swept_array`` / ``offsets``.
    _ROW_WIDTH = 22
    _FEEDBACK_COLUMN = 21

    def __init__(self, pipe=None, consumers_pipes=None):
        """Store the communication pipes and reset all sweep state.

        Args:
            pipe: connection used to receive commands and send results.
            consumers_pipes: optional iterable of additional connections that
                also receive the results of ``'sweep'`` and
                ``'continuous_sweep'`` commands.
        """
        super().__init__()
        self.pipe = pipe
        self.consumers_pipes = consumers_pipes
        self.config = None
        self.current_amp = None
        self.reader = None
        self.request = None
        self.freq_sweep_channel = None
        self.samples_block_size = None
        self.sweeping = False
        self.calibrated = False
        self.command = None
        self.swept_array = None
        self.frequency_list = None
        self.collected_samples = None
        self.offsets = None
        self.phis = None
        self.magnitude_offsets = None
        self.current_feedback = None
        self.percentage = 0
        self.sweep_index = 0

    def read_config(self):
        """Load ``frequency_sweep_config.txt`` from the working directory.

        The file must contain a Python literal (parsed with
        ``ast.literal_eval``) providing at least the keys ``current_amp``,
        ``freq_sweep_channel`` and ``samples_block_size``.
        """
        config_path = os.path.join(os.getcwd(), "frequency_sweep_config.txt")
        # The context manager closes the file even if parsing fails.
        with open(config_path, "r") as config_file:
            self.config = ast.literal_eval(config_file.read())
        self.current_amp = self.config["current_amp"]
        self.freq_sweep_channel = self.config["freq_sweep_channel"]
        self.samples_block_size = self.config["samples_block_size"]

    def process_sample(self, samples):
        """Average a block of samples for the swept channel.

        Registered as the callback of ``reader.requestData``.  The flattened
        mean of ``sample.getRawData()[freq_sweep_channel]`` over all
        ``samples`` is stored in ``collected_samples``.
        """
        first = samples[0].getRawData()[self.freq_sweep_channel]
        # np.complex was removed in NumPy 1.24; complex128 is the same dtype.
        total = np.zeros(np.shape(first), dtype=np.complex128)
        for sample in samples:
            total += sample.getRawData()[self.freq_sweep_channel]
        self.collected_samples = (total / len(samples)).flatten()

    def configure_reader(self):
        """Create the reader from ``config`` and start the data request."""
        self.reader = ReaderInterface(
            resistance=self.config["resistance"],
            inductance=self.config["inductance"],
            load_factor=self.config["loadFactor"],
            num_antennas=self.config["numAntennas"],
            channel_permutation=self.config["channelPermutation"],
        )
        for idx, channel in enumerate(self.config["channels"]):
            self.reader.setFrequency(idx, channel[0])
            # NOTE(review): the original indentation was lost; both exciter
            # calls are assumed to belong to this branch -- confirm.
            if channel[1] > 0:
                self.reader.setExciterCurrent(idx, self.current_amp)
                self.reader.setExciterEnabled(idx, True)
        self.reader.setChannel20Switch(3)  # Set channel 20 to current feedback
        self.reader.enableConfiguration()
        self.request = self.reader.requestData(
            self.process_sample, blockSize=self.samples_block_size, blocks=-1
        )

    def run(self):
        """Process entry point: serve commands and drive the sweep forever."""
        self.read_config()
        self.configure_reader()
        while True:
            # While idle there is nothing to do but wait for a command, so
            # block instead of spinning; during a sweep only peek at the pipe.
            if self.pipe.poll(0 if self.sweeping else None):
                self.command = self.pipe.recv()
                self.reset_sweep()
                if self.command[0] != 'stop':
                    self.initiate_sweep()

            if self.sweeping:
                self._sweep_step()

    def _sweep_step(self):
        """Measure the current frequency, report it and advance the index."""
        self.set_reader()
        time.sleep(FREQ_SWITCH_TIME)
        self.update_arrays()
        self.percentage = self._progress_percent()
        self.send()
        if self.sweep_index < len(self.frequency_list) - 1:
            self.sweep_index += 1
        elif self.command[0] == 'continuous_sweep':
            self.sweep_index = 0  # wrap around and keep sweeping
        else:
            self.reset_sweep()

    def _progress_percent(self):
        """Return the sweep progress as an integer in the range 0-100."""
        last_index = len(self.frequency_list) - 1
        if last_index <= 0:
            # A single-frequency sweep is complete after its only step; this
            # also avoids dividing by zero.
            return 100
        return int((self.sweep_index / last_index) * 100)

    def _message(self, array):
        """Build the ``[percentage, current_row, full_array]`` message."""
        return [self.percentage, array[self.sweep_index, 1:], array]

    def send(self):
        """Send the result of the current step for the active command.

        ``'recalibrate'`` reports ``offsets`` on ``pipe`` only.  ``'sweep'``
        and ``'continuous_sweep'`` report ``swept_array`` on ``pipe`` and on
        every consumer pipe.  Other commands send nothing.
        """
        name = self.command[0]
        if name == 'recalibrate':
            self.pipe.send(self._message(self.offsets))
        elif name in ('sweep', 'continuous_sweep'):
            self.pipe.send(self._message(self.swept_array))
            if self.consumers_pipes is not None:
                for consumer in self.consumers_pipes:
                    consumer.send(self._message(self.swept_array))

    def initiate_sweep(self):
        """Allocate result arrays for ``command[1]`` and start sweeping.

        A ``'recalibrate'`` command additionally allocates ``offsets`` and
        invalidates the previous calibration.  An empty frequency list does
        not start a sweep, since there is no frequency to tune to.
        """
        self.frequency_list = self.command[1]
        rows = len(self.frequency_list)
        self.swept_array = np.ones((rows, self._ROW_WIDTH), dtype=np.complex128)
        self.swept_array[:, 0] = self.frequency_list
        if self.command[0] == 'recalibrate':
            self.offsets = np.ones((rows, self._ROW_WIDTH), dtype=np.complex128)
            self.calibrated = False
        self.sweeping = rows > 0

    def reset_sweep(self):
        """Stop sweeping and rewind to the first frequency."""
        self.sweeping = False
        self.sweep_index = 0

    def set_reader(self):
        """Tune the swept channel to the current frequency of the list."""
        self.reader.setFrequency(
            self.freq_sweep_channel, float(self.frequency_list[self.sweep_index])
        )
        self.reader.setExciterCurrent(self.freq_sweep_channel, self.current_amp)
        # NOTE: the exciter is not (re-)enabled here; enabling happens only in
        # configure_reader().

    def update_arrays(self):
        """Store the latest ``collected_samples`` for the current index.

        During ``'recalibrate'`` the raw samples go into ``offsets``; after
        the last frequency the phase, magnitude and current-feedback
        references are derived and the sweep is marked calibrated.  For any
        other command the samples are corrected with those references and
        written to ``swept_array``; without a calibration the row keeps its
        initial value of ones.
        """
        row = self.sweep_index
        feedback = self._FEEDBACK_COLUMN
        if self.command[0] == 'recalibrate':
            self.offsets[row, 1:] = self.collected_samples
            if row == len(self.frequency_list) - 1:
                self.phis = np.angle(self.offsets[:, 1:feedback], deg=False)
                self.magnitude_offsets = np.abs(self.offsets[:, 1:feedback])
                self.current_feedback = self.offsets[:, feedback]
                self.calibrated = True
        elif self.calibrated:
            # Rotate each channel back by its calibration phase, then subtract
            # the calibration magnitude scaled by the relative current
            # feedback.  Together the two writes cover columns 1-21.
            s_rotated = self.collected_samples[0:20] * np.exp(-1j * self.phis[row, :])
            current_feedback_calibrated = (
                self.collected_samples[20] / self.current_feedback[row]
            )
            self.swept_array[row, 1:feedback] = s_rotated - (
                current_feedback_calibrated * self.magnitude_offsets[row, :]
            )
            self.swept_array[row, feedback] = current_feedback_calibrated
-
-
-
-
-
-
-
-
-
-
|