Coverage for benefits/core/models.py: 98%

223 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-10-21 19:31 +0000

1""" 

2The core application: Common model definitions. 

3""" 

4 

5from functools import cached_property 

6import importlib 

7import logging 

8import uuid 

9 

10from django.conf import settings 

11from django.core.exceptions import ValidationError 

12from django.contrib.auth.models import Group, User 

13from django.db import models 

14from django.urls import reverse 

15from django.utils import timezone 

16 

17import requests 

18 

19from benefits.routes import routes 

20from benefits.secrets import NAME_VALIDATOR, get_secret_by_name 

21from multiselectfield import MultiSelectField 

22 

23 

24logger = logging.getLogger(__name__) 

25 

26 

27class SecretNameField(models.SlugField): 

28 """Field that stores the name of a secret held in a secret store. 

29 

30 The secret value itself MUST NEVER be stored in this field. 

31 """ 

32 

33 description = """Field that stores the name of a secret held in a secret store. 

34 

35 Secret names must be between 1-127 alphanumeric ASCII characters or hyphen characters. 

36 

37 The secret value itself MUST NEVER be stored in this field. 

38 """ 

39 

40 def __init__(self, *args, **kwargs): 

41 kwargs["validators"] = [NAME_VALIDATOR] 

42 # although the validator also checks for a max length of 127 

43 # this setting enforces the length at the database column level as well 

44 kwargs["max_length"] = 127 

45 # the default is False, but this is more explicit 

46 kwargs["allow_unicode"] = False 

47 super().__init__(*args, **kwargs) 

48 

49 

50class PemData(models.Model): 

51 """API Certificate or Key in PEM format.""" 

52 

53 id = models.AutoField(primary_key=True) 

54 # Human description of the PEM data 

55 label = models.TextField() 

56 # The name of a secret with data in utf-8 encoded PEM text format 

57 text_secret_name = SecretNameField(null=True, blank=True) 

58 # Public URL hosting the utf-8 encoded PEM text 

59 remote_url = models.TextField(null=True, blank=True) 

60 

61 def __str__(self): 

62 return self.label 

63 

64 @cached_property 

65 def data(self): 

66 """ 

67 Attempts to get data from `remote_url` or `text_secret_name`, with the latter taking precendence if both are defined. 

68 """ 

69 remote_data = None 

70 secret_data = None 

71 

72 if self.remote_url: 

73 remote_data = requests.get(self.remote_url, timeout=settings.REQUESTS_TIMEOUT).text 

74 if self.text_secret_name: 

75 try: 

76 secret_data = get_secret_by_name(self.text_secret_name) 

77 except Exception: 

78 secret_data = None 

79 

80 return secret_data if secret_data is not None else remote_data 

81 

82 

83class ClaimsProvider(models.Model): 

84 """An entity that provides claims for eligibility verification.""" 

85 

86 id = models.AutoField(primary_key=True) 

87 sign_out_button_template = models.TextField(null=True, blank=True, help_text="Template that renders sign-out button") 

88 sign_out_link_template = models.TextField(null=True, blank=True, help_text="Template that renders sign-out link") 

89 client_name = models.TextField(help_text="Unique identifier used to register this claims provider with Authlib registry") 

90 client_id_secret_name = SecretNameField( 

91 help_text="The name of the secret containing the client ID for this claims provider" 

92 ) 

93 authority = models.TextField(help_text="The fully qualified HTTPS domain name for an OAuth authority server") 

94 scheme = models.TextField(help_text="The authentication scheme to use") 

95 

96 @property 

97 def supports_sign_out(self): 

98 return bool(self.sign_out_button_template) or bool(self.sign_out_link_template) 

99 

100 @property 

101 def client_id(self): 

102 return get_secret_by_name(self.client_id_secret_name) 

103 

104 def __str__(self) -> str: 

105 return self.client_name 

106 

107 

108class TransitProcessor(models.Model): 

109 """An entity that applies transit agency fare rules to rider transactions.""" 

110 

111 id = models.AutoField(primary_key=True) 

112 name = models.TextField(help_text="Primary internal display name for this TransitProcessor instance, e.g. in the Admin.") 

113 api_base_url = models.TextField(help_text="The absolute base URL for the TransitProcessor's API, including https://.") 

114 card_tokenize_url = models.TextField( 

115 help_text="The absolute URL for the client-side card tokenization library provided by the TransitProcessor." 

116 ) 

117 card_tokenize_func = models.TextField( 

118 help_text="The function from the card tokenization library to call on the client to initiate the process." 

119 ) 

120 card_tokenize_env = models.TextField(help_text="The environment in which card tokenization is occurring.") 

121 portal_url = models.TextField( 

122 null=True, blank=True, help_text="The absolute base URL for the TransitProcessor's control portal, including https://." 

123 ) 

124 

125 def __str__(self): 

126 return self.name 

127 

128 

129class TransitAgency(models.Model): 

130 """An agency offering transit service.""" 

131 

132 id = models.AutoField(primary_key=True) 

133 active = models.BooleanField(default=False, help_text="Determines if this Agency is enabled for users") 

134 slug = models.TextField(help_text="Used for URL navigation for this agency, e.g. the agency homepage url is /{slug}") 

135 short_name = models.TextField(help_text="The user-facing short name for this agency. Often an uppercase acronym.") 

136 long_name = models.TextField( 

137 help_text="The user-facing long name for this agency. Often the short_name acronym, spelled out." 

138 ) 

139 info_url = models.URLField(help_text="URL of a website/page with more information about the agency's discounts") 

140 phone = models.TextField(help_text="Agency customer support phone number") 

141 index_template = models.TextField(help_text="The template used for this agency's landing page") 

142 eligibility_index_template = models.TextField(help_text="The template used for this agency's eligibility landing page") 

143 eligibility_api_id = models.TextField(help_text="The identifier for this agency used in Eligibility API calls.") 

144 eligibility_api_private_key = models.ForeignKey( 

145 PemData, 

146 related_name="+", 

147 on_delete=models.PROTECT, 

148 help_text="Private key used to sign Eligibility API tokens created on behalf of this Agency.", 

149 ) 

150 eligibility_api_public_key = models.ForeignKey( 

151 PemData, 

152 related_name="+", 

153 on_delete=models.PROTECT, 

154 help_text="Public key corresponding to the agency's private key, used by Eligibility Verification servers to encrypt responses.", # noqa: E501 

155 ) 

156 eligibility_api_jws_signing_alg = models.TextField( 

157 help_text="The JWS-compatible signing algorithm used in Eligibility API calls." 

158 ) 

159 transit_processor = models.ForeignKey(TransitProcessor, on_delete=models.PROTECT) 

160 transit_processor_audience = models.TextField( 

161 help_text="This agency's audience value used to access the TransitProcessor's API.", default="" 

162 ) 

163 transit_processor_client_id = models.TextField( 

164 help_text="This agency's client_id value used to access the TransitProcessor's API.", default="" 

165 ) 

166 transit_processor_client_secret_name = SecretNameField( 

167 help_text="The name of the secret containing this agency's client_secret value used to access the TransitProcessor's API.", # noqa: E501 

168 default="", 

169 ) 

170 staff_group = models.OneToOneField( 

171 Group, 

172 on_delete=models.PROTECT, 

173 null=True, 

174 blank=True, 

175 default=None, 

176 help_text="The group of users associated with this TransitAgency.", 

177 related_name="transit_agency", 

178 ) 

179 sso_domain = models.TextField( 

180 null=True, 

181 blank=True, 

182 default="", 

183 help_text="The email domain of users to automatically add to this agency's staff group upon login.", 

184 ) 

185 customer_service_group = models.OneToOneField( 

186 Group, 

187 on_delete=models.PROTECT, 

188 null=True, 

189 blank=True, 

190 default=None, 

191 help_text="The group of users who are allowed to do in-person eligibility verification and enrollment.", 

192 related_name="+", 

193 ) 

194 

195 def __str__(self): 

196 return self.long_name 

197 

198 @property 

199 def index_url(self): 

200 """Public-facing URL to the TransitAgency's landing page.""" 

201 return reverse(routes.AGENCY_INDEX, args=[self.slug]) 

202 

203 @property 

204 def eligibility_index_url(self): 

205 """Public facing URL to the TransitAgency's eligibility page.""" 

206 return reverse(routes.AGENCY_ELIGIBILITY_INDEX, args=[self.slug]) 

207 

208 @property 

209 def eligibility_api_private_key_data(self): 

210 """This Agency's private key as a string.""" 

211 return self.eligibility_api_private_key.data 

212 

213 @property 

214 def eligibility_api_public_key_data(self): 

215 """This Agency's public key as a string.""" 

216 return self.eligibility_api_public_key.data 

217 

218 @property 

219 def transit_processor_client_secret(self): 

220 return get_secret_by_name(self.transit_processor_client_secret_name) 

221 

222 @property 

223 def enrollment_flows(self): 

224 return self.enrollmentflow_set 

225 

226 @staticmethod 

227 def by_id(id): 

228 """Get a TransitAgency instance by its ID.""" 

229 logger.debug(f"Get {TransitAgency.__name__} by id: {id}") 

230 return TransitAgency.objects.get(id=id) 

231 

232 @staticmethod 

233 def by_slug(slug): 

234 """Get a TransitAgency instance by its slug.""" 

235 logger.debug(f"Get {TransitAgency.__name__} by slug: {slug}") 

236 return TransitAgency.objects.filter(slug=slug).first() 

237 

238 @staticmethod 

239 def all_active(): 

240 """Get all TransitAgency instances marked active.""" 

241 logger.debug(f"Get all active {TransitAgency.__name__}") 

242 return TransitAgency.objects.filter(active=True) 

243 

244 @staticmethod 

245 def for_user(user: User): 

246 for group in user.groups.all(): 

247 if hasattr(group, "transit_agency"): 

248 return group.transit_agency # this is looking at the TransitAgency's staff_group 

249 

250 # the loop above returns the first match found. Return None if no match was found. 

251 return None 

252 

253 

254class EnrollmentMethods: 

255 DIGITAL = "digital" 

256 IN_PERSON = "in_person" 

257 

258 

259SUPPORTED_METHODS = ( 

260 (EnrollmentMethods.DIGITAL, EnrollmentMethods.DIGITAL.capitalize()), 

261 (EnrollmentMethods.IN_PERSON, EnrollmentMethods.IN_PERSON.replace("_", "-").capitalize()), 

262) 

263 

264 

265class EnrollmentFlow(models.Model): 

266 """Represents a user journey through the Benefits app for a single eligibility type.""" 

267 

268 id = models.AutoField(primary_key=True) 

269 system_name = models.TextField( 

270 help_text="Primary internal system name for this EnrollmentFlow instance, e.g. in analytics and Eligibility API requests." # noqa: 501 

271 ) 

272 display_order = models.PositiveSmallIntegerField(default=0, blank=False, null=False) 

273 claims_provider = models.ForeignKey( 

274 ClaimsProvider, 

275 on_delete=models.PROTECT, 

276 null=True, 

277 blank=True, 

278 help_text="An entity that provides claims for eligibility verification for this flow.", 

279 ) 

280 claims_scope = models.TextField( 

281 null=True, 

282 blank=True, 

283 help_text="A space-separated list of identifiers used to specify what access privileges are being requested", 

284 ) 

285 claims_claim = models.TextField( 

286 null=True, blank=True, help_text="The name of the claim (name/value pair) that is used to verify eligibility" 

287 ) 

288 claims_scheme_override = models.TextField( 

289 help_text="The authentication scheme to use (Optional). If blank, defaults to the value in Claims providers", 

290 default=None, 

291 null=True, 

292 blank=True, 

293 verbose_name="Claims scheme", 

294 ) 

295 eligibility_api_url = models.TextField( 

296 null=True, blank=True, help_text="Fully qualified URL for an Eligibility API server used by this flow." 

297 ) 

298 eligibility_api_auth_header = models.TextField( 

299 null=True, 

300 blank=True, 

301 help_text="The auth header to send in Eligibility API requests for this flow.", 

302 ) 

303 eligibility_api_auth_key_secret_name = SecretNameField( 

304 null=True, 

305 blank=True, 

306 help_text="The name of a secret containing the value of the auth header to send in Eligibility API requests for this flow.", # noqa: 501 

307 ) 

308 eligibility_api_public_key = models.ForeignKey( 

309 PemData, 

310 related_name="+", 

311 on_delete=models.PROTECT, 

312 null=True, 

313 blank=True, 

314 help_text="The public key used to encrypt Eligibility API requests and to verify signed Eligibility API responses for this flow.", # noqa: E501 

315 ) 

316 eligibility_api_jwe_cek_enc = models.TextField( 

317 null=True, 

318 blank=True, 

319 help_text="The JWE-compatible Content Encryption Key (CEK) key-length and mode to use in Eligibility API requests for this flow.", # noqa: E501 

320 ) 

321 eligibility_api_jwe_encryption_alg = models.TextField( 

322 null=True, 

323 blank=True, 

324 help_text="The JWE-compatible encryption algorithm to use in Eligibility API requests for this flow.", 

325 ) 

326 eligibility_api_jws_signing_alg = models.TextField( 

327 null=True, 

328 blank=True, 

329 help_text="The JWS-compatible signing algorithm to use in Eligibility API requests for this flow.", 

330 ) 

331 selection_label_template = models.TextField( 

332 help_text="Path to a Django template that defines the end-user UI for selecting this flow among other options." 

333 ) 

334 eligibility_start_template = models.TextField( 

335 default="eligibility/start.html", help_text="Path to a Django template for the informational page of this flow." 

336 ) 

337 eligibility_form_class = models.TextField( 

338 null=True, 

339 blank=True, 

340 help_text="The fully qualified Python path of a form class used by this flow, e.g. benefits.eligibility.forms.FormClass", # noqa: E501 

341 ) 

342 eligibility_unverified_template = models.TextField( 

343 default="eligibility/unverified.html", 

344 help_text="Path to a Django template that defines the page when a user fails eligibility verification for this flow.", 

345 ) 

346 help_template = models.TextField( 

347 null=True, 

348 blank=True, 

349 help_text="Path to a Django template that defines the help text for this enrollment flow, used in building the dynamic help page for an agency", # noqa: E501 

350 ) 

351 label = models.TextField( 

352 null=True, 

353 help_text="A human readable label, used as the display text in Admin.", 

354 ) 

355 group_id = models.TextField(null=True, help_text="Reference to the TransitProcessor group for user enrollment") 

356 supports_expiration = models.BooleanField( 

357 default=False, help_text="Indicates if the enrollment expires or does not expire" 

358 ) 

359 expiration_days = models.PositiveSmallIntegerField( 

360 null=True, blank=True, help_text="If the enrollment supports expiration, number of days before the eligibility expires" 

361 ) 

362 expiration_reenrollment_days = models.PositiveSmallIntegerField( 

363 null=True, 

364 blank=True, 

365 help_text="If the enrollment supports expiration, number of days preceding the expiration date during which a user can re-enroll in the eligibilty", # noqa: E501 

366 ) 

367 enrollment_index_template = models.TextField( 

368 default="enrollment/index.html", 

369 help_text="Template for the Eligibility Confirmation page (which is the index of the enrollment Django app)", 

370 ) 

371 reenrollment_error_template = models.TextField( 

372 null=True, blank=True, help_text="Template for a re-enrollment error associated with the enrollment flow" 

373 ) 

374 enrollment_success_template = models.TextField( 

375 default="enrollment/success.html", help_text="Template for a successful enrollment associated with the enrollment flow" 

376 ) 

377 supported_enrollment_methods = MultiSelectField( 

378 choices=SUPPORTED_METHODS, 

379 max_choices=2, 

380 max_length=50, 

381 default=[EnrollmentMethods.DIGITAL, EnrollmentMethods.IN_PERSON], 

382 help_text="If the flow is supported by digital enrollment, in-person enrollment, or both", 

383 ) 

384 transit_agency = models.ForeignKey(TransitAgency, on_delete=models.PROTECT, null=True, blank=True) 

385 

386 class Meta: 

387 ordering = ["display_order"] 

388 

389 def __str__(self): 

390 return self.label 

391 

392 @property 

393 def eligibility_api_auth_key(self): 

394 if self.eligibility_api_auth_key_secret_name is not None: 394 ↛ 397line 394 didn't jump to line 397 because the condition on line 394 was always true

395 return get_secret_by_name(self.eligibility_api_auth_key_secret_name) 

396 else: 

397 return None 

398 

399 @property 

400 def eligibility_api_public_key_data(self): 

401 """This flow's Eligibility API public key as a string.""" 

402 return self.eligibility_api_public_key.data 

403 

404 @property 

405 def uses_claims_verification(self): 

406 """True if this flow verifies via the claims provider and has a scope and claim. False otherwise.""" 

407 return self.claims_provider is not None and bool(self.claims_scope) and bool(self.claims_claim) 

408 

409 @property 

410 def eligibility_verifier(self): 

411 """A str representing the entity that verifies eligibility for this flow. 

412 

413 Either the client name of the flow's claims provider, or the URL to the eligibility API. 

414 """ 

415 if self.uses_claims_verification: 

416 return self.claims_provider.client_name 

417 else: 

418 return self.eligibility_api_url 

419 

420 def eligibility_form_instance(self, *args, **kwargs): 

421 """Return an instance of this flow's EligibilityForm, or None.""" 

422 if not bool(self.eligibility_form_class): 

423 return None 

424 

425 # inspired by https://stackoverflow.com/a/30941292 

426 module_name, class_name = self.eligibility_form_class.rsplit(".", 1) 

427 FormClass = getattr(importlib.import_module(module_name), class_name) 

428 

429 return FormClass(*args, **kwargs) 

430 

431 @staticmethod 

432 def by_id(id): 

433 """Get an EnrollmentFlow instance by its ID.""" 

434 logger.debug(f"Get {EnrollmentFlow.__name__} by id: {id}") 

435 return EnrollmentFlow.objects.get(id=id) 

436 

437 def clean(self): 

438 supports_expiration = self.supports_expiration 

439 expiration_days = self.expiration_days 

440 expiration_reenrollment_days = self.expiration_reenrollment_days 

441 reenrollment_error_template = self.reenrollment_error_template 

442 

443 if supports_expiration: 

444 errors = {} 

445 message = "When support_expiration is True, this value must be greater than 0." 

446 if expiration_days is None or expiration_days <= 0: 

447 errors.update(expiration_days=ValidationError(message)) 

448 if expiration_reenrollment_days is None or expiration_reenrollment_days <= 0: 

449 errors.update(expiration_reenrollment_days=ValidationError(message)) 

450 if reenrollment_error_template is None: 

451 errors.update(reenrollment_error_template=ValidationError("Required when supports expiration is True.")) 

452 

453 if errors: 

454 raise ValidationError(errors) 

455 

456 @property 

457 def claims_scheme(self): 

458 if not self.claims_scheme_override: 

459 return self.claims_provider.scheme 

460 return self.claims_scheme_override 

461 

462 

463class EnrollmentEvent(models.Model): 

464 """A record of a successful enrollment.""" 

465 

466 id = models.UUIDField(primary_key=True, default=uuid.uuid4) 

467 transit_agency = models.ForeignKey(TransitAgency, on_delete=models.PROTECT) 

468 enrollment_flow = models.ForeignKey(EnrollmentFlow, on_delete=models.PROTECT) 

469 enrollment_method = models.TextField( 

470 choices={ 

471 EnrollmentMethods.DIGITAL: EnrollmentMethods.DIGITAL, 

472 EnrollmentMethods.IN_PERSON: EnrollmentMethods.IN_PERSON, 

473 } 

474 ) 

475 verified_by = models.TextField() 

476 enrollment_datetime = models.DateTimeField(default=timezone.now) 

477 expiration_datetime = models.DateTimeField(blank=True, null=True) 

478 

479 def __str__(self): 

480 dt = timezone.localtime(self.enrollment_datetime) 

481 ts = dt.strftime("%b %d, %Y, %I:%M %p") 

482 return f"{ts}, {self.transit_agency}, {self.enrollment_flow}"