Coverage for benefits/enrollment_switchio/enrollment.py: 97%
109 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-08 16:26 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-08 16:26 +0000
1from dataclasses import dataclass
2from datetime import datetime
3from django.conf import settings
4from django.http import HttpRequest
5from django.urls import reverse
6from requests import HTTPError
8from benefits.core.models.enrollment import EnrollmentFlow
9from benefits.enrollment.enrollment import Status
10from benefits.enrollment_switchio.models import SwitchioConfig
11from benefits.routes import routes
12from benefits.enrollment_switchio.api import (
13 EnrollmentClient,
14 TokenizationClient,
15 EshopResponseMode,
16 Registration,
17 RegistrationMode,
18 RegistrationStatus,
19)
@dataclass
class RegistrationResponse:
    """Result object returned by ``request_registration``.

    On success, ``registration`` carries the value returned by the tokenization
    client and the error fields keep their ``None`` defaults. On failure,
    ``registration`` is ``None`` and ``exception`` holds the error that was caught.
    """

    # overall outcome of the call
    status: Status
    # value returned by the tokenization client; None when the call failed
    registration: Registration
    # the exception that was caught, if any
    exception: Exception = None
    # HTTP status code of the failed response; only set for HTTPError failures
    status_code: int = None
@dataclass
class RegistrationStatusResponse:
    """Result object returned by ``get_registration_status``.

    On success, ``registration_status`` carries the value returned by the
    tokenization client. On failure it is ``None`` and ``exception`` holds the
    error that was caught.
    """

    # overall outcome of the call
    status: Status
    # value returned by the tokenization client; None when the call failed
    registration_status: RegistrationStatus
    # the exception that was caught, if any
    exception: Exception = None
    # HTTP status code of the failed response; only set for HTTPError failures
    status_code: int = None
@dataclass
class Token:
    """A single token record.

    ``get_latest_active_token_value`` builds instances with ``Token(**token_dict)``,
    so the field names must match the keys of those dicts exactly.
    NOTE(review): the camelCase names presumably mirror the Switchio API payload - confirm.
    """

    # the token value itself
    token: str
    tokenVersion: int
    # compared against "active" when picking the latest usable token
    tokenState: str
    # used to order tokens; the most recent validFrom wins
    validFrom: datetime
    validTo: datetime
    testOnly: bool
    # optional; defaults to None when the source dict has no "par" key
    par: str = None
def request_registration(request, switchio_config: SwitchioConfig) -> RegistrationResponse:
    """Ask the Switchio tokenization API to start a new registration.

    The registration is requested in ``RegistrationMode.REGISTER`` mode, with the
    absolute URL of the ``routes.ENROLLMENT_SWITCHIO_INDEX`` route as the redirect
    target and ``EshopResponseMode.QUERY`` as the response mode.

    Args:
        request: the current request, used to build the absolute redirect URL.
        switchio_config: connection settings and credentials for the tokenization API.

    Returns:
        A ``RegistrationResponse``. This function does not raise; every error is
        captured on the response:
        - ``Status.SUCCESS`` with the registration when the call succeeds.
        - ``Status.SYSTEM_ERROR`` for an ``HTTPError`` with a status code >= 500.
        - ``Status.EXCEPTION`` for any other error.
    """
    try:
        client = TokenizationClient(
            api_url=switchio_config.tokenization_api_base_url,
            api_key=switchio_config.tokenization_api_key,
            api_secret=switchio_config.tokenization_api_secret,
            private_key=switchio_config.private_key_data,
            client_certificate=switchio_config.client_certificate_data,
            ca_certificate=switchio_config.ca_certificate_data,
        )

        route = reverse(routes.ENROLLMENT_SWITCHIO_INDEX)
        redirect_url = _generate_redirect_uri(request, route)

        registration = client.request_registration(
            eshopRedirectUrl=redirect_url,
            mode=RegistrationMode.REGISTER,
            eshopResponseMode=EshopResponseMode.QUERY,
            timeout=settings.REQUESTS_TIMEOUT,
        )
    except HTTPError as e:
        # HTTPError.response can be None (e.g. when raised without a response attached);
        # reading the status code defensively keeps this handler from raising itself.
        status_code = getattr(e.response, "status_code", None)
        is_server_error = status_code is not None and status_code >= 500
        status = Status.SYSTEM_ERROR if is_server_error else Status.EXCEPTION
        return RegistrationResponse(status=status, registration=None, exception=e, status_code=status_code)
    except Exception as e:
        # deliberate catch-all: callers inspect the response object instead of catching exceptions
        return RegistrationResponse(status=Status.EXCEPTION, registration=None, exception=e, status_code=None)

    return RegistrationResponse(status=Status.SUCCESS, registration=registration)
# adapted from https://github.com/Office-of-Digital-Services/django-cdt-identity/blob/main/cdt_identity/views.py#L42-L50
def _generate_redirect_uri(request: HttpRequest, redirect_path: str):
    """Build the lower-cased absolute redirect URI for ``redirect_path``.

    Args:
        request: the current request; its ``build_absolute_uri`` supplies scheme and host.
        redirect_path: the path (e.g. from ``reverse``) to redirect back to.

    Returns:
        The absolute URI as a lower-case string. Any ``http://`` URI that does not
        start with ``http://localhost`` has its scheme upgraded to ``https://``.
    """
    # NOTE: the whole URI (including path and query) is lower-cased, matching the upstream helper.
    redirect_uri = str(request.build_absolute_uri(redirect_path)).lower()

    # this is a temporary hack to ensure redirect URIs are HTTPS when the app is deployed
    # see https://github.com/cal-itp/benefits/issues/442 for more context
    insecure_scheme = "http://"
    if redirect_uri.startswith(insecure_scheme) and not redirect_uri.startswith("http://localhost"):
        # only rewrite the leading scheme; a blanket str.replace would also rewrite
        # any "http://" that appears later in the path or query string
        redirect_uri = "https://" + redirect_uri[len(insecure_scheme) :]

    return redirect_uri
def get_registration_status(switchio_config: SwitchioConfig, registration_id: str) -> RegistrationStatusResponse:
    """Look up the status of an existing registration via the Switchio tokenization API.

    Args:
        switchio_config: connection settings and credentials for the tokenization API.
        registration_id: identifier of the registration to look up.

    Returns:
        A ``RegistrationStatusResponse``. This function does not raise; every error
        is captured on the response:
        - ``Status.SUCCESS`` with the registration status when the call succeeds.
        - ``Status.SYSTEM_ERROR`` for an ``HTTPError`` with a status code >= 500.
        - ``Status.EXCEPTION`` for any other error.
    """
    try:
        client = TokenizationClient(
            api_url=switchio_config.tokenization_api_base_url,
            api_key=switchio_config.tokenization_api_key,
            api_secret=switchio_config.tokenization_api_secret,
            private_key=switchio_config.private_key_data,
            client_certificate=switchio_config.client_certificate_data,
            ca_certificate=switchio_config.ca_certificate_data,
        )

        registration_status = client.get_registration_status(
            registration_id=registration_id,
            timeout=settings.REQUESTS_TIMEOUT,
        )
    except HTTPError as e:
        # HTTPError.response can be None (e.g. when raised without a response attached);
        # reading the status code defensively keeps this handler from raising itself.
        status_code = getattr(e.response, "status_code", None)
        is_server_error = status_code is not None and status_code >= 500
        status = Status.SYSTEM_ERROR if is_server_error else Status.EXCEPTION
        return RegistrationStatusResponse(
            status=status, registration_status=None, exception=e, status_code=status_code
        )
    except Exception as e:
        # deliberate catch-all: callers inspect the response object instead of catching exceptions
        return RegistrationStatusResponse(
            status=Status.EXCEPTION, registration_status=None, exception=e, status_code=None
        )

    return RegistrationStatusResponse(status=Status.SUCCESS, registration_status=registration_status, exception=None)
def get_latest_active_token_value(tokens):
    """Return the token value of the most recently valid active token.

    Args:
        tokens: iterable of dicts, each unpacked into a ``Token`` via ``Token(**token_dict)``.

    Returns:
        The ``token`` attribute of the active token (``tokenState == "active"``) with
        the greatest ``validFrom``; on ties the earliest one in ``tokens`` wins.
        Returns ``""`` when there is no active token.
    """
    parsed = (Token(**fields) for fields in tokens)
    active = [candidate for candidate in parsed if candidate.tokenState == "active"]

    if not active:
        return ""

    # max() keeps the first of several equal maxima, same as a strict ">" scan
    newest = max(active, key=lambda candidate: candidate.validFrom)
    return newest.token
def enroll(switchio_config: SwitchioConfig, flow: EnrollmentFlow, token: str) -> tuple[Status, Exception]:
    """Enroll ``token`` in the flow's Switchio group, with no expiration date.

    Args:
        switchio_config: connection settings and credentials for the enrollment API.
        flow: the enrollment flow; supplies ``group_id`` and ``supports_expiration``.
        token: the token to add to the group.

    Returns:
        A ``(status, exception)`` pair; ``exception`` is ``None`` on success.
        - ``Status.SUCCESS`` when the token ends up in the group without an expiration date.
        - ``Status.SYSTEM_ERROR`` for an ``HTTPError`` with a status code >= 500.
        - ``Status.EXCEPTION`` for any other error, including flows that support
          expiration, which this function does not handle.
    """
    if flow.supports_expiration:
        # Previously this path fell through without assigning `status` and crashed with
        # UnboundLocalError at the return; report it explicitly instead.
        return Status.EXCEPTION, NotImplementedError(
            "Enrollment flows that support expiration are not supported for Switchio."
        )

    client = EnrollmentClient(
        api_url=switchio_config.enrollment_api_base_url,
        authorization_header_value=switchio_config.enrollment_api_authorization_header,
        private_key=switchio_config.private_key_data,
        client_certificate=switchio_config.client_certificate_data,
        ca_certificate=switchio_config.ca_certificate_data,
    )

    pto_id = switchio_config.pto_id
    group_id = flow.group_id

    try:
        group = _get_group_for_token(client, pto_id, group_id, token)
        already_enrolled = group is not None

        # Call the API when the user is not enrolled yet, or is enrolled with an expiration
        # date that must be removed. (when you don't include an expiration date, Switchio
        # will set the expiration date to null.) Already enrolled without expiration: no action.
        if not already_enrolled or group.expiresAt is not None:
            client.add_group_to_token(
                pto_id=pto_id,
                group_id=group_id,
                token=token,
                timeout=settings.REQUESTS_TIMEOUT,
            )
    except HTTPError as e:
        response = e.response
        # HTTPError.response can be None; without this guard the handler itself would raise
        if response is None:
            return Status.EXCEPTION, e

        if response.status_code >= 500:
            return Status.SYSTEM_ERROR, e

        # include the error body for client errors; fall back to raw text if it is not JSON
        try:
            detail = response.json()
        except ValueError:
            detail = response.text
        return Status.EXCEPTION, Exception(f"{e}: {detail}")
    except Exception as e:
        return Status.EXCEPTION, e

    return Status.SUCCESS, None
def _get_group_for_token(client: EnrollmentClient, pto_id, group_id, token):
    """Return the token's existing entry for ``group_id``, or ``None`` if there is none.

    Fetches the groups the token already belongs to via ``client.get_groups_for_token``
    and returns the first one whose ``group`` attribute equals ``group_id``.
    """
    enrolled_groups = client.get_groups_for_token(
        pto_id=pto_id,
        token=token,
        timeout=settings.REQUESTS_TIMEOUT,
    )

    matching = (entry for entry in enrolled_groups if entry.group == group_id)
    return next(matching, None)