Coverage for benefits/core/models/enrollment.py: 99%

166 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-29 21:21 +0000

1import importlib 

2import logging 

3import uuid 

4 

5from django.core.exceptions import ValidationError 

6from django.db import models 

7from django.utils import timezone 

8from multiselectfield import MultiSelectField 

9 

10from .common import PemData, SecretNameField, template_path 

11from .claims import ClaimsProvider 

12from .transit import TransitAgency 

13 

14logger = logging.getLogger(__name__) 

15 

16 

17class EnrollmentMethods: 

18 DIGITAL = "digital" 

19 IN_PERSON = "in_person" 

20 

21 

22SUPPORTED_METHODS = ( 

23 (EnrollmentMethods.DIGITAL, EnrollmentMethods.DIGITAL.capitalize()), 

24 (EnrollmentMethods.IN_PERSON, EnrollmentMethods.IN_PERSON.replace("_", "-").capitalize()), 

25) 

26 

27 

28class EnrollmentFlow(models.Model): 

29 """Represents a user journey through the Benefits app for a single eligibility type.""" 

30 

31 id = models.AutoField(primary_key=True) 

32 system_name = models.SlugField( 

33 help_text="Primary internal system name for this EnrollmentFlow instance, e.g. in analytics and Eligibility API requests." # noqa: 501 

34 ) 

35 label = models.TextField( 

36 blank=True, 

37 default="", 

38 help_text="A human readable label, used as the display text in Admin.", 

39 ) 

40 transit_agency = models.ForeignKey(TransitAgency, on_delete=models.PROTECT, null=True, blank=True) 

41 supported_enrollment_methods = MultiSelectField( 

42 choices=SUPPORTED_METHODS, 

43 max_choices=2, 

44 max_length=50, 

45 default=[EnrollmentMethods.DIGITAL, EnrollmentMethods.IN_PERSON], 

46 help_text="If the flow is supported by digital enrollment, in-person enrollment, or both", 

47 ) 

48 group_id = models.TextField( 

49 blank=True, default="", help_text="Reference to the TransitProcessor group for user enrollment" 

50 ) 

51 claims_provider = models.ForeignKey( 

52 ClaimsProvider, 

53 on_delete=models.PROTECT, 

54 null=True, 

55 blank=True, 

56 help_text="An entity that provides claims for eligibility verification for this flow.", 

57 ) 

58 claims_scope = models.TextField( 

59 blank=True, 

60 default="", 

61 help_text="A space-separated list of identifiers used to specify what access privileges are being requested", 

62 ) 

63 claims_eligibility_claim = models.TextField( 

64 blank=True, default="", help_text="The name of the claim that is used to verify eligibility" 

65 ) 

66 claims_extra_claims = models.TextField(blank=True, default="", help_text="A space-separated list of any additional claims") 

67 claims_scheme_override = models.TextField( 

68 help_text="The authentication scheme to use (Optional). If blank, defaults to the value in Claims providers", 

69 default="", 

70 blank=True, 

71 verbose_name="Claims scheme", 

72 ) 

73 eligibility_api_url = models.TextField( 

74 blank=True, default="", help_text="Fully qualified URL for an Eligibility API server used by this flow." 

75 ) 

76 eligibility_api_auth_header = models.TextField( 

77 blank=True, 

78 default="", 

79 help_text="The auth header to send in Eligibility API requests for this flow.", 

80 ) 

81 eligibility_api_auth_key_secret_name = SecretNameField( 

82 blank=True, 

83 default="", 

84 help_text="The name of a secret containing the value of the auth header to send in Eligibility API requests for this flow.", # noqa: 501 

85 ) 

86 eligibility_api_public_key = models.ForeignKey( 

87 PemData, 

88 related_name="+", 

89 on_delete=models.PROTECT, 

90 null=True, 

91 blank=True, 

92 help_text="The public key used to encrypt Eligibility API requests and to verify signed Eligibility API responses for this flow.", # noqa: E501 

93 ) 

94 eligibility_api_jwe_cek_enc = models.TextField( 

95 blank=True, 

96 default="", 

97 help_text="The JWE-compatible Content Encryption Key (CEK) key-length and mode to use in Eligibility API requests for this flow.", # noqa: E501 

98 ) 

99 eligibility_api_jwe_encryption_alg = models.TextField( 

100 blank=True, 

101 default="", 

102 help_text="The JWE-compatible encryption algorithm to use in Eligibility API requests for this flow.", 

103 ) 

104 eligibility_api_jws_signing_alg = models.TextField( 

105 blank=True, 

106 default="", 

107 help_text="The JWS-compatible signing algorithm to use in Eligibility API requests for this flow.", 

108 ) 

109 eligibility_form_class = models.TextField( 

110 blank=True, 

111 default="", 

112 help_text="The fully qualified Python path of a form class used by this flow, e.g. benefits.eligibility.forms.FormClass", # noqa: E501 

113 ) 

114 selection_label_template_override = models.TextField( 

115 blank=True, 

116 default="", 

117 help_text="Override the default template that defines the end-user UI for selecting this flow among other options.", 

118 ) 

119 eligibility_start_template_override = models.TextField( 

120 blank=True, 

121 default="", 

122 help_text="Override the default template for the informational page of this flow.", 

123 ) 

124 eligibility_unverified_template_override = models.TextField( 

125 blank=True, 

126 default="", 

127 help_text="Override the default template that defines the page when a user fails eligibility verification for this flow.", # noqa: E501 

128 ) 

129 help_template = models.TextField( 

130 blank=True, 

131 default="", 

132 help_text="Path to a Django template that defines the help text for this enrollment flow, used in building the dynamic help page for an agency", # noqa: E501 

133 ) 

134 supports_expiration = models.BooleanField( 

135 default=False, help_text="Indicates if the enrollment expires or does not expire" 

136 ) 

137 expiration_days = models.PositiveSmallIntegerField( 

138 null=True, blank=True, help_text="If the enrollment supports expiration, number of days before the eligibility expires" 

139 ) 

140 expiration_reenrollment_days = models.PositiveSmallIntegerField( 

141 null=True, 

142 blank=True, 

143 help_text="If the enrollment supports expiration, number of days preceding the expiration date during which a user can re-enroll in the eligibilty", # noqa: E501 

144 ) 

145 enrollment_index_template_override = models.TextField( 

146 blank=True, 

147 default="", 

148 help_text="Override the default template for the Eligibility Confirmation page (the index of the enrollment app)", 

149 ) 

150 reenrollment_error_template = models.TextField( 

151 blank=True, default="", help_text="Template for a re-enrollment error associated with the enrollment flow" 

152 ) 

153 enrollment_success_template_override = models.TextField( 

154 blank=True, 

155 default="", 

156 help_text="Override the default template for a successful enrollment associated with the enrollment flow", 

157 ) 

158 display_order = models.PositiveSmallIntegerField(default=0, blank=False, null=False) 

159 

160 class Meta: 

161 ordering = ["display_order"] 

162 

163 def __str__(self): 

164 return self.label 

165 

166 @property 

167 def agency_card_name(self): 

168 if self.uses_claims_verification: 

169 return "" 

170 else: 

171 return f"{self.transit_agency.slug}-agency-card" 

172 

173 @property 

174 def eligibility_api_auth_key(self): 

175 if self.eligibility_api_auth_key_secret_name is not None: 175 ↛ 179line 175 didn't jump to line 179 because the condition on line 175 was always true

176 secret_field = self._meta.get_field("eligibility_api_auth_key_secret_name") 

177 return secret_field.secret_value(self) 

178 else: 

179 return None 

180 

181 @property 

182 def eligibility_api_public_key_data(self): 

183 """This flow's Eligibility API public key as a string.""" 

184 return self.eligibility_api_public_key.data 

185 

186 @property 

187 def selection_label_template(self): 

188 prefix = "eligibility/includes/selection-label" 

189 if self.uses_claims_verification: 

190 return self.selection_label_template_override or f"{prefix}--{self.system_name}.html" 

191 else: 

192 return self.selection_label_template_override or f"{prefix}--{self.agency_card_name}.html" 

193 

194 @property 

195 def eligibility_start_template(self): 

196 prefix = "eligibility/start" 

197 if self.uses_claims_verification: 

198 return self.eligibility_start_template_override or f"{prefix}--{self.system_name}.html" 

199 else: 

200 return self.eligibility_start_template_override or f"{prefix}--{self.agency_card_name}.html" 

201 

202 @property 

203 def eligibility_unverified_template(self): 

204 prefix = "eligibility/unverified" 

205 if self.uses_claims_verification: 

206 return self.eligibility_unverified_template_override or f"{prefix}.html" 

207 else: 

208 return self.eligibility_unverified_template_override or f"{prefix}--{self.agency_card_name}.html" 

209 

210 @property 

211 def uses_claims_verification(self): 

212 """True if this flow verifies via the claims provider and has a scope and claim. False otherwise.""" 

213 return self.claims_provider is not None and bool(self.claims_scope) and bool(self.claims_eligibility_claim) 

214 

215 @property 

216 def claims_scheme(self): 

217 return self.claims_scheme_override or self.claims_provider.scheme 

218 

219 @property 

220 def claims_all_claims(self): 

221 claims = [self.claims_eligibility_claim] 

222 if self.claims_extra_claims is not None: 

223 claims.extend(self.claims_extra_claims.split()) 

224 return claims 

225 

226 @property 

227 def eligibility_verifier(self): 

228 """A str representing the entity that verifies eligibility for this flow. 

229 

230 Either the client name of the flow's claims provider, or the URL to the eligibility API. 

231 """ 

232 if self.uses_claims_verification: 

233 return self.claims_provider.client_name 

234 else: 

235 return self.eligibility_api_url 

236 

237 @property 

238 def enrollment_index_template(self): 

239 prefix = "enrollment/index" 

240 if self.uses_claims_verification: 

241 return self.enrollment_index_template_override or f"{prefix}.html" 

242 else: 

243 return self.enrollment_index_template_override or f"{prefix}--agency-card.html" 

244 

245 @property 

246 def enrollment_success_template(self): 

247 prefix = "enrollment/success" 

248 if self.uses_claims_verification: 

249 return self.enrollment_success_template_override or f"{prefix}--{self.transit_agency.slug}.html" 

250 else: 

251 return self.enrollment_success_template_override or f"{prefix}--{self.agency_card_name}.html" 

252 

253 def clean(self): 

254 field_errors = {} 

255 template_errors = [] 

256 

257 if self.supports_expiration: 

258 expiration_days = self.expiration_days 

259 expiration_reenrollment_days = self.expiration_reenrollment_days 

260 reenrollment_error_template = self.reenrollment_error_template 

261 

262 message = "When support_expiration is True, this value must be greater than 0." 

263 if expiration_days is None or expiration_days <= 0: 

264 field_errors.update(expiration_days=ValidationError(message)) 

265 if expiration_reenrollment_days is None or expiration_reenrollment_days <= 0: 

266 field_errors.update(expiration_reenrollment_days=ValidationError(message)) 

267 if not reenrollment_error_template: 

268 field_errors.update(reenrollment_error_template=ValidationError("Required when supports expiration is True.")) 

269 

270 if self.transit_agency: 

271 if self.claims_provider: 

272 message = "Required for claims verification." 

273 needed = dict( 

274 claims_scope=self.claims_scope, 

275 claims_eligibility_claim=self.claims_eligibility_claim, 

276 ) 

277 field_errors.update({k: ValidationError(message) for k, v in needed.items() if not v}) 

278 else: 

279 message = "Required for Eligibility API verification." 

280 needed = dict( 

281 eligibility_api_auth_header=self.eligibility_api_auth_header, 

282 eligibility_api_auth_key_secret_name=self.eligibility_api_auth_key_secret_name, 

283 eligibility_api_jwe_cek_enc=self.eligibility_api_jwe_cek_enc, 

284 eligibility_api_jwe_encryption_alg=self.eligibility_api_jwe_encryption_alg, 

285 eligibility_api_jws_signing_alg=self.eligibility_api_jws_signing_alg, 

286 eligibility_api_public_key=self.eligibility_api_public_key, 

287 eligibility_api_url=self.eligibility_api_url, 

288 eligibility_form_class=self.eligibility_form_class, 

289 ) 

290 field_errors.update({k: ValidationError(message) for k, v in needed.items() if not v}) 

291 

292 templates = [ 

293 self.selection_label_template, 

294 self.eligibility_start_template, 

295 self.eligibility_unverified_template, 

296 self.enrollment_index_template, 

297 self.enrollment_success_template, 

298 ] 

299 if self.supports_expiration: 

300 templates.append(self.reenrollment_error_template) 

301 

302 # since templates are calculated from the pattern or the override field 

303 # we can't add a field-level validation error 

304 # so just create directly for a missing template 

305 for t in templates: 

306 if not template_path(t): 

307 template_errors.append(ValidationError(f"Template not found: {t}")) 

308 

309 if field_errors: 

310 raise ValidationError(field_errors) 

311 if template_errors: 

312 raise ValidationError(template_errors) 

313 

314 def eligibility_form_instance(self, *args, **kwargs): 

315 """Return an instance of this flow's EligibilityForm, or None.""" 

316 if not bool(self.eligibility_form_class): 

317 return None 

318 

319 # inspired by https://stackoverflow.com/a/30941292 

320 module_name, class_name = self.eligibility_form_class.rsplit(".", 1) 

321 FormClass = getattr(importlib.import_module(module_name), class_name) 

322 

323 return FormClass(*args, **kwargs) 

324 

325 @staticmethod 

326 def by_id(id): 

327 """Get an EnrollmentFlow instance by its ID.""" 

328 logger.debug(f"Get {EnrollmentFlow.__name__} by id: {id}") 

329 return EnrollmentFlow.objects.get(id=id) 

330 

331 

332class EnrollmentEvent(models.Model): 

333 """A record of a successful enrollment.""" 

334 

335 id = models.UUIDField(primary_key=True, default=uuid.uuid4) 

336 transit_agency = models.ForeignKey(TransitAgency, on_delete=models.PROTECT) 

337 enrollment_flow = models.ForeignKey(EnrollmentFlow, on_delete=models.PROTECT) 

338 enrollment_method = models.TextField( 

339 choices={ 

340 EnrollmentMethods.DIGITAL: EnrollmentMethods.DIGITAL, 

341 EnrollmentMethods.IN_PERSON: EnrollmentMethods.IN_PERSON, 

342 } 

343 ) 

344 verified_by = models.TextField() 

345 enrollment_datetime = models.DateTimeField(default=timezone.now) 

346 expiration_datetime = models.DateTimeField(blank=True, null=True) 

347 extra_claims = models.TextField(blank=True, default="") 

348 

349 def __str__(self): 

350 dt = timezone.localtime(self.enrollment_datetime) 

351 ts = dt.strftime("%b %d, %Y, %I:%M %p") 

352 return f"{ts}, {self.transit_agency}, {self.enrollment_flow}"