Funktionierender Prototyp des Serious Games zur Vermittlung von Wissen zu Software-Engineering-Arbeitsmodellen.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

pywin32_testutil.py 11KB

1 year ago

  1. # Utilities for the pywin32 tests
  2. import gc
  3. import os
  4. import site
  5. import sys
  6. import unittest
  7. import winerror
  8. ##
  9. ## General purpose utilities for the test suite.
  10. ##
  11. # The test suite has lots of string constants containing binary data, but
  12. # the strings are used in various "bytes" contexts.
  13. def str2bytes(sval):
  14. if sys.version_info < (3, 0) and isinstance(sval, str):
  15. sval = sval.decode("latin1")
  16. return sval.encode("latin1")
  17. # Sometimes we want to pass a string that should explicitly be treated as
  18. # a memory blob.
  19. def str2memory(sval):
  20. if sys.version_info < (3, 0):
  21. return buffer(sval)
  22. # py3k.
  23. return memoryview(sval.encode("latin1"))
  24. # Sometimes we want to pass an object that exposes its memory
  25. def ob2memory(ob):
  26. if sys.version_info < (3, 0):
  27. return buffer(ob)
  28. # py3k.
  29. return memoryview(ob)
  30. ##
  31. ## unittest related stuff
  32. ##
  33. # This is a specialized TestCase adaptor which wraps a real test.
  34. class LeakTestCase(unittest.TestCase):
  35. """An 'adaptor' which takes another test. In debug builds we execute the
  36. test once to remove one-off side-effects, then capture the total
  37. reference count, then execute the test a few times. If the total
  38. refcount at the end is greater than we first captured, we have a leak!
  39. In release builds the test is executed just once, as normal.
  40. Generally used automatically by the test runner - you can safely
  41. ignore this.
  42. """
  43. def __init__(self, real_test):
  44. unittest.TestCase.__init__(self)
  45. self.real_test = real_test
  46. self.num_test_cases = 1
  47. self.num_leak_iters = 2 # seems to be enough!
  48. if hasattr(sys, "gettotalrefcount"):
  49. self.num_test_cases = self.num_test_cases + self.num_leak_iters
  50. def countTestCases(self):
  51. return self.num_test_cases
  52. def __call__(self, result=None):
  53. # For the COM suite's sake, always ensure we don't leak
  54. # gateways/interfaces
  55. from pythoncom import _GetGatewayCount, _GetInterfaceCount
  56. gc.collect()
  57. ni = _GetInterfaceCount()
  58. ng = _GetGatewayCount()
  59. self.real_test(result)
  60. # Failed - no point checking anything else
  61. if result.shouldStop or not result.wasSuccessful():
  62. return
  63. self._do_leak_tests(result)
  64. gc.collect()
  65. lost_i = _GetInterfaceCount() - ni
  66. lost_g = _GetGatewayCount() - ng
  67. if lost_i or lost_g:
  68. msg = "%d interface objects and %d gateway objects leaked" % (
  69. lost_i,
  70. lost_g,
  71. )
  72. exc = AssertionError(msg)
  73. result.addFailure(self.real_test, (exc.__class__, exc, None))
  74. def runTest(self):
  75. assert 0, "not used"
  76. def _do_leak_tests(self, result=None):
  77. try:
  78. gtrc = sys.gettotalrefcount
  79. except AttributeError:
  80. return # can't do leak tests in this build
  81. # Assume already called once, to prime any caches etc
  82. gc.collect()
  83. trc = gtrc()
  84. for i in range(self.num_leak_iters):
  85. self.real_test(result)
  86. if result.shouldStop:
  87. break
  88. del i # created after we remembered the refcount!
  89. # int division here means one or 2 stray references won't force
  90. # failure, but one per loop
  91. gc.collect()
  92. lost = (gtrc() - trc) // self.num_leak_iters
  93. if lost < 0:
  94. msg = "LeakTest: %s appeared to gain %d references!!" % (
  95. self.real_test,
  96. -lost,
  97. )
  98. result.addFailure(self.real_test, (AssertionError, msg, None))
  99. if lost > 0:
  100. msg = "LeakTest: %s lost %d references" % (self.real_test, lost)
  101. exc = AssertionError(msg)
  102. result.addFailure(self.real_test, (exc.__class__, exc, None))
  103. class TestLoader(unittest.TestLoader):
  104. def loadTestsFromTestCase(self, testCaseClass):
  105. """Return a suite of all tests cases contained in testCaseClass"""
  106. leak_tests = []
  107. for name in self.getTestCaseNames(testCaseClass):
  108. real_test = testCaseClass(name)
  109. leak_test = self._getTestWrapper(real_test)
  110. leak_tests.append(leak_test)
  111. return self.suiteClass(leak_tests)
  112. def fixupTestsForLeakTests(self, test):
  113. if isinstance(test, unittest.TestSuite):
  114. test._tests = [self.fixupTestsForLeakTests(t) for t in test._tests]
  115. return test
  116. else:
  117. # just a normal test case.
  118. return self._getTestWrapper(test)
  119. def _getTestWrapper(self, test):
  120. # one or 2 tests in the COM test suite set this...
  121. no_leak_tests = getattr(test, "no_leak_tests", False)
  122. if no_leak_tests:
  123. print("Test says it doesn't want leak tests!")
  124. return test
  125. return LeakTestCase(test)
  126. def loadTestsFromModule(self, mod):
  127. if hasattr(mod, "suite"):
  128. tests = mod.suite()
  129. else:
  130. tests = unittest.TestLoader.loadTestsFromModule(self, mod)
  131. return self.fixupTestsForLeakTests(tests)
  132. def loadTestsFromName(self, name, module=None):
  133. test = unittest.TestLoader.loadTestsFromName(self, name, module)
  134. if isinstance(test, unittest.TestSuite):
  135. pass # hmmm? print "Don't wrap suites yet!", test._tests
  136. elif isinstance(test, unittest.TestCase):
  137. test = self._getTestWrapper(test)
  138. else:
  139. print("XXX - what is", test)
  140. return test
  141. # Lots of classes necessary to support one simple feature: we want a 3rd
  142. # test result state - "SKIPPED" - to indicate that the test wasn't able
  143. # to be executed for various reasons. Inspired by bzr's tests, but it
  144. # has other concepts, such as "Expected Failure", which we don't bother
  145. # with.
  146. # win32 error codes that probably mean we need to be elevated (ie, if we
  147. # aren't elevated, we treat these error codes as 'skipped')
  148. non_admin_error_codes = [
  149. winerror.ERROR_ACCESS_DENIED,
  150. winerror.ERROR_PRIVILEGE_NOT_HELD,
  151. ]
  152. _is_admin = None
  153. def check_is_admin():
  154. global _is_admin
  155. if _is_admin is None:
  156. import pythoncom
  157. from win32com.shell.shell import IsUserAnAdmin
  158. try:
  159. _is_admin = IsUserAnAdmin()
  160. except pythoncom.com_error as exc:
  161. if exc.hresult != winerror.E_NOTIMPL:
  162. raise
  163. # not impl on this platform - must be old - assume is admin
  164. _is_admin = True
  165. return _is_admin
  166. # Find a test "fixture" (eg, binary test file) expected to be very close to
  167. # the test being run.
  168. # If the tests are being run from the "installed" version, then these fixtures
  169. # probably don't exist - the test is "skipped".
  170. # But it's fatal if we think we might be running from a pywin32 source tree.
  171. def find_test_fixture(basename, extra_dir="."):
  172. # look for the test file in various places
  173. candidates = [
  174. os.path.dirname(sys.argv[0]),
  175. extra_dir,
  176. ".",
  177. ]
  178. for candidate in candidates:
  179. fname = os.path.join(candidate, basename)
  180. if os.path.isfile(fname):
  181. return fname
  182. else:
  183. # Can't find it - see if this is expected or not.
  184. # This module is typically always in the installed dir, so use argv[0]
  185. this_file = os.path.normcase(os.path.abspath(sys.argv[0]))
  186. dirs_to_check = site.getsitepackages()[:]
  187. if site.USER_SITE:
  188. dirs_to_check.append(site.USER_SITE)
  189. for d in dirs_to_check:
  190. d = os.path.normcase(d)
  191. if os.path.commonprefix([this_file, d]) == d:
  192. # looks like we are in an installed Python, so skip the text.
  193. raise TestSkipped(f"Can't find test fixture '{fname}'")
  194. # Looks like we are running from source, so this is fatal.
  195. raise RuntimeError(f"Can't find test fixture '{fname}'")
  196. # If this exception is raised by a test, the test is reported as a 'skip'
  197. class TestSkipped(Exception):
  198. pass
  199. # This appears to have been "upgraded" to non-private in 3.11
  200. try:
  201. TextTestResult = unittest._TextTestResult
  202. except AttributeError:
  203. TextTestResult = unittest.TextTestResult
  204. # The 'TestResult' subclass that records the failures and has the special
  205. # handling for the TestSkipped exception.
  206. class TestResult(TextTestResult):
  207. def __init__(self, *args, **kw):
  208. super(TestResult, self).__init__(*args, **kw)
  209. self.skips = {} # count of skips for each reason.
  210. def addError(self, test, err):
  211. """Called when an error has occurred. 'err' is a tuple of values as
  212. returned by sys.exc_info().
  213. """
  214. # translate a couple of 'well-known' exceptions into 'skipped'
  215. import pywintypes
  216. exc_val = err[1]
  217. # translate ERROR_ACCESS_DENIED for non-admin users to be skipped.
  218. # (access denied errors for an admin user aren't expected.)
  219. if (
  220. isinstance(exc_val, pywintypes.error)
  221. and exc_val.winerror in non_admin_error_codes
  222. and not check_is_admin()
  223. ):
  224. exc_val = TestSkipped(exc_val)
  225. # and COM errors due to objects not being registered (the com test
  226. # suite will attempt to catch this and handle it itself if the user
  227. # is admin)
  228. elif isinstance(exc_val, pywintypes.com_error) and exc_val.hresult in [
  229. winerror.CO_E_CLASSSTRING,
  230. winerror.REGDB_E_CLASSNOTREG,
  231. winerror.TYPE_E_LIBNOTREGISTERED,
  232. ]:
  233. exc_val = TestSkipped(exc_val)
  234. # NotImplemented generally means the platform doesn't support the
  235. # functionality.
  236. elif isinstance(exc_val, NotImplementedError):
  237. exc_val = TestSkipped(NotImplementedError)
  238. if isinstance(exc_val, TestSkipped):
  239. reason = exc_val.args[0]
  240. # if the reason itself is another exception, get its args.
  241. try:
  242. reason = tuple(reason.args)
  243. except (AttributeError, TypeError):
  244. pass
  245. self.skips.setdefault(reason, 0)
  246. self.skips[reason] += 1
  247. if self.showAll:
  248. self.stream.writeln("SKIP (%s)" % (reason,))
  249. elif self.dots:
  250. self.stream.write("S")
  251. self.stream.flush()
  252. return
  253. super(TestResult, self).addError(test, err)
  254. def printErrors(self):
  255. super(TestResult, self).printErrors()
  256. for reason, num_skipped in self.skips.items():
  257. self.stream.writeln("SKIPPED: %d tests - %s" % (num_skipped, reason))
  258. # TestRunner subclass necessary just to get our TestResult hooked up.
  259. class TestRunner(unittest.TextTestRunner):
  260. def _makeResult(self):
  261. return TestResult(self.stream, self.descriptions, self.verbosity)
  262. # TestProgream subclass necessary just to get our TestRunner hooked up,
  263. # which is necessary to get our TestResult hooked up *sob*
  264. class TestProgram(unittest.TestProgram):
  265. def runTests(self):
  266. # clobber existing runner - *sob* - it shouldn't be this hard
  267. self.testRunner = TestRunner(verbosity=self.verbosity)
  268. unittest.TestProgram.runTests(self)
  269. # A convenient entry-point - if used, 'SKIPPED' exceptions will be supressed.
  270. def testmain(*args, **kw):
  271. new_kw = kw.copy()
  272. if "testLoader" not in new_kw:
  273. new_kw["testLoader"] = TestLoader()
  274. program_class = new_kw.get("testProgram", TestProgram)
  275. program_class(*args, **new_kw)