You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

consumer.py 3.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. import functools
  2. from asgiref.sync import async_to_sync
  3. from . import DEFAULT_CHANNEL_LAYER
  4. from .db import database_sync_to_async
  5. from .exceptions import StopConsumer
  6. from .layers import get_channel_layer
  7. from .utils import await_many_dispatch
  8. def get_handler_name(message):
  9. """
  10. Looks at a message, checks it has a sensible type, and returns the
  11. handler name for that type.
  12. """
  13. # Check message looks OK
  14. if "type" not in message:
  15. raise ValueError("Incoming message has no 'type' attribute")
  16. if message["type"].startswith("_"):
  17. raise ValueError("Malformed type in message (leading underscore)")
  18. # Extract type and replace . with _
  19. return message["type"].replace(".", "_")
  20. class AsyncConsumer:
  21. """
  22. Base consumer class. Implements the ASGI application spec, and adds on
  23. channel layer management and routing of events to named methods based
  24. on their type.
  25. """
  26. _sync = False
  27. channel_layer_alias = DEFAULT_CHANNEL_LAYER
  28. def __init__(self, scope):
  29. self.scope = scope
  30. async def __call__(self, receive, send):
  31. """
  32. Dispatches incoming messages to type-based handlers asynchronously.
  33. """
  34. # Initialize channel layer
  35. self.channel_layer = get_channel_layer(self.channel_layer_alias)
  36. if self.channel_layer is not None:
  37. self.channel_name = await self.channel_layer.new_channel()
  38. self.channel_receive = functools.partial(
  39. self.channel_layer.receive, self.channel_name
  40. )
  41. # Store send function
  42. if self._sync:
  43. self.base_send = async_to_sync(send)
  44. else:
  45. self.base_send = send
  46. # Pass messages in from channel layer or client to dispatch method
  47. try:
  48. if self.channel_layer is not None:
  49. await await_many_dispatch(
  50. [receive, self.channel_receive], self.dispatch
  51. )
  52. else:
  53. await await_many_dispatch([receive], self.dispatch)
  54. except StopConsumer:
  55. # Exit cleanly
  56. pass
  57. async def dispatch(self, message):
  58. """
  59. Works out what to do with a message.
  60. """
  61. handler = getattr(self, get_handler_name(message), None)
  62. if handler:
  63. await handler(message)
  64. else:
  65. raise ValueError("No handler for message type %s" % message["type"])
  66. async def send(self, message):
  67. """
  68. Overrideable/callable-by-subclasses send method.
  69. """
  70. await self.base_send(message)
  71. class SyncConsumer(AsyncConsumer):
  72. """
  73. Synchronous version of the consumer, which is what we write most of the
  74. generic consumers against (for now). Calls handlers in a threadpool and
  75. uses CallBouncer to get the send method out to the main event loop.
  76. It would have been possible to have "mixed" consumers and auto-detect
  77. if a handler was awaitable or not, but that would have made the API
  78. for user-called methods very confusing as there'd be two types of each.
  79. """
  80. _sync = True
  81. @database_sync_to_async
  82. def dispatch(self, message):
  83. """
  84. Dispatches incoming messages to type-based handlers asynchronously.
  85. """
  86. # Get and execute the handler
  87. handler = getattr(self, get_handler_name(message), None)
  88. if handler:
  89. handler(message)
  90. else:
  91. raise ValueError("No handler for message type %s" % message["type"])
  92. def send(self, message):
  93. """
  94. Overrideable/callable-by-subclasses send method.
  95. """
  96. self.base_send(message)