Coverage for app_modules/security_enhancements.py: 96%

109 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-20 00:55 +0200

1""" 

2security_enhancements.py 

3 

4Purpose: 

5 Advanced security enhancement functions that implement protection against sophisticated 

6 attack vectors identified during red-team security auditing. This module provides 

7 timing attack protection, input validation hardening, and security-focused error 

8 handling to maintain the highest security standards across the application. 

9 

10Security Enhancements: 

11 - Timing attack resistant authentication checks (username enumeration prevention) 

12 - Geographic coordinate validation with proper bounds checking 

13 - Security-focused error message sanitization to prevent information disclosure 

14 - Enhanced input validation for all user-controllable parameters 

15 - Cryptographic helper functions for secure token generation and validation 

16 

17Implementation Philosophy: 

18 All functions follow defense-in-depth principles with multiple validation layers. 

19 Error handling is designed to prevent information leakage while maintaining 

20 system functionality. Security measures are transparent to legitimate users 

21 but effectively block malicious attempts. 

22""" 

23 

24import time 

25import bcrypt 

26import secrets 

27import hashlib 

28import hmac 

29from typing import Tuple, Optional, Dict, Any 

30from functools import wraps 

31from flask import current_app 

32 

33 

34def secure_password_check(username: str, password: str, user_row: Optional[Dict]) -> bool: 

35 """ 

36 Timing-attack resistant password verification. 

37  

38 Prevents username enumeration by ensuring consistent timing regardless of whether 

39 the username exists in the database. Always performs bcrypt operation to maintain 

40 constant time complexity. 

41  

42 Args: 

43 username: Username being authenticated 

44 password: Password to verify 

45 user_row: User database row (None if user doesn't exist) 

46  

47 Returns: 

48 bool: True if authentication successful, False otherwise 

49  

50 Security Features: 

51 - Constant time execution regardless of username validity 

52 - Secure password hashing verification 

53 - No information leakage through timing differences 

54 """ 

55 # Dummy hash for timing consistency when user doesn't exist 

56 # This ensures bcrypt operation always runs, preventing timing attacks 

57 dummy_hash = "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/LewiyTuTpbsEf.ug." 

58 

59 if user_row and user_row['password_hash']: 

60 # User exists - verify actual password 

61 target_hash = user_row['password_hash'].encode('utf-8') 

62 result = bcrypt.checkpw(password.encode('utf-8'), target_hash) 

63 else: 

64 # User doesn't exist - perform dummy bcrypt to maintain timing 

65 bcrypt.checkpw(password.encode('utf-8'), dummy_hash.encode('utf-8')) 

66 result = False 

67 

68 return result 

69 

70 

71def validate_geographic_coordinates(lat: Any, lng: Any) -> Tuple[bool, Optional[str], Optional[Tuple[float, float]]]: 

72 """ 

73 Validate geographic coordinates with proper bounds checking. 

74  

75 Prevents injection of invalid coordinate data that could pollute the database 

76 or cause application errors. Enforces strict geographic bounds and type validation. 

77  

78 Args: 

79 lat: Latitude value (any type, will be validated) 

80 lng: Longitude value (any type, will be validated) 

81  

82 Returns: 

83 Tuple[bool, Optional[str], Optional[Tuple[float, float]]]: 

84 - success: Whether validation passed 

85 - error_message: Error description if validation failed 

86 - coordinates: Validated (lat, lng) tuple if successful 

87  

88 Security Features: 

89 - Strict type validation prevents injection attacks 

90 - Geographic bounds enforcement prevents invalid data 

91 - Sanitized error messages prevent information disclosure 

92 """ 

93 try: 

94 # Type validation and conversion 

95 lat_float = float(lat) 

96 lng_float = float(lng) 

97 

98 # Geographic bounds validation 

99 if not (-90.0 <= lat_float <= 90.0): 

100 return False, "Neispravna geografska širina.", None 

101 

102 if not (-180.0 <= lng_float <= 180.0): 

103 return False, "Neispravna geografska dužina.", None 

104 

105 # Additional sanity checks 

106 if lat_float != lat_float or lng_float != lng_float: # NaN check 

107 return False, "Neispravne koordinate.", None 

108 

109 return True, None, (lat_float, lng_float) 

110 

111 except (ValueError, TypeError, OverflowError): 

112 return False, "Neispravne koordinate.", None 

113 

114 

115def sanitize_error_message(error: Exception, context: str = "operation") -> str: 

116 """ 

117 Sanitize error messages to prevent information disclosure. 

118  

119 Converts detailed system errors into generic user-friendly messages that don't 

120 reveal sensitive system information or internal application structure. 

121  

122 Args: 

123 error: The original exception 

124 context: Context of the operation for logging purposes 

125  

126 Returns: 

127 str: Sanitized error message safe for user display 

128  

129 Security Features: 

130 - Removes database-specific error details 

131 - Hides file system paths and internal structure 

132 - Prevents stack trace information leakage 

133 - Maintains user experience with helpful generic messages 

134 """ 

135 error_str = str(error).lower() 

136 

137 # Database-related errors 

138 if any(keyword in error_str for keyword in ['sqlite', 'database', 'table', 'column', 'constraint']): 

139 return "Greška pri obradi podataka." 

140 

141 # File system errors  

142 if any(keyword in error_str for keyword in ['permission', 'access', 'file', 'directory', 'path']): 

143 return "Greška pri pristupu datoteci." 

144 

145 # Network/connection errors 

146 if any(keyword in error_str for keyword in ['connection', 'timeout', 'network', 'socket']): 

147 return "Greška mreže." 

148 

149 # Authentication/authorization errors 

150 if any(keyword in error_str for keyword in ['unauthorized', 'forbidden', 'access denied']): 

151 return "Nemate dozvolu za ovu operaciju." 

152 

153 # Generic server errors 

154 return "Došlo je do greške. Molimo pokušajte ponovno." 

155 

156 

157def validate_filename_security(filename: str) -> Tuple[bool, Optional[str]]: 

158 """ 

159 Validate filename for security issues including path traversal and malicious extensions. 

160  

161 Prevents path traversal attacks, executable file uploads, and other filename-based 

162 security vulnerabilities. 

163  

164 Args: 

165 filename: Filename to validate 

166  

167 Returns: 

168 Tuple[bool, Optional[str]]: (is_valid, error_message) 

169  

170 Security Features: 

171 - Path traversal prevention (../, ..\\, etc.) 

172 - Malicious extension detection 

173 - Special character filtering 

174 - Length validation 

175 """ 

176 if not filename or not isinstance(filename, str): 

177 return False, "Nedostaje ime datoteke." 

178 

179 # Length validation 

180 if len(filename) > 255: 

181 return False, "Ime datoteke je predugačko." 

182 

183 # Path traversal detection 

184 dangerous_patterns = [ 

185 '..', 

186 '/', 

187 '\\', 

188 ':', 

189 '|', 

190 '<', 

191 '>', 

192 '*', 

193 '?', 

194 '"' 

195 ] 

196 

197 for pattern in dangerous_patterns: 

198 if pattern in filename: 

199 return False, "Neispravno ime datoteke." 

200 

201 # Malicious extension detection 

202 dangerous_extensions = [ 

203 '.php', '.php3', '.php4', '.php5', '.phtml', 

204 '.asp', '.aspx', '.jsp', '.jspx', 

205 '.py', '.pl', '.rb', '.sh', '.bat', '.cmd', 

206 '.exe', '.scr', '.com', '.pif', 

207 '.htaccess', '.htpasswd', 

208 '.config', '.ini', '.cfg' 

209 ] 

210 

211 filename_lower = filename.lower() 

212 for ext in dangerous_extensions: 

213 if filename_lower.endswith(ext): 

214 return False, "Nedozvoljena vrsta datoteke." 

215 

216 # Null byte injection prevention 

217 if '\x00' in filename: 

218 return False, "Neispravno ime datoteke." 

219 

220 return True, None 

221 

222 

223def generate_secure_token(length: int = 32) -> str: 

224 """ 

225 Generate cryptographically secure random token. 

226  

227 Creates tokens with high entropy suitable for CSRF protection, session tokens, 

228 and other security-critical applications. 

229  

230 Args: 

231 length: Desired token length in bytes (default 32) 

232  

233 Returns: 

234 str: URL-safe base64 encoded secure token 

235  

236 Security Features: 

237 - Cryptographically secure random number generation 

238 - High entropy output suitable for security tokens 

239 - URL-safe encoding for web application compatibility 

240 """ 

241 return secrets.token_urlsafe(length) 

242 

243 

244def verify_hmac_token(data: str, token: str, secret_key: str) -> bool: 

245 """ 

246 Verify HMAC-based token for integrity and authenticity. 

247  

248 Provides secure verification of tokens used in media URLs, API authentication, 

249 and other security-sensitive contexts. 

250  

251 Args: 

252 data: Original data that was signed 

253 token: HMAC token to verify 

254 secret_key: Secret key used for signing 

255  

256 Returns: 

257 bool: True if token is valid, False otherwise 

258  

259 Security Features: 

260 - Timing-attack resistant comparison 

261 - Strong HMAC-SHA256 verification 

262 - Prevents token manipulation attacks 

263 """ 

264 try: 

265 expected_token = hmac.new( 

266 secret_key.encode('utf-8'), 

267 data.encode('utf-8'), 

268 hashlib.sha256 

269 ).hexdigest() 

270 

271 # Timing-attack resistant comparison 

272 return hmac.compare_digest(token, expected_token) 

273 

274 except Exception: 

275 return False 

276 

277 

278def rate_limit_with_exponential_backoff(attempt_count: int, base_delay: int = 5) -> int: 

279 """ 

280 Calculate exponential backoff delay for rate limiting. 

281  

282 Implements increasingly longer delays for repeated failed attempts, 

283 making brute force attacks impractical while allowing legitimate users 

284 to retry after reasonable delays. 

285  

286 Args: 

287 attempt_count: Number of failed attempts 

288 base_delay: Base delay in seconds (default 5) 

289  

290 Returns: 

291 int: Delay in seconds before next attempt allowed 

292  

293 Security Features: 

294 - Exponential increase in delay times 

295 - Maximum cap to prevent excessive delays 

296 - Transparent to legitimate users with minimal attempts 

297 """ 

298 if attempt_count <= 0: 

299 return 0 

300 

301 # Exponential backoff with maximum cap 

302 delay = min(base_delay * (2 ** (attempt_count - 1)), 3600) # Max 1 hour 

303 return delay 

304 

305 

306def validate_api_input(data: Dict[str, Any], schema: Dict[str, Dict]) -> Tuple[bool, Optional[str], Dict[str, Any]]: 

307 """ 

308 Validate API input against schema with security-focused validation. 

309  

310 Provides comprehensive input validation with type checking, length limits, 

311 and security pattern detection to prevent injection attacks. 

312  

313 Args: 

314 data: Input data to validate 

315 schema: Validation schema with field definitions 

316  

317 Returns: 

318 Tuple[bool, Optional[str], Dict[str, Any]]: (is_valid, error_message, sanitized_data) 

319  

320 Security Features: 

321 - SQL injection pattern detection 

322 - XSS prevention through input sanitization 

323 - Type and length validation 

324 - Required field enforcement 

325 """ 

326 if not isinstance(data, dict): 

327 return False, "Neispravni podaci.", {} 

328 

329 sanitized_data = {} 

330 

331 for field_name, field_schema in schema.items(): 

332 value = data.get(field_name) 

333 

334 # Required field check 

335 if field_schema.get('required', False) and (value is None or value == ''): 

336 return False, f"Polje '{field_name}' je obavezno.", {} 

337 

338 if value is not None: 

339 # Type validation 

340 expected_type = field_schema.get('type', str) 

341 if not isinstance(value, expected_type): 

342 try: 

343 value = expected_type(value) 

344 except (ValueError, TypeError): 

345 return False, f"Neispravna vrijednost za '{field_name}'.", {} 

346 

347 # String-specific validations 

348 if isinstance(value, str): 

349 # Length validation 

350 min_length = field_schema.get('min_length', 0) 

351 max_length = field_schema.get('max_length', 10000) 

352 

353 if len(value) < min_length: 

354 return False, f"'{field_name}' prekratak.", {} 

355 

356 if len(value) > max_length: 

357 return False, f"'{field_name}' predugačak.", {} 

358 

359 # Security pattern detection 

360 dangerous_patterns = [ 

361 'script', 

362 'javascript', 

363 'vbscript', 

364 'onload', 

365 'onerror', 

366 'onclick', 

367 'select', 

368 'union', 

369 'drop', 

370 'insert', 

371 'update', 

372 'delete', 

373 '--', 

374 ';' 

375 ] 

376 

377 value_lower = value.lower() 

378 for pattern in dangerous_patterns: 

379 if pattern in value_lower: 

380 return False, f"Neispravna vrijednost za '{field_name}'.", {} 

381 

382 sanitized_data[field_name] = value 

383 

384 return True, None, sanitized_data 

385 

386 

387def security_headers_middleware(response): 

388 """ 

389 Add comprehensive security headers to HTTP responses. 

390  

391 Implements defense-in-depth security headers to protect against various 

392 web application attacks including XSS, clickjacking, and data injection. 

393  

394 Args: 

395 response: Flask response object 

396  

397 Returns: 

398 Response object with security headers added 

399  

400 Security Features: 

401 - Content Security Policy (CSP) enforcement 

402 - XSS protection headers 

403 - Clickjacking prevention 

404 - MIME type sniffing prevention 

405 - Referrer policy enforcement 

406 """ 

407 # Enhanced Content Security Policy 

408 csp_directives = [ 

409 "default-src 'self'", 

410 "script-src 'self' 'unsafe-inline' https://unpkg.com", 

411 "style-src 'self' 'unsafe-inline' https://fonts.googleapis.com https://unpkg.com", 

412 "img-src 'self' data: https://*.tile.openstreetmap.org https://unpkg.com", 

413 "font-src 'self' https://fonts.gstatic.com data:", 

414 "connect-src 'self'", 

415 "frame-ancestors 'none'", 

416 "base-uri 'self'", 

417 "form-action 'self'" 

418 ] 

419 

420 response.headers['Content-Security-Policy'] = '; '.join(csp_directives) 

421 

422 # Security headers 

423 response.headers['X-Frame-Options'] = 'DENY' 

424 response.headers['X-Content-Type-Options'] = 'nosniff' 

425 response.headers['X-XSS-Protection'] = '1; mode=block' 

426 response.headers['Referrer-Policy'] = 'strict-origin-when-cross-origin' 

427 response.headers['Permissions-Policy'] = 'geolocation=(self), microphone=(), camera=()' 

428 

429 # Remove server information 

430 response.headers.pop('Server', None) 

431 

432 return response