Coverage for benefits/core/session.py: 99%
140 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-22 21:13 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-22 21:13 +0000
1"""
2The core application: helpers to work with request sessions.
3"""
5from datetime import datetime, timedelta, timezone
6import hashlib
7import logging
8import time
9import uuid
11from cdt_identity.claims import ClaimsResult
12from cdt_identity.session import Session as OAuthSession
13from django.urls import reverse
15from benefits.routes import routes
16from . import models
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Keys used by the helpers in this module to read and write request.session
# (and to label the entries returned by context_dict()).
_AGENCY = "agency"  # id of the session's TransitAgency
_DEBUG = "debug"  # debug flag
_DID = "did"  # device ID: a UUID string derived from a hash of the UID
_ELIGIBLE = "eligibility"  # eligibility flag
_ENROLLMENT_TOKEN = "enrollment_token"  # enrollment token
_ENROLLMENT_TOKEN_EXP = "enrollment_token_expiry"  # token expiry, compared against time.time()
_ENROLLMENT_EXP = "enrollment_expiry"  # enrollment expiry, stored as a POSIX timestamp
_FLOW = "flow"  # id of the session's EnrollmentFlow
_LANG = "lang"  # context_dict() key for the request's language code
_LOGGED_IN = "logged_in"  # OAuth logged-in flag
_ORIGIN = "origin"  # origin for the session; reset() sets it to the index route
_START = "start"  # session start time, integer milliseconds since Epoch
_UID = "uid"  # unique ID: a random UUID4 string
def agency(request):
    """
    Look up the TransitAgency referenced by the request's session.

    Returns None when the session holds no agency key, or when the lookup
    raises TransitAgency.DoesNotExist.
    """
    try:
        agency_id = request.session[_AGENCY]
        found = models.TransitAgency.by_id(agency_id)
    except (KeyError, models.TransitAgency.DoesNotExist):
        found = None
    return found
def active_agency(request):
    """
    True if the request's session is configured with an active agency. False otherwise.

    The result is always a bool: a session without an agency yields False
    rather than None, matching the documented contract.
    """
    current = agency(request)
    # `current and current.active` alone would leak None when there is no agency.
    return bool(current and current.active)
def context_dict(request):
    """
    Build a dict snapshot of the request's session context.

    Keys are the module's session key names; values come from the matching
    getter in this module. The agency entry is the agency's slug, or None
    when the session has no active agency.
    """
    # The agency entry is computed first, before any getter that may reset the session.
    if active_agency(request):
        agency_slug = agency(request).slug
    else:
        agency_slug = None

    # Remaining entries, in output order, each produced by calling getter(request).
    getters = (
        (_DEBUG, debug),
        (_DID, did),
        (_FLOW, flow),
        (_ELIGIBLE, eligible),
        (_ENROLLMENT_EXP, enrollment_expiry),
        (_ENROLLMENT_TOKEN, enrollment_token),
        (_ENROLLMENT_TOKEN_EXP, enrollment_token_expiry),
        (_LANG, language),
        (_LOGGED_IN, logged_in),
        (_ORIGIN, origin),
        (_START, start),
        (_UID, uid),
    )

    context = {_AGENCY: agency_slug}
    for key, getter in getters:
        context[key] = getter(request)
    return context
def debug(request):
    """Return the session's DEBUG flag as a bool; False when it is unset."""
    flag = request.session.get(_DEBUG, False)
    return bool(flag)
def did(request):
    """
    Return the session's device ID as a string.

    The device ID is a hashed version of the unique ID. When the session has
    no device ID yet, the session is reset to initialize one before reading
    it again.

    Like UID, this value is randomly generated per session; Amplitude needs it
    to accurately attribute a sequence of events to a unique user.

    See more: https://help.amplitude.com/hc/en-us/articles/115003135607-Track-unique-users-in-Amplitude
    """
    device_id = request.session.get(_DID)
    if device_id:
        return str(device_id)

    # Nothing stored yet: reset the session, then read the value again.
    reset(request)
    return str(request.session.get(_DID))
def eligible(request):
    """
    True if the request's session has confirmed eligibility. False otherwise.

    The stored value is coerced to bool, so a session where eligibility was
    never set yields False instead of None (consistent with debug() and
    logged_in()).
    """
    return bool(request.session.get(_ELIGIBLE))
def enrollment_expiry(request):
    """
    Return the enrollment expiry stored in the session as a UTC-aware datetime.

    Returns None when the session has no (truthy) expiry value.
    """
    stored = request.session.get(_ENROLLMENT_EXP)
    if not stored:
        return None
    # The session stores a timestamp; convert it back to a UTC datetime.
    return datetime.fromtimestamp(stored, tz=timezone.utc)
def enrollment_reenrollment(request):
    """
    Return the date from which the user may re-enroll, or None.

    The date is the session's enrollment expiry minus the flow's
    `expiration_reenrollment_days`. None is returned when the session has no
    flow, the flow does not support expiration, or there is no expiry.
    """
    expires_at = enrollment_expiry(request)
    current_flow = flow(request)

    if not (current_flow and current_flow.supports_expiration and expires_at):
        return None

    lead_time = timedelta(days=current_flow.expiration_reenrollment_days)
    return expires_at - lead_time
def enrollment_token(request):
    """Return the enrollment token stored in the request's session, or None if unset."""
    token = request.session.get(_ENROLLMENT_TOKEN)
    return token
def enrollment_token_expiry(request):
    """Return the enrollment token's expiry time stored in the request's session, or None if unset."""
    expires_at = request.session.get(_ENROLLMENT_TOKEN_EXP)
    return expires_at
def enrollment_token_valid(request):
    """
    Report whether the session holds a usable enrollment token.

    A token is valid when it is present and either has no expiry or does not
    expire within the next 5 seconds. Returns False when there is no token.
    """
    if not enrollment_token(request):
        return False

    expires_at = enrollment_token_expiry(request)
    if expires_at is None:
        # No expiry recorded: the token is treated as valid.
        return True

    # Require a 5 second margin so the token does not expire mid-use.
    return expires_at > (time.time() + 5)
def language(request):
    """Return the language code configured on the request (its LANGUAGE_CODE attribute)."""
    code = request.LANGUAGE_CODE
    return code
def logged_in(request):
    """Return True if the session records that the user logged in with OAuth, False otherwise."""
    status = request.session.get(_LOGGED_IN)
    return bool(status)
def logout(request):
    """
    Clear the session's claims and tokens.

    The OAuth session is given an empty ClaimsResult, then the session is
    updated with logged_in=False and enrollment_token=False.
    """
    # Constructed for its effect on the request; the instance itself is not kept.
    OAuthSession(request, claims_result=ClaimsResult())

    # False (not None) is passed because update() skips None values.
    cleared = {"logged_in": False, "enrollment_token": False}
    update(request, **cleared)
def oauth_extra_claims(request):
    """
    Get the extra oauth claims from the request's session, or None.

    "Extra" claims are the verified claims with a truthy value, excluding the
    session flow's eligibility claim.

    Returns:
        A list of claim names, or None when there are no verified claims or
        none remain after excluding the eligibility claim.

    Raises:
        Exception: if there are verified claims but the session has no flow
            that uses claims verification.
    """
    verified = OAuthSession(request).claims_result.verified
    claims = [claim for claim, value in verified.items() if value]
    if not claims:
        return None

    f = flow(request)
    if not (f and f.uses_claims_verification):
        raise Exception("Oauth claims but no flow")

    # Filter instead of list.remove(): remove() raises ValueError when the
    # eligibility claim is not among the truthy verified claims.
    eligibility_claim = f.claims_request.eligibility_claim
    extra = [claim for claim in claims if claim != eligibility_claim]
    return extra or None
def origin(request):
    """Return the origin stored in the request's session, falling back to the index route."""
    # The fallback is resolved up front, whether or not the session has an origin.
    index_url = reverse(routes.INDEX)
    return request.session.get(_ORIGIN, index_url)
def reset(request):
    """
    Reset the session for the request.

    Clears the agency, flow, eligibility, origin, enrollment and login values
    and resets the OAuth session. The start time, UID and DID are only
    generated when the session has no UID yet; otherwise they are left as-is.
    """
    logger.debug("Reset session")

    session = request.session
    for key in (_AGENCY, _FLOW):
        session[key] = None
    session[_ELIGIBLE] = False
    session[_ORIGIN] = reverse(routes.INDEX)
    for key in (_ENROLLMENT_EXP, _ENROLLMENT_TOKEN, _ENROLLMENT_TOKEN_EXP):
        session[key] = None
    session[_LOGGED_IN] = False
    OAuthSession(request, reset=True)

    if not request.session.get(_UID):
        logger.debug("Reset session time and uid")
        # Start time is stored as integer milliseconds since Epoch.
        request.session[_START] = int(time.time() * 1000)
        new_uid = str(uuid.uuid4())
        request.session[_UID] = new_uid
        # The DID is a UUID built from the first 32 hex chars of the UID's SHA-512 digest.
        digest = hashlib.sha512(new_uid.encode("utf8")).hexdigest()
        request.session[_DID] = str(uuid.UUID(digest[:32]))
def start(request):
    """
    Return the session's start time, as integer milliseconds since Epoch.

    When no start time is stored, the session is reset to initialize one
    before reading it again.

    Once started, the value does not reset after subsequent calls to
    session.reset() or session.start(). Amplitude needs this value to
    accurately track sessions.

    See more: https://help.amplitude.com/hc/en-us/articles/115002323627-Tracking-Sessions
    """
    started = request.session.get(_START)
    if started:
        return started

    # Nothing stored yet: reset the session, then read the value again.
    reset(request)
    return request.session.get(_START)
def uid(request):
    """
    Return the session's unique ID, a randomly generated UUID4 string.

    When no unique ID is stored, the session is reset to initialize one
    before reading it again.

    Like DID, this value is needed for Amplitude to accurately track that a
    sequence of events came from a unique user.

    See more: https://help.amplitude.com/hc/en-us/articles/115003135607-Track-unique-users-in-Amplitude

    Although Amplitude advises *against* setting user_id for anonymous users,
    a value is set on anonymous users here anyway, as the users never sign in
    and become de-anonymized to this app / Amplitude.
    """
    unique_id = request.session.get(_UID)
    if unique_id:
        return unique_id

    # Nothing stored yet: reset the session, then read the value again.
    reset(request)
    return request.session.get(_UID)
def update(
    request,
    agency=None,
    debug=None,
    flow=None,
    eligible=None,
    enrollment_expiry=None,
    enrollment_token=None,
    enrollment_token_exp=None,
    logged_in=None,
    origin=None,
):
    """
    Write the given non-null values into the request's session.

    Arguments left as None are skipped, with one exception:
    `enrollment_token_exp` is written (even as None) whenever
    `enrollment_token` is given.

    `agency` and `flow` are stored by id, and only when they are instances of
    TransitAgency / EnrollmentFlow. Setting a flow also copies its
    `oauth_config` and `claims_request` onto the OAuth session.
    `enrollment_expiry` is stored as a POSIX timestamp, and only when it is a
    datetime.
    """
    session = request.session

    if isinstance(agency, models.TransitAgency):
        session[_AGENCY] = agency.id

    if debug is not None:
        session[_DEBUG] = debug

    if eligible is not None:
        session[_ELIGIBLE] = bool(eligible)

    if isinstance(enrollment_expiry, datetime):
        tz = enrollment_expiry.tzinfo
        is_naive = tz is None or tz.utcoffset(enrollment_expiry) is None
        if is_naive:
            # A naive datetime is taken to represent UTC, so attach tzinfo before
            # calling timestamp(); see the notes under
            # https://docs.python.org/3/library/datetime.html#datetime.datetime.timestamp
            # > There is no method to obtain the POSIX timestamp directly from a naive datetime instance
            # > representing UTC time. If your application uses this convention and your system timezone
            # > is not set to UTC, you can obtain the POSIX timestamp by supplying tzinfo=timezone.utc
            enrollment_expiry = enrollment_expiry.replace(tzinfo=timezone.utc)
        session[_ENROLLMENT_EXP] = enrollment_expiry.timestamp()

    if enrollment_token is not None:
        # The token and its expiry are always written together.
        session[_ENROLLMENT_TOKEN] = enrollment_token
        session[_ENROLLMENT_TOKEN_EXP] = enrollment_token_exp

    if logged_in is not None:
        session[_LOGGED_IN] = logged_in

    if origin is not None:
        session[_ORIGIN] = origin

    if isinstance(flow, models.EnrollmentFlow):
        session[_FLOW] = flow.id
        oauth = OAuthSession(request)
        oauth.client_config = flow.oauth_config
        oauth.claims_request = flow.claims_request
def flow(request) -> models.EnrollmentFlow | None:
    """
    Look up the EnrollmentFlow referenced by the request's session.

    Returns None when the session holds no flow key, or when the lookup
    raises EnrollmentFlow.DoesNotExist.
    """
    try:
        flow_id = request.session[_FLOW]
        found = models.EnrollmentFlow.by_id(flow_id)
    except (KeyError, models.EnrollmentFlow.DoesNotExist):
        found = None
    return found