Development of an internal social media platform with personalised dashboards for students
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

messaging.py 21KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608
  1. """
  2. kombu.messaging
  3. ===============
  4. Sending and receiving messages.
  5. """
  6. from __future__ import absolute_import
  7. import numbers
  8. from itertools import count
  9. from .common import maybe_declare
  10. from .compression import compress
  11. from .connection import maybe_channel, is_connection
  12. from .entity import Exchange, Queue, DELIVERY_MODES
  13. from .exceptions import ContentDisallowed
  14. from .five import text_t, values
  15. from .serialization import dumps, prepare_accept_content
  16. from .utils import ChannelPromise, maybe_list
  17. __all__ = ['Exchange', 'Queue', 'Producer', 'Consumer']
  18. class Producer(object):
  19. """Message Producer.
  20. :param channel: Connection or channel.
  21. :keyword exchange: Optional default exchange.
  22. :keyword routing_key: Optional default routing key.
  23. :keyword serializer: Default serializer. Default is `"json"`.
  24. :keyword compression: Default compression method. Default is no
  25. compression.
  26. :keyword auto_declare: Automatically declare the default exchange
  27. at instantiation. Default is :const:`True`.
  28. :keyword on_return: Callback to call for undeliverable messages,
  29. when the `mandatory` or `immediate` arguments to
  30. :meth:`publish` is used. This callback needs the following
  31. signature: `(exception, exchange, routing_key, message)`.
  32. Note that the producer needs to drain events to use this feature.
  33. """
  34. #: Default exchange
  35. exchange = None
  36. #: Default routing key.
  37. routing_key = ''
  38. #: Default serializer to use. Default is JSON.
  39. serializer = None
  40. #: Default compression method. Disabled by default.
  41. compression = None
  42. #: By default the exchange is declared at instantiation.
  43. #: If you want to declare manually then you can set this
  44. #: to :const:`False`.
  45. auto_declare = True
  46. #: Basic return callback.
  47. on_return = None
  48. #: Set if channel argument was a Connection instance (using
  49. #: default_channel).
  50. __connection__ = None
  51. def __init__(self, channel, exchange=None, routing_key=None,
  52. serializer=None, auto_declare=None, compression=None,
  53. on_return=None):
  54. self._channel = channel
  55. self.exchange = exchange
  56. self.routing_key = routing_key or self.routing_key
  57. self.serializer = serializer or self.serializer
  58. self.compression = compression or self.compression
  59. self.on_return = on_return or self.on_return
  60. self._channel_promise = None
  61. if self.exchange is None:
  62. self.exchange = Exchange('')
  63. if auto_declare is not None:
  64. self.auto_declare = auto_declare
  65. if self._channel:
  66. self.revive(self._channel)
  67. def __repr__(self):
  68. return '<Producer: {0._channel}>'.format(self)
  69. def __reduce__(self):
  70. return self.__class__, self.__reduce_args__()
  71. def __reduce_args__(self):
  72. return (None, self.exchange, self.routing_key, self.serializer,
  73. self.auto_declare, self.compression)
  74. def declare(self):
  75. """Declare the exchange.
  76. This happens automatically at instantiation if
  77. :attr:`auto_declare` is enabled.
  78. """
  79. if self.exchange.name:
  80. self.exchange.declare()
  81. def maybe_declare(self, entity, retry=False, **retry_policy):
  82. """Declare the exchange if it hasn't already been declared
  83. during this session."""
  84. if entity:
  85. return maybe_declare(entity, self.channel, retry, **retry_policy)
  86. def publish(self, body, routing_key=None, delivery_mode=None,
  87. mandatory=False, immediate=False, priority=0,
  88. content_type=None, content_encoding=None, serializer=None,
  89. headers=None, compression=None, exchange=None, retry=False,
  90. retry_policy=None, declare=[], expiration=None, **properties):
  91. """Publish message to the specified exchange.
  92. :param body: Message body.
  93. :keyword routing_key: Message routing key.
  94. :keyword delivery_mode: See :attr:`delivery_mode`.
  95. :keyword mandatory: Currently not supported.
  96. :keyword immediate: Currently not supported.
  97. :keyword priority: Message priority. A number between 0 and 9.
  98. :keyword content_type: Content type. Default is auto-detect.
  99. :keyword content_encoding: Content encoding. Default is auto-detect.
  100. :keyword serializer: Serializer to use. Default is auto-detect.
  101. :keyword compression: Compression method to use. Default is none.
  102. :keyword headers: Mapping of arbitrary headers to pass along
  103. with the message body.
  104. :keyword exchange: Override the exchange. Note that this exchange
  105. must have been declared.
  106. :keyword declare: Optional list of required entities that must
  107. have been declared before publishing the message. The entities
  108. will be declared using :func:`~kombu.common.maybe_declare`.
  109. :keyword retry: Retry publishing, or declaring entities if the
  110. connection is lost.
  111. :keyword retry_policy: Retry configuration, this is the keywords
  112. supported by :meth:`~kombu.Connection.ensure`.
  113. :keyword expiration: A TTL in seconds can be specified per message.
  114. Default is no expiration.
  115. :keyword \*\*properties: Additional message properties, see AMQP spec.
  116. """
  117. headers = {} if headers is None else headers
  118. retry_policy = {} if retry_policy is None else retry_policy
  119. routing_key = self.routing_key if routing_key is None else routing_key
  120. compression = self.compression if compression is None else compression
  121. exchange = exchange or self.exchange
  122. if isinstance(exchange, Exchange):
  123. delivery_mode = delivery_mode or exchange.delivery_mode
  124. exchange = exchange.name
  125. else:
  126. delivery_mode = delivery_mode or self.exchange.delivery_mode
  127. if not isinstance(delivery_mode, numbers.Integral):
  128. delivery_mode = DELIVERY_MODES[delivery_mode]
  129. properties['delivery_mode'] = delivery_mode
  130. if expiration is not None:
  131. properties['expiration'] = str(int(expiration*1000))
  132. body, content_type, content_encoding = self._prepare(
  133. body, serializer, content_type, content_encoding,
  134. compression, headers)
  135. publish = self._publish
  136. if retry:
  137. publish = self.connection.ensure(self, publish, **retry_policy)
  138. return publish(body, priority, content_type,
  139. content_encoding, headers, properties,
  140. routing_key, mandatory, immediate, exchange, declare)
  141. def _publish(self, body, priority, content_type, content_encoding,
  142. headers, properties, routing_key, mandatory,
  143. immediate, exchange, declare):
  144. channel = self.channel
  145. message = channel.prepare_message(
  146. body, priority, content_type,
  147. content_encoding, headers, properties,
  148. )
  149. if declare:
  150. maybe_declare = self.maybe_declare
  151. [maybe_declare(entity) for entity in declare]
  152. return channel.basic_publish(
  153. message,
  154. exchange=exchange, routing_key=routing_key,
  155. mandatory=mandatory, immediate=immediate,
  156. )
  157. def _get_channel(self):
  158. channel = self._channel
  159. if isinstance(channel, ChannelPromise):
  160. channel = self._channel = channel()
  161. self.exchange.revive(channel)
  162. if self.on_return:
  163. channel.events['basic_return'].add(self.on_return)
  164. return channel
  165. def _set_channel(self, channel):
  166. self._channel = channel
  167. channel = property(_get_channel, _set_channel)
  168. def revive(self, channel):
  169. """Revive the producer after connection loss."""
  170. if is_connection(channel):
  171. connection = channel
  172. self.__connection__ = connection
  173. channel = ChannelPromise(lambda: connection.default_channel)
  174. if isinstance(channel, ChannelPromise):
  175. self._channel = channel
  176. self.exchange = self.exchange(channel)
  177. else:
  178. # Channel already concrete
  179. self._channel = channel
  180. if self.on_return:
  181. self._channel.events['basic_return'].add(self.on_return)
  182. self.exchange = self.exchange(channel)
  183. if self.auto_declare:
  184. # auto_decare is not recommended as this will force
  185. # evaluation of the channel.
  186. self.declare()
  187. def __enter__(self):
  188. return self
  189. def __exit__(self, *exc_info):
  190. self.release()
  191. def release(self):
  192. pass
  193. close = release
  194. def _prepare(self, body, serializer=None, content_type=None,
  195. content_encoding=None, compression=None, headers=None):
  196. # No content_type? Then we're serializing the data internally.
  197. if not content_type:
  198. serializer = serializer or self.serializer
  199. (content_type, content_encoding,
  200. body) = dumps(body, serializer=serializer)
  201. else:
  202. # If the programmer doesn't want us to serialize,
  203. # make sure content_encoding is set.
  204. if isinstance(body, text_t):
  205. if not content_encoding:
  206. content_encoding = 'utf-8'
  207. body = body.encode(content_encoding)
  208. # If they passed in a string, we can't know anything
  209. # about it. So assume it's binary data.
  210. elif not content_encoding:
  211. content_encoding = 'binary'
  212. if compression:
  213. body, headers['compression'] = compress(body, compression)
  214. return body, content_type, content_encoding
  215. @property
  216. def connection(self):
  217. try:
  218. return self.__connection__ or self.channel.connection.client
  219. except AttributeError:
  220. pass
  221. class Consumer(object):
  222. """Message consumer.
  223. :param channel: see :attr:`channel`.
  224. :param queues: see :attr:`queues`.
  225. :keyword no_ack: see :attr:`no_ack`.
  226. :keyword auto_declare: see :attr:`auto_declare`
  227. :keyword callbacks: see :attr:`callbacks`.
  228. :keyword on_message: See :attr:`on_message`
  229. :keyword on_decode_error: see :attr:`on_decode_error`.
  230. """
  231. ContentDisallowed = ContentDisallowed
  232. #: The connection/channel to use for this consumer.
  233. channel = None
  234. #: A single :class:`~kombu.Queue`, or a list of queues to
  235. #: consume from.
  236. queues = None
  237. #: Flag for automatic message acknowledgment.
  238. #: If enabled the messages are automatically acknowledged by the
  239. #: broker. This can increase performance but means that you
  240. #: have no control of when the message is removed.
  241. #:
  242. #: Disabled by default.
  243. no_ack = None
  244. #: By default all entities will be declared at instantiation, if you
  245. #: want to handle this manually you can set this to :const:`False`.
  246. auto_declare = True
  247. #: List of callbacks called in order when a message is received.
  248. #:
  249. #: The signature of the callbacks must take two arguments:
  250. #: `(body, message)`, which is the decoded message body and
  251. #: the `Message` instance (a subclass of
  252. #: :class:`~kombu.transport.base.Message`).
  253. callbacks = None
  254. #: Optional function called whenever a message is received.
  255. #:
  256. #: When defined this function will be called instead of the
  257. #: :meth:`receive` method, and :attr:`callbacks` will be disabled.
  258. #:
  259. #: So this can be used as an alternative to :attr:`callbacks` when
  260. #: you don't want the body to be automatically decoded.
  261. #: Note that the message will still be decompressed if the message
  262. #: has the ``compression`` header set.
  263. #:
  264. #: The signature of the callback must take a single argument,
  265. #: which is the raw message object (a subclass of
  266. #: :class:`~kombu.transport.base.Message`).
  267. #:
  268. #: Also note that the ``message.body`` attribute, which is the raw
  269. #: contents of the message body, may in some cases be a read-only
  270. #: :class:`buffer` object.
  271. on_message = None
  272. #: Callback called when a message can't be decoded.
  273. #:
  274. #: The signature of the callback must take two arguments: `(message,
  275. #: exc)`, which is the message that can't be decoded and the exception
  276. #: that occurred while trying to decode it.
  277. on_decode_error = None
  278. #: List of accepted content-types.
  279. #:
  280. #: An exception will be raised if the consumer receives
  281. #: a message with an untrusted content type.
  282. #: By default all content-types are accepted, but not if
  283. #: :func:`kombu.disable_untrusted_serializers` was called,
  284. #: in which case only json is allowed.
  285. accept = None
  286. _tags = count(1) # global
  287. def __init__(self, channel, queues=None, no_ack=None, auto_declare=None,
  288. callbacks=None, on_decode_error=None, on_message=None,
  289. accept=None, tag_prefix=None):
  290. self.channel = channel
  291. self.queues = self.queues or [] if queues is None else queues
  292. self.no_ack = self.no_ack if no_ack is None else no_ack
  293. self.callbacks = (self.callbacks or [] if callbacks is None
  294. else callbacks)
  295. self.on_message = on_message
  296. self.tag_prefix = tag_prefix
  297. self._active_tags = {}
  298. if auto_declare is not None:
  299. self.auto_declare = auto_declare
  300. if on_decode_error is not None:
  301. self.on_decode_error = on_decode_error
  302. self.accept = prepare_accept_content(accept)
  303. if self.channel:
  304. self.revive(self.channel)
  305. def revive(self, channel):
  306. """Revive consumer after connection loss."""
  307. self._active_tags.clear()
  308. channel = self.channel = maybe_channel(channel)
  309. self.queues = [queue(self.channel)
  310. for queue in maybe_list(self.queues)]
  311. for queue in self.queues:
  312. queue.revive(channel)
  313. if self.auto_declare:
  314. self.declare()
  315. def declare(self):
  316. """Declare queues, exchanges and bindings.
  317. This is done automatically at instantiation if :attr:`auto_declare`
  318. is set.
  319. """
  320. for queue in self.queues:
  321. queue.declare()
  322. def register_callback(self, callback):
  323. """Register a new callback to be called when a message
  324. is received.
  325. The signature of the callback needs to accept two arguments:
  326. `(body, message)`, which is the decoded message body
  327. and the `Message` instance (a subclass of
  328. :class:`~kombu.transport.base.Message`.
  329. """
  330. self.callbacks.append(callback)
  331. def __enter__(self):
  332. self.consume()
  333. return self
  334. def __exit__(self, *exc_info):
  335. try:
  336. self.cancel()
  337. except Exception:
  338. pass
  339. def add_queue(self, queue):
  340. """Add a queue to the list of queues to consume from.
  341. This will not start consuming from the queue,
  342. for that you will have to call :meth:`consume` after.
  343. """
  344. queue = queue(self.channel)
  345. if self.auto_declare:
  346. queue.declare()
  347. self.queues.append(queue)
  348. return queue
  349. def add_queue_from_dict(self, queue, **options):
  350. """This method is deprecated.
  351. Instead please use::
  352. consumer.add_queue(Queue.from_dict(d))
  353. """
  354. return self.add_queue(Queue.from_dict(queue, **options))
  355. def consume(self, no_ack=None):
  356. """Start consuming messages.
  357. Can be called multiple times, but note that while it
  358. will consume from new queues added since the last call,
  359. it will not cancel consuming from removed queues (
  360. use :meth:`cancel_by_queue`).
  361. :param no_ack: See :attr:`no_ack`.
  362. """
  363. if self.queues:
  364. no_ack = self.no_ack if no_ack is None else no_ack
  365. H, T = self.queues[:-1], self.queues[-1]
  366. for queue in H:
  367. self._basic_consume(queue, no_ack=no_ack, nowait=True)
  368. self._basic_consume(T, no_ack=no_ack, nowait=False)
  369. def cancel(self):
  370. """End all active queue consumers.
  371. This does not affect already delivered messages, but it does
  372. mean the server will not send any more messages for this consumer.
  373. """
  374. cancel = self.channel.basic_cancel
  375. for tag in values(self._active_tags):
  376. cancel(tag)
  377. self._active_tags.clear()
  378. close = cancel
  379. def cancel_by_queue(self, queue):
  380. """Cancel consumer by queue name."""
  381. try:
  382. tag = self._active_tags.pop(queue)
  383. except KeyError:
  384. pass
  385. else:
  386. self.queues[:] = [q for q in self.queues if q.name != queue]
  387. self.channel.basic_cancel(tag)
  388. def consuming_from(self, queue):
  389. """Return :const:`True` if the consumer is currently
  390. consuming from queue'."""
  391. name = queue
  392. if isinstance(queue, Queue):
  393. name = queue.name
  394. return name in self._active_tags
  395. def purge(self):
  396. """Purge messages from all queues.
  397. .. warning::
  398. This will *delete all ready messages*, there is no
  399. undo operation.
  400. """
  401. return sum(queue.purge() for queue in self.queues)
  402. def flow(self, active):
  403. """Enable/disable flow from peer.
  404. This is a simple flow-control mechanism that a peer can use
  405. to avoid overflowing its queues or otherwise finding itself
  406. receiving more messages than it can process.
  407. The peer that receives a request to stop sending content
  408. will finish sending the current content (if any), and then wait
  409. until flow is reactivated.
  410. """
  411. self.channel.flow(active)
  412. def qos(self, prefetch_size=0, prefetch_count=0, apply_global=False):
  413. """Specify quality of service.
  414. The client can request that messages should be sent in
  415. advance so that when the client finishes processing a message,
  416. the following message is already held locally, rather than needing
  417. to be sent down the channel. Prefetching gives a performance
  418. improvement.
  419. The prefetch window is Ignored if the :attr:`no_ack` option is set.
  420. :param prefetch_size: Specify the prefetch window in octets.
  421. The server will send a message in advance if it is equal to
  422. or smaller in size than the available prefetch size (and
  423. also falls within other prefetch limits). May be set to zero,
  424. meaning "no specific limit", although other prefetch limits
  425. may still apply.
  426. :param prefetch_count: Specify the prefetch window in terms of
  427. whole messages.
  428. :param apply_global: Apply new settings globally on all channels.
  429. """
  430. return self.channel.basic_qos(prefetch_size,
  431. prefetch_count,
  432. apply_global)
  433. def recover(self, requeue=False):
  434. """Redeliver unacknowledged messages.
  435. Asks the broker to redeliver all unacknowledged messages
  436. on the specified channel.
  437. :keyword requeue: By default the messages will be redelivered
  438. to the original recipient. With `requeue` set to true, the
  439. server will attempt to requeue the message, potentially then
  440. delivering it to an alternative subscriber.
  441. """
  442. return self.channel.basic_recover(requeue=requeue)
  443. def receive(self, body, message):
  444. """Method called when a message is received.
  445. This dispatches to the registered :attr:`callbacks`.
  446. :param body: The decoded message body.
  447. :param message: The `Message` instance.
  448. :raises NotImplementedError: If no consumer callbacks have been
  449. registered.
  450. """
  451. callbacks = self.callbacks
  452. if not callbacks:
  453. raise NotImplementedError('Consumer does not have any callbacks')
  454. [callback(body, message) for callback in callbacks]
  455. def _basic_consume(self, queue, consumer_tag=None,
  456. no_ack=no_ack, nowait=True):
  457. tag = self._active_tags.get(queue.name)
  458. if tag is None:
  459. tag = self._add_tag(queue, consumer_tag)
  460. queue.consume(tag, self._receive_callback,
  461. no_ack=no_ack, nowait=nowait)
  462. return tag
  463. def _add_tag(self, queue, consumer_tag=None):
  464. tag = consumer_tag or '{0}{1}'.format(
  465. self.tag_prefix, next(self._tags))
  466. self._active_tags[queue.name] = tag
  467. return tag
  468. def _receive_callback(self, message):
  469. accept = self.accept
  470. on_m, channel, decoded = self.on_message, self.channel, None
  471. try:
  472. m2p = getattr(channel, 'message_to_python', None)
  473. if m2p:
  474. message = m2p(message)
  475. if accept is not None:
  476. message.accept = accept
  477. if message.errors:
  478. return message._reraise_error(self.on_decode_error)
  479. decoded = None if on_m else message.decode()
  480. except Exception as exc:
  481. if not self.on_decode_error:
  482. raise
  483. self.on_decode_error(message, exc)
  484. else:
  485. return on_m(message) if on_m else self.receive(decoded, message)
  486. def __repr__(self):
  487. return '<Consumer: {0.queues}>'.format(self)
  488. @property
  489. def connection(self):
  490. try:
  491. return self.channel.connection.client
  492. except AttributeError:
  493. pass