Coverage for eligibility_api / client.py: 97%
55 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-23 21:00 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-23 21:00 +0000
1import logging
2import requests
4from jwcrypto import common as jwcrypto
6from .tokens import RequestToken, ResponseToken, TokenError
8logger = logging.getLogger(__name__)
class ApiError(Exception):
    """Raised when a call to the Eligibility Verification API fails.

    Used by the client for transport failures and for responses whose HTTP
    status code is not one the client expects.
    """
class Client:
    """Eligibility Verification API HTTP client.

    Wraps the creation of a ``RequestToken``, the GET request that carries it
    as a Bearer token, and the parsing of the reply into a ``ResponseToken``.
    """

    def __init__(
        self,
        verify_url,
        issuer,
        agency,
        jws_signing_alg,
        client_private_key,
        jwe_encryption_alg,
        jwe_cek_enc,
        server_public_key,
        headers=None,
        timeout=5,
    ):
        """Store the connection and token settings for later requests.

        Args:
            verify_url: URL the verification GET request is sent to.
            issuer: Forwarded to ``RequestToken``.
            agency: Forwarded to ``RequestToken``.
            jws_signing_alg: Forwarded to ``RequestToken`` and ``ResponseToken``.
            client_private_key: Forwarded to ``RequestToken`` and ``ResponseToken``.
            jwe_encryption_alg: Forwarded to ``RequestToken`` and ``ResponseToken``.
            jwe_cek_enc: Forwarded to ``RequestToken`` and ``ResponseToken``.
            server_public_key: Forwarded to ``RequestToken`` and ``ResponseToken``.
            headers: Optional mapping of extra headers sent with every request.
                ``None`` (the default) means no extra headers.
            timeout: Passed to ``requests.get`` as ``timeout``.

        Raises:
            ValueError: If ``headers`` contains an "Authorization" key, compared
                case-insensitively; that header is always built from the token.
        """
        # Copy into a fresh dict: a shared default (or the caller's own dict)
        # must never be aliased, otherwise a mutation on one client leaks into
        # other clients or back into the caller.
        extra_headers = {} if headers is None else dict(headers)

        if any(key.lower() == "authorization" for key in extra_headers):
            raise ValueError('"Authorization" should not be set as an additional header.')

        self.verify_url = verify_url

        self.issuer = issuer
        self.agency = agency
        self.jws_signing_alg = jws_signing_alg
        self.client_private_key = client_private_key
        self.jwe_encryption_alg = jwe_encryption_alg
        self.jwe_cek_enc = jwe_cek_enc
        self.server_public_key = server_public_key

        self.headers = extra_headers
        self.timeout = timeout

    def _tokenize_request(self, sub, name, types):
        """Create a request token for the given subject, name and types."""
        return RequestToken(
            types,
            self.agency,
            self.jws_signing_alg,
            self.client_private_key,
            self.jwe_encryption_alg,
            self.jwe_cek_enc,
            self.server_public_key,
            sub,
            name,
            self.issuer,
        )

    def _tokenize_response(self, response):
        """Parse the HTTP response into a response token."""
        return ResponseToken(
            response,
            self.jwe_encryption_alg,
            self.jwe_cek_enc,
            self.client_private_key,
            self.jws_signing_alg,
            self.server_public_key,
        )

    def _auth_headers(self, token):
        """Return the request headers: the Bearer token plus the extra headers.

        Extra headers are applied after ``Authorization``, so on a key clash
        the extra header wins (``__init__`` rejects such a key up front).
        """
        return {"Authorization": f"Bearer {token}", **self.headers}

    def _request(self, sub, name, types):
        """Make an API request for eligibility verification.

        Returns:
            The ``ResponseToken`` built from the response when the status code
            is 200 or 400.

        Raises:
            TokenError: If building the request token raises ``JWException``.
            ApiError: If the request fails at the transport level, or the
                response has any status code other than 200 or 400.
        """
        logger.debug("Start new eligibility verification request")

        try:
            token = self._tokenize_request(sub, name, types)
        except jwcrypto.JWException as e:
            raise TokenError("Failed to tokenize form values") from e

        try:
            logger.debug(f"GET request to {self.verify_url}")
            r = requests.get(self.verify_url, headers=self._auth_headers(token), timeout=self.timeout)
        # NOTE(review): handler order matters if an exception type derives from
        # more than one of these classes (requests' ConnectTimeout is documented
        # as both a ConnectionError and a Timeout) -- the first match wins.
        except requests.ConnectionError as e:
            raise ApiError("Connection to verification server failed") from e
        except requests.Timeout as e:
            raise ApiError("Connection to verification server timed out") from e
        except requests.TooManyRedirects as e:
            raise ApiError("Too many redirects to verification server") from e
        except requests.HTTPError as e:
            raise ApiError(e) from e

        # 400 is accepted alongside 200: both are handed to the response parser.
        expected_status_codes = {200, 400}
        if r.status_code not in expected_status_codes:
            logger.warning(f"Unexpected eligibility verification response status code: {r.status_code}")
            raise ApiError("Unexpected eligibility verification response")

        logger.debug("Process eligiblity verification response")
        return self._tokenize_response(r)

    def verify(self, sub, name, types):
        """Check eligibility for the subject and name."""
        return self._request(sub, name, types)