Funktionierender Prototyp des Serious Games zur Vermittlung von Wissen zu Software-Engineering-Arbeitsmodellen.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

testing.py 29KB

1 year ago
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950
  1. # -*- test-case-name: twisted.internet.test.test_testing -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Assorted functionality which is commonly useful when writing unit tests.
  6. """
  7. from collections.abc import Sequence
  8. from io import BytesIO
  9. from socket import AF_INET, AF_INET6
  10. from typing import Any, Callable
  11. from zope.interface import implementedBy, implementer
  12. from zope.interface.verify import verifyClass
  13. from twisted.internet import address, error, protocol, task
  14. from twisted.internet.abstract import _dataMustBeBytes, isIPv6Address
  15. from twisted.internet.address import IPv4Address, IPv6Address, UNIXAddress
  16. from twisted.internet.defer import Deferred
  17. from twisted.internet.error import UnsupportedAddressFamily
  18. from twisted.internet.interfaces import (
  19. IConnector,
  20. IConsumer,
  21. IListeningPort,
  22. IProtocol,
  23. IPushProducer,
  24. IReactorCore,
  25. IReactorFDSet,
  26. IReactorSocket,
  27. IReactorSSL,
  28. IReactorTCP,
  29. IReactorUNIX,
  30. ITransport,
  31. )
  32. from twisted.internet.task import Clock
  33. from twisted.logger import ILogObserver
  34. from twisted.protocols import basic
  35. from twisted.python import failure
# Names exported by a star-import of this module.  Every entry is defined
# further down in this file; note that the list includes the
# underscore-prefixed fakes C{_FakeConnector} and C{_FakePort}.
__all__ = [
    "AccumulatingProtocol",
    "LineSendingProtocol",
    "FakeDatagramTransport",
    "StringTransport",
    "StringTransportWithDisconnection",
    "StringIOWithoutClosing",
    "_FakeConnector",
    "_FakePort",
    "MemoryReactor",
    "MemoryReactorClock",
    "RaisingMemoryReactor",
    "NonStreamingProducer",
    "waitUntilAllDisconnected",
    "EventLoggingObserver",
]
  52. class AccumulatingProtocol(protocol.Protocol):
  53. """
  54. L{AccumulatingProtocol} is an L{IProtocol} implementation which collects
  55. the data delivered to it and can fire a Deferred when it is connected or
  56. disconnected.
  57. @ivar made: A flag indicating whether C{connectionMade} has been called.
  58. @ivar data: Bytes giving all the data passed to C{dataReceived}.
  59. @ivar closed: A flag indicated whether C{connectionLost} has been called.
  60. @ivar closedReason: The value of the I{reason} parameter passed to
  61. C{connectionLost}.
  62. @ivar closedDeferred: If set to a L{Deferred}, this will be fired when
  63. C{connectionLost} is called.
  64. """
  65. made = closed = 0
  66. closedReason = None
  67. closedDeferred = None
  68. data = b""
  69. factory = None
  70. def connectionMade(self):
  71. self.made = 1
  72. if self.factory is not None and self.factory.protocolConnectionMade is not None:
  73. d = self.factory.protocolConnectionMade
  74. self.factory.protocolConnectionMade = None
  75. d.callback(self)
  76. def dataReceived(self, data):
  77. self.data += data
  78. def connectionLost(self, reason):
  79. self.closed = 1
  80. self.closedReason = reason
  81. if self.closedDeferred is not None:
  82. d, self.closedDeferred = self.closedDeferred, None
  83. d.callback(None)
  84. class LineSendingProtocol(basic.LineReceiver):
  85. lostConn = False
  86. def __init__(self, lines, start=True):
  87. self.lines = lines[:]
  88. self.response = []
  89. self.start = start
  90. def connectionMade(self):
  91. if self.start:
  92. for line in self.lines:
  93. self.sendLine(line)
  94. def lineReceived(self, line):
  95. if not self.start:
  96. for line in self.lines:
  97. self.sendLine(line)
  98. self.lines = []
  99. self.response.append(line)
  100. def connectionLost(self, reason):
  101. self.lostConn = True
  102. class FakeDatagramTransport:
  103. noAddr = object()
  104. def __init__(self):
  105. self.written = []
  106. def write(self, packet, addr=noAddr):
  107. self.written.append((packet, addr))
  108. @implementer(ITransport, IConsumer, IPushProducer)
  109. class StringTransport:
  110. """
  111. A transport implementation which buffers data in memory and keeps track of
  112. its other state without providing any behavior.
  113. L{StringTransport} has a number of attributes which are not part of any of
  114. the interfaces it claims to implement. These attributes are provided for
  115. testing purposes. Implementation code should not use any of these
  116. attributes; they are not provided by other transports.
  117. @ivar disconnecting: A C{bool} which is C{False} until L{loseConnection} is
  118. called, then C{True}.
  119. @ivar disconnected: A C{bool} which is C{False} until L{abortConnection} is
  120. called, then C{True}.
  121. @ivar producer: If a producer is currently registered, C{producer} is a
  122. reference to it. Otherwise, L{None}.
  123. @ivar streaming: If a producer is currently registered, C{streaming} refers
  124. to the value of the second parameter passed to C{registerProducer}.
  125. @ivar hostAddr: L{None} or an object which will be returned as the host
  126. address of this transport. If L{None}, a nasty tuple will be returned
  127. instead.
  128. @ivar peerAddr: L{None} or an object which will be returned as the peer
  129. address of this transport. If L{None}, a nasty tuple will be returned
  130. instead.
  131. @ivar producerState: The state of this L{StringTransport} in its capacity
  132. as an L{IPushProducer}. One of C{'producing'}, C{'paused'}, or
  133. C{'stopped'}.
  134. @ivar io: A L{io.BytesIO} which holds the data which has been written to
  135. this transport since the last call to L{clear}. Use L{value} instead
  136. of accessing this directly.
  137. @ivar _lenient: By default L{StringTransport} enforces that
  138. L{resumeProducing} is not called after the connection is lost. This is
  139. to ensure that any code that does call L{resumeProducing} after the
  140. connection is lost is not blindly expecting L{resumeProducing} to have
  141. any impact.
  142. However, if your test case is calling L{resumeProducing} after
  143. connection close on purpose, and you know it won't block expecting
  144. further data to show up, this flag may safely be set to L{True}.
  145. Defaults to L{False}.
  146. @type lenient: L{bool}
  147. """
  148. disconnecting = False
  149. disconnected = False
  150. producer = None
  151. streaming = None
  152. hostAddr = None
  153. peerAddr = None
  154. producerState = "producing"
  155. def __init__(self, hostAddress=None, peerAddress=None, lenient=False):
  156. self.clear()
  157. if hostAddress is not None:
  158. self.hostAddr = hostAddress
  159. if peerAddress is not None:
  160. self.peerAddr = peerAddress
  161. self.connected = True
  162. self._lenient = lenient
  163. def clear(self):
  164. """
  165. Discard all data written to this transport so far.
  166. This is not a transport method. It is intended for tests. Do not use
  167. it in implementation code.
  168. """
  169. self.io = BytesIO()
  170. def value(self):
  171. """
  172. Retrieve all data which has been buffered by this transport.
  173. This is not a transport method. It is intended for tests. Do not use
  174. it in implementation code.
  175. @return: A C{bytes} giving all data written to this transport since the
  176. last call to L{clear}.
  177. @rtype: C{bytes}
  178. """
  179. return self.io.getvalue()
  180. # ITransport
  181. def write(self, data):
  182. _dataMustBeBytes(data)
  183. self.io.write(data)
  184. def writeSequence(self, data):
  185. self.io.write(b"".join(data))
  186. def loseConnection(self):
  187. """
  188. Close the connection. Does nothing besides toggle the C{disconnecting}
  189. instance variable to C{True}.
  190. """
  191. self.disconnecting = True
  192. def abortConnection(self):
  193. """
  194. Abort the connection. Same as C{loseConnection}, but also toggles the
  195. C{aborted} instance variable to C{True}.
  196. """
  197. self.disconnected = True
  198. self.loseConnection()
  199. def getPeer(self):
  200. if self.peerAddr is None:
  201. return address.IPv4Address("TCP", "192.168.1.1", 54321)
  202. return self.peerAddr
  203. def getHost(self):
  204. if self.hostAddr is None:
  205. return address.IPv4Address("TCP", "10.0.0.1", 12345)
  206. return self.hostAddr
  207. # IConsumer
  208. def registerProducer(self, producer, streaming):
  209. if self.producer is not None:
  210. raise RuntimeError("Cannot register two producers")
  211. self.producer = producer
  212. self.streaming = streaming
  213. def unregisterProducer(self):
  214. if self.producer is None:
  215. raise RuntimeError("Cannot unregister a producer unless one is registered")
  216. self.producer = None
  217. self.streaming = None
  218. # IPushProducer
  219. def _checkState(self):
  220. if self.disconnecting and not self._lenient:
  221. raise RuntimeError("Cannot resume producing after loseConnection")
  222. if self.producerState == "stopped":
  223. raise RuntimeError("Cannot resume a stopped producer")
  224. def pauseProducing(self):
  225. self._checkState()
  226. self.producerState = "paused"
  227. def stopProducing(self):
  228. self.producerState = "stopped"
  229. def resumeProducing(self):
  230. self._checkState()
  231. self.producerState = "producing"
  232. class StringTransportWithDisconnection(StringTransport):
  233. """
  234. A L{StringTransport} which on disconnection will trigger the connection
  235. lost on the attached protocol.
  236. """
  237. protocol: IProtocol
  238. def loseConnection(self):
  239. if self.connected:
  240. self.connected = False
  241. self.protocol.connectionLost(failure.Failure(error.ConnectionDone("Bye.")))
  242. class StringIOWithoutClosing(BytesIO):
  243. """
  244. A BytesIO that can't be closed.
  245. """
  246. def close(self):
  247. """
  248. Do nothing.
  249. """
  250. @implementer(IListeningPort)
  251. class _FakePort:
  252. """
  253. A fake L{IListeningPort} to be used in tests.
  254. @ivar _hostAddress: The L{IAddress} this L{IListeningPort} is pretending
  255. to be listening on.
  256. """
  257. def __init__(self, hostAddress):
  258. """
  259. @param hostAddress: An L{IAddress} this L{IListeningPort} should
  260. pretend to be listening on.
  261. """
  262. self._hostAddress = hostAddress
  263. def startListening(self):
  264. """
  265. Fake L{IListeningPort.startListening} that doesn't do anything.
  266. """
  267. def stopListening(self):
  268. """
  269. Fake L{IListeningPort.stopListening} that doesn't do anything.
  270. """
  271. def getHost(self):
  272. """
  273. Fake L{IListeningPort.getHost} that returns our L{IAddress}.
  274. """
  275. return self._hostAddress
  276. @implementer(IConnector)
  277. class _FakeConnector:
  278. """
  279. A fake L{IConnector} that allows us to inspect if it has been told to stop
  280. connecting.
  281. @ivar stoppedConnecting: has this connector's
  282. L{_FakeConnector.stopConnecting} method been invoked yet?
  283. @ivar _address: An L{IAddress} provider that represents our destination.
  284. """
  285. _disconnected = False
  286. stoppedConnecting = False
  287. def __init__(self, address):
  288. """
  289. @param address: An L{IAddress} provider that represents this
  290. connector's destination.
  291. """
  292. self._address = address
  293. def stopConnecting(self):
  294. """
  295. Implement L{IConnector.stopConnecting} and set
  296. L{_FakeConnector.stoppedConnecting} to C{True}
  297. """
  298. self.stoppedConnecting = True
  299. def disconnect(self):
  300. """
  301. Implement L{IConnector.disconnect} as a no-op.
  302. """
  303. self._disconnected = True
  304. def connect(self):
  305. """
  306. Implement L{IConnector.connect} as a no-op.
  307. """
  308. def getDestination(self):
  309. """
  310. Implement L{IConnector.getDestination} to return the C{address} passed
  311. to C{__init__}.
  312. """
  313. return self._address
  314. @implementer(
  315. IReactorCore, IReactorTCP, IReactorSSL, IReactorUNIX, IReactorSocket, IReactorFDSet
  316. )
  317. class MemoryReactor:
  318. """
  319. A fake reactor to be used in tests. This reactor doesn't actually do
  320. much that's useful yet. It accepts TCP connection setup attempts, but
  321. they will never succeed.
  322. @ivar hasInstalled: Keeps track of whether this reactor has been installed.
  323. @type hasInstalled: L{bool}
  324. @ivar running: Keeps track of whether this reactor is running.
  325. @type running: L{bool}
  326. @ivar hasStopped: Keeps track of whether this reactor has been stopped.
  327. @type hasStopped: L{bool}
  328. @ivar hasCrashed: Keeps track of whether this reactor has crashed.
  329. @type hasCrashed: L{bool}
  330. @ivar whenRunningHooks: Keeps track of hooks registered with
  331. C{callWhenRunning}.
  332. @type whenRunningHooks: L{list}
  333. @ivar triggers: Keeps track of hooks registered with
  334. C{addSystemEventTrigger}.
  335. @type triggers: L{dict}
  336. @ivar tcpClients: Keeps track of connection attempts (ie, calls to
  337. C{connectTCP}).
  338. @type tcpClients: L{list}
  339. @ivar tcpServers: Keeps track of server listen attempts (ie, calls to
  340. C{listenTCP}).
  341. @type tcpServers: L{list}
  342. @ivar sslClients: Keeps track of connection attempts (ie, calls to
  343. C{connectSSL}).
  344. @type sslClients: L{list}
  345. @ivar sslServers: Keeps track of server listen attempts (ie, calls to
  346. C{listenSSL}).
  347. @type sslServers: L{list}
  348. @ivar unixClients: Keeps track of connection attempts (ie, calls to
  349. C{connectUNIX}).
  350. @type unixClients: L{list}
  351. @ivar unixServers: Keeps track of server listen attempts (ie, calls to
  352. C{listenUNIX}).
  353. @type unixServers: L{list}
  354. @ivar adoptedPorts: Keeps track of server listen attempts (ie, calls to
  355. C{adoptStreamPort}).
  356. @ivar adoptedStreamConnections: Keeps track of stream-oriented
  357. connections added using C{adoptStreamConnection}.
  358. """
  359. def __init__(self):
  360. """
  361. Initialize the tracking lists.
  362. """
  363. self.hasInstalled = False
  364. self.running = False
  365. self.hasRun = True
  366. self.hasStopped = True
  367. self.hasCrashed = True
  368. self.whenRunningHooks = []
  369. self.triggers = {}
  370. self.tcpClients = []
  371. self.tcpServers = []
  372. self.sslClients = []
  373. self.sslServers = []
  374. self.unixClients = []
  375. self.unixServers = []
  376. self.adoptedPorts = []
  377. self.adoptedStreamConnections = []
  378. self.connectors = []
  379. self.readers = set()
  380. self.writers = set()
  381. def install(self):
  382. """
  383. Fake install callable to emulate reactor module installation.
  384. """
  385. self.hasInstalled = True
  386. def resolve(self, name, timeout=10):
  387. """
  388. Not implemented; raises L{NotImplementedError}.
  389. """
  390. raise NotImplementedError()
  391. def run(self):
  392. """
  393. Fake L{IReactorCore.run}.
  394. Sets C{self.running} to L{True}, runs all of the hooks passed to
  395. C{self.callWhenRunning}, then calls C{self.stop} to simulate a request
  396. to stop the reactor.
  397. Sets C{self.hasRun} to L{True}.
  398. """
  399. assert self.running is False
  400. self.running = True
  401. self.hasRun = True
  402. for f, args, kwargs in self.whenRunningHooks:
  403. f(*args, **kwargs)
  404. self.stop()
  405. # That we stopped means we can return, phew.
  406. def stop(self):
  407. """
  408. Fake L{IReactorCore.run}.
  409. Sets C{self.running} to L{False}.
  410. Sets C{self.hasStopped} to L{True}.
  411. """
  412. self.running = False
  413. self.hasStopped = True
  414. def crash(self):
  415. """
  416. Fake L{IReactorCore.crash}.
  417. Sets C{self.running} to L{None}, because that feels crashy.
  418. Sets C{self.hasCrashed} to L{True}.
  419. """
  420. self.running = None
  421. self.hasCrashed = True
  422. def iterate(self, delay=0):
  423. """
  424. Not implemented; raises L{NotImplementedError}.
  425. """
  426. raise NotImplementedError()
  427. def fireSystemEvent(self, eventType):
  428. """
  429. Not implemented; raises L{NotImplementedError}.
  430. """
  431. raise NotImplementedError()
  432. def addSystemEventTrigger(
  433. self, phase: str, eventType: str, callable: Callable[..., Any], *args, **kw
  434. ):
  435. """
  436. Fake L{IReactorCore.run}.
  437. Keep track of trigger by appending it to
  438. self.triggers[phase][eventType].
  439. """
  440. phaseTriggers = self.triggers.setdefault(phase, {})
  441. eventTypeTriggers = phaseTriggers.setdefault(eventType, [])
  442. eventTypeTriggers.append((callable, args, kw))
  443. def removeSystemEventTrigger(self, triggerID):
  444. """
  445. Not implemented; raises L{NotImplementedError}.
  446. """
  447. raise NotImplementedError()
  448. def callWhenRunning(self, callable: Callable[..., Any], *args, **kw):
  449. """
  450. Fake L{IReactorCore.callWhenRunning}.
  451. Keeps a list of invocations to make in C{self.whenRunningHooks}.
  452. """
  453. self.whenRunningHooks.append((callable, args, kw))
  454. def adoptStreamPort(self, fileno, addressFamily, factory):
  455. """
  456. Fake L{IReactorSocket.adoptStreamPort}, that logs the call and returns
  457. an L{IListeningPort}.
  458. """
  459. if addressFamily == AF_INET:
  460. addr = IPv4Address("TCP", "0.0.0.0", 1234)
  461. elif addressFamily == AF_INET6:
  462. addr = IPv6Address("TCP", "::", 1234)
  463. else:
  464. raise UnsupportedAddressFamily()
  465. self.adoptedPorts.append((fileno, addressFamily, factory))
  466. return _FakePort(addr)
  467. def adoptStreamConnection(self, fileDescriptor, addressFamily, factory):
  468. """
  469. Record the given stream connection in C{adoptedStreamConnections}.
  470. @see:
  471. L{twisted.internet.interfaces.IReactorSocket.adoptStreamConnection}
  472. """
  473. self.adoptedStreamConnections.append((fileDescriptor, addressFamily, factory))
  474. def adoptDatagramPort(self, fileno, addressFamily, protocol, maxPacketSize=8192):
  475. """
  476. Fake L{IReactorSocket.adoptDatagramPort}, that logs the call and
  477. returns a fake L{IListeningPort}.
  478. @see: L{twisted.internet.interfaces.IReactorSocket.adoptDatagramPort}
  479. """
  480. if addressFamily == AF_INET:
  481. addr = IPv4Address("UDP", "0.0.0.0", 1234)
  482. elif addressFamily == AF_INET6:
  483. addr = IPv6Address("UDP", "::", 1234)
  484. else:
  485. raise UnsupportedAddressFamily()
  486. self.adoptedPorts.append((fileno, addressFamily, protocol, maxPacketSize))
  487. return _FakePort(addr)
  488. def listenTCP(self, port, factory, backlog=50, interface=""):
  489. """
  490. Fake L{IReactorTCP.listenTCP}, that logs the call and
  491. returns an L{IListeningPort}.
  492. """
  493. self.tcpServers.append((port, factory, backlog, interface))
  494. if isIPv6Address(interface):
  495. address = IPv6Address("TCP", interface, port)
  496. else:
  497. address = IPv4Address("TCP", "0.0.0.0", port)
  498. return _FakePort(address)
  499. def connectTCP(self, host, port, factory, timeout=30, bindAddress=None):
  500. """
  501. Fake L{IReactorTCP.connectTCP}, that logs the call and
  502. returns an L{IConnector}.
  503. """
  504. self.tcpClients.append((host, port, factory, timeout, bindAddress))
  505. if isIPv6Address(host):
  506. conn = _FakeConnector(IPv6Address("TCP", host, port))
  507. else:
  508. conn = _FakeConnector(IPv4Address("TCP", host, port))
  509. factory.startedConnecting(conn)
  510. self.connectors.append(conn)
  511. return conn
  512. def listenSSL(self, port, factory, contextFactory, backlog=50, interface=""):
  513. """
  514. Fake L{IReactorSSL.listenSSL}, that logs the call and
  515. returns an L{IListeningPort}.
  516. """
  517. self.sslServers.append((port, factory, contextFactory, backlog, interface))
  518. return _FakePort(IPv4Address("TCP", "0.0.0.0", port))
  519. def connectSSL(
  520. self, host, port, factory, contextFactory, timeout=30, bindAddress=None
  521. ):
  522. """
  523. Fake L{IReactorSSL.connectSSL}, that logs the call and returns an
  524. L{IConnector}.
  525. """
  526. self.sslClients.append(
  527. (host, port, factory, contextFactory, timeout, bindAddress)
  528. )
  529. conn = _FakeConnector(IPv4Address("TCP", host, port))
  530. factory.startedConnecting(conn)
  531. self.connectors.append(conn)
  532. return conn
  533. def listenUNIX(self, address, factory, backlog=50, mode=0o666, wantPID=0):
  534. """
  535. Fake L{IReactorUNIX.listenUNIX}, that logs the call and returns an
  536. L{IListeningPort}.
  537. """
  538. self.unixServers.append((address, factory, backlog, mode, wantPID))
  539. return _FakePort(UNIXAddress(address))
  540. def connectUNIX(self, address, factory, timeout=30, checkPID=0):
  541. """
  542. Fake L{IReactorUNIX.connectUNIX}, that logs the call and returns an
  543. L{IConnector}.
  544. """
  545. self.unixClients.append((address, factory, timeout, checkPID))
  546. conn = _FakeConnector(UNIXAddress(address))
  547. factory.startedConnecting(conn)
  548. self.connectors.append(conn)
  549. return conn
  550. def addReader(self, reader):
  551. """
  552. Fake L{IReactorFDSet.addReader} which adds the reader to a local set.
  553. """
  554. self.readers.add(reader)
  555. def removeReader(self, reader):
  556. """
  557. Fake L{IReactorFDSet.removeReader} which removes the reader from a
  558. local set.
  559. """
  560. self.readers.discard(reader)
  561. def addWriter(self, writer):
  562. """
  563. Fake L{IReactorFDSet.addWriter} which adds the writer to a local set.
  564. """
  565. self.writers.add(writer)
  566. def removeWriter(self, writer):
  567. """
  568. Fake L{IReactorFDSet.removeWriter} which removes the writer from a
  569. local set.
  570. """
  571. self.writers.discard(writer)
  572. def getReaders(self):
  573. """
  574. Fake L{IReactorFDSet.getReaders} which returns a list of readers from
  575. the local set.
  576. """
  577. return list(self.readers)
  578. def getWriters(self):
  579. """
  580. Fake L{IReactorFDSet.getWriters} which returns a list of writers from
  581. the local set.
  582. """
  583. return list(self.writers)
  584. def removeAll(self):
  585. """
  586. Fake L{IReactorFDSet.removeAll} which removed all readers and writers
  587. from the local sets.
  588. """
  589. self.readers.clear()
  590. self.writers.clear()
# Import-time self-check: for every interface MemoryReactor declares via
# @implementer, ask zope.interface to verify that the class actually
# provides the required methods.
for iface in implementedBy(MemoryReactor):
    verifyClass(iface, MemoryReactor)
  593. class MemoryReactorClock(MemoryReactor, Clock):
  594. def __init__(self):
  595. MemoryReactor.__init__(self)
  596. Clock.__init__(self)
  597. @implementer(IReactorTCP, IReactorSSL, IReactorUNIX, IReactorSocket)
  598. class RaisingMemoryReactor:
  599. """
  600. A fake reactor to be used in tests. It accepts TCP connection setup
  601. attempts, but they will fail.
  602. @ivar _listenException: An instance of an L{Exception}
  603. @ivar _connectException: An instance of an L{Exception}
  604. """
  605. def __init__(self, listenException=None, connectException=None):
  606. """
  607. @param listenException: An instance of an L{Exception} to raise
  608. when any C{listen} method is called.
  609. @param connectException: An instance of an L{Exception} to raise
  610. when any C{connect} method is called.
  611. """
  612. self._listenException = listenException
  613. self._connectException = connectException
  614. def adoptStreamPort(self, fileno, addressFamily, factory):
  615. """
  616. Fake L{IReactorSocket.adoptStreamPort}, that raises
  617. L{_listenException}.
  618. """
  619. raise self._listenException
  620. def listenTCP(self, port, factory, backlog=50, interface=""):
  621. """
  622. Fake L{IReactorTCP.listenTCP}, that raises L{_listenException}.
  623. """
  624. raise self._listenException
  625. def connectTCP(self, host, port, factory, timeout=30, bindAddress=None):
  626. """
  627. Fake L{IReactorTCP.connectTCP}, that raises L{_connectException}.
  628. """
  629. raise self._connectException
  630. def listenSSL(self, port, factory, contextFactory, backlog=50, interface=""):
  631. """
  632. Fake L{IReactorSSL.listenSSL}, that raises L{_listenException}.
  633. """
  634. raise self._listenException
  635. def connectSSL(
  636. self, host, port, factory, contextFactory, timeout=30, bindAddress=None
  637. ):
  638. """
  639. Fake L{IReactorSSL.connectSSL}, that raises L{_connectException}.
  640. """
  641. raise self._connectException
  642. def listenUNIX(self, address, factory, backlog=50, mode=0o666, wantPID=0):
  643. """
  644. Fake L{IReactorUNIX.listenUNIX}, that raises L{_listenException}.
  645. """
  646. raise self._listenException
  647. def connectUNIX(self, address, factory, timeout=30, checkPID=0):
  648. """
  649. Fake L{IReactorUNIX.connectUNIX}, that raises L{_connectException}.
  650. """
  651. raise self._connectException
  652. def adoptDatagramPort(self, fileDescriptor, addressFamily, protocol, maxPacketSize):
  653. """
  654. Fake L{IReactorSocket.adoptDatagramPort}, that raises
  655. L{_connectException}.
  656. """
  657. raise self._connectException
  658. def adoptStreamConnection(self, fileDescriptor, addressFamily, factory):
  659. """
  660. Fake L{IReactorSocket.adoptStreamConnection}, that raises
  661. L{_connectException}.
  662. """
  663. raise self._connectException
  664. class NonStreamingProducer:
  665. """
  666. A pull producer which writes 10 times only.
  667. """
  668. counter = 0
  669. stopped = False
  670. def __init__(self, consumer):
  671. self.consumer = consumer
  672. self.result = Deferred()
  673. def resumeProducing(self):
  674. """
  675. Write the counter value once.
  676. """
  677. if self.consumer is None or self.counter >= 10:
  678. raise RuntimeError("BUG: resume after unregister/stop.")
  679. else:
  680. self.consumer.write(b"%d" % (self.counter,))
  681. self.counter += 1
  682. if self.counter == 10:
  683. self.consumer.unregisterProducer()
  684. self._done()
  685. def pauseProducing(self):
  686. """
  687. An implementation of C{IPushProducer.pauseProducing}. This should never
  688. be called on a pull producer, so this just raises an error.
  689. """
  690. raise RuntimeError("BUG: pause should never be called.")
  691. def _done(self):
  692. """
  693. Fire a L{Deferred} so that users can wait for this to complete.
  694. """
  695. self.consumer = None
  696. d = self.result
  697. del self.result
  698. d.callback(None)
  699. def stopProducing(self):
  700. """
  701. Stop all production.
  702. """
  703. self.stopped = True
  704. self._done()
  705. def waitUntilAllDisconnected(reactor, protocols):
  706. """
  707. Take a list of disconnecting protocols, callback a L{Deferred} when they're
  708. all done.
  709. This is a hack to make some older tests less flaky, as
  710. L{ITransport.loseConnection} is not atomic on all reactors (for example,
  711. the CoreFoundation, which sometimes takes a reactor turn for CFSocket to
  712. realise). New tests should either not use real sockets in testing, or take
  713. the advice in
  714. I{https://jml.io/pages/how-to-disconnect-in-twisted-really.html} to heart.
  715. @param reactor: The reactor to schedule the checks on.
  716. @type reactor: L{IReactorTime}
  717. @param protocols: The protocols to wait for disconnecting.
  718. @type protocols: A L{list} of L{IProtocol}s.
  719. """
  720. lc = None
  721. def _check():
  722. if True not in [x.transport.connected for x in protocols]:
  723. lc.stop()
  724. lc = task.LoopingCall(_check)
  725. lc.clock = reactor
  726. return lc.start(0.01, now=True)
  727. @implementer(ILogObserver)
  728. class EventLoggingObserver(Sequence):
  729. """
  730. L{ILogObserver} That stores its events in a list for later inspection.
  731. This class is similar to L{LimitedHistoryLogObserver} save that the
  732. internal buffer is public and intended for external inspection. The
  733. observer implements the sequence protocol to ease iteration of the events.
  734. @ivar _events: The events captured by this observer
  735. @type _events: L{list}
  736. """
  737. def __init__(self):
  738. self._events = []
  739. def __len__(self):
  740. return len(self._events)
  741. def __getitem__(self, index):
  742. return self._events[index]
  743. def __iter__(self):
  744. return iter(self._events)
  745. def __call__(self, event):
  746. """
  747. @see: L{ILogObserver}
  748. """
  749. self._events.append(event)
  750. @classmethod
  751. def createWithCleanup(cls, testInstance, publisher):
  752. """
  753. Create an L{EventLoggingObserver} instance that observes the provided
  754. publisher and will be cleaned up with addCleanup().
  755. @param testInstance: Test instance in which this logger is used.
  756. @type testInstance: L{twisted.trial.unittest.TestCase}
  757. @param publisher: Log publisher to observe.
  758. @type publisher: twisted.logger.LogPublisher
  759. @return: An EventLoggingObserver configured to observe the provided
  760. publisher.
  761. @rtype: L{twisted.test.proto_helpers.EventLoggingObserver}
  762. """
  763. obs = cls()
  764. publisher.addObserver(obs)
  765. testInstance.addCleanup(lambda: publisher.removeObserver(obs))
  766. return obs