introspection.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. from collections import namedtuple
  2. import sqlparse
  3. from MySQLdb.constants import FIELD_TYPE
  4. from django.db.backends.base.introspection import BaseDatabaseIntrospection
  5. from django.db.backends.base.introspection import FieldInfo as BaseFieldInfo
  6. from django.db.backends.base.introspection import TableInfo as BaseTableInfo
  7. from django.db.models import Index
  8. from django.utils.datastructures import OrderedSet
  9. FieldInfo = namedtuple(
  10. "FieldInfo",
  11. BaseFieldInfo._fields
  12. + ("extra", "is_unsigned", "has_json_constraint", "comment", "data_type"),
  13. )
  14. InfoLine = namedtuple(
  15. "InfoLine",
  16. "col_name data_type max_len num_prec num_scale extra column_default "
  17. "collation is_unsigned comment",
  18. )
  19. TableInfo = namedtuple("TableInfo", BaseTableInfo._fields + ("comment",))
  20. class DatabaseIntrospection(BaseDatabaseIntrospection):
  21. data_types_reverse = {
  22. FIELD_TYPE.BLOB: "TextField",
  23. FIELD_TYPE.CHAR: "CharField",
  24. FIELD_TYPE.DECIMAL: "DecimalField",
  25. FIELD_TYPE.NEWDECIMAL: "DecimalField",
  26. FIELD_TYPE.DATE: "DateField",
  27. FIELD_TYPE.DATETIME: "DateTimeField",
  28. FIELD_TYPE.DOUBLE: "FloatField",
  29. FIELD_TYPE.FLOAT: "FloatField",
  30. FIELD_TYPE.INT24: "IntegerField",
  31. FIELD_TYPE.JSON: "JSONField",
  32. FIELD_TYPE.LONG: "IntegerField",
  33. FIELD_TYPE.LONGLONG: "BigIntegerField",
  34. FIELD_TYPE.SHORT: "SmallIntegerField",
  35. FIELD_TYPE.STRING: "CharField",
  36. FIELD_TYPE.TIME: "TimeField",
  37. FIELD_TYPE.TIMESTAMP: "DateTimeField",
  38. FIELD_TYPE.TINY: "IntegerField",
  39. FIELD_TYPE.TINY_BLOB: "TextField",
  40. FIELD_TYPE.MEDIUM_BLOB: "TextField",
  41. FIELD_TYPE.LONG_BLOB: "TextField",
  42. FIELD_TYPE.VAR_STRING: "CharField",
  43. }
  44. def get_field_type(self, data_type, description):
  45. field_type = super().get_field_type(data_type, description)
  46. if "auto_increment" in description.extra:
  47. if field_type == "IntegerField":
  48. return "AutoField"
  49. elif field_type == "BigIntegerField":
  50. return "BigAutoField"
  51. elif field_type == "SmallIntegerField":
  52. return "SmallAutoField"
  53. if description.is_unsigned:
  54. if field_type == "BigIntegerField":
  55. return "PositiveBigIntegerField"
  56. elif field_type == "IntegerField":
  57. return "PositiveIntegerField"
  58. elif field_type == "SmallIntegerField":
  59. return "PositiveSmallIntegerField"
  60. if description.data_type.upper() == "UUID":
  61. return "UUIDField"
  62. # JSON data type is an alias for LONGTEXT in MariaDB, use check
  63. # constraints clauses to introspect JSONField.
  64. if description.has_json_constraint:
  65. return "JSONField"
  66. return field_type
  67. def get_table_list(self, cursor):
  68. """Return a list of table and view names in the current database."""
  69. cursor.execute(
  70. """
  71. SELECT
  72. table_name,
  73. table_type,
  74. table_comment
  75. FROM information_schema.tables
  76. WHERE table_schema = DATABASE()
  77. """
  78. )
  79. return [
  80. TableInfo(row[0], {"BASE TABLE": "t", "VIEW": "v"}.get(row[1]), row[2])
  81. for row in cursor.fetchall()
  82. ]
  83. def get_table_description(self, cursor, table_name):
  84. """
  85. Return a description of the table with the DB-API cursor.description
  86. interface."
  87. """
  88. json_constraints = {}
  89. if (
  90. self.connection.mysql_is_mariadb
  91. and self.connection.features.can_introspect_json_field
  92. ):
  93. # JSON data type is an alias for LONGTEXT in MariaDB, select
  94. # JSON_VALID() constraints to introspect JSONField.
  95. cursor.execute(
  96. """
  97. SELECT c.constraint_name AS column_name
  98. FROM information_schema.check_constraints AS c
  99. WHERE
  100. c.table_name = %s AND
  101. LOWER(c.check_clause) =
  102. 'json_valid(`' + LOWER(c.constraint_name) + '`)' AND
  103. c.constraint_schema = DATABASE()
  104. """,
  105. [table_name],
  106. )
  107. json_constraints = {row[0] for row in cursor.fetchall()}
  108. # A default collation for the given table.
  109. cursor.execute(
  110. """
  111. SELECT table_collation
  112. FROM information_schema.tables
  113. WHERE table_schema = DATABASE()
  114. AND table_name = %s
  115. """,
  116. [table_name],
  117. )
  118. row = cursor.fetchone()
  119. default_column_collation = row[0] if row else ""
  120. # information_schema database gives more accurate results for some figures:
  121. # - varchar length returned by cursor.description is an internal length,
  122. # not visible length (#5725)
  123. # - precision and scale (for decimal fields) (#5014)
  124. # - auto_increment is not available in cursor.description
  125. cursor.execute(
  126. """
  127. SELECT
  128. column_name, data_type, character_maximum_length,
  129. numeric_precision, numeric_scale, extra, column_default,
  130. CASE
  131. WHEN collation_name = %s THEN NULL
  132. ELSE collation_name
  133. END AS collation_name,
  134. CASE
  135. WHEN column_type LIKE '%% unsigned' THEN 1
  136. ELSE 0
  137. END AS is_unsigned,
  138. column_comment
  139. FROM information_schema.columns
  140. WHERE table_name = %s AND table_schema = DATABASE()
  141. """,
  142. [default_column_collation, table_name],
  143. )
  144. field_info = {line[0]: InfoLine(*line) for line in cursor.fetchall()}
  145. cursor.execute(
  146. "SELECT * FROM %s LIMIT 1" % self.connection.ops.quote_name(table_name)
  147. )
  148. def to_int(i):
  149. return int(i) if i is not None else i
  150. fields = []
  151. for line in cursor.description:
  152. info = field_info[line[0]]
  153. fields.append(
  154. FieldInfo(
  155. *line[:2],
  156. to_int(info.max_len) or line[2],
  157. to_int(info.max_len) or line[3],
  158. to_int(info.num_prec) or line[4],
  159. to_int(info.num_scale) or line[5],
  160. line[6],
  161. info.column_default,
  162. info.collation,
  163. info.extra,
  164. info.is_unsigned,
  165. line[0] in json_constraints,
  166. info.comment,
  167. info.data_type,
  168. )
  169. )
  170. return fields
  171. def get_sequences(self, cursor, table_name, table_fields=()):
  172. for field_info in self.get_table_description(cursor, table_name):
  173. if "auto_increment" in field_info.extra:
  174. # MySQL allows only one auto-increment column per table.
  175. return [{"table": table_name, "column": field_info.name}]
  176. return []
  177. def get_relations(self, cursor, table_name):
  178. """
  179. Return a dictionary of {field_name: (field_name_other_table, other_table)}
  180. representing all foreign keys in the given table.
  181. """
  182. cursor.execute(
  183. """
  184. SELECT column_name, referenced_column_name, referenced_table_name
  185. FROM information_schema.key_column_usage
  186. WHERE table_name = %s
  187. AND table_schema = DATABASE()
  188. AND referenced_table_schema = DATABASE()
  189. AND referenced_table_name IS NOT NULL
  190. AND referenced_column_name IS NOT NULL
  191. """,
  192. [table_name],
  193. )
  194. return {
  195. field_name: (other_field, other_table)
  196. for field_name, other_field, other_table in cursor.fetchall()
  197. }
  198. def get_storage_engine(self, cursor, table_name):
  199. """
  200. Retrieve the storage engine for a given table. Return the default
  201. storage engine if the table doesn't exist.
  202. """
  203. cursor.execute(
  204. """
  205. SELECT engine
  206. FROM information_schema.tables
  207. WHERE
  208. table_name = %s AND
  209. table_schema = DATABASE()
  210. """,
  211. [table_name],
  212. )
  213. result = cursor.fetchone()
  214. if not result:
  215. return self.connection.features._mysql_storage_engine
  216. return result[0]
  217. def _parse_constraint_columns(self, check_clause, columns):
  218. check_columns = OrderedSet()
  219. statement = sqlparse.parse(check_clause)[0]
  220. tokens = (token for token in statement.flatten() if not token.is_whitespace)
  221. for token in tokens:
  222. if (
  223. token.ttype == sqlparse.tokens.Name
  224. and self.connection.ops.quote_name(token.value) == token.value
  225. and token.value[1:-1] in columns
  226. ):
  227. check_columns.add(token.value[1:-1])
  228. return check_columns
  229. def get_constraints(self, cursor, table_name):
  230. """
  231. Retrieve any constraints or keys (unique, pk, fk, check, index) across
  232. one or more columns.
  233. """
  234. constraints = {}
  235. # Get the actual constraint names and columns
  236. name_query = """
  237. SELECT kc.`constraint_name`, kc.`column_name`,
  238. kc.`referenced_table_name`, kc.`referenced_column_name`,
  239. c.`constraint_type`
  240. FROM
  241. information_schema.key_column_usage AS kc,
  242. information_schema.table_constraints AS c
  243. WHERE
  244. kc.table_schema = DATABASE() AND
  245. (
  246. kc.referenced_table_schema = DATABASE() OR
  247. kc.referenced_table_schema IS NULL
  248. ) AND
  249. c.table_schema = kc.table_schema AND
  250. c.constraint_name = kc.constraint_name AND
  251. c.constraint_type != 'CHECK' AND
  252. kc.table_name = %s
  253. ORDER BY kc.`ordinal_position`
  254. """
  255. cursor.execute(name_query, [table_name])
  256. for constraint, column, ref_table, ref_column, kind in cursor.fetchall():
  257. if constraint not in constraints:
  258. constraints[constraint] = {
  259. "columns": OrderedSet(),
  260. "primary_key": kind == "PRIMARY KEY",
  261. "unique": kind in {"PRIMARY KEY", "UNIQUE"},
  262. "index": False,
  263. "check": False,
  264. "foreign_key": (ref_table, ref_column) if ref_column else None,
  265. }
  266. if self.connection.features.supports_index_column_ordering:
  267. constraints[constraint]["orders"] = []
  268. constraints[constraint]["columns"].add(column)
  269. # Add check constraints.
  270. if self.connection.features.can_introspect_check_constraints:
  271. unnamed_constraints_index = 0
  272. columns = {
  273. info.name for info in self.get_table_description(cursor, table_name)
  274. }
  275. if self.connection.mysql_is_mariadb:
  276. type_query = """
  277. SELECT c.constraint_name, c.check_clause
  278. FROM information_schema.check_constraints AS c
  279. WHERE
  280. c.constraint_schema = DATABASE() AND
  281. c.table_name = %s
  282. """
  283. else:
  284. type_query = """
  285. SELECT cc.constraint_name, cc.check_clause
  286. FROM
  287. information_schema.check_constraints AS cc,
  288. information_schema.table_constraints AS tc
  289. WHERE
  290. cc.constraint_schema = DATABASE() AND
  291. tc.table_schema = cc.constraint_schema AND
  292. cc.constraint_name = tc.constraint_name AND
  293. tc.constraint_type = 'CHECK' AND
  294. tc.table_name = %s
  295. """
  296. cursor.execute(type_query, [table_name])
  297. for constraint, check_clause in cursor.fetchall():
  298. constraint_columns = self._parse_constraint_columns(
  299. check_clause, columns
  300. )
  301. # Ensure uniqueness of unnamed constraints. Unnamed unique
  302. # and check columns constraints have the same name as
  303. # a column.
  304. if set(constraint_columns) == {constraint}:
  305. unnamed_constraints_index += 1
  306. constraint = "__unnamed_constraint_%s__" % unnamed_constraints_index
  307. constraints[constraint] = {
  308. "columns": constraint_columns,
  309. "primary_key": False,
  310. "unique": False,
  311. "index": False,
  312. "check": True,
  313. "foreign_key": None,
  314. }
  315. # Now add in the indexes
  316. cursor.execute(
  317. "SHOW INDEX FROM %s" % self.connection.ops.quote_name(table_name)
  318. )
  319. for table, non_unique, index, colseq, column, order, type_ in [
  320. x[:6] + (x[10],) for x in cursor.fetchall()
  321. ]:
  322. if index not in constraints:
  323. constraints[index] = {
  324. "columns": OrderedSet(),
  325. "primary_key": False,
  326. "unique": not non_unique,
  327. "check": False,
  328. "foreign_key": None,
  329. }
  330. if self.connection.features.supports_index_column_ordering:
  331. constraints[index]["orders"] = []
  332. constraints[index]["index"] = True
  333. constraints[index]["type"] = (
  334. Index.suffix if type_ == "BTREE" else type_.lower()
  335. )
  336. constraints[index]["columns"].add(column)
  337. if self.connection.features.supports_index_column_ordering:
  338. constraints[index]["orders"].append("DESC" if order == "D" else "ASC")
  339. # Convert the sorted sets to lists
  340. for constraint in constraints.values():
  341. constraint["columns"] = list(constraint["columns"])
  342. return constraints