Coverage for benefits/oauth/views.py: 97%

123 statements  

« prev     ^ index     » next       coverage.py v7.6.7, created at 2024-11-19 16:31 +0000

1import logging 

2 

3from django.shortcuts import redirect 

4from django.template.response import TemplateResponse 

5from django.urls import reverse 

6from django.utils.decorators import decorator_from_middleware 

7import sentry_sdk 

8 

9from benefits.routes import routes 

10from benefits.core import models, session 

11from benefits.core.middleware import AgencySessionRequired 

12from . import analytics, redirects 

13from .client import oauth, create_client 

14from .middleware import FlowUsesClaimsVerificationSessionRequired 

15 

16 

17logger = logging.getLogger(__name__) 

18 

19TEMPLATE_SYSTEM_ERROR = "oauth/system_error.html" 

20 

21 

22def _oauth_client_or_error_redirect(request, flow: models.EnrollmentFlow): 

23 """Calls `benefits.oauth.client.create_client()`. 

24 

25 If a client is created successfully, return it; Otherwise, return a redirect response to OAuth system error. 

26 """ 

27 

28 oauth_client = None 

29 exception = None 

30 

31 try: 

32 oauth_client = create_client(oauth, flow) 

33 except Exception as ex: 

34 exception = ex 

35 

36 if not oauth_client and not exception: 

37 exception = Exception(f"oauth_client not registered: {flow.claims_provider.client_name}") 

38 

39 if exception: 

40 analytics.error(request, message=str(exception), operation="init") 

41 sentry_sdk.capture_exception(exception) 

42 return redirect(routes.OAUTH_SYSTEM_ERROR) 

43 

44 return oauth_client 

45 

46 

47@decorator_from_middleware(FlowUsesClaimsVerificationSessionRequired) 

48def login(request): 

49 """View implementing OIDC authorize_redirect.""" 

50 flow = session.flow(request) 

51 

52 oauth_client_result = _oauth_client_or_error_redirect(request, flow) 

53 

54 if hasattr(oauth_client_result, "authorize_redirect"): 

55 # this looks like an oauth_client since it has the method we need 

56 oauth_client = oauth_client_result 

57 else: 

58 # this does not look like an oauth_client, it's an error redirect 

59 return oauth_client_result 

60 

61 route = reverse(routes.OAUTH_AUTHORIZE) 

62 redirect_uri = redirects.generate_redirect_uri(request, route) 

63 

64 logger.debug(f"OAuth authorize_redirect with redirect_uri: {redirect_uri}") 

65 

66 analytics.started_sign_in(request) 

67 exception = None 

68 result = None 

69 

70 try: 

71 result = oauth_client.authorize_redirect(request, redirect_uri) 

72 except Exception as ex: 

73 exception = ex 

74 

75 if result and result.status_code >= 400: 

76 exception = Exception(f"authorize_redirect error response [{result.status_code}]: {result.content.decode()}") 

77 elif result is None: 

78 exception = Exception("authorize_redirect returned None") 

79 

80 if exception: 

81 analytics.error(request, message=str(exception), operation="authorize_redirect") 

82 sentry_sdk.capture_exception(exception) 

83 result = redirect(routes.OAUTH_SYSTEM_ERROR) 

84 

85 return result 

86 

87 

88@decorator_from_middleware(FlowUsesClaimsVerificationSessionRequired) 

89def authorize(request): 

90 """View implementing OIDC token authorization.""" 

91 flow = session.flow(request) 

92 

93 oauth_client_result = _oauth_client_or_error_redirect(request, flow) 

94 

95 if hasattr(oauth_client_result, "authorize_access_token"): 

96 # this looks like an oauth_client since it has the method we need 

97 oauth_client = oauth_client_result 

98 else: 

99 # this does not look like an oauth_client, it's an error redirect 

100 return oauth_client_result 

101 

102 logger.debug("Attempting to authorize OAuth access token") 

103 token = None 

104 exception = None 

105 

106 try: 

107 token = oauth_client.authorize_access_token(request) 

108 except Exception as ex: 

109 exception = ex 

110 

111 if token is None: 

112 logger.warning("Could not authorize OAuth access token") 

113 exception = Exception("oauth_client.authorize_access_token returned None") 

114 

115 if exception: 

116 analytics.error(request, message=str(exception), operation="authorize_access_token") 

117 sentry_sdk.capture_exception(exception) 

118 return redirect(routes.OAUTH_SYSTEM_ERROR) 

119 

120 logger.debug("OAuth access token authorized") 

121 

122 # We store the id_token in the user's session. This is the minimal amount of information needed later to log the user out. 

123 id_token = token["id_token"] 

124 

125 # We store the returned claim in case it can be used later in eligibility verification. 

126 flow_claims = flow.claims_all_claims 

127 stored_claims = [] 

128 

129 error_claim = {} 

130 

131 if flow_claims: 131 ↛ 147line 131 didn't jump to line 147 because the condition on line 131 was always true

132 userinfo = token.get("userinfo") 

133 

134 if userinfo: 

135 for claim in flow_claims: 

136 claim_value = userinfo.get(claim) 

137 # the claim comes back in userinfo like { "claim": "1" | "0" } 

138 claim_value = int(claim_value) if claim_value else None 

139 if claim_value is None: 

140 logger.warning(f"userinfo did not contain: {claim}") 

141 elif claim_value == 1: 

142 # if userinfo contains our claim and the flag is 1 (true), store the *claim* 

143 stored_claims.append(claim) 

144 elif claim_value >= 10: 144 ↛ 135line 144 didn't jump to line 135 because the condition on line 144 was always true

145 error_claim[claim] = claim_value 

146 

147 session.update(request, oauth_token=id_token, oauth_claims=stored_claims) 

148 analytics.finished_sign_in(request, error=error_claim) 

149 

150 return redirect(routes.ELIGIBILITY_CONFIRM) 

151 

152 

153@decorator_from_middleware(FlowUsesClaimsVerificationSessionRequired) 

154def cancel(request): 

155 """View implementing cancellation of OIDC authorization.""" 

156 

157 analytics.canceled_sign_in(request) 

158 

159 return redirect(routes.ELIGIBILITY_UNVERIFIED) 

160 

161 

162@decorator_from_middleware(FlowUsesClaimsVerificationSessionRequired) 

163def logout(request): 

164 """View implementing OIDC and application sign out.""" 

165 flow = session.flow(request) 

166 

167 oauth_client_result = _oauth_client_or_error_redirect(request, flow) 

168 

169 if hasattr(oauth_client_result, "load_server_metadata"): 169 ↛ 175line 169 didn't jump to line 175 because the condition on line 169 was always true

170 # this looks like an oauth_client since it has the method we need 

171 # (called in redirects.deauthorize_redirect) 

172 oauth_client = oauth_client_result 

173 else: 

174 # this does not look like an oauth_client, it's an error redirect 

175 return oauth_client_result 

176 

177 analytics.started_sign_out(request) 

178 

179 # overwrite the oauth session token, the user is signed out of the app 

180 token = session.oauth_token(request) 

181 session.logout(request) 

182 

183 route = reverse(routes.OAUTH_POST_LOGOUT) 

184 redirect_uri = redirects.generate_redirect_uri(request, route) 

185 

186 logger.debug(f"OAuth end_session_endpoint with redirect_uri: {redirect_uri}") 

187 

188 # send the user through the end_session_endpoint, redirecting back to 

189 # the post_logout route 

190 return redirects.deauthorize_redirect(request, oauth_client, token, redirect_uri) 

191 

192 

193@decorator_from_middleware(FlowUsesClaimsVerificationSessionRequired) 

194def post_logout(request): 

195 """View routes the user to their origin after sign out.""" 

196 

197 analytics.finished_sign_out(request) 

198 

199 origin = session.origin(request) 

200 return redirect(origin) 

201 

202 

203@decorator_from_middleware(AgencySessionRequired) 

204def system_error(request): 

205 """View handler for an oauth system error.""" 

206 

207 # overwrite origin so that CTA takes user to agency index 

208 agency = session.agency(request) 

209 session.update(request, origin=agency.index_url) 

210 

211 return TemplateResponse(request, TEMPLATE_SYSTEM_ERROR)