o
    Fý¤h¡$  ã                   @  sê   d Z ddlmZ ddlZddlZddlmZ ddlmZ d5d	d
„Z	d6dd„Z
d7dd„ZeddƒZeddƒZeddƒZe dd¡dkZdd„ Zd8dd„Zd9d d!„Zd:d%d&„Zd;d'd(„Zd<d*d+„Zd=d,d-„Zd>d/d0„Zd?d1d2„Zd?d3d4„ZdS )@a‰  
rate_limit.py

Purpose:
  Implements robust, database-backed login rate limiting and lockout to
  mitigate credential stuffing and brute-force attacks. Tracks failed login
  attempts per (username, ip) pair with a sliding window and enforces a
  temporary lock after too many failures.

How it works:
  - On each login POST, `is_login_allowed(username, ip)` is called.
    - If an active lock exists (locked_until > now), it returns (False, retry_s).
  - On password failure, `record_login_failure(username, ip)`:
    - Resets the window if last_failed_at is outside the window.
    - Increments fail_count and, when threshold is reached, sets locked_until.
  - On success, `record_login_success(username, ip)` resets counters.

Defense-in-depth:
  - Also enforces an IP-level limiter (`is_ip_allowed(ip)`) to avoid trivial
    bypass by switching usernames. Both username+ip and ip-only checks must pass.

Configuration (env vars):
  - LOGIN_MAX_FAILS: max failures in the window before lock (default: 5)
  - LOGIN_WINDOW_SECONDS: sliding window duration in seconds (default: 900)
  - LOGIN_LOCK_SECONDS: lock duration after threshold exceeded (default: 900)
é    )ÚannotationsN)ÚTupleé   )Úget_dbÚreturnÚintc                   C  s   t t ¡ ƒS ©N)r   Útime© r
   r
   ú'/var/www/html/app_modules/rate_limit.pyÚ_now_ts%   s   r   ÚipÚstrÚboolc                 C  s.   | sdS h d£}| |v rdS |   d¡rdS dS )zMCheck if the IP address is localhost and should be exempt from rate limiting.F>   ú::1ú0.0.0.0ú	127.0.0.1Ú	localhostTz127.)Ú
startswith)r   Úlocalhost_ipsr
   r
   r   Ú_is_localhost_ip)   s   
r   ÚnameÚdefaultc                 C  s.   zt t | t|ƒ¡ƒW S  ty   | Y S w r   )r   ÚosÚgetenvr   Ú	Exception)r   r   r
   r
   r   Ú_cfg_int@   s
   ÿr   ÚLOGIN_MAX_FAILSé
   ÚLOGIN_WINDOW_SECONDSi,  ÚLOGIN_LOCK_SECONDSÚLOGIN_LOCK_IP_ON_ACCOUNT_LOCKÚ1c                  C  s.   t ƒ } |  d¡ |  ¡  |  d¡ |  ¡  d S )Na{  
        CREATE TABLE IF NOT EXISTS login_attempts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT NOT NULL,
            ip TEXT NOT NULL,
            fail_count INTEGER NOT NULL DEFAULT 0,
            last_failed_at INTEGER NOT NULL DEFAULT 0,
            locked_until INTEGER NOT NULL DEFAULT 0,
            UNIQUE(username, ip)
        )
        a	  
        CREATE TABLE IF NOT EXISTS login_ip_attempts (
            ip TEXT PRIMARY KEY,
            fail_count INTEGER NOT NULL DEFAULT 0,
            last_failed_at INTEGER NOT NULL DEFAULT 0,
            locked_until INTEGER NOT NULL DEFAULT 0
        )
        ©r   ÚexecuteÚcommit)Údbr
   r
   r   Ú_ensure_schemaM   s   ÿÿ
r'   Úusernamec                 C  s   t ƒ }| d| |f¡}| ¡ S )NzaSELECT fail_count, last_failed_at, locked_until FROM login_attempts WHERE username = ? AND ip = ?©r   r$   Úfetchone)r(   r   r&   Úcurr
   r
   r   Ú_get_rowl   s   þr,   c                 C  s   t ƒ }| d| f¡}| ¡ S )NzSSELECT fail_count, last_failed_at, locked_until FROM login_ip_attempts WHERE ip = ?r)   )r   r&   r+   r
   r
   r   Ú_get_ip_rowu   s   þr-   Ú
fail_countÚlast_failed_atÚlocked_untilc                 C  s(   t ƒ }| d| ||||f¡ | ¡  d S )NaC  
        INSERT INTO login_attempts (username, ip, fail_count, last_failed_at, locked_until)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT(username, ip) DO UPDATE SET
            fail_count=excluded.fail_count,
            last_failed_at=excluded.last_failed_at,
            locked_until=excluded.locked_until
        r#   )r(   r   r.   r/   r0   r&   r
   r
   r   Ú_upsert_row~   s   ÷r1   c                 C  s&   t ƒ }| d| |||f¡ | ¡  d S )Na/  
        INSERT INTO login_ip_attempts (ip, fail_count, last_failed_at, locked_until)
        VALUES (?, ?, ?, ?)
        ON CONFLICT(ip) DO UPDATE SET
            fail_count=excluded.fail_count,
            last_failed_at=excluded.last_failed_at,
            locked_until=excluded.locked_until
        r#   )r   r.   r/   r0   r&   r
   r
   r   Ú_upsert_ip_rowŽ   s   
÷r2   úTuple[bool, int]c                 C  s   t |ƒrdS tƒ  t| |ƒ}tƒ }|r~t|d pdƒ}||kr%d|| fS t|d p+dƒtkr~t|d p5dƒt |kr~|t }t| |t|d pHdƒt|d pOdƒ|ƒ t	rzt
|ƒ}t|r`|d ndpcdƒ}||k rzt|t|rr|d ndpudƒ||ƒ dtfS dS )z&Return (allowed, retry_after_seconds).©Tr   r0   r   Fr.   r/   )r   r'   r,   r   r   r   r   r    r1   ÚLOCK_IP_ON_ACCOUNT_LOCKr-   r2   )r(   r   ÚrowÚnowr0   Ú
lock_untilÚip_rowÚexistingr
   r
   r   Úis_login_allowedž   s&   
,("r;   c                 C  sª   t | ƒrdS tƒ  t| ƒ}tƒ }|rSt|d pdƒ}||kr$d|| fS t|d p*dƒtkrSt|d p4dƒt |krSt| t|d pBdƒt|d pIdƒ|t ƒ dtfS dS )Nr4   r0   r   Fr.   r/   )	r   r'   r-   r   r   r   r   r2   r    )r   r6   r7   r0   r
   r
   r   Úis_ip_allowedº   s   ,*r<   ÚNonec                 C  s¤   t | ƒrdS tƒ  tƒ }t| ƒ}|st| d|dƒ dS t|d p!dƒ}t|d p)dƒ}t|d p1dƒ}|t |k r;d}|d7 }|}|tkrI|t }t| |||ƒ dS )z‹Increment only the IP-level counters. Used for attempts blocked before
    credential verification to escalate to IP lock across usernames.Nr   r   r.   r/   r0   )	r   r'   r   r-   r2   r   r   r   r    )r   r7   r6   ÚfailÚlastÚlockr
   r
   r   Úrecord_ip_failureÌ   s$   rA   c                 C  s0  t |ƒrd S tƒ  tƒ }t| |ƒ}|st| |d|dƒ n6t|d p"dƒ}t|d p*dƒ}t|d p2dƒ}|t |k r<d}|d7 }|}|tkrJ|t }t| ||||ƒ t	|ƒ}|sat
|d|dƒ d S t|d pgdƒ}t|d podƒ}	t|d pwdƒ}
|	t |k rd}|d7 }|}	|tkr|t }
t
|||	|
ƒ d S )Nr   r   r.   r/   r0   )r   r'   r   r,   r1   r   r   r   r    r-   r2   )r(   r   r7   r6   r.   r/   r0   r9   Úip_failÚip_lastÚip_lockr
   r
   r   Úrecord_login_failureå   s>   
rE   c                 C  s(   t ƒ  tƒ }| d| |f¡ | ¡  d S )Nz8DELETE FROM login_attempts WHERE username = ? AND ip = ?)r'   r   r$   r%   )r(   r   r&   r
   r
   r   Úrecord_login_success  s   rF   )r   r   )r   r   r   r   )r   r   r   r   r   r   )r(   r   r   r   )r   r   )
r(   r   r   r   r.   r   r/   r   r0   r   )r   r   r.   r   r/   r   r0   r   )r(   r   r   r   r   r3   )r   r   r   r3   )r   r   r   r=   )r(   r   r   r   r   r=   )Ú__doc__Ú
__future__r   r   r	   Útypingr   r&   r   r   r   r   r   r   r    r   r5   r'   r,   r-   r1   r2   r;   r<   rA   rE   rF   r
   r
   r
   r   Ú<module>   s.    







	
	




-