Coverage for benefits / core / models / enrollment.py: 94%

158 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-13 19:35 +0000

1import logging 

2import uuid 

3 

4from cdt_identity.models import ClaimsVerificationRequest, IdentityGatewayConfig 

5from django.core.exceptions import ValidationError 

6from django.db import models 

7from django.utils import timezone 

8from multiselectfield import MultiSelectField 

9 

10from benefits.core import context as core_context 

11from benefits.in_person import context as in_person_context 

12 

13from .common import PemData, SecretNameField, template_path 

14from .transit import TransitAgency 

15 

16logger = logging.getLogger(__name__) 

17 

18 

class EnrollmentMethods:
    """Namespace of the string identifiers for the ways an enrollment can be carried out."""

    # Enrollment completed through the digital channel.
    DIGITAL = "digital"
    # Enrollment completed in person.
    IN_PERSON = "in_person"

22 

23 

# (stored value, display label) pairs used as field choices.
# The label is the stored value with underscores turned into hyphens and the
# first letter capitalized, e.g. "in_person" -> "In-person".
SUPPORTED_METHODS = tuple(
    (method, method.replace("_", "-").capitalize())
    for method in (EnrollmentMethods.DIGITAL, EnrollmentMethods.IN_PERSON)
)

28 

29 

class EligibilityApiVerificationRequest(models.Model):
    """Configuration for verifying eligibility through calls to an Eligibility API server.

    Holds the server URL, the auth header name and the name of the secret that
    carries its value, the PEM key references, and the JWE/JWS algorithm names.
    """

    id = models.AutoField(primary_key=True)
    label = models.SlugField(help_text="A human readable label, used as the display text in Admin.")
    api_url = models.URLField(
        help_text="Fully qualified URL for an Eligibility API server.",
    )
    api_auth_header = models.CharField(
        max_length=50,
        help_text="The auth header to send in Eligibility API requests.",
    )
    api_auth_key_secret_name = SecretNameField(
        help_text="The name of a secret containing the value of the auth header to send in Eligibility API requests.",
    )
    # The two client keys are optional (null=True, default=None).
    client_private_key = models.ForeignKey(
        PemData,
        on_delete=models.PROTECT,
        related_name="+",
        null=True,
        default=None,
        help_text="Private key used to sign Eligibility API tokens created on behalf of the Benefits client.",
    )
    client_public_key = models.ForeignKey(
        PemData,
        on_delete=models.PROTECT,
        related_name="+",
        null=True,
        default=None,
        help_text="Public key corresponding to the Benefits client's private key, used by Eligibility Verification servers to encrypt responses.",  # noqa: E501
    )
    # Unlike the client keys, the API public key is not nullable.
    api_public_key = models.ForeignKey(
        PemData,
        on_delete=models.PROTECT,
        related_name="+",
        help_text="The public key used to encrypt Eligibility API requests and to verify signed Eligibility API responses.",
    )
    api_jwe_cek_enc = models.CharField(
        max_length=50,
        help_text="The JWE-compatible Content Encryption Key (CEK) key-length and mode to use in Eligibility API requests.",
    )
    api_jwe_encryption_alg = models.CharField(
        max_length=50,
        help_text="The JWE-compatible encryption algorithm to use in Eligibility API requests.",
    )
    api_jws_signing_alg = models.CharField(
        max_length=50,
        help_text="The JWS-compatible signing algorithm to use in Eligibility API requests.",
    )

    def __str__(self):
        """Use the label as the display text."""
        return self.label

    @property
    def api_auth_key(self):
        """The Eligibility API auth key, resolved via the `api_auth_key_secret_name` field's `secret_value()`."""
        return self._meta.get_field("api_auth_key_secret_name").secret_value(self)

    @property
    def client_private_key_data(self):
        """The `data` of the client private key used to sign Eligibility API tokens."""
        private_key = self.client_private_key
        return private_key.data

    @property
    def client_public_key_data(self):
        """The `data` of the public key that pairs with the Benefits client's private key."""
        public_key = self.client_public_key
        return public_key.data

    @property
    def api_public_key_data(self):
        """The `data` of the Eligibility API public key."""
        server_key = self.api_public_key
        return server_key.data

103 

104 

class EnrollmentFlow(models.Model):
    """Represents a user journey through the Benefits app for a single eligibility type.

    A flow may verify eligibility through claims (`oauth_config` + `claims_request`)
    or through the Eligibility API (`api_request`); all three references are optional.
    """

    id = models.AutoField(primary_key=True)
    system_name = models.SlugField(
        choices=core_context.SystemName,
        help_text="Primary internal system name for this EnrollmentFlow instance, e.g. in analytics and Eligibility API requests.",  # noqa: E501
    )
    label = models.TextField(
        blank=True,
        default="",
        help_text="A human readable label, used as the display text in Admin.",
    )
    transit_agency = models.ForeignKey(TransitAgency, on_delete=models.PROTECT, null=True, blank=True)
    supported_enrollment_methods = MultiSelectField(
        choices=SUPPORTED_METHODS,
        max_choices=2,
        max_length=50,
        default=[EnrollmentMethods.DIGITAL, EnrollmentMethods.IN_PERSON],
        help_text="If the flow is supported by digital enrollment, in-person enrollment, or both",
    )
    sign_out_button_template = models.TextField(default="", blank=True, help_text="Template that renders sign-out button")
    sign_out_link_template = models.TextField(default="", blank=True, help_text="Template that renders sign-out link")
    oauth_config = models.ForeignKey(
        IdentityGatewayConfig,
        on_delete=models.PROTECT,
        null=True,
        blank=True,
        help_text="The IdG connection details for this flow.",
    )
    claims_request = models.ForeignKey(
        ClaimsVerificationRequest,
        on_delete=models.PROTECT,
        null=True,
        blank=True,
        help_text="The claims request details for this flow.",
    )
    api_request = models.ForeignKey(
        EligibilityApiVerificationRequest,
        on_delete=models.PROTECT,
        null=True,
        blank=True,
        help_text="The Eligibility API request details for this flow.",
    )
    supports_expiration = models.BooleanField(
        default=False, help_text="Indicates if the enrollment expires or does not expire"
    )
    expiration_days = models.PositiveSmallIntegerField(
        null=True, blank=True, help_text="If the enrollment supports expiration, number of days before the eligibility expires"
    )
    # NOTE(review): the help_text below is a runtime string and is kept verbatim,
    # including its existing spelling; changing it should be a deliberate, separate change.
    expiration_reenrollment_days = models.PositiveSmallIntegerField(
        null=True,
        blank=True,
        help_text="If the enrollment supports expiration, number of days preceding the expiration date during which a user can re-enroll in the eligibilty",  # noqa: E501
    )
    display_order = models.PositiveSmallIntegerField(default=0, blank=False, null=False)

    class Meta:
        ordering = ["display_order"]

    def __str__(self):
        """Return the label followed by the agency slug (or "no agency") in parentheses."""
        agency_slug = self.transit_agency.slug if self.transit_agency else "no agency"
        return f"{self.label} ({agency_slug})"

    @property
    def group_id(self):
        """The group ID of the group linked to this flow, or None when there is none.

        A Littlepay group's ID is returned as a `str`; a Switchio group's ID is
        returned as stored.
        """
        if not hasattr(self, "enrollmentgroup"):
            return None

        enrollment_group = self.enrollmentgroup

        # these are the class names for models in enrollment_littlepay and enrollment_switchio
        if hasattr(enrollment_group, "littlepaygroup"):
            return str(enrollment_group.littlepaygroup.group_id)
        if hasattr(enrollment_group, "switchiogroup"):
            return enrollment_group.switchiogroup.group_id
        return None

    @property
    def agency_card_name(self):
        """`"<agency slug>-agency-card"` for flows using API verification, otherwise an empty string."""
        if self.uses_api_verification:
            return f"{self.transit_agency.slug}-agency-card"
        return ""

    @property
    def eligibility_api_auth_key(self):
        """This flow's Eligibility API auth key, or None when the flow does not use API verification."""
        if self.uses_api_verification:
            return self.api_request.api_auth_key
        return None

    @property
    def eligibility_api_public_key_data(self):
        """This flow's Eligibility API public key as a string."""
        if self.uses_api_verification:
            return self.api_request.api_public_key_data
        return None

    @property
    def selection_label_template(self):
        """Path of the selection-label template, keyed by agency card name (API flows) or system name."""
        prefix = "eligibility/includes/selection-label"
        if self.uses_api_verification:
            return f"{prefix}--{self.agency_card_name}.html"
        return f"{prefix}--{self.system_name}.html"

    @property
    def uses_claims_verification(self):
        """True if this flow verifies via the Identity Gateway and has a scope and claim. False otherwise."""
        # Both references are nullable; a flow missing either one cannot verify via claims.
        # Checking claims_request here avoids an AttributeError when only oauth_config is set.
        if self.oauth_config is None or self.claims_request is None:
            return False
        return bool(self.claims_request.scopes) and bool(self.claims_request.eligibility_claim)

    @property
    def uses_api_verification(self):
        """True if this flow verifies via the Eligibility API. False otherwise."""
        return self.api_request is not None

    @property
    def claims_scheme(self):
        """The claims request's scheme, falling back to the OAuth config's scheme; None for non-claims flows."""
        if self.uses_claims_verification:
            return self.claims_request.scheme or self.oauth_config.scheme
        return None

    @property
    def eligibility_verifier(self):
        """A str representing the entity that verifies eligibility for this flow.

        Either the client name of the flow's claims provider, or the URL to the eligibility API.
        Returns "undefined" when the flow uses neither verification method.
        """
        if self.uses_claims_verification:
            return self.oauth_config.client_name
        if self.uses_api_verification:
            return self.api_request.api_url
        return "undefined"

    @property
    def in_person_eligibility_context(self):
        """The in-person eligibility context for this flow's system name, as a dict.

        Raises:
            KeyError: if the system name has no entry in the in-person eligibility index.
        """
        return in_person_context.eligibility_index[self.system_name].dict()

    @property
    def supports_sign_out(self):
        """True if either sign-out template (button or link) is set."""
        return bool(self.sign_out_button_template) or bool(self.sign_out_link_template)

    def clean(self):
        """Validate the flow's configuration when it is attached to a transit agency.

        Collects every problem found and raises them together.

        Raises:
            ValidationError: if a required template is missing, in-person enrollment is
                selected without an in-person context for the system name, or the agency
                is active and no group is linked to the flow.
        """
        errors = []

        if self.transit_agency:
            templates = [
                self.selection_label_template,
            ]

            # since templates are calculated from the pattern or the override field
            # we can't add a field-level validation error
            # so just create directly for a missing template
            for t in templates:
                if not template_path(t):
                    errors.append(ValidationError(f"Template not found: {t}"))

            if EnrollmentMethods.IN_PERSON in self.supported_enrollment_methods:
                # a system name missing from the in-person index is treated as "not configured"
                try:
                    in_person_eligibility_context = self.in_person_eligibility_context
                except KeyError:
                    in_person_eligibility_context = None

                if not in_person_eligibility_context:
                    errors.append(
                        ValidationError(f"{self.system_name} not configured for In-person. Please uncheck to continue.")
                    )

            if self.transit_agency.active and self.group_id is None:
                errors.append(
                    ValidationError(f"{self.system_name} needs either a LittlepayGroup or SwitchioGroup linked to it.")
                )

        if errors:
            raise ValidationError(errors)

    @staticmethod
    def by_id(id):
        """Get an EnrollmentFlow instance by its ID."""
        logger.debug(f"Get {EnrollmentFlow.__name__} by id: {id}")
        return EnrollmentFlow.objects.get(id=id)

293 

294 

class EnrollmentGroup(models.Model):
    """A group tied one-to-one to a single EnrollmentFlow."""

    id = models.AutoField(primary_key=True)
    enrollment_flow = models.OneToOneField(
        EnrollmentFlow,
        help_text="The enrollment flow that this group is for.",
        on_delete=models.PROTECT,
    )

    def __str__(self):
        """Display the group using the string form of its enrollment flow."""
        flow = self.enrollment_flow
        return str(flow)

305 

306 

class EnrollmentEvent(models.Model):
    """A record of a successful enrollment."""

    id = models.UUIDField(primary_key=True, default=uuid.uuid4)
    transit_agency = models.ForeignKey(TransitAgency, on_delete=models.PROTECT)
    enrollment_flow = models.ForeignKey(EnrollmentFlow, on_delete=models.PROTECT)
    # Each choice uses the method identifier as both the stored value and the label.
    enrollment_method = models.TextField(
        choices={method: method for method in (EnrollmentMethods.DIGITAL, EnrollmentMethods.IN_PERSON)}
    )
    verified_by = models.TextField()
    enrollment_datetime = models.DateTimeField(default=timezone.now)
    expiration_datetime = models.DateTimeField(blank=True, null=True)
    extra_claims = models.TextField(blank=True, default="")

    def __str__(self):
        """Return the local-time enrollment timestamp, the agency, and the flow, comma-separated."""
        local_dt = timezone.localtime(self.enrollment_datetime)
        stamp = local_dt.strftime("%b %d, %Y, %I:%M %p")
        return ", ".join((stamp, str(self.transit_agency), str(self.enrollment_flow)))