Coverage for app_modules/gallery.py: 76%

124 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-20 00:55 +0200

1""" 

2gallery.py 

3 

Purpose:
    Gallery pages, the image deletion API and share-link generation. Uses the
    images_service to enumerate and group images by time.

Routes:
    - GET /galerija
    - GET /galerija/<int:camera_id>
    - GET /slika?rel=...
    - POST /api/image/delete
    - POST /share_link

13""" 

14 

15import os 

16import sqlite3 

17from flask import Blueprint, render_template, request, redirect, url_for, jsonify, session, abort 

18from .security import login_required 

19from .db import get_db 

20from .paths import STATIC_PATH, USER_PHOTOS_REAL 

21from .helpers import parse_ts_from_any, format_dt, normalize_to_static_user_photos, build_media_url, resolve_media_token, build_share_url 

22from .images_service import collect_user_images, group_images_by_time 

23 

24 

# Blueprint that all gallery routes in this module are registered on.
bp = Blueprint(name='gallery', import_name=__name__)

26 

27 

@bp.route('/galerija')
@login_required
def gallery_all():
    """Render the gallery page with all images of the logged-in user.

    Images are collected with ``collect_user_images`` for the session's
    ``user_id``, grouped with ``group_images_by_time`` and handed to the
    ``gallery_all.html`` template as ``sections`` (a list of
    ``{'title': ..., 'items': [...]}`` dicts).
    """
    user_images = collect_user_images(session['user_id'], get_db(), STATIC_PATH)

    def to_view_item(entry):
        # An entry without a timestamp is displayed with an empty string.
        return {
            'url': build_media_url(entry['rel']),
            'rel': entry['rel'],
            'ts': format_dt(entry['ts']) if entry['ts'] else '',
        }

    sections = []
    for group in group_images_by_time(user_images):
        view_items = [to_view_item(entry) for entry in group['items']]
        sections.append({'title': group['title'], 'items': view_items})
    return render_template('gallery_all.html', sections=sections)

38 

39 

@bp.route('/galerija/<int:camera_id>')
@login_required
def camera_gallery(camera_id: int):
    """Render the gallery page for a single camera of the logged-in user.

    Responds with 404 when the camera does not belong to the current user.
    Otherwise the ``User-photos`` tree under ``STATIC_PATH`` is scanned for
    image files whose name parses (via ``parse_ts_from_any``) to this camera
    id; matches are grouped by time and rendered with ``camera_gallery.html``.

    Args:
        camera_id: Camera id taken from the URL.
    """
    cam_id_str = str(camera_id)
    db = get_db()
    row = db.execute(
        'SELECT camera_name FROM cameras WHERE user_id=? AND camera_id=?',
        (session['user_id'], camera_id),
    ).fetchone()
    if not row:
        abort(404)
    camera_name = row['camera_name']

    # Scan filesystem for this camera id
    image_exts = ('.jpg', '.jpeg', '.png', '.gif', '.webp')
    images = []
    root = os.path.join(STATIC_PATH, 'User-photos')
    for dirpath, _, files in os.walk(root):
        for name in files:
            if not name.lower().endswith(image_exts):
                continue
            ts, cam = parse_ts_from_any(name)
            if cam != cam_id_str:
                continue
            abs_path = os.path.join(dirpath, name)
            rel = os.path.relpath(abs_path, STATIC_PATH).replace('\\', '/')
            if not ts:
                # No timestamp in the filename: fall back to the file's mtime.
                try:
                    ts = os.path.getmtime(abs_path)
                except OSError:
                    # The file vanished (or became unreadable) between the
                    # directory walk and the stat, e.g. a concurrent delete.
                    # Skip it instead of failing the whole page.
                    continue
            images.append({'url': build_media_url(rel), 'rel': rel, 'ts': ts})

    grouped = group_images_by_time(images)
    sections = []
    for sec in grouped:
        items = [
            {
                'url': build_media_url(it['rel']),
                'rel': it['rel'],
                'ts': format_dt(it['ts']) if it['ts'] else '',
            }
            for it in sec['items']
        ]
        sections.append({'title': sec['title'], 'items': items})
    return render_template(
        'camera_gallery.html',
        camera_id=camera_id,
        camera_name=camera_name,
        sections=sections,
    )

68 

69 

@bp.route('/slika')
@login_required
def image_detail():
    """Render the detail page for a single image.

    The ``rel`` query parameter may be a signed media token or a raw relative
    path (legacy). A missing parameter redirects to the full gallery. The
    request is answered with 404 when the value cannot be resolved to a path,
    when no camera id can be parsed from it, or when that camera does not
    belong to the current user.
    """
    rel_or_token = request.args.get('rel', type=str, default='')
    if not rel_or_token:
        return redirect(url_for('gallery.gallery_all'))
    # Accept either a raw rel path (legacy) or a signed token
    rel = resolve_media_token(rel_or_token) or normalize_to_static_user_photos(rel_or_token)
    if not rel:
        # Neither a valid token nor a normalizable path: treat as not found
        # rather than passing an empty value on to the filename parser.
        abort(404)
    ts, cam = parse_ts_from_any(rel)
    if not cam:
        abort(404)
    # Ownership enforcement: ensure the camera belongs to the current user
    db = get_db()
    owner = db.execute(
        'SELECT 1 FROM cameras WHERE user_id=? AND camera_id=?',
        (session['user_id'], cam),
    ).fetchone()
    if not owner:
        abort(404)
    return render_template(
        'image_detail.html',
        image_url=build_media_url(rel),
        timestamp=format_dt(ts) if ts else '',
        tags=['Image'],
        rel=rel,
    )

87 

88 

@bp.route('/api/image/delete', methods=['POST'])
@login_required
def api_delete_image():
    """Delete an image owned by the current user from disk and from the DB.

    Body: { rel: '<relative image path>' }

    Returns a JSON object with ``success`` (and ``message`` on failure):
        400 - ``rel`` missing, no camera id in the filename, or the resolved
              path lies outside the allowed directory
        403 - camera not owned by the current user, or the server lacks
              permission to delete the file
        500 - any other OS error while deleting
        200 - file removed (or already absent); the DB path list is cleaned
              up on a best-effort basis
    """
    data = request.get_json(silent=True) or {}
    if not isinstance(data, dict):
        # A JSON array/string/number body has no fields to read.
        data = {}
    rel = normalize_to_static_user_photos(data.get('rel', ''))
    if not rel:
        return jsonify({'success': False, 'message': 'Nedostaje slika.'}), 400
    # Validate camera ownership by parsing camera id from filename
    _, cam = parse_ts_from_any(rel)
    if not cam:
        return jsonify({'success': False, 'message': 'Neispravno ime datoteke.'}), 400
    db = get_db()
    owner = db.execute(
        'SELECT 1 FROM cameras WHERE user_id=? AND camera_id=?',
        (session['user_id'], cam),
    ).fetchone()
    if not owner:
        return jsonify({'success': False, 'message': 'Nedozvoljeno.'}), 403

    # Remove file from disk if it is under the allowed directory
    abs_path = os.path.join(STATIC_PATH, rel)
    real_static = USER_PHOTOS_REAL if rel.startswith('User-photos/') else os.path.realpath(STATIC_PATH)
    real_abs = os.path.realpath(abs_path)
    # Compare against the base directory *plus a separator*: a bare string
    # prefix check would also accept sibling paths such as "<base>-other/...".
    allowed_prefix = real_static.rstrip(os.sep) + os.sep
    if not real_abs.startswith(allowed_prefix):
        return jsonify({'success': False, 'message': 'Putanja nije dozvoljena.'}), 400

    if os.path.exists(abs_path):
        # Deletion requires write permission on the containing directory, not on the file itself
        parent_dir = os.path.dirname(abs_path)
        if not os.access(parent_dir, os.W_OK):
            return jsonify({'success': False, 'message': 'Server nema dozvolu za brisanje datoteke.'}), 403
        try:
            os.remove(abs_path)
        except FileNotFoundError:
            # Removed by someone else between the exists() check and here;
            # the desired end state is reached, so carry on with DB cleanup.
            pass
        except PermissionError:
            return jsonify({'success': False, 'message': 'Server nema dozvolu za brisanje datoteke.'}), 403
        except OSError:
            return jsonify({'success': False, 'message': 'Ne mogu obrisati datoteku.'}), 500

    # Remove from DB paths if present
    try:
        row = db.execute(
            'SELECT file_paths FROM cameras WHERE user_id=? AND camera_id=?',
            (session['user_id'], cam),
        ).fetchone()
        if row and row['file_paths']:
            fps = [p.strip() for p in row['file_paths'].split(',') if p.strip()]
            # match by normalized path or just filename
            name_only = os.path.basename(rel)
            kept = [
                p for p in fps
                if (os.path.basename(p) != name_only) and (normalize_to_static_user_photos(p) != rel)
            ]
            db.execute(
                'UPDATE cameras SET file_paths=? WHERE user_id=? AND camera_id=?',
                (','.join(kept), session['user_id'], cam),
            )
            db.commit()
    except sqlite3.Error:
        # Do not fail deletion if DB update fails; image removal already done.
        # Log it so a stale file_paths entry can be diagnosed later.
        import logging
        logging.getLogger(__name__).exception(
            'Could not update file_paths after deleting %s', rel
        )
    return jsonify({'success': True})

136 

137 

@bp.route('/share_link', methods=['POST'])
@login_required
def user_generate_share_link():
    """Generate a short-lived public share URL for an image owned by the current user.

    Body: { rel: 'User-photos/PICT_...', ttl_minutes: 30 }

    ``ttl_minutes`` is optional and is clamped to the range 1..1440 (24 h).
    When it is omitted (or falsy) the fallback value exceeds that cap, so the
    effective default lifetime is 24 hours.

    Returns a JSON object:
        200 - ``{'success': True, 'url': ..., 'expires_at': <unix seconds>}``
        400 - ``ttl_minutes`` is not a number, ``rel`` is missing, or no
              camera id can be parsed from the filename
        403 - the camera does not belong to the current user
        404 - the file does not exist under ``STATIC_PATH``
    """
    data = request.get_json(silent=True) or {}
    if not isinstance(data, dict):
        # A JSON array/string/number body has no fields to read.
        data = {}
    rel = normalize_to_static_user_photos(str(data.get('rel') or ''))
    try:
        ttl_minutes = int(data.get('ttl_minutes') or 14*24*60)
    except (TypeError, ValueError, OverflowError):
        # Non-numeric client input is a bad request, not a server error.
        return jsonify({'success': False, 'message': 'Neispravan ttl_minutes.'}), 400
    ttl_minutes = max(1, min(ttl_minutes, 24*60))
    if not rel:
        return jsonify({'success': False, 'message': 'Nedostaje slika.'}), 400
    ts, cam = parse_ts_from_any(rel)
    if not cam:
        return jsonify({'success': False, 'message': 'Neispravno ime datoteke.'}), 400
    # Verify ownership
    db = get_db()
    owner = db.execute(
        'SELECT 1 FROM cameras WHERE user_id=? AND camera_id=?',
        (session['user_id'], cam),
    ).fetchone()
    if not owner:
        return jsonify({'success': False, 'message': 'Nedozvoljeno.'}), 403
    # Verify file exists
    abs_path = os.path.join(STATIC_PATH, rel)
    if not os.path.exists(abs_path):
        return jsonify({'success': False, 'message': 'Datoteka ne postoji.'}), 404
    # Build share URL
    import time
    exp = int(time.time()) + ttl_minutes * 60
    url = build_share_url(rel, exp)
    return jsonify({'success': True, 'url': url, 'expires_at': exp})

167 

168