Funktionierender Prototyp des Serious Games zur Vermittlung von Wissen zu Software-Engineering-Arbeitsmodellen.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

_asynctest.py 14KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405
  1. # -*- test-case-name: twisted.trial.test -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Things likely to be used by writers of unit tests.
  6. Maintainer: Jonathan Lange
  7. """
  8. import inspect
  9. import warnings
  10. from typing import List
  11. from zope.interface import implementer
  12. # We can't import reactor at module-level because this code runs before trial
  13. # installs a user-specified reactor, installing the default reactor and
  14. # breaking reactor installation. See also #6047.
  15. from twisted.internet import defer, utils
  16. from twisted.python import failure
  17. from twisted.trial import itrial, util
  18. from twisted.trial._synctest import FailTest, SkipTest, SynchronousTestCase
  19. _wait_is_running: List[None] = []
  20. @implementer(itrial.ITestCase)
  21. class TestCase(SynchronousTestCase):
  22. """
  23. A unit test. The atom of the unit testing universe.
  24. This class extends L{SynchronousTestCase} which extends C{unittest.TestCase}
  25. from the standard library. The main feature is the ability to return
  26. C{Deferred}s from tests and fixture methods and to have the suite wait for
  27. those C{Deferred}s to fire. Also provides new assertions such as
  28. L{assertFailure}.
  29. @ivar timeout: A real number of seconds. If set, the test will
  30. raise an error if it takes longer than C{timeout} seconds.
  31. If not set, util.DEFAULT_TIMEOUT_DURATION is used.
  32. """
  33. def __init__(self, methodName="runTest"):
  34. """
  35. Construct an asynchronous test case for C{methodName}.
  36. @param methodName: The name of a method on C{self}. This method should
  37. be a unit test. That is, it should be a short method that calls some of
  38. the assert* methods. If C{methodName} is unspecified,
  39. L{SynchronousTestCase.runTest} will be used as the test method. This is
  40. mostly useful for testing Trial.
  41. """
  42. super().__init__(methodName)
  43. def assertFailure(self, deferred, *expectedFailures):
  44. """
  45. Fail if C{deferred} does not errback with one of C{expectedFailures}.
  46. Returns the original Deferred with callbacks added. You will need
  47. to return this Deferred from your test case.
  48. """
  49. def _cb(ignore):
  50. raise self.failureException(
  51. f"did not catch an error, instead got {ignore!r}"
  52. )
  53. def _eb(failure):
  54. if failure.check(*expectedFailures):
  55. return failure.value
  56. else:
  57. output = "\nExpected: {!r}\nGot:\n{}".format(
  58. expectedFailures, str(failure)
  59. )
  60. raise self.failureException(output)
  61. return deferred.addCallbacks(_cb, _eb)
  62. failUnlessFailure = assertFailure
  63. def _run(self, methodName, result):
  64. from twisted.internet import reactor
  65. timeout = self.getTimeout()
  66. def onTimeout(d):
  67. e = defer.TimeoutError(
  68. f"{self!r} ({methodName}) still running at {timeout} secs"
  69. )
  70. f = failure.Failure(e)
  71. # try to errback the deferred that the test returns (for no gorram
  72. # reason) (see issue1005 and test_errorPropagation in
  73. # test_deferred)
  74. try:
  75. d.errback(f)
  76. except defer.AlreadyCalledError:
  77. # if the deferred has been called already but the *back chain
  78. # is still unfinished, crash the reactor and report timeout
  79. # error ourself.
  80. reactor.crash()
  81. self._timedOut = True # see self._wait
  82. todo = self.getTodo()
  83. if todo is not None and todo.expected(f):
  84. result.addExpectedFailure(self, f, todo)
  85. else:
  86. result.addError(self, f)
  87. onTimeout = utils.suppressWarnings(
  88. onTimeout, util.suppress(category=DeprecationWarning)
  89. )
  90. method = getattr(self, methodName)
  91. if inspect.isgeneratorfunction(method):
  92. exc = TypeError(
  93. "{!r} is a generator function and therefore will never run".format(
  94. method
  95. )
  96. )
  97. return defer.fail(exc)
  98. d = defer.maybeDeferred(
  99. utils.runWithWarningsSuppressed, self._getSuppress(), method
  100. )
  101. call = reactor.callLater(timeout, onTimeout, d)
  102. d.addBoth(lambda x: call.active() and call.cancel() or x)
  103. return d
  104. def __call__(self, *args, **kwargs):
  105. return self.run(*args, **kwargs)
  106. def deferSetUp(self, ignored, result):
  107. d = self._run("setUp", result)
  108. d.addCallbacks(
  109. self.deferTestMethod,
  110. self._ebDeferSetUp,
  111. callbackArgs=(result,),
  112. errbackArgs=(result,),
  113. )
  114. return d
  115. def _ebDeferSetUp(self, failure, result):
  116. if failure.check(SkipTest):
  117. result.addSkip(self, self._getSkipReason(self.setUp, failure.value))
  118. else:
  119. result.addError(self, failure)
  120. if failure.check(KeyboardInterrupt):
  121. result.stop()
  122. return self.deferRunCleanups(None, result)
  123. def deferTestMethod(self, ignored, result):
  124. d = self._run(self._testMethodName, result)
  125. d.addCallbacks(
  126. self._cbDeferTestMethod,
  127. self._ebDeferTestMethod,
  128. callbackArgs=(result,),
  129. errbackArgs=(result,),
  130. )
  131. d.addBoth(self.deferRunCleanups, result)
  132. d.addBoth(self.deferTearDown, result)
  133. return d
  134. def _cbDeferTestMethod(self, ignored, result):
  135. if self.getTodo() is not None:
  136. result.addUnexpectedSuccess(self, self.getTodo())
  137. else:
  138. self._passed = True
  139. return ignored
  140. def _ebDeferTestMethod(self, f, result):
  141. todo = self.getTodo()
  142. if todo is not None and todo.expected(f):
  143. result.addExpectedFailure(self, f, todo)
  144. elif f.check(self.failureException, FailTest):
  145. result.addFailure(self, f)
  146. elif f.check(KeyboardInterrupt):
  147. result.addError(self, f)
  148. result.stop()
  149. elif f.check(SkipTest):
  150. result.addSkip(
  151. self, self._getSkipReason(getattr(self, self._testMethodName), f.value)
  152. )
  153. else:
  154. result.addError(self, f)
  155. def deferTearDown(self, ignored, result):
  156. d = self._run("tearDown", result)
  157. d.addErrback(self._ebDeferTearDown, result)
  158. return d
  159. def _ebDeferTearDown(self, failure, result):
  160. result.addError(self, failure)
  161. if failure.check(KeyboardInterrupt):
  162. result.stop()
  163. self._passed = False
  164. @defer.inlineCallbacks
  165. def deferRunCleanups(self, ignored, result):
  166. """
  167. Run any scheduled cleanups and report errors (if any) to the result.
  168. object.
  169. """
  170. failures = []
  171. while len(self._cleanups) > 0:
  172. func, args, kwargs = self._cleanups.pop()
  173. try:
  174. yield func(*args, **kwargs)
  175. except Exception:
  176. failures.append(failure.Failure())
  177. for f in failures:
  178. result.addError(self, f)
  179. self._passed = False
  180. def _cleanUp(self, result):
  181. try:
  182. clean = util._Janitor(self, result).postCaseCleanup()
  183. if not clean:
  184. self._passed = False
  185. except BaseException:
  186. result.addError(self, failure.Failure())
  187. self._passed = False
  188. for error in self._observer.getErrors():
  189. result.addError(self, error)
  190. self._passed = False
  191. self.flushLoggedErrors()
  192. self._removeObserver()
  193. if self._passed:
  194. result.addSuccess(self)
  195. def _classCleanUp(self, result):
  196. try:
  197. util._Janitor(self, result).postClassCleanup()
  198. except BaseException:
  199. result.addError(self, failure.Failure())
  200. def _makeReactorMethod(self, name):
  201. """
  202. Create a method which wraps the reactor method C{name}. The new
  203. method issues a deprecation warning and calls the original.
  204. """
  205. def _(*a, **kw):
  206. warnings.warn(
  207. "reactor.%s cannot be used inside unit tests. "
  208. "In the future, using %s will fail the test and may "
  209. "crash or hang the test run." % (name, name),
  210. stacklevel=2,
  211. category=DeprecationWarning,
  212. )
  213. return self._reactorMethods[name](*a, **kw)
  214. return _
  215. def _deprecateReactor(self, reactor):
  216. """
  217. Deprecate C{iterate}, C{crash} and C{stop} on C{reactor}. That is,
  218. each method is wrapped in a function that issues a deprecation
  219. warning, then calls the original.
  220. @param reactor: The Twisted reactor.
  221. """
  222. self._reactorMethods = {}
  223. for name in ["crash", "iterate", "stop"]:
  224. self._reactorMethods[name] = getattr(reactor, name)
  225. setattr(reactor, name, self._makeReactorMethod(name))
  226. def _undeprecateReactor(self, reactor):
  227. """
  228. Restore the deprecated reactor methods. Undoes what
  229. L{_deprecateReactor} did.
  230. @param reactor: The Twisted reactor.
  231. """
  232. for name, method in self._reactorMethods.items():
  233. setattr(reactor, name, method)
  234. self._reactorMethods = {}
  235. def _runFixturesAndTest(self, result):
  236. """
  237. Really run C{setUp}, the test method, and C{tearDown}. Any of these may
  238. return L{defer.Deferred}s. After they complete, do some reactor cleanup.
  239. @param result: A L{TestResult} object.
  240. """
  241. from twisted.internet import reactor
  242. self._deprecateReactor(reactor)
  243. self._timedOut = False
  244. try:
  245. d = self.deferSetUp(None, result)
  246. try:
  247. self._wait(d)
  248. finally:
  249. self._cleanUp(result)
  250. self._classCleanUp(result)
  251. finally:
  252. self._undeprecateReactor(reactor)
  253. def addCleanup(self, f, *args, **kwargs):
  254. """
  255. Extend the base cleanup feature with support for cleanup functions which
  256. return Deferreds.
  257. If the function C{f} returns a Deferred, C{TestCase} will wait until the
  258. Deferred has fired before proceeding to the next function.
  259. """
  260. return super().addCleanup(f, *args, **kwargs)
  261. def getSuppress(self):
  262. return self._getSuppress()
  263. def getTimeout(self):
  264. """
  265. Returns the timeout value set on this test. Checks on the instance
  266. first, then the class, then the module, then packages. As soon as it
  267. finds something with a C{timeout} attribute, returns that. Returns
  268. L{util.DEFAULT_TIMEOUT_DURATION} if it cannot find anything. See
  269. L{TestCase} docstring for more details.
  270. """
  271. timeout = util.acquireAttribute(
  272. self._parents, "timeout", util.DEFAULT_TIMEOUT_DURATION
  273. )
  274. try:
  275. return float(timeout)
  276. except (ValueError, TypeError):
  277. # XXX -- this is here because sometimes people will have methods
  278. # called 'timeout', or set timeout to 'orange', or something
  279. # Particularly, test_news.NewsTestCase and ReactorCoreTestCase
  280. # both do this.
  281. warnings.warn(
  282. "'timeout' attribute needs to be a number.", category=DeprecationWarning
  283. )
  284. return util.DEFAULT_TIMEOUT_DURATION
  285. def _wait(self, d, running=_wait_is_running):
  286. """Take a Deferred that only ever callbacks. Block until it happens."""
  287. if running:
  288. raise RuntimeError("_wait is not reentrant")
  289. from twisted.internet import reactor
  290. results = []
  291. def append(any):
  292. if results is not None:
  293. results.append(any)
  294. def crash(ign):
  295. if results is not None:
  296. reactor.crash()
  297. crash = utils.suppressWarnings(
  298. crash,
  299. util.suppress(
  300. message=r"reactor\.crash cannot be used.*", category=DeprecationWarning
  301. ),
  302. )
  303. def stop():
  304. reactor.crash()
  305. stop = utils.suppressWarnings(
  306. stop,
  307. util.suppress(
  308. message=r"reactor\.crash cannot be used.*", category=DeprecationWarning
  309. ),
  310. )
  311. running.append(None)
  312. try:
  313. d.addBoth(append)
  314. if results:
  315. # d might have already been fired, in which case append is
  316. # called synchronously. Avoid any reactor stuff.
  317. return
  318. d.addBoth(crash)
  319. reactor.stop = stop
  320. try:
  321. reactor.run()
  322. finally:
  323. del reactor.stop
  324. # If the reactor was crashed elsewhere due to a timeout, hopefully
  325. # that crasher also reported an error. Just return.
  326. # _timedOut is most likely to be set when d has fired but hasn't
  327. # completed its callback chain (see self._run)
  328. if results or self._timedOut: # defined in run() and _run()
  329. return
  330. # If the timeout didn't happen, and we didn't get a result or
  331. # a failure, then the user probably aborted the test, so let's
  332. # just raise KeyboardInterrupt.
  333. # FIXME: imagine this:
  334. # web/test/test_webclient.py:
  335. # exc = self.assertRaises(error.Error, wait, method(url))
  336. #
  337. # wait() will raise KeyboardInterrupt, and assertRaises will
  338. # swallow it. Therefore, wait() raising KeyboardInterrupt is
  339. # insufficient to stop trial. A suggested solution is to have
  340. # this code set a "stop trial" flag, or otherwise notify trial
  341. # that it should really try to stop as soon as possible.
  342. raise KeyboardInterrupt()
  343. finally:
  344. results = None
  345. running.pop()