Coverage for app_modules/rate_limit.py: 50%
137 statements
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-20 00:55 +0200
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-20 00:55 +0200
"""
rate_limit.py

Purpose:
    Implements robust, database-backed login rate limiting and lockout to
    mitigate credential stuffing and brute-force attacks. Tracks failed login
    attempts per (username, ip) pair with a sliding window and enforces a
    temporary lock after too many failures.

How it works:
    - On each login POST, `is_login_allowed(username, ip)` is called.
      - If an active lock exists (locked_until > now), it returns (False, retry_s).
    - On password failure, `record_login_failure(username, ip)`:
      - Resets the window if last_failed_at is outside the window.
      - Increments fail_count and, when threshold is reached, sets locked_until.
    - On success, `record_login_success(username, ip)` resets counters.

Defense-in-depth:
    - Also enforces an IP-level limiter (`is_ip_allowed(ip)`) to avoid trivial
      bypass by switching usernames. Both username+ip and ip-only checks must pass.

Configuration (env vars):
    - LOGIN_MAX_FAILS: max failures in the window before lock (default: 10)
    - LOGIN_WINDOW_SECONDS: sliding window duration in seconds (default: 300)
    - LOGIN_LOCK_SECONDS: lock duration after threshold exceeded (default: 300)
    - LOGIN_LOCK_IP_ON_ACCOUNT_LOCK: set to '1' to also lock the IP when a
      (username, ip) pair is locked by `is_login_allowed` (default: '1')
"""
28from __future__ import annotations
30import os
31import time
32from typing import Tuple
34from .db import get_db
def _now_ts() -> int:
    """Return the current Unix timestamp truncated to whole seconds."""
    current = time.time()
    return int(current)
def _is_localhost_ip(ip: str) -> bool:
    """Return True if *ip* denotes the local machine and is exempt from rate limiting.

    The literal aliases ``'0.0.0.0'`` and ``'localhost'`` are accepted as-is.
    Any other value must parse as an IP address and be a loopback address
    (``127.0.0.0/8`` or ``::1``, including IPv4-mapped IPv6 forms such as
    ``::ffff:127.0.0.1``). Strings that merely start with ``'127.'`` but are
    not valid addresses are NOT exempt, so a crafted value cannot skip the
    limiter by prefix alone.

    Args:
        ip: Client address as a string; may be empty.

    Returns:
        True when the address is treated as localhost, otherwise False.
    """
    # Local import keeps this block self-contained; the module is stdlib.
    import ipaddress

    if not ip:
        return False

    # Non-address aliases: '0.0.0.0' is sometimes used in tests, and
    # 'localhost' covers the case where a hostname is passed instead of an IP.
    if ip in ('0.0.0.0', 'localhost'):
        return True

    try:
        addr = ipaddress.ip_address(ip)
    except ValueError:
        # Not a syntactically valid IP address -> never exempt.
        return False

    # Unwrap IPv4-mapped IPv6 (::ffff:a.b.c.d) so the loopback test applies to
    # the embedded IPv4 address on every Python version.
    mapped = getattr(addr, 'ipv4_mapped', None)
    if mapped is not None:
        addr = mapped

    return addr.is_loopback
def _cfg_int(name: str, default: int) -> int:
    """Read an integer setting from the environment.

    Args:
        name: Environment variable name.
        default: Value returned when the variable is unset or is not a valid
            integer literal.

    Returns:
        The parsed integer, or *default* as a fallback.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    try:
        return int(raw)
    except ValueError:
        # Malformed value (e.g. '' or 'abc'): fall back instead of failing at
        # import time. Only ValueError is expected from int(str).
        return default
# Number of failures inside the window that triggers a lock.
LOGIN_MAX_FAILS = _cfg_int('LOGIN_MAX_FAILS', 10)
# Length of the failure-counting window, in seconds.
LOGIN_WINDOW_SECONDS = _cfg_int('LOGIN_WINDOW_SECONDS', 300)
# How long a lock lasts once the threshold is reached, in seconds.
LOGIN_LOCK_SECONDS = _cfg_int('LOGIN_LOCK_SECONDS', 300)
# When enabled (env value exactly '1'), a (username, ip) lock applied by
# is_login_allowed is also propagated to the IP-level table.
LOCK_IP_ON_ACCOUNT_LOCK = os.getenv('LOGIN_LOCK_IP_ON_ACCOUNT_LOCK', '1') == '1'
def _ensure_schema():
    """Create the rate-limiting tables if they do not exist yet.

    Two tables are maintained: ``login_attempts`` keyed by (username, ip) and
    ``login_ip_attempts`` keyed by ip only. Each statement is committed right
    after it is executed.
    """
    ddl_statements = (
        # Per (username, ip) attempts table
        """
        CREATE TABLE IF NOT EXISTS login_attempts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT NOT NULL,
            ip TEXT NOT NULL,
            fail_count INTEGER NOT NULL DEFAULT 0,
            last_failed_at INTEGER NOT NULL DEFAULT 0,
            locked_until INTEGER NOT NULL DEFAULT 0,
            UNIQUE(username, ip)
        )
        """,
        # IP-only attempts table
        """
        CREATE TABLE IF NOT EXISTS login_ip_attempts (
            ip TEXT PRIMARY KEY,
            fail_count INTEGER NOT NULL DEFAULT 0,
            last_failed_at INTEGER NOT NULL DEFAULT 0,
            locked_until INTEGER NOT NULL DEFAULT 0
        )
        """,
    )
    conn = get_db()
    for statement in ddl_statements:
        conn.execute(statement)
        conn.commit()
def _get_row(username: str, ip: str):
    """Fetch the counters stored for a (username, ip) pair.

    Returns the first matching row with ``fail_count``, ``last_failed_at`` and
    ``locked_until`` columns, or whatever ``fetchone()`` yields when there is
    no match.
    """
    query = 'SELECT fail_count, last_failed_at, locked_until FROM login_attempts WHERE username = ? AND ip = ?'
    params = (username, ip)
    return get_db().execute(query, params).fetchone()
def _get_ip_row(ip: str):
    """Fetch the IP-level counters stored for *ip*.

    Returns the first matching row with ``fail_count``, ``last_failed_at`` and
    ``locked_until`` columns, or whatever ``fetchone()`` yields when there is
    no match.
    """
    query = 'SELECT fail_count, last_failed_at, locked_until FROM login_ip_attempts WHERE ip = ?'
    params = (ip,)
    return get_db().execute(query, params).fetchone()
def _upsert_row(username: str, ip: str, fail_count: int, last_failed_at: int, locked_until: int):
    """Insert or overwrite the counters for a (username, ip) pair and commit.

    On a conflict with the UNIQUE(username, ip) constraint, the existing row's
    three counter columns are replaced by the supplied values.
    """
    statement = """
        INSERT INTO login_attempts (username, ip, fail_count, last_failed_at, locked_until)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT(username, ip) DO UPDATE SET
            fail_count=excluded.fail_count,
            last_failed_at=excluded.last_failed_at,
            locked_until=excluded.locked_until
        """
    values = (username, ip, fail_count, last_failed_at, locked_until)
    conn = get_db()
    conn.execute(statement, values)
    conn.commit()
def _upsert_ip_row(ip: str, fail_count: int, last_failed_at: int, locked_until: int):
    """Insert or overwrite the IP-level counters for *ip* and commit.

    On a conflict with the primary key (ip), the existing row's three counter
    columns are replaced by the supplied values.
    """
    statement = """
        INSERT INTO login_ip_attempts (ip, fail_count, last_failed_at, locked_until)
        VALUES (?, ?, ?, ?)
        ON CONFLICT(ip) DO UPDATE SET
            fail_count=excluded.fail_count,
            last_failed_at=excluded.last_failed_at,
            locked_until=excluded.locked_until
        """
    values = (ip, fail_count, last_failed_at, locked_until)
    conn = get_db()
    conn.execute(statement, values)
    conn.commit()
def is_login_allowed(username: str, ip: str) -> Tuple[bool, int]:
    """Decide whether a login attempt for (username, ip) may proceed.

    Returns:
        ``(allowed, retry_after_seconds)``. ``retry_after_seconds`` is 0 when
        the attempt is allowed.
    """
    # Localhost is exempt from rate limiting (testing and local development).
    if _is_localhost_ip(ip):
        return True, 0

    _ensure_schema()
    record = _get_row(username, ip)
    current = _now_ts()

    # No history for this pair: nothing to enforce.
    if not record:
        return True, 0

    # An active lock wins; report the remaining time.
    lock_expiry = int(record['locked_until'] or 0)
    if lock_expiry > current:
        return False, lock_expiry - current

    # Extra guard: block when the failure threshold is met and the window is
    # still open, even though no active lock is stored.
    # NOTE(review): this also fires after a lock has expired while the window
    # is still open (possible when LOGIN_WINDOW_SECONDS > LOGIN_LOCK_SECONDS),
    # which re-locks the pair -- confirm this is intended.
    failures = int(record['fail_count'] or 0)
    if failures < LOGIN_MAX_FAILS:
        return True, 0
    last_failure = int(record['last_failed_at'] or 0)
    if last_failure + LOGIN_WINDOW_SECONDS <= current:
        return True, 0

    # Normalize: persist a lock starting now.
    new_lock = current + LOGIN_LOCK_SECONDS
    _upsert_row(username, ip, failures, last_failure, new_lock)

    # Optionally escalate to an IP-level lock, never shortening an existing one.
    if LOCK_IP_ON_ACCOUNT_LOCK:
        ip_record = _get_ip_row(ip)
        if ip_record:
            ip_lock = int(ip_record['locked_until'] or 0)
        else:
            ip_lock = 0
        if ip_lock < new_lock:
            if ip_record:
                ip_failures = int(ip_record['fail_count'] or 0)
            else:
                ip_failures = 0
            _upsert_ip_row(ip, ip_failures, current, new_lock)

    return False, LOGIN_LOCK_SECONDS
def is_ip_allowed(ip: str) -> Tuple[bool, int]:
    """Decide whether any login attempt from *ip* may proceed.

    Returns:
        ``(allowed, retry_after_seconds)``. ``retry_after_seconds`` is 0 when
        the attempt is allowed.
    """
    # Localhost is exempt from rate limiting (testing and local development).
    if _is_localhost_ip(ip):
        return True, 0

    _ensure_schema()
    record = _get_ip_row(ip)
    current = _now_ts()

    # No history for this IP: nothing to enforce.
    if not record:
        return True, 0

    # An active lock wins; report the remaining time.
    lock_expiry = int(record['locked_until'] or 0)
    if lock_expiry > current:
        return False, lock_expiry - current

    # Threshold met while the window is still open: store a lock and block.
    failures = int(record['fail_count'] or 0)
    if failures < LOGIN_MAX_FAILS:
        return True, 0
    last_failure = int(record['last_failed_at'] or 0)
    if last_failure + LOGIN_WINDOW_SECONDS <= current:
        return True, 0

    _upsert_ip_row(ip, failures, last_failure, current + LOGIN_LOCK_SECONDS)
    return False, LOGIN_LOCK_SECONDS
def record_ip_failure(ip: str) -> None:
    """Increment only the IP-level counters.

    Used for attempts blocked before credential verification, so that abuse
    across usernames escalates to an IP lock.

    A missing row is treated as zeroed counters, so the very first failure
    goes through the same threshold check as later ones (relevant when
    LOGIN_MAX_FAILS is 1).

    Args:
        ip: Client address; localhost addresses are ignored.
    """
    # Don't record failures for localhost IPs
    if _is_localhost_ip(ip):
        return

    _ensure_schema()
    now = _now_ts()
    row = _get_ip_row(ip)

    fail = int(row['fail_count'] or 0) if row else 0
    last = int(row['last_failed_at'] or 0) if row else 0
    lock = int(row['locked_until'] or 0) if row else 0

    # Start a fresh count when the previous failure is outside the window.
    if last + LOGIN_WINDOW_SECONDS < now:
        fail = 0
    fail += 1

    # Threshold reached: (re)start the lock from now; otherwise keep the
    # stored lock value unchanged.
    if fail >= LOGIN_MAX_FAILS:
        lock = now + LOGIN_LOCK_SECONDS

    _upsert_ip_row(ip, fail, now, lock)
def record_login_failure(username: str, ip: str) -> None:
    """Record a failed credential check for (username, ip) and for the IP.

    Updates the (username, ip) counters, locking the pair once
    LOGIN_MAX_FAILS failures accumulate inside the window, then delegates the
    IP-only bookkeeping to ``record_ip_failure``.

    A missing row is treated as zeroed counters, so the very first failure
    goes through the same threshold check as later ones (relevant when
    LOGIN_MAX_FAILS is 1).

    Args:
        username: Account name that failed authentication.
        ip: Client address; localhost addresses are ignored.
    """
    # Don't record failures for localhost IPs
    if _is_localhost_ip(ip):
        return

    _ensure_schema()
    now = _now_ts()
    row = _get_row(username, ip)

    fail_count = int(row['fail_count'] or 0) if row else 0
    last_failed_at = int(row['last_failed_at'] or 0) if row else 0
    locked_until = int(row['locked_until'] or 0) if row else 0

    # Reset window if the previous failure is outside it
    if last_failed_at + LOGIN_WINDOW_SECONDS < now:
        fail_count = 0
    fail_count += 1

    # Threshold reached: (re)start the lock from now; otherwise keep the
    # stored lock value unchanged.
    if fail_count >= LOGIN_MAX_FAILS:
        locked_until = now + LOGIN_LOCK_SECONDS

    _upsert_row(username, ip, fail_count, now, locked_until)

    # IP-only failure tracking: same window/threshold rules, kept in one place.
    record_ip_failure(ip)
def record_login_success(username: str, ip: str) -> None:
    """Clear the failure counters for (username, ip) after a successful login.

    Only the (username, ip) row is deleted. IP-level state is kept on purpose
    so that abuse cannot be reset by switching usernames.
    """
    _ensure_schema()
    conn = get_db()
    key = (username, ip)
    conn.execute('DELETE FROM login_attempts WHERE username = ? AND ip = ?', key)
    conn.commit()
280 db.commit()