Funktionierender Prototyp des Serious Games zur Vermittlung von Wissen zu Software-Engineering-Arbeitsmodellen.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

testing.py 9.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. import logging
  2. import multiprocessing
  3. import os
  4. import pickle
  5. import tempfile
  6. import traceback
  7. from concurrent.futures import CancelledError
  8. class BaseDaphneTestingInstance:
  9. """
  10. Launches an instance of Daphne in a subprocess, with a host and port
  11. attribute allowing you to call it.
  12. Works as a context manager.
  13. """
  14. startup_timeout = 2
  15. def __init__(
  16. self, xff=False, http_timeout=None, request_buffer_size=None, *, application
  17. ):
  18. self.xff = xff
  19. self.http_timeout = http_timeout
  20. self.host = "127.0.0.1"
  21. self.request_buffer_size = request_buffer_size
  22. self.application = application
  23. def get_application(self):
  24. return self.application
  25. def __enter__(self):
  26. # Option Daphne features
  27. kwargs = {}
  28. if self.request_buffer_size:
  29. kwargs["request_buffer_size"] = self.request_buffer_size
  30. # Optionally enable X-Forwarded-For support.
  31. if self.xff:
  32. kwargs["proxy_forwarded_address_header"] = "X-Forwarded-For"
  33. kwargs["proxy_forwarded_port_header"] = "X-Forwarded-Port"
  34. kwargs["proxy_forwarded_proto_header"] = "X-Forwarded-Proto"
  35. if self.http_timeout:
  36. kwargs["http_timeout"] = self.http_timeout
  37. # Start up process
  38. self.process = DaphneProcess(
  39. host=self.host,
  40. get_application=self.get_application,
  41. kwargs=kwargs,
  42. setup=self.process_setup,
  43. teardown=self.process_teardown,
  44. )
  45. self.process.start()
  46. # Wait for the port
  47. if self.process.ready.wait(self.startup_timeout):
  48. self.port = self.process.port.value
  49. return self
  50. else:
  51. if self.process.errors.empty():
  52. raise RuntimeError("Daphne did not start up, no error caught")
  53. else:
  54. error, traceback = self.process.errors.get(False)
  55. raise RuntimeError("Daphne did not start up:\n%s" % traceback)
  56. def __exit__(self, exc_type, exc_value, traceback):
  57. # Shut down the process
  58. self.process.terminate()
  59. del self.process
  60. def process_setup(self):
  61. """
  62. Called by the process just before it starts serving.
  63. """
  64. pass
  65. def process_teardown(self):
  66. """
  67. Called by the process just after it stops serving
  68. """
  69. pass
  70. def get_received(self):
  71. pass
  72. class DaphneTestingInstance(BaseDaphneTestingInstance):
  73. def __init__(self, *args, **kwargs):
  74. self.lock = multiprocessing.Lock()
  75. super().__init__(*args, **kwargs, application=TestApplication(lock=self.lock))
  76. def __enter__(self):
  77. # Clear result storage
  78. TestApplication.delete_setup()
  79. TestApplication.delete_result()
  80. return super().__enter__()
  81. def get_received(self):
  82. """
  83. Returns the scope and messages the test application has received
  84. so far. Note you'll get all messages since scope start, not just any
  85. new ones since the last call.
  86. Also checks for any exceptions in the application. If there are,
  87. raises them.
  88. """
  89. try:
  90. with self.lock:
  91. inner_result = TestApplication.load_result()
  92. except FileNotFoundError:
  93. raise ValueError("No results available yet.")
  94. # Check for exception
  95. if "exception" in inner_result:
  96. raise inner_result["exception"]
  97. return inner_result["scope"], inner_result["messages"]
  98. def add_send_messages(self, messages):
  99. """
  100. Adds messages for the application to send back.
  101. The next time it receives an incoming message, it will reply with these.
  102. """
  103. TestApplication.save_setup(response_messages=messages)
  104. class DaphneProcess(multiprocessing.Process):
  105. """
  106. Process subclass that launches and runs a Daphne instance, communicating the
  107. port it ends up listening on back to the parent process.
  108. """
  109. def __init__(self, host, get_application, kwargs=None, setup=None, teardown=None):
  110. super().__init__()
  111. self.host = host
  112. self.get_application = get_application
  113. self.kwargs = kwargs or {}
  114. self.setup = setup
  115. self.teardown = teardown
  116. self.port = multiprocessing.Value("i")
  117. self.ready = multiprocessing.Event()
  118. self.errors = multiprocessing.Queue()
  119. def run(self):
  120. # OK, now we are in a forked child process, and want to use the reactor.
  121. # However, FreeBSD systems like MacOS do not fork the underlying Kqueue,
  122. # which asyncio (hence asyncioreactor) is built on.
  123. # Therefore, we should uninstall the broken reactor and install a new one.
  124. _reinstall_reactor()
  125. from twisted.internet import reactor
  126. from .endpoints import build_endpoint_description_strings
  127. from .server import Server
  128. application = self.get_application()
  129. try:
  130. # Create the server class
  131. endpoints = build_endpoint_description_strings(host=self.host, port=0)
  132. self.server = Server(
  133. application=application,
  134. endpoints=endpoints,
  135. signal_handlers=False,
  136. **self.kwargs
  137. )
  138. # Set up a poller to look for the port
  139. reactor.callLater(0.1, self.resolve_port)
  140. # Run with setup/teardown
  141. if self.setup is not None:
  142. self.setup()
  143. try:
  144. self.server.run()
  145. finally:
  146. if self.teardown is not None:
  147. self.teardown()
  148. except BaseException as e:
  149. # Put the error on our queue so the parent gets it
  150. self.errors.put((e, traceback.format_exc()))
  151. def resolve_port(self):
  152. from twisted.internet import reactor
  153. if self.server.listening_addresses:
  154. self.port.value = self.server.listening_addresses[0][1]
  155. self.ready.set()
  156. else:
  157. reactor.callLater(0.1, self.resolve_port)
  158. class TestApplication:
  159. """
  160. An application that receives one or more messages, sends a response,
  161. and then quits the server. For testing.
  162. """
  163. setup_storage = os.path.join(tempfile.gettempdir(), "setup.testio")
  164. result_storage = os.path.join(tempfile.gettempdir(), "result.testio")
  165. def __init__(self, lock):
  166. self.lock = lock
  167. self.messages = []
  168. async def __call__(self, scope, receive, send):
  169. self.scope = scope
  170. # Receive input and send output
  171. logging.debug("test app coroutine alive")
  172. try:
  173. while True:
  174. # Receive a message and save it into the result store
  175. self.messages.append(await receive())
  176. self.lock.acquire()
  177. logging.debug("test app received %r", self.messages[-1])
  178. self.save_result(self.scope, self.messages)
  179. self.lock.release()
  180. # See if there are any messages to send back
  181. setup = self.load_setup()
  182. self.delete_setup()
  183. for message in setup["response_messages"]:
  184. await send(message)
  185. logging.debug("test app sent %r", message)
  186. except Exception as e:
  187. if isinstance(e, CancelledError):
  188. # Don't catch task-cancelled errors!
  189. raise
  190. else:
  191. self.save_exception(e)
  192. @classmethod
  193. def save_setup(cls, response_messages):
  194. """
  195. Stores setup information.
  196. """
  197. with open(cls.setup_storage, "wb") as fh:
  198. pickle.dump({"response_messages": response_messages}, fh)
  199. @classmethod
  200. def load_setup(cls):
  201. """
  202. Returns setup details.
  203. """
  204. try:
  205. with open(cls.setup_storage, "rb") as fh:
  206. return pickle.load(fh)
  207. except FileNotFoundError:
  208. return {"response_messages": []}
  209. @classmethod
  210. def save_result(cls, scope, messages):
  211. """
  212. Saves details of what happened to the result storage.
  213. We could use pickle here, but that seems wrong, still, somehow.
  214. """
  215. with open(cls.result_storage, "wb") as fh:
  216. pickle.dump({"scope": scope, "messages": messages}, fh)
  217. @classmethod
  218. def save_exception(cls, exception):
  219. """
  220. Saves details of what happened to the result storage.
  221. We could use pickle here, but that seems wrong, still, somehow.
  222. """
  223. with open(cls.result_storage, "wb") as fh:
  224. pickle.dump({"exception": exception}, fh)
  225. @classmethod
  226. def load_result(cls):
  227. """
  228. Returns result details.
  229. """
  230. with open(cls.result_storage, "rb") as fh:
  231. return pickle.load(fh)
  232. @classmethod
  233. def delete_setup(cls):
  234. """
  235. Clears setup storage files.
  236. """
  237. try:
  238. os.unlink(cls.setup_storage)
  239. except OSError:
  240. pass
  241. @classmethod
  242. def delete_result(cls):
  243. """
  244. Clears result storage files.
  245. """
  246. try:
  247. os.unlink(cls.result_storage)
  248. except OSError:
  249. pass
  250. def _reinstall_reactor():
  251. import asyncio
  252. import sys
  253. from twisted.internet import asyncioreactor
  254. # Uninstall the reactor.
  255. if "twisted.internet.reactor" in sys.modules:
  256. del sys.modules["twisted.internet.reactor"]
  257. # The daphne.server module may have already installed the reactor.
  258. # If so, using this module will use uninstalled one, thus we should
  259. # reimport this module too.
  260. if "daphne.server" in sys.modules:
  261. del sys.modules["daphne.server"]
  262. event_loop = asyncio.new_event_loop()
  263. asyncioreactor.install(event_loop)
  264. asyncio.set_event_loop(event_loop)