Funktionierender Prototyp des Serious Games zur Vermittlung von Wissen zu Software-Engineering-Arbeitsmodellen.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

test_threadpool.py 21KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. Tests for L{twisted.python.threadpool}
  5. """
  6. import gc
  7. import pickle
  8. import threading
  9. import time
  10. import weakref
  11. from twisted._threads import Team, createMemoryWorker
  12. from twisted.python import context, failure, threadable, threadpool
  13. from twisted.trial import unittest
  14. class Synchronization:
  15. failures = 0
  16. def __init__(self, N, waiting):
  17. self.N = N
  18. self.waiting = waiting
  19. self.lock = threading.Lock()
  20. self.runs = []
  21. def run(self):
  22. # This is the testy part: this is supposed to be invoked
  23. # serially from multiple threads. If that is actually the
  24. # case, we will never fail to acquire this lock. If it is
  25. # *not* the case, we might get here while someone else is
  26. # holding the lock.
  27. if self.lock.acquire(False):
  28. if not len(self.runs) % 5:
  29. # Constant selected based on empirical data to maximize the
  30. # chance of a quick failure if this code is broken.
  31. time.sleep(0.0002)
  32. self.lock.release()
  33. else:
  34. self.failures += 1
  35. # This is just the only way I can think of to wake up the test
  36. # method. It doesn't actually have anything to do with the
  37. # test.
  38. self.lock.acquire()
  39. self.runs.append(None)
  40. if len(self.runs) == self.N:
  41. self.waiting.release()
  42. self.lock.release()
  43. synchronized = ["run"]
  44. threadable.synchronize(Synchronization)
  45. class ThreadPoolTests(unittest.SynchronousTestCase):
  46. """
  47. Test threadpools.
  48. """
  49. def getTimeout(self):
  50. """
  51. Return number of seconds to wait before giving up.
  52. """
  53. return 5 # Really should be order of magnitude less
  54. def _waitForLock(self, lock):
  55. items = range(1000000)
  56. for i in items:
  57. if lock.acquire(False):
  58. break
  59. time.sleep(1e-5)
  60. else:
  61. self.fail("A long time passed without succeeding")
  62. def test_attributes(self):
  63. """
  64. L{ThreadPool.min} and L{ThreadPool.max} are set to the values passed to
  65. L{ThreadPool.__init__}.
  66. """
  67. pool = threadpool.ThreadPool(12, 22)
  68. self.assertEqual(pool.min, 12)
  69. self.assertEqual(pool.max, 22)
  70. def test_start(self):
  71. """
  72. L{ThreadPool.start} creates the minimum number of threads specified.
  73. """
  74. pool = threadpool.ThreadPool(0, 5)
  75. pool.start()
  76. self.addCleanup(pool.stop)
  77. self.assertEqual(len(pool.threads), 0)
  78. pool = threadpool.ThreadPool(3, 10)
  79. self.assertEqual(len(pool.threads), 0)
  80. pool.start()
  81. self.addCleanup(pool.stop)
  82. self.assertEqual(len(pool.threads), 3)
  83. def test_adjustingWhenPoolStopped(self):
  84. """
  85. L{ThreadPool.adjustPoolsize} only modifies the pool size and does not
  86. start new workers while the pool is not running.
  87. """
  88. pool = threadpool.ThreadPool(0, 5)
  89. pool.start()
  90. pool.stop()
  91. pool.adjustPoolsize(2)
  92. self.assertEqual(len(pool.threads), 0)
  93. def test_threadCreationArguments(self):
  94. """
  95. Test that creating threads in the threadpool with application-level
  96. objects as arguments doesn't results in those objects never being
  97. freed, with the thread maintaining a reference to them as long as it
  98. exists.
  99. """
  100. tp = threadpool.ThreadPool(0, 1)
  101. tp.start()
  102. self.addCleanup(tp.stop)
  103. # Sanity check - no threads should have been started yet.
  104. self.assertEqual(tp.threads, [])
  105. # Here's our function
  106. def worker(arg):
  107. pass
  108. # weakref needs an object subclass
  109. class Dumb:
  110. pass
  111. # And here's the unique object
  112. unique = Dumb()
  113. workerRef = weakref.ref(worker)
  114. uniqueRef = weakref.ref(unique)
  115. # Put some work in
  116. tp.callInThread(worker, unique)
  117. # Add an event to wait completion
  118. event = threading.Event()
  119. tp.callInThread(event.set)
  120. event.wait(self.getTimeout())
  121. del worker
  122. del unique
  123. gc.collect()
  124. self.assertIsNone(uniqueRef())
  125. self.assertIsNone(workerRef())
  126. def test_threadCreationArgumentsCallInThreadWithCallback(self):
  127. """
  128. As C{test_threadCreationArguments} above, but for
  129. callInThreadWithCallback.
  130. """
  131. tp = threadpool.ThreadPool(0, 1)
  132. tp.start()
  133. self.addCleanup(tp.stop)
  134. # Sanity check - no threads should have been started yet.
  135. self.assertEqual(tp.threads, [])
  136. # this holds references obtained in onResult
  137. refdict = {} # name -> ref value
  138. onResultWait = threading.Event()
  139. onResultDone = threading.Event()
  140. resultRef = []
  141. # result callback
  142. def onResult(success, result):
  143. # Spin the GC, which should now delete worker and unique if it's
  144. # not held on to by callInThreadWithCallback after it is complete
  145. gc.collect()
  146. onResultWait.wait(self.getTimeout())
  147. refdict["workerRef"] = workerRef()
  148. refdict["uniqueRef"] = uniqueRef()
  149. onResultDone.set()
  150. resultRef.append(weakref.ref(result))
  151. # Here's our function
  152. def worker(arg, test):
  153. return Dumb()
  154. # weakref needs an object subclass
  155. class Dumb:
  156. pass
  157. # And here's the unique object
  158. unique = Dumb()
  159. onResultRef = weakref.ref(onResult)
  160. workerRef = weakref.ref(worker)
  161. uniqueRef = weakref.ref(unique)
  162. # Put some work in
  163. tp.callInThreadWithCallback(onResult, worker, unique, test=unique)
  164. del worker
  165. del unique
  166. # let onResult collect the refs
  167. onResultWait.set()
  168. # wait for onResult
  169. onResultDone.wait(self.getTimeout())
  170. gc.collect()
  171. self.assertIsNone(uniqueRef())
  172. self.assertIsNone(workerRef())
  173. # XXX There's a race right here - has onResult in the worker thread
  174. # returned and the locals in _worker holding it and the result been
  175. # deleted yet?
  176. del onResult
  177. gc.collect()
  178. self.assertIsNone(onResultRef())
  179. self.assertIsNone(resultRef[0]())
  180. # The callback shouldn't have been able to resolve the references.
  181. self.assertEqual(list(refdict.values()), [None, None])
  182. def test_persistence(self):
  183. """
  184. Threadpools can be pickled and unpickled, which should preserve the
  185. number of threads and other parameters.
  186. """
  187. pool = threadpool.ThreadPool(7, 20)
  188. self.assertEqual(pool.min, 7)
  189. self.assertEqual(pool.max, 20)
  190. # check that unpickled threadpool has same number of threads
  191. copy = pickle.loads(pickle.dumps(pool))
  192. self.assertEqual(copy.min, 7)
  193. self.assertEqual(copy.max, 20)
  194. def _threadpoolTest(self, method):
  195. """
  196. Test synchronization of calls made with C{method}, which should be
  197. one of the mechanisms of the threadpool to execute work in threads.
  198. """
  199. # This is a schizophrenic test: it seems to be trying to test
  200. # both the callInThread()/dispatch() behavior of the ThreadPool as well
  201. # as the serialization behavior of threadable.synchronize(). It
  202. # would probably make more sense as two much simpler tests.
  203. N = 10
  204. tp = threadpool.ThreadPool()
  205. tp.start()
  206. self.addCleanup(tp.stop)
  207. waiting = threading.Lock()
  208. waiting.acquire()
  209. actor = Synchronization(N, waiting)
  210. for i in range(N):
  211. method(tp, actor)
  212. self._waitForLock(waiting)
  213. self.assertFalse(actor.failures, f"run() re-entered {actor.failures} times")
  214. def test_callInThread(self):
  215. """
  216. Call C{_threadpoolTest} with C{callInThread}.
  217. """
  218. return self._threadpoolTest(lambda tp, actor: tp.callInThread(actor.run))
  219. def test_callInThreadException(self):
  220. """
  221. L{ThreadPool.callInThread} logs exceptions raised by the callable it
  222. is passed.
  223. """
  224. class NewError(Exception):
  225. pass
  226. def raiseError():
  227. raise NewError()
  228. tp = threadpool.ThreadPool(0, 1)
  229. tp.callInThread(raiseError)
  230. tp.start()
  231. tp.stop()
  232. errors = self.flushLoggedErrors(NewError)
  233. self.assertEqual(len(errors), 1)
  234. def test_callInThreadWithCallback(self):
  235. """
  236. L{ThreadPool.callInThreadWithCallback} calls C{onResult} with a
  237. two-tuple of C{(True, result)} where C{result} is the value returned
  238. by the callable supplied.
  239. """
  240. waiter = threading.Lock()
  241. waiter.acquire()
  242. results = []
  243. def onResult(success, result):
  244. waiter.release()
  245. results.append(success)
  246. results.append(result)
  247. tp = threadpool.ThreadPool(0, 1)
  248. tp.callInThreadWithCallback(onResult, lambda: "test")
  249. tp.start()
  250. try:
  251. self._waitForLock(waiter)
  252. finally:
  253. tp.stop()
  254. self.assertTrue(results[0])
  255. self.assertEqual(results[1], "test")
  256. def test_callInThreadWithCallbackExceptionInCallback(self):
  257. """
  258. L{ThreadPool.callInThreadWithCallback} calls C{onResult} with a
  259. two-tuple of C{(False, failure)} where C{failure} represents the
  260. exception raised by the callable supplied.
  261. """
  262. class NewError(Exception):
  263. pass
  264. def raiseError():
  265. raise NewError()
  266. waiter = threading.Lock()
  267. waiter.acquire()
  268. results = []
  269. def onResult(success, result):
  270. waiter.release()
  271. results.append(success)
  272. results.append(result)
  273. tp = threadpool.ThreadPool(0, 1)
  274. tp.callInThreadWithCallback(onResult, raiseError)
  275. tp.start()
  276. try:
  277. self._waitForLock(waiter)
  278. finally:
  279. tp.stop()
  280. self.assertFalse(results[0])
  281. self.assertIsInstance(results[1], failure.Failure)
  282. self.assertTrue(issubclass(results[1].type, NewError))
  283. def test_callInThreadWithCallbackExceptionInOnResult(self):
  284. """
  285. L{ThreadPool.callInThreadWithCallback} logs the exception raised by
  286. C{onResult}.
  287. """
  288. class NewError(Exception):
  289. pass
  290. waiter = threading.Lock()
  291. waiter.acquire()
  292. results = []
  293. def onResult(success, result):
  294. results.append(success)
  295. results.append(result)
  296. raise NewError()
  297. tp = threadpool.ThreadPool(0, 1)
  298. tp.callInThreadWithCallback(onResult, lambda: None)
  299. tp.callInThread(waiter.release)
  300. tp.start()
  301. try:
  302. self._waitForLock(waiter)
  303. finally:
  304. tp.stop()
  305. errors = self.flushLoggedErrors(NewError)
  306. self.assertEqual(len(errors), 1)
  307. self.assertTrue(results[0])
  308. self.assertIsNone(results[1])
  309. def test_callbackThread(self):
  310. """
  311. L{ThreadPool.callInThreadWithCallback} calls the function it is
  312. given and the C{onResult} callback in the same thread.
  313. """
  314. threadIds = []
  315. event = threading.Event()
  316. def onResult(success, result):
  317. threadIds.append(threading.current_thread().ident)
  318. event.set()
  319. def func():
  320. threadIds.append(threading.current_thread().ident)
  321. tp = threadpool.ThreadPool(0, 1)
  322. tp.callInThreadWithCallback(onResult, func)
  323. tp.start()
  324. self.addCleanup(tp.stop)
  325. event.wait(self.getTimeout())
  326. self.assertEqual(len(threadIds), 2)
  327. self.assertEqual(threadIds[0], threadIds[1])
  328. def test_callbackContext(self):
  329. """
  330. The context L{ThreadPool.callInThreadWithCallback} is invoked in is
  331. shared by the context the callable and C{onResult} callback are
  332. invoked in.
  333. """
  334. myctx = context.theContextTracker.currentContext().contexts[-1]
  335. myctx["testing"] = "this must be present"
  336. contexts = []
  337. event = threading.Event()
  338. def onResult(success, result):
  339. ctx = context.theContextTracker.currentContext().contexts[-1]
  340. contexts.append(ctx)
  341. event.set()
  342. def func():
  343. ctx = context.theContextTracker.currentContext().contexts[-1]
  344. contexts.append(ctx)
  345. tp = threadpool.ThreadPool(0, 1)
  346. tp.callInThreadWithCallback(onResult, func)
  347. tp.start()
  348. self.addCleanup(tp.stop)
  349. event.wait(self.getTimeout())
  350. self.assertEqual(len(contexts), 2)
  351. self.assertEqual(myctx, contexts[0])
  352. self.assertEqual(myctx, contexts[1])
  353. def test_existingWork(self):
  354. """
  355. Work added to the threadpool before its start should be executed once
  356. the threadpool is started: this is ensured by trying to release a lock
  357. previously acquired.
  358. """
  359. waiter = threading.Lock()
  360. waiter.acquire()
  361. tp = threadpool.ThreadPool(0, 1)
  362. tp.callInThread(waiter.release) # Before start()
  363. tp.start()
  364. try:
  365. self._waitForLock(waiter)
  366. finally:
  367. tp.stop()
  368. def test_workerStateTransition(self):
  369. """
  370. As the worker receives and completes work, it transitions between
  371. the working and waiting states.
  372. """
  373. pool = threadpool.ThreadPool(0, 1)
  374. pool.start()
  375. self.addCleanup(pool.stop)
  376. # Sanity check
  377. self.assertEqual(pool.workers, 0)
  378. self.assertEqual(len(pool.waiters), 0)
  379. self.assertEqual(len(pool.working), 0)
  380. # Fire up a worker and give it some 'work'
  381. threadWorking = threading.Event()
  382. threadFinish = threading.Event()
  383. def _thread():
  384. threadWorking.set()
  385. threadFinish.wait(10)
  386. pool.callInThread(_thread)
  387. threadWorking.wait(10)
  388. self.assertEqual(pool.workers, 1)
  389. self.assertEqual(len(pool.waiters), 0)
  390. self.assertEqual(len(pool.working), 1)
  391. # Finish work, and spin until state changes
  392. threadFinish.set()
  393. while not len(pool.waiters):
  394. time.sleep(0.0005)
  395. # Make sure state changed correctly
  396. self.assertEqual(len(pool.waiters), 1)
  397. self.assertEqual(len(pool.working), 0)
  398. class RaceConditionTests(unittest.SynchronousTestCase):
  399. def setUp(self):
  400. self.threadpool = threadpool.ThreadPool(0, 10)
  401. self.event = threading.Event()
  402. self.threadpool.start()
  403. def done():
  404. self.threadpool.stop()
  405. del self.threadpool
  406. self.addCleanup(done)
  407. def getTimeout(self):
  408. """
  409. A reasonable number of seconds to time out.
  410. """
  411. return 5
  412. def test_synchronization(self):
  413. """
  414. If multiple threads are waiting on an event (via blocking on something
  415. in a callable passed to L{threadpool.ThreadPool.callInThread}), and
  416. there is spare capacity in the threadpool, sending another callable
  417. which will cause those to un-block to
  418. L{threadpool.ThreadPool.callInThread} will reliably run that callable
  419. and un-block the blocked threads promptly.
  420. @note: This is not really a unit test, it is a stress-test. You may
  421. need to run it with C{trial -u} to fail reliably if there is a
  422. problem. It is very hard to regression-test for this particular
  423. bug - one where the thread pool may consider itself as having
  424. "enough capacity" when it really needs to spin up a new thread if
  425. it possibly can - in a deterministic way, since the bug can only be
  426. provoked by subtle race conditions.
  427. """
  428. timeout = self.getTimeout()
  429. self.threadpool.callInThread(self.event.set)
  430. self.event.wait(timeout)
  431. self.event.clear()
  432. for i in range(3):
  433. self.threadpool.callInThread(self.event.wait)
  434. self.threadpool.callInThread(self.event.set)
  435. self.event.wait(timeout)
  436. if not self.event.isSet():
  437. self.event.set()
  438. self.fail("'set' did not run in thread; timed out waiting on 'wait'.")
  439. class MemoryPool(threadpool.ThreadPool):
  440. """
  441. A deterministic threadpool that uses in-memory data structures to queue
  442. work rather than threads to execute work.
  443. """
  444. def __init__(self, coordinator, failTest, newWorker, *args, **kwargs):
  445. """
  446. Initialize this L{MemoryPool} with a test case.
  447. @param coordinator: a worker used to coordinate work in the L{Team}
  448. underlying this threadpool.
  449. @type coordinator: L{twisted._threads.IExclusiveWorker}
  450. @param failTest: A 1-argument callable taking an exception and raising
  451. a test-failure exception.
  452. @type failTest: 1-argument callable taking (L{Failure}) and raising
  453. L{unittest.FailTest}.
  454. @param newWorker: a 0-argument callable that produces a new
  455. L{twisted._threads.IWorker} provider on each invocation.
  456. @type newWorker: 0-argument callable returning
  457. L{twisted._threads.IWorker}.
  458. """
  459. self._coordinator = coordinator
  460. self._failTest = failTest
  461. self._newWorker = newWorker
  462. threadpool.ThreadPool.__init__(self, *args, **kwargs)
  463. def _pool(self, currentLimit, threadFactory):
  464. """
  465. Override testing hook to create a deterministic threadpool.
  466. @param currentLimit: A 1-argument callable which returns the current
  467. threadpool size limit.
  468. @param threadFactory: ignored in this invocation; a 0-argument callable
  469. that would produce a thread.
  470. @return: a L{Team} backed by the coordinator and worker passed to
  471. L{MemoryPool.__init__}.
  472. """
  473. def respectLimit():
  474. # The expression in this method copied and pasted from
  475. # twisted.threads._pool, which is unfortunately bound up
  476. # with lots of actual-threading stuff.
  477. stats = team.statistics()
  478. if (stats.busyWorkerCount + stats.idleWorkerCount) >= currentLimit():
  479. return None
  480. return self._newWorker()
  481. team = Team(
  482. coordinator=self._coordinator,
  483. createWorker=respectLimit,
  484. logException=self._failTest,
  485. )
  486. return team
  487. class PoolHelper:
  488. """
  489. A L{PoolHelper} constructs a L{threadpool.ThreadPool} that doesn't actually
  490. use threads, by using the internal interfaces in L{twisted._threads}.
  491. @ivar performCoordination: a 0-argument callable that will perform one unit
  492. of "coordination" - work involved in delegating work to other threads -
  493. and return L{True} if it did any work, L{False} otherwise.
  494. @ivar workers: the workers which represent the threads within the pool -
  495. the workers other than the coordinator.
  496. @type workers: L{list} of 2-tuple of (L{IWorker}, C{workPerformer}) where
  497. C{workPerformer} is a 0-argument callable like C{performCoordination}.
  498. @ivar threadpool: a modified L{threadpool.ThreadPool} to test.
  499. @type threadpool: L{MemoryPool}
  500. """
  501. def __init__(self, testCase, *args, **kwargs):
  502. """
  503. Create a L{PoolHelper}.
  504. @param testCase: a test case attached to this helper.
  505. @type args: The arguments passed to a L{threadpool.ThreadPool}.
  506. @type kwargs: The arguments passed to a L{threadpool.ThreadPool}
  507. """
  508. coordinator, self.performCoordination = createMemoryWorker()
  509. self.workers = []
  510. def newWorker():
  511. self.workers.append(createMemoryWorker())
  512. return self.workers[-1][0]
  513. self.threadpool = MemoryPool(
  514. coordinator, testCase.fail, newWorker, *args, **kwargs
  515. )
  516. def performAllCoordination(self):
  517. """
  518. Perform all currently scheduled "coordination", which is the work
  519. involved in delegating work to other threads.
  520. """
  521. while self.performCoordination():
  522. pass
  523. class MemoryBackedTests(unittest.SynchronousTestCase):
  524. """
  525. Tests using L{PoolHelper} to deterministically test properties of the
  526. threadpool implementation.
  527. """
  528. def test_workBeforeStarting(self):
  529. """
  530. If a threadpool is told to do work before starting, then upon starting
  531. up, it will start enough workers to handle all of the enqueued work
  532. that it's been given.
  533. """
  534. helper = PoolHelper(self, 0, 10)
  535. n = 5
  536. for x in range(n):
  537. helper.threadpool.callInThread(lambda: None)
  538. helper.performAllCoordination()
  539. self.assertEqual(helper.workers, [])
  540. helper.threadpool.start()
  541. helper.performAllCoordination()
  542. self.assertEqual(len(helper.workers), n)
  543. def test_tooMuchWorkBeforeStarting(self):
  544. """
  545. If the amount of work before starting exceeds the maximum number of
  546. threads allowed to the threadpool, only the maximum count will be
  547. started.
  548. """
  549. helper = PoolHelper(self, 0, 10)
  550. n = 50
  551. for x in range(n):
  552. helper.threadpool.callInThread(lambda: None)
  553. helper.performAllCoordination()
  554. self.assertEqual(helper.workers, [])
  555. helper.threadpool.start()
  556. helper.performAllCoordination()
  557. self.assertEqual(len(helper.workers), helper.threadpool.max)