Coverage for benefits/enrollment_switchio/api.py: 89%

71 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-03 19:55 +0000

1from dataclasses import dataclass 

2from datetime import datetime 

3from enum import Enum 

4import hashlib 

5import hmac 

6import json 

7from tempfile import NamedTemporaryFile 

8import requests 

9 

10 

11@dataclass 

12class Registration: 

13 regId: str 

14 gtwUrl: str 

15 

16 

17class RegistrationMode(Enum): 

18 REGISTER = "register" 

19 IDENTIFY = "identify" 

20 

21 

22class EshopResponseMode(Enum): 

23 FRAGMENT = "fragment" 

24 QUERY = "query" 

25 FORM_POST = "form_post" 

26 POST_MESSAGE = "post_message" 

27 

28 

29@dataclass 

30class RegistrationStatus: 

31 regState: str 

32 created: datetime 

33 mode: str 

34 tokens: list[dict] 

35 eshopResponseMode: str 

36 identType: str = None 

37 maskCln: str = None 

38 cardExp: str = None 

39 

40 

41class Client: 

42 

43 def __init__( 

44 self, 

45 api_url, 

46 api_key, 

47 api_secret, 

48 private_key, 

49 client_certificate, 

50 ca_certificate, 

51 ): 

52 self.api_url = api_url 

53 self.api_key = api_key 

54 self.api_secret = api_secret 

55 self.private_key = private_key 

56 self.client_certificate = client_certificate 

57 self.ca_certificate = ca_certificate 

58 

59 def _signature_input_string(self, timestamp: str, method: str, request_path: str, body: str = None): 

60 if body is None: 

61 body = "" 

62 

63 return f"{timestamp}{method}{request_path}{body}" 

64 

65 def _stp_signature(self, timestamp: str, method: str, request_path, body: str = None): 

66 input_string = self._signature_input_string(timestamp, method, request_path, body) 

67 

68 # must encode inputs for hashing, according to https://stackoverflow.com/a/66958131 

69 byte_key = self.api_secret.encode("utf-8") 

70 message = input_string.encode("utf-8") 

71 stp_signature = hmac.new(byte_key, message, hashlib.sha256).hexdigest() 

72 

73 return stp_signature 

74 

75 def _get_headers(self, method, request_path, request_body: dict = None): 

76 timestamp = str(int(datetime.now().timestamp())) 

77 

78 return { 

79 "Content-Type": "application/json", 

80 "STP-APIKEY": self.api_key, 

81 "STP-TIMESTAMP": timestamp, 

82 "STP-SIGNATURE": self._stp_signature( 

83 timestamp=timestamp, 

84 method=method, 

85 request_path=request_path, 

86 body=json.dumps(request_body) if request_body else None, 

87 ), 

88 } 

89 

90 def request_registration( 

91 self, 

92 eshopRedirectUrl: str, 

93 mode: RegistrationMode, 

94 eshopResponseMode: EshopResponseMode, 

95 timeout=5, 

96 ) -> Registration: 

97 registration_path = "/api/v1/registration" 

98 request_body = { 

99 "eshopRedirectUrl": eshopRedirectUrl, 

100 "mode": mode.value, 

101 "eshopResponseMode": eshopResponseMode.value, 

102 } 

103 

104 response = self._cert_request( 

105 lambda verify, cert: requests.post( 

106 self.api_url.strip("/") + registration_path, 

107 json=request_body, 

108 headers=self._get_headers(method="POST", request_path=registration_path, request_body=request_body), 

109 cert=cert, 

110 verify=verify, 

111 timeout=timeout, 

112 ) 

113 ) 

114 

115 response.raise_for_status() 

116 

117 return Registration(**response.json()) 

118 

119 def get_registration_status(self, registration_id, timeout=5) -> RegistrationStatus: 

120 request_path = f"/api/v1/registration/{registration_id}" 

121 

122 response = self._cert_request( 

123 lambda verify, cert: requests.get( 

124 self.api_url.strip("/") + request_path, 

125 headers=self._get_headers(method="GET", request_path=request_path), 

126 cert=cert, 

127 verify=verify, 

128 timeout=timeout, 

129 ) 

130 ) 

131 

132 response.raise_for_status() 

133 

134 return RegistrationStatus(**response.json()) 

135 

136 # see https://github.com/cal-itp/benefits/issues/2848 for more context about this 

137 def _cert_request(self, request_func): 

138 """ 

139 Creates named (on-disk) temp files for client cert auth. 

140 * request_func: curried callable from `requests` library (e.g. `requests.get`). 

141 """ 

142 # requests library reads temp files from file path 

143 # The "with" context destroys temp files when response comes back 

144 with NamedTemporaryFile("w+") as cert, NamedTemporaryFile("w+") as key, NamedTemporaryFile("w+") as ca: 

145 # write client cert data to temp files 

146 # resetting so they can be read again by requests 

147 cert.write(self.client_certificate) 

148 cert.seek(0) 

149 

150 key.write(self.private_key) 

151 key.seek(0) 

152 

153 ca.write(self.ca_certificate) 

154 ca.seek(0) 

155 

156 # request using temp file paths 

157 return request_func(verify=ca.name, cert=(cert.name, key.name))