Coverage for benefits/enrollment_switchio/api.py: 89%
71 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-03 19:55 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-03 19:55 +0000
1from dataclasses import dataclass
2from datetime import datetime
3from enum import Enum
4import hashlib
5import hmac
6import json
7from tempfile import NamedTemporaryFile
8import requests
@dataclass
class Registration:
    """Result of a registration request.

    Built directly from the decoded JSON response of
    ``POST /api/v1/registration``, so the attribute names mirror the API's
    own keys rather than Python naming conventions.
    """

    # presumably the identifier later passed to the registration status lookup -- confirm against API docs
    regId: str
    # presumably the gateway URL associated with this registration -- confirm against API docs
    gtwUrl: str
class RegistrationMode(Enum):
    """Allowed values for the ``mode`` key of a registration request body."""

    REGISTER = "register"
    IDENTIFY = "identify"
class EshopResponseMode(Enum):
    """Allowed values for the ``eshopResponseMode`` key of a registration request body."""

    FRAGMENT = "fragment"
    QUERY = "query"
    FORM_POST = "form_post"
    POST_MESSAGE = "post_message"
@dataclass
class RegistrationStatus:
    """Status of a registration, as returned by ``GET /api/v1/registration/{id}``.

    The client builds this directly from the decoded JSON response, so the
    attribute names mirror the API's keys. The last three fields default to
    ``None`` and may therefore be omitted from the response.
    """

    regState: str
    # NOTE(review): annotated as datetime, but nothing in this class converts the
    # decoded JSON value -- confirm what type callers actually receive
    created: datetime
    mode: str
    tokens: list[dict]
    eshopResponseMode: str

    # optional keys: stay None when the response does not include them
    identType: str = None
    maskCln: str = None
    cardExp: str = None
class Client:
    """HTTP client for the ``/api/v1/registration`` endpoints.

    Every request carries ``STP-APIKEY``, ``STP-TIMESTAMP`` and
    ``STP-SIGNATURE`` headers (an HMAC-SHA256 keyed with ``api_secret``) and is
    sent with a client certificate, private key and CA bundle.
    """

    def __init__(
        self,
        api_url,
        api_key,
        api_secret,
        private_key,
        client_certificate,
        ca_certificate,
    ):
        """
        Stores the connection settings; no I/O happens here.
        * api_url: base URL that request paths are appended to.
        * api_key: value sent in the `STP-APIKEY` header.
        * api_secret: text key used for the HMAC-SHA256 request signature.
        * private_key: text content of the client private key.
        * client_certificate: text content of the client certificate.
        * ca_certificate: text content of the CA bundle used to verify the server.
        """
        self.api_url = api_url
        self.api_key = api_key
        self.api_secret = api_secret
        self.private_key = private_key
        self.client_certificate = client_certificate
        self.ca_certificate = ca_certificate

    def _url(self, request_path: str) -> str:
        """
        Returns the base URL, minus any trailing slashes, followed by `request_path`.
        """
        # rstrip (not strip): only a trailing slash on the base URL needs removing
        return self.api_url.rstrip("/") + request_path

    def _signature_input_string(self, timestamp: str, method: str, request_path: str, body: str = None):
        """
        Returns the text that gets signed: timestamp, method, path and body concatenated.
        A missing body (`None`) contributes an empty string.
        """
        payload = "" if body is None else body

        return f"{timestamp}{method}{request_path}{payload}"

    def _stp_signature(self, timestamp: str, method: str, request_path: str, body: str = None):
        """
        Returns the hex HMAC-SHA256 digest of the signature input string, keyed with `api_secret`.
        """
        input_string = self._signature_input_string(timestamp, method, request_path, body)

        # must encode inputs for hashing, according to https://stackoverflow.com/a/66958131
        byte_key = self.api_secret.encode("utf-8")
        message = input_string.encode("utf-8")

        return hmac.new(byte_key, message, hashlib.sha256).hexdigest()

    def _get_headers(self, method, request_path, request_body: dict = None):
        """
        Builds the headers for one request, signed with the current Unix time in whole seconds.
        * method: HTTP method name as it should appear in the signature (e.g. "GET").
        * request_path: path portion of the URL, as it should appear in the signature.
        * request_body: JSON-serializable payload, or `None` when the request has no body.
        """
        timestamp = str(int(datetime.now().timestamp()))

        # Decide on `None`, not truthiness: `requests` still transmits an empty
        # payload passed via `json=` (e.g. {} is sent as "{}"), so it must be signed too.
        # NOTE(review): this assumes json.dumps defaults produce the same text that
        # `requests` sends for `json=` -- confirm if the serializer is ever customized.
        body = None if request_body is None else json.dumps(request_body)

        return {
            "Content-Type": "application/json",
            "STP-APIKEY": self.api_key,
            "STP-TIMESTAMP": timestamp,
            "STP-SIGNATURE": self._stp_signature(
                timestamp=timestamp,
                method=method,
                request_path=request_path,
                body=body,
            ),
        }

    def request_registration(
        self,
        eshopRedirectUrl: str,
        mode: RegistrationMode,
        eshopResponseMode: EshopResponseMode,
        timeout=5,
    ) -> Registration:
        """
        POSTs a new registration and returns it as a `Registration`.
        * eshopRedirectUrl: sent as-is under the `eshopRedirectUrl` key.
        * mode: its `.value` is sent under the `mode` key.
        * eshopResponseMode: its `.value` is sent under the `eshopResponseMode` key.
        * timeout: passed through to `requests.post`.

        Raises whatever `response.raise_for_status()` raises for an error response.
        """
        registration_path = "/api/v1/registration"
        request_body = {
            "eshopRedirectUrl": eshopRedirectUrl,
            "mode": mode.value,
            "eshopResponseMode": eshopResponseMode.value,
        }

        def send(verify, cert):
            return requests.post(
                self._url(registration_path),
                json=request_body,
                headers=self._get_headers(method="POST", request_path=registration_path, request_body=request_body),
                cert=cert,
                verify=verify,
                timeout=timeout,
            )

        response = self._cert_request(send)
        response.raise_for_status()

        return Registration(**response.json())

    def get_registration_status(self, registration_id, timeout=5) -> RegistrationStatus:
        """
        GETs the status of an existing registration and returns it as a `RegistrationStatus`.
        * registration_id: interpolated into the request path.
        * timeout: passed through to `requests.get`.

        Raises whatever `response.raise_for_status()` raises for an error response.
        """
        request_path = f"/api/v1/registration/{registration_id}"

        def send(verify, cert):
            return requests.get(
                self._url(request_path),
                headers=self._get_headers(method="GET", request_path=request_path),
                cert=cert,
                verify=verify,
                timeout=timeout,
            )

        response = self._cert_request(send)
        response.raise_for_status()

        return RegistrationStatus(**response.json())

    # see https://github.com/cal-itp/benefits/issues/2848 for more context about this
    def _cert_request(self, request_func):
        """
        Creates named (on-disk) temp files for client cert auth.
        * request_func: callable accepting `verify` and `cert` keyword arguments
          (e.g. a wrapper around `requests.get`); its return value is returned unchanged.
        """
        # requests library reads cert material from file paths, not from memory.
        # The "with" context destroys temp files when response comes back
        with NamedTemporaryFile("w+") as cert, NamedTemporaryFile("w+") as key, NamedTemporaryFile("w+") as ca:
            materials = (
                (cert, self.client_certificate),
                (key, self.private_key),
                (ca, self.ca_certificate),
            )
            for handle, content in materials:
                handle.write(content)
                # flush so the content is on disk before it is opened again by path
                handle.flush()

            # request using temp file paths
            return request_func(verify=ca.name, cert=(cert.name, key.name))