|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278 |
- import pkgutil
- from importlib import import_module
-
- from django.conf import settings
- from django.core.exceptions import ImproperlyConfigured
-
- # For backwards compatibility with Django < 3.2
- from django.utils.connection import ConnectionDoesNotExist # NOQA: F401
- from django.utils.connection import BaseConnectionHandler
- from django.utils.functional import cached_property
- from django.utils.module_loading import import_string
-
# Alias of the database that is used when nothing more specific is selected.
# ConnectionHandler.configure_settings() requires an entry under this alias and
# ConnectionRouter falls back to it when no router or instance hint decides.
DEFAULT_DB_ALIAS = "default"
# NOTE(review): not referenced anywhere in this module; presumably the key
# under which the Django version is stored in pickled state -- confirm against
# the callers that import it.
DJANGO_VERSION_PICKLE_KEY = "_django_version"
-
-
class Error(Exception):
    """Root of the database exception hierarchy declared in this module."""
-
-
class InterfaceError(Error):
    """
    Common stand-in for the backend's same-named ``InterfaceError``.

    Derives directly from ``Error`` rather than from ``DatabaseError``.
    """
-
-
class DatabaseError(Error):
    """
    Common stand-in for the backend's same-named ``DatabaseError``.

    Base class of the more specific database errors defined in this module.
    """
-
-
class DataError(DatabaseError):
    """
    Common stand-in for the backend's same-named ``DataError``.

    DatabaseErrorWrapper does not set ``errors_occurred`` for this class.
    """
-
-
class OperationalError(DatabaseError):
    """Common stand-in for the backend's same-named ``OperationalError``."""
-
-
class IntegrityError(DatabaseError):
    """
    Common stand-in for the backend's same-named ``IntegrityError``.

    DatabaseErrorWrapper does not set ``errors_occurred`` for this class.
    """
-
-
class InternalError(DatabaseError):
    """Common stand-in for the backend's same-named ``InternalError``."""
-
-
class ProgrammingError(DatabaseError):
    """Common stand-in for the backend's same-named ``ProgrammingError``."""
-
-
class NotSupportedError(DatabaseError):
    """Common stand-in for the backend's same-named ``NotSupportedError``."""
-
-
class DatabaseErrorWrapper:
    """
    Re-raise backend-specific database exceptions as the common exception
    classes defined in this module.

    Works both as a context manager (``with wrapper: ...``) and as a
    decorator (``wrapper(func)``).
    """

    def __init__(self, wrapper):
        """
        Remember the database wrapper whose exceptions get translated.

        ``wrapper`` must expose a ``Database`` attribute that defines the
        PEP-249 exception classes.
        """
        self.wrapper = wrapper

    def __enter__(self):
        # Nothing to set up; all the work happens in __exit__().
        return None

    def __exit__(self, exc_type, exc_value, traceback):
        """
        Translate an in-flight backend exception into its common counterpart.

        Return None (so nothing is suppressed) when no exception occurred or
        when the exception matches none of the backend's exception classes.
        Otherwise raise the matching common exception, built from the original
        arguments, carrying the original traceback and chained to the
        original exception.
        """
        if exc_type is None:
            return
        # Ordered so that the specific DatabaseError subclasses are tried
        # before DatabaseError itself, and Error is tried last: the first
        # match wins.
        candidates = (
            DataError,
            OperationalError,
            IntegrityError,
            InternalError,
            ProgrammingError,
            NotSupportedError,
            DatabaseError,
            InterfaceError,
            Error,
        )
        backend_exceptions = self.wrapper.Database
        for common_cls in candidates:
            # The backend class is looked up under the same name.
            backend_cls = getattr(backend_exceptions, common_cls.__name__)
            if not issubclass(exc_type, backend_cls):
                continue
            translated = common_cls(*exc_value.args)
            # Flag only the errors that may leave the connection unusable;
            # data and integrity errors are not flagged.
            if common_cls not in (DataError, IntegrityError):
                self.wrapper.errors_occurred = True
            raise translated.with_traceback(traceback) from exc_value

    def __call__(self, func):
        """Return ``func`` wrapped so that it runs inside this context manager."""

        # @wraps is deliberately left out here for performance reasons.
        # Refs #21109.
        def inner(*args, **kwargs):
            with self:
                return func(*args, **kwargs)

        return inner
-
-
def load_backend(backend_name):
    """
    Import and return the "base" module of a database backend.

    ``backend_name`` is the fully qualified backend package name; the module
    imported is ``<backend_name>.base``.

    Raise ImproperlyConfigured (chained to the ImportError) when the import
    fails and ``backend_name`` is not one of the built-in backends. When a
    built-in backend fails to import, the original ImportError propagates.
    """
    # This backend was renamed in Django 1.9; accept the old name.
    if backend_name == "django.db.backends.postgresql_psycopg2":
        backend_name = "django.db.backends.postgresql"

    try:
        return import_module("%s.base" % backend_name)
    except ImportError as e_user:
        # The backend could not be imported. Work out the built-in backends
        # so that a helpful message listing them can be shown.
        import django.db.backends

        builtin_backends = [
            name
            for _, name, ispkg in pkgutil.iter_modules(django.db.backends.__path__)
            if ispkg and name not in {"base", "dummy"}
        ]
        is_builtin = any(
            backend_name == "django.db.backends.%s" % builtin
            for builtin in builtin_backends
        )
        if is_builtin:
            # A built-in backend failing to import must be an error in
            # Django itself, so let the ImportError through unchanged.
            raise
        available = ", ".join(repr(name) for name in sorted(builtin_backends))
        raise ImproperlyConfigured(
            "%r isn't an available database backend or couldn't be "
            "imported. Check the above exception. To use one of the "
            "built-in backends, use 'django.db.backends.XXX', where XXX "
            "is one of:\n"
            " %s" % (backend_name, available)
        ) from e_user
-
-
class ConnectionHandler(BaseConnectionHandler):
    """
    Connection handler configured from the ``DATABASES`` setting.

    Fills every database configuration with defaults and builds the backend's
    ``DatabaseWrapper`` for a requested alias.
    """

    settings_name = "DATABASES"
    # Connections have to remain genuine thread locals because they are truly
    # thread-critical. Backends should guard their code against async contexts
    # with @async_unsafe; this flag additionally gives such contexts their own
    # connections should they be needed. Nothing cleans up after async
    # contexts, though, so that usage is not allowed where it can be helped.
    thread_critical = True

    def configure_settings(self, databases):
        """
        Return the database settings with all defaults filled in.

        An empty mapping gets a default alias that uses the dummy engine. A
        non-empty mapping must define the default alias, otherwise
        ImproperlyConfigured is raised. An empty default configuration also
        falls back to the dummy engine.
        """
        dummy_engine = "django.db.backends.dummy"
        databases = super().configure_settings(databases)
        if databases == {}:
            databases[DEFAULT_DB_ALIAS] = {"ENGINE": dummy_engine}
        else:
            if DEFAULT_DB_ALIAS not in databases:
                raise ImproperlyConfigured(
                    f"You must define a '{DEFAULT_DB_ALIAS}' database."
                )
            if databases[DEFAULT_DB_ALIAS] == {}:
                databases[DEFAULT_DB_ALIAS]["ENGINE"] = dummy_engine

        test_defaults = {
            "CHARSET": None,
            "COLLATION": None,
            "MIGRATE": True,
            "MIRROR": None,
            "NAME": None,
        }
        # Apply the per-database defaults; explicitly configured values win.
        for config in databases.values():
            config.setdefault("ATOMIC_REQUESTS", False)
            config.setdefault("AUTOCOMMIT", True)
            engine = config.setdefault("ENGINE", dummy_engine)
            # A bare package prefix or a falsy engine also means "dummy".
            if engine == "django.db.backends." or not engine:
                config["ENGINE"] = dummy_engine
            config.setdefault("CONN_MAX_AGE", 0)
            config.setdefault("CONN_HEALTH_CHECKS", False)
            # A new dict literal per database, so defaults are never shared.
            config.setdefault("OPTIONS", {})
            config.setdefault("TIME_ZONE", None)
            for key in ("NAME", "USER", "PASSWORD", "HOST", "PORT"):
                config.setdefault(key, "")

            test_config = config.setdefault("TEST", {})
            for key, default in test_defaults.items():
                test_config.setdefault(key, default)
        return databases

    @property
    def databases(self):
        # Kept for backward compatibility only: some third-party packages
        # relied on this private API in the past. Django itself no longer
        # uses it.
        return self.settings

    def create_connection(self, alias):
        """Build the backend's ``DatabaseWrapper`` for the given alias."""
        config = self.settings[alias]
        backend_module = load_backend(config["ENGINE"])
        return backend_module.DatabaseWrapper(config, alias)
-
-
class ConnectionRouter:
    """
    Ask a list of routers, in order, which database to use and whether
    relations and migrations are allowed.
    """

    def __init__(self, routers=None):
        """
        Store the routers to consult; when none are given,
        settings.DATABASE_ROUTERS is used once they are first needed.
        """
        self._routers = routers

    @cached_property
    def routers(self):
        """
        Return the router objects, instantiating any router that was given as
        a dotted import path.
        """
        if self._routers is None:
            self._routers = settings.DATABASE_ROUTERS
        return [
            import_string(entry)() if isinstance(entry, str) else entry
            for entry in self._routers
        ]

    def _router_func(action):
        # Class-body factory: builds the routing method that calls the router
        # method named ``action``.
        def _route_db(self, model, **hints):
            for router in self.routers:
                try:
                    method = getattr(router, action)
                except AttributeError:
                    # This router has no opinion on the action; try the next.
                    continue
                chosen_db = method(model, **hints)
                if chosen_db:
                    return chosen_db
            # No router decided: prefer the database of the hinted instance
            # and fall back to the default alias.
            instance = hints.get("instance")
            if instance is None:
                return DEFAULT_DB_ALIAS
            return instance._state.db or DEFAULT_DB_ALIAS

        return _route_db

    db_for_read = _router_func("db_for_read")
    db_for_write = _router_func("db_for_write")

    def allow_relation(self, obj1, obj2, **hints):
        """
        Return the first non-None verdict of the routers; when no router
        decides, allow the relation only if both objects share a database.
        """
        for router in self.routers:
            try:
                method = router.allow_relation
            except AttributeError:
                # This router has no such method; move on to the next one.
                continue
            verdict = method(obj1, obj2, **hints)
            if verdict is not None:
                return verdict
        return obj1._state.db == obj2._state.db

    def allow_migrate(self, db, app_label, **hints):
        """
        Return the first non-None verdict of the routers; when no router
        decides, the migration is allowed.
        """
        for router in self.routers:
            try:
                method = router.allow_migrate
            except AttributeError:
                # This router has no such method; move on to the next one.
                pass
            else:
                verdict = method(db, app_label, **hints)
                if verdict is not None:
                    return verdict
        return True

    def allow_migrate_model(self, db, model):
        """Run allow_migrate() with the app label and model name of ``model``."""
        meta = model._meta
        return self.allow_migrate(
            db,
            meta.app_label,
            model_name=meta.model_name,
            model=model,
        )

    def get_migratable_models(self, app_config, db, include_auto_created=False):
        """Return app models allowed to be migrated on provided db."""
        return [
            candidate
            for candidate in app_config.get_models(
                include_auto_created=include_auto_created
            )
            if self.allow_migrate_model(db, candidate)
        ]
|