123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309 |
- import logging
- import multiprocessing
- import os
- import pickle
- import tempfile
- import traceback
- from concurrent.futures import CancelledError
-
-
- class BaseDaphneTestingInstance:
- """
- Launches an instance of Daphne in a subprocess, with a host and port
- attribute allowing you to call it.
-
- Works as a context manager.
- """
-
- startup_timeout = 2
-
- def __init__(
- self, xff=False, http_timeout=None, request_buffer_size=None, *, application
- ):
- self.xff = xff
- self.http_timeout = http_timeout
- self.host = "127.0.0.1"
- self.request_buffer_size = request_buffer_size
- self.application = application
-
- def get_application(self):
- return self.application
-
- def __enter__(self):
- # Option Daphne features
- kwargs = {}
- if self.request_buffer_size:
- kwargs["request_buffer_size"] = self.request_buffer_size
- # Optionally enable X-Forwarded-For support.
- if self.xff:
- kwargs["proxy_forwarded_address_header"] = "X-Forwarded-For"
- kwargs["proxy_forwarded_port_header"] = "X-Forwarded-Port"
- kwargs["proxy_forwarded_proto_header"] = "X-Forwarded-Proto"
- if self.http_timeout:
- kwargs["http_timeout"] = self.http_timeout
- # Start up process
- self.process = DaphneProcess(
- host=self.host,
- get_application=self.get_application,
- kwargs=kwargs,
- setup=self.process_setup,
- teardown=self.process_teardown,
- )
- self.process.start()
- # Wait for the port
- if self.process.ready.wait(self.startup_timeout):
- self.port = self.process.port.value
- return self
- else:
- if self.process.errors.empty():
- raise RuntimeError("Daphne did not start up, no error caught")
- else:
- error, traceback = self.process.errors.get(False)
- raise RuntimeError("Daphne did not start up:\n%s" % traceback)
-
- def __exit__(self, exc_type, exc_value, traceback):
- # Shut down the process
- self.process.terminate()
- del self.process
-
- def process_setup(self):
- """
- Called by the process just before it starts serving.
- """
- pass
-
- def process_teardown(self):
- """
- Called by the process just after it stops serving
- """
- pass
-
- def get_received(self):
- pass
-
-
- class DaphneTestingInstance(BaseDaphneTestingInstance):
- def __init__(self, *args, **kwargs):
- self.lock = multiprocessing.Lock()
- super().__init__(*args, **kwargs, application=TestApplication(lock=self.lock))
-
- def __enter__(self):
- # Clear result storage
- TestApplication.delete_setup()
- TestApplication.delete_result()
- return super().__enter__()
-
- def get_received(self):
- """
- Returns the scope and messages the test application has received
- so far. Note you'll get all messages since scope start, not just any
- new ones since the last call.
-
- Also checks for any exceptions in the application. If there are,
- raises them.
- """
- try:
- with self.lock:
- inner_result = TestApplication.load_result()
- except FileNotFoundError:
- raise ValueError("No results available yet.")
- # Check for exception
- if "exception" in inner_result:
- raise inner_result["exception"]
- return inner_result["scope"], inner_result["messages"]
-
- def add_send_messages(self, messages):
- """
- Adds messages for the application to send back.
- The next time it receives an incoming message, it will reply with these.
- """
- TestApplication.save_setup(response_messages=messages)
-
-
- class DaphneProcess(multiprocessing.Process):
- """
- Process subclass that launches and runs a Daphne instance, communicating the
- port it ends up listening on back to the parent process.
- """
-
- def __init__(self, host, get_application, kwargs=None, setup=None, teardown=None):
- super().__init__()
- self.host = host
- self.get_application = get_application
- self.kwargs = kwargs or {}
- self.setup = setup
- self.teardown = teardown
- self.port = multiprocessing.Value("i")
- self.ready = multiprocessing.Event()
- self.errors = multiprocessing.Queue()
-
- def run(self):
- # OK, now we are in a forked child process, and want to use the reactor.
- # However, FreeBSD systems like MacOS do not fork the underlying Kqueue,
- # which asyncio (hence asyncioreactor) is built on.
- # Therefore, we should uninstall the broken reactor and install a new one.
- _reinstall_reactor()
-
- from twisted.internet import reactor
-
- from .endpoints import build_endpoint_description_strings
- from .server import Server
-
- application = self.get_application()
-
- try:
- # Create the server class
- endpoints = build_endpoint_description_strings(host=self.host, port=0)
- self.server = Server(
- application=application,
- endpoints=endpoints,
- signal_handlers=False,
- **self.kwargs
- )
- # Set up a poller to look for the port
- reactor.callLater(0.1, self.resolve_port)
- # Run with setup/teardown
- if self.setup is not None:
- self.setup()
- try:
- self.server.run()
- finally:
- if self.teardown is not None:
- self.teardown()
- except BaseException as e:
- # Put the error on our queue so the parent gets it
- self.errors.put((e, traceback.format_exc()))
-
- def resolve_port(self):
- from twisted.internet import reactor
-
- if self.server.listening_addresses:
- self.port.value = self.server.listening_addresses[0][1]
- self.ready.set()
- else:
- reactor.callLater(0.1, self.resolve_port)
-
-
- class TestApplication:
- """
- An application that receives one or more messages, sends a response,
- and then quits the server. For testing.
- """
-
- setup_storage = os.path.join(tempfile.gettempdir(), "setup.testio")
- result_storage = os.path.join(tempfile.gettempdir(), "result.testio")
-
- def __init__(self, lock):
- self.lock = lock
- self.messages = []
-
- async def __call__(self, scope, receive, send):
- self.scope = scope
- # Receive input and send output
- logging.debug("test app coroutine alive")
- try:
- while True:
- # Receive a message and save it into the result store
- self.messages.append(await receive())
- self.lock.acquire()
- logging.debug("test app received %r", self.messages[-1])
- self.save_result(self.scope, self.messages)
- self.lock.release()
- # See if there are any messages to send back
- setup = self.load_setup()
- self.delete_setup()
- for message in setup["response_messages"]:
- await send(message)
- logging.debug("test app sent %r", message)
- except Exception as e:
- if isinstance(e, CancelledError):
- # Don't catch task-cancelled errors!
- raise
- else:
- self.save_exception(e)
-
- @classmethod
- def save_setup(cls, response_messages):
- """
- Stores setup information.
- """
- with open(cls.setup_storage, "wb") as fh:
- pickle.dump({"response_messages": response_messages}, fh)
-
- @classmethod
- def load_setup(cls):
- """
- Returns setup details.
- """
- try:
- with open(cls.setup_storage, "rb") as fh:
- return pickle.load(fh)
- except FileNotFoundError:
- return {"response_messages": []}
-
- @classmethod
- def save_result(cls, scope, messages):
- """
- Saves details of what happened to the result storage.
- We could use pickle here, but that seems wrong, still, somehow.
- """
- with open(cls.result_storage, "wb") as fh:
- pickle.dump({"scope": scope, "messages": messages}, fh)
-
- @classmethod
- def save_exception(cls, exception):
- """
- Saves details of what happened to the result storage.
- We could use pickle here, but that seems wrong, still, somehow.
- """
- with open(cls.result_storage, "wb") as fh:
- pickle.dump({"exception": exception}, fh)
-
- @classmethod
- def load_result(cls):
- """
- Returns result details.
- """
- with open(cls.result_storage, "rb") as fh:
- return pickle.load(fh)
-
- @classmethod
- def delete_setup(cls):
- """
- Clears setup storage files.
- """
- try:
- os.unlink(cls.setup_storage)
- except OSError:
- pass
-
- @classmethod
- def delete_result(cls):
- """
- Clears result storage files.
- """
- try:
- os.unlink(cls.result_storage)
- except OSError:
- pass
-
-
- def _reinstall_reactor():
- import asyncio
- import sys
-
- from twisted.internet import asyncioreactor
-
- # Uninstall the reactor.
- if "twisted.internet.reactor" in sys.modules:
- del sys.modules["twisted.internet.reactor"]
-
- # The daphne.server module may have already installed the reactor.
- # If so, using this module will use uninstalled one, thus we should
- # reimport this module too.
- if "daphne.server" in sys.modules:
- del sys.modules["daphne.server"]
-
- event_loop = asyncio.new_event_loop()
- asyncioreactor.install(event_loop)
- asyncio.set_event_loop(event_loop)
|