Coverage for benefits / enrollment_switchio / enrollment.py: 98%
125 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-13 19:35 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-13 19:35 +0000
1from dataclasses import dataclass
2from datetime import datetime
4from django.conf import settings
5from django.http import HttpRequest
6from django.urls import reverse
7from requests import HTTPError
9from benefits.core import session
10from benefits.core.models.enrollment import EnrollmentFlow
11from benefits.enrollment.enrollment import Status, _calculate_expiry, _is_expired, _is_within_reenrollment_window
12from benefits.enrollment_switchio.api import (
13 EnrollmentClient,
14 EshopResponseMode,
15 Registration,
16 RegistrationMode,
17 RegistrationStatus,
18 TokenizationClient,
19)
20from benefits.enrollment_switchio.models import SwitchioConfig
21from benefits.routes import routes
@dataclass
class RegistrationResponse:
    """Result of asking the tokenization API to create a registration.

    Returned by `request_registration` for both the success and the failure path,
    so callers inspect `status` instead of catching exceptions.
    """

    # outcome of the request
    status: Status
    # the registration returned by the API; None when the request failed
    registration: Registration
    # the exception that was caught, if any
    exception: Exception = None
    # HTTP status code of the failed response; only set when the failure was an HTTPError
    status_code: int = None
@dataclass
class RegistrationStatusResponse:
    """Result of asking the tokenization API for the status of a registration.

    Returned by `get_registration_status` for both the success and the failure path,
    so callers inspect `status` instead of catching exceptions.
    """

    # outcome of the request
    status: Status
    # the registration status returned by the API; None when the request failed
    registration_status: RegistrationStatus
    # the exception that was caught, if any
    exception: Exception = None
    # HTTP status code of the failed response; only set when the failure was an HTTPError
    status_code: int = None
@dataclass
class Token:
    """A single token entry, built from a token dictionary via `Token(**token_dict)`.

    Field names are camelCase because they must match the dictionary keys exactly.
    """

    # the token value itself
    token: str
    tokenVersion: int
    # compared against "active" when picking the latest active token
    tokenState: str
    # used to order tokens; the greatest validFrom is considered the latest
    validFrom: datetime
    validTo: datetime
    testOnly: bool
    # optional; may be absent from the token dictionary
    par: str = None
def request_registration(
    request, switchio_config: SwitchioConfig, redirect_route: str = routes.ENROLLMENT_SWITCHIO_INDEX
) -> RegistrationResponse:
    """Request a new registration from the tokenization API.

    Args:
        request: the current request, used to build the absolute redirect URL.
        switchio_config: supplies the tokenization API URL, credentials and certificates.
        redirect_route: name of the route that is reversed into the eshop redirect URL.

    Returns:
        A RegistrationResponse. On success it carries the registration with
        Status.SUCCESS. On any exception it carries the exception instead, with
        Status.SYSTEM_ERROR for an HTTPError whose status code is 500 or above and
        Status.EXCEPTION otherwise; status_code is only populated for an HTTPError.
    """
    try:
        tokenization_client = TokenizationClient(
            api_url=switchio_config.tokenization_api_base_url,
            api_key=switchio_config.tokenization_api_key,
            api_secret=switchio_config.tokenization_api_secret,
            private_key=switchio_config.private_key_data,
            client_certificate=switchio_config.client_certificate_data,
            ca_certificate=switchio_config.ca_certificate_data,
        )

        redirect_url = _generate_redirect_uri(request, reverse(redirect_route))

        new_registration = tokenization_client.request_registration(
            eshopRedirectUrl=redirect_url,
            mode=RegistrationMode.REGISTER,
            eshopResponseMode=EshopResponseMode.QUERY,
            timeout=settings.REQUESTS_TIMEOUT,
        )

        return RegistrationResponse(status=Status.SUCCESS, registration=new_registration)
    except Exception as e:
        # only HTTP errors carry a response (and therefore a status code)
        is_http_error = isinstance(e, HTTPError)
        status_code = e.response.status_code if is_http_error else None

        # server-side failures are system errors; everything else is a plain exception
        if is_http_error and status_code >= 500:
            status = Status.SYSTEM_ERROR
        else:
            status = Status.EXCEPTION

        return RegistrationResponse(status=status, registration=None, exception=e, status_code=status_code)
92# copied from https://github.com/Office-of-Digital-Services/django-cdt-identity/blob/main/cdt_identity/views.py#L42-L50
def _generate_redirect_uri(request: HttpRequest, redirect_path: str):
    """Build the lowercased absolute redirect URI for `redirect_path`.

    Args:
        request: the current request; its `build_absolute_uri` supplies scheme and host.
        redirect_path: the path to redirect to.

    Returns:
        The absolute URI in lowercase. An `http://` scheme is upgraded to `https://`
        unless the URI starts with `http://localhost`.
    """
    redirect_uri = str(request.build_absolute_uri(redirect_path)).lower()

    # this is a temporary hack to ensure redirect URIs are HTTPS when the app is deployed
    # see https://github.com/cal-itp/benefits/issues/442 for more context
    insecure_scheme = "http://"
    is_localhost = redirect_uri.startswith("http://localhost")
    if redirect_uri.startswith(insecure_scheme) and not is_localhost:
        # swap only the leading scheme, so an "http://" that appears later in the
        # path or query string is left exactly as it was
        redirect_uri = "https://" + redirect_uri[len(insecure_scheme) :]

    return redirect_uri
def get_registration_status(switchio_config: SwitchioConfig, registration_id: str) -> RegistrationStatusResponse:
    """Look up the status of an existing registration in the tokenization API.

    Args:
        switchio_config: supplies the tokenization API URL, credentials and certificates.
        registration_id: identifier of the registration to look up.

    Returns:
        A RegistrationStatusResponse. On success it carries the registration status
        with Status.SUCCESS. On any exception it carries the exception instead, with
        Status.SYSTEM_ERROR for an HTTPError whose status code is 500 or above and
        Status.EXCEPTION otherwise; status_code is only populated for an HTTPError.
    """
    try:
        tokenization_client = TokenizationClient(
            api_url=switchio_config.tokenization_api_base_url,
            api_key=switchio_config.tokenization_api_key,
            api_secret=switchio_config.tokenization_api_secret,
            private_key=switchio_config.private_key_data,
            client_certificate=switchio_config.client_certificate_data,
            ca_certificate=switchio_config.ca_certificate_data,
        )

        current_status = tokenization_client.get_registration_status(
            registration_id=registration_id,
            timeout=settings.REQUESTS_TIMEOUT,
        )

        return RegistrationStatusResponse(status=Status.SUCCESS, registration_status=current_status, exception=None)
    except Exception as e:
        # only HTTP errors carry a response (and therefore a status code)
        is_http_error = isinstance(e, HTTPError)
        status_code = e.response.status_code if is_http_error else None

        # server-side failures are system errors; everything else is a plain exception
        if is_http_error and status_code >= 500:
            status = Status.SYSTEM_ERROR
        else:
            status = Status.EXCEPTION

        return RegistrationStatusResponse(
            status=status, registration_status=None, exception=e, status_code=status_code
        )
def get_latest_active_token_value(tokens):
    """Return the value of the active token with the greatest `validFrom`.

    Args:
        tokens: iterable of dictionaries, each of which is expanded into a Token.

    Returns:
        The `token` value of the latest token whose state is "active", or an empty
        string when no token is active. When several active tokens share the same
        `validFrom`, the first one encountered wins.
    """
    parsed = (Token(**token_dict) for token_dict in tokens)
    active = (candidate for candidate in parsed if candidate.tokenState == "active")

    # max() keeps the first of equal maxima, matching a strict "greater than" scan
    newest = max(active, key=lambda candidate: candidate.validFrom, default=None)

    return "" if newest is None else newest.token
def enroll(request, switchio_config: SwitchioConfig, flow: EnrollmentFlow, token: str) -> tuple[Status, Exception]:
    """Enroll `token` in the group for `flow`, or refresh an existing enrollment.

    Args:
        request: the current request; the session attached to it stores the enrollment expiry.
        switchio_config: supplies the enrollment API URL, authorization header, certificates and PTO id.
        flow: the enrollment flow; provides the group id and the expiration settings.
        token: the token to enroll.

    Returns:
        A `(status, exception)` tuple. `exception` is None unless something was caught.
        - Status.SUCCESS: the token is enrolled (newly, refreshed, or already enrolled with nothing to do).
        - Status.REENROLLMENT_ERROR: an expiring enrollment exists that is neither expired nor
          within the re-enrollment window.
        - Status.SYSTEM_ERROR: an HTTPError with a status code of 500 or above was caught.
        - Status.EXCEPTION: any other HTTPError or exception was caught.
    """
    client = EnrollmentClient(
        api_url=switchio_config.enrollment_api_base_url,
        authorization_header_value=switchio_config.enrollment_api_authorization_header,
        private_key=switchio_config.private_key_data,
        client_certificate=switchio_config.client_certificate_data,
        ca_certificate=switchio_config.ca_certificate_data,
    )

    pto_id = switchio_config.pto_id
    group_id = flow.group_id

    exception = None
    try:
        group = _get_group_for_token(client, pto_id, group_id, token)
        already_enrolled = group is not None
        has_expiry = already_enrolled and group.expiresAt is not None

        if flow.supports_expiration:
            # set expiry on session
            if has_expiry:
                session.update(request, enrollment_expiry=group.expiresAt)
            else:
                session.update(request, enrollment_expiry=_calculate_expiry(flow.expiration_days))

            if has_expiry:
                is_expired = _is_expired(group.expiresAt)
                is_within_reenrollment_window = _is_within_reenrollment_window(
                    group.expiresAt, session.enrollment_reenrollment(request)
                )
                may_enroll = is_expired or is_within_reenrollment_window
            else:
                # not enrolled yet, or enrolled without an expiration date
                may_enroll = True

            if may_enroll:
                # enroll, or update the expiration of the existing enrollment, return success
                # NOTE(review): when the group already had an expiry, the session expiry was set
                # from that same value above, so this appears to resend the existing expiry
                # rather than a newly calculated one - confirm this is intended.
                client.add_group_to_token(
                    pto_id=pto_id,
                    group_id=group_id,
                    token=token,
                    expiry=session.enrollment_expiry(request),
                    timeout=settings.REQUESTS_TIMEOUT,
                )
                status = Status.SUCCESS
            else:
                # re-enrollment error, return enrollment error with expiration and reenrollment_date
                status = Status.REENROLLMENT_ERROR
        else:  # flow does not support expiration
            if already_enrolled and group.expiresAt is None:
                # no action, return success
                status = Status.SUCCESS
            else:
                # enroll user with no expiration date, or remove the existing expiration date
                # (when you don't include an expiration date, Switchio will set the expiration date to null.)
                client.add_group_to_token(
                    pto_id=pto_id,
                    group_id=group_id,
                    token=token,
                    timeout=settings.REQUESTS_TIMEOUT,
                )
                status = Status.SUCCESS
    except HTTPError as e:
        if e.response.status_code >= 500:
            status = Status.SYSTEM_ERROR
            exception = e
        else:
            status = Status.EXCEPTION
            try:
                detail = e.response.json()
            except ValueError:
                # the error body is not JSON; fall back to the raw text so that
                # building the message cannot raise out of this handler
                detail = e.response.text
            exception = Exception(f"{e}: {detail}")
    except Exception as e:
        status = Status.EXCEPTION
        exception = e

    return status, exception
def _get_group_for_token(client: EnrollmentClient, pto_id, group_id, token):
    """Return the token's existing enrollment in `group_id`, if there is one.

    Args:
        client: the enrollment API client used to list the token's groups.
        pto_id: identifier passed through to the client's group lookup.
        group_id: the group to search for.
        token: the token whose groups are listed.

    Returns:
        The first listed group whose `group` attribute equals `group_id`, or None.
    """
    enrolled_groups = client.get_groups_for_token(pto_id=pto_id, token=token, timeout=settings.REQUESTS_TIMEOUT)

    return next((candidate for candidate in enrolled_groups if candidate.group == group_id), None)