Coverage for eligibility_api / client.py: 97%

55 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-23 21:00 +0000

1import logging 

2import requests 

3 

4from jwcrypto import common as jwcrypto 

5 

6from .tokens import RequestToken, ResponseToken, TokenError 

7 

8logger = logging.getLogger(__name__) 

9 

10 

11class ApiError(Exception): 

12 """Error calling the Eligibility Verification API.""" 

13 

14 pass 

15 

16 

17class Client: 

18 """Eligibility Verification API HTTP client.""" 

19 

20 def __init__( 

21 self, 

22 verify_url, 

23 issuer, 

24 agency, 

25 jws_signing_alg, 

26 client_private_key, 

27 jwe_encryption_alg, 

28 jwe_cek_enc, 

29 server_public_key, 

30 headers={}, 

31 timeout=5, 

32 ): 

33 self.verify_url = verify_url 

34 

35 self.issuer = issuer 

36 self.agency = agency 

37 self.jws_signing_alg = jws_signing_alg 

38 self.client_private_key = client_private_key 

39 self.jwe_encryption_alg = jwe_encryption_alg 

40 self.jwe_cek_enc = jwe_cek_enc 

41 self.server_public_key = server_public_key 

42 

43 if "authorization" in set(k.lower() for k in headers): 

44 raise ValueError('"Authorization" should not be set as an additional header.') 

45 

46 self.headers = headers 

47 self.timeout = timeout 

48 

49 def _tokenize_request(self, sub, name, types): 

50 """Create a request token.""" 

51 return RequestToken( 

52 types, 

53 self.agency, 

54 self.jws_signing_alg, 

55 self.client_private_key, 

56 self.jwe_encryption_alg, 

57 self.jwe_cek_enc, 

58 self.server_public_key, 

59 sub, 

60 name, 

61 self.issuer, 

62 ) 

63 

64 def _tokenize_response(self, response): 

65 """Parse a response token.""" 

66 return ResponseToken( 

67 response, 

68 self.jwe_encryption_alg, 

69 self.jwe_cek_enc, 

70 self.client_private_key, 

71 self.jws_signing_alg, 

72 self.server_public_key, 

73 ) 

74 

75 def _auth_headers(self, token): 

76 """Create headers for the request with the token and verifier API keys""" 

77 headers = dict(Authorization=f"Bearer {token}") 

78 

79 for key, value in self.headers.items(): 

80 headers[key] = value 

81 

82 return headers 

83 

84 def _request(self, sub, name, types): 

85 """Make an API request for eligibility verification.""" 

86 logger.debug("Start new eligibility verification request") 

87 

88 try: 

89 token = self._tokenize_request(sub, name, types) 

90 except jwcrypto.JWException: 

91 raise TokenError("Failed to tokenize form values") 

92 

93 try: 

94 logger.debug(f"GET request to {self.verify_url}") 

95 r = requests.get(self.verify_url, headers=self._auth_headers(token), timeout=self.timeout) 

96 except requests.ConnectionError: 

97 raise ApiError("Connection to verification server failed") 

98 except requests.Timeout: 

99 raise ApiError("Connection to verification server timed out") 

100 except requests.TooManyRedirects: 

101 raise ApiError("Too many redirects to verification server") 

102 except requests.HTTPError as e: 

103 raise ApiError(e) 

104 

105 expected_status_codes = {200, 400} 

106 if r.status_code in expected_status_codes: 

107 logger.debug("Process eligiblity verification response") 

108 return self._tokenize_response(r) 

109 else: 

110 logger.warning(f"Unexpected eligibility verification response status code: {r.status_code}") 

111 raise ApiError("Unexpected eligibility verification response") 

112 

113 def verify(self, sub, name, types): 

114 """Check eligibility for the subject and name.""" 

115 return self._request(sub, name, types)