Coverage for benefits/core/models.py: 99%

327 statements  

« prev     ^ index     » next       coverage.py v7.6.9, created at 2024-12-19 00:56 +0000

1""" 

2The core application: Common model definitions. 

3""" 

4 

5from functools import cached_property 

6import importlib 

7import logging 

8import os 

9from pathlib import Path 

10import uuid 

11 

12from django import template 

13from django.conf import settings 

14from django.core.exceptions import ValidationError 

15from django.contrib.auth.models import Group, User 

16from django.db import models 

17from django.urls import reverse 

18from django.utils import timezone 

19 

20import requests 

21 

22from benefits.routes import routes 

23from benefits.secrets import NAME_VALIDATOR, get_secret_by_name 

24from multiselectfield import MultiSelectField 

25 

26 

27logger = logging.getLogger(__name__) 

28 

29 

30def template_path(template_name: str) -> Path: 

31 """Get a `pathlib.Path` for the named template, or None if it can't be found. 

32 

33 A `template_name` is the app-local name, e.g. `enrollment/success.html`. 

34 

35 Adapted from https://stackoverflow.com/a/75863472. 

36 """ 

37 if template_name: 

38 for engine in template.engines.all(): 

39 for loader in engine.engine.template_loaders: 

40 for origin in loader.get_template_sources(template_name): 

41 path = Path(origin.name) 

42 if path.exists() and path.is_file(): 

43 return path 

44 return None 

45 

46 

47class SecretNameField(models.SlugField): 

48 """Field that stores the name of a secret held in a secret store. 

49 

50 The secret value itself MUST NEVER be stored in this field. 

51 """ 

52 

53 description = """Field that stores the name of a secret held in a secret store. 

54 

55 Secret names must be between 1-127 alphanumeric ASCII characters or hyphen characters. 

56 

57 The secret value itself MUST NEVER be stored in this field. 

58 """ 

59 

60 def __init__(self, *args, **kwargs): 

61 kwargs["validators"] = [NAME_VALIDATOR] 

62 # although the validator also checks for a max length of 127 

63 # this setting enforces the length at the database column level as well 

64 kwargs["max_length"] = 127 

65 # the default is False, but this is more explicit 

66 kwargs["allow_unicode"] = False 

67 super().__init__(*args, **kwargs) 

68 

69 

70class PemData(models.Model): 

71 """API Certificate or Key in PEM format.""" 

72 

73 id = models.AutoField(primary_key=True) 

74 # Human description of the PEM data 

75 label = models.TextField() 

76 # The name of a secret with data in utf-8 encoded PEM text format 

77 text_secret_name = SecretNameField(default="", blank=True) 

78 # Public URL hosting the utf-8 encoded PEM text 

79 remote_url = models.TextField(default="", blank=True) 

80 

81 def __str__(self): 

82 return self.label 

83 

84 @cached_property 

85 def data(self): 

86 """ 

87 Attempts to get data from `remote_url` or `text_secret_name`, with the latter taking precendence if both are defined. 

88 """ 

89 remote_data = None 

90 secret_data = None 

91 

92 if self.remote_url: 

93 remote_data = requests.get(self.remote_url, timeout=settings.REQUESTS_TIMEOUT).text 

94 if self.text_secret_name: 

95 try: 

96 secret_data = get_secret_by_name(self.text_secret_name) 

97 except Exception: 

98 secret_data = None 

99 

100 return secret_data if secret_data is not None else remote_data 

101 

102 

103class ClaimsProvider(models.Model): 

104 """An entity that provides claims for eligibility verification.""" 

105 

106 id = models.AutoField(primary_key=True) 

107 sign_out_button_template = models.TextField(default="", blank=True, help_text="Template that renders sign-out button") 

108 sign_out_link_template = models.TextField(default="", blank=True, help_text="Template that renders sign-out link") 

109 client_name = models.TextField(help_text="Unique identifier used to register this claims provider with Authlib registry") 

110 client_id_secret_name = SecretNameField( 

111 help_text="The name of the secret containing the client ID for this claims provider" 

112 ) 

113 authority = models.TextField(help_text="The fully qualified HTTPS domain name for an OAuth authority server") 

114 scheme = models.TextField(help_text="The authentication scheme to use") 

115 

116 @property 

117 def supports_sign_out(self): 

118 return bool(self.sign_out_button_template) or bool(self.sign_out_link_template) 

119 

120 @property 

121 def client_id(self): 

122 return get_secret_by_name(self.client_id_secret_name) 

123 

124 def __str__(self) -> str: 

125 return self.client_name 

126 

127 

128class TransitProcessor(models.Model): 

129 """An entity that applies transit agency fare rules to rider transactions.""" 

130 

131 id = models.AutoField(primary_key=True) 

132 name = models.TextField(help_text="Primary internal display name for this TransitProcessor instance, e.g. in the Admin.") 

133 api_base_url = models.TextField(help_text="The absolute base URL for the TransitProcessor's API, including https://.") 

134 card_tokenize_url = models.TextField( 

135 help_text="The absolute URL for the client-side card tokenization library provided by the TransitProcessor." 

136 ) 

137 card_tokenize_func = models.TextField( 

138 help_text="The function from the card tokenization library to call on the client to initiate the process." 

139 ) 

140 card_tokenize_env = models.TextField(help_text="The environment in which card tokenization is occurring.") 

141 portal_url = models.TextField( 

142 default="", 

143 blank=True, 

144 help_text="The absolute base URL for the TransitProcessor's control portal, including https://.", 

145 ) 

146 

147 def __str__(self): 

148 return self.name 

149 

150 

151def _agency_logo(instance, filename, size): 

152 base, ext = os.path.splitext(filename) 

153 return f"agencies/{instance.slug}-{size}" + ext 

154 

155 

156def agency_logo_small(instance, filename): 

157 return _agency_logo(instance, filename, "sm") 

158 

159 

160def agency_logo_large(instance, filename): 

161 return _agency_logo(instance, filename, "lg") 

162 

163 

164class TransitAgency(models.Model): 

165 """An agency offering transit service.""" 

166 

167 id = models.AutoField(primary_key=True) 

168 active = models.BooleanField(default=False, help_text="Determines if this Agency is enabled for users") 

169 slug = models.SlugField(help_text="Used for URL navigation for this agency, e.g. the agency homepage url is /{slug}") 

170 short_name = models.TextField( 

171 default="", blank=True, help_text="The user-facing short name for this agency. Often an uppercase acronym." 

172 ) 

173 long_name = models.TextField( 

174 default="", 

175 blank=True, 

176 help_text="The user-facing long name for this agency. Often the short_name acronym, spelled out.", 

177 ) 

178 info_url = models.URLField( 

179 default="", 

180 blank=True, 

181 help_text="URL of a website/page with more information about the agency's discounts", 

182 ) 

183 phone = models.TextField(default="", blank=True, help_text="Agency customer support phone number") 

184 index_template_override = models.TextField( 

185 help_text="Override the default template used for this agency's landing page", 

186 blank=True, 

187 default="", 

188 ) 

189 eligibility_index_template_override = models.TextField( 

190 help_text="Override the default template used for this agency's eligibility landing page", 

191 blank=True, 

192 default="", 

193 ) 

194 eligibility_api_id = models.TextField( 

195 help_text="The identifier for this agency used in Eligibility API calls.", 

196 blank=True, 

197 default="", 

198 ) 

199 eligibility_api_private_key = models.ForeignKey( 

200 PemData, 

201 related_name="+", 

202 on_delete=models.PROTECT, 

203 help_text="Private key used to sign Eligibility API tokens created on behalf of this Agency.", 

204 null=True, 

205 blank=True, 

206 default=None, 

207 ) 

208 eligibility_api_public_key = models.ForeignKey( 

209 PemData, 

210 related_name="+", 

211 on_delete=models.PROTECT, 

212 help_text="Public key corresponding to the agency's private key, used by Eligibility Verification servers to encrypt responses.", # noqa: E501 

213 null=True, 

214 blank=True, 

215 default=None, 

216 ) 

217 transit_processor = models.ForeignKey( 

218 TransitProcessor, 

219 on_delete=models.PROTECT, 

220 null=True, 

221 blank=True, 

222 default=None, 

223 help_text="This agency's TransitProcessor.", 

224 ) 

225 transit_processor_audience = models.TextField( 

226 help_text="This agency's audience value used to access the TransitProcessor's API.", default="", blank=True 

227 ) 

228 transit_processor_client_id = models.TextField( 

229 help_text="This agency's client_id value used to access the TransitProcessor's API.", default="", blank=True 

230 ) 

231 transit_processor_client_secret_name = SecretNameField( 

232 help_text="The name of the secret containing this agency's client_secret value used to access the TransitProcessor's API.", # noqa: E501 

233 default="", 

234 blank=True, 

235 ) 

236 staff_group = models.OneToOneField( 

237 Group, 

238 on_delete=models.PROTECT, 

239 null=True, 

240 blank=True, 

241 default=None, 

242 help_text="The group of users associated with this TransitAgency.", 

243 related_name="transit_agency", 

244 ) 

245 sso_domain = models.TextField( 

246 blank=True, 

247 default="", 

248 help_text="The email domain of users to automatically add to this agency's staff group upon login.", 

249 ) 

250 customer_service_group = models.OneToOneField( 

251 Group, 

252 on_delete=models.PROTECT, 

253 null=True, 

254 blank=True, 

255 default=None, 

256 help_text="The group of users who are allowed to do in-person eligibility verification and enrollment.", 

257 related_name="+", 

258 ) 

259 logo_large = models.ImageField( 

260 default="", 

261 blank=True, 

262 upload_to=agency_logo_large, 

263 help_text="The large version of the transit agency's logo.", 

264 ) 

265 logo_small = models.ImageField( 

266 default="", 

267 blank=True, 

268 upload_to=agency_logo_small, 

269 help_text="The small version of the transit agency's logo.", 

270 ) 

271 

272 def __str__(self): 

273 return self.long_name 

274 

275 @property 

276 def index_template(self): 

277 return self.index_template_override or f"core/index--{self.slug}.html" 

278 

279 @property 

280 def index_url(self): 

281 """Public-facing URL to the TransitAgency's landing page.""" 

282 return reverse(routes.AGENCY_INDEX, args=[self.slug]) 

283 

284 @property 

285 def eligibility_index_template(self): 

286 return self.eligibility_index_template_override or f"eligibility/index--{self.slug}.html" 

287 

288 @property 

289 def eligibility_index_url(self): 

290 """Public facing URL to the TransitAgency's eligibility page.""" 

291 return reverse(routes.AGENCY_ELIGIBILITY_INDEX, args=[self.slug]) 

292 

293 @property 

294 def eligibility_api_private_key_data(self): 

295 """This Agency's private key as a string.""" 

296 return self.eligibility_api_private_key.data 

297 

298 @property 

299 def eligibility_api_public_key_data(self): 

300 """This Agency's public key as a string.""" 

301 return self.eligibility_api_public_key.data 

302 

303 @property 

304 def transit_processor_client_secret(self): 

305 return get_secret_by_name(self.transit_processor_client_secret_name) 

306 

307 @property 

308 def enrollment_flows(self): 

309 return self.enrollmentflow_set 

310 

311 def clean(self): 

312 field_errors = {} 

313 template_errors = [] 

314 

315 if self.active: 

316 for flow in self.enrollment_flows.all(): 

317 try: 

318 flow.clean() 

319 except ValidationError: 

320 raise ValidationError(f"Invalid EnrollmentFlow: {flow.label}") 

321 

322 message = "This field is required for active transit agencies." 

323 needed = dict( 

324 short_name=self.short_name, 

325 long_name=self.long_name, 

326 phone=self.phone, 

327 info_url=self.info_url, 

328 logo_large=self.logo_large, 

329 logo_small=self.logo_small, 

330 ) 

331 if self.transit_processor: 331 ↛ 339line 331 didn't jump to line 339 because the condition on line 331 was always true

332 needed.update( 

333 dict( 

334 transit_processor_audience=self.transit_processor_audience, 

335 transit_processor_client_id=self.transit_processor_client_id, 

336 transit_processor_client_secret_name=self.transit_processor_client_secret_name, 

337 ) 

338 ) 

339 field_errors.update({k: ValidationError(message) for k, v in needed.items() if not v}) 

340 

341 # since templates are calculated from the pattern or the override field 

342 # we can't add a field-level validation error 

343 # so just create directly for a missing template 

344 for t in [self.index_template, self.eligibility_index_template]: 

345 if not template_path(t): 

346 template_errors.append(ValidationError(f"Template not found: {t}")) 

347 

348 if field_errors: 

349 raise ValidationError(field_errors) 

350 if template_errors: 

351 raise ValidationError(template_errors) 

352 

353 @staticmethod 

354 def by_id(id): 

355 """Get a TransitAgency instance by its ID.""" 

356 logger.debug(f"Get {TransitAgency.__name__} by id: {id}") 

357 return TransitAgency.objects.get(id=id) 

358 

359 @staticmethod 

360 def by_slug(slug): 

361 """Get a TransitAgency instance by its slug.""" 

362 logger.debug(f"Get {TransitAgency.__name__} by slug: {slug}") 

363 return TransitAgency.objects.filter(slug=slug).first() 

364 

365 @staticmethod 

366 def all_active(): 

367 """Get all TransitAgency instances marked active.""" 

368 logger.debug(f"Get all active {TransitAgency.__name__}") 

369 return TransitAgency.objects.filter(active=True) 

370 

371 @staticmethod 

372 def for_user(user: User): 

373 for group in user.groups.all(): 

374 if hasattr(group, "transit_agency"): 

375 return group.transit_agency # this is looking at the TransitAgency's staff_group 

376 

377 # the loop above returns the first match found. Return None if no match was found. 

378 return None 

379 

380 

381class EnrollmentMethods: 

382 DIGITAL = "digital" 

383 IN_PERSON = "in_person" 

384 

385 

386SUPPORTED_METHODS = ( 

387 (EnrollmentMethods.DIGITAL, EnrollmentMethods.DIGITAL.capitalize()), 

388 (EnrollmentMethods.IN_PERSON, EnrollmentMethods.IN_PERSON.replace("_", "-").capitalize()), 

389) 

390 

391 

392class EnrollmentFlow(models.Model): 

393 """Represents a user journey through the Benefits app for a single eligibility type.""" 

394 

395 id = models.AutoField(primary_key=True) 

396 system_name = models.SlugField( 

397 help_text="Primary internal system name for this EnrollmentFlow instance, e.g. in analytics and Eligibility API requests." # noqa: 501 

398 ) 

399 label = models.TextField( 

400 blank=True, 

401 default="", 

402 help_text="A human readable label, used as the display text in Admin.", 

403 ) 

404 transit_agency = models.ForeignKey(TransitAgency, on_delete=models.PROTECT, null=True, blank=True) 

405 supported_enrollment_methods = MultiSelectField( 

406 choices=SUPPORTED_METHODS, 

407 max_choices=2, 

408 max_length=50, 

409 default=[EnrollmentMethods.DIGITAL, EnrollmentMethods.IN_PERSON], 

410 help_text="If the flow is supported by digital enrollment, in-person enrollment, or both", 

411 ) 

412 group_id = models.TextField( 

413 blank=True, default="", help_text="Reference to the TransitProcessor group for user enrollment" 

414 ) 

415 claims_provider = models.ForeignKey( 

416 ClaimsProvider, 

417 on_delete=models.PROTECT, 

418 null=True, 

419 blank=True, 

420 help_text="An entity that provides claims for eligibility verification for this flow.", 

421 ) 

422 claims_scope = models.TextField( 

423 blank=True, 

424 default="", 

425 help_text="A space-separated list of identifiers used to specify what access privileges are being requested", 

426 ) 

427 claims_eligibility_claim = models.TextField( 

428 blank=True, default="", help_text="The name of the claim that is used to verify eligibility" 

429 ) 

430 claims_extra_claims = models.TextField(blank=True, default="", help_text="A space-separated list of any additional claims") 

431 claims_scheme_override = models.TextField( 

432 help_text="The authentication scheme to use (Optional). If blank, defaults to the value in Claims providers", 

433 default="", 

434 blank=True, 

435 verbose_name="Claims scheme", 

436 ) 

437 eligibility_api_url = models.TextField( 

438 blank=True, default="", help_text="Fully qualified URL for an Eligibility API server used by this flow." 

439 ) 

440 eligibility_api_auth_header = models.TextField( 

441 blank=True, 

442 default="", 

443 help_text="The auth header to send in Eligibility API requests for this flow.", 

444 ) 

445 eligibility_api_auth_key_secret_name = SecretNameField( 

446 blank=True, 

447 default="", 

448 help_text="The name of a secret containing the value of the auth header to send in Eligibility API requests for this flow.", # noqa: 501 

449 ) 

450 eligibility_api_public_key = models.ForeignKey( 

451 PemData, 

452 related_name="+", 

453 on_delete=models.PROTECT, 

454 null=True, 

455 blank=True, 

456 help_text="The public key used to encrypt Eligibility API requests and to verify signed Eligibility API responses for this flow.", # noqa: E501 

457 ) 

458 eligibility_api_jwe_cek_enc = models.TextField( 

459 blank=True, 

460 default="", 

461 help_text="The JWE-compatible Content Encryption Key (CEK) key-length and mode to use in Eligibility API requests for this flow.", # noqa: E501 

462 ) 

463 eligibility_api_jwe_encryption_alg = models.TextField( 

464 blank=True, 

465 default="", 

466 help_text="The JWE-compatible encryption algorithm to use in Eligibility API requests for this flow.", 

467 ) 

468 eligibility_api_jws_signing_alg = models.TextField( 

469 blank=True, 

470 default="", 

471 help_text="The JWS-compatible signing algorithm to use in Eligibility API requests for this flow.", 

472 ) 

473 eligibility_form_class = models.TextField( 

474 blank=True, 

475 default="", 

476 help_text="The fully qualified Python path of a form class used by this flow, e.g. benefits.eligibility.forms.FormClass", # noqa: E501 

477 ) 

478 selection_label_template_override = models.TextField( 

479 blank=True, 

480 default="", 

481 help_text="Override the default template that defines the end-user UI for selecting this flow among other options.", 

482 ) 

483 eligibility_start_template_override = models.TextField( 

484 blank=True, 

485 default="", 

486 help_text="Override the default template for the informational page of this flow.", 

487 ) 

488 eligibility_unverified_template_override = models.TextField( 

489 blank=True, 

490 default="", 

491 help_text="Override the default template that defines the page when a user fails eligibility verification for this flow.", # noqa: E501 

492 ) 

493 help_template = models.TextField( 

494 blank=True, 

495 default="", 

496 help_text="Path to a Django template that defines the help text for this enrollment flow, used in building the dynamic help page for an agency", # noqa: E501 

497 ) 

498 supports_expiration = models.BooleanField( 

499 default=False, help_text="Indicates if the enrollment expires or does not expire" 

500 ) 

501 expiration_days = models.PositiveSmallIntegerField( 

502 null=True, blank=True, help_text="If the enrollment supports expiration, number of days before the eligibility expires" 

503 ) 

504 expiration_reenrollment_days = models.PositiveSmallIntegerField( 

505 null=True, 

506 blank=True, 

507 help_text="If the enrollment supports expiration, number of days preceding the expiration date during which a user can re-enroll in the eligibilty", # noqa: E501 

508 ) 

509 enrollment_index_template_override = models.TextField( 

510 blank=True, 

511 default="", 

512 help_text="Override the default template for the Eligibility Confirmation page (the index of the enrollment app)", 

513 ) 

514 reenrollment_error_template = models.TextField( 

515 blank=True, default="", help_text="Template for a re-enrollment error associated with the enrollment flow" 

516 ) 

517 enrollment_success_template_override = models.TextField( 

518 blank=True, 

519 default="", 

520 help_text="Override the default template for a successful enrollment associated with the enrollment flow", 

521 ) 

522 display_order = models.PositiveSmallIntegerField(default=0, blank=False, null=False) 

523 

524 class Meta: 

525 ordering = ["display_order"] 

526 

527 def __str__(self): 

528 return self.label 

529 

530 @property 

531 def agency_card_name(self): 

532 if self.uses_claims_verification: 

533 return "" 

534 else: 

535 return f"{self.transit_agency.slug}-agency-card" 

536 

537 @property 

538 def eligibility_api_auth_key(self): 

539 if self.eligibility_api_auth_key_secret_name is not None: 539 ↛ 542line 539 didn't jump to line 542 because the condition on line 539 was always true

540 return get_secret_by_name(self.eligibility_api_auth_key_secret_name) 

541 else: 

542 return None 

543 

544 @property 

545 def eligibility_api_public_key_data(self): 

546 """This flow's Eligibility API public key as a string.""" 

547 return self.eligibility_api_public_key.data 

548 

549 @property 

550 def selection_label_template(self): 

551 prefix = "eligibility/includes/selection-label" 

552 if self.uses_claims_verification: 

553 return self.selection_label_template_override or f"{prefix}--{self.system_name}.html" 

554 else: 

555 return self.selection_label_template_override or f"{prefix}--{self.agency_card_name}.html" 

556 

557 @property 

558 def eligibility_start_template(self): 

559 prefix = "eligibility/start" 

560 if self.uses_claims_verification: 

561 return self.eligibility_start_template_override or f"{prefix}--{self.system_name}.html" 

562 else: 

563 return self.eligibility_start_template_override or f"{prefix}--{self.agency_card_name}.html" 

564 

565 @property 

566 def eligibility_unverified_template(self): 

567 prefix = "eligibility/unverified" 

568 if self.uses_claims_verification: 

569 return self.eligibility_unverified_template_override or f"{prefix}.html" 

570 else: 

571 return self.eligibility_unverified_template_override or f"{prefix}--{self.agency_card_name}.html" 

572 

573 @property 

574 def uses_claims_verification(self): 

575 """True if this flow verifies via the claims provider and has a scope and claim. False otherwise.""" 

576 return self.claims_provider is not None and bool(self.claims_scope) and bool(self.claims_eligibility_claim) 

577 

578 @property 

579 def claims_scheme(self): 

580 return self.claims_scheme_override or self.claims_provider.scheme 

581 

582 @property 

583 def claims_all_claims(self): 

584 claims = [self.claims_eligibility_claim] 

585 if self.claims_extra_claims is not None: 

586 claims.extend(self.claims_extra_claims.split()) 

587 return claims 

588 

589 @property 

590 def eligibility_verifier(self): 

591 """A str representing the entity that verifies eligibility for this flow. 

592 

593 Either the client name of the flow's claims provider, or the URL to the eligibility API. 

594 """ 

595 if self.uses_claims_verification: 

596 return self.claims_provider.client_name 

597 else: 

598 return self.eligibility_api_url 

599 

600 @property 

601 def enrollment_index_template(self): 

602 prefix = "enrollment/index" 

603 if self.uses_claims_verification: 

604 return self.enrollment_index_template_override or f"{prefix}.html" 

605 else: 

606 return self.enrollment_index_template_override or f"{prefix}--agency-card.html" 

607 

608 @property 

609 def enrollment_success_template(self): 

610 prefix = "enrollment/success" 

611 if self.uses_claims_verification: 

612 return self.enrollment_success_template_override or f"{prefix}--{self.transit_agency.slug}.html" 

613 else: 

614 return self.enrollment_success_template_override or f"{prefix}--{self.agency_card_name}.html" 

615 

616 def clean(self): 

617 field_errors = {} 

618 template_errors = [] 

619 

620 if self.supports_expiration: 

621 expiration_days = self.expiration_days 

622 expiration_reenrollment_days = self.expiration_reenrollment_days 

623 reenrollment_error_template = self.reenrollment_error_template 

624 

625 message = "When support_expiration is True, this value must be greater than 0." 

626 if expiration_days is None or expiration_days <= 0: 

627 field_errors.update(expiration_days=ValidationError(message)) 

628 if expiration_reenrollment_days is None or expiration_reenrollment_days <= 0: 

629 field_errors.update(expiration_reenrollment_days=ValidationError(message)) 

630 if not reenrollment_error_template: 

631 field_errors.update(reenrollment_error_template=ValidationError("Required when supports expiration is True.")) 

632 

633 if self.transit_agency: 

634 if self.claims_provider: 

635 message = "Required for claims verification." 

636 needed = dict( 

637 claims_scope=self.claims_scope, 

638 claims_eligibility_claim=self.claims_eligibility_claim, 

639 ) 

640 field_errors.update({k: ValidationError(message) for k, v in needed.items() if not v}) 

641 else: 

642 message = "Required for Eligibility API verification." 

643 needed = dict( 

644 eligibility_api_auth_header=self.eligibility_api_auth_header, 

645 eligibility_api_auth_key_secret_name=self.eligibility_api_auth_key_secret_name, 

646 eligibility_api_jwe_cek_enc=self.eligibility_api_jwe_cek_enc, 

647 eligibility_api_jwe_encryption_alg=self.eligibility_api_jwe_encryption_alg, 

648 eligibility_api_jws_signing_alg=self.eligibility_api_jws_signing_alg, 

649 eligibility_api_public_key=self.eligibility_api_public_key, 

650 eligibility_api_url=self.eligibility_api_url, 

651 eligibility_form_class=self.eligibility_form_class, 

652 ) 

653 field_errors.update({k: ValidationError(message) for k, v in needed.items() if not v}) 

654 

655 templates = [ 

656 self.selection_label_template, 

657 self.eligibility_start_template, 

658 self.eligibility_unverified_template, 

659 self.enrollment_index_template, 

660 self.enrollment_success_template, 

661 ] 

662 if self.supports_expiration: 

663 templates.append(self.reenrollment_error_template) 

664 

665 # since templates are calculated from the pattern or the override field 

666 # we can't add a field-level validation error 

667 # so just create directly for a missing template 

668 for t in templates: 

669 if not template_path(t): 

670 template_errors.append(ValidationError(f"Template not found: {t}")) 

671 

672 if field_errors: 

673 raise ValidationError(field_errors) 

674 if template_errors: 

675 raise ValidationError(template_errors) 

676 

677 def eligibility_form_instance(self, *args, **kwargs): 

678 """Return an instance of this flow's EligibilityForm, or None.""" 

679 if not bool(self.eligibility_form_class): 

680 return None 

681 

682 # inspired by https://stackoverflow.com/a/30941292 

683 module_name, class_name = self.eligibility_form_class.rsplit(".", 1) 

684 FormClass = getattr(importlib.import_module(module_name), class_name) 

685 

686 return FormClass(*args, **kwargs) 

687 

688 @staticmethod 

689 def by_id(id): 

690 """Get an EnrollmentFlow instance by its ID.""" 

691 logger.debug(f"Get {EnrollmentFlow.__name__} by id: {id}") 

692 return EnrollmentFlow.objects.get(id=id) 

693 

694 

695class EnrollmentEvent(models.Model): 

696 """A record of a successful enrollment.""" 

697 

698 id = models.UUIDField(primary_key=True, default=uuid.uuid4) 

699 transit_agency = models.ForeignKey(TransitAgency, on_delete=models.PROTECT) 

700 enrollment_flow = models.ForeignKey(EnrollmentFlow, on_delete=models.PROTECT) 

701 enrollment_method = models.TextField( 

702 choices={ 

703 EnrollmentMethods.DIGITAL: EnrollmentMethods.DIGITAL, 

704 EnrollmentMethods.IN_PERSON: EnrollmentMethods.IN_PERSON, 

705 } 

706 ) 

707 verified_by = models.TextField() 

708 enrollment_datetime = models.DateTimeField(default=timezone.now) 

709 expiration_datetime = models.DateTimeField(blank=True, null=True) 

710 extra_claims = models.TextField(blank=True, default="") 

711 

712 def __str__(self): 

713 dt = timezone.localtime(self.enrollment_datetime) 

714 ts = dt.strftime("%b %d, %Y, %I:%M %p") 

715 return f"{ts}, {self.transit_agency}, {self.enrollment_flow}"