schema.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. from django.db.backends.base.schema import BaseDatabaseSchemaEditor
  2. from django.db.models import NOT_PROVIDED, F, UniqueConstraint
  3. from django.db.models.constants import LOOKUP_SEP
  4. class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
  5. sql_rename_table = "RENAME TABLE %(old_table)s TO %(new_table)s"
  6. sql_alter_column_null = "MODIFY %(column)s %(type)s NULL"
  7. sql_alter_column_not_null = "MODIFY %(column)s %(type)s NOT NULL"
  8. sql_alter_column_type = "MODIFY %(column)s %(type)s%(collation)s%(comment)s"
  9. sql_alter_column_no_default_null = "ALTER COLUMN %(column)s SET DEFAULT NULL"
  10. # No 'CASCADE' which works as a no-op in MySQL but is undocumented
  11. sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s"
  12. sql_delete_unique = "ALTER TABLE %(table)s DROP INDEX %(name)s"
  13. sql_create_column_inline_fk = (
  14. ", ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
  15. "REFERENCES %(to_table)s(%(to_column)s)"
  16. )
  17. sql_delete_fk = "ALTER TABLE %(table)s DROP FOREIGN KEY %(name)s"
  18. sql_delete_index = "DROP INDEX %(name)s ON %(table)s"
  19. sql_rename_index = "ALTER TABLE %(table)s RENAME INDEX %(old_name)s TO %(new_name)s"
  20. sql_create_pk = (
  21. "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
  22. )
  23. sql_delete_pk = "ALTER TABLE %(table)s DROP PRIMARY KEY"
  24. sql_create_index = "CREATE INDEX %(name)s ON %(table)s (%(columns)s)%(extra)s"
  25. sql_alter_table_comment = "ALTER TABLE %(table)s COMMENT = %(comment)s"
  26. sql_alter_column_comment = None
  27. @property
  28. def sql_delete_check(self):
  29. if self.connection.mysql_is_mariadb:
  30. # The name of the column check constraint is the same as the field
  31. # name on MariaDB. Adding IF EXISTS clause prevents migrations
  32. # crash. Constraint is removed during a "MODIFY" column statement.
  33. return "ALTER TABLE %(table)s DROP CONSTRAINT IF EXISTS %(name)s"
  34. return "ALTER TABLE %(table)s DROP CHECK %(name)s"
  35. @property
  36. def sql_rename_column(self):
  37. is_mariadb = self.connection.mysql_is_mariadb
  38. if is_mariadb and self.connection.mysql_version < (10, 5, 2):
  39. # MariaDB < 10.5.2 doesn't support an
  40. # "ALTER TABLE ... RENAME COLUMN" statement.
  41. return "ALTER TABLE %(table)s CHANGE %(old_column)s %(new_column)s %(type)s"
  42. return super().sql_rename_column
  43. def quote_value(self, value):
  44. self.connection.ensure_connection()
  45. # MySQLdb escapes to string, PyMySQL to bytes.
  46. quoted = self.connection.connection.escape(
  47. value, self.connection.connection.encoders
  48. )
  49. if isinstance(value, str) and isinstance(quoted, bytes):
  50. quoted = quoted.decode()
  51. return quoted
  52. def _is_limited_data_type(self, field):
  53. db_type = field.db_type(self.connection)
  54. return (
  55. db_type is not None
  56. and db_type.lower() in self.connection._limited_data_types
  57. )
  58. def skip_default(self, field):
  59. if not self._supports_limited_data_type_defaults:
  60. return self._is_limited_data_type(field)
  61. return False
  62. def skip_default_on_alter(self, field):
  63. if self._is_limited_data_type(field) and not self.connection.mysql_is_mariadb:
  64. # MySQL doesn't support defaults for BLOB and TEXT in the
  65. # ALTER COLUMN statement.
  66. return True
  67. return False
  68. @property
  69. def _supports_limited_data_type_defaults(self):
  70. # MariaDB and MySQL >= 8.0.13 support defaults for BLOB and TEXT.
  71. if self.connection.mysql_is_mariadb:
  72. return True
  73. return self.connection.mysql_version >= (8, 0, 13)
  74. def _column_default_sql(self, field):
  75. if (
  76. not self.connection.mysql_is_mariadb
  77. and self._supports_limited_data_type_defaults
  78. and self._is_limited_data_type(field)
  79. ):
  80. # MySQL supports defaults for BLOB and TEXT columns only if the
  81. # default value is written as an expression i.e. in parentheses.
  82. return "(%s)"
  83. return super()._column_default_sql(field)
  84. def add_field(self, model, field):
  85. super().add_field(model, field)
  86. # Simulate the effect of a one-off default.
  87. # field.default may be unhashable, so a set isn't used for "in" check.
  88. if self.skip_default(field) and field.default not in (None, NOT_PROVIDED):
  89. effective_default = self.effective_default(field)
  90. self.execute(
  91. "UPDATE %(table)s SET %(column)s = %%s"
  92. % {
  93. "table": self.quote_name(model._meta.db_table),
  94. "column": self.quote_name(field.column),
  95. },
  96. [effective_default],
  97. )
  98. def remove_constraint(self, model, constraint):
  99. if (
  100. isinstance(constraint, UniqueConstraint)
  101. and constraint.create_sql(model, self) is not None
  102. ):
  103. self._create_missing_fk_index(
  104. model,
  105. fields=constraint.fields,
  106. expressions=constraint.expressions,
  107. )
  108. super().remove_constraint(model, constraint)
  109. def remove_index(self, model, index):
  110. self._create_missing_fk_index(
  111. model,
  112. fields=[field_name for field_name, _ in index.fields_orders],
  113. expressions=index.expressions,
  114. )
  115. super().remove_index(model, index)
  116. def _field_should_be_indexed(self, model, field):
  117. if not super()._field_should_be_indexed(model, field):
  118. return False
  119. storage = self.connection.introspection.get_storage_engine(
  120. self.connection.cursor(), model._meta.db_table
  121. )
  122. # No need to create an index for ForeignKey fields except if
  123. # db_constraint=False because the index from that constraint won't be
  124. # created.
  125. if (
  126. storage == "InnoDB"
  127. and field.get_internal_type() == "ForeignKey"
  128. and field.db_constraint
  129. ):
  130. return False
  131. return not self._is_limited_data_type(field)
  132. def _create_missing_fk_index(
  133. self,
  134. model,
  135. *,
  136. fields,
  137. expressions=None,
  138. ):
  139. """
  140. MySQL can remove an implicit FK index on a field when that field is
  141. covered by another index like a unique_together. "covered" here means
  142. that the more complex index has the FK field as its first field (see
  143. https://bugs.mysql.com/bug.php?id=37910).
  144. Manually create an implicit FK index to make it possible to remove the
  145. composed index.
  146. """
  147. first_field_name = None
  148. if fields:
  149. first_field_name = fields[0]
  150. elif (
  151. expressions
  152. and self.connection.features.supports_expression_indexes
  153. and isinstance(expressions[0], F)
  154. and LOOKUP_SEP not in expressions[0].name
  155. ):
  156. first_field_name = expressions[0].name
  157. if not first_field_name:
  158. return
  159. first_field = model._meta.get_field(first_field_name)
  160. if first_field.get_internal_type() == "ForeignKey":
  161. column = self.connection.introspection.identifier_converter(
  162. first_field.column
  163. )
  164. with self.connection.cursor() as cursor:
  165. constraint_names = [
  166. name
  167. for name, infodict in self.connection.introspection.get_constraints(
  168. cursor, model._meta.db_table
  169. ).items()
  170. if infodict["index"] and infodict["columns"][0] == column
  171. ]
  172. # There are no other indexes that starts with the FK field, only
  173. # the index that is expected to be deleted.
  174. if len(constraint_names) == 1:
  175. self.execute(
  176. self._create_index_sql(model, fields=[first_field], suffix="")
  177. )
  178. def _delete_composed_index(self, model, fields, *args):
  179. self._create_missing_fk_index(model, fields=fields)
  180. return super()._delete_composed_index(model, fields, *args)
  181. def _set_field_new_type(self, field, new_type):
  182. """
  183. Keep the NULL and DEFAULT properties of the old field. If it has
  184. changed, it will be handled separately.
  185. """
  186. if field.db_default is not NOT_PROVIDED:
  187. default_sql, params = self.db_default_sql(field)
  188. default_sql %= tuple(self.quote_value(p) for p in params)
  189. new_type += f" DEFAULT {default_sql}"
  190. if field.null:
  191. new_type += " NULL"
  192. else:
  193. new_type += " NOT NULL"
  194. return new_type
  195. def _alter_column_type_sql(
  196. self, model, old_field, new_field, new_type, old_collation, new_collation
  197. ):
  198. new_type = self._set_field_new_type(old_field, new_type)
  199. return super()._alter_column_type_sql(
  200. model, old_field, new_field, new_type, old_collation, new_collation
  201. )
  202. def _field_db_check(self, field, field_db_params):
  203. if self.connection.mysql_is_mariadb and self.connection.mysql_version >= (
  204. 10,
  205. 5,
  206. 2,
  207. ):
  208. return super()._field_db_check(field, field_db_params)
  209. # On MySQL and MariaDB < 10.5.2 (no support for
  210. # "ALTER TABLE ... RENAME COLUMN" statements), check constraints with
  211. # the column name as it requires explicit recreation when the column is
  212. # renamed.
  213. return field_db_params["check"]
  214. def _rename_field_sql(self, table, old_field, new_field, new_type):
  215. new_type = self._set_field_new_type(old_field, new_type)
  216. return super()._rename_field_sql(table, old_field, new_field, new_type)
  217. def _alter_column_comment_sql(self, model, new_field, new_type, new_db_comment):
  218. # Comment is alter when altering the column type.
  219. return "", []
  220. def _comment_sql(self, comment):
  221. comment_sql = super()._comment_sql(comment)
  222. return f" COMMENT {comment_sql}"
  223. def _alter_column_null_sql(self, model, old_field, new_field):
  224. if new_field.db_default is NOT_PROVIDED:
  225. return super()._alter_column_null_sql(model, old_field, new_field)
  226. new_db_params = new_field.db_parameters(connection=self.connection)
  227. type_sql = self._set_field_new_type(new_field, new_db_params["type"])
  228. return (
  229. "MODIFY %(column)s %(type)s"
  230. % {
  231. "column": self.quote_name(new_field.column),
  232. "type": type_sql,
  233. },
  234. [],
  235. )