Coverage for benefits/core/session.py: 99%

140 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-22 21:13 +0000

1""" 

2The core application: helpers to work with request sessions. 

3""" 

4 

5from datetime import datetime, timedelta, timezone 

6import hashlib 

7import logging 

8import time 

9import uuid 

10 

11from cdt_identity.claims import ClaimsResult 

12from cdt_identity.session import Session as OAuthSession 

13from django.urls import reverse 

14 

15from benefits.routes import routes 

16from . import models 

17 

18 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# Keys used for entries in the request's session and in the dict built by context_dict().
_AGENCY = "agency"  # id of the selected TransitAgency
_DEBUG = "debug"  # debug flag
_DID = "did"  # device ID derived from a hash of the UID
_ELIGIBLE = "eligibility"  # eligibility flag
_ENROLLMENT_TOKEN = "enrollment_token"  # enrollment token
_ENROLLMENT_TOKEN_EXP = "enrollment_token_expiry"  # expiry time of the enrollment token
_ENROLLMENT_EXP = "enrollment_expiry"  # enrollment expiry, stored as a POSIX timestamp
_FLOW = "flow"  # id of the selected EnrollmentFlow
_LANG = "lang"  # context_dict() key only; the value is read from request.LANGUAGE_CODE
_LOGGED_IN = "logged_in"  # OAuth logged-in flag
_ORIGIN = "origin"  # origin for the session; defaults to the index route
_START = "start"  # session start time in integer milliseconds since Epoch
_UID = "uid"  # randomly generated UUID4 string for the session

35 

36 

def agency(request):
    """Look up the TransitAgency whose id is stored in the request's session.

    Returns None when the session has no agency entry, or when the lookup
    raises TransitAgency.DoesNotExist.
    """
    try:
        agency_id = request.session[_AGENCY]
        found = models.TransitAgency.by_id(agency_id)
    except (KeyError, models.TransitAgency.DoesNotExist):
        found = None
    return found

43 

44 

def active_agency(request):
    """True if the request's session is configured with an active agency. False otherwise.

    The result is always a bool: when no agency can be resolved from the
    session, False is returned rather than None.
    """
    current = agency(request)
    # coerce so that a missing agency (None) yields False, as documented
    return bool(current and current.active)

49 

50 

def context_dict(request):
    """The request's session context as a dict.

    Each entry is keyed by the corresponding session key constant. The agency
    entry holds the agency's slug when an agency is set and active, otherwise
    None.
    """
    # Resolve the agency once instead of once for the active check and again for the slug.
    current_agency = agency(request)
    agency_slug = current_agency.slug if current_agency and current_agency.active else None

    # The agency is resolved before the other entries, and the remaining
    # entries keep their order: did(), start() and uid() reset the session
    # when their value is unset.
    return {
        _AGENCY: agency_slug,
        _DEBUG: debug(request),
        _DID: did(request),
        _FLOW: flow(request),
        _ELIGIBLE: eligible(request),
        _ENROLLMENT_EXP: enrollment_expiry(request),
        _ENROLLMENT_TOKEN: enrollment_token(request),
        _ENROLLMENT_TOKEN_EXP: enrollment_token_expiry(request),
        _LANG: language(request),
        _LOGGED_IN: logged_in(request),
        _ORIGIN: origin(request),
        _START: start(request),
        _UID: uid(request),
    }

68 

69 

def debug(request):
    """Read the DEBUG flag from the request's session as a bool (False when unset)."""
    flag = request.session.get(_DEBUG, False)
    return bool(flag)

73 

74 

def did(request):
    """
    Return the session's device ID as a string.

    The device ID is a hashed version of the session's unique ID. When the
    session holds no device ID, the session is reset to initialize one, and
    whatever value the session holds afterwards is returned as a string.

    Like UID, this value is randomly generated per session; Amplitude needs it
    to accurately track that a sequence of events came from a unique user.

    See more: https://help.amplitude.com/hc/en-us/articles/115003135607-Track-unique-users-in-Amplitude
    """
    device_id = request.session.get(_DID)
    if device_id:
        return str(device_id)

    reset(request)
    return str(request.session.get(_DID))

91 

92 

def eligible(request):
    """True if the request's session has confirmed eligibility. False otherwise.

    The stored value is coerced to bool, so an unset eligibility entry yields
    False rather than None.
    """
    return bool(request.session.get(_ELIGIBLE))

96 

97 

def enrollment_expiry(request):
    """Return the enrollment expiry stored in the session as a UTC datetime, or None.

    The session value is interpreted as a POSIX timestamp. A missing or falsy
    value yields None.
    """
    stamp = request.session.get(_ENROLLMENT_EXP)
    if not stamp:
        return None
    return datetime.fromtimestamp(stamp, tz=timezone.utc)

105 

106 

def enrollment_reenrollment(request):
    """Return the date from which reenrollment is possible, or None.

    The date is the session's enrollment expiry minus the flow's
    expiration_reenrollment_days. None is returned when there is no flow, the
    flow does not support expiration, or no expiry is set.
    """
    expires_at = enrollment_expiry(request)
    current_flow = flow(request)

    if not (current_flow and current_flow.supports_expiration and expires_at):
        return None

    lead_time = timedelta(days=current_flow.expiration_reenrollment_days)
    return expires_at - lead_time

116 

117 

def enrollment_token(request):
    """Return the enrollment token stored in the request's session, or None if unset."""
    token = request.session.get(_ENROLLMENT_TOKEN)
    return token

121 

122 

def enrollment_token_expiry(request):
    """Return the enrollment token's expiry time stored in the request's session, or None if unset."""
    token_expiry = request.session.get(_ENROLLMENT_TOKEN_EXP)
    return token_expiry

126 

127 

def enrollment_token_valid(request):
    """True if the request's session holds a token that is still usable. False otherwise.

    A token with no recorded expiry is treated as valid. Otherwise the expiry
    must be more than 5 seconds in the future.
    """
    if not enrollment_token(request):
        return False

    expires = enrollment_token_expiry(request)
    if expires is None:
        return True

    # ensure token does not expire in the next 5 seconds
    return expires > (time.time() + 5)

137 

138 

def language(request):
    """Return the language code configured on the request (its LANGUAGE_CODE attribute)."""
    code = request.LANGUAGE_CODE
    return code

142 

143 

def logged_in(request):
    """Return the session's OAuth logged-in status as a bool (False when unset)."""
    status = request.session.get(_LOGGED_IN)
    return bool(status)

147 

148 

def logout(request):
    """Reset the session claims and tokens.

    Replaces the OAuth session's claims result with an empty ClaimsResult, then
    marks the session as logged out and overwrites the enrollment token with
    False.
    """
    empty_claims = ClaimsResult()
    OAuthSession(request, claims_result=empty_claims)
    # False (rather than None) is passed so that update() actually writes the values
    update(request, logged_in=False, enrollment_token=False)

153 

154 

def oauth_extra_claims(request):
    """Return the verified OAuth claims other than the eligibility claim, or None.

    Only claims whose verified value is truthy are considered. When there are
    none, None is returned. Otherwise the session's flow must exist and use
    claims verification; its eligibility claim is removed from the list and the
    remainder is returned (None if nothing remains).

    Raises:
        Exception: there are verified claims but no flow using claims verification.
        ValueError: the flow's eligibility claim is not among the verified claims
            (propagated from list.remove).
    """
    verified = OAuthSession(request).claims_result.verified
    claims = [name for name, value in verified.items() if value]

    if not claims:
        return None

    current_flow = flow(request)
    if not (current_flow and current_flow.uses_claims_verification):
        raise Exception("Oauth claims but no flow")

    claims.remove(current_flow.claims_request.eligibility_claim)
    return claims or None

167 

168 

def origin(request):
    """Return the origin stored in the request's session, falling back to the index route."""
    index_url = reverse(routes.INDEX)
    return request.session.get(_ORIGIN, index_url)

172 

173 

def reset(request):
    """Reset the session for the request.

    Clears agency, flow, enrollment expiry and enrollment token entries, sets
    eligibility and logged-in to False, points the origin at the index route,
    and resets the OAuth session. The start time, UID and DID are generated
    only when the session has no (truthy) UID yet; otherwise they are kept.
    """
    logger.debug("Reset session")
    request.session[_AGENCY] = None
    request.session[_FLOW] = None
    request.session[_ELIGIBLE] = False
    request.session[_ORIGIN] = reverse(routes.INDEX)
    request.session[_ENROLLMENT_EXP] = None
    request.session[_ENROLLMENT_TOKEN] = None
    request.session[_ENROLLMENT_TOKEN_EXP] = None
    request.session[_LOGGED_IN] = False
    OAuthSession(request, reset=True)

    if request.session.get(_UID):
        # identifiers already initialized: keep start time, uid and did as they are
        return

    logger.debug("Reset session time and uid")
    # start time in integer milliseconds since Epoch
    request.session[_START] = int(time.time() * 1000)
    new_uid = str(uuid.uuid4())
    request.session[_UID] = new_uid
    # the device ID is a UUID built from the first 32 hex digits of the uid's SHA-512 digest
    digest = hashlib.sha512(new_uid.encode("utf8")).hexdigest()
    request.session[_DID] = str(uuid.UUID(digest[:32]))

193 

194 

def start(request):
    """
    Return the session's start time, as integer milliseconds since Epoch.

    When the session holds no start time, the session is reset to initialize
    one, and whatever value the session holds afterwards is returned.

    Once started, the value does not change on subsequent calls to
    session.reset() or session.start(). Amplitude needs this value to
    accurately track sessions.

    See more: https://help.amplitude.com/hc/en-us/articles/115002323627-Tracking-Sessions
    """
    started = request.session.get(_START)
    if started:
        return started

    reset(request)
    return request.session.get(_START)

211 

212 

def uid(request):
    """
    Return the session's unique ID, a randomly generated UUID4 string.

    When the session holds no unique ID, the session is reset to initialize
    one, and whatever value the session holds afterwards is returned.

    Like DID, this value is needed for Amplitude to accurately track that a
    sequence of events came from a unique user.

    See more: https://help.amplitude.com/hc/en-us/articles/115003135607-Track-unique-users-in-Amplitude

    Although Amplitude advises *against* setting user_id for anonymous users,
    here a value is set on anonymous users anyway, as the users never sign-in
    and become de-anonymized to this app / Amplitude.
    """
    unique_id = request.session.get(_UID)
    if unique_id:
        return unique_id

    reset(request)
    return request.session.get(_UID)

232 

233 

def update(
    request,
    agency=None,
    debug=None,
    flow=None,
    eligible=None,
    enrollment_expiry=None,
    enrollment_token=None,
    enrollment_token_exp=None,
    logged_in=None,
    origin=None,
):
    """Update the request's session with the given non-null values.

    Arguments left as None are ignored. agency and flow are stored by id and
    only when they are TransitAgency / EnrollmentFlow instances. Setting a flow
    also copies its oauth_config and claims_request onto the OAuth session.
    enrollment_expiry is stored as a POSIX timestamp and only when it is a
    datetime; a naive datetime is taken to be UTC. Setting enrollment_token
    always overwrites the stored token expiry with enrollment_token_exp, even
    when that is None.
    """
    # isinstance() is False for None, so no separate None check is needed
    if isinstance(agency, models.TransitAgency):
        request.session[_AGENCY] = agency.id

    if debug is not None:
        request.session[_DEBUG] = debug

    if eligible is not None:
        request.session[_ELIGIBLE] = bool(eligible)

    if isinstance(enrollment_expiry, datetime):
        tz = enrollment_expiry.tzinfo
        is_naive = tz is None or tz.utcoffset(enrollment_expiry) is None
        if is_naive:
            # A naive datetime has no direct way to yield a POSIX timestamp for
            # UTC time; attach tzinfo=timezone.utc so timestamp() treats it as UTC.
            # see notes under https://docs.python.org/3/library/datetime.html#datetime.datetime.timestamp
            enrollment_expiry = enrollment_expiry.replace(tzinfo=timezone.utc)
        request.session[_ENROLLMENT_EXP] = enrollment_expiry.timestamp()

    if enrollment_token is not None:
        request.session[_ENROLLMENT_TOKEN] = enrollment_token
        request.session[_ENROLLMENT_TOKEN_EXP] = enrollment_token_exp

    if logged_in is not None:
        request.session[_LOGGED_IN] = logged_in

    if origin is not None:
        request.session[_ORIGIN] = origin

    if isinstance(flow, models.EnrollmentFlow):
        request.session[_FLOW] = flow.id
        # keep the OAuth session's configuration in step with the selected flow
        oauth = OAuthSession(request)
        oauth.client_config = flow.oauth_config
        oauth.claims_request = flow.claims_request

274 

275 

def flow(request) -> models.EnrollmentFlow | None:
    """Look up the EnrollmentFlow whose id is stored in the request's session.

    Returns None when the session has no flow entry, or when the lookup raises
    EnrollmentFlow.DoesNotExist.
    """
    try:
        flow_id = request.session[_FLOW]
        found = models.EnrollmentFlow.by_id(flow_id)
    except (KeyError, models.EnrollmentFlow.DoesNotExist):
        found = None
    return found