Coverage for benefits/core/session.py: 98%

131 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-03 19:55 +0000

1""" 

2The core application: helpers to work with request sessions. 

3""" 

4 

5from datetime import datetime, timedelta, timezone 

6import hashlib 

7import logging 

8import time 

9import uuid 

10 

11from cdt_identity.claims import ClaimsResult 

12from cdt_identity.session import Session as OAuthSession 

13from django.urls import reverse 

14 

15from benefits.enrollment_littlepay.session import Session as LittlepaySession 

16from benefits.enrollment_switchio.session import Session as SwitchioSession 

17from benefits.routes import routes 

18from . import models 

19 

20 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# Keys under which this module stores values in the request's session.

# Agency / flow selection and eligibility state.
_AGENCY = "agency"
_DEBUG = "debug"
# Device ID derived from the unique ID (see reset()).
_DID = "did"
_ELIGIBLE = "eligibility"
# Enrollment expiry, stored as a POSIX timestamp (see update()).
_ENROLLMENT_EXP = "enrollment_expiry"
_FLOW = "flow"
_LANG = "lang"
_LOGGED_IN = "logged_in"
_ORIGIN = "origin"
# Session start time in integer milliseconds since Epoch (see reset()).
_START = "start"
# Randomly generated unique ID (see reset()).
_UID = "uid"

35 

36 

def agency(request):
    """Look up the TransitAgency referenced by the request's session.

    Returns the agency whose id is stored in the session, or None when the
    session has no agency key or the lookup finds no matching agency.
    """
    try:
        found = models.TransitAgency.by_id(request.session[_AGENCY])
    except (KeyError, models.TransitAgency.DoesNotExist):
        # no agency key in the session, or no agency with the stored id
        found = None
    return found

43 

44 

def active_agency(request):
    """True if the request's session is configured with an active agency. False otherwise.

    The result is always a real bool: when the session has no agency the
    function returns False rather than leaking None to the caller.
    """
    a = agency(request)
    # `a and a.active` evaluates to None when there is no agency; coerce so
    # the return value matches the documented True/False contract.
    return bool(a and a.active)

49 

50 

def context_dict(request):
    """The request's session context as a dict.

    The dict maps each session key to its current value, as returned by the
    corresponding getter in this module, plus the Littlepay access token and
    its expiry and the Switchio registration id. The agency entry is the
    agency's slug when the session has an active agency, otherwise None.
    """
    littlepay_session = LittlepaySession(request)
    switchio_session = SwitchioSession(request)

    # Resolve the agency once and reuse it for both the "active" check and
    # the slug, instead of looking it up a second time.
    current_agency = agency(request)
    agency_slug = current_agency.slug if current_agency and current_agency.active else None

    return {
        _AGENCY: agency_slug,
        _DEBUG: debug(request),
        _DID: did(request),
        _FLOW: flow(request),
        _ELIGIBLE: eligible(request),
        _ENROLLMENT_EXP: enrollment_expiry(request),
        littlepay_session._keys_access_token: littlepay_session.access_token,
        littlepay_session._keys_access_token_expiry: littlepay_session.access_token_expiry,
        switchio_session._keys_registration_id: switchio_session.registration_id,
        _LANG: language(request),
        _LOGGED_IN: logged_in(request),
        _ORIGIN: origin(request),
        _START: start(request),
        _UID: uid(request),
    }

71 

72 

def debug(request):
    """Report whether the DEBUG flag is set in the request's session.

    A missing flag counts as False; any stored value is coerced to bool.
    """
    flag = request.session.get(_DEBUG, False)
    return bool(flag)

76 

77 

def did(request):
    """
    Return the session's device ID as a string.

    The device ID is a hashed version of the session's unique ID. When the
    session holds no device ID yet, the session is reset so that a value is
    initialized, and the value stored after the reset is returned.

    Like UID, this value is randomly generated per session; Amplitude needs
    it to accurately track that a sequence of events came from a unique user.

    See more: https://help.amplitude.com/hc/en-us/articles/115003135607-Track-unique-users-in-Amplitude
    """
    device_id = request.session.get(_DID)
    if device_id:
        return str(device_id)

    # nothing stored yet: reset the session, then read whatever it stored
    reset(request)
    return str(request.session.get(_DID))

94 

95 

def eligible(request):
    """True if the request's session has confirmed eligibility. False otherwise.

    The stored value is coerced to bool, so a session that has never had
    eligibility set yields False rather than None.
    """
    return bool(request.session.get(_ELIGIBLE))

99 

100 

def enrollment_expiry(request):
    """Return the enrollment expiry stored in the session as a UTC datetime, or None.

    The session stores the expiry as a timestamp; a missing or falsy stored
    value yields None.
    """
    stored = request.session.get(_ENROLLMENT_EXP)
    if not stored:
        return None
    return datetime.fromtimestamp(stored, tz=timezone.utc)

108 

109 

def enrollment_reenrollment(request):
    """Return the date from which the user may re-enroll, or None.

    The date is the session's enrollment expiry minus the flow's
    `expiration_reenrollment_days`. None is returned when the session has no
    flow, the flow does not support expiration, or no expiry is stored.
    """
    expires_at = enrollment_expiry(request)
    current_flow = flow(request)

    if not current_flow or not current_flow.supports_expiration or not expires_at:
        return None

    reenrollment_window = timedelta(days=current_flow.expiration_reenrollment_days)
    return expires_at - reenrollment_window

119 

120 

def language(request):
    """Return the language code set on the request (its LANGUAGE_CODE attribute)."""
    code = request.LANGUAGE_CODE
    return code

124 

125 

def logged_in(request):
    """Report whether the session records the user as logged in with OAuth.

    The stored value is coerced to bool; a missing value yields False.
    """
    status = request.session.get(_LOGGED_IN)
    return bool(status)

129 

130 

def logout(request):
    """Reset the session claims and tokens, and mark the user as logged out."""
    # re-create each enrollment session with reset=True
    for session_cls in (LittlepaySession, SwitchioSession):
        session_cls(request, reset=True)

    # replace the OAuth claims result with a fresh, empty ClaimsResult
    OAuthSession(request, claims_result=ClaimsResult())

    update(request, logged_in=False)

137 

138 

def oauth_extra_claims(request):
    """Get the extra oauth claims from the request's session, or None.

    "Extra" claims are the verified claims with a truthy value, excluding the
    flow's eligibility claim. Returns None when there are no verified claims,
    or when the eligibility claim is the only one.

    Raises:
        Exception: if there are verified claims but the session has no flow,
            or the flow does not use claims verification.
    """
    verified = OAuthSession(request).claims_result.verified
    claims = [claim for claim, value in verified.items() if value]

    if not claims:
        return None

    f = flow(request)
    if not (f and f.uses_claims_verification):
        raise Exception("Oauth claims but no flow")

    # Filter rather than list.remove(): remove() raises ValueError when the
    # eligibility claim is not among the verified claims.
    eligibility_claim = f.claims_request.eligibility_claim
    extra_claims = [claim for claim in claims if claim != eligibility_claim]
    return extra_claims or None

151 

152 

def origin(request):
    """Return the origin stored in the request's session, defaulting to the index route's URL."""
    index_url = reverse(routes.INDEX)
    return request.session.get(_ORIGIN, index_url)

156 

157 

def reset(request):
    """Reset the session for the request.

    Clears the agency, flow, eligibility, enrollment expiry and logged-in
    values, points the origin back at the index route, and resets the
    Littlepay, Switchio and OAuth sessions.

    The start time, unique ID and device ID are only generated when the
    session has no unique ID yet; an existing unique ID is left untouched.
    """
    logger.debug("Reset session")

    cleared_values = (
        (_AGENCY, None),
        (_FLOW, None),
        (_ELIGIBLE, False),
        (_ORIGIN, reverse(routes.INDEX)),
        (_ENROLLMENT_EXP, None),
        (_LOGGED_IN, False),
    )
    for key, value in cleared_values:
        request.session[key] = value

    for session_cls in (LittlepaySession, SwitchioSession, OAuthSession):
        session_cls(request, reset=True)

    if not request.session.get(_UID):
        logger.debug("Reset session time and uid")
        # start time in integer milliseconds since Epoch
        request.session[_START] = int(time.time() * 1000)

        new_uid = str(uuid.uuid4())
        request.session[_UID] = new_uid

        # the device ID is a UUID built from the first 32 hex characters of
        # the SHA-512 digest of the unique ID
        digest = hashlib.sha512(new_uid.encode("utf8")).hexdigest()
        request.session[_DID] = str(uuid.UUID(digest[:32]))

177 

178 

def start(request):
    """
    Return the session's start time, as integer milliseconds since Epoch.

    When the session holds no start time yet, the session is reset so that a
    value is initialized, and the value stored after the reset is returned.

    Once started, the value does not change after subsequent calls to
    session.reset() or session.start(). Amplitude needs this value to
    accurately track sessions.

    See more: https://help.amplitude.com/hc/en-us/articles/115002323627-Tracking-Sessions
    """
    started = request.session.get(_START)
    if started:
        return started

    # nothing stored yet: reset the session, then read whatever it stored
    reset(request)
    return request.session.get(_START)

195 

196 

def uid(request):
    """
    Return the session's unique ID, a randomly generated UUID4 string.

    When the session holds no unique ID yet, the session is reset so that a
    value is initialized, and the value stored after the reset is returned.

    Like DID, Amplitude needs this value to accurately track that a sequence
    of events came from a unique user.

    See more: https://help.amplitude.com/hc/en-us/articles/115003135607-Track-unique-users-in-Amplitude

    Although Amplitude advises *against* setting user_id for anonymous users,
    here a value is set on anonymous users anyway, as the users never sign-in
    and become de-anonymized to this app / Amplitude.
    """
    unique_id = request.session.get(_UID)
    if unique_id:
        return unique_id

    # nothing stored yet: reset the session, then read whatever it stored
    reset(request)
    return request.session.get(_UID)

216 

217 

def update(
    request,
    agency=None,
    debug=None,
    flow=None,
    eligible=None,
    enrollment_expiry=None,
    logged_in=None,
    origin=None,
):
    """Write the given values into the request's session, skipping any left as None.

    - agency: stored by id, only when it is a models.TransitAgency.
    - debug, logged_in, origin: stored as given.
    - eligible: stored as a bool.
    - enrollment_expiry: only when it is a datetime; stored as a POSIX
      timestamp. A naive datetime is interpreted as UTC.
    - flow: stored by id, only when it is a models.EnrollmentFlow; the OAuth
      session's client config and claims request are also set from the flow.
    """
    session = request.session

    # isinstance() is False for None, so no separate None check is needed
    if isinstance(agency, models.TransitAgency):
        session[_AGENCY] = agency.id

    if debug is not None:
        session[_DEBUG] = debug

    if eligible is not None:
        session[_ELIGIBLE] = bool(eligible)

    if isinstance(enrollment_expiry, datetime):
        expiry = enrollment_expiry
        if expiry.utcoffset() is None:
            # A naive datetime: treat it as UTC before taking the timestamp.
            # See the notes under
            # https://docs.python.org/3/library/datetime.html#datetime.datetime.timestamp
            # > There is no method to obtain the POSIX timestamp directly from a naive datetime instance representing UTC time.
            # > If your application uses this convention and your system timezone is not set to UTC, you can obtain the POSIX
            # > timestamp by supplying tzinfo=timezone.utc
            expiry = expiry.replace(tzinfo=timezone.utc)
        session[_ENROLLMENT_EXP] = expiry.timestamp()

    if logged_in is not None:
        session[_LOGGED_IN] = logged_in

    if origin is not None:
        session[_ORIGIN] = origin

    if isinstance(flow, models.EnrollmentFlow):
        session[_FLOW] = flow.id
        # keep the OAuth session's configuration in step with the chosen flow
        oauth = OAuthSession(request)
        oauth.client_config = flow.oauth_config
        oauth.claims_request = flow.claims_request

253 

254 

def flow(request) -> models.EnrollmentFlow | None:
    """Look up the EnrollmentFlow referenced by the request's session.

    Returns the flow whose id is stored in the session, or None when the
    session has no flow key or the lookup finds no matching flow.
    """
    try:
        found = models.EnrollmentFlow.by_id(request.session[_FLOW])
    except (KeyError, models.EnrollmentFlow.DoesNotExist):
        # no flow key in the session, or no flow with the stored id
        found = None
    return found