Development of an internal social media platform with personalised dashboards for students
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

compat.py 6.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
"""
kombu.compat
============

Carrot compatible interface for :class:`Publisher` and :class:`Consumer`.

See http://packages.python.org/pypi/carrot for documentation.

"""
  7. from __future__ import absolute_import
  8. from itertools import count
  9. from . import messaging
  10. from .entity import Exchange, Queue
  11. from .five import items
# Names exported by ``from <this module> import *``.
__all__ = ['Publisher', 'Consumer']

# XXX compat attribute
# Module-level alias for ``Queue.from_dict`` (presumably the name older
# carrot-based callers expect -- TODO confirm).
entry_to_queue = Queue.from_dict
  15. def _iterconsume(connection, consumer, no_ack=False, limit=None):
  16. consumer.consume(no_ack=no_ack)
  17. for iteration in count(0): # for infinity
  18. if limit and iteration >= limit:
  19. raise StopIteration
  20. yield connection.drain_events()
  21. class Publisher(messaging.Producer):
  22. exchange = ''
  23. exchange_type = 'direct'
  24. routing_key = ''
  25. durable = True
  26. auto_delete = False
  27. _closed = False
  28. def __init__(self, connection, exchange=None, routing_key=None,
  29. exchange_type=None, durable=None, auto_delete=None,
  30. channel=None, **kwargs):
  31. if channel:
  32. connection = channel
  33. self.exchange = exchange or self.exchange
  34. self.exchange_type = exchange_type or self.exchange_type
  35. self.routing_key = routing_key or self.routing_key
  36. if auto_delete is not None:
  37. self.auto_delete = auto_delete
  38. if durable is not None:
  39. self.durable = durable
  40. if not isinstance(self.exchange, Exchange):
  41. self.exchange = Exchange(name=self.exchange,
  42. type=self.exchange_type,
  43. routing_key=self.routing_key,
  44. auto_delete=self.auto_delete,
  45. durable=self.durable)
  46. super(Publisher, self).__init__(connection, self.exchange, **kwargs)
  47. def send(self, *args, **kwargs):
  48. return self.publish(*args, **kwargs)
  49. def close(self):
  50. super(Publisher, self).close()
  51. self._closed = True
  52. def __enter__(self):
  53. return self
  54. def __exit__(self, *exc_info):
  55. self.close()
  56. @property
  57. def backend(self):
  58. return self.channel
  59. class Consumer(messaging.Consumer):
  60. queue = ''
  61. exchange = ''
  62. routing_key = ''
  63. exchange_type = 'direct'
  64. durable = True
  65. exclusive = False
  66. auto_delete = False
  67. exchange_type = 'direct'
  68. _closed = False
  69. def __init__(self, connection, queue=None, exchange=None,
  70. routing_key=None, exchange_type=None, durable=None,
  71. exclusive=None, auto_delete=None, **kwargs):
  72. self.backend = connection.channel()
  73. if durable is not None:
  74. self.durable = durable
  75. if exclusive is not None:
  76. self.exclusive = exclusive
  77. if auto_delete is not None:
  78. self.auto_delete = auto_delete
  79. self.queue = queue or self.queue
  80. self.exchange = exchange or self.exchange
  81. self.exchange_type = exchange_type or self.exchange_type
  82. self.routing_key = routing_key or self.routing_key
  83. exchange = Exchange(self.exchange,
  84. type=self.exchange_type,
  85. routing_key=self.routing_key,
  86. auto_delete=self.auto_delete,
  87. durable=self.durable)
  88. queue = Queue(self.queue,
  89. exchange=exchange,
  90. routing_key=self.routing_key,
  91. durable=self.durable,
  92. exclusive=self.exclusive,
  93. auto_delete=self.auto_delete)
  94. super(Consumer, self).__init__(self.backend, queue, **kwargs)
  95. def revive(self, channel):
  96. self.backend = channel
  97. super(Consumer, self).revive(channel)
  98. def close(self):
  99. self.cancel()
  100. self.backend.close()
  101. self._closed = True
  102. def __enter__(self):
  103. return self
  104. def __exit__(self, *exc_info):
  105. self.close()
  106. def __iter__(self):
  107. return self.iterqueue(infinite=True)
  108. def fetch(self, no_ack=None, enable_callbacks=False):
  109. if no_ack is None:
  110. no_ack = self.no_ack
  111. message = self.queues[0].get(no_ack)
  112. if message:
  113. if enable_callbacks:
  114. self.receive(message.payload, message)
  115. return message
  116. def process_next(self):
  117. raise NotImplementedError('Use fetch(enable_callbacks=True)')
  118. def discard_all(self, filterfunc=None):
  119. if filterfunc is not None:
  120. raise NotImplementedError(
  121. 'discard_all does not implement filters')
  122. return self.purge()
  123. def iterconsume(self, limit=None, no_ack=None):
  124. return _iterconsume(self.connection, self, no_ack, limit)
  125. def wait(self, limit=None):
  126. it = self.iterconsume(limit)
  127. return list(it)
  128. def iterqueue(self, limit=None, infinite=False):
  129. for items_since_start in count(): # for infinity
  130. item = self.fetch()
  131. if (not infinite and item is None) or \
  132. (limit and items_since_start >= limit):
  133. raise StopIteration
  134. yield item
  135. class ConsumerSet(messaging.Consumer):
  136. def __init__(self, connection, from_dict=None, consumers=None,
  137. channel=None, **kwargs):
  138. if channel:
  139. self._provided_channel = True
  140. self.backend = channel
  141. else:
  142. self._provided_channel = False
  143. self.backend = connection.channel()
  144. queues = []
  145. if consumers:
  146. for consumer in consumers:
  147. queues.extend(consumer.queues)
  148. if from_dict:
  149. for queue_name, queue_options in items(from_dict):
  150. queues.append(Queue.from_dict(queue_name, **queue_options))
  151. super(ConsumerSet, self).__init__(self.backend, queues, **kwargs)
  152. def iterconsume(self, limit=None, no_ack=False):
  153. return _iterconsume(self.connection, self, no_ack, limit)
  154. def discard_all(self):
  155. return self.purge()
  156. def add_consumer_from_dict(self, queue, **options):
  157. return self.add_queue_from_dict(queue, **options)
  158. def add_consumer(self, consumer):
  159. for queue in consumer.queues:
  160. self.add_queue(queue)
  161. def revive(self, channel):
  162. self.backend = channel
  163. super(ConsumerSet, self).revive(channel)
  164. def close(self):
  165. self.cancel()
  166. if not self._provided_channel:
  167. self.channel.close()