Coverage for benefits / core / session.py: 98%

151 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-01 15:39 +0000

1""" 

2The core application: helpers to work with request sessions. 

3""" 

4 

5import hashlib 

6import logging 

7import time 

8import uuid 

9from datetime import datetime, timedelta, timezone 

10 

11from cdt_identity.claims import ClaimsResult 

12from cdt_identity.session import Session as OAuthSession 

13from django.urls import reverse 

14 

15from benefits.enrollment_littlepay.models import LittlepayGroup 

16from benefits.enrollment_littlepay.session import Session as LittlepaySession 

17from benefits.enrollment_switchio.models import SwitchioGroup 

18from benefits.enrollment_switchio.session import Session as SwitchioSession 

19from benefits.routes import routes 

20 

21from . import models 

22 

23logger = logging.getLogger(__name__) 

24 

25 

26_AGENCY = "agency" 

27_DEBUG = "debug" 

28_DID = "did" 

29_ELIGIBLE = "eligibility" 

30_ENROLLMENT_EXP = "enrollment_expiry" 

31_FLOW = "flow" 

32_GROUP = "group" # EnrollmentGroup, not django.auth Group 

33_LANG = "lang" 

34_LOGGED_IN = "logged_in" 

35_ORIGIN = "origin" 

36_START = "start" 

37_UID = "uid" 

38 

39 

40def agency(request): 

41 """Get the agency from the request's session, or None""" 

42 try: 

43 return models.TransitAgency.by_id(request.session[_AGENCY]) 

44 except (KeyError, models.TransitAgency.DoesNotExist): 

45 return None 

46 

47 

48def active_agency(request): 

49 """True if the request's session is configured with an active agency. False otherwise.""" 

50 a = agency(request) 

51 return a and a.active 

52 

53 

54def context_dict(request): 

55 """The request's session context as a dict.""" 

56 littlepay_session = LittlepaySession(request) 

57 switchio_session = SwitchioSession(request) 

58 return { 

59 _AGENCY: agency(request).slug if active_agency(request) else None, 

60 _DEBUG: debug(request), 

61 _DID: did(request), 

62 _FLOW: flow(request), 

63 _GROUP: group(request), 

64 _ELIGIBLE: eligible(request), 

65 _ENROLLMENT_EXP: enrollment_expiry(request), 

66 littlepay_session._keys_access_token: littlepay_session.access_token, 

67 littlepay_session._keys_access_token_expiry: littlepay_session.access_token_expiry, 

68 switchio_session._keys_registration_id: switchio_session.registration_id, 

69 _LANG: language(request), 

70 _LOGGED_IN: logged_in(request), 

71 _ORIGIN: origin(request), 

72 _START: start(request), 

73 _UID: uid(request), 

74 } 

75 

76 

77def debug(request): 

78 """Get the DEBUG flag from the request's session.""" 

79 return bool(request.session.get(_DEBUG, False)) 

80 

81 

82def did(request): 

83 """ 

84 Get the session's device ID, a hashed version of the unique ID. If unset, 

85 the session is reset to initialize a value. 

86 

87 This value, like UID, is randomly generated per session and is needed for 

88 Amplitude to accurately track that a sequence of events came from a unique 

89 user. 

90 

91 See more: https://help.amplitude.com/hc/en-us/articles/115003135607-Track-unique-users-in-Amplitude 

92 """ 

93 d = request.session.get(_DID) 

94 if not d: 

95 reset(request) 

96 d = request.session.get(_DID) 

97 return str(d) 

98 

99 

100def eligible(request): 

101 """True if the request's session has confirmed eligibility. False otherwise.""" 

102 return request.session.get(_ELIGIBLE) 

103 

104 

105def enrollment_expiry(request): 

106 """Get the expiry date for a user's enrollment from session, or None.""" 

107 expiry = request.session.get(_ENROLLMENT_EXP) 

108 if expiry: 

109 return datetime.fromtimestamp(expiry, tz=timezone.utc) 

110 else: 

111 return None 

112 

113 

114def enrollment_reenrollment(request): 

115 """Get the reenrollment date for a user's enrollment from session, or None.""" 

116 expiry = enrollment_expiry(request) 

117 enrollment_flow = flow(request) 

118 

119 if enrollment_flow and enrollment_flow.supports_expiration and expiry: 

120 return expiry - timedelta(days=enrollment_flow.expiration_reenrollment_days) 

121 else: 

122 return None 

123 

124 

125def flow(request) -> models.EnrollmentFlow | None: 

126 """Get the EnrollmentFlow from the request's session, or None""" 

127 try: 

128 return models.EnrollmentFlow.by_id(request.session[_FLOW]) 

129 except (KeyError, models.EnrollmentFlow.DoesNotExist): 

130 return None 

131 

132 

133def group(request) -> models.EnrollmentGroup | None: 

134 """Get the EnrollmentGroup from the request's session, or None""" 

135 

136 if agency(request): 

137 match agency(request).transit_processor: 

138 case "littlepay": 

139 group_model = LittlepayGroup 

140 case "switchio": 

141 group_model = SwitchioGroup 

142 case _: 

143 return None 

144 

145 try: 

146 return group_model.by_id(request.session[_GROUP]) 

147 except (KeyError, group_model.DoesNotExist): 

148 return None 

149 

150 return None 

151 

152 

153def language(request): 

154 """Get the language configured for the request.""" 

155 return request.LANGUAGE_CODE 

156 

157 

158def logged_in(request): 

159 """Get the user's status of having logged in with OAuth from the request's session, or None""" 

160 return bool(request.session.get(_LOGGED_IN)) 

161 

162 

163def logout(request): 

164 """Reset the session claims and tokens.""" 

165 LittlepaySession(request, reset=True) 

166 SwitchioSession(request, reset=True) 

167 OAuthSession(request, claims_result=ClaimsResult()) 

168 update(request, logged_in=False) 

169 

170 

171def oauth_extra_claims(request): 

172 """Get the extra oauth claims from the request's session, or None""" 

173 claims = [claim for claim, value in OAuthSession(request).claims_result.verified.items() if value] 

174 

175 if claims: 

176 f = flow(request) 

177 if f and f.uses_claims_verification: 

178 claims.remove(f.claims_request.eligibility_claim) 

179 return claims or None 

180 raise Exception("Oauth claims but no flow") 

181 else: 

182 return None 

183 

184 

185def origin(request): 

186 """Get the origin for the request's session, or default to the index route.""" 

187 return request.session.get(_ORIGIN, reverse(routes.INDEX)) 

188 

189 

190def reset(request): 

191 """Reset the session for the request.""" 

192 logger.debug("Reset session") 

193 request.session[_AGENCY] = None 

194 request.session[_FLOW] = None 

195 request.session[_GROUP] = None 

196 request.session[_ELIGIBLE] = False 

197 request.session[_ORIGIN] = reverse(routes.INDEX) 

198 request.session[_ENROLLMENT_EXP] = None 

199 request.session[_LOGGED_IN] = False 

200 LittlepaySession(request, reset=True) 

201 SwitchioSession(request, reset=True) 

202 OAuthSession(request, reset=True) 

203 

204 if _UID not in request.session or not request.session[_UID]: 

205 logger.debug("Reset session time and uid") 

206 request.session[_START] = int(time.time() * 1000) 

207 u = str(uuid.uuid4()) 

208 request.session[_UID] = u 

209 request.session[_DID] = str(uuid.UUID(hashlib.sha512(bytes(u, "utf8")).hexdigest()[:32])) 

210 

211 

212def start(request): 

213 """ 

214 Get the start time from the request's session, as integer milliseconds since 

215 Epoch. If unset, the session is reset to initialize a value. 

216 

217 Once started, does not reset after subsequent calls to session.reset() or 

218 session.start(). This value is needed for Amplitude to accurately track 

219 sessions. 

220 

221 See more: https://help.amplitude.com/hc/en-us/articles/115002323627-Tracking-Sessions 

222 """ 

223 s = request.session.get(_START) 

224 if not s: 

225 reset(request) 

226 s = request.session.get(_START) 

227 return s 

228 

229 

230def uid(request): 

231 """ 

232 Get the session's unique ID, a randomly generated UUID4 string. If unset, 

233 the session is reset to initialize a value. 

234 

235 This value, like DID, is needed for Amplitude to accurately track that a 

236 sequence of events came from a unique user. 

237 

238 See more: https://help.amplitude.com/hc/en-us/articles/115003135607-Track-unique-users-in-Amplitude 

239 

240 Although Amplitude advises *against* setting user_id for anonymous users, 

241 here a value is set on anonymous users anyway, as the users never sign-in 

242 and become de-anonymized to this app / Amplitude. 

243 """ 

244 u = request.session.get(_UID) 

245 if not u: 

246 reset(request) 

247 u = request.session.get(_UID) 

248 return u 

249 

250 

251def update( 

252 request, 

253 agency=None, 

254 debug=None, 

255 flow=None, 

256 group=None, 

257 eligible=None, 

258 enrollment_expiry=None, 

259 logged_in=None, 

260 origin=None, 

261): 

262 """Update the request's session with non-null values.""" 

263 if agency is not None and isinstance(agency, models.TransitAgency): 

264 request.session[_AGENCY] = agency.id 

265 if debug is not None: 

266 request.session[_DEBUG] = debug 

267 if eligible is not None: 

268 request.session[_ELIGIBLE] = bool(eligible) 

269 if isinstance(enrollment_expiry, datetime): 

270 if enrollment_expiry.tzinfo is None or enrollment_expiry.tzinfo.utcoffset(enrollment_expiry) is None: 

271 # this is a naive datetime instance, update tzinfo for UTC 

272 # see notes under https://docs.python.org/3/library/datetime.html#datetime.datetime.timestamp 

273 # > There is no method to obtain the POSIX timestamp directly from a naive datetime instance representing UTC time. 

274 # > If your application uses this convention and your system timezone is not set to UTC, you can obtain the POSIX 

275 # > timestamp by supplying tzinfo=timezone.utc 

276 enrollment_expiry = enrollment_expiry.replace(tzinfo=timezone.utc) 

277 request.session[_ENROLLMENT_EXP] = enrollment_expiry.timestamp() 

278 if logged_in is not None: 

279 request.session[_LOGGED_IN] = logged_in 

280 if origin is not None: 

281 request.session[_ORIGIN] = origin 

282 if flow is not None and isinstance(flow, models.EnrollmentFlow): 

283 request.session[_FLOW] = flow.id 

284 oauth_session = OAuthSession(request) 

285 oauth_session.client_config = flow.oauth_config 

286 oauth_session.claims_request = flow.claims_request 

287 if group is not None and isinstance(group, models.EnrollmentGroup): 

288 request.session[_GROUP] = group.id