Coverage for app_modules/admin_routes.py: 49%
403 statements
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-20 00:55 +0200
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-20 00:55 +0200
1"""
2admin_routes.py
4Purpose:
5 Admin panel HTML and actions for managing users. Includes add, remove, and
6 change-password endpoints, protected by the admin_required decorator.
7 Also exposes admin-only JSON APIs to view and manage any user's cameras and
8 images, and to read audit logs. All routes are CSRF-protected globally.
10Routes:
11 - GET /admin
12 - POST /admin/add_user
13 - POST /admin/remove_user/<int:user_id>
14 - POST /admin/change_password/<int:user_id>
15 - GET /admin/users.json
16 - GET /admin/user/<int:user_id>/cameras.json
17 - GET /admin/user/<int:user_id>/images.json
18 - POST /admin/user/<int:user_id>/cameras/rename
19 - POST /admin/user/<int:user_id>/cameras/delete
20 - POST /admin/image/delete
21 - GET /admin/logs.json
22"""
24import os
25import re
26import sqlite3
27import bcrypt
28import time
29from flask import Blueprint, render_template, request, redirect, url_for, flash, session, jsonify
30from .db import get_db
31from .security import admin_required
32from .audit import log_admin_action, get_request_ip
33from .paths import STATIC_PATH, USER_PHOTOS_REAL
34from .images_service import latest_from_db_or_fs, collect_user_images
35from .helpers import parse_ts_from_any, format_dt, normalize_to_static_user_photos, build_media_url, build_share_url
38bp = Blueprint('admin_routes', __name__)
41@bp.route('/admin')
42@admin_required
43def admin_panel():
44 db = get_db()
45 users = db.execute('SELECT id, username, IFNULL(is_admin,0) AS is_admin FROM users ORDER BY username COLLATE NOCASE').fetchall()
46 return render_template('admin.html', users=users)
49@bp.route('/admin/add_user', methods=['POST'])
50@admin_required
51def admin_add_user():
52 username = (request.form.get('username') or '').strip().lower()
53 password = request.form.get('password') or ''
54 # Strict username policy: 3-32 chars, lowercase letters, digits, underscore
55 if not username or not password:
56 flash('Korisničko ime i lozinka su obavezni.', 'error')
57 return redirect(url_for('admin_routes.admin_panel'))
58 if not re.fullmatch(r'[a-z0-9_]{3,32}', username):
59 flash('Korisničko ime smije sadržavati samo mala slova, brojeve i donju crtu (3-32 znaka).', 'error')
60 return redirect(url_for('admin_routes.admin_panel'))
61 try:
62 pwd_hash = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
63 db = get_db()
64 db.execute('INSERT INTO users (username, password_hash) VALUES (?, ?)', (username, pwd_hash))
65 db.commit()
66 flash('Korisnik dodan.', 'success')
67 try:
68 log_admin_action('add_user', admin_user_id=session.get('user_id'), target_user_id=None, ip=get_request_ip(request), detail=f'username={username}')
69 except Exception:
70 pass
71 except sqlite3.IntegrityError:
72 flash('Korisničko ime već postoji.', 'error')
73 except sqlite3.Error:
74 flash('Greška baze.', 'error')
75 return redirect(url_for('admin_routes.admin_panel'))
78@bp.route('/admin/remove_user/<int:user_id>', methods=['POST'])
79@admin_required
80def admin_remove_user(user_id: int):
81 try:
82 db = get_db()
83 db.execute('DELETE FROM cameras WHERE user_id = ?', (user_id,))
84 db.execute('DELETE FROM users WHERE id = ?', (user_id,))
85 db.commit()
86 flash('Korisnik i kamere obrisani.', 'success')
87 try:
88 log_admin_action('remove_user', admin_user_id=session.get('user_id'), target_user_id=user_id, ip=get_request_ip(request))
89 except Exception:
90 pass
91 except sqlite3.Error:
92 flash('Greška baze.', 'error')
93 return redirect(url_for('admin_routes.admin_panel'))
96@bp.route('/admin/change_password/<int:user_id>', methods=['POST'])
97@admin_required
98def admin_change_password(user_id: int):
99 """Change a user's password.
100 - Supports form POST (HTML) and JSON (AJAX) with CSRF protection.
101 - Enforces minimal password policy server-side.
102 """
103 is_json = request.is_json or (
104 (request.headers.get('Content-Type') or '').split(';', 1)[0].strip() == 'application/json'
105 )
107 if is_json:
108 data = request.get_json(silent=True) or {}
109 new_password = (data.get('new_password') or '').strip()
110 else:
111 new_password = (request.form.get('new_password') or '').strip()
113 # Minimal server-side policy; UI may enforce stricter rules
114 if len(new_password) < 8:
115 if is_json:
116 return jsonify({'success': False, 'message': 'Lozinka mora imati najmanje 8 znakova.'}), 400
117 flash('Lozinka mora imati najmanje 8 znakova.', 'error')
118 return redirect(url_for('admin_routes.admin_panel'))
120 try:
121 new_hash = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
122 db = get_db()
123 db.execute('UPDATE users SET password_hash = ? WHERE id = ?', (new_hash, user_id))
124 db.commit()
125 try:
126 log_admin_action('change_password', admin_user_id=session.get('user_id'), target_user_id=user_id, ip=get_request_ip(request))
127 except Exception:
128 pass
129 if is_json:
130 return jsonify({'success': True})
131 flash('Lozinka promijenjena.', 'success')
132 except sqlite3.Error:
133 if is_json:
134 return jsonify({'success': False, 'message': 'Greška baze.'}), 500
135 flash('Greška baze.', 'error')
136 return redirect(url_for('admin_routes.admin_panel'))
139# ---------- Admin JSON APIs (admin-only) ----------
141@bp.route('/admin/users.json')
142@admin_required
143def admin_users_json():
144 db = get_db()
145 rows = db.execute('''
146 SELECT u.id, u.username, IFNULL(u.is_admin,0) AS is_admin,
147 COUNT(c.camera_id) AS camera_count
148 FROM users u
149 LEFT JOIN cameras c ON c.user_id = u.id
150 GROUP BY u.id
151 ORDER BY u.username COLLATE NOCASE
152 ''').fetchall()
153 users = []
154 for r in rows:
155 users.append({
156 'id': r['id'],
157 'username': r['username'],
158 'is_admin': int(r['is_admin'] or 0) == 1,
159 'camera_count': int(r['camera_count'] or 0)
160 })
161 return jsonify({'users': users})
164@bp.route('/admin/user/<int:user_id>/cameras.json')
165@admin_required
166def admin_user_cameras_json(user_id: int):
167 db = get_db()
168 cols = [row[1] for row in db.execute('PRAGMA table_info(cameras)').fetchall()]
169 has_model = 'model' in set(cols)
170 select_fields = 'camera_id, camera_name, file_paths' + (', model' if has_model else '')
171 cur = db.execute(f'SELECT {select_fields} FROM cameras WHERE user_id = ? ORDER BY camera_name COLLATE NOCASE', (user_id,))
172 rows = []
173 for r in cur.fetchall():
174 cam_id = str(r['camera_id'])
175 latest_dt, _ = latest_from_db_or_fs(cam_id, r['file_paths'] or '', STATIC_PATH)
176 thumb_url = '/static/camera_render.png'
177 model = (r['model'] if has_model else None) or 'Vision mini'
178 rows.append({
179 'camera_id': cam_id,
180 'camera_name': r['camera_name'],
181 'last_active': format_dt(latest_dt) if latest_dt else 'Nema aktivnosti',
182 'latest_image_ts': int(latest_dt.timestamp()) if latest_dt else None,
183 'thumbnail_url': thumb_url,
184 'model': model
185 })
186 return jsonify({'cameras': rows})
189@bp.route('/admin/user/<int:user_id>/images.json')
190@admin_required
191def admin_user_images_json(user_id: int):
192 # Return images organized by camera with pagination
193 db = get_db()
194 all_images = collect_user_images(user_id, db, STATIC_PATH)
196 # Get camera info
197 cam_rows = db.execute('SELECT camera_id, camera_name FROM cameras WHERE user_id = ? ORDER BY camera_name', (user_id,)).fetchall()
198 camera_names = {str(r['camera_id']): r['camera_name'] for r in cam_rows}
200 # Organize images by camera
201 cameras = {}
202 for img in all_images:
203 cam_id = img.get('camera_id')
204 if not cam_id:
205 continue
206 if cam_id not in cameras:
207 cameras[cam_id] = {
208 'camera_id': cam_id,
209 'camera_name': camera_names.get(cam_id, f'Camera {cam_id}'),
210 'images': []
211 }
212 cameras[cam_id]['images'].append({
213 'url': img['url'],
214 'rel': img['rel'],
215 'camera_id': cam_id,
216 'ts': format_dt(img.get('ts')) if img.get('ts') else ''
217 })
219 # Apply pagination per camera
220 offset = max(0, int(request.args.get('offset', 0)))
221 limit = max(1, min(int(request.args.get('limit', 30)), 100))
223 result_cameras = []
224 for cam_id, cam_data in cameras.items():
225 total_images = len(cam_data['images'])
226 images_slice = cam_data['images'][offset:offset + limit]
227 has_more = (offset + limit) < total_images
229 result_cameras.append({
230 'camera_id': cam_id,
231 'camera_name': cam_data['camera_name'],
232 'images': images_slice,
233 'total_count': total_images,
234 'has_more': has_more,
235 'next_offset': offset + limit if has_more else None
236 })
238 # Sort cameras by name for consistent display
239 result_cameras.sort(key=lambda x: x['camera_name'])
241 return jsonify({
242 'cameras': result_cameras,
243 'total_cameras': len(result_cameras)
244 })
247@bp.route('/admin/share_link', methods=['POST'])
248@admin_required
249def admin_generate_share_link():
250 """Generate a short-lived public share URL for an image.
251 Admin-only endpoint to prevent user-level mass sharing without oversight.
252 Body: { rel: 'User-photos/PICT_...', ttl_minutes: 30 }
253 """
254 data = request.get_json(silent=True) or {}
255 rel = normalize_to_static_user_photos(str(data.get('rel') or ''))
256 ttl_minutes = int(data.get('ttl_minutes') or 14*24*60)
257 ttl_minutes = max(1, min(ttl_minutes, 24*60)) # between 1 minute and 24h
258 if not rel:
259 return jsonify({'success': False, 'message': 'Nedostaje slika.'}), 400
260 # Verify file exists and parse camera
261 ts, cam = parse_ts_from_any(rel)
262 if not cam:
263 return jsonify({'success': False, 'message': 'Neispravno ime datoteke.'}), 400
264 abs_path = os.path.join(STATIC_PATH, rel)
265 if not os.path.exists(abs_path):
266 return jsonify({'success': False, 'message': 'Datoteka ne postoji.'}), 404
267 # Generate expiring URL
268 exp = int(time.time()) + ttl_minutes * 60
269 url = build_share_url(rel, exp)
270 return jsonify({'success': True, 'url': url, 'expires_at': exp})
273@bp.route('/admin/user/<int:user_id>/camera/<camera_id>/images.json')
274@admin_required
275def admin_user_camera_images_json(user_id: int, camera_id: str):
276 # Return images for a specific camera with pagination
277 if not (camera_id.isdigit() and len(camera_id) == 12):
278 return jsonify({'error': 'Invalid camera ID'}), 400
280 db = get_db()
281 # Verify camera belongs to user
282 cam_check = db.execute('SELECT camera_name FROM cameras WHERE user_id = ? AND camera_id = ?', (user_id, camera_id)).fetchone()
283 if not cam_check:
284 return jsonify({'error': 'Camera not found'}), 404
286 all_images = collect_user_images(user_id, db, STATIC_PATH)
288 # Filter for this specific camera
289 camera_images = [img for img in all_images if img.get('camera_id') == camera_id]
291 # Apply pagination
292 offset = max(0, int(request.args.get('offset', 0)))
293 limit = max(1, min(int(request.args.get('limit', 30)), 100))
295 total_images = len(camera_images)
296 images_slice = camera_images[offset:offset + limit]
297 has_more = (offset + limit) < total_images
299 # Convert timestamps to string
300 result_images = []
301 for img in images_slice:
302 result_images.append({
303 'url': img['url'],
304 'rel': img['rel'],
305 'camera_id': camera_id,
306 'ts': format_dt(img.get('ts')) if img.get('ts') else ''
307 })
309 return jsonify({
310 'camera_id': camera_id,
311 'camera_name': cam_check['camera_name'],
312 'images': result_images,
313 'total_count': total_images,
314 'has_more': has_more,
315 'next_offset': offset + limit if has_more else None,
316 'current_offset': offset
317 })
320@bp.route('/admin/user/<int:user_id>/cameras/add', methods=['POST'])
321@admin_required
322def admin_user_camera_add(user_id: int):
323 data = request.get_json(silent=True) or {}
324 camera_id = str(data.get('camera_id', '')).strip()
325 camera_name = (data.get('camera_name') or '').strip()
326 # Validate camera id and name
327 if not (camera_id.isdigit() and len(camera_id) == 12):
328 return jsonify({'success': False, 'message': 'ID kamere mora biti 12 brojeva.'}), 400
329 if not camera_name or len(camera_name) > 60 or not re.fullmatch(r'[\w\s\-.]{1,60}', camera_name):
330 return jsonify({'success': False, 'message': 'Neispravan naziv kamere.'}), 400
332 db = get_db()
333 try:
334 # Check if camera already exists for any user
335 existing = db.execute('SELECT user_id FROM cameras WHERE camera_id=?', (camera_id,)).fetchone()
336 if existing:
337 return jsonify({'success': False, 'message': 'Kamera s tim ID već postoji.'}), 400
339 # Check if user exists
340 user_exists = db.execute('SELECT id FROM users WHERE id=?', (user_id,)).fetchone()
341 if not user_exists:
342 return jsonify({'success': False, 'message': 'Korisnik ne postoji.'}), 404
344 # Insert new camera
345 db.execute('''
346 INSERT INTO cameras (user_id, camera_id, camera_name)
347 VALUES (?, ?, ?)
348 ''', (user_id, camera_id, camera_name))
349 db.commit()
351 try:
352 log_admin_action('add_camera', admin_user_id=session.get('user_id'), target_user_id=user_id, ip=get_request_ip(request), detail=f'camera_id={camera_id}, name={camera_name}')
353 except Exception:
354 pass
356 return jsonify({'success': True, 'message': 'Kamera je uspješno dodana.'})
357 except sqlite3.Error as e:
358 return jsonify({'success': False, 'message': 'Greška baze podataka.'}), 500
361@bp.route('/admin/user/<int:user_id>/cameras/rename', methods=['POST'])
362@admin_required
363def admin_user_camera_rename(user_id: int):
364 data = request.get_json(silent=True) or {}
365 camera_id = str(data.get('camera_id', '')).strip()
366 new_name = (data.get('camera_name') or '').strip()
367 if not (camera_id.isdigit() and len(camera_id) == 12):
368 return jsonify({'success': False, 'message': 'Neispravni podaci.'}), 400
369 if not new_name or len(new_name) > 60 or not re.fullmatch(r'[\w\s\-.]{1,60}', new_name):
370 return jsonify({'success': False, 'message': 'Neispravan naziv kamere.'}), 400
371 db = get_db()
372 try:
373 cur = db.execute('UPDATE cameras SET camera_name=? WHERE user_id=? AND camera_id=?', (new_name, user_id, camera_id))
374 db.commit()
375 if cur.rowcount == 0:
376 return jsonify({'success': False, 'message': 'Kamera nije pronađena.'}), 404
377 try:
378 log_admin_action('rename_camera', admin_user_id=session.get('user_id'), target_user_id=user_id, ip=get_request_ip(request), detail=f'camera_id={camera_id}')
379 except Exception:
380 pass
381 return jsonify({'success': True})
382 except sqlite3.Error:
383 return jsonify({'success': False, 'message': 'Greška baze.'}), 500
386@bp.route('/admin/user/<int:user_id>/cameras/delete', methods=['POST'])
387@admin_required
388def admin_user_camera_delete(user_id: int):
389 data = request.get_json(silent=True) or {}
390 camera_id = str(data.get('camera_id', '')).strip()
391 if not (camera_id.isdigit() and len(camera_id) == 12):
392 return jsonify({'success': False, 'message': 'Neispravan ID kamere.'}), 400
393 db = get_db()
394 try:
395 cur = db.execute('DELETE FROM cameras WHERE user_id=? AND camera_id=?', (user_id, camera_id))
396 db.commit()
397 if cur.rowcount == 0:
398 return jsonify({'success': False, 'message': 'Kamera nije pronađena.'}), 404
399 try:
400 log_admin_action('delete_camera', admin_user_id=session.get('user_id'), target_user_id=user_id, ip=get_request_ip(request), detail=f'camera_id={camera_id}')
401 except Exception:
402 pass
403 return jsonify({'success': True})
404 except sqlite3.Error:
405 return jsonify({'success': False, 'message': 'Greška baze.'}), 500
408@bp.route('/admin/image/delete', methods=['POST'])
409@admin_required
410def admin_delete_image():
411 data = request.get_json(silent=True) or {}
412 rel = normalize_to_static_user_photos(data.get('rel', ''))
413 user_id = int(data.get('user_id') or 0)
414 if not rel or not user_id:
415 return jsonify({'success': False, 'message': 'Nedostaju podaci.'}), 400
416 # Validate that the image camera belongs to the specified user
417 _, cam = parse_ts_from_any(rel)
418 if not cam:
419 return jsonify({'success': False, 'message': 'Neispravno ime datoteke.'}), 400
420 db = get_db()
421 owner = db.execute('SELECT 1 FROM cameras WHERE user_id=? AND camera_id=?', (user_id, cam)).fetchone()
422 if not owner:
423 return jsonify({'success': False, 'message': 'Nedozvoljeno.'}), 403
424 abs_path = os.path.join(STATIC_PATH, rel)
425 real_static = USER_PHOTOS_REAL if rel.startswith('User-photos/') else os.path.realpath(STATIC_PATH)
426 real_abs = os.path.realpath(abs_path)
427 if not real_abs.startswith(real_static):
428 return jsonify({'success': False, 'message': 'Putanja nije dozvoljena.'}), 400
429 if os.path.exists(abs_path):
430 parent_dir = os.path.dirname(abs_path)
431 if not os.access(parent_dir, os.W_OK):
432 return jsonify({'success': False, 'message': 'Server nema dozvolu za brisanje datoteke.'}), 403
433 try:
434 os.remove(abs_path)
435 except PermissionError:
436 return jsonify({'success': False, 'message': 'Server nema dozvolu za brisanje datoteke.'}), 403
437 except OSError:
438 return jsonify({'success': False, 'message': 'Ne mogu obrisati datoteku.'}), 500
439 # Update DB file_paths if present
440 try:
441 row = db.execute('SELECT file_paths FROM cameras WHERE user_id=? AND camera_id=?', (user_id, cam)).fetchone()
442 if row and row['file_paths']:
443 fps = [p.strip() for p in row['file_paths'].split(',') if p.strip()]
444 name_only = os.path.basename(rel)
445 kept = [p for p in fps if (os.path.basename(p) != name_only) and (normalize_to_static_user_photos(p) != rel)]
446 db.execute('UPDATE cameras SET file_paths=? WHERE user_id=? AND camera_id=?', (','.join(kept), user_id, cam))
447 db.commit()
448 except sqlite3.Error:
449 pass
450 try:
451 log_admin_action('delete_image', admin_user_id=session.get('user_id'), target_user_id=user_id, ip=get_request_ip(request), detail=f'rel={rel}')
452 except Exception:
453 pass
454 return jsonify({'success': True})
457@bp.route('/admin/logs.json')
458@admin_required
459def admin_logs_json():
460 limit = max(1, min(int(request.args.get('limit', 100)), 500))
461 db = get_db()
463 # Get combined logs with proper context
464 auth_logs = db.execute('''
465 SELECT ts, ip, user_id, username, event, detail, 'auth' as log_type
466 FROM auth_log
467 ORDER BY ts DESC LIMIT ?
468 ''', (limit,)).fetchall()
470 admin_logs = db.execute('''
471 SELECT aa.ts, aa.ip, aa.admin_user_id, aa.target_user_id, aa.action, aa.detail, 'admin' as log_type,
472 au.username as admin_username, tu.username as target_username
473 FROM admin_audit aa
474 LEFT JOIN users au ON au.id = aa.admin_user_id
475 LEFT JOIN users tu ON tu.id = aa.target_user_id
476 ORDER BY aa.ts DESC LIMIT ?
477 ''', (limit,)).fetchall()
479 # Combine and sort all logs
480 all_logs = []
482 # Process auth logs
483 for log in auth_logs:
484 all_logs.append({
485 'ts': log['ts'],
486 'type': 'AUTH',
487 'ip': log['ip'] or 'Unknown',
488 'username': log['username'] or f'User#{log["user_id"] or "?"}',
489 'action': log['event'],
490 'detail': log['detail'] or '',
491 'severity': 'error' if 'fail' in log['event'].lower() or 'lock' in log['event'].lower() else 'info'
492 })
494 # Process admin logs
495 for log in admin_logs:
496 admin_name = log['admin_username'] or f'Admin#{log["admin_user_id"]}'
497 target_name = log['target_username'] or (f'User#{log["target_user_id"]}' if log['target_user_id'] else '')
499 action_desc = log['action']
500 if target_name:
501 action_desc = f"{log['action']} → {target_name}"
503 all_logs.append({
504 'ts': log['ts'],
505 'type': 'ADMIN',
506 'ip': log['ip'] or 'Unknown',
507 'username': admin_name,
508 'action': action_desc,
509 'detail': log['detail'] or '',
510 'severity': 'warning' if 'delete' in log['action'].lower() or 'remove' in log['action'].lower() else 'info'
511 })
513 # Sort by timestamp descending
514 all_logs.sort(key=lambda x: x['ts'], reverse=True)
516 return jsonify({
517 'logs': all_logs[:limit],
518 'total_count': len(all_logs)
519 })
522@bp.route('/admin/logs/export')
523@admin_required
524def admin_logs_export():
525 format_type = request.args.get('format', 'csv').lower()
526 limit = max(1, min(int(request.args.get('limit', 1000)), 5000))
528 db = get_db()
530 # Get combined logs with proper context (same as logs.json but more limit)
531 auth_logs = db.execute('''
532 SELECT ts, ip, user_id, username, event, detail, 'auth' as log_type
533 FROM auth_log
534 ORDER BY ts DESC LIMIT ?
535 ''', (limit,)).fetchall()
537 admin_logs = db.execute('''
538 SELECT aa.ts, aa.ip, aa.admin_user_id, aa.target_user_id, aa.action, aa.detail, 'admin' as log_type,
539 au.username as admin_username, tu.username as target_username
540 FROM admin_audit aa
541 LEFT JOIN users au ON au.id = aa.admin_user_id
542 LEFT JOIN users tu ON tu.id = aa.target_user_id
543 ORDER BY aa.ts DESC LIMIT ?
544 ''', (limit,)).fetchall()
546 # Combine and sort all logs
547 all_logs = []
549 # Process auth logs
550 for log in auth_logs:
551 all_logs.append({
552 'timestamp': log['ts'],
553 'datetime': format_dt(log['ts']) if log['ts'] else '',
554 'type': 'AUTH',
555 'ip': log['ip'] or 'Unknown',
556 'username': log['username'] or f'User#{log["user_id"] or "?"}',
557 'action': log['event'],
558 'detail': log['detail'] or '',
559 'severity': 'error' if 'fail' in log['event'].lower() or 'lock' in log['event'].lower() else 'info'
560 })
562 # Process admin logs
563 for log in admin_logs:
564 admin_name = log['admin_username'] or f'Admin#{log["admin_user_id"]}'
565 target_name = log['target_username'] or (f'User#{log["target_user_id"]}' if log['target_user_id'] else '')
567 action_desc = log['action']
568 if target_name:
569 action_desc = f"{log['action']} -> {target_name}"
571 all_logs.append({
572 'timestamp': log['ts'],
573 'datetime': format_dt(log['ts']) if log['ts'] else '',
574 'type': 'ADMIN',
575 'ip': log['ip'] or 'Unknown',
576 'username': admin_name,
577 'action': action_desc,
578 'detail': log['detail'] or '',
579 'severity': 'warning' if 'delete' in log['action'].lower() or 'remove' in log['action'].lower() else 'info'
580 })
582 # Sort by timestamp descending
583 all_logs.sort(key=lambda x: x['timestamp'], reverse=True)
585 if format_type == 'json':
586 from flask import Response
587 import json
588 response_data = {
589 'export_timestamp': format_dt(int(time.time())),
590 'total_logs': len(all_logs),
591 'logs': all_logs
592 }
593 response = Response(
594 json.dumps(response_data, indent=2, ensure_ascii=False),
595 mimetype='application/json',
596 headers={'Content-Disposition': f'attachment; filename=audit_logs_{int(time.time())}.json'}
597 )
598 return response
600 else: # CSV format
601 from flask import Response
602 import csv
603 from io import StringIO
605 output = StringIO()
606 writer = csv.writer(output)
608 # Write header
609 writer.writerow(['Timestamp', 'DateTime', 'Type', 'IP', 'Username', 'Action', 'Detail', 'Severity'])
611 # Write data
612 for log in all_logs:
613 writer.writerow([
614 log['timestamp'],
615 log['datetime'],
616 log['type'],
617 log['ip'],
618 log['username'],
619 log['action'],
620 log['detail'],
621 log['severity']
622 ])
624 output.seek(0)
625 return Response(
626 output.getvalue(),
627 mimetype='text/csv',
628 headers={'Content-Disposition': f'attachment; filename=audit_logs_{int(time.time())}.csv'}
629 )
632@bp.route('/admin/user/<int:user_id>/stats.json')
633@admin_required
634def admin_user_stats_json(user_id: int):
635 db = get_db()
636 u = db.execute('SELECT id, username, IFNULL(is_admin,0) AS is_admin FROM users WHERE id = ?', (user_id,)).fetchone()
637 if not u:
638 return jsonify({'error': 'not found'}), 404
639 # Cameras
640 cam_rows = db.execute('SELECT camera_id FROM cameras WHERE user_id = ?', (user_id,)).fetchall()
641 cam_ids = {str(r['camera_id']) for r in cam_rows}
642 camera_count = len(cam_ids)
643 # Images: count, size, earliest
644 image_count = 0
645 total_bytes = 0
646 earliest_ts = None
647 root = os.path.join(STATIC_PATH, 'User-photos')
648 import re
649 cam_any_re = re.compile(r"[A-Za-z0-9]{12}")
650 for dirpath, _, files in os.walk(root):
651 for name in files:
652 nl = name.lower()
653 if not (nl.endswith('.jpg') or nl.endswith('.jpeg') or nl.endswith('.png') or nl.endswith('.gif') or nl.endswith('.webp')):
654 continue
655 ts, cam = parse_ts_from_any(name)
656 if not cam:
657 # Fallback: try to find any 12-char alnum token in name
658 m = cam_any_re.findall(name)
659 if m:
660 for token in m[::-1]:
661 if token in cam_ids:
662 cam = token
663 break
664 if cam and (cam in cam_ids):
665 abs_path = os.path.join(dirpath, name)
666 try:
667 st = os.stat(abs_path)
668 except OSError:
669 continue
670 total_bytes += int(st.st_size)
671 image_count += 1
672 if ts is None:
673 from datetime import datetime
674 ts = datetime.fromtimestamp(st.st_mtime)
675 if earliest_ts is None or ts < earliest_ts:
676 earliest_ts = ts
677 # Last login
678 last = db.execute('SELECT ts, ip FROM auth_log WHERE event = ? AND (user_id = ? OR username = ?) ORDER BY ts DESC LIMIT 1', ('login_success', user_id, u['username'])).fetchone()
679 first_auth = db.execute('SELECT ts FROM auth_log WHERE (user_id = ? OR username = ?) ORDER BY ts ASC LIMIT 1', (user_id, u['username'])).fetchone()
681 # User creation info from admin audit logs
682 creation_info = db.execute('''
683 SELECT aa.ts, aa.admin_user_id, au.username as admin_username
684 FROM admin_audit aa
685 LEFT JOIN users au ON au.id = aa.admin_user_id
686 WHERE aa.action = 'add_user' AND aa.detail LIKE ?
687 ORDER BY aa.ts ASC LIMIT 1
688 ''', (f'username={u["username"]}',)).fetchone()
690 # Format timestamps as numbers; client will render
691 from datetime import datetime
692 first_seen_ts = int(first_auth['ts']) if first_auth else (int(earliest_ts.timestamp()) if earliest_ts else 0)
693 last_login_ts = int(last['ts']) if last else 0
694 last_login_ip = (last['ip'] if last else '') or ''
696 # Creation details
697 created_by_username = None
698 created_at_ts = 0
699 if creation_info:
700 created_by_username = creation_info['admin_username'] or f"Admin #{creation_info['admin_user_id']}"
701 created_at_ts = int(creation_info['ts'])
703 return jsonify({
704 'user': {'id': u['id'], 'username': u['username'], 'is_admin': int(u['is_admin'] or 0) == 1},
705 'camera_count': camera_count,
706 'image_count': image_count,
707 'total_bytes': total_bytes,
708 'first_seen_ts': first_seen_ts,
709 'last_login_ts': last_login_ts,
710 'last_login_ip': last_login_ip,
711 'created_by_username': created_by_username,
712 'created_at_ts': created_at_ts,
713 })