Coverage for benefits / core / management / commands / ensure_db.py: 96%
183 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-01 15:39 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-01 15:39 +0000
1import os
3import psycopg
4from django.conf import settings
5from django.contrib.auth import get_user_model
6from django.core.management import call_command
7from django.core.management.base import BaseCommand, CommandError
8from django.db import DEFAULT_DB_ALIAS
9from psycopg import Connection, sql
12class Command(BaseCommand):
13 help = (
14 "Ensures databases and their users exist, runs migrations, "
15 "and creates a superuser if specified by environment variables and not already present."
16 )
18 def _admin_connection(self, database=None):
19 # Try to get HOST/PORT from the default database settings first
20 # Fallback to environment variables if not found in settings
21 default_db_settings = settings.DATABASES.get(DEFAULT_DB_ALIAS, {})
22 db_host = default_db_settings.get("HOST")
23 db_port = default_db_settings.get("PORT")
25 postgres_maintenance_db = os.environ.get("POSTGRES_DB", "postgres")
26 target_db = database or postgres_maintenance_db
27 admin_user = os.environ.get("POSTGRES_USER", "postgres")
28 admin_password = os.environ.get("POSTGRES_PASSWORD")
30 if not admin_password:
31 raise CommandError("POSTGRES_PASSWORD environment variable not set. Cannot establish admin connection.")
33 self.stdout.write(f"Attempting admin connection as user: {admin_user} to database: {target_db}")
34 try:
35 conn = psycopg.connect(
36 host=db_host,
37 port=db_port,
38 user=admin_user,
39 password=admin_password,
40 dbname=target_db,
41 autocommit=True, # Ensure commands are executed immediately
42 )
43 return conn
44 except psycopg.Error as e:
45 raise CommandError(f"Admin connection to PostgreSQL failed: {e}") from e
47 def _reset(self, admin_conn: Connection):
48 self.stdout.write(self.style.WARNING("Resetting database users and databases..."))
49 cursor = admin_conn.cursor()
50 # This is the role executing this _reset method (e.g. 'postgres')
51 admin_user = admin_conn.info.user
52 try:
53 for db_alias, db_config in settings.DATABASES.items():
54 validated_config = self._validate_config(db_alias, db_config)
55 if not validated_config:
56 continue # Skip this alias if validation failed
58 db_name, db_user, _ = validated_config # db_user is the app user, e.g. 'django'
59 try:
60 self.stdout.write(f"Preparing to reset {db_user} and {db_name}...")
62 # 1. Attempt to transfer ownership of any objects owned by db_user to admin_user.
63 # This allows admin_user (if not full superuser) to drop databases owned by db_user.
64 # If db_user does not exist, psycopg.errors.UndefinedObject will be raised.
65 # We catch this and pass, as there would be no objects to reassign.
66 try:
67 reassign_query = sql.SQL("REASSIGN OWNED BY {owned_by_role} TO {new_owner_role}").format(
68 owned_by_role=sql.Identifier(db_user), new_owner_role=sql.Identifier(admin_user)
69 )
70 cursor.execute(reassign_query)
71 except psycopg.errors.UndefinedObject:
72 # If db_user doesn't exist, there's nothing to reassign. Pass silently.
73 pass
75 # 2. Drop the database
76 # If REASSIGN OWNED was successful and db_user owned it, admin_user is now the owner.
77 drop_db_query = sql.SQL("DROP DATABASE IF EXISTS {db} WITH (FORCE)").format(db=sql.Identifier(db_name))
78 cursor.execute(drop_db_query)
79 self.stdout.write(f"Database {db_name} dropped.")
81 # 3. Revoke the app role from the admin role to break membership
82 try:
83 revoke_query = sql.SQL("REVOKE {role_to_revoke} FROM {grantee_admin}").format(
84 role_to_revoke=sql.Identifier(db_user), grantee_admin=sql.Identifier(admin_user)
85 )
86 cursor.execute(revoke_query)
87 except psycopg.errors.UndefinedObject:
88 # Expected if db_user (app_role) or admin_user doesn't exist.
89 pass
91 # 4. Drop the role
92 drop_role_query = sql.SQL("DROP USER IF EXISTS {user}").format(user=sql.Identifier(db_user))
93 cursor.execute(drop_role_query)
94 self.stdout.write(f"Role {db_user} dropped.")
95 self.stdout.write(self.style.SUCCESS(f"Database {db_name} and role {db_user} reset successfully."))
96 except psycopg.Error as e:
97 self.stderr.write(self.style.ERROR(f"Failed database reset for database {db_alias}: {e}"))
98 if hasattr(e, "diag") and e.diag and e.diag.message_detail: 98 ↛ 99line 98 didn't jump to line 99 because the condition on line 98 was never true
99 self.stderr.write(self.style.ERROR(f"DETAIL: {e.diag.message_detail}"))
100 raise
102 finally:
103 if cursor: 103 ↛ exitline 103 didn't return from function '_reset' because the condition on line 103 was always true
104 cursor.close()
106 def _validate_config(self, db_alias: str, db_config: dict) -> tuple[str, str, str] | None:
107 """
108 Validates the database configuration for PostgreSQL engine and completeness.
109 Returns (db_name, db_user, db_password) or None if validation fails.
110 """
111 if db_config.get("ENGINE") != "django.db.backends.postgresql":
112 self.stdout.write(self.style.WARNING(f"Skipping database {db_alias}, ENGINE is not PostgreSQL."))
113 return None
115 db_name = db_config.get("NAME")
116 db_user = db_config.get("USER")
117 db_password = db_config.get("PASSWORD")
119 if not all([db_name, db_user, db_password]):
120 self.stderr.write(
121 self.style.ERROR(
122 f"Skipping database {db_alias} with incomplete configuration (missing NAME, USER, or PASSWORD)."
123 )
124 )
125 return None
126 return db_name, db_user, db_password
128 def _user_exists(self, cursor: psycopg.Cursor, username: str) -> bool:
129 """Checks if a PostgreSQL user exists."""
130 cursor.execute("SELECT 1 FROM pg_catalog.pg_roles WHERE rolname = %s", [username])
131 return cursor.fetchone() is not None
133 def _create_database_user(self, cursor: psycopg.Cursor, admin_user: str, db_alias: str, username: str, password: str):
134 """Creates a PostgreSQL user."""
135 self.stdout.write(f"User: {username} for database: {db_alias} not found. Creating...")
136 try:
137 # Use sql.Literal for the password to ensure it's correctly quoted
138 query = sql.SQL("CREATE USER {user} WITH PASSWORD {password_literal}").format(
139 user=sql.Identifier(username), password_literal=sql.Literal(password)
140 )
141 cursor.execute(query)
142 # grant the specific username role to the admin_user
143 # to allow the admin_user to create database(s) on behalf of the db_user
144 query = sql.SQL("GRANT {user} TO {admin}").format(user=sql.Identifier(username), admin=sql.Identifier(admin_user))
145 cursor.execute(query)
146 self.stdout.write(self.style.SUCCESS("User created successfully"))
147 except psycopg.Error as e:
148 self.stderr.write(self.style.ERROR(f"Failed to create user {username} for database {db_alias}: {e}"))
149 raise
151 def _database_exists(self, cursor: psycopg.Cursor, db_name: str) -> bool:
152 """Checks if a PostgreSQL database exists."""
153 cursor.execute("SELECT 1 FROM pg_database WHERE datname = %s", [db_name])
154 return cursor.fetchone() is not None
156 def _create_database(self, cursor: psycopg.Cursor, db_alias: str, db_name: str, owner_username: str):
157 """Creates a PostgreSQL database with the specified owner."""
158 self.stdout.write(f"Database {db_name} not found. Creating...")
159 # Ensure owner exists before attempting to create the database
160 if not self._user_exists(cursor, owner_username):
161 self.stderr.write(
162 self.style.ERROR(
163 f"Cannot create database: {db_name} because user: {owner_username} does not exist or was not created"
164 )
165 )
166 raise CommandError(f"Owner user {owner_username} for database {db_name} not found during database creation.")
167 try:
168 # Use sql.Literal for the password to ensure it's correctly quoted
169 query = sql.SQL("CREATE DATABASE {db} WITH OWNER {owner} ENCODING {encoding}").format(
170 db=sql.Identifier(db_name),
171 owner=sql.Identifier(owner_username),
172 encoding=sql.Literal("UTF-8"),
173 )
174 cursor.execute(query)
175 self.stdout.write(self.style.SUCCESS("Database created successfully"))
176 except psycopg.Error as e:
177 self.stderr.write(self.style.ERROR(f"Failed to create database {db_name} for alias {db_alias}: {e}"))
178 raise
180 def _ensure_schema_permissions(self, db_name: str, db_user_to_grant: str):
181 """
182 Grants USAGE and CREATE permissions on the public schema of a newly created database to the specified user.
183 Connects to the target database using admin credentials. Failure is considered critical as it's for a new database.
184 """
185 self.stdout.write(f"Ensuring schema permissions for user: {db_user_to_grant} in database: {db_name}")
186 admin_conn = None
187 try:
188 admin_conn = self._admin_connection(db_name)
189 with admin_conn.cursor() as cursor:
190 grant_query = sql.SQL("GRANT USAGE, CREATE ON SCHEMA public TO {user}").format(
191 user=sql.Identifier(db_user_to_grant)
192 )
193 cursor.execute(grant_query)
194 self.stdout.write("Schema permissions confirmed")
195 except psycopg.Error as e:
196 self.stderr.write(
197 self.style.ERROR(
198 f"Failed to grant schema permissions for user: {db_user_to_grant} in database: {db_name} : {e}"
199 )
200 )
201 raise CommandError(f"Failed to set schema permissions for newly created database: {db_name}.") from e
202 finally:
203 if admin_conn:
204 admin_conn.close()
206 def _ensure_users_and_db(self, admin_conn: Connection):
207 self.stdout.write(self.style.MIGRATE_HEADING("Checking and creating database users and databases..."))
208 cursor = admin_conn.cursor()
209 try:
210 for db_alias, db_config in settings.DATABASES.items():
211 validated_config = self._validate_config(db_alias, db_config)
212 if not validated_config:
213 continue # Skip this alias if validation failed
215 db_name, db_user, db_password = validated_config
217 # Ensure DB User Exists
218 if not self._user_exists(cursor, db_user):
219 self._create_database_user(cursor, admin_conn.info.user, db_alias, db_user, db_password)
220 else:
221 self.stdout.write(f"User found: {db_user}")
223 # Ensure Database Exists
224 if not self._database_exists(cursor, db_name):
225 self._create_database(cursor, db_alias, db_name, db_user) # db_user is the owner
226 self._ensure_schema_permissions(db_name, db_user)
227 else:
228 self.stdout.write(f"Database found: {db_name}")
229 finally:
230 if cursor: 230 ↛ 232line 230 didn't jump to line 232 because the condition on line 230 was always true
231 cursor.close()
232 self.stdout.write("Database and user checks complete.")
234 def _run_migrations(self):
235 self.stdout.write(self.style.MIGRATE_HEADING("Running migrations..."))
236 for db_alias in settings.DATABASES.keys():
237 if settings.DATABASES[db_alias].get("ENGINE") != "django.db.backends.postgresql":
238 self.stdout.write(
239 self.style.WARNING(f"Skipping migrations for database: {db_alias}. ENGINE is not PostgreSQL.")
240 )
241 continue
242 try:
243 self.stdout.write(f"For database: {db_alias}")
244 call_command("migrate", database=db_alias, interactive=False)
245 self.stdout.write(self.style.SUCCESS(f"Migrations complete for database: {db_alias}"))
246 except Exception as e: # Catch more general errors from call_command
247 self.stderr.write(self.style.ERROR(f"Error running migrations for database: {db_alias}: {str(e)}"))
248 # Re-raise as CommandError to potentially stop the whole process if a migration fails
249 raise CommandError(f"Migration failed for {db_alias}.") from e
251 # initialize the cache table according to Django's requirements
252 # https://docs.djangoproject.com/en/5.2/topics/cache/#creating-the-cache-table
253 self.stdout.write("Configuring db-backed cache...")
254 call_command("createcachetable")
256 self.stdout.write("All migrations processed.")
258 def _ensure_superuser(self):
259 self.stdout.write(self.style.MIGRATE_HEADING("Checking for superuser..."))
260 DJANGO_SUPERUSER_USERNAME = os.environ.get("DJANGO_SUPERUSER_USERNAME")
261 DJANGO_SUPERUSER_EMAIL = os.environ.get("DJANGO_SUPERUSER_EMAIL")
262 DJANGO_SUPERUSER_PASSWORD = os.environ.get("DJANGO_SUPERUSER_PASSWORD")
264 if DJANGO_SUPERUSER_USERNAME:
265 User = get_user_model()
266 # Check against the default database
267 if User.objects.using(DEFAULT_DB_ALIAS).filter(username=DJANGO_SUPERUSER_USERNAME).exists():
268 self.stdout.write(f"Superuser: {DJANGO_SUPERUSER_USERNAME} already exists in database: {DEFAULT_DB_ALIAS}")
269 else:
270 if DJANGO_SUPERUSER_EMAIL and DJANGO_SUPERUSER_PASSWORD:
271 self.stdout.write(f"Superuser: {DJANGO_SUPERUSER_USERNAME} not found. Creating...")
272 # Note: createsuperuser --no-input relies on DJANGO_SUPERUSER_PASSWORD env var implicitly for password.
273 # Explicitly passing username and email makes it clearer.
274 call_command(
275 "createsuperuser",
276 interactive=False,
277 username=DJANGO_SUPERUSER_USERNAME,
278 email=DJANGO_SUPERUSER_EMAIL,
279 )
280 else:
281 self.stdout.write(
282 self.style.WARNING(
283 f"Cannot create superuser: {DJANGO_SUPERUSER_USERNAME}. "
284 "DJANGO_SUPERUSER_EMAIL or DJANGO_SUPERUSER_PASSWORD environment variables are not set."
285 )
286 )
287 else:
288 self.stdout.write("DJANGO_SUPERUSER_USERNAME environment variable not set. Skipping superuser creation.")
290 def add_arguments(self, parser):
291 parser.add_argument("--reset", action="store_true", help="Completely reset the database(s) (DESTRUCTIVE).")
293 def handle(self, *args, **options):
294 # database and user setup (requires admin connection)
295 admin_conn = None
296 reset = options.get("reset", False)
297 try:
298 admin_conn = self._admin_connection()
299 if reset:
300 self._reset(admin_conn)
301 self._ensure_users_and_db(admin_conn)
302 except Exception as e:
303 self.stderr.write(self.style.ERROR(str(e)))
304 return
305 finally:
306 if admin_conn and not admin_conn.closed: 306 ↛ 310line 306 didn't jump to line 310 because the condition on line 306 was always true
307 admin_conn.close()
309 # migrations
310 self._run_migrations()
312 # superuser
313 self._ensure_superuser()
315 self.stdout.write(self.style.SUCCESS("ensure_db command finished successfully."))