Coverage for app_modules/security_enhancements.py: 96%
109 statements
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-20 00:55 +0200
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-20 00:55 +0200
1"""
2security_enhancements.py
4Purpose:
5 Advanced security enhancement functions that implement protection against sophisticated
6 attack vectors identified during red-team security auditing. This module provides
7 timing attack protection, input validation hardening, and security-focused error
8 handling to maintain the highest security standards across the application.
10Security Enhancements:
11 - Timing attack resistant authentication checks (username enumeration prevention)
12 - Geographic coordinate validation with proper bounds checking
13 - Security-focused error message sanitization to prevent information disclosure
14 - Enhanced input validation for all user-controllable parameters
15 - Cryptographic helper functions for secure token generation and validation
17Implementation Philosophy:
18 All functions follow defense-in-depth principles with multiple validation layers.
19 Error handling is designed to prevent information leakage while maintaining
20 system functionality. Security measures are transparent to legitimate users
21 but effectively block malicious attempts.
22"""
24import time
25import bcrypt
26import secrets
27import hashlib
28import hmac
29from typing import Tuple, Optional, Dict, Any
30from functools import wraps
31from flask import current_app
def secure_password_check(username: str, password: str, user_row: Optional[Dict]) -> bool:
    """
    Timing-attack resistant password verification.

    A bcrypt comparison is attempted on every call -- against the stored hash
    when the account has one, against a fixed dummy hash otherwise -- so the
    response time does not reveal whether the username is registered.

    Args:
        username: Username being authenticated. Not used by the check itself;
            kept so the signature stays stable for existing callers.
        password: Plain-text password to verify.
        user_row: User database row exposing a 'password_hash' entry, or None
            when the user does not exist.

    Returns:
        bool: True only if a stored hash exists and matches the password.

    Security Features:
        - A bcrypt operation runs whether or not the user exists
        - Malformed stored hashes, non-string passwords and passwords that
          bcrypt refuses fail closed (False) instead of raising
    """
    # Dummy hash for timing consistency when the user doesn't exist.
    dummy_hash = b"$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/LewiyTuTpbsEf.ug."

    stored_hash = user_row['password_hash'] if user_row else None
    user_exists = bool(stored_hash)

    if user_exists:
        # Accept hashes stored either as text or as raw bytes.
        target_hash = stored_hash.encode('utf-8') if isinstance(stored_hash, str) else stored_hash
    else:
        target_hash = dummy_hash

    # A non-string password can never match, but bcrypt is still exercised so
    # the amount of work stays comparable.
    password_is_text = isinstance(password, str)

    try:
        candidate = password.encode('utf-8') if password_is_text else b''
        matches = bcrypt.checkpw(candidate, target_hash)
    except ValueError:
        # Raised for a corrupt stored hash ("Invalid salt"), for passwords the
        # installed bcrypt version rejects, or for text that cannot be encoded
        # as UTF-8. Authentication fails closed.
        matches = False

    return user_exists and password_is_text and matches
def validate_geographic_coordinates(lat: Any, lng: Any) -> Tuple[bool, Optional[str], Optional[Tuple[float, float]]]:
    """
    Validate geographic coordinates with proper bounds checking.

    Both values are converted with float() and then checked against the
    valid latitude/longitude ranges.

    Args:
        lat: Latitude value (any type, will be validated)
        lng: Longitude value (any type, will be validated)

    Returns:
        Tuple[bool, Optional[str], Optional[Tuple[float, float]]]:
            - success: Whether validation passed
            - error_message: Error description if validation failed
            - coordinates: Validated (lat, lng) tuple if successful

    Security Features:
        - Values that cannot be converted to float are rejected
        - NaN is rejected with the generic coordinate error
        - Latitude must lie in [-90, 90], longitude in [-180, 180]
    """
    import math

    try:
        lat_float = float(lat)
        lng_float = float(lng)
    except (ValueError, TypeError, OverflowError):
        return False, "Neispravne koordinate.", None

    # NaN must be tested before the range checks: every comparison with NaN
    # is False, so it would otherwise be reported as an out-of-range value.
    if math.isnan(lat_float) or math.isnan(lng_float):
        return False, "Neispravne koordinate.", None

    if not (-90.0 <= lat_float <= 90.0):
        return False, "Neispravna geografska širina.", None

    if not (-180.0 <= lng_float <= 180.0):
        return False, "Neispravna geografska dužina.", None

    return True, None, (lat_float, lng_float)
def sanitize_error_message(error: Exception, context: str = "operation") -> str:
    """
    Sanitize error messages to prevent information disclosure.

    The lower-cased text of the exception is matched against keyword groups
    and replaced by a fixed, generic user-facing message. The original text
    is never returned.

    Args:
        error: The original exception
        context: Context of the operation. Currently unused by this function;
            accepted for interface compatibility.

    Returns:
        str: Sanitized error message safe for user display

    Security Features:
        - Database, file system and network details are replaced by generic text
        - Authorization failures map to a dedicated message
        - Anything unrecognised falls back to a generic error message
    """
    error_text = str(error).lower()

    # Ordered (keywords, message) rules; the first rule with a match wins.
    # The authorization rule sits before the file-system rule because the
    # latter matches the bare word 'access', which would otherwise swallow
    # every "access denied" error.
    rules = (
        (('sqlite', 'database', 'table', 'column', 'constraint'),
         "Greška pri obradi podataka."),
        (('unauthorized', 'forbidden', 'access denied'),
         "Nemate dozvolu za ovu operaciju."),
        (('permission', 'access', 'file', 'directory', 'path'),
         "Greška pri pristupu datoteci."),
        (('connection', 'timeout', 'network', 'socket'),
         "Greška mreže."),
    )

    for keywords, message in rules:
        if any(keyword in error_text for keyword in keywords):
            return message

    # Generic server errors
    return "Došlo je do greške. Molimo pokušajte ponovno."
def validate_filename_security(filename: str) -> Tuple[bool, Optional[str]]:
    """
    Validate filename for security issues including path traversal and malicious extensions.

    Args:
        filename: Filename to validate

    Returns:
        Tuple[bool, Optional[str]]: (is_valid, error_message)

    Security Features:
        - Path traversal prevention (../, ..\\, etc.)
        - Malicious extension detection, also when the extension is followed
          by trailing dots or spaces
        - Rejection of reserved characters and ASCII control characters
        - Length validation (at most 255 characters)
    """
    if not filename or not isinstance(filename, str):
        return False, "Nedostaje ime datoteke."

    # Length validation
    if len(filename) > 255:
        return False, "Ime datoteke je predugačko."

    # Path traversal sequences, separators and reserved characters
    forbidden_fragments = ('..', '/', '\\', ':', '|', '<', '>', '*', '?', '"')
    if any(fragment in filename for fragment in forbidden_fragments):
        return False, "Neispravno ime datoteke."

    dangerous_extensions = (
        '.php', '.php3', '.php4', '.php5', '.phtml',
        '.asp', '.aspx', '.jsp', '.jspx',
        '.py', '.pl', '.rb', '.sh', '.bat', '.cmd',
        '.exe', '.scr', '.com', '.pif',
        '.htaccess', '.htpasswd',
        '.config', '.ini', '.cfg',
    )

    # Trailing dots and spaces are ignored for the extension test so that
    # names such as "shell.php." or "shell.php " cannot slip past it.
    normalized = filename.lower().rstrip(' .')
    if normalized.endswith(dangerous_extensions):
        return False, "Nedozvoljena vrsta datoteke."

    # Null bytes and other ASCII control characters (0x00-0x1F, 0x7F)
    if any(ord(char) < 32 or ord(char) == 127 for char in filename):
        return False, "Neispravno ime datoteke."

    return True, None
def generate_secure_token(length: int = 32) -> str:
    """
    Generate cryptographically secure random token.

    Args:
        length: Number of random bytes to draw (default 32). The returned
            text is longer than this because the bytes are Base64 encoded.

    Returns:
        str: URL-safe base64 encoded secure token

    Raises:
        ValueError: If length is zero or negative. A zero-byte token would be
            the empty string and offer no protection at all.

    Security Features:
        - Random bytes come from the `secrets` module
        - URL-safe encoding for web application compatibility
    """
    # None is passed through untouched so secrets can apply its own default.
    if length is not None and length <= 0:
        raise ValueError("length must be a positive number of bytes")
    return secrets.token_urlsafe(length)
def verify_hmac_token(data: str, token: str, secret_key: str) -> bool:
    """
    Verify HMAC-based token for integrity and authenticity.

    The expected token is the hexadecimal HMAC-SHA256 of `data` keyed with
    `secret_key` (both UTF-8 encoded); `token` must equal it exactly.

    Args:
        data: Original data that was signed
        token: HMAC token to verify (hex digest)
        secret_key: Secret key used for signing

    Returns:
        bool: True if token is valid, False otherwise. Arguments that are not
        strings, or that cannot be encoded as UTF-8, yield False.

    Security Features:
        - Comparison done with hmac.compare_digest
        - HMAC-SHA256 verification
    """
    # Reject anything that is not text up front instead of relying on a
    # blanket exception handler to hide the resulting errors.
    if not all(isinstance(part, str) for part in (data, token, secret_key)):
        return False

    try:
        key_bytes = secret_key.encode('utf-8')
        message = data.encode('utf-8')
        candidate = token.encode('utf-8')
    except UnicodeEncodeError:
        # e.g. lone surrogates; such input can never carry a valid token.
        return False

    expected = hmac.new(key_bytes, message, hashlib.sha256).hexdigest().encode('ascii')

    # Comparing bytes lets compare_digest handle non-ASCII tokens, which it
    # refuses when given str arguments.
    return hmac.compare_digest(candidate, expected)
def rate_limit_with_exponential_backoff(attempt_count: int, base_delay: int = 5) -> int:
    """
    Calculate exponential backoff delay for rate limiting.

    The delay doubles with every failed attempt, starting at `base_delay`
    for the first one, and never exceeds one hour.

    Args:
        attempt_count: Number of failed attempts
        base_delay: Base delay in seconds (default 5)

    Returns:
        int: Delay in seconds before next attempt allowed. 0 when
        `attempt_count` or `base_delay` is zero or negative.

    Security Features:
        - Exponential increase in delay times
        - Maximum cap of 3600 seconds
        - Constant-cost computation even for very large attempt counts
    """
    max_delay = 3600  # one hour
    # Any positive integer base delay reaches the cap by exponent 12
    # (2 ** 12 = 4096 > 3600). Clamping the exponent keeps the power small
    # instead of building an enormous integer that is discarded by min().
    max_exponent = 64

    if attempt_count <= 0 or base_delay <= 0:
        return 0

    exponent = min(attempt_count - 1, max_exponent)
    return min(base_delay * (2 ** exponent), max_delay)
def validate_api_input(data: Dict[str, Any], schema: Dict[str, Dict]) -> Tuple[bool, Optional[str], Dict[str, Any]]:
    """
    Validate API input against schema with security-focused validation.

    Each schema entry may define 'required' (default False), 'type'
    (default str), and for string values 'min_length' (default 0) and
    'max_length' (default 10000). Only fields named in the schema are
    copied to the result; fields whose value is None are left out.

    Args:
        data: Input data to validate
        schema: Validation schema with field definitions

    Returns:
        Tuple[bool, Optional[str], Dict[str, Any]]: (is_valid, error_message, sanitized_data).
        On failure the error message names the offending field and the
        returned dict is empty.

    Security Features:
        - Rejects strings containing common SQL/script keywords (substring match)
        - Type coercion with rejection of unconvertible values
        - Length validation for strings
        - Required field enforcement
    """
    if not isinstance(data, dict):
        return False, "Neispravni podaci.", {}

    # Built once per call; matched as case-insensitive substrings.
    dangerous_patterns = (
        'script',
        'javascript',
        'vbscript',
        'onload',
        'onerror',
        'onclick',
        'select',
        'union',
        'drop',
        'insert',
        'update',
        'delete',
        '--',
        ';',
    )

    sanitized_data = {}

    for field_name, field_schema in schema.items():
        value = data.get(field_name)

        # Required field check
        if field_schema.get('required', False) and (value is None or value == ''):
            return False, f"Polje '{field_name}' je obavezno.", {}

        if value is None:
            continue

        # Type validation: coerce when the value is not already of the
        # expected type. OverflowError covers cases such as int(float('inf')).
        expected_type = field_schema.get('type', str)
        if not isinstance(value, expected_type):
            try:
                value = expected_type(value)
            except (ValueError, TypeError, OverflowError):
                return False, f"Neispravna vrijednost za '{field_name}'.", {}

        # String-specific validations
        if isinstance(value, str):
            min_length = field_schema.get('min_length', 0)
            max_length = field_schema.get('max_length', 10000)

            if len(value) < min_length:
                return False, f"'{field_name}' prekratak.", {}

            if len(value) > max_length:
                return False, f"'{field_name}' predugačak.", {}

            # Security pattern detection
            value_lower = value.lower()
            if any(pattern in value_lower for pattern in dangerous_patterns):
                return False, f"Neispravna vrijednost za '{field_name}'.", {}

        sanitized_data[field_name] = value

    return True, None, sanitized_data
def security_headers_middleware(response):
    """
    Attach the application's security headers to an HTTP response.

    Args:
        response: Response object exposing a mapping-like `headers` attribute
            (a Flask response in this application)

    Returns:
        The same response object, with the headers below set and any
        'Server' header removed.

    Headers set:
        - Content-Security-Policy
        - X-Frame-Options
        - X-Content-Type-Options
        - X-XSS-Protection
        - Referrer-Policy
        - Permissions-Policy
    """
    # Content Security Policy, one entry per directive (insertion order is
    # the order in which directives appear in the header).
    csp_sources = {
        "default-src": "'self'",
        "script-src": "'self' 'unsafe-inline' https://unpkg.com",
        "style-src": "'self' 'unsafe-inline' https://fonts.googleapis.com https://unpkg.com",
        "img-src": "'self' data: https://*.tile.openstreetmap.org https://unpkg.com",
        "font-src": "'self' https://fonts.gstatic.com data:",
        "connect-src": "'self'",
        "frame-ancestors": "'none'",
        "base-uri": "'self'",
        "form-action": "'self'",
    }
    policy = '; '.join(
        f"{directive} {sources}" for directive, sources in csp_sources.items()
    )

    # NOTE(review): X-XSS-Protection is a legacy header; its value is kept
    # unchanged here -- confirm whether it should still be sent.
    header_values = (
        ('Content-Security-Policy', policy),
        ('X-Frame-Options', 'DENY'),
        ('X-Content-Type-Options', 'nosniff'),
        ('X-XSS-Protection', '1; mode=block'),
        ('Referrer-Policy', 'strict-origin-when-cross-origin'),
        ('Permissions-Policy', 'geolocation=(self), microphone=(), camera=()'),
    )

    headers = response.headers
    for header_name, header_value in header_values:
        headers[header_name] = header_value

    # Remove server information
    headers.pop('Server', None)

    return response