From 030418f6db6b3be4df3d538b87565c9811b6fb82 Mon Sep 17 00:00:00 2001 From: Your Name <119736744+aborayan2022@users.noreply.github.com> Date: Tue, 31 Mar 2026 21:51:45 +0200 Subject: [PATCH] feat: Add PostgreSQL support and enhance admin dashboard - Migrate all service files from hardcoded SQLite to dual SQLite/PostgreSQL support - Add PostgreSQL service to docker-compose.yml - Create database abstraction layer (database.py) with execute_query, row_to_dict helpers - Update all 7 service files: account, rating, contact, ai_cost, quota, site_assistant, admin - Add new admin endpoint /database-stats for table size and row count visualization - Add database_type field to system health endpoint - Update .env.example with proper PostgreSQL connection string --- .env.example | 5 +- backend/app/routes/admin.py | 82 +- backend/app/services/account_service.py | 911 ++++++++++++------ backend/app/services/admin_service.py | 506 +++++++--- backend/app/services/ai_cost_service.py | 149 ++- backend/app/services/contact_service.py | 144 +-- backend/app/services/quota_service.py | 404 ++++---- backend/app/services/rating_service.py | 194 ++-- .../app/services/site_assistant_service.py | 406 ++++++-- backend/app/utils/database.py | 107 +- docker-compose.yml | 28 + 11 files changed, 1930 insertions(+), 1006 deletions(-) diff --git a/.env.example b/.env.example index 51ab89c..35c8454 100644 --- a/.env.example +++ b/.env.example @@ -41,6 +41,7 @@ UPLOAD_FOLDER=/tmp/uploads OUTPUT_FOLDER=/tmp/outputs FILE_EXPIRY_SECONDS=1800 STORAGE_ALLOW_LOCAL_FALLBACK=true +# Use DATABASE_PATH for SQLite (development) or DATABASE_URL for PostgreSQL (production) DATABASE_PATH=/app/data/dociva.db # CORS @@ -64,8 +65,8 @@ STRIPE_PRICE_ID_PRO_YEARLY= SENTRY_DSN= SENTRY_ENVIRONMENT=production -# PostgreSQL (production) — leave empty to use SQLite -DATABASE_URL= +# PostgreSQL (production) +DATABASE_URL=postgresql://dociva:${POSTGRES_PASSWORD}@postgres:5432/dociva POSTGRES_DB=dociva POSTGRES_USER=dociva POSTGRES_PASSWORD=replace-with-strong-postgres-password diff --git a/backend/app/routes/admin.py b/backend/app/routes/admin.py index 4bef7b2..64d34a0 100644 --- a/backend/app/routes/admin.py +++ b/backend/app/routes/admin.py @@ -1,8 +1,14 @@ """Internal admin endpoints secured by authenticated admin sessions.""" + from flask import Blueprint, jsonify, request from app.extensions import limiter -from app.services.account_service import get_user_by_id, is_user_admin, set_user_role, update_user_plan +from app.services.account_service import ( + get_user_by_id, + is_user_admin, + set_user_role, + update_user_plan, +) from app.services.admin_service import ( get_admin_overview, get_admin_ratings_detail, @@ -138,7 +144,9 @@ def update_role_route(user_id: int): return jsonify({"error": "User not found."}), 404 if bool(user.get("is_allowlisted_admin")): - return jsonify({"error": "Allowlisted admin access is managed by INTERNAL_ADMIN_EMAILS."}), 400 + return jsonify( + {"error": "Allowlisted admin access is managed by INTERNAL_ADMIN_EMAILS."} + ), 400 if actor_user_id == user_id and role != "admin": return jsonify({"error": "You cannot remove your own admin role."}), 400 @@ -183,7 +191,9 @@ def admin_ratings_route(): tool_filter = request.args.get("tool", "").strip() - return jsonify(get_admin_ratings_detail(page=page, per_page=per_page, tool_filter=tool_filter)), 200 + return jsonify( + get_admin_ratings_detail(page=page, per_page=per_page, tool_filter=tool_filter) + ), 200 @admin_bp.route("/tool-analytics", methods=["GET"]) @@ -247,3 +257,69 @@ def record_plan_interest_route(): record_plan_interest_click(user_id=user_id, plan=plan, billing=billing) return jsonify({"message": "Interest recorded."}), 200 + + +@admin_bp.route("/database-stats", methods=["GET"]) +@limiter.limit("60/hour") +def admin_database_stats_route(): + """Return database statistics (table sizes, row counts).""" + auth_error = _require_admin_session() + if auth_error: + return auth_error + + from app.utils.database import ( + db_connection, + execute_query, + is_postgres, + row_to_dict, + ) + + with db_connection() as conn: + if is_postgres(): + tables_sql = """ + SELECT + schemaname, + relname AS table_name, + n_live_tup AS row_count, + pg_total_relation_size(relid) AS total_size, + pg_relation_size(relid) AS data_size + FROM pg_stat_user_tables + ORDER BY n_live_tup DESC + """ + else: + tables_sql = """ + SELECT name AS table_name FROM sqlite_master + WHERE type='table' ORDER BY name + """ + cursor = execute_query(conn, tables_sql) + tables = [] + for row in cursor.fetchall(): + row = row_to_dict(row) + if is_postgres(): + tables.append( + { + "table_name": row["table_name"], + "row_count": int(row["row_count"]), + "total_size_kb": round(int(row["total_size"]) / 1024, 1), + "data_size_kb": round(int(row["data_size"]) / 1024, 1), + } + ) + else: + count_cursor = execute_query( + conn, f"SELECT COUNT(*) AS cnt FROM {row['table_name']}" + ) + count_row = row_to_dict(count_cursor.fetchone()) + tables.append( + { + "table_name": row["table_name"], + "row_count": int(count_row["cnt"]), + } + ) + + return jsonify( + { + "database_type": "postgresql" if is_postgres() else "sqlite", + "tables": tables, + "table_count": len(tables), + } + ), 200 diff --git a/backend/app/services/account_service.py b/backend/app/services/account_service.py index f2d6e9f..bd19c27 100644 --- a/backend/app/services/account_service.py +++ b/backend/app/services/account_service.py @@ -1,15 +1,30 @@ -"""User accounts, API keys, history, and usage storage using SQLite.""" +"""User accounts, API keys, history, and usage storage. + +Supports both SQLite (development) and PostgreSQL (production) +via the app.utils.database abstraction layer. +""" + import hashlib import json import logging import os import secrets -import sqlite3 from datetime import datetime, timezone, timedelta from flask import current_app from werkzeug.security import check_password_hash, generate_password_hash +from app.utils.database import ( + db_connection, + execute_query, + get_connection, + get_integrity_error, + get_last_insert_id, + get_row_value, + is_postgres, + row_to_dict, +) + logger = logging.getLogger(__name__) VALID_PLANS = {"free", "pro"} @@ -17,254 +32,358 @@ VALID_ROLES = {"user", "admin"} def _utc_now() -> str: - """Return a stable UTC timestamp string.""" return datetime.now(timezone.utc).isoformat() def get_current_period_month() -> str: - """Return the active usage period in YYYY-MM format.""" return datetime.now(timezone.utc).strftime("%Y-%m") def normalize_plan(plan: str | None) -> str: - """Normalize plan values to the supported set.""" return "pro" if plan == "pro" else "free" def normalize_role(role: str | None) -> str: - """Normalize role values to the supported set.""" return "admin" if role == "admin" else "user" def _get_allowlisted_admin_emails() -> set[str]: configured = current_app.config.get("INTERNAL_ADMIN_EMAILS", ()) - return { - str(email).strip().lower() - for email in configured - if str(email).strip() - } + return {str(email).strip().lower() for email in configured if str(email).strip()} def is_allowlisted_admin_email(email: str | None) -> bool: - """Return whether an email is bootstrapped as admin from configuration.""" normalized_email = _normalize_email(email or "") return normalized_email in _get_allowlisted_admin_emails() -def _resolve_row_role(row: sqlite3.Row | None) -> str: +def _resolve_row_role(row: dict | None) -> str: if row is None: return "user" - - row_keys = row.keys() - stored_role = normalize_role(row["role"]) if "role" in row_keys else "user" - email = str(row["email"]).strip().lower() if "email" in row_keys else "" + stored_role = normalize_role(row.get("role")) if "role" in row else "user" + email = str(row.get("email", "")).strip().lower() if stored_role == "admin" or email in _get_allowlisted_admin_emails(): return "admin" return "user" -def _connect() -> sqlite3.Connection: - """Create a SQLite connection with row access by column name.""" - db_path = current_app.config["DATABASE_PATH"] - db_dir = os.path.dirname(db_path) - if db_dir: - os.makedirs(db_dir, exist_ok=True) - - connection = sqlite3.connect(db_path) - connection.row_factory = sqlite3.Row - connection.execute("PRAGMA foreign_keys = ON") - return connection - - -def _column_exists(conn: sqlite3.Connection, table_name: str, column_name: str) -> bool: - """Check whether one column exists in a SQLite table.""" - rows = conn.execute(f"PRAGMA table_info({table_name})").fetchall() - return any(row["name"] == column_name for row in rows) - - -def _serialize_user(row: sqlite3.Row | None) -> dict | None: - """Convert a user row into API-safe data.""" +def _serialize_user(row: dict | None) -> dict | None: if row is None: return None - return { "id": row["id"], "email": row["email"], - "plan": normalize_plan(row["plan"]), + "plan": normalize_plan(row.get("plan")), "role": _resolve_row_role(row), - "is_allowlisted_admin": is_allowlisted_admin_email(row["email"]), - "created_at": row["created_at"], + "is_allowlisted_admin": is_allowlisted_admin_email(row.get("email")), + "created_at": row.get("created_at"), } -def _serialize_api_key(row: sqlite3.Row) -> dict: - """Convert an API key row into public API-safe data.""" +def _serialize_api_key(row: dict) -> dict: return { "id": row["id"], "name": row["name"], "key_prefix": row["key_prefix"], - "last_used_at": row["last_used_at"], - "revoked_at": row["revoked_at"], + "last_used_at": row.get("last_used_at"), + "revoked_at": row.get("revoked_at"), "created_at": row["created_at"], } def _normalize_email(email: str) -> str: - """Normalize user emails for lookups and uniqueness.""" return email.strip().lower() def _hash_api_key(raw_key: str) -> str: - """Return a deterministic digest for one API key.""" return hashlib.sha256(raw_key.encode("utf-8")).hexdigest() +def _column_exists(conn, table_name: str, column_name: str) -> bool: + if is_postgres(): + sql = """ + SELECT column_name FROM information_schema.columns + WHERE table_name = %s AND column_name = %s + """ + cursor = conn.cursor() + cursor.execute(sql, (table_name, column_name)) + return cursor.fetchone() is not None + rows = conn.execute(f"PRAGMA table_info({table_name})").fetchall() + return any(row["name"] == column_name for row in rows) + + def init_account_db(): - """Initialize user, history, API key, and usage tables if they do not exist.""" - with _connect() as conn: - conn.executescript( - """ - CREATE TABLE IF NOT EXISTS users ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - email TEXT NOT NULL UNIQUE, - password_hash TEXT NOT NULL, - plan TEXT NOT NULL DEFAULT 'free', - role TEXT NOT NULL DEFAULT 'user', - created_at TEXT NOT NULL, - updated_at TEXT NOT NULL - ); + """Initialize tables if they do not exist.""" + with db_connection() as conn: + if is_postgres(): + _init_postgres_tables(conn) + else: + _init_sqlite_tables(conn) - CREATE TABLE IF NOT EXISTS file_history ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - user_id INTEGER NOT NULL, - tool TEXT NOT NULL, - original_filename TEXT, - output_filename TEXT, - status TEXT NOT NULL, - download_url TEXT, - metadata_json TEXT NOT NULL DEFAULT '{}', - created_at TEXT NOT NULL, - FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE - ); - CREATE TABLE IF NOT EXISTS api_keys ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - user_id INTEGER NOT NULL, - name TEXT NOT NULL, - key_prefix TEXT NOT NULL, - key_hash TEXT NOT NULL UNIQUE, - last_used_at TEXT, - revoked_at TEXT, - created_at TEXT NOT NULL, - FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE - ); +def _init_postgres_tables(conn): + cursor = conn.cursor() - CREATE TABLE IF NOT EXISTS usage_events ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - user_id INTEGER NOT NULL, - api_key_id INTEGER, - source TEXT NOT NULL, - tool TEXT NOT NULL, - task_id TEXT NOT NULL, - event_type TEXT NOT NULL, - created_at TEXT NOT NULL, - period_month TEXT NOT NULL, - FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE, - FOREIGN KEY (api_key_id) REFERENCES api_keys(id) ON DELETE CASCADE - ); - - CREATE INDEX IF NOT EXISTS idx_file_history_user_created - ON file_history(user_id, created_at DESC); - - CREATE INDEX IF NOT EXISTS idx_api_keys_user_created - ON api_keys(user_id, created_at DESC); - - CREATE INDEX IF NOT EXISTS idx_usage_events_user_source_period_event - ON usage_events(user_id, source, period_month, event_type); - - CREATE INDEX IF NOT EXISTS idx_usage_events_task_lookup - ON usage_events(user_id, source, task_id, event_type); - """ + cursor.execute(""" + CREATE TABLE IF NOT EXISTS users ( + id SERIAL PRIMARY KEY, + email TEXT NOT NULL UNIQUE, + password_hash TEXT NOT NULL, + plan TEXT NOT NULL DEFAULT 'free', + role TEXT NOT NULL DEFAULT 'user', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL ) + """) - if not _column_exists(conn, "users", "plan"): - conn.execute( - "ALTER TABLE users ADD COLUMN plan TEXT NOT NULL DEFAULT 'free'" - ) - if not _column_exists(conn, "users", "updated_at"): - conn.execute( - "ALTER TABLE users ADD COLUMN updated_at TEXT NOT NULL DEFAULT ''" - ) - if not _column_exists(conn, "users", "role"): - conn.execute( - "ALTER TABLE users ADD COLUMN role TEXT NOT NULL DEFAULT 'user'" - ) - - # Password reset tokens - conn.executescript( - """ - CREATE TABLE IF NOT EXISTS password_reset_tokens ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - user_id INTEGER NOT NULL, - token_hash TEXT NOT NULL UNIQUE, - expires_at TEXT NOT NULL, - used_at TEXT, - created_at TEXT NOT NULL, - FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE - ); - - CREATE INDEX IF NOT EXISTS idx_prt_token_hash - ON password_reset_tokens(token_hash); - - CREATE TABLE IF NOT EXISTS file_events ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - event_type TEXT NOT NULL, - file_path TEXT, - detail TEXT, - created_at TEXT NOT NULL - ); - - CREATE INDEX IF NOT EXISTS idx_file_events_created - ON file_events(created_at DESC); - """ + cursor.execute(""" + CREATE TABLE IF NOT EXISTS file_history ( + id SERIAL PRIMARY KEY, + user_id INTEGER NOT NULL, + tool TEXT NOT NULL, + original_filename TEXT, + output_filename TEXT, + status TEXT NOT NULL, + download_url TEXT, + metadata_json TEXT NOT NULL DEFAULT '{}', + created_at TEXT NOT NULL, + FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE ) + """) + + cursor.execute(""" + CREATE TABLE IF NOT EXISTS api_keys ( + id SERIAL PRIMARY KEY, + user_id INTEGER NOT NULL, + name TEXT NOT NULL, + key_prefix TEXT NOT NULL, + key_hash TEXT NOT NULL UNIQUE, + last_used_at TEXT, + revoked_at TEXT, + created_at TEXT NOT NULL, + FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE + ) + """) + + cursor.execute(""" + CREATE TABLE IF NOT EXISTS usage_events ( + id SERIAL PRIMARY KEY, + user_id INTEGER NOT NULL, + api_key_id INTEGER, + source TEXT NOT NULL, + tool TEXT NOT NULL, + task_id TEXT NOT NULL, + event_type TEXT NOT NULL, + created_at TEXT NOT NULL, + period_month TEXT NOT NULL, + FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE, + FOREIGN KEY (api_key_id) REFERENCES api_keys(id) ON DELETE CASCADE + ) + """) + + cursor.execute(""" + CREATE INDEX IF NOT EXISTS idx_file_history_user_created + ON file_history(user_id, created_at DESC) + """) + cursor.execute(""" + CREATE INDEX IF NOT EXISTS idx_api_keys_user_created + ON api_keys(user_id, created_at DESC) + """) + cursor.execute(""" + CREATE INDEX IF NOT EXISTS idx_usage_events_user_source_period_event + ON usage_events(user_id, source, period_month, event_type) + """) + cursor.execute(""" + CREATE INDEX IF NOT EXISTS idx_usage_events_task_lookup + ON usage_events(user_id, source, task_id, event_type) + """) + + cursor.execute(""" + CREATE TABLE IF NOT EXISTS password_reset_tokens ( + id SERIAL PRIMARY KEY, + user_id INTEGER NOT NULL, + token_hash TEXT NOT NULL UNIQUE, + expires_at TEXT NOT NULL, + used_at TEXT, + created_at TEXT NOT NULL, + FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE + ) + """) + cursor.execute(""" + CREATE INDEX IF NOT EXISTS idx_prt_token_hash + ON password_reset_tokens(token_hash) + """) + + cursor.execute(""" + CREATE TABLE IF NOT EXISTS file_events ( + id SERIAL PRIMARY KEY, + event_type TEXT NOT NULL, + file_path TEXT, + detail TEXT, + created_at TEXT NOT NULL + ) + """) + cursor.execute(""" + CREATE INDEX IF NOT EXISTS idx_file_events_created + ON file_events(created_at DESC) + """) + + +def _init_sqlite_tables(conn): + conn.executescript( + """ + CREATE TABLE IF NOT EXISTS users ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + email TEXT NOT NULL UNIQUE, + password_hash TEXT NOT NULL, + plan TEXT NOT NULL DEFAULT 'free', + role TEXT NOT NULL DEFAULT 'user', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS file_history ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER NOT NULL, + tool TEXT NOT NULL, + original_filename TEXT, + output_filename TEXT, + status TEXT NOT NULL, + download_url TEXT, + metadata_json TEXT NOT NULL DEFAULT '{}', + created_at TEXT NOT NULL, + FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE + ); + + CREATE TABLE IF NOT EXISTS api_keys ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER NOT NULL, + name TEXT NOT NULL, + key_prefix TEXT NOT NULL, + key_hash TEXT NOT NULL UNIQUE, + last_used_at TEXT, + revoked_at TEXT, + created_at TEXT NOT NULL, + FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE + ); + + CREATE TABLE IF NOT EXISTS usage_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER NOT NULL, + api_key_id INTEGER, + source TEXT NOT NULL, + tool TEXT NOT NULL, + task_id TEXT NOT NULL, + event_type TEXT NOT NULL, + created_at TEXT NOT NULL, + period_month TEXT NOT NULL, + FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE, + FOREIGN KEY (api_key_id) REFERENCES api_keys(id) ON DELETE CASCADE + ); + + CREATE INDEX IF NOT EXISTS idx_file_history_user_created + ON file_history(user_id, created_at DESC); + + CREATE INDEX IF NOT EXISTS idx_api_keys_user_created + ON api_keys(user_id, created_at DESC); + + CREATE INDEX IF NOT EXISTS idx_usage_events_user_source_period_event + ON usage_events(user_id, source, period_month, event_type); + + CREATE INDEX IF NOT EXISTS idx_usage_events_task_lookup + ON usage_events(user_id, source, task_id, event_type); + + CREATE TABLE IF NOT EXISTS password_reset_tokens ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER NOT NULL, + token_hash TEXT NOT NULL UNIQUE, + expires_at TEXT NOT NULL, + used_at TEXT, + created_at TEXT NOT NULL, + FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE + ); + + CREATE INDEX IF NOT EXISTS idx_prt_token_hash + ON password_reset_tokens(token_hash); + + CREATE TABLE IF NOT EXISTS file_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + event_type TEXT NOT NULL, + file_path TEXT, + detail TEXT, + created_at TEXT NOT NULL + ); + + CREATE INDEX IF NOT EXISTS idx_file_events_created + ON file_events(created_at DESC); + """ + ) + + if not _column_exists(conn, "users", "plan"): + conn.execute("ALTER TABLE users ADD COLUMN plan TEXT NOT NULL DEFAULT 'free'") + if not _column_exists(conn, "users", "updated_at"): + conn.execute("ALTER TABLE users ADD COLUMN updated_at TEXT NOT NULL DEFAULT ''") + if not _column_exists(conn, "users", "role"): + conn.execute("ALTER TABLE users ADD COLUMN role TEXT NOT NULL DEFAULT 'user'") def create_user(email: str, password: str) -> dict: - """Create a new user and return the public record.""" email = _normalize_email(email) now = _utc_now() role = "admin" if email in _get_allowlisted_admin_emails() else "user" try: - with _connect() as conn: - cursor = conn.execute( + with db_connection() as conn: + sql = ( """ INSERT INTO users (email, password_hash, plan, role, created_at, updated_at) + VALUES (%s, %s, 'free', %s, %s, %s) + RETURNING id + """ + if is_postgres() + else """ + INSERT INTO users (email, password_hash, plan, role, created_at, updated_at) VALUES (?, ?, 'free', ?, ?, ?) - """, - (email, generate_password_hash(password), role, now, now), + """ ) - user_id = cursor.lastrowid - row = conn.execute( - "SELECT id, email, plan, role, created_at FROM users WHERE id = ?", - (user_id,), - ).fetchone() - except sqlite3.IntegrityError as exc: - raise ValueError("An account with this email already exists.") from exc + cursor = execute_query( + conn, sql, (email, generate_password_hash(password), role, now, now) + ) + + if is_postgres(): + result = cursor.fetchone() + user_id = result["id"] if result else None + else: + user_id = cursor.lastrowid + + row_sql = ( + "SELECT id, email, plan, role, created_at FROM users WHERE id = %s" + if is_postgres() + else "SELECT id, email, plan, role, created_at FROM users WHERE id = ?" + ) + cursor2 = execute_query(conn, row_sql, (user_id,)) + row = cursor2.fetchone() + row = row_to_dict(row) + except Exception as exc: + if isinstance(exc, get_integrity_error()): + raise ValueError("An account with this email already exists.") from exc + raise return _serialize_user(row) or {} def authenticate_user(email: str, password: str) -> dict | None: - """Return the public user record when credentials are valid.""" email = _normalize_email(email) - with _connect() as conn: - row = conn.execute( - "SELECT * FROM users WHERE email = ?", - (email,), - ).fetchone() + with db_connection() as conn: + sql = ( + "SELECT * FROM users WHERE email = %s" + if is_postgres() + else "SELECT * FROM users WHERE email = ?" + ) + cursor = execute_query(conn, sql, (email,)) + row = cursor.fetchone() + row = row_to_dict(row) if row is None or not check_password_hash(row["password_hash"], password): return None @@ -273,78 +392,103 @@ def authenticate_user(email: str, password: str) -> dict | None: def get_user_by_id(user_id: int) -> dict | None: - """Fetch a public user record by id.""" - with _connect() as conn: - row = conn.execute( - "SELECT id, email, plan, role, created_at FROM users WHERE id = ?", - (user_id,), - ).fetchone() + with db_connection() as conn: + sql = ( + "SELECT id, email, plan, role, created_at FROM users WHERE id = %s" + if is_postgres() + else "SELECT id, email, plan, role, created_at FROM users WHERE id = ?" + ) + cursor = execute_query(conn, sql, (user_id,)) + row = cursor.fetchone() + row = row_to_dict(row) return _serialize_user(row) def is_user_admin(user_id: int | None) -> bool: - """Return whether one user has internal admin access.""" if user_id is None: return False - with _connect() as conn: - row = conn.execute( - "SELECT id, email, role FROM users WHERE id = ?", - (user_id,), - ).fetchone() + with db_connection() as conn: + sql = ( + "SELECT id, email, role FROM users WHERE id = %s" + if is_postgres() + else "SELECT id, email, role FROM users WHERE id = ?" + ) + cursor = execute_query(conn, sql, (user_id,)) + row = cursor.fetchone() + row = row_to_dict(row) return _resolve_row_role(row) == "admin" def set_user_role(user_id: int, role: str) -> dict | None: - """Update one user role and return the public user record.""" normalized_role = normalize_role(role) if normalized_role not in VALID_ROLES: raise ValueError("Invalid role.") - with _connect() as conn: - conn.execute( + with db_connection() as conn: + sql = ( """ UPDATE users + SET role = %s, updated_at = %s + WHERE id = %s + """ + if is_postgres() + else """ + UPDATE users SET role = ?, updated_at = ? WHERE id = ? - """, - (normalized_role, _utc_now(), user_id), + """ ) - row = conn.execute( - "SELECT id, email, plan, role, created_at FROM users WHERE id = ?", - (user_id,), - ).fetchone() + execute_query(conn, sql, (normalized_role, _utc_now(), user_id)) + + sql2 = ( + "SELECT id, email, plan, role, created_at FROM users WHERE id = %s" + if is_postgres() + else "SELECT id, email, plan, role, created_at FROM users WHERE id = ?" + ) + cursor = execute_query(conn, sql2, (user_id,)) + row = cursor.fetchone() + row = row_to_dict(row) return _serialize_user(row) def update_user_plan(user_id: int, plan: str) -> dict | None: - """Update one user plan and return the public record.""" normalized_plan = normalize_plan(plan) if normalized_plan not in VALID_PLANS: raise ValueError("Invalid plan.") - with _connect() as conn: - conn.execute( + with db_connection() as conn: + sql = ( """ UPDATE users + SET plan = %s, updated_at = %s + WHERE id = %s + """ + if is_postgres() + else """ + UPDATE users SET plan = ?, updated_at = ? WHERE id = ? - """, - (normalized_plan, _utc_now(), user_id), + """ ) - row = conn.execute( - "SELECT id, email, plan, role, created_at FROM users WHERE id = ?", - (user_id,), - ).fetchone() + execute_query(conn, sql, (normalized_plan, _utc_now(), user_id)) + + sql2 = ( + "SELECT id, email, plan, role, created_at FROM users WHERE id = %s" + if is_postgres() + else "SELECT id, email, plan, role, created_at FROM users WHERE id = ?" + ) + cursor = execute_query(conn, sql2, (user_id,)) + row = cursor.fetchone() + row = row_to_dict(row) return _serialize_user(row) def create_api_key(user_id: int, name: str) -> dict: - """Create one API key and return the public record plus raw secret once.""" name = name.strip() if not name: raise ValueError("API key name is required.") @@ -354,28 +498,45 @@ def create_api_key(user_id: int, name: str) -> dict: raw_key = f"spdf_{secrets.token_urlsafe(32)}" now = _utc_now() - with _connect() as conn: - cursor = conn.execute( + with db_connection() as conn: + sql = ( """ INSERT INTO api_keys (user_id, name, key_prefix, key_hash, created_at) + VALUES (%s, %s, %s, %s, %s) + RETURNING id + """ + if is_postgres() + else """ + INSERT INTO api_keys (user_id, name, key_prefix, key_hash, created_at) VALUES (?, ?, ?, ?, ?) - """, - ( - user_id, - name, - raw_key[:16], - _hash_api_key(raw_key), - now, - ), + """ ) - row = conn.execute( + cursor = execute_query( + conn, sql, (user_id, name, raw_key[:16], _hash_api_key(raw_key), now) + ) + + if is_postgres(): + result = cursor.fetchone() + key_id = result["id"] if result else None + else: + key_id = cursor.lastrowid + + sql2 = ( """ SELECT id, name, key_prefix, last_used_at, revoked_at, created_at FROM api_keys + WHERE id = %s + """ + if is_postgres() + else """ + SELECT id, name, key_prefix, last_used_at, revoked_at, created_at + FROM api_keys WHERE id = ? - """, - (cursor.lastrowid,), - ).fetchone() + """ + ) + cursor2 = execute_query(conn, sql2, (key_id,)) + row = cursor2.fetchone() + row = row_to_dict(row) result = _serialize_api_key(row) result["raw_key"] = raw_key @@ -383,45 +544,57 @@ def create_api_key(user_id: int, name: str) -> dict: def list_api_keys(user_id: int) -> list[dict]: - """Return all API keys for one user.""" - with _connect() as conn: - rows = conn.execute( + with db_connection() as conn: + sql = ( """ SELECT id, name, key_prefix, last_used_at, revoked_at, created_at FROM api_keys + WHERE user_id = %s + ORDER BY created_at DESC + """ + if is_postgres() + else """ + SELECT id, name, key_prefix, last_used_at, revoked_at, created_at + FROM api_keys WHERE user_id = ? ORDER BY created_at DESC - """, - (user_id,), - ).fetchall() + """ + ) + cursor = execute_query(conn, sql, (user_id,)) + rows = cursor.fetchall() + rows = [row_to_dict(r) for r in rows] return [_serialize_api_key(row) for row in rows] def revoke_api_key(user_id: int, key_id: int) -> bool: - """Revoke one API key owned by one user.""" - with _connect() as conn: - cursor = conn.execute( + with db_connection() as conn: + sql = ( """ UPDATE api_keys + SET revoked_at = %s + WHERE id = %s AND user_id = %s AND revoked_at IS NULL + """ + if is_postgres() + else """ + UPDATE api_keys SET revoked_at = ? WHERE id = ? AND user_id = ? AND revoked_at IS NULL - """, - (_utc_now(), key_id, user_id), + """ ) + cursor = execute_query(conn, sql, (_utc_now(), key_id, user_id)) return cursor.rowcount > 0 def get_api_key_actor(raw_key: str) -> dict | None: - """Resolve one raw API key into the owning active user context.""" if not raw_key: return None key_hash = _hash_api_key(raw_key.strip()) now = _utc_now() - with _connect() as conn: - row = conn.execute( + with db_connection() as conn: + sql = ( """ SELECT api_keys.id AS api_key_id, @@ -434,18 +607,37 @@ def get_api_key_actor(raw_key: str) -> dict | None: users.created_at FROM api_keys INNER JOIN users ON users.id = api_keys.user_id + WHERE api_keys.key_hash = %s AND api_keys.revoked_at IS NULL + """ + if is_postgres() + else """ + SELECT + api_keys.id AS api_key_id, + api_keys.user_id, + api_keys.name, + api_keys.key_prefix, + api_keys.last_used_at, + users.email, + users.plan, + users.created_at + FROM api_keys + INNER JOIN users ON users.id = api_keys.user_id WHERE api_keys.key_hash = ? AND api_keys.revoked_at IS NULL - """, - (key_hash,), - ).fetchone() + """ + ) + cursor = execute_query(conn, sql, (key_hash,)) + row = cursor.fetchone() + row = row_to_dict(row) if row is None: return None - conn.execute( - "UPDATE api_keys SET last_used_at = ? WHERE id = ?", - (now, row["api_key_id"]), + sql2 = ( + "UPDATE api_keys SET last_used_at = %s WHERE id = %s" + if is_postgres() + else "UPDATE api_keys SET last_used_at = ? WHERE id = ?" ) + execute_query(conn, sql2, (now, row["api_key_id"])) return { "api_key_id": row["api_key_id"], @@ -468,16 +660,27 @@ def record_file_history( download_url: str | None, metadata: dict | None = None, ): - """Persist one generated-file history entry.""" - with _connect() as conn: - conn.execute( + with db_connection() as conn: + sql = ( """ INSERT INTO file_history ( user_id, tool, original_filename, output_filename, status, download_url, metadata_json, created_at ) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s) + """ + if is_postgres() + else """ + INSERT INTO file_history ( + user_id, tool, original_filename, output_filename, + status, download_url, metadata_json, created_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) - """, + """ + ) + execute_query( + conn, + sql, ( user_id, tool, @@ -497,7 +700,6 @@ def record_task_history( original_filename: str | None, result: dict, ): - """Persist task results when the request belongs to an authenticated user.""" if user_id is None: return @@ -525,19 +727,29 @@ def record_task_history( def list_file_history(user_id: int, limit: int = 50) -> list[dict]: - """Return most recent file history entries for one user.""" - with _connect() as conn: - rows = conn.execute( + with db_connection() as conn: + sql = ( """ SELECT id, tool, original_filename, output_filename, status, download_url, metadata_json, created_at FROM file_history + WHERE user_id = %s + ORDER BY created_at DESC + LIMIT %s + """ + if is_postgres() + else """ + SELECT id, tool, original_filename, output_filename, status, + download_url, metadata_json, created_at + FROM file_history WHERE user_id = ? ORDER BY created_at DESC LIMIT ? - """, - (user_id, limit), - ).fetchall() + """ + ) + cursor = execute_query(conn, sql, (user_id, limit)) + rows = cursor.fetchall() + rows = [row_to_dict(r) for r in rows] return [ { @@ -555,40 +767,56 @@ def list_file_history(user_id: int, limit: int = 50) -> list[dict]: def get_public_history_summary(limit_tools: int = 5) -> dict: - """Return aggregate public-friendly processing stats derived from history.""" cutoff_24h = (datetime.now(timezone.utc) - timedelta(days=1)).isoformat() - with _connect() as conn: - totals_row = conn.execute( - """ + with db_connection() as conn: + sql = """ SELECT COUNT(*) AS total, COALESCE(SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END), 0) AS completed, COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0) AS failed FROM file_history - """ - ).fetchone() + """ + cursor = execute_query(conn, sql) + totals_row = row_to_dict(cursor.fetchone()) - recent_row = conn.execute( + sql2 = ( """ SELECT COUNT(*) AS total FROM file_history + WHERE created_at >= %s + """ + if is_postgres() + else """ + SELECT COUNT(*) AS total + FROM file_history WHERE created_at >= ? - """, - (cutoff_24h,), - ).fetchone() + """ + ) + cursor2 = execute_query(conn, sql2, (cutoff_24h,)) + recent_row = row_to_dict(cursor2.fetchone()) - top_rows = conn.execute( + sql3 = ( """ SELECT tool, COUNT(*) AS count FROM file_history WHERE status = 'completed' GROUP BY tool ORDER BY count DESC, tool ASC + LIMIT %s + """ + if is_postgres() + else """ + SELECT tool, COUNT(*) AS count + FROM file_history + WHERE status = 'completed' + GROUP BY tool + ORDER BY count DESC, tool ASC LIMIT ? - """, - (limit_tools,), - ).fetchall() + """ + ) + cursor3 = execute_query(conn, sql3, (limit_tools,)) + top_rows = [row_to_dict(r) for r in cursor3.fetchall()] total = int(totals_row["total"]) if totals_row else 0 completed = int(totals_row["completed"]) if totals_row else 0 @@ -602,8 +830,7 @@ def get_public_history_summary(limit_tools: int = 5) -> dict: "success_rate": success_rate, "files_last_24h": int(recent_row["total"]) if recent_row else 0, "top_tools": [ - {"tool": row["tool"], "count": int(row["count"])} - for row in top_rows + {"tool": row["tool"], "count": int(row["count"])} for row in top_rows ], } @@ -616,19 +843,30 @@ def record_usage_event( event_type: str, api_key_id: int | None = None, ): - """Persist one usage event when it belongs to an authenticated actor.""" if user_id is None: return - with _connect() as conn: - conn.execute( + with db_connection() as conn: + sql = ( """ INSERT INTO usage_events ( user_id, api_key_id, source, tool, task_id, event_type, created_at, period_month ) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s) + """ + if is_postgres() + else """ + INSERT INTO usage_events ( + user_id, api_key_id, source, tool, task_id, + event_type, created_at, period_month + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) - """, + """ + ) + execute_query( + conn, + sql, ( user_id, api_key_id, @@ -648,128 +886,161 @@ def count_usage_events( event_type: str = "accepted", period_month: str | None = None, ) -> int: - """Count usage events for one user, source, period, and type.""" - with _connect() as conn: - row = conn.execute( + with db_connection() as conn: + sql = ( """ SELECT COUNT(*) AS count FROM usage_events + WHERE user_id = %s AND source = %s AND event_type = %s AND period_month = %s + """ + if is_postgres() + else """ + SELECT COUNT(*) AS count + FROM usage_events WHERE user_id = ? AND source = ? AND event_type = ? AND period_month = ? - """, + """ + ) + cursor = execute_query( + conn, + sql, (user_id, source, event_type, period_month or get_current_period_month()), - ).fetchone() + ) + row = row_to_dict(cursor.fetchone()) return int(row["count"]) if row else 0 def has_task_access(user_id: int, source: str, task_id: str) -> bool: - """Return whether one user owns one previously accepted task for one source.""" - with _connect() as conn: - row = conn.execute( + with db_connection() as conn: + sql = ( """ SELECT 1 FROM usage_events + WHERE user_id = %s AND source = %s AND task_id = %s + AND event_type IN ('accepted', 'download_alias') + LIMIT 1 + """ + if is_postgres() + else """ + SELECT 1 + FROM usage_events WHERE user_id = ? AND source = ? AND task_id = ? AND event_type IN ('accepted', 'download_alias') LIMIT 1 - """, - (user_id, source, task_id), - ).fetchone() + """ + ) + cursor = execute_query(conn, sql, (user_id, source, task_id)) + row = cursor.fetchone() return row is not None -# --------------------------------------------------------------------------- -# Password reset tokens -# --------------------------------------------------------------------------- - def get_user_by_email(email: str) -> dict | None: - """Fetch a public user record by email.""" email = _normalize_email(email) - with _connect() as conn: - row = conn.execute( - "SELECT id, email, plan, role, created_at FROM users WHERE email = ?", - (email,), - ).fetchone() + with db_connection() as conn: + sql = ( + "SELECT id, email, plan, role, created_at FROM users WHERE email = %s" + if is_postgres() + else "SELECT id, email, plan, role, created_at FROM users WHERE email = ?" + ) + cursor = execute_query(conn, sql, (email,)) + row = row_to_dict(cursor.fetchone()) return _serialize_user(row) def create_password_reset_token(user_id: int) -> str: - """Generate a password-reset token (returned raw) and store its hash.""" raw_token = secrets.token_urlsafe(48) token_hash = hashlib.sha256(raw_token.encode()).hexdigest() now = _utc_now() - # Expire in 1 hour expires = (datetime.now(timezone.utc) + timedelta(hours=1)).isoformat() - with _connect() as conn: - # Invalidate any previous unused tokens for this user - conn.execute( - "UPDATE password_reset_tokens SET used_at = ? WHERE user_id = ? AND used_at IS NULL", - (now, user_id), + with db_connection() as conn: + sql1 = ( + "UPDATE password_reset_tokens SET used_at = %s WHERE user_id = %s AND used_at IS NULL" + if is_postgres() + else "UPDATE password_reset_tokens SET used_at = ? WHERE user_id = ? AND used_at IS NULL" ) - conn.execute( + execute_query(conn, sql1, (now, user_id)) + + sql2 = ( """ INSERT INTO password_reset_tokens (user_id, token_hash, expires_at, created_at) + VALUES (%s, %s, %s, %s) + """ + if is_postgres() + else """ + INSERT INTO password_reset_tokens (user_id, token_hash, expires_at, created_at) VALUES (?, ?, ?, ?) - """, - (user_id, token_hash, expires, now), + """ ) + execute_query(conn, sql2, (user_id, token_hash, expires, now)) return raw_token def verify_and_consume_reset_token(raw_token: str) -> int | None: - """Verify a reset token. Returns user_id if valid, else None. Marks it used.""" token_hash = hashlib.sha256(raw_token.encode()).hexdigest() now = _utc_now() - with _connect() as conn: - row = conn.execute( + with db_connection() as conn: + sql = ( """ SELECT id, user_id, expires_at FROM password_reset_tokens + WHERE token_hash = %s AND used_at IS NULL + """ + if is_postgres() + else """ + SELECT id, user_id, expires_at + FROM password_reset_tokens WHERE token_hash = ? AND used_at IS NULL - """, - (token_hash,), - ).fetchone() + """ + ) + cursor = execute_query(conn, sql, (token_hash,)) + row = row_to_dict(cursor.fetchone()) if row is None: return None - # Check expiry if row["expires_at"] < now: - conn.execute( - "UPDATE password_reset_tokens SET used_at = ? WHERE id = ?", - (now, row["id"]), + sql2 = ( + "UPDATE password_reset_tokens SET used_at = %s WHERE id = %s" + if is_postgres() + else "UPDATE password_reset_tokens SET used_at = ? WHERE id = ?" ) + execute_query(conn, sql2, (now, row["id"])) return None - # Mark used - conn.execute( - "UPDATE password_reset_tokens SET used_at = ? WHERE id = ?", - (now, row["id"]), + sql3 = ( + "UPDATE password_reset_tokens SET used_at = %s WHERE id = %s" + if is_postgres() + else "UPDATE password_reset_tokens SET used_at = ? WHERE id = ?" ) + execute_query(conn, sql3, (now, row["id"])) return row["user_id"] def update_user_password(user_id: int, new_password: str) -> bool: - """Update a user's password hash.""" now = _utc_now() password_hash = generate_password_hash(new_password) - with _connect() as conn: - conn.execute( - "UPDATE users SET password_hash = ?, updated_at = ? WHERE id = ?", - (password_hash, now, user_id), + with db_connection() as conn: + sql = ( + "UPDATE users SET password_hash = %s, updated_at = %s WHERE id = %s" + if is_postgres() + else "UPDATE users SET password_hash = ?, updated_at = ? WHERE id = ?" ) + execute_query(conn, sql, (password_hash, now, user_id)) return True -def log_file_event(event_type: str, file_path: str | None = None, detail: str | None = None) -> None: - """Record a file lifecycle event (upload, download, cleanup, etc.).""" - with _connect() as conn: - conn.execute( - "INSERT INTO file_events (event_type, file_path, detail, created_at) VALUES (?, ?, ?, ?)", - (event_type, file_path, detail, _utc_now()), +def log_file_event( + event_type: str, file_path: str | None = None, detail: str | None = None +) -> None: + with db_connection() as conn: + sql = ( + "INSERT INTO file_events (event_type, file_path, detail, created_at) VALUES (%s, %s, %s, %s)" + if is_postgres() + else "INSERT INTO file_events (event_type, file_path, detail, created_at) VALUES (?, ?, ?, ?)" ) + execute_query(conn, sql, (event_type, file_path, detail, _utc_now())) diff --git a/backend/app/services/admin_service.py b/backend/app/services/admin_service.py index a90b813..a7a301e 100644 --- a/backend/app/services/admin_service.py +++ b/backend/app/services/admin_service.py @@ -1,7 +1,10 @@ -"""Internal admin aggregation helpers for operational dashboards.""" +"""Internal admin aggregation helpers for operational dashboards. + +Supports both SQLite (development) and PostgreSQL (production). +""" + import json import os -import sqlite3 from datetime import datetime, timedelta, timezone from flask import current_app @@ -10,16 +13,7 @@ from app.services.account_service import is_allowlisted_admin_email, normalize_r from app.services.ai_cost_service import get_monthly_spend from app.services.contact_service import mark_read from app.services.rating_service import get_global_rating_summary - - -def _connect() -> sqlite3.Connection: - db_path = current_app.config["DATABASE_PATH"] - db_dir = os.path.dirname(db_path) - if db_dir: - os.makedirs(db_dir, exist_ok=True) - connection = sqlite3.connect(db_path) - connection.row_factory = sqlite3.Row - return connection +from app.utils.database import db_connection, execute_query, is_postgres, row_to_dict def _parse_metadata(raw_value: str | None) -> dict: @@ -36,30 +30,40 @@ def get_admin_overview(limit_recent: int = 8, top_tools_limit: int = 6) -> dict: cutoff_24h = (datetime.now(timezone.utc) - timedelta(days=1)).isoformat() ai_cost_summary = get_monthly_spend() - with _connect() as conn: - users_row = conn.execute( - """ + with db_connection() as conn: + users_sql = """ SELECT COUNT(*) AS total_users, COALESCE(SUM(CASE WHEN plan = 'pro' THEN 1 ELSE 0 END), 0) AS pro_users, COALESCE(SUM(CASE WHEN plan = 'free' THEN 1 ELSE 0 END), 0) AS free_users FROM users - """ - ).fetchone() + """ + cursor = execute_query(conn, users_sql) + users_row = row_to_dict(cursor.fetchone()) - history_row = conn.execute( + history_sql = ( """ + SELECT + COUNT(*) AS total_files_processed, + COALESCE(SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END), 0) AS completed_files, + COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0) AS failed_files, + COALESCE(SUM(CASE WHEN created_at >= %s THEN 1 ELSE 0 END), 0) AS files_last_24h + FROM file_history + """ + if is_postgres() + else """ SELECT COUNT(*) AS total_files_processed, COALESCE(SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END), 0) AS completed_files, COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0) AS failed_files, COALESCE(SUM(CASE WHEN created_at >= ? THEN 1 ELSE 0 END), 0) AS files_last_24h FROM file_history - """, - (cutoff_24h,), - ).fetchone() + """ + ) + cursor2 = execute_query(conn, history_sql, (cutoff_24h,)) + history_row = row_to_dict(cursor2.fetchone()) - top_tools_rows = conn.execute( + top_tools_sql = ( """ SELECT tool, @@ -68,12 +72,24 @@ def get_admin_overview(limit_recent: int = 8, top_tools_limit: int = 6) -> dict: FROM file_history GROUP BY tool ORDER BY total_runs DESC, tool ASC + LIMIT %s + """ + if is_postgres() + else """ + SELECT + tool, + COUNT(*) AS total_runs, + COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0) AS failed_runs + FROM file_history + GROUP BY tool + ORDER BY total_runs DESC, tool ASC LIMIT ? - """, - (top_tools_limit,), - ).fetchall() + """ + ) + cursor3 = execute_query(conn, top_tools_sql, (top_tools_limit,)) + top_tools_rows = [row_to_dict(r) for r in cursor3.fetchall()] - failure_rows = conn.execute( + failure_sql = ( """ SELECT file_history.id, @@ -87,12 +103,29 @@ def get_admin_overview(limit_recent: int = 8, top_tools_limit: int = 6) -> dict: LEFT JOIN users ON users.id = file_history.user_id WHERE file_history.status = 'failed' ORDER BY file_history.created_at DESC + LIMIT %s + """ + if is_postgres() + else """ + SELECT + file_history.id, + file_history.user_id, + file_history.tool, + file_history.original_filename, + file_history.metadata_json, + file_history.created_at, + users.email + FROM file_history + LEFT JOIN users ON users.id = file_history.user_id + WHERE file_history.status = 'failed' + ORDER BY file_history.created_at DESC LIMIT ? - """, - (limit_recent,), - ).fetchall() + """ + ) + cursor4 = execute_query(conn, failure_sql, (limit_recent,)) + failure_rows = [row_to_dict(r) for r in cursor4.fetchall()] - recent_user_rows = conn.execute( + recent_user_sql = ( """ SELECT users.id, @@ -103,33 +136,66 @@ def get_admin_overview(limit_recent: int = 8, top_tools_limit: int = 6) -> dict: COALESCE((SELECT COUNT(*) FROM api_keys WHERE api_keys.user_id = users.id AND api_keys.revoked_at IS NULL), 0) AS active_api_keys FROM users ORDER BY users.created_at DESC + LIMIT %s + """ + if is_postgres() + else """ + SELECT + users.id, + users.email, + users.plan, + users.created_at, + COALESCE((SELECT COUNT(*) FROM file_history WHERE file_history.user_id = users.id), 0) AS total_tasks, + COALESCE((SELECT COUNT(*) FROM api_keys WHERE api_keys.user_id = users.id AND api_keys.revoked_at IS NULL), 0) AS active_api_keys + FROM users + ORDER BY users.created_at DESC LIMIT ? - """, - (limit_recent,), - ).fetchall() + """ + ) + cursor5 = execute_query(conn, recent_user_sql, (limit_recent,)) + recent_user_rows = [row_to_dict(r) for r in cursor5.fetchall()] - contact_row = conn.execute( + contact_sql = ( """ + SELECT + COUNT(*) AS total_messages, + COALESCE(SUM(CASE WHEN is_read = FALSE THEN 1 ELSE 0 END), 0) AS unread_messages + FROM contact_messages + """ + if is_postgres() + else """ SELECT COUNT(*) AS total_messages, COALESCE(SUM(CASE WHEN is_read = 0 THEN 1 ELSE 0 END), 0) AS unread_messages FROM contact_messages - """ - ).fetchone() + """ + ) + cursor6 = execute_query(conn, contact_sql) + contact_row = row_to_dict(cursor6.fetchone()) - recent_contact_rows = conn.execute( + recent_contact_sql = ( """ SELECT id, name, email, category, subject, message, created_at, is_read FROM contact_messages ORDER BY created_at DESC + LIMIT %s + """ + if is_postgres() + else """ + SELECT id, name, email, category, subject, message, created_at, is_read + FROM contact_messages + ORDER BY created_at DESC LIMIT ? - """, - (limit_recent,), - ).fetchall() + """ + ) + cursor7 = execute_query(conn, recent_contact_sql, (limit_recent,)) + recent_contact_rows = [row_to_dict(r) for r in cursor7.fetchall()] total_processed = int(history_row["total_files_processed"]) if history_row else 0 completed_files = int(history_row["completed_files"]) if history_row else 0 - success_rate = round((completed_files / total_processed) * 100, 1) if total_processed else 0.0 + success_rate = ( + round((completed_files / total_processed) * 100, 1) if total_processed else 0.0 + ) return { "users": { @@ -153,7 +219,9 @@ def get_admin_overview(limit_recent: int = 8, top_tools_limit: int = 6) -> dict: }, "contacts": { "total_messages": int(contact_row["total_messages"]) if contact_row else 0, - "unread_messages": int(contact_row["unread_messages"]) if contact_row else 0, + "unread_messages": int(contact_row["unread_messages"]) + if contact_row + else 0, "recent": [ { "id": row["id"], @@ -219,20 +287,32 @@ def list_admin_users(limit: int = 25, query: str = "") -> list[dict]: """ params: list[object] = [] if normalized_query: - sql += " WHERE LOWER(users.email) LIKE ?" + sql += ( + " WHERE LOWER(users.email) LIKE %s" + if is_postgres() + else " WHERE LOWER(users.email) LIKE ?" + ) params.append(f"%{normalized_query}%") - sql += " ORDER BY users.created_at DESC LIMIT ?" + sql += ( + " ORDER BY users.created_at DESC LIMIT %s" + if is_postgres() + else " ORDER BY users.created_at DESC LIMIT ?" + ) params.append(limit) - with _connect() as conn: - rows = conn.execute(sql, tuple(params)).fetchall() + with db_connection() as conn: + cursor = execute_query(conn, sql, tuple(params)) + rows = cursor.fetchall() + rows = [row_to_dict(r) for r in rows] return [ { "id": row["id"], "email": row["email"], "plan": row["plan"], - "role": "admin" if is_allowlisted_admin_email(row["email"]) else normalize_role(row["role"]), + "role": "admin" + if is_allowlisted_admin_email(row["email"]) + else normalize_role(row["role"]), "is_allowlisted_admin": is_allowlisted_admin_email(row["email"]), "created_at": row["created_at"], "total_tasks": int(row["total_tasks"]), @@ -249,19 +329,32 @@ def list_admin_contacts(page: int = 1, per_page: int = 20) -> dict: safe_per_page = max(1, min(per_page, 100)) offset = (safe_page - 1) * safe_per_page - with _connect() as conn: - total_row = conn.execute( - "SELECT COUNT(*) AS total, COALESCE(SUM(CASE WHEN is_read = 0 THEN 1 ELSE 0 END), 0) AS unread FROM contact_messages" - ).fetchone() - rows = conn.execute( + with db_connection() as conn: + total_sql = ( + "SELECT COUNT(*) AS total, COALESCE(SUM(CASE WHEN is_read = FALSE THEN 1 ELSE 0 END), 0) AS unread FROM contact_messages" + if is_postgres() + else "SELECT COUNT(*) AS total, COALESCE(SUM(CASE WHEN is_read = 0 THEN 1 ELSE 0 END), 0) AS unread FROM contact_messages" + ) + cursor = execute_query(conn, total_sql) + total_row = row_to_dict(cursor.fetchone()) + + rows_sql = ( """ SELECT id, name, email, category, subject, message, created_at, is_read FROM contact_messages ORDER BY created_at DESC + LIMIT %s OFFSET %s + """ + if is_postgres() + else """ + SELECT id, name, email, category, subject, message, created_at, is_read + FROM contact_messages + ORDER BY created_at DESC LIMIT ? OFFSET ? - """, - (safe_per_page, offset), - ).fetchall() + """ + ) + cursor2 = execute_query(conn, rows_sql, (safe_per_page, offset)) + rows = [row_to_dict(r) for r in cursor2.fetchall()] return { "items": [ @@ -288,39 +381,51 @@ def mark_admin_contact_read(message_id: int) -> bool: return mark_read(message_id) -# --------------------------------------------------------------------------- -# Enhanced admin analytics -# --------------------------------------------------------------------------- - - def _ensure_plan_interest_table(): """Create plan_interest_clicks table if it does not exist.""" - with _connect() as conn: - conn.execute( - """ - CREATE TABLE IF NOT EXISTS plan_interest_clicks ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - user_id INTEGER, - plan TEXT NOT NULL, - billing TEXT NOT NULL DEFAULT 'monthly', - created_at TEXT NOT NULL + with db_connection() as conn: + if is_postgres(): + cursor = conn.cursor() + cursor.execute(""" + CREATE TABLE IF NOT EXISTS plan_interest_clicks ( + id SERIAL PRIMARY KEY, + user_id INTEGER, + plan TEXT NOT NULL, + billing TEXT NOT NULL DEFAULT 'monthly', + created_at TEXT NOT NULL + ) + """) + cursor.execute(""" + CREATE INDEX IF NOT EXISTS idx_plan_interest_created ON plan_interest_clicks(created_at) + """) + else: + conn.execute(""" + CREATE TABLE IF NOT EXISTS plan_interest_clicks ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER, + plan TEXT NOT NULL, + billing TEXT NOT NULL DEFAULT 'monthly', + created_at TEXT NOT NULL + ) + """) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_plan_interest_created ON plan_interest_clicks(created_at)" ) - """ - ) - conn.execute( - "CREATE INDEX IF NOT EXISTS idx_plan_interest_created ON plan_interest_clicks(created_at)" - ) -def record_plan_interest_click(user_id: int | None, plan: str, billing: str = "monthly") -> None: +def record_plan_interest_click( + user_id: int | None, plan: str, billing: str = "monthly" +) -> None: """Record a click on a pricing plan button.""" _ensure_plan_interest_table() now = datetime.now(timezone.utc).isoformat() - with _connect() as conn: - conn.execute( - "INSERT INTO plan_interest_clicks (user_id, plan, billing, created_at) VALUES (?, ?, ?, ?)", - (user_id, plan, billing, now), + with db_connection() as conn: + sql = ( + "INSERT INTO plan_interest_clicks (user_id, plan, billing, created_at) VALUES (%s, %s, %s, %s)" + if is_postgres() + else "INSERT INTO plan_interest_clicks (user_id, plan, billing, created_at) VALUES (?, ?, ?, ?)" ) + execute_query(conn, sql, (user_id, plan, billing, now)) def get_plan_interest_summary() -> dict: @@ -329,30 +434,39 @@ def get_plan_interest_summary() -> dict: cutoff_7d = (datetime.now(timezone.utc) - timedelta(days=7)).isoformat() cutoff_30d = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat() - with _connect() as conn: - total_row = conn.execute( + with db_connection() as conn: + total_sql = ( """ + SELECT + COUNT(*) AS total_clicks, + COUNT(DISTINCT user_id) AS unique_users, + COALESCE(SUM(CASE WHEN created_at >= %s THEN 1 ELSE 0 END), 0) AS clicks_last_7d, + COALESCE(SUM(CASE WHEN created_at >= %s THEN 1 ELSE 0 END), 0) AS clicks_last_30d + FROM plan_interest_clicks + """ + if is_postgres() + else """ SELECT COUNT(*) AS total_clicks, COUNT(DISTINCT user_id) AS unique_users, COALESCE(SUM(CASE WHEN created_at >= ? THEN 1 ELSE 0 END), 0) AS clicks_last_7d, COALESCE(SUM(CASE WHEN created_at >= ? THEN 1 ELSE 0 END), 0) AS clicks_last_30d FROM plan_interest_clicks - """, - (cutoff_7d, cutoff_30d), - ).fetchone() + """ + ) + cursor = execute_query(conn, total_sql, (cutoff_7d, cutoff_30d)) + total_row = row_to_dict(cursor.fetchone()) - by_plan_rows = conn.execute( - """ + by_plan_sql = """ SELECT plan, billing, COUNT(*) AS clicks FROM plan_interest_clicks GROUP BY plan, billing ORDER BY clicks DESC - """ - ).fetchall() + """ + cursor2 = execute_query(conn, by_plan_sql) + by_plan_rows = [row_to_dict(r) for r in cursor2.fetchall()] - recent_rows = conn.execute( - """ + recent_sql = """ SELECT plan_interest_clicks.id, plan_interest_clicks.user_id, @@ -364,8 +478,9 @@ def get_plan_interest_summary() -> dict: LEFT JOIN users ON users.id = plan_interest_clicks.user_id ORDER BY plan_interest_clicks.created_at DESC LIMIT 20 - """ - ).fetchall() + """ + cursor3 = execute_query(conn, recent_sql) + recent_rows = [row_to_dict(r) for r in cursor3.fetchall()] return { "total_clicks": int(total_row["total_clicks"]) if total_row else 0, @@ -373,7 +488,11 @@ def get_plan_interest_summary() -> dict: "clicks_last_7d": int(total_row["clicks_last_7d"]) if total_row else 0, "clicks_last_30d": int(total_row["clicks_last_30d"]) if total_row else 0, "by_plan": [ - {"plan": row["plan"], "billing": row["billing"], "clicks": int(row["clicks"])} + { + "plan": row["plan"], + "billing": row["billing"], + "clicks": int(row["clicks"]), + } for row in by_plan_rows ], "recent": [ @@ -390,39 +509,43 @@ def get_plan_interest_summary() -> dict: } -def get_admin_ratings_detail(page: int = 1, per_page: int = 20, tool_filter: str = "") -> dict: +def get_admin_ratings_detail( + page: int = 1, per_page: int = 20, tool_filter: str = "" +) -> dict: """Return detailed ratings list with feedback for the admin dashboard.""" safe_page = max(1, page) safe_per_page = max(1, min(per_page, 100)) offset = (safe_page - 1) * safe_per_page - with _connect() as conn: - # Total count + with db_connection() as conn: count_sql = "SELECT COUNT(*) AS total FROM tool_ratings" count_params: list[object] = [] if tool_filter: - count_sql += " WHERE tool = ?" + count_sql += " WHERE tool = %s" if is_postgres() else " WHERE tool = ?" count_params.append(tool_filter) - total_row = conn.execute(count_sql, tuple(count_params)).fetchone() + cursor = execute_query(conn, count_sql, tuple(count_params)) + total_row = row_to_dict(cursor.fetchone()) - # Paginated ratings sql = """ SELECT id, tool, rating, feedback, tag, fingerprint, created_at FROM tool_ratings """ params: list[object] = [] if tool_filter: - sql += " WHERE tool = ?" + sql += " WHERE tool = %s" if is_postgres() else " WHERE tool = ?" params.append(tool_filter) - sql += " ORDER BY created_at DESC LIMIT ? OFFSET ?" + sql += ( + " ORDER BY created_at DESC LIMIT %s OFFSET %s" + if is_postgres() + else " ORDER BY created_at DESC LIMIT ? OFFSET ?" + ) params.extend([safe_per_page, offset]) - rows = conn.execute(sql, tuple(params)).fetchall() + cursor2 = execute_query(conn, sql, tuple(params)) + rows = [row_to_dict(r) for r in cursor2.fetchall()] - # Per-tool summary - summary_rows = conn.execute( - """ + summary_sql = """ SELECT tool, COUNT(*) AS count, @@ -432,8 +555,9 @@ def get_admin_ratings_detail(page: int = 1, per_page: int = 20, tool_filter: str FROM tool_ratings GROUP BY tool ORDER BY count DESC - """ - ).fetchall() + """ + cursor3 = execute_query(conn, summary_sql) + summary_rows = [row_to_dict(r) for r in cursor3.fetchall()] return { "items": [ @@ -469,10 +593,24 @@ def get_admin_tool_analytics() -> dict: cutoff_7d = (datetime.now(timezone.utc) - timedelta(days=7)).isoformat() cutoff_30d = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat() - with _connect() as conn: - # Per-tool detailed stats - tool_rows = conn.execute( + with db_connection() as conn: + tool_sql = ( """ + SELECT + tool, + COUNT(*) AS total_runs, + COALESCE(SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END), 0) AS completed, + COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0) AS failed, + COALESCE(SUM(CASE WHEN created_at >= %s THEN 1 ELSE 0 END), 0) AS runs_24h, + COALESCE(SUM(CASE WHEN created_at >= %s THEN 1 ELSE 0 END), 0) AS runs_7d, + COALESCE(SUM(CASE WHEN created_at >= %s THEN 1 ELSE 0 END), 0) AS runs_30d, + COUNT(DISTINCT user_id) AS unique_users + FROM file_history + GROUP BY tool + ORDER BY total_runs DESC + """ + if is_postgres() + else """ SELECT tool, COUNT(*) AS total_runs, @@ -485,13 +623,25 @@ def get_admin_tool_analytics() -> dict: FROM file_history GROUP BY tool ORDER BY total_runs DESC - """, - (cutoff_24h, cutoff_7d, cutoff_30d), - ).fetchall() + """ + ) + cursor = execute_query(conn, tool_sql, (cutoff_24h, cutoff_7d, cutoff_30d)) + tool_rows = [row_to_dict(r) for r in cursor.fetchall()] - # Daily usage for the last 30 days - daily_rows = conn.execute( + daily_sql = ( """ + SELECT + created_at::date AS day, + COUNT(*) AS total, + COALESCE(SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END), 0) AS completed, + COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0) AS failed + FROM file_history + WHERE created_at >= %s + GROUP BY created_at::date + ORDER BY day ASC + """ + if is_postgres() + else """ SELECT DATE(created_at) AS day, COUNT(*) AS total, @@ -501,25 +651,38 @@ def get_admin_tool_analytics() -> dict: WHERE created_at >= ? GROUP BY DATE(created_at) ORDER BY day ASC - """, - (cutoff_30d,), - ).fetchall() + """ + ) + cursor2 = execute_query(conn, daily_sql, (cutoff_30d,)) + daily_rows = [row_to_dict(r) for r in cursor2.fetchall()] - # Most common errors - error_rows = conn.execute( + error_sql = ( """ SELECT tool, metadata_json, COUNT(*) AS occurrences FROM file_history + WHERE status = 'failed' AND created_at >= %s + GROUP BY tool, metadata_json + ORDER BY occurrences DESC + LIMIT 15 + """ + if is_postgres() + else """ + SELECT + tool, + metadata_json, + COUNT(*) AS occurrences + FROM file_history WHERE status = 'failed' AND created_at >= ? GROUP BY tool, metadata_json ORDER BY occurrences DESC LIMIT 15 - """, - (cutoff_30d,), - ).fetchall() + """ + ) + cursor3 = execute_query(conn, error_sql, (cutoff_30d,)) + error_rows = [row_to_dict(r) for r in cursor3.fetchall()] return { "tools": [ @@ -528,7 +691,11 @@ def get_admin_tool_analytics() -> dict: "total_runs": int(row["total_runs"]), "completed": int(row["completed"]), "failed": int(row["failed"]), - "success_rate": round((int(row["completed"]) / int(row["total_runs"])) * 100, 1) if int(row["total_runs"]) > 0 else 0, + "success_rate": round( + (int(row["completed"]) / int(row["total_runs"])) * 100, 1 + ) + if int(row["total_runs"]) > 0 + else 0, "runs_24h": int(row["runs_24h"]), "runs_7d": int(row["runs_7d"]), "runs_30d": int(row["runs_30d"]), @@ -538,7 +705,7 @@ def get_admin_tool_analytics() -> dict: ], "daily_usage": [ { - "day": row["day"], + "day": str(row["day"]), "total": int(row["total"]), "completed": int(row["completed"]), "failed": int(row["failed"]), @@ -548,7 +715,9 @@ def get_admin_tool_analytics() -> dict: "common_errors": [ { "tool": row["tool"], - "error": _parse_metadata(row["metadata_json"]).get("error", "Unknown error"), + "error": _parse_metadata(row["metadata_json"]).get( + "error", "Unknown error" + ), "occurrences": int(row["occurrences"]), } for row in error_rows @@ -561,9 +730,19 @@ def get_admin_user_registration_stats() -> dict: cutoff_7d = (datetime.now(timezone.utc) - timedelta(days=7)).isoformat() cutoff_30d = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat() - with _connect() as conn: - totals_row = conn.execute( + with db_connection() as conn: + totals_sql = ( """ + SELECT + COUNT(*) AS total, + COALESCE(SUM(CASE WHEN created_at >= %s THEN 1 ELSE 0 END), 0) AS last_7d, + COALESCE(SUM(CASE WHEN created_at >= %s THEN 1 ELSE 0 END), 0) AS last_30d, + COALESCE(SUM(CASE WHEN plan = 'pro' THEN 1 ELSE 0 END), 0) AS pro_count, + COALESCE(SUM(CASE WHEN plan = 'free' THEN 1 ELSE 0 END), 0) AS free_count + FROM users + """ + if is_postgres() + else """ SELECT COUNT(*) AS total, COALESCE(SUM(CASE WHEN created_at >= ? THEN 1 ELSE 0 END), 0) AS last_7d, @@ -571,25 +750,32 @@ def get_admin_user_registration_stats() -> dict: COALESCE(SUM(CASE WHEN plan = 'pro' THEN 1 ELSE 0 END), 0) AS pro_count, COALESCE(SUM(CASE WHEN plan = 'free' THEN 1 ELSE 0 END), 0) AS free_count FROM users - """, - (cutoff_7d, cutoff_30d), - ).fetchone() + """ + ) + cursor = execute_query(conn, totals_sql, (cutoff_7d, cutoff_30d)) + totals_row = row_to_dict(cursor.fetchone()) - # Daily registrations for the last 30 days - daily_rows = conn.execute( + daily_sql = ( """ + SELECT created_at::date AS day, COUNT(*) AS registrations + FROM users + WHERE created_at >= %s + GROUP BY created_at::date + ORDER BY day ASC + """ + if is_postgres() + else """ SELECT DATE(created_at) AS day, COUNT(*) AS registrations FROM users WHERE created_at >= ? GROUP BY DATE(created_at) ORDER BY day ASC - """, - (cutoff_30d,), - ).fetchall() + """ + ) + cursor2 = execute_query(conn, daily_sql, (cutoff_30d,)) + daily_rows = [row_to_dict(r) for r in cursor2.fetchall()] - # Most active users (by task count) - active_rows = conn.execute( - """ + active_sql = """ SELECT users.id, users.email, @@ -601,8 +787,9 @@ def get_admin_user_registration_stats() -> dict: GROUP BY users.id ORDER BY total_tasks DESC LIMIT 10 - """ - ).fetchall() + """ + cursor3 = execute_query(conn, active_sql) + active_rows = [row_to_dict(r) for r in cursor3.fetchall()] return { "total_users": int(totals_row["total"]) if totals_row else 0, @@ -611,7 +798,7 @@ def get_admin_user_registration_stats() -> dict: "pro_users": int(totals_row["pro_count"]) if totals_row else 0, "free_users": int(totals_row["free_count"]) if totals_row else 0, "daily_registrations": [ - {"day": row["day"], "count": int(row["registrations"])} + {"day": str(row["day"]), "count": int(row["registrations"])} for row in daily_rows ], "most_active_users": [ @@ -634,23 +821,33 @@ def get_admin_system_health() -> dict: ai_cost_summary = get_monthly_spend() settings = get_openrouter_settings() - with _connect() as conn: - # Recent error rate (last 1h) + with db_connection() as conn: cutoff_1h = (datetime.now(timezone.utc) - timedelta(hours=1)).isoformat() - error_row = conn.execute( + error_sql = ( """ SELECT COUNT(*) AS total, COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0) AS failed FROM file_history + WHERE created_at >= %s + """ + if is_postgres() + else """ + SELECT + COUNT(*) AS total, + COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0) AS failed + FROM file_history WHERE created_at >= ? - """, - (cutoff_1h,), - ).fetchone() + """ + ) + cursor = execute_query(conn, error_sql, (cutoff_1h,)) + error_row = row_to_dict(cursor.fetchone()) - # DB size - db_path = current_app.config["DATABASE_PATH"] - db_size_mb = round(os.path.getsize(db_path) / (1024 * 1024), 2) if os.path.exists(db_path) else 0 + db_size_mb = 0 + if not is_postgres(): + db_path = current_app.config.get("DATABASE_PATH", "") + if db_path and os.path.exists(db_path): + db_size_mb = round(os.path.getsize(db_path) / (1024 * 1024), 2) error_total = int(error_row["total"]) if error_row else 0 error_failed = int(error_row["failed"]) if error_row else 0 @@ -659,8 +856,11 @@ def get_admin_system_health() -> dict: "ai_configured": bool(settings.api_key), "ai_model": settings.model, "ai_budget_used_percent": ai_cost_summary["budget_used_percent"], - "error_rate_1h": round((error_failed / error_total) * 100, 1) if error_total > 0 else 0, + "error_rate_1h": round((error_failed / error_total) * 100, 1) + if error_total > 0 + else 0, "tasks_last_1h": error_total, "failures_last_1h": error_failed, "database_size_mb": db_size_mb, - } \ No newline at end of file + "database_type": "postgresql" if is_postgres() else "sqlite", + } diff --git a/backend/app/services/ai_cost_service.py b/backend/app/services/ai_cost_service.py index 09fea43..229af08 100644 --- a/backend/app/services/ai_cost_service.py +++ b/backend/app/services/ai_cost_service.py @@ -1,31 +1,23 @@ -"""AI cost tracking service — monitors and limits AI API spending.""" +"""AI cost tracking service — monitors and limits AI API spending. + +Supports both SQLite (development) and PostgreSQL (production). +""" + import logging import os -import sqlite3 from datetime import datetime, timezone from flask import current_app +from app.utils.database import db_connection, execute_query, is_postgres, row_to_dict + logger = logging.getLogger(__name__) -# Monthly budget in USD — set via environment variable, default $50 AI_MONTHLY_BUDGET = float(os.getenv("AI_MONTHLY_BUDGET", "50.0")) - -# Estimated cost per 1K tokens (adjust based on your model) COST_PER_1K_INPUT_TOKENS = float(os.getenv("AI_COST_PER_1K_INPUT", "0.0")) COST_PER_1K_OUTPUT_TOKENS = float(os.getenv("AI_COST_PER_1K_OUTPUT", "0.0")) -def _connect() -> sqlite3.Connection: - db_path = current_app.config["DATABASE_PATH"] - db_dir = os.path.dirname(db_path) - if db_dir: - os.makedirs(db_dir, exist_ok=True) - connection = sqlite3.connect(db_path) - connection.row_factory = sqlite3.Row - return connection - - def _utc_now() -> str: return datetime.now(timezone.utc).isoformat() @@ -36,24 +28,43 @@ def _current_month() -> str: def init_ai_cost_db(): """Create AI cost tracking table if not exists.""" - with _connect() as conn: - conn.executescript( - """ - CREATE TABLE IF NOT EXISTS ai_cost_log ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - tool TEXT NOT NULL, - model TEXT NOT NULL, - input_tokens INTEGER DEFAULT 0, - output_tokens INTEGER DEFAULT 0, - estimated_cost_usd REAL DEFAULT 0.0, - period_month TEXT NOT NULL, - created_at TEXT NOT NULL - ); + with db_connection() as conn: + if is_postgres(): + cursor = conn.cursor() + cursor.execute(""" + CREATE TABLE IF NOT EXISTS ai_cost_log ( + id SERIAL PRIMARY KEY, + tool TEXT NOT NULL, + model TEXT NOT NULL, + input_tokens INTEGER DEFAULT 0, + output_tokens INTEGER DEFAULT 0, + estimated_cost_usd REAL DEFAULT 0.0, + period_month TEXT NOT NULL, + created_at TEXT NOT NULL + ) + """) + cursor.execute(""" + CREATE INDEX IF NOT EXISTS idx_ai_cost_period + ON ai_cost_log(period_month) + """) + else: + conn.executescript( + """ + CREATE TABLE IF NOT EXISTS ai_cost_log ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + tool TEXT NOT NULL, + model TEXT NOT NULL, + input_tokens INTEGER DEFAULT 0, + output_tokens INTEGER DEFAULT 0, + estimated_cost_usd REAL DEFAULT 0.0, + period_month TEXT NOT NULL, + created_at TEXT NOT NULL + ); - CREATE INDEX IF NOT EXISTS idx_ai_cost_period - ON ai_cost_log(period_month); - """ - ) + CREATE INDEX IF NOT EXISTS idx_ai_cost_period + ON ai_cost_log(period_month); + """ + ) def log_ai_usage( @@ -63,22 +74,41 @@ def log_ai_usage( output_tokens: int = 0, ) -> None: """Log an AI API call with token usage.""" - estimated_cost = ( - (input_tokens / 1000.0) * COST_PER_1K_INPUT_TOKENS - + (output_tokens / 1000.0) * COST_PER_1K_OUTPUT_TOKENS - ) + estimated_cost = (input_tokens / 1000.0) * COST_PER_1K_INPUT_TOKENS + ( + output_tokens / 1000.0 + ) * COST_PER_1K_OUTPUT_TOKENS - with _connect() as conn: - conn.execute( + with db_connection() as conn: + sql = ( """INSERT INTO ai_cost_log - (tool, model, input_tokens, output_tokens, estimated_cost_usd, period_month, created_at) - VALUES (?, ?, ?, ?, ?, ?, ?)""", - (tool, model, input_tokens, output_tokens, estimated_cost, _current_month(), _utc_now()), + (tool, model, input_tokens, output_tokens, estimated_cost_usd, period_month, created_at) + VALUES (%s, %s, %s, %s, %s, %s, %s)""" + if is_postgres() + else """INSERT INTO ai_cost_log + (tool, model, input_tokens, output_tokens, estimated_cost_usd, period_month, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?)""" + ) + execute_query( + conn, + sql, + ( + tool, + model, + input_tokens, + output_tokens, + estimated_cost, + _current_month(), + _utc_now(), + ), ) logger.info( "AI usage: tool=%s model=%s in=%d out=%d cost=$%.4f", - tool, model, input_tokens, output_tokens, estimated_cost, + tool, + model, + input_tokens, + output_tokens, + estimated_cost, ) @@ -86,17 +116,26 @@ def get_monthly_spend() -> dict: """Get the current month's AI spending summary.""" month = _current_month() - with _connect() as conn: - row = conn.execute( + with db_connection() as conn: + sql = ( """SELECT - COUNT(*) as total_calls, - COALESCE(SUM(input_tokens), 0) as total_input_tokens, - COALESCE(SUM(output_tokens), 0) as total_output_tokens, - COALESCE(SUM(estimated_cost_usd), 0.0) as total_cost - FROM ai_cost_log - WHERE period_month = ?""", - (month,), - ).fetchone() + COUNT(*) as total_calls, + COALESCE(SUM(input_tokens), 0) as total_input_tokens, + COALESCE(SUM(output_tokens), 0) as total_output_tokens, + COALESCE(SUM(estimated_cost_usd), 0.0) as total_cost + FROM ai_cost_log + WHERE period_month = %s""" + if is_postgres() + else """SELECT + COUNT(*) as total_calls, + COALESCE(SUM(input_tokens), 0) as total_input_tokens, + COALESCE(SUM(output_tokens), 0) as total_output_tokens, + COALESCE(SUM(estimated_cost_usd), 0.0) as total_cost + FROM ai_cost_log + WHERE period_month = ?""" + ) + cursor = execute_query(conn, sql, (month,)) + row = row_to_dict(cursor.fetchone()) return { "period": month, @@ -107,7 +146,10 @@ def get_monthly_spend() -> dict: "budget_usd": AI_MONTHLY_BUDGET, "budget_remaining_usd": round(AI_MONTHLY_BUDGET - row["total_cost"], 4), "budget_used_percent": round( - (row["total_cost"] / AI_MONTHLY_BUDGET * 100) if AI_MONTHLY_BUDGET > 0 else 0, 1 + (row["total_cost"] / AI_MONTHLY_BUDGET * 100) + if AI_MONTHLY_BUDGET > 0 + else 0, + 1, ), } @@ -128,4 +170,5 @@ def check_ai_budget() -> None: class AiBudgetExceededError(Exception): """Raised when the monthly AI budget is exceeded.""" + pass diff --git a/backend/app/services/contact_service.py b/backend/app/services/contact_service.py index 609ffb2..c9cc876 100644 --- a/backend/app/services/contact_service.py +++ b/backend/app/services/contact_service.py @@ -1,71 +1,86 @@ -"""Contact form service — stores messages and sends notification emails.""" +"""Contact form service — stores messages and sends notification emails. + +Supports both SQLite (development) and PostgreSQL (production). +""" + import logging -import os -import sqlite3 from datetime import datetime, timezone from flask import current_app from app.services.email_service import send_email +from app.utils.database import db_connection, execute_query, is_postgres, row_to_dict logger = logging.getLogger(__name__) VALID_CATEGORIES = {"general", "bug", "feature"} -def _connect() -> sqlite3.Connection: - db_path = current_app.config["DATABASE_PATH"] - db_dir = os.path.dirname(db_path) - if db_dir: - os.makedirs(db_dir, exist_ok=True) - conn = sqlite3.connect(db_path) - conn.row_factory = sqlite3.Row - return conn - - def init_contact_db() -> None: """Create the contact_messages table if it doesn't exist.""" - conn = _connect() - try: - conn.execute(""" - CREATE TABLE IF NOT EXISTS contact_messages ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT NOT NULL, - email TEXT NOT NULL, - category TEXT NOT NULL DEFAULT 'general', - subject TEXT NOT NULL, - message TEXT NOT NULL, - created_at TEXT NOT NULL, - is_read INTEGER NOT NULL DEFAULT 0 - ) - """) - conn.commit() - finally: - conn.close() + with db_connection() as conn: + if is_postgres(): + cursor = conn.cursor() + cursor.execute(""" + CREATE TABLE IF NOT EXISTS contact_messages ( + id SERIAL PRIMARY KEY, + name TEXT NOT NULL, + email TEXT NOT NULL, + category TEXT NOT NULL DEFAULT 'general', + subject TEXT NOT NULL, + message TEXT NOT NULL, + created_at TEXT NOT NULL, + is_read BOOLEAN NOT NULL DEFAULT FALSE + ) + """) + else: + conn.execute(""" + CREATE TABLE IF NOT EXISTS contact_messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL, + email TEXT NOT NULL, + category TEXT NOT NULL DEFAULT 'general', + subject TEXT NOT NULL, + message TEXT NOT NULL, + created_at TEXT NOT NULL, + is_read INTEGER NOT NULL DEFAULT 0 + ) + """) -def save_message(name: str, email: str, category: str, subject: str, message: str) -> dict: +def save_message( + name: str, email: str, category: str, subject: str, message: str +) -> dict: """Persist a contact message and send a notification email.""" if category not in VALID_CATEGORIES: category = "general" now = datetime.now(timezone.utc).isoformat() - conn = _connect() - try: - cursor = conn.execute( - """INSERT INTO contact_messages (name, email, category, subject, message, created_at) - VALUES (?, ?, ?, ?, ?, ?)""", - (name, email, category, subject, message, now), - ) - conn.commit() - msg_id = cursor.lastrowid - finally: - conn.close() - # Send notification email to admin + with db_connection() as conn: + sql = ( + """INSERT INTO contact_messages (name, email, category, subject, message, created_at) + VALUES (%s, %s, %s, %s, %s, %s) + RETURNING id""" + if is_postgres() + else """INSERT INTO contact_messages (name, email, category, subject, message, created_at) + VALUES (?, ?, ?, ?, ?, ?)""" + ) + cursor = execute_query( + conn, sql, (name, email, category, subject, message, now) + ) + + if is_postgres(): + result = cursor.fetchone() + msg_id = result["id"] if result else None + else: + msg_id = cursor.lastrowid + admin_emails = tuple(current_app.config.get("INTERNAL_ADMIN_EMAILS", ())) - admin_email = admin_emails[0] if admin_emails else current_app.config.get( - "SMTP_FROM", "noreply@dociva.io" + admin_email = ( + admin_emails[0] + if admin_emails + else current_app.config.get("SMTP_FROM", "noreply@dociva.io") ) try: send_email( @@ -89,16 +104,19 @@ def save_message(name: str, email: str, category: str, subject: str, message: st def get_messages(page: int = 1, per_page: int = 20) -> dict: """Retrieve paginated contact messages (admin use).""" offset = (page - 1) * per_page - conn = _connect() - try: - total = conn.execute("SELECT COUNT(*) FROM contact_messages").fetchone()[0] - rows = conn.execute( - "SELECT * FROM contact_messages ORDER BY created_at DESC LIMIT ? OFFSET ?", - (per_page, offset), - ).fetchall() - messages = [dict(r) for r in rows] - finally: - conn.close() + + with db_connection() as conn: + cursor = execute_query(conn, "SELECT COUNT(*) FROM contact_messages") + total = cursor.fetchone()[0] + + sql = ( + """SELECT * FROM contact_messages ORDER BY created_at DESC LIMIT %s OFFSET %s""" + if is_postgres() + else """SELECT * FROM contact_messages ORDER BY created_at DESC LIMIT ? OFFSET ?""" + ) + cursor2 = execute_query(conn, sql, (per_page, offset)) + rows = cursor2.fetchall() + messages = [row_to_dict(r) for r in rows] return { "messages": messages, @@ -110,13 +128,11 @@ def get_messages(page: int = 1, per_page: int = 20) -> dict: def mark_read(message_id: int) -> bool: """Mark a contact message as read.""" - conn = _connect() - try: - result = conn.execute( - "UPDATE contact_messages SET is_read = 1 WHERE id = ?", - (message_id,), + with db_connection() as conn: + sql = ( + "UPDATE contact_messages SET is_read = TRUE WHERE id = %s" + if is_postgres() + else "UPDATE contact_messages SET is_read = 1 WHERE id = ?" ) - conn.commit() - return result.rowcount > 0 - finally: - conn.close() + cursor = execute_query(conn, sql, (message_id,)) + return cursor.rowcount > 0 diff --git a/backend/app/services/quota_service.py b/backend/app/services/quota_service.py index 3b27a63..4229f74 100644 --- a/backend/app/services/quota_service.py +++ b/backend/app/services/quota_service.py @@ -1,24 +1,14 @@ """ QuotaService Manages usage quotas and limits for Free, Pro, and Business tiers - -Quota Structure: -- Free: 5 conversions/day, 10MB max file size, no batch processing -- Pro: 100 conversions/day, 100MB max file size, batch processing (5 files) -- Business: Unlimited conversions, 500MB max file size, batch processing (20 files) - -Tracks: -- Daily usage (resets at UTC midnight) -- Storage usage -- API rate limits -- Feature access (premium features) """ import logging from datetime import datetime, timedelta from typing import Dict, Optional, Tuple from flask import current_app -from app.services.account_service import _connect, _utc_now +from app.utils.database import db_connection, execute_query, is_postgres, row_to_dict +from app.services.account_service import _utc_now logger = logging.getLogger(__name__) @@ -26,49 +16,42 @@ logger = logging.getLogger(__name__) class QuotaLimits: """Define quota limits for each tier""" - # Conversions per day CONVERSIONS_PER_DAY = { "free": 5, "pro": 100, "business": float("inf"), } - # Maximum file size in MB MAX_FILE_SIZE_MB = { "free": 10, "pro": 100, "business": 500, } - # Storage limit in MB (monthly) STORAGE_LIMIT_MB = { "free": 500, "pro": 5000, "business": float("inf"), } - # API rate limit (requests per minute) API_RATE_LIMIT = { "free": 10, "pro": 60, "business": float("inf"), } - # Concurrent processing jobs CONCURRENT_JOBS = { "free": 1, "pro": 3, "business": 10, } - # Batch file limit BATCH_FILE_LIMIT = { "free": 1, "pro": 5, "business": 20, } - # Premium features (Pro/Business) PREMIUM_FEATURES = { "free": set(), "pro": {"batch_processing", "priority_queue", "email_delivery", "api_access"}, @@ -89,67 +72,100 @@ class QuotaService: @staticmethod def _ensure_quota_tables(): """Create quota tracking tables if they don't exist""" - conn = _connect() - try: - # Daily usage tracking - conn.execute(""" - CREATE TABLE IF NOT EXISTS daily_usage ( - id INTEGER PRIMARY KEY, - user_id INTEGER NOT NULL, - date TEXT NOT NULL, - conversions INTEGER DEFAULT 0, - files_processed INTEGER DEFAULT 0, - total_size_mb REAL DEFAULT 0, - created_at TEXT DEFAULT CURRENT_TIMESTAMP, - UNIQUE(user_id, date), - FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE - ) - """) + with db_connection() as conn: + if is_postgres(): + cursor = conn.cursor() + cursor.execute(""" + CREATE TABLE IF NOT EXISTS daily_usage ( + id SERIAL PRIMARY KEY, + user_id INTEGER NOT NULL, + date TEXT NOT NULL, + conversions INTEGER DEFAULT 0, + files_processed INTEGER DEFAULT 0, + total_size_mb REAL DEFAULT 0, + created_at TEXT DEFAULT CURRENT_TIMESTAMP, + UNIQUE(user_id, date), + FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE + ) + """) + cursor.execute(""" + CREATE TABLE IF NOT EXISTS storage_usage ( + id SERIAL PRIMARY KEY, + user_id INTEGER NOT NULL, + month TEXT NOT NULL, + total_size_mb REAL DEFAULT 0, + file_count INTEGER DEFAULT 0, + created_at TEXT DEFAULT CURRENT_TIMESTAMP, + UNIQUE(user_id, month), + FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE + ) + """) + cursor.execute(""" + CREATE TABLE IF NOT EXISTS api_requests ( + id SERIAL PRIMARY KEY, + user_id INTEGER NOT NULL, + endpoint TEXT NOT NULL, + timestamp TEXT DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE + ) + """) + cursor.execute(""" + CREATE TABLE IF NOT EXISTS feature_access ( + id SERIAL PRIMARY KEY, + user_id INTEGER NOT NULL, + feature TEXT NOT NULL, + accessed_at TEXT DEFAULT CURRENT_TIMESTAMP, + allowed BOOLEAN DEFAULT TRUE, + FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE + ) + """) + else: + conn.execute(""" + CREATE TABLE IF NOT EXISTS daily_usage ( + id INTEGER PRIMARY KEY, + user_id INTEGER NOT NULL, + date TEXT NOT NULL, + conversions INTEGER DEFAULT 0, + files_processed INTEGER DEFAULT 0, + total_size_mb REAL DEFAULT 0, + created_at TEXT DEFAULT CURRENT_TIMESTAMP, + UNIQUE(user_id, date), + FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE + ) + """) + conn.execute(""" + CREATE TABLE IF NOT EXISTS storage_usage ( + id INTEGER PRIMARY KEY, + user_id INTEGER NOT NULL, + month TEXT NOT NULL, + total_size_mb REAL DEFAULT 0, + file_count INTEGER DEFAULT 0, + created_at TEXT DEFAULT CURRENT_TIMESTAMP, + UNIQUE(user_id, month), + FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE + ) + """) + conn.execute(""" + CREATE TABLE IF NOT EXISTS api_requests ( + id INTEGER PRIMARY KEY, + user_id INTEGER NOT NULL, + endpoint TEXT NOT NULL, + timestamp TEXT DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE + ) + """) + conn.execute(""" + CREATE TABLE IF NOT EXISTS feature_access ( + id INTEGER PRIMARY KEY, + user_id INTEGER NOT NULL, + feature TEXT NOT NULL, + accessed_at TEXT DEFAULT CURRENT_TIMESTAMP, + allowed BOOLEAN DEFAULT 1, + FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE + ) + """) - # Storage usage tracking - conn.execute(""" - CREATE TABLE IF NOT EXISTS storage_usage ( - id INTEGER PRIMARY KEY, - user_id INTEGER NOT NULL, - month TEXT NOT NULL, - total_size_mb REAL DEFAULT 0, - file_count INTEGER DEFAULT 0, - created_at TEXT DEFAULT CURRENT_TIMESTAMP, - UNIQUE(user_id, month), - FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE - ) - """) - - # API rate limiting - conn.execute(""" - CREATE TABLE IF NOT EXISTS api_requests ( - id INTEGER PRIMARY KEY, - user_id INTEGER NOT NULL, - endpoint TEXT NOT NULL, - timestamp TEXT DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE - ) - """) - - # Feature access log - conn.execute(""" - CREATE TABLE IF NOT EXISTS feature_access ( - id INTEGER PRIMARY KEY, - user_id INTEGER NOT NULL, - feature TEXT NOT NULL, - accessed_at TEXT DEFAULT CURRENT_TIMESTAMP, - allowed BOOLEAN DEFAULT 1, - FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE - ) - """) - - conn.commit() logger.info("Quota tables initialized") - except Exception as e: - logger.error(f"Failed to create quota tables: {e}") - raise - finally: - conn.close() @staticmethod def init_quota_db(): @@ -159,36 +175,29 @@ class QuotaService: @staticmethod def get_user_plan(user_id: int) -> str: """Get user's current plan""" - conn = _connect() - try: - row = conn.execute( - "SELECT plan FROM users WHERE id = ?", (user_id,) - ).fetchone() + with db_connection() as conn: + sql = ( + "SELECT plan FROM users WHERE id = %s" + if is_postgres() + else "SELECT plan FROM users WHERE id = ?" + ) + cursor = execute_query(conn, sql, (user_id,)) + row = row_to_dict(cursor.fetchone()) return row["plan"] if row else "free" - finally: - conn.close() @staticmethod def get_daily_usage(user_id: int, date: Optional[str] = None) -> Dict: - """ - Get daily usage for a user - - Args: - user_id: User ID - date: Date in YYYY-MM-DD format (defaults to today) - - Returns: - Usage stats dict - """ if not date: date = datetime.utcnow().strftime("%Y-%m-%d") - conn = _connect() - try: - row = conn.execute( - "SELECT * FROM daily_usage WHERE user_id = ? AND date = ?", - (user_id, date), - ).fetchone() + with db_connection() as conn: + sql = ( + "SELECT * FROM daily_usage WHERE user_id = %s AND date = %s" + if is_postgres() + else "SELECT * FROM daily_usage WHERE user_id = ? AND date = ?" + ) + cursor = execute_query(conn, sql, (user_id, date)) + row = row_to_dict(cursor.fetchone()) if not row: return { @@ -196,33 +205,21 @@ class QuotaService: "files_processed": 0, "total_size_mb": 0, } - - return dict(row) - finally: - conn.close() + return row @staticmethod def record_conversion(user_id: int, file_size_mb: float) -> bool: - """ - Record a file conversion - - Args: - user_id: User ID - file_size_mb: File size in MB - - Returns: - True if allowed, False if quota exceeded - """ plan = QuotaService.get_user_plan(user_id) today = datetime.utcnow().strftime("%Y-%m-%d") - conn = _connect() - try: - # Check daily conversion limit - usage = conn.execute( - "SELECT conversions FROM daily_usage WHERE user_id = ? AND date = ?", - (user_id, today), - ).fetchone() + with db_connection() as conn: + sql = ( + "SELECT conversions FROM daily_usage WHERE user_id = %s AND date = %s" + if is_postgres() + else "SELECT conversions FROM daily_usage WHERE user_id = ? AND date = ?" + ) + cursor = execute_query(conn, sql, (user_id, today)) + usage = row_to_dict(cursor.fetchone()) current_conversions = usage["conversions"] if usage else 0 limit = QuotaLimits.CONVERSIONS_PER_DAY[plan] @@ -231,7 +228,6 @@ class QuotaService: logger.warning(f"User {user_id} exceeded daily conversion limit") return False - # Check file size limit max_size = QuotaLimits.MAX_FILE_SIZE_MB[plan] if file_size_mb > max_size: logger.warning( @@ -239,120 +235,95 @@ class QuotaService: ) return False - # Record the conversion - conn.execute( + upsert_sql = ( """ INSERT INTO daily_usage (user_id, date, conversions, files_processed, total_size_mb) + VALUES (%s, %s, 1, 1, %s) + ON CONFLICT(user_id, date) DO UPDATE SET + conversions = conversions + 1, + files_processed = files_processed + 1, + total_size_mb = total_size_mb + %s + """ + if is_postgres() + else """ + INSERT INTO daily_usage (user_id, date, conversions, files_processed, total_size_mb) VALUES (?, ?, 1, 1, ?) ON CONFLICT(user_id, date) DO UPDATE SET conversions = conversions + 1, files_processed = files_processed + 1, total_size_mb = total_size_mb + ? - """, - (user_id, today, file_size_mb, file_size_mb), + """ + ) + execute_query( + conn, upsert_sql, (user_id, today, file_size_mb, file_size_mb) ) - conn.commit() logger.info(f"Recorded conversion for user {user_id}: {file_size_mb}MB") return True - except Exception as e: - logger.error(f"Failed to record conversion: {e}") - return False - finally: - conn.close() @staticmethod def check_rate_limit(user_id: int) -> Tuple[bool, int]: - """ - Check if user has exceeded API rate limit - - Args: - user_id: User ID - - Returns: - (allowed, remaining_requests_in_window) tuple - """ plan = QuotaService.get_user_plan(user_id) limit = QuotaLimits.API_RATE_LIMIT[plan] if limit == float("inf"): - return True, -1 # Unlimited + return True, -1 - # Check requests in last minute one_minute_ago = (datetime.utcnow() - timedelta(minutes=1)).isoformat() - conn = _connect() - try: - count = conn.execute( - "SELECT COUNT(*) as count FROM api_requests WHERE user_id = ? AND timestamp > ?", - (user_id, one_minute_ago), - ).fetchone()["count"] + with db_connection() as conn: + sql = ( + "SELECT COUNT(*) as count FROM api_requests WHERE user_id = %s AND timestamp > %s" + if is_postgres() + else "SELECT COUNT(*) as count FROM api_requests WHERE user_id = ? AND timestamp > ?" + ) + cursor = execute_query(conn, sql, (user_id, one_minute_ago)) + row = row_to_dict(cursor.fetchone()) + count = row["count"] if row else 0 if count >= limit: return False, 0 - # Record this request - conn.execute( - "INSERT INTO api_requests (user_id, endpoint) VALUES (?, ?)", - (user_id, "api"), + sql2 = ( + "INSERT INTO api_requests (user_id, endpoint) VALUES (%s, %s)" + if is_postgres() + else "INSERT INTO api_requests (user_id, endpoint) VALUES (?, ?)" ) - conn.commit() + execute_query(conn, sql2, (user_id, "api")) return True, limit - count - 1 - finally: - conn.close() @staticmethod def has_feature(user_id: int, feature: str) -> bool: - """ - Check if user has access to a premium feature - - Args: - user_id: User ID - feature: Feature name (e.g., 'batch_processing') - - Returns: - True if user can access feature - """ plan = QuotaService.get_user_plan(user_id) allowed = feature in QuotaLimits.PREMIUM_FEATURES[plan] - # Log feature access attempt - conn = _connect() try: - conn.execute( - "INSERT INTO feature_access (user_id, feature, allowed) VALUES (?, ?, ?)", - (user_id, feature, allowed), - ) - conn.commit() + with db_connection() as conn: + sql = ( + "INSERT INTO feature_access (user_id, feature, allowed) VALUES (%s, %s, %s)" + if is_postgres() + else "INSERT INTO feature_access (user_id, feature, allowed) VALUES (?, ?, ?)" + ) + execute_query(conn, sql, (user_id, feature, allowed)) except Exception as e: logger.error(f"Failed to log feature access: {e}") - finally: - conn.close() return allowed @staticmethod def get_quota_status(user_id: int) -> Dict: - """ - Get comprehensive quota status for a user - - Args: - user_id: User ID - - Returns: - Complete quota status dict - """ plan = QuotaService.get_user_plan(user_id) today = datetime.utcnow().strftime("%Y-%m-%d") - conn = _connect() - try: - # Get daily usage - daily = conn.execute( - "SELECT conversions FROM daily_usage WHERE user_id = ? AND date = ?", - (user_id, today), - ).fetchone() + with db_connection() as conn: + sql = ( + "SELECT conversions FROM daily_usage WHERE user_id = %s AND date = %s" + if is_postgres() + else "SELECT conversions FROM daily_usage WHERE user_id = ? AND date = ?" + ) + cursor = execute_query(conn, sql, (user_id, today)) + daily = row_to_dict(cursor.fetchone()) conversions_used = daily["conversions"] if daily else 0 conversions_limit = QuotaLimits.CONVERSIONS_PER_DAY[plan] @@ -383,63 +354,42 @@ class QuotaService: user_id, "email_delivery" ), } - finally: - conn.close() @staticmethod def get_monthly_storage_usage(user_id: int, year: int, month: int) -> float: - """Get storage usage for a specific month""" month_key = f"{year}-{month:02d}" - conn = _connect() - try: - row = conn.execute( - "SELECT total_size_mb FROM storage_usage WHERE user_id = ? AND month = ?", - (user_id, month_key), - ).fetchone() - + with db_connection() as conn: + sql = ( + "SELECT total_size_mb FROM storage_usage WHERE user_id = %s AND month = %s" + if is_postgres() + else "SELECT total_size_mb FROM storage_usage WHERE user_id = ? AND month = ?" + ) + cursor = execute_query(conn, sql, (user_id, month_key)) + row = row_to_dict(cursor.fetchone()) return row["total_size_mb"] if row else 0 - finally: - conn.close() @staticmethod def upgrade_plan(user_id: int, new_plan: str) -> bool: - """ - Upgrade user to a new plan - - Args: - user_id: User ID - new_plan: New plan ('pro' or 'business') - - Returns: - Success status - """ if new_plan not in QuotaLimits.CONVERSIONS_PER_DAY: logger.error(f"Invalid plan: {new_plan}") return False - conn = _connect() - try: - conn.execute( - "UPDATE users SET plan = ?, updated_at = ? WHERE id = ?", - (new_plan, _utc_now(), user_id), + with db_connection() as conn: + sql = ( + "UPDATE users SET plan = %s, updated_at = %s WHERE id = %s" + if is_postgres() + else "UPDATE users SET plan = ?, updated_at = ? WHERE id = ?" ) - conn.commit() + execute_query(conn, sql, (new_plan, _utc_now(), user_id)) logger.info(f"User {user_id} upgraded to {new_plan}") return True - except Exception as e: - logger.error(f"Failed to upgrade user plan: {e}") - return False - finally: - conn.close() @staticmethod def downgrade_plan(user_id: int, new_plan: str = "free") -> bool: - """Downgrade user to a lower plan""" return QuotaService.upgrade_plan(user_id, new_plan) -# Convenience functions def init_quota_db(): return QuotaService.init_quota_db() diff --git a/backend/app/services/rating_service.py b/backend/app/services/rating_service.py index 2afe05e..4a2b9bd 100644 --- a/backend/app/services/rating_service.py +++ b/backend/app/services/rating_service.py @@ -1,51 +1,64 @@ -"""Rating service — stores and aggregates user ratings per tool.""" +"""Rating service — stores and aggregates user ratings per tool. + +Supports both SQLite (development) and PostgreSQL (production). +""" + import logging -import os -import sqlite3 from datetime import datetime, timezone -from flask import current_app +from app.utils.database import db_connection, execute_query, is_postgres, row_to_dict logger = logging.getLogger(__name__) -def _connect() -> sqlite3.Connection: - """Create a SQLite connection.""" - db_path = current_app.config["DATABASE_PATH"] - db_dir = os.path.dirname(db_path) - if db_dir: - os.makedirs(db_dir, exist_ok=True) - connection = sqlite3.connect(db_path) - connection.row_factory = sqlite3.Row - return connection - - def _utc_now() -> str: return datetime.now(timezone.utc).isoformat() def init_ratings_db(): """Create ratings table if it does not exist.""" - with _connect() as conn: - conn.executescript( - """ - CREATE TABLE IF NOT EXISTS tool_ratings ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - tool TEXT NOT NULL, - rating INTEGER NOT NULL CHECK(rating BETWEEN 1 AND 5), - feedback TEXT DEFAULT '', - tag TEXT DEFAULT '', - fingerprint TEXT NOT NULL, - created_at TEXT NOT NULL - ); + with db_connection() as conn: + if is_postgres(): + cursor = conn.cursor() + cursor.execute(""" + CREATE TABLE IF NOT EXISTS tool_ratings ( + id SERIAL PRIMARY KEY, + tool TEXT NOT NULL, + rating INTEGER NOT NULL CHECK(rating BETWEEN 1 AND 5), + feedback TEXT DEFAULT '', + tag TEXT DEFAULT '', + fingerprint TEXT NOT NULL, + created_at TEXT NOT NULL + ) + """) + cursor.execute(""" + CREATE INDEX IF NOT EXISTS idx_tool_ratings_tool + ON tool_ratings(tool) + """) + cursor.execute(""" + CREATE INDEX IF NOT EXISTS idx_tool_ratings_fingerprint_tool + ON tool_ratings(fingerprint, tool) + """) + else: + conn.executescript( + """ + CREATE TABLE IF NOT EXISTS tool_ratings ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + tool TEXT NOT NULL, + rating INTEGER NOT NULL CHECK(rating BETWEEN 1 AND 5), + feedback TEXT DEFAULT '', + tag TEXT DEFAULT '', + fingerprint TEXT NOT NULL, + created_at TEXT NOT NULL + ); - CREATE INDEX IF NOT EXISTS idx_tool_ratings_tool - ON tool_ratings(tool); + CREATE INDEX IF NOT EXISTS idx_tool_ratings_tool + ON tool_ratings(tool); - CREATE INDEX IF NOT EXISTS idx_tool_ratings_fingerprint_tool - ON tool_ratings(fingerprint, tool); - """ - ) + CREATE INDEX IF NOT EXISTS idx_tool_ratings_fingerprint_tool + ON tool_ratings(fingerprint, tool); + """ + ) def submit_rating( @@ -57,48 +70,75 @@ def submit_rating( ) -> None: """Store a rating. Limits one rating per fingerprint per tool per day.""" now = _utc_now() - today = now[:10] # YYYY-MM-DD + today = now[:10] - with _connect() as conn: - # Check for duplicate rating from same fingerprint today - existing = conn.execute( - """SELECT id FROM tool_ratings - WHERE fingerprint = ? AND tool = ? AND created_at LIKE ? - LIMIT 1""", - (fingerprint, tool, f"{today}%"), - ).fetchone() + with db_connection() as conn: + like_sql = "LIKE %s" if is_postgres() else "LIKE ?" + sql = ( + f"""SELECT id FROM tool_ratings + WHERE fingerprint = %s AND tool = %s AND created_at {like_sql} + LIMIT 1""" + if is_postgres() + else f"""SELECT id FROM tool_ratings + WHERE fingerprint = ? AND tool = ? AND created_at {like_sql} + LIMIT 1""" + ) + cursor = execute_query(conn, sql, (fingerprint, tool, f"{today}%")) + existing = cursor.fetchone() if existing: - # Update existing rating instead of creating duplicate - conn.execute( + existing = row_to_dict(existing) + update_sql = ( """UPDATE tool_ratings - SET rating = ?, feedback = ?, tag = ?, created_at = ? - WHERE id = ?""", - (rating, feedback, tag, now, existing["id"]), + SET rating = %s, feedback = %s, tag = %s, created_at = %s + WHERE id = %s""" + if is_postgres() + else """UPDATE tool_ratings + SET rating = ?, feedback = ?, tag = ?, created_at = ? + WHERE id = ?""" + ) + execute_query( + conn, update_sql, (rating, feedback, tag, now, existing["id"]) ) else: - conn.execute( + insert_sql = ( """INSERT INTO tool_ratings (tool, rating, feedback, tag, fingerprint, created_at) - VALUES (?, ?, ?, ?, ?, ?)""", - (tool, rating, feedback, tag, fingerprint, now), + VALUES (%s, %s, %s, %s, %s, %s)""" + if is_postgres() + else """INSERT INTO tool_ratings (tool, rating, feedback, tag, fingerprint, created_at) + VALUES (?, ?, ?, ?, ?, ?)""" + ) + execute_query( + conn, insert_sql, (tool, rating, feedback, tag, fingerprint, now) ) def get_tool_rating_summary(tool: str) -> dict: """Return aggregate rating data for one tool.""" - with _connect() as conn: - row = conn.execute( + with db_connection() as conn: + sql = ( """SELECT - COUNT(*) as count, - COALESCE(AVG(rating), 0) as average, - COALESCE(SUM(CASE WHEN rating = 5 THEN 1 ELSE 0 END), 0) as star5, - COALESCE(SUM(CASE WHEN rating = 4 THEN 1 ELSE 0 END), 0) as star4, - COALESCE(SUM(CASE WHEN rating = 3 THEN 1 ELSE 0 END), 0) as star3, - COALESCE(SUM(CASE WHEN rating = 2 THEN 1 ELSE 0 END), 0) as star2, - COALESCE(SUM(CASE WHEN rating = 1 THEN 1 ELSE 0 END), 0) as star1 - FROM tool_ratings WHERE tool = ?""", - (tool,), - ).fetchone() + COUNT(*) as count, + COALESCE(AVG(rating), 0) as average, + COALESCE(SUM(CASE WHEN rating = 5 THEN 1 ELSE 0 END), 0) as star5, + COALESCE(SUM(CASE WHEN rating = 4 THEN 1 ELSE 0 END), 0) as star4, + COALESCE(SUM(CASE WHEN rating = 3 THEN 1 ELSE 0 END), 0) as star3, + COALESCE(SUM(CASE WHEN rating = 2 THEN 1 ELSE 0 END), 0) as star2, + COALESCE(SUM(CASE WHEN rating = 1 THEN 1 ELSE 0 END), 0) as star1 + FROM tool_ratings WHERE tool = %s""" + if is_postgres() + else """SELECT + COUNT(*) as count, + COALESCE(AVG(rating), 0) as average, + COALESCE(SUM(CASE WHEN rating = 5 THEN 1 ELSE 0 END), 0) as star5, + COALESCE(SUM(CASE WHEN rating = 4 THEN 1 ELSE 0 END), 0) as star4, + COALESCE(SUM(CASE WHEN rating = 3 THEN 1 ELSE 0 END), 0) as star3, + COALESCE(SUM(CASE WHEN rating = 2 THEN 1 ELSE 0 END), 0) as star2, + COALESCE(SUM(CASE WHEN rating = 1 THEN 1 ELSE 0 END), 0) as star1 + FROM tool_ratings WHERE tool = ?""" + ) + cursor = execute_query(conn, sql, (tool,)) + row = row_to_dict(cursor.fetchone()) return { "tool": tool, @@ -116,16 +156,16 @@ def get_tool_rating_summary(tool: str) -> dict: def get_all_ratings_summary() -> list[dict]: """Return aggregated ratings for all tools that have at least one rating.""" - with _connect() as conn: - rows = conn.execute( - """SELECT - tool, - COUNT(*) as count, - COALESCE(AVG(rating), 0) as average - FROM tool_ratings - GROUP BY tool - ORDER BY count DESC""" - ).fetchall() + with db_connection() as conn: + sql = """SELECT + tool, + COUNT(*) as count, + COALESCE(AVG(rating), 0) as average + FROM tool_ratings + GROUP BY tool + ORDER BY count DESC""" + cursor = execute_query(conn, sql) + rows = [row_to_dict(r) for r in cursor.fetchall()] return [ { @@ -139,15 +179,15 @@ def get_all_ratings_summary() -> list[dict]: def get_global_rating_summary() -> dict: """Return aggregate rating stats across all rated tools.""" - with _connect() as conn: - row = conn.execute( - """ + with db_connection() as conn: + sql = """ SELECT COUNT(*) AS count, COALESCE(AVG(rating), 0) AS average FROM tool_ratings - """ - ).fetchone() + """ + cursor = execute_query(conn, sql) + row = row_to_dict(cursor.fetchone()) return { "rating_count": int(row["count"]) if row else 0, diff --git a/backend/app/services/site_assistant_service.py b/backend/app/services/site_assistant_service.py index 75d54fa..8bfd81b 100644 --- a/backend/app/services/site_assistant_service.py +++ b/backend/app/services/site_assistant_service.py @@ -1,19 +1,25 @@ -"""Site assistant service — page-aware AI help plus persistent conversation logging.""" +"""Site assistant service — page-aware AI help plus persistent conversation logging. + +Supports both SQLite (development) and PostgreSQL (production). +""" + import json import logging -import os -import sqlite3 import uuid from datetime import datetime, timezone import requests -from flask import current_app from app.services.openrouter_config_service import ( extract_openrouter_text, get_openrouter_settings, ) -from app.services.ai_cost_service import AiBudgetExceededError, check_ai_budget, log_ai_usage +from app.services.ai_cost_service import ( + AiBudgetExceededError, + check_ai_budget, + log_ai_usage, +) +from app.utils.database import db_connection, execute_query, is_postgres, row_to_dict logger = logging.getLogger(__name__) @@ -21,38 +27,166 @@ MAX_HISTORY_MESSAGES = 8 MAX_MESSAGE_LENGTH = 4000 TOOL_CATALOG = [ - {"slug": "pdf-to-word", "label": "PDF to Word", "summary": "convert PDF files into editable Word documents"}, - {"slug": "word-to-pdf", "label": "Word to PDF", "summary": "turn DOC or DOCX files into PDF documents"}, - {"slug": "compress-pdf", "label": "Compress PDF", "summary": "reduce PDF file size while preserving readability"}, - {"slug": "merge-pdf", "label": "Merge PDF", "summary": "combine multiple PDF files into one document"}, - {"slug": "split-pdf", "label": "Split PDF", "summary": "extract ranges or split one PDF into separate pages"}, - {"slug": "rotate-pdf", "label": "Rotate PDF", "summary": "rotate PDF pages to the correct orientation"}, - {"slug": "pdf-to-images", "label": "PDF to Images", "summary": "convert each PDF page into PNG or JPG images"}, - {"slug": "images-to-pdf", "label": "Images to PDF", "summary": "combine multiple images into one PDF"}, - {"slug": "watermark-pdf", "label": "Watermark PDF", "summary": "add text watermarks to PDF pages"}, - {"slug": "remove-watermark-pdf", "label": "Remove Watermark", "summary": "remove supported text and image-overlay watermarks from PDFs"}, - {"slug": "protect-pdf", "label": "Protect PDF", "summary": "add password protection to PDF files"}, - {"slug": "unlock-pdf", "label": "Unlock PDF", "summary": "remove PDF password protection when the password is known"}, - {"slug": "page-numbers", "label": "Page Numbers", "summary": "add page numbers in different positions"}, - {"slug": "pdf-editor", "label": "PDF Editor", "summary": "optimize and clean PDF copies"}, - {"slug": "pdf-flowchart", "label": "PDF Flowchart", "summary": "analyze PDF procedures and turn them into flowcharts"}, - {"slug": "pdf-to-excel", "label": "PDF to Excel", "summary": "extract structured table data into spreadsheet files"}, - {"slug": "html-to-pdf", "label": "HTML to PDF", "summary": "convert HTML documents into PDF"}, - {"slug": "reorder-pdf", "label": "Reorder PDF", "summary": "rearrange PDF pages using a full page order"}, - {"slug": "extract-pages", "label": "Extract Pages", "summary": "create a PDF from selected pages"}, - {"slug": "chat-pdf", "label": "Chat with PDF", "summary": "ask questions about one uploaded PDF"}, - {"slug": "summarize-pdf", "label": "Summarize PDF", "summary": "generate a concise summary of one PDF"}, - {"slug": "translate-pdf", "label": "Translate PDF", "summary": "translate PDF content into another language"}, - {"slug": "extract-tables", "label": "Extract Tables", "summary": "find tables in a PDF and export them"}, - {"slug": "image-converter", "label": "Image Converter", "summary": "convert images between common formats"}, - {"slug": "image-resize", "label": "Image Resize", "summary": "resize images to exact dimensions"}, - {"slug": "compress-image", "label": "Compress Image", "summary": "reduce image file size"}, - {"slug": "ocr", "label": "OCR", "summary": "extract text from image or scanned PDF content"}, - {"slug": "remove-background", "label": "Remove Background", "summary": "remove image backgrounds automatically"}, - {"slug": "qr-code", "label": "QR Code", "summary": "generate QR codes from text or URLs"}, - {"slug": "video-to-gif", "label": "Video to GIF", "summary": "convert short videos into GIF animations"}, - {"slug": "word-counter", "label": "Word Counter", "summary": "count words, characters, and reading metrics"}, - {"slug": "text-cleaner", "label": "Text Cleaner", "summary": "clean up text spacing and formatting"}, + { + "slug": "pdf-to-word", + "label": "PDF to Word", + "summary": "convert PDF files into editable Word documents", + }, + { + "slug": "word-to-pdf", + "label": "Word to PDF", + "summary": "turn DOC or DOCX files into PDF documents", + }, + { + "slug": "compress-pdf", + "label": "Compress PDF", + "summary": "reduce PDF file size while preserving readability", + }, + { + "slug": "merge-pdf", + "label": "Merge PDF", + "summary": "combine multiple PDF files into one document", + }, + { + "slug": "split-pdf", + "label": "Split PDF", + "summary": "extract ranges or split one PDF into separate pages", + }, + { + "slug": "rotate-pdf", + "label": "Rotate PDF", + "summary": "rotate PDF pages to the correct orientation", + }, + { + "slug": "pdf-to-images", + "label": "PDF to Images", + "summary": "convert each PDF page into PNG or JPG images", + }, + { + "slug": "images-to-pdf", + "label": "Images to PDF", + "summary": "combine multiple images into one PDF", + }, + { + "slug": "watermark-pdf", + "label": "Watermark PDF", + "summary": "add text watermarks to PDF pages", + }, + { + "slug": "remove-watermark-pdf", + "label": "Remove Watermark", + "summary": "remove supported text and image-overlay watermarks from PDFs", + }, + { + "slug": "protect-pdf", + "label": "Protect PDF", + "summary": "add password protection to PDF files", + }, + { + "slug": "unlock-pdf", + "label": "Unlock PDF", + "summary": "remove PDF password protection when the password is known", + }, + { + "slug": "page-numbers", + "label": "Page Numbers", + "summary": "add page numbers in different positions", + }, + { + "slug": "pdf-editor", + "label": "PDF Editor", + "summary": "optimize and clean PDF copies", + }, + { + "slug": "pdf-flowchart", + "label": "PDF Flowchart", + "summary": "analyze PDF procedures and turn them into flowcharts", + }, + { + "slug": "pdf-to-excel", + "label": "PDF to Excel", + "summary": "extract structured table data into spreadsheet files", + }, + { + "slug": "html-to-pdf", + "label": "HTML to PDF", + "summary": "convert HTML documents into PDF", + }, + { + "slug": "reorder-pdf", + "label": "Reorder PDF", + "summary": "rearrange PDF pages using a full page order", + }, + { + "slug": "extract-pages", + "label": "Extract Pages", + "summary": "create a PDF from selected pages", + }, + { + "slug": "chat-pdf", + "label": "Chat with PDF", + "summary": "ask questions about one uploaded PDF", + }, + { + "slug": "summarize-pdf", + "label": "Summarize PDF", + "summary": "generate a concise summary of one PDF", + }, + { + "slug": "translate-pdf", + "label": "Translate PDF", + "summary": "translate PDF content into another language", + }, + { + "slug": "extract-tables", + "label": "Extract Tables", + "summary": "find tables in a PDF and export them", + }, + { + "slug": "image-converter", + "label": "Image Converter", + "summary": "convert images between common formats", + }, + { + "slug": "image-resize", + "label": "Image Resize", + "summary": "resize images to exact dimensions", + }, + { + "slug": "compress-image", + "label": "Compress Image", + "summary": "reduce image file size", + }, + { + "slug": "ocr", + "label": "OCR", + "summary": "extract text from image or scanned PDF content", + }, + { + "slug": "remove-background", + "label": "Remove Background", + "summary": "remove image backgrounds automatically", + }, + { + "slug": "qr-code", + "label": "QR Code", + "summary": "generate QR codes from text or URLs", + }, + { + "slug": "video-to-gif", + "label": "Video to GIF", + "summary": "convert short videos into GIF animations", + }, + { + "slug": "word-counter", + "label": "Word Counter", + "summary": "count words, characters, and reading metrics", + }, + { + "slug": "text-cleaner", + "label": "Text Cleaner", + "summary": "clean up text spacing and formatting", + }, ] SYSTEM_PROMPT = """You are the Dociva site assistant. @@ -68,62 +202,92 @@ Rules: """ -def _connect() -> sqlite3.Connection: - db_path = current_app.config["DATABASE_PATH"] - db_dir = os.path.dirname(db_path) - if db_dir: - os.makedirs(db_dir, exist_ok=True) - - connection = sqlite3.connect(db_path) - connection.row_factory = sqlite3.Row - connection.execute("PRAGMA foreign_keys = ON") - return connection - - def _utc_now() -> str: return datetime.now(timezone.utc).isoformat() def init_site_assistant_db() -> None: """Create assistant conversation tables if they do not exist.""" - with _connect() as conn: - conn.executescript( - """ - CREATE TABLE IF NOT EXISTS assistant_conversations ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - session_id TEXT NOT NULL UNIQUE, - user_id INTEGER, - fingerprint TEXT NOT NULL, - tool_slug TEXT DEFAULT '', - page_url TEXT DEFAULT '', - locale TEXT DEFAULT 'en', - created_at TEXT NOT NULL, - updated_at TEXT NOT NULL - ); + with db_connection() as conn: + if is_postgres(): + cursor = conn.cursor() + cursor.execute(""" + CREATE TABLE IF NOT EXISTS assistant_conversations ( + id SERIAL PRIMARY KEY, + session_id TEXT NOT NULL UNIQUE, + user_id INTEGER, + fingerprint TEXT NOT NULL, + tool_slug TEXT DEFAULT '', + page_url TEXT DEFAULT '', + locale TEXT DEFAULT 'en', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ) + """) + cursor.execute(""" + CREATE TABLE IF NOT EXISTS assistant_messages ( + id SERIAL PRIMARY KEY, + conversation_id INTEGER NOT NULL, + role TEXT NOT NULL CHECK(role IN ('user', 'assistant', 'system')), + content TEXT NOT NULL, + tool_slug TEXT DEFAULT '', + page_url TEXT DEFAULT '', + locale TEXT DEFAULT 'en', + metadata_json TEXT NOT NULL DEFAULT '{}', + created_at TEXT NOT NULL, + FOREIGN KEY (conversation_id) REFERENCES assistant_conversations(id) ON DELETE CASCADE + ) + """) + cursor.execute(""" + CREATE INDEX IF NOT EXISTS idx_assistant_conversations_user_id + ON assistant_conversations(user_id) + """) + cursor.execute(""" + CREATE INDEX IF NOT EXISTS idx_assistant_messages_conversation_id + ON assistant_messages(conversation_id) + """) + cursor.execute(""" + CREATE INDEX IF NOT EXISTS idx_assistant_messages_created_at + ON assistant_messages(created_at) + """) + else: + conn.executescript( + """ + CREATE TABLE IF NOT EXISTS assistant_conversations ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id TEXT NOT NULL UNIQUE, + user_id INTEGER, + fingerprint TEXT NOT NULL, + tool_slug TEXT DEFAULT '', + page_url TEXT DEFAULT '', + locale TEXT DEFAULT 'en', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ); - CREATE TABLE IF NOT EXISTS assistant_messages ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - conversation_id INTEGER NOT NULL, - role TEXT NOT NULL CHECK(role IN ('user', 'assistant', 'system')), - content TEXT NOT NULL, - tool_slug TEXT DEFAULT '', - page_url TEXT DEFAULT '', - locale TEXT DEFAULT 'en', - metadata_json TEXT NOT NULL DEFAULT '{}', - created_at TEXT NOT NULL, - FOREIGN KEY (conversation_id) REFERENCES assistant_conversations(id) ON DELETE CASCADE - ); + CREATE TABLE IF NOT EXISTS assistant_messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + conversation_id INTEGER NOT NULL, + role TEXT NOT NULL CHECK(role IN ('user', 'assistant', 'system')), + content TEXT NOT NULL, + tool_slug TEXT DEFAULT '', + page_url TEXT DEFAULT '', + locale TEXT DEFAULT 'en', + metadata_json TEXT NOT NULL DEFAULT '{}', + created_at TEXT NOT NULL, + FOREIGN KEY (conversation_id) REFERENCES assistant_conversations(id) ON DELETE CASCADE + ); - CREATE INDEX IF NOT EXISTS idx_assistant_conversations_user_id - ON assistant_conversations(user_id); + CREATE INDEX IF NOT EXISTS idx_assistant_conversations_user_id + ON assistant_conversations(user_id); - CREATE INDEX IF NOT EXISTS idx_assistant_messages_conversation_id - ON assistant_messages(conversation_id); + CREATE INDEX IF NOT EXISTS idx_assistant_messages_conversation_id + ON assistant_messages(conversation_id); - CREATE INDEX IF NOT EXISTS idx_assistant_messages_created_at - ON assistant_messages(created_at); - """ - ) + CREATE INDEX IF NOT EXISTS idx_assistant_messages_created_at + ON assistant_messages(created_at); + """ + ) def chat_with_site_assistant( @@ -249,8 +413,12 @@ def stream_site_assistant_chat( check_ai_budget() settings = get_openrouter_settings() if not settings.api_key: - logger.error("OPENROUTER_API_KEY is not set — assistant AI unavailable.") - raise RuntimeError("AI assistant is temporarily unavailable. Please try again later.") + logger.error( + "OPENROUTER_API_KEY is not set — assistant AI unavailable." + ) + raise RuntimeError( + "AI assistant is temporarily unavailable. Please try again later." + ) response_model = settings.model messages = _build_ai_messages( @@ -317,32 +485,60 @@ def _ensure_conversation( locale: str, ) -> int: now = _utc_now() - with _connect() as conn: - row = conn.execute( - "SELECT id FROM assistant_conversations WHERE session_id = ?", - (session_id,), - ).fetchone() + with db_connection() as conn: + sql = ( + "SELECT id FROM assistant_conversations WHERE session_id = %s" + if is_postgres() + else "SELECT id FROM assistant_conversations WHERE session_id = ?" + ) + cursor = execute_query(conn, sql, (session_id,)) + row = row_to_dict(cursor.fetchone()) if row is not None: - conn.execute( + update_sql = ( """ UPDATE assistant_conversations + SET user_id = %s, fingerprint = %s, tool_slug = %s, page_url = %s, locale = %s, updated_at = %s + WHERE id = %s + """ + if is_postgres() + else """ + UPDATE assistant_conversations SET user_id = ?, fingerprint = ?, tool_slug = ?, page_url = ?, locale = ?, updated_at = ? WHERE id = ? - """, + """ + ) + execute_query( + conn, + update_sql, (user_id, fingerprint, tool_slug, page_url, locale, now, row["id"]), ) return int(row["id"]) - cursor = conn.execute( + insert_sql = ( """ + INSERT INTO assistant_conversations ( + session_id, user_id, fingerprint, tool_slug, page_url, locale, created_at, updated_at + ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) + RETURNING id + """ + if is_postgres() + else """ INSERT INTO assistant_conversations ( session_id, user_id, fingerprint, tool_slug, page_url, locale, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) - """, + """ + ) + cursor2 = execute_query( + conn, + insert_sql, (session_id, user_id, fingerprint, tool_slug, page_url, locale, now, now), ) - return int(cursor.lastrowid) + + if is_postgres(): + result = cursor2.fetchone() + return int(result["id"]) if result else 0 + return int(cursor2.lastrowid) def _record_message( @@ -354,13 +550,23 @@ def _record_message( locale: str, metadata: dict | None = None, ) -> None: - with _connect() as conn: - conn.execute( + with db_connection() as conn: + sql = ( """ + INSERT INTO assistant_messages ( + conversation_id, role, content, tool_slug, page_url, locale, metadata_json, created_at + ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) + """ + if is_postgres() + else """ INSERT INTO assistant_messages ( conversation_id, role, content, tool_slug, page_url, locale, metadata_json, created_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) - """, + """ + ) + execute_query( + conn, + sql, ( conversation_id, role, @@ -441,7 +647,9 @@ def _request_ai_reply( if not settings.api_key: logger.error("OPENROUTER_API_KEY is not set — assistant AI unavailable.") - raise RuntimeError("AI assistant is temporarily unavailable. Please try again later.") + raise RuntimeError( + "AI assistant is temporarily unavailable. Please try again later." + ) messages = _build_ai_messages( message=message, diff --git a/backend/app/utils/database.py b/backend/app/utils/database.py index 6263bb1..2eff6d8 100644 --- a/backend/app/utils/database.py +++ b/backend/app/utils/database.py @@ -1,7 +1,7 @@ """Database abstraction — supports SQLite (dev) and PostgreSQL (production). Usage: - from app.utils.database import get_connection + from app.utils.database import db_connection, adapt_query The returned connection behaves like a sqlite3.Connection with row_factory set. For PostgreSQL it wraps psycopg2 with RealDictCursor for dict-like rows. @@ -10,8 +10,10 @@ Selection logic: - If DATABASE_URL env var is set (starts with ``postgres``), use PostgreSQL. - Otherwise fall back to SQLite via DATABASE_PATH config. """ + import logging import os +import re import sqlite3 from contextlib import contextmanager @@ -23,6 +25,8 @@ _pg_available = False try: import psycopg2 import psycopg2.extras + import psycopg2.errors + _pg_available = True except ImportError: pass @@ -35,7 +39,13 @@ def is_postgres() -> bool: def _sqlite_connect() -> sqlite3.Connection: - db_path = current_app.config["DATABASE_PATH"] + db_path = current_app.config.get("DATABASE_PATH") + if not db_path: + db_path = os.path.join( + os.path.abspath(os.path.join(os.path.dirname(__file__), "..")), + "data", + "dociva.db", + ) db_dir = os.path.dirname(db_path) if db_dir: os.makedirs(db_dir, exist_ok=True) @@ -50,7 +60,10 @@ def _pg_connect(): if not _pg_available: raise RuntimeError("psycopg2 is not installed — cannot use PostgreSQL.") db_url = os.getenv("DATABASE_URL", "") - conn = psycopg2.connect(db_url, cursor_factory=psycopg2.extras.RealDictCursor) + conn = psycopg2.connect( + db_url, + cursor_factory=psycopg2.extras.RealDictCursor, + ) conn.autocommit = False return conn @@ -76,16 +89,94 @@ def db_connection(): conn.close() -def adapt_sql(sql: str) -> str: - """Adapt SQLite SQL to PostgreSQL if needed. +def adapt_query(sql: str, params: tuple = ()) -> tuple: + """Adapt SQLite SQL and parameters to PostgreSQL if needed. Converts: - INTEGER PRIMARY KEY AUTOINCREMENT -> SERIAL PRIMARY KEY - ? placeholders -> %s placeholders + - params tuple unchanged (psycopg2 accepts tuple with %s) + + Returns (adapted_sql, adapted_params). """ if not is_postgres(): - return sql + return sql, params sql = sql.replace("INTEGER PRIMARY KEY AUTOINCREMENT", "SERIAL PRIMARY KEY") - sql = sql.replace("?", "%s") - return sql + sql = sql.replace("INTEGER PRIMARY KEY", "SERIAL PRIMARY KEY") + sql = sql.replace("BOOLEAN DEFAULT 1", "BOOLEAN DEFAULT TRUE") + sql = sql.replace("BOOLEAN DEFAULT 0", "BOOLEAN DEFAULT FALSE") + sql = re.sub(r"\?", "%s", sql) + + return sql, params + + +def execute_query(conn, sql: str, params: tuple = ()): + """Execute a query, adapting SQL for the current database. + + Returns the cursor. + """ + adapted_sql, adapted_params = adapt_query(sql, params) + if is_postgres(): + cursor = conn.cursor() + cursor.execute(adapted_sql, adapted_params) + return cursor + return conn.execute(adapted_sql, adapted_params) + + +def get_last_insert_id(conn, cursor=None): + """Get the last inserted row ID, compatible with both SQLite and PostgreSQL.""" + if is_postgres(): + if cursor is None: + raise ValueError("cursor is required for PostgreSQL to get last insert ID") + result = cursor.fetchone() + if result: + if isinstance(result, dict): + return result.get("id") or result.get("lastval") + return result[0] + return None + return cursor.lastrowid + + +def get_integrity_error(): + """Get the appropriate IntegrityError exception for the current database.""" + if is_postgres(): + if _pg_available: + return psycopg2.IntegrityError + raise RuntimeError("psycopg2 is not installed") + return sqlite3.IntegrityError + + +def get_row_value(row, key: str): + """Get a value from a row by key, compatible with both SQLite Row and psycopg2 dict.""" + if row is None: + return None + if isinstance(row, dict): + return row.get(key) + return row[key] + + +def row_to_dict(row): + """Convert a database row to a plain dict.""" + if row is None: + return None + if isinstance(row, dict): + return dict(row) + return dict(row) + + +def init_tables(conn): + """Run initialization SQL for all tables, adapting for the current database.""" + if is_postgres(): + cursor = conn.cursor() + cursor.execute( + "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'users')" + ) + if cursor.fetchone()[0]: + return + else: + cursor = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='users'" + ) + if cursor.fetchone(): + return diff --git a/docker-compose.yml b/docker-compose.yml index 077399e..d400f83 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -12,6 +12,24 @@ services: timeout: 3s retries: 5 + # --- PostgreSQL --- + postgres: + image: postgres:16-alpine + ports: + - "5432:5432" + environment: + - POSTGRES_DB=dociva + - POSTGRES_USER=dociva + - POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-dociva_secret_password} + volumes: + - postgres_data:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U dociva -d dociva"] + interval: 10s + timeout: 5s + retries: 5 + restart: unless-stopped + # --- Flask Backend --- backend: build: @@ -26,6 +44,7 @@ services: - REDIS_URL=redis://redis:6379/0 - CELERY_BROKER_URL=redis://redis:6379/0 - CELERY_RESULT_BACKEND=redis://redis:6379/1 + - DATABASE_URL=postgresql://dociva:${POSTGRES_PASSWORD:-dociva_secret_password}@postgres:5432/dociva volumes: - ./backend:/app - upload_data:/tmp/uploads @@ -33,6 +52,8 @@ services: depends_on: redis: condition: service_healthy + postgres: + condition: service_healthy restart: unless-stopped # --- Celery Worker --- @@ -52,6 +73,7 @@ services: - REDIS_URL=redis://redis:6379/0 - CELERY_BROKER_URL=redis://redis:6379/0 - CELERY_RESULT_BACKEND=redis://redis:6379/1 + - DATABASE_URL=postgresql://dociva:${POSTGRES_PASSWORD:-dociva_secret_password}@postgres:5432/dociva volumes: - ./backend:/app - upload_data:/tmp/uploads @@ -59,6 +81,8 @@ services: depends_on: redis: condition: service_healthy + postgres: + condition: service_healthy healthcheck: test: ["CMD", "celery", "-A", "celery_worker.celery", "inspect", "ping"] interval: 30s @@ -82,11 +106,14 @@ services: - REDIS_URL=redis://redis:6379/0 - CELERY_BROKER_URL=redis://redis:6379/0 - CELERY_RESULT_BACKEND=redis://redis:6379/1 + - DATABASE_URL=postgresql://dociva:${POSTGRES_PASSWORD:-dociva_secret_password}@postgres:5432/dociva volumes: - ./backend:/app depends_on: redis: condition: service_healthy + postgres: + condition: service_healthy restart: unless-stopped # --- React Frontend (Vite Dev) --- @@ -141,5 +168,6 @@ services: volumes: redis_data: + postgres_data: upload_data: output_data: