Coverage for benefits/core/admin.py: 98%
43 statements
« prev ^ index » next coverage.py v7.6.7, created at 2024-11-19 16:31 +0000
« prev ^ index » next coverage.py v7.6.7, created at 2024-11-19 16:31 +0000
1"""
2The core application: Admin interface configuration.
3"""
5import logging
6import requests
8from adminsortable2.admin import SortableAdminMixin
9from django.conf import settings
10from django.contrib import admin
11from django.contrib.auth.models import Group
12from django.http import HttpRequest
14from . import models
# Module-level logger, named after this module.
16logger = logging.getLogger(__name__)
# Endpoint requested with the SSO access token to obtain the user's name and email.
19GOOGLE_USER_INFO_URL = "https://www.googleapis.com/oauth2/v3/userinfo"
# PemData is registered directly on the admin site, without a custom ModelAdmin class.
21logger.debug("Register models with admin site")
22admin.site.register(models.PemData)
@admin.register(models.ClaimsProvider)
class ClaimsProviderAdmin(admin.ModelAdmin):  # pragma: no cover
    """Admin configuration for ClaimsProvider.

    Superusers get the default ModelAdmin behavior. For every other user the
    secret name field is excluded and the template/authority/scheme fields are
    read-only.
    """

    def get_exclude(self, request, obj=None):
        """Return the field names to exclude for the requesting user."""
        if request.user.is_superuser:
            return super().get_exclude(request, obj)
        return ["client_id_secret_name"]

    def get_readonly_fields(self, request, obj=None):
        """Return the field names that are read-only for the requesting user."""
        if request.user.is_superuser:
            return super().get_readonly_fields(request, obj)
        return [
            "sign_out_button_template",
            "sign_out_link_template",
            "authority",
            "scheme",
        ]
@admin.register(models.EnrollmentFlow)
class SortableEnrollmentFlowAdmin(SortableAdminMixin, admin.ModelAdmin):  # pragma: no cover
    """Sortable admin configuration for EnrollmentFlow.

    Superusers get the default behavior. For every other user the eligibility
    API settings listed in ``get_exclude`` are excluded, and the fields listed
    in ``get_readonly_fields`` are read-only.
    """

    list_display = ("label", "transit_agency", "supported_enrollment_methods")

    def get_exclude(self, request, obj=None):
        """Return the field names to exclude for the requesting user."""
        if request.user.is_superuser:
            return super().get_exclude(request, obj)
        return [
            "eligibility_api_auth_header",
            "eligibility_api_auth_key_secret_name",
            "eligibility_api_public_key",
            "eligibility_api_jwe_cek_enc",
            "eligibility_api_jwe_encryption_alg",
            "eligibility_api_jws_signing_alg",
            "eligibility_form_class",
        ]

    def get_readonly_fields(self, request, obj=None):
        """Return the field names that are read-only for the requesting user."""
        if request.user.is_superuser:
            return super().get_readonly_fields(request, obj)
        return [
            "claims_provider",
            "eligibility_api_url",
            "eligibility_start_template",
            "eligibility_unverified_template",
            "help_template",
            "selection_label_template",
            "claims_scheme_override",
            "enrollment_index_template",
            "reenrollment_error_template",
            "enrollment_success_template",
        ]
@admin.register(models.TransitProcessor)
class TransitProcessorAdmin(admin.ModelAdmin):  # pragma: no cover
    """Admin configuration for TransitProcessor.

    Superusers get the default behavior. For every other user nothing is
    excluded (an empty list is returned) and the card tokenization fields are
    read-only.
    """

    def get_exclude(self, request, obj=None):
        """Return the field names to exclude for the requesting user."""
        if request.user.is_superuser:
            return super().get_exclude(request, obj)
        # Non-superusers get an explicit empty exclusion list.
        return []

    def get_readonly_fields(self, request, obj=None):
        """Return the field names that are read-only for the requesting user."""
        if request.user.is_superuser:
            return super().get_readonly_fields(request, obj)
        return [
            "card_tokenize_url",
            "card_tokenize_func",
            "card_tokenize_env",
        ]
@admin.register(models.TransitAgency)
class TransitAgencyAdmin(admin.ModelAdmin):  # pragma: no cover
    """Admin configuration for TransitAgency.

    Superusers get the default behavior. For every other user the eligibility
    API key and transit processor credential fields are excluded, and the
    fields listed in ``get_readonly_fields`` are read-only.
    """

    def get_exclude(self, request, obj=None):
        """Return the field names to exclude for the requesting user."""
        if request.user.is_superuser:
            return super().get_exclude(request, obj)
        return [
            "eligibility_api_private_key",
            "eligibility_api_public_key",
            "eligibility_api_jws_signing_alg",
            "transit_processor_client_id",
            "transit_processor_client_secret_name",
            "transit_processor_audience",
        ]

    def get_readonly_fields(self, request, obj=None):
        """Return the field names that are read-only for the requesting user."""
        if request.user.is_superuser:
            return super().get_readonly_fields(request, obj)
        return [
            "eligibility_api_id",
            "transit_processor",
            "index_template",
            "eligibility_index_template",
        ]
@admin.register(models.EnrollmentEvent)
class EnrollmentEventAdmin(admin.ModelAdmin):  # pragma: no cover
    """Admin configuration for EnrollmentEvent.

    Add, change, and delete are always denied when the runtime environment is
    PROD. Otherwise add/delete/view require a superuser or a member of the
    staff group, and change requires a superuser.
    """

    list_display = ("enrollment_datetime", "transit_agency", "enrollment_flow", "enrollment_method", "verified_by")

    def get_readonly_fields(self, request: HttpRequest, obj=None):
        """The ``id`` field is read-only for every user."""
        return ["id"]

    def has_add_permission(self, request: HttpRequest, obj=None):
        """Deny in PROD; otherwise allow superusers and staff group members."""
        if settings.RUNTIME_ENVIRONMENT() == settings.RUNTIME_ENVS.PROD:
            return False
        account = request.user
        return bool(account and (account.is_superuser or is_staff_member(account)))

    def has_change_permission(self, request: HttpRequest, obj=None):
        """Deny in PROD; otherwise allow superusers only."""
        if settings.RUNTIME_ENVIRONMENT() == settings.RUNTIME_ENVS.PROD:
            return False
        account = request.user
        return bool(account and account.is_superuser)

    def has_delete_permission(self, request: HttpRequest, obj=None):
        """Deny in PROD; otherwise allow superusers and staff group members."""
        if settings.RUNTIME_ENVIRONMENT() == settings.RUNTIME_ENVS.PROD:
            return False
        account = request.user
        return bool(account and (account.is_superuser or is_staff_member(account)))

    def has_view_permission(self, request: HttpRequest, obj=None):
        """Allow superusers and staff group members, in every environment."""
        account = request.user
        return bool(account and (account.is_superuser or is_staff_member(account)))
def pre_login_user(user, request):
    """Run the pre-login steps for ``user``, in a fixed order.

    The Google user info refresh runs first, then the two group-assignment
    steps; each step receives the same ``user`` and ``request``.
    """
    logger.debug(f"Running pre-login callback for user: {user.username}")
    steps = (
        add_google_sso_userinfo,
        add_staff_user_to_group,
        add_transit_agency_staff_user_to_group,
    )
    for step in steps:
        step(user, request)
def add_google_sso_userinfo(user, request):
    """Update ``user``'s name, username, and email from the Google user info endpoint.

    Reads ``google_sso_access_token`` from the session and, when present, sends
    it as a bearer token to ``GOOGLE_USER_INFO_URL``. On success the user's
    ``first_name``, ``last_name``, ``username`` and ``email`` are set from the
    response and the user is saved.

    This step is best-effort. If the token is missing, the request fails, the
    response is not a usable JSON object, or it carries no ``email``, a warning
    is logged and ``user`` is left unchanged instead of raising.
    """
    token = request.session.get("google_sso_access_token")
    if not token:
        logger.warning("google_sso_access_token not found in session.")
        return

    headers = {
        "Authorization": f"Bearer {token}",
    }

    try:
        # Request Google user info to get name and email
        response = requests.get(GOOGLE_USER_INFO_URL, headers=headers, timeout=settings.REQUESTS_TIMEOUT)
        # An error status (e.g. expired token) has no profile fields in its body.
        response.raise_for_status()
        user_data = response.json()
    except (requests.RequestException, ValueError) as ex:
        # ValueError covers JSON decoding failures on older versions of requests.
        logger.warning(f"Could not retrieve Google user info: {ex}")
        return

    email = user_data.get("email") if isinstance(user_data, dict) else None
    if not email:
        logger.warning("Google user info response did not include an email.")
        return

    logger.debug(f"Updating user data from Google for user with email: {email}")

    # The name claims may be absent from the response; keep the current values then.
    user.first_name = user_data.get("given_name", user.first_name)
    user.last_name = user_data.get("family_name", user.last_name)
    user.username = email
    user.email = email
    user.save()
def add_staff_user_to_group(user, request):
    """Add ``user`` to the staff group when their email is in the SSO staff list.

    Does nothing when ``user.email`` is not in ``settings.GOOGLE_SSO_STAFF_LIST``.
    ``request`` is accepted but not used.
    """
    if user.email not in settings.GOOGLE_SSO_STAFF_LIST:
        return
    Group.objects.get(name=settings.STAFF_GROUP_NAME).user_set.add(user)
def add_transit_agency_staff_user_to_group(user, request):
    """Add ``user`` to the staff group of the agency matching their email domain.

    The domain is the text after the first "@" in ``user.email``. The first
    TransitAgency whose ``sso_domain`` equals it is looked up, and the user is
    added to that agency's ``staff_group`` when one is set.

    Does nothing when the email is empty, has no "@", has an empty domain, or
    no agency matches. ``request`` is accepted but not used.
    """
    parts = (user.email or "").split("@")
    # An email without "@" yields a single part; indexing [1] would raise IndexError.
    if len(parts) < 2:
        return

    user_sso_domain = parts[1]
    if not user_sso_domain:
        return

    agency = models.TransitAgency.objects.filter(sso_domain=user_sso_domain).first()
    if agency is not None and agency.staff_group:
        agency.staff_group.user_set.add(user)
def is_staff_member(user):
    """Return whether ``user`` belongs to the group named ``settings.STAFF_GROUP_NAME``."""
    return Group.objects.get(name=settings.STAFF_GROUP_NAME).user_set.contains(user)