Funktionierender Prototyp des Serious Games zur Vermittlung von Wissen zu Software-Engineering-Arbeitsmodellen.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

aio.py 17KB


  1. ###############################################################################
  2. #
  3. # The MIT License (MIT)
  4. #
  5. # Copyright (c) typedef int GmbH
  6. #
  7. # Permission is hereby granted, free of charge, to any person obtaining a copy
  8. # of this software and associated documentation files (the "Software"), to deal
  9. # in the Software without restriction, including without limitation the rights
  10. # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  11. # copies of the Software, and to permit persons to whom the Software is
  12. # furnished to do so, subject to the following conditions:
  13. #
  14. # The above copyright notice and this permission notice shall be included in
  15. # all copies or substantial portions of the Software.
  16. #
  17. # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  18. # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  19. # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  20. # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  21. # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  22. # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  23. # THE SOFTWARE.
  24. #
  25. ###############################################################################
  26. import asyncio
  27. from asyncio import iscoroutine
  28. from asyncio import Future
  29. try:
  30. from types import AsyncGeneratorType
  31. except ImportError:
  32. class AsyncGeneratorType:
  33. pass
  34. import io
  35. import os
  36. import sys
  37. import time
  38. import weakref
  39. import functools
  40. import traceback
  41. import logging
  42. import inspect
  43. from datetime import datetime
  44. from txaio.interfaces import IFailedFuture, ILogger, log_levels
  45. from txaio._iotype import guess_stream_needs_encoding
  46. from txaio._common import _BatchedTimer
  47. from txaio import _util
  48. from txaio import _Config
# Module-wide default configuration object; the default API instance created
# near the end of this module is constructed from it.
config = _Config()
  50. def with_config(loop=None):
  51. """
  52. :return: an instance of the txaio API with the given
  53. configuration. This won't affect anything using the 'gloabl'
  54. config nor other instances created using this function.
  55. If you need to customize txaio configuration separately (e.g. to
  56. use multiple event-loops in asyncio), you can take code like this:
  57. import txaio
  58. class FunTimes(object):
  59. def something_async(self):
  60. return txaio.call_later(1, lambda: 'some result')
  61. and instead do this:
  62. import txaio
  63. class FunTimes(object):
  64. txaio = txaio
  65. def something_async(self):
  66. # this will run in the local/new event loop created in the constructor
  67. return self.txaio.call_later(1, lambda: 'some result')
  68. fun0 = FunTimes()
  69. fun1 = FunTimes()
  70. fun1.txaio = txaio.with_config(loop=asyncio.new_event_loop())
  71. So `fun1` will run its futures on the newly-created event loop,
  72. while `fun0` will work just as it did before this `with_config`
  73. method was introduced (after 2.6.2).
  74. """
  75. cfg = _Config()
  76. if loop is not None:
  77. cfg.loop = loop
  78. return _AsyncioApi(cfg)
# logging should probably all be folded into _AsyncioApi as well

# Streams captured once, at import time; _stdout is the default target of
# start_logging().
_stderr, _stdout = sys.stderr, sys.stdout
_loggers = weakref.WeakSet()  # weak-ref's of each logger we've created before start_logging()
_log_level = 'info'  # re-set by start_logging
# Flipped to True by the first effective start_logging() call; later calls
# return early.
_started_logging = False
# Maps a log_category key to the format string _log() substitutes for it.
_categories = {}
  85. def add_log_categories(categories):
  86. _categories.update(categories)
  87. class FailedFuture(IFailedFuture):
  88. """
  89. This provides an object with any features from Twisted's Failure
  90. that we might need in Autobahn classes that use FutureMixin.
  91. We need to encapsulate information from exceptions so that
  92. errbacks still have access to the traceback (in case they want to
  93. print it out) outside of "except" blocks.
  94. """
  95. def __init__(self, type_, value, traceback):
  96. """
  97. These are the same parameters as returned from ``sys.exc_info()``
  98. :param type_: exception type
  99. :param value: the Exception instance
  100. :param traceback: a traceback object
  101. """
  102. self._type = type_
  103. self._value = value
  104. self._traceback = traceback
  105. @property
  106. def value(self):
  107. return self._value
  108. def __str__(self):
  109. return str(self.value)
  110. # logging API methods
  111. def _log(logger, level, format=u'', **kwargs):
  112. # Look for a log_category, switch it in if we have it
  113. if "log_category" in kwargs and kwargs["log_category"] in _categories:
  114. format = _categories.get(kwargs["log_category"])
  115. kwargs['log_time'] = time.time()
  116. kwargs['log_level'] = level
  117. kwargs['log_format'] = format
  118. # NOTE: turning kwargs into a single "argument which
  119. # is a dict" on purpose, since a LogRecord only keeps
  120. # args, not kwargs.
  121. if level == 'trace':
  122. level = 'debug'
  123. kwargs['txaio_trace'] = True
  124. msg = format.format(**kwargs)
  125. getattr(logger._logger, level)(msg)
  126. def _no_op(*args, **kw):
  127. pass
  128. class _TxaioLogWrapper(ILogger):
  129. def __init__(self, logger):
  130. self._logger = logger
  131. self._set_log_level(_log_level)
  132. def emit(self, level, *args, **kwargs):
  133. func = getattr(self, level)
  134. return func(*args, **kwargs)
  135. def _set_log_level(self, level):
  136. target_level = log_levels.index(level)
  137. # this binds either _log or _no_op above to this instance,
  138. # depending on the desired level.
  139. for (idx, name) in enumerate(log_levels):
  140. if idx <= target_level:
  141. log_method = functools.partial(_log, self, name)
  142. else:
  143. log_method = _no_op
  144. setattr(self, name, log_method)
  145. self._log_level = level
  146. class _TxaioFileHandler(logging.Handler, object):
  147. def __init__(self, fileobj, **kw):
  148. super(_TxaioFileHandler, self).__init__(**kw)
  149. self._file = fileobj
  150. self._encode = guess_stream_needs_encoding(fileobj)
  151. def emit(self, record):
  152. if isinstance(record.args, dict):
  153. fmt = record.args.get(
  154. 'log_format',
  155. record.args.get('log_message', '')
  156. )
  157. message = fmt.format(**record.args)
  158. dt = datetime.fromtimestamp(record.args.get('log_time', 0))
  159. else:
  160. message = record.getMessage()
  161. if record.levelno == logging.ERROR and record.exc_info:
  162. message += '\n'
  163. for line in traceback.format_exception(*record.exc_info):
  164. message = message + line
  165. dt = datetime.fromtimestamp(record.created)
  166. msg = '{0} {1}{2}'.format(
  167. dt.strftime("%Y-%m-%dT%H:%M:%S%z"),
  168. message,
  169. os.linesep
  170. )
  171. if self._encode:
  172. msg = msg.encode('utf8')
  173. self._file.write(msg)
  174. def make_logger():
  175. # we want the namespace to be the calling context of "make_logger"
  176. # otherwise the root logger will be returned
  177. cf = inspect.currentframe().f_back
  178. if "self" in cf.f_locals:
  179. # We're probably in a class init or method
  180. cls = cf.f_locals["self"].__class__
  181. namespace = '{0}.{1}'.format(cls.__module__, cls.__name__)
  182. else:
  183. namespace = cf.f_globals["__name__"]
  184. if cf.f_code.co_name != "<module>":
  185. # If it's not the module, and not in a class instance, add the code
  186. # object's name.
  187. namespace = namespace + "." + cf.f_code.co_name
  188. logger = _TxaioLogWrapper(logging.getLogger(name=namespace))
  189. # remember this so we can set their levels properly once
  190. # start_logging is actually called
  191. _loggers.add(logger)
  192. return logger
  193. def start_logging(out=_stdout, level='info'):
  194. """
  195. Begin logging.
  196. :param out: if provided, a file-like object to log to. By default, this is
  197. stdout.
  198. :param level: the maximum log-level to emit (a string)
  199. """
  200. global _log_level, _loggers, _started_logging
  201. if level not in log_levels:
  202. raise RuntimeError(
  203. "Invalid log level '{0}'; valid are: {1}".format(
  204. level, ', '.join(log_levels)
  205. )
  206. )
  207. if _started_logging:
  208. return
  209. _started_logging = True
  210. _log_level = level
  211. handler = _TxaioFileHandler(out)
  212. logging.getLogger().addHandler(handler)
  213. # note: Don't need to call basicConfig() or similar, because we've
  214. # now added at least one handler to the root logger
  215. logging.raiseExceptions = True # FIXME
  216. level_to_stdlib = {
  217. 'critical': logging.CRITICAL,
  218. 'error': logging.ERROR,
  219. 'warn': logging.WARNING,
  220. 'info': logging.INFO,
  221. 'debug': logging.DEBUG,
  222. 'trace': logging.DEBUG,
  223. }
  224. logging.getLogger().setLevel(level_to_stdlib[level])
  225. # make sure any loggers we created before now have their log-level
  226. # set (any created after now will get it from _log_level
  227. for logger in _loggers:
  228. logger._set_log_level(level)
  229. def set_global_log_level(level):
  230. """
  231. Set the global log level on all loggers instantiated by txaio.
  232. """
  233. for logger in _loggers:
  234. logger._set_log_level(level)
  235. global _log_level
  236. _log_level = level
  237. def get_global_log_level():
  238. return _log_level
  239. # asyncio API methods; the module-level functions are (now, for
  240. # backwards-compat) exported from a default instance of this class
  241. _unspecified = object()
  242. class _AsyncioApi(object):
  243. using_twisted = False
  244. using_asyncio = True
  245. def __init__(self, config):
  246. self._config = config
  247. @property
  248. def _loop(self):
  249. # if configured explicetly, then use this loop
  250. if self._config.loop:
  251. return self._config.loop
  252. # otherwise give out the event loop of the thread this is called in
  253. # rather fetching the loop once in __init__, which may not neccessarily
  254. # be called from the thread we now run the event loop in.
  255. return asyncio.get_event_loop()
  256. def failure_message(self, fail):
  257. """
  258. :param fail: must be an IFailedFuture
  259. returns a unicode error-message
  260. """
  261. try:
  262. return '{0}: {1}'.format(
  263. fail._value.__class__.__name__,
  264. str(fail._value),
  265. )
  266. except Exception:
  267. return 'Failed to produce failure message for "{0}"'.format(fail)
  268. def failure_traceback(self, fail):
  269. """
  270. :param fail: must be an IFailedFuture
  271. returns a traceback instance
  272. """
  273. return fail._traceback
  274. def failure_format_traceback(self, fail):
  275. """
  276. :param fail: must be an IFailedFuture
  277. returns a string
  278. """
  279. try:
  280. f = io.StringIO()
  281. traceback.print_exception(
  282. fail._type,
  283. fail.value,
  284. fail._traceback,
  285. file=f,
  286. )
  287. return f.getvalue()
  288. except Exception:
  289. return "Failed to format failure traceback for '{0}'".format(fail)
  290. def create_future(self, result=_unspecified, error=_unspecified, canceller=_unspecified):
  291. if result is not _unspecified and error is not _unspecified:
  292. raise ValueError("Cannot have both result and error.")
  293. f = self._loop.create_future()
  294. if result is not _unspecified:
  295. resolve(f, result)
  296. elif error is not _unspecified:
  297. reject(f, error)
  298. # Twisted's only API for cancelling is to pass a
  299. # single-argument callable to the Deferred constructor, so
  300. # txaio apes that here for asyncio. The argument is the Future
  301. # that has been cancelled.
  302. if canceller is not _unspecified:
  303. def done(f):
  304. try:
  305. f.exception()
  306. except asyncio.CancelledError:
  307. canceller(f)
  308. f.add_done_callback(done)
  309. return f
  310. def create_future_success(self, result):
  311. return self.create_future(result=result)
  312. def create_future_error(self, error=None):
  313. f = self.create_future()
  314. reject(f, error)
  315. return f
  316. def as_future(self, fun, *args, **kwargs):
  317. try:
  318. res = fun(*args, **kwargs)
  319. except Exception:
  320. return create_future_error(create_failure())
  321. else:
  322. if isinstance(res, Future):
  323. return res
  324. elif iscoroutine(res):
  325. return self._loop.create_task(res)
  326. elif isinstance(res, AsyncGeneratorType):
  327. raise RuntimeError(
  328. "as_future() received an async generator function; does "
  329. "'{}' use 'yield' when you meant 'await'?".format(
  330. str(fun)
  331. )
  332. )
  333. else:
  334. return create_future_success(res)
  335. def is_future(self, obj):
  336. return iscoroutine(obj) or isinstance(obj, Future)
  337. def call_later(self, delay, fun, *args, **kwargs):
  338. # loop.call_later doesn't support kwargs
  339. real_call = functools.partial(fun, *args, **kwargs)
  340. return self._loop.call_later(delay, real_call)
  341. def make_batched_timer(self, bucket_seconds, chunk_size=100):
  342. """
  343. Creates and returns an object implementing
  344. :class:`txaio.IBatchedTimer`.
  345. :param bucket_seconds: the number of seconds in each bucket. That
  346. is, a value of 5 means that any timeout within a 5 second
  347. window will be in the same bucket, and get notified at the
  348. same time. This is only accurate to "milliseconds".
  349. :param chunk_size: when "doing" the callbacks in a particular
  350. bucket, this controls how many we do at once before yielding to
  351. the reactor.
  352. """
  353. def get_seconds():
  354. return self._loop.time()
  355. return _BatchedTimer(
  356. bucket_seconds * 1000.0, chunk_size,
  357. seconds_provider=get_seconds,
  358. delayed_call_creator=self.call_later,
  359. )
  360. def is_called(self, future):
  361. return future.done()
  362. def resolve(self, future, result=None):
  363. future.set_result(result)
  364. def reject(self, future, error=None):
  365. if error is None:
  366. error = create_failure() # will be error if we're not in an "except"
  367. elif isinstance(error, Exception):
  368. error = FailedFuture(type(error), error, None)
  369. else:
  370. if not isinstance(error, IFailedFuture):
  371. raise RuntimeError("reject requires an IFailedFuture or Exception")
  372. future.set_exception(error.value)
  373. def cancel(self, future, msg=None):
  374. if sys.version_info >= (3, 9):
  375. future.cancel(msg)
  376. else:
  377. future.cancel()
  378. def create_failure(self, exception=None):
  379. """
  380. This returns an object implementing IFailedFuture.
  381. If exception is None (the default) we MUST be called within an
  382. "except" block (such that sys.exc_info() returns useful
  383. information).
  384. """
  385. if exception:
  386. return FailedFuture(type(exception), exception, None)
  387. return FailedFuture(*sys.exc_info())
  388. def add_callbacks(self, future, callback, errback):
  389. """
  390. callback or errback may be None, but at least one must be
  391. non-None.
  392. """
  393. def done(f):
  394. try:
  395. res = f.result()
  396. if callback:
  397. callback(res)
  398. except (Exception, asyncio.CancelledError):
  399. if errback:
  400. errback(create_failure())
  401. return future.add_done_callback(done)
  402. def gather(self, futures, consume_exceptions=True):
  403. """
  404. This returns a Future that waits for all the Futures in the list
  405. ``futures``
  406. :param futures: a list of Futures (or coroutines?)
  407. :param consume_exceptions: if True, any errors are eaten and
  408. returned in the result list.
  409. """
  410. # from the asyncio docs: "If return_exceptions is True, exceptions
  411. # in the tasks are treated the same as successful results, and
  412. # gathered in the result list; otherwise, the first raised
  413. # exception will be immediately propagated to the returned
  414. # future."
  415. return asyncio.gather(*futures, return_exceptions=consume_exceptions)
  416. def sleep(self, delay):
  417. """
  418. Inline sleep for use in co-routines.
  419. :param delay: Time to sleep in seconds.
  420. :type delay: float
  421. """
  422. return asyncio.ensure_future(asyncio.sleep(delay))
  423. _default_api = _AsyncioApi(config)
  424. using_twisted = _default_api.using_twisted
  425. using_asyncio = _default_api.using_asyncio
  426. sleep = _default_api.sleep
  427. failure_message = _default_api.failure_message
  428. failure_traceback = _default_api.failure_traceback
  429. failure_format_traceback = _default_api.failure_format_traceback
  430. create_future = _default_api.create_future
  431. create_future_success = _default_api.create_future_success
  432. create_future_error = _default_api.create_future_error
  433. as_future = _default_api.as_future
  434. is_future = _default_api.is_future
  435. call_later = _default_api.call_later
  436. make_batched_timer = _default_api.make_batched_timer
  437. is_called = _default_api.is_called
  438. resolve = _default_api.resolve
  439. reject = _default_api.reject
  440. cancel = _default_api.cancel
  441. create_failure = _default_api.create_failure
  442. add_callbacks = _default_api.add_callbacks
  443. gather = _default_api.gather
  444. sleep = _default_api.sleep
  445. time_ns = _util.time_ns
  446. perf_counter_ns = _util.perf_counter_ns