Coverage for benefits/core/admin.py: 98%
43 statements
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-19 00:56 +0000
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-19 00:56 +0000
1"""
2The core application: Admin interface configuration.
3"""
5import logging
6import requests
8from adminsortable2.admin import SortableAdminMixin
9from django.conf import settings
10from django.contrib import admin
11from django.contrib.auth.models import Group
12from django.http import HttpRequest
14from . import models
16logger = logging.getLogger(__name__)
19GOOGLE_USER_INFO_URL = "https://www.googleapis.com/oauth2/v3/userinfo"
21logger.debug("Register models with admin site")
22admin.site.register(models.PemData)
25@admin.register(models.ClaimsProvider)
26class ClaimsProviderAdmin(admin.ModelAdmin): # pragma: no cover
27 def get_exclude(self, request, obj=None):
28 if not request.user.is_superuser:
29 return ["client_id_secret_name"]
30 else:
31 return super().get_exclude(request, obj)
33 def get_readonly_fields(self, request, obj=None):
34 if not request.user.is_superuser:
35 return [
36 "sign_out_button_template",
37 "sign_out_link_template",
38 "authority",
39 "scheme",
40 ]
41 else:
42 return super().get_readonly_fields(request, obj)
45@admin.register(models.EnrollmentFlow)
46class SortableEnrollmentFlowAdmin(SortableAdminMixin, admin.ModelAdmin): # pragma: no cover
47 list_display = ("label", "transit_agency", "supported_enrollment_methods")
49 def get_exclude(self, request, obj=None):
50 if not request.user.is_superuser:
51 return [
52 "eligibility_api_auth_header",
53 "eligibility_api_auth_key_secret_name",
54 "eligibility_api_public_key",
55 "eligibility_api_jwe_cek_enc",
56 "eligibility_api_jwe_encryption_alg",
57 "eligibility_api_jws_signing_alg",
58 "eligibility_form_class",
59 ]
60 else:
61 return super().get_exclude(request, obj)
63 def get_readonly_fields(self, request, obj=None):
64 if not request.user.is_superuser:
65 return [
66 "claims_provider",
67 "eligibility_api_url",
68 "eligibility_start_template_override",
69 "eligibility_unverified_template_override",
70 "help_template",
71 "selection_label_template_override",
72 "claims_scheme_override",
73 "enrollment_index_template",
74 "reenrollment_error_template",
75 "enrollment_success_template_override",
76 ]
77 else:
78 return super().get_readonly_fields(request, obj)
81@admin.register(models.TransitProcessor)
82class TransitProcessorAdmin(admin.ModelAdmin): # pragma: no cover
83 def get_exclude(self, request, obj=None):
84 if not request.user.is_superuser:
85 return []
86 else:
87 return super().get_exclude(request, obj)
89 def get_readonly_fields(self, request, obj=None):
90 if not request.user.is_superuser:
91 return [
92 "card_tokenize_url",
93 "card_tokenize_func",
94 "card_tokenize_env",
95 ]
96 else:
97 return super().get_readonly_fields(request, obj)
100@admin.register(models.TransitAgency)
101class TransitAgencyAdmin(admin.ModelAdmin): # pragma: no cover
102 def get_exclude(self, request, obj=None):
103 if not request.user.is_superuser:
104 return [
105 "eligibility_api_private_key",
106 "eligibility_api_public_key",
107 "transit_processor_client_id",
108 "transit_processor_client_secret_name",
109 "transit_processor_audience",
110 ]
111 else:
112 return super().get_exclude(request, obj)
114 def get_readonly_fields(self, request, obj=None):
115 if not request.user.is_superuser:
116 return [
117 "eligibility_api_id",
118 "transit_processor",
119 "index_template_override",
120 "eligibility_index_template_override",
121 ]
122 else:
123 return super().get_readonly_fields(request, obj)
126@admin.register(models.EnrollmentEvent)
127class EnrollmentEventAdmin(admin.ModelAdmin): # pragma: no cover
128 list_display = ("enrollment_datetime", "transit_agency", "enrollment_flow", "enrollment_method", "verified_by")
130 def get_readonly_fields(self, request: HttpRequest, obj=None):
131 return ["id"]
133 def has_add_permission(self, request: HttpRequest, obj=None):
134 if settings.RUNTIME_ENVIRONMENT() == settings.RUNTIME_ENVS.PROD:
135 return False
136 elif request.user and (request.user.is_superuser or is_staff_member(request.user)):
137 return True
138 else:
139 return False
141 def has_change_permission(self, request: HttpRequest, obj=None):
142 if settings.RUNTIME_ENVIRONMENT() == settings.RUNTIME_ENVS.PROD:
143 return False
144 elif request.user and request.user.is_superuser:
145 return True
146 else:
147 return False
149 def has_delete_permission(self, request: HttpRequest, obj=None):
150 if settings.RUNTIME_ENVIRONMENT() == settings.RUNTIME_ENVS.PROD:
151 return False
152 elif request.user and (request.user.is_superuser or is_staff_member(request.user)):
153 return True
154 else:
155 return False
157 def has_view_permission(self, request: HttpRequest, obj=None):
158 if request.user and (request.user.is_superuser or is_staff_member(request.user)):
159 return True
160 else:
161 return False
164def pre_login_user(user, request):
165 logger.debug(f"Running pre-login callback for user: {user.username}")
166 add_google_sso_userinfo(user, request)
167 add_staff_user_to_group(user, request)
168 add_transit_agency_staff_user_to_group(user, request)
171def add_google_sso_userinfo(user, request):
172 token = request.session.get("google_sso_access_token")
173 if token:
174 headers = {
175 "Authorization": f"Bearer {token}",
176 }
178 # Request Google user info to get name and email
179 response = requests.get(GOOGLE_USER_INFO_URL, headers=headers, timeout=settings.REQUESTS_TIMEOUT)
180 user_data = response.json()
181 logger.debug(f"Updating user data from Google for user with email: {user_data['email']}")
183 user.first_name = user_data["given_name"]
184 user.last_name = user_data["family_name"]
185 user.username = user_data["email"]
186 user.email = user_data["email"]
187 user.save()
188 else:
189 logger.warning("google_sso_access_token not found in session.")
192def add_staff_user_to_group(user, request):
193 if user.email in settings.GOOGLE_SSO_STAFF_LIST:
194 staff_group = Group.objects.get(name=settings.STAFF_GROUP_NAME)
195 staff_group.user_set.add(user)
198def add_transit_agency_staff_user_to_group(user, request):
199 user_sso_domain = user.email.split("@")[1]
200 if user_sso_domain: 200 ↛ exitline 200 didn't return from function 'add_transit_agency_staff_user_to_group' because the condition on line 200 was always true
201 agency = models.TransitAgency.objects.filter(sso_domain=user_sso_domain).first()
202 if agency is not None and agency.staff_group:
203 agency.staff_group.user_set.add(user)
206def is_staff_member(user):
207 staff_group = Group.objects.get(name=settings.STAFF_GROUP_NAME)
208 return staff_group.user_set.contains(user)