Coverage for benefits/core/models.py: 99%
241 statements
« prev ^ index » next coverage.py v7.6.7, created at 2024-11-19 16:31 +0000
« prev ^ index » next coverage.py v7.6.7, created at 2024-11-19 16:31 +0000
1"""
2The core application: Common model definitions.
3"""
5from functools import cached_property
6import importlib
7import logging
8import os
9import uuid
11from django.conf import settings
12from django.core.exceptions import ValidationError
13from django.contrib.auth.models import Group, User
14from django.db import models
15from django.urls import reverse
16from django.utils import timezone
18import requests
20from benefits.routes import routes
21from benefits.secrets import NAME_VALIDATOR, get_secret_by_name
22from multiselectfield import MultiSelectField
25logger = logging.getLogger(__name__)
28class SecretNameField(models.SlugField):
29 """Field that stores the name of a secret held in a secret store.
31 The secret value itself MUST NEVER be stored in this field.
32 """
34 description = """Field that stores the name of a secret held in a secret store.
36 Secret names must be between 1-127 alphanumeric ASCII characters or hyphen characters.
38 The secret value itself MUST NEVER be stored in this field.
39 """
41 def __init__(self, *args, **kwargs):
42 kwargs["validators"] = [NAME_VALIDATOR]
43 # although the validator also checks for a max length of 127
44 # this setting enforces the length at the database column level as well
45 kwargs["max_length"] = 127
46 # the default is False, but this is more explicit
47 kwargs["allow_unicode"] = False
48 super().__init__(*args, **kwargs)
51class PemData(models.Model):
52 """API Certificate or Key in PEM format."""
54 id = models.AutoField(primary_key=True)
55 # Human description of the PEM data
56 label = models.TextField()
57 # The name of a secret with data in utf-8 encoded PEM text format
58 text_secret_name = SecretNameField(null=True, blank=True)
59 # Public URL hosting the utf-8 encoded PEM text
60 remote_url = models.TextField(null=True, blank=True)
62 def __str__(self):
63 return self.label
65 @cached_property
66 def data(self):
67 """
68 Attempts to get data from `remote_url` or `text_secret_name`, with the latter taking precendence if both are defined.
69 """
70 remote_data = None
71 secret_data = None
73 if self.remote_url:
74 remote_data = requests.get(self.remote_url, timeout=settings.REQUESTS_TIMEOUT).text
75 if self.text_secret_name:
76 try:
77 secret_data = get_secret_by_name(self.text_secret_name)
78 except Exception:
79 secret_data = None
81 return secret_data if secret_data is not None else remote_data
84class ClaimsProvider(models.Model):
85 """An entity that provides claims for eligibility verification."""
87 id = models.AutoField(primary_key=True)
88 sign_out_button_template = models.TextField(null=True, blank=True, help_text="Template that renders sign-out button")
89 sign_out_link_template = models.TextField(null=True, blank=True, help_text="Template that renders sign-out link")
90 client_name = models.TextField(help_text="Unique identifier used to register this claims provider with Authlib registry")
91 client_id_secret_name = SecretNameField(
92 help_text="The name of the secret containing the client ID for this claims provider"
93 )
94 authority = models.TextField(help_text="The fully qualified HTTPS domain name for an OAuth authority server")
95 scheme = models.TextField(help_text="The authentication scheme to use")
97 @property
98 def supports_sign_out(self):
99 return bool(self.sign_out_button_template) or bool(self.sign_out_link_template)
101 @property
102 def client_id(self):
103 return get_secret_by_name(self.client_id_secret_name)
105 def __str__(self) -> str:
106 return self.client_name
109class TransitProcessor(models.Model):
110 """An entity that applies transit agency fare rules to rider transactions."""
112 id = models.AutoField(primary_key=True)
113 name = models.TextField(help_text="Primary internal display name for this TransitProcessor instance, e.g. in the Admin.")
114 api_base_url = models.TextField(help_text="The absolute base URL for the TransitProcessor's API, including https://.")
115 card_tokenize_url = models.TextField(
116 help_text="The absolute URL for the client-side card tokenization library provided by the TransitProcessor."
117 )
118 card_tokenize_func = models.TextField(
119 help_text="The function from the card tokenization library to call on the client to initiate the process."
120 )
121 card_tokenize_env = models.TextField(help_text="The environment in which card tokenization is occurring.")
122 portal_url = models.TextField(
123 null=True, blank=True, help_text="The absolute base URL for the TransitProcessor's control portal, including https://."
124 )
126 def __str__(self):
127 return self.name
130def _agency_logo(instance, filename, size):
131 base, ext = os.path.splitext(filename)
132 return f"agencies/{instance.slug}-{size}" + ext
135def agency_logo_small(instance, filename):
136 return _agency_logo(instance, filename, "sm")
139def agency_logo_large(instance, filename):
140 return _agency_logo(instance, filename, "lg")
143class TransitAgency(models.Model):
144 """An agency offering transit service."""
146 id = models.AutoField(primary_key=True)
147 active = models.BooleanField(default=False, help_text="Determines if this Agency is enabled for users")
148 slug = models.TextField(help_text="Used for URL navigation for this agency, e.g. the agency homepage url is /{slug}")
149 short_name = models.TextField(help_text="The user-facing short name for this agency. Often an uppercase acronym.")
150 long_name = models.TextField(
151 help_text="The user-facing long name for this agency. Often the short_name acronym, spelled out."
152 )
153 info_url = models.URLField(help_text="URL of a website/page with more information about the agency's discounts")
154 phone = models.TextField(help_text="Agency customer support phone number")
155 index_template = models.TextField(help_text="The template used for this agency's landing page")
156 eligibility_index_template = models.TextField(help_text="The template used for this agency's eligibility landing page")
157 eligibility_api_id = models.TextField(help_text="The identifier for this agency used in Eligibility API calls.")
158 eligibility_api_private_key = models.ForeignKey(
159 PemData,
160 related_name="+",
161 on_delete=models.PROTECT,
162 help_text="Private key used to sign Eligibility API tokens created on behalf of this Agency.",
163 )
164 eligibility_api_public_key = models.ForeignKey(
165 PemData,
166 related_name="+",
167 on_delete=models.PROTECT,
168 help_text="Public key corresponding to the agency's private key, used by Eligibility Verification servers to encrypt responses.", # noqa: E501
169 )
170 eligibility_api_jws_signing_alg = models.TextField(
171 help_text="The JWS-compatible signing algorithm used in Eligibility API calls."
172 )
173 transit_processor = models.ForeignKey(TransitProcessor, on_delete=models.PROTECT)
174 transit_processor_audience = models.TextField(
175 help_text="This agency's audience value used to access the TransitProcessor's API.", default=""
176 )
177 transit_processor_client_id = models.TextField(
178 help_text="This agency's client_id value used to access the TransitProcessor's API.", default=""
179 )
180 transit_processor_client_secret_name = SecretNameField(
181 help_text="The name of the secret containing this agency's client_secret value used to access the TransitProcessor's API.", # noqa: E501
182 default="",
183 )
184 staff_group = models.OneToOneField(
185 Group,
186 on_delete=models.PROTECT,
187 null=True,
188 blank=True,
189 default=None,
190 help_text="The group of users associated with this TransitAgency.",
191 related_name="transit_agency",
192 )
193 sso_domain = models.TextField(
194 null=True,
195 blank=True,
196 default="",
197 help_text="The email domain of users to automatically add to this agency's staff group upon login.",
198 )
199 customer_service_group = models.OneToOneField(
200 Group,
201 on_delete=models.PROTECT,
202 null=True,
203 blank=True,
204 default=None,
205 help_text="The group of users who are allowed to do in-person eligibility verification and enrollment.",
206 related_name="+",
207 )
208 logo_large = models.ImageField(
209 default=None,
210 null=True,
211 blank=True,
212 upload_to=agency_logo_large,
213 help_text="The large version of the transit agency's logo.",
214 )
215 logo_small = models.ImageField(
216 default=None,
217 null=True,
218 blank=True,
219 upload_to=agency_logo_small,
220 help_text="The small version of the transit agency's logo.",
221 )
223 def __str__(self):
224 return self.long_name
226 @property
227 def index_url(self):
228 """Public-facing URL to the TransitAgency's landing page."""
229 return reverse(routes.AGENCY_INDEX, args=[self.slug])
231 @property
232 def eligibility_index_url(self):
233 """Public facing URL to the TransitAgency's eligibility page."""
234 return reverse(routes.AGENCY_ELIGIBILITY_INDEX, args=[self.slug])
236 @property
237 def eligibility_api_private_key_data(self):
238 """This Agency's private key as a string."""
239 return self.eligibility_api_private_key.data
241 @property
242 def eligibility_api_public_key_data(self):
243 """This Agency's public key as a string."""
244 return self.eligibility_api_public_key.data
246 @property
247 def transit_processor_client_secret(self):
248 return get_secret_by_name(self.transit_processor_client_secret_name)
250 @property
251 def enrollment_flows(self):
252 return self.enrollmentflow_set
254 @staticmethod
255 def by_id(id):
256 """Get a TransitAgency instance by its ID."""
257 logger.debug(f"Get {TransitAgency.__name__} by id: {id}")
258 return TransitAgency.objects.get(id=id)
260 @staticmethod
261 def by_slug(slug):
262 """Get a TransitAgency instance by its slug."""
263 logger.debug(f"Get {TransitAgency.__name__} by slug: {slug}")
264 return TransitAgency.objects.filter(slug=slug).first()
266 @staticmethod
267 def all_active():
268 """Get all TransitAgency instances marked active."""
269 logger.debug(f"Get all active {TransitAgency.__name__}")
270 return TransitAgency.objects.filter(active=True)
272 @staticmethod
273 def for_user(user: User):
274 for group in user.groups.all():
275 if hasattr(group, "transit_agency"):
276 return group.transit_agency # this is looking at the TransitAgency's staff_group
278 # the loop above returns the first match found. Return None if no match was found.
279 return None
282class EnrollmentMethods:
283 DIGITAL = "digital"
284 IN_PERSON = "in_person"
287SUPPORTED_METHODS = (
288 (EnrollmentMethods.DIGITAL, EnrollmentMethods.DIGITAL.capitalize()),
289 (EnrollmentMethods.IN_PERSON, EnrollmentMethods.IN_PERSON.replace("_", "-").capitalize()),
290)
293class EnrollmentFlow(models.Model):
294 """Represents a user journey through the Benefits app for a single eligibility type."""
296 id = models.AutoField(primary_key=True)
297 system_name = models.TextField(
298 help_text="Primary internal system name for this EnrollmentFlow instance, e.g. in analytics and Eligibility API requests." # noqa: 501
299 )
300 display_order = models.PositiveSmallIntegerField(default=0, blank=False, null=False)
301 claims_provider = models.ForeignKey(
302 ClaimsProvider,
303 on_delete=models.PROTECT,
304 null=True,
305 blank=True,
306 help_text="An entity that provides claims for eligibility verification for this flow.",
307 )
308 claims_scope = models.TextField(
309 null=True,
310 blank=True,
311 help_text="A space-separated list of identifiers used to specify what access privileges are being requested",
312 )
313 claims_eligibility_claim = models.TextField(
314 null=True, blank=True, help_text="The name of the claim that is used to verify eligibility"
315 )
316 claims_extra_claims = models.TextField(null=True, blank=True, help_text="A space-separated list of any additional claims")
317 claims_scheme_override = models.TextField(
318 help_text="The authentication scheme to use (Optional). If blank, defaults to the value in Claims providers",
319 default=None,
320 null=True,
321 blank=True,
322 verbose_name="Claims scheme",
323 )
324 eligibility_api_url = models.TextField(
325 null=True, blank=True, help_text="Fully qualified URL for an Eligibility API server used by this flow."
326 )
327 eligibility_api_auth_header = models.TextField(
328 null=True,
329 blank=True,
330 help_text="The auth header to send in Eligibility API requests for this flow.",
331 )
332 eligibility_api_auth_key_secret_name = SecretNameField(
333 null=True,
334 blank=True,
335 help_text="The name of a secret containing the value of the auth header to send in Eligibility API requests for this flow.", # noqa: 501
336 )
337 eligibility_api_public_key = models.ForeignKey(
338 PemData,
339 related_name="+",
340 on_delete=models.PROTECT,
341 null=True,
342 blank=True,
343 help_text="The public key used to encrypt Eligibility API requests and to verify signed Eligibility API responses for this flow.", # noqa: E501
344 )
345 eligibility_api_jwe_cek_enc = models.TextField(
346 null=True,
347 blank=True,
348 help_text="The JWE-compatible Content Encryption Key (CEK) key-length and mode to use in Eligibility API requests for this flow.", # noqa: E501
349 )
350 eligibility_api_jwe_encryption_alg = models.TextField(
351 null=True,
352 blank=True,
353 help_text="The JWE-compatible encryption algorithm to use in Eligibility API requests for this flow.",
354 )
355 eligibility_api_jws_signing_alg = models.TextField(
356 null=True,
357 blank=True,
358 help_text="The JWS-compatible signing algorithm to use in Eligibility API requests for this flow.",
359 )
360 selection_label_template = models.TextField(
361 help_text="Path to a Django template that defines the end-user UI for selecting this flow among other options."
362 )
363 eligibility_start_template = models.TextField(
364 default="eligibility/start.html", help_text="Path to a Django template for the informational page of this flow."
365 )
366 eligibility_form_class = models.TextField(
367 null=True,
368 blank=True,
369 help_text="The fully qualified Python path of a form class used by this flow, e.g. benefits.eligibility.forms.FormClass", # noqa: E501
370 )
371 eligibility_unverified_template = models.TextField(
372 default="eligibility/unverified.html",
373 help_text="Path to a Django template that defines the page when a user fails eligibility verification for this flow.",
374 )
375 help_template = models.TextField(
376 null=True,
377 blank=True,
378 help_text="Path to a Django template that defines the help text for this enrollment flow, used in building the dynamic help page for an agency", # noqa: E501
379 )
380 label = models.TextField(
381 null=True,
382 help_text="A human readable label, used as the display text in Admin.",
383 )
384 group_id = models.TextField(null=True, help_text="Reference to the TransitProcessor group for user enrollment")
385 supports_expiration = models.BooleanField(
386 default=False, help_text="Indicates if the enrollment expires or does not expire"
387 )
388 expiration_days = models.PositiveSmallIntegerField(
389 null=True, blank=True, help_text="If the enrollment supports expiration, number of days before the eligibility expires"
390 )
391 expiration_reenrollment_days = models.PositiveSmallIntegerField(
392 null=True,
393 blank=True,
394 help_text="If the enrollment supports expiration, number of days preceding the expiration date during which a user can re-enroll in the eligibilty", # noqa: E501
395 )
396 enrollment_index_template = models.TextField(
397 default="enrollment/index.html",
398 help_text="Template for the Eligibility Confirmation page (which is the index of the enrollment Django app)",
399 )
400 reenrollment_error_template = models.TextField(
401 null=True, blank=True, help_text="Template for a re-enrollment error associated with the enrollment flow"
402 )
403 enrollment_success_template = models.TextField(
404 default="enrollment/success.html", help_text="Template for a successful enrollment associated with the enrollment flow"
405 )
406 supported_enrollment_methods = MultiSelectField(
407 choices=SUPPORTED_METHODS,
408 max_choices=2,
409 max_length=50,
410 default=[EnrollmentMethods.DIGITAL, EnrollmentMethods.IN_PERSON],
411 help_text="If the flow is supported by digital enrollment, in-person enrollment, or both",
412 )
413 transit_agency = models.ForeignKey(TransitAgency, on_delete=models.PROTECT, null=True, blank=True)
415 class Meta:
416 ordering = ["display_order"]
418 def __str__(self):
419 return self.label
421 @property
422 def eligibility_api_auth_key(self):
423 if self.eligibility_api_auth_key_secret_name is not None: 423 ↛ 426line 423 didn't jump to line 426 because the condition on line 423 was always true
424 return get_secret_by_name(self.eligibility_api_auth_key_secret_name)
425 else:
426 return None
428 @property
429 def eligibility_api_public_key_data(self):
430 """This flow's Eligibility API public key as a string."""
431 return self.eligibility_api_public_key.data
433 @property
434 def uses_claims_verification(self):
435 """True if this flow verifies via the claims provider and has a scope and claim. False otherwise."""
436 return self.claims_provider is not None and bool(self.claims_scope) and bool(self.claims_eligibility_claim)
438 @property
439 def eligibility_verifier(self):
440 """A str representing the entity that verifies eligibility for this flow.
442 Either the client name of the flow's claims provider, or the URL to the eligibility API.
443 """
444 if self.uses_claims_verification:
445 return self.claims_provider.client_name
446 else:
447 return self.eligibility_api_url
449 def eligibility_form_instance(self, *args, **kwargs):
450 """Return an instance of this flow's EligibilityForm, or None."""
451 if not bool(self.eligibility_form_class):
452 return None
454 # inspired by https://stackoverflow.com/a/30941292
455 module_name, class_name = self.eligibility_form_class.rsplit(".", 1)
456 FormClass = getattr(importlib.import_module(module_name), class_name)
458 return FormClass(*args, **kwargs)
460 @staticmethod
461 def by_id(id):
462 """Get an EnrollmentFlow instance by its ID."""
463 logger.debug(f"Get {EnrollmentFlow.__name__} by id: {id}")
464 return EnrollmentFlow.objects.get(id=id)
466 def clean(self):
467 supports_expiration = self.supports_expiration
468 expiration_days = self.expiration_days
469 expiration_reenrollment_days = self.expiration_reenrollment_days
470 reenrollment_error_template = self.reenrollment_error_template
472 if supports_expiration:
473 errors = {}
474 message = "When support_expiration is True, this value must be greater than 0."
475 if expiration_days is None or expiration_days <= 0:
476 errors.update(expiration_days=ValidationError(message))
477 if expiration_reenrollment_days is None or expiration_reenrollment_days <= 0:
478 errors.update(expiration_reenrollment_days=ValidationError(message))
479 if reenrollment_error_template is None:
480 errors.update(reenrollment_error_template=ValidationError("Required when supports expiration is True."))
482 if errors:
483 raise ValidationError(errors)
485 @property
486 def claims_scheme(self):
487 if not self.claims_scheme_override:
488 return self.claims_provider.scheme
489 return self.claims_scheme_override
491 @property
492 def claims_all_claims(self):
493 claims = [self.claims_eligibility_claim]
494 if self.claims_extra_claims is not None:
495 claims.extend(self.claims_extra_claims.split())
496 return claims
499class EnrollmentEvent(models.Model):
500 """A record of a successful enrollment."""
502 id = models.UUIDField(primary_key=True, default=uuid.uuid4)
503 transit_agency = models.ForeignKey(TransitAgency, on_delete=models.PROTECT)
504 enrollment_flow = models.ForeignKey(EnrollmentFlow, on_delete=models.PROTECT)
505 enrollment_method = models.TextField(
506 choices={
507 EnrollmentMethods.DIGITAL: EnrollmentMethods.DIGITAL,
508 EnrollmentMethods.IN_PERSON: EnrollmentMethods.IN_PERSON,
509 }
510 )
511 verified_by = models.TextField()
512 enrollment_datetime = models.DateTimeField(default=timezone.now)
513 expiration_datetime = models.DateTimeField(blank=True, null=True)
514 extra_claims = models.TextField(blank=True, null=True)
516 def __str__(self):
517 dt = timezone.localtime(self.enrollment_datetime)
518 ts = dt.strftime("%b %d, %Y, %I:%M %p")
519 return f"{ts}, {self.transit_agency}, {self.enrollment_flow}"