Funktionierender Prototyp des Serious Games zur Vermittlung von Wissen zu Software-Engineering-Arbeitsmodellen.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

sync.py 20KB


  1. import asyncio
  2. import asyncio.coroutines
  3. import contextvars
  4. import functools
  5. import inspect
  6. import os
  7. import sys
  8. import threading
  9. import warnings
  10. import weakref
  11. from concurrent.futures import Future, ThreadPoolExecutor
  12. from typing import Any, Callable, Dict, Optional, overload
  13. from .current_thread_executor import CurrentThreadExecutor
  14. from .local import Local
  15. def _restore_context(context):
  16. # Check for changes in contextvars, and set them to the current
  17. # context for downstream consumers
  18. for cvar in context:
  19. try:
  20. if cvar.get() != context.get(cvar):
  21. cvar.set(context.get(cvar))
  22. except LookupError:
  23. cvar.set(context.get(cvar))
  24. # Python 3.12 deprecates asyncio.iscoroutinefunction() as an alias for
  25. # inspect.iscoroutinefunction(), whilst also removing the _is_coroutine marker.
  26. # The latter is replaced with the inspect.markcoroutinefunction decorator.
  27. # Until 3.12 is the minimum supported Python version, provide a shim.
  28. # Django 4.0 only supports 3.8+, so don't concern with the _or_partial backport.
  29. # Type hint: should be generic: whatever T it takes it returns. (Same id)
  30. def markcoroutinefunction(func: Any) -> Any:
  31. if hasattr(inspect, "markcoroutinefunction"):
  32. return inspect.markcoroutinefunction(func)
  33. else:
  34. func._is_coroutine = asyncio.coroutines._is_coroutine # type: ignore
  35. return func
  36. def iscoroutinefunction(func: Any) -> bool:
  37. if hasattr(inspect, "markcoroutinefunction"):
  38. return inspect.iscoroutinefunction(func)
  39. else:
  40. return asyncio.iscoroutinefunction(func)
  41. def _iscoroutinefunction_or_partial(func: Any) -> bool:
  42. # Python < 3.8 does not correctly determine partially wrapped
  43. # coroutine functions are coroutine functions, hence the need for
  44. # this to exist. Code taken from CPython.
  45. if sys.version_info >= (3, 8):
  46. return iscoroutinefunction(func)
  47. else:
  48. while inspect.ismethod(func):
  49. func = func.__func__
  50. while isinstance(func, functools.partial):
  51. func = func.func
  52. return iscoroutinefunction(func)
  53. class ThreadSensitiveContext:
  54. """Async context manager to manage context for thread sensitive mode
  55. This context manager controls which thread pool executor is used when in
  56. thread sensitive mode. By default, a single thread pool executor is shared
  57. within a process.
  58. In Python 3.7+, the ThreadSensitiveContext() context manager may be used to
  59. specify a thread pool per context.
  60. This context manager is re-entrant, so only the outer-most call to
  61. ThreadSensitiveContext will set the context.
  62. Usage:
  63. >>> import time
  64. >>> async with ThreadSensitiveContext():
  65. ... await sync_to_async(time.sleep, 1)()
  66. """
  67. def __init__(self):
  68. self.token = None
  69. async def __aenter__(self):
  70. try:
  71. SyncToAsync.thread_sensitive_context.get()
  72. except LookupError:
  73. self.token = SyncToAsync.thread_sensitive_context.set(self)
  74. return self
  75. async def __aexit__(self, exc, value, tb):
  76. if not self.token:
  77. return
  78. executor = SyncToAsync.context_to_thread_executor.pop(self, None)
  79. if executor:
  80. executor.shutdown()
  81. SyncToAsync.thread_sensitive_context.reset(self.token)
  82. class AsyncToSync:
  83. """
  84. Utility class which turns an awaitable that only works on the thread with
  85. the event loop into a synchronous callable that works in a subthread.
  86. If the call stack contains an async loop, the code runs there.
  87. Otherwise, the code runs in a new loop in a new thread.
  88. Either way, this thread then pauses and waits to run any thread_sensitive
  89. code called from further down the call stack using SyncToAsync, before
  90. finally exiting once the async task returns.
  91. """
  92. # Maps launched Tasks to the threads that launched them (for locals impl)
  93. launch_map: "Dict[asyncio.Task[object], threading.Thread]" = {}
  94. # Keeps track of which CurrentThreadExecutor to use. This uses an asgiref
  95. # Local, not a threadlocal, so that tasks can work out what their parent used.
  96. executors = Local()
  97. # When we can't find a CurrentThreadExecutor from the context, such as
  98. # inside create_task, we'll look it up here from the running event loop.
  99. loop_thread_executors: "Dict[asyncio.AbstractEventLoop, CurrentThreadExecutor]" = {}
  100. def __init__(self, awaitable, force_new_loop=False):
  101. if not callable(awaitable) or (
  102. not _iscoroutinefunction_or_partial(awaitable)
  103. and not _iscoroutinefunction_or_partial(
  104. getattr(awaitable, "__call__", awaitable)
  105. )
  106. ):
  107. # Python does not have very reliable detection of async functions
  108. # (lots of false negatives) so this is just a warning.
  109. warnings.warn(
  110. "async_to_sync was passed a non-async-marked callable", stacklevel=2
  111. )
  112. self.awaitable = awaitable
  113. try:
  114. self.__self__ = self.awaitable.__self__
  115. except AttributeError:
  116. pass
  117. if force_new_loop:
  118. # They have asked that we always run in a new sub-loop.
  119. self.main_event_loop = None
  120. else:
  121. try:
  122. self.main_event_loop = asyncio.get_running_loop()
  123. except RuntimeError:
  124. # There's no event loop in this thread. Look for the threadlocal if
  125. # we're inside SyncToAsync
  126. main_event_loop_pid = getattr(
  127. SyncToAsync.threadlocal, "main_event_loop_pid", None
  128. )
  129. # We make sure the parent loop is from the same process - if
  130. # they've forked, this is not going to be valid any more (#194)
  131. if main_event_loop_pid and main_event_loop_pid == os.getpid():
  132. self.main_event_loop = getattr(
  133. SyncToAsync.threadlocal, "main_event_loop", None
  134. )
  135. else:
  136. self.main_event_loop = None
  137. def __call__(self, *args, **kwargs):
  138. # You can't call AsyncToSync from a thread with a running event loop
  139. try:
  140. event_loop = asyncio.get_running_loop()
  141. except RuntimeError:
  142. pass
  143. else:
  144. if event_loop.is_running():
  145. raise RuntimeError(
  146. "You cannot use AsyncToSync in the same thread as an async event loop - "
  147. "just await the async function directly."
  148. )
  149. # Wrapping context in list so it can be reassigned from within
  150. # `main_wrap`.
  151. context = [contextvars.copy_context()]
  152. # Make a future for the return information
  153. call_result = Future()
  154. # Get the source thread
  155. source_thread = threading.current_thread()
  156. # Make a CurrentThreadExecutor we'll use to idle in this thread - we
  157. # need one for every sync frame, even if there's one above us in the
  158. # same thread.
  159. if hasattr(self.executors, "current"):
  160. old_current_executor = self.executors.current
  161. else:
  162. old_current_executor = None
  163. current_executor = CurrentThreadExecutor()
  164. self.executors.current = current_executor
  165. loop = None
  166. # Use call_soon_threadsafe to schedule a synchronous callback on the
  167. # main event loop's thread if it's there, otherwise make a new loop
  168. # in this thread.
  169. try:
  170. awaitable = self.main_wrap(
  171. args, kwargs, call_result, source_thread, sys.exc_info(), context
  172. )
  173. if not (self.main_event_loop and self.main_event_loop.is_running()):
  174. # Make our own event loop - in a new thread - and run inside that.
  175. loop = asyncio.new_event_loop()
  176. self.loop_thread_executors[loop] = current_executor
  177. loop_executor = ThreadPoolExecutor(max_workers=1)
  178. loop_future = loop_executor.submit(
  179. self._run_event_loop, loop, awaitable
  180. )
  181. if current_executor:
  182. # Run the CurrentThreadExecutor until the future is done
  183. current_executor.run_until_future(loop_future)
  184. # Wait for future and/or allow for exception propagation
  185. loop_future.result()
  186. else:
  187. # Call it inside the existing loop
  188. self.main_event_loop.call_soon_threadsafe(
  189. self.main_event_loop.create_task, awaitable
  190. )
  191. if current_executor:
  192. # Run the CurrentThreadExecutor until the future is done
  193. current_executor.run_until_future(call_result)
  194. finally:
  195. # Clean up any executor we were running
  196. if loop is not None:
  197. del self.loop_thread_executors[loop]
  198. if hasattr(self.executors, "current"):
  199. del self.executors.current
  200. if old_current_executor:
  201. self.executors.current = old_current_executor
  202. _restore_context(context[0])
  203. # Wait for results from the future.
  204. return call_result.result()
  205. def _run_event_loop(self, loop, coro):
  206. """
  207. Runs the given event loop (designed to be called in a thread).
  208. """
  209. asyncio.set_event_loop(loop)
  210. try:
  211. loop.run_until_complete(coro)
  212. finally:
  213. try:
  214. # mimic asyncio.run() behavior
  215. # cancel unexhausted async generators
  216. tasks = asyncio.all_tasks(loop)
  217. for task in tasks:
  218. task.cancel()
  219. async def gather():
  220. await asyncio.gather(*tasks, return_exceptions=True)
  221. loop.run_until_complete(gather())
  222. for task in tasks:
  223. if task.cancelled():
  224. continue
  225. if task.exception() is not None:
  226. loop.call_exception_handler(
  227. {
  228. "message": "unhandled exception during loop shutdown",
  229. "exception": task.exception(),
  230. "task": task,
  231. }
  232. )
  233. if hasattr(loop, "shutdown_asyncgens"):
  234. loop.run_until_complete(loop.shutdown_asyncgens())
  235. finally:
  236. loop.close()
  237. asyncio.set_event_loop(self.main_event_loop)
  238. def __get__(self, parent, objtype):
  239. """
  240. Include self for methods
  241. """
  242. func = functools.partial(self.__call__, parent)
  243. return functools.update_wrapper(func, self.awaitable)
  244. async def main_wrap(
  245. self, args, kwargs, call_result, source_thread, exc_info, context
  246. ):
  247. """
  248. Wraps the awaitable with something that puts the result into the
  249. result/exception future.
  250. """
  251. if context is not None:
  252. _restore_context(context[0])
  253. current_task = SyncToAsync.get_current_task()
  254. self.launch_map[current_task] = source_thread
  255. try:
  256. # If we have an exception, run the function inside the except block
  257. # after raising it so exc_info is correctly populated.
  258. if exc_info[1]:
  259. try:
  260. raise exc_info[1]
  261. except BaseException:
  262. result = await self.awaitable(*args, **kwargs)
  263. else:
  264. result = await self.awaitable(*args, **kwargs)
  265. except BaseException as e:
  266. call_result.set_exception(e)
  267. else:
  268. call_result.set_result(result)
  269. finally:
  270. del self.launch_map[current_task]
  271. context[0] = contextvars.copy_context()
  272. class SyncToAsync:
  273. """
  274. Utility class which turns a synchronous callable into an awaitable that
  275. runs in a threadpool. It also sets a threadlocal inside the thread so
  276. calls to AsyncToSync can escape it.
  277. If thread_sensitive is passed, the code will run in the same thread as any
  278. outer code. This is needed for underlying Python code that is not
  279. threadsafe (for example, code which handles SQLite database connections).
  280. If the outermost program is async (i.e. SyncToAsync is outermost), then
  281. this will be a dedicated single sub-thread that all sync code runs in,
  282. one after the other. If the outermost program is sync (i.e. AsyncToSync is
  283. outermost), this will just be the main thread. This is achieved by idling
  284. with a CurrentThreadExecutor while AsyncToSync is blocking its sync parent,
  285. rather than just blocking.
  286. If executor is passed in, that will be used instead of the loop's default executor.
  287. In order to pass in an executor, thread_sensitive must be set to False, otherwise
  288. a TypeError will be raised.
  289. """
  290. # Maps launched threads to the coroutines that spawned them
  291. launch_map: "Dict[threading.Thread, asyncio.Task[object]]" = {}
  292. # Storage for main event loop references
  293. threadlocal = threading.local()
  294. # Single-thread executor for thread-sensitive code
  295. single_thread_executor = ThreadPoolExecutor(max_workers=1)
  296. # Maintain a contextvar for the current execution context. Optionally used
  297. # for thread sensitive mode.
  298. thread_sensitive_context: "contextvars.ContextVar[str]" = contextvars.ContextVar(
  299. "thread_sensitive_context"
  300. )
  301. # Contextvar that is used to detect if the single thread executor
  302. # would be awaited on while already being used in the same context
  303. deadlock_context: "contextvars.ContextVar[bool]" = contextvars.ContextVar(
  304. "deadlock_context"
  305. )
  306. # Maintaining a weak reference to the context ensures that thread pools are
  307. # erased once the context goes out of scope. This terminates the thread pool.
  308. context_to_thread_executor: "weakref.WeakKeyDictionary[object, ThreadPoolExecutor]" = (
  309. weakref.WeakKeyDictionary()
  310. )
  311. def __init__(
  312. self,
  313. func: Callable[..., Any],
  314. thread_sensitive: bool = True,
  315. executor: Optional["ThreadPoolExecutor"] = None,
  316. ) -> None:
  317. if (
  318. not callable(func)
  319. or _iscoroutinefunction_or_partial(func)
  320. or _iscoroutinefunction_or_partial(getattr(func, "__call__", func))
  321. ):
  322. raise TypeError("sync_to_async can only be applied to sync functions.")
  323. self.func = func
  324. functools.update_wrapper(self, func)
  325. self._thread_sensitive = thread_sensitive
  326. markcoroutinefunction(self)
  327. if thread_sensitive and executor is not None:
  328. raise TypeError("executor must not be set when thread_sensitive is True")
  329. self._executor = executor
  330. try:
  331. self.__self__ = func.__self__ # type: ignore
  332. except AttributeError:
  333. pass
  334. async def __call__(self, *args, **kwargs):
  335. loop = asyncio.get_running_loop()
  336. # Work out what thread to run the code in
  337. if self._thread_sensitive:
  338. if hasattr(AsyncToSync.executors, "current"):
  339. # If we have a parent sync thread above somewhere, use that
  340. executor = AsyncToSync.executors.current
  341. elif self.thread_sensitive_context and self.thread_sensitive_context.get(
  342. None
  343. ):
  344. # If we have a way of retrieving the current context, attempt
  345. # to use a per-context thread pool executor
  346. thread_sensitive_context = self.thread_sensitive_context.get()
  347. if thread_sensitive_context in self.context_to_thread_executor:
  348. # Re-use thread executor in current context
  349. executor = self.context_to_thread_executor[thread_sensitive_context]
  350. else:
  351. # Create new thread executor in current context
  352. executor = ThreadPoolExecutor(max_workers=1)
  353. self.context_to_thread_executor[thread_sensitive_context] = executor
  354. elif loop in AsyncToSync.loop_thread_executors:
  355. # Re-use thread executor for running loop
  356. executor = AsyncToSync.loop_thread_executors[loop]
  357. elif self.deadlock_context and self.deadlock_context.get(False):
  358. raise RuntimeError(
  359. "Single thread executor already being used, would deadlock"
  360. )
  361. else:
  362. # Otherwise, we run it in a fixed single thread
  363. executor = self.single_thread_executor
  364. if self.deadlock_context:
  365. self.deadlock_context.set(True)
  366. else:
  367. # Use the passed in executor, or the loop's default if it is None
  368. executor = self._executor
  369. context = contextvars.copy_context()
  370. child = functools.partial(self.func, *args, **kwargs)
  371. func = context.run
  372. args = (child,)
  373. kwargs = {}
  374. try:
  375. # Run the code in the right thread
  376. future = loop.run_in_executor(
  377. executor,
  378. functools.partial(
  379. self.thread_handler,
  380. loop,
  381. self.get_current_task(),
  382. sys.exc_info(),
  383. func,
  384. *args,
  385. **kwargs,
  386. ),
  387. )
  388. ret = await asyncio.wait_for(future, timeout=None)
  389. finally:
  390. _restore_context(context)
  391. if self.deadlock_context:
  392. self.deadlock_context.set(False)
  393. return ret
  394. def __get__(self, parent, objtype):
  395. """
  396. Include self for methods
  397. """
  398. func = functools.partial(self.__call__, parent)
  399. return functools.update_wrapper(func, self.func)
  400. def thread_handler(self, loop, source_task, exc_info, func, *args, **kwargs):
  401. """
  402. Wraps the sync application with exception handling.
  403. """
  404. # Set the threadlocal for AsyncToSync
  405. self.threadlocal.main_event_loop = loop
  406. self.threadlocal.main_event_loop_pid = os.getpid()
  407. # Set the task mapping (used for the locals module)
  408. current_thread = threading.current_thread()
  409. if AsyncToSync.launch_map.get(source_task) == current_thread:
  410. # Our parent task was launched from this same thread, so don't make
  411. # a launch map entry - let it shortcut over us! (and stop infinite loops)
  412. parent_set = False
  413. else:
  414. self.launch_map[current_thread] = source_task
  415. parent_set = True
  416. # Run the function
  417. try:
  418. # If we have an exception, run the function inside the except block
  419. # after raising it so exc_info is correctly populated.
  420. if exc_info[1]:
  421. try:
  422. raise exc_info[1]
  423. except BaseException:
  424. return func(*args, **kwargs)
  425. else:
  426. return func(*args, **kwargs)
  427. finally:
  428. # Only delete the launch_map parent if we set it, otherwise it is
  429. # from someone else.
  430. if parent_set:
  431. del self.launch_map[current_thread]
  432. @staticmethod
  433. def get_current_task():
  434. """
  435. Implementation of asyncio.current_task()
  436. that returns None if there is no task.
  437. """
  438. try:
  439. return asyncio.current_task()
  440. except RuntimeError:
  441. return None
  442. # Lowercase aliases (and decorator friendliness)
  443. async_to_sync = AsyncToSync
  444. @overload
  445. def sync_to_async(
  446. func: None = None,
  447. thread_sensitive: bool = True,
  448. executor: Optional["ThreadPoolExecutor"] = None,
  449. ) -> Callable[[Callable[..., Any]], SyncToAsync]:
  450. ...
  451. @overload
  452. def sync_to_async(
  453. func: Callable[..., Any],
  454. thread_sensitive: bool = True,
  455. executor: Optional["ThreadPoolExecutor"] = None,
  456. ) -> SyncToAsync:
  457. ...
  458. def sync_to_async(
  459. func=None,
  460. thread_sensitive=True,
  461. executor=None,
  462. ):
  463. if func is None:
  464. return lambda f: SyncToAsync(
  465. f,
  466. thread_sensitive=thread_sensitive,
  467. executor=executor,
  468. )
  469. return SyncToAsync(
  470. func,
  471. thread_sensitive=thread_sensitive,
  472. executor=executor,
  473. )