12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994 |
- import logging
- import operator
- from datetime import datetime
- from django.conf import settings
- from django.db.backends.ddl_references import (
- Columns,
- Expressions,
- ForeignKeyName,
- IndexName,
- Statement,
- Table,
- )
- from django.db.backends.utils import names_digest, split_identifier, truncate_name
- from django.db.models import NOT_PROVIDED, Deferrable, Index
- from django.db.models.sql import Query
- from django.db.transaction import TransactionManagementError, atomic
- from django.utils import timezone
- logger = logging.getLogger("django.db.backends.schema")
def _is_relevant_relation(relation, altered_field):
    """
    Tell whether the constraints contributed by `relation` need to be
    dropped temporarily while `altered_field` is being altered.
    """
    fk = relation.field
    # The reverse side of an M2M never constrains the altered column.
    if fk.many_to_many:
        return False
    # A foreign key with no explicit target points at the primary key, so it
    # is affected whenever the primary key itself is altered.
    if altered_field.primary_key and fk.to_fields == [None]:
        return True
    # Otherwise it only matters if the altered field is an explicit target.
    return altered_field.name in fk.to_fields
def _all_related_fields(model):
    """
    Return the reverse relations of `model` (hidden ones included, parents'
    excluded) as a list ordered by name.
    """
    reverse_relations = list(
        model._meta._get_fields(
            forward=False,
            reverse=True,
            include_hidden=True,
            include_parents=False,
        )
    )
    # Callers rely on a deterministic order, hence the sort by name.
    reverse_relations.sort(key=operator.attrgetter("name"))
    return reverse_relations
def _related_non_m2m_objects(old_field, new_field):
    """
    Yield (old_relation, new_relation) pairs for every relevant reverse
    relation of the two fields, then recurse into the relations that in turn
    point at those relations' own fields. M2M reverse relations are skipped.
    """

    def relevant_relations(altered):
        # Lazily filter the reverse relations of the field's model.
        return (
            candidate
            for candidate in _all_related_fields(altered.model)
            if _is_relevant_relation(candidate, altered)
        )

    pairs = zip(relevant_relations(old_field), relevant_relations(new_field))
    for old_relation, new_relation in pairs:
        yield old_relation, new_relation
        # Follow the chain: relations targeting the related field itself.
        yield from _related_non_m2m_objects(
            old_relation.remote_field,
            new_relation.remote_field,
        )
class BaseDatabaseSchemaEditor:
    """
    This class and its subclasses are responsible for emitting schema-changing
    statements to the databases - model creation/removal/alteration, field
    renaming, index fiddling, and so on.
    """

    # Overrideable SQL templates. Each one is a %-format string that is
    # filled from a dict of already-quoted fragments.

    # Table-level statements.
    sql_create_table = "CREATE TABLE %(table)s (%(definition)s)"
    sql_rename_table = "ALTER TABLE %(old_table)s RENAME TO %(new_table)s"
    sql_retablespace_table = "ALTER TABLE %(table)s SET TABLESPACE %(new_tablespace)s"
    sql_delete_table = "DROP TABLE %(table)s CASCADE"

    # Column-level statements. The sql_alter_column_* fragments are meant to
    # be substituted as the %(changes)s part of sql_alter_column.
    sql_create_column = "ALTER TABLE %(table)s ADD COLUMN %(column)s %(definition)s"
    sql_alter_column = "ALTER TABLE %(table)s %(changes)s"
    sql_alter_column_type = "ALTER COLUMN %(column)s TYPE %(type)s%(collation)s"
    sql_alter_column_null = "ALTER COLUMN %(column)s DROP NOT NULL"
    sql_alter_column_not_null = "ALTER COLUMN %(column)s SET NOT NULL"
    sql_alter_column_default = "ALTER COLUMN %(column)s SET DEFAULT %(default)s"
    sql_alter_column_no_default = "ALTER COLUMN %(column)s DROP DEFAULT"
    # Same statement as sql_alter_column_no_default unless overridden.
    sql_alter_column_no_default_null = sql_alter_column_no_default
    sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s CASCADE"
    sql_rename_column = (
        "ALTER TABLE %(table)s RENAME COLUMN %(old_column)s TO %(new_column)s"
    )
    sql_update_with_default = (
        "UPDATE %(table)s SET %(column)s = %(default)s WHERE %(column)s IS NULL"
    )

    # Constraint fragments and statements. The sql_delete_* templates for
    # checks, uniques, FKs and PKs all default to sql_delete_constraint.
    sql_unique_constraint = "UNIQUE (%(columns)s)%(deferrable)s"
    sql_check_constraint = "CHECK (%(check)s)"
    sql_delete_constraint = "ALTER TABLE %(table)s DROP CONSTRAINT %(name)s"
    sql_constraint = "CONSTRAINT %(name)s %(constraint)s"

    sql_create_check = "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s CHECK (%(check)s)"
    sql_delete_check = sql_delete_constraint

    sql_create_unique = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s "
        "UNIQUE%(nulls_distinct)s (%(columns)s)%(deferrable)s"
    )
    sql_delete_unique = sql_delete_constraint

    sql_create_fk = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
        "REFERENCES %(to_table)s (%(to_column)s)%(deferrable)s"
    )
    # Inline FK templates are off (None) by default. table_sql() and
    # add_field() test them for truthiness and otherwise defer FK creation.
    sql_create_inline_fk = None
    sql_create_column_inline_fk = None
    sql_delete_fk = sql_delete_constraint

    # Index statements.
    sql_create_index = (
        "CREATE INDEX %(name)s ON %(table)s "
        "(%(columns)s)%(include)s%(extra)s%(condition)s"
    )
    sql_create_unique_index = (
        "CREATE UNIQUE INDEX %(name)s ON %(table)s "
        "(%(columns)s)%(include)s%(condition)s%(nulls_distinct)s"
    )
    sql_rename_index = "ALTER INDEX %(old_name)s RENAME TO %(new_name)s"
    sql_delete_index = "DROP INDEX %(name)s"

    # Primary key statements.
    sql_create_pk = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
    )
    sql_delete_pk = sql_delete_constraint

    sql_delete_procedure = "DROP PROCEDURE %(procedure)s"

    # Comment statements.
    sql_alter_table_comment = "COMMENT ON TABLE %(table)s IS %(comment)s"
    sql_alter_column_comment = "COMMENT ON COLUMN %(table)s.%(column)s IS %(comment)s"
- def __init__(self, connection, collect_sql=False, atomic=True):
- self.connection = connection
- self.collect_sql = collect_sql
- if self.collect_sql:
- self.collected_sql = []
- self.atomic_migration = self.connection.features.can_rollback_ddl and atomic
- # State-managing methods
- def __enter__(self):
- self.deferred_sql = []
- if self.atomic_migration:
- self.atomic = atomic(self.connection.alias)
- self.atomic.__enter__()
- return self
- def __exit__(self, exc_type, exc_value, traceback):
- if exc_type is None:
- for sql in self.deferred_sql:
- self.execute(sql)
- if self.atomic_migration:
- self.atomic.__exit__(exc_type, exc_value, traceback)
- # Core utility functions
def execute(self, sql, params=()):
    """
    Run `sql` with the optional `params`, or record it when SQL is being
    collected rather than executed.
    """
    # The transactional DDL check is pointless when only collecting SQL,
    # since nothing is going to hit the database.
    ddl_inside_unsafe_transaction = (
        not self.collect_sql
        and self.connection.in_atomic_block
        and not self.connection.features.can_rollback_ddl
    )
    if ddl_inside_unsafe_transaction:
        raise TransactionManagementError(
            "Executing DDL statements while in a transaction on databases "
            "that can't perform a rollback is prohibited."
        )
    # Statement-like objects are rendered to plain text first.
    statement = str(sql)
    # Log before running.
    logger.debug(
        "%s; (params %r)",
        statement,
        params,
        extra={"params": params, "sql": statement},
    )
    if not self.collect_sql:
        with self.connection.cursor() as cursor:
            cursor.execute(statement, params)
        return
    terminator = "" if statement.rstrip().endswith(";") else ";"
    if params is None:
        rendered = statement
    else:
        # Inline the quoted parameter values into the collected text.
        rendered = statement % tuple(map(self.quote_value, params))
    self.collected_sql.append(rendered + terminator)
- def quote_name(self, name):
- return self.connection.ops.quote_name(name)
- def table_sql(self, model):
- """Take a model and return its table definition."""
- # Add any unique_togethers (always deferred, as some fields might be
- # created afterward, like geometry fields with some backends).
- for field_names in model._meta.unique_together:
- fields = [model._meta.get_field(field) for field in field_names]
- self.deferred_sql.append(self._create_unique_sql(model, fields))
- # Create column SQL, add FK deferreds if needed.
- column_sqls = []
- params = []
- for field in model._meta.local_fields:
- # SQL.
- definition, extra_params = self.column_sql(model, field)
- if definition is None:
- continue
- # Check constraints can go on the column SQL here.
- db_params = field.db_parameters(connection=self.connection)
- if db_params["check"]:
- definition += " " + self.sql_check_constraint % db_params
- # Autoincrement SQL (for backends with inline variant).
- col_type_suffix = field.db_type_suffix(connection=self.connection)
- if col_type_suffix:
- definition += " %s" % col_type_suffix
- params.extend(extra_params)
- # FK.
- if field.remote_field and field.db_constraint:
- to_table = field.remote_field.model._meta.db_table
- to_column = field.remote_field.model._meta.get_field(
- field.remote_field.field_name
- ).column
- if self.sql_create_inline_fk:
- definition += " " + self.sql_create_inline_fk % {
- "to_table": self.quote_name(to_table),
- "to_column": self.quote_name(to_column),
- }
- elif self.connection.features.supports_foreign_keys:
- self.deferred_sql.append(
- self._create_fk_sql(
- model, field, "_fk_%(to_table)s_%(to_column)s"
- )
- )
- # Add the SQL to our big list.
- column_sqls.append(
- "%s %s"
- % (
- self.quote_name(field.column),
- definition,
- )
- )
- # Autoincrement SQL (for backends with post table definition
- # variant).
- if field.get_internal_type() in (
- "AutoField",
- "BigAutoField",
- "SmallAutoField",
- ):
- autoinc_sql = self.connection.ops.autoinc_sql(
- model._meta.db_table, field.column
- )
- if autoinc_sql:
- self.deferred_sql.extend(autoinc_sql)
- constraints = [
- constraint.constraint_sql(model, self)
- for constraint in model._meta.constraints
- ]
- sql = self.sql_create_table % {
- "table": self.quote_name(model._meta.db_table),
- "definition": ", ".join(
- str(constraint)
- for constraint in (*column_sqls, *constraints)
- if constraint
- ),
- }
- if model._meta.db_tablespace:
- tablespace_sql = self.connection.ops.tablespace_sql(
- model._meta.db_tablespace
- )
- if tablespace_sql:
- sql += " " + tablespace_sql
- return sql, params
- # Field <-> database mapping functions
def _iter_column_sql(
    self, column_db_type, params, model, field, field_db_params, include_default
):
    """
    Yield, in order, the SQL fragments making up a column definition.

    Parameters needed by the yielded fragments are added to `params` as a
    side effect while the generator is consumed.
    """
    yield column_db_type
    features = self.connection.features
    collation = field_db_params.get("collation")
    if collation:
        yield self._collate_sql(collation)
    if features.supports_comments_inline and field.db_comment:
        yield self._comment_sql(field.db_comment)
    nullable = field.null
    # A database-level default replaces any Python-level one.
    if field.db_default is not NOT_PROVIDED:
        db_default, db_default_params = self.db_default_sql(field)
        yield f"DEFAULT {db_default}"
        params.extend(db_default_params)
        include_default = False
    # Emit the Python-level default only when requested and when the backend
    # accepts it. It is left out for nullable fields whose default could not
    # be dropped again in ALTER COLUMN (e.g. MySQL longtext and longblob).
    if (
        include_default
        and not self.skip_default(field)
        and (not nullable or not self.skip_default_on_alter(field))
    ):
        default_value = self.effective_default(field)
        if default_value is not None:
            default_clause = "DEFAULT " + self._column_default_sql(field)
            if features.requires_literal_defaults:
                # Backends that can't take defaults as parameters (Oracle)
                # must implement prepare_default() to inline the value.
                yield default_clause % self.prepare_default(default_value)
            else:
                yield default_clause
                params.append(default_value)
    # Oracle stores '' as NULL, so columns that may hold empty strings have
    # to be nullable there.
    if (
        field.empty_strings_allowed
        and not field.primary_key
        and features.interprets_empty_strings_as_nulls
    ):
        nullable = True
    if field.generated:
        yield self._column_generated_sql(field)
    elif not nullable:
        yield "NOT NULL"
    elif not features.implied_column_null:
        yield "NULL"
    if field.primary_key:
        yield "PRIMARY KEY"
    elif field.unique:
        yield "UNIQUE"
    # Implicitly indexed (unique) columns may get an inline tablespace.
    tablespace = field.db_tablespace or model._meta.db_tablespace
    if tablespace and features.supports_tablespaces and field.unique:
        yield self.connection.ops.tablespace_sql(tablespace, inline=True)
- def column_sql(self, model, field, include_default=False):
- """
- Return the column definition for a field. The field must already have
- had set_attributes_from_name() called.
- """
- # Get the column's type and use that as the basis of the SQL.
- field_db_params = field.db_parameters(connection=self.connection)
- column_db_type = field_db_params["type"]
- # Check for fields that aren't actually columns (e.g. M2M).
- if column_db_type is None:
- return None, None
- params = []
- return (
- " ".join(
- # This appends to the params being returned.
- self._iter_column_sql(
- column_db_type,
- params,
- model,
- field,
- field_db_params,
- include_default,
- )
- ),
- params,
- )
- def skip_default(self, field):
- """
- Some backends don't accept default values for certain columns types
- (i.e. MySQL longtext and longblob).
- """
- return False
- def skip_default_on_alter(self, field):
- """
- Some backends don't accept default values for certain columns types
- (i.e. MySQL longtext and longblob) in the ALTER COLUMN statement.
- """
- return False
- def prepare_default(self, value):
- """
- Only used for backends which have requires_literal_defaults feature
- """
- raise NotImplementedError(
- "subclasses of BaseDatabaseSchemaEditor for backends which have "
- "requires_literal_defaults must provide a prepare_default() method"
- )
- def _column_default_sql(self, field):
- """
- Return the SQL to use in a DEFAULT clause. The resulting string should
- contain a '%s' placeholder for a default value.
- """
- return "%s"
def db_default_sql(self, field):
    """Return the (sql, params) pair for the field's database default."""
    from django.db.models.expressions import Value

    db_default = field.db_default
    # Anything that isn't a plain Value gets wrapped in parentheses.
    wrapper = "%s" if isinstance(db_default, Value) else "(%s)"
    compiler = Query(model=field.model).get_compiler(connection=self.connection)
    default_sql, params = compiler.compile(db_default)
    if self.connection.features.requires_literal_defaults:
        # Some databases don't support parameterized defaults (Oracle,
        # SQLite). Their schema editors must implement prepare_default() so
        # the values can be inlined here.
        literals = tuple(map(self.prepare_default, params))
        default_sql = default_sql % literals
        params = []
    return wrapper % default_sql, params
- def _column_generated_sql(self, field):
- """Return the SQL to use in a GENERATED ALWAYS clause."""
- expression_sql, params = field.generated_sql(self.connection)
- persistency_sql = "STORED" if field.db_persist else "VIRTUAL"
- if params:
- expression_sql = expression_sql % tuple(self.quote_value(p) for p in params)
- return f"GENERATED ALWAYS AS ({expression_sql}) {persistency_sql}"
- @staticmethod
- def _effective_default(field):
- # This method allows testing its logic without a connection.
- if field.has_default():
- default = field.get_default()
- elif field.generated:
- default = None
- elif not field.null and field.blank and field.empty_strings_allowed:
- if field.get_internal_type() == "BinaryField":
- default = b""
- else:
- default = ""
- elif getattr(field, "auto_now", False) or getattr(field, "auto_now_add", False):
- internal_type = field.get_internal_type()
- if internal_type == "DateTimeField":
- default = timezone.now()
- else:
- default = datetime.now()
- if internal_type == "DateField":
- default = default.date()
- elif internal_type == "TimeField":
- default = default.time()
- else:
- default = None
- return default
- def effective_default(self, field):
- """Return a field's effective database default value."""
- return field.get_db_prep_save(self._effective_default(field), self.connection)
- def quote_value(self, value):
- """
- Return a quoted version of the value so it's safe to use in an SQL
- string. This is not safe against injection from user code; it is
- intended only for use in making SQL scripts or preparing default values
- for particularly tricky backends (defaults are not user-defined, though,
- so this is safe).
- """
- raise NotImplementedError()
- # Actions
- def create_model(self, model):
- """
- Create a table and any accompanying indexes or unique constraints for
- the given `model`.
- """
- sql, params = self.table_sql(model)
- # Prevent using [] as params, in the case a literal '%' is used in the
- # definition.
- self.execute(sql, params or None)
- if self.connection.features.supports_comments:
- # Add table comment.
- if model._meta.db_table_comment:
- self.alter_db_table_comment(model, None, model._meta.db_table_comment)
- # Add column comments.
- if not self.connection.features.supports_comments_inline:
- for field in model._meta.local_fields:
- if field.db_comment:
- field_db_params = field.db_parameters(
- connection=self.connection
- )
- field_type = field_db_params["type"]
- self.execute(
- *self._alter_column_comment_sql(
- model, field, field_type, field.db_comment
- )
- )
- # Add any field index and index_together's (deferred as SQLite
- # _remake_table needs it).
- self.deferred_sql.extend(self._model_indexes_sql(model))
- # Make M2M tables
- for field in model._meta.local_many_to_many:
- if field.remote_field.through._meta.auto_created:
- self.create_model(field.remote_field.through)
- def delete_model(self, model):
- """Delete a model from the database."""
- # Handle auto-created intermediary models
- for field in model._meta.local_many_to_many:
- if field.remote_field.through._meta.auto_created:
- self.delete_model(field.remote_field.through)
- # Delete the table
- self.execute(
- self.sql_delete_table
- % {
- "table": self.quote_name(model._meta.db_table),
- }
- )
- # Remove all deferred statements referencing the deleted table.
- for sql in list(self.deferred_sql):
- if isinstance(sql, Statement) and sql.references_table(
- model._meta.db_table
- ):
- self.deferred_sql.remove(sql)
- def add_index(self, model, index):
- """Add an index on a model."""
- if (
- index.contains_expressions
- and not self.connection.features.supports_expression_indexes
- ):
- return None
- # Index.create_sql returns interpolated SQL which makes params=None a
- # necessity to avoid escaping attempts on execution.
- self.execute(index.create_sql(model, self), params=None)
- def remove_index(self, model, index):
- """Remove an index from a model."""
- if (
- index.contains_expressions
- and not self.connection.features.supports_expression_indexes
- ):
- return None
- self.execute(index.remove_sql(model, self))
- def rename_index(self, model, old_index, new_index):
- if self.connection.features.can_rename_index:
- self.execute(
- self._rename_index_sql(model, old_index.name, new_index.name),
- params=None,
- )
- else:
- self.remove_index(model, old_index)
- self.add_index(model, new_index)
- def add_constraint(self, model, constraint):
- """Add a constraint to a model."""
- sql = constraint.create_sql(model, self)
- if sql:
- # Constraint.create_sql returns interpolated SQL which makes
- # params=None a necessity to avoid escaping attempts on execution.
- self.execute(sql, params=None)
- def remove_constraint(self, model, constraint):
- """Remove a constraint from a model."""
- sql = constraint.remove_sql(model, self)
- if sql:
- self.execute(sql)
- def alter_unique_together(self, model, old_unique_together, new_unique_together):
- """
- Deal with a model changing its unique_together. The input
- unique_togethers must be doubly-nested, not the single-nested
- ["foo", "bar"] format.
- """
- olds = {tuple(fields) for fields in old_unique_together}
- news = {tuple(fields) for fields in new_unique_together}
- # Deleted uniques
- for fields in olds.difference(news):
- self._delete_composed_index(
- model,
- fields,
- {"unique": True, "primary_key": False},
- self.sql_delete_unique,
- )
- # Created uniques
- for field_names in news.difference(olds):
- fields = [model._meta.get_field(field) for field in field_names]
- self.execute(self._create_unique_sql(model, fields))
- def alter_index_together(self, model, old_index_together, new_index_together):
- """
- Deal with a model changing its index_together. The input
- index_togethers must be doubly-nested, not the single-nested
- ["foo", "bar"] format.
- """
- olds = {tuple(fields) for fields in old_index_together}
- news = {tuple(fields) for fields in new_index_together}
- # Deleted indexes
- for fields in olds.difference(news):
- self._delete_composed_index(
- model,
- fields,
- {"index": True, "unique": False},
- self.sql_delete_index,
- )
- # Created indexes
- for field_names in news.difference(olds):
- fields = [model._meta.get_field(field) for field in field_names]
- self.execute(self._create_index_sql(model, fields=fields, suffix="_idx"))
- def _delete_composed_index(self, model, fields, constraint_kwargs, sql):
- meta_constraint_names = {
- constraint.name for constraint in model._meta.constraints
- }
- meta_index_names = {constraint.name for constraint in model._meta.indexes}
- columns = [model._meta.get_field(field).column for field in fields]
- constraint_names = self._constraint_names(
- model,
- columns,
- exclude=meta_constraint_names | meta_index_names,
- **constraint_kwargs,
- )
- if (
- constraint_kwargs.get("unique") is True
- and constraint_names
- and self.connection.features.allows_multiple_constraints_on_same_fields
- ):
- # Constraint matching the unique_together name.
- default_name = str(
- self._unique_constraint_name(model._meta.db_table, columns, quote=False)
- )
- if default_name in constraint_names:
- constraint_names = [default_name]
- if len(constraint_names) != 1:
- raise ValueError(
- "Found wrong number (%s) of constraints for %s(%s)"
- % (
- len(constraint_names),
- model._meta.db_table,
- ", ".join(columns),
- )
- )
- self.execute(self._delete_constraint_sql(sql, model, constraint_names[0]))
- def alter_db_table(self, model, old_db_table, new_db_table):
- """Rename the table a model points to."""
- if old_db_table == new_db_table or (
- self.connection.features.ignores_table_name_case
- and old_db_table.lower() == new_db_table.lower()
- ):
- return
- self.execute(
- self.sql_rename_table
- % {
- "old_table": self.quote_name(old_db_table),
- "new_table": self.quote_name(new_db_table),
- }
- )
- # Rename all references to the old table name.
- for sql in self.deferred_sql:
- if isinstance(sql, Statement):
- sql.rename_table_references(old_db_table, new_db_table)
- def alter_db_table_comment(self, model, old_db_table_comment, new_db_table_comment):
- if self.sql_alter_table_comment and self.connection.features.supports_comments:
- self.execute(
- self.sql_alter_table_comment
- % {
- "table": self.quote_name(model._meta.db_table),
- "comment": self.quote_value(new_db_table_comment or ""),
- }
- )
- def alter_db_tablespace(self, model, old_db_tablespace, new_db_tablespace):
- """Move a model's table between tablespaces."""
- self.execute(
- self.sql_retablespace_table
- % {
- "table": self.quote_name(model._meta.db_table),
- "old_tablespace": self.quote_name(old_db_tablespace),
- "new_tablespace": self.quote_name(new_db_tablespace),
- }
- )
- def add_field(self, model, field):
- """
- Create a field on a model. Usually involves adding a column, but may
- involve adding a table instead (for M2M fields).
- """
- # Special-case implicit M2M tables
- if field.many_to_many and field.remote_field.through._meta.auto_created:
- return self.create_model(field.remote_field.through)
- # Get the column's definition
- definition, params = self.column_sql(model, field, include_default=True)
- # It might not actually have a column behind it
- if definition is None:
- return
- if col_type_suffix := field.db_type_suffix(connection=self.connection):
- definition += f" {col_type_suffix}"
- # Check constraints can go on the column SQL here
- db_params = field.db_parameters(connection=self.connection)
- if db_params["check"]:
- definition += " " + self.sql_check_constraint % db_params
- if (
- field.remote_field
- and self.connection.features.supports_foreign_keys
- and field.db_constraint
- ):
- constraint_suffix = "_fk_%(to_table)s_%(to_column)s"
- # Add FK constraint inline, if supported.
- if self.sql_create_column_inline_fk:
- to_table = field.remote_field.model._meta.db_table
- to_column = field.remote_field.model._meta.get_field(
- field.remote_field.field_name
- ).column
- namespace, _ = split_identifier(model._meta.db_table)
- definition += " " + self.sql_create_column_inline_fk % {
- "name": self._fk_constraint_name(model, field, constraint_suffix),
- "namespace": "%s." % self.quote_name(namespace)
- if namespace
- else "",
- "column": self.quote_name(field.column),
- "to_table": self.quote_name(to_table),
- "to_column": self.quote_name(to_column),
- "deferrable": self.connection.ops.deferrable_sql(),
- }
- # Otherwise, add FK constraints later.
- else:
- self.deferred_sql.append(
- self._create_fk_sql(model, field, constraint_suffix)
- )
- # Build the SQL and run it
- sql = self.sql_create_column % {
- "table": self.quote_name(model._meta.db_table),
- "column": self.quote_name(field.column),
- "definition": definition,
- }
- self.execute(sql, params)
- # Drop the default if we need to
- if (
- field.db_default is NOT_PROVIDED
- and not self.skip_default_on_alter(field)
- and self.effective_default(field) is not None
- ):
- changes_sql, params = self._alter_column_default_sql(
- model, None, field, drop=True
- )
- sql = self.sql_alter_column % {
- "table": self.quote_name(model._meta.db_table),
- "changes": changes_sql,
- }
- self.execute(sql, params)
- # Add field comment, if required.
- if (
- field.db_comment
- and self.connection.features.supports_comments
- and not self.connection.features.supports_comments_inline
- ):
- field_type = db_params["type"]
- self.execute(
- *self._alter_column_comment_sql(
- model, field, field_type, field.db_comment
- )
- )
- # Add an index, if required
- self.deferred_sql.extend(self._field_indexes_sql(model, field))
- # Reset connection if required
- if self.connection.features.connection_persists_old_columns:
- self.connection.close()
def remove_field(self, model, field):
    """
    Remove a field from a model.

    An auto-created many-to-many through model is deleted as a whole
    table. A field whose db_parameters() report no column type is left
    alone. Otherwise any foreign key constraints found on the column are
    dropped, the column is deleted, and deferred statements that still
    reference the column are discarded.
    """
    # Implicit M2M: the field is backed by a table rather than a column.
    if field.many_to_many:
        through = field.remote_field.through
        if through._meta.auto_created:
            return self.delete_model(through)
    # No column type means there is nothing to drop.
    column_type = field.db_parameters(connection=self.connection)["type"]
    if column_type is None:
        return
    table_name = model._meta.db_table
    column_name = field.column
    # FK constraints are dropped explicitly before the column itself.
    if field.remote_field:
        for constraint in self._constraint_names(
            model, [column_name], foreign_key=True
        ):
            self.execute(self._delete_fk_sql(model, constraint))
    self.execute(
        self.sql_delete_column
        % {
            "table": self.quote_name(table_name),
            "column": self.quote_name(column_name),
        }
    )
    # Some backends need a fresh connection to forget the old column.
    if self.connection.features.connection_persists_old_columns:
        self.connection.close()
    # Forget deferred statements that mention the removed column.
    stale = [
        statement
        for statement in self.deferred_sql
        if isinstance(statement, Statement)
        and statement.references_column(table_name, column_name)
    ]
    for statement in stale:
        self.deferred_sql.remove(statement)
def alter_field(self, model, old_field, new_field, strict=False):
    """
    Allow a field's type, uniqueness, nullability, default, column,
    constraints, etc. to be modified.

    `old_field` is required to compute the necessary changes.
    If `strict` is True, raise errors if the old column does not match
    `old_field` precisely.

    Raise ValueError when either field lacks a db type without being a
    relation, when only one side is column-less (or the through models do
    not line up), or when a generated field's definition would change.
    """
    if not self._field_should_be_altered(old_field, new_field):
        return
    # Resolve the column types of both sides up front.
    old_db_params = old_field.db_parameters(connection=self.connection)
    old_type = old_db_params["type"]
    new_db_params = new_field.db_parameters(connection=self.connection)
    new_type = new_db_params["type"]
    old_untyped = old_type is None
    new_untyped = new_type is None

    # A column-less field that is not a relation cannot be handled at all.
    if (old_untyped and old_field.remote_field is None) or (
        new_untyped and new_field.remote_field is None
    ):
        raise ValueError(
            "Cannot alter field %s into %s - they do not properly define "
            "db_type (are you using a badly-written custom field?)"
            % (old_field, new_field),
        )

    if old_untyped and new_untyped:
        old_through = old_field.remote_field.through
        new_through = new_field.remote_field.through
        if old_through and new_through:
            old_auto = old_through._meta.auto_created
            new_auto = new_through._meta.auto_created
            if old_auto and new_auto:
                # Both through tables are implicit: repoint them.
                return self._alter_many_to_many(model, old_field, new_field, strict)
            if not old_auto and not new_auto:
                # Both sides have through models; this is a no-op.
                return

    if old_untyped or new_untyped:
        raise ValueError(
            "Cannot alter field %s into %s - they are not compatible types "
            "(you cannot alter to or from M2M fields, or add or remove "
            "through= on M2M fields)" % (old_field, new_field)
        )

    if old_field.generated != new_field.generated or (
        new_field.generated
        and (
            old_field.db_persist != new_field.db_persist
            or old_field.generated_sql(self.connection)
            != new_field.generated_sql(self.connection)
        )
    ):
        raise ValueError(
            f"Modifying GeneratedFields is not supported - the field {new_field} "
            "must be removed and re-added with the new definition."
        )

    # Regular column-to-column change.
    self._alter_field(
        model,
        old_field,
        new_field,
        old_type,
        new_type,
        old_db_params,
        new_db_params,
        strict,
    )
- def _field_db_check(self, field, field_db_params):
- # Always check constraints with the same mocked column name to avoid
- # recreating constrains when the column is renamed.
- check_constraints = self.connection.data_type_check_constraints
- data = field.db_type_parameters(self.connection)
- data["column"] = "__column_name__"
- try:
- return check_constraints[field.get_internal_type()] % data
- except KeyError:
- return None
def _alter_field(
    self,
    model,
    old_field,
    new_field,
    old_type,
    new_type,
    old_db_params,
    new_db_params,
    strict=False,
):
    """
    Perform a "physical" (non-ManyToMany) field update.

    The work happens in a fixed order: drop constraints and indexes that
    the change invalidates (outgoing FKs, unique, incoming FKs, indexes,
    checks), rename the column, run the accumulated ALTER COLUMN actions
    (type, database default, nullability), then recreate primary key,
    unique, index, FK and check constraints as required by `new_field`.

    `old_type`/`new_type` and `old_db_params`/`new_db_params` are the
    db_parameters() results for the two fields. If `strict` is True, raise
    ValueError when the number of FK, unique or check constraints found on
    the old column is not exactly one.
    """
    # Drop any FK constraints, we'll remake them later
    # fks_dropped is consulted further down to decide whether the FK has
    # to be recreated.
    fks_dropped = set()
    if (
        self.connection.features.supports_foreign_keys
        and old_field.remote_field
        and old_field.db_constraint
        and self._field_should_be_altered(
            old_field,
            new_field,
            ignore={"db_comment"},
        )
    ):
        fk_names = self._constraint_names(
            model, [old_field.column], foreign_key=True
        )
        if strict and len(fk_names) != 1:
            raise ValueError(
                "Found wrong number (%s) of foreign key constraints for %s.%s"
                % (
                    len(fk_names),
                    model._meta.db_table,
                    old_field.column,
                )
            )
        for fk_name in fk_names:
            fks_dropped.add((old_field.column,))
            self.execute(self._delete_fk_sql(model, fk_name))
    # Has unique been removed?
    if old_field.unique and (
        not new_field.unique or self._field_became_primary_key(old_field, new_field)
    ):
        # Find the unique constraint for this field
        # Constraints declared in Meta.constraints are excluded by name.
        meta_constraint_names = {
            constraint.name for constraint in model._meta.constraints
        }
        constraint_names = self._constraint_names(
            model,
            [old_field.column],
            unique=True,
            primary_key=False,
            exclude=meta_constraint_names,
        )
        if strict and len(constraint_names) != 1:
            raise ValueError(
                "Found wrong number (%s) of unique constraints for %s.%s"
                % (
                    len(constraint_names),
                    model._meta.db_table,
                    old_field.column,
                )
            )
        for constraint_name in constraint_names:
            self.execute(self._delete_unique_sql(model, constraint_name))
    # Drop incoming FK constraints if the field is a primary key or unique,
    # which might be a to_field target, and things are going to change.
    old_collation = old_db_params.get("collation")
    new_collation = new_db_params.get("collation")
    drop_foreign_keys = (
        self.connection.features.supports_foreign_keys
        and (
            (old_field.primary_key and new_field.primary_key)
            or (old_field.unique and new_field.unique)
        )
        and ((old_type != new_type) or (old_collation != new_collation))
    )
    if drop_foreign_keys:
        # '_meta.related_field' also contains M2M reverse fields, these
        # will be filtered out
        for _old_rel, new_rel in _related_non_m2m_objects(old_field, new_field):
            rel_fk_names = self._constraint_names(
                new_rel.related_model, [new_rel.field.column], foreign_key=True
            )
            for fk_name in rel_fk_names:
                self.execute(self._delete_fk_sql(new_rel.related_model, fk_name))
    # Removed an index? (no strict check, as multiple indexes are possible)
    # Remove indexes if db_index switched to False or a unique constraint
    # will now be used in lieu of an index. The following lines from the
    # truth table show all True cases; the rest are False:
    #
    # old_field.db_index | old_field.unique | new_field.db_index | new_field.unique
    # ------------------------------------------------------------------------------
    # True               | False            | False              | False
    # True               | False            | False              | True
    # True               | False            | True               | True
    if (
        old_field.db_index
        and not old_field.unique
        and (not new_field.db_index or new_field.unique)
    ):
        # Find the index for this field
        meta_index_names = {index.name for index in model._meta.indexes}
        # Retrieve only BTREE indexes since this is what's created with
        # db_index=True.
        index_names = self._constraint_names(
            model,
            [old_field.column],
            index=True,
            type_=Index.suffix,
            exclude=meta_index_names,
        )
        for index_name in index_names:
            # The only way to check if an index was created with
            # db_index=True or with Index(['field'], name='foo')
            # is to look at its name (refs #28053).
            self.execute(self._delete_index_sql(model, index_name))
    # Change check constraints?
    # The old check is dropped here; the new one is added near the end.
    old_db_check = self._field_db_check(old_field, old_db_params)
    new_db_check = self._field_db_check(new_field, new_db_params)
    if old_db_check != new_db_check and old_db_check:
        meta_constraint_names = {
            constraint.name for constraint in model._meta.constraints
        }
        constraint_names = self._constraint_names(
            model,
            [old_field.column],
            check=True,
            exclude=meta_constraint_names,
        )
        if strict and len(constraint_names) != 1:
            raise ValueError(
                "Found wrong number (%s) of check constraints for %s.%s"
                % (
                    len(constraint_names),
                    model._meta.db_table,
                    old_field.column,
                )
            )
        for constraint_name in constraint_names:
            self.execute(self._delete_check_sql(model, constraint_name))
    # Have they renamed the column?
    if old_field.column != new_field.column:
        self.execute(
            self._rename_field_sql(
                model._meta.db_table, old_field, new_field, new_type
            )
        )
        # Rename all references to the renamed column.
        for sql in self.deferred_sql:
            if isinstance(sql, Statement):
                sql.rename_column_references(
                    model._meta.db_table, old_field.column, new_field.column
                )
    # Next, start accumulating actions to do
    # actions: (sql, params) fragments for ALTER COLUMN, run first.
    # null_actions: nullability fragments, possibly run after the data fix.
    # post_actions: complete (sql, params) statements run afterwards.
    actions = []
    null_actions = []
    post_actions = []
    # Type suffix change? (e.g. auto increment).
    old_type_suffix = old_field.db_type_suffix(connection=self.connection)
    new_type_suffix = new_field.db_type_suffix(connection=self.connection)
    # Type, collation, or comment change?
    if (
        old_type != new_type
        or old_type_suffix != new_type_suffix
        or old_collation != new_collation
        or (
            self.connection.features.supports_comments
            and old_field.db_comment != new_field.db_comment
        )
    ):
        fragment, other_actions = self._alter_column_type_sql(
            model, old_field, new_field, new_type, old_collation, new_collation
        )
        actions.append(fragment)
        post_actions.extend(other_actions)

    # Database-level default (db_default) added, changed or removed?
    if new_field.db_default is not NOT_PROVIDED:
        if (
            old_field.db_default is NOT_PROVIDED
            or new_field.db_default != old_field.db_default
        ):
            actions.append(
                self._alter_column_database_default_sql(model, old_field, new_field)
            )
    elif old_field.db_default is not NOT_PROVIDED:
        actions.append(
            self._alter_column_database_default_sql(
                model, old_field, new_field, drop=True
            )
        )
    # When changing a column NULL constraint to NOT NULL with a given
    # default value, we need to perform 4 steps:
    #  1. Add a default for new incoming writes
    #  2. Update existing NULL rows with new default
    #  3. Replace NULL constraint with NOT NULL
    #  4. Drop the default again.
    # Default change?
    needs_database_default = False
    if (
        old_field.null
        and not new_field.null
        and new_field.db_default is NOT_PROVIDED
    ):
        old_default = self.effective_default(old_field)
        new_default = self.effective_default(new_field)
        if (
            not self.skip_default_on_alter(new_field)
            and old_default != new_default
            and new_default is not None
        ):
            needs_database_default = True
            actions.append(
                self._alter_column_default_sql(model, old_field, new_field)
            )
    # Nullability change?
    if old_field.null != new_field.null:
        fragment = self._alter_column_null_sql(model, old_field, new_field)
        if fragment:
            null_actions.append(fragment)
    # Only if we have a default and there is a change from NULL to NOT NULL
    four_way_default_alteration = (
        new_field.has_default() or new_field.db_default is not NOT_PROVIDED
    ) and (old_field.null and not new_field.null)
    if actions or null_actions:
        if not four_way_default_alteration:
            # If we don't have to do a 4-way default alteration we can
            # directly run a (NOT) NULL alteration
            actions += null_actions
        # Combine actions together if we can (e.g. postgres)
        if self.connection.features.supports_combined_alters and actions:
            sql, params = tuple(zip(*actions))
            actions = [(", ".join(sql), sum(params, []))]
        # Apply those actions
        for sql, params in actions:
            self.execute(
                self.sql_alter_column
                % {
                    "table": self.quote_name(model._meta.db_table),
                    "changes": sql,
                },
                params,
            )
        if four_way_default_alteration:
            # new_default was computed above; that branch runs whenever
            # this one does with db_default not provided.
            if new_field.db_default is NOT_PROVIDED:
                default_sql = "%s"
                params = [new_default]
            else:
                default_sql, params = self.db_default_sql(new_field)
            # Update existing rows with default value
            self.execute(
                self.sql_update_with_default
                % {
                    "table": self.quote_name(model._meta.db_table),
                    "column": self.quote_name(new_field.column),
                    "default": default_sql,
                },
                params,
            )
            # Since we didn't run a NOT NULL change before we need to do it
            # now
            for sql, params in null_actions:
                self.execute(
                    self.sql_alter_column
                    % {
                        "table": self.quote_name(model._meta.db_table),
                        "changes": sql,
                    },
                    params,
                )
    if post_actions:
        for sql, params in post_actions:
            self.execute(sql, params)
    # If primary_key changed to False, delete the primary key constraint.
    if old_field.primary_key and not new_field.primary_key:
        self._delete_primary_key(model, strict)
    # Added a unique?
    if self._unique_should_be_added(old_field, new_field):
        self.execute(self._create_unique_sql(model, [new_field]))
    # Added an index? Add an index if db_index switched to True or a unique
    # constraint will no longer be used in lieu of an index. The following
    # lines from the truth table show all True cases; the rest are False:
    #
    # old_field.db_index | old_field.unique | new_field.db_index | new_field.unique
    # ------------------------------------------------------------------------------
    # False              | False            | True               | False
    # False              | True             | True               | False
    # True               | True             | True               | False
    if (
        (not old_field.db_index or old_field.unique)
        and new_field.db_index
        and not new_field.unique
    ):
        self.execute(self._create_index_sql(model, fields=[new_field]))
    # Type alteration on primary key? Then we need to alter the column
    # referring to us.
    rels_to_update = []
    if drop_foreign_keys:
        rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
    # Changed to become primary key?
    if self._field_became_primary_key(old_field, new_field):
        # Make the new one
        self.execute(self._create_primary_key_sql(model, new_field))
        # Update all referencing columns
        rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
    # Handle our type alters on the other end of rels from the PK stuff above
    for old_rel, new_rel in rels_to_update:
        rel_db_params = new_rel.field.db_parameters(connection=self.connection)
        rel_type = rel_db_params["type"]
        rel_collation = rel_db_params.get("collation")
        old_rel_db_params = old_rel.field.db_parameters(connection=self.connection)
        old_rel_collation = old_rel_db_params.get("collation")
        fragment, other_actions = self._alter_column_type_sql(
            new_rel.related_model,
            old_rel.field,
            new_rel.field,
            rel_type,
            old_rel_collation,
            rel_collation,
        )
        # fragment is a (sql, params) pair for the ALTER COLUMN clause.
        self.execute(
            self.sql_alter_column
            % {
                "table": self.quote_name(new_rel.related_model._meta.db_table),
                "changes": fragment[0],
            },
            fragment[1],
        )
        for sql, params in other_actions:
            self.execute(sql, params)
    # Does it have a foreign key?
    # Create it when the old FK was dropped above, or when the old field
    # had no relation or no database constraint.
    if (
        self.connection.features.supports_foreign_keys
        and new_field.remote_field
        and (
            fks_dropped or not old_field.remote_field or not old_field.db_constraint
        )
        and new_field.db_constraint
    ):
        self.execute(
            self._create_fk_sql(model, new_field, "_fk_%(to_table)s_%(to_column)s")
        )
    # Rebuild FKs that pointed to us if we previously had to drop them
    if drop_foreign_keys:
        for _, rel in rels_to_update:
            if rel.field.db_constraint:
                self.execute(
                    self._create_fk_sql(rel.related_model, rel.field, "_fk")
                )
    # Does it have check constraints we need to add?
    if old_db_check != new_db_check and new_db_check:
        constraint_name = self._create_index_name(
            model._meta.db_table, [new_field.column], suffix="_check"
        )
        self.execute(
            self._create_check_sql(model, constraint_name, new_db_params["check"])
        )
    # Drop the default if we need to
    # (Django usually does not use in-database defaults)
    if needs_database_default:
        changes_sql, params = self._alter_column_default_sql(
            model, old_field, new_field, drop=True
        )
        sql = self.sql_alter_column % {
            "table": self.quote_name(model._meta.db_table),
            "changes": changes_sql,
        }
        self.execute(sql, params)
    # Reset connection if required
    if self.connection.features.connection_persists_old_columns:
        self.connection.close()
- def _alter_column_null_sql(self, model, old_field, new_field):
- """
- Hook to specialize column null alteration.
- Return a (sql, params) fragment to set a column to null or non-null
- as required by new_field, or None if no changes are required.
- """
- if (
- self.connection.features.interprets_empty_strings_as_nulls
- and new_field.empty_strings_allowed
- ):
- # The field is nullable in the database anyway, leave it alone.
- return
- else:
- new_db_params = new_field.db_parameters(connection=self.connection)
- sql = (
- self.sql_alter_column_null
- if new_field.null
- else self.sql_alter_column_not_null
- )
- return (
- sql
- % {
- "column": self.quote_name(new_field.column),
- "type": new_db_params["type"],
- },
- [],
- )
- def _alter_column_default_sql(self, model, old_field, new_field, drop=False):
- """
- Hook to specialize column default alteration.
- Return a (sql, params) fragment to add or drop (depending on the drop
- argument) a default to new_field's column.
- """
- new_default = self.effective_default(new_field)
- default = self._column_default_sql(new_field)
- params = [new_default]
- if drop:
- params = []
- elif self.connection.features.requires_literal_defaults:
- # Some databases (Oracle) can't take defaults as a parameter
- # If this is the case, the SchemaEditor for that database should
- # implement prepare_default().
- default = self.prepare_default(new_default)
- params = []
- new_db_params = new_field.db_parameters(connection=self.connection)
- if drop:
- if new_field.null:
- sql = self.sql_alter_column_no_default_null
- else:
- sql = self.sql_alter_column_no_default
- else:
- sql = self.sql_alter_column_default
- return (
- sql
- % {
- "column": self.quote_name(new_field.column),
- "type": new_db_params["type"],
- "default": default,
- },
- params,
- )
- def _alter_column_database_default_sql(
- self, model, old_field, new_field, drop=False
- ):
- """
- Hook to specialize column database default alteration.
- Return a (sql, params) fragment to add or drop (depending on the drop
- argument) a default to new_field's column.
- """
- if drop:
- sql = self.sql_alter_column_no_default
- default_sql = ""
- params = []
- else:
- sql = self.sql_alter_column_default
- default_sql, params = self.db_default_sql(new_field)
- new_db_params = new_field.db_parameters(connection=self.connection)
- return (
- sql
- % {
- "column": self.quote_name(new_field.column),
- "type": new_db_params["type"],
- "default": default_sql,
- },
- params,
- )
- def _alter_column_type_sql(
- self, model, old_field, new_field, new_type, old_collation, new_collation
- ):
- """
- Hook to specialize column type alteration for different backends,
- for cases when a creation type is different to an alteration type
- (e.g. SERIAL in PostgreSQL, PostGIS fields).
- Return a 2-tuple of: an SQL fragment of (sql, params) to insert into
- an ALTER TABLE statement and a list of extra (sql, params) tuples to
- run once the field is altered.
- """
- other_actions = []
- if collate_sql := self._collate_sql(
- new_collation, old_collation, model._meta.db_table
- ):
- collate_sql = f" {collate_sql}"
- else:
- collate_sql = ""
- # Comment change?
- comment_sql = ""
- if self.connection.features.supports_comments and not new_field.many_to_many:
- if old_field.db_comment != new_field.db_comment:
- # PostgreSQL and Oracle can't execute 'ALTER COLUMN ...' and
- # 'COMMENT ON ...' at the same time.
- sql, params = self._alter_column_comment_sql(
- model, new_field, new_type, new_field.db_comment
- )
- if sql:
- other_actions.append((sql, params))
- if new_field.db_comment:
- comment_sql = self._comment_sql(new_field.db_comment)
- return (
- (
- self.sql_alter_column_type
- % {
- "column": self.quote_name(new_field.column),
- "type": new_type,
- "collation": collate_sql,
- "comment": comment_sql,
- },
- [],
- ),
- other_actions,
- )
- def _alter_column_comment_sql(self, model, new_field, new_type, new_db_comment):
- return (
- self.sql_alter_column_comment
- % {
- "table": self.quote_name(model._meta.db_table),
- "column": self.quote_name(new_field.column),
- "comment": self._comment_sql(new_db_comment),
- },
- [],
- )
- def _comment_sql(self, comment):
- return self.quote_value(comment or "")
- def _alter_many_to_many(self, model, old_field, new_field, strict):
- """Alter M2Ms to repoint their to= endpoints."""
- # Rename the through table
- if (
- old_field.remote_field.through._meta.db_table
- != new_field.remote_field.through._meta.db_table
- ):
- self.alter_db_table(
- old_field.remote_field.through,
- old_field.remote_field.through._meta.db_table,
- new_field.remote_field.through._meta.db_table,
- )
- # Repoint the FK to the other side
- self.alter_field(
- new_field.remote_field.through,
- # The field that points to the target model is needed, so we can
- # tell alter_field to change it - this is m2m_reverse_field_name()
- # (as opposed to m2m_field_name(), which points to our model).
- old_field.remote_field.through._meta.get_field(
- old_field.m2m_reverse_field_name()
- ),
- new_field.remote_field.through._meta.get_field(
- new_field.m2m_reverse_field_name()
- ),
- )
- self.alter_field(
- new_field.remote_field.through,
- # for self-referential models we need to alter field from the other end too
- old_field.remote_field.through._meta.get_field(old_field.m2m_field_name()),
- new_field.remote_field.through._meta.get_field(new_field.m2m_field_name()),
- )
def _create_index_name(self, table_name, column_names, suffix=""):
    """
    Generate a unique name for an index/unique constraint.

    The name is divided into 3 parts: the table name, the column names,
    and a unique digest and suffix. When the full name exceeds the
    backend's maximum name length (200 if the backend reports none), the
    table and column parts are truncated to fit.
    """
    bare_table = split_identifier(table_name)[1]
    digest = names_digest(bare_table, *column_names, length=8)
    tail = f"{digest}{suffix}"
    limit = self.connection.ops.max_name_length() or 200
    joined_columns = "_".join(column_names)
    # If everything fits into the limit, use that name.
    candidate = f"{bare_table}_{joined_columns}_{tail}"
    if len(candidate) <= limit:
        return candidate
    # Shorten a long suffix.
    if len(tail) > limit / 3:
        tail = tail[: limit // 3]
    part_length = (limit - len(tail)) // 2 - 1
    candidate = "_".join(
        (bare_table[:part_length], joined_columns[:part_length], tail)
    )
    # Prepend D if needed to prevent the name from starting with an
    # underscore or a number (not permitted on Oracle).
    first_char = candidate[0]
    if first_char == "_" or first_char.isdigit():
        candidate = "D" + candidate[:-1]
    return candidate
- def _get_index_tablespace_sql(self, model, fields, db_tablespace=None):
- if db_tablespace is None:
- if len(fields) == 1 and fields[0].db_tablespace:
- db_tablespace = fields[0].db_tablespace
- elif settings.DEFAULT_INDEX_TABLESPACE:
- db_tablespace = settings.DEFAULT_INDEX_TABLESPACE
- elif model._meta.db_tablespace:
- db_tablespace = model._meta.db_tablespace
- if db_tablespace is not None:
- return " " + self.connection.ops.tablespace_sql(db_tablespace)
- return ""
- def _index_condition_sql(self, condition):
- if condition:
- return " WHERE " + condition
- return ""
- def _index_include_sql(self, model, columns):
- if not columns or not self.connection.features.supports_covering_indexes:
- return ""
- return Statement(
- " INCLUDE (%(columns)s)",
- columns=Columns(model._meta.db_table, columns, self.quote_name),
- )
def _create_index_sql(
    self,
    model,
    *,
    fields=None,
    name=None,
    suffix="",
    using="",
    db_tablespace=None,
    col_suffixes=(),
    sql=None,
    opclasses=(),
    condition=None,
    include=None,
    expressions=None,
):
    """
    Return the SQL statement to create the index for one or several fields
    or expressions. `sql` can be specified if the syntax differs from the
    standard (GIS indexes, ...).

    When `name` is None the index name is generated the first time the
    statement needs it and reused afterwards.
    """
    field_list = fields or []
    expression_list = expressions or []
    compiler = Query(model, alias_cols=False).get_compiler(
        connection=self.connection,
    )
    tablespace_clause = self._get_index_tablespace_sql(
        model, field_list, db_tablespace=db_tablespace
    )
    column_names = [field.column for field in field_list]
    template = sql or self.sql_create_index
    table_name = model._meta.db_table

    def create_index_name(*args, **kwargs):
        # Generate the name once, then keep returning the same one.
        nonlocal name
        if name is None:
            name = self._create_index_name(*args, **kwargs)
        return self.quote_name(name)

    table_ref = Table(table_name, self.quote_name)
    name_ref = IndexName(table_name, column_names, suffix, create_index_name)
    # Plain columns take precedence; expressions are used only without them.
    if column_names:
        indexed = self._index_columns(
            table_name, column_names, col_suffixes, opclasses
        )
    else:
        indexed = Expressions(table_name, expression_list, compiler, self.quote_value)
    return Statement(
        template,
        table=table_ref,
        name=name_ref,
        using=using,
        columns=indexed,
        extra=tablespace_clause,
        condition=self._index_condition_sql(condition),
        include=self._index_include_sql(model, include),
    )
def _delete_index_sql(self, model, name, sql=None):
    """
    Return the Statement that drops index `name` on `model`'s table, using
    `sql` as the template when given and sql_delete_index otherwise.
    """
    template = sql or self.sql_delete_index
    table_ref = Table(model._meta.db_table, self.quote_name)
    return Statement(template, table=table_ref, name=self.quote_name(name))
def _rename_index_sql(self, model, old_name, new_name):
    """Return the Statement renaming an index on `model`'s table."""
    table_ref = Table(model._meta.db_table, self.quote_name)
    quoted_old = self.quote_name(old_name)
    quoted_new = self.quote_name(new_name)
    return Statement(
        self.sql_rename_index,
        table=table_ref,
        old_name=quoted_old,
        new_name=quoted_new,
    )
def _index_columns(self, table, columns, col_suffixes, opclasses):
    """
    Return the Columns reference for an index on `columns` of `table`.

    `opclasses` is accepted but not used by this implementation.
    """
    column_refs = Columns(table, columns, self.quote_name, col_suffixes=col_suffixes)
    return column_refs
- def _model_indexes_sql(self, model):
- """
- Return a list of all index SQL statements (field indexes,
- index_together, Meta.indexes) for the specified model.
- """
- if not model._meta.managed or model._meta.proxy or model._meta.swapped:
- return []
- output = []
- for field in model._meta.local_fields:
- output.extend(self._field_indexes_sql(model, field))
- # RemovedInDjango51Warning.
- for field_names in model._meta.index_together:
- fields = [model._meta.get_field(field) for field in field_names]
- output.append(self._create_index_sql(model, fields=fields, suffix="_idx"))
- for index in model._meta.indexes:
- if (
- not index.contains_expressions
- or self.connection.features.supports_expression_indexes
- ):
- output.append(index.create_sql(model, self))
- return output
- def _field_indexes_sql(self, model, field):
- """
- Return a list of all index SQL statements for the specified field.
- """
- output = []
- if self._field_should_be_indexed(model, field):
- output.append(self._create_index_sql(model, fields=[field]))
- return output
- def _field_should_be_altered(self, old_field, new_field, ignore=None):
- if not old_field.concrete and not new_field.concrete:
- return False
- ignore = ignore or set()
- _, old_path, old_args, old_kwargs = old_field.deconstruct()
- _, new_path, new_args, new_kwargs = new_field.deconstruct()
- # Don't alter when:
- # - changing only a field name
- # - changing an attribute that doesn't affect the schema
- # - changing an attribute in the provided set of ignored attributes
- # - adding only a db_column and the column name is not changed
- # - db_table does not change for model referenced by foreign keys
- for attr in ignore.union(old_field.non_db_attrs):
- old_kwargs.pop(attr, None)
- for attr in ignore.union(new_field.non_db_attrs):
- new_kwargs.pop(attr, None)
- if (
- not new_field.many_to_many
- and old_field.remote_field
- and new_field.remote_field
- and old_field.remote_field.model._meta.db_table
- == new_field.remote_field.model._meta.db_table
- ):
- old_kwargs.pop("to", None)
- new_kwargs.pop("to", None)
- return self.quote_name(old_field.column) != self.quote_name(
- new_field.column
- ) or (old_path, old_args, old_kwargs) != (new_path, new_args, new_kwargs)
- def _field_should_be_indexed(self, model, field):
- return field.db_index and not field.unique
- def _field_became_primary_key(self, old_field, new_field):
- return not old_field.primary_key and new_field.primary_key
- def _unique_should_be_added(self, old_field, new_field):
- return (
- not new_field.primary_key
- and new_field.unique
- and (not old_field.unique or old_field.primary_key)
- )
- def _rename_field_sql(self, table, old_field, new_field, new_type):
- return self.sql_rename_column % {
- "table": self.quote_name(table),
- "old_column": self.quote_name(old_field.column),
- "new_column": self.quote_name(new_field.column),
- "type": new_type,
- }
def _create_fk_sql(self, model, field, suffix):
    """
    Return the Statement that creates the foreign key constraint from
    `field`'s column on `model`'s table to the column of its target field.
    `suffix` is passed on to _fk_constraint_name() for the constraint name.
    """
    source_table = model._meta.db_table
    target_table = field.target_field.model._meta.db_table
    return Statement(
        self.sql_create_fk,
        table=Table(source_table, self.quote_name),
        name=self._fk_constraint_name(model, field, suffix),
        column=Columns(source_table, [field.column], self.quote_name),
        to_table=Table(target_table, self.quote_name),
        to_column=Columns(
            target_table, [field.target_field.column], self.quote_name
        ),
        deferrable=self.connection.ops.deferrable_sql(),
    )
def _fk_constraint_name(self, model, field, suffix):
    """
    Return a ForeignKeyName reference for the constraint from `field`'s
    column to its target column. The name it produces is generated by
    _create_index_name() and quoted.
    """

    def create_fk_name(*args, **kwargs):
        generated = self._create_index_name(*args, **kwargs)
        return self.quote_name(generated)

    target = field.target_field
    # Only the table part of the target identifier is used (index 1).
    target_table = split_identifier(target.model._meta.db_table)[1]
    return ForeignKeyName(
        model._meta.db_table,
        [field.column],
        target_table,
        [target.column],
        suffix,
        create_fk_name,
    )
- def _delete_fk_sql(self, model, name):
- return self._delete_constraint_sql(self.sql_delete_fk, model, name)
- def _deferrable_constraint_sql(self, deferrable):
- if deferrable is None:
- return ""
- if deferrable == Deferrable.DEFERRED:
- return " DEFERRABLE INITIALLY DEFERRED"
- if deferrable == Deferrable.IMMEDIATE:
- return " DEFERRABLE INITIALLY IMMEDIATE"
- def _unique_index_nulls_distinct_sql(self, nulls_distinct):
- if nulls_distinct is False:
- return " NULLS NOT DISTINCT"
- elif nulls_distinct is True:
- return " NULLS DISTINCT"
- return ""
- def _unique_supported(
- self,
- condition=None,
- deferrable=None,
- include=None,
- expressions=None,
- nulls_distinct=None,
- ):
- return (
- (not condition or self.connection.features.supports_partial_indexes)
- and (
- not deferrable
- or self.connection.features.supports_deferrable_unique_constraints
- )
- and (not include or self.connection.features.supports_covering_indexes)
- and (
- not expressions or self.connection.features.supports_expression_indexes
- )
- and (
- nulls_distinct is None
- or self.connection.features.supports_nulls_distinct_unique_constraints
- )
- )
- def _unique_sql(
- self,
- model,
- fields,
- name,
- condition=None,
- deferrable=None,
- include=None,
- opclasses=None,
- expressions=None,
- nulls_distinct=None,
- ):
- if not self._unique_supported(
- condition=condition,
- deferrable=deferrable,
- include=include,
- expressions=expressions,
- nulls_distinct=nulls_distinct,
- ):
- return None
- if (
- condition
- or include
- or opclasses
- or expressions
- or nulls_distinct is not None
- ):
- # Databases support conditional, covering, functional unique,
- # and nulls distinct constraints via a unique index.
- sql = self._create_unique_sql(
- model,
- fields,
- name=name,
- condition=condition,
- include=include,
- opclasses=opclasses,
- expressions=expressions,
- nulls_distinct=nulls_distinct,
- )
- if sql:
- self.deferred_sql.append(sql)
- return None
- constraint = self.sql_unique_constraint % {
- "columns": ", ".join([self.quote_name(field.column) for field in fields]),
- "deferrable": self._deferrable_constraint_sql(deferrable),
- }
- return self.sql_constraint % {
- "name": self.quote_name(name),
- "constraint": constraint,
- }
- def _create_unique_sql(
- self,
- model,
- fields,
- name=None,
- condition=None,
- deferrable=None,
- include=None,
- opclasses=None,
- expressions=None,
- nulls_distinct=None,
- ):
- if not self._unique_supported(
- condition=condition,
- deferrable=deferrable,
- include=include,
- expressions=expressions,
- nulls_distinct=nulls_distinct,
- ):
- return None
- compiler = Query(model, alias_cols=False).get_compiler(
- connection=self.connection
- )
- table = model._meta.db_table
- columns = [field.column for field in fields]
- if name is None:
- name = self._unique_constraint_name(table, columns, quote=True)
- else:
- name = self.quote_name(name)
- if condition or include or opclasses or expressions:
- sql = self.sql_create_unique_index
- else:
- sql = self.sql_create_unique
- if columns:
- columns = self._index_columns(
- table, columns, col_suffixes=(), opclasses=opclasses
- )
- else:
- columns = Expressions(table, expressions, compiler, self.quote_value)
- return Statement(
- sql,
- table=Table(table, self.quote_name),
- name=name,
- columns=columns,
- condition=self._index_condition_sql(condition),
- deferrable=self._deferrable_constraint_sql(deferrable),
- include=self._index_include_sql(model, include),
- nulls_distinct=self._unique_index_nulls_distinct_sql(nulls_distinct),
- )
def _unique_constraint_name(self, table, columns, quote=True):
    """
    Return an IndexName for a unique constraint on ``columns`` of ``table``.

    The name uses the "_uniq" suffix and is generated by
    ``_create_index_name()``. When ``quote`` is true the generated name is
    additionally passed through ``quote_name()``.
    """
    if not quote:
        return IndexName(table, columns, "_uniq", self._create_index_name)

    def create_unique_name(*args, **kwargs):
        return self.quote_name(self._create_index_name(*args, **kwargs))

    return IndexName(table, columns, "_uniq", create_unique_name)
- def _delete_unique_sql(
- self,
- model,
- name,
- condition=None,
- deferrable=None,
- include=None,
- opclasses=None,
- expressions=None,
- nulls_distinct=None,
- ):
- if not self._unique_supported(
- condition=condition,
- deferrable=deferrable,
- include=include,
- expressions=expressions,
- nulls_distinct=nulls_distinct,
- ):
- return None
- if condition or include or opclasses or expressions:
- sql = self.sql_delete_index
- else:
- sql = self.sql_delete_unique
- return self._delete_constraint_sql(sql, model, name)
- def _check_sql(self, name, check):
- return self.sql_constraint % {
- "name": self.quote_name(name),
- "constraint": self.sql_check_constraint % {"check": check},
- }
- def _create_check_sql(self, model, name, check):
- if not self.connection.features.supports_table_check_constraints:
- return None
- return Statement(
- self.sql_create_check,
- table=Table(model._meta.db_table, self.quote_name),
- name=self.quote_name(name),
- check=check,
- )
- def _delete_check_sql(self, model, name):
- if not self.connection.features.supports_table_check_constraints:
- return None
- return self._delete_constraint_sql(self.sql_delete_check, model, name)
def _delete_constraint_sql(self, template, model, name):
    """
    Return a Statement built from ``template`` for the model's table.

    The statement receives the table (wrapped in ``Table``) and the quoted
    constraint ``name``.
    """
    table = Table(model._meta.db_table, self.quote_name)
    quoted_name = self.quote_name(name)
    return Statement(template, table=table, name=quoted_name)
- def _constraint_names(
- self,
- model,
- column_names=None,
- unique=None,
- primary_key=None,
- index=None,
- foreign_key=None,
- check=None,
- type_=None,
- exclude=None,
- ):
- """Return all constraint names matching the columns and conditions."""
- if column_names is not None:
- column_names = [
- self.connection.introspection.identifier_converter(
- truncate_name(name, self.connection.ops.max_name_length())
- )
- if self.connection.features.truncates_names
- else self.connection.introspection.identifier_converter(name)
- for name in column_names
- ]
- with self.connection.cursor() as cursor:
- constraints = self.connection.introspection.get_constraints(
- cursor, model._meta.db_table
- )
- result = []
- for name, infodict in constraints.items():
- if column_names is None or column_names == infodict["columns"]:
- if unique is not None and infodict["unique"] != unique:
- continue
- if primary_key is not None and infodict["primary_key"] != primary_key:
- continue
- if index is not None and infodict["index"] != index:
- continue
- if check is not None and infodict["check"] != check:
- continue
- if foreign_key is not None and not infodict["foreign_key"]:
- continue
- if type_ is not None and infodict["type"] != type_:
- continue
- if not exclude or name not in exclude:
- result.append(name)
- return result
- def _delete_primary_key(self, model, strict=False):
- constraint_names = self._constraint_names(model, primary_key=True)
- if strict and len(constraint_names) != 1:
- raise ValueError(
- "Found wrong number (%s) of PK constraints for %s"
- % (
- len(constraint_names),
- model._meta.db_table,
- )
- )
- for constraint_name in constraint_names:
- self.execute(self._delete_primary_key_sql(model, constraint_name))
def _create_primary_key_sql(self, model, field):
    """
    Return a Statement that adds a primary key on ``field``'s column.

    The constraint name is generated by ``_create_index_name()`` with the
    "_pk" suffix and then quoted.
    """
    db_table = model._meta.db_table
    table = Table(db_table, self.quote_name)
    pk_name = self.quote_name(
        self._create_index_name(db_table, [field.column], suffix="_pk")
    )
    pk_columns = Columns(db_table, [field.column], self.quote_name)
    return Statement(
        self.sql_create_pk,
        table=table,
        name=pk_name,
        columns=pk_columns,
    )
- def _delete_primary_key_sql(self, model, name):
- return self._delete_constraint_sql(self.sql_delete_pk, model, name)
- def _collate_sql(self, collation, old_collation=None, table_name=None):
- return "COLLATE " + self.quote_name(collation) if collation else ""
def remove_procedure(self, procedure_name, param_types=()):
    """
    Execute the statement that drops the given procedure.

    ``param_types`` are joined with commas and interpolated, together with
    the quoted procedure name, into ``sql_delete_procedure``.
    """
    params = {
        "procedure": self.quote_name(procedure_name),
        "param_types": ",".join(param_types),
    }
    self.execute(self.sql_delete_procedure % params)
|