123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493 |
- import copy
- from decimal import Decimal
- from django.apps.registry import Apps
- from django.db import NotSupportedError
- from django.db.backends.base.schema import BaseDatabaseSchemaEditor
- from django.db.backends.ddl_references import Statement
- from django.db.backends.utils import strip_quotes
- from django.db.models import NOT_PROVIDED, UniqueConstraint
- class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
- sql_delete_table = "DROP TABLE %(table)s"
- sql_create_fk = None
- sql_create_inline_fk = (
- "REFERENCES %(to_table)s (%(to_column)s) DEFERRABLE INITIALLY DEFERRED"
- )
- sql_create_column_inline_fk = sql_create_inline_fk
- sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s"
- sql_create_unique = "CREATE UNIQUE INDEX %(name)s ON %(table)s (%(columns)s)"
- sql_delete_unique = "DROP INDEX %(name)s"
- sql_alter_table_comment = None
- sql_alter_column_comment = None
- def __enter__(self):
- # Some SQLite schema alterations need foreign key constraints to be
- # disabled. Enforce it here for the duration of the schema edition.
- if not self.connection.disable_constraint_checking():
- raise NotSupportedError(
- "SQLite schema editor cannot be used while foreign key "
- "constraint checks are enabled. Make sure to disable them "
- "before entering a transaction.atomic() context because "
- "SQLite does not support disabling them in the middle of "
- "a multi-statement transaction."
- )
- return super().__enter__()
- def __exit__(self, exc_type, exc_value, traceback):
- self.connection.check_constraints()
- super().__exit__(exc_type, exc_value, traceback)
- self.connection.enable_constraint_checking()
- def quote_value(self, value):
- # The backend "mostly works" without this function and there are use
- # cases for compiling Python without the sqlite3 libraries (e.g.
- # security hardening).
- try:
- import sqlite3
- value = sqlite3.adapt(value)
- except ImportError:
- pass
- except sqlite3.ProgrammingError:
- pass
- # Manual emulation of SQLite parameter quoting
- if isinstance(value, bool):
- return str(int(value))
- elif isinstance(value, (Decimal, float, int)):
- return str(value)
- elif isinstance(value, str):
- return "'%s'" % value.replace("'", "''")
- elif value is None:
- return "NULL"
- elif isinstance(value, (bytes, bytearray, memoryview)):
- # Bytes are only allowed for BLOB fields, encoded as string
- # literals containing hexadecimal data and preceded by a single "X"
- # character.
- return "X'%s'" % value.hex()
- else:
- raise ValueError(
- "Cannot quote parameter value %r of type %s" % (value, type(value))
- )
- def prepare_default(self, value):
- return self.quote_value(value)
- def _remake_table(
- self, model, create_field=None, delete_field=None, alter_fields=None
- ):
- """
- Shortcut to transform a model from old_model into new_model
- This follows the correct procedure to perform non-rename or column
- addition operations based on SQLite's documentation
- https://www.sqlite.org/lang_altertable.html#caution
- The essential steps are:
- 1. Create a table with the updated definition called "new__app_model"
- 2. Copy the data from the existing "app_model" table to the new table
- 3. Drop the "app_model" table
- 4. Rename the "new__app_model" table to "app_model"
- 5. Restore any index of the previous "app_model" table.
- """
- # Self-referential fields must be recreated rather than copied from
- # the old model to ensure their remote_field.field_name doesn't refer
- # to an altered field.
- def is_self_referential(f):
- return f.is_relation and f.remote_field.model is model
- # Work out the new fields dict / mapping
- body = {
- f.name: f.clone() if is_self_referential(f) else f
- for f in model._meta.local_concrete_fields
- }
- # Since mapping might mix column names and default values,
- # its values must be already quoted.
- mapping = {
- f.column: self.quote_name(f.column)
- for f in model._meta.local_concrete_fields
- if f.generated is False
- }
- # This maps field names (not columns) for things like unique_together
- rename_mapping = {}
- # If any of the new or altered fields is introducing a new PK,
- # remove the old one
- restore_pk_field = None
- alter_fields = alter_fields or []
- if getattr(create_field, "primary_key", False) or any(
- getattr(new_field, "primary_key", False) for _, new_field in alter_fields
- ):
- for name, field in list(body.items()):
- if field.primary_key and not any(
- # Do not remove the old primary key when an altered field
- # that introduces a primary key is the same field.
- name == new_field.name
- for _, new_field in alter_fields
- ):
- field.primary_key = False
- restore_pk_field = field
- if field.auto_created:
- del body[name]
- del mapping[field.column]
- # Add in any created fields
- if create_field:
- body[create_field.name] = create_field
- # Choose a default and insert it into the copy map
- if (
- create_field.db_default is NOT_PROVIDED
- and not (create_field.many_to_many or create_field.generated)
- and create_field.concrete
- ):
- mapping[create_field.column] = self.prepare_default(
- self.effective_default(create_field)
- )
- # Add in any altered fields
- for alter_field in alter_fields:
- old_field, new_field = alter_field
- body.pop(old_field.name, None)
- mapping.pop(old_field.column, None)
- body[new_field.name] = new_field
- if old_field.null and not new_field.null:
- if new_field.db_default is NOT_PROVIDED:
- default = self.prepare_default(self.effective_default(new_field))
- else:
- default, _ = self.db_default_sql(new_field)
- case_sql = "coalesce(%(col)s, %(default)s)" % {
- "col": self.quote_name(old_field.column),
- "default": default,
- }
- mapping[new_field.column] = case_sql
- else:
- mapping[new_field.column] = self.quote_name(old_field.column)
- rename_mapping[old_field.name] = new_field.name
- # Remove any deleted fields
- if delete_field:
- del body[delete_field.name]
- mapping.pop(delete_field.column, None)
- # Remove any implicit M2M tables
- if (
- delete_field.many_to_many
- and delete_field.remote_field.through._meta.auto_created
- ):
- return self.delete_model(delete_field.remote_field.through)
- # Work inside a new app registry
- apps = Apps()
- # Work out the new value of unique_together, taking renames into
- # account
- unique_together = [
- [rename_mapping.get(n, n) for n in unique]
- for unique in model._meta.unique_together
- ]
- # RemovedInDjango51Warning.
- # Work out the new value for index_together, taking renames into
- # account
- index_together = [
- [rename_mapping.get(n, n) for n in index]
- for index in model._meta.index_together
- ]
- indexes = model._meta.indexes
- if delete_field:
- indexes = [
- index for index in indexes if delete_field.name not in index.fields
- ]
- constraints = list(model._meta.constraints)
- # Provide isolated instances of the fields to the new model body so
- # that the existing model's internals aren't interfered with when
- # the dummy model is constructed.
- body_copy = copy.deepcopy(body)
- # Construct a new model with the new fields to allow self referential
- # primary key to resolve to. This model won't ever be materialized as a
- # table and solely exists for foreign key reference resolution purposes.
- # This wouldn't be required if the schema editor was operating on model
- # states instead of rendered models.
- meta_contents = {
- "app_label": model._meta.app_label,
- "db_table": model._meta.db_table,
- "unique_together": unique_together,
- "index_together": index_together, # RemovedInDjango51Warning.
- "indexes": indexes,
- "constraints": constraints,
- "apps": apps,
- }
- meta = type("Meta", (), meta_contents)
- body_copy["Meta"] = meta
- body_copy["__module__"] = model.__module__
- type(model._meta.object_name, model.__bases__, body_copy)
- # Construct a model with a renamed table name.
- body_copy = copy.deepcopy(body)
- meta_contents = {
- "app_label": model._meta.app_label,
- "db_table": "new__%s" % strip_quotes(model._meta.db_table),
- "unique_together": unique_together,
- "index_together": index_together, # RemovedInDjango51Warning.
- "indexes": indexes,
- "constraints": constraints,
- "apps": apps,
- }
- meta = type("Meta", (), meta_contents)
- body_copy["Meta"] = meta
- body_copy["__module__"] = model.__module__
- new_model = type("New%s" % model._meta.object_name, model.__bases__, body_copy)
- # Create a new table with the updated schema.
- self.create_model(new_model)
- # Copy data from the old table into the new table
- self.execute(
- "INSERT INTO %s (%s) SELECT %s FROM %s"
- % (
- self.quote_name(new_model._meta.db_table),
- ", ".join(self.quote_name(x) for x in mapping),
- ", ".join(mapping.values()),
- self.quote_name(model._meta.db_table),
- )
- )
- # Delete the old table to make way for the new
- self.delete_model(model, handle_autom2m=False)
- # Rename the new table to take way for the old
- self.alter_db_table(
- new_model,
- new_model._meta.db_table,
- model._meta.db_table,
- )
- # Run deferred SQL on correct table
- for sql in self.deferred_sql:
- self.execute(sql)
- self.deferred_sql = []
- # Fix any PK-removed field
- if restore_pk_field:
- restore_pk_field.primary_key = True
- def delete_model(self, model, handle_autom2m=True):
- if handle_autom2m:
- super().delete_model(model)
- else:
- # Delete the table (and only that)
- self.execute(
- self.sql_delete_table
- % {
- "table": self.quote_name(model._meta.db_table),
- }
- )
- # Remove all deferred statements referencing the deleted table.
- for sql in list(self.deferred_sql):
- if isinstance(sql, Statement) and sql.references_table(
- model._meta.db_table
- ):
- self.deferred_sql.remove(sql)
- def add_field(self, model, field):
- """Create a field on a model."""
- from django.db.models.expressions import Value
- # Special-case implicit M2M tables.
- if field.many_to_many and field.remote_field.through._meta.auto_created:
- self.create_model(field.remote_field.through)
- elif (
- # Primary keys and unique fields are not supported in ALTER TABLE
- # ADD COLUMN.
- field.primary_key
- or field.unique
- or not field.null
- # Fields with default values cannot by handled by ALTER TABLE ADD
- # COLUMN statement because DROP DEFAULT is not supported in
- # ALTER TABLE.
- or self.effective_default(field) is not None
- # Fields with non-constant defaults cannot by handled by ALTER
- # TABLE ADD COLUMN statement.
- or (
- field.db_default is not NOT_PROVIDED
- and not isinstance(field.db_default, Value)
- )
- ):
- self._remake_table(model, create_field=field)
- else:
- super().add_field(model, field)
- def remove_field(self, model, field):
- """
- Remove a field from a model. Usually involves deleting a column,
- but for M2Ms may involve deleting a table.
- """
- # M2M fields are a special case
- if field.many_to_many:
- # For implicit M2M tables, delete the auto-created table
- if field.remote_field.through._meta.auto_created:
- self.delete_model(field.remote_field.through)
- # For explicit "through" M2M fields, do nothing
- elif (
- self.connection.features.can_alter_table_drop_column
- # Primary keys, unique fields, indexed fields, and foreign keys are
- # not supported in ALTER TABLE DROP COLUMN.
- and not field.primary_key
- and not field.unique
- and not field.db_index
- and not (field.remote_field and field.db_constraint)
- ):
- super().remove_field(model, field)
- # For everything else, remake.
- else:
- # It might not actually have a column behind it
- if field.db_parameters(connection=self.connection)["type"] is None:
- return
- self._remake_table(model, delete_field=field)
- def _alter_field(
- self,
- model,
- old_field,
- new_field,
- old_type,
- new_type,
- old_db_params,
- new_db_params,
- strict=False,
- ):
- """Perform a "physical" (non-ManyToMany) field update."""
- # Use "ALTER TABLE ... RENAME COLUMN" if only the column name
- # changed and there aren't any constraints.
- if (
- old_field.column != new_field.column
- and self.column_sql(model, old_field) == self.column_sql(model, new_field)
- and not (
- old_field.remote_field
- and old_field.db_constraint
- or new_field.remote_field
- and new_field.db_constraint
- )
- ):
- return self.execute(
- self._rename_field_sql(
- model._meta.db_table, old_field, new_field, new_type
- )
- )
- # Alter by remaking table
- self._remake_table(model, alter_fields=[(old_field, new_field)])
- # Rebuild tables with FKs pointing to this field.
- old_collation = old_db_params.get("collation")
- new_collation = new_db_params.get("collation")
- if new_field.unique and (
- old_type != new_type or old_collation != new_collation
- ):
- related_models = set()
- opts = new_field.model._meta
- for remote_field in opts.related_objects:
- # Ignore self-relationship since the table was already rebuilt.
- if remote_field.related_model == model:
- continue
- if not remote_field.many_to_many:
- if remote_field.field_name == new_field.name:
- related_models.add(remote_field.related_model)
- elif new_field.primary_key and remote_field.through._meta.auto_created:
- related_models.add(remote_field.through)
- if new_field.primary_key:
- for many_to_many in opts.many_to_many:
- # Ignore self-relationship since the table was already rebuilt.
- if many_to_many.related_model == model:
- continue
- if many_to_many.remote_field.through._meta.auto_created:
- related_models.add(many_to_many.remote_field.through)
- for related_model in related_models:
- self._remake_table(related_model)
- def _alter_many_to_many(self, model, old_field, new_field, strict):
- """Alter M2Ms to repoint their to= endpoints."""
- if (
- old_field.remote_field.through._meta.db_table
- == new_field.remote_field.through._meta.db_table
- ):
- # The field name didn't change, but some options did, so we have to
- # propagate this altering.
- self._remake_table(
- old_field.remote_field.through,
- alter_fields=[
- (
- # The field that points to the target model is needed,
- # so that table can be remade with the new m2m field -
- # this is m2m_reverse_field_name().
- old_field.remote_field.through._meta.get_field(
- old_field.m2m_reverse_field_name()
- ),
- new_field.remote_field.through._meta.get_field(
- new_field.m2m_reverse_field_name()
- ),
- ),
- (
- # The field that points to the model itself is needed,
- # so that table can be remade with the new self field -
- # this is m2m_field_name().
- old_field.remote_field.through._meta.get_field(
- old_field.m2m_field_name()
- ),
- new_field.remote_field.through._meta.get_field(
- new_field.m2m_field_name()
- ),
- ),
- ],
- )
- return
- # Make a new through table
- self.create_model(new_field.remote_field.through)
- # Copy the data across
- self.execute(
- "INSERT INTO %s (%s) SELECT %s FROM %s"
- % (
- self.quote_name(new_field.remote_field.through._meta.db_table),
- ", ".join(
- [
- "id",
- new_field.m2m_column_name(),
- new_field.m2m_reverse_name(),
- ]
- ),
- ", ".join(
- [
- "id",
- old_field.m2m_column_name(),
- old_field.m2m_reverse_name(),
- ]
- ),
- self.quote_name(old_field.remote_field.through._meta.db_table),
- )
- )
- # Delete the old through table
- self.delete_model(old_field.remote_field.through)
- def add_constraint(self, model, constraint):
- if isinstance(constraint, UniqueConstraint) and (
- constraint.condition
- or constraint.contains_expressions
- or constraint.include
- or constraint.deferrable
- ):
- super().add_constraint(model, constraint)
- else:
- self._remake_table(model)
- def remove_constraint(self, model, constraint):
- if isinstance(constraint, UniqueConstraint) and (
- constraint.condition
- or constraint.contains_expressions
- or constraint.include
- or constraint.deferrable
- ):
- super().remove_constraint(model, constraint)
- else:
- self._remake_table(model)
- def _collate_sql(self, collation):
- return "COLLATE " + collation
|