Coverage for eligibility_api/client.py: 97%

54 statements  

« prev     ^ index     » next       coverage.py v7.3.0, created at 2023-09-05 15:35 +0000

1import logging 

2import requests 

3 

4from jwcrypto import common as jwcrypto 

5 

6from .tokens import RequestToken, ResponseToken, TokenError 

7 

8 

9logger = logging.getLogger(__name__) 

10 

11 

12class ApiError(Exception): 

13 """Error calling the Eligibility Verification API.""" 

14 

15 pass 

16 

17 

18class Client: 

19 """Eligibility Verification API HTTP client.""" 

20 

21 def __init__( 

22 self, 

23 verify_url, 

24 issuer, 

25 agency, 

26 jws_signing_alg, 

27 client_private_key, 

28 jwe_encryption_alg, 

29 jwe_cek_enc, 

30 server_public_key, 

31 headers={}, 

32 ): 

33 self.verify_url = verify_url 

34 

35 self.issuer = issuer 

36 self.agency = agency 

37 self.jws_signing_alg = jws_signing_alg 

38 self.client_private_key = client_private_key 

39 self.jwe_encryption_alg = jwe_encryption_alg 

40 self.jwe_cek_enc = jwe_cek_enc 

41 self.server_public_key = server_public_key 

42 

43 if "authorization" in set(k.lower() for k in headers): 

44 raise ValueError('"Authorization" should not be set as an additional header.') 

45 

46 self.headers = headers 

47 

48 def _tokenize_request(self, sub, name, types): 

49 """Create a request token.""" 

50 return RequestToken( 

51 types, 

52 self.agency, 

53 self.jws_signing_alg, 

54 self.client_private_key, 

55 self.jwe_encryption_alg, 

56 self.jwe_cek_enc, 

57 self.server_public_key, 

58 sub, 

59 name, 

60 self.issuer, 

61 ) 

62 

63 def _tokenize_response(self, response): 

64 """Parse a response token.""" 

65 return ResponseToken( 

66 response, 

67 self.jwe_encryption_alg, 

68 self.jwe_cek_enc, 

69 self.client_private_key, 

70 self.jws_signing_alg, 

71 self.server_public_key, 

72 ) 

73 

74 def _auth_headers(self, token): 

75 """Create headers for the request with the token and verifier API keys""" 

76 headers = dict(Authorization=f"Bearer {token}") 

77 

78 for key, value in self.headers.items(): 

79 headers[key] = value 

80 

81 return headers 

82 

83 def _request(self, sub, name, types): 

84 """Make an API request for eligibility verification.""" 

85 logger.debug("Start new eligibility verification request") 

86 

87 try: 

88 token = self._tokenize_request(sub, name, types) 

89 except jwcrypto.JWException: 

90 raise TokenError("Failed to tokenize form values") 

91 

92 try: 

93 logger.debug(f"GET request to {self.verify_url}") 

94 r = requests.get(self.verify_url, headers=self._auth_headers(token)) 

95 except requests.ConnectionError: 

96 raise ApiError("Connection to verification server failed") 

97 except requests.Timeout: 

98 raise ApiError("Connection to verification server timed out") 

99 except requests.TooManyRedirects: 

100 raise ApiError("Too many redirects to verification server") 

101 except requests.HTTPError as e: 

102 raise ApiError(e) 

103 

104 expected_status_codes = {200, 400} 

105 if r.status_code in expected_status_codes: 

106 logger.debug("Process eligiblity verification response") 

107 return self._tokenize_response(r) 

108 else: 

109 logger.warning(f"Unexpected eligibility verification response status code: {r.status_code}") 

110 raise ApiError("Unexpected eligibility verification response") 

111 

112 def verify(self, sub, name, types): 

113 """Check eligibility for the subject and name.""" 

114 return self._request(sub, name, types)