You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

moving_average.py 2.9KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. '''
  2. Created on 15.01.2017
  3. @author: muellead
  4. '''
  5. import numpy
  6. from collections import deque
  7. class MovingAverage(object):
  8. def __init__(self, length=100):
  9. self._length = length
  10. self.clear()
  11. def processSample(self, sample):
  12. """!
  13. @brief
  14. # If resetPeriod samples have already been processed
  15. # reset the averaging sum to prevent drift from
  16. # numerical errors
  17. @param sample
  18. @return averaged
  19. """
  20. if self._count == 100*self._length or self._sum is None:
  21. self._sum = 0
  22. for x in self._buffer:
  23. self._sum += x
  24. self._count = 0
  25. self._count += 1
  26. # If there is currently space in the queue add the sample
  27. if len(self._buffer) <= self._length:
  28. self._buffer.append(sample)
  29. self._sum += sample
  30. # If the queue is full remove the oldest sample
  31. if len(self._buffer) > self._length:
  32. old = self._buffer.popleft()
  33. self._sum -= old
  34. # Calculate new average value
  35. averaged = self._sum / len(self._buffer)
  36. return averaged
  37. def getCurrentAverage(self):
  38. """!
  39. @brief
  40. @return
  41. """
  42. if self._sum is None:
  43. return 0.0
  44. return self._sum / len(self._buffer)
  45. def getCurrentLength(self):
  46. """!
  47. @brief
  48. @return
  49. """
  50. return len(self._buffer)
  51. def getNoisePower(self):
  52. """!
  53. @brief
  54. """
  55. if self._sum is None:
  56. return 0.0
  57. # Iterate over buffer and sum over squared signal-mean-difference
  58. # This gives the noise energy in the buffered signal
  59. avg = self.getCurrentAverage()
  60. noise = 0
  61. for x in self._buffer:
  62. noise += numpy.abs(x - avg) ** 2
  63. # Divide by length of buffer to get noise power (energy per sample, standard deviation)
  64. return noise / (len(self._buffer) - 1)
  65. def getAbsNoiseVariance(self):
  66. """!
  67. @brief
  68. """
  69. if self._sum is None:
  70. return 0.0
  71. avg = numpy.abs(self.getCurrentAverage())
  72. noise = 0
  73. for x in self._buffer:
  74. noise += (numpy.abs(x) - avg) ** 2
  75. return noise / (len(self._buffer) - 1)
  76. def getPhaseNoiseVariance(self):
  77. """!
  78. @brief
  79. """
  80. if self._sum is None:
  81. return 0.0
  82. avg = numpy.angle(self.getCurrentAverage())
  83. noise = 0
  84. for x in self._buffer:
  85. n = numpy.abs(numpy.angle(x) - avg)
  86. while n > 2*numpy.pi: n -= 2*numpy.pi
  87. noise += n**2
  88. return noise / (len(self._buffer) - 1)
  89. def clear(self):
  90. """!
  91. @brief
  92. """
  93. self._sum = None
  94. self._buffer = deque([])
  95. self._count = 0