Coverage for eligibility_api/client.py: 97%
55 statements
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-26 22:46 +0000
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-26 22:46 +0000
1import logging
2import requests
4from jwcrypto import common as jwcrypto
6from .tokens import RequestToken, ResponseToken, TokenError
9logger = logging.getLogger(__name__)
class ApiError(Exception):
    """Raised when a call to the Eligibility Verification API fails."""
class Client:
    """Eligibility Verification API HTTP client.

    Builds a RequestToken from the configured keys and algorithms, sends it
    to the verification URL as a Bearer token, and wraps the HTTP response
    in a ResponseToken.
    """

    def __init__(
        self,
        verify_url,
        issuer,
        agency,
        jws_signing_alg,
        client_private_key,
        jwe_encryption_alg,
        jwe_cek_enc,
        server_public_key,
        headers=None,
        timeout=5,
    ):
        """Store the endpoint, token settings and extra headers.

        Args:
            verify_url: URL that verification GET requests are sent to.
            issuer: Forwarded to RequestToken when building a request.
            agency: Forwarded to RequestToken when building a request.
            jws_signing_alg: Forwarded to both RequestToken and ResponseToken.
            client_private_key: Forwarded to both RequestToken and
                ResponseToken.
            jwe_encryption_alg: Forwarded to both RequestToken and
                ResponseToken.
            jwe_cek_enc: Forwarded to both RequestToken and ResponseToken.
            server_public_key: Forwarded to both RequestToken and
                ResponseToken.
            headers: Optional mapping of additional HTTP headers sent with
                every request. It is copied, so later changes to the
                caller's mapping do not affect this client. Defaults to no
                additional headers.
            timeout: Value passed as ``timeout`` to ``requests.get``.

        Raises:
            ValueError: If ``headers`` contains an "Authorization" key
                (compared case-insensitively); that header is reserved for
                the Bearer token.
        """
        self.verify_url = verify_url

        self.issuer = issuer
        self.agency = agency
        self.jws_signing_alg = jws_signing_alg
        self.client_private_key = client_private_key
        self.jwe_encryption_alg = jwe_encryption_alg
        self.jwe_cek_enc = jwe_cek_enc
        self.server_public_key = server_public_key

        # A fresh dict per instance: a `{}` default would be shared by every
        # client created without headers, and aliasing the caller's mapping
        # would let an "Authorization" key be added after validation.
        extra_headers = {} if headers is None else dict(headers)
        if any(key.lower() == "authorization" for key in extra_headers):
            raise ValueError('"Authorization" should not be set as an additional header.')

        self.headers = extra_headers
        self.timeout = timeout

    def _tokenize_request(self, sub, name, types):
        """Create a request token for the given sub, name and types."""
        return RequestToken(
            types,
            self.agency,
            self.jws_signing_alg,
            self.client_private_key,
            self.jwe_encryption_alg,
            self.jwe_cek_enc,
            self.server_public_key,
            sub,
            name,
            self.issuer,
        )

    def _tokenize_response(self, response):
        """Parse a response token from the HTTP response."""
        return ResponseToken(
            response,
            self.jwe_encryption_alg,
            self.jwe_cek_enc,
            self.client_private_key,
            self.jws_signing_alg,
            self.server_public_key,
        )

    def _auth_headers(self, token):
        """Return the request headers: the Bearer token plus the extra headers."""
        headers = {"Authorization": f"Bearer {token}"}
        # __init__ rejects an "Authorization" key in any casing, so the extra
        # headers configured there do not replace the Bearer token.
        headers.update(self.headers)
        return headers

    def _request(self, sub, name, types):
        """Make an API request for eligibility verification.

        Returns:
            The ResponseToken built from a 200 or 400 response.

        Raises:
            TokenError: If building the request token raises a JWException.
            ApiError: If the request fails (timeout, connection error, too
                many redirects, HTTP error) or the response status code is
                neither 200 nor 400.
        """
        logger.debug("Start new eligibility verification request")

        try:
            token = self._tokenize_request(sub, name, types)
        except jwcrypto.JWException as e:
            raise TokenError("Failed to tokenize form values") from e

        logger.debug("GET request to %s", self.verify_url)
        try:
            r = requests.get(self.verify_url, headers=self._auth_headers(token), timeout=self.timeout)
        except requests.Timeout as e:
            # Checked before ConnectionError: requests' ConnectTimeout derives
            # from both, and should be reported as a timeout.
            raise ApiError("Connection to verification server timed out") from e
        except requests.ConnectionError as e:
            raise ApiError("Connection to verification server failed") from e
        except requests.TooManyRedirects as e:
            raise ApiError("Too many redirects to verification server") from e
        except requests.HTTPError as e:
            raise ApiError(e) from e

        expected_status_codes = {200, 400}
        if r.status_code not in expected_status_codes:
            logger.warning("Unexpected eligibility verification response status code: %s", r.status_code)
            raise ApiError("Unexpected eligibility verification response")

        logger.debug("Process eligiblity verification response")
        return self._tokenize_response(r)

    def verify(self, sub, name, types):
        """Check eligibility for the subject and name."""
        return self._request(sub, name, types)