Coverage for benefits / enrollment_switchio / enrollment.py: 98%
125 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-01 15:39 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-01 15:39 +0000
1from dataclasses import dataclass
2from datetime import datetime
4from django.conf import settings
5from django.http import HttpRequest
6from django.urls import reverse
7from requests import HTTPError
9from benefits.core import session
10from benefits.core.models.enrollment import EnrollmentFlow
11from benefits.enrollment.enrollment import Status, _calculate_expiry, _is_expired, _is_within_reenrollment_window
12from benefits.enrollment_switchio.api import (
13 EnrollmentClient,
14 EshopResponseMode,
15 Registration,
16 RegistrationMode,
17 RegistrationStatus,
18 TokenizationClient,
19)
20from benefits.enrollment_switchio.models import SwitchioConfig, SwitchioGroup
21from benefits.routes import routes
24@dataclass
25class RegistrationResponse:
26 status: Status
27 registration: Registration
28 exception: Exception = None
29 status_code: int = None
32@dataclass
33class RegistrationStatusResponse:
34 status: Status
35 registration_status: RegistrationStatus
36 exception: Exception = None
37 status_code: int = None
40@dataclass
41class Token:
42 token: str
43 tokenVersion: int
44 tokenState: str
45 validFrom: datetime
46 validTo: datetime
47 testOnly: bool
48 par: str = None
51def request_registration(
52 request, switchio_config: SwitchioConfig, redirect_route: str = routes.ENROLLMENT_SWITCHIO_INDEX
53) -> RegistrationResponse:
54 try:
55 client = TokenizationClient(
56 api_url=switchio_config.tokenization_api_base_url,
57 api_key=switchio_config.tokenization_api_key,
58 api_secret=switchio_config.tokenization_api_secret,
59 private_key=switchio_config.private_key_data,
60 client_certificate=switchio_config.client_certificate_data,
61 ca_certificate=switchio_config.ca_certificate_data,
62 )
64 route = reverse(redirect_route)
65 redirect_url = _generate_redirect_uri(request, route)
67 registration = client.request_registration(
68 eshopRedirectUrl=redirect_url,
69 mode=RegistrationMode.REGISTER,
70 eshopResponseMode=EshopResponseMode.QUERY,
71 timeout=settings.REQUESTS_TIMEOUT,
72 )
74 return RegistrationResponse(status=Status.SUCCESS, registration=registration)
75 except Exception as e:
76 exception = e
78 if isinstance(e, HTTPError):
79 status_code = e.response.status_code
81 if status_code >= 500:
82 status = Status.SYSTEM_ERROR
83 else:
84 status = Status.EXCEPTION
85 else:
86 status_code = None
87 status = Status.EXCEPTION
89 return RegistrationResponse(status=status, registration=None, exception=exception, status_code=status_code)
92# copied from https://github.com/Office-of-Digital-Services/django-cdt-identity/blob/main/cdt_identity/views.py#L42-L50
93def _generate_redirect_uri(request: HttpRequest, redirect_path: str):
94 redirect_uri = str(request.build_absolute_uri(redirect_path)).lower()
96 # this is a temporary hack to ensure redirect URIs are HTTPS when the app is deployed
97 # see https://github.com/cal-itp/benefits/issues/442 for more context
98 if not redirect_uri.startswith("http://localhost"): 98 ↛ 101line 98 didn't jump to line 101 because the condition on line 98 was always true
99 redirect_uri = redirect_uri.replace("http://", "https://")
101 return redirect_uri
104def get_registration_status(switchio_config: SwitchioConfig, registration_id: str) -> RegistrationStatusResponse:
105 try:
106 client = TokenizationClient(
107 api_url=switchio_config.tokenization_api_base_url,
108 api_key=switchio_config.tokenization_api_key,
109 api_secret=switchio_config.tokenization_api_secret,
110 private_key=switchio_config.private_key_data,
111 client_certificate=switchio_config.client_certificate_data,
112 ca_certificate=switchio_config.ca_certificate_data,
113 )
115 registration_status = client.get_registration_status(
116 registration_id=registration_id,
117 timeout=settings.REQUESTS_TIMEOUT,
118 )
120 return RegistrationStatusResponse(status=Status.SUCCESS, registration_status=registration_status, exception=None)
121 except Exception as e:
122 exception = e
124 if isinstance(e, HTTPError):
125 status_code = e.response.status_code
127 if status_code >= 500:
128 status = Status.SYSTEM_ERROR
129 else:
130 status = Status.EXCEPTION
131 else:
132 status_code = None
133 status = Status.EXCEPTION
135 return RegistrationStatusResponse(
136 status=status, registration_status=None, exception=exception, status_code=status_code
137 )
140def get_latest_active_token_value(tokens):
141 latest_active_token = None
143 for token_dict in tokens:
144 token = Token(**token_dict)
145 if token.tokenState == "active":
146 if latest_active_token is None or token.validFrom > latest_active_token.validFrom: 146 ↛ 143line 146 didn't jump to line 143 because the condition on line 146 was always true
147 latest_active_token = token
149 return latest_active_token.token if latest_active_token else ""
152def enroll(
153 request, switchio_config: SwitchioConfig, flow: EnrollmentFlow, group: SwitchioGroup, token: str
154) -> tuple[Status, Exception]:
155 client = EnrollmentClient(
156 api_url=switchio_config.enrollment_api_base_url,
157 authorization_header_value=switchio_config.enrollment_api_authorization_header,
158 private_key=switchio_config.private_key_data,
159 client_certificate=switchio_config.client_certificate_data,
160 ca_certificate=switchio_config.ca_certificate_data,
161 )
163 pto_id = switchio_config.pto_id
164 group_id = group.group_id
166 exception = None
167 try:
168 group = _get_group_for_token(client, pto_id, group_id, token)
169 already_enrolled = group is not None
171 if flow.supports_expiration:
172 # set expiry on session
173 if already_enrolled and group.expiresAt is not None:
174 session.update(request, enrollment_expiry=group.expiresAt)
175 else:
176 session.update(request, enrollment_expiry=_calculate_expiry(flow.expiration_days))
178 if not already_enrolled:
179 # enroll user with an expiration date, return success
180 client.add_group_to_token(pto_id, group_id, token, expiry=session.enrollment_expiry(request))
181 status = Status.SUCCESS
182 else: # already_enrolled
183 if group.expiresAt is None:
184 # update expiration of existing enrollment, return success
185 client.add_group_to_token(pto_id, group_id, token, expiry=session.enrollment_expiry(request))
186 status = Status.SUCCESS
187 else:
188 is_expired = _is_expired(group.expiresAt)
189 is_within_reenrollment_window = _is_within_reenrollment_window(
190 group.expiresAt, session.enrollment_reenrollment(request)
191 )
193 if is_expired or is_within_reenrollment_window:
194 # update expiration of existing enrollment, return success
195 client.add_group_to_token(pto_id, group_id, token, expiry=session.enrollment_expiry(request))
196 status = Status.SUCCESS
197 else:
198 # re-enrollment error, return enrollment error with expiration and reenrollment_date
199 status = Status.REENROLLMENT_ERROR
200 else: # flow does not support expiration
201 if not already_enrolled:
202 # enroll user with no expiration date, return success
203 client.add_group_to_token(
204 pto_id=pto_id,
205 group_id=group_id,
206 token=token,
207 timeout=settings.REQUESTS_TIMEOUT,
208 )
209 status = Status.SUCCESS
210 else: # already enrolled
211 if group.expiresAt is None:
212 # no action, return success
213 status = Status.SUCCESS
214 else:
215 # remove expiration date, return success
216 # (when you don't include an expiration date, Switchio will set the expiration date to null.)
217 client.add_group_to_token(
218 pto_id=pto_id,
219 group_id=group_id,
220 token=token,
221 timeout=settings.REQUESTS_TIMEOUT,
222 )
223 status = Status.SUCCESS
224 except HTTPError as e:
225 if e.response.status_code >= 500:
226 status = Status.SYSTEM_ERROR
227 exception = e
228 else:
229 status = Status.EXCEPTION
230 exception = Exception(f"{e}: {e.response.text}")
231 except Exception as e:
232 status = Status.EXCEPTION
233 exception = e
235 return status, exception
238def _get_group_for_token(client: EnrollmentClient, pto_id, group_id, token):
239 already_enrolled_groups = client.get_groups_for_token(pto_id=pto_id, token=token, timeout=settings.REQUESTS_TIMEOUT)
241 for group in already_enrolled_groups:
242 if group.group == group_id: 242 ↛ 241line 242 didn't jump to line 241 because the condition on line 242 was always true
243 return group
245 return None