Coverage for benefits/enrollment_switchio/enrollment.py: 97%

109 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-08 16:26 +0000

1from dataclasses import dataclass 

2from datetime import datetime 

3from django.conf import settings 

4from django.http import HttpRequest 

5from django.urls import reverse 

6from requests import HTTPError 

7 

8from benefits.core.models.enrollment import EnrollmentFlow 

9from benefits.enrollment.enrollment import Status 

10from benefits.enrollment_switchio.models import SwitchioConfig 

11from benefits.routes import routes 

12from benefits.enrollment_switchio.api import ( 

13 EnrollmentClient, 

14 TokenizationClient, 

15 EshopResponseMode, 

16 Registration, 

17 RegistrationMode, 

18 RegistrationStatus, 

19) 

20 

21 

22@dataclass 

23class RegistrationResponse: 

24 status: Status 

25 registration: Registration 

26 exception: Exception = None 

27 status_code: int = None 

28 

29 

30@dataclass 

31class RegistrationStatusResponse: 

32 status: Status 

33 registration_status: RegistrationStatus 

34 exception: Exception = None 

35 status_code: int = None 

36 

37 

38@dataclass 

39class Token: 

40 token: str 

41 tokenVersion: int 

42 tokenState: str 

43 validFrom: datetime 

44 validTo: datetime 

45 testOnly: bool 

46 par: str = None 

47 

48 

49def request_registration(request, switchio_config: SwitchioConfig) -> RegistrationResponse: 

50 try: 

51 client = TokenizationClient( 

52 api_url=switchio_config.tokenization_api_base_url, 

53 api_key=switchio_config.tokenization_api_key, 

54 api_secret=switchio_config.tokenization_api_secret, 

55 private_key=switchio_config.private_key_data, 

56 client_certificate=switchio_config.client_certificate_data, 

57 ca_certificate=switchio_config.ca_certificate_data, 

58 ) 

59 

60 route = reverse(routes.ENROLLMENT_SWITCHIO_INDEX) 

61 redirect_url = _generate_redirect_uri(request, route) 

62 

63 registration = client.request_registration( 

64 eshopRedirectUrl=redirect_url, 

65 mode=RegistrationMode.REGISTER, 

66 eshopResponseMode=EshopResponseMode.QUERY, 

67 timeout=settings.REQUESTS_TIMEOUT, 

68 ) 

69 

70 return RegistrationResponse(status=Status.SUCCESS, registration=registration) 

71 except Exception as e: 

72 exception = e 

73 

74 if isinstance(e, HTTPError): 

75 status_code = e.response.status_code 

76 

77 if status_code >= 500: 

78 status = Status.SYSTEM_ERROR 

79 else: 

80 status = Status.EXCEPTION 

81 else: 

82 status_code = None 

83 status = Status.EXCEPTION 

84 

85 return RegistrationResponse(status=status, registration=None, exception=exception, status_code=status_code) 

86 

87 

88# copied from https://github.com/Office-of-Digital-Services/django-cdt-identity/blob/main/cdt_identity/views.py#L42-L50 

89def _generate_redirect_uri(request: HttpRequest, redirect_path: str): 

90 redirect_uri = str(request.build_absolute_uri(redirect_path)).lower() 

91 

92 # this is a temporary hack to ensure redirect URIs are HTTPS when the app is deployed 

93 # see https://github.com/cal-itp/benefits/issues/442 for more context 

94 if not redirect_uri.startswith("http://localhost"): 94 ↛ 97line 94 didn't jump to line 97 because the condition on line 94 was always true

95 redirect_uri = redirect_uri.replace("http://", "https://") 

96 

97 return redirect_uri 

98 

99 

100def get_registration_status(switchio_config: SwitchioConfig, registration_id: str) -> RegistrationStatusResponse: 

101 try: 

102 client = TokenizationClient( 

103 api_url=switchio_config.tokenization_api_base_url, 

104 api_key=switchio_config.tokenization_api_key, 

105 api_secret=switchio_config.tokenization_api_secret, 

106 private_key=switchio_config.private_key_data, 

107 client_certificate=switchio_config.client_certificate_data, 

108 ca_certificate=switchio_config.ca_certificate_data, 

109 ) 

110 

111 registration_status = client.get_registration_status( 

112 registration_id=registration_id, 

113 timeout=settings.REQUESTS_TIMEOUT, 

114 ) 

115 

116 return RegistrationStatusResponse(status=Status.SUCCESS, registration_status=registration_status, exception=None) 

117 except Exception as e: 

118 exception = e 

119 

120 if isinstance(e, HTTPError): 

121 status_code = e.response.status_code 

122 

123 if status_code >= 500: 

124 status = Status.SYSTEM_ERROR 

125 else: 

126 status = Status.EXCEPTION 

127 else: 

128 status_code = None 

129 status = Status.EXCEPTION 

130 

131 return RegistrationStatusResponse( 

132 status=status, registration_status=None, exception=exception, status_code=status_code 

133 ) 

134 

135 

136def get_latest_active_token_value(tokens): 

137 latest_active_token = None 

138 

139 for token_dict in tokens: 

140 token = Token(**token_dict) 

141 if token.tokenState == "active": 

142 if latest_active_token is None or token.validFrom > latest_active_token.validFrom: 142 ↛ 139line 142 didn't jump to line 139 because the condition on line 142 was always true

143 latest_active_token = token 

144 

145 return latest_active_token.token if latest_active_token else "" 

146 

147 

148def enroll(switchio_config: SwitchioConfig, flow: EnrollmentFlow, token: str) -> tuple[Status, Exception]: 

149 client = EnrollmentClient( 

150 api_url=switchio_config.enrollment_api_base_url, 

151 authorization_header_value=switchio_config.enrollment_api_authorization_header, 

152 private_key=switchio_config.private_key_data, 

153 client_certificate=switchio_config.client_certificate_data, 

154 ca_certificate=switchio_config.ca_certificate_data, 

155 ) 

156 

157 pto_id = switchio_config.pto_id 

158 group_id = flow.group_id 

159 

160 exception = None 

161 try: 

162 group = _get_group_for_token(client, pto_id, group_id, token) 

163 already_enrolled = group is not None 

164 

165 if not flow.supports_expiration: 165 ↛ 200line 165 didn't jump to line 200 because the condition on line 165 was always true

166 if not already_enrolled: 

167 # enroll user with no expiration date, return success 

168 client.add_group_to_token( 

169 pto_id=pto_id, 

170 group_id=group_id, 

171 token=token, 

172 timeout=settings.REQUESTS_TIMEOUT, 

173 ) 

174 status = Status.SUCCESS 

175 else: # already enrolled 

176 if group.expiresAt is None: 

177 # no action, return success 

178 status = Status.SUCCESS 

179 else: 

180 # remove expiration date, return success 

181 # (when you don't include an expiration date, Switchio will set the expiration date to null.) 

182 client.add_group_to_token( 

183 pto_id=pto_id, 

184 group_id=group_id, 

185 token=token, 

186 timeout=settings.REQUESTS_TIMEOUT, 

187 ) 

188 status = Status.SUCCESS 

189 except HTTPError as e: 

190 if e.response.status_code >= 500: 

191 status = Status.SYSTEM_ERROR 

192 exception = e 

193 else: 

194 status = Status.EXCEPTION 

195 exception = Exception(f"{e}: {e.response.json()}") 

196 except Exception as e: 

197 status = Status.EXCEPTION 

198 exception = e 

199 

200 return status, exception 

201 

202 

203def _get_group_for_token(client: EnrollmentClient, pto_id, group_id, token): 

204 already_enrolled_groups = client.get_groups_for_token(pto_id=pto_id, token=token, timeout=settings.REQUESTS_TIMEOUT) 

205 

206 for group in already_enrolled_groups: 

207 if group.group == group_id: 207 ↛ 206line 207 didn't jump to line 206 because the condition on line 207 was always true

208 return group 

209 

210 return None