123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215 |
- """
- kombu.compat
- ============
-
- Carrot compatible interface for :class:`Publisher` and :class:`Producer`.
-
- See http://packages.python.org/pypi/carrot for documentation.
-
- """
- from __future__ import absolute_import
-
- from itertools import count
-
- from . import messaging
- from .entity import Exchange, Queue
- from .five import items
-
- __all__ = ['Publisher', 'Consumer']
-
- # XXX compat attribute
- entry_to_queue = Queue.from_dict
-
-
- def _iterconsume(connection, consumer, no_ack=False, limit=None):
- consumer.consume(no_ack=no_ack)
- for iteration in count(0): # for infinity
- if limit and iteration >= limit:
- raise StopIteration
- yield connection.drain_events()
-
-
- class Publisher(messaging.Producer):
- exchange = ''
- exchange_type = 'direct'
- routing_key = ''
- durable = True
- auto_delete = False
- _closed = False
-
- def __init__(self, connection, exchange=None, routing_key=None,
- exchange_type=None, durable=None, auto_delete=None,
- channel=None, **kwargs):
- if channel:
- connection = channel
-
- self.exchange = exchange or self.exchange
- self.exchange_type = exchange_type or self.exchange_type
- self.routing_key = routing_key or self.routing_key
-
- if auto_delete is not None:
- self.auto_delete = auto_delete
- if durable is not None:
- self.durable = durable
-
- if not isinstance(self.exchange, Exchange):
- self.exchange = Exchange(name=self.exchange,
- type=self.exchange_type,
- routing_key=self.routing_key,
- auto_delete=self.auto_delete,
- durable=self.durable)
- super(Publisher, self).__init__(connection, self.exchange, **kwargs)
-
- def send(self, *args, **kwargs):
- return self.publish(*args, **kwargs)
-
- def close(self):
- super(Publisher, self).close()
- self._closed = True
-
- def __enter__(self):
- return self
-
- def __exit__(self, *exc_info):
- self.close()
-
- @property
- def backend(self):
- return self.channel
-
-
- class Consumer(messaging.Consumer):
- queue = ''
- exchange = ''
- routing_key = ''
- exchange_type = 'direct'
- durable = True
- exclusive = False
- auto_delete = False
- exchange_type = 'direct'
- _closed = False
-
- def __init__(self, connection, queue=None, exchange=None,
- routing_key=None, exchange_type=None, durable=None,
- exclusive=None, auto_delete=None, **kwargs):
- self.backend = connection.channel()
-
- if durable is not None:
- self.durable = durable
- if exclusive is not None:
- self.exclusive = exclusive
- if auto_delete is not None:
- self.auto_delete = auto_delete
-
- self.queue = queue or self.queue
- self.exchange = exchange or self.exchange
- self.exchange_type = exchange_type or self.exchange_type
- self.routing_key = routing_key or self.routing_key
-
- exchange = Exchange(self.exchange,
- type=self.exchange_type,
- routing_key=self.routing_key,
- auto_delete=self.auto_delete,
- durable=self.durable)
- queue = Queue(self.queue,
- exchange=exchange,
- routing_key=self.routing_key,
- durable=self.durable,
- exclusive=self.exclusive,
- auto_delete=self.auto_delete)
- super(Consumer, self).__init__(self.backend, queue, **kwargs)
-
- def revive(self, channel):
- self.backend = channel
- super(Consumer, self).revive(channel)
-
- def close(self):
- self.cancel()
- self.backend.close()
- self._closed = True
-
- def __enter__(self):
- return self
-
- def __exit__(self, *exc_info):
- self.close()
-
- def __iter__(self):
- return self.iterqueue(infinite=True)
-
- def fetch(self, no_ack=None, enable_callbacks=False):
- if no_ack is None:
- no_ack = self.no_ack
- message = self.queues[0].get(no_ack)
- if message:
- if enable_callbacks:
- self.receive(message.payload, message)
- return message
-
- def process_next(self):
- raise NotImplementedError('Use fetch(enable_callbacks=True)')
-
- def discard_all(self, filterfunc=None):
- if filterfunc is not None:
- raise NotImplementedError(
- 'discard_all does not implement filters')
- return self.purge()
-
- def iterconsume(self, limit=None, no_ack=None):
- return _iterconsume(self.connection, self, no_ack, limit)
-
- def wait(self, limit=None):
- it = self.iterconsume(limit)
- return list(it)
-
- def iterqueue(self, limit=None, infinite=False):
- for items_since_start in count(): # for infinity
- item = self.fetch()
- if (not infinite and item is None) or \
- (limit and items_since_start >= limit):
- raise StopIteration
- yield item
-
-
- class ConsumerSet(messaging.Consumer):
-
- def __init__(self, connection, from_dict=None, consumers=None,
- channel=None, **kwargs):
- if channel:
- self._provided_channel = True
- self.backend = channel
- else:
- self._provided_channel = False
- self.backend = connection.channel()
-
- queues = []
- if consumers:
- for consumer in consumers:
- queues.extend(consumer.queues)
- if from_dict:
- for queue_name, queue_options in items(from_dict):
- queues.append(Queue.from_dict(queue_name, **queue_options))
-
- super(ConsumerSet, self).__init__(self.backend, queues, **kwargs)
-
- def iterconsume(self, limit=None, no_ack=False):
- return _iterconsume(self.connection, self, no_ack, limit)
-
- def discard_all(self):
- return self.purge()
-
- def add_consumer_from_dict(self, queue, **options):
- return self.add_queue_from_dict(queue, **options)
-
- def add_consumer(self, consumer):
- for queue in consumer.queues:
- self.add_queue(queue)
-
- def revive(self, channel):
- self.backend = channel
- super(ConsumerSet, self).revive(channel)
-
- def close(self):
- self.cancel()
- if not self._provided_channel:
- self.channel.close()
|