Coverage for benefits/enrollment_switchio/enrollment.py: 98%
125 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-10 16:52 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-10 16:52 +0000
1from dataclasses import dataclass
2from datetime import datetime
3from django.conf import settings
4from django.http import HttpRequest
5from django.urls import reverse
6from requests import HTTPError
8from benefits.core import session
9from benefits.core.models.enrollment import EnrollmentFlow
10from benefits.enrollment.enrollment import Status, _calculate_expiry, _is_expired, _is_within_reenrollment_window
11from benefits.enrollment_switchio.models import SwitchioConfig
12from benefits.routes import routes
13from benefits.enrollment_switchio.api import (
14 EnrollmentClient,
15 TokenizationClient,
16 EshopResponseMode,
17 Registration,
18 RegistrationMode,
19 RegistrationStatus,
20)
23@dataclass
24class RegistrationResponse:
25 status: Status
26 registration: Registration
27 exception: Exception = None
28 status_code: int = None
31@dataclass
32class RegistrationStatusResponse:
33 status: Status
34 registration_status: RegistrationStatus
35 exception: Exception = None
36 status_code: int = None
39@dataclass
40class Token:
41 token: str
42 tokenVersion: int
43 tokenState: str
44 validFrom: datetime
45 validTo: datetime
46 testOnly: bool
47 par: str = None
50def request_registration(
51 request, switchio_config: SwitchioConfig, redirect_route: str = routes.ENROLLMENT_SWITCHIO_INDEX
52) -> RegistrationResponse:
53 try:
54 client = TokenizationClient(
55 api_url=switchio_config.tokenization_api_base_url,
56 api_key=switchio_config.tokenization_api_key,
57 api_secret=switchio_config.tokenization_api_secret,
58 private_key=switchio_config.private_key_data,
59 client_certificate=switchio_config.client_certificate_data,
60 ca_certificate=switchio_config.ca_certificate_data,
61 )
63 route = reverse(redirect_route)
64 redirect_url = _generate_redirect_uri(request, route)
66 registration = client.request_registration(
67 eshopRedirectUrl=redirect_url,
68 mode=RegistrationMode.REGISTER,
69 eshopResponseMode=EshopResponseMode.QUERY,
70 timeout=settings.REQUESTS_TIMEOUT,
71 )
73 return RegistrationResponse(status=Status.SUCCESS, registration=registration)
74 except Exception as e:
75 exception = e
77 if isinstance(e, HTTPError):
78 status_code = e.response.status_code
80 if status_code >= 500:
81 status = Status.SYSTEM_ERROR
82 else:
83 status = Status.EXCEPTION
84 else:
85 status_code = None
86 status = Status.EXCEPTION
88 return RegistrationResponse(status=status, registration=None, exception=exception, status_code=status_code)
91# copied from https://github.com/Office-of-Digital-Services/django-cdt-identity/blob/main/cdt_identity/views.py#L42-L50
92def _generate_redirect_uri(request: HttpRequest, redirect_path: str):
93 redirect_uri = str(request.build_absolute_uri(redirect_path)).lower()
95 # this is a temporary hack to ensure redirect URIs are HTTPS when the app is deployed
96 # see https://github.com/cal-itp/benefits/issues/442 for more context
97 if not redirect_uri.startswith("http://localhost"): 97 ↛ 100line 97 didn't jump to line 100 because the condition on line 97 was always true
98 redirect_uri = redirect_uri.replace("http://", "https://")
100 return redirect_uri
103def get_registration_status(switchio_config: SwitchioConfig, registration_id: str) -> RegistrationStatusResponse:
104 try:
105 client = TokenizationClient(
106 api_url=switchio_config.tokenization_api_base_url,
107 api_key=switchio_config.tokenization_api_key,
108 api_secret=switchio_config.tokenization_api_secret,
109 private_key=switchio_config.private_key_data,
110 client_certificate=switchio_config.client_certificate_data,
111 ca_certificate=switchio_config.ca_certificate_data,
112 )
114 registration_status = client.get_registration_status(
115 registration_id=registration_id,
116 timeout=settings.REQUESTS_TIMEOUT,
117 )
119 return RegistrationStatusResponse(status=Status.SUCCESS, registration_status=registration_status, exception=None)
120 except Exception as e:
121 exception = e
123 if isinstance(e, HTTPError):
124 status_code = e.response.status_code
126 if status_code >= 500:
127 status = Status.SYSTEM_ERROR
128 else:
129 status = Status.EXCEPTION
130 else:
131 status_code = None
132 status = Status.EXCEPTION
134 return RegistrationStatusResponse(
135 status=status, registration_status=None, exception=exception, status_code=status_code
136 )
139def get_latest_active_token_value(tokens):
140 latest_active_token = None
142 for token_dict in tokens:
143 token = Token(**token_dict)
144 if token.tokenState == "active":
145 if latest_active_token is None or token.validFrom > latest_active_token.validFrom: 145 ↛ 142line 145 didn't jump to line 142 because the condition on line 145 was always true
146 latest_active_token = token
148 return latest_active_token.token if latest_active_token else ""
151def enroll(request, switchio_config: SwitchioConfig, flow: EnrollmentFlow, token: str) -> tuple[Status, Exception]:
152 client = EnrollmentClient(
153 api_url=switchio_config.enrollment_api_base_url,
154 authorization_header_value=switchio_config.enrollment_api_authorization_header,
155 private_key=switchio_config.private_key_data,
156 client_certificate=switchio_config.client_certificate_data,
157 ca_certificate=switchio_config.ca_certificate_data,
158 )
160 pto_id = switchio_config.pto_id
161 group_id = flow.group_id
163 exception = None
164 try:
165 group = _get_group_for_token(client, pto_id, group_id, token)
166 already_enrolled = group is not None
168 if flow.supports_expiration:
169 # set expiry on session
170 if already_enrolled and group.expiresAt is not None:
171 session.update(request, enrollment_expiry=group.expiresAt)
172 else:
173 session.update(request, enrollment_expiry=_calculate_expiry(flow.expiration_days))
175 if not already_enrolled:
176 # enroll user with an expiration date, return success
177 client.add_group_to_token(pto_id, group_id, token, expiry=session.enrollment_expiry(request))
178 status = Status.SUCCESS
179 else: # already_enrolled
180 if group.expiresAt is None:
181 # update expiration of existing enrollment, return success
182 client.add_group_to_token(pto_id, group_id, token, expiry=session.enrollment_expiry(request))
183 status = Status.SUCCESS
184 else:
185 is_expired = _is_expired(group.expiresAt)
186 is_within_reenrollment_window = _is_within_reenrollment_window(
187 group.expiresAt, session.enrollment_reenrollment(request)
188 )
190 if is_expired or is_within_reenrollment_window:
191 # update expiration of existing enrollment, return success
192 client.add_group_to_token(pto_id, group_id, token, expiry=session.enrollment_expiry(request))
193 status = Status.SUCCESS
194 else:
195 # re-enrollment error, return enrollment error with expiration and reenrollment_date
196 status = Status.REENROLLMENT_ERROR
197 else: # flow does not support expiration
198 if not already_enrolled:
199 # enroll user with no expiration date, return success
200 client.add_group_to_token(
201 pto_id=pto_id,
202 group_id=group_id,
203 token=token,
204 timeout=settings.REQUESTS_TIMEOUT,
205 )
206 status = Status.SUCCESS
207 else: # already enrolled
208 if group.expiresAt is None:
209 # no action, return success
210 status = Status.SUCCESS
211 else:
212 # remove expiration date, return success
213 # (when you don't include an expiration date, Switchio will set the expiration date to null.)
214 client.add_group_to_token(
215 pto_id=pto_id,
216 group_id=group_id,
217 token=token,
218 timeout=settings.REQUESTS_TIMEOUT,
219 )
220 status = Status.SUCCESS
221 except HTTPError as e:
222 if e.response.status_code >= 500:
223 status = Status.SYSTEM_ERROR
224 exception = e
225 else:
226 status = Status.EXCEPTION
227 exception = Exception(f"{e}: {e.response.json()}")
228 except Exception as e:
229 status = Status.EXCEPTION
230 exception = e
232 return status, exception
235def _get_group_for_token(client: EnrollmentClient, pto_id, group_id, token):
236 already_enrolled_groups = client.get_groups_for_token(pto_id=pto_id, token=token, timeout=settings.REQUESTS_TIMEOUT)
238 for group in already_enrolled_groups:
239 if group.group == group_id: 239 ↛ 238line 239 didn't jump to line 238 because the condition on line 239 was always true
240 return group
242 return None