Funktionierender Prototyp des Serious Games zur Vermittlung von Wissen zu Software-Engineering-Arbeitsmodellen.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

win32eventreactor.py 15KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. A win32event based implementation of the Twisted main loop.
  5. This requires pywin32 (formerly win32all) or ActivePython to be installed.
  6. To install the event loop (and you should do this before any connections,
  7. listeners or connectors are added)::
  8. from twisted.internet import win32eventreactor
  9. win32eventreactor.install()
  10. LIMITATIONS:
  11. 1. WaitForMultipleObjects and thus the event loop can only handle 64 objects.
  12. 2. Process running has some problems (see L{twisted.internet.process} docstring).
  13. TODO:
  14. 1. Event loop handling of writes is *very* problematic (this is causing failed tests).
  15. Switch to doing it the correct way, whatever that means (see below).
  16. 2. Replace icky socket loopback waker with event based waker (use dummyEvent object)
  17. 3. Switch everyone to using Free Software so we don't have to deal with proprietary APIs.
  18. ALTERNATIVE SOLUTIONS:
  19. - IIRC, sockets can only be registered once. So we switch to a structure
  20. like the poll() reactor, thus allowing us to deal with write events in
  21. a decent fashion. This should allow us to pass tests, but we're still
  22. limited to 64 events.
  23. Or:
  24. - Instead of doing a reactor, we make this an addon to the select reactor.
  25. The WFMO event loop runs in a separate thread. This means no need to maintain
  26. separate code for networking, 64 event limit doesn't apply to sockets,
  27. we can run processes and other win32 stuff in default event loop. The
  28. only problem is that we're stuck with the icky socket based waker.
  29. Another benefit is that this could be extended to support >64 events
  30. in a simpler manner than the previous solution.
  31. The 2nd solution is probably what will get implemented.
  32. """
  33. import sys
  34. # System imports
  35. import time
  36. from threading import Thread
  37. from weakref import WeakKeyDictionary
  38. from zope.interface import implementer
  39. # Win32 imports
  40. from win32file import ( # type: ignore[import]
  41. FD_ACCEPT,
  42. FD_CLOSE,
  43. FD_CONNECT,
  44. FD_READ,
  45. WSAEventSelect,
  46. )
  47. try:
  48. # WSAEnumNetworkEvents was added in pywin32 215
  49. from win32file import WSAEnumNetworkEvents
  50. except ImportError:
  51. import warnings
  52. warnings.warn(
  53. "Reliable disconnection notification requires pywin32 215 or later",
  54. category=UserWarning,
  55. )
  56. def WSAEnumNetworkEvents(fd, event):
  57. return {FD_READ}
  58. import win32gui # type: ignore[import]
  59. from win32event import ( # type: ignore[import]
  60. QS_ALLINPUT,
  61. WAIT_OBJECT_0,
  62. WAIT_TIMEOUT,
  63. CreateEvent,
  64. MsgWaitForMultipleObjects,
  65. )
  66. # Twisted imports
  67. from twisted.internet import posixbase
  68. from twisted.internet.interfaces import IReactorFDSet, IReactorWin32Events
  69. from twisted.internet.threads import blockingCallFromThread
  70. from twisted.python import failure, log, threadable
  71. @implementer(IReactorFDSet, IReactorWin32Events)
  72. class Win32Reactor(posixbase.PosixReactorBase):
  73. """
  74. Reactor that uses Win32 event APIs.
  75. @ivar _reads: A dictionary mapping L{FileDescriptor} instances to a
  76. win32 event object used to check for read events for that descriptor.
  77. @ivar _writes: A dictionary mapping L{FileDescriptor} instances to a
  78. arbitrary value. Keys in this dictionary will be given a chance to
  79. write out their data.
  80. @ivar _events: A dictionary mapping win32 event object to tuples of
  81. L{FileDescriptor} instances and event masks.
  82. @ivar _closedAndReading: Along with C{_closedAndNotReading}, keeps track of
  83. descriptors which have had close notification delivered from the OS but
  84. which we have not finished reading data from. MsgWaitForMultipleObjects
  85. will only deliver close notification to us once, so we remember it in
  86. these two dictionaries until we're ready to act on it. The OS has
  87. delivered close notification for each descriptor in this dictionary, and
  88. the descriptors are marked as allowed to handle read events in the
  89. reactor, so they can be processed. When a descriptor is marked as not
  90. allowed to handle read events in the reactor (ie, it is passed to
  91. L{IReactorFDSet.removeReader}), it is moved out of this dictionary and
  92. into C{_closedAndNotReading}. The descriptors are keys in this
  93. dictionary. The values are arbitrary.
  94. @type _closedAndReading: C{dict}
  95. @ivar _closedAndNotReading: These descriptors have had close notification
  96. delivered from the OS, but are not marked as allowed to handle read
  97. events in the reactor. They are saved here to record their closed
  98. state, but not processed at all. When one of these descriptors is
  99. passed to L{IReactorFDSet.addReader}, it is moved out of this dictionary
  100. and into C{_closedAndReading}. The descriptors are keys in this
  101. dictionary. The values are arbitrary. This is a weak key dictionary so
  102. that if an application tells the reactor to stop reading from a
  103. descriptor and then forgets about that descriptor itself, the reactor
  104. will also forget about it.
  105. @type _closedAndNotReading: C{WeakKeyDictionary}
  106. """
  107. dummyEvent = CreateEvent(None, 0, 0, None)
  108. def __init__(self):
  109. self._reads = {}
  110. self._writes = {}
  111. self._events = {}
  112. self._closedAndReading = {}
  113. self._closedAndNotReading = WeakKeyDictionary()
  114. posixbase.PosixReactorBase.__init__(self)
  115. def _makeSocketEvent(self, fd, action, why):
  116. """
  117. Make a win32 event object for a socket.
  118. """
  119. event = CreateEvent(None, 0, 0, None)
  120. WSAEventSelect(fd, event, why)
  121. self._events[event] = (fd, action)
  122. return event
  123. def addEvent(self, event, fd, action):
  124. """
  125. Add a new win32 event to the event loop.
  126. """
  127. self._events[event] = (fd, action)
  128. def removeEvent(self, event):
  129. """
  130. Remove an event.
  131. """
  132. del self._events[event]
  133. def addReader(self, reader):
  134. """
  135. Add a socket FileDescriptor for notification of data available to read.
  136. """
  137. if reader not in self._reads:
  138. self._reads[reader] = self._makeSocketEvent(
  139. reader, "doRead", FD_READ | FD_ACCEPT | FD_CONNECT | FD_CLOSE
  140. )
  141. # If the reader is closed, move it over to the dictionary of reading
  142. # descriptors.
  143. if reader in self._closedAndNotReading:
  144. self._closedAndReading[reader] = True
  145. del self._closedAndNotReading[reader]
  146. def addWriter(self, writer):
  147. """
  148. Add a socket FileDescriptor for notification of data available to write.
  149. """
  150. if writer not in self._writes:
  151. self._writes[writer] = 1
  152. def removeReader(self, reader):
  153. """Remove a Selectable for notification of data available to read."""
  154. if reader in self._reads:
  155. del self._events[self._reads[reader]]
  156. del self._reads[reader]
  157. # If the descriptor is closed, move it out of the dictionary of
  158. # reading descriptors into the dictionary of waiting descriptors.
  159. if reader in self._closedAndReading:
  160. self._closedAndNotReading[reader] = True
  161. del self._closedAndReading[reader]
  162. def removeWriter(self, writer):
  163. """Remove a Selectable for notification of data available to write."""
  164. if writer in self._writes:
  165. del self._writes[writer]
  166. def removeAll(self):
  167. """
  168. Remove all selectables, and return a list of them.
  169. """
  170. return self._removeAll(self._reads, self._writes)
  171. def getReaders(self):
  172. return list(self._reads.keys())
  173. def getWriters(self):
  174. return list(self._writes.keys())
  175. def doWaitForMultipleEvents(self, timeout):
  176. log.msg(channel="system", event="iteration", reactor=self)
  177. if timeout is None:
  178. timeout = 100
  179. # Keep track of whether we run any application code before we get to the
  180. # MsgWaitForMultipleObjects. If so, there's a chance it will schedule a
  181. # new timed call or stop the reactor or do something else that means we
  182. # shouldn't block in MsgWaitForMultipleObjects for the full timeout.
  183. ranUserCode = False
  184. # If any descriptors are trying to close, try to get them out of the way
  185. # first.
  186. for reader in list(self._closedAndReading.keys()):
  187. ranUserCode = True
  188. self._runAction("doRead", reader)
  189. for fd in list(self._writes.keys()):
  190. ranUserCode = True
  191. log.callWithLogger(fd, self._runWrite, fd)
  192. if ranUserCode:
  193. # If application code *might* have scheduled an event, assume it
  194. # did. If we're wrong, we'll get back here shortly anyway. If
  195. # we're right, we'll be sure to handle the event (including reactor
  196. # shutdown) in a timely manner.
  197. timeout = 0
  198. if not (self._events or self._writes):
  199. # sleep so we don't suck up CPU time
  200. time.sleep(timeout)
  201. return
  202. handles = list(self._events.keys()) or [self.dummyEvent]
  203. timeout = int(timeout * 1000)
  204. val = MsgWaitForMultipleObjects(handles, 0, timeout, QS_ALLINPUT)
  205. if val == WAIT_TIMEOUT:
  206. return
  207. elif val == WAIT_OBJECT_0 + len(handles):
  208. exit = win32gui.PumpWaitingMessages()
  209. if exit:
  210. self.callLater(0, self.stop)
  211. return
  212. elif val >= WAIT_OBJECT_0 and val < WAIT_OBJECT_0 + len(handles):
  213. event = handles[val - WAIT_OBJECT_0]
  214. fd, action = self._events[event]
  215. if fd in self._reads:
  216. # Before anything, make sure it's still a valid file descriptor.
  217. fileno = fd.fileno()
  218. if fileno == -1:
  219. self._disconnectSelectable(fd, posixbase._NO_FILEDESC, False)
  220. return
  221. # Since it's a socket (not another arbitrary event added via
  222. # addEvent) and we asked for FD_READ | FD_CLOSE, check to see if
  223. # we actually got FD_CLOSE. This needs a special check because
  224. # it only gets delivered once. If we miss it, it's gone forever
  225. # and we'll never know that the connection is closed.
  226. events = WSAEnumNetworkEvents(fileno, event)
  227. if FD_CLOSE in events:
  228. self._closedAndReading[fd] = True
  229. log.callWithLogger(fd, self._runAction, action, fd)
  230. def _runWrite(self, fd):
  231. closed = 0
  232. try:
  233. closed = fd.doWrite()
  234. except BaseException:
  235. closed = sys.exc_info()[1]
  236. log.deferr()
  237. if closed:
  238. self.removeReader(fd)
  239. self.removeWriter(fd)
  240. try:
  241. fd.connectionLost(failure.Failure(closed))
  242. except BaseException:
  243. log.deferr()
  244. elif closed is None:
  245. return 1
  246. def _runAction(self, action, fd):
  247. try:
  248. closed = getattr(fd, action)()
  249. except BaseException:
  250. closed = sys.exc_info()[1]
  251. log.deferr()
  252. if closed:
  253. self._disconnectSelectable(fd, closed, action == "doRead")
  254. doIteration = doWaitForMultipleEvents
  255. class _ThreadFDWrapper:
  256. """
  257. This wraps an event handler and translates notification in the helper
  258. L{Win32Reactor} thread into a notification in the primary reactor thread.
  259. @ivar _reactor: The primary reactor, the one to which event notification
  260. will be sent.
  261. @ivar _fd: The L{FileDescriptor} to which the event will be dispatched.
  262. @ivar _action: A C{str} giving the method of C{_fd} which handles the event.
  263. @ivar _logPrefix: The pre-fetched log prefix string for C{_fd}, so that
  264. C{_fd.logPrefix} does not need to be called in a non-main thread.
  265. """
  266. def __init__(self, reactor, fd, action, logPrefix):
  267. self._reactor = reactor
  268. self._fd = fd
  269. self._action = action
  270. self._logPrefix = logPrefix
  271. def logPrefix(self):
  272. """
  273. Return the original handler's log prefix, as it was given to
  274. C{__init__}.
  275. """
  276. return self._logPrefix
  277. def _execute(self):
  278. """
  279. Callback fired when the associated event is set. Run the C{action}
  280. callback on the wrapped descriptor in the main reactor thread and raise
  281. or return whatever it raises or returns to cause this event handler to
  282. be removed from C{self._reactor} if appropriate.
  283. """
  284. return blockingCallFromThread(
  285. self._reactor, lambda: getattr(self._fd, self._action)()
  286. )
  287. def connectionLost(self, reason):
  288. """
  289. Pass through to the wrapped descriptor, but in the main reactor thread
  290. instead of the helper C{Win32Reactor} thread.
  291. """
  292. self._reactor.callFromThread(self._fd.connectionLost, reason)
  293. @implementer(IReactorWin32Events)
  294. class _ThreadedWin32EventsMixin:
  295. """
  296. This mixin implements L{IReactorWin32Events} for another reactor by running
  297. a L{Win32Reactor} in a separate thread and dispatching work to it.
  298. @ivar _reactor: The L{Win32Reactor} running in the other thread. This is
  299. L{None} until it is actually needed.
  300. @ivar _reactorThread: The L{threading.Thread} which is running the
  301. L{Win32Reactor}. This is L{None} until it is actually needed.
  302. """
  303. _reactor = None
  304. _reactorThread = None
  305. def _unmakeHelperReactor(self):
  306. """
  307. Stop and discard the reactor started by C{_makeHelperReactor}.
  308. """
  309. self._reactor.callFromThread(self._reactor.stop)
  310. self._reactor = None
  311. def _makeHelperReactor(self):
  312. """
  313. Create and (in a new thread) start a L{Win32Reactor} instance to use for
  314. the implementation of L{IReactorWin32Events}.
  315. """
  316. self._reactor = Win32Reactor()
  317. # This is a helper reactor, it is not the global reactor and its thread
  318. # is not "the" I/O thread. Prevent it from registering it as such.
  319. self._reactor._registerAsIOThread = False
  320. self._reactorThread = Thread(target=self._reactor.run, args=(False,))
  321. self.addSystemEventTrigger("after", "shutdown", self._unmakeHelperReactor)
  322. self._reactorThread.start()
  323. def addEvent(self, event, fd, action):
  324. """
  325. @see: L{IReactorWin32Events}
  326. """
  327. if self._reactor is None:
  328. self._makeHelperReactor()
  329. self._reactor.callFromThread(
  330. self._reactor.addEvent,
  331. event,
  332. _ThreadFDWrapper(self, fd, action, fd.logPrefix()),
  333. "_execute",
  334. )
  335. def removeEvent(self, event):
  336. """
  337. @see: L{IReactorWin32Events}
  338. """
  339. self._reactor.callFromThread(self._reactor.removeEvent, event)
  340. def install():
  341. threadable.init(1)
  342. r = Win32Reactor()
  343. from . import main
  344. main.installReactor(r)
  345. __all__ = ["Win32Reactor", "install"]