Coverage for benefits/enrollment_switchio/enrollment.py: 98%

125 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-10 16:52 +0000

1from dataclasses import dataclass 

2from datetime import datetime 

3from django.conf import settings 

4from django.http import HttpRequest 

5from django.urls import reverse 

6from requests import HTTPError 

7 

8from benefits.core import session 

9from benefits.core.models.enrollment import EnrollmentFlow 

10from benefits.enrollment.enrollment import Status, _calculate_expiry, _is_expired, _is_within_reenrollment_window 

11from benefits.enrollment_switchio.models import SwitchioConfig 

12from benefits.routes import routes 

13from benefits.enrollment_switchio.api import ( 

14 EnrollmentClient, 

15 TokenizationClient, 

16 EshopResponseMode, 

17 Registration, 

18 RegistrationMode, 

19 RegistrationStatus, 

20) 

21 

22 

@dataclass
class RegistrationResponse:
    """Result of a registration request: a status plus either the registration or failure details."""

    # overall outcome of the request
    status: Status
    # registration returned by the tokenization client; None when the request failed
    registration: Registration
    # exception captured while making the request, if any
    exception: Exception = None
    # HTTP status code of the failed response; only populated when the failure was an HTTPError
    status_code: int = None

29 

30 

@dataclass
class RegistrationStatusResponse:
    """Result of a registration-status lookup: a status plus either the registration status or failure details."""

    # overall outcome of the lookup
    status: Status
    # registration status returned by the tokenization client; None when the lookup failed
    registration_status: RegistrationStatus
    # exception captured while making the lookup, if any
    exception: Exception = None
    # HTTP status code of the failed response; only populated when the failure was an HTTPError
    status_code: int = None

37 

38 

@dataclass
class Token:
    """A single token record; field names are camelCase so a token dict can be splatted in as keyword arguments."""

    # the token value itself
    token: str
    tokenVersion: int
    # lifecycle state; this module only acts on tokens whose state is "active"
    tokenState: str
    # start and end of the token's validity period
    validFrom: datetime
    validTo: datetime
    testOnly: bool
    # optional field, absent from some token records
    par: str = None

48 

49 

def request_registration(
    request, switchio_config: SwitchioConfig, redirect_route: str = routes.ENROLLMENT_SWITCHIO_INDEX
) -> RegistrationResponse:
    """Request a new registration from the tokenization API.

    A ``TokenizationClient`` is built from ``switchio_config``, ``redirect_route`` is resolved to an
    absolute redirect URL for ``request``, and the client is asked for a registration in ``REGISTER``
    mode with the ``QUERY`` eshop response mode.

    Returns:
        A ``RegistrationResponse``. On success it carries ``Status.SUCCESS`` and the registration.
        If anything raises, it carries ``registration=None`` and the exception; for an ``HTTPError``
        it also carries the response status code, with ``Status.SYSTEM_ERROR`` for codes >= 500 and
        ``Status.EXCEPTION`` otherwise.
    """
    try:
        tokenization_client = TokenizationClient(
            api_url=switchio_config.tokenization_api_base_url,
            api_key=switchio_config.tokenization_api_key,
            api_secret=switchio_config.tokenization_api_secret,
            private_key=switchio_config.private_key_data,
            client_certificate=switchio_config.client_certificate_data,
            ca_certificate=switchio_config.ca_certificate_data,
        )

        redirect_url = _generate_redirect_uri(request, reverse(redirect_route))

        new_registration = tokenization_client.request_registration(
            eshopRedirectUrl=redirect_url,
            mode=RegistrationMode.REGISTER,
            eshopResponseMode=EshopResponseMode.QUERY,
            timeout=settings.REQUESTS_TIMEOUT,
        )

        return RegistrationResponse(status=Status.SUCCESS, registration=new_registration)
    except HTTPError as http_error:
        # server-side failures (5xx) are reported as system errors, everything else as a plain exception
        code = http_error.response.status_code
        outcome = Status.SYSTEM_ERROR if code >= 500 else Status.EXCEPTION
        return RegistrationResponse(status=outcome, registration=None, exception=http_error, status_code=code)
    except Exception as error:
        # non-HTTP failures have no status code to report
        return RegistrationResponse(status=Status.EXCEPTION, registration=None, exception=error, status_code=None)

89 

90 

# adapted from https://github.com/Office-of-Digital-Services/django-cdt-identity/blob/main/cdt_identity/views.py#L42-L50
def _generate_redirect_uri(request: HttpRequest, redirect_path: str):
    """Build the lower-cased absolute redirect URI for ``redirect_path``.

    Args:
        request: the current request; its ``build_absolute_uri`` supplies scheme and host.
        redirect_path: the path to make absolute.

    Returns:
        The absolute URI as a lower-cased string. A leading ``http://`` is upgraded to ``https://``
        unless the URI starts with ``http://localhost``.
    """
    redirect_uri = str(request.build_absolute_uri(redirect_path)).lower()

    # this is a temporary hack to ensure redirect URIs are HTTPS when the app is deployed
    # see https://github.com/cal-itp/benefits/issues/442 for more context
    insecure_scheme = "http://"
    if redirect_uri.startswith(insecure_scheme) and not redirect_uri.startswith("http://localhost"):
        # upgrade only the leading scheme, so a URL embedded in the path or query string is left as-is
        redirect_uri = "https://" + redirect_uri[len(insecure_scheme) :]

    return redirect_uri

101 

102 

def get_registration_status(switchio_config: SwitchioConfig, registration_id: str) -> RegistrationStatusResponse:
    """Look up the status of an existing registration through the tokenization API.

    A ``TokenizationClient`` is built from ``switchio_config`` and queried for ``registration_id``.

    Returns:
        A ``RegistrationStatusResponse``. On success it carries ``Status.SUCCESS`` and the
        registration status. If anything raises, it carries ``registration_status=None`` and the
        exception; for an ``HTTPError`` it also carries the response status code, with
        ``Status.SYSTEM_ERROR`` for codes >= 500 and ``Status.EXCEPTION`` otherwise.
    """
    try:
        tokenization_client = TokenizationClient(
            api_url=switchio_config.tokenization_api_base_url,
            api_key=switchio_config.tokenization_api_key,
            api_secret=switchio_config.tokenization_api_secret,
            private_key=switchio_config.private_key_data,
            client_certificate=switchio_config.client_certificate_data,
            ca_certificate=switchio_config.ca_certificate_data,
        )

        current_status = tokenization_client.get_registration_status(
            registration_id=registration_id,
            timeout=settings.REQUESTS_TIMEOUT,
        )

        return RegistrationStatusResponse(status=Status.SUCCESS, registration_status=current_status, exception=None)
    except HTTPError as http_error:
        # server-side failures (5xx) are reported as system errors, everything else as a plain exception
        code = http_error.response.status_code
        outcome = Status.SYSTEM_ERROR if code >= 500 else Status.EXCEPTION
        return RegistrationStatusResponse(
            status=outcome, registration_status=None, exception=http_error, status_code=code
        )
    except Exception as error:
        # non-HTTP failures have no status code to report
        return RegistrationStatusResponse(
            status=Status.EXCEPTION, registration_status=None, exception=error, status_code=None
        )

137 

138 

def get_latest_active_token_value(tokens):
    """Return the value of the active token with the greatest ``validFrom``, or ``""`` if none is active.

    Args:
        tokens: an iterable of dicts, each of which is expanded into a ``Token``.

    Returns:
        The ``token`` string of the newest active token. When several active tokens share the
        greatest ``validFrom``, the first one encountered wins. Returns ``""`` when there is no
        active token.
    """
    # both generators are lazy, so each dict is parsed only as the scan reaches it
    parsed_tokens = (Token(**raw_token) for raw_token in tokens)
    active_tokens = (candidate for candidate in parsed_tokens if candidate.tokenState == "active")

    # max() keeps the first of equally-new tokens, matching a strict "newer than" comparison
    newest = max(active_tokens, key=lambda candidate: candidate.validFrom, default=None)

    if newest is None:
        return ""
    return newest.token

149 

150 

def enroll(request, switchio_config: SwitchioConfig, flow: EnrollmentFlow, token: str) -> tuple[Status, Exception]:
    """Enroll ``token`` in the flow's group, handling expiration and re-enrollment.

    Args:
        request: the current request; its session stores and supplies the enrollment expiry.
        switchio_config: connection settings for the enrollment API and the ``pto_id``.
        flow: the enrollment flow; supplies ``group_id``, ``supports_expiration`` and ``expiration_days``.
        token: the token to enroll.

    Returns:
        A ``(status, exception)`` tuple. ``exception`` is ``None`` unless an error was caught.
        ``status`` is one of:

        * ``Status.SUCCESS`` - the token is enrolled (newly, renewed, or already enrolled without expiry).
        * ``Status.REENROLLMENT_ERROR`` - the flow supports expiration and the existing enrollment is
          neither expired nor within its re-enrollment window.
        * ``Status.SYSTEM_ERROR`` - the API answered with an HTTP status >= 500.
        * ``Status.EXCEPTION`` - any other failure.
    """
    client = EnrollmentClient(
        api_url=switchio_config.enrollment_api_base_url,
        authorization_header_value=switchio_config.enrollment_api_authorization_header,
        private_key=switchio_config.private_key_data,
        client_certificate=switchio_config.client_certificate_data,
        ca_certificate=switchio_config.ca_certificate_data,
    )

    pto_id = switchio_config.pto_id
    group_id = flow.group_id

    exception = None
    try:
        group = _get_group_for_token(client, pto_id, group_id, token)
        already_enrolled = group is not None

        if flow.supports_expiration:
            # set expiry on session: keep an existing expiry, otherwise calculate a fresh one
            if already_enrolled and group.expiresAt is not None:
                session.update(request, enrollment_expiry=group.expiresAt)
            else:
                session.update(request, enrollment_expiry=_calculate_expiry(flow.expiration_days))

            if not already_enrolled or group.expiresAt is None:
                # new enrollment, or an existing enrollment that has no expiration yet
                should_enroll = True
            else:
                # an existing enrollment with an expiration may only be renewed once it has
                # expired or entered its re-enrollment window
                is_expired = _is_expired(group.expiresAt)
                is_within_reenrollment_window = _is_within_reenrollment_window(
                    group.expiresAt, session.enrollment_reenrollment(request)
                )
                should_enroll = is_expired or is_within_reenrollment_window

            if should_enroll:
                # NOTE(review): when renewing an enrollment that already had an expiration, the session
                # expiry was set to group.expiresAt above, so this presumably re-sends the existing
                # expiry rather than a newly calculated one - confirm this is intended.
                client.add_group_to_token(
                    pto_id,
                    group_id,
                    token,
                    expiry=session.enrollment_expiry(request),
                    timeout=settings.REQUESTS_TIMEOUT,
                )
                status = Status.SUCCESS
            else:
                # re-enrollment error, return enrollment error with expiration and reenrollment_date
                status = Status.REENROLLMENT_ERROR
        else:  # flow does not support expiration
            if not already_enrolled or group.expiresAt is not None:
                # enroll with no expiration date, or remove the expiration date of an existing enrollment
                # (when you don't include an expiration date, Switchio will set the expiration date to null.)
                client.add_group_to_token(
                    pto_id=pto_id,
                    group_id=group_id,
                    token=token,
                    timeout=settings.REQUESTS_TIMEOUT,
                )
            # already enrolled without an expiration needs no action
            status = Status.SUCCESS
    except HTTPError as e:
        if e.response.status_code >= 500:
            status = Status.SYSTEM_ERROR
            exception = e
        else:
            status = Status.EXCEPTION
            try:
                detail = e.response.json()
            except ValueError:
                # the error body is not JSON; fall back to the raw text so the handler itself cannot raise
                detail = e.response.text
            exception = Exception(f"{e}: {detail}")
    except Exception as e:
        status = Status.EXCEPTION
        exception = e

    return status, exception

233 

234 

def _get_group_for_token(client: EnrollmentClient, pto_id, group_id, token):
    """Return the first of the token's groups whose ``group`` equals ``group_id``, or ``None`` if there is none."""
    token_groups = client.get_groups_for_token(pto_id=pto_id, token=token, timeout=settings.REQUESTS_TIMEOUT)

    # next() stops at the first match and yields None when nothing matches
    return next((candidate for candidate in token_groups if candidate.group == group_id), None)