|
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878 |
- from django.db import models
- from django.db.migrations.operations.base import Operation
- from django.db.migrations.state import ModelState
- from django.db.models.options import normalize_together
- from django.utils.functional import cached_property
-
- from .fields import (
- AddField, AlterField, FieldOperation, RemoveField, RenameField,
- )
- from .utils import ModelTuple, field_references_model
-
-
- def _check_for_duplicates(arg_name, objs):
- used_vals = set()
- for val in objs:
- if val in used_vals:
- raise ValueError(
- "Found duplicate value %s in CreateModel %s argument." % (val, arg_name)
- )
- used_vals.add(val)
-
-
class ModelOperation(Operation):
    """Base for operations that act on a single model identified by `name`."""

    def __init__(self, name):
        self.name = name

    @cached_property
    def name_lower(self):
        """Lowercased model name, cached on first access."""
        lowered = self.name.lower()
        return lowered

    def references_model(self, name, app_label=None):
        """Return True if `name` equals this operation's model name, ignoring case."""
        candidate = name.lower()
        return candidate == self.name_lower

    def reduce(self, operation, app_label=None):
        """
        Return the parent class's reduction when it is truthy; otherwise
        return whether `operation` does not reference this model.
        """
        inherited = super().reduce(operation, app_label=app_label)
        if inherited:
            return inherited
        return not operation.references_model(self.name, app_label)
-
-
class CreateModel(ModelOperation):
    """Create a model's table."""

    serialization_expand_args = ['fields', 'options', 'managers']

    def __init__(self, name, fields, options=None, bases=None, managers=None):
        """
        Store the model definition and reject duplicated entries.

        `options`, `bases` and `managers` fall back to `{}`,
        `(models.Model,)` and `[]` when omitted or empty. Raise ValueError
        if a field name, a base or a manager name appears more than once.
        """
        self.fields = fields
        self.options = options or {}
        self.bases = bases or (models.Model,)
        self.managers = managers or []
        super().__init__(name)
        # Sanity-check that there are no duplicated field names, bases, or
        # manager names
        _check_for_duplicates('fields', (name for name, _ in self.fields))
        _check_for_duplicates('bases', (
            base._meta.label_lower if hasattr(base, '_meta') else
            base.lower() if isinstance(base, str) else base
            for base in self.bases
        ))
        _check_for_duplicates('managers', (name for name, _ in self.managers))

    def deconstruct(self):
        """
        Return `(qualified class name, [], kwargs)`.

        `options`, `bases` and `managers` are only included when they are
        non-empty and, for the latter two, differ from the defaults.
        """
        kwargs = {
            'name': self.name,
            'fields': self.fields,
        }
        if self.options:
            kwargs['options'] = self.options
        if self.bases and self.bases != (models.Model,):
            kwargs['bases'] = self.bases
        if self.managers and self.managers != [('objects', models.Manager())]:
            kwargs['managers'] = self.managers
        return (
            self.__class__.__qualname__,
            [],
            kwargs
        )

    def state_forwards(self, app_label, state):
        """Add a ModelState built from copies of this operation's attributes."""
        state.add_model(ModelState(
            app_label,
            self.name,
            list(self.fields),
            dict(self.options),
            tuple(self.bases),
            list(self.managers),
        ))

    def database_forwards(self, app_label, schema_editor, from_state, to_state):
        """Create the model's table when migrating it on this connection is allowed."""
        model = to_state.apps.get_model(app_label, self.name)
        if self.allow_migrate_model(schema_editor.connection.alias, model):
            schema_editor.create_model(model)

    def database_backwards(self, app_label, schema_editor, from_state, to_state):
        """Delete the model's table when migrating it on this connection is allowed."""
        model = from_state.apps.get_model(app_label, self.name)
        if self.allow_migrate_model(schema_editor.connection.alias, model):
            schema_editor.delete_model(model)

    def describe(self):
        """Return a short description, mentioning "proxy" for proxy models."""
        return "Create %smodel %s" % ("proxy " if self.options.get("proxy", False) else "", self.name)

    def references_model(self, name, app_label=None):
        """
        Return True if `name` is this model, one of its bases, or the target
        of one of its fields.
        """
        name_lower = name.lower()
        if name_lower == self.name_lower:
            return True

        # Check we didn't inherit from the model
        model_tuple = ModelTuple(app_label, name_lower)
        for base in self.bases:
            if (base is not models.Model and isinstance(base, (models.base.ModelBase, str)) and
                    ModelTuple.from_model(base) == model_tuple):
                return True

        # Check we have no FKs/M2Ms with it
        for _name, field in self.fields:
            if field_references_model(field, model_tuple):
                return True
        return False

    def reduce(self, operation, app_label=None):
        """
        Try to fold `operation` into this CreateModel.

        Return `[]` when a DeleteModel cancels a non-proxy creation, or a
        one-element list holding a new CreateModel that already includes the
        effect of a rename, option change or field operation on this model.
        Anything else is delegated to the parent class.
        """
        if (isinstance(operation, DeleteModel) and
                self.name_lower == operation.name_lower and
                not self.options.get("proxy", False)):
            return []
        elif isinstance(operation, RenameModel) and self.name_lower == operation.old_name_lower:
            return [
                CreateModel(
                    operation.new_name,
                    fields=self.fields,
                    options=self.options,
                    bases=self.bases,
                    managers=self.managers,
                ),
            ]
        elif isinstance(operation, AlterModelOptions) and self.name_lower == operation.name_lower:
            options = {**self.options, **operation.options}
            # Mirror AlterModelOptions.state_forwards(): a key listed in
            # ALTER_OPTION_KEYS that the operation omits is unset, not
            # inherited from this CreateModel. Without this the folded
            # operation would keep options the original sequence cleared.
            for key in operation.ALTER_OPTION_KEYS:
                if key not in operation.options:
                    options.pop(key, None)
            return [
                CreateModel(
                    self.name,
                    fields=self.fields,
                    options=options,
                    bases=self.bases,
                    managers=self.managers,
                ),
            ]
        elif isinstance(operation, AlterTogetherOptionOperation) and self.name_lower == operation.name_lower:
            return [
                CreateModel(
                    self.name,
                    fields=self.fields,
                    options={**self.options, **{operation.option_name: operation.option_value}},
                    bases=self.bases,
                    managers=self.managers,
                ),
            ]
        elif isinstance(operation, AlterOrderWithRespectTo) and self.name_lower == operation.name_lower:
            return [
                CreateModel(
                    self.name,
                    fields=self.fields,
                    options={**self.options, 'order_with_respect_to': operation.order_with_respect_to},
                    bases=self.bases,
                    managers=self.managers,
                ),
            ]
        elif isinstance(operation, FieldOperation) and self.name_lower == operation.model_name_lower:
            if isinstance(operation, AddField):
                return [
                    CreateModel(
                        self.name,
                        fields=self.fields + [(operation.name, operation.field)],
                        options=self.options,
                        bases=self.bases,
                        managers=self.managers,
                    ),
                ]
            elif isinstance(operation, AlterField):
                return [
                    CreateModel(
                        self.name,
                        fields=[
                            (n, operation.field if n == operation.name else v)
                            for n, v in self.fields
                        ],
                        options=self.options,
                        bases=self.bases,
                        managers=self.managers,
                    ),
                ]
            elif isinstance(operation, RemoveField):
                options = self.options.copy()
                # Strip the removed field from the *_together options; drop
                # tuples that end up empty, and the option itself when no
                # tuple is left.
                for option_name in ('unique_together', 'index_together'):
                    option = options.pop(option_name, None)
                    if option:
                        option = set(filter(bool, (
                            tuple(f for f in fields if f != operation.name_lower) for fields in option
                        )))
                        if option:
                            options[option_name] = option
                order_with_respect_to = options.get('order_with_respect_to')
                if order_with_respect_to == operation.name_lower:
                    del options['order_with_respect_to']
                return [
                    CreateModel(
                        self.name,
                        fields=[
                            (n, v)
                            for n, v in self.fields
                            if n.lower() != operation.name_lower
                        ],
                        options=options,
                        bases=self.bases,
                        managers=self.managers,
                    ),
                ]
            elif isinstance(operation, RenameField):
                options = self.options.copy()
                # Apply the rename inside the *_together options and
                # order_with_respect_to as well as in the field list.
                for option_name in ('unique_together', 'index_together'):
                    option = options.get(option_name)
                    if option:
                        options[option_name] = {
                            tuple(operation.new_name if f == operation.old_name else f for f in fields)
                            for fields in option
                        }
                order_with_respect_to = options.get('order_with_respect_to')
                if order_with_respect_to == operation.old_name:
                    options['order_with_respect_to'] = operation.new_name
                return [
                    CreateModel(
                        self.name,
                        fields=[
                            (operation.new_name if n == operation.old_name else n, v)
                            for n, v in self.fields
                        ],
                        options=options,
                        bases=self.bases,
                        managers=self.managers,
                    ),
                ]
        return super().reduce(operation, app_label=app_label)
-
-
class DeleteModel(ModelOperation):
    """Drop a model's table."""

    def deconstruct(self):
        """Return `(qualified class name, [], {'name': ...})`."""
        return self.__class__.__qualname__, [], {'name': self.name}

    def state_forwards(self, app_label, state):
        """Remove the model from the project state."""
        state.remove_model(app_label, self.name_lower)

    def database_forwards(self, app_label, schema_editor, from_state, to_state):
        """Delete the table of the model as it exists in `from_state`."""
        doomed = from_state.apps.get_model(app_label, self.name)
        if not self.allow_migrate_model(schema_editor.connection.alias, doomed):
            return
        schema_editor.delete_model(doomed)

    def database_backwards(self, app_label, schema_editor, from_state, to_state):
        """Recreate the table of the model as it exists in `to_state`."""
        restored = to_state.apps.get_model(app_label, self.name)
        if not self.allow_migrate_model(schema_editor.connection.alias, restored):
            return
        schema_editor.create_model(restored)

    def references_model(self, name, app_label=None):
        # Always answer True: the model being deleted may point at the given
        # model through related fields.
        return True

    def describe(self):
        """Return a short human-readable description."""
        return "Delete model %s" % self.name
-
-
class RenameModel(ModelOperation):
    """Rename a model."""

    def __init__(self, old_name, new_name):
        """Record both names; the base class `name` is set to `old_name`."""
        self.old_name = old_name
        self.new_name = new_name
        super().__init__(old_name)

    @cached_property
    def old_name_lower(self):
        """Lowercased `old_name`, cached on first access."""
        return self.old_name.lower()

    @cached_property
    def new_name_lower(self):
        """Lowercased `new_name`, cached on first access."""
        return self.new_name.lower()

    def deconstruct(self):
        """Return `(qualified class name, [], kwargs)` with both names as kwargs."""
        kwargs = {
            'old_name': self.old_name,
            'new_name': self.new_name,
        }
        return (
            self.__class__.__qualname__,
            [],
            kwargs
        )

    def state_forwards(self, app_label, state):
        """
        Register the model under its new name, repoint every field whose
        remote model or `through` model is the old one, then remove the old
        model from the state.
        """
        # Add a new model.
        renamed_model = state.models[app_label, self.old_name_lower].clone()
        renamed_model.name = self.new_name
        state.models[app_label, self.new_name_lower] = renamed_model
        # Repoint all fields pointing to the old model to the new one.
        old_model_tuple = ModelTuple(app_label, self.old_name_lower)
        new_remote_model = '%s.%s' % (app_label, self.new_name)
        to_reload = []
        for (model_app_label, model_name), model_state in state.models.items():
            model_changed = False
            for index, (name, field) in enumerate(model_state.fields):
                changed_field = None
                remote_field = field.remote_field
                if remote_field:
                    remote_model_tuple = ModelTuple.from_model(
                        remote_field.model, model_app_label, model_name
                    )
                    if remote_model_tuple == old_model_tuple:
                        changed_field = field.clone()
                        changed_field.remote_field.model = new_remote_model
                    through_model = getattr(remote_field, 'through', None)
                    if through_model:
                        through_model_tuple = ModelTuple.from_model(
                            through_model, model_app_label, model_name
                        )
                        if through_model_tuple == old_model_tuple:
                            # Reuse the clone made above, if any, so both
                            # changes land on the same field instance.
                            if changed_field is None:
                                changed_field = field.clone()
                            changed_field.remote_field.through = new_remote_model
                if changed_field:
                    model_state.fields[index] = name, changed_field
                    model_changed = True
            if model_changed:
                to_reload.append((model_app_label, model_name))
        # Reload models related to old model before removing the old model.
        state.reload_models(to_reload, delay=True)
        # Remove the old model.
        state.remove_model(app_label, self.old_name_lower)
        state.reload_model(app_label, self.new_name_lower, delay=True)

    def database_forwards(self, app_label, schema_editor, from_state, to_state):
        """
        Rename the model's table, alter the fields of related objects that
        point at it, and rename auto-created M2M tables and their column
        named after this model.
        """
        new_model = to_state.apps.get_model(app_label, self.new_name)
        if self.allow_migrate_model(schema_editor.connection.alias, new_model):
            old_model = from_state.apps.get_model(app_label, self.old_name)
            # Move the main table
            schema_editor.alter_db_table(
                new_model,
                old_model._meta.db_table,
                new_model._meta.db_table,
            )
            # Alter the fields pointing to us
            for related_object in old_model._meta.related_objects:
                if related_object.related_model == old_model:
                    model = new_model
                    related_key = (app_label, self.new_name_lower)
                else:
                    model = related_object.related_model
                    related_key = (
                        related_object.related_model._meta.app_label,
                        related_object.related_model._meta.model_name,
                    )
                to_field = to_state.apps.get_model(
                    *related_key
                )._meta.get_field(related_object.field.name)
                schema_editor.alter_field(
                    model,
                    related_object.field,
                    to_field,
                )
            # Rename M2M fields whose name is based on this model's name.
            fields = zip(old_model._meta.local_many_to_many, new_model._meta.local_many_to_many)
            for (old_field, new_field) in fields:
                # Skip self-referential fields as these are renamed above.
                if new_field.model == new_field.related_model or not new_field.remote_field.through._meta.auto_created:
                    continue
                # Rename the M2M table that's based on this model's name.
                old_m2m_model = old_field.remote_field.through
                new_m2m_model = new_field.remote_field.through
                schema_editor.alter_db_table(
                    new_m2m_model,
                    old_m2m_model._meta.db_table,
                    new_m2m_model._meta.db_table,
                )
                # Rename the column in the M2M table that's based on this
                # model's name.
                schema_editor.alter_field(
                    new_m2m_model,
                    old_m2m_model._meta.get_field(old_model._meta.model_name),
                    new_m2m_model._meta.get_field(new_model._meta.model_name),
                )

    def database_backwards(self, app_label, schema_editor, from_state, to_state):
        """
        Run `database_forwards` with the old and new names temporarily swapped.

        The names are swapped back in a `finally` clause so that a failure in
        the schema editor does not leave this operation permanently reversed.
        """
        self.new_name_lower, self.old_name_lower = self.old_name_lower, self.new_name_lower
        self.new_name, self.old_name = self.old_name, self.new_name
        try:
            self.database_forwards(app_label, schema_editor, from_state, to_state)
        finally:
            self.new_name_lower, self.old_name_lower = self.old_name_lower, self.new_name_lower
            self.new_name, self.old_name = self.old_name, self.new_name

    def references_model(self, name, app_label=None):
        """Return True if `name` matches either the old or the new name, ignoring case."""
        return (
            name.lower() == self.old_name_lower or
            name.lower() == self.new_name_lower
        )

    def describe(self):
        """Return a short human-readable description."""
        return "Rename model %s to %s" % (self.old_name, self.new_name)

    def reduce(self, operation, app_label=None):
        """
        Collapse two chained renames (A -> B, then B -> C) into one (A -> C);
        otherwise fall back to the reference check against the new name.
        """
        if (isinstance(operation, RenameModel) and
                self.new_name_lower == operation.old_name_lower):
            return [
                RenameModel(
                    self.old_name,
                    operation.new_name,
                ),
            ]
        # Skip `ModelOperation.reduce` as we want to run `references_model`
        # against self.new_name.
        return (
            super(ModelOperation, self).reduce(operation, app_label=app_label) or
            not operation.references_model(self.new_name, app_label)
        )
-
-
class AlterModelTable(ModelOperation):
    """Rename a model's table."""

    def __init__(self, name, table):
        self.table = table
        super().__init__(name)

    def deconstruct(self):
        """Return `(qualified class name, [], kwargs)` carrying name and table."""
        return (
            self.__class__.__qualname__,
            [],
            {'name': self.name, 'table': self.table},
        )

    def state_forwards(self, app_label, state):
        """Set the model's `db_table` option and schedule a delayed reload."""
        model_state = state.models[app_label, self.name_lower]
        model_state.options["db_table"] = self.table
        state.reload_model(app_label, self.name_lower, delay=True)

    def database_forwards(self, app_label, schema_editor, from_state, to_state):
        """
        Rename the model's table from its `from_state` name to its `to_state`
        name, and do the same for every auto-created M2M through table.
        """
        new_model = to_state.apps.get_model(app_label, self.name)
        if not self.allow_migrate_model(schema_editor.connection.alias, new_model):
            return
        old_model = from_state.apps.get_model(app_label, self.name)
        schema_editor.alter_db_table(
            new_model,
            old_model._meta.db_table,
            new_model._meta.db_table,
        )
        # Rename M2M fields whose name is based on this model's db_table
        m2m_pairs = zip(old_model._meta.local_many_to_many, new_model._meta.local_many_to_many)
        for old_field, new_field in m2m_pairs:
            if not new_field.remote_field.through._meta.auto_created:
                continue
            schema_editor.alter_db_table(
                new_field.remote_field.through,
                old_field.remote_field.through._meta.db_table,
                new_field.remote_field.through._meta.db_table,
            )

    def database_backwards(self, app_label, schema_editor, from_state, to_state):
        """Delegate to `database_forwards` with the states as given."""
        return self.database_forwards(app_label, schema_editor, from_state, to_state)

    def describe(self):
        """Return a short description; a `None` table is shown as "(default)"."""
        shown_table = "(default)" if self.table is None else self.table
        return "Rename table for %s to %s" % (self.name, shown_table)

    def reduce(self, operation, app_label=None):
        """
        A later AlterModelTable or DeleteModel on the same model replaces
        this operation; anything else is delegated to the parent class.
        """
        replaces_self = isinstance(operation, (AlterModelTable, DeleteModel))
        if replaces_self and self.name_lower == operation.name_lower:
            return [operation]
        return super().reduce(operation, app_label=app_label)
-
-
class ModelOptionOperation(ModelOperation):
    """Base for operations that set an option on a model."""

    def reduce(self, operation, app_label=None):
        """
        A later operation of the same class, or a DeleteModel, on the same
        model replaces this one; otherwise defer to the parent class.
        """
        replaces_self = isinstance(operation, (self.__class__, DeleteModel))
        if replaces_self and self.name_lower == operation.name_lower:
            return [operation]
        return super().reduce(operation, app_label=app_label)
-
-
class AlterTogetherOptionOperation(ModelOptionOperation):
    """
    Shared implementation for the "*_together" options. Subclasses set
    `option_name`, which is used both as the attribute holding the value and
    as the suffix of the schema editor method (`alter_<option_name>`).
    """
    option_name = None

    def __init__(self, name, option_value):
        # A truthy value is normalized into a set; falsy values are stored
        # unchanged.
        normalized = set(normalize_together(option_value)) if option_value else option_value
        setattr(self, self.option_name, normalized)
        super().__init__(name)

    @cached_property
    def option_value(self):
        """The value stored under the attribute named by `option_name`."""
        return getattr(self, self.option_name)

    def deconstruct(self):
        """Return `(qualified class name, [], kwargs)` keyed by `option_name`."""
        return (
            self.__class__.__qualname__,
            [],
            {'name': self.name, self.option_name: self.option_value},
        )

    def state_forwards(self, app_label, state):
        """Store the option value on the model state and schedule a delayed reload."""
        target = state.models[app_label, self.name_lower]
        target.options[self.option_name] = self.option_value
        state.reload_model(app_label, self.name_lower, delay=True)

    def database_forwards(self, app_label, schema_editor, from_state, to_state):
        """Call the schema editor's `alter_<option_name>` with the old and new values."""
        new_model = to_state.apps.get_model(app_label, self.name)
        if not self.allow_migrate_model(schema_editor.connection.alias, new_model):
            return
        old_model = from_state.apps.get_model(app_label, self.name)
        alter_together = getattr(schema_editor, 'alter_%s' % self.option_name)
        previous = getattr(old_model._meta, self.option_name, set())
        current = getattr(new_model._meta, self.option_name, set())
        alter_together(new_model, previous, current)

    def database_backwards(self, app_label, schema_editor, from_state, to_state):
        """Delegate to `database_forwards` with the states as given."""
        return self.database_forwards(app_label, schema_editor, from_state, to_state)

    def references_field(self, model_name, name, app_label=None):
        """
        True when the model matches and either the option value is empty or
        one of its tuples contains `name`.
        """
        on_this_model = self.references_model(model_name, app_label)
        return on_this_model and (
            not self.option_value or
            any(name in fields for fields in self.option_value)
        )

    def describe(self):
        """Return a short description including the number of constraints."""
        constraint_count = len(self.option_value or '')
        return "Alter %s for %s (%s constraint(s))" % (self.option_name, self.name, constraint_count)
-
-
class AlterUniqueTogether(AlterTogetherOptionOperation):
    """
    Set unique_together to the given target value.
    The unique_together input must be a set of tuples.
    """
    option_name = 'unique_together'

    def __init__(self, name, unique_together):
        # Expose the option under its own keyword while reusing the shared
        # initializer.
        super().__init__(name=name, option_value=unique_together)
-
-
class AlterIndexTogether(AlterTogetherOptionOperation):
    """
    Set index_together to the given target value.
    The index_together input must be a set of tuples.
    """
    option_name = "index_together"

    def __init__(self, name, index_together):
        # Expose the option under its own keyword while reusing the shared
        # initializer.
        super().__init__(name=name, option_value=index_together)
-
-
class AlterOrderWithRespectTo(ModelOptionOperation):
    """Represent a change with the order_with_respect_to option."""

    option_name = 'order_with_respect_to'

    def __init__(self, name, order_with_respect_to):
        self.order_with_respect_to = order_with_respect_to
        super().__init__(name)

    def deconstruct(self):
        """Return `(qualified class name, [], kwargs)` for this operation."""
        return (
            self.__class__.__qualname__,
            [],
            {
                'name': self.name,
                'order_with_respect_to': self.order_with_respect_to,
            },
        )

    def state_forwards(self, app_label, state):
        """Store the option on the model state and schedule a delayed reload."""
        target = state.models[app_label, self.name_lower]
        target.options['order_with_respect_to'] = self.order_with_respect_to
        state.reload_model(app_label, self.name_lower, delay=True)

    def database_forwards(self, app_label, schema_editor, from_state, to_state):
        """
        Drop the "_order" field when the option is being cleared, or add it
        when the option is being set for the first time.
        """
        to_model = to_state.apps.get_model(app_label, self.name)
        if not self.allow_migrate_model(schema_editor.connection.alias, to_model):
            return
        from_model = from_state.apps.get_model(app_label, self.name)
        was_ordered = from_model._meta.order_with_respect_to
        is_ordered = to_model._meta.order_with_respect_to
        if was_ordered and not is_ordered:
            # Remove a field if we need to
            schema_editor.remove_field(from_model, from_model._meta.get_field("_order"))
        elif is_ordered and not was_ordered:
            # Add a field if we need to (altering the column is untouched as
            # it's likely a rename)
            order_field = to_model._meta.get_field("_order")
            if not order_field.has_default():
                order_field.default = 0
            schema_editor.add_field(from_model, order_field)

    def database_backwards(self, app_label, schema_editor, from_state, to_state):
        """Run `database_forwards` with the states as given."""
        self.database_forwards(app_label, schema_editor, from_state, to_state)

    def references_field(self, model_name, name, app_label=None):
        """
        True when the model matches and the option is either None or equal
        to `name`.
        """
        on_this_model = self.references_model(model_name, app_label)
        return on_this_model and (
            self.order_with_respect_to is None or
            name == self.order_with_respect_to
        )

    def describe(self):
        """Return a short human-readable description."""
        return "Set order_with_respect_to on %s to %s" % (self.name, self.order_with_respect_to)
-
-
class AlterModelOptions(ModelOptionOperation):
    """
    Set new model options that don't directly affect the database schema
    (like verbose_name, permissions, ordering). Python code in migrations
    may still need them.
    """

    # Model options we want to compare and preserve in an AlterModelOptions op
    ALTER_OPTION_KEYS = [
        "base_manager_name",
        "default_manager_name",
        "default_related_name",
        "get_latest_by",
        "managed",
        "ordering",
        "permissions",
        "default_permissions",
        "select_on_save",
        "verbose_name",
        "verbose_name_plural",
    ]

    def __init__(self, name, options):
        self.options = options
        super().__init__(name)

    def deconstruct(self):
        """Return `(qualified class name, [], kwargs)` carrying name and options."""
        return (
            self.__class__.__qualname__,
            [],
            {'name': self.name, 'options': self.options},
        )

    def state_forwards(self, app_label, state):
        """
        Overlay this operation's options on the model state, then unset every
        key in ALTER_OPTION_KEYS that this operation does not provide.
        """
        target = state.models[app_label, self.name_lower]
        merged = {**target.options, **self.options}
        for key in self.ALTER_OPTION_KEYS:
            if key not in self.options:
                merged.pop(key, False)
        target.options = merged
        state.reload_model(app_label, self.name_lower, delay=True)

    def database_forwards(self, app_label, schema_editor, from_state, to_state):
        # Intentionally a no-op.
        pass

    def database_backwards(self, app_label, schema_editor, from_state, to_state):
        # Intentionally a no-op.
        pass

    def describe(self):
        """Return a short human-readable description."""
        return "Change Meta options on %s" % self.name
-
-
class AlterModelManagers(ModelOptionOperation):
    """Alter the model's managers."""

    serialization_expand_args = ['managers']

    def __init__(self, name, managers):
        self.managers = managers
        super().__init__(name)

    def deconstruct(self):
        """Return `(qualified class name, [name, managers], {})`; both values are positional."""
        positional = [self.name, self.managers]
        return self.__class__.__qualname__, positional, {}

    def state_forwards(self, app_label, state):
        """Replace the model state's managers with a copy of this operation's list."""
        target = state.models[app_label, self.name_lower]
        target.managers = list(self.managers)
        state.reload_model(app_label, self.name_lower, delay=True)

    def database_forwards(self, app_label, schema_editor, from_state, to_state):
        # Intentionally a no-op.
        pass

    def database_backwards(self, app_label, schema_editor, from_state, to_state):
        # Intentionally a no-op.
        pass

    def describe(self):
        """Return a short human-readable description."""
        return "Change managers on %s" % self.name
-
-
class IndexOperation(Operation):
    """
    Base for operations addressed by `model_name`. `option_name` names the
    model option that subclasses edit in the project state.
    """
    option_name = 'indexes'

    @cached_property
    def model_name_lower(self):
        """Lowercased `model_name`, cached on first access."""
        lowered = self.model_name.lower()
        return lowered
-
-
class AddIndex(IndexOperation):
    """Add an index on a model."""

    def __init__(self, model_name, index):
        """Raise ValueError when `index` has no name."""
        self.model_name = model_name
        if not index.name:
            raise ValueError(
                "Indexes passed to AddIndex operations require a name "
                "argument. %r doesn't have one." % index
            )
        self.index = index

    def state_forwards(self, app_label, state):
        """Append a clone of the index to the model state's option list."""
        target = state.models[app_label, self.model_name_lower]
        existing = target.options[self.option_name]
        target.options[self.option_name] = [*existing, self.index.clone()]
        state.reload_model(app_label, self.model_name_lower, delay=True)

    def database_forwards(self, app_label, schema_editor, from_state, to_state):
        """Create the index on the `to_state` model."""
        model = to_state.apps.get_model(app_label, self.model_name)
        if not self.allow_migrate_model(schema_editor.connection.alias, model):
            return
        schema_editor.add_index(model, self.index)

    def database_backwards(self, app_label, schema_editor, from_state, to_state):
        """Remove the index from the `from_state` model."""
        model = from_state.apps.get_model(app_label, self.model_name)
        if not self.allow_migrate_model(schema_editor.connection.alias, model):
            return
        schema_editor.remove_index(model, self.index)

    def deconstruct(self):
        """Return `(qualified class name, [], kwargs)` for this operation."""
        return (
            self.__class__.__qualname__,
            [],
            {'model_name': self.model_name, 'index': self.index},
        )

    def describe(self):
        """Return a short description listing the index name, fields and model."""
        field_list = ', '.join(self.index.fields)
        return 'Create index %s on field(s) %s of model %s' % (
            self.index.name,
            field_list,
            self.model_name,
        )
-
-
class RemoveIndex(IndexOperation):
    """Remove an index from a model."""

    def __init__(self, model_name, name):
        self.model_name = model_name
        self.name = name

    def state_forwards(self, app_label, state):
        """Drop every index named `self.name` from the model state's option list."""
        target = state.models[app_label, self.model_name_lower]
        current = target.options[self.option_name]
        target.options[self.option_name] = [idx for idx in current if idx.name != self.name]
        state.reload_model(app_label, self.model_name_lower, delay=True)

    def database_forwards(self, app_label, schema_editor, from_state, to_state):
        """Look the index up by name in `from_state` and remove it."""
        model = from_state.apps.get_model(app_label, self.model_name)
        if not self.allow_migrate_model(schema_editor.connection.alias, model):
            return
        source_state = from_state.models[app_label, self.model_name_lower]
        schema_editor.remove_index(model, source_state.get_index_by_name(self.name))

    def database_backwards(self, app_label, schema_editor, from_state, to_state):
        """Look the index up by name in `to_state` and add it back."""
        model = to_state.apps.get_model(app_label, self.model_name)
        if not self.allow_migrate_model(schema_editor.connection.alias, model):
            return
        target_state = to_state.models[app_label, self.model_name_lower]
        schema_editor.add_index(model, target_state.get_index_by_name(self.name))

    def deconstruct(self):
        """Return `(qualified class name, [], kwargs)` for this operation."""
        return (
            self.__class__.__qualname__,
            [],
            {'model_name': self.model_name, 'name': self.name},
        )

    def describe(self):
        """Return a short human-readable description."""
        return 'Remove index %s from %s' % (self.name, self.model_name)
-
-
class AddConstraint(IndexOperation):
    """Add a constraint to a model."""

    option_name = 'constraints'

    def __init__(self, model_name, constraint):
        self.model_name = model_name
        self.constraint = constraint

    def state_forwards(self, app_label, state):
        """Append the constraint to the model state's option list."""
        model_state = state.models[app_label, self.model_name_lower]
        model_state.options[self.option_name] = [*model_state.options[self.option_name], self.constraint]
        state.reload_model(app_label, self.model_name_lower, delay=True)

    def database_forwards(self, app_label, schema_editor, from_state, to_state):
        """Create the constraint on the `to_state` model."""
        model = to_state.apps.get_model(app_label, self.model_name)
        if self.allow_migrate_model(schema_editor.connection.alias, model):
            schema_editor.add_constraint(model, self.constraint)

    def database_backwards(self, app_label, schema_editor, from_state, to_state):
        """Remove the constraint, using the model looked up in `to_state`."""
        model = to_state.apps.get_model(app_label, self.model_name)
        if self.allow_migrate_model(schema_editor.connection.alias, model):
            schema_editor.remove_constraint(model, self.constraint)

    def deconstruct(self):
        """Return `(qualified class name, [], kwargs)` for this operation."""
        # __qualname__ rather than __name__, matching the other operations
        # in this module; the two are identical for top-level classes.
        return self.__class__.__qualname__, [], {
            'model_name': self.model_name,
            'constraint': self.constraint,
        }

    def describe(self):
        """Return a short human-readable description."""
        return 'Create constraint %s on model %s' % (self.constraint.name, self.model_name)
-
-
class RemoveConstraint(IndexOperation):
    """Remove a constraint from a model."""

    option_name = 'constraints'

    def __init__(self, model_name, name):
        self.model_name = model_name
        self.name = name

    def state_forwards(self, app_label, state):
        """Drop every constraint named `self.name` from the model state's option list."""
        model_state = state.models[app_label, self.model_name_lower]
        constraints = model_state.options[self.option_name]
        model_state.options[self.option_name] = [c for c in constraints if c.name != self.name]
        state.reload_model(app_label, self.model_name_lower, delay=True)

    def database_forwards(self, app_label, schema_editor, from_state, to_state):
        """Look the constraint up by name in `from_state` and remove it."""
        model = to_state.apps.get_model(app_label, self.model_name)
        if self.allow_migrate_model(schema_editor.connection.alias, model):
            from_model_state = from_state.models[app_label, self.model_name_lower]
            constraint = from_model_state.get_constraint_by_name(self.name)
            schema_editor.remove_constraint(model, constraint)

    def database_backwards(self, app_label, schema_editor, from_state, to_state):
        """Look the constraint up by name in `to_state` and add it back."""
        model = to_state.apps.get_model(app_label, self.model_name)
        if self.allow_migrate_model(schema_editor.connection.alias, model):
            to_model_state = to_state.models[app_label, self.model_name_lower]
            constraint = to_model_state.get_constraint_by_name(self.name)
            schema_editor.add_constraint(model, constraint)

    def deconstruct(self):
        """Return `(qualified class name, [], kwargs)` for this operation."""
        # __qualname__ rather than __name__, matching the other operations
        # in this module; the two are identical for top-level classes.
        return self.__class__.__qualname__, [], {
            'model_name': self.model_name,
            'name': self.name,
        }

    def describe(self):
        """Return a short human-readable description."""
        return 'Remove constraint %s from model %s' % (self.name, self.model_name)
|