Coverage for app_modules/rate_limit.py: 50%

137 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-20 00:55 +0200

1""" 

2rate_limit.py 

3 

4Purpose: 

5 Implements robust, database-backed login rate limiting and lockout to 

6 mitigate credential stuffing and brute-force attacks. Tracks failed login 

7 attempts per (username, ip) pair with a sliding window and enforces a 

8 temporary lock after too many failures. 

9 

10How it works: 

11 - On each login POST, `is_login_allowed(username, ip)` is called. 

12 - If an active lock exists (locked_until > now), it returns (False, retry_s). 

13 - On password failure, `record_login_failure(username, ip)`: 

14 - Resets the window if last_failed_at is outside the window. 

15 - Increments fail_count and, when threshold is reached, sets locked_until. 

16 - On success, `record_login_success(username, ip)` resets counters. 

17 

18Defense-in-depth: 

19 - Also enforces an IP-level limiter (`is_ip_allowed(ip)`) to avoid trivial 

20 bypass by switching usernames. Both username+ip and ip-only checks must pass. 

21 

22Configuration (env vars): 

23 - LOGIN_MAX_FAILS: max failures in the window before lock (default: 10) 

24 - LOGIN_WINDOW_SECONDS: sliding window duration in seconds (default: 300) 

25 - LOGIN_LOCK_SECONDS: lock duration after threshold exceeded (default: 300) 

26""" 

27 

28from __future__ import annotations 

29 

30import os 

31import time 

32from typing import Tuple 

33 

34from .db import get_db 

35 

36 

def _now_ts() -> int:
    """Return the current Unix time, truncated to whole seconds."""
    return int(time.time())

39 

40 

def _is_localhost_ip(ip: str) -> bool:
    """Return True when *ip* denotes the local host and is exempt from rate limiting.

    Accepted values are the literal aliases ``'0.0.0.0'`` (sometimes used in
    tests) and ``'localhost'`` (in case a hostname is passed), plus any
    address that parses as an IPv4/IPv6 loopback address (``127.0.0.0/8``,
    ``::1``), including IPv4-mapped IPv6 forms such as ``::ffff:127.0.0.1``.

    Empty, non-string and unparseable values are never exempt, so a malformed
    value that merely starts with ``'127.'`` does not bypass the limiter.
    """
    # Function-scope import keeps this block self-contained.
    import ipaddress

    if not ip or not isinstance(ip, str):
        return False

    # Non-address aliases that are still treated as the local host.
    if ip in ('0.0.0.0', 'localhost'):
        return True

    try:
        addr = ipaddress.ip_address(ip)
    except ValueError:
        return False

    # Unwrap ::ffff:a.b.c.d explicitly so the embedded IPv4 address is the
    # one that gets classified.
    mapped = getattr(addr, 'ipv4_mapped', None)
    if mapped is not None:
        addr = mapped

    return addr.is_loopback

62 

63 

def _cfg_int(name: str, default: int) -> int:
    """Read the environment variable *name* as an integer.

    Returns *default* when the variable is unset or its value cannot be
    parsed by ``int()`` (for example an empty or non-numeric string).
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    try:
        return int(raw)
    except ValueError:
        # A malformed setting falls back to the default instead of failing
        # at import time.
        return default

69 

70 

# Failure threshold, counting window (seconds) and lock duration (seconds).
# Each can be overridden through the environment variable of the same name.
LOGIN_MAX_FAILS = _cfg_int('LOGIN_MAX_FAILS', 10)
LOGIN_WINDOW_SECONDS = _cfg_int('LOGIN_WINDOW_SECONDS', 300)
LOGIN_LOCK_SECONDS = _cfg_int('LOGIN_LOCK_SECONDS', 300)
# When true, locking a (username, ip) pair also locks the IP itself.
# Enabled unless LOGIN_LOCK_IP_ON_ACCOUNT_LOCK is set to a value other than '1'.
LOCK_IP_ON_ACCOUNT_LOCK = os.getenv('LOGIN_LOCK_IP_ON_ACCOUNT_LOCK', '1') == '1'

75 

76 

def _ensure_schema():
    """Create the rate-limiting tables if they do not exist yet.

    Two tables are used: ``login_attempts`` holds one row per
    (username, ip) pair and ``login_ip_attempts`` holds one row per IP.
    Both statements use ``CREATE TABLE IF NOT EXISTS``, so repeated calls
    leave existing tables untouched.
    """
    db = get_db()
    statements = (
        # Per (username, ip) counters.
        """
        CREATE TABLE IF NOT EXISTS login_attempts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT NOT NULL,
            ip TEXT NOT NULL,
            fail_count INTEGER NOT NULL DEFAULT 0,
            last_failed_at INTEGER NOT NULL DEFAULT 0,
            locked_until INTEGER NOT NULL DEFAULT 0,
            UNIQUE(username, ip)
        )
        """,
        # IP-only attempts table
        """
        CREATE TABLE IF NOT EXISTS login_ip_attempts (
            ip TEXT PRIMARY KEY,
            fail_count INTEGER NOT NULL DEFAULT 0,
            last_failed_at INTEGER NOT NULL DEFAULT 0,
            locked_until INTEGER NOT NULL DEFAULT 0
        )
        """,
    )
    for ddl in statements:
        db.execute(ddl)
    # One commit covers both statements.
    db.commit()

106 

107 

def _get_row(username: str, ip: str):
    """Fetch the counters stored for the (username, ip) pair.

    Returns the result of ``fetchone()``: a row exposing ``fail_count``,
    ``last_failed_at`` and ``locked_until``, or ``None`` when no row exists.
    Callers index the row by column name, so the connection returned by
    ``get_db()`` presumably uses a name-addressable row factory — confirm
    in ``.db``.
    """
    cursor = get_db().execute(
        'SELECT fail_count, last_failed_at, locked_until FROM login_attempts WHERE username = ? AND ip = ?',
        (username, ip),
    )
    return cursor.fetchone()

115 

116 

def _get_ip_row(ip: str):
    """Fetch the IP-level counters stored for *ip*.

    Returns the result of ``fetchone()``: a row exposing ``fail_count``,
    ``last_failed_at`` and ``locked_until``, or ``None`` when no row exists.
    """
    cursor = get_db().execute(
        'SELECT fail_count, last_failed_at, locked_until FROM login_ip_attempts WHERE ip = ?',
        (ip,),
    )
    return cursor.fetchone()

124 

125 

def _upsert_row(username: str, ip: str, fail_count: int, last_failed_at: int, locked_until: int):
    """Insert or overwrite the counters for the (username, ip) pair and commit.

    An existing row for the same pair has all three counter columns replaced
    by the supplied values; nothing is accumulated at the SQL level.
    """
    db = get_db()
    db.execute(
        """
        INSERT INTO login_attempts (username, ip, fail_count, last_failed_at, locked_until)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT(username, ip) DO UPDATE SET
            fail_count=excluded.fail_count,
            last_failed_at=excluded.last_failed_at,
            locked_until=excluded.locked_until
        """,
        (username, ip, fail_count, last_failed_at, locked_until),
    )
    db.commit()

140 

141 

def _upsert_ip_row(ip: str, fail_count: int, last_failed_at: int, locked_until: int):
    """Insert or overwrite the IP-level counters for *ip* and commit.

    An existing row for the same IP has all three counter columns replaced
    by the supplied values.
    """
    db = get_db()
    db.execute(
        """
        INSERT INTO login_ip_attempts (ip, fail_count, last_failed_at, locked_until)
        VALUES (?, ?, ?, ?)
        ON CONFLICT(ip) DO UPDATE SET
            fail_count=excluded.fail_count,
            last_failed_at=excluded.last_failed_at,
            locked_until=excluded.locked_until
        """,
        (ip, fail_count, last_failed_at, locked_until),
    )
    db.commit()

156 

157 

def is_login_allowed(username: str, ip: str) -> Tuple[bool, int]:
    """Return (allowed, retry_after_seconds) for the (username, ip) pair.

    Localhost addresses are always allowed (for testing and local
    development). Otherwise the attempt is refused while a lock is active,
    with the remaining lock time as ``retry_after_seconds``.

    If no lock is active but the stored failure count is at or above
    ``LOGIN_MAX_FAILS`` and the last failure is still inside the window, a
    fresh lock of ``LOGIN_LOCK_SECONDS`` is written and the attempt is
    refused. When ``LOCK_IP_ON_ACCOUNT_LOCK`` is set, that lock is also
    applied to the IP-level row unless the IP is already locked for at
    least as long.
    """
    # Exempt localhost IPs from rate limiting (for testing and local development)
    if _is_localhost_ip(ip):
        return True, 0

    _ensure_schema()
    row = _get_row(username, ip)
    now = _now_ts()
    if not row:
        return True, 0

    locked_until = int(row['locked_until'] or 0)
    if locked_until > now:
        return False, locked_until - now

    fail_count = int(row['fail_count'] or 0)
    last_failed_at = int(row['last_failed_at'] or 0)

    # Extra guard: only block when the threshold is met and the window has
    # not elapsed since the last failure.
    over_threshold = fail_count >= LOGIN_MAX_FAILS
    window_active = last_failed_at + LOGIN_WINDOW_SECONDS > now
    if not (over_threshold and window_active):
        return True, 0

    # Normalize: set a lock if missing
    lock_until = now + LOGIN_LOCK_SECONDS
    _upsert_row(username, ip, fail_count, last_failed_at, lock_until)

    # Optionally escalate to IP-level lock
    if LOCK_IP_ON_ACCOUNT_LOCK:
        ip_row = _get_ip_row(ip)
        existing_lock = int((ip_row['locked_until'] if ip_row else 0) or 0)
        if existing_lock < lock_until:
            ip_fail_count = int((ip_row['fail_count'] if ip_row else 0) or 0)
            _upsert_ip_row(ip, ip_fail_count, now, lock_until)

    return False, LOGIN_LOCK_SECONDS

184 

185 

def is_ip_allowed(ip: str) -> Tuple[bool, int]:
    """Return (allowed, retry_after_seconds) for the IP-level limiter.

    Localhost addresses are always allowed. Otherwise the attempt is refused
    while the IP row carries an active lock. If no lock is active but the
    stored failure count is at or above ``LOGIN_MAX_FAILS`` with the last
    failure still inside the window, a lock of ``LOGIN_LOCK_SECONDS`` is
    written and the attempt is refused.
    """
    # Exempt localhost IPs from rate limiting (for testing and local development)
    if _is_localhost_ip(ip):
        return True, 0

    _ensure_schema()
    row = _get_ip_row(ip)
    now = _now_ts()
    if not row:
        return True, 0

    locked_until = int(row['locked_until'] or 0)
    if locked_until > now:
        return False, locked_until - now

    fail_count = int(row['fail_count'] or 0)
    last_failed_at = int(row['last_failed_at'] or 0)
    if fail_count >= LOGIN_MAX_FAILS and last_failed_at + LOGIN_WINDOW_SECONDS > now:
        # Threshold reached without a stored lock: persist one now.
        _upsert_ip_row(ip, fail_count, last_failed_at, now + LOGIN_LOCK_SECONDS)
        return False, LOGIN_LOCK_SECONDS

    return True, 0

202 

203 

def record_ip_failure(ip: str) -> None:
    """Increment only the IP-level counters.

    Used for attempts blocked before credential verification, to escalate to
    an IP lock across usernames. Nothing is recorded for localhost addresses.

    The count restarts at 1 when the previous failure is older than
    ``LOGIN_WINDOW_SECONDS``. Once the count reaches ``LOGIN_MAX_FAILS`` the
    IP is locked for ``LOGIN_LOCK_SECONDS``. A missing row is treated as
    all-zero counters, so the threshold check also applies to the very first
    recorded failure.
    """
    # Don't record failures for localhost IPs
    if _is_localhost_ip(ip):
        return

    _ensure_schema()
    now = _now_ts()
    row = _get_ip_row(ip)

    fail = int(row['fail_count'] or 0) if row else 0
    last = int(row['last_failed_at'] or 0) if row else 0
    lock = int(row['locked_until'] or 0) if row else 0

    # Start a new window when the previous failure is too old.
    if last + LOGIN_WINDOW_SECONDS < now:
        fail = 0
    fail += 1

    if fail >= LOGIN_MAX_FAILS:
        lock = now + LOGIN_LOCK_SECONDS

    _upsert_ip_row(ip, fail, now, lock)

227 

228 

def _advance_failure_state(row, now: int) -> Tuple[int, int, int]:
    """Compute (fail_count, last_failed_at, locked_until) after one more failure.

    *row* is a stored counters row or ``None``; a missing row counts as
    all-zero counters.
    """
    fail_count = int(row['fail_count'] or 0) if row else 0
    last_failed_at = int(row['last_failed_at'] or 0) if row else 0
    locked_until = int(row['locked_until'] or 0) if row else 0

    # Reset window if outside
    if last_failed_at + LOGIN_WINDOW_SECONDS < now:
        fail_count = 0
    fail_count += 1

    if fail_count >= LOGIN_MAX_FAILS:
        locked_until = now + LOGIN_LOCK_SECONDS

    return fail_count, now, locked_until


def record_login_failure(username: str, ip: str) -> None:
    """Record a failed login for (username, ip) and for the IP as a whole.

    Nothing is recorded for localhost addresses. Both counters follow the
    same rule: the count restarts when the previous failure is older than
    ``LOGIN_WINDOW_SECONDS``, and reaching ``LOGIN_MAX_FAILS`` sets a lock
    of ``LOGIN_LOCK_SECONDS``. The threshold check also applies to the very
    first recorded failure.
    """
    # Don't record failures for localhost IPs
    if _is_localhost_ip(ip):
        return

    _ensure_schema()
    now = _now_ts()

    # Per (username, ip) tracking
    fail_count, last_failed_at, locked_until = _advance_failure_state(_get_row(username, ip), now)
    _upsert_row(username, ip, fail_count, last_failed_at, locked_until)

    # IP-only failure tracking
    ip_fail, ip_last, ip_lock = _advance_failure_state(_get_ip_row(ip), now)
    _upsert_ip_row(ip, ip_fail, ip_last, ip_lock)

272 

273 

def record_login_success(username: str, ip: str) -> None:
    """Clear the (username, ip) counters after a successful login.

    Only the pair's row in ``login_attempts`` is deleted. IP-level state is
    kept to avoid a trivial bypass by switching usernames after abuse.
    """
    _ensure_schema()
    db = get_db()
    db.execute(
        'DELETE FROM login_attempts WHERE username = ? AND ip = ?',
        (username, ip),
    )
    db.commit()

281 

282