Funktionierender Prototyp des Serious Games zur Vermittlung von Wissen zu Software-Engineering-Arbeitsmodellen.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

consumer.py 4.2KB

1 year ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. import functools
  2. from asgiref.sync import async_to_sync
  3. from . import DEFAULT_CHANNEL_LAYER
  4. from .db import database_sync_to_async
  5. from .exceptions import StopConsumer
  6. from .layers import get_channel_layer
  7. from .utils import await_many_dispatch
  8. def get_handler_name(message):
  9. """
  10. Looks at a message, checks it has a sensible type, and returns the
  11. handler name for that type.
  12. """
  13. # Check message looks OK
  14. if "type" not in message:
  15. raise ValueError("Incoming message has no 'type' attribute")
  16. # Extract type and replace . with _
  17. handler_name = message["type"].replace(".", "_")
  18. if handler_name.startswith("_"):
  19. raise ValueError("Malformed type in message (leading underscore)")
  20. return handler_name
  21. class AsyncConsumer:
  22. """
  23. Base consumer class. Implements the ASGI application spec, and adds on
  24. channel layer management and routing of events to named methods based
  25. on their type.
  26. """
  27. _sync = False
  28. channel_layer_alias = DEFAULT_CHANNEL_LAYER
  29. async def __call__(self, scope, receive, send):
  30. """
  31. Dispatches incoming messages to type-based handlers asynchronously.
  32. """
  33. self.scope = scope
  34. # Initialize channel layer
  35. self.channel_layer = get_channel_layer(self.channel_layer_alias)
  36. if self.channel_layer is not None:
  37. self.channel_name = await self.channel_layer.new_channel()
  38. self.channel_receive = functools.partial(
  39. self.channel_layer.receive, self.channel_name
  40. )
  41. # Store send function
  42. if self._sync:
  43. self.base_send = async_to_sync(send)
  44. else:
  45. self.base_send = send
  46. # Pass messages in from channel layer or client to dispatch method
  47. try:
  48. if self.channel_layer is not None:
  49. await await_many_dispatch(
  50. [receive, self.channel_receive], self.dispatch
  51. )
  52. else:
  53. await await_many_dispatch([receive], self.dispatch)
  54. except StopConsumer:
  55. # Exit cleanly
  56. pass
  57. async def dispatch(self, message):
  58. """
  59. Works out what to do with a message.
  60. """
  61. handler = getattr(self, get_handler_name(message), None)
  62. if handler:
  63. await handler(message)
  64. else:
  65. raise ValueError("No handler for message type %s" % message["type"])
  66. async def send(self, message):
  67. """
  68. Overrideable/callable-by-subclasses send method.
  69. """
  70. await self.base_send(message)
  71. @classmethod
  72. def as_asgi(cls, **initkwargs):
  73. """
  74. Return an ASGI v3 single callable that instantiates a consumer instance
  75. per scope. Similar in purpose to Django's as_view().
  76. initkwargs will be used to instantiate the consumer instance.
  77. """
  78. async def app(scope, receive, send):
  79. consumer = cls(**initkwargs)
  80. return await consumer(scope, receive, send)
  81. app.consumer_class = cls
  82. app.consumer_initkwargs = initkwargs
  83. # take name and docstring from class
  84. functools.update_wrapper(app, cls, updated=())
  85. return app
  86. class SyncConsumer(AsyncConsumer):
  87. """
  88. Synchronous version of the consumer, which is what we write most of the
  89. generic consumers against (for now). Calls handlers in a threadpool and
  90. uses CallBouncer to get the send method out to the main event loop.
  91. It would have been possible to have "mixed" consumers and auto-detect
  92. if a handler was awaitable or not, but that would have made the API
  93. for user-called methods very confusing as there'd be two types of each.
  94. """
  95. _sync = True
  96. @database_sync_to_async
  97. def dispatch(self, message):
  98. """
  99. Dispatches incoming messages to type-based handlers asynchronously.
  100. """
  101. # Get and execute the handler
  102. handler = getattr(self, get_handler_name(message), None)
  103. if handler:
  104. handler(message)
  105. else:
  106. raise ValueError("No handler for message type %s" % message["type"])
  107. def send(self, message):
  108. """
  109. Overrideable/callable-by-subclasses send method.
  110. """
  111. self.base_send(message)