Coverage for benefits/core/session.py: 99%
141 statements
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-19 00:56 +0000
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-19 00:56 +0000
1"""
2The core application: helpers to work with request sessions.
3"""
5from datetime import datetime, timedelta, timezone
6import hashlib
7import logging
8import time
9import uuid
11from django.urls import reverse
13from benefits.routes import routes
14from . import models
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Keys used for values kept in ``request.session`` and for the dict returned
# by ``context_dict``.

# Agency and enrollment flow selection (stored as model ids).
_AGENCY = "agency"
_FLOW = "flow"

# Eligibility and enrollment state.
_ELIGIBLE = "eligibility"
_ENROLLMENT_EXP = "enrollment_expiry"
_ENROLLMENT_TOKEN = "enrollment_token"
_ENROLLMENT_TOKEN_EXP = "enrollment_token_expiry"

# OAuth state.
_OAUTH_CLAIMS = "oauth_claims"
_OAUTH_TOKEN = "oauth_token"

# Identifiers and start time generated in ``reset``.
_DID = "did"
_START = "start"
_UID = "uid"

# Miscellaneous. ``_LANG`` is only used as a key of the ``context_dict``
# result; the language itself is read from ``request.LANGUAGE_CODE``.
_DEBUG = "debug"
_LANG = "lang"
_ORIGIN = "origin"
def agency(request):
    """Return the TransitAgency referenced by the request's session, or None.

    None is returned when the session has no agency key, or when the lookup
    raises ``TransitAgency.DoesNotExist``.
    """
    try:
        agency_id = request.session[_AGENCY]
        found = models.TransitAgency.by_id(agency_id)
    except (KeyError, models.TransitAgency.DoesNotExist):
        return None
    else:
        return found
def active_agency(request):
    """True if the request's session is configured with an active agency. False otherwise.

    The result is always a real ``bool``. A bare ``a and a.active`` would
    return ``None`` when the session has no agency, which contradicts the
    documented True/False contract.
    """
    a = agency(request)
    # bool() collapses both "no agency" (None) and a falsy ``active`` flag to False.
    return bool(a and a.active)
def context_dict(request):
    """Build a dict snapshot of the request's session context.

    Keys are the module's session key constants. The agency entry holds the
    agency's slug when the session has an active agency, otherwise None.

    Values are gathered in the order the keys are inserted. ``did``, ``start``
    and ``uid`` may call ``reset`` when their value is unset, which overwrites
    other session keys, so the evaluation order is kept as it is.
    """
    if active_agency(request):
        agency_slug = agency(request).slug
    else:
        agency_slug = None

    context = {_AGENCY: agency_slug}
    context[_DEBUG] = debug(request)
    context[_DID] = did(request)
    context[_FLOW] = flow(request)
    context[_ELIGIBLE] = eligible(request)
    context[_ENROLLMENT_EXP] = enrollment_expiry(request)
    context[_ENROLLMENT_TOKEN] = enrollment_token(request)
    context[_ENROLLMENT_TOKEN_EXP] = enrollment_token_expiry(request)
    context[_LANG] = language(request)
    context[_OAUTH_TOKEN] = oauth_token(request)
    context[_OAUTH_CLAIMS] = oauth_claims(request)
    context[_ORIGIN] = origin(request)
    context[_START] = start(request)
    context[_UID] = uid(request)
    return context
def debug(request):
    """Return the session's DEBUG flag as a bool (False when unset)."""
    flag = request.session.get(_DEBUG, False)
    return bool(flag)
def did(request):
    """
    Return the session's device ID as a string.

    The device ID is derived in ``reset`` from a hash of the unique ID. When
    the session holds no device ID, ``reset`` is called first so that a value
    can be initialized.

    This value, like UID, is randomly generated per session and is needed for
    Amplitude to accurately track that a sequence of events came from a unique
    user.

    See more: https://help.amplitude.com/hc/en-us/articles/115003135607-Track-unique-users-in-Amplitude
    """
    session = request.session
    if not session.get(_DID):
        reset(request)
    return str(session.get(_DID))
def eligible(request):
    """True if the request's session has confirmed eligibility. False otherwise.

    The result is always a real ``bool``. A session that never stored the
    eligibility key yields False rather than None, matching the documented
    contract and the way ``debug`` reads its flag.
    """
    return bool(request.session.get(_ELIGIBLE, False))
def enrollment_expiry(request):
    """Return the enrollment expiry stored in the session, or None.

    The session holds the expiry as a POSIX timestamp; it is converted to a
    timezone-aware UTC ``datetime``. A missing or falsy stored value gives None.
    """
    stamp = request.session.get(_ENROLLMENT_EXP)
    if not stamp:
        return None
    return datetime.fromtimestamp(stamp, tz=timezone.utc)
def enrollment_reenrollment(request):
    """Return the date from which the user may re-enroll, or None.

    The date is the session's enrollment expiry minus the flow's
    ``expiration_reenrollment_days``. None is returned when there is no flow,
    the flow does not support expiration, or no expiry is stored.
    """
    expires_on = enrollment_expiry(request)
    current_flow = flow(request)

    if not (current_flow and current_flow.supports_expiration and expires_on):
        return None
    return expires_on - timedelta(days=current_flow.expiration_reenrollment_days)
def enrollment_token(request):
    """Return the enrollment token stored in the request's session, or None."""
    session = request.session
    return session.get(_ENROLLMENT_TOKEN)
def enrollment_token_expiry(request):
    """Return the enrollment token's expiry time stored in the request's session, or None."""
    session = request.session
    return session.get(_ENROLLMENT_TOKEN_EXP)
def enrollment_token_valid(request):
    """True if the request's session holds a usable enrollment token. False otherwise.

    A token is usable when it is truthy and either has no expiry recorded or
    its expiry lies more than 5 seconds after the current time.
    """
    if not enrollment_token(request):
        return False

    expires_at = enrollment_token_expiry(request)
    if expires_at is None:
        return True
    # ensure token does not expire in the next 5 seconds
    return expires_at > (time.time() + 5)
def language(request):
    """Return the language code configured on the request (``request.LANGUAGE_CODE``)."""
    language_code = request.LANGUAGE_CODE
    return language_code
def logged_in(request):
    """Return True when the current session holds a truthy OAuth token."""
    token = oauth_token(request)
    return bool(token)
def logout(request):
    """Clear the session's OAuth claims, OAuth token and enrollment token.

    The tokens are set to False rather than None because ``update`` skips
    arguments that are None.
    """
    cleared = {
        "oauth_claims": [],
        "oauth_token": False,
        "enrollment_token": False,
    }
    update(request, **cleared)
def oauth_token(request):
    """Return the OAuth token stored in the request's session, or None."""
    session = request.session
    return session.get(_OAUTH_TOKEN)
def oauth_claims(request):
    """Return the OAuth claims stored in the request's session, or None."""
    session = request.session
    return session.get(_OAUTH_CLAIMS)
def oauth_extra_claims(request):
    """Get the extra oauth claims from the request's session, or None.

    The extra claims are the session's OAuth claims without the flow's
    eligibility claim (``claims_eligibility_claim``).

    Returns:
        A new list of the extra claims, or None when the session has no
        (or empty) OAuth claims.

    Raises:
        Exception: when the session has OAuth claims but no flow, or the flow
            does not use claims verification.
    """
    claims = oauth_claims(request)
    if not claims:
        return None

    f = flow(request)
    if f and f.uses_claims_verification:
        eligibility_claim = f.claims_eligibility_claim
        # Build a new list instead of calling claims.remove(): ``claims`` is the
        # very object held in the session, so removing in place would silently
        # alter the stored claims and make a second call fail with ValueError.
        return [claim for claim in claims if claim != eligibility_claim]
    raise Exception("Oauth claims but no flow")
def origin(request):
    """Return the origin stored in the request's session, falling back to the index route's URL."""
    index_url = reverse(routes.INDEX)
    return request.session.get(_ORIGIN, index_url)
def reset(request):
    """Reset the request's session to its initial state.

    Agency, flow, enrollment and OAuth values are cleared, eligibility is set
    to False and the origin is set to the index route's URL. The start time,
    UID and DID are only generated when the session has no truthy UID, so
    they are kept when an already-initialized session is reset again.
    """
    logger.debug("Reset session")
    session = request.session

    for key in (_AGENCY, _FLOW):
        session[key] = None
    session[_ELIGIBLE] = False
    session[_ORIGIN] = reverse(routes.INDEX)
    for key in (
        _ENROLLMENT_EXP,
        _ENROLLMENT_TOKEN,
        _ENROLLMENT_TOKEN_EXP,
        _OAUTH_TOKEN,
        _OAUTH_CLAIMS,
    ):
        session[key] = None

    if not (_UID in session and session[_UID]):
        logger.debug("Reset session time and uid")
        # start time in integer milliseconds since Epoch
        session[_START] = int(time.time() * 1000)
        unique_id = str(uuid.uuid4())
        session[_UID] = unique_id
        # the DID is a UUID built from the first 32 hex digits of the UID's SHA-512 digest
        digest = hashlib.sha512(unique_id.encode("utf8")).hexdigest()
        session[_DID] = str(uuid.UUID(digest[:32]))
def start(request):
    """
    Return the session's start time, as integer milliseconds since Epoch.

    When the session holds no start time, ``reset`` is called first so that a
    value can be initialized.

    Once started, does not reset after subsequent calls to session.reset() or
    session.start(). This value is needed for Amplitude to accurately track
    sessions.

    See more: https://help.amplitude.com/hc/en-us/articles/115002323627-Tracking-Sessions
    """
    session = request.session
    if not session.get(_START):
        reset(request)
    return session.get(_START)
def uid(request):
    """
    Return the session's unique ID, a randomly generated UUID4 string.

    When the session holds no unique ID, ``reset`` is called first so that a
    value can be initialized.

    This value, like DID, is needed for Amplitude to accurately track that a
    sequence of events came from a unique user.

    See more: https://help.amplitude.com/hc/en-us/articles/115003135607-Track-unique-users-in-Amplitude

    Although Amplitude advises *against* setting user_id for anonymous users,
    here a value is set on anonymous users anyway, as the users never sign-in
    and become de-anonymized to this app / Amplitude.
    """
    session = request.session
    if not session.get(_UID):
        reset(request)
    return session.get(_UID)
def update(
    request,
    agency=None,
    debug=None,
    flow=None,
    eligible=None,
    enrollment_expiry=None,
    enrollment_token=None,
    enrollment_token_exp=None,
    oauth_token=None,
    oauth_claims=None,
    origin=None,
):
    """Write the given non-None values into the request's session.

    Arguments left as None are skipped, with these specifics:

    - ``agency`` and ``flow`` are stored by ``id`` and only when they are
      ``TransitAgency`` / ``EnrollmentFlow`` instances.
    - ``eligible`` is stored as a bool.
    - ``enrollment_expiry`` is stored as a POSIX timestamp and only when it is
      a ``datetime``; a naive datetime is treated as UTC.
    - ``enrollment_token_exp`` is stored (even when None) whenever
      ``enrollment_token`` is not None.
    """
    session = request.session

    # isinstance() is False for None, so no separate None check is needed
    if isinstance(agency, models.TransitAgency):
        session[_AGENCY] = agency.id

    if debug is not None:
        session[_DEBUG] = debug

    if eligible is not None:
        session[_ELIGIBLE] = bool(eligible)

    if isinstance(enrollment_expiry, datetime):
        expiry = enrollment_expiry
        if expiry.utcoffset() is None:
            # this is a naive datetime instance, update tzinfo for UTC
            # see notes under https://docs.python.org/3/library/datetime.html#datetime.datetime.timestamp
            # > There is no method to obtain the POSIX timestamp directly from a naive datetime instance
            # > representing UTC time. If your application uses this convention and your system timezone is
            # > not set to UTC, you can obtain the POSIX timestamp by supplying tzinfo=timezone.utc
            expiry = expiry.replace(tzinfo=timezone.utc)
        session[_ENROLLMENT_EXP] = expiry.timestamp()

    if enrollment_token is not None:
        session[_ENROLLMENT_TOKEN] = enrollment_token
        session[_ENROLLMENT_TOKEN_EXP] = enrollment_token_exp

    if oauth_token is not None:
        session[_OAUTH_TOKEN] = oauth_token

    if oauth_claims is not None:
        session[_OAUTH_CLAIMS] = oauth_claims

    if origin is not None:
        session[_ORIGIN] = origin

    if isinstance(flow, models.EnrollmentFlow):
        session[_FLOW] = flow.id
def flow(request) -> models.EnrollmentFlow | None:
    """Return the EnrollmentFlow referenced by the request's session, or None.

    None is returned when the session has no flow key, or when the lookup
    raises ``EnrollmentFlow.DoesNotExist``.
    """
    try:
        flow_id = request.session[_FLOW]
        found = models.EnrollmentFlow.by_id(flow_id)
    except (KeyError, models.EnrollmentFlow.DoesNotExist):
        return None
    else:
        return found