Funktionierender Prototyp des Serious Games zur Vermittlung von Wissen zu Software-Engineering-Arbeitsmodellen.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

test_fdesc.py 7.3KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. Tests for L{twisted.internet.fdesc}.
  5. """
  6. import errno
  7. import os
  8. import sys
  9. try:
  10. import fcntl
  11. except ImportError:
  12. skip = "not supported on this platform"
  13. else:
  14. from twisted.internet import fdesc
  15. from twisted.python.util import untilConcludes
  16. from twisted.trial import unittest
  17. class NonBlockingTests(unittest.SynchronousTestCase):
  18. """
  19. Tests for L{fdesc.setNonBlocking} and L{fdesc.setBlocking}.
  20. """
  21. def test_setNonBlocking(self):
  22. """
  23. L{fdesc.setNonBlocking} sets a file description to non-blocking.
  24. """
  25. r, w = os.pipe()
  26. self.addCleanup(os.close, r)
  27. self.addCleanup(os.close, w)
  28. self.assertFalse(fcntl.fcntl(r, fcntl.F_GETFL) & os.O_NONBLOCK)
  29. fdesc.setNonBlocking(r)
  30. self.assertTrue(fcntl.fcntl(r, fcntl.F_GETFL) & os.O_NONBLOCK)
  31. def test_setBlocking(self):
  32. """
  33. L{fdesc.setBlocking} sets a file description to blocking.
  34. """
  35. r, w = os.pipe()
  36. self.addCleanup(os.close, r)
  37. self.addCleanup(os.close, w)
  38. fdesc.setNonBlocking(r)
  39. fdesc.setBlocking(r)
  40. self.assertFalse(fcntl.fcntl(r, fcntl.F_GETFL) & os.O_NONBLOCK)
  41. class ReadWriteTests(unittest.SynchronousTestCase):
  42. """
  43. Tests for L{fdesc.readFromFD}, L{fdesc.writeToFD}.
  44. """
  45. def setUp(self):
  46. """
  47. Create a non-blocking pipe that can be used in tests.
  48. """
  49. self.r, self.w = os.pipe()
  50. fdesc.setNonBlocking(self.r)
  51. fdesc.setNonBlocking(self.w)
  52. def tearDown(self):
  53. """
  54. Close pipes.
  55. """
  56. try:
  57. os.close(self.w)
  58. except OSError:
  59. pass
  60. try:
  61. os.close(self.r)
  62. except OSError:
  63. pass
  64. def write(self, d):
  65. """
  66. Write data to the pipe.
  67. """
  68. return fdesc.writeToFD(self.w, d)
  69. def read(self):
  70. """
  71. Read data from the pipe.
  72. """
  73. l = []
  74. res = fdesc.readFromFD(self.r, l.append)
  75. if res is None:
  76. if l:
  77. return l[0]
  78. else:
  79. return b""
  80. else:
  81. return res
  82. def test_writeAndRead(self):
  83. """
  84. Test that the number of bytes L{fdesc.writeToFD} reports as written
  85. with its return value are seen by L{fdesc.readFromFD}.
  86. """
  87. n = self.write(b"hello")
  88. self.assertTrue(n > 0)
  89. s = self.read()
  90. self.assertEqual(len(s), n)
  91. self.assertEqual(b"hello"[:n], s)
  92. def test_writeAndReadLarge(self):
  93. """
  94. Similar to L{test_writeAndRead}, but use a much larger string to verify
  95. the behavior for that case.
  96. """
  97. orig = b"0123456879" * 10000
  98. written = self.write(orig)
  99. self.assertTrue(written > 0)
  100. result = []
  101. resultlength = 0
  102. i = 0
  103. while resultlength < written or i < 50:
  104. result.append(self.read())
  105. resultlength += len(result[-1])
  106. # Increment a counter to be sure we'll exit at some point
  107. i += 1
  108. result = b"".join(result)
  109. self.assertEqual(len(result), written)
  110. self.assertEqual(orig[:written], result)
  111. def test_readFromEmpty(self):
  112. """
  113. Verify that reading from a file descriptor with no data does not raise
  114. an exception and does not result in the callback function being called.
  115. """
  116. l = []
  117. result = fdesc.readFromFD(self.r, l.append)
  118. self.assertEqual(l, [])
  119. self.assertIsNone(result)
  120. def test_readFromCleanClose(self):
  121. """
  122. Test that using L{fdesc.readFromFD} on a cleanly closed file descriptor
  123. returns a connection done indicator.
  124. """
  125. os.close(self.w)
  126. self.assertEqual(self.read(), fdesc.CONNECTION_DONE)
  127. def test_writeToClosed(self):
  128. """
  129. Verify that writing with L{fdesc.writeToFD} when the read end is closed
  130. results in a connection lost indicator.
  131. """
  132. os.close(self.r)
  133. self.assertEqual(self.write(b"s"), fdesc.CONNECTION_LOST)
  134. def test_readFromInvalid(self):
  135. """
  136. Verify that reading with L{fdesc.readFromFD} when the read end is
  137. closed results in a connection lost indicator.
  138. """
  139. os.close(self.r)
  140. self.assertEqual(self.read(), fdesc.CONNECTION_LOST)
  141. def test_writeToInvalid(self):
  142. """
  143. Verify that writing with L{fdesc.writeToFD} when the write end is
  144. closed results in a connection lost indicator.
  145. """
  146. os.close(self.w)
  147. self.assertEqual(self.write(b"s"), fdesc.CONNECTION_LOST)
  148. def test_writeErrors(self):
  149. """
  150. Test error path for L{fdesc.writeTod}.
  151. """
  152. oldOsWrite = os.write
  153. def eagainWrite(fd, data):
  154. err = OSError()
  155. err.errno = errno.EAGAIN
  156. raise err
  157. os.write = eagainWrite
  158. try:
  159. self.assertEqual(self.write(b"s"), 0)
  160. finally:
  161. os.write = oldOsWrite
  162. def eintrWrite(fd, data):
  163. err = OSError()
  164. err.errno = errno.EINTR
  165. raise err
  166. os.write = eintrWrite
  167. try:
  168. self.assertEqual(self.write(b"s"), 0)
  169. finally:
  170. os.write = oldOsWrite
  171. class CloseOnExecTests(unittest.SynchronousTestCase):
  172. """
  173. Tests for L{fdesc._setCloseOnExec} and L{fdesc._unsetCloseOnExec}.
  174. """
  175. program = """
  176. import os, errno
  177. try:
  178. os.write(%d, b'lul')
  179. except OSError as e:
  180. if e.errno == errno.EBADF:
  181. os._exit(0)
  182. os._exit(5)
  183. except BaseException:
  184. os._exit(10)
  185. else:
  186. os._exit(20)
  187. """
  188. def _execWithFileDescriptor(self, fObj):
  189. pid = os.fork()
  190. if pid == 0:
  191. try:
  192. os.execv(
  193. sys.executable,
  194. [sys.executable, "-c", self.program % (fObj.fileno(),)],
  195. )
  196. except BaseException:
  197. import traceback
  198. traceback.print_exc()
  199. os._exit(30)
  200. else:
  201. # On Linux wait(2) doesn't seem ever able to fail with EINTR but
  202. # POSIX seems to allow it and on macOS it happens quite a lot.
  203. return untilConcludes(os.waitpid, pid, 0)[1]
  204. def test_setCloseOnExec(self):
  205. """
  206. A file descriptor passed to L{fdesc._setCloseOnExec} is not inherited
  207. by a new process image created with one of the exec family of
  208. functions.
  209. """
  210. with open(self.mktemp(), "wb") as fObj:
  211. fdesc._setCloseOnExec(fObj.fileno())
  212. status = self._execWithFileDescriptor(fObj)
  213. self.assertTrue(os.WIFEXITED(status))
  214. self.assertEqual(os.WEXITSTATUS(status), 0)
  215. def test_unsetCloseOnExec(self):
  216. """
  217. A file descriptor passed to L{fdesc._unsetCloseOnExec} is inherited by
  218. a new process image created with one of the exec family of functions.
  219. """
  220. with open(self.mktemp(), "wb") as fObj:
  221. fdesc._setCloseOnExec(fObj.fileno())
  222. fdesc._unsetCloseOnExec(fObj.fileno())
  223. status = self._execWithFileDescriptor(fObj)
  224. self.assertTrue(os.WIFEXITED(status))
  225. self.assertEqual(os.WEXITSTATUS(status), 20)