# creation.py (3.6 KB)
  1. import sys
  2. from django.core.exceptions import ImproperlyConfigured
  3. from django.db.backends.base.creation import BaseDatabaseCreation
  4. from django.db.backends.postgresql.psycopg_any import errors
  5. from django.db.backends.utils import strip_quotes
  class DatabaseCreation(BaseDatabaseCreation):
      """Test-database creation and cloning routines for PostgreSQL."""

      def _quote_name(self, name):
          """Return ``name`` as quoted by the connection's ``ops.quote_name``."""
          quote = self.connection.ops.quote_name
          return quote(name)

      def _get_database_create_suffix(self, encoding=None, template=None):
          """
          Build the option suffix appended to a database creation statement.

          Return an empty string when neither ``encoding`` nor ``template`` is
          truthy. Otherwise return "WITH" followed by the ENCODING clause
          and/or the TEMPLATE clause, in that order. The template name is
          passed through ``_quote_name()``; the encoding is placed between
          single quotes as-is.
          """
          clauses = []
          if encoding:
              clauses.append(" ENCODING '{}'".format(encoding))
          if template:
              clauses.append(" TEMPLATE {}".format(self._quote_name(template)))
          if not clauses:
              return ""
          return "WITH" + "".join(clauses)

      def sql_table_creation_suffix(self):
          """
          Return the creation suffix derived from the connection's TEST settings.

          Raise ImproperlyConfigured when TEST["COLLATION"] is not None. The
          encoding is read from TEST["CHARSET"] (the key must be present) and
          the template from the optional TEST["TEMPLATE"].
          """
          options = self.connection.settings_dict["TEST"]
          collation = options.get("COLLATION")
          if collation is not None:
              raise ImproperlyConfigured(
                  "PostgreSQL does not support collation setting at database "
                  "creation time."
              )
          charset = options["CHARSET"]
          template = options.get("TEMPLATE")
          return self._get_database_create_suffix(
              encoding=charset,
              template=template,
          )

      def _database_exists(self, cursor, database_name):
          """
          Return True if pg_catalog.pg_database has a row for ``database_name``.

          The name is run through ``strip_quotes()`` before being bound as the
          query parameter.
          """
          query = "SELECT 1 FROM pg_catalog.pg_database WHERE datname = %s"
          cursor.execute(query, [strip_quotes(database_name)])
          row = cursor.fetchone()
          return row is not None

      def _execute_create_test_db(self, cursor, parameters, keepdb=False):
          """
          Create the test database unless it is being kept and already exists.

          Any exception whose ``__cause__`` is not ``errors.DuplicateDatabase``
          is logged and ends the process via ``sys.exit(2)``. A duplicate
          database error is re-raised when ``keepdb`` is false and silently
          ignored when ``keepdb`` is true.
          """
          try:
              # The existence probe stays inside the ``try`` so that its own
              # failures go through the same handling as creation failures.
              reuse_existing = keepdb and self._database_exists(
                  cursor, parameters["dbname"]
              )
              if not reuse_existing:
                  super()._execute_create_test_db(cursor, parameters, keepdb)
          except Exception as exc:
              duplicate = isinstance(exc.__cause__, errors.DuplicateDatabase)
              if duplicate:
                  # With keepdb, "database already exists" is not an error.
                  if not keepdb:
                      raise
              else:
                  # Anything other than "database already exists" cancels tests.
                  self.log("Got an error creating the test database: %s" % exc)
                  sys.exit(2)

      def _clone_test_db(self, suffix, verbosity, keepdb=False):
          """
          Create a clone of the connection's database using it as a template.

          If the first creation attempt raises, log a notice (when
          ``verbosity >= 1``), drop the target database and try once more. A
          failure during that second attempt is logged and ends the process
          via ``sys.exit(2)``.
          """
          # Connections to the template database must be closed before
          # CREATE DATABASE ... WITH TEMPLATE ... can use it.
          self.connection.close()

          template_name = self.connection.settings_dict["NAME"]
          clone_name = self.get_test_db_clone_settings(suffix)["NAME"]
          quoted_clone_name = self._quote_name(clone_name)
          create_suffix = self._get_database_create_suffix(template=template_name)
          clone_params = {"dbname": quoted_clone_name, "suffix": create_suffix}

          with self._nodb_cursor() as cursor:
              try:
                  self._execute_create_test_db(cursor, clone_params, keepdb)
              except Exception:
                  try:
                      if verbosity >= 1:
                          display = self._get_database_display_str(
                              verbosity, clone_name
                          )
                          self.log(
                              "Destroying old test database for alias %s..."
                              % (display,)
                          )
                      cursor.execute("DROP DATABASE %(dbname)s" % clone_params)
                      self._execute_create_test_db(cursor, clone_params, keepdb)
                  except Exception as exc:
                      self.log("Got an error cloning the test database: %s" % exc)
                      sys.exit(2)