You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

protocol.py 84KB


  1. ###############################################################################
  2. #
  3. # The MIT License (MIT)
  4. #
  5. # Copyright (c) Crossbar.io Technologies GmbH
  6. #
  7. # Permission is hereby granted, free of charge, to any person obtaining a copy
  8. # of this software and associated documentation files (the "Software"), to deal
  9. # in the Software without restriction, including without limitation the rights
  10. # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  11. # copies of the Software, and to permit persons to whom the Software is
  12. # furnished to do so, subject to the following conditions:
  13. #
  14. # The above copyright notice and this permission notice shall be included in
  15. # all copies or substantial portions of the Software.
  16. #
  17. # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  18. # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  19. # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  20. # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  21. # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  22. # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  23. # THE SOFTWARE.
  24. #
  25. ###############################################################################
  26. from __future__ import absolute_import
  27. import six
  28. import txaio
  29. import inspect
  30. from functools import reduce
  31. from autobahn import wamp
  32. from autobahn.util import public, IdGenerator, ObservableMixin
  33. from autobahn.wamp import uri
  34. from autobahn.wamp import message
  35. from autobahn.wamp import types
  36. from autobahn.wamp import role
  37. from autobahn.wamp import exception
  38. from autobahn.wamp.exception import ApplicationError, ProtocolError, SessionNotReady, SerializationError
  39. from autobahn.wamp.interfaces import ISession, IPayloadCodec, IAuthenticator # noqa
  40. from autobahn.wamp.types import SessionDetails, CloseDetails, EncodedPayload
  41. from autobahn.exception import PayloadExceededError
  42. from autobahn.wamp.request import \
  43. Publication, \
  44. Subscription, \
  45. Handler, \
  46. Registration, \
  47. Endpoint, \
  48. PublishRequest, \
  49. SubscribeRequest, \
  50. UnsubscribeRequest, \
  51. CallRequest, \
  52. InvocationRequest, \
  53. RegisterRequest, \
  54. UnregisterRequest
  55. def is_method_or_function(f):
  56. return inspect.ismethod(f) or inspect.isfunction(f)
  57. class BaseSession(ObservableMixin):
  58. """
  59. WAMP session base class.
  60. This class implements :class:`autobahn.wamp.interfaces.ISession`.
  61. """
  62. log = txaio.make_logger()
  63. def __init__(self):
  64. """
  65. """
  66. self.set_valid_events(
  67. valid_events=[
  68. 'join', # right before onJoin runs
  69. 'leave', # after onLeave has run
  70. 'ready', # after onJoin and all 'join' listeners have completed
  71. 'connect', # right before onConnect
  72. 'disconnect', # right after onDisconnect
  73. ]
  74. )
  75. # this is for marshalling traceback from exceptions thrown in user
  76. # code within WAMP error messages (that is, when invoking remoted
  77. # procedures)
  78. self.traceback_app = False
  79. # mapping of exception classes to WAMP error URIs
  80. self._ecls_to_uri_pat = {}
  81. # mapping of WAMP error URIs to exception classes
  82. self._uri_to_ecls = {
  83. ApplicationError.INVALID_PAYLOAD: SerializationError,
  84. ApplicationError.PAYLOAD_SIZE_EXCEEDED: PayloadExceededError,
  85. }
  86. # session authentication information
  87. self._realm = None
  88. self._session_id = None
  89. self._authid = None
  90. self._authrole = None
  91. self._authmethod = None
  92. self._authprovider = None
  93. # payload transparency codec
  94. self._payload_codec = None
  95. # generator for WAMP request IDs
  96. self._request_id_gen = IdGenerator()
  97. @property
  98. def realm(self):
  99. return self._realm
  100. @property
  101. def session_id(self):
  102. return self._session_id
  103. @property
  104. def authid(self):
  105. return self._authid
  106. @property
  107. def authrole(self):
  108. return self._authrole
  109. @property
  110. def authmethod(self):
  111. return self._authmethod
  112. @property
  113. def authprovider(self):
  114. return self._authprovider
  115. def define(self, exception, error=None):
  116. """
  117. Implements :func:`autobahn.wamp.interfaces.ISession.define`
  118. """
  119. if error is None:
  120. assert(hasattr(exception, '_wampuris'))
  121. self._ecls_to_uri_pat[exception] = exception._wampuris
  122. self._uri_to_ecls[exception._wampuris[0].uri()] = exception
  123. else:
  124. assert(not hasattr(exception, '_wampuris'))
  125. self._ecls_to_uri_pat[exception] = [uri.Pattern(six.u(error), uri.Pattern.URI_TARGET_HANDLER)]
  126. self._uri_to_ecls[six.u(error)] = exception
  127. def _message_from_exception(self, request_type, request, exc, tb=None, enc_algo=None):
  128. """
  129. Create a WAMP error message from an exception.
  130. :param request_type: The request type this WAMP error message is for.
  131. :type request_type: int
  132. :param request: The request ID this WAMP error message is for.
  133. :type request: int
  134. :param exc: The exception.
  135. :type exc: Instance of :class:`Exception` or subclass thereof.
  136. :param tb: Optional traceback. If present, it'll be included with the WAMP error message.
  137. :type tb: list or None
  138. """
  139. args = None
  140. if hasattr(exc, 'args'):
  141. args = list(exc.args) # make sure tuples are made into lists
  142. kwargs = None
  143. if hasattr(exc, 'kwargs'):
  144. kwargs = exc.kwargs
  145. if kwargs and six.PY2:
  146. kwargs = {
  147. k.decode('utf8'): v
  148. for k, v in kwargs.iteritems()
  149. }
  150. if tb:
  151. if kwargs:
  152. kwargs[u'traceback'] = tb
  153. else:
  154. kwargs = {u'traceback': tb}
  155. if isinstance(exc, exception.ApplicationError):
  156. error = exc.error if type(exc.error) == six.text_type else six.u(exc.error)
  157. else:
  158. if exc.__class__ in self._ecls_to_uri_pat:
  159. error = self._ecls_to_uri_pat[exc.__class__][0]._uri
  160. else:
  161. error = u"wamp.error.runtime_error"
  162. encoded_payload = None
  163. if self._payload_codec:
  164. encoded_payload = self._payload_codec.encode(False, error, args, kwargs)
  165. if encoded_payload:
  166. msg = message.Error(request_type,
  167. request,
  168. error,
  169. payload=encoded_payload.payload,
  170. enc_algo=encoded_payload.enc_algo,
  171. enc_key=encoded_payload.enc_key,
  172. enc_serializer=encoded_payload.enc_serializer)
  173. else:
  174. msg = message.Error(request_type,
  175. request,
  176. error,
  177. args,
  178. kwargs)
  179. return msg
  180. def _exception_from_message(self, msg):
  181. """
  182. Create a user (or generic) exception from a WAMP error message.
  183. :param msg: A WAMP error message.
  184. :type msg: instance of :class:`autobahn.wamp.message.Error`
  185. """
  186. # FIXME:
  187. # 1. map to ecls based on error URI wildcard/prefix
  188. # 2. extract additional args/kwargs from error URI
  189. exc = None
  190. enc_err = None
  191. if msg.enc_algo:
  192. if not self._payload_codec:
  193. log_msg = u"received encoded payload, but no payload codec active"
  194. self.log.warn(log_msg)
  195. enc_err = ApplicationError(ApplicationError.ENC_NO_PAYLOAD_CODEC, log_msg, enc_algo=msg.enc_algo)
  196. else:
  197. try:
  198. encoded_payload = EncodedPayload(msg.payload, msg.enc_algo, msg.enc_serializer, msg.enc_key)
  199. decrypted_error, msg.args, msg.kwargs = self._payload_codec.decode(True, msg.error, encoded_payload)
  200. except Exception as e:
  201. self.log.warn("failed to decrypt application payload 1: {err}", err=e)
  202. enc_err = ApplicationError(
  203. ApplicationError.ENC_DECRYPT_ERROR,
  204. u"failed to decrypt application payload 1: {}".format(e),
  205. enc_algo=msg.enc_algo,
  206. )
  207. else:
  208. if msg.error != decrypted_error:
  209. self.log.warn(
  210. u"URI within encrypted payload ('{decrypted_error}') does not match the envelope ('{error}')",
  211. decrypted_error=decrypted_error,
  212. error=msg.error,
  213. )
  214. enc_err = ApplicationError(
  215. ApplicationError.ENC_TRUSTED_URI_MISMATCH,
  216. u"URI within encrypted payload ('{}') does not match the envelope ('{}')".format(decrypted_error, msg.error),
  217. enc_algo=msg.enc_algo,
  218. )
  219. if enc_err:
  220. return enc_err
  221. if msg.error in self._uri_to_ecls:
  222. ecls = self._uri_to_ecls[msg.error]
  223. try:
  224. # the following might fail, eg. TypeError when
  225. # signature of exception constructor is incompatible
  226. # with args/kwargs or when the exception constructor raises
  227. if msg.kwargs:
  228. if msg.args:
  229. exc = ecls(*msg.args, **msg.kwargs)
  230. else:
  231. exc = ecls(**msg.kwargs)
  232. else:
  233. if msg.args:
  234. exc = ecls(*msg.args)
  235. else:
  236. exc = ecls()
  237. except Exception:
  238. try:
  239. self.onUserError(
  240. txaio.create_failure(),
  241. "While re-constructing exception",
  242. )
  243. except:
  244. pass
  245. if not exc:
  246. # the following ctor never fails ..
  247. if msg.kwargs:
  248. if msg.args:
  249. exc = exception.ApplicationError(msg.error, *msg.args, **msg.kwargs)
  250. else:
  251. exc = exception.ApplicationError(msg.error, **msg.kwargs)
  252. else:
  253. if msg.args:
  254. exc = exception.ApplicationError(msg.error, *msg.args)
  255. else:
  256. exc = exception.ApplicationError(msg.error)
  257. # FIXME: cleanup and integate into ctors above
  258. if hasattr(exc, 'enc_algo'):
  259. exc.enc_algo = msg.enc_algo
  260. if hasattr(exc, 'callee'):
  261. exc.callee = msg.callee
  262. if hasattr(exc, 'callee_authid'):
  263. exc.callee_authid = msg.callee_authid
  264. if hasattr(exc, 'callee_authrole'):
  265. exc.callee_authrole = msg.callee_authrole
  266. if hasattr(exc, 'forward_for'):
  267. exc.forward_for = msg.forward_for
  268. return exc
  269. @public
  270. class ApplicationSession(BaseSession):
  271. """
  272. WAMP endpoint session.
  273. """
  274. def __init__(self, config=None):
  275. """
  276. Implements :func:`autobahn.wamp.interfaces.ISession`
  277. """
  278. BaseSession.__init__(self)
  279. self.config = config or types.ComponentConfig(realm=u"realm1")
  280. # set client role features supported and announced
  281. self._session_roles = role.DEFAULT_CLIENT_ROLES
  282. self._transport = None
  283. self._session_id = None
  284. self._realm = None
  285. self._goodbye_sent = False
  286. self._transport_is_closing = False
  287. # outstanding requests
  288. self._publish_reqs = {}
  289. self._subscribe_reqs = {}
  290. self._unsubscribe_reqs = {}
  291. self._call_reqs = {}
  292. self._register_reqs = {}
  293. self._unregister_reqs = {}
  294. # subscriptions in place
  295. self._subscriptions = {}
  296. # registrations in place
  297. self._registrations = {}
  298. # incoming invocations
  299. self._invocations = {}
  300. @public
  301. def set_payload_codec(self, payload_codec):
  302. """
  303. Implements :func:`autobahn.wamp.interfaces.ISession.set_payload_codec`
  304. """
  305. assert(payload_codec is None or isinstance(payload_codec, IPayloadCodec))
  306. self._payload_codec = payload_codec
  307. @public
  308. def get_payload_codec(self):
  309. """
  310. Implements :func:`autobahn.wamp.interfaces.ISession.get_payload_codec`
  311. """
  312. return self._payload_codec
  313. @public
  314. def onOpen(self, transport):
  315. """
  316. Implements :func:`autobahn.wamp.interfaces.ITransportHandler.onOpen`
  317. """
  318. self._transport = transport
  319. d = self.fire('connect', self, transport)
  320. txaio.add_callbacks(
  321. d, None,
  322. lambda fail: self._swallow_error(fail, "While notifying 'connect'")
  323. )
  324. txaio.add_callbacks(
  325. d,
  326. lambda _: txaio.as_future(self.onConnect),
  327. lambda fail: self._swallow_error(fail, "While calling 'onConnect'")
  328. )
  329. @public
  330. def onConnect(self):
  331. """
  332. Implements :func:`autobahn.wamp.interfaces.ISession.onConnect`
  333. """
  334. self.join(self.config.realm)
  335. @public
  336. def join(self,
  337. realm,
  338. authmethods=None,
  339. authid=None,
  340. authrole=None,
  341. authextra=None,
  342. resumable=None,
  343. resume_session=None,
  344. resume_token=None):
  345. """
  346. Implements :func:`autobahn.wamp.interfaces.ISession.join`
  347. """
  348. assert(realm is None or type(realm) == six.text_type)
  349. assert(authmethods is None or type(authmethods) == list)
  350. if type(authmethods) == list:
  351. for authmethod in authmethods:
  352. assert(type(authmethod) == six.text_type)
  353. assert(authid is None or type(authid) == six.text_type)
  354. assert(authrole is None or type(authrole) == six.text_type)
  355. assert(authextra is None or type(authextra) == dict)
  356. if self._session_id:
  357. raise Exception("already joined")
  358. # store the realm requested by client, though this might be overwritten later,
  359. # when realm redirection kicks in
  360. self._realm = realm
  361. # closing handshake state
  362. self._goodbye_sent = False
  363. # send HELLO message to router
  364. msg = message.Hello(realm=realm,
  365. roles=self._session_roles,
  366. authmethods=authmethods,
  367. authid=authid,
  368. authrole=authrole,
  369. authextra=authextra,
  370. resumable=resumable,
  371. resume_session=resume_session,
  372. resume_token=resume_token)
  373. self._transport.send(msg)
  374. @public
  375. def disconnect(self):
  376. """
  377. Implements :func:`autobahn.wamp.interfaces.ISession.disconnect`
  378. """
  379. if self._transport:
  380. self._transport.close()
  381. @public
  382. def is_connected(self):
  383. """
  384. Implements :func:`autobahn.wamp.interfaces.ISession.is_connected`
  385. """
  386. return self._transport is not None
  387. @public
  388. def is_attached(self):
  389. """
  390. Implements :func:`autobahn.wamp.interfaces.ISession.is_attached`
  391. """
  392. return self._transport is not None and self._session_id is not None
  393. @public
  394. def onUserError(self, fail, msg):
  395. """
  396. Implements :func:`autobahn.wamp.interfaces.ISession.onUserError`
  397. """
  398. if isinstance(fail.value, exception.ApplicationError):
  399. self.log.warn('{klass}.onUserError(): "{msg}"',
  400. klass=self.__class__.__name__,
  401. msg=fail.value.error_message())
  402. else:
  403. self.log.error(
  404. '{klass}.onUserError(): "{msg}"\n{traceback}',
  405. klass=self.__class__.__name__,
  406. msg=msg,
  407. traceback=txaio.failure_format_traceback(fail),
  408. )
  409. def _swallow_error(self, fail, msg):
  410. '''
  411. This is an internal generic error-handler for errors encountered
  412. when calling down to on*() handlers that can reasonably be
  413. expected to be overridden in user code.
  414. Note that it *cancels* the error, so use with care!
  415. Specifically, this should *never* be added to the errback
  416. chain for a Deferred/coroutine that will make it out to user
  417. code.
  418. '''
  419. try:
  420. self.onUserError(fail, msg)
  421. except Exception:
  422. self.log.error(
  423. "Internal error: {tb}",
  424. tb=txaio.failure_format_traceback(txaio.create_failure()),
  425. )
  426. return None
  427. def onMessage(self, msg):
  428. """
  429. Implements :func:`autobahn.wamp.interfaces.ITransportHandler.onMessage`
  430. """
  431. if self._session_id is None:
  432. # the first message must be WELCOME, ABORT or CHALLENGE ..
  433. if isinstance(msg, message.Welcome):
  434. # before we let user code see the session -- that is,
  435. # before we fire "join" -- we give authentication
  436. # instances a chance to abort the session. Usually
  437. # this would be for "mutual authentication"
  438. # scenarios. For example, WAMP-SCRAM uses this to
  439. # confirm the server-signature
  440. d = txaio.as_future(self.onWelcome, msg)
  441. def success(res):
  442. if res is not None:
  443. self.log.info("Session denied by onWelcome")
  444. reply = message.Abort(
  445. u"wamp.error.cannot_authenticate", u"{0}".format(res)
  446. )
  447. self._transport.send(reply)
  448. return
  449. if msg.realm:
  450. self._realm = msg.realm
  451. self._session_id = msg.session
  452. self._authid = msg.authid
  453. self._authrole = msg.authrole
  454. self._authmethod = msg.authmethod
  455. self._authprovider = msg.authprovider
  456. self._router_roles = msg.roles
  457. details = SessionDetails(
  458. realm=self._realm,
  459. session=self._session_id,
  460. authid=self._authid,
  461. authrole=self._authrole,
  462. authmethod=self._authmethod,
  463. authprovider=self._authprovider,
  464. authextra=msg.authextra,
  465. serializer=self._transport._serializer.SERIALIZER_ID,
  466. resumed=msg.resumed,
  467. resumable=msg.resumable,
  468. resume_token=msg.resume_token,
  469. )
  470. # firing 'join' *before* running onJoin, so that
  471. # the idiom where you "do stuff" in onJoin --
  472. # possibly including self.leave() -- works
  473. # properly. Besides, there's "ready" that fires
  474. # after 'join' and onJoin have all completed...
  475. d = self.fire('join', self, details)
  476. # add a logging errback first, which will ignore any
  477. # errors from fire()
  478. txaio.add_callbacks(
  479. d, None,
  480. lambda e: self._swallow_error(e, "While notifying 'join'")
  481. )
  482. # this should run regardless
  483. txaio.add_callbacks(
  484. d,
  485. lambda _: txaio.as_future(self.onJoin, details),
  486. None
  487. )
  488. # ignore any errors from onJoin (XXX or, should that be fatal?)
  489. txaio.add_callbacks(
  490. d, None,
  491. lambda e: self._swallow_error(e, "While firing onJoin")
  492. )
  493. # this instance is now "ready"...
  494. txaio.add_callbacks(
  495. d,
  496. lambda _: self.fire('ready', self),
  497. None
  498. )
  499. # ignore any errors from 'ready'
  500. txaio.add_callbacks(
  501. d, None,
  502. lambda e: self._swallow_error(e, "While notifying 'ready'")
  503. )
  504. def error(e):
  505. reply = message.Abort(
  506. u"wamp.error.cannot_authenticate", u"Error calling onWelcome handler"
  507. )
  508. self._transport.send(reply)
  509. return self._swallow_error(e, "While firing onWelcome")
  510. txaio.add_callbacks(d, success, error)
  511. elif isinstance(msg, message.Abort):
  512. # fire callback and close the transport
  513. details = types.CloseDetails(msg.reason, msg.message)
  514. d = txaio.as_future(self.onLeave, details)
  515. def success(arg):
  516. # XXX also: handle async
  517. d = self.fire('leave', self, details)
  518. def return_arg(_):
  519. return arg
  520. def _error(e):
  521. return self._swallow_error(e, "While firing 'leave' event")
  522. txaio.add_callbacks(d, return_arg, _error)
  523. return d
  524. def _error(e):
  525. return self._swallow_error(e, "While firing onLeave")
  526. txaio.add_callbacks(d, success, _error)
  527. elif isinstance(msg, message.Challenge):
  528. challenge = types.Challenge(msg.method, msg.extra)
  529. d = txaio.as_future(self.onChallenge, challenge)
  530. def success(signature):
  531. if signature is None:
  532. raise Exception('onChallenge user callback did not return a signature')
  533. if type(signature) == six.binary_type:
  534. signature = signature.decode('utf8')
  535. if type(signature) != six.text_type:
  536. raise Exception('signature must be unicode (was {})'.format(type(signature)))
  537. reply = message.Authenticate(signature)
  538. self._transport.send(reply)
  539. def error(err):
  540. self.onUserError(err, "Authentication failed")
  541. reply = message.Abort(u"wamp.error.cannot_authenticate", u"{0}".format(err.value))
  542. self._transport.send(reply)
  543. # fire callback and close the transport
  544. details = types.CloseDetails(reply.reason, reply.message)
  545. d = txaio.as_future(self.onLeave, details)
  546. def success(arg):
  547. # XXX also: handle async
  548. self.fire('leave', self, details)
  549. return arg
  550. def _error(e):
  551. return self._swallow_error(e, "While firing onLeave")
  552. txaio.add_callbacks(d, success, _error)
  553. # switching to the callback chain, effectively
  554. # cancelling error (which we've now handled)
  555. return d
  556. txaio.add_callbacks(d, success, error)
  557. else:
  558. raise ProtocolError("Received {0} message, and session is not yet established".format(msg.__class__))
  559. else:
  560. # self._session_id != None (aka "session established")
  561. if isinstance(msg, message.Goodbye):
  562. if not self._goodbye_sent:
  563. # the peer wants to close: send GOODBYE reply
  564. reply = message.Goodbye()
  565. self._transport.send(reply)
  566. self._session_id = None
  567. # fire callback and close the transport
  568. details = types.CloseDetails(msg.reason, msg.message)
  569. d = txaio.as_future(self.onLeave, details)
  570. def success(arg):
  571. # XXX also: handle async
  572. self.fire('leave', self, details)
  573. return arg
  574. def _error(e):
  575. errmsg = 'While firing onLeave for reason "{0}" and message "{1}"'.format(msg.reason, msg.message)
  576. return self._swallow_error(e, errmsg)
  577. txaio.add_callbacks(d, success, _error)
  578. elif isinstance(msg, message.Event):
  579. if msg.subscription in self._subscriptions:
  580. # fire all event handlers on subscription ..
  581. for subscription in self._subscriptions[msg.subscription]:
  582. handler = subscription.handler
  583. topic = msg.topic or subscription.topic
  584. if msg.enc_algo:
  585. # FIXME: behavior in error cases (no keyring, decrypt issues, URI mismatch, ..)
  586. if not self._payload_codec:
  587. self.log.warn("received encoded payload with enc_algo={enc_algo}, but no payload codec active - ignoring encoded payload!", enc_algo=msg.enc_algo)
  588. return
  589. else:
  590. try:
  591. encoded_payload = EncodedPayload(msg.payload, msg.enc_algo, msg.enc_serializer, msg.enc_key)
  592. decoded_topic, msg.args, msg.kwargs = self._payload_codec.decode(False, topic, encoded_payload)
  593. except Exception as e:
  594. self.log.warn("failed to decode application payload encoded with enc_algo={enc_algo}: {error}", error=e, enc_algo=msg.enc_algo)
  595. return
  596. else:
  597. if topic != decoded_topic:
  598. self.log.warn("envelope topic URI does not match encoded one")
  599. return
  600. invoke_args = (handler.obj,) if handler.obj else tuple()
  601. if msg.args:
  602. invoke_args = invoke_args + tuple(msg.args)
  603. invoke_kwargs = msg.kwargs if msg.kwargs else dict()
  604. if handler.details_arg:
  605. invoke_kwargs[handler.details_arg] = types.EventDetails(subscription, msg.publication, publisher=msg.publisher, publisher_authid=msg.publisher_authid, publisher_authrole=msg.publisher_authrole, topic=topic, retained=msg.retained, enc_algo=msg.enc_algo, forward_for=msg.forward_for)
  606. # FIXME: https://github.com/crossbario/autobahn-python/issues/764
  607. def _success(_):
  608. # Acknowledged Events -- only if we got the details header and
  609. # the broker advertised it
  610. if msg.x_acknowledged_delivery and self._router_roles["broker"].x_acknowledged_event_delivery:
  611. if self._transport:
  612. response = message.EventReceived(msg.publication)
  613. self._transport.send(response)
  614. else:
  615. self.log.warn("successfully processed event with acknowledged delivery, but could not send ACK, since the transport was lost in the meantime")
  616. def _error(e):
  617. errmsg = 'While firing {0} subscribed under {1}.'.format(
  618. handler.fn, msg.subscription)
  619. return self._swallow_error(e, errmsg)
  620. future = txaio.as_future(handler.fn, *invoke_args, **invoke_kwargs)
  621. txaio.add_callbacks(future, _success, _error)
  622. else:
  623. raise ProtocolError("EVENT received for non-subscribed subscription ID {0}".format(msg.subscription))
  624. elif isinstance(msg, message.Published):
  625. if msg.request in self._publish_reqs:
  626. # get and pop outstanding publish request
  627. publish_request = self._publish_reqs.pop(msg.request)
  628. # create a new publication object
  629. publication = Publication(msg.publication, was_encrypted=publish_request.was_encrypted)
  630. # resolve deferred/future for publishing successfully
  631. txaio.resolve(publish_request.on_reply, publication)
  632. else:
  633. raise ProtocolError("PUBLISHED received for non-pending request ID {0}".format(msg.request))
  634. elif isinstance(msg, message.Subscribed):
  635. if msg.request in self._subscribe_reqs:
  636. # get and pop outstanding subscribe request
  637. request = self._subscribe_reqs.pop(msg.request)
  638. # create new handler subscription list for subscription ID if not yet tracked
  639. if msg.subscription not in self._subscriptions:
  640. self._subscriptions[msg.subscription] = []
  641. subscription = Subscription(msg.subscription, request.topic, self, request.handler)
  642. # add handler to existing subscription
  643. self._subscriptions[msg.subscription].append(subscription)
  644. # resolve deferred/future for subscribing successfully
  645. txaio.resolve(request.on_reply, subscription)
  646. else:
  647. raise ProtocolError("SUBSCRIBED received for non-pending request ID {0}".format(msg.request))
  648. elif isinstance(msg, message.Unsubscribed):
  649. if msg.request in self._unsubscribe_reqs:
  650. # get and pop outstanding subscribe request
  651. request = self._unsubscribe_reqs.pop(msg.request)
  652. # if the subscription still exists, mark as inactive and remove ..
  653. if request.subscription_id in self._subscriptions:
  654. for subscription in self._subscriptions[request.subscription_id]:
  655. subscription.active = False
  656. del self._subscriptions[request.subscription_id]
  657. # resolve deferred/future for unsubscribing successfully
  658. txaio.resolve(request.on_reply, 0)
  659. else:
  660. raise ProtocolError("UNSUBSCRIBED received for non-pending request ID {0}".format(msg.request))
  661. elif isinstance(msg, message.Result):
  662. if msg.request in self._call_reqs:
  663. call_request = self._call_reqs[msg.request]
  664. proc = call_request.procedure
  665. enc_err = None
  666. if msg.enc_algo:
  667. if not self._payload_codec:
  668. log_msg = u"received encoded payload, but no payload codec active"
  669. self.log.warn(log_msg)
  670. enc_err = ApplicationError(ApplicationError.ENC_NO_PAYLOAD_CODEC, log_msg)
  671. else:
  672. try:
  673. encoded_payload = EncodedPayload(msg.payload, msg.enc_algo, msg.enc_serializer, msg.enc_key)
  674. decrypted_proc, msg.args, msg.kwargs = self._payload_codec.decode(True, proc, encoded_payload)
  675. except Exception as e:
  676. self.log.warn(
  677. "failed to decrypt application payload 1: {err}",
  678. err=e,
  679. )
  680. enc_err = ApplicationError(
  681. ApplicationError.ENC_DECRYPT_ERROR,
  682. u"failed to decrypt application payload 1: {}".format(e),
  683. )
  684. else:
  685. if proc != decrypted_proc:
  686. self.log.warn(
  687. "URI within encrypted payload ('{decrypted_proc}') does not match the envelope ('{proc}')",
  688. decrypted_proc=decrypted_proc,
  689. proc=proc,
  690. )
  691. enc_err = ApplicationError(
  692. ApplicationError.ENC_TRUSTED_URI_MISMATCH,
  693. u"URI within encrypted payload ('{}') does not match the envelope ('{}')".format(decrypted_proc, proc),
  694. )
  695. if msg.progress:
  696. # process progressive call result
  697. if call_request.options.on_progress:
  698. if enc_err:
  699. self.onUserError(enc_err, "could not deliver progressive call result, because payload decryption failed")
  700. else:
  701. kw = msg.kwargs or dict()
  702. args = msg.args or tuple()
  703. def _error(fail):
  704. self.onUserError(fail, "While firing on_progress")
  705. if call_request.options and call_request.options.details:
  706. prog_d = txaio.as_future(call_request.options.on_progress,
  707. types.CallResult(*msg.args,
  708. callee=msg.callee,
  709. callee_authid=msg.callee_authid,
  710. callee_authrole=msg.callee_authrole,
  711. forward_for=msg.forward_for,
  712. **msg.kwargs))
  713. else:
  714. prog_d = txaio.as_future(call_request.options.on_progress,
  715. *args,
  716. **kw)
  717. txaio.add_callbacks(prog_d, None, _error)
  718. else:
  719. # process final call result
  720. # drop original request
  721. del self._call_reqs[msg.request]
  722. # user callback that gets fired
  723. on_reply = call_request.on_reply
  724. # above might already have rejected, so we guard ..
  725. if enc_err:
  726. txaio.reject(on_reply, enc_err)
  727. else:
  728. if msg.kwargs or (call_request.options and call_request.options.details):
  729. kwargs = msg.kwargs or {}
  730. if msg.args:
  731. res = types.CallResult(*msg.args,
  732. callee=msg.callee,
  733. callee_authid=msg.callee_authid,
  734. callee_authrole=msg.callee_authrole,
  735. forward_for=msg.forward_for,
  736. **kwargs)
  737. else:
  738. res = types.CallResult(callee=msg.callee,
  739. callee_authid=msg.callee_authid,
  740. callee_authrole=msg.callee_authrole,
  741. forward_for=msg.forward_for,
  742. **kwargs)
  743. txaio.resolve(on_reply, res)
  744. else:
  745. if msg.args:
  746. if len(msg.args) > 1:
  747. res = types.CallResult(*msg.args)
  748. txaio.resolve(on_reply, res)
  749. else:
  750. txaio.resolve(on_reply, msg.args[0])
  751. else:
  752. txaio.resolve(on_reply, None)
  753. else:
  754. raise ProtocolError("RESULT received for non-pending request ID {0}".format(msg.request))
  755. elif isinstance(msg, message.Invocation):
  756. if msg.request in self._invocations:
  757. raise ProtocolError("INVOCATION received for request ID {0} already invoked".format(msg.request))
  758. else:
  759. if msg.registration not in self._registrations:
  760. raise ProtocolError("INVOCATION received for non-registered registration ID {0}".format(msg.registration))
  761. else:
  762. registration = self._registrations[msg.registration]
  763. endpoint = registration.endpoint
  764. proc = msg.procedure or registration.procedure
  765. enc_err = None
  766. if msg.enc_algo:
  767. if not self._payload_codec:
  768. log_msg = u"received encrypted INVOCATION payload, but no keyring active"
  769. self.log.warn(log_msg)
  770. enc_err = ApplicationError(ApplicationError.ENC_NO_PAYLOAD_CODEC, log_msg)
  771. else:
  772. try:
  773. encoded_payload = EncodedPayload(msg.payload, msg.enc_algo, msg.enc_serializer, msg.enc_key)
  774. decrypted_proc, msg.args, msg.kwargs = self._payload_codec.decode(False, proc, encoded_payload)
  775. except Exception as e:
  776. self.log.warn(
  777. "failed to decrypt INVOCATION payload: {err}",
  778. err=e,
  779. )
  780. enc_err = ApplicationError(
  781. ApplicationError.ENC_DECRYPT_ERROR,
  782. "failed to decrypt INVOCATION payload: {}".format(e),
  783. )
  784. else:
  785. if proc != decrypted_proc:
  786. self.log.warn(
  787. "URI within encrypted INVOCATION payload ('{decrypted_proc}') "
  788. "does not match the envelope ('{proc}')",
  789. decrypted_proc=decrypted_proc,
  790. proc=proc,
  791. )
  792. enc_err = ApplicationError(
  793. ApplicationError.ENC_TRUSTED_URI_MISMATCH,
  794. u"URI within encrypted INVOCATION payload ('{}') does not match the envelope ('{}')".format(decrypted_proc, proc),
  795. )
  796. if enc_err:
  797. # when there was a problem decrypting the INVOCATION payload, we obviously can't invoke
  798. # the endpoint, but return and
  799. reply = self._message_from_exception(message.Invocation.MESSAGE_TYPE, msg.request, enc_err)
  800. self._transport.send(reply)
  801. else:
  802. if endpoint.obj is not None:
  803. invoke_args = (endpoint.obj,)
  804. else:
  805. invoke_args = tuple()
  806. if msg.args:
  807. invoke_args = invoke_args + tuple(msg.args)
  808. invoke_kwargs = msg.kwargs if msg.kwargs else dict()
  809. if endpoint.details_arg:
  810. if msg.receive_progress:
  811. def progress(*args, **kwargs):
  812. assert(args is None or type(args) in (list, tuple))
  813. assert(kwargs is None or type(kwargs) == dict)
  814. if kwargs and six.PY2:
  815. kwargs = {
  816. k.decode('utf8'): v
  817. for k, v in kwargs.iteritems()
  818. }
  819. encoded_payload = None
  820. if msg.enc_algo:
  821. if not self._payload_codec:
  822. raise Exception(u"trying to send encrypted payload, but no keyring active")
  823. encoded_payload = self._payload_codec.encode(False, proc, args, kwargs)
  824. if encoded_payload:
  825. progress_msg = message.Yield(msg.request,
  826. payload=encoded_payload.payload,
  827. progress=True,
  828. enc_algo=encoded_payload.enc_algo,
  829. enc_key=encoded_payload.enc_key,
  830. enc_serializer=encoded_payload.enc_serializer)
  831. else:
  832. progress_msg = message.Yield(msg.request,
  833. args=args,
  834. kwargs=kwargs,
  835. progress=True)
  836. self._transport.send(progress_msg)
  837. else:
  838. progress = None
  839. invoke_kwargs[endpoint.details_arg] = types.CallDetails(registration,
  840. progress=progress,
  841. caller=msg.caller,
  842. caller_authid=msg.caller_authid,
  843. caller_authrole=msg.caller_authrole,
  844. procedure=proc,
  845. enc_algo=msg.enc_algo)
  846. on_reply = txaio.as_future(endpoint.fn, *invoke_args, **invoke_kwargs)
  847. def success(res):
  848. del self._invocations[msg.request]
  849. encoded_payload = None
  850. if msg.enc_algo:
  851. if not self._payload_codec:
  852. log_msg = u"trying to send encrypted payload, but no keyring active"
  853. self.log.warn(log_msg)
  854. else:
  855. try:
  856. if isinstance(res, types.CallResult):
  857. encoded_payload = self._payload_codec.encode(False, proc, res.results, res.kwresults)
  858. else:
  859. encoded_payload = self._payload_codec.encode(False, proc, [res])
  860. except Exception as e:
  861. self.log.warn(
  862. "failed to encrypt application payload: {err}",
  863. err=e,
  864. )
  865. if encoded_payload:
  866. if isinstance(res, types.CallResult):
  867. reply = message.Yield(msg.request,
  868. payload=encoded_payload.payload,
  869. enc_algo=encoded_payload.enc_algo,
  870. enc_key=encoded_payload.enc_key,
  871. enc_serializer=encoded_payload.enc_serializer,
  872. callee=res.callee,
  873. callee_authid=res.callee_authid,
  874. callee_authrole=res.callee_authrole,
  875. forward_for=res.forward_for)
  876. else:
  877. reply = message.Yield(msg.request,
  878. payload=encoded_payload.payload,
  879. enc_algo=encoded_payload.enc_algo,
  880. enc_key=encoded_payload.enc_key,
  881. enc_serializer=encoded_payload.enc_serializer)
  882. else:
  883. if isinstance(res, types.CallResult):
  884. reply = message.Yield(msg.request,
  885. args=res.results,
  886. kwargs=res.kwresults,
  887. callee=res.callee,
  888. callee_authid=res.callee_authid,
  889. callee_authrole=res.callee_authrole,
  890. forward_for=res.forward_for)
  891. else:
  892. reply = message.Yield(msg.request,
  893. args=[res])
  894. if self._transport is None:
  895. self.log.debug('Skipping result of "{}", request {} because transport disconnected.'.format(registration.procedure, msg.request))
  896. return
  897. try:
  898. self._transport.send(reply)
  899. except SerializationError as e:
  900. # the application-level payload returned from the invoked procedure can't be serialized
  901. reply = message.Error(message.Invocation.MESSAGE_TYPE, msg.request, ApplicationError.INVALID_PAYLOAD,
  902. args=[u'success return value from invoked procedure "{0}" could not be serialized: {1}'.format(registration.procedure, e)])
  903. self._transport.send(reply)
  904. except PayloadExceededError as e:
  905. # the application-level payload returned from the invoked procedure, when serialized and framed
  906. # for the transport, exceeds the transport message/frame size limit
  907. reply = message.Error(message.Invocation.MESSAGE_TYPE, msg.request, ApplicationError.PAYLOAD_SIZE_EXCEEDED,
  908. args=[u'success return value from invoked procedure "{0}" exceeds transport size limit: {1}'.format(registration.procedure, e)])
  909. self._transport.send(reply)
  910. def error(err):
  911. del self._invocations[msg.request]
  912. errmsg = txaio.failure_message(err)
  913. try:
  914. self.onUserError(err, errmsg)
  915. except:
  916. pass
  917. formatted_tb = None
  918. if self.traceback_app:
  919. formatted_tb = txaio.failure_format_traceback(err)
  920. reply = self._message_from_exception(
  921. message.Invocation.MESSAGE_TYPE,
  922. msg.request,
  923. err.value,
  924. formatted_tb,
  925. msg.enc_algo
  926. )
  927. try:
  928. self._transport.send(reply)
  929. except SerializationError as e:
  930. # the application-level payload returned from the invoked procedure can't be serialized
  931. reply = message.Error(message.Invocation.MESSAGE_TYPE, msg.request, ApplicationError.INVALID_PAYLOAD,
  932. args=[u'error return value from invoked procedure "{0}" could not be serialized: {1}'.format(registration.procedure, e)])
  933. self._transport.send(reply)
  934. except PayloadExceededError as e:
  935. # the application-level payload returned from the invoked procedure, when serialized and framed
  936. # for the transport, exceeds the transport message/frame size limit
  937. reply = message.Error(message.Invocation.MESSAGE_TYPE, msg.request, ApplicationError.PAYLOAD_SIZE_EXCEEDED,
  938. args=[u'success return value from invoked procedure "{0}" exceeds transport size limit: {1}'.format(registration.procedure, e)])
  939. self._transport.send(reply)
  940. # we have handled the error, so we eat it
  941. return None
  942. self._invocations[msg.request] = InvocationRequest(msg.request, on_reply)
  943. txaio.add_callbacks(on_reply, success, error)
  944. elif isinstance(msg, message.Interrupt):
  945. if msg.request not in self._invocations:
  946. # raise ProtocolError("INTERRUPT received for non-pending invocation {0}".format(msg.request))
  947. self.log.debug('INTERRUPT received for non-pending invocation {request}', request=msg.request)
  948. else:
  949. invoked = self._invocations[msg.request]
  950. # this will result in a CancelledError which will
  951. # be captured by the error handler around line 979
  952. # to delete the invocation..
  953. txaio.cancel(invoked.on_reply)
  954. elif isinstance(msg, message.Registered):
  955. if msg.request in self._register_reqs:
  956. # get and pop outstanding register request
  957. request = self._register_reqs.pop(msg.request)
  958. # create new registration if not yet tracked
  959. if msg.registration not in self._registrations:
  960. registration = Registration(self, msg.registration, request.procedure, request.endpoint)
  961. self._registrations[msg.registration] = registration
  962. else:
  963. raise ProtocolError("REGISTERED received for already existing registration ID {0}".format(msg.registration))
  964. txaio.resolve(request.on_reply, registration)
  965. else:
  966. raise ProtocolError("REGISTERED received for non-pending request ID {0}".format(msg.request))
  967. elif isinstance(msg, message.Unregistered):
  968. if msg.request == 0:
  969. # this is a forced un-register either from a call
  970. # to the wamp.* meta-api or the force_reregister
  971. # option
  972. try:
  973. reg = self._registrations[msg.registration]
  974. except KeyError:
  975. raise ProtocolError(
  976. "UNREGISTERED received for non-existant registration"
  977. " ID {0}".format(msg.registration)
  978. )
  979. self.log.info(
  980. u"Router unregistered procedure '{proc}' with ID {id}",
  981. proc=reg.procedure,
  982. id=msg.registration,
  983. )
  984. elif msg.request in self._unregister_reqs:
  985. # get and pop outstanding subscribe request
  986. request = self._unregister_reqs.pop(msg.request)
  987. # if the registration still exists, mark as inactive and remove ..
  988. if request.registration_id in self._registrations:
  989. self._registrations[request.registration_id].active = False
  990. del self._registrations[request.registration_id]
  991. # resolve deferred/future for unregistering successfully
  992. txaio.resolve(request.on_reply)
  993. else:
  994. raise ProtocolError("UNREGISTERED received for non-pending request ID {0}".format(msg.request))
  995. elif isinstance(msg, message.Error):
  996. # remove outstanding request and get the reply deferred/future
  997. on_reply = None
  998. # ERROR reply to CALL
  999. if msg.request_type == message.Call.MESSAGE_TYPE and msg.request in self._call_reqs:
  1000. on_reply = self._call_reqs.pop(msg.request).on_reply
  1001. # ERROR reply to PUBLISH
  1002. elif msg.request_type == message.Publish.MESSAGE_TYPE and msg.request in self._publish_reqs:
  1003. on_reply = self._publish_reqs.pop(msg.request).on_reply
  1004. # ERROR reply to SUBSCRIBE
  1005. elif msg.request_type == message.Subscribe.MESSAGE_TYPE and msg.request in self._subscribe_reqs:
  1006. on_reply = self._subscribe_reqs.pop(msg.request).on_reply
  1007. # ERROR reply to UNSUBSCRIBE
  1008. elif msg.request_type == message.Unsubscribe.MESSAGE_TYPE and msg.request in self._unsubscribe_reqs:
  1009. on_reply = self._unsubscribe_reqs.pop(msg.request).on_reply
  1010. # ERROR reply to REGISTER
  1011. elif msg.request_type == message.Register.MESSAGE_TYPE and msg.request in self._register_reqs:
  1012. on_reply = self._register_reqs.pop(msg.request).on_reply
  1013. # ERROR reply to UNREGISTER
  1014. elif msg.request_type == message.Unregister.MESSAGE_TYPE and msg.request in self._unregister_reqs:
  1015. on_reply = self._unregister_reqs.pop(msg.request).on_reply
  1016. if on_reply:
  1017. if not txaio.is_called(on_reply):
  1018. txaio.reject(on_reply, self._exception_from_message(msg))
  1019. else:
  1020. raise ProtocolError("WampAppSession.onMessage(): ERROR received for non-pending request_type {0} and request ID {1}".format(msg.request_type, msg.request))
  1021. else:
  1022. raise ProtocolError("Unexpected message {0}".format(msg.__class__))
  1023. @public
  1024. def onClose(self, wasClean):
  1025. """
  1026. Implements :func:`autobahn.wamp.interfaces.ITransportHandler.onClose`
  1027. """
  1028. self._transport = None
  1029. if self._session_id:
  1030. # fire callback and close the transport
  1031. details = types.CloseDetails(
  1032. reason=types.CloseDetails.REASON_TRANSPORT_LOST,
  1033. message=u'WAMP transport was lost without closing the session {} before'.format(self._session_id),
  1034. )
  1035. d = txaio.as_future(self.onLeave, details)
  1036. def success(arg):
  1037. # XXX also: handle async
  1038. self.fire('leave', self, details)
  1039. return arg
  1040. def _error(e):
  1041. return self._swallow_error(e, "While firing onLeave")
  1042. txaio.add_callbacks(d, success, _error)
  1043. self._session_id = None
  1044. d = txaio.as_future(self.onDisconnect)
  1045. def success(arg):
  1046. # XXX do we care about returning 'arg' properly?
  1047. return self.fire('disconnect', self, was_clean=wasClean)
  1048. def _error(e):
  1049. return self._swallow_error(e, "While firing onDisconnect")
  1050. txaio.add_callbacks(d, success, _error)
  1051. @public
  1052. def onChallenge(self, challenge):
  1053. """
  1054. Implements :func:`autobahn.wamp.interfaces.ISession.onChallenge`
  1055. """
  1056. raise Exception("received authentication challenge, but onChallenge not implemented")
  1057. @public
  1058. def onJoin(self, details):
  1059. """
  1060. Implements :func:`autobahn.wamp.interfaces.ISession.onJoin`
  1061. """
  1062. @public
  1063. def onWelcome(self, msg):
  1064. """
  1065. Implements :func:`autobahn.wamp.interfaces.ISession.onWelcome`
  1066. """
  1067. def _errback_outstanding_requests(self, exc):
  1068. """
  1069. Errback any still outstanding requests with exc.
  1070. """
  1071. d = txaio.create_future_success(None)
  1072. all_requests = [
  1073. self._publish_reqs,
  1074. self._subscribe_reqs,
  1075. self._unsubscribe_reqs,
  1076. self._call_reqs,
  1077. self._register_reqs,
  1078. self._unregister_reqs
  1079. ]
  1080. outstanding = []
  1081. for requests in all_requests:
  1082. outstanding.extend(requests.values())
  1083. requests.clear()
  1084. if outstanding:
  1085. self.log.info(
  1086. 'Cancelling {count} outstanding requests',
  1087. count=len(outstanding),
  1088. )
  1089. for request in outstanding:
  1090. self.log.debug(
  1091. 'cleaning up outstanding {request_type} request {request_id}, '
  1092. 'firing errback on user handler {request_on_reply}',
  1093. request_on_reply=request.on_reply,
  1094. request_id=request.request_id,
  1095. request_type=request.__class__.__name__,
  1096. )
  1097. if not txaio.is_called(request.on_reply):
  1098. txaio.reject(request.on_reply, exc)
  1099. # wait for any async-ness in the error handlers for on_reply
  1100. txaio.add_callbacks(d, lambda _: request.on_reply, lambda _: request.on_reply)
  1101. return d
  1102. @public
  1103. def onLeave(self, details):
  1104. """
  1105. Implements :func:`autobahn.wamp.interfaces.ISession.onLeave`
  1106. """
  1107. if details.reason != CloseDetails.REASON_DEFAULT:
  1108. self.log.warn('session closed with reason {reason} [{message}]', reason=details.reason, message=details.message)
  1109. # fire ApplicationError on any currently outstanding requests
  1110. exc = ApplicationError(details.reason, details.message)
  1111. d = self._errback_outstanding_requests(exc)
  1112. def disconnect(_):
  1113. if self._transport:
  1114. self.disconnect()
  1115. txaio.add_callbacks(d, disconnect, disconnect)
  1116. return d
  1117. @public
  1118. def leave(self, reason=None, message=None):
  1119. """
  1120. Implements :func:`autobahn.wamp.interfaces.ISession.leave`
  1121. """
  1122. if not self._session_id:
  1123. raise SessionNotReady(u"session hasn't joined a realm")
  1124. if not self._goodbye_sent:
  1125. if not reason:
  1126. reason = u"wamp.close.normal"
  1127. msg = wamp.message.Goodbye(reason=reason, message=message)
  1128. self._transport.send(msg)
  1129. self._goodbye_sent = True
  1130. else:
  1131. self.log.warn('session was already requested to leave - not sending GOODBYE again')
  1132. is_closed = self._transport is None or self._transport.is_closed
  1133. return is_closed
  1134. @public
  1135. def onDisconnect(self):
  1136. """
  1137. Implements :func:`autobahn.wamp.interfaces.ISession.onDisconnect`
  1138. """
  1139. # fire TransportLost on any _still_ outstanding requests
  1140. # (these should have been already cleaned up in onLeave() - when
  1141. # this actually has fired)
  1142. exc = exception.TransportLost()
  1143. self._errback_outstanding_requests(exc)
  1144. @public
  1145. def publish(self, topic, *args, **kwargs):
  1146. """
  1147. Implements :func:`autobahn.wamp.interfaces.IPublisher.publish`
  1148. """
  1149. assert(type(topic) == six.text_type)
  1150. assert(args is None or type(args) in (list, tuple))
  1151. assert(kwargs is None or type(kwargs) == dict)
  1152. message.check_or_raise_uri(topic,
  1153. message='{}.publish()'.format(self.__class__.__name__),
  1154. strict=False,
  1155. allow_empty_components=False,
  1156. allow_none=False)
  1157. options = kwargs.pop('options', None)
  1158. if options and not isinstance(options, types.PublishOptions):
  1159. raise Exception("options must be of type a.w.t.PublishOptions")
  1160. if kwargs and six.PY2:
  1161. kwargs = {
  1162. k.decode('utf8'): v
  1163. for k, v in kwargs.iteritems()
  1164. }
  1165. if not self._transport:
  1166. raise exception.TransportLost()
  1167. request_id = self._request_id_gen.next()
  1168. encoded_payload = None
  1169. if self._payload_codec:
  1170. encoded_payload = self._payload_codec.encode(True, topic, args, kwargs)
  1171. if encoded_payload:
  1172. if options:
  1173. msg = message.Publish(request_id,
  1174. topic,
  1175. payload=encoded_payload.payload,
  1176. enc_algo=encoded_payload.enc_algo,
  1177. enc_key=encoded_payload.enc_key,
  1178. enc_serializer=encoded_payload.enc_serializer,
  1179. **options.message_attr())
  1180. else:
  1181. msg = message.Publish(request_id,
  1182. topic,
  1183. payload=encoded_payload.payload,
  1184. enc_algo=encoded_payload.enc_algo,
  1185. enc_key=encoded_payload.enc_key,
  1186. enc_serializer=encoded_payload.enc_serializer)
  1187. else:
  1188. if options:
  1189. msg = message.Publish(request_id,
  1190. topic,
  1191. args=args,
  1192. kwargs=kwargs,
  1193. **options.message_attr())
  1194. else:
  1195. msg = message.Publish(request_id,
  1196. topic,
  1197. args=args,
  1198. kwargs=kwargs)
  1199. if options:
  1200. if options.correlation_id is not None:
  1201. msg.correlation_id = options.correlation_id
  1202. if options.correlation_uri is not None:
  1203. msg.correlation_uri = options.correlation_uri
  1204. if options.correlation_is_anchor is not None:
  1205. msg.correlation_is_anchor = options.correlation_is_anchor
  1206. if options.correlation_is_last is not None:
  1207. msg.correlation_is_last = options.correlation_is_last
  1208. if options and options.acknowledge:
  1209. # only acknowledged publications expect a reply ..
  1210. on_reply = txaio.create_future()
  1211. self._publish_reqs[request_id] = PublishRequest(request_id, on_reply, was_encrypted=(encoded_payload is not None))
  1212. else:
  1213. on_reply = None
  1214. try:
  1215. # Notes:
  1216. #
  1217. # * this might raise autobahn.wamp.exception.SerializationError
  1218. # when the user payload cannot be serialized
  1219. # * we have to setup a PublishRequest() in _publish_reqs _before_
  1220. # calling transpor.send(), because a mock- or side-by-side transport
  1221. # will immediately lead on an incoming WAMP message in onMessage()
  1222. #
  1223. self._transport.send(msg)
  1224. except Exception as e:
  1225. if request_id in self._publish_reqs:
  1226. del self._publish_reqs[request_id]
  1227. raise e
  1228. return on_reply
  1229. @public
  1230. def subscribe(self, handler, topic=None, options=None):
  1231. """
  1232. Implements :func:`autobahn.wamp.interfaces.ISubscriber.subscribe`
  1233. """
  1234. assert((callable(handler) and topic is not None) or hasattr(handler, '__class__'))
  1235. assert(topic is None or type(topic) == six.text_type)
  1236. assert(options is None or isinstance(options, types.SubscribeOptions))
  1237. if not self._transport:
  1238. raise exception.TransportLost()
  1239. def _subscribe(obj, fn, topic, options):
  1240. message.check_or_raise_uri(topic,
  1241. message='{}.subscribe()'.format(self.__class__.__name__),
  1242. strict=False,
  1243. allow_empty_components=True,
  1244. allow_none=False)
  1245. request_id = self._request_id_gen.next()
  1246. on_reply = txaio.create_future()
  1247. handler_obj = Handler(fn, obj, options.details_arg if options else None)
  1248. self._subscribe_reqs[request_id] = SubscribeRequest(request_id, topic, on_reply, handler_obj)
  1249. if options:
  1250. msg = message.Subscribe(request_id, topic, **options.message_attr())
  1251. else:
  1252. msg = message.Subscribe(request_id, topic)
  1253. if options:
  1254. if options.correlation_id is not None:
  1255. msg.correlation_id = options.correlation_id
  1256. if options.correlation_uri is not None:
  1257. msg.correlation_uri = options.correlation_uri
  1258. if options.correlation_is_anchor is not None:
  1259. msg.correlation_is_anchor = options.correlation_is_anchor
  1260. if options.correlation_is_last is not None:
  1261. msg.correlation_is_last = options.correlation_is_last
  1262. self._transport.send(msg)
  1263. return on_reply
  1264. if callable(handler):
  1265. # subscribe a single handler
  1266. return _subscribe(None, handler, topic, options)
  1267. else:
  1268. # subscribe all methods on an object decorated with "wamp.subscribe"
  1269. on_replies = []
  1270. for k in inspect.getmembers(handler.__class__, is_method_or_function):
  1271. proc = k[1]
  1272. if "_wampuris" in proc.__dict__:
  1273. for pat in proc.__dict__["_wampuris"]:
  1274. if pat.is_handler():
  1275. _uri = pat.uri()
  1276. subopts = pat.options or options
  1277. if subopts is None:
  1278. if pat.uri_type == uri.Pattern.URI_TYPE_WILDCARD:
  1279. subopts = types.SubscribeOptions(match=u"wildcard")
  1280. else:
  1281. subopts = types.SubscribeOptions(match=u"exact")
  1282. on_replies.append(_subscribe(handler, proc, _uri, subopts))
  1283. # XXX needs coverage
  1284. return txaio.gather(on_replies, consume_exceptions=True)
  1285. def _unsubscribe(self, subscription):
  1286. """
  1287. Called from :meth:`autobahn.wamp.protocol.Subscription.unsubscribe`
  1288. """
  1289. assert(isinstance(subscription, Subscription))
  1290. assert subscription.active
  1291. assert(subscription.id in self._subscriptions)
  1292. assert(subscription in self._subscriptions[subscription.id])
  1293. if not self._transport:
  1294. raise exception.TransportLost()
  1295. # remove handler subscription and mark as inactive
  1296. self._subscriptions[subscription.id].remove(subscription)
  1297. subscription.active = False
  1298. # number of handler subscriptions left ..
  1299. scount = len(self._subscriptions[subscription.id])
  1300. if scount == 0:
  1301. # if the last handler was removed, unsubscribe from broker ..
  1302. request_id = self._request_id_gen.next()
  1303. on_reply = txaio.create_future()
  1304. self._unsubscribe_reqs[request_id] = UnsubscribeRequest(request_id, on_reply, subscription.id)
  1305. msg = message.Unsubscribe(request_id, subscription.id)
  1306. self._transport.send(msg)
  1307. return on_reply
  1308. else:
  1309. # there are still handlers active on the subscription!
  1310. return txaio.create_future_success(scount)
  1311. @public
  1312. def call(self, procedure, *args, **kwargs):
  1313. """
  1314. Implements :func:`autobahn.wamp.interfaces.ICaller.call`
  1315. """
  1316. assert(type(procedure) == six.text_type)
  1317. assert(args is None or type(args) in (list, tuple))
  1318. assert(kwargs is None or type(kwargs) == dict)
  1319. message.check_or_raise_uri(procedure,
  1320. message='{}.call()'.format(self.__class__.__name__),
  1321. strict=False,
  1322. allow_empty_components=False,
  1323. allow_none=False)
  1324. options = kwargs.pop('options', None)
  1325. if options and not isinstance(options, types.CallOptions):
  1326. raise Exception("options must be of type a.w.t.CallOptions")
  1327. if kwargs and six.PY2:
  1328. kwargs = {
  1329. k.decode('utf8'): v
  1330. for k, v in kwargs.iteritems()
  1331. }
  1332. if not self._transport:
  1333. raise exception.TransportLost()
  1334. request_id = self._request_id_gen.next()
  1335. encoded_payload = None
  1336. if self._payload_codec:
  1337. try:
  1338. encoded_payload = self._payload_codec.encode(True, procedure, args, kwargs)
  1339. except:
  1340. self.log.failure()
  1341. raise
  1342. if encoded_payload:
  1343. if options:
  1344. msg = message.Call(request_id,
  1345. procedure,
  1346. payload=encoded_payload.payload,
  1347. enc_algo=encoded_payload.enc_algo,
  1348. enc_key=encoded_payload.enc_key,
  1349. enc_serializer=encoded_payload.enc_serializer,
  1350. **options.message_attr())
  1351. else:
  1352. msg = message.Call(request_id,
  1353. procedure,
  1354. payload=encoded_payload.payload,
  1355. enc_algo=encoded_payload.enc_algo,
  1356. enc_key=encoded_payload.enc_key,
  1357. enc_serializer=encoded_payload.enc_serializer)
  1358. else:
  1359. if options:
  1360. msg = message.Call(request_id,
  1361. procedure,
  1362. args=args,
  1363. kwargs=kwargs,
  1364. **options.message_attr())
  1365. else:
  1366. msg = message.Call(request_id,
  1367. procedure,
  1368. args=args,
  1369. kwargs=kwargs)
  1370. if options:
  1371. if options.correlation_id is not None:
  1372. msg.correlation_id = options.correlation_id
  1373. if options.correlation_uri is not None:
  1374. msg.correlation_uri = options.correlation_uri
  1375. if options.correlation_is_anchor is not None:
  1376. msg.correlation_is_anchor = options.correlation_is_anchor
  1377. if options.correlation_is_last is not None:
  1378. msg.correlation_is_last = options.correlation_is_last
  1379. def canceller(d):
  1380. cancel_msg = message.Cancel(request_id)
  1381. self._transport.send(cancel_msg)
  1382. # since we announced support for cancelling, we should
  1383. # definitely get an Error back for our Cancel which will
  1384. # clean up this invocation
  1385. on_reply = txaio.create_future(canceller=canceller)
  1386. self._call_reqs[request_id] = CallRequest(request_id, procedure, on_reply, options)
  1387. try:
  1388. # Notes:
  1389. #
  1390. # * this might raise autobahn.wamp.exception.SerializationError
  1391. # when the user payload cannot be serialized
  1392. # * we have to setup a CallRequest() in _call_reqs _before_
  1393. # calling transpor.send(), because a mock- or side-by-side transport
  1394. # will immediately lead on an incoming WAMP message in onMessage()
  1395. #
  1396. self._transport.send(msg)
  1397. except:
  1398. if request_id in self._call_reqs:
  1399. del self._call_reqs[request_id]
  1400. raise
  1401. return on_reply
  1402. @public
  1403. def register(self, endpoint, procedure=None, options=None, prefix=None):
  1404. """
  1405. Implements :func:`autobahn.wamp.interfaces.ICallee.register`
  1406. """
  1407. assert((callable(endpoint) and procedure is not None) or hasattr(endpoint, '__class__'))
  1408. assert(procedure is None or type(procedure) == six.text_type)
  1409. assert(options is None or isinstance(options, types.RegisterOptions))
  1410. assert prefix is None or isinstance(prefix, six.text_type)
  1411. if not self._transport:
  1412. raise exception.TransportLost()
  1413. def _register(obj, fn, procedure, options):
  1414. message.check_or_raise_uri(procedure,
  1415. message='{}.register()'.format(self.__class__.__name__),
  1416. strict=False,
  1417. allow_empty_components=True,
  1418. allow_none=False)
  1419. request_id = self._request_id_gen.next()
  1420. on_reply = txaio.create_future()
  1421. endpoint_obj = Endpoint(fn, obj, options.details_arg if options else None)
  1422. if prefix is not None:
  1423. procedure = u"{}{}".format(prefix, procedure)
  1424. self._register_reqs[request_id] = RegisterRequest(request_id, on_reply, procedure, endpoint_obj)
  1425. if options:
  1426. msg = message.Register(request_id, procedure, **options.message_attr())
  1427. else:
  1428. msg = message.Register(request_id, procedure)
  1429. if options:
  1430. if options.correlation_id is not None:
  1431. msg.correlation_id = options.correlation_id
  1432. if options.correlation_uri is not None:
  1433. msg.correlation_uri = options.correlation_uri
  1434. if options.correlation_is_anchor is not None:
  1435. msg.correlation_is_anchor = options.correlation_is_anchor
  1436. if options.correlation_is_last is not None:
  1437. msg.correlation_is_last = options.correlation_is_last
  1438. self._transport.send(msg)
  1439. return on_reply
  1440. if callable(endpoint):
  1441. # register a single callable
  1442. return _register(None, endpoint, procedure, options)
  1443. else:
  1444. # register all methods on an object decorated with "wamp.register"
  1445. on_replies = []
  1446. for k in inspect.getmembers(endpoint.__class__, is_method_or_function):
  1447. proc = k[1]
  1448. if "_wampuris" in proc.__dict__:
  1449. for pat in proc.__dict__["_wampuris"]:
  1450. if pat.is_endpoint():
  1451. _uri = pat.uri()
  1452. regopts = pat.options or options
  1453. on_replies.append(_register(endpoint, proc, _uri, regopts))
  1454. # XXX needs coverage
  1455. return txaio.gather(on_replies, consume_exceptions=True)
  1456. def _unregister(self, registration):
  1457. """
  1458. Called from :meth:`autobahn.wamp.protocol.Registration.unregister`
  1459. """
  1460. assert(isinstance(registration, Registration))
  1461. assert registration.active
  1462. assert(registration.id in self._registrations)
  1463. if not self._transport:
  1464. raise exception.TransportLost()
  1465. request_id = self._request_id_gen.next()
  1466. on_reply = txaio.create_future()
  1467. self._unregister_reqs[request_id] = UnregisterRequest(request_id, on_reply, registration.id)
  1468. msg = message.Unregister(request_id, registration.id)
  1469. self._transport.send(msg)
  1470. return on_reply
  1471. class _SessionShim(ApplicationSession):
  1472. """
  1473. shim that lets us present pep8 API for user-classes to override,
  1474. but also backwards-compatible for existing code using
  1475. ApplicationSession "directly".
  1476. **NOTE:** this is not public or intended for use; you should import
  1477. either :class:`autobahn.asyncio.wamp.Session` or
  1478. :class:`autobahn.twisted.wamp.Session` depending on which async
  1479. framework you're using.
  1480. """
  1481. #: name -> IAuthenticator
  1482. _authenticators = None
  1483. def onJoin(self, details):
  1484. return self.on_join(details)
  1485. def onConnect(self):
  1486. if self._authenticators:
  1487. # authid, authrole *must* match across all authenticators
  1488. # (checked in add_authenticator) so these lists are either
  1489. # [None] or [None, 'some_authid']
  1490. authid = [x._args.get('authid', None) for x in self._authenticators.values()][-1]
  1491. authrole = [x._args.get('authrole', None) for x in self._authenticators.values()][-1]
  1492. # we need a "merged" authextra here because we can send a
  1493. # list of acceptable authmethods, but only a single
  1494. # authextra dict
  1495. authextra = self._merged_authextra()
  1496. self.join(
  1497. self.config.realm,
  1498. authmethods=list(self._authenticators.keys()),
  1499. authid=authid or u'public',
  1500. authrole=authrole or u'default',
  1501. authextra=authextra,
  1502. )
  1503. else:
  1504. self.on_connect()
  1505. def onChallenge(self, challenge):
  1506. try:
  1507. authenticator = self._authenticators[challenge.method]
  1508. except KeyError:
  1509. raise RuntimeError(
  1510. "Received challenge for unknown authmethod '{}'".format(
  1511. challenge.method
  1512. )
  1513. )
  1514. return authenticator.on_challenge(self, challenge)
  1515. def onWelcome(self, msg):
  1516. if msg.authmethod is None or self._authenticators is None:
  1517. # no authentication
  1518. return
  1519. try:
  1520. authenticator = self._authenticators[msg.authmethod]
  1521. except KeyError:
  1522. raise RuntimeError(
  1523. "Received onWelcome for unknown authmethod '{}'".format(
  1524. msg.authmethod
  1525. )
  1526. )
  1527. return authenticator.on_welcome(self, msg.authextra)
  1528. def onLeave(self, details):
  1529. return self.on_leave(details)
  1530. def onDisconnect(self):
  1531. return self.on_disconnect()
  1532. # experimental authentication API
  1533. def add_authenticator(self, authenticator):
  1534. assert isinstance(authenticator, IAuthenticator)
  1535. if self._authenticators is None:
  1536. self._authenticators = {}
  1537. # before adding this authenticator we need to validate that
  1538. # it's consistent with any other authenticators we may have --
  1539. # for example, they must all agree on "authid" etc because
  1540. # .join() only takes one value for all of those.
  1541. def at_most_one(name):
  1542. uni = set([
  1543. a._args[name]
  1544. for a in list(self._authenticators.values()) + [authenticator]
  1545. if name in a._args
  1546. ])
  1547. if len(uni) > 1:
  1548. raise ValueError(
  1549. "Inconsistent {}s: {}".format(
  1550. name,
  1551. ' '.join(uni),
  1552. )
  1553. )
  1554. # all authids must match
  1555. at_most_one('authid')
  1556. # all authroles must match
  1557. at_most_one('authrole')
  1558. # can we do anything else other than merge all authextra keys?
  1559. # here we check that any duplicate keys have the same values
  1560. authextra = authenticator.authextra
  1561. merged = self._merged_authextra()
  1562. for k, v in merged.items():
  1563. if k in authextra and authextra[k] != v:
  1564. raise ValueError(
  1565. "Inconsistent authextra values for '{}': '{}' vs '{}'".format(
  1566. k, v, authextra[k],
  1567. )
  1568. )
  1569. # validation complete, add it
  1570. self._authenticators[authenticator.name] = authenticator
  1571. def _merged_authextra(self):
  1572. """
  1573. internal helper
  1574. :returns: a single 'authextra' dict, consisting of all keys
  1575. from any authenticator's authextra.
  1576. Note that when the authenticator was added, we already checked
  1577. that any keys it does contain has the same value as any
  1578. existing authextra.
  1579. """
  1580. authextras = [a.authextra for a in self._authenticators.values()]
  1581. def extract_keys(x, y):
  1582. return x | set(y.keys())
  1583. unique_keys = reduce(extract_keys, authextras, set())
  1584. def first_value_for(k):
  1585. """
  1586. for anything already in self._authenticators, we checked
  1587. that it has the same value for any keys in its authextra --
  1588. so here we just extract the first one
  1589. """
  1590. for authextra in authextras:
  1591. if k in authextra:
  1592. return authextra[k]
  1593. # "can't" happen
  1594. raise ValueError(
  1595. "No values for '{}'".format(k)
  1596. )
  1597. return {
  1598. k: first_value_for(k)
  1599. for k in unique_keys
  1600. }
  1601. # these are the actual "new API" methods (i.e. snake_case)
  1602. #
  1603. def on_join(self, details):
  1604. pass
  1605. def on_leave(self, details):
  1606. self.disconnect()
  1607. def on_connect(self):
  1608. self.join(self.config.realm)
  1609. def on_disconnect(self):
  1610. pass
  1611. # ISession.register collides with the abc.ABCMeta.register method
  1612. # ISession.register(ApplicationSession)
  1613. class ApplicationSessionFactory(object):
  1614. """
  1615. WAMP endpoint session factory.
  1616. """
  1617. session = ApplicationSession
  1618. """
  1619. WAMP application session class to be used in this factory.
  1620. """
  1621. def __init__(self, config=None):
  1622. """
  1623. :param config: The default component configuration.
  1624. :type config: instance of :class:`autobahn.wamp.types.ComponentConfig`
  1625. """
  1626. self.config = config or types.ComponentConfig(realm=u"realm1")
  1627. def __call__(self):
  1628. """
  1629. Creates a new WAMP application session.
  1630. :returns: -- An instance of the WAMP application session class as
  1631. given by `self.session`.
  1632. """
  1633. session = self.session(self.config)
  1634. session.factory = self
  1635. return session