123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271 |
- from django.db.backends.base.schema import BaseDatabaseSchemaEditor
- from django.db.models import NOT_PROVIDED, F, UniqueConstraint
- from django.db.models.constants import LOOKUP_SEP
class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
    """
    Schema editor for the MySQL and MariaDB backends.

    Override the SQL templates and hooks of BaseDatabaseSchemaEditor where
    the syntax or behavior depends on the server flavor
    (``connection.mysql_is_mariadb``) or version
    (``connection.mysql_version``).
    """

    sql_rename_table = "RENAME TABLE %(old_table)s TO %(new_table)s"

    sql_alter_column_null = "MODIFY %(column)s %(type)s NULL"
    sql_alter_column_not_null = "MODIFY %(column)s %(type)s NOT NULL"
    sql_alter_column_type = "MODIFY %(column)s %(type)s%(collation)s%(comment)s"
    sql_alter_column_no_default_null = "ALTER COLUMN %(column)s SET DEFAULT NULL"

    # No 'CASCADE' which works as a no-op in MySQL but is undocumented
    sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s"

    sql_delete_unique = "ALTER TABLE %(table)s DROP INDEX %(name)s"
    sql_create_column_inline_fk = (
        ", ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
        "REFERENCES %(to_table)s(%(to_column)s)"
    )
    sql_delete_fk = "ALTER TABLE %(table)s DROP FOREIGN KEY %(name)s"

    sql_delete_index = "DROP INDEX %(name)s ON %(table)s"
    sql_rename_index = "ALTER TABLE %(table)s RENAME INDEX %(old_name)s TO %(new_name)s"

    sql_create_pk = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
    )
    sql_delete_pk = "ALTER TABLE %(table)s DROP PRIMARY KEY"

    sql_create_index = "CREATE INDEX %(name)s ON %(table)s (%(columns)s)%(extra)s"

    sql_alter_table_comment = "ALTER TABLE %(table)s COMMENT = %(comment)s"
    # Column comments are emitted as part of the "MODIFY" statement (see
    # sql_alter_column_type), so there is no standalone template.
    sql_alter_column_comment = None

    @property
    def sql_delete_check(self):
        """Return the SQL template used to drop a check constraint."""
        if self.connection.mysql_is_mariadb:
            # The name of the column check constraint is the same as the field
            # name on MariaDB. Adding IF EXISTS clause prevents migrations
            # crash. Constraint is removed during a "MODIFY" column statement.
            return "ALTER TABLE %(table)s DROP CONSTRAINT IF EXISTS %(name)s"
        return "ALTER TABLE %(table)s DROP CHECK %(name)s"

    @property
    def sql_rename_column(self):
        """Return the SQL template used to rename a column."""
        is_mariadb = self.connection.mysql_is_mariadb
        if is_mariadb and self.connection.mysql_version < (10, 5, 2):
            # MariaDB < 10.5.2 doesn't support an
            # "ALTER TABLE ... RENAME COLUMN" statement.
            return "ALTER TABLE %(table)s CHANGE %(old_column)s %(new_column)s %(type)s"
        return super().sql_rename_column

    def quote_value(self, value):
        """
        Return `value` escaped by the database driver's connection so it can
        be inlined in a SQL statement.
        """
        self.connection.ensure_connection()
        # MySQLdb escapes to string, PyMySQL to bytes.
        quoted = self.connection.connection.escape(
            value, self.connection.connection.encoders
        )
        if isinstance(value, str) and isinstance(quoted, bytes):
            quoted = quoted.decode()
        return quoted

    def _is_limited_data_type(self, field):
        """
        Return True if the field's database type is one of the connection's
        `_limited_data_types`.
        """
        db_type = field.db_type(self.connection)
        return (
            db_type is not None
            and db_type.lower() in self.connection._limited_data_types
        )

    def skip_default(self, field):
        """
        Return True if a default must be skipped for `field`, i.e. it has a
        limited data type and the server doesn't support defaults for those.
        """
        if not self._supports_limited_data_type_defaults:
            return self._is_limited_data_type(field)
        return False

    def skip_default_on_alter(self, field):
        """
        Return True if a default must be skipped for `field` when altering
        the column.
        """
        if self._is_limited_data_type(field) and not self.connection.mysql_is_mariadb:
            # MySQL doesn't support defaults for BLOB and TEXT in the
            # ALTER COLUMN statement.
            return True
        return False

    @property
    def _supports_limited_data_type_defaults(self):
        """Return True if the server supports defaults for limited types."""
        # MariaDB and MySQL >= 8.0.13 support defaults for BLOB and TEXT.
        if self.connection.mysql_is_mariadb:
            return True
        return self.connection.mysql_version >= (8, 0, 13)

    def _column_default_sql(self, field):
        """Return the placeholder SQL used for the column's default value."""
        if (
            not self.connection.mysql_is_mariadb
            and self._supports_limited_data_type_defaults
            and self._is_limited_data_type(field)
        ):
            # MySQL supports defaults for BLOB and TEXT columns only if the
            # default value is written as an expression i.e. in parentheses.
            return "(%s)"
        return super()._column_default_sql(field)

    def add_field(self, model, field):
        """
        Add the field's column and, when the default had to be skipped, fill
        the new column with the effective default via an UPDATE.
        """
        super().add_field(model, field)

        # Simulate the effect of a one-off default.
        # field.default may be unhashable, so a set isn't used for "in" check.
        if self.skip_default(field) and field.default not in (None, NOT_PROVIDED):
            effective_default = self.effective_default(field)
            self.execute(
                "UPDATE %(table)s SET %(column)s = %%s"
                % {
                    "table": self.quote_name(model._meta.db_table),
                    "column": self.quote_name(field.column),
                },
                [effective_default],
            )

    def remove_constraint(self, model, constraint):
        """
        Remove `constraint`, first recreating a missing FK index if the
        constraint is a unique constraint that produces SQL.
        """
        if (
            isinstance(constraint, UniqueConstraint)
            and constraint.create_sql(model, self) is not None
        ):
            self._create_missing_fk_index(
                model,
                fields=constraint.fields,
                expressions=constraint.expressions,
            )
        super().remove_constraint(model, constraint)

    def remove_index(self, model, index):
        """Remove `index`, first recreating a missing FK index if needed."""
        self._create_missing_fk_index(
            model,
            fields=[field_name for field_name, _ in index.fields_orders],
            expressions=index.expressions,
        )
        super().remove_index(model, index)

    def _field_should_be_indexed(self, model, field):
        """
        Return True if an explicit index should be created for `field`,
        taking the table's storage engine and the field's type into account.
        """
        if not super()._field_should_be_indexed(model, field):
            return False

        # Use the cursor as a context manager so it is closed once the
        # storage engine has been looked up.
        with self.connection.cursor() as cursor:
            storage = self.connection.introspection.get_storage_engine(
                cursor, model._meta.db_table
            )
        # No need to create an index for ForeignKey fields except if
        # db_constraint=False because the index from that constraint won't be
        # created.
        if (
            storage == "InnoDB"
            and field.get_internal_type() == "ForeignKey"
            and field.db_constraint
        ):
            return False
        return not self._is_limited_data_type(field)

    def _create_missing_fk_index(
        self,
        model,
        *,
        fields,
        expressions=None,
    ):
        """
        MySQL can remove an implicit FK index on a field when that field is
        covered by another index like a unique_together. "covered" here means
        that the more complex index has the FK field as its first field (see
        https://bugs.mysql.com/bug.php?id=37910).

        Manually create an implicit FK index to make it possible to remove the
        composed index.
        """
        first_field_name = None
        if fields:
            first_field_name = fields[0]
        elif (
            expressions
            and self.connection.features.supports_expression_indexes
            and isinstance(expressions[0], F)
            and LOOKUP_SEP not in expressions[0].name
        ):
            first_field_name = expressions[0].name

        if not first_field_name:
            return

        first_field = model._meta.get_field(first_field_name)
        if first_field.get_internal_type() == "ForeignKey":
            column = self.connection.introspection.identifier_converter(
                first_field.column
            )
            with self.connection.cursor() as cursor:
                constraint_names = [
                    name
                    for name, infodict in self.connection.introspection.get_constraints(
                        cursor, model._meta.db_table
                    ).items()
                    if infodict["index"] and infodict["columns"][0] == column
                ]
            # There are no other indexes that start with the FK field, only
            # the index that is expected to be deleted.
            if len(constraint_names) == 1:
                self.execute(
                    self._create_index_sql(model, fields=[first_field], suffix="")
                )

    def _delete_composed_index(self, model, fields, *args):
        """
        Delete a composed index, first recreating a missing FK index for its
        leading field if needed.
        """
        self._create_missing_fk_index(model, fields=fields)
        return super()._delete_composed_index(model, fields, *args)

    def _set_field_new_type(self, field, new_type):
        """
        Keep the NULL and DEFAULT properties of the old field. If it has
        changed, it will be handled separately.
        """
        if field.db_default is not NOT_PROVIDED:
            default_sql, params = self.db_default_sql(field)
            # Inline the quoted parameters into the DEFAULT clause.
            default_sql %= tuple(self.quote_value(p) for p in params)
            new_type += f" DEFAULT {default_sql}"
        if field.null:
            new_type += " NULL"
        else:
            new_type += " NOT NULL"
        return new_type

    def _alter_column_type_sql(
        self, model, old_field, new_field, new_type, old_collation, new_collation
    ):
        """
        Build the type-alteration SQL with the old field's DEFAULT and
        NULL/NOT NULL properties appended to `new_type`.
        """
        new_type = self._set_field_new_type(old_field, new_type)
        return super()._alter_column_type_sql(
            model, old_field, new_field, new_type, old_collation, new_collation
        )

    def _field_db_check(self, field, field_db_params):
        """Return the check constraint SQL to use for `field`."""
        if self.connection.mysql_is_mariadb and self.connection.mysql_version >= (
            10,
            5,
            2,
        ):
            return super()._field_db_check(field, field_db_params)
        # On MySQL and MariaDB < 10.5.2 (no support for
        # "ALTER TABLE ... RENAME COLUMN" statements), check constraints with
        # the column name as it requires explicit recreation when the column is
        # renamed.
        return field_db_params["check"]

    def _rename_field_sql(self, table, old_field, new_field, new_type):
        """
        Build the column-rename SQL with the old field's DEFAULT and
        NULL/NOT NULL properties appended to `new_type`.
        """
        new_type = self._set_field_new_type(old_field, new_type)
        return super()._rename_field_sql(table, old_field, new_field, new_type)

    def _alter_column_comment_sql(self, model, new_field, new_type, new_db_comment):
        """Return an empty statement and an empty parameter list."""
        # Comment is altered when altering the column type.
        return "", []

    def _comment_sql(self, comment):
        """Return the base comment SQL prefixed with " COMMENT "."""
        comment_sql = super()._comment_sql(comment)
        return f" COMMENT {comment_sql}"

    def _alter_column_null_sql(self, model, old_field, new_field):
        """
        Return the (sql, params) pair used to change a column's nullability.

        When the new field has a db_default, emit a full "MODIFY" with the
        type built by _set_field_new_type() instead of the base SQL.
        """
        if new_field.db_default is NOT_PROVIDED:
            return super()._alter_column_null_sql(model, old_field, new_field)

        new_db_params = new_field.db_parameters(connection=self.connection)
        type_sql = self._set_field_new_type(new_field, new_db_params["type"])
        return (
            "MODIFY %(column)s %(type)s"
            % {
                "column": self.quote_name(new_field.column),
                "type": type_sql,
            },
            [],
        )
|