Funktionierender Prototyp des Serious Games zur Vermittlung von Wissen zu Software-Engineering-Arbeitsmodellen.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

_common.py 5.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. ###############################################################################
  2. #
  3. # The MIT License (MIT)
  4. #
  5. # Copyright (c) typedef int GmbH
  6. #
  7. # Permission is hereby granted, free of charge, to any person obtaining a copy
  8. # of this software and associated documentation files (the "Software"), to deal
  9. # in the Software without restriction, including without limitation the rights
  10. # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  11. # copies of the Software, and to permit persons to whom the Software is
  12. # furnished to do so, subject to the following conditions:
  13. #
  14. # The above copyright notice and this permission notice shall be included in
  15. # all copies or substantial portions of the Software.
  16. #
  17. # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  18. # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  19. # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  20. # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  21. # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  22. # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  23. # THE SOFTWARE.
  24. #
  25. ###############################################################################
  26. import math
  27. from txaio.interfaces import IBatchedTimer
  28. class _BatchedCall(object):
  29. """
  30. Wraps IDelayedCall-implementing objects, implementing only the API
  31. which txaio promised in the first place: .cancel
  32. Do not create these yourself; use _BatchedTimer.call_later()
  33. """
  34. def __init__(self, timer, index, the_call):
  35. # XXX WeakRef?
  36. self._timer = timer
  37. self._index = index
  38. self._call = the_call
  39. def cancel(self):
  40. self._timer._remove_call(self._index, self)
  41. self._timer = None
  42. def __call__(self):
  43. return self._call()
  44. class _BatchedTimer(IBatchedTimer):
  45. """
  46. Internal helper.
  47. Instances of this are returned from
  48. :meth:`txaio.make_batched_timer` and that is the only way they
  49. should be instantiated. You may depend on methods from the
  50. interface class only (:class:`txaio.IBatchedTimer`)
  51. **NOTE** that the times are in milliseconds in this class!
  52. """
  53. def __init__(self, bucket_milliseconds, chunk_size,
  54. seconds_provider, delayed_call_creator, loop=None):
  55. if bucket_milliseconds <= 0.0:
  56. raise ValueError(
  57. "bucket_milliseconds must be > 0.0"
  58. )
  59. self._bucket_milliseconds = float(bucket_milliseconds)
  60. self._chunk_size = chunk_size
  61. self._get_seconds = seconds_provider
  62. self._create_delayed_call = delayed_call_creator
  63. self._buckets = dict() # real seconds -> (IDelayedCall, list)
  64. self._loop = loop
  65. def call_later(self, delay, func, *args, **kwargs):
  66. """
  67. IBatchedTimer API
  68. """
  69. # "quantize" the delay to the nearest bucket
  70. now = self._get_seconds()
  71. real_time = int(now + delay) * 1000
  72. real_time -= int(real_time % self._bucket_milliseconds)
  73. call = _BatchedCall(self, real_time, lambda: func(*args, **kwargs))
  74. try:
  75. self._buckets[real_time][1].append(call)
  76. except KeyError:
  77. # new bucket; need to add "actual" underlying IDelayedCall
  78. diff = (real_time / 1000.0) - now
  79. # we need to clamp this because if we quantized to the
  80. # nearest second, but that second is actually (slightly)
  81. # less than the current time 'diff' will be negative.
  82. delayed_call = self._create_delayed_call(
  83. max(0.0, diff),
  84. self._notify_bucket, real_time,
  85. )
  86. self._buckets[real_time] = (delayed_call, [call])
  87. return call
  88. def _notify_bucket(self, real_time):
  89. """
  90. Internal helper. This 'does' the callbacks in a particular bucket.
  91. :param real_time: the bucket to do callbacks on
  92. """
  93. (delayed_call, calls) = self._buckets[real_time]
  94. del self._buckets[real_time]
  95. errors = []
  96. def notify_one_chunk(calls, chunk_size, chunk_delay_ms):
  97. for call in calls[:chunk_size]:
  98. try:
  99. call()
  100. except Exception as e:
  101. errors.append(e)
  102. calls = calls[chunk_size:]
  103. if calls:
  104. self._create_delayed_call(
  105. chunk_delay_ms / 1000.0,
  106. notify_one_chunk, calls, chunk_size, chunk_delay_ms,
  107. )
  108. else:
  109. # done all calls; make sure there were no errors
  110. if len(errors):
  111. msg = "Error(s) processing call_later bucket:\n"
  112. for e in errors:
  113. msg += "{}\n".format(e)
  114. raise RuntimeError(msg)
  115. # ceil()ing because we want the number of chunks, and a
  116. # partial chunk is still a chunk
  117. delay_ms = self._bucket_milliseconds / math.ceil(float(len(calls)) / self._chunk_size)
  118. # I can't imagine any scenario in which chunk_delay_ms is
  119. # actually less than zero, but just being safe here
  120. notify_one_chunk(calls, self._chunk_size, max(0.0, delay_ms))
  121. def _remove_call(self, real_time, call):
  122. """
  123. Internal helper. Removes a (possibly still pending) call from a
  124. bucket. It is *not* an error of the bucket is gone (e.g. the
  125. call has already happened).
  126. """
  127. try:
  128. (delayed_call, calls) = self._buckets[real_time]
  129. except KeyError:
  130. # no such bucket ... error? swallow?
  131. return
  132. # remove call; if we're empty, cancel underlying
  133. # bucket-timeout IDelayedCall
  134. calls.remove(call)
  135. if not calls:
  136. del self._buckets[real_time]
  137. delayed_call.cancel()