Coverage for benefits / enrollment_switchio / enrollment.py: 98%

125 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-13 19:35 +0000

1from dataclasses import dataclass 

2from datetime import datetime 

3 

4from django.conf import settings 

5from django.http import HttpRequest 

6from django.urls import reverse 

7from requests import HTTPError 

8 

9from benefits.core import session 

10from benefits.core.models.enrollment import EnrollmentFlow 

11from benefits.enrollment.enrollment import Status, _calculate_expiry, _is_expired, _is_within_reenrollment_window 

12from benefits.enrollment_switchio.api import ( 

13 EnrollmentClient, 

14 EshopResponseMode, 

15 Registration, 

16 RegistrationMode, 

17 RegistrationStatus, 

18 TokenizationClient, 

19) 

20from benefits.enrollment_switchio.models import SwitchioConfig 

21from benefits.routes import routes 

22 

23 

24@dataclass 

25class RegistrationResponse: 

26 status: Status 

27 registration: Registration 

28 exception: Exception = None 

29 status_code: int = None 

30 

31 

32@dataclass 

33class RegistrationStatusResponse: 

34 status: Status 

35 registration_status: RegistrationStatus 

36 exception: Exception = None 

37 status_code: int = None 

38 

39 

40@dataclass 

41class Token: 

42 token: str 

43 tokenVersion: int 

44 tokenState: str 

45 validFrom: datetime 

46 validTo: datetime 

47 testOnly: bool 

48 par: str = None 

49 

50 

def request_registration(
    request, switchio_config: SwitchioConfig, redirect_route: str = routes.ENROLLMENT_SWITCHIO_INDEX
) -> RegistrationResponse:
    """Ask the tokenization API to start a new registration.

    Args:
        request: the current request, used to build the absolute redirect URL.
        switchio_config: supplies the tokenization API URL, credentials and certificates.
        redirect_route: name of the route that is reversed into the redirect URL.

    Returns:
        A `RegistrationResponse`. Any exception is caught and reported through the
        response rather than raised: an `HTTPError` with a status code of 500 or above
        yields `Status.SYSTEM_ERROR`, every other failure yields `Status.EXCEPTION`.
    """
    try:
        tokenization_client = TokenizationClient(
            api_url=switchio_config.tokenization_api_base_url,
            api_key=switchio_config.tokenization_api_key,
            api_secret=switchio_config.tokenization_api_secret,
            private_key=switchio_config.private_key_data,
            client_certificate=switchio_config.client_certificate_data,
            ca_certificate=switchio_config.ca_certificate_data,
        )

        redirect_url = _generate_redirect_uri(request, reverse(redirect_route))

        new_registration = tokenization_client.request_registration(
            eshopRedirectUrl=redirect_url,
            mode=RegistrationMode.REGISTER,
            eshopResponseMode=EshopResponseMode.QUERY,
            timeout=settings.REQUESTS_TIMEOUT,
        )

        return RegistrationResponse(status=Status.SUCCESS, registration=new_registration)
    except Exception as error:
        # anything that is not an HTTP error has no status code to report
        if not isinstance(error, HTTPError):
            return RegistrationResponse(status=Status.EXCEPTION, registration=None, exception=error, status_code=None)

        http_status = error.response.status_code
        # 5xx responses are reported as system errors; other HTTP errors as plain exceptions
        outcome = Status.SYSTEM_ERROR if http_status >= 500 else Status.EXCEPTION

        return RegistrationResponse(status=outcome, registration=None, exception=error, status_code=http_status)

90 

91 

# adapted from https://github.com/Office-of-Digital-Services/django-cdt-identity/blob/main/cdt_identity/views.py#L42-L50
def _generate_redirect_uri(request: HttpRequest, redirect_path: str) -> str:
    """Build the absolute, lowercased redirect URI for `redirect_path`.

    Args:
        request: the current request; its `build_absolute_uri` turns the path into a full URI.
        redirect_path: path to redirect to.

    Returns:
        The lowercased absolute URI. A leading `http://` scheme is upgraded to `https://`
        unless the URI starts with `http://localhost`.
    """
    redirect_uri = str(request.build_absolute_uri(redirect_path)).lower()

    # this is a temporary hack to ensure redirect URIs are HTTPS when the app is deployed
    # see https://github.com/cal-itp/benefits/issues/442 for more context
    if redirect_uri.startswith("http://") and not redirect_uri.startswith("http://localhost"):
        # upgrade only the leading scheme: an "http://" that appears later in the URI
        # (e.g. inside a query parameter) must be left untouched
        redirect_uri = "https://" + redirect_uri.removeprefix("http://")

    return redirect_uri

102 

103 

def get_registration_status(switchio_config: SwitchioConfig, registration_id: str) -> RegistrationStatusResponse:
    """Ask the tokenization API for the status of an existing registration.

    Args:
        switchio_config: supplies the tokenization API URL, credentials and certificates.
        registration_id: identifier of the registration to look up.

    Returns:
        A `RegistrationStatusResponse`. Any exception is caught and reported through the
        response rather than raised: an `HTTPError` with a status code of 500 or above
        yields `Status.SYSTEM_ERROR`, every other failure yields `Status.EXCEPTION`.
    """
    try:
        tokenization_client = TokenizationClient(
            api_url=switchio_config.tokenization_api_base_url,
            api_key=switchio_config.tokenization_api_key,
            api_secret=switchio_config.tokenization_api_secret,
            private_key=switchio_config.private_key_data,
            client_certificate=switchio_config.client_certificate_data,
            ca_certificate=switchio_config.ca_certificate_data,
        )

        current_status = tokenization_client.get_registration_status(
            registration_id=registration_id,
            timeout=settings.REQUESTS_TIMEOUT,
        )

        return RegistrationStatusResponse(status=Status.SUCCESS, registration_status=current_status, exception=None)
    except Exception as error:
        # anything that is not an HTTP error has no status code to report
        if not isinstance(error, HTTPError):
            return RegistrationStatusResponse(
                status=Status.EXCEPTION, registration_status=None, exception=error, status_code=None
            )

        http_status = error.response.status_code
        # 5xx responses are reported as system errors; other HTTP errors as plain exceptions
        outcome = Status.SYSTEM_ERROR if http_status >= 500 else Status.EXCEPTION

        return RegistrationStatusResponse(
            status=outcome, registration_status=None, exception=error, status_code=http_status
        )

138 

139 

def get_latest_active_token_value(tokens):
    """Return the `token` value of the active token with the greatest `validFrom`.

    Args:
        tokens: iterable of dictionaries, each unpacked into a `Token`.

    Returns:
        The `token` attribute of the selected token, or "" when no token is active.
    """
    # parse and filter lazily so each dictionary is only turned into a Token when it is reached
    active_tokens = (
        candidate for candidate in (Token(**raw) for raw in tokens) if candidate.tokenState == "active"
    )

    # max() keeps the first of several equally-recent tokens, like a strict ">" comparison would
    newest = max(active_tokens, key=lambda candidate: candidate.validFrom, default=None)

    return newest.token if newest else ""

150 

151 

def enroll(
    request, switchio_config: SwitchioConfig, flow: EnrollmentFlow, token: str
) -> tuple[Status, Exception | None]:
    """Enroll `token` in the flow's group, handling expiration and re-enrollment rules.

    Args:
        request: the current request; the session's enrollment expiry is updated on it
            when the flow supports expiration.
        switchio_config: supplies the enrollment API URL, authorization header,
            certificates and the `pto_id`.
        flow: enrollment flow providing `group_id`, `supports_expiration` and `expiration_days`.
        token: the token to add to the group.

    Returns:
        A `(status, exception)` tuple. `exception` is None unless an error was caught.
        `status` is `Status.SUCCESS`, `Status.REENROLLMENT_ERROR` (already enrolled, not
        expired and not yet within the re-enrollment window), `Status.SYSTEM_ERROR`
        (HTTP error with status 500 or above) or `Status.EXCEPTION` (any other error).
    """
    client = EnrollmentClient(
        api_url=switchio_config.enrollment_api_base_url,
        authorization_header_value=switchio_config.enrollment_api_authorization_header,
        private_key=switchio_config.private_key_data,
        client_certificate=switchio_config.client_certificate_data,
        ca_certificate=switchio_config.ca_certificate_data,
    )

    pto_id = switchio_config.pto_id
    group_id = flow.group_id

    exception = None
    try:
        group = _get_group_for_token(client, pto_id, group_id, token)
        already_enrolled = group is not None

        if flow.supports_expiration:
            # set expiry on session
            if already_enrolled and group.expiresAt is not None:
                session.update(request, enrollment_expiry=group.expiresAt)
            else:
                session.update(request, enrollment_expiry=_calculate_expiry(flow.expiration_days))

            if not already_enrolled or group.expiresAt is None:
                # new enrollment, or existing enrollment without an expiration date:
                # (re)add the group with an expiration date, return success
                client.add_group_to_token(
                    pto_id=pto_id,
                    group_id=group_id,
                    token=token,
                    expiry=session.enrollment_expiry(request),
                    timeout=settings.REQUESTS_TIMEOUT,
                )
                status = Status.SUCCESS
            else:
                is_expired = _is_expired(group.expiresAt)
                is_within_reenrollment_window = _is_within_reenrollment_window(
                    group.expiresAt, session.enrollment_reenrollment(request)
                )

                if is_expired or is_within_reenrollment_window:
                    # update expiration of existing enrollment, return success
                    # NOTE(review): the session expiry was set to the existing group.expiresAt above,
                    # so the expiry sent here appears to be the existing one rather than a newly
                    # calculated one -- confirm this is intended
                    client.add_group_to_token(
                        pto_id=pto_id,
                        group_id=group_id,
                        token=token,
                        expiry=session.enrollment_expiry(request),
                        timeout=settings.REQUESTS_TIMEOUT,
                    )
                    status = Status.SUCCESS
                else:
                    # re-enrollment error, return enrollment error with expiration and reenrollment_date
                    status = Status.REENROLLMENT_ERROR
        elif already_enrolled and group.expiresAt is None:
            # flow does not support expiration and the enrollment has none: no action, return success
            status = Status.SUCCESS
        else:
            # flow does not support expiration: enroll the user with no expiration date, or remove
            # the expiration date of an existing enrollment, return success
            # (when you don't include an expiration date, Switchio will set the expiration date to null.)
            client.add_group_to_token(
                pto_id=pto_id,
                group_id=group_id,
                token=token,
                timeout=settings.REQUESTS_TIMEOUT,
            )
            status = Status.SUCCESS
    except HTTPError as e:
        if e.response.status_code >= 500:
            status = Status.SYSTEM_ERROR
            exception = e
        else:
            status = Status.EXCEPTION
            try:
                detail = e.response.json()
            except ValueError:
                # the error body is not valid JSON; fall back to the raw text so that building
                # the message cannot itself raise out of this handler
                detail = e.response.text
            exception = Exception(f"{e}: {detail}")
    except Exception as e:
        status = Status.EXCEPTION
        exception = e

    return status, exception

234 

235 

def _get_group_for_token(client: EnrollmentClient, pto_id, group_id, token):
    """Return the group entry for `token` whose `group` equals `group_id`, or None.

    Fetches the token's groups from the client with the configured request timeout
    and returns the first matching entry.
    """
    enrolled_groups = client.get_groups_for_token(pto_id=pto_id, token=token, timeout=settings.REQUESTS_TIMEOUT)

    # first match wins; None when the token is not in the requested group
    return next((candidate for candidate in enrolled_groups if candidate.group == group_id), None)