Coverage for benefits/core/session.py: 99%

141 statements  

« prev     ^ index     » next       coverage.py v7.6.7, created at 2024-11-19 16:31 +0000

1""" 

2The core application: helpers to work with request sessions. 

3""" 

4 

5from datetime import datetime, timedelta, timezone 

6import hashlib 

7import logging 

8import time 

9import uuid 

10 

11from django.urls import reverse 

12 

13from benefits.routes import routes 

14from . import models 

15 

16 

17logger = logging.getLogger(__name__) 

18 

19 

20_AGENCY = "agency" 

21_DEBUG = "debug" 

22_DID = "did" 

23_ELIGIBLE = "eligibility" 

24_ENROLLMENT_TOKEN = "enrollment_token" 

25_ENROLLMENT_TOKEN_EXP = "enrollment_token_expiry" 

26_ENROLLMENT_EXP = "enrollment_expiry" 

27_FLOW = "flow" 

28_LANG = "lang" 

29_OAUTH_CLAIMS = "oauth_claims" 

30_OAUTH_TOKEN = "oauth_token" 

31_ORIGIN = "origin" 

32_START = "start" 

33_UID = "uid" 

34 

35 

36def agency(request): 

37 """Get the agency from the request's session, or None""" 

38 try: 

39 return models.TransitAgency.by_id(request.session[_AGENCY]) 

40 except (KeyError, models.TransitAgency.DoesNotExist): 

41 return None 

42 

43 

44def active_agency(request): 

45 """True if the request's session is configured with an active agency. False otherwise.""" 

46 a = agency(request) 

47 return a and a.active 

48 

49 

50def context_dict(request): 

51 """The request's session context as a dict.""" 

52 return { 

53 _AGENCY: agency(request).slug if active_agency(request) else None, 

54 _DEBUG: debug(request), 

55 _DID: did(request), 

56 _FLOW: flow(request), 

57 _ELIGIBLE: eligible(request), 

58 _ENROLLMENT_EXP: enrollment_expiry(request), 

59 _ENROLLMENT_TOKEN: enrollment_token(request), 

60 _ENROLLMENT_TOKEN_EXP: enrollment_token_expiry(request), 

61 _LANG: language(request), 

62 _OAUTH_TOKEN: oauth_token(request), 

63 _OAUTH_CLAIMS: oauth_claims(request), 

64 _ORIGIN: origin(request), 

65 _START: start(request), 

66 _UID: uid(request), 

67 } 

68 

69 

70def debug(request): 

71 """Get the DEBUG flag from the request's session.""" 

72 return bool(request.session.get(_DEBUG, False)) 

73 

74 

75def did(request): 

76 """ 

77 Get the session's device ID, a hashed version of the unique ID. If unset, 

78 the session is reset to initialize a value. 

79 

80 This value, like UID, is randomly generated per session and is needed for 

81 Amplitude to accurately track that a sequence of events came from a unique 

82 user. 

83 

84 See more: https://help.amplitude.com/hc/en-us/articles/115003135607-Track-unique-users-in-Amplitude 

85 """ 

86 d = request.session.get(_DID) 

87 if not d: 

88 reset(request) 

89 d = request.session.get(_DID) 

90 return str(d) 

91 

92 

93def eligible(request): 

94 """True if the request's session has confirmed eligibility. False otherwise.""" 

95 return request.session.get(_ELIGIBLE) 

96 

97 

98def enrollment_expiry(request): 

99 """Get the expiry date for a user's enrollment from session, or None.""" 

100 expiry = request.session.get(_ENROLLMENT_EXP) 

101 if expiry: 

102 return datetime.fromtimestamp(expiry, tz=timezone.utc) 

103 else: 

104 return None 

105 

106 

107def enrollment_reenrollment(request): 

108 """Get the reenrollment date for a user's enrollment from session, or None.""" 

109 expiry = enrollment_expiry(request) 

110 enrollment_flow = flow(request) 

111 

112 if enrollment_flow and enrollment_flow.supports_expiration and expiry: 

113 return expiry - timedelta(days=enrollment_flow.expiration_reenrollment_days) 

114 else: 

115 return None 

116 

117 

118def enrollment_token(request): 

119 """Get the enrollment token from the request's session, or None.""" 

120 return request.session.get(_ENROLLMENT_TOKEN) 

121 

122 

123def enrollment_token_expiry(request): 

124 """Get the enrollment token's expiry time from the request's session, or None.""" 

125 return request.session.get(_ENROLLMENT_TOKEN_EXP) 

126 

127 

128def enrollment_token_valid(request): 

129 """True if the request's session is configured with a valid token. False otherwise.""" 

130 if bool(enrollment_token(request)): 

131 exp = enrollment_token_expiry(request) 

132 # ensure token does not expire in the next 5 seconds 

133 valid = exp is None or exp > (time.time() + 5) 

134 return valid 

135 else: 

136 return False 

137 

138 

139def language(request): 

140 """Get the language configured for the request.""" 

141 return request.LANGUAGE_CODE 

142 

143 

144def logged_in(request): 

145 """Check if the current session has an OAuth token.""" 

146 return bool(oauth_token(request)) 

147 

148 

149def logout(request): 

150 """Reset the session claims and tokens.""" 

151 update(request, oauth_claims=[], oauth_token=False, enrollment_token=False) 

152 

153 

154def oauth_token(request): 

155 """Get the oauth token from the request's session, or None""" 

156 return request.session.get(_OAUTH_TOKEN) 

157 

158 

159def oauth_claims(request): 

160 """Get the oauth claims from the request's session, or None""" 

161 return request.session.get(_OAUTH_CLAIMS) 

162 

163 

164def oauth_extra_claims(request): 

165 """Get the extra oauth claims from the request's session, or None""" 

166 claims = oauth_claims(request) 

167 if claims: 

168 f = flow(request) 

169 if f and f.uses_claims_verification: 

170 claims.remove(f.claims_eligibility_claim) 

171 return claims 

172 raise Exception("Oauth claims but no flow") 

173 else: 

174 return None 

175 

176 

177def origin(request): 

178 """Get the origin for the request's session, or default to the index route.""" 

179 return request.session.get(_ORIGIN, reverse(routes.INDEX)) 

180 

181 

182def reset(request): 

183 """Reset the session for the request.""" 

184 logger.debug("Reset session") 

185 request.session[_AGENCY] = None 

186 request.session[_FLOW] = None 

187 request.session[_ELIGIBLE] = False 

188 request.session[_ORIGIN] = reverse(routes.INDEX) 

189 request.session[_ENROLLMENT_EXP] = None 

190 request.session[_ENROLLMENT_TOKEN] = None 

191 request.session[_ENROLLMENT_TOKEN_EXP] = None 

192 request.session[_OAUTH_TOKEN] = None 

193 request.session[_OAUTH_CLAIMS] = None 

194 

195 if _UID not in request.session or not request.session[_UID]: 

196 logger.debug("Reset session time and uid") 

197 request.session[_START] = int(time.time() * 1000) 

198 u = str(uuid.uuid4()) 

199 request.session[_UID] = u 

200 request.session[_DID] = str(uuid.UUID(hashlib.sha512(bytes(u, "utf8")).hexdigest()[:32])) 

201 

202 

203def start(request): 

204 """ 

205 Get the start time from the request's session, as integer milliseconds since 

206 Epoch. If unset, the session is reset to initialize a value. 

207 

208 Once started, does not reset after subsequent calls to session.reset() or 

209 session.start(). This value is needed for Amplitude to accurately track 

210 sessions. 

211 

212 See more: https://help.amplitude.com/hc/en-us/articles/115002323627-Tracking-Sessions 

213 """ 

214 s = request.session.get(_START) 

215 if not s: 

216 reset(request) 

217 s = request.session.get(_START) 

218 return s 

219 

220 

221def uid(request): 

222 """ 

223 Get the session's unique ID, a randomly generated UUID4 string. If unset, 

224 the session is reset to initialize a value. 

225 

226 This value, like DID, is needed for Amplitude to accurately track that a 

227 sequence of events came from a unique user. 

228 

229 See more: https://help.amplitude.com/hc/en-us/articles/115003135607-Track-unique-users-in-Amplitude 

230 

231 Although Amplitude advises *against* setting user_id for anonymous users, 

232 here a value is set on anonymous users anyway, as the users never sign-in 

233 and become de-anonymized to this app / Amplitude. 

234 """ 

235 u = request.session.get(_UID) 

236 if not u: 

237 reset(request) 

238 u = request.session.get(_UID) 

239 return u 

240 

241 

242def update( 

243 request, 

244 agency=None, 

245 debug=None, 

246 flow=None, 

247 eligible=None, 

248 enrollment_expiry=None, 

249 enrollment_token=None, 

250 enrollment_token_exp=None, 

251 oauth_token=None, 

252 oauth_claims=None, 

253 origin=None, 

254): 

255 """Update the request's session with non-null values.""" 

256 if agency is not None and isinstance(agency, models.TransitAgency): 

257 request.session[_AGENCY] = agency.id 

258 if debug is not None: 

259 request.session[_DEBUG] = debug 

260 if eligible is not None: 

261 request.session[_ELIGIBLE] = bool(eligible) 

262 if isinstance(enrollment_expiry, datetime): 

263 if enrollment_expiry.tzinfo is None or enrollment_expiry.tzinfo.utcoffset(enrollment_expiry) is None: 

264 # this is a naive datetime instance, update tzinfo for UTC 

265 # see notes under https://docs.python.org/3/library/datetime.html#datetime.datetime.timestamp 

266 # > There is no method to obtain the POSIX timestamp directly from a naive datetime instance representing UTC time. 

267 # > If your application uses this convention and your system timezone is not set to UTC, you can obtain the POSIX 

268 # > timestamp by supplying tzinfo=timezone.utc 

269 enrollment_expiry = enrollment_expiry.replace(tzinfo=timezone.utc) 

270 request.session[_ENROLLMENT_EXP] = enrollment_expiry.timestamp() 

271 if enrollment_token is not None: 

272 request.session[_ENROLLMENT_TOKEN] = enrollment_token 

273 request.session[_ENROLLMENT_TOKEN_EXP] = enrollment_token_exp 

274 if oauth_token is not None: 

275 request.session[_OAUTH_TOKEN] = oauth_token 

276 if oauth_claims is not None: 

277 request.session[_OAUTH_CLAIMS] = oauth_claims 

278 if origin is not None: 

279 request.session[_ORIGIN] = origin 

280 if flow is not None and isinstance(flow, models.EnrollmentFlow): 

281 request.session[_FLOW] = flow.id 

282 

283 

284def flow(request) -> models.EnrollmentFlow | None: 

285 """Get the EnrollmentFlow from the request's session, or None""" 

286 try: 

287 return models.EnrollmentFlow.by_id(request.session[_FLOW]) 

288 except (KeyError, models.EnrollmentFlow.DoesNotExist): 

289 return None