Coverage for benefits / core / management / commands / ensure_db.py: 96%

183 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-01 15:39 +0000

1import os 

2 

3import psycopg 

4from django.conf import settings 

5from django.contrib.auth import get_user_model 

6from django.core.management import call_command 

7from django.core.management.base import BaseCommand, CommandError 

8from django.db import DEFAULT_DB_ALIAS 

9from psycopg import Connection, sql 

10 

11 

12class Command(BaseCommand): 

13 help = ( 

14 "Ensures databases and their users exist, runs migrations, " 

15 "and creates a superuser if specified by environment variables and not already present." 

16 ) 

17 

18 def _admin_connection(self, database=None): 

19 # Try to get HOST/PORT from the default database settings first 

20 # Fallback to environment variables if not found in settings 

21 default_db_settings = settings.DATABASES.get(DEFAULT_DB_ALIAS, {}) 

22 db_host = default_db_settings.get("HOST") 

23 db_port = default_db_settings.get("PORT") 

24 

25 postgres_maintenance_db = os.environ.get("POSTGRES_DB", "postgres") 

26 target_db = database or postgres_maintenance_db 

27 admin_user = os.environ.get("POSTGRES_USER", "postgres") 

28 admin_password = os.environ.get("POSTGRES_PASSWORD") 

29 

30 if not admin_password: 

31 raise CommandError("POSTGRES_PASSWORD environment variable not set. Cannot establish admin connection.") 

32 

33 self.stdout.write(f"Attempting admin connection as user: {admin_user} to database: {target_db}") 

34 try: 

35 conn = psycopg.connect( 

36 host=db_host, 

37 port=db_port, 

38 user=admin_user, 

39 password=admin_password, 

40 dbname=target_db, 

41 autocommit=True, # Ensure commands are executed immediately 

42 ) 

43 return conn 

44 except psycopg.Error as e: 

45 raise CommandError(f"Admin connection to PostgreSQL failed: {e}") from e 

46 

47 def _reset(self, admin_conn: Connection): 

48 self.stdout.write(self.style.WARNING("Resetting database users and databases...")) 

49 cursor = admin_conn.cursor() 

50 # This is the role executing this _reset method (e.g. 'postgres') 

51 admin_user = admin_conn.info.user 

52 try: 

53 for db_alias, db_config in settings.DATABASES.items(): 

54 validated_config = self._validate_config(db_alias, db_config) 

55 if not validated_config: 

56 continue # Skip this alias if validation failed 

57 

58 db_name, db_user, _ = validated_config # db_user is the app user, e.g. 'django' 

59 try: 

60 self.stdout.write(f"Preparing to reset {db_user} and {db_name}...") 

61 

62 # 1. Attempt to transfer ownership of any objects owned by db_user to admin_user. 

63 # This allows admin_user (if not full superuser) to drop databases owned by db_user. 

64 # If db_user does not exist, psycopg.errors.UndefinedObject will be raised. 

65 # We catch this and pass, as there would be no objects to reassign. 

66 try: 

67 reassign_query = sql.SQL("REASSIGN OWNED BY {owned_by_role} TO {new_owner_role}").format( 

68 owned_by_role=sql.Identifier(db_user), new_owner_role=sql.Identifier(admin_user) 

69 ) 

70 cursor.execute(reassign_query) 

71 except psycopg.errors.UndefinedObject: 

72 # If db_user doesn't exist, there's nothing to reassign. Pass silently. 

73 pass 

74 

75 # 2. Drop the database 

76 # If REASSIGN OWNED was successful and db_user owned it, admin_user is now the owner. 

77 drop_db_query = sql.SQL("DROP DATABASE IF EXISTS {db} WITH (FORCE)").format(db=sql.Identifier(db_name)) 

78 cursor.execute(drop_db_query) 

79 self.stdout.write(f"Database {db_name} dropped.") 

80 

81 # 3. Revoke the app role from the admin role to break membership 

82 try: 

83 revoke_query = sql.SQL("REVOKE {role_to_revoke} FROM {grantee_admin}").format( 

84 role_to_revoke=sql.Identifier(db_user), grantee_admin=sql.Identifier(admin_user) 

85 ) 

86 cursor.execute(revoke_query) 

87 except psycopg.errors.UndefinedObject: 

88 # Expected if db_user (app_role) or admin_user doesn't exist. 

89 pass 

90 

91 # 4. Drop the role 

92 drop_role_query = sql.SQL("DROP USER IF EXISTS {user}").format(user=sql.Identifier(db_user)) 

93 cursor.execute(drop_role_query) 

94 self.stdout.write(f"Role {db_user} dropped.") 

95 self.stdout.write(self.style.SUCCESS(f"Database {db_name} and role {db_user} reset successfully.")) 

96 except psycopg.Error as e: 

97 self.stderr.write(self.style.ERROR(f"Failed database reset for database {db_alias}: {e}")) 

98 if hasattr(e, "diag") and e.diag and e.diag.message_detail: 98 ↛ 99line 98 didn't jump to line 99 because the condition on line 98 was never true

99 self.stderr.write(self.style.ERROR(f"DETAIL: {e.diag.message_detail}")) 

100 raise 

101 

102 finally: 

103 if cursor: 103 ↛ exitline 103 didn't return from function '_reset' because the condition on line 103 was always true

104 cursor.close() 

105 

106 def _validate_config(self, db_alias: str, db_config: dict) -> tuple[str, str, str] | None: 

107 """ 

108 Validates the database configuration for PostgreSQL engine and completeness. 

109 Returns (db_name, db_user, db_password) or None if validation fails. 

110 """ 

111 if db_config.get("ENGINE") != "django.db.backends.postgresql": 

112 self.stdout.write(self.style.WARNING(f"Skipping database {db_alias}, ENGINE is not PostgreSQL.")) 

113 return None 

114 

115 db_name = db_config.get("NAME") 

116 db_user = db_config.get("USER") 

117 db_password = db_config.get("PASSWORD") 

118 

119 if not all([db_name, db_user, db_password]): 

120 self.stderr.write( 

121 self.style.ERROR( 

122 f"Skipping database {db_alias} with incomplete configuration (missing NAME, USER, or PASSWORD)." 

123 ) 

124 ) 

125 return None 

126 return db_name, db_user, db_password 

127 

128 def _user_exists(self, cursor: psycopg.Cursor, username: str) -> bool: 

129 """Checks if a PostgreSQL user exists.""" 

130 cursor.execute("SELECT 1 FROM pg_catalog.pg_roles WHERE rolname = %s", [username]) 

131 return cursor.fetchone() is not None 

132 

133 def _create_database_user(self, cursor: psycopg.Cursor, admin_user: str, db_alias: str, username: str, password: str): 

134 """Creates a PostgreSQL user.""" 

135 self.stdout.write(f"User: {username} for database: {db_alias} not found. Creating...") 

136 try: 

137 # Use sql.Literal for the password to ensure it's correctly quoted 

138 query = sql.SQL("CREATE USER {user} WITH PASSWORD {password_literal}").format( 

139 user=sql.Identifier(username), password_literal=sql.Literal(password) 

140 ) 

141 cursor.execute(query) 

142 # grant the specific username role to the admin_user 

143 # to allow the admin_user to create database(s) on behalf of the db_user 

144 query = sql.SQL("GRANT {user} TO {admin}").format(user=sql.Identifier(username), admin=sql.Identifier(admin_user)) 

145 cursor.execute(query) 

146 self.stdout.write(self.style.SUCCESS("User created successfully")) 

147 except psycopg.Error as e: 

148 self.stderr.write(self.style.ERROR(f"Failed to create user {username} for database {db_alias}: {e}")) 

149 raise 

150 

151 def _database_exists(self, cursor: psycopg.Cursor, db_name: str) -> bool: 

152 """Checks if a PostgreSQL database exists.""" 

153 cursor.execute("SELECT 1 FROM pg_database WHERE datname = %s", [db_name]) 

154 return cursor.fetchone() is not None 

155 

156 def _create_database(self, cursor: psycopg.Cursor, db_alias: str, db_name: str, owner_username: str): 

157 """Creates a PostgreSQL database with the specified owner.""" 

158 self.stdout.write(f"Database {db_name} not found. Creating...") 

159 # Ensure owner exists before attempting to create the database 

160 if not self._user_exists(cursor, owner_username): 

161 self.stderr.write( 

162 self.style.ERROR( 

163 f"Cannot create database: {db_name} because user: {owner_username} does not exist or was not created" 

164 ) 

165 ) 

166 raise CommandError(f"Owner user {owner_username} for database {db_name} not found during database creation.") 

167 try: 

168 # Use sql.Literal for the password to ensure it's correctly quoted 

169 query = sql.SQL("CREATE DATABASE {db} WITH OWNER {owner} ENCODING {encoding}").format( 

170 db=sql.Identifier(db_name), 

171 owner=sql.Identifier(owner_username), 

172 encoding=sql.Literal("UTF-8"), 

173 ) 

174 cursor.execute(query) 

175 self.stdout.write(self.style.SUCCESS("Database created successfully")) 

176 except psycopg.Error as e: 

177 self.stderr.write(self.style.ERROR(f"Failed to create database {db_name} for alias {db_alias}: {e}")) 

178 raise 

179 

180 def _ensure_schema_permissions(self, db_name: str, db_user_to_grant: str): 

181 """ 

182 Grants USAGE and CREATE permissions on the public schema of a newly created database to the specified user. 

183 Connects to the target database using admin credentials. Failure is considered critical as it's for a new database. 

184 """ 

185 self.stdout.write(f"Ensuring schema permissions for user: {db_user_to_grant} in database: {db_name}") 

186 admin_conn = None 

187 try: 

188 admin_conn = self._admin_connection(db_name) 

189 with admin_conn.cursor() as cursor: 

190 grant_query = sql.SQL("GRANT USAGE, CREATE ON SCHEMA public TO {user}").format( 

191 user=sql.Identifier(db_user_to_grant) 

192 ) 

193 cursor.execute(grant_query) 

194 self.stdout.write("Schema permissions confirmed") 

195 except psycopg.Error as e: 

196 self.stderr.write( 

197 self.style.ERROR( 

198 f"Failed to grant schema permissions for user: {db_user_to_grant} in database: {db_name} : {e}" 

199 ) 

200 ) 

201 raise CommandError(f"Failed to set schema permissions for newly created database: {db_name}.") from e 

202 finally: 

203 if admin_conn: 

204 admin_conn.close() 

205 

206 def _ensure_users_and_db(self, admin_conn: Connection): 

207 self.stdout.write(self.style.MIGRATE_HEADING("Checking and creating database users and databases...")) 

208 cursor = admin_conn.cursor() 

209 try: 

210 for db_alias, db_config in settings.DATABASES.items(): 

211 validated_config = self._validate_config(db_alias, db_config) 

212 if not validated_config: 

213 continue # Skip this alias if validation failed 

214 

215 db_name, db_user, db_password = validated_config 

216 

217 # Ensure DB User Exists 

218 if not self._user_exists(cursor, db_user): 

219 self._create_database_user(cursor, admin_conn.info.user, db_alias, db_user, db_password) 

220 else: 

221 self.stdout.write(f"User found: {db_user}") 

222 

223 # Ensure Database Exists 

224 if not self._database_exists(cursor, db_name): 

225 self._create_database(cursor, db_alias, db_name, db_user) # db_user is the owner 

226 self._ensure_schema_permissions(db_name, db_user) 

227 else: 

228 self.stdout.write(f"Database found: {db_name}") 

229 finally: 

230 if cursor: 230 ↛ 232line 230 didn't jump to line 232 because the condition on line 230 was always true

231 cursor.close() 

232 self.stdout.write("Database and user checks complete.") 

233 

234 def _run_migrations(self): 

235 self.stdout.write(self.style.MIGRATE_HEADING("Running migrations...")) 

236 for db_alias in settings.DATABASES.keys(): 

237 if settings.DATABASES[db_alias].get("ENGINE") != "django.db.backends.postgresql": 

238 self.stdout.write( 

239 self.style.WARNING(f"Skipping migrations for database: {db_alias}. ENGINE is not PostgreSQL.") 

240 ) 

241 continue 

242 try: 

243 self.stdout.write(f"For database: {db_alias}") 

244 call_command("migrate", database=db_alias, interactive=False) 

245 self.stdout.write(self.style.SUCCESS(f"Migrations complete for database: {db_alias}")) 

246 except Exception as e: # Catch more general errors from call_command 

247 self.stderr.write(self.style.ERROR(f"Error running migrations for database: {db_alias}: {str(e)}")) 

248 # Re-raise as CommandError to potentially stop the whole process if a migration fails 

249 raise CommandError(f"Migration failed for {db_alias}.") from e 

250 

251 # initialize the cache table according to Django's requirements 

252 # https://docs.djangoproject.com/en/5.2/topics/cache/#creating-the-cache-table 

253 self.stdout.write("Configuring db-backed cache...") 

254 call_command("createcachetable") 

255 

256 self.stdout.write("All migrations processed.") 

257 

258 def _ensure_superuser(self): 

259 self.stdout.write(self.style.MIGRATE_HEADING("Checking for superuser...")) 

260 DJANGO_SUPERUSER_USERNAME = os.environ.get("DJANGO_SUPERUSER_USERNAME") 

261 DJANGO_SUPERUSER_EMAIL = os.environ.get("DJANGO_SUPERUSER_EMAIL") 

262 DJANGO_SUPERUSER_PASSWORD = os.environ.get("DJANGO_SUPERUSER_PASSWORD") 

263 

264 if DJANGO_SUPERUSER_USERNAME: 

265 User = get_user_model() 

266 # Check against the default database 

267 if User.objects.using(DEFAULT_DB_ALIAS).filter(username=DJANGO_SUPERUSER_USERNAME).exists(): 

268 self.stdout.write(f"Superuser: {DJANGO_SUPERUSER_USERNAME} already exists in database: {DEFAULT_DB_ALIAS}") 

269 else: 

270 if DJANGO_SUPERUSER_EMAIL and DJANGO_SUPERUSER_PASSWORD: 

271 self.stdout.write(f"Superuser: {DJANGO_SUPERUSER_USERNAME} not found. Creating...") 

272 # Note: createsuperuser --no-input relies on DJANGO_SUPERUSER_PASSWORD env var implicitly for password. 

273 # Explicitly passing username and email makes it clearer. 

274 call_command( 

275 "createsuperuser", 

276 interactive=False, 

277 username=DJANGO_SUPERUSER_USERNAME, 

278 email=DJANGO_SUPERUSER_EMAIL, 

279 ) 

280 else: 

281 self.stdout.write( 

282 self.style.WARNING( 

283 f"Cannot create superuser: {DJANGO_SUPERUSER_USERNAME}. " 

284 "DJANGO_SUPERUSER_EMAIL or DJANGO_SUPERUSER_PASSWORD environment variables are not set." 

285 ) 

286 ) 

287 else: 

288 self.stdout.write("DJANGO_SUPERUSER_USERNAME environment variable not set. Skipping superuser creation.") 

289 

290 def add_arguments(self, parser): 

291 parser.add_argument("--reset", action="store_true", help="Completely reset the database(s) (DESTRUCTIVE).") 

292 

293 def handle(self, *args, **options): 

294 # database and user setup (requires admin connection) 

295 admin_conn = None 

296 reset = options.get("reset", False) 

297 try: 

298 admin_conn = self._admin_connection() 

299 if reset: 

300 self._reset(admin_conn) 

301 self._ensure_users_and_db(admin_conn) 

302 except Exception as e: 

303 self.stderr.write(self.style.ERROR(str(e))) 

304 return 

305 finally: 

306 if admin_conn and not admin_conn.closed: 306 ↛ 310line 306 didn't jump to line 310 because the condition on line 306 was always true

307 admin_conn.close() 

308 

309 # migrations 

310 self._run_migrations() 

311 

312 # superuser 

313 self._ensure_superuser() 

314 

315 self.stdout.write(self.style.SUCCESS("ensure_db command finished successfully."))