Funktionierender Prototyp des Serious Games zur Vermittlung von Wissen zu Software-Engineering-Arbeitsmodellen.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

server.py 13KB

1 year ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. # This has to be done first as Twisted is import-order-sensitive with reactors
  2. import asyncio # isort:skip
  3. import os # isort:skip
  4. import sys # isort:skip
  5. import warnings # isort:skip
  6. from concurrent.futures import ThreadPoolExecutor # isort:skip
  7. from twisted.internet import asyncioreactor # isort:skip
  8. twisted_loop = asyncio.new_event_loop()
  9. if "ASGI_THREADS" in os.environ:
  10. twisted_loop.set_default_executor(
  11. ThreadPoolExecutor(max_workers=int(os.environ["ASGI_THREADS"]))
  12. )
  13. current_reactor = sys.modules.get("twisted.internet.reactor", None)
  14. if current_reactor is not None:
  15. if not isinstance(current_reactor, asyncioreactor.AsyncioSelectorReactor):
  16. warnings.warn(
  17. "Something has already installed a non-asyncio Twisted reactor. Attempting to uninstall it; "
  18. + "you can fix this warning by importing daphne.server early in your codebase or "
  19. + "finding the package that imports Twisted and importing it later on.",
  20. UserWarning,
  21. )
  22. del sys.modules["twisted.internet.reactor"]
  23. asyncioreactor.install(twisted_loop)
  24. else:
  25. asyncioreactor.install(twisted_loop)
  26. import logging
  27. import time
  28. from concurrent.futures import CancelledError
  29. from functools import partial
  30. from twisted.internet import defer, reactor
  31. from twisted.internet.endpoints import serverFromString
  32. from twisted.logger import STDLibLogObserver, globalLogBeginner
  33. from twisted.web import http
  34. from .http_protocol import HTTPFactory
  35. from .ws_protocol import WebSocketFactory
  36. logger = logging.getLogger(__name__)
  37. class Server:
  38. def __init__(
  39. self,
  40. application,
  41. endpoints=None,
  42. signal_handlers=True,
  43. action_logger=None,
  44. http_timeout=None,
  45. request_buffer_size=8192,
  46. websocket_timeout=86400,
  47. websocket_connect_timeout=20,
  48. ping_interval=20,
  49. ping_timeout=30,
  50. root_path="",
  51. proxy_forwarded_address_header=None,
  52. proxy_forwarded_port_header=None,
  53. proxy_forwarded_proto_header=None,
  54. verbosity=1,
  55. websocket_handshake_timeout=5,
  56. application_close_timeout=10,
  57. ready_callable=None,
  58. server_name="daphne",
  59. ):
  60. self.application = application
  61. self.endpoints = endpoints or []
  62. self.listeners = []
  63. self.listening_addresses = []
  64. self.signal_handlers = signal_handlers
  65. self.action_logger = action_logger
  66. self.http_timeout = http_timeout
  67. self.ping_interval = ping_interval
  68. self.ping_timeout = ping_timeout
  69. self.request_buffer_size = request_buffer_size
  70. self.proxy_forwarded_address_header = proxy_forwarded_address_header
  71. self.proxy_forwarded_port_header = proxy_forwarded_port_header
  72. self.proxy_forwarded_proto_header = proxy_forwarded_proto_header
  73. self.websocket_timeout = websocket_timeout
  74. self.websocket_connect_timeout = websocket_connect_timeout
  75. self.websocket_handshake_timeout = websocket_handshake_timeout
  76. self.application_close_timeout = application_close_timeout
  77. self.root_path = root_path
  78. self.verbosity = verbosity
  79. self.abort_start = False
  80. self.ready_callable = ready_callable
  81. self.server_name = server_name
  82. # Check our construction is actually sensible
  83. if not self.endpoints:
  84. logger.error("No endpoints. This server will not listen on anything.")
  85. sys.exit(1)
  86. def run(self):
  87. # A dict of protocol: {"application_instance":, "connected":, "disconnected":} dicts
  88. self.connections = {}
  89. # Make the factory
  90. self.http_factory = HTTPFactory(self)
  91. self.ws_factory = WebSocketFactory(self, server=self.server_name)
  92. self.ws_factory.setProtocolOptions(
  93. autoPingTimeout=self.ping_timeout,
  94. allowNullOrigin=True,
  95. openHandshakeTimeout=self.websocket_handshake_timeout,
  96. )
  97. if self.verbosity <= 1:
  98. # Redirect the Twisted log to nowhere
  99. globalLogBeginner.beginLoggingTo(
  100. [lambda _: None], redirectStandardIO=False, discardBuffer=True
  101. )
  102. else:
  103. globalLogBeginner.beginLoggingTo([STDLibLogObserver(__name__)])
  104. # Detect what Twisted features are enabled
  105. if http.H2_ENABLED:
  106. logger.info("HTTP/2 support enabled")
  107. else:
  108. logger.info(
  109. "HTTP/2 support not enabled (install the http2 and tls Twisted extras)"
  110. )
  111. # Kick off the timeout loop
  112. reactor.callLater(1, self.application_checker)
  113. reactor.callLater(2, self.timeout_checker)
  114. for socket_description in self.endpoints:
  115. logger.info("Configuring endpoint %s", socket_description)
  116. ep = serverFromString(reactor, str(socket_description))
  117. listener = ep.listen(self.http_factory)
  118. listener.addCallback(self.listen_success)
  119. listener.addErrback(self.listen_error)
  120. self.listeners.append(listener)
  121. # Set the asyncio reactor's event loop as global
  122. # TODO: Should we instead pass the global one into the reactor?
  123. asyncio.set_event_loop(reactor._asyncioEventloop)
  124. # Verbosity 3 turns on asyncio debug to find those blocking yields
  125. if self.verbosity >= 3:
  126. asyncio.get_event_loop().set_debug(True)
  127. reactor.addSystemEventTrigger("before", "shutdown", self.kill_all_applications)
  128. if not self.abort_start:
  129. # Trigger the ready flag if we had one
  130. if self.ready_callable:
  131. self.ready_callable()
  132. # Run the reactor
  133. reactor.run(installSignalHandlers=self.signal_handlers)
  134. def listen_success(self, port):
  135. """
  136. Called when a listen succeeds so we can store port details (if there are any)
  137. """
  138. if hasattr(port, "getHost"):
  139. host = port.getHost()
  140. if hasattr(host, "host") and hasattr(host, "port"):
  141. self.listening_addresses.append((host.host, host.port))
  142. logger.info(
  143. "Listening on TCP address %s:%s",
  144. port.getHost().host,
  145. port.getHost().port,
  146. )
  147. def listen_error(self, failure):
  148. logger.critical("Listen failure: %s", failure.getErrorMessage())
  149. self.stop()
  150. def stop(self):
  151. """
  152. Force-stops the server.
  153. """
  154. if reactor.running:
  155. reactor.stop()
  156. else:
  157. self.abort_start = True
  158. ### Protocol handling
  159. def protocol_connected(self, protocol):
  160. """
  161. Adds a protocol as a current connection.
  162. """
  163. if protocol in self.connections:
  164. raise RuntimeError("Protocol %r was added to main list twice!" % protocol)
  165. self.connections[protocol] = {"connected": time.time()}
  166. def protocol_disconnected(self, protocol):
  167. # Set its disconnected time (the loops will come and clean it up)
  168. # Do not set it if it is already set. Overwriting it might
  169. # cause it to never be cleaned up.
  170. # See https://github.com/django/channels/issues/1181
  171. if "disconnected" not in self.connections[protocol]:
  172. self.connections[protocol]["disconnected"] = time.time()
  173. ### Internal event/message handling
  174. def create_application(self, protocol, scope):
  175. """
  176. Creates a new application instance that fronts a Protocol instance
  177. for one of our supported protocols. Pass it the protocol,
  178. and it will work out the type, supply appropriate callables, and
  179. return you the application's input queue
  180. """
  181. # Make sure the protocol has not had another application made for it
  182. assert "application_instance" not in self.connections[protocol]
  183. # Make an instance of the application
  184. input_queue = asyncio.Queue()
  185. scope.setdefault("asgi", {"version": "3.0"})
  186. application_instance = self.application(
  187. scope=scope,
  188. receive=input_queue.get,
  189. send=partial(self.handle_reply, protocol),
  190. )
  191. # Run it, and stash the future for later checking
  192. if protocol not in self.connections:
  193. return None
  194. self.connections[protocol]["application_instance"] = asyncio.ensure_future(
  195. application_instance,
  196. loop=asyncio.get_event_loop(),
  197. )
  198. return input_queue
  199. async def handle_reply(self, protocol, message):
  200. """
  201. Coroutine that jumps the reply message from asyncio to Twisted
  202. """
  203. # Don't do anything if the connection is closed or does not exist
  204. if protocol not in self.connections or self.connections[protocol].get(
  205. "disconnected", None
  206. ):
  207. return
  208. try:
  209. self.check_headers_type(message)
  210. except ValueError:
  211. # Ensure to send SOME reply.
  212. protocol.basic_error(500, b"Server Error", "Server Error")
  213. raise
  214. # Let the protocol handle it
  215. protocol.handle_reply(message)
  216. @staticmethod
  217. def check_headers_type(message):
  218. if not message["type"] == "http.response.start":
  219. return
  220. for k, v in message.get("headers", []):
  221. if not isinstance(k, bytes):
  222. raise ValueError(
  223. "Header name '{}' expected to be `bytes`, but got `{}`".format(
  224. k, type(k)
  225. )
  226. )
  227. if not isinstance(v, bytes):
  228. raise ValueError(
  229. "Header value '{}' expected to be `bytes`, but got `{}`".format(
  230. v, type(v)
  231. )
  232. )
  233. ### Utility
  234. def application_checker(self):
  235. """
  236. Goes through the set of current application Futures and cleans up
  237. any that are done/prints exceptions for any that errored.
  238. """
  239. for protocol, details in list(self.connections.items()):
  240. disconnected = details.get("disconnected", None)
  241. application_instance = details.get("application_instance", None)
  242. # First, see if the protocol disconnected and the app has taken
  243. # too long to close up
  244. if (
  245. disconnected
  246. and time.time() - disconnected > self.application_close_timeout
  247. ):
  248. if application_instance and not application_instance.done():
  249. logger.warning(
  250. "Application instance %r for connection %s took too long to shut down and was killed.",
  251. application_instance,
  252. repr(protocol),
  253. )
  254. application_instance.cancel()
  255. # Then see if the app is done and we should reap it
  256. if application_instance and application_instance.done():
  257. try:
  258. exception = application_instance.exception()
  259. except (CancelledError, asyncio.CancelledError):
  260. # Future cancellation. We can ignore this.
  261. pass
  262. else:
  263. if exception:
  264. if isinstance(exception, KeyboardInterrupt):
  265. # Protocol is asking the server to exit (likely during test)
  266. self.stop()
  267. else:
  268. logger.error(
  269. "Exception inside application: %s",
  270. exception,
  271. exc_info=exception,
  272. )
  273. if not disconnected:
  274. protocol.handle_exception(exception)
  275. del self.connections[protocol]["application_instance"]
  276. application_instance = None
  277. # Check to see if protocol is closed and app is closed so we can remove it
  278. if not application_instance and disconnected:
  279. del self.connections[protocol]
  280. reactor.callLater(1, self.application_checker)
  281. def kill_all_applications(self):
  282. """
  283. Kills all application coroutines before reactor exit.
  284. """
  285. # Send cancel to all coroutines
  286. wait_for = []
  287. for details in self.connections.values():
  288. application_instance = details["application_instance"]
  289. if not application_instance.done():
  290. application_instance.cancel()
  291. wait_for.append(application_instance)
  292. logger.info("Killed %i pending application instances", len(wait_for))
  293. # Make Twisted wait until they're all dead
  294. wait_deferred = defer.Deferred.fromFuture(asyncio.gather(*wait_for))
  295. wait_deferred.addErrback(lambda x: None)
  296. return wait_deferred
  297. def timeout_checker(self):
  298. """
  299. Called periodically to enforce timeout rules on all connections.
  300. Also checks pings at the same time.
  301. """
  302. for protocol in list(self.connections.keys()):
  303. protocol.check_timeouts()
  304. reactor.callLater(2, self.timeout_checker)
  305. def log_action(self, protocol, action, details):
  306. """
  307. Dispatches to any registered action logger, if there is one.
  308. """
  309. if self.action_logger:
  310. self.action_logger(protocol, action, details)