from collections import defaultdict from django.apps import apps from django.db import models from django.utils.translation import gettext_lazy as _ class ContentTypeManager(models.Manager): use_in_migrations = True def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Cache shared by all the get_for_* methods to speed up # ContentType retrieval. self._cache = {} def get_by_natural_key(self, app_label, model): try: ct = self._cache[self.db][(app_label, model)] except KeyError: ct = self.get(app_label=app_label, model=model) self._add_to_cache(self.db, ct) return ct def _get_opts(self, model, for_concrete_model): if for_concrete_model: model = model._meta.concrete_model return model._meta def _get_from_cache(self, opts): key = (opts.app_label, opts.model_name) return self._cache[self.db][key] def get_for_model(self, model, for_concrete_model=True): """ Return the ContentType object for a given model, creating the ContentType if necessary. Lookups are cached so that subsequent lookups for the same model don't hit the database. """ opts = self._get_opts(model, for_concrete_model) try: return self._get_from_cache(opts) except KeyError: pass # The ContentType entry was not found in the cache, therefore we # proceed to load or create it. try: # Start with get() and not get_or_create() in order to use # the db_for_read (see #20401). ct = self.get(app_label=opts.app_label, model=opts.model_name) except self.model.DoesNotExist: # Not found in the database; we proceed to create it. This time # use get_or_create to take care of any race conditions. ct, created = self.get_or_create( app_label=opts.app_label, model=opts.model_name, ) self._add_to_cache(self.db, ct) return ct def get_for_models(self, *models, for_concrete_models=True): """ Given *models, return a dictionary mapping {model: content_type}. """ results = {} # Models that aren't already in the cache. needed_app_labels = set() needed_models = set() # Mapping of opts to the list of models requiring it. needed_opts = defaultdict(list) for model in models: opts = self._get_opts(model, for_concrete_models) try: ct = self._get_from_cache(opts) except KeyError: needed_app_labels.add(opts.app_label) needed_models.add(opts.model_name) needed_opts[opts].append(model) else: results[model] = ct if needed_opts: # Lookup required content types from the DB. cts = self.filter( app_label__in=needed_app_labels, model__in=needed_models ) for ct in cts: opts_models = needed_opts.pop(ct.model_class()._meta, []) for model in opts_models: results[model] = ct self._add_to_cache(self.db, ct) # Create content types that weren't in the cache or DB. for opts, opts_models in needed_opts.items(): ct = self.create( app_label=opts.app_label, model=opts.model_name, ) self._add_to_cache(self.db, ct) for model in opts_models: results[model] = ct return results def get_for_id(self, id): """ Lookup a ContentType by ID. Use the same shared cache as get_for_model (though ContentTypes are obviously not created on-the-fly by get_by_id). """ try: ct = self._cache[self.db][id] except KeyError: # This could raise a DoesNotExist; that's correct behavior and will # make sure that only correct ctypes get stored in the cache dict. ct = self.get(pk=id) self._add_to_cache(self.db, ct) return ct def clear_cache(self): """ Clear out the content-type cache. """ self._cache.clear() def _add_to_cache(self, using, ct): """Insert a ContentType into the cache.""" # Note it's possible for ContentType objects to be stale; model_class() will return None. # Hence, there is no reliance on model._meta.app_label here, just using the model fields instead. key = (ct.app_label, ct.model) self._cache.setdefault(using, {})[key] = ct self._cache.setdefault(using, {})[ct.id] = ct class ContentType(models.Model): app_label = models.CharField(max_length=100) model = models.CharField(_('python model class name'), max_length=100) objects = ContentTypeManager() class Meta: verbose_name = _('content type') verbose_name_plural = _('content types') db_table = 'django_content_type' unique_together = (('app_label', 'model'),) def __str__(self): return self.name @property def name(self): model = self.model_class() if not model: return self.model return str(model._meta.verbose_name) def model_class(self): """Return the model class for this type of content.""" try: return apps.get_model(self.app_label, self.model) except LookupError: return None def get_object_for_this_type(self, **kwargs): """ Return an object of this type for the keyword arguments given. Basically, this is a proxy around this object_type's get_object() model method. The ObjectNotExist exception, if thrown, will not be caught, so code that calls this method should catch it. """ return self.model_class()._base_manager.using(self._state.db).get(**kwargs) def get_all_objects_for_this_type(self, **kwargs): """ Return all objects of this type for the keyword arguments given. """ return self.model_class()._base_manager.using(self._state.db).filter(**kwargs) def natural_key(self): return (self.app_label, self.model)