Coverage for eligibility_api/client.py: 97%

55 statements  

« prev     ^ index     » next       coverage.py v7.6.8, created at 2024-11-26 22:46 +0000

1import logging 

2import requests 

3 

4from jwcrypto import common as jwcrypto 

5 

6from .tokens import RequestToken, ResponseToken, TokenError 

7 

8 

9logger = logging.getLogger(__name__) 

10 

11 

12class ApiError(Exception): 

13 """Error calling the Eligibility Verification API.""" 

14 

15 pass 

16 

17 

18class Client: 

19 """Eligibility Verification API HTTP client.""" 

20 

21 def __init__( 

22 self, 

23 verify_url, 

24 issuer, 

25 agency, 

26 jws_signing_alg, 

27 client_private_key, 

28 jwe_encryption_alg, 

29 jwe_cek_enc, 

30 server_public_key, 

31 headers={}, 

32 timeout=5, 

33 ): 

34 self.verify_url = verify_url 

35 

36 self.issuer = issuer 

37 self.agency = agency 

38 self.jws_signing_alg = jws_signing_alg 

39 self.client_private_key = client_private_key 

40 self.jwe_encryption_alg = jwe_encryption_alg 

41 self.jwe_cek_enc = jwe_cek_enc 

42 self.server_public_key = server_public_key 

43 

44 if "authorization" in set(k.lower() for k in headers): 

45 raise ValueError('"Authorization" should not be set as an additional header.') 

46 

47 self.headers = headers 

48 self.timeout = timeout 

49 

50 def _tokenize_request(self, sub, name, types): 

51 """Create a request token.""" 

52 return RequestToken( 

53 types, 

54 self.agency, 

55 self.jws_signing_alg, 

56 self.client_private_key, 

57 self.jwe_encryption_alg, 

58 self.jwe_cek_enc, 

59 self.server_public_key, 

60 sub, 

61 name, 

62 self.issuer, 

63 ) 

64 

65 def _tokenize_response(self, response): 

66 """Parse a response token.""" 

67 return ResponseToken( 

68 response, 

69 self.jwe_encryption_alg, 

70 self.jwe_cek_enc, 

71 self.client_private_key, 

72 self.jws_signing_alg, 

73 self.server_public_key, 

74 ) 

75 

76 def _auth_headers(self, token): 

77 """Create headers for the request with the token and verifier API keys""" 

78 headers = dict(Authorization=f"Bearer {token}") 

79 

80 for key, value in self.headers.items(): 

81 headers[key] = value 

82 

83 return headers 

84 

85 def _request(self, sub, name, types): 

86 """Make an API request for eligibility verification.""" 

87 logger.debug("Start new eligibility verification request") 

88 

89 try: 

90 token = self._tokenize_request(sub, name, types) 

91 except jwcrypto.JWException: 

92 raise TokenError("Failed to tokenize form values") 

93 

94 try: 

95 logger.debug(f"GET request to {self.verify_url}") 

96 r = requests.get(self.verify_url, headers=self._auth_headers(token), timeout=self.timeout) 

97 except requests.ConnectionError: 

98 raise ApiError("Connection to verification server failed") 

99 except requests.Timeout: 

100 raise ApiError("Connection to verification server timed out") 

101 except requests.TooManyRedirects: 

102 raise ApiError("Too many redirects to verification server") 

103 except requests.HTTPError as e: 

104 raise ApiError(e) 

105 

106 expected_status_codes = {200, 400} 

107 if r.status_code in expected_status_codes: 

108 logger.debug("Process eligiblity verification response") 

109 return self._tokenize_response(r) 

110 else: 

111 logger.warning(f"Unexpected eligibility verification response status code: {r.status_code}") 

112 raise ApiError("Unexpected eligibility verification response") 

113 

114 def verify(self, sub, name, types): 

115 """Check eligibility for the subject and name.""" 

116 return self._request(sub, name, types)