Coverage for benefits/oauth/views.py: 98%

122 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-10-21 19:31 +0000

1import logging 

2 

3from django.shortcuts import redirect 

4from django.template.response import TemplateResponse 

5from django.urls import reverse 

6from django.utils.decorators import decorator_from_middleware 

7import sentry_sdk 

8 

9from benefits.routes import routes 

10from benefits.core import models, session 

11from benefits.core.middleware import AgencySessionRequired 

12from . import analytics, redirects 

13from .client import oauth, create_client 

14from .middleware import FlowUsesClaimsVerificationSessionRequired 

15 

16 

17logger = logging.getLogger(__name__) 

18 

19TEMPLATE_SYSTEM_ERROR = "oauth/system_error.html" 

20 

21 

22def _oauth_client_or_error_redirect(request, flow: models.EnrollmentFlow): 

23 """Calls `benefits.oauth.client.create_client()`. 

24 

25 If a client is created successfully, return it; Otherwise, return a redirect response to OAuth system error. 

26 """ 

27 

28 oauth_client = None 

29 exception = None 

30 

31 try: 

32 oauth_client = create_client(oauth, flow) 

33 except Exception as ex: 

34 exception = ex 

35 

36 if not oauth_client and not exception: 

37 exception = Exception(f"oauth_client not registered: {flow.claims_provider.client_name}") 

38 

39 if exception: 

40 analytics.error(request, message=str(exception), operation="init") 

41 sentry_sdk.capture_exception(exception) 

42 return redirect(routes.OAUTH_SYSTEM_ERROR) 

43 

44 return oauth_client 

45 

46 

47@decorator_from_middleware(FlowUsesClaimsVerificationSessionRequired) 

48def login(request): 

49 """View implementing OIDC authorize_redirect.""" 

50 flow = session.flow(request) 

51 

52 oauth_client_result = _oauth_client_or_error_redirect(request, flow) 

53 

54 if hasattr(oauth_client_result, "authorize_redirect"): 

55 # this looks like an oauth_client since it has the method we need 

56 oauth_client = oauth_client_result 

57 else: 

58 # this does not look like an oauth_client, it's an error redirect 

59 return oauth_client_result 

60 

61 route = reverse(routes.OAUTH_AUTHORIZE) 

62 redirect_uri = redirects.generate_redirect_uri(request, route) 

63 

64 logger.debug(f"OAuth authorize_redirect with redirect_uri: {redirect_uri}") 

65 

66 analytics.started_sign_in(request) 

67 exception = None 

68 result = None 

69 

70 try: 

71 result = oauth_client.authorize_redirect(request, redirect_uri) 

72 except Exception as ex: 

73 exception = ex 

74 

75 if result and result.status_code >= 400: 

76 exception = Exception(f"authorize_redirect error response [{result.status_code}]: {result.content.decode()}") 

77 elif result is None: 

78 exception = Exception("authorize_redirect returned None") 

79 

80 if exception: 

81 analytics.error(request, message=str(exception), operation="authorize_redirect") 

82 sentry_sdk.capture_exception(exception) 

83 result = redirect(routes.OAUTH_SYSTEM_ERROR) 

84 

85 return result 

86 

87 

88@decorator_from_middleware(FlowUsesClaimsVerificationSessionRequired) 

89def authorize(request): 

90 """View implementing OIDC token authorization.""" 

91 flow = session.flow(request) 

92 

93 oauth_client_result = _oauth_client_or_error_redirect(request, flow) 

94 

95 if hasattr(oauth_client_result, "authorize_access_token"): 

96 # this looks like an oauth_client since it has the method we need 

97 oauth_client = oauth_client_result 

98 else: 

99 # this does not look like an oauth_client, it's an error redirect 

100 return oauth_client_result 

101 

102 logger.debug("Attempting to authorize OAuth access token") 

103 token = None 

104 exception = None 

105 

106 try: 

107 token = oauth_client.authorize_access_token(request) 

108 except Exception as ex: 

109 exception = ex 

110 

111 if token is None: 

112 logger.warning("Could not authorize OAuth access token") 

113 exception = Exception("oauth_client.authorize_access_token returned None") 

114 

115 if exception: 

116 analytics.error(request, message=str(exception), operation="authorize_access_token") 

117 sentry_sdk.capture_exception(exception) 

118 return redirect(routes.OAUTH_SYSTEM_ERROR) 

119 

120 logger.debug("OAuth access token authorized") 

121 

122 # We store the id_token in the user's session. This is the minimal amount of information needed later to log the user out. 

123 id_token = token["id_token"] 

124 

125 # We store the returned claim in case it can be used later in eligibility verification. 

126 flow_claim = flow.claims_claim 

127 stored_claim = None 

128 

129 error_claim = None 

130 

131 if flow_claim: 131 ↛ 146line 131 didn't jump to line 146 because the condition on line 131 was always true

132 userinfo = token.get("userinfo") 

133 

134 if userinfo: 

135 claim_value = userinfo.get(flow_claim) 

136 # the claim comes back in userinfo like { "claim": "1" | "0" } 

137 claim_value = int(claim_value) if claim_value else None 

138 if claim_value is None: 

139 logger.warning(f"userinfo did not contain: {flow_claim}") 

140 elif claim_value == 1: 

141 # if userinfo contains our claim and the flag is 1 (true), store the *claim* 

142 stored_claim = flow_claim 

143 elif claim_value >= 10: 

144 error_claim = claim_value 

145 

146 session.update(request, oauth_token=id_token, oauth_claim=stored_claim) 

147 analytics.finished_sign_in(request, error=error_claim) 

148 

149 return redirect(routes.ELIGIBILITY_CONFIRM) 

150 

151 

152@decorator_from_middleware(FlowUsesClaimsVerificationSessionRequired) 

153def cancel(request): 

154 """View implementing cancellation of OIDC authorization.""" 

155 

156 analytics.canceled_sign_in(request) 

157 

158 return redirect(routes.ELIGIBILITY_UNVERIFIED) 

159 

160 

161@decorator_from_middleware(FlowUsesClaimsVerificationSessionRequired) 

162def logout(request): 

163 """View implementing OIDC and application sign out.""" 

164 flow = session.flow(request) 

165 

166 oauth_client_result = _oauth_client_or_error_redirect(request, flow) 

167 

168 if hasattr(oauth_client_result, "load_server_metadata"): 168 ↛ 174line 168 didn't jump to line 174 because the condition on line 168 was always true

169 # this looks like an oauth_client since it has the method we need 

170 # (called in redirects.deauthorize_redirect) 

171 oauth_client = oauth_client_result 

172 else: 

173 # this does not look like an oauth_client, it's an error redirect 

174 return oauth_client_result 

175 

176 analytics.started_sign_out(request) 

177 

178 # overwrite the oauth session token, the user is signed out of the app 

179 token = session.oauth_token(request) 

180 session.logout(request) 

181 

182 route = reverse(routes.OAUTH_POST_LOGOUT) 

183 redirect_uri = redirects.generate_redirect_uri(request, route) 

184 

185 logger.debug(f"OAuth end_session_endpoint with redirect_uri: {redirect_uri}") 

186 

187 # send the user through the end_session_endpoint, redirecting back to 

188 # the post_logout route 

189 return redirects.deauthorize_redirect(request, oauth_client, token, redirect_uri) 

190 

191 

192@decorator_from_middleware(FlowUsesClaimsVerificationSessionRequired) 

193def post_logout(request): 

194 """View routes the user to their origin after sign out.""" 

195 

196 analytics.finished_sign_out(request) 

197 

198 origin = session.origin(request) 

199 return redirect(origin) 

200 

201 

202@decorator_from_middleware(AgencySessionRequired) 

203def system_error(request): 

204 """View handler for an oauth system error.""" 

205 

206 # overwrite origin so that CTA takes user to agency index 

207 agency = session.agency(request) 

208 session.update(request, origin=agency.index_url) 

209 

210 return TemplateResponse(request, TEMPLATE_SYSTEM_ERROR)