123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592 |
- """Selectors module.
-
- This module allows high-level and efficient I/O multiplexing, built upon the
- `select` module primitives.
-
- The following code is adapted from trollius.selectors.
- """
-
-
- from abc import ABCMeta, abstractmethod
- from collections import namedtuple, Mapping
- import math
- import select
- import sys
-
- from gunicorn._compat import wrap_error, InterruptedError
- from gunicorn import six
-
-
# Backend-independent event flags.  Each selector implementation translates
# them to and from the constants of its underlying polling primitive.
EVENT_READ = 0x1   # bit 0
EVENT_WRITE = 0x2  # bit 1
-
-
def _fileobj_to_fd(fileobj):
    """Resolve a file object to the integer file descriptor behind it.

    Parameters:
    fileobj -- an integer file descriptor, or an object with a fileno()
               method

    Returns:
    the (non-negative) integer file descriptor

    Raises:
    ValueError if no descriptor can be obtained from the object, or if
    the descriptor is negative
    """
    if not isinstance(fileobj, six.integer_types):
        # Not already a descriptor: ask the object for one.
        try:
            descriptor = int(fileobj.fileno())
        except (AttributeError, TypeError, ValueError):
            raise ValueError("Invalid file object: "
                             "{0!r}".format(fileobj))
    else:
        descriptor = fileobj

    if descriptor < 0:
        raise ValueError("Invalid file descriptor: {0}".format(descriptor))
    return descriptor
-
-
# Record tying a registered file object to its backing file descriptor,
# the selected event mask and the data attached at registration time.
SelectorKey = namedtuple('SelectorKey', 'fileobj fd events data')
-
-
- class _SelectorMapping(Mapping):
- """Mapping of file objects to selector keys."""
-
- def __init__(self, selector):
- self._selector = selector
-
- def __len__(self):
- return len(self._selector._fd_to_key)
-
- def __getitem__(self, fileobj):
- try:
- fd = self._selector._fileobj_lookup(fileobj)
- return self._selector._fd_to_key[fd]
- except KeyError:
- raise KeyError("{0!r} is not registered".format(fileobj))
-
- def __iter__(self):
- return iter(self._selector._fd_to_key)
-
-
class BaseSelector(six.with_metaclass(ABCMeta)):
    """Abstract interface shared by every selector.

    A selector keeps track of file objects registered for specific I/O
    events and reports which of them are ready.

    A file object is either a file descriptor or any object providing a
    `fileno()` method.  An arbitrary object may be attached to each
    registration, e.g. to carry context information or a callback.

    Concrete selectors rely on different primitives (select(), poll(),
    epoll()...) depending on the platform.  The default `Selector` class
    uses the most efficient implementation on the current platform.
    """

    @abstractmethod
    def register(self, fileobj, events, data=None):
        """Start monitoring a file object.

        Parameters:
        fileobj -- file object or file descriptor
        events  -- bitwise mask of EVENT_READ|EVENT_WRITE to monitor
        data    -- object to attach to the registration

        Returns:
        SelectorKey instance

        Raises:
        ValueError if events is invalid
        KeyError if fileobj is already registered
        OSError if fileobj is closed or otherwise unacceptable to the
                underlying system call (if a system call is made)

        Note:
        OSError may or may not be raised
        """
        raise NotImplementedError

    @abstractmethod
    def unregister(self, fileobj):
        """Stop monitoring a file object.

        Parameters:
        fileobj -- file object or file descriptor

        Returns:
        SelectorKey instance

        Raises:
        KeyError if fileobj is not registered

        Note:
        When fileobj is registered but has been closed in the meantime,
        this does *not* raise OSError (even if the wrapped syscall does)
        """
        raise NotImplementedError

    def modify(self, fileobj, events, data=None):
        """Replace the monitored events or attached data of a registration.

        This default implementation unregisters the file object and
        registers it again with the new settings.

        Parameters:
        fileobj -- file object or file descriptor
        events  -- bitwise mask of EVENT_READ|EVENT_WRITE to monitor
        data    -- object to attach to the registration

        Returns:
        SelectorKey instance

        Raises:
        Anything that unregister() or register() raises
        """
        self.unregister(fileobj)
        replacement = self.register(fileobj, events, data)
        return replacement

    @abstractmethod
    def select(self, timeout=None):
        """Wait until some monitored file objects are ready or the timeout
        expires.

        Parameters:
        timeout -- if timeout > 0, the maximum time to wait, in seconds
                   if timeout <= 0, do not block; report the file objects
                   that are currently ready
                   if timeout is None, block until a monitored file object
                   becomes ready

        Returns:
        list of (key, events) for ready file objects
        `events` is a bitwise mask of EVENT_READ|EVENT_WRITE
        """
        raise NotImplementedError

    def close(self):
        """Close the selector.

        Must be called to make sure that any underlying resource is freed.
        The base implementation does nothing.
        """

    def get_key(self, fileobj):
        """Look up the key of a registered file object.

        Returns:
        SelectorKey for this file object

        Raises:
        KeyError if fileobj is not registered
        """
        registrations = self.get_map()
        try:
            found = registrations[fileobj]
        except KeyError:
            raise KeyError("{0!r} is not registered".format(fileobj))
        return found

    @abstractmethod
    def get_map(self):
        """Return a mapping of file objects to selector keys."""
        raise NotImplementedError

    # Context-manager support: the selector is closed when the block exits.
    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
-
-
class _BaseSelectorImpl(BaseSelector):
    """Bookkeeping shared by the concrete selectors.

    Registrations are stored in a dictionary keyed by file descriptor;
    subclasses add the calls to their polling primitive on top of it.
    """

    def __init__(self):
        # file descriptor -> SelectorKey
        self._fd_to_key = {}
        # read-only view handed out by get_map()
        self._map = _SelectorMapping(self)

    def _fileobj_lookup(self, fileobj):
        """Resolve a file object to a file descriptor.

        Tries _fileobj_to_fd() first.  If that rejects the object, the
        registered keys are scanned for this exact object (identity
        comparison), so an object that was registered and has since become
        invalid -- e.g. closed -- can still be found.  unregister() and
        _SelectorMapping depend on this.

        Raises:
        ValueError if the object is invalid and was never registered
        """
        try:
            return _fileobj_to_fd(fileobj)
        except ValueError:
            # Fall back to an exhaustive search of the registrations.
            for registered in self._fd_to_key.values():
                if registered.fileobj is fileobj:
                    return registered.fd
            # Unknown object: let the original ValueError propagate.
            raise

    def register(self, fileobj, events, data=None):
        allowed = EVENT_READ | EVENT_WRITE
        if not events or events & ~allowed:
            raise ValueError("Invalid events: {0!r}".format(events))

        descriptor = self._fileobj_lookup(fileobj)
        if descriptor in self._fd_to_key:
            raise KeyError("{0!r} (FD {1}) is already registered"
                           .format(fileobj, descriptor))

        key = SelectorKey(fileobj, descriptor, events, data)
        self._fd_to_key[descriptor] = key
        return key

    def unregister(self, fileobj):
        try:
            descriptor = self._fileobj_lookup(fileobj)
            removed = self._fd_to_key.pop(descriptor)
        except KeyError:
            raise KeyError("{0!r} is not registered".format(fileobj))
        return removed

    def modify(self, fileobj, events, data=None):
        # TODO: Subclasses can probably optimize this even further.
        try:
            current = self._fd_to_key[self._fileobj_lookup(fileobj)]
        except KeyError:
            raise KeyError("{0!r} is not registered".format(fileobj))

        if events != current.events:
            # The event mask changed: go through a full re-registration.
            self.unregister(fileobj)
            return self.register(fileobj, events, data)

        if data != current.data:
            # Only the data changed: swap the stored key in place.
            current = current._replace(data=data)
            self._fd_to_key[current.fd] = current
        return current

    def close(self):
        self._fd_to_key.clear()

    def get_map(self):
        return self._map

    def _key_from_fd(self, fd):
        """Look up the key registered for a file descriptor.

        Parameters:
        fd -- file descriptor

        Returns:
        the matching key, or None if the descriptor is not registered
        """
        return self._fd_to_key.get(fd)
-
-
class SelectSelector(_BaseSelectorImpl):
    """Selector built on select.select()."""

    def __init__(self):
        super(SelectSelector, self).__init__()
        # Descriptors monitored for reading / writing respectively.
        self._readers = set()
        self._writers = set()

    def register(self, fileobj, events, data=None):
        key = super(SelectSelector, self).register(fileobj, events, data)
        descriptor = key.fd
        if events & EVENT_READ:
            self._readers.add(descriptor)
        if events & EVENT_WRITE:
            self._writers.add(descriptor)
        return key

    def unregister(self, fileobj):
        key = super(SelectSelector, self).unregister(fileobj)
        for watched in (self._readers, self._writers):
            watched.discard(key.fd)
        return key

    if sys.platform == 'win32':
        def _select(self, r, w, _, timeout=None):
            # On win32 the writers are also passed as the third
            # ("exceptional") list and whatever comes back there is
            # reported as writable.
            r, w, x = select.select(r, w, w, timeout)
            return r, w + x, []
    else:
        _select = select.select

    def select(self, timeout=None):
        if timeout is not None:
            # A negative timeout means "do not block".
            timeout = max(timeout, 0)
        ready = []
        try:
            r, w, _ = wrap_error(self._select,
                                 self._readers, self._writers, [], timeout)
        except InterruptedError:
            return ready

        readable = set(r)
        writable = set(w)
        for fd in readable | writable:
            mask = 0
            if fd in readable:
                mask |= EVENT_READ
            if fd in writable:
                mask |= EVENT_WRITE

            key = self._key_from_fd(fd)
            if not key:
                continue
            # Report only the events the caller registered for.
            ready.append((key, mask & key.events))
        return ready
-
-
if hasattr(select, 'poll'):

    class PollSelector(_BaseSelectorImpl):
        """Selector built on select.poll()."""

        def __init__(self):
            super(PollSelector, self).__init__()
            self._poll = select.poll()

        def register(self, fileobj, events, data=None):
            """Register a file object with both the key map and poll().

            If the poll object rejects the descriptor, the key that was
            just added to the map is removed again before the exception
            propagates, so a failed register() leaves no stale entry.
            """
            key = super(PollSelector, self).register(fileobj, events, data)
            poll_events = 0
            if events & EVENT_READ:
                poll_events |= select.POLLIN
            if events & EVENT_WRITE:
                poll_events |= select.POLLOUT
            try:
                self._poll.register(key.fd, poll_events)
            except BaseException:
                # Roll back the bookkeeping done by the base class.
                super(PollSelector, self).unregister(fileobj)
                raise
            return key

        def unregister(self, fileobj):
            key = super(PollSelector, self).unregister(fileobj)
            self._poll.unregister(key.fd)
            return key

        def select(self, timeout=None):
            """Poll the registered descriptors.

            Parameters:
            timeout -- seconds to wait; None blocks, <= 0 does not block

            Returns:
            list of (key, events) for ready file objects; empty if the
            call was interrupted
            """
            if timeout is not None:
                if timeout <= 0:
                    timeout = 0
                else:
                    # poll() has a resolution of 1 millisecond, round away
                    # from zero to wait *at least* timeout seconds.
                    timeout = int(math.ceil(timeout * 1e3))
            ready = []
            try:
                fd_event_list = wrap_error(self._poll.poll, timeout)
            except InterruptedError:
                return ready
            for fd, event in fd_event_list:
                # Any flag other than POLLIN counts as write readiness and
                # any flag other than POLLOUT as read readiness; the result
                # is then masked with the events the key was registered for.
                events = 0
                if event & ~select.POLLIN:
                    events |= EVENT_WRITE
                if event & ~select.POLLOUT:
                    events |= EVENT_READ

                key = self._key_from_fd(fd)
                if key:
                    ready.append((key, events & key.events))
            return ready
-
-
if hasattr(select, 'epoll'):

    class EpollSelector(_BaseSelectorImpl):
        """Selector built on select.epoll()."""

        def __init__(self):
            super(EpollSelector, self).__init__()
            self._epoll = select.epoll()

        def fileno(self):
            """Return the file descriptor of the underlying epoll object."""
            return self._epoll.fileno()

        def register(self, fileobj, events, data=None):
            """Register a file object with both the key map and epoll.

            If epoll rejects the descriptor, the key that was just added
            to the map is removed again before the exception propagates,
            so a failed register() leaves no stale entry.
            """
            key = super(EpollSelector, self).register(fileobj, events, data)
            epoll_events = 0
            if events & EVENT_READ:
                epoll_events |= select.EPOLLIN
            if events & EVENT_WRITE:
                epoll_events |= select.EPOLLOUT
            try:
                self._epoll.register(key.fd, epoll_events)
            except BaseException:
                # Roll back the bookkeeping done by the base class.
                super(EpollSelector, self).unregister(fileobj)
                raise
            return key

        def unregister(self, fileobj):
            key = super(EpollSelector, self).unregister(fileobj)
            try:
                self._epoll.unregister(key.fd)
            except OSError:
                # This can happen if the FD was closed since it
                # was registered.
                pass
            return key

        def select(self, timeout=None):
            """Wait on the registered descriptors with epoll.

            Parameters:
            timeout -- seconds to wait; None blocks, <= 0 does not block

            Returns:
            list of (key, events) for ready file objects; empty if the
            call was interrupted
            """
            if timeout is None:
                timeout = -1
            elif timeout <= 0:
                timeout = 0
            else:
                # epoll_wait() has a resolution of 1 millisecond, round away
                # from zero to wait *at least* timeout seconds.
                timeout = math.ceil(timeout * 1e3) * 1e-3
            # epoll.poll() rejects maxevents < 1 with ValueError, so ask for
            # at least one event; this keeps select() usable when no FD is
            # registered.
            max_ev = max(len(self._fd_to_key), 1)
            ready = []
            try:
                fd_event_list = wrap_error(self._epoll.poll, timeout, max_ev)
            except InterruptedError:
                return ready
            for fd, event in fd_event_list:
                # Any flag other than EPOLLIN counts as write readiness and
                # any flag other than EPOLLOUT as read readiness; the result
                # is then masked with the events the key was registered for.
                events = 0
                if event & ~select.EPOLLIN:
                    events |= EVENT_WRITE
                if event & ~select.EPOLLOUT:
                    events |= EVENT_READ

                key = self._key_from_fd(fd)
                if key:
                    ready.append((key, events & key.events))
            return ready

        def close(self):
            """Close the epoll object and drop all registrations."""
            self._epoll.close()
            super(EpollSelector, self).close()
-
-
if hasattr(select, 'devpoll'):

    class DevpollSelector(_BaseSelectorImpl):
        """Solaris /dev/poll selector."""

        def __init__(self):
            super(DevpollSelector, self).__init__()
            self._devpoll = select.devpoll()

        def fileno(self):
            """Return the file descriptor of the underlying devpoll object."""
            return self._devpoll.fileno()

        def register(self, fileobj, events, data=None):
            """Register a file object with both the key map and devpoll.

            If devpoll rejects the descriptor, the key that was just added
            to the map is removed again before the exception propagates,
            so a failed register() leaves no stale entry.
            """
            key = super(DevpollSelector, self).register(fileobj, events, data)
            poll_events = 0
            if events & EVENT_READ:
                poll_events |= select.POLLIN
            if events & EVENT_WRITE:
                poll_events |= select.POLLOUT
            try:
                self._devpoll.register(key.fd, poll_events)
            except BaseException:
                # Roll back the bookkeeping done by the base class.
                super(DevpollSelector, self).unregister(fileobj)
                raise
            return key

        def unregister(self, fileobj):
            key = super(DevpollSelector, self).unregister(fileobj)
            try:
                self._devpoll.unregister(key.fd)
            except OSError:
                # This can happen if the FD was closed since it was
                # registered; unregister() must not raise OSError then.
                pass
            return key

        def select(self, timeout=None):
            """Wait on the registered descriptors with /dev/poll.

            Parameters:
            timeout -- seconds to wait; None blocks, <= 0 does not block

            Returns:
            list of (key, events) for ready file objects; empty if the
            call was interrupted
            """
            if timeout is not None:
                if timeout <= 0:
                    timeout = 0
                else:
                    # devpoll() has a resolution of 1 millisecond, round
                    # away from zero to wait *at least* timeout seconds.
                    timeout = int(math.ceil(timeout * 1e3))
            ready = []
            try:
                # Go through wrap_error() like the other selectors do so
                # that an interrupted call is reported as InterruptedError.
                fd_event_list = wrap_error(self._devpoll.poll, timeout)
            except InterruptedError:
                return ready
            for fd, event in fd_event_list:
                # Any flag other than POLLIN counts as write readiness and
                # any flag other than POLLOUT as read readiness; the result
                # is then masked with the events the key was registered for.
                events = 0
                if event & ~select.POLLIN:
                    events |= EVENT_WRITE
                if event & ~select.POLLOUT:
                    events |= EVENT_READ

                key = self._key_from_fd(fd)
                if key:
                    ready.append((key, events & key.events))
            return ready

        def close(self):
            """Close the devpoll object and drop all registrations."""
            self._devpoll.close()
            super(DevpollSelector, self).close()
-
-
if hasattr(select, 'kqueue'):

    class KqueueSelector(_BaseSelectorImpl):
        """Selector built on select.kqueue()."""

        def __init__(self):
            super(KqueueSelector, self).__init__()
            self._kqueue = select.kqueue()

        def fileno(self):
            """Return the file descriptor of the underlying kqueue object."""
            return self._kqueue.fileno()

        def register(self, fileobj, events, data=None):
            """Register a file object with both the key map and kqueue.

            One kevent is added per requested filter (read and/or write).
            If kqueue rejects one of them, the key that was just added to
            the map is removed again before the exception propagates, so
            a failed register() leaves no stale entry.
            """
            key = super(KqueueSelector, self).register(fileobj, events, data)
            try:
                if events & EVENT_READ:
                    kev = select.kevent(key.fd, select.KQ_FILTER_READ,
                                        select.KQ_EV_ADD)
                    self._kqueue.control([kev], 0, 0)
                if events & EVENT_WRITE:
                    kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
                                        select.KQ_EV_ADD)
                    self._kqueue.control([kev], 0, 0)
            except BaseException:
                # Roll back the bookkeeping done by the base class.
                super(KqueueSelector, self).unregister(fileobj)
                raise
            return key

        def unregister(self, fileobj):
            key = super(KqueueSelector, self).unregister(fileobj)
            if key.events & EVENT_READ:
                kev = select.kevent(key.fd, select.KQ_FILTER_READ,
                                    select.KQ_EV_DELETE)
                try:
                    self._kqueue.control([kev], 0, 0)
                except OSError:
                    # This can happen if the FD was closed since it
                    # was registered.
                    pass
            if key.events & EVENT_WRITE:
                kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
                                    select.KQ_EV_DELETE)
                try:
                    self._kqueue.control([kev], 0, 0)
                except OSError:
                    # See comment above.
                    pass
            return key

        def select(self, timeout=None):
            """Wait on the registered descriptors with kqueue.

            Parameters:
            timeout -- seconds to wait; None blocks, <= 0 does not block

            Returns:
            list of (key, events) for ready file objects; empty if the
            call was interrupted
            """
            timeout = None if timeout is None else max(timeout, 0)
            # With max_ev == 0 kqueue ignores the timeout and returns at
            # once; request at least one event so that select() honours
            # the timeout even when no FD is registered.
            max_ev = max(len(self._fd_to_key), 1)
            ready = []
            try:
                kev_list = wrap_error(self._kqueue.control,
                                      None, max_ev, timeout)
            except InterruptedError:
                return ready
            for kev in kev_list:
                fd = kev.ident
                flag = kev.filter
                events = 0
                if flag == select.KQ_FILTER_READ:
                    events |= EVENT_READ
                if flag == select.KQ_FILTER_WRITE:
                    events |= EVENT_WRITE

                key = self._key_from_fd(fd)
                if key:
                    ready.append((key, events & key.events))
            return ready

        def close(self):
            """Close the kqueue object and drop all registrations."""
            self._kqueue.close()
            super(KqueueSelector, self).close()
-
-
# Choose the best implementation: roughly, epoll|kqueue|devpoll > poll > select.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
# The optional selector classes only exist when the platform provides the
# matching primitive, hence the lookups by name; the first one found wins
# and SelectSelector is the fallback.
DefaultSelector = (
    globals().get('KqueueSelector')
    or globals().get('EpollSelector')
    or globals().get('DevpollSelector')
    or globals().get('PollSelector')
    or SelectSelector
)
|