Funktionierender Prototyp des Serious Games zur Vermittlung von Wissen zu Software-Engineering-Arbeitsmodellen.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

test_iutils.py 13KB

1 year ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. Test running processes with the APIs in L{twisted.internet.utils}.
  5. """
  6. import os
  7. import signal
  8. import stat
  9. import sys
  10. import warnings
  11. from unittest import skipIf
  12. from twisted.internet import error, interfaces, reactor, utils
  13. from twisted.internet.defer import Deferred
  14. from twisted.python.runtime import platform
  15. from twisted.python.test.test_util import SuppressedWarningsTests
  16. from twisted.trial.unittest import SynchronousTestCase, TestCase
  17. class ProcessUtilsTests(TestCase):
  18. """
  19. Test running a process using L{getProcessOutput}, L{getProcessValue}, and
  20. L{getProcessOutputAndValue}.
  21. """
  22. if interfaces.IReactorProcess(reactor, None) is None:
  23. skip = "reactor doesn't implement IReactorProcess"
  24. output = None
  25. value = None
  26. exe = sys.executable
  27. def makeSourceFile(self, sourceLines):
  28. """
  29. Write the given list of lines to a text file and return the absolute
  30. path to it.
  31. """
  32. script = self.mktemp()
  33. with open(script, "wt") as scriptFile:
  34. scriptFile.write(os.linesep.join(sourceLines) + os.linesep)
  35. return os.path.abspath(script)
  36. def test_output(self):
  37. """
  38. L{getProcessOutput} returns a L{Deferred} which fires with the complete
  39. output of the process it runs after that process exits.
  40. """
  41. scriptFile = self.makeSourceFile(
  42. [
  43. "import sys",
  44. "for s in b'hello world\\n':",
  45. " s = bytes([s])",
  46. " sys.stdout.buffer.write(s)",
  47. " sys.stdout.flush()",
  48. ]
  49. )
  50. d = utils.getProcessOutput(self.exe, ["-u", scriptFile])
  51. return d.addCallback(self.assertEqual, b"hello world\n")
  52. def test_outputWithErrorIgnored(self):
  53. """
  54. The L{Deferred} returned by L{getProcessOutput} is fired with an
  55. L{IOError} L{Failure} if the child process writes to stderr.
  56. """
  57. # make sure stderr raises an error normally
  58. scriptFile = self.makeSourceFile(
  59. ["import sys", 'sys.stderr.write("hello world\\n")']
  60. )
  61. d = utils.getProcessOutput(self.exe, ["-u", scriptFile])
  62. d = self.assertFailure(d, IOError)
  63. def cbFailed(err):
  64. return self.assertFailure(err.processEnded, error.ProcessDone)
  65. d.addCallback(cbFailed)
  66. return d
  67. def test_outputWithErrorCollected(self):
  68. """
  69. If a C{True} value is supplied for the C{errortoo} parameter to
  70. L{getProcessOutput}, the returned L{Deferred} fires with the child's
  71. stderr output as well as its stdout output.
  72. """
  73. scriptFile = self.makeSourceFile(
  74. [
  75. "import sys",
  76. # Write the same value to both because ordering isn't guaranteed so
  77. # this simplifies the test.
  78. 'sys.stdout.write("foo")',
  79. "sys.stdout.flush()",
  80. 'sys.stderr.write("foo")',
  81. "sys.stderr.flush()",
  82. ]
  83. )
  84. d = utils.getProcessOutput(self.exe, ["-u", scriptFile], errortoo=True)
  85. return d.addCallback(self.assertEqual, b"foofoo")
  86. def test_value(self):
  87. """
  88. The L{Deferred} returned by L{getProcessValue} is fired with the exit
  89. status of the child process.
  90. """
  91. scriptFile = self.makeSourceFile(["raise SystemExit(1)"])
  92. d = utils.getProcessValue(self.exe, ["-u", scriptFile])
  93. return d.addCallback(self.assertEqual, 1)
  94. def test_outputAndValue(self):
  95. """
  96. The L{Deferred} returned by L{getProcessOutputAndValue} fires with a
  97. three-tuple, the elements of which give the data written to the child's
  98. stdout, the data written to the child's stderr, and the exit status of
  99. the child.
  100. """
  101. scriptFile = self.makeSourceFile(
  102. [
  103. "import sys",
  104. "sys.stdout.buffer.write(b'hello world!\\n')",
  105. "sys.stderr.buffer.write(b'goodbye world!\\n')",
  106. "sys.exit(1)",
  107. ]
  108. )
  109. def gotOutputAndValue(out_err_code):
  110. out, err, code = out_err_code
  111. self.assertEqual(out, b"hello world!\n")
  112. self.assertEqual(err, b"goodbye world!\n")
  113. self.assertEqual(code, 1)
  114. d = utils.getProcessOutputAndValue(self.exe, ["-u", scriptFile])
  115. return d.addCallback(gotOutputAndValue)
  116. @skipIf(platform.isWindows(), "Windows doesn't have real signals.")
  117. def test_outputSignal(self):
  118. """
  119. If the child process exits because of a signal, the L{Deferred}
  120. returned by L{getProcessOutputAndValue} fires a L{Failure} of a tuple
  121. containing the child's stdout, stderr, and the signal which caused
  122. it to exit.
  123. """
  124. # Use SIGKILL here because it's guaranteed to be delivered. Using
  125. # SIGHUP might not work in, e.g., a buildbot slave run under the
  126. # 'nohup' command.
  127. scriptFile = self.makeSourceFile(
  128. [
  129. "import sys, os, signal",
  130. "sys.stdout.write('stdout bytes\\n')",
  131. "sys.stderr.write('stderr bytes\\n')",
  132. "sys.stdout.flush()",
  133. "sys.stderr.flush()",
  134. "os.kill(os.getpid(), signal.SIGKILL)",
  135. ]
  136. )
  137. def gotOutputAndValue(out_err_sig):
  138. out, err, sig = out_err_sig
  139. self.assertEqual(out, b"stdout bytes\n")
  140. self.assertEqual(err, b"stderr bytes\n")
  141. self.assertEqual(sig, signal.SIGKILL)
  142. d = utils.getProcessOutputAndValue(self.exe, ["-u", scriptFile])
  143. d = self.assertFailure(d, tuple)
  144. return d.addCallback(gotOutputAndValue)
  145. def _pathTest(self, utilFunc, check):
  146. dir = os.path.abspath(self.mktemp())
  147. os.makedirs(dir)
  148. scriptFile = self.makeSourceFile(
  149. ["import os, sys", "sys.stdout.write(os.getcwd())"]
  150. )
  151. d = utilFunc(self.exe, ["-u", scriptFile], path=dir)
  152. d.addCallback(check, dir.encode(sys.getfilesystemencoding()))
  153. return d
  154. def test_getProcessOutputPath(self):
  155. """
  156. L{getProcessOutput} runs the given command with the working directory
  157. given by the C{path} parameter.
  158. """
  159. return self._pathTest(utils.getProcessOutput, self.assertEqual)
  160. def test_getProcessValuePath(self):
  161. """
  162. L{getProcessValue} runs the given command with the working directory
  163. given by the C{path} parameter.
  164. """
  165. def check(result, ignored):
  166. self.assertEqual(result, 0)
  167. return self._pathTest(utils.getProcessValue, check)
  168. def test_getProcessOutputAndValuePath(self):
  169. """
  170. L{getProcessOutputAndValue} runs the given command with the working
  171. directory given by the C{path} parameter.
  172. """
  173. def check(out_err_status, dir):
  174. out, err, status = out_err_status
  175. self.assertEqual(out, dir)
  176. self.assertEqual(status, 0)
  177. return self._pathTest(utils.getProcessOutputAndValue, check)
  178. def _defaultPathTest(self, utilFunc, check):
  179. # Make another directory to mess around with.
  180. dir = os.path.abspath(self.mktemp())
  181. os.makedirs(dir)
  182. scriptFile = self.makeSourceFile(
  183. ["import os, sys", "cdir = os.getcwd()", "sys.stdout.write(cdir)"]
  184. )
  185. # Switch to it, but make sure we switch back
  186. self.addCleanup(os.chdir, os.getcwd())
  187. os.chdir(dir)
  188. # Remember its default permissions.
  189. originalMode = stat.S_IMODE(os.stat(".").st_mode)
  190. # On macOS Catalina (and maybe elsewhere), os.getcwd() sometimes fails
  191. # with EACCES if u+rx is missing from the working directory, so don't
  192. # reduce it further than this.
  193. os.chmod(dir, stat.S_IXUSR | stat.S_IRUSR)
  194. # Restore the permissions to their original state later (probably
  195. # adding at least u+w), because otherwise it might be hard to delete
  196. # the trial temporary directory.
  197. self.addCleanup(os.chmod, dir, originalMode)
  198. d = utilFunc(self.exe, ["-u", scriptFile])
  199. d.addCallback(check, dir.encode(sys.getfilesystemencoding()))
  200. return d
  201. def test_getProcessOutputDefaultPath(self):
  202. """
  203. If no value is supplied for the C{path} parameter, L{getProcessOutput}
  204. runs the given command in the same working directory as the parent
  205. process and succeeds even if the current working directory is not
  206. accessible.
  207. """
  208. return self._defaultPathTest(utils.getProcessOutput, self.assertEqual)
  209. def test_getProcessValueDefaultPath(self):
  210. """
  211. If no value is supplied for the C{path} parameter, L{getProcessValue}
  212. runs the given command in the same working directory as the parent
  213. process and succeeds even if the current working directory is not
  214. accessible.
  215. """
  216. def check(result, ignored):
  217. self.assertEqual(result, 0)
  218. return self._defaultPathTest(utils.getProcessValue, check)
  219. def test_getProcessOutputAndValueDefaultPath(self):
  220. """
  221. If no value is supplied for the C{path} parameter,
  222. L{getProcessOutputAndValue} runs the given command in the same working
  223. directory as the parent process and succeeds even if the current
  224. working directory is not accessible.
  225. """
  226. def check(out_err_status, dir):
  227. out, err, status = out_err_status
  228. self.assertEqual(out, dir)
  229. self.assertEqual(status, 0)
  230. return self._defaultPathTest(utils.getProcessOutputAndValue, check)
  231. def test_get_processOutputAndValueStdin(self):
  232. """
  233. Standard input can be made available to the child process by passing
  234. bytes for the `stdinBytes` parameter.
  235. """
  236. scriptFile = self.makeSourceFile(
  237. [
  238. "import sys",
  239. "sys.stdout.write(sys.stdin.read())",
  240. ]
  241. )
  242. stdinBytes = b"These are the bytes to see."
  243. d = utils.getProcessOutputAndValue(
  244. self.exe,
  245. ["-u", scriptFile],
  246. stdinBytes=stdinBytes,
  247. )
  248. def gotOutputAndValue(out_err_code):
  249. out, err, code = out_err_code
  250. # Avoid making an exact equality comparison in case there is extra
  251. # random output on stdout (warnings, stray print statements,
  252. # logging, who knows).
  253. self.assertIn(stdinBytes, out)
  254. self.assertEqual(0, code)
  255. d.addCallback(gotOutputAndValue)
  256. return d
  257. class SuppressWarningsTests(SynchronousTestCase):
  258. """
  259. Tests for L{utils.suppressWarnings}.
  260. """
  261. def test_suppressWarnings(self):
  262. """
  263. L{utils.suppressWarnings} decorates a function so that the given
  264. warnings are suppressed.
  265. """
  266. result = []
  267. def showwarning(self, *a, **kw):
  268. result.append((a, kw))
  269. self.patch(warnings, "showwarning", showwarning)
  270. def f(msg):
  271. warnings.warn(msg)
  272. g = utils.suppressWarnings(f, (("ignore",), dict(message="This is message")))
  273. # Start off with a sanity check - calling the original function
  274. # should emit the warning.
  275. f("Sanity check message")
  276. self.assertEqual(len(result), 1)
  277. # Now that that's out of the way, call the wrapped function, and
  278. # make sure no new warnings show up.
  279. g("This is message")
  280. self.assertEqual(len(result), 1)
  281. # Finally, emit another warning which should not be ignored, and
  282. # make sure it is not.
  283. g("Unignored message")
  284. self.assertEqual(len(result), 2)
  285. class DeferredSuppressedWarningsTests(SuppressedWarningsTests):
  286. """
  287. Tests for L{utils.runWithWarningsSuppressed}, the version that supports
  288. Deferreds.
  289. """
  290. # Override the non-Deferred-supporting function from the base class with
  291. # the function we are testing in this class:
  292. runWithWarningsSuppressed = staticmethod(utils.runWithWarningsSuppressed)
  293. def test_deferredCallback(self):
  294. """
  295. If the function called by L{utils.runWithWarningsSuppressed} returns a
  296. C{Deferred}, the warning filters aren't removed until the Deferred
  297. fires.
  298. """
  299. filters = [(("ignore", ".*foo.*"), {}), (("ignore", ".*bar.*"), {})]
  300. result = Deferred()
  301. self.runWithWarningsSuppressed(filters, lambda: result)
  302. warnings.warn("ignore foo")
  303. result.callback(3)
  304. warnings.warn("ignore foo 2")
  305. self.assertEqual(["ignore foo 2"], [w["message"] for w in self.flushWarnings()])
  306. def test_deferredErrback(self):
  307. """
  308. If the function called by L{utils.runWithWarningsSuppressed} returns a
  309. C{Deferred}, the warning filters aren't removed until the Deferred
  310. fires with an errback.
  311. """
  312. filters = [(("ignore", ".*foo.*"), {}), (("ignore", ".*bar.*"), {})]
  313. result = Deferred()
  314. d = self.runWithWarningsSuppressed(filters, lambda: result)
  315. warnings.warn("ignore foo")
  316. result.errback(ZeroDivisionError())
  317. d.addErrback(lambda f: f.trap(ZeroDivisionError))
  318. warnings.warn("ignore foo 2")
  319. self.assertEqual(["ignore foo 2"], [w["message"] for w in self.flushWarnings()])