|
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647 |
- import functools
- import inspect
- from functools import partial
-
- from django import forms
- from django.apps import apps
- from django.conf import SettingsReference
- from django.core import checks, exceptions
- from django.db import connection, router
- from django.db.backends import utils
- from django.db.models import Q
- from django.db.models.constants import LOOKUP_SEP
- from django.db.models.deletion import CASCADE, SET_DEFAULT, SET_NULL
- from django.db.models.query_utils import PathInfo
- from django.db.models.utils import make_model_tuple
- from django.utils.functional import cached_property
- from django.utils.translation import gettext_lazy as _
-
- from . import Field
- from .mixins import FieldCacheMixin
- from .related_descriptors import (
- ForwardManyToOneDescriptor, ForwardOneToOneDescriptor,
- ManyToManyDescriptor, ReverseManyToOneDescriptor,
- ReverseOneToOneDescriptor,
- )
- from .related_lookups import (
- RelatedExact, RelatedGreaterThan, RelatedGreaterThanOrEqual, RelatedIn,
- RelatedIsNull, RelatedLessThan, RelatedLessThanOrEqual,
- )
- from .reverse_related import (
- ForeignObjectRel, ManyToManyRel, ManyToOneRel, OneToOneRel,
- )
-
# Relation string meaning "the model this field is declared on";
# resolve_relation() swaps it for the scope model.
RECURSIVE_RELATIONSHIP_CONSTANT = 'self'
-
-
def resolve_relation(scope_model, relation):
    """
    Normalize `relation` to a model class or an "app_label.ModelName" string,
    interpreting relative references against `scope_model`.

    Accepted forms of `relation`:
      * RECURSIVE_RELATIONSHIP_CONSTANT (the string "self"): `scope_model`
        is used in its place.
      * A model name without a dot: `scope_model`'s app_label is prepended.
      * An "app_label.ModelName" string: returned as is.
      * Anything that is not a string (e.g. a model class): returned as is.
    """
    # A recursive reference stands for the scope model itself.
    target = scope_model if relation == RECURSIVE_RELATIONSHIP_CONSTANT else relation

    # Only undotted strings need qualifying with the scope model's app label.
    if isinstance(target, str) and "." not in target:
        target = "%s.%s" % (scope_model._meta.app_label, target)

    return target
-
-
def lazy_related_operation(function, model, *related_models, **kwargs):
    """
    Hand `function` to the app registry of `model` through
    `Apps.lazy_model_operation`, keyed on `model` and every entry of
    `related_models`.

    `model` must be a model class. Each entry of `related_models` is a model
    or a reference to one, in any form `resolve_relation()` accepts; relative
    references are resolved against `model`. `kwargs` are bound to `function`
    with `functools.partial` before it is scheduled.

    The registry used is the one found at `model._meta.apps`, and the return
    value is whatever its `lazy_model_operation()` returns.
    """
    resolved = [model]
    resolved.extend(resolve_relation(model, reference) for reference in related_models)
    # Kept lazy: the keys are only computed when unpacked into the call below.
    model_keys = map(make_model_tuple, resolved)
    registry = model._meta.apps
    return registry.lazy_model_operation(partial(function, **kwargs), *model_keys)
-
-
class RelatedField(FieldCacheMixin, Field):
    """Common ancestor of every field that describes a relation to a model."""

    # Relation-cardinality flags; all are off on this base class.
    many_to_one = False
    many_to_many = False
    one_to_one = False
    one_to_many = False

    @cached_property
    def related_model(self):
        """Return the model on the remote side of the relation."""
        # check_models_ready() runs first so the value is never cached
        # before all the models are loaded.
        apps.check_models_ready()
        return self.remote_field.model

    def check(self, **kwargs):
        """Return the inherited check messages followed by the relation checks."""
        messages = list(super().check(**kwargs))
        messages.extend(self._check_related_name_is_valid())
        messages.extend(self._check_related_query_name_is_valid())
        messages.extend(self._check_relation_model_exists())
        messages.extend(self._check_referencing_to_swapped_model())
        messages.extend(self._check_clashes())
        return messages

    def _check_related_name_is_valid(self):
        """
        Return fields.E306 unless related_name is None, a non-keyword Python
        identifier, or a string ending with '+'.
        """
        import keyword

        candidate = self.remote_field.related_name
        if candidate is None:
            return []
        if not keyword.iskeyword(candidate) and candidate.isidentifier():
            return []
        if candidate.endswith('+'):
            return []
        return [
            checks.Error(
                "The name '%s' is invalid related_name for field %s.%s" % (
                    candidate, self.model._meta.object_name, self.name,
                ),
                hint="Related name must be a valid Python identifier or end with a '+'",
                obj=self,
                id='fields.E306',
            )
        ]

    def _check_related_query_name_is_valid(self):
        """
        Return fields.E308 if the reverse query name ends with an underscore
        and fields.E309 if it contains LOOKUP_SEP. Hidden relations are
        not checked.
        """
        if self.remote_field.is_hidden():
            return []

        query_name = self.related_query_name()
        # Collect (message, id) pairs first; both errors share one hint.
        problems = []
        if query_name.endswith('_'):
            problems.append((
                "Reverse query name '%s' must not end with an underscore." % (query_name,),
                'fields.E308',
            ))
        if LOOKUP_SEP in query_name:
            problems.append((
                "Reverse query name '%s' must not contain '%s'." % (query_name, LOOKUP_SEP),
                'fields.E309',
            ))
        shared_hint = ("Add or change a related_name or related_query_name "
                       "argument for this field.")
        return [
            checks.Error(message, hint=shared_hint, obj=self, id=error_id)
            for message, error_id in problems
        ]

    def _check_relation_model_exists(self):
        """
        Return fields.E300 when the target is not among the registry's models
        and is either still a string or a model that is not swapped.
        """
        target = self.remote_field.model
        is_registered = target in self.opts.apps.get_models()
        is_string = isinstance(target, str)
        if is_registered:
            return []
        # Missing-but-swapped models are reported by
        # _check_referencing_to_swapped_model() instead.
        if not is_string and target._meta.swapped:
            return []
        model_name = target if is_string else target._meta.object_name
        return [
            checks.Error(
                "Field defines a relation with model '%s', which is either "
                "not installed, or is abstract." % model_name,
                obj=self,
                id='fields.E300',
            )
        ]

    def _check_referencing_to_swapped_model(self):
        """
        Return fields.E301 when the target is a model class that is not among
        the registry's models and whose _meta.swapped is set.
        """
        target = self.remote_field.model
        if target in self.opts.apps.get_models():
            return []
        if isinstance(target, str) or not target._meta.swapped:
            return []
        target_meta = target._meta
        label = "%s.%s" % (target_meta.app_label, target_meta.object_name)
        return [
            checks.Error(
                "Field defines a relation with the model '%s', which has "
                "been swapped out." % label,
                hint="Update the relation to point at 'settings.%s'." % target_meta.swappable,
                obj=self,
                id='fields.E301',
            )
        ]

    def _check_clashes(self):
        """
        Return errors for reverse accessor and reverse query name clashes
        with fields (E302/E303) and with other reverse relations (E304/E305)
        of the target model.
        """
        from django.db.models.base import ModelBase

        own_opts = self.model._meta
        target = self.remote_field.model

        # The target may still be a string rather than a model; there is
        # nothing to compare against until it is resolved.
        if not isinstance(target, ModelBase):
            return []

        # Running example: checking field `Model.foreign` given
        #
        #     class Target(models.Model):
        #         model = models.IntegerField()
        #         model_set = models.IntegerField()
        #
        #     class Model(models.Model):
        #         foreign = models.ForeignKey(Target)
        #         m2m = models.ManyToManyField(Target)

        target_opts = target._meta  # target_opts.object_name == "Target"
        # A hidden relation installs no reverse accessor on the target, so
        # the accessor comparisons below are skipped for it.
        hidden = self.remote_field.is_hidden()
        accessor = self.remote_field.get_accessor_name()  # e.g. "model_set"
        query_name = self.related_query_name()  # e.g. "model"
        field_label = "%s.%s" % (own_opts.object_name, self.name)  # e.g. "Model.foreign"

        errors = []

        # Part 1: our accessor / query name against the target's own field
        # names, e.g. accessor "model_set" against field Target.model_set.
        rename_hint = ("Rename field '%s', or add/change a related_name "
                       "argument to the definition for field '%s'.")
        for target_field in target_opts.fields + target_opts.many_to_many:
            clash_label = "%s.%s" % (target_opts.object_name, target_field.name)  # e.g. "Target.model_set"
            if not hidden and target_field.name == accessor:
                errors.append(
                    checks.Error(
                        "Reverse accessor for '%s' clashes with field name '%s'." % (field_label, clash_label),
                        hint=rename_hint % (clash_label, field_label),
                        obj=self,
                        id='fields.E302',
                    )
                )
            if target_field.name == query_name:
                errors.append(
                    checks.Error(
                        "Reverse query name for '%s' clashes with field name '%s'." % (field_label, clash_label),
                        hint=rename_hint % (clash_label, field_label),
                        obj=self,
                        id='fields.E303',
                    )
                )

        # Part 2: our accessor / query name against the accessors of the
        # other relations pointing at the target, e.g. Model.foreign against
        # Model.m2m.
        related_name_hint = ("Add or change a related_name argument "
                             "to the definition for '%s' or '%s'.")
        for other_rel in target_opts.related_objects:
            if other_rel.field is self:
                continue
            clash_label = "%s.%s" % (  # e.g. "Model.m2m"
                other_rel.related_model._meta.object_name,
                other_rel.field.name,
            )
            other_accessor = other_rel.get_accessor_name()
            if not hidden and other_accessor == accessor:
                errors.append(
                    checks.Error(
                        "Reverse accessor for '%s' clashes with reverse accessor for '%s'." % (
                            field_label, clash_label,
                        ),
                        hint=related_name_hint % (field_label, clash_label),
                        obj=self,
                        id='fields.E304',
                    )
                )
            if other_accessor == query_name:
                errors.append(
                    checks.Error(
                        "Reverse query name for '%s' clashes with reverse query name for '%s'."
                        % (field_label, clash_label),
                        hint=related_name_hint % (field_label, clash_label),
                        obj=self,
                        id='fields.E305',
                    )
                )

        return errors

    def db_type(self, connection):
        """Return None: by default a related field has no column of its own."""
        # The relation refers to columns that live in another table.
        return None

    def contribute_to_class(self, cls, name, private_only=False, **kwargs):
        """
        Attach the field to `cls`, record `cls._meta` as `self.opts` and, for
        non-abstract models, interpolate the related names and schedule
        resolution of the target model.
        """
        super().contribute_to_class(cls, name, private_only=private_only, **kwargs)

        self.opts = cls._meta

        if cls._meta.abstract:
            return

        rel = self.remote_field

        # Fall back to Meta.default_related_name when none was given, then
        # fill in the %(class)s / %(model_name)s / %(app_label)s placeholders.
        related_name = rel.related_name or self.opts.default_related_name
        if related_name:
            rel.related_name = related_name % {
                'class': cls.__name__.lower(),
                'model_name': cls._meta.model_name.lower(),
                'app_label': cls._meta.app_label.lower(),
            }

        # Deliberately a separate mapping: 'model_name' is not offered here.
        if rel.related_query_name:
            rel.related_query_name = rel.related_query_name % {
                'class': cls.__name__.lower(),
                'app_label': cls._meta.app_label.lower(),
            }

        def resolve_related_class(model, related, field):
            # Replace the (possibly string) target with the loaded class,
            # then finish wiring up both sides.
            field.remote_field.model = related
            field.do_related_class(related, model)

        lazy_related_operation(resolve_related_class, cls, rel.model, field=self)

    def deconstruct(self):
        """Extend the inherited deconstruction with the relation options that are set."""
        name, path, args, kwargs = super().deconstruct()
        rel = self.remote_field
        if rel.limit_choices_to:
            kwargs['limit_choices_to'] = rel.limit_choices_to
        for option in ('related_name', 'related_query_name'):
            value = getattr(rel, option)
            if value is not None:
                kwargs[option] = value
        return name, path, args, kwargs

    def get_forward_related_filter(self, obj):
        """
        Build the keyword arguments that, given to a filter() on self.model,
        select the instances related to the remote object `obj` through this
        field. Related descriptors use it to build their querysets.
        """
        lookups = {}
        for _local_field, remote_field in self.related_fields:
            lookups['%s__%s' % (self.name, remote_field.name)] = getattr(obj, remote_field.attname)
        return lookups

    def get_reverse_related_filter(self, obj):
        """
        Counterpart of get_forward_related_filter(): build the Q object that
        selects the remote instances related through this field to `obj`,
        an instance of self.model. The result of
        get_extra_descriptor_filter() is ANDed in when it is a dict or is
        truthy.
        """
        column_filter = {}
        for local_field, remote_field in self.related_fields:
            column_filter[remote_field.attname] = getattr(obj, local_field.attname)
        extra = self.get_extra_descriptor_filter(obj)
        condition = Q(**column_filter)
        if isinstance(extra, dict):
            # A dict is always combined, even when empty.
            extra = Q(**extra)
        elif not extra:
            return condition
        return condition & extra

    @property
    def swappable_setting(self):
        """
        Return the name of the setting this relation is swappable through,
        as reported by the app registry, or None when `self.swappable` is
        false.
        """
        if not self.swappable:
            return None
        # The registry is queried with the string form of the target.
        target = self.remote_field.model
        to_string = target if isinstance(target, str) else target._meta.label
        return apps.get_swappable_settings_name(to_string)

    def set_attributes_from_rel(self):
        """
        Default `name` and `verbose_name` from the target model's options,
        then let the remote field set its field name.
        """
        if not self.name:
            target_meta = self.remote_field.model._meta
            self.name = target_meta.model_name + '_' + target_meta.pk.name
        if self.verbose_name is None:
            self.verbose_name = self.remote_field.model._meta.verbose_name
        self.remote_field.set_field_name()

    def do_related_class(self, other, cls):
        """Finish setup once the target model `other` is available."""
        self.set_attributes_from_rel()
        self.contribute_to_related_class(other, self.remote_field)

    def get_limit_choices_to(self):
        """
        Return this field's ``limit_choices_to``, calling it first when it is
        a callable.
        """
        limit = self.remote_field.limit_choices_to
        return limit() if callable(limit) else limit

    def formfield(self, **kwargs):
        """
        Build the form field, forwarding ``limit_choices_to`` by default.

        The default is only supplied when the remote field has a
        ``get_related_field`` attribute; explicit ``kwargs`` override it.
        """
        options = {}
        if hasattr(self.remote_field, 'get_related_field'):
            # A callable is passed along uncalled; it is meant to be
            # evaluated later, when the form class is instantiated.
            options['limit_choices_to'] = self.remote_field.limit_choices_to
        options.update(kwargs)
        return super().formfield(**options)

    def related_query_name(self):
        """
        Return the name identifying this relation in a table-spanning query:
        the first truthy value among related_query_name, related_name and the
        owning model's model_name.
        """
        rel = self.remote_field
        return rel.related_query_name or rel.related_name or self.opts.model_name

    @property
    def target_field(self):
        """
        Return the single remote field that filtering against this relation
        targets; raise FieldError when the relation has several.
        """
        final_step = self.get_path_info()[-1]
        candidates = final_step.target_fields
        if len(candidates) > 1:
            raise exceptions.FieldError(
                "The relation has multiple target fields, but only single target field was asked for")
        return candidates[0]

    def get_cache_name(self):
        """Return the field's name as its cache name."""
        return self.name
-
-
class ForeignObject(RelatedField):
    """
    Generalization of the ForeignKey relation that can span several columns.
    """

    # Relation-cardinality flags.
    many_to_one = True
    many_to_many = False
    one_to_many = False
    one_to_one = False

    # When true, _check_unique_target() requires the referenced fields to be
    # covered by a unique constraint.
    requires_unique_target = True
    related_accessor_class = ReverseManyToOneDescriptor
    forward_related_accessor_class = ForwardManyToOneDescriptor
    rel_class = ForeignObjectRel

    def __init__(self, to, on_delete, from_fields, to_fields, rel=None, related_name=None,
                 related_query_name=None, limit_choices_to=None, parent_link=False,
                 swappable=True, **kwargs):
        """
        Store the column mapping (`from_fields` / `to_fields`) and build a
        `rel_class` instance from the relation options unless `rel` is given.
        """
        if rel is None:
            rel_options = {
                'related_name': related_name,
                'related_query_name': related_query_name,
                'limit_choices_to': limit_choices_to,
                'parent_link': parent_link,
                'on_delete': on_delete,
            }
            rel = self.rel_class(self, to, **rel_options)

        super().__init__(rel=rel, **kwargs)

        self.from_fields = from_fields
        self.to_fields = to_fields
        self.swappable = swappable

    def check(self, **kwargs):
        """Return the inherited check messages plus the to_fields/uniqueness checks."""
        messages = list(super().check(**kwargs))
        messages.extend(self._check_to_fields_exist())
        messages.extend(self._check_unique_target())
        return messages

    def _check_to_fields_exist(self):
        """Return one fields.E312 per named to_field missing on the target model."""
        # An unresolved (string) target cannot be inspected.
        if isinstance(self.remote_field.model, str):
            return []

        errors = []
        for to_field in self.to_fields:
            if not to_field:
                continue
            try:
                self.remote_field.model._meta.get_field(to_field)
            except exceptions.FieldDoesNotExist:
                errors.append(
                    checks.Error(
                        "The to_field '%s' doesn't exist on the related "
                        "model '%s'."
                        % (to_field, self.remote_field.model._meta.label),
                        obj=self,
                        id='fields.E312',
                    )
                )
        return errors

    def _check_unique_target(self):
        """
        Return fields.E310 (several referenced fields) or fields.E311 (one
        referenced field) when no unique field or unique_together entry of
        the target is a subset of the referenced fields.
        """
        target = self.remote_field.model
        if isinstance(target, str) or not self.requires_unique_target:
            return []

        try:
            referenced = self.foreign_related_fields
        except exceptions.FieldDoesNotExist:
            return []

        if not referenced:
            return []

        target_meta = target._meta
        # Every uniqueness guarantee on the target, as a set of field names.
        unique_sets = {
            frozenset([f.name])
            for f in target_meta.get_fields()
            if getattr(f, 'unique', False)
        }
        unique_sets |= {frozenset(ut) for ut in target_meta.unique_together}

        referenced_names = {f.name for f in referenced}
        if any(unique_set.issubset(referenced_names) for unique_set in unique_sets):
            return []

        model_name = target.__name__
        if len(referenced) > 1:
            field_combination = ', '.join("'%s'" % f.name for f in referenced)
            return [
                checks.Error(
                    "No subset of the fields %s on model '%s' is unique."
                    % (field_combination, model_name),
                    hint=(
                        "Add unique=True on any of those fields or add at "
                        "least a subset of them to a unique_together constraint."
                    ),
                    obj=self,
                    id='fields.E310',
                )
            ]

        field_name = referenced[0].name
        return [
            checks.Error(
                "'%s.%s' must set unique=True because it is referenced by "
                "a foreign key." % (model_name, field_name),
                obj=self,
                id='fields.E311',
            )
        ]

    def deconstruct(self):
        """
        Extend the inherited deconstruction with on_delete, from_fields,
        to_fields, parent_link (when set) and the string form of `to`,
        wrapped in a SettingsReference when the target is swappable.
        """
        name, path, args, kwargs = super().deconstruct()
        rel = self.remote_field
        kwargs.update(
            on_delete=rel.on_delete,
            from_fields=self.from_fields,
            to_fields=self.to_fields,
        )
        if rel.parent_link:
            kwargs['parent_link'] = rel.parent_link

        # String form of the target model.
        if isinstance(rel.model, str):
            to = rel.model
        else:
            to = "%s.%s" % (rel.model._meta.app_label, rel.model._meta.object_name)
        kwargs['to'] = to

        # A non-None setting name means the target is reached through a
        # swappable setting.
        swappable_setting = self.swappable_setting
        if swappable_setting is not None:
            # An existing settings reference must name the same setting.
            if hasattr(to, "setting_name") and to.setting_name != swappable_setting:
                raise ValueError(
                    "Cannot deconstruct a ForeignKey pointing to a model "
                    "that is swapped in place of more than one model (%s and %s)"
                    % (to.setting_name, swappable_setting)
                )
            kwargs['to'] = SettingsReference(to, swappable_setting)
        return name, path, args, kwargs

    def resolve_related_fields(self):
        """
        Return a list of (local field, remote field) pairs built from
        from_fields and to_fields.

        'self' in from_fields stands for this field; None in to_fields stands
        for the target model's primary key. Raise ValueError when the two
        lists are empty or differ in length, or when the target model is
        still a string.
        """
        if not self.from_fields or len(self.from_fields) != len(self.to_fields):
            raise ValueError('Foreign Object from and to fields must be the same non-zero length')
        target = self.remote_field.model
        if isinstance(target, str):
            raise ValueError('Related model %r cannot be resolved' % target)

        pairs = []
        for from_name, to_name in zip(self.from_fields, self.to_fields):
            if from_name == 'self':
                local_field = self
            else:
                local_field = self.opts.get_field(from_name)
            if to_name is None:
                remote_field = target._meta.pk
            else:
                remote_field = target._meta.get_field(to_name)
            pairs.append((local_field, remote_field))
        return pairs

    @property
    def related_fields(self):
        """Return the (local, remote) field pairs, resolving them on first access."""
        if hasattr(self, '_related_fields'):
            return self._related_fields
        self._related_fields = self.resolve_related_fields()
        return self._related_fields

    @property
    def reverse_related_fields(self):
        """Return the field pairs as a list with each pair flipped to (remote, local)."""
        return [(remote, local) for local, remote in self.related_fields]

    @property
    def local_related_fields(self):
        """Return a tuple of the local-side fields."""
        return tuple(local for local, _remote in self.related_fields)

    @property
    def foreign_related_fields(self):
        """Return a tuple of the remote-side fields, omitting falsy entries."""
        return tuple(remote for _local, remote in self.related_fields if remote)

    def get_local_related_value(self, instance):
        """Return the values `instance` holds for the local-side fields."""
        return self.get_instance_value_for_fields(instance, self.local_related_fields)

    def get_foreign_related_value(self, instance):
        """Return the values `instance` holds for the remote-side fields."""
        return self.get_instance_value_for_fields(instance, self.foreign_related_fields)

    @staticmethod
    def get_instance_value_for_fields(instance, fields):
        """Return a tuple with the value of each of `fields` on `instance`."""
        values = []
        meta = instance._meta
        for field in fields:
            # Gotcha: in some cases (like fixture loading) a model can have
            # different values in parent_ptr_id and the parent's id, so
            # instance.pk (that is, parent_ptr_id) is used when instance.id
            # is asked for.
            read_pk = False
            if field.primary_key:
                parent_link = meta.get_ancestor_link(field.model)
                read_pk = bool(
                    not parent_link or
                    parent_link.primary_key or
                    parent_link.model._meta.abstract
                )
            if read_pk:
                values.append(instance.pk)
            else:
                values.append(getattr(instance, field.attname))
        return tuple(values)

    def get_attname_column(self):
        """Return the inherited attname paired with None in place of a column."""
        attname, _column = super().get_attname_column()
        return attname, None

    def get_joining_columns(self, reverse_join=False):
        """
        Return a tuple of (lhs column, rhs column) pairs for a join; the
        pairs are flipped when `reverse_join` is true.
        """
        if reverse_join:
            pairs = self.reverse_related_fields
        else:
            pairs = self.related_fields
        return tuple((lhs.column, rhs.column) for lhs, rhs in pairs)

    def get_reverse_joining_columns(self):
        """Return the joining columns for the reverse direction."""
        return self.get_joining_columns(reverse_join=True)

    def get_extra_descriptor_filter(self, instance):
        """
        Return an additional filter applied when the related object is
        fetched through the field's descriptor, i.e. on
        'instance.fieldname'.

        The value must be a dict usable in .filter(**kwargs) or a Q object;
        it is ANDed with the relation's joining columns. This base
        implementation adds nothing.

        get_extra_restriction() is the counterpart used for JOIN and
        subquery conditions.
        """
        return {}

    def get_extra_restriction(self, where_class, alias, related_alias):
        """
        Return an additional condition used for joining and subquery
        pushdown: an object responding to as_sql(compiler, connection). This
        base implementation returns None.

        Note that referring to both 'alias' and 'related_alias' currently
        does not work in some situations, such as subquery pushdown.

        get_extra_descriptor_filter() is the counterpart used when fetching
        through instance.fieldname.
        """
        return None

    def get_path_info(self, filtered_relation=None):
        """Return the one-step path from this field's model to the related model."""
        remote_opts = self.remote_field.model._meta
        local_opts = self.model._meta
        step = PathInfo(
            from_opts=local_opts,
            to_opts=remote_opts,
            target_fields=self.foreign_related_fields,
            join_field=self,
            m2m=False,
            direct=True,
            filtered_relation=filtered_relation,
        )
        return [step]

    def get_reverse_path_info(self, filtered_relation=None):
        """Return the one-step path from the related model back to this field's model."""
        local_opts = self.model._meta
        remote_opts = self.remote_field.model._meta
        step = PathInfo(
            from_opts=remote_opts,
            to_opts=local_opts,
            target_fields=(local_opts.pk,),
            join_field=self.remote_field,
            m2m=not self.unique,
            direct=False,
            filtered_relation=filtered_relation,
        )
        return [step]

    @classmethod
    @functools.lru_cache(maxsize=None)
    def get_lookups(cls):
        """
        Merge the `class_lookups` dicts declared directly on `cls` and on its
        MRO ancestors up to and including ForeignObject.
        """
        mro = inspect.getmro(cls)
        # Classes after ForeignObject in the MRO are left out.
        considered = mro[:mro.index(ForeignObject) + 1]
        declared = [klass.__dict__.get('class_lookups', {}) for klass in considered]
        return cls.merge_dicts(declared)

    def contribute_to_class(self, cls, name, private_only=False, **kwargs):
        """Attach the field to `cls` and install the forward accessor under its name."""
        super().contribute_to_class(cls, name, private_only=private_only, **kwargs)
        setattr(cls, self.name, self.forward_related_accessor_class(self))

    def contribute_to_related_class(self, cls, related):
        """
        Install the reverse accessor on the concrete model of `cls` and
        record limit_choices_to, unless the relation is hidden or the
        related model is swapped.
        """
        # Internal FK's - i.e., those with a related name ending with '+' -
        # and swapped models don't get a related descriptor.
        if self.remote_field.is_hidden() or related.related_model._meta.swapped:
            return
        setattr(cls._meta.concrete_model, related.get_accessor_name(), self.related_accessor_class(related))
        # 'limit_choices_to' might be a callable; it is passed along
        # uncalled because model load time is too early to evaluate it.
        limit = self.remote_field.limit_choices_to
        if limit:
            cls._meta.related_fkey_lookups.append(limit)
-
-
# Register the relation-aware lookups on ForeignObject, in this order.
for _related_lookup in (
    RelatedIn,
    RelatedExact,
    RelatedLessThan,
    RelatedGreaterThan,
    RelatedGreaterThanOrEqual,
    RelatedLessThanOrEqual,
    RelatedIsNull,
):
    ForeignObject.register_lookup(_related_lookup)
# Do not leave the loop variable behind as a module attribute.
del _related_lookup
-
-
- class ForeignKey(ForeignObject):
- """
- Provide a many-to-one relation by adding a column to the local model
- to hold the remote value.
-
- By default ForeignKey will target the pk of the remote model but this
- behavior can be changed by using the ``to_field`` argument.
- """
-
- # Field flags
- many_to_many = False
- many_to_one = True
- one_to_many = False
- one_to_one = False
-
- rel_class = ManyToOneRel
-
- empty_strings_allowed = False
- default_error_messages = {
- 'invalid': _('%(model)s instance with %(field)s %(value)r does not exist.')
- }
- description = _("Foreign Key (type determined by related field)")
-
def __init__(self, to, on_delete, related_name=None, related_query_name=None,
             limit_choices_to=None, parent_link=False, to_field=None,
             db_constraint=True, **kwargs):
    """
    Build the relation object for `to` and initialise the field as a
    single-column foreign object mapping 'self' to `to_field`.

    `to` must be a model or a string; `db_index` defaults to True and
    `db_constraint` is stored on the field.
    """
    try:
        to._meta.model_name
    except AttributeError:
        # Not a model: only a string reference is acceptable.
        # NOTE: this is an assert, so it is skipped under `python -O`.
        assert isinstance(to, str), (
            "%s(%r) is invalid. First parameter to ForeignKey must be "
            "either a model, a model name, or the string %r" % (
                self.__class__.__name__, to,
                RECURSIVE_RELATIONSHIP_CONSTANT,
            )
        )
    else:
        # For backwards compatibility purposes, we need to *try* and set
        # the to_field during FK construction. It won't be guaranteed to
        # be correct until contribute_to_class is called. Refs #12190.
        if not to_field:
            target_pk = to._meta.pk
            to_field = target_pk and target_pk.name

    rel_options = {
        'related_name': related_name,
        'related_query_name': related_query_name,
        'limit_choices_to': limit_choices_to,
        'parent_link': parent_link,
        'on_delete': on_delete,
    }
    kwargs['rel'] = self.rel_class(self, to, to_field, **rel_options)
    if 'db_index' not in kwargs:
        kwargs['db_index'] = True

    super().__init__(to, on_delete, from_fields=['self'], to_fields=[to_field], **kwargs)

    self.db_constraint = db_constraint
-
- def check(self, **kwargs):
- return [
- *super().check(**kwargs),
- *self._check_on_delete(),
- *self._check_unique(),
- ]
-
- def _check_on_delete(self):
- on_delete = getattr(self.remote_field, 'on_delete', None)
- if on_delete == SET_NULL and not self.null:
- return [
- checks.Error(
- 'Field specifies on_delete=SET_NULL, but cannot be null.',
- hint='Set null=True argument on the field, or change the on_delete rule.',
- obj=self,
- id='fields.E320',
- )
- ]
- elif on_delete == SET_DEFAULT and not self.has_default():
- return [
- checks.Error(
- 'Field specifies on_delete=SET_DEFAULT, but has no default value.',
- hint='Set a default value, or change the on_delete rule.',
- obj=self,
- id='fields.E321',
- )
- ]
- else:
- return []
-
- def _check_unique(self, **kwargs):
- return [
- checks.Warning(
- 'Setting unique=True on a ForeignKey has the same effect as using a OneToOneField.',
- hint='ForeignKey(unique=True) is usually better served by a OneToOneField.',
- obj=self,
- id='fields.W342',
- )
- ] if self.unique else []
-
- def deconstruct(self):
- name, path, args, kwargs = super().deconstruct()
- del kwargs['to_fields']
- del kwargs['from_fields']
- # Handle the simpler arguments
- if self.db_index:
- del kwargs['db_index']
- else:
- kwargs['db_index'] = False
- if self.db_constraint is not True:
- kwargs['db_constraint'] = self.db_constraint
- # Rel needs more work.
- to_meta = getattr(self.remote_field.model, "_meta", None)
- if self.remote_field.field_name and (
- not to_meta or (to_meta.pk and self.remote_field.field_name != to_meta.pk.name)):
- kwargs['to_field'] = self.remote_field.field_name
- return name, path, args, kwargs
-
- def to_python(self, value):
- return self.target_field.to_python(value)
-
- @property
- def target_field(self):
- return self.foreign_related_fields[0]
-
- def get_reverse_path_info(self, filtered_relation=None):
- """Get path from the related model to this field's model."""
- opts = self.model._meta
- from_opts = self.remote_field.model._meta
- return [PathInfo(
- from_opts=from_opts,
- to_opts=opts,
- target_fields=(opts.pk,),
- join_field=self.remote_field,
- m2m=not self.unique,
- direct=False,
- filtered_relation=filtered_relation,
- )]
-
- def validate(self, value, model_instance):
- if self.remote_field.parent_link:
- return
- super().validate(value, model_instance)
- if value is None:
- return
-
- using = router.db_for_read(self.remote_field.model, instance=model_instance)
- qs = self.remote_field.model._default_manager.using(using).filter(
- **{self.remote_field.field_name: value}
- )
- qs = qs.complex_filter(self.get_limit_choices_to())
- if not qs.exists():
- raise exceptions.ValidationError(
- self.error_messages['invalid'],
- code='invalid',
- params={
- 'model': self.remote_field.model._meta.verbose_name, 'pk': value,
- 'field': self.remote_field.field_name, 'value': value,
- }, # 'pk' is included for backwards compatibility
- )
-
- def get_attname(self):
- return '%s_id' % self.name
-
- def get_attname_column(self):
- attname = self.get_attname()
- column = self.db_column or attname
- return attname, column
-
- def get_default(self):
- """Return the to_field if the default value is an object."""
- field_default = super().get_default()
- if isinstance(field_default, self.remote_field.model):
- return getattr(field_default, self.target_field.attname)
- return field_default
-
- def get_db_prep_save(self, value, connection):
- if value is None or (value == '' and
- (not self.target_field.empty_strings_allowed or
- connection.features.interprets_empty_strings_as_nulls)):
- return None
- else:
- return self.target_field.get_db_prep_save(value, connection=connection)
-
- def get_db_prep_value(self, value, connection, prepared=False):
- return self.target_field.get_db_prep_value(value, connection, prepared)
-
- def contribute_to_related_class(self, cls, related):
- super().contribute_to_related_class(cls, related)
- if self.remote_field.field_name is None:
- self.remote_field.field_name = cls._meta.pk.name
-
- def formfield(self, *, using=None, **kwargs):
- if isinstance(self.remote_field.model, str):
- raise ValueError("Cannot create form field for %r yet, because "
- "its related model %r has not been loaded yet" %
- (self.name, self.remote_field.model))
- return super().formfield(**{
- 'form_class': forms.ModelChoiceField,
- 'queryset': self.remote_field.model._default_manager.using(using),
- 'to_field_name': self.remote_field.field_name,
- **kwargs,
- })
-
- def db_check(self, connection):
- return []
-
- def db_type(self, connection):
- return self.target_field.rel_db_type(connection=connection)
-
- def db_parameters(self, connection):
- return {"type": self.db_type(connection), "check": self.db_check(connection)}
-
- def convert_empty_strings(self, value, expression, connection):
- if (not value) and isinstance(value, str):
- return None
- return value
-
- def get_db_converters(self, connection):
- converters = super().get_db_converters(connection)
- if connection.features.interprets_empty_strings_as_nulls:
- converters += [self.convert_empty_strings]
- return converters
-
- def get_col(self, alias, output_field=None):
- if output_field is None:
- output_field = self.target_field
- while isinstance(output_field, ForeignKey):
- output_field = output_field.target_field
- if output_field is self:
- raise ValueError('Cannot resolve output_field.')
- return super().get_col(alias, output_field)
-
-
class OneToOneField(ForeignKey):
    """
    A ForeignKey that always carries a "unique" constraint. Because at most
    one object can point back, the reverse relation returns that single
    object rather than a list.
    """

    # Field flags
    many_to_many = False
    many_to_one = False
    one_to_many = False
    one_to_one = True

    related_accessor_class = ReverseOneToOneDescriptor
    forward_related_accessor_class = ForwardOneToOneDescriptor
    rel_class = OneToOneRel

    description = _("One-to-one relationship")

    def __init__(self, to, on_delete, to_field=None, **kwargs):
        """Initialise as a ForeignKey with ``unique`` forced to True."""
        kwargs['unique'] = True
        super().__init__(to, on_delete, to_field=to_field, **kwargs)

    def deconstruct(self):
        """Return the parent's deconstruction without the ``unique`` kwarg."""
        name, path, args, kwargs = super().deconstruct()
        # unique is set by __init__, so it is never serialised.
        kwargs.pop('unique', None)
        return name, path, args, kwargs

    def formfield(self, **kwargs):
        """Return None for parent links, else the parent's form field."""
        if not self.remote_field.parent_link:
            return super().formfield(**kwargs)
        return None

    def save_form_data(self, instance, data):
        """
        Store ``data`` on ``instance``: under the field name when it is an
        instance of the remote model, under the attname otherwise.
        """
        is_related_object = isinstance(data, self.remote_field.model)
        attribute = self.name if is_related_object else self.attname
        setattr(instance, attribute, data)

    def _check_unique(self, **kwargs):
        # Override ForeignKey since check isn't applicable here.
        return []
-
-
def create_many_to_many_intermediary_model(field, klass):
    """
    Build and return the auto-created intermediary model for ``field``.

    The model is named ``<klass object name>_<field name>`` and has two
    cascading ForeignKeys: one to ``klass`` and one to the model ``field``
    points at. When both ends resolve to the same name, the link fields
    are prefixed with ``from_`` and ``to_`` so they stay distinct.
    """
    from django.db import models

    def set_managed(model, related, through):
        # The through model is managed if either side of the relation is.
        through._meta.managed = model._meta.managed or related._meta.managed

    to_model = resolve_relation(klass, field.remote_field.model)
    name = '%s_%s' % (klass._meta.object_name, field.name)
    lazy_related_operation(set_managed, klass, to_model, name)

    target_name = make_model_tuple(to_model)[1]
    source_name = klass._meta.model_name
    if target_name == source_name:
        target_name = 'to_%s' % target_name
        source_name = 'from_%s' % source_name

    names = {'from': source_name, 'to': target_name}
    meta_options = {
        'db_table': field._get_m2m_db_table(klass._meta),
        'auto_created': klass,
        'app_label': klass._meta.app_label,
        'db_tablespace': klass._meta.db_tablespace,
        'unique_together': (source_name, target_name),
        'verbose_name': _('%(from)s-%(to)s relationship') % names,
        'verbose_name_plural': _('%(from)s-%(to)s relationships') % names,
        'apps': field.model._meta.apps,
    }
    meta = type('Meta', (), meta_options)

    def link_to(target):
        # Both link fields share every option except their target.
        return models.ForeignKey(
            target,
            related_name='%s+' % name,
            db_tablespace=field.db_tablespace,
            db_constraint=field.remote_field.db_constraint,
            on_delete=CASCADE,
        )

    # Construct and return the new class. The source link is created
    # before the target link, as the dict entries are evaluated in order.
    attrs = {
        'Meta': meta,
        '__module__': klass.__module__,
        source_name: link_to(klass),
        target_name: link_to(to_model),
    }
    return type(name, (models.Model,), attrs)
-
-
class ManyToManyField(RelatedField):
    """
    Provide a many-to-many relation by using an intermediary model that
    holds two ForeignKey fields pointed at the two sides of the relation.

    Unless a ``through`` model was provided, ManyToManyField will use the
    create_many_to_many_intermediary_model factory to automatically generate
    the intermediary model.
    """

    # Field flags
    many_to_many = True
    many_to_one = False
    one_to_many = False
    one_to_one = False

    rel_class = ManyToManyRel

    description = _("Many-to-many relationship")

    def __init__(self, to, related_name=None, related_query_name=None,
                 limit_choices_to=None, symmetrical=None, through=None,
                 through_fields=None, db_constraint=True, db_table=None,
                 swappable=True, **kwargs):
        """
        Build the relation object and initialise the underlying field.

        ``to`` is a model class (anything exposing ``_meta``) or a string
        reference. ``symmetrical`` defaults to True only when ``to`` is the
        recursive-relationship constant. ``db_table`` must not be combined
        with ``through``.
        """
        try:
            to._meta
        except AttributeError:
            assert isinstance(to, str), (
                "%s(%r) is invalid. First parameter to ManyToManyField must be "
                "either a model, a model name, or the string %r" %
                (self.__class__.__name__, to, RECURSIVE_RELATIONSHIP_CONSTANT)
            )

        if symmetrical is None:
            symmetrical = (to == RECURSIVE_RELATIONSHIP_CONSTANT)

        if through is not None:
            assert db_table is None, (
                "Cannot specify a db_table if an intermediary model is used."
            )

        kwargs['rel'] = self.rel_class(
            self, to,
            related_name=related_name,
            related_query_name=related_query_name,
            limit_choices_to=limit_choices_to,
            symmetrical=symmetrical,
            through=through,
            through_fields=through_fields,
            db_constraint=db_constraint,
        )
        # Remember whether null was passed at all, so that
        # _check_ignored_options() can warn about it.
        self.has_null_arg = 'null' in kwargs

        super().__init__(**kwargs)

        self.db_table = db_table
        self.swappable = swappable

    def check(self, **kwargs):
        """Return the parent's check results plus the m2m-specific ones."""
        return [
            *super().check(**kwargs),
            *self._check_unique(**kwargs),
            *self._check_relationship_model(**kwargs),
            *self._check_ignored_options(**kwargs),
            *self._check_table_uniqueness(**kwargs),
        ]

    def _check_unique(self, **kwargs):
        """Return error fields.E330 when ``unique`` is set on the field."""
        if self.unique:
            return [
                checks.Error(
                    'ManyToManyFields cannot be unique.',
                    obj=self,
                    id='fields.E330',
                )
            ]
        return []

    def _check_ignored_options(self, **kwargs):
        """Return warnings for options that have no effect on this field."""
        ignored = []

        if self.has_null_arg:
            ignored.append(
                checks.Warning(
                    'null has no effect on ManyToManyField.',
                    obj=self,
                    id='fields.W340',
                )
            )

        if self._validators:
            ignored.append(
                checks.Warning(
                    'ManyToManyField does not support validators.',
                    obj=self,
                    id='fields.W341',
                )
            )
        if (self.remote_field.limit_choices_to and self.remote_field.through and
                not self.remote_field.through._meta.auto_created):
            ignored.append(
                checks.Warning(
                    'limit_choices_to has no effect on ManyToManyField '
                    'with a through model.',
                    obj=self,
                    id='fields.W343',
                )
            )

        return ignored

    def _check_relationship_model(self, from_model=None, **kwargs):
        """
        Validate the intermediary model and ``through_fields``.

        ``from_model`` is the model the field is attached to. It is
        required (asserted) whenever the through model is installed or
        ``through_fields`` has to be validated. Return a list of errors
        with ids fields.E331 to fields.E339.
        """
        if hasattr(self.remote_field.through, '_meta'):
            qualified_model_name = "%s.%s" % (
                self.remote_field.through._meta.app_label, self.remote_field.through.__name__)
        else:
            qualified_model_name = self.remote_field.through

        errors = []

        if self.remote_field.through not in self.opts.apps.get_models(include_auto_created=True):
            # The relationship model is not installed.
            errors.append(
                checks.Error(
                    "Field specifies a many-to-many relation through model "
                    "'%s', which has not been installed." % qualified_model_name,
                    obj=self,
                    id='fields.E331',
                )
            )

        else:
            assert from_model is not None, (
                "ManyToManyField with intermediate "
                "tables cannot be checked if you don't pass the model "
                "where the field is attached to."
            )
            # Set some useful local variables
            to_model = resolve_relation(from_model, self.remote_field.model)
            from_model_name = from_model._meta.object_name
            if isinstance(to_model, str):
                to_model_name = to_model
            else:
                to_model_name = to_model._meta.object_name
            relationship_model_name = self.remote_field.through._meta.object_name
            self_referential = from_model == to_model

            # Check symmetrical attribute.
            if (self_referential and self.remote_field.symmetrical and
                    not self.remote_field.through._meta.auto_created):
                errors.append(
                    checks.Error(
                        'Many-to-many fields with intermediate tables must not be symmetrical.',
                        obj=self,
                        id='fields.E332',
                    )
                )

            # Count foreign keys in intermediate model
            if self_referential:
                seen_self = sum(
                    from_model == getattr(field.remote_field, 'model', None)
                    for field in self.remote_field.through._meta.fields
                )

                if seen_self > 2 and not self.remote_field.through_fields:
                    errors.append(
                        checks.Error(
                            "The model is used as an intermediate model by "
                            "'%s', but it has more than two foreign keys "
                            "to '%s', which is ambiguous. You must specify "
                            "which two foreign keys Django should use via the "
                            "through_fields keyword argument." % (self, from_model_name),
                            hint="Use through_fields to specify which two foreign keys Django should use.",
                            obj=self.remote_field.through,
                            id='fields.E333',
                        )
                    )

            else:
                # Count foreign keys in relationship model
                seen_from = sum(
                    from_model == getattr(field.remote_field, 'model', None)
                    for field in self.remote_field.through._meta.fields
                )
                seen_to = sum(
                    to_model == getattr(field.remote_field, 'model', None)
                    for field in self.remote_field.through._meta.fields
                )

                # A recursive relation through an intermediary model is
                # declared with ManyToManyField; symmetrical and through
                # are not ForeignKey arguments, so the hint names the
                # field type that accepts them.
                recursive_hint = (
                    'If you want to create a recursive relationship, '
                    'use ManyToManyField("%s", through="%s").'
                ) % (
                    RECURSIVE_RELATIONSHIP_CONSTANT,
                    relationship_model_name,
                )

                if seen_from > 1 and not self.remote_field.through_fields:
                    errors.append(
                        checks.Error(
                            ("The model is used as an intermediate model by "
                             "'%s', but it has more than one foreign key "
                             "from '%s', which is ambiguous. You must specify "
                             "which foreign key Django should use via the "
                             "through_fields keyword argument.") % (self, from_model_name),
                            hint=recursive_hint,
                            obj=self,
                            id='fields.E334',
                        )
                    )

                if seen_to > 1 and not self.remote_field.through_fields:
                    errors.append(
                        checks.Error(
                            "The model is used as an intermediate model by "
                            "'%s', but it has more than one foreign key "
                            "to '%s', which is ambiguous. You must specify "
                            "which foreign key Django should use via the "
                            "through_fields keyword argument." % (self, to_model_name),
                            hint=recursive_hint,
                            obj=self,
                            id='fields.E335',
                        )
                    )

                if seen_from == 0 or seen_to == 0:
                    errors.append(
                        checks.Error(
                            "The model is used as an intermediate model by "
                            "'%s', but it does not have a foreign key to '%s' or '%s'." % (
                                self, from_model_name, to_model_name
                            ),
                            obj=self.remote_field.through,
                            id='fields.E336',
                        )
                    )

        # Validate `through_fields`.
        if self.remote_field.through_fields is not None:
            # Validate that we're given an iterable of at least two items
            # and that none of them is "falsy".
            if not (len(self.remote_field.through_fields) >= 2 and
                    self.remote_field.through_fields[0] and self.remote_field.through_fields[1]):
                errors.append(
                    checks.Error(
                        "Field specifies 'through_fields' but does not provide "
                        "the names of the two link fields that should be used "
                        "for the relation through model '%s'." % qualified_model_name,
                        hint="Make sure you specify 'through_fields' as through_fields=('field1', 'field2')",
                        obj=self,
                        id='fields.E337',
                    )
                )

            # Validate the given through fields -- they should be actual
            # fields on the through model, and also be foreign keys to the
            # expected models.
            else:
                assert from_model is not None, (
                    "ManyToManyField with intermediate "
                    "tables cannot be checked if you don't pass the model "
                    "where the field is attached to."
                )

                source, through, target = from_model, self.remote_field.through, self.remote_field.model
                source_field_name, target_field_name = self.remote_field.through_fields[:2]

                for field_name, related_model in ((source_field_name, source),
                                                  (target_field_name, target)):

                    # Names of the through-model fields that do point at
                    # the expected model, offered as suggestions.
                    possible_field_names = [
                        f.name for f in through._meta.fields
                        if hasattr(f, 'remote_field') and
                        getattr(f.remote_field, 'model', None) == related_model
                    ]
                    if possible_field_names:
                        hint = "Did you mean one of the following foreign keys to '%s': %s?" % (
                            related_model._meta.object_name,
                            ', '.join(possible_field_names),
                        )
                    else:
                        hint = None

                    try:
                        field = through._meta.get_field(field_name)
                    except exceptions.FieldDoesNotExist:
                        errors.append(
                            checks.Error(
                                "The intermediary model '%s' has no field '%s'."
                                % (qualified_model_name, field_name),
                                hint=hint,
                                obj=self,
                                id='fields.E338',
                            )
                        )
                    else:
                        if not (hasattr(field, 'remote_field') and
                                getattr(field.remote_field, 'model', None) == related_model):
                            errors.append(
                                checks.Error(
                                    "'%s.%s' is not a foreign key to '%s'." % (
                                        through._meta.object_name, field_name,
                                        related_model._meta.object_name,
                                    ),
                                    hint=hint,
                                    obj=self,
                                    id='fields.E339',
                                )
                            )

        return errors

    def _check_table_uniqueness(self, **kwargs):
        """
        Return error fields.E340 when this relation's table name is already
        used by another managed model. Unresolved (string) and unmanaged
        through models are not checked.
        """
        if isinstance(self.remote_field.through, str) or not self.remote_field.through._meta.managed:
            return []
        registered_tables = {
            model._meta.db_table: model
            for model in self.opts.apps.get_models(include_auto_created=True)
            if model != self.remote_field.through and model._meta.managed
        }
        m2m_db_table = self.m2m_db_table()
        model = registered_tables.get(m2m_db_table)
        # The second condition allows multiple m2m relations on a model if
        # some point to a through model that proxies another through model.
        if model and model._meta.concrete_model != self.remote_field.through._meta.concrete_model:
            if model._meta.auto_created:
                def _get_field_name(model):
                    # Find the m2m field whose through model is `model`.
                    for field in model._meta.auto_created._meta.many_to_many:
                        if field.remote_field.through is model:
                            return field.name
                opts = model._meta.auto_created._meta
                clashing_obj = '%s.%s' % (opts.label, _get_field_name(model))
            else:
                clashing_obj = model._meta.label
            return [
                checks.Error(
                    "The field's intermediary table '%s' clashes with the "
                    "table name of '%s'." % (m2m_db_table, clashing_obj),
                    obj=self,
                    id='fields.E340',
                )
            ]
        return []

    def deconstruct(self):
        """
        Return the parent's 4-tuple with the m2m kwargs filled in.

        Models are written as "app_label.ObjectName" strings. An
        auto-created through model is omitted. Raise ValueError when
        ``to`` already references a different swappable setting.
        """
        name, path, args, kwargs = super().deconstruct()
        # Handle the simpler arguments.
        if self.db_table is not None:
            kwargs['db_table'] = self.db_table
        if self.remote_field.db_constraint is not True:
            kwargs['db_constraint'] = self.remote_field.db_constraint
        # Rel needs more work.
        if isinstance(self.remote_field.model, str):
            kwargs['to'] = self.remote_field.model
        else:
            kwargs['to'] = "%s.%s" % (
                self.remote_field.model._meta.app_label,
                self.remote_field.model._meta.object_name,
            )
        if getattr(self.remote_field, 'through', None) is not None:
            if isinstance(self.remote_field.through, str):
                kwargs['through'] = self.remote_field.through
            elif not self.remote_field.through._meta.auto_created:
                kwargs['through'] = "%s.%s" % (
                    self.remote_field.through._meta.app_label,
                    self.remote_field.through._meta.object_name,
                )
        # If swappable is True, then see if we're actually pointing to the target
        # of a swap.
        swappable_setting = self.swappable_setting
        if swappable_setting is not None:
            # If it's already a settings reference, error.
            if hasattr(kwargs['to'], "setting_name"):
                if kwargs['to'].setting_name != swappable_setting:
                    raise ValueError(
                        "Cannot deconstruct a ManyToManyField pointing to a "
                        "model that is swapped in place of more than one model "
                        "(%s and %s)" % (kwargs['to'].setting_name, swappable_setting)
                    )

            kwargs['to'] = SettingsReference(
                kwargs['to'],
                swappable_setting,
            )
        return name, path, args, kwargs

    def _get_path_info(self, direct=False, filtered_relation=None):
        """Called by both direct and indirect m2m traversal."""
        int_model = self.remote_field.through
        linkfield1 = int_model._meta.get_field(self.m2m_field_name())
        linkfield2 = int_model._meta.get_field(self.m2m_reverse_field_name())
        if direct:
            join1infos = linkfield1.get_reverse_path_info()
            join2infos = linkfield2.get_path_info(filtered_relation)
        else:
            join1infos = linkfield2.get_reverse_path_info()
            join2infos = linkfield1.get_path_info(filtered_relation)

        # Get join infos between the last model of join 1 and the first model
        # of join 2. Assume the only reason these may differ is due to model
        # inheritance.
        join1_final = join1infos[-1].to_opts
        join2_initial = join2infos[0].from_opts
        if join1_final is join2_initial:
            intermediate_infos = []
        elif issubclass(join1_final.model, join2_initial.model):
            intermediate_infos = join1_final.get_path_to_parent(join2_initial.model)
        else:
            intermediate_infos = join2_initial.get_path_from_parent(join1_final.model)

        return [*join1infos, *intermediate_infos, *join2infos]

    def get_path_info(self, filtered_relation=None):
        """Return the join path in the forward direction."""
        return self._get_path_info(direct=True, filtered_relation=filtered_relation)

    def get_reverse_path_info(self, filtered_relation=None):
        """Return the join path in the reverse direction."""
        return self._get_path_info(direct=False, filtered_relation=filtered_relation)

    def _get_m2m_db_table(self, opts):
        """
        Function that can be curried to provide the m2m table name for this
        relation.
        """
        if self.remote_field.through is not None:
            return self.remote_field.through._meta.db_table
        elif self.db_table:
            return self.db_table
        else:
            m2m_table_name = '%s_%s' % (utils.strip_quotes(opts.db_table), self.name)
            return utils.truncate_name(m2m_table_name, connection.ops.max_name_length())

    def _get_m2m_attr(self, related, attr):
        """
        Function that can be curried to provide the source accessor or DB
        column name for the m2m table.
        """
        cache_attr = '_m2m_%s_cache' % attr
        if hasattr(self, cache_attr):
            return getattr(self, cache_attr)
        if self.remote_field.through_fields is not None:
            link_field_name = self.remote_field.through_fields[0]
        else:
            link_field_name = None
        for f in self.remote_field.through._meta.fields:
            if (f.is_relation and f.remote_field.model == related.related_model and
                    (link_field_name is None or link_field_name == f.name)):
                setattr(self, cache_attr, getattr(f, attr))
                return getattr(self, cache_attr)

    def _get_m2m_reverse_attr(self, related, attr):
        """
        Function that can be curried to provide the related accessor or DB
        column name for the m2m table.
        """
        cache_attr = '_m2m_reverse_%s_cache' % attr
        if hasattr(self, cache_attr):
            return getattr(self, cache_attr)
        found = False
        if self.remote_field.through_fields is not None:
            link_field_name = self.remote_field.through_fields[1]
        else:
            link_field_name = None
        for f in self.remote_field.through._meta.fields:
            if f.is_relation and f.remote_field.model == related.model:
                if link_field_name is None and related.related_model == related.model:
                    # If this is an m2m-intermediate to self,
                    # the first foreign key you find will be
                    # the source column. Keep searching for
                    # the second foreign key.
                    if found:
                        setattr(self, cache_attr, getattr(f, attr))
                        break
                    else:
                        found = True
                elif link_field_name is None or link_field_name == f.name:
                    setattr(self, cache_attr, getattr(f, attr))
                    break
        return getattr(self, cache_attr)

    def contribute_to_class(self, cls, name, **kwargs):
        """
        Attach the field to ``cls``: choose an internal related_name where
        needed, resolve or create the through model, install the forward
        descriptor and set up the ``m2m_db_table`` accessor.
        """
        # To support multiple relations to self, it's useful to have a non-None
        # related name on symmetrical relations for internal reasons. The
        # concept doesn't make a lot of sense externally ("you want me to
        # specify *what* on my non-reversible relation?!"), so we set it up
        # automatically. The funky name reduces the chance of an accidental
        # clash.
        if self.remote_field.symmetrical and (
                self.remote_field.model == "self" or self.remote_field.model == cls._meta.object_name):
            self.remote_field.related_name = "%s_rel_+" % name
        elif self.remote_field.is_hidden():
            # If the backwards relation is disabled, replace the original
            # related_name with one generated from the m2m field name. Django
            # still uses backwards relations internally and we need to avoid
            # clashes between multiple m2m fields with related_name == '+'.
            self.remote_field.related_name = "_%s_%s_+" % (cls.__name__.lower(), name)

        super().contribute_to_class(cls, name, **kwargs)

        # The intermediate m2m model is not auto created if:
        #  1) There is a manually specified intermediate, or
        #  2) The class owning the m2m field is abstract.
        #  3) The class owning the m2m field has been swapped out.
        if not cls._meta.abstract:
            if self.remote_field.through:
                def resolve_through_model(_, model, field):
                    field.remote_field.through = model
                lazy_related_operation(resolve_through_model, cls, self.remote_field.through, field=self)
            elif not cls._meta.swapped:
                self.remote_field.through = create_many_to_many_intermediary_model(self, cls)

        # Add the descriptor for the m2m relation.
        setattr(cls, self.name, ManyToManyDescriptor(self.remote_field, reverse=False))

        # Set up the accessor for the m2m table name for the relation.
        self.m2m_db_table = partial(self._get_m2m_db_table, cls._meta)

    def contribute_to_related_class(self, cls, related):
        """
        Install the reverse descriptor on ``cls`` (unless the relation is
        hidden or the related model is swapped) and set up the accessors
        for the m2m table's column, field and target-field names.
        """
        # Internal M2Ms (i.e., those with a related name ending with '+')
        # and swapped models don't get a related descriptor.
        if not self.remote_field.is_hidden() and not related.related_model._meta.swapped:
            setattr(cls, related.get_accessor_name(), ManyToManyDescriptor(self.remote_field, reverse=True))

        # Set up the accessors for the column names on the m2m table.
        self.m2m_column_name = partial(self._get_m2m_attr, related, 'column')
        self.m2m_reverse_name = partial(self._get_m2m_reverse_attr, related, 'column')

        self.m2m_field_name = partial(self._get_m2m_attr, related, 'name')
        self.m2m_reverse_field_name = partial(self._get_m2m_reverse_attr, related, 'name')

        get_m2m_rel = partial(self._get_m2m_attr, related, 'remote_field')
        self.m2m_target_field_name = lambda: get_m2m_rel().field_name
        get_m2m_reverse_rel = partial(self._get_m2m_reverse_attr, related, 'remote_field')
        self.m2m_reverse_target_field_name = lambda: get_m2m_reverse_rel().field_name

    def set_attributes_from_rel(self):
        """Do nothing."""
        pass

    def value_from_object(self, obj):
        """Return a list of the related objects, empty when obj has no pk."""
        return [] if obj.pk is None else list(getattr(obj, self.attname).all())

    def save_form_data(self, instance, data):
        """Pass ``data`` to the ``set()`` method of the instance's manager."""
        getattr(instance, self.attname).set(data)

    def formfield(self, *, using=None, **kwargs):
        """
        Return a form field, defaulting to a ModelMultipleChoiceField over
        the remote model's default manager. Options in ``kwargs`` override
        these defaults.
        """
        defaults = {
            'form_class': forms.ModelMultipleChoiceField,
            'queryset': self.remote_field.model._default_manager.using(using),
            **kwargs,
        }
        # If initial is passed in, it's a list of related objects, but the
        # MultipleChoiceField takes a list of IDs.
        if defaults.get('initial') is not None:
            initial = defaults['initial']
            if callable(initial):
                initial = initial()
            defaults['initial'] = [i.pk for i in initial]
        return super().formfield(**defaults)

    def db_check(self, connection):
        """Return None."""
        return None

    def db_type(self, connection):
        # A ManyToManyField is not represented by a single column,
        # so return None.
        return None

    def db_parameters(self, connection):
        """Return a dict whose ``type`` and ``check`` are both None."""
        return {"type": None, "check": None}
|