# introspection.py
  1. from collections import namedtuple
  2. # Structure returned by DatabaseIntrospection.get_table_list()
  3. TableInfo = namedtuple("TableInfo", ["name", "type"])
  4. # Structure returned by the DB-API cursor.description interface (PEP 249)
  5. FieldInfo = namedtuple(
  6. "FieldInfo",
  7. "name type_code display_size internal_size precision scale null_ok "
  8. "default collation",
  9. )
  10. class BaseDatabaseIntrospection:
  11. """Encapsulate backend-specific introspection utilities."""
  12. data_types_reverse = {}
  13. def __init__(self, connection):
  14. self.connection = connection
  15. def get_field_type(self, data_type, description):
  16. """
  17. Hook for a database backend to use the cursor description to
  18. match a Django field type to a database column.
  19. For Oracle, the column data_type on its own is insufficient to
  20. distinguish between a FloatField and IntegerField, for example.
  21. """
  22. return self.data_types_reverse[data_type]
  23. def identifier_converter(self, name):
  24. """
  25. Apply a conversion to the identifier for the purposes of comparison.
  26. The default identifier converter is for case sensitive comparison.
  27. """
  28. return name
  29. def table_names(self, cursor=None, include_views=False):
  30. """
  31. Return a list of names of all tables that exist in the database.
  32. Sort the returned table list by Python's default sorting. Do NOT use
  33. the database's ORDER BY here to avoid subtle differences in sorting
  34. order between databases.
  35. """
  36. def get_names(cursor):
  37. return sorted(
  38. ti.name
  39. for ti in self.get_table_list(cursor)
  40. if include_views or ti.type == "t"
  41. )
  42. if cursor is None:
  43. with self.connection.cursor() as cursor:
  44. return get_names(cursor)
  45. return get_names(cursor)
  46. def get_table_list(self, cursor):
  47. """
  48. Return an unsorted list of TableInfo named tuples of all tables and
  49. views that exist in the database.
  50. """
  51. raise NotImplementedError(
  52. "subclasses of BaseDatabaseIntrospection may require a get_table_list() "
  53. "method"
  54. )
  55. def get_table_description(self, cursor, table_name):
  56. """
  57. Return a description of the table with the DB-API cursor.description
  58. interface.
  59. """
  60. raise NotImplementedError(
  61. "subclasses of BaseDatabaseIntrospection may require a "
  62. "get_table_description() method."
  63. )
  64. def get_migratable_models(self):
  65. from django.apps import apps
  66. from django.db import router
  67. return (
  68. model
  69. for app_config in apps.get_app_configs()
  70. for model in router.get_migratable_models(app_config, self.connection.alias)
  71. if model._meta.can_migrate(self.connection)
  72. )
  73. def django_table_names(self, only_existing=False, include_views=True):
  74. """
  75. Return a list of all table names that have associated Django models and
  76. are in INSTALLED_APPS.
  77. If only_existing is True, include only the tables in the database.
  78. """
  79. tables = set()
  80. for model in self.get_migratable_models():
  81. if not model._meta.managed:
  82. continue
  83. tables.add(model._meta.db_table)
  84. tables.update(
  85. f.m2m_db_table()
  86. for f in model._meta.local_many_to_many
  87. if f.remote_field.through._meta.managed
  88. )
  89. tables = list(tables)
  90. if only_existing:
  91. existing_tables = set(self.table_names(include_views=include_views))
  92. tables = [
  93. t for t in tables if self.identifier_converter(t) in existing_tables
  94. ]
  95. return tables
  96. def installed_models(self, tables):
  97. """
  98. Return a set of all models represented by the provided list of table
  99. names.
  100. """
  101. tables = set(map(self.identifier_converter, tables))
  102. return {
  103. m
  104. for m in self.get_migratable_models()
  105. if self.identifier_converter(m._meta.db_table) in tables
  106. }
  107. def sequence_list(self):
  108. """
  109. Return a list of information about all DB sequences for all models in
  110. all apps.
  111. """
  112. sequence_list = []
  113. with self.connection.cursor() as cursor:
  114. for model in self.get_migratable_models():
  115. if not model._meta.managed:
  116. continue
  117. if model._meta.swapped:
  118. continue
  119. sequence_list.extend(
  120. self.get_sequences(
  121. cursor, model._meta.db_table, model._meta.local_fields
  122. )
  123. )
  124. for f in model._meta.local_many_to_many:
  125. # If this is an m2m using an intermediate table,
  126. # we don't need to reset the sequence.
  127. if f.remote_field.through._meta.auto_created:
  128. sequence = self.get_sequences(cursor, f.m2m_db_table())
  129. sequence_list.extend(
  130. sequence or [{"table": f.m2m_db_table(), "column": None}]
  131. )
  132. return sequence_list
  133. def get_sequences(self, cursor, table_name, table_fields=()):
  134. """
  135. Return a list of introspected sequences for table_name. Each sequence
  136. is a dict: {'table': <table_name>, 'column': <column_name>}. An optional
  137. 'name' key can be added if the backend supports named sequences.
  138. """
  139. raise NotImplementedError(
  140. "subclasses of BaseDatabaseIntrospection may require a get_sequences() "
  141. "method"
  142. )
  143. def get_relations(self, cursor, table_name):
  144. """
  145. Return a dictionary of {field_name: (field_name_other_table, other_table)}
  146. representing all foreign keys in the given table.
  147. """
  148. raise NotImplementedError(
  149. "subclasses of BaseDatabaseIntrospection may require a "
  150. "get_relations() method."
  151. )
  152. def get_primary_key_column(self, cursor, table_name):
  153. """
  154. Return the name of the primary key column for the given table.
  155. """
  156. columns = self.get_primary_key_columns(cursor, table_name)
  157. return columns[0] if columns else None
  158. def get_primary_key_columns(self, cursor, table_name):
  159. """Return a list of primary key columns for the given table."""
  160. for constraint in self.get_constraints(cursor, table_name).values():
  161. if constraint["primary_key"]:
  162. return constraint["columns"]
  163. return None
  164. def get_constraints(self, cursor, table_name):
  165. """
  166. Retrieve any constraints or keys (unique, pk, fk, check, index)
  167. across one or more columns.
  168. Return a dict mapping constraint names to their attributes,
  169. where attributes is a dict with keys:
  170. * columns: List of columns this covers
  171. * primary_key: True if primary key, False otherwise
  172. * unique: True if this is a unique constraint, False otherwise
  173. * foreign_key: (table, column) of target, or None
  174. * check: True if check constraint, False otherwise
  175. * index: True if index, False otherwise.
  176. * orders: The order (ASC/DESC) defined for the columns of indexes
  177. * type: The type of the index (btree, hash, etc.)
  178. Some backends may return special constraint names that don't exist
  179. if they don't name constraints of a certain type (e.g. SQLite)
  180. """
  181. raise NotImplementedError(
  182. "subclasses of BaseDatabaseIntrospection may require a get_constraints() "
  183. "method"
  184. )