Funktionierender Prototyp des Serious Games zur Vermittlung von Wissen zu Software-Engineering-Arbeitsmodellen.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

threaded_extension.py 7.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. """An ISAPI extension base class implemented using a thread-pool."""
  2. # $Id$
  3. import sys
  4. import threading
  5. import time
  6. import traceback
  7. from pywintypes import OVERLAPPED
  8. from win32event import INFINITE
  9. from win32file import (
  10. CloseHandle,
  11. CreateIoCompletionPort,
  12. GetQueuedCompletionStatus,
  13. PostQueuedCompletionStatus,
  14. )
  15. from win32security import SetThreadToken
  16. import isapi.simple
  17. from isapi import ExtensionError, isapicon
  18. ISAPI_REQUEST = 1
  19. ISAPI_SHUTDOWN = 2
  20. class WorkerThread(threading.Thread):
  21. def __init__(self, extension, io_req_port):
  22. self.running = False
  23. self.io_req_port = io_req_port
  24. self.extension = extension
  25. threading.Thread.__init__(self)
  26. # We wait 15 seconds for a thread to terminate, but if it fails to,
  27. # we don't want the process to hang at exit waiting for it...
  28. self.setDaemon(True)
  29. def run(self):
  30. self.running = True
  31. while self.running:
  32. errCode, bytes, key, overlapped = GetQueuedCompletionStatus(
  33. self.io_req_port, INFINITE
  34. )
  35. if key == ISAPI_SHUTDOWN and overlapped is None:
  36. break
  37. # Let the parent extension handle the command.
  38. dispatcher = self.extension.dispatch_map.get(key)
  39. if dispatcher is None:
  40. raise RuntimeError("Bad request '%s'" % (key,))
  41. dispatcher(errCode, bytes, key, overlapped)
  42. def call_handler(self, cblock):
  43. self.extension.Dispatch(cblock)
  44. # A generic thread-pool based extension, using IO Completion Ports.
  45. # Sub-classes can override one method to implement a simple extension, or
  46. # may leverage the CompletionPort to queue their own requests, and implement a
  47. # fully asynch extension.
  48. class ThreadPoolExtension(isapi.simple.SimpleExtension):
  49. "Base class for an ISAPI extension based around a thread-pool"
  50. max_workers = 20
  51. worker_shutdown_wait = 15000 # 15 seconds for workers to quit...
  52. def __init__(self):
  53. self.workers = []
  54. # extensible dispatch map, for sub-classes that need to post their
  55. # own requests to the completion port.
  56. # Each of these functions is called with the result of
  57. # GetQueuedCompletionStatus for our port.
  58. self.dispatch_map = {
  59. ISAPI_REQUEST: self.DispatchConnection,
  60. }
  61. def GetExtensionVersion(self, vi):
  62. isapi.simple.SimpleExtension.GetExtensionVersion(self, vi)
  63. # As per Q192800, the CompletionPort should be created with the number
  64. # of processors, even if the number of worker threads is much larger.
  65. # Passing 0 means the system picks the number.
  66. self.io_req_port = CreateIoCompletionPort(-1, None, 0, 0)
  67. # start up the workers
  68. self.workers = []
  69. for i in range(self.max_workers):
  70. worker = WorkerThread(self, self.io_req_port)
  71. worker.start()
  72. self.workers.append(worker)
  73. def HttpExtensionProc(self, control_block):
  74. overlapped = OVERLAPPED()
  75. overlapped.object = control_block
  76. PostQueuedCompletionStatus(self.io_req_port, 0, ISAPI_REQUEST, overlapped)
  77. return isapicon.HSE_STATUS_PENDING
  78. def TerminateExtension(self, status):
  79. for worker in self.workers:
  80. worker.running = False
  81. for worker in self.workers:
  82. PostQueuedCompletionStatus(self.io_req_port, 0, ISAPI_SHUTDOWN, None)
  83. # wait for them to terminate - pity we aren't using 'native' threads
  84. # as then we could do a smart wait - but now we need to poll....
  85. end_time = time.time() + self.worker_shutdown_wait / 1000
  86. alive = self.workers
  87. while alive:
  88. if time.time() > end_time:
  89. # xxx - might be nice to log something here.
  90. break
  91. time.sleep(0.2)
  92. alive = [w for w in alive if w.is_alive()]
  93. self.dispatch_map = {} # break circles
  94. CloseHandle(self.io_req_port)
  95. # This is the one operation the base class supports - a simple
  96. # Connection request. We setup the thread-token, and dispatch to the
  97. # sub-class's 'Dispatch' method.
  98. def DispatchConnection(self, errCode, bytes, key, overlapped):
  99. control_block = overlapped.object
  100. # setup the correct user for this request
  101. hRequestToken = control_block.GetImpersonationToken()
  102. SetThreadToken(None, hRequestToken)
  103. try:
  104. try:
  105. self.Dispatch(control_block)
  106. except:
  107. self.HandleDispatchError(control_block)
  108. finally:
  109. # reset the security context
  110. SetThreadToken(None, None)
  111. def Dispatch(self, ecb):
  112. """Overridden by the sub-class to handle connection requests.
  113. This class creates a thread-pool using a Windows completion port,
  114. and dispatches requests via this port. Sub-classes can generally
  115. implement each connection request using blocking reads and writes, and
  116. the thread-pool will still provide decent response to the end user.
  117. The sub-class can set a max_workers attribute (default is 20). Note
  118. that this generally does *not* mean 20 threads will all be concurrently
  119. running, via the magic of Windows completion ports.
  120. There is no default implementation - sub-classes must implement this.
  121. """
  122. raise NotImplementedError("sub-classes should override Dispatch")
  123. def HandleDispatchError(self, ecb):
  124. """Handles errors in the Dispatch method.
  125. When a Dispatch method call fails, this method is called to handle
  126. the exception. The default implementation formats the traceback
  127. in the browser.
  128. """
  129. ecb.HttpStatusCode = isapicon.HSE_STATUS_ERROR
  130. # control_block.LogData = "we failed!"
  131. exc_typ, exc_val, exc_tb = sys.exc_info()
  132. limit = None
  133. try:
  134. try:
  135. import cgi
  136. ecb.SendResponseHeaders(
  137. "200 OK", "Content-type: text/html\r\n\r\n", False
  138. )
  139. print(file=ecb)
  140. print("<H3>Traceback (most recent call last):</H3>", file=ecb)
  141. list = traceback.format_tb(
  142. exc_tb, limit
  143. ) + traceback.format_exception_only(exc_typ, exc_val)
  144. print(
  145. "<PRE>%s<B>%s</B></PRE>"
  146. % (
  147. cgi.escape("".join(list[:-1])),
  148. cgi.escape(list[-1]),
  149. ),
  150. file=ecb,
  151. )
  152. except ExtensionError:
  153. # The client disconnected without reading the error body -
  154. # its probably not a real browser at the other end, ignore it.
  155. pass
  156. except:
  157. print("FAILED to render the error message!")
  158. traceback.print_exc()
  159. print("ORIGINAL extension error:")
  160. traceback.print_exception(exc_typ, exc_val, exc_tb)
  161. finally:
  162. # holding tracebacks in a local of a frame that may itself be
  163. # part of a traceback used to be evil and cause leaks!
  164. exc_tb = None
  165. ecb.DoneWithSession()