You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

worker.py 1.6KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344
  1. import asyncio
  2. from asgiref.server import StatelessServer
  3. class Worker(StatelessServer):
  4. """
  5. ASGI protocol server that surfaces events sent to specific channels
  6. on the channel layer into a single application instance.
  7. """
  8. def __init__(self, application, channels, channel_layer, max_applications=1000):
  9. super().__init__(application, max_applications)
  10. self.channels = channels
  11. self.channel_layer = channel_layer
  12. if self.channel_layer is None:
  13. raise ValueError("Channel layer is not valid")
  14. async def handle(self):
  15. """
  16. Listens on all the provided channels and handles the messages.
  17. """
  18. # For each channel, launch its own listening coroutine
  19. listeners = []
  20. for channel in self.channels:
  21. listeners.append(asyncio.ensure_future(self.listener(channel)))
  22. # Wait for them all to exit
  23. await asyncio.wait(listeners)
  24. # See if any of the listeners had an error (e.g. channel layer error)
  25. [listener.result() for listener in listeners]
  26. async def listener(self, channel):
  27. """
  28. Single-channel listener
  29. """
  30. while True:
  31. message = await self.channel_layer.receive(channel)
  32. if not message.get("type", None):
  33. raise ValueError("Worker received message with no type.")
  34. # Make a scope and get an application instance for it
  35. scope = {"type": "channel", "channel": channel}
  36. instance_queue = self.get_or_create_application_instance(channel, scope)
  37. # Run the message into the app
  38. await instance_queue.put(message)