123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 |
- '''
- Created on 15.01.2017
-
- @author: muellead
- '''
-
- import numpy
- from collections import deque
-
class MovingAverage(object):
    """!
    @brief Sliding-window moving average with simple noise statistics.

    The most recent `length` samples are kept in a deque together with a
    running sum, so a new average only costs one addition and (once the
    window is full) one subtraction. Samples may be real or complex; the
    magnitude and phase statistics are only meaningful for complex input.
    """

    # The running sum is rebuilt from the buffer every
    # _RESET_FACTOR * length samples so that rounding errors from the
    # incremental add/subtract updates cannot accumulate without bound.
    _RESET_FACTOR = 100

    def __init__(self, length=100):
        """!
        @brief Create an empty averaging window.
        @param length Maximum number of samples kept in the window (>= 1)
        @throws ValueError if length is smaller than 1
        """
        # A window of size zero (or less) can never hold a sample and would
        # only fail later with a division by zero / pop from an empty deque.
        if length < 1:
            raise ValueError("length must be at least 1, got %r" % (length,))
        self._length = length
        self.clear()

    def processSample(self, sample):
        """!
        @brief Push a sample into the window and return the updated average.

        Once the window holds `length` samples the oldest one is dropped
        for every new sample. The running sum is periodically recomputed
        from the buffered samples to prevent drift from numerical errors.
        @param sample New sample value
        @return Average over the samples currently in the window
        """
        if self._sum is None or self._count == self._RESET_FACTOR * self._length:
            # First sample after clear(), or reset period elapsed:
            # rebuild the sum from scratch.
            self._sum = sum(self._buffer)
            self._count = 0
        self._count += 1

        self._buffer.append(sample)
        self._sum += sample
        # Window overflow: evict the oldest sample and remove it from the sum.
        if len(self._buffer) > self._length:
            self._sum -= self._buffer.popleft()

        return self._sum / len(self._buffer)

    def getCurrentAverage(self):
        """!
        @brief Return the average of the buffered samples.
        @return Current average, or 0.0 if no sample has been processed
        """
        if self._sum is None or not self._buffer:
            return 0.0
        return self._sum / len(self._buffer)

    def getCurrentLength(self):
        """!
        @brief Return the number of samples currently buffered.
        @return Buffer fill level (0 .. length)
        """
        return len(self._buffer)

    def _hasSpread(self):
        """!
        @brief Tell whether a sample variance can be computed.
        @return True if at least two samples are buffered
        """
        # The unbiased estimators below divide by (n - 1); with fewer than
        # two samples that would be a division by zero.
        return self._sum is not None and len(self._buffer) > 1

    def getNoisePower(self):
        """!
        @brief Return the noise power of the buffered signal.

        This is the unbiased sample variance: the sum of squared
        magnitudes of (sample - average) divided by (n - 1).
        @return Noise power, or 0.0 if fewer than two samples are buffered
        """
        if not self._hasSpread():
            return 0.0

        mean = self.getCurrentAverage()
        # Noise energy: squared distance of every sample from the mean.
        energy = sum(numpy.abs(x - mean) ** 2 for x in self._buffer)
        return energy / (len(self._buffer) - 1)

    def getAbsNoiseVariance(self):
        """!
        @brief Return the variance of the sample magnitudes.

        Deviations are measured against the magnitude of the current
        average (not against the average of the magnitudes).
        @return Magnitude variance, or 0.0 if fewer than two samples are
                buffered
        """
        if not self._hasSpread():
            return 0.0

        mean_abs = numpy.abs(self.getCurrentAverage())
        energy = sum((numpy.abs(x) - mean_abs) ** 2 for x in self._buffer)
        return energy / (len(self._buffer) - 1)

    def getPhaseNoiseVariance(self):
        """!
        @brief Return the variance of the sample phases in rad^2.

        Deviations are measured against the phase of the current average
        and wrapped to the shortest arc, so samples on either side of the
        +/-pi branch cut are treated as close together.
        @return Phase variance, or 0.0 if fewer than two samples are
                buffered
        """
        if not self._hasSpread():
            return 0.0

        mean_phase = numpy.angle(self.getCurrentAverage())
        energy = 0
        for x in self._buffer:
            # numpy.angle returns values in (-pi, pi], so the absolute
            # difference lies in [0, 2*pi); fold it into [0, pi].
            delta = numpy.abs(numpy.angle(x) - mean_phase)
            if delta > numpy.pi:
                delta = 2 * numpy.pi - delta
            energy += delta ** 2
        return energy / (len(self._buffer) - 1)

    def clear(self):
        """!
        @brief Drop all buffered samples and reset the running sum.
        """
        # _sum is None (rather than 0) to mark "no sample processed yet".
        self._sum = None
        self._buffer = deque([])
        self._count = 0
-
|