Coverage for benefits/oauth/views.py: 97%
123 statements
« prev ^ index » next coverage.py v7.6.7, created at 2024-11-19 16:31 +0000
« prev ^ index » next coverage.py v7.6.7, created at 2024-11-19 16:31 +0000
1import logging
3from django.shortcuts import redirect
4from django.template.response import TemplateResponse
5from django.urls import reverse
6from django.utils.decorators import decorator_from_middleware
7import sentry_sdk
9from benefits.routes import routes
10from benefits.core import models, session
11from benefits.core.middleware import AgencySessionRequired
12from . import analytics, redirects
13from .client import oauth, create_client
14from .middleware import FlowUsesClaimsVerificationSessionRequired
# module-level logger, named after this module
logger = logging.getLogger(__name__)

# template rendered by the `system_error` view
TEMPLATE_SYSTEM_ERROR = "oauth/system_error.html"
def _oauth_client_or_error_redirect(request, flow: models.EnrollmentFlow):
    """Build the OAuth client for `flow` via `benefits.oauth.client.create_client()`.

    Returns the client when one was created. Otherwise (an exception from
    `create_client()`, or no client came back) the error is sent to analytics
    and Sentry, and a redirect response to the OAuth system error route is
    returned instead.
    """
    failure = None

    try:
        client = create_client(oauth, flow)
    except Exception as ex:
        client = None
        failure = ex

    # nothing was raised but there is no client either: report it as not registered
    if not (client or failure):
        failure = Exception(f"oauth_client not registered: {flow.claims_provider.client_name}")

    if not failure:
        return client

    analytics.error(request, message=str(failure), operation="init")
    sentry_sdk.capture_exception(failure)
    return redirect(routes.OAUTH_SYSTEM_ERROR)
@decorator_from_middleware(FlowUsesClaimsVerificationSessionRequired)
def login(request):
    """View implementing OIDC authorize_redirect.

    Creates the OAuth client for the session's flow and asks it for the
    authorization redirect response, which is returned on success.

    Any failure (client creation, an exception raised by `authorize_redirect`,
    a `None` result, or a response with status code >= 400) is reported to
    analytics and Sentry, and a redirect to the OAuth system error route is
    returned instead.
    """
    flow = session.flow(request)

    oauth_client_result = _oauth_client_or_error_redirect(request, flow)

    if hasattr(oauth_client_result, "authorize_redirect"):
        # this looks like an oauth_client since it has the method we need
        oauth_client = oauth_client_result
    else:
        # this does not look like an oauth_client, it's an error redirect
        return oauth_client_result

    route = reverse(routes.OAUTH_AUTHORIZE)
    redirect_uri = redirects.generate_redirect_uri(request, route)

    logger.debug(f"OAuth authorize_redirect with redirect_uri: {redirect_uri}")

    analytics.started_sign_in(request)

    exception = None
    result = None

    try:
        result = oauth_client.authorize_redirect(request, redirect_uri)
    except Exception as ex:
        exception = ex

    if result and result.status_code >= 400:
        exception = Exception(f"authorize_redirect error response [{result.status_code}]: {result.content.decode()}")
    elif result is None and exception is None:
        # only synthesize this error when nothing was raised; otherwise the
        # exception caught above is the real cause and must be the one reported
        exception = Exception("authorize_redirect returned None")

    if exception:
        analytics.error(request, message=str(exception), operation="authorize_redirect")
        sentry_sdk.capture_exception(exception)
        result = redirect(routes.OAUTH_SYSTEM_ERROR)

    return result
@decorator_from_middleware(FlowUsesClaimsVerificationSessionRequired)
def authorize(request):
    """View implementing OIDC token authorization.

    Creates the OAuth client for the session's flow and exchanges the request
    for an access token. On failure (client creation, an exception raised by
    `authorize_access_token`, or a `None` token) the error is reported to
    analytics and Sentry and a redirect to the OAuth system error route is
    returned.

    On success the token's `id_token` and the flow claims whose userinfo value
    is 1 are stored in the session, claim values >= 10 are passed to analytics
    as errors, and the user is redirected to the eligibility confirm route.
    """
    flow = session.flow(request)

    oauth_client_result = _oauth_client_or_error_redirect(request, flow)

    if hasattr(oauth_client_result, "authorize_access_token"):
        # this looks like an oauth_client since it has the method we need
        oauth_client = oauth_client_result
    else:
        # this does not look like an oauth_client, it's an error redirect
        return oauth_client_result

    logger.debug("Attempting to authorize OAuth access token")

    token = None
    exception = None

    try:
        token = oauth_client.authorize_access_token(request)
    except Exception as ex:
        exception = ex

    if token is None:
        logger.warning("Could not authorize OAuth access token")
        # only synthesize this error when nothing was raised; otherwise the
        # exception caught above is the real cause and must be the one reported
        if exception is None:
            exception = Exception("oauth_client.authorize_access_token returned None")

    if exception:
        analytics.error(request, message=str(exception), operation="authorize_access_token")
        sentry_sdk.capture_exception(exception)
        return redirect(routes.OAUTH_SYSTEM_ERROR)

    logger.debug("OAuth access token authorized")

    # We store the id_token in the user's session. This is the minimal amount of information needed later to log the user out.
    id_token = token["id_token"]

    # We store the returned claim in case it can be used later in eligibility verification.
    flow_claims = flow.claims_all_claims
    stored_claims = []

    # claims that came back with a value >= 10, reported to analytics below
    error_claim = {}

    if flow_claims:
        userinfo = token.get("userinfo")

        if userinfo:
            for claim in flow_claims:
                claim_value = userinfo.get(claim)
                # the claim comes back in userinfo like { "claim": "1" | "0" }
                claim_value = int(claim_value) if claim_value else None
                if claim_value is None:
                    logger.warning(f"userinfo did not contain: {claim}")
                elif claim_value == 1:
                    # if userinfo contains our claim and the flag is 1 (true), store the *claim*
                    stored_claims.append(claim)
                elif claim_value >= 10:
                    error_claim[claim] = claim_value

    session.update(request, oauth_token=id_token, oauth_claims=stored_claims)
    analytics.finished_sign_in(request, error=error_claim)

    return redirect(routes.ELIGIBILITY_CONFIRM)
@decorator_from_middleware(FlowUsesClaimsVerificationSessionRequired)
def cancel(request):
    """View implementing cancellation of OIDC authorization.

    Sends the canceled-sign-in analytics event, then redirects to the
    eligibility unverified route.
    """
    analytics.canceled_sign_in(request)

    destination = routes.ELIGIBILITY_UNVERIFIED
    return redirect(destination)
@decorator_from_middleware(FlowUsesClaimsVerificationSessionRequired)
def logout(request):
    """View implementing OIDC and application sign out.

    Reads the OAuth token from the session, logs the session out, and returns
    the deauthorize redirect built with the post-logout route as redirect URI.
    If the OAuth client could not be created, the error redirect is returned instead.
    """
    client_or_redirect = _oauth_client_or_error_redirect(request, session.flow(request))

    # an oauth_client has the method called in redirects.deauthorize_redirect;
    # anything without it is the error redirect
    if not hasattr(client_or_redirect, "load_server_metadata"):
        return client_or_redirect

    analytics.started_sign_out(request)

    # grab the token before the session logout overwrites it;
    # after this the user is signed out of the app
    id_token = session.oauth_token(request)
    session.logout(request)

    post_logout_route = reverse(routes.OAUTH_POST_LOGOUT)
    redirect_uri = redirects.generate_redirect_uri(request, post_logout_route)

    logger.debug(f"OAuth end_session_endpoint with redirect_uri: {redirect_uri}")

    # send the user through the end_session_endpoint, redirecting back to
    # the post_logout route
    return redirects.deauthorize_redirect(request, client_or_redirect, id_token, redirect_uri)
@decorator_from_middleware(FlowUsesClaimsVerificationSessionRequired)
def post_logout(request):
    """View routes the user to their origin after sign out.

    Sends the finished-sign-out analytics event, then redirects to the
    origin stored in the session.
    """
    analytics.finished_sign_out(request)

    return redirect(session.origin(request))
@decorator_from_middleware(AgencySessionRequired)
def system_error(request):
    """View handler for an oauth system error.

    Points the session origin at the agency's index URL and renders the
    system error template.
    """
    # overwrite origin so that CTA takes user to agency index
    index_url = session.agency(request).index_url
    session.update(request, origin=index_url)

    return TemplateResponse(request, TEMPLATE_SYSTEM_ERROR)