123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341 |
- # This has to be done first as Twisted is import-order-sensitive with reactors
- import asyncio # isort:skip
- import os # isort:skip
- import sys # isort:skip
- import warnings # isort:skip
- from concurrent.futures import ThreadPoolExecutor # isort:skip
- from twisted.internet import asyncioreactor # isort:skip
-
-
- twisted_loop = asyncio.new_event_loop()
- if "ASGI_THREADS" in os.environ:
- twisted_loop.set_default_executor(
- ThreadPoolExecutor(max_workers=int(os.environ["ASGI_THREADS"]))
- )
-
- current_reactor = sys.modules.get("twisted.internet.reactor", None)
- if current_reactor is not None:
- if not isinstance(current_reactor, asyncioreactor.AsyncioSelectorReactor):
- warnings.warn(
- "Something has already installed a non-asyncio Twisted reactor. Attempting to uninstall it; "
- + "you can fix this warning by importing daphne.server early in your codebase or "
- + "finding the package that imports Twisted and importing it later on.",
- UserWarning,
- )
- del sys.modules["twisted.internet.reactor"]
- asyncioreactor.install(twisted_loop)
- else:
- asyncioreactor.install(twisted_loop)
-
- import logging
- import time
- from concurrent.futures import CancelledError
- from functools import partial
-
- from twisted.internet import defer, reactor
- from twisted.internet.endpoints import serverFromString
- from twisted.logger import STDLibLogObserver, globalLogBeginner
- from twisted.web import http
-
- from .http_protocol import HTTPFactory
- from .ws_protocol import WebSocketFactory
-
- logger = logging.getLogger(__name__)
-
-
- class Server:
- def __init__(
- self,
- application,
- endpoints=None,
- signal_handlers=True,
- action_logger=None,
- http_timeout=None,
- request_buffer_size=8192,
- websocket_timeout=86400,
- websocket_connect_timeout=20,
- ping_interval=20,
- ping_timeout=30,
- root_path="",
- proxy_forwarded_address_header=None,
- proxy_forwarded_port_header=None,
- proxy_forwarded_proto_header=None,
- verbosity=1,
- websocket_handshake_timeout=5,
- application_close_timeout=10,
- ready_callable=None,
- server_name="daphne",
- ):
- self.application = application
- self.endpoints = endpoints or []
- self.listeners = []
- self.listening_addresses = []
- self.signal_handlers = signal_handlers
- self.action_logger = action_logger
- self.http_timeout = http_timeout
- self.ping_interval = ping_interval
- self.ping_timeout = ping_timeout
- self.request_buffer_size = request_buffer_size
- self.proxy_forwarded_address_header = proxy_forwarded_address_header
- self.proxy_forwarded_port_header = proxy_forwarded_port_header
- self.proxy_forwarded_proto_header = proxy_forwarded_proto_header
- self.websocket_timeout = websocket_timeout
- self.websocket_connect_timeout = websocket_connect_timeout
- self.websocket_handshake_timeout = websocket_handshake_timeout
- self.application_close_timeout = application_close_timeout
- self.root_path = root_path
- self.verbosity = verbosity
- self.abort_start = False
- self.ready_callable = ready_callable
- self.server_name = server_name
- # Check our construction is actually sensible
- if not self.endpoints:
- logger.error("No endpoints. This server will not listen on anything.")
- sys.exit(1)
-
- def run(self):
- # A dict of protocol: {"application_instance":, "connected":, "disconnected":} dicts
- self.connections = {}
- # Make the factory
- self.http_factory = HTTPFactory(self)
- self.ws_factory = WebSocketFactory(self, server=self.server_name)
- self.ws_factory.setProtocolOptions(
- autoPingTimeout=self.ping_timeout,
- allowNullOrigin=True,
- openHandshakeTimeout=self.websocket_handshake_timeout,
- )
- if self.verbosity <= 1:
- # Redirect the Twisted log to nowhere
- globalLogBeginner.beginLoggingTo(
- [lambda _: None], redirectStandardIO=False, discardBuffer=True
- )
- else:
- globalLogBeginner.beginLoggingTo([STDLibLogObserver(__name__)])
-
- # Detect what Twisted features are enabled
- if http.H2_ENABLED:
- logger.info("HTTP/2 support enabled")
- else:
- logger.info(
- "HTTP/2 support not enabled (install the http2 and tls Twisted extras)"
- )
-
- # Kick off the timeout loop
- reactor.callLater(1, self.application_checker)
- reactor.callLater(2, self.timeout_checker)
-
- for socket_description in self.endpoints:
- logger.info("Configuring endpoint %s", socket_description)
- ep = serverFromString(reactor, str(socket_description))
- listener = ep.listen(self.http_factory)
- listener.addCallback(self.listen_success)
- listener.addErrback(self.listen_error)
- self.listeners.append(listener)
-
- # Set the asyncio reactor's event loop as global
- # TODO: Should we instead pass the global one into the reactor?
- asyncio.set_event_loop(reactor._asyncioEventloop)
-
- # Verbosity 3 turns on asyncio debug to find those blocking yields
- if self.verbosity >= 3:
- asyncio.get_event_loop().set_debug(True)
-
- reactor.addSystemEventTrigger("before", "shutdown", self.kill_all_applications)
- if not self.abort_start:
- # Trigger the ready flag if we had one
- if self.ready_callable:
- self.ready_callable()
- # Run the reactor
- reactor.run(installSignalHandlers=self.signal_handlers)
-
- def listen_success(self, port):
- """
- Called when a listen succeeds so we can store port details (if there are any)
- """
- if hasattr(port, "getHost"):
- host = port.getHost()
- if hasattr(host, "host") and hasattr(host, "port"):
- self.listening_addresses.append((host.host, host.port))
- logger.info(
- "Listening on TCP address %s:%s",
- port.getHost().host,
- port.getHost().port,
- )
-
- def listen_error(self, failure):
- logger.critical("Listen failure: %s", failure.getErrorMessage())
- self.stop()
-
- def stop(self):
- """
- Force-stops the server.
- """
- if reactor.running:
- reactor.stop()
- else:
- self.abort_start = True
-
- ### Protocol handling
-
- def protocol_connected(self, protocol):
- """
- Adds a protocol as a current connection.
- """
- if protocol in self.connections:
- raise RuntimeError("Protocol %r was added to main list twice!" % protocol)
- self.connections[protocol] = {"connected": time.time()}
-
- def protocol_disconnected(self, protocol):
- # Set its disconnected time (the loops will come and clean it up)
- # Do not set it if it is already set. Overwriting it might
- # cause it to never be cleaned up.
- # See https://github.com/django/channels/issues/1181
- if "disconnected" not in self.connections[protocol]:
- self.connections[protocol]["disconnected"] = time.time()
-
- ### Internal event/message handling
-
- def create_application(self, protocol, scope):
- """
- Creates a new application instance that fronts a Protocol instance
- for one of our supported protocols. Pass it the protocol,
- and it will work out the type, supply appropriate callables, and
- return you the application's input queue
- """
- # Make sure the protocol has not had another application made for it
- assert "application_instance" not in self.connections[protocol]
- # Make an instance of the application
- input_queue = asyncio.Queue()
- scope.setdefault("asgi", {"version": "3.0"})
- application_instance = self.application(
- scope=scope,
- receive=input_queue.get,
- send=partial(self.handle_reply, protocol),
- )
- # Run it, and stash the future for later checking
- if protocol not in self.connections:
- return None
- self.connections[protocol]["application_instance"] = asyncio.ensure_future(
- application_instance,
- loop=asyncio.get_event_loop(),
- )
- return input_queue
-
- async def handle_reply(self, protocol, message):
- """
- Coroutine that jumps the reply message from asyncio to Twisted
- """
- # Don't do anything if the connection is closed or does not exist
- if protocol not in self.connections or self.connections[protocol].get(
- "disconnected", None
- ):
- return
- try:
- self.check_headers_type(message)
- except ValueError:
- # Ensure to send SOME reply.
- protocol.basic_error(500, b"Server Error", "Server Error")
- raise
- # Let the protocol handle it
- protocol.handle_reply(message)
-
- @staticmethod
- def check_headers_type(message):
- if not message["type"] == "http.response.start":
- return
- for k, v in message.get("headers", []):
- if not isinstance(k, bytes):
- raise ValueError(
- "Header name '{}' expected to be `bytes`, but got `{}`".format(
- k, type(k)
- )
- )
- if not isinstance(v, bytes):
- raise ValueError(
- "Header value '{}' expected to be `bytes`, but got `{}`".format(
- v, type(v)
- )
- )
-
- ### Utility
-
- def application_checker(self):
- """
- Goes through the set of current application Futures and cleans up
- any that are done/prints exceptions for any that errored.
- """
- for protocol, details in list(self.connections.items()):
- disconnected = details.get("disconnected", None)
- application_instance = details.get("application_instance", None)
- # First, see if the protocol disconnected and the app has taken
- # too long to close up
- if (
- disconnected
- and time.time() - disconnected > self.application_close_timeout
- ):
- if application_instance and not application_instance.done():
- logger.warning(
- "Application instance %r for connection %s took too long to shut down and was killed.",
- application_instance,
- repr(protocol),
- )
- application_instance.cancel()
- # Then see if the app is done and we should reap it
- if application_instance and application_instance.done():
- try:
- exception = application_instance.exception()
- except (CancelledError, asyncio.CancelledError):
- # Future cancellation. We can ignore this.
- pass
- else:
- if exception:
- if isinstance(exception, KeyboardInterrupt):
- # Protocol is asking the server to exit (likely during test)
- self.stop()
- else:
- logger.error(
- "Exception inside application: %s",
- exception,
- exc_info=exception,
- )
- if not disconnected:
- protocol.handle_exception(exception)
- del self.connections[protocol]["application_instance"]
- application_instance = None
- # Check to see if protocol is closed and app is closed so we can remove it
- if not application_instance and disconnected:
- del self.connections[protocol]
- reactor.callLater(1, self.application_checker)
-
- def kill_all_applications(self):
- """
- Kills all application coroutines before reactor exit.
- """
- # Send cancel to all coroutines
- wait_for = []
- for details in self.connections.values():
- application_instance = details["application_instance"]
- if not application_instance.done():
- application_instance.cancel()
- wait_for.append(application_instance)
- logger.info("Killed %i pending application instances", len(wait_for))
- # Make Twisted wait until they're all dead
- wait_deferred = defer.Deferred.fromFuture(asyncio.gather(*wait_for))
- wait_deferred.addErrback(lambda x: None)
- return wait_deferred
-
- def timeout_checker(self):
- """
- Called periodically to enforce timeout rules on all connections.
- Also checks pings at the same time.
- """
- for protocol in list(self.connections.keys()):
- protocol.check_timeouts()
- reactor.callLater(2, self.timeout_checker)
-
- def log_action(self, protocol, action, details):
- """
- Dispatches to any registered action logger, if there is one.
- """
- if self.action_logger:
- self.action_logger(protocol, action, details)
|