You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

pubcontrolclient.py 10KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. # pubcontrolclient.py
  2. # ~~~~~~~~~
  3. # This module implements the PubControlClient class.
  4. # :authors: Justin Karneges, Konstantin Bokarius.
  5. # :copyright: (c) 2015 by Fanout, Inc.
  6. # :license: MIT, see LICENSE for more details.
  7. import sys
  8. import json
  9. import copy
  10. from base64 import b64encode
  11. import threading
  12. from collections import deque
  13. import requests
  14. from .pubsubmonitor import PubSubMonitor
  15. from .utilities import _gen_auth_jwt_header
  16. try:
  17. import ndg.httpsclient
  18. except ImportError:
  19. ndg = None
  20. # TODO: Remove in the future when most users have Python >= 2.7.9.
  21. try:
  22. import urllib3
  23. except ImportError:
  24. try:
  25. from requests.packages import urllib3
  26. except ImportError:
  27. pass
  28. try:
  29. urllib3.disable_warnings()
  30. except NameError:
  31. pass
  32. except AttributeError:
  33. pass
  34. # The PubControlClient class allows consumers to publish either synchronously
  35. # or asynchronously to an endpoint of their choice. The consumer wraps a Format
  36. # class instance in an Item class instance and passes that to the publish
  37. # method. The async publish method has an optional callback parameter that
  38. # is called after the publishing is complete to notify the consumer of the
  39. # result. Optionally provide JWT authentication claim and key information.
  40. # If require_subscribers is set to True then channel subscription monitoring
  41. # will be enabled and only channels that are subscribed to will be published
  42. # to.
  43. class PubControlClient(object):
  44. # Initialize this class with a URL representing the publishing endpoint.
  45. def __init__(self, uri, auth_jwt_claim=None,
  46. auth_jwt_key=None, require_subscribers=False, sub_callback=None):
  47. self.uri = uri
  48. self.lock = threading.Lock()
  49. self.thread = None
  50. self.thread_cond = None
  51. self.req_queue = deque()
  52. self.auth_basic_user = None
  53. self.auth_basic_pass = None
  54. self.auth_jwt_claim = auth_jwt_claim
  55. self.auth_jwt_key = auth_jwt_key
  56. self.requests_session = requests.session()
  57. self.sub_monitor = None
  58. self.closed = False
  59. if require_subscribers:
  60. self.sub_monitor = PubSubMonitor(uri, auth_jwt_claim, auth_jwt_key, sub_callback)
  61. # Call this method and pass a username and password to use basic
  62. # authentication with the configured endpoint.
  63. def set_auth_basic(self, username, password):
  64. self._verify_notclosed()
  65. self.lock.acquire()
  66. self.auth_basic_user = username
  67. self.auth_basic_pass = password
  68. self.lock.release()
  69. # Call this method and pass a claim and key to use JWT authentication
  70. # with the configured endpoint.
  71. def set_auth_jwt(self, claim, key):
  72. self._verify_notclosed()
  73. self.lock.acquire()
  74. self.auth_jwt_claim = claim
  75. self.auth_jwt_key = key
  76. self.lock.release()
  77. # The publish method for publishing the specified item to the specified
  78. # channel on the configured endpoint. The blocking parameter indicates
  79. # whether the the callback method should be blocking or non-blocking. The
  80. # callback parameter is optional and will be passed the publishing results
  81. # after publishing is complete. Note that the callback executes on a
  82. # separate thread. If require_subscribers was set to True then the message
  83. # will only be published if the channel is subscribed to. If the sub_monitor
  84. # instance failed to retrieve subscriber information then an error will be
  85. # raised.
  86. def publish(self, channel, item, blocking=False, callback=None):
  87. self._verify_notclosed()
  88. if self.sub_monitor and self.sub_monitor.is_closed():
  89. if callback:
  90. callback(False, 'failed to retrieve channel subscribers')
  91. else:
  92. raise ValueError('failed to retrieve channel subscribers')
  93. elif self.sub_monitor and not self.sub_monitor.is_channel_subscribed_to(channel):
  94. if callback:
  95. callback(True, '')
  96. return
  97. i = item.export()
  98. i['channel'] = channel
  99. if blocking:
  100. self.lock.acquire()
  101. uri = self.uri
  102. auth = self._gen_auth_header()
  103. self.lock.release()
  104. self._pubcall(uri, auth, [i])
  105. else:
  106. self.lock.acquire()
  107. uri = self.uri
  108. auth = self._gen_auth_header()
  109. self._ensure_thread()
  110. self.lock.release()
  111. self._queue_req(('pub', uri, auth, i, callback))
  112. # This method is a blocking method that ensures that all asynchronous
  113. # publishing is complete prior to returning and allowing the consumer to
  114. # proceed.
  115. def wait_all_sent(self):
  116. self.lock.acquire()
  117. if self.thread is not None:
  118. self._queue_req(('stop',))
  119. self.thread.join()
  120. self.thread = None
  121. self.lock.release()
  122. # DEPRECATED: The finish method is now deprecated in favor of the more
  123. # descriptive wait_all_sent() method.
  124. def finish(self):
  125. self.wait_all_sent()
  126. # This method is meant to close the PubControlClient instance and should
  127. # implemented when needed for future features. Currently this method
  128. # is simply a passthrough to wait_all_sent().
  129. def close(self):
  130. self.lock.acquire()
  131. self.closed = True
  132. if self.sub_monitor:
  133. self.sub_monitor.close()
  134. self.lock.release()
  135. self.wait_all_sent()
  136. # This method makes an HTTP request to an endpoint relative to the base
  137. # URI, using configured authentication. Returns a tuple of
  138. # (status code, headers, body).
  139. def http_call(self, endpoint, data, headers={}):
  140. uri = self.uri + endpoint
  141. send_headers = copy.deepcopy(headers)
  142. auth_header = self._gen_auth_header()
  143. if auth_header:
  144. send_headers['Authorization'] = auth_header
  145. try:
  146. return self._make_http_request(uri, data, send_headers)
  147. except Exception as e:
  148. raise ValueError('failed during http call: ' + str(e))
  149. # An internal method for verifying that the PubControlClient instance
  150. # has not been closed via the close() method. If it has then an error
  151. # is raised.
  152. def _verify_notclosed(self):
  153. if self.closed:
  154. raise ValueError('pubcontrolclient instance is closed')
  155. # An internal method used to generate an authorization header. The
  156. # authorization header is generated based on whether basic or JWT
  157. # authorization information was provided via the publicly accessible
  158. # 'set_*_auth' methods defined above.
  159. def _gen_auth_header(self):
  160. if self.auth_basic_user:
  161. return 'Basic ' + str(b64encode(('%s:%s' % (self.auth_basic_user, self.auth_basic_pass)).encode('ascii')))
  162. elif self.auth_jwt_claim:
  163. return _gen_auth_jwt_header(self.auth_jwt_claim, self.auth_jwt_key)
  164. else:
  165. return None
  166. # An internal method that ensures that asynchronous publish calls are
  167. # properly processed. This method initializes the required class fields,
  168. # starts the pubworker worker thread, and is meant to execute only when
  169. # the consumer makes an asynchronous publish call.
  170. def _ensure_thread(self):
  171. if self.thread is None:
  172. self.thread_cond = threading.Condition()
  173. self.thread = threading.Thread(target=self._pubworker)
  174. self.thread.daemon = True
  175. self.thread.start()
  176. # An internal method for adding an asynchronous publish request to the
  177. # publishing queue. This method will also activate the pubworker worker
  178. # thread to make sure that it process any and all requests added to
  179. # the queue.
  180. def _queue_req(self, req):
  181. self.thread_cond.acquire()
  182. self.req_queue.append(req)
  183. self.thread_cond.notify()
  184. self.thread_cond.release()
  185. # An internal method for preparing the HTTP POST request for publishing
  186. # data to the endpoint. This method accepts the URI endpoint, authorization
  187. # header, and a list of items to publish.
  188. def _pubcall(self, uri, auth_header, items):
  189. uri = uri + '/publish/'
  190. headers = dict()
  191. if auth_header:
  192. headers['Authorization'] = auth_header
  193. headers['Content-Type'] = 'application/json'
  194. content = dict()
  195. content['items'] = items
  196. content_raw = json.dumps(content)
  197. try:
  198. if isinstance(content_raw, unicode):
  199. content_raw = content_raw.encode('utf-8')
  200. except NameError:
  201. if isinstance(content_raw, str):
  202. content_raw = content_raw.encode('utf-8')
  203. try:
  204. self._make_http_request(uri, content_raw, headers)
  205. except Exception as e:
  206. raise ValueError('failed to publish: ' + str(e))
  207. # An internal method for making an HTTP request to the specified URI
  208. # with the specified content and headers.
  209. def _make_http_request(self, uri, data, headers):
  210. if sys.version_info >= (2, 7, 9) or (ndg and ndg.httpsclient):
  211. res = self.requests_session.post(uri, headers=headers, data=data)
  212. else:
  213. res = self.requests_session.post(uri, headers=headers, data=data, verify=False)
  214. self._verify_status_code(res.status_code, res.text)
  215. return (res.status_code, res.headers, res.text)
  216. # An internal method for ensuring a successful status code is returned
  217. # from the server.
  218. def _verify_status_code(self, code, message):
  219. if code < 200 or code >= 300:
  220. raise ValueError('received failed status code ' + str(code) +
  221. ' with message: ' + message)
  222. # An internal method for publishing a batch of requests. The requests are
  223. # parsed for the URI, authorization header, and each request is published
  224. # to the endpoint. After all publishing is complete, each callback
  225. # corresponding to each request is called (if a callback was originally
  226. # provided for that request) and passed a result indicating whether that
  227. # request was successfully published.
  228. def _pubbatch(self, reqs):
  229. assert(len(reqs) > 0)
  230. uri = reqs[0][0]
  231. auth_header = reqs[0][1]
  232. items = list()
  233. callbacks = list()
  234. for req in reqs:
  235. items.append(req[2])
  236. callbacks.append(req[3])
  237. try:
  238. self._pubcall(uri, auth_header, items)
  239. result = (True, '')
  240. except Exception as e:
  241. try:
  242. result = (False, e.message)
  243. except AttributeError:
  244. result = (False, str(e))
  245. for c in callbacks:
  246. if c:
  247. c(result[0], result[1])
  248. # An internal method that is meant to run as a separate thread and process
  249. # asynchronous publishing requests. The method runs continously and
  250. # publishes requests in batches containing a maximum of 10 requests. The
  251. # method completes and the thread is terminated only when a 'stop' command
  252. # is provided in the request queue.
  253. def _pubworker(self):
  254. quit = False
  255. while not quit:
  256. self.thread_cond.acquire()
  257. # if no requests ready, wait for one
  258. if len(self.req_queue) == 0:
  259. self.thread_cond.wait()
  260. # still no requests after notification? start over
  261. if len(self.req_queue) == 0:
  262. self.thread_cond.release()
  263. continue
  264. reqs = list()
  265. while len(self.req_queue) > 0 and len(reqs) < 10:
  266. m = self.req_queue.popleft()
  267. if m[0] == 'stop':
  268. quit = True
  269. break
  270. reqs.append((m[1], m[2], m[3], m[4]))
  271. self.thread_cond.release()
  272. if len(reqs) > 0:
  273. self._pubbatch(reqs)