Funktionierender Prototyp des Serious Games zur Vermittlung von Wissen zu Software-Engineering-Arbeitsmodellen.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

test_unix.py 13KB

1 year ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394
  # Copyright (c) Twisted Matrix Laboratories.
  # See LICENSE for details.

  """
  Tests for implementations of L{IReactorUNIX} and L{IReactorUNIXDatagram}.
  """
  6. import os
  7. import socket
  8. import sys
  9. from unittest import skipIf
  10. from twisted.internet import address, defer, error, interfaces, protocol, reactor, utils
  11. from twisted.python import lockfile
  12. from twisted.python.compat import networkString
  13. from twisted.python.filepath import FilePath
  14. from twisted.test.test_tcp import MyClientFactory, MyServerFactory
  15. from twisted.trial import unittest
  class FailedConnectionClientFactory(protocol.ClientFactory):
      """
      Client factory which reports a failed connection attempt through a
      Deferred supplied by the test.

      @ivar onFail: The Deferred which is errbacked with the failure reason
          when C{clientConnectionFailed} is called.
      """

      def __init__(self, onFail):
          """
          @param onFail: The Deferred to errback when the connection attempt
              fails.
          """
          self.onFail = onFail

      def clientConnectionFailed(self, connector, reason):
          """
          Pass C{reason} on to the errback chain of C{self.onFail}.

          @param connector: The connector whose attempt failed (unused).
          @param reason: The failure describing why the connection failed.
          """
          failureSink = self.onFail
          failureSink.errback(reason)
  @skipIf(
      not interfaces.IReactorUNIX(reactor, None),
      "This reactor does not support UNIX domain sockets",
  )
  class UnixSocketTests(unittest.TestCase):
      """
      Test unix sockets.
      """

      # Same condition and message as the C{skipIf} decorator above.
      if not interfaces.IReactorUNIX(reactor, None):
          skip = "This reactor does not support UNIX domain sockets"

      def test_peerBind(self):
          """
          The address passed to the server factory's C{buildProtocol} method and
          the address returned by the connected protocol's transport's C{getPeer}
          method match the address the client socket is bound to.
          """
          filename = self.mktemp()
          peername = self.mktemp()
          serverFactory = MyServerFactory()
          connMade = serverFactory.protocolConnectionMade = defer.Deferred()
          unixPort = reactor.listenUNIX(filename, serverFactory)
          self.addCleanup(unixPort.stopListening)
          # A plain socket is used for the client side so that it can be bound
          # to an explicit name before connecting.
          unixSocket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
          self.addCleanup(unixSocket.close)
          unixSocket.bind(peername)
          unixSocket.connect(filename)

          def cbConnMade(proto):
              expected = address.UNIXAddress(peername)
              self.assertEqual(serverFactory.peerAddresses, [expected])
              self.assertEqual(proto.transport.getPeer(), expected)

          connMade.addCallback(cbConnMade)
          return connMade

      def test_dumber(self):
          """
          L{IReactorUNIX.connectUNIX} can be used to connect a client to a server
          started with L{IReactorUNIX.listenUNIX}.
          """
          filename = self.mktemp()
          serverFactory = MyServerFactory()
          serverConnMade = defer.Deferred()
          serverFactory.protocolConnectionMade = serverConnMade
          unixPort = reactor.listenUNIX(filename, serverFactory)
          self.addCleanup(unixPort.stopListening)
          clientFactory = MyClientFactory()
          clientConnMade = defer.Deferred()
          clientFactory.protocolConnectionMade = clientConnMade
          reactor.connectUNIX(filename, clientFactory)
          d = defer.gatherResults([serverConnMade, clientConnMade])

          def allConnected(args):
              serverProtocol, clientProtocol = args
              # Incidental assertion which may or may not be redundant with some
              # other test.  This probably deserves its own test method.
              self.assertEqual(
                  clientFactory.peerAddresses, [address.UNIXAddress(filename)]
              )
              clientProtocol.transport.loseConnection()
              serverProtocol.transport.loseConnection()

          d.addCallback(allConnected)
          return d

      def test_pidFile(self):
          """
          A lockfile is created and locked when L{IReactorUNIX.listenUNIX} is
          called and released when the Deferred returned by the L{IListeningPort}
          provider's C{stopListening} method is called back.
          """
          filename = self.mktemp()
          serverFactory = MyServerFactory()
          serverConnMade = defer.Deferred()
          serverFactory.protocolConnectionMade = serverConnMade
          unixPort = reactor.listenUNIX(filename, serverFactory, wantPID=True)
          self.assertTrue(lockfile.isLocked(filename + ".lock"))

          # XXX This part would test something about the checkPID parameter, but
          # it doesn't actually.  It should be rewritten to test the several
          # different possible behaviors.  -exarkun
          clientFactory = MyClientFactory()
          clientConnMade = defer.Deferred()
          clientFactory.protocolConnectionMade = clientConnMade
          # checkPID is passed as a boolean, as in test_connectToUncleanServer.
          reactor.connectUNIX(filename, clientFactory, checkPID=True)

          d = defer.gatherResults([serverConnMade, clientConnMade])

          def _portStuff(args):
              serverProtocol, clientProto = args

              # Incidental assertion which may or may not be redundant with some
              # other test.  This probably deserves its own test method.
              self.assertEqual(
                  clientFactory.peerAddresses, [address.UNIXAddress(filename)]
              )

              clientProto.transport.loseConnection()
              serverProtocol.transport.loseConnection()
              # Returning the result chains the lock check below after the
              # port has finished stopping.
              return unixPort.stopListening()

          d.addCallback(_portStuff)

          def _check(ignored):
              self.assertFalse(lockfile.isLocked(filename + ".lock"), "locked")

          d.addCallback(_check)
          return d

      def test_socketLocking(self):
          """
          L{IReactorUNIX.listenUNIX} raises L{error.CannotListenError} if passed
          the name of a file on which a server is already listening.
          """
          filename = self.mktemp()
          serverFactory = MyServerFactory()
          firstPort = reactor.listenUNIX(filename, serverFactory, wantPID=True)

          self.assertRaises(
              error.CannotListenError,
              reactor.listenUNIX,
              filename,
              serverFactory,
              wantPID=True,
          )

          def stoppedListening(ign):
              # After the first port has stopped, listening on the same name
              # again is attempted; a distinct local name avoids shadowing the
              # first port.
              secondPort = reactor.listenUNIX(filename, serverFactory, wantPID=True)
              return secondPort.stopListening()

          return firstPort.stopListening().addCallback(stoppedListening)

      def _uncleanSocketTest(self, callback):
          """
          Run a child Python process whose only actions are importing the
          reactor and calling C{reactor.listenUNIX} with C{wantPID=True}; the
          child script never stops the port.

          The socket name is a fresh temporary path stored as C{self.filename}
          so that C{callback} can refer to it.

          @param callback: Added as a callback to the Deferred returned by
              L{utils.getProcessValue} for the child process.

          @return: The Deferred returned by L{utils.getProcessValue}, with
              C{callback} added to it.
          """
          self.filename = self.mktemp()
          source = networkString(
              (
                  "from twisted.internet import protocol, reactor\n"
                  "reactor.listenUNIX(%r, protocol.ServerFactory(),"
                  "wantPID=True)\n"
              )
              % (self.filename,)
          )
          # The child is given this process's import path via PYTHONPATH.
          env = {b"PYTHONPATH": FilePath(os.pathsep.join(sys.path)).asBytesMode().path}
          pyExe = FilePath(sys.executable).asBytesMode().path

          d = utils.getProcessValue(pyExe, (b"-u", b"-c", source), env=env)
          d.addCallback(callback)
          return d

      def test_uncleanServerSocketLocking(self):
          """
          If passed C{True} for the C{wantPID} parameter, a server can be started
          listening with L{IReactorUNIX.listenUNIX} when passed the name of a
          file on which a previous server which has not exited cleanly has been
          listening using the C{wantPID} option.
          """

          def ranStupidChild(ign):
              # If this next call succeeds, our lock handling is correct.
              p = reactor.listenUNIX(self.filename, MyServerFactory(), wantPID=True)
              return p.stopListening()

          return self._uncleanSocketTest(ranStupidChild)

      def test_connectToUncleanServer(self):
          """
          If passed C{True} for the C{checkPID} parameter, a client connection
          attempt made with L{IReactorUNIX.connectUNIX} fails with
          L{error.BadFileError}.
          """

          def ranStupidChild(ign):
              d = defer.Deferred()
              f = FailedConnectionClientFactory(d)
              reactor.connectUNIX(self.filename, f, checkPID=True)
              return self.assertFailure(d, error.BadFileError)

          return self._uncleanSocketTest(ranStupidChild)

      def _reprTest(self, serverFactory, factoryName):
          """
          Test the C{__str__} and C{__repr__} implementations of a UNIX port when
          used with the given factory.

          @param serverFactory: The factory to pass to C{reactor.listenUNIX}.
          @param factoryName: The factory name expected to appear in the port's
              string representations.

          @return: A Deferred which fires after the port has stopped listening
              and the "not listening" representations have been checked.
          """
          filename = self.mktemp()
          unixPort = reactor.listenUNIX(filename, serverFactory)

          connectedString = f"<{factoryName} on {filename!r}>"
          self.assertEqual(repr(unixPort), connectedString)
          self.assertEqual(str(unixPort), connectedString)

          d = defer.maybeDeferred(unixPort.stopListening)

          def stoppedListening(ign):
              unconnectedString = f"<{factoryName} (not listening)>"
              self.assertEqual(repr(unixPort), unconnectedString)
              self.assertEqual(str(unixPort), unconnectedString)

          d.addCallback(stoppedListening)
          return d

      def test_reprWithNewStyleFactory(self):
          """
          The two string representations of the L{IListeningPort} returned by
          L{IReactorUNIX.listenUNIX} contains the name of the new-style factory
          class being used and the filename on which the port is listening or
          indicates that the port is not listening.
          """

          class NewStyleFactory:
              def doStart(self):
                  pass

              def doStop(self):
                  pass

          # Sanity check
          self.assertIsInstance(NewStyleFactory, type)

          return self._reprTest(
              NewStyleFactory(), "twisted.test.test_unix.NewStyleFactory"
          )
  class ClientProto(protocol.ConnectedDatagramProtocol):
      """
      Connected datagram protocol which records when it is started and
      stopped and remembers the last datagram it received.

      @ivar started: C{True} once C{startProtocol} has been called.
      @ivar stopped: C{True} once C{stopProtocol} has been called.
      @ivar gotback: The data of the most recent datagram received, or
          L{None} if none has arrived.
      @ivar deferredStarted: Deferred fired with L{None} by C{startProtocol}.
      @ivar deferredGotBack: Deferred fired with L{None} by
          C{datagramReceived}.
      """

      started = False
      stopped = False
      gotback = None

      def __init__(self):
          """
          Create the Deferreds which signal start-up and datagram receipt.
          """
          self.deferredStarted = defer.Deferred()
          self.deferredGotBack = defer.Deferred()

      def stopProtocol(self):
          """
          Record that the protocol has been stopped.
          """
          self.stopped = True

      def startProtocol(self):
          """
          Record that the protocol has started and fire C{deferredStarted}.
          """
          self.started = True
          startedSignal = self.deferredStarted
          startedSignal.callback(None)

      def datagramReceived(self, data):
          """
          Remember C{data} as C{gotback} and fire C{deferredGotBack}.
          """
          self.gotback = data
          receivedSignal = self.deferredGotBack
          receivedSignal.callback(None)
  class ServerProto(protocol.DatagramProtocol):
      """
      Datagram protocol which records when it is started and stopped, replies
      to each datagram with C{b"hi back"} and remembers what it received and
      from where.

      @ivar started: C{True} once C{startProtocol} has been called.
      @ivar stopped: C{True} once C{stopProtocol} has been called.
      @ivar gotwhat: The data of the most recent datagram received, or
          L{None} if none has arrived.
      @ivar gotfrom: The address the most recent datagram came from, or
          L{None} if none has arrived.
      @ivar deferredStarted: Deferred fired with L{None} by C{startProtocol}.
      @ivar deferredGotWhat: Deferred fired with L{None} by
          C{datagramReceived}.
      """

      started = False
      stopped = False
      gotwhat = None
      gotfrom = None

      def __init__(self):
          """
          Create the Deferreds which signal start-up and datagram receipt.
          """
          self.deferredStarted = defer.Deferred()
          self.deferredGotWhat = defer.Deferred()

      def stopProtocol(self):
          """
          Record that the protocol has been stopped.
          """
          self.stopped = True

      def startProtocol(self):
          """
          Record that the protocol has started and fire C{deferredStarted}.
          """
          self.started = True
          startedSignal = self.deferredStarted
          startedSignal.callback(None)

      def datagramReceived(self, data, addr):
          """
          Record the sender, send C{b"hi back"} to it, record the data and
          then fire C{deferredGotWhat}.
          """
          self.gotfrom = addr
          transport = self.transport
          transport.write(b"hi back", addr)
          self.gotwhat = data
          receivedSignal = self.deferredGotWhat
          receivedSignal.callback(None)
  @skipIf(
      not interfaces.IReactorUNIXDatagram(reactor, None),
      "This reactor does not support UNIX datagram sockets",
  )
  class DatagramUnixSocketTests(unittest.TestCase):
      """
      Test datagram UNIX sockets.
      """

      def test_exchange(self):
          """
          Test that a datagram can be sent to and received by a server and vice
          versa.
          """
          clientaddr = self.mktemp()
          serveraddr = self.mktemp()
          sp = ServerProto()
          cp = ClientProto()
          s = reactor.listenUNIXDatagram(serveraddr, sp)
          self.addCleanup(s.stopListening)
          c = reactor.connectUNIXDatagram(serveraddr, cp, bindAddress=clientaddr)
          self.addCleanup(c.stopListening)

          # Nothing is written until both protocols have been started.
          d = defer.gatherResults([sp.deferredStarted, cp.deferredStarted])

          def write(ignored):
              cp.transport.write(b"hi")
              return defer.gatherResults([sp.deferredGotWhat, cp.deferredGotBack])

          def _cbTestExchange(ignored):
              self.assertEqual(b"hi", sp.gotwhat)
              self.assertEqual(clientaddr, sp.gotfrom)
              self.assertEqual(b"hi back", cp.gotback)

          d.addCallback(write)
          d.addCallback(_cbTestExchange)
          return d

      def test_cannotListen(self):
          """
          L{IReactorUNIXDatagram.listenUNIXDatagram} raises
          L{error.CannotListenError} if the unix socket specified is already in
          use.
          """
          addr = self.mktemp()
          p = ServerProto()
          s = reactor.listenUNIXDatagram(addr, p)
          # Register cleanups before asserting so the port is stopped and the
          # socket file removed even when the assertion fails.  Cleanups run in
          # reverse order of registration: the port is stopped first, then the
          # file is unlinked.
          self.addCleanup(os.unlink, addr)
          self.addCleanup(s.stopListening)
          self.assertRaises(error.CannotListenError, reactor.listenUNIXDatagram, addr, p)

      # test connecting to bound and connected (somewhere else) address

      def _reprTest(self, serverProto, protocolName):
          """
          Test the C{__str__} and C{__repr__} implementations of a UNIX datagram
          port when used with the given protocol.

          @param serverProto: The protocol to pass to
              C{reactor.listenUNIXDatagram}.
          @param protocolName: The protocol name expected to appear in the
              port's string representations.

          @return: A Deferred which fires after the port has stopped listening
              and the "not listening" representations have been checked.
          """
          filename = self.mktemp()
          unixPort = reactor.listenUNIXDatagram(filename, serverProto)

          connectedString = f"<{protocolName} on {filename!r}>"
          self.assertEqual(repr(unixPort), connectedString)
          self.assertEqual(str(unixPort), connectedString)

          stopDeferred = defer.maybeDeferred(unixPort.stopListening)

          def stoppedListening(ign):
              unconnectedString = f"<{protocolName} (not listening)>"
              self.assertEqual(repr(unixPort), unconnectedString)
              self.assertEqual(str(unixPort), unconnectedString)

          stopDeferred.addCallback(stoppedListening)
          return stopDeferred

      def test_reprWithNewStyleProtocol(self):
          """
          The two string representations of the L{IListeningPort} returned by
          L{IReactorUNIXDatagram.listenUNIXDatagram} contains the name of the
          new-style protocol class being used and the filename on which the port
          is listening or indicates that the port is not listening.
          """

          class NewStyleProtocol:
              def makeConnection(self, transport):
                  pass

              def doStop(self):
                  pass

          # Sanity check
          self.assertIsInstance(NewStyleProtocol, type)

          return self._reprTest(
              NewStyleProtocol(), "twisted.test.test_unix.NewStyleProtocol"
          )