feat: Enhance task access control and session management

- Implemented API and web task access assertions in the task status polling endpoint.
- Added functions to remember and check task access in user sessions.
- Updated task status tests to validate access control based on session data.
- Enhanced download route tests to ensure proper access checks.
- Improved SEO metadata handling with dynamic social preview images.
- Updated sitemap generation to include blog posts and new tools.
- Added a social preview SVG for better sharing on social media platforms.
This commit is contained in:
Your Name
2026-03-17 21:19:23 +02:00
parent ff5bd19335
commit 3f24a7ea3e
17 changed files with 384 additions and 75 deletions

View File

@@ -3,6 +3,14 @@ import os
from flask import Blueprint, send_file, abort, request, current_app
from app.services.policy_service import (
PolicyError,
assert_api_task_access,
assert_web_task_access,
resolve_api_actor,
resolve_web_actor,
)
download_bp = Blueprint("download", __name__)
@@ -20,6 +28,16 @@ def download_file(task_id: str, filename: str):
if ".." in filename or "/" in filename or "\\" in filename:
abort(400, "Invalid filename.")
try:
if request.headers.get("X-API-Key", "").strip():
actor = resolve_api_actor()
assert_api_task_access(actor, task_id)
else:
actor = resolve_web_actor()
assert_web_task_access(actor, task_id)
except PolicyError as exc:
abort(exc.status_code, exc.message)
output_dir = current_app.config["OUTPUT_FOLDER"]
file_path = os.path.join(output_dir, task_id, filename)

View File

@@ -1,9 +1,16 @@
"""Task status polling endpoint."""
from flask import Blueprint, jsonify
from flask import Blueprint, jsonify, request
from celery.result import AsyncResult
from app.extensions import celery
from app.middleware.rate_limiter import limiter
from app.services.policy_service import (
PolicyError,
assert_api_task_access,
assert_web_task_access,
resolve_api_actor,
resolve_web_actor,
)
tasks_bp = Blueprint("tasks", __name__)
@@ -17,6 +24,16 @@ def get_task_status(task_id: str):
Returns:
JSON with task state and result (if completed)
"""
try:
if request.headers.get("X-API-Key", "").strip():
actor = resolve_api_actor()
assert_api_task_access(actor, task_id)
else:
actor = resolve_web_actor()
assert_web_task_access(actor, task_id)
except PolicyError as exc:
return jsonify({"error": exc.message}), exc.status_code
result = AsyncResult(task_id, app=celery)
response = {

View File

@@ -13,6 +13,7 @@ from app.services.account_service import (
record_usage_event,
)
from app.utils.auth import get_current_user_id, logout_user_session
from app.utils.auth import has_session_task_access, remember_task_access
from app.utils.file_validator import validate_file
FREE_PLAN = "free"
@@ -202,6 +203,9 @@ def assert_quota_available(actor: ActorContext):
def record_accepted_usage(actor: ActorContext, tool: str, celery_task_id: str):
"""Record one accepted usage event after task dispatch succeeds."""
if actor.source == "web":
remember_task_access(celery_task_id)
record_usage_event(
user_id=actor.user_id,
source=actor.source,
@@ -225,3 +229,14 @@ def assert_api_task_access(actor: ActorContext, task_id: str):
"""Ensure one API actor can poll one task id."""
if actor.user_id is None or not has_task_access(actor.user_id, "api", task_id):
raise PolicyError("Task not found.", 404)
def assert_web_task_access(actor: ActorContext, task_id: str):
"""Ensure one web browser session can access one task id."""
if actor.user_id is not None and has_task_access(actor.user_id, "web", task_id):
return
if has_session_task_access(task_id):
return
raise PolicyError("Task not found.", 404)

View File

@@ -1,6 +1,9 @@
"""Session helpers for authenticated routes."""
from flask import session
TASK_ACCESS_SESSION_KEY = "task_access_ids"
MAX_TRACKED_TASK_IDS = 200
def get_current_user_id() -> int | None:
"""Return the authenticated user id from session storage."""
@@ -8,11 +11,31 @@ def get_current_user_id() -> int | None:
return user_id if isinstance(user_id, int) else None
def remember_task_access(task_id: str):
"""Persist one web task id in the active browser session."""
tracked = session.get(TASK_ACCESS_SESSION_KEY, [])
if not isinstance(tracked, list):
tracked = []
normalized = [value for value in tracked if isinstance(value, str) and value != task_id]
normalized.append(task_id)
session[TASK_ACCESS_SESSION_KEY] = normalized[-MAX_TRACKED_TASK_IDS:]
def has_session_task_access(task_id: str) -> bool:
"""Return whether the active browser session owns one web task id."""
tracked = session.get(TASK_ACCESS_SESSION_KEY, [])
return isinstance(tracked, list) and task_id in tracked
def login_user_session(user_id: int):
"""Persist the authenticated user in the Flask session."""
tracked_task_ids = session.get(TASK_ACCESS_SESSION_KEY, [])
session.clear()
session.permanent = True
session["user_id"] = user_id
if isinstance(tracked_task_ids, list) and tracked_task_ids:
session[TASK_ACCESS_SESSION_KEY] = tracked_task_ids[-MAX_TRACKED_TASK_IDS:]
def logout_user_session():