Coverage for app_modules/admin_routes.py: 49%

403 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-20 00:55 +0200

1""" 

2admin_routes.py 

3 

4Purpose: 

5 Admin panel HTML and actions for managing users. Includes add, remove, and 

6 change-password endpoints, protected by the admin_required decorator. 

7 Also exposes admin-only JSON APIs to view and manage any user's cameras and 

8 images, and to read audit logs. All routes are CSRF-protected globally. 

9 

10Routes: 

11 - GET /admin 

12 - POST /admin/add_user 

13 - POST /admin/remove_user/<int:user_id> 

14 - POST /admin/change_password/<int:user_id> 

15 - GET /admin/users.json 

16 - GET /admin/user/<int:user_id>/cameras.json 

17 - GET /admin/user/<int:user_id>/images.json 

18 - POST /admin/user/<int:user_id>/cameras/rename 

19 - POST /admin/user/<int:user_id>/cameras/delete 

20 - POST /admin/image/delete 

21 - GET /admin/logs.json 

22""" 

23 

24import os 

25import re 

26import sqlite3 

27import bcrypt 

28import time 

29from flask import Blueprint, render_template, request, redirect, url_for, flash, session, jsonify 

30from .db import get_db 

31from .security import admin_required 

32from .audit import log_admin_action, get_request_ip 

33from .paths import STATIC_PATH, USER_PHOTOS_REAL 

34from .images_service import latest_from_db_or_fs, collect_user_images 

35from .helpers import parse_ts_from_any, format_dt, normalize_to_static_user_photos, build_media_url, build_share_url 

36 

37 

38bp = Blueprint('admin_routes', __name__) 

39 

40 

41@bp.route('/admin') 

42@admin_required 

43def admin_panel(): 

44 db = get_db() 

45 users = db.execute('SELECT id, username, IFNULL(is_admin,0) AS is_admin FROM users ORDER BY username COLLATE NOCASE').fetchall() 

46 return render_template('admin.html', users=users) 

47 

48 

49@bp.route('/admin/add_user', methods=['POST']) 

50@admin_required 

51def admin_add_user(): 

52 username = (request.form.get('username') or '').strip().lower() 

53 password = request.form.get('password') or '' 

54 # Strict username policy: 3-32 chars, lowercase letters, digits, underscore 

55 if not username or not password: 

56 flash('Korisničko ime i lozinka su obavezni.', 'error') 

57 return redirect(url_for('admin_routes.admin_panel')) 

58 if not re.fullmatch(r'[a-z0-9_]{3,32}', username): 

59 flash('Korisničko ime smije sadržavati samo mala slova, brojeve i donju crtu (3-32 znaka).', 'error') 

60 return redirect(url_for('admin_routes.admin_panel')) 

61 try: 

62 pwd_hash = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8') 

63 db = get_db() 

64 db.execute('INSERT INTO users (username, password_hash) VALUES (?, ?)', (username, pwd_hash)) 

65 db.commit() 

66 flash('Korisnik dodan.', 'success') 

67 try: 

68 log_admin_action('add_user', admin_user_id=session.get('user_id'), target_user_id=None, ip=get_request_ip(request), detail=f'username={username}') 

69 except Exception: 

70 pass 

71 except sqlite3.IntegrityError: 

72 flash('Korisničko ime već postoji.', 'error') 

73 except sqlite3.Error: 

74 flash('Greška baze.', 'error') 

75 return redirect(url_for('admin_routes.admin_panel')) 

76 

77 

78@bp.route('/admin/remove_user/<int:user_id>', methods=['POST']) 

79@admin_required 

80def admin_remove_user(user_id: int): 

81 try: 

82 db = get_db() 

83 db.execute('DELETE FROM cameras WHERE user_id = ?', (user_id,)) 

84 db.execute('DELETE FROM users WHERE id = ?', (user_id,)) 

85 db.commit() 

86 flash('Korisnik i kamere obrisani.', 'success') 

87 try: 

88 log_admin_action('remove_user', admin_user_id=session.get('user_id'), target_user_id=user_id, ip=get_request_ip(request)) 

89 except Exception: 

90 pass 

91 except sqlite3.Error: 

92 flash('Greška baze.', 'error') 

93 return redirect(url_for('admin_routes.admin_panel')) 

94 

95 

96@bp.route('/admin/change_password/<int:user_id>', methods=['POST']) 

97@admin_required 

98def admin_change_password(user_id: int): 

99 """Change a user's password. 

100 - Supports form POST (HTML) and JSON (AJAX) with CSRF protection. 

101 - Enforces minimal password policy server-side. 

102 """ 

103 is_json = request.is_json or ( 

104 (request.headers.get('Content-Type') or '').split(';', 1)[0].strip() == 'application/json' 

105 ) 

106 

107 if is_json: 

108 data = request.get_json(silent=True) or {} 

109 new_password = (data.get('new_password') or '').strip() 

110 else: 

111 new_password = (request.form.get('new_password') or '').strip() 

112 

113 # Minimal server-side policy; UI may enforce stricter rules 

114 if len(new_password) < 8: 

115 if is_json: 

116 return jsonify({'success': False, 'message': 'Lozinka mora imati najmanje 8 znakova.'}), 400 

117 flash('Lozinka mora imati najmanje 8 znakova.', 'error') 

118 return redirect(url_for('admin_routes.admin_panel')) 

119 

120 try: 

121 new_hash = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8') 

122 db = get_db() 

123 db.execute('UPDATE users SET password_hash = ? WHERE id = ?', (new_hash, user_id)) 

124 db.commit() 

125 try: 

126 log_admin_action('change_password', admin_user_id=session.get('user_id'), target_user_id=user_id, ip=get_request_ip(request)) 

127 except Exception: 

128 pass 

129 if is_json: 

130 return jsonify({'success': True}) 

131 flash('Lozinka promijenjena.', 'success') 

132 except sqlite3.Error: 

133 if is_json: 

134 return jsonify({'success': False, 'message': 'Greška baze.'}), 500 

135 flash('Greška baze.', 'error') 

136 return redirect(url_for('admin_routes.admin_panel')) 

137 

138 

139# ---------- Admin JSON APIs (admin-only) ---------- 

140 

141@bp.route('/admin/users.json') 

142@admin_required 

143def admin_users_json(): 

144 db = get_db() 

145 rows = db.execute(''' 

146 SELECT u.id, u.username, IFNULL(u.is_admin,0) AS is_admin, 

147 COUNT(c.camera_id) AS camera_count 

148 FROM users u 

149 LEFT JOIN cameras c ON c.user_id = u.id 

150 GROUP BY u.id 

151 ORDER BY u.username COLLATE NOCASE 

152 ''').fetchall() 

153 users = [] 

154 for r in rows: 

155 users.append({ 

156 'id': r['id'], 

157 'username': r['username'], 

158 'is_admin': int(r['is_admin'] or 0) == 1, 

159 'camera_count': int(r['camera_count'] or 0) 

160 }) 

161 return jsonify({'users': users}) 

162 

163 

164@bp.route('/admin/user/<int:user_id>/cameras.json') 

165@admin_required 

166def admin_user_cameras_json(user_id: int): 

167 db = get_db() 

168 cols = [row[1] for row in db.execute('PRAGMA table_info(cameras)').fetchall()] 

169 has_model = 'model' in set(cols) 

170 select_fields = 'camera_id, camera_name, file_paths' + (', model' if has_model else '') 

171 cur = db.execute(f'SELECT {select_fields} FROM cameras WHERE user_id = ? ORDER BY camera_name COLLATE NOCASE', (user_id,)) 

172 rows = [] 

173 for r in cur.fetchall(): 

174 cam_id = str(r['camera_id']) 

175 latest_dt, _ = latest_from_db_or_fs(cam_id, r['file_paths'] or '', STATIC_PATH) 

176 thumb_url = '/static/camera_render.png' 

177 model = (r['model'] if has_model else None) or 'Vision mini' 

178 rows.append({ 

179 'camera_id': cam_id, 

180 'camera_name': r['camera_name'], 

181 'last_active': format_dt(latest_dt) if latest_dt else 'Nema aktivnosti', 

182 'latest_image_ts': int(latest_dt.timestamp()) if latest_dt else None, 

183 'thumbnail_url': thumb_url, 

184 'model': model 

185 }) 

186 return jsonify({'cameras': rows}) 

187 

188 

189@bp.route('/admin/user/<int:user_id>/images.json') 

190@admin_required 

191def admin_user_images_json(user_id: int): 

192 # Return images organized by camera with pagination 

193 db = get_db() 

194 all_images = collect_user_images(user_id, db, STATIC_PATH) 

195 

196 # Get camera info 

197 cam_rows = db.execute('SELECT camera_id, camera_name FROM cameras WHERE user_id = ? ORDER BY camera_name', (user_id,)).fetchall() 

198 camera_names = {str(r['camera_id']): r['camera_name'] for r in cam_rows} 

199 

200 # Organize images by camera 

201 cameras = {} 

202 for img in all_images: 

203 cam_id = img.get('camera_id') 

204 if not cam_id: 

205 continue 

206 if cam_id not in cameras: 

207 cameras[cam_id] = { 

208 'camera_id': cam_id, 

209 'camera_name': camera_names.get(cam_id, f'Camera {cam_id}'), 

210 'images': [] 

211 } 

212 cameras[cam_id]['images'].append({ 

213 'url': img['url'], 

214 'rel': img['rel'], 

215 'camera_id': cam_id, 

216 'ts': format_dt(img.get('ts')) if img.get('ts') else '' 

217 }) 

218 

219 # Apply pagination per camera 

220 offset = max(0, int(request.args.get('offset', 0))) 

221 limit = max(1, min(int(request.args.get('limit', 30)), 100)) 

222 

223 result_cameras = [] 

224 for cam_id, cam_data in cameras.items(): 

225 total_images = len(cam_data['images']) 

226 images_slice = cam_data['images'][offset:offset + limit] 

227 has_more = (offset + limit) < total_images 

228 

229 result_cameras.append({ 

230 'camera_id': cam_id, 

231 'camera_name': cam_data['camera_name'], 

232 'images': images_slice, 

233 'total_count': total_images, 

234 'has_more': has_more, 

235 'next_offset': offset + limit if has_more else None 

236 }) 

237 

238 # Sort cameras by name for consistent display 

239 result_cameras.sort(key=lambda x: x['camera_name']) 

240 

241 return jsonify({ 

242 'cameras': result_cameras, 

243 'total_cameras': len(result_cameras) 

244 }) 

245 

246 

247@bp.route('/admin/share_link', methods=['POST']) 

248@admin_required 

249def admin_generate_share_link(): 

250 """Generate a short-lived public share URL for an image. 

251 Admin-only endpoint to prevent user-level mass sharing without oversight. 

252 Body: { rel: 'User-photos/PICT_...', ttl_minutes: 30 } 

253 """ 

254 data = request.get_json(silent=True) or {} 

255 rel = normalize_to_static_user_photos(str(data.get('rel') or '')) 

256 ttl_minutes = int(data.get('ttl_minutes') or 14*24*60) 

257 ttl_minutes = max(1, min(ttl_minutes, 24*60)) # between 1 minute and 24h 

258 if not rel: 

259 return jsonify({'success': False, 'message': 'Nedostaje slika.'}), 400 

260 # Verify file exists and parse camera 

261 ts, cam = parse_ts_from_any(rel) 

262 if not cam: 

263 return jsonify({'success': False, 'message': 'Neispravno ime datoteke.'}), 400 

264 abs_path = os.path.join(STATIC_PATH, rel) 

265 if not os.path.exists(abs_path): 

266 return jsonify({'success': False, 'message': 'Datoteka ne postoji.'}), 404 

267 # Generate expiring URL 

268 exp = int(time.time()) + ttl_minutes * 60 

269 url = build_share_url(rel, exp) 

270 return jsonify({'success': True, 'url': url, 'expires_at': exp}) 

271 

272 

273@bp.route('/admin/user/<int:user_id>/camera/<camera_id>/images.json') 

274@admin_required 

275def admin_user_camera_images_json(user_id: int, camera_id: str): 

276 # Return images for a specific camera with pagination 

277 if not (camera_id.isdigit() and len(camera_id) == 12): 

278 return jsonify({'error': 'Invalid camera ID'}), 400 

279 

280 db = get_db() 

281 # Verify camera belongs to user 

282 cam_check = db.execute('SELECT camera_name FROM cameras WHERE user_id = ? AND camera_id = ?', (user_id, camera_id)).fetchone() 

283 if not cam_check: 

284 return jsonify({'error': 'Camera not found'}), 404 

285 

286 all_images = collect_user_images(user_id, db, STATIC_PATH) 

287 

288 # Filter for this specific camera 

289 camera_images = [img for img in all_images if img.get('camera_id') == camera_id] 

290 

291 # Apply pagination 

292 offset = max(0, int(request.args.get('offset', 0))) 

293 limit = max(1, min(int(request.args.get('limit', 30)), 100)) 

294 

295 total_images = len(camera_images) 

296 images_slice = camera_images[offset:offset + limit] 

297 has_more = (offset + limit) < total_images 

298 

299 # Convert timestamps to string 

300 result_images = [] 

301 for img in images_slice: 

302 result_images.append({ 

303 'url': img['url'], 

304 'rel': img['rel'], 

305 'camera_id': camera_id, 

306 'ts': format_dt(img.get('ts')) if img.get('ts') else '' 

307 }) 

308 

309 return jsonify({ 

310 'camera_id': camera_id, 

311 'camera_name': cam_check['camera_name'], 

312 'images': result_images, 

313 'total_count': total_images, 

314 'has_more': has_more, 

315 'next_offset': offset + limit if has_more else None, 

316 'current_offset': offset 

317 }) 

318 

319 

320@bp.route('/admin/user/<int:user_id>/cameras/add', methods=['POST']) 

321@admin_required 

322def admin_user_camera_add(user_id: int): 

323 data = request.get_json(silent=True) or {} 

324 camera_id = str(data.get('camera_id', '')).strip() 

325 camera_name = (data.get('camera_name') or '').strip() 

326 # Validate camera id and name 

327 if not (camera_id.isdigit() and len(camera_id) == 12): 

328 return jsonify({'success': False, 'message': 'ID kamere mora biti 12 brojeva.'}), 400 

329 if not camera_name or len(camera_name) > 60 or not re.fullmatch(r'[\w\s\-.]{1,60}', camera_name): 

330 return jsonify({'success': False, 'message': 'Neispravan naziv kamere.'}), 400 

331 

332 db = get_db() 

333 try: 

334 # Check if camera already exists for any user 

335 existing = db.execute('SELECT user_id FROM cameras WHERE camera_id=?', (camera_id,)).fetchone() 

336 if existing: 

337 return jsonify({'success': False, 'message': 'Kamera s tim ID već postoji.'}), 400 

338 

339 # Check if user exists 

340 user_exists = db.execute('SELECT id FROM users WHERE id=?', (user_id,)).fetchone() 

341 if not user_exists: 

342 return jsonify({'success': False, 'message': 'Korisnik ne postoji.'}), 404 

343 

344 # Insert new camera 

345 db.execute(''' 

346 INSERT INTO cameras (user_id, camera_id, camera_name)  

347 VALUES (?, ?, ?) 

348 ''', (user_id, camera_id, camera_name)) 

349 db.commit() 

350 

351 try: 

352 log_admin_action('add_camera', admin_user_id=session.get('user_id'), target_user_id=user_id, ip=get_request_ip(request), detail=f'camera_id={camera_id}, name={camera_name}') 

353 except Exception: 

354 pass 

355 

356 return jsonify({'success': True, 'message': 'Kamera je uspješno dodana.'}) 

357 except sqlite3.Error as e: 

358 return jsonify({'success': False, 'message': 'Greška baze podataka.'}), 500 

359 

360 

361@bp.route('/admin/user/<int:user_id>/cameras/rename', methods=['POST']) 

362@admin_required 

363def admin_user_camera_rename(user_id: int): 

364 data = request.get_json(silent=True) or {} 

365 camera_id = str(data.get('camera_id', '')).strip() 

366 new_name = (data.get('camera_name') or '').strip() 

367 if not (camera_id.isdigit() and len(camera_id) == 12): 

368 return jsonify({'success': False, 'message': 'Neispravni podaci.'}), 400 

369 if not new_name or len(new_name) > 60 or not re.fullmatch(r'[\w\s\-.]{1,60}', new_name): 

370 return jsonify({'success': False, 'message': 'Neispravan naziv kamere.'}), 400 

371 db = get_db() 

372 try: 

373 cur = db.execute('UPDATE cameras SET camera_name=? WHERE user_id=? AND camera_id=?', (new_name, user_id, camera_id)) 

374 db.commit() 

375 if cur.rowcount == 0: 

376 return jsonify({'success': False, 'message': 'Kamera nije pronađena.'}), 404 

377 try: 

378 log_admin_action('rename_camera', admin_user_id=session.get('user_id'), target_user_id=user_id, ip=get_request_ip(request), detail=f'camera_id={camera_id}') 

379 except Exception: 

380 pass 

381 return jsonify({'success': True}) 

382 except sqlite3.Error: 

383 return jsonify({'success': False, 'message': 'Greška baze.'}), 500 

384 

385 

386@bp.route('/admin/user/<int:user_id>/cameras/delete', methods=['POST']) 

387@admin_required 

388def admin_user_camera_delete(user_id: int): 

389 data = request.get_json(silent=True) or {} 

390 camera_id = str(data.get('camera_id', '')).strip() 

391 if not (camera_id.isdigit() and len(camera_id) == 12): 

392 return jsonify({'success': False, 'message': 'Neispravan ID kamere.'}), 400 

393 db = get_db() 

394 try: 

395 cur = db.execute('DELETE FROM cameras WHERE user_id=? AND camera_id=?', (user_id, camera_id)) 

396 db.commit() 

397 if cur.rowcount == 0: 

398 return jsonify({'success': False, 'message': 'Kamera nije pronađena.'}), 404 

399 try: 

400 log_admin_action('delete_camera', admin_user_id=session.get('user_id'), target_user_id=user_id, ip=get_request_ip(request), detail=f'camera_id={camera_id}') 

401 except Exception: 

402 pass 

403 return jsonify({'success': True}) 

404 except sqlite3.Error: 

405 return jsonify({'success': False, 'message': 'Greška baze.'}), 500 

406 

407 

408@bp.route('/admin/image/delete', methods=['POST']) 

409@admin_required 

410def admin_delete_image(): 

411 data = request.get_json(silent=True) or {} 

412 rel = normalize_to_static_user_photos(data.get('rel', '')) 

413 user_id = int(data.get('user_id') or 0) 

414 if not rel or not user_id: 

415 return jsonify({'success': False, 'message': 'Nedostaju podaci.'}), 400 

416 # Validate that the image camera belongs to the specified user 

417 _, cam = parse_ts_from_any(rel) 

418 if not cam: 

419 return jsonify({'success': False, 'message': 'Neispravno ime datoteke.'}), 400 

420 db = get_db() 

421 owner = db.execute('SELECT 1 FROM cameras WHERE user_id=? AND camera_id=?', (user_id, cam)).fetchone() 

422 if not owner: 

423 return jsonify({'success': False, 'message': 'Nedozvoljeno.'}), 403 

424 abs_path = os.path.join(STATIC_PATH, rel) 

425 real_static = USER_PHOTOS_REAL if rel.startswith('User-photos/') else os.path.realpath(STATIC_PATH) 

426 real_abs = os.path.realpath(abs_path) 

427 if not real_abs.startswith(real_static): 

428 return jsonify({'success': False, 'message': 'Putanja nije dozvoljena.'}), 400 

429 if os.path.exists(abs_path): 

430 parent_dir = os.path.dirname(abs_path) 

431 if not os.access(parent_dir, os.W_OK): 

432 return jsonify({'success': False, 'message': 'Server nema dozvolu za brisanje datoteke.'}), 403 

433 try: 

434 os.remove(abs_path) 

435 except PermissionError: 

436 return jsonify({'success': False, 'message': 'Server nema dozvolu za brisanje datoteke.'}), 403 

437 except OSError: 

438 return jsonify({'success': False, 'message': 'Ne mogu obrisati datoteku.'}), 500 

439 # Update DB file_paths if present 

440 try: 

441 row = db.execute('SELECT file_paths FROM cameras WHERE user_id=? AND camera_id=?', (user_id, cam)).fetchone() 

442 if row and row['file_paths']: 

443 fps = [p.strip() for p in row['file_paths'].split(',') if p.strip()] 

444 name_only = os.path.basename(rel) 

445 kept = [p for p in fps if (os.path.basename(p) != name_only) and (normalize_to_static_user_photos(p) != rel)] 

446 db.execute('UPDATE cameras SET file_paths=? WHERE user_id=? AND camera_id=?', (','.join(kept), user_id, cam)) 

447 db.commit() 

448 except sqlite3.Error: 

449 pass 

450 try: 

451 log_admin_action('delete_image', admin_user_id=session.get('user_id'), target_user_id=user_id, ip=get_request_ip(request), detail=f'rel={rel}') 

452 except Exception: 

453 pass 

454 return jsonify({'success': True}) 

455 

456 

457@bp.route('/admin/logs.json') 

458@admin_required 

459def admin_logs_json(): 

460 limit = max(1, min(int(request.args.get('limit', 100)), 500)) 

461 db = get_db() 

462 

463 # Get combined logs with proper context 

464 auth_logs = db.execute(''' 

465 SELECT ts, ip, user_id, username, event, detail, 'auth' as log_type 

466 FROM auth_log  

467 ORDER BY ts DESC LIMIT ? 

468 ''', (limit,)).fetchall() 

469 

470 admin_logs = db.execute(''' 

471 SELECT aa.ts, aa.ip, aa.admin_user_id, aa.target_user_id, aa.action, aa.detail, 'admin' as log_type, 

472 au.username as admin_username, tu.username as target_username 

473 FROM admin_audit aa 

474 LEFT JOIN users au ON au.id = aa.admin_user_id 

475 LEFT JOIN users tu ON tu.id = aa.target_user_id 

476 ORDER BY aa.ts DESC LIMIT ? 

477 ''', (limit,)).fetchall() 

478 

479 # Combine and sort all logs 

480 all_logs = [] 

481 

482 # Process auth logs 

483 for log in auth_logs: 

484 all_logs.append({ 

485 'ts': log['ts'], 

486 'type': 'AUTH', 

487 'ip': log['ip'] or 'Unknown', 

488 'username': log['username'] or f'User#{log["user_id"] or "?"}', 

489 'action': log['event'], 

490 'detail': log['detail'] or '', 

491 'severity': 'error' if 'fail' in log['event'].lower() or 'lock' in log['event'].lower() else 'info' 

492 }) 

493 

494 # Process admin logs 

495 for log in admin_logs: 

496 admin_name = log['admin_username'] or f'Admin#{log["admin_user_id"]}' 

497 target_name = log['target_username'] or (f'User#{log["target_user_id"]}' if log['target_user_id'] else '') 

498 

499 action_desc = log['action'] 

500 if target_name: 

501 action_desc = f"{log['action']} → {target_name}" 

502 

503 all_logs.append({ 

504 'ts': log['ts'], 

505 'type': 'ADMIN', 

506 'ip': log['ip'] or 'Unknown', 

507 'username': admin_name, 

508 'action': action_desc, 

509 'detail': log['detail'] or '', 

510 'severity': 'warning' if 'delete' in log['action'].lower() or 'remove' in log['action'].lower() else 'info' 

511 }) 

512 

513 # Sort by timestamp descending 

514 all_logs.sort(key=lambda x: x['ts'], reverse=True) 

515 

516 return jsonify({ 

517 'logs': all_logs[:limit], 

518 'total_count': len(all_logs) 

519 }) 

520 

521 

522@bp.route('/admin/logs/export') 

523@admin_required 

524def admin_logs_export(): 

525 format_type = request.args.get('format', 'csv').lower() 

526 limit = max(1, min(int(request.args.get('limit', 1000)), 5000)) 

527 

528 db = get_db() 

529 

530 # Get combined logs with proper context (same as logs.json but more limit) 

531 auth_logs = db.execute(''' 

532 SELECT ts, ip, user_id, username, event, detail, 'auth' as log_type 

533 FROM auth_log  

534 ORDER BY ts DESC LIMIT ? 

535 ''', (limit,)).fetchall() 

536 

537 admin_logs = db.execute(''' 

538 SELECT aa.ts, aa.ip, aa.admin_user_id, aa.target_user_id, aa.action, aa.detail, 'admin' as log_type, 

539 au.username as admin_username, tu.username as target_username 

540 FROM admin_audit aa 

541 LEFT JOIN users au ON au.id = aa.admin_user_id 

542 LEFT JOIN users tu ON tu.id = aa.target_user_id 

543 ORDER BY aa.ts DESC LIMIT ? 

544 ''', (limit,)).fetchall() 

545 

546 # Combine and sort all logs 

547 all_logs = [] 

548 

549 # Process auth logs 

550 for log in auth_logs: 

551 all_logs.append({ 

552 'timestamp': log['ts'], 

553 'datetime': format_dt(log['ts']) if log['ts'] else '', 

554 'type': 'AUTH', 

555 'ip': log['ip'] or 'Unknown', 

556 'username': log['username'] or f'User#{log["user_id"] or "?"}', 

557 'action': log['event'], 

558 'detail': log['detail'] or '', 

559 'severity': 'error' if 'fail' in log['event'].lower() or 'lock' in log['event'].lower() else 'info' 

560 }) 

561 

562 # Process admin logs 

563 for log in admin_logs: 

564 admin_name = log['admin_username'] or f'Admin#{log["admin_user_id"]}' 

565 target_name = log['target_username'] or (f'User#{log["target_user_id"]}' if log['target_user_id'] else '') 

566 

567 action_desc = log['action'] 

568 if target_name: 

569 action_desc = f"{log['action']} -> {target_name}" 

570 

571 all_logs.append({ 

572 'timestamp': log['ts'], 

573 'datetime': format_dt(log['ts']) if log['ts'] else '', 

574 'type': 'ADMIN', 

575 'ip': log['ip'] or 'Unknown', 

576 'username': admin_name, 

577 'action': action_desc, 

578 'detail': log['detail'] or '', 

579 'severity': 'warning' if 'delete' in log['action'].lower() or 'remove' in log['action'].lower() else 'info' 

580 }) 

581 

582 # Sort by timestamp descending 

583 all_logs.sort(key=lambda x: x['timestamp'], reverse=True) 

584 

585 if format_type == 'json': 

586 from flask import Response 

587 import json 

588 response_data = { 

589 'export_timestamp': format_dt(int(time.time())), 

590 'total_logs': len(all_logs), 

591 'logs': all_logs 

592 } 

593 response = Response( 

594 json.dumps(response_data, indent=2, ensure_ascii=False), 

595 mimetype='application/json', 

596 headers={'Content-Disposition': f'attachment; filename=audit_logs_{int(time.time())}.json'} 

597 ) 

598 return response 

599 

600 else: # CSV format 

601 from flask import Response 

602 import csv 

603 from io import StringIO 

604 

605 output = StringIO() 

606 writer = csv.writer(output) 

607 

608 # Write header 

609 writer.writerow(['Timestamp', 'DateTime', 'Type', 'IP', 'Username', 'Action', 'Detail', 'Severity']) 

610 

611 # Write data 

612 for log in all_logs: 

613 writer.writerow([ 

614 log['timestamp'], 

615 log['datetime'], 

616 log['type'], 

617 log['ip'], 

618 log['username'], 

619 log['action'], 

620 log['detail'], 

621 log['severity'] 

622 ]) 

623 

624 output.seek(0) 

625 return Response( 

626 output.getvalue(), 

627 mimetype='text/csv', 

628 headers={'Content-Disposition': f'attachment; filename=audit_logs_{int(time.time())}.csv'} 

629 ) 

630 

631 

632@bp.route('/admin/user/<int:user_id>/stats.json') 

633@admin_required 

634def admin_user_stats_json(user_id: int): 

635 db = get_db() 

636 u = db.execute('SELECT id, username, IFNULL(is_admin,0) AS is_admin FROM users WHERE id = ?', (user_id,)).fetchone() 

637 if not u: 

638 return jsonify({'error': 'not found'}), 404 

639 # Cameras 

640 cam_rows = db.execute('SELECT camera_id FROM cameras WHERE user_id = ?', (user_id,)).fetchall() 

641 cam_ids = {str(r['camera_id']) for r in cam_rows} 

642 camera_count = len(cam_ids) 

643 # Images: count, size, earliest 

644 image_count = 0 

645 total_bytes = 0 

646 earliest_ts = None 

647 root = os.path.join(STATIC_PATH, 'User-photos') 

648 import re 

649 cam_any_re = re.compile(r"[A-Za-z0-9]{12}") 

650 for dirpath, _, files in os.walk(root): 

651 for name in files: 

652 nl = name.lower() 

653 if not (nl.endswith('.jpg') or nl.endswith('.jpeg') or nl.endswith('.png') or nl.endswith('.gif') or nl.endswith('.webp')): 

654 continue 

655 ts, cam = parse_ts_from_any(name) 

656 if not cam: 

657 # Fallback: try to find any 12-char alnum token in name 

658 m = cam_any_re.findall(name) 

659 if m: 

660 for token in m[::-1]: 

661 if token in cam_ids: 

662 cam = token 

663 break 

664 if cam and (cam in cam_ids): 

665 abs_path = os.path.join(dirpath, name) 

666 try: 

667 st = os.stat(abs_path) 

668 except OSError: 

669 continue 

670 total_bytes += int(st.st_size) 

671 image_count += 1 

672 if ts is None: 

673 from datetime import datetime 

674 ts = datetime.fromtimestamp(st.st_mtime) 

675 if earliest_ts is None or ts < earliest_ts: 

676 earliest_ts = ts 

677 # Last login 

678 last = db.execute('SELECT ts, ip FROM auth_log WHERE event = ? AND (user_id = ? OR username = ?) ORDER BY ts DESC LIMIT 1', ('login_success', user_id, u['username'])).fetchone() 

679 first_auth = db.execute('SELECT ts FROM auth_log WHERE (user_id = ? OR username = ?) ORDER BY ts ASC LIMIT 1', (user_id, u['username'])).fetchone() 

680 

681 # User creation info from admin audit logs 

682 creation_info = db.execute(''' 

683 SELECT aa.ts, aa.admin_user_id, au.username as admin_username 

684 FROM admin_audit aa 

685 LEFT JOIN users au ON au.id = aa.admin_user_id 

686 WHERE aa.action = 'add_user' AND aa.detail LIKE ? 

687 ORDER BY aa.ts ASC LIMIT 1 

688 ''', (f'username={u["username"]}',)).fetchone() 

689 

690 # Format timestamps as numbers; client will render 

691 from datetime import datetime 

692 first_seen_ts = int(first_auth['ts']) if first_auth else (int(earliest_ts.timestamp()) if earliest_ts else 0) 

693 last_login_ts = int(last['ts']) if last else 0 

694 last_login_ip = (last['ip'] if last else '') or '' 

695 

696 # Creation details 

697 created_by_username = None 

698 created_at_ts = 0 

699 if creation_info: 

700 created_by_username = creation_info['admin_username'] or f"Admin #{creation_info['admin_user_id']}" 

701 created_at_ts = int(creation_info['ts']) 

702 

703 return jsonify({ 

704 'user': {'id': u['id'], 'username': u['username'], 'is_admin': int(u['is_admin'] or 0) == 1}, 

705 'camera_count': camera_count, 

706 'image_count': image_count, 

707 'total_bytes': total_bytes, 

708 'first_seen_ts': first_seen_ts, 

709 'last_login_ts': last_login_ts, 

710 'last_login_ip': last_login_ip, 

711 'created_by_username': created_by_username, 

712 'created_at_ts': created_at_ts, 

713 }) 

714 

715