feat: Add PostgreSQL support and enhance admin dashboard

- Migrate all service files from hardcoded SQLite to dual SQLite/PostgreSQL support
- Add PostgreSQL service to docker-compose.yml
- Create database abstraction layer (database.py) with execute_query, row_to_dict helpers
- Update all 7 service files: account, rating, contact, ai_cost, quota, site_assistant, admin
- Add new admin endpoint /database-stats for table size and row count visualization
- Add database_type field to system health endpoint
- Update .env.example with proper PostgreSQL connection string
This commit is contained in:
Your Name
2026-03-31 21:51:45 +02:00
parent 42b1ad1250
commit 030418f6db
11 changed files with 1930 additions and 1006 deletions

View File

@@ -41,6 +41,7 @@ UPLOAD_FOLDER=/tmp/uploads
OUTPUT_FOLDER=/tmp/outputs OUTPUT_FOLDER=/tmp/outputs
FILE_EXPIRY_SECONDS=1800 FILE_EXPIRY_SECONDS=1800
STORAGE_ALLOW_LOCAL_FALLBACK=true STORAGE_ALLOW_LOCAL_FALLBACK=true
# Use DATABASE_PATH for SQLite (development) or DATABASE_URL for PostgreSQL (production)
DATABASE_PATH=/app/data/dociva.db DATABASE_PATH=/app/data/dociva.db
# CORS # CORS
@@ -64,8 +65,8 @@ STRIPE_PRICE_ID_PRO_YEARLY=
SENTRY_DSN= SENTRY_DSN=
SENTRY_ENVIRONMENT=production SENTRY_ENVIRONMENT=production
# PostgreSQL (production) — leave empty to use SQLite # PostgreSQL (production)
DATABASE_URL= DATABASE_URL=postgresql://dociva:${POSTGRES_PASSWORD}@postgres:5432/dociva
POSTGRES_DB=dociva POSTGRES_DB=dociva
POSTGRES_USER=dociva POSTGRES_USER=dociva
POSTGRES_PASSWORD=replace-with-strong-postgres-password POSTGRES_PASSWORD=replace-with-strong-postgres-password

View File

@@ -1,8 +1,14 @@
"""Internal admin endpoints secured by authenticated admin sessions.""" """Internal admin endpoints secured by authenticated admin sessions."""
from flask import Blueprint, jsonify, request from flask import Blueprint, jsonify, request
from app.extensions import limiter from app.extensions import limiter
from app.services.account_service import get_user_by_id, is_user_admin, set_user_role, update_user_plan from app.services.account_service import (
get_user_by_id,
is_user_admin,
set_user_role,
update_user_plan,
)
from app.services.admin_service import ( from app.services.admin_service import (
get_admin_overview, get_admin_overview,
get_admin_ratings_detail, get_admin_ratings_detail,
@@ -138,7 +144,9 @@ def update_role_route(user_id: int):
return jsonify({"error": "User not found."}), 404 return jsonify({"error": "User not found."}), 404
if bool(user.get("is_allowlisted_admin")): if bool(user.get("is_allowlisted_admin")):
return jsonify({"error": "Allowlisted admin access is managed by INTERNAL_ADMIN_EMAILS."}), 400 return jsonify(
{"error": "Allowlisted admin access is managed by INTERNAL_ADMIN_EMAILS."}
), 400
if actor_user_id == user_id and role != "admin": if actor_user_id == user_id and role != "admin":
return jsonify({"error": "You cannot remove your own admin role."}), 400 return jsonify({"error": "You cannot remove your own admin role."}), 400
@@ -183,7 +191,9 @@ def admin_ratings_route():
tool_filter = request.args.get("tool", "").strip() tool_filter = request.args.get("tool", "").strip()
return jsonify(get_admin_ratings_detail(page=page, per_page=per_page, tool_filter=tool_filter)), 200 return jsonify(
get_admin_ratings_detail(page=page, per_page=per_page, tool_filter=tool_filter)
), 200
@admin_bp.route("/tool-analytics", methods=["GET"]) @admin_bp.route("/tool-analytics", methods=["GET"])
@@ -247,3 +257,69 @@ def record_plan_interest_route():
record_plan_interest_click(user_id=user_id, plan=plan, billing=billing) record_plan_interest_click(user_id=user_id, plan=plan, billing=billing)
return jsonify({"message": "Interest recorded."}), 200 return jsonify({"message": "Interest recorded."}), 200
@admin_bp.route("/database-stats", methods=["GET"])
@limiter.limit("60/hour")
def admin_database_stats_route():
"""Return database statistics (table sizes, row counts)."""
auth_error = _require_admin_session()
if auth_error:
return auth_error
from app.utils.database import (
db_connection,
execute_query,
is_postgres,
row_to_dict,
)
with db_connection() as conn:
if is_postgres():
tables_sql = """
SELECT
schemaname,
relname AS table_name,
n_live_tup AS row_count,
pg_total_relation_size(relid) AS total_size,
pg_relation_size(relid) AS data_size
FROM pg_stat_user_tables
ORDER BY n_live_tup DESC
"""
else:
tables_sql = """
SELECT name AS table_name FROM sqlite_master
WHERE type='table' ORDER BY name
"""
cursor = execute_query(conn, tables_sql)
tables = []
for row in cursor.fetchall():
row = row_to_dict(row)
if is_postgres():
tables.append(
{
"table_name": row["table_name"],
"row_count": int(row["row_count"]),
"total_size_kb": round(int(row["total_size"]) / 1024, 1),
"data_size_kb": round(int(row["data_size"]) / 1024, 1),
}
)
else:
count_cursor = execute_query(
conn, f"SELECT COUNT(*) AS cnt FROM {row['table_name']}"
)
count_row = row_to_dict(count_cursor.fetchone())
tables.append(
{
"table_name": row["table_name"],
"row_count": int(count_row["cnt"]),
}
)
return jsonify(
{
"database_type": "postgresql" if is_postgres() else "sqlite",
"tables": tables,
"table_count": len(tables),
}
), 200

File diff suppressed because it is too large Load Diff

View File

@@ -1,7 +1,10 @@
"""Internal admin aggregation helpers for operational dashboards.""" """Internal admin aggregation helpers for operational dashboards.
Supports both SQLite (development) and PostgreSQL (production).
"""
import json import json
import os import os
import sqlite3
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from flask import current_app from flask import current_app
@@ -10,16 +13,7 @@ from app.services.account_service import is_allowlisted_admin_email, normalize_r
from app.services.ai_cost_service import get_monthly_spend from app.services.ai_cost_service import get_monthly_spend
from app.services.contact_service import mark_read from app.services.contact_service import mark_read
from app.services.rating_service import get_global_rating_summary from app.services.rating_service import get_global_rating_summary
from app.utils.database import db_connection, execute_query, is_postgres, row_to_dict
def _connect() -> sqlite3.Connection:
db_path = current_app.config["DATABASE_PATH"]
db_dir = os.path.dirname(db_path)
if db_dir:
os.makedirs(db_dir, exist_ok=True)
connection = sqlite3.connect(db_path)
connection.row_factory = sqlite3.Row
return connection
def _parse_metadata(raw_value: str | None) -> dict: def _parse_metadata(raw_value: str | None) -> dict:
@@ -36,30 +30,40 @@ def get_admin_overview(limit_recent: int = 8, top_tools_limit: int = 6) -> dict:
cutoff_24h = (datetime.now(timezone.utc) - timedelta(days=1)).isoformat() cutoff_24h = (datetime.now(timezone.utc) - timedelta(days=1)).isoformat()
ai_cost_summary = get_monthly_spend() ai_cost_summary = get_monthly_spend()
with _connect() as conn: with db_connection() as conn:
users_row = conn.execute( users_sql = """
"""
SELECT SELECT
COUNT(*) AS total_users, COUNT(*) AS total_users,
COALESCE(SUM(CASE WHEN plan = 'pro' THEN 1 ELSE 0 END), 0) AS pro_users, COALESCE(SUM(CASE WHEN plan = 'pro' THEN 1 ELSE 0 END), 0) AS pro_users,
COALESCE(SUM(CASE WHEN plan = 'free' THEN 1 ELSE 0 END), 0) AS free_users COALESCE(SUM(CASE WHEN plan = 'free' THEN 1 ELSE 0 END), 0) AS free_users
FROM users FROM users
""" """
).fetchone() cursor = execute_query(conn, users_sql)
users_row = row_to_dict(cursor.fetchone())
history_row = conn.execute( history_sql = (
""" """
SELECT
COUNT(*) AS total_files_processed,
COALESCE(SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END), 0) AS completed_files,
COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0) AS failed_files,
COALESCE(SUM(CASE WHEN created_at >= %s THEN 1 ELSE 0 END), 0) AS files_last_24h
FROM file_history
"""
if is_postgres()
else """
SELECT SELECT
COUNT(*) AS total_files_processed, COUNT(*) AS total_files_processed,
COALESCE(SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END), 0) AS completed_files, COALESCE(SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END), 0) AS completed_files,
COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0) AS failed_files, COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0) AS failed_files,
COALESCE(SUM(CASE WHEN created_at >= ? THEN 1 ELSE 0 END), 0) AS files_last_24h COALESCE(SUM(CASE WHEN created_at >= ? THEN 1 ELSE 0 END), 0) AS files_last_24h
FROM file_history FROM file_history
""", """
(cutoff_24h,), )
).fetchone() cursor2 = execute_query(conn, history_sql, (cutoff_24h,))
history_row = row_to_dict(cursor2.fetchone())
top_tools_rows = conn.execute( top_tools_sql = (
""" """
SELECT SELECT
tool, tool,
@@ -68,12 +72,24 @@ def get_admin_overview(limit_recent: int = 8, top_tools_limit: int = 6) -> dict:
FROM file_history FROM file_history
GROUP BY tool GROUP BY tool
ORDER BY total_runs DESC, tool ASC ORDER BY total_runs DESC, tool ASC
LIMIT %s
"""
if is_postgres()
else """
SELECT
tool,
COUNT(*) AS total_runs,
COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0) AS failed_runs
FROM file_history
GROUP BY tool
ORDER BY total_runs DESC, tool ASC
LIMIT ? LIMIT ?
""", """
(top_tools_limit,), )
).fetchall() cursor3 = execute_query(conn, top_tools_sql, (top_tools_limit,))
top_tools_rows = [row_to_dict(r) for r in cursor3.fetchall()]
failure_rows = conn.execute( failure_sql = (
""" """
SELECT SELECT
file_history.id, file_history.id,
@@ -87,12 +103,29 @@ def get_admin_overview(limit_recent: int = 8, top_tools_limit: int = 6) -> dict:
LEFT JOIN users ON users.id = file_history.user_id LEFT JOIN users ON users.id = file_history.user_id
WHERE file_history.status = 'failed' WHERE file_history.status = 'failed'
ORDER BY file_history.created_at DESC ORDER BY file_history.created_at DESC
LIMIT %s
"""
if is_postgres()
else """
SELECT
file_history.id,
file_history.user_id,
file_history.tool,
file_history.original_filename,
file_history.metadata_json,
file_history.created_at,
users.email
FROM file_history
LEFT JOIN users ON users.id = file_history.user_id
WHERE file_history.status = 'failed'
ORDER BY file_history.created_at DESC
LIMIT ? LIMIT ?
""", """
(limit_recent,), )
).fetchall() cursor4 = execute_query(conn, failure_sql, (limit_recent,))
failure_rows = [row_to_dict(r) for r in cursor4.fetchall()]
recent_user_rows = conn.execute( recent_user_sql = (
""" """
SELECT SELECT
users.id, users.id,
@@ -103,33 +136,66 @@ def get_admin_overview(limit_recent: int = 8, top_tools_limit: int = 6) -> dict:
COALESCE((SELECT COUNT(*) FROM api_keys WHERE api_keys.user_id = users.id AND api_keys.revoked_at IS NULL), 0) AS active_api_keys COALESCE((SELECT COUNT(*) FROM api_keys WHERE api_keys.user_id = users.id AND api_keys.revoked_at IS NULL), 0) AS active_api_keys
FROM users FROM users
ORDER BY users.created_at DESC ORDER BY users.created_at DESC
LIMIT ? LIMIT %s
""",
(limit_recent,),
).fetchall()
contact_row = conn.execute(
""" """
if is_postgres()
else """
SELECT
users.id,
users.email,
users.plan,
users.created_at,
COALESCE((SELECT COUNT(*) FROM file_history WHERE file_history.user_id = users.id), 0) AS total_tasks,
COALESCE((SELECT COUNT(*) FROM api_keys WHERE api_keys.user_id = users.id AND api_keys.revoked_at IS NULL), 0) AS active_api_keys
FROM users
ORDER BY users.created_at DESC
LIMIT ?
"""
)
cursor5 = execute_query(conn, recent_user_sql, (limit_recent,))
recent_user_rows = [row_to_dict(r) for r in cursor5.fetchall()]
contact_sql = (
"""
SELECT
COUNT(*) AS total_messages,
COALESCE(SUM(CASE WHEN is_read = FALSE THEN 1 ELSE 0 END), 0) AS unread_messages
FROM contact_messages
"""
if is_postgres()
else """
SELECT SELECT
COUNT(*) AS total_messages, COUNT(*) AS total_messages,
COALESCE(SUM(CASE WHEN is_read = 0 THEN 1 ELSE 0 END), 0) AS unread_messages COALESCE(SUM(CASE WHEN is_read = 0 THEN 1 ELSE 0 END), 0) AS unread_messages
FROM contact_messages FROM contact_messages
""" """
).fetchone() )
cursor6 = execute_query(conn, contact_sql)
contact_row = row_to_dict(cursor6.fetchone())
recent_contact_rows = conn.execute( recent_contact_sql = (
""" """
SELECT id, name, email, category, subject, message, created_at, is_read SELECT id, name, email, category, subject, message, created_at, is_read
FROM contact_messages FROM contact_messages
ORDER BY created_at DESC ORDER BY created_at DESC
LIMIT %s
"""
if is_postgres()
else """
SELECT id, name, email, category, subject, message, created_at, is_read
FROM contact_messages
ORDER BY created_at DESC
LIMIT ? LIMIT ?
""", """
(limit_recent,), )
).fetchall() cursor7 = execute_query(conn, recent_contact_sql, (limit_recent,))
recent_contact_rows = [row_to_dict(r) for r in cursor7.fetchall()]
total_processed = int(history_row["total_files_processed"]) if history_row else 0 total_processed = int(history_row["total_files_processed"]) if history_row else 0
completed_files = int(history_row["completed_files"]) if history_row else 0 completed_files = int(history_row["completed_files"]) if history_row else 0
success_rate = round((completed_files / total_processed) * 100, 1) if total_processed else 0.0 success_rate = (
round((completed_files / total_processed) * 100, 1) if total_processed else 0.0
)
return { return {
"users": { "users": {
@@ -153,7 +219,9 @@ def get_admin_overview(limit_recent: int = 8, top_tools_limit: int = 6) -> dict:
}, },
"contacts": { "contacts": {
"total_messages": int(contact_row["total_messages"]) if contact_row else 0, "total_messages": int(contact_row["total_messages"]) if contact_row else 0,
"unread_messages": int(contact_row["unread_messages"]) if contact_row else 0, "unread_messages": int(contact_row["unread_messages"])
if contact_row
else 0,
"recent": [ "recent": [
{ {
"id": row["id"], "id": row["id"],
@@ -219,20 +287,32 @@ def list_admin_users(limit: int = 25, query: str = "") -> list[dict]:
""" """
params: list[object] = [] params: list[object] = []
if normalized_query: if normalized_query:
sql += " WHERE LOWER(users.email) LIKE ?" sql += (
" WHERE LOWER(users.email) LIKE %s"
if is_postgres()
else " WHERE LOWER(users.email) LIKE ?"
)
params.append(f"%{normalized_query}%") params.append(f"%{normalized_query}%")
sql += " ORDER BY users.created_at DESC LIMIT ?" sql += (
" ORDER BY users.created_at DESC LIMIT %s"
if is_postgres()
else " ORDER BY users.created_at DESC LIMIT ?"
)
params.append(limit) params.append(limit)
with _connect() as conn: with db_connection() as conn:
rows = conn.execute(sql, tuple(params)).fetchall() cursor = execute_query(conn, sql, tuple(params))
rows = cursor.fetchall()
rows = [row_to_dict(r) for r in rows]
return [ return [
{ {
"id": row["id"], "id": row["id"],
"email": row["email"], "email": row["email"],
"plan": row["plan"], "plan": row["plan"],
"role": "admin" if is_allowlisted_admin_email(row["email"]) else normalize_role(row["role"]), "role": "admin"
if is_allowlisted_admin_email(row["email"])
else normalize_role(row["role"]),
"is_allowlisted_admin": is_allowlisted_admin_email(row["email"]), "is_allowlisted_admin": is_allowlisted_admin_email(row["email"]),
"created_at": row["created_at"], "created_at": row["created_at"],
"total_tasks": int(row["total_tasks"]), "total_tasks": int(row["total_tasks"]),
@@ -249,19 +329,32 @@ def list_admin_contacts(page: int = 1, per_page: int = 20) -> dict:
safe_per_page = max(1, min(per_page, 100)) safe_per_page = max(1, min(per_page, 100))
offset = (safe_page - 1) * safe_per_page offset = (safe_page - 1) * safe_per_page
with _connect() as conn: with db_connection() as conn:
total_row = conn.execute( total_sql = (
"SELECT COUNT(*) AS total, COALESCE(SUM(CASE WHEN is_read = 0 THEN 1 ELSE 0 END), 0) AS unread FROM contact_messages" "SELECT COUNT(*) AS total, COALESCE(SUM(CASE WHEN is_read = FALSE THEN 1 ELSE 0 END), 0) AS unread FROM contact_messages"
).fetchone() if is_postgres()
rows = conn.execute( else "SELECT COUNT(*) AS total, COALESCE(SUM(CASE WHEN is_read = 0 THEN 1 ELSE 0 END), 0) AS unread FROM contact_messages"
)
cursor = execute_query(conn, total_sql)
total_row = row_to_dict(cursor.fetchone())
rows_sql = (
""" """
SELECT id, name, email, category, subject, message, created_at, is_read SELECT id, name, email, category, subject, message, created_at, is_read
FROM contact_messages FROM contact_messages
ORDER BY created_at DESC ORDER BY created_at DESC
LIMIT %s OFFSET %s
"""
if is_postgres()
else """
SELECT id, name, email, category, subject, message, created_at, is_read
FROM contact_messages
ORDER BY created_at DESC
LIMIT ? OFFSET ? LIMIT ? OFFSET ?
""", """
(safe_per_page, offset), )
).fetchall() cursor2 = execute_query(conn, rows_sql, (safe_per_page, offset))
rows = [row_to_dict(r) for r in cursor2.fetchall()]
return { return {
"items": [ "items": [
@@ -288,16 +381,25 @@ def mark_admin_contact_read(message_id: int) -> bool:
return mark_read(message_id) return mark_read(message_id)
# ---------------------------------------------------------------------------
# Enhanced admin analytics
# ---------------------------------------------------------------------------
def _ensure_plan_interest_table(): def _ensure_plan_interest_table():
"""Create plan_interest_clicks table if it does not exist.""" """Create plan_interest_clicks table if it does not exist."""
with _connect() as conn: with db_connection() as conn:
conn.execute( if is_postgres():
""" cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS plan_interest_clicks (
id SERIAL PRIMARY KEY,
user_id INTEGER,
plan TEXT NOT NULL,
billing TEXT NOT NULL DEFAULT 'monthly',
created_at TEXT NOT NULL
)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_plan_interest_created ON plan_interest_clicks(created_at)
""")
else:
conn.execute("""
CREATE TABLE IF NOT EXISTS plan_interest_clicks ( CREATE TABLE IF NOT EXISTS plan_interest_clicks (
id INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER, user_id INTEGER,
@@ -305,22 +407,25 @@ def _ensure_plan_interest_table():
billing TEXT NOT NULL DEFAULT 'monthly', billing TEXT NOT NULL DEFAULT 'monthly',
created_at TEXT NOT NULL created_at TEXT NOT NULL
) )
""" """)
)
conn.execute( conn.execute(
"CREATE INDEX IF NOT EXISTS idx_plan_interest_created ON plan_interest_clicks(created_at)" "CREATE INDEX IF NOT EXISTS idx_plan_interest_created ON plan_interest_clicks(created_at)"
) )
def record_plan_interest_click(user_id: int | None, plan: str, billing: str = "monthly") -> None: def record_plan_interest_click(
user_id: int | None, plan: str, billing: str = "monthly"
) -> None:
"""Record a click on a pricing plan button.""" """Record a click on a pricing plan button."""
_ensure_plan_interest_table() _ensure_plan_interest_table()
now = datetime.now(timezone.utc).isoformat() now = datetime.now(timezone.utc).isoformat()
with _connect() as conn: with db_connection() as conn:
conn.execute( sql = (
"INSERT INTO plan_interest_clicks (user_id, plan, billing, created_at) VALUES (?, ?, ?, ?)", "INSERT INTO plan_interest_clicks (user_id, plan, billing, created_at) VALUES (%s, %s, %s, %s)"
(user_id, plan, billing, now), if is_postgres()
else "INSERT INTO plan_interest_clicks (user_id, plan, billing, created_at) VALUES (?, ?, ?, ?)"
) )
execute_query(conn, sql, (user_id, plan, billing, now))
def get_plan_interest_summary() -> dict: def get_plan_interest_summary() -> dict:
@@ -329,30 +434,39 @@ def get_plan_interest_summary() -> dict:
cutoff_7d = (datetime.now(timezone.utc) - timedelta(days=7)).isoformat() cutoff_7d = (datetime.now(timezone.utc) - timedelta(days=7)).isoformat()
cutoff_30d = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat() cutoff_30d = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat()
with _connect() as conn: with db_connection() as conn:
total_row = conn.execute( total_sql = (
""" """
SELECT
COUNT(*) AS total_clicks,
COUNT(DISTINCT user_id) AS unique_users,
COALESCE(SUM(CASE WHEN created_at >= %s THEN 1 ELSE 0 END), 0) AS clicks_last_7d,
COALESCE(SUM(CASE WHEN created_at >= %s THEN 1 ELSE 0 END), 0) AS clicks_last_30d
FROM plan_interest_clicks
"""
if is_postgres()
else """
SELECT SELECT
COUNT(*) AS total_clicks, COUNT(*) AS total_clicks,
COUNT(DISTINCT user_id) AS unique_users, COUNT(DISTINCT user_id) AS unique_users,
COALESCE(SUM(CASE WHEN created_at >= ? THEN 1 ELSE 0 END), 0) AS clicks_last_7d, COALESCE(SUM(CASE WHEN created_at >= ? THEN 1 ELSE 0 END), 0) AS clicks_last_7d,
COALESCE(SUM(CASE WHEN created_at >= ? THEN 1 ELSE 0 END), 0) AS clicks_last_30d COALESCE(SUM(CASE WHEN created_at >= ? THEN 1 ELSE 0 END), 0) AS clicks_last_30d
FROM plan_interest_clicks FROM plan_interest_clicks
""",
(cutoff_7d, cutoff_30d),
).fetchone()
by_plan_rows = conn.execute(
""" """
)
cursor = execute_query(conn, total_sql, (cutoff_7d, cutoff_30d))
total_row = row_to_dict(cursor.fetchone())
by_plan_sql = """
SELECT plan, billing, COUNT(*) AS clicks SELECT plan, billing, COUNT(*) AS clicks
FROM plan_interest_clicks FROM plan_interest_clicks
GROUP BY plan, billing GROUP BY plan, billing
ORDER BY clicks DESC ORDER BY clicks DESC
""" """
).fetchall() cursor2 = execute_query(conn, by_plan_sql)
by_plan_rows = [row_to_dict(r) for r in cursor2.fetchall()]
recent_rows = conn.execute( recent_sql = """
"""
SELECT SELECT
plan_interest_clicks.id, plan_interest_clicks.id,
plan_interest_clicks.user_id, plan_interest_clicks.user_id,
@@ -365,7 +479,8 @@ def get_plan_interest_summary() -> dict:
ORDER BY plan_interest_clicks.created_at DESC ORDER BY plan_interest_clicks.created_at DESC
LIMIT 20 LIMIT 20
""" """
).fetchall() cursor3 = execute_query(conn, recent_sql)
recent_rows = [row_to_dict(r) for r in cursor3.fetchall()]
return { return {
"total_clicks": int(total_row["total_clicks"]) if total_row else 0, "total_clicks": int(total_row["total_clicks"]) if total_row else 0,
@@ -373,7 +488,11 @@ def get_plan_interest_summary() -> dict:
"clicks_last_7d": int(total_row["clicks_last_7d"]) if total_row else 0, "clicks_last_7d": int(total_row["clicks_last_7d"]) if total_row else 0,
"clicks_last_30d": int(total_row["clicks_last_30d"]) if total_row else 0, "clicks_last_30d": int(total_row["clicks_last_30d"]) if total_row else 0,
"by_plan": [ "by_plan": [
{"plan": row["plan"], "billing": row["billing"], "clicks": int(row["clicks"])} {
"plan": row["plan"],
"billing": row["billing"],
"clicks": int(row["clicks"]),
}
for row in by_plan_rows for row in by_plan_rows
], ],
"recent": [ "recent": [
@@ -390,39 +509,43 @@ def get_plan_interest_summary() -> dict:
} }
def get_admin_ratings_detail(page: int = 1, per_page: int = 20, tool_filter: str = "") -> dict: def get_admin_ratings_detail(
page: int = 1, per_page: int = 20, tool_filter: str = ""
) -> dict:
"""Return detailed ratings list with feedback for the admin dashboard.""" """Return detailed ratings list with feedback for the admin dashboard."""
safe_page = max(1, page) safe_page = max(1, page)
safe_per_page = max(1, min(per_page, 100)) safe_per_page = max(1, min(per_page, 100))
offset = (safe_page - 1) * safe_per_page offset = (safe_page - 1) * safe_per_page
with _connect() as conn: with db_connection() as conn:
# Total count
count_sql = "SELECT COUNT(*) AS total FROM tool_ratings" count_sql = "SELECT COUNT(*) AS total FROM tool_ratings"
count_params: list[object] = [] count_params: list[object] = []
if tool_filter: if tool_filter:
count_sql += " WHERE tool = ?" count_sql += " WHERE tool = %s" if is_postgres() else " WHERE tool = ?"
count_params.append(tool_filter) count_params.append(tool_filter)
total_row = conn.execute(count_sql, tuple(count_params)).fetchone() cursor = execute_query(conn, count_sql, tuple(count_params))
total_row = row_to_dict(cursor.fetchone())
# Paginated ratings
sql = """ sql = """
SELECT id, tool, rating, feedback, tag, fingerprint, created_at SELECT id, tool, rating, feedback, tag, fingerprint, created_at
FROM tool_ratings FROM tool_ratings
""" """
params: list[object] = [] params: list[object] = []
if tool_filter: if tool_filter:
sql += " WHERE tool = ?" sql += " WHERE tool = %s" if is_postgres() else " WHERE tool = ?"
params.append(tool_filter) params.append(tool_filter)
sql += " ORDER BY created_at DESC LIMIT ? OFFSET ?" sql += (
" ORDER BY created_at DESC LIMIT %s OFFSET %s"
if is_postgres()
else " ORDER BY created_at DESC LIMIT ? OFFSET ?"
)
params.extend([safe_per_page, offset]) params.extend([safe_per_page, offset])
rows = conn.execute(sql, tuple(params)).fetchall() cursor2 = execute_query(conn, sql, tuple(params))
rows = [row_to_dict(r) for r in cursor2.fetchall()]
# Per-tool summary summary_sql = """
summary_rows = conn.execute(
"""
SELECT SELECT
tool, tool,
COUNT(*) AS count, COUNT(*) AS count,
@@ -433,7 +556,8 @@ def get_admin_ratings_detail(page: int = 1, per_page: int = 20, tool_filter: str
GROUP BY tool GROUP BY tool
ORDER BY count DESC ORDER BY count DESC
""" """
).fetchall() cursor3 = execute_query(conn, summary_sql)
summary_rows = [row_to_dict(r) for r in cursor3.fetchall()]
return { return {
"items": [ "items": [
@@ -469,10 +593,24 @@ def get_admin_tool_analytics() -> dict:
cutoff_7d = (datetime.now(timezone.utc) - timedelta(days=7)).isoformat() cutoff_7d = (datetime.now(timezone.utc) - timedelta(days=7)).isoformat()
cutoff_30d = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat() cutoff_30d = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat()
with _connect() as conn: with db_connection() as conn:
# Per-tool detailed stats tool_sql = (
tool_rows = conn.execute(
""" """
SELECT
tool,
COUNT(*) AS total_runs,
COALESCE(SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END), 0) AS completed,
COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0) AS failed,
COALESCE(SUM(CASE WHEN created_at >= %s THEN 1 ELSE 0 END), 0) AS runs_24h,
COALESCE(SUM(CASE WHEN created_at >= %s THEN 1 ELSE 0 END), 0) AS runs_7d,
COALESCE(SUM(CASE WHEN created_at >= %s THEN 1 ELSE 0 END), 0) AS runs_30d,
COUNT(DISTINCT user_id) AS unique_users
FROM file_history
GROUP BY tool
ORDER BY total_runs DESC
"""
if is_postgres()
else """
SELECT SELECT
tool, tool,
COUNT(*) AS total_runs, COUNT(*) AS total_runs,
@@ -485,13 +623,25 @@ def get_admin_tool_analytics() -> dict:
FROM file_history FROM file_history
GROUP BY tool GROUP BY tool
ORDER BY total_runs DESC ORDER BY total_runs DESC
""",
(cutoff_24h, cutoff_7d, cutoff_30d),
).fetchall()
# Daily usage for the last 30 days
daily_rows = conn.execute(
""" """
)
cursor = execute_query(conn, tool_sql, (cutoff_24h, cutoff_7d, cutoff_30d))
tool_rows = [row_to_dict(r) for r in cursor.fetchall()]
daily_sql = (
"""
SELECT
created_at::date AS day,
COUNT(*) AS total,
COALESCE(SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END), 0) AS completed,
COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0) AS failed
FROM file_history
WHERE created_at >= %s
GROUP BY created_at::date
ORDER BY day ASC
"""
if is_postgres()
else """
SELECT SELECT
DATE(created_at) AS day, DATE(created_at) AS day,
COUNT(*) AS total, COUNT(*) AS total,
@@ -501,13 +651,25 @@ def get_admin_tool_analytics() -> dict:
WHERE created_at >= ? WHERE created_at >= ?
GROUP BY DATE(created_at) GROUP BY DATE(created_at)
ORDER BY day ASC ORDER BY day ASC
""",
(cutoff_30d,),
).fetchall()
# Most common errors
error_rows = conn.execute(
""" """
)
cursor2 = execute_query(conn, daily_sql, (cutoff_30d,))
daily_rows = [row_to_dict(r) for r in cursor2.fetchall()]
error_sql = (
"""
SELECT
tool,
metadata_json,
COUNT(*) AS occurrences
FROM file_history
WHERE status = 'failed' AND created_at >= %s
GROUP BY tool, metadata_json
ORDER BY occurrences DESC
LIMIT 15
"""
if is_postgres()
else """
SELECT SELECT
tool, tool,
metadata_json, metadata_json,
@@ -517,9 +679,10 @@ def get_admin_tool_analytics() -> dict:
GROUP BY tool, metadata_json GROUP BY tool, metadata_json
ORDER BY occurrences DESC ORDER BY occurrences DESC
LIMIT 15 LIMIT 15
""", """
(cutoff_30d,), )
).fetchall() cursor3 = execute_query(conn, error_sql, (cutoff_30d,))
error_rows = [row_to_dict(r) for r in cursor3.fetchall()]
return { return {
"tools": [ "tools": [
@@ -528,7 +691,11 @@ def get_admin_tool_analytics() -> dict:
"total_runs": int(row["total_runs"]), "total_runs": int(row["total_runs"]),
"completed": int(row["completed"]), "completed": int(row["completed"]),
"failed": int(row["failed"]), "failed": int(row["failed"]),
"success_rate": round((int(row["completed"]) / int(row["total_runs"])) * 100, 1) if int(row["total_runs"]) > 0 else 0, "success_rate": round(
(int(row["completed"]) / int(row["total_runs"])) * 100, 1
)
if int(row["total_runs"]) > 0
else 0,
"runs_24h": int(row["runs_24h"]), "runs_24h": int(row["runs_24h"]),
"runs_7d": int(row["runs_7d"]), "runs_7d": int(row["runs_7d"]),
"runs_30d": int(row["runs_30d"]), "runs_30d": int(row["runs_30d"]),
@@ -538,7 +705,7 @@ def get_admin_tool_analytics() -> dict:
], ],
"daily_usage": [ "daily_usage": [
{ {
"day": row["day"], "day": str(row["day"]),
"total": int(row["total"]), "total": int(row["total"]),
"completed": int(row["completed"]), "completed": int(row["completed"]),
"failed": int(row["failed"]), "failed": int(row["failed"]),
@@ -548,7 +715,9 @@ def get_admin_tool_analytics() -> dict:
"common_errors": [ "common_errors": [
{ {
"tool": row["tool"], "tool": row["tool"],
"error": _parse_metadata(row["metadata_json"]).get("error", "Unknown error"), "error": _parse_metadata(row["metadata_json"]).get(
"error", "Unknown error"
),
"occurrences": int(row["occurrences"]), "occurrences": int(row["occurrences"]),
} }
for row in error_rows for row in error_rows
@@ -561,9 +730,19 @@ def get_admin_user_registration_stats() -> dict:
cutoff_7d = (datetime.now(timezone.utc) - timedelta(days=7)).isoformat() cutoff_7d = (datetime.now(timezone.utc) - timedelta(days=7)).isoformat()
cutoff_30d = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat() cutoff_30d = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat()
with _connect() as conn: with db_connection() as conn:
totals_row = conn.execute( totals_sql = (
""" """
SELECT
COUNT(*) AS total,
COALESCE(SUM(CASE WHEN created_at >= %s THEN 1 ELSE 0 END), 0) AS last_7d,
COALESCE(SUM(CASE WHEN created_at >= %s THEN 1 ELSE 0 END), 0) AS last_30d,
COALESCE(SUM(CASE WHEN plan = 'pro' THEN 1 ELSE 0 END), 0) AS pro_count,
COALESCE(SUM(CASE WHEN plan = 'free' THEN 1 ELSE 0 END), 0) AS free_count
FROM users
"""
if is_postgres()
else """
SELECT SELECT
COUNT(*) AS total, COUNT(*) AS total,
COALESCE(SUM(CASE WHEN created_at >= ? THEN 1 ELSE 0 END), 0) AS last_7d, COALESCE(SUM(CASE WHEN created_at >= ? THEN 1 ELSE 0 END), 0) AS last_7d,
@@ -571,25 +750,32 @@ def get_admin_user_registration_stats() -> dict:
COALESCE(SUM(CASE WHEN plan = 'pro' THEN 1 ELSE 0 END), 0) AS pro_count, COALESCE(SUM(CASE WHEN plan = 'pro' THEN 1 ELSE 0 END), 0) AS pro_count,
COALESCE(SUM(CASE WHEN plan = 'free' THEN 1 ELSE 0 END), 0) AS free_count COALESCE(SUM(CASE WHEN plan = 'free' THEN 1 ELSE 0 END), 0) AS free_count
FROM users FROM users
""",
(cutoff_7d, cutoff_30d),
).fetchone()
# Daily registrations for the last 30 days
daily_rows = conn.execute(
""" """
)
cursor = execute_query(conn, totals_sql, (cutoff_7d, cutoff_30d))
totals_row = row_to_dict(cursor.fetchone())
daily_sql = (
"""
SELECT created_at::date AS day, COUNT(*) AS registrations
FROM users
WHERE created_at >= %s
GROUP BY created_at::date
ORDER BY day ASC
"""
if is_postgres()
else """
SELECT DATE(created_at) AS day, COUNT(*) AS registrations SELECT DATE(created_at) AS day, COUNT(*) AS registrations
FROM users FROM users
WHERE created_at >= ? WHERE created_at >= ?
GROUP BY DATE(created_at) GROUP BY DATE(created_at)
ORDER BY day ASC ORDER BY day ASC
""",
(cutoff_30d,),
).fetchall()
# Most active users (by task count)
active_rows = conn.execute(
""" """
)
cursor2 = execute_query(conn, daily_sql, (cutoff_30d,))
daily_rows = [row_to_dict(r) for r in cursor2.fetchall()]
active_sql = """
SELECT SELECT
users.id, users.id,
users.email, users.email,
@@ -602,7 +788,8 @@ def get_admin_user_registration_stats() -> dict:
ORDER BY total_tasks DESC ORDER BY total_tasks DESC
LIMIT 10 LIMIT 10
""" """
).fetchall() cursor3 = execute_query(conn, active_sql)
active_rows = [row_to_dict(r) for r in cursor3.fetchall()]
return { return {
"total_users": int(totals_row["total"]) if totals_row else 0, "total_users": int(totals_row["total"]) if totals_row else 0,
@@ -611,7 +798,7 @@ def get_admin_user_registration_stats() -> dict:
"pro_users": int(totals_row["pro_count"]) if totals_row else 0, "pro_users": int(totals_row["pro_count"]) if totals_row else 0,
"free_users": int(totals_row["free_count"]) if totals_row else 0, "free_users": int(totals_row["free_count"]) if totals_row else 0,
"daily_registrations": [ "daily_registrations": [
{"day": row["day"], "count": int(row["registrations"])} {"day": str(row["day"]), "count": int(row["registrations"])}
for row in daily_rows for row in daily_rows
], ],
"most_active_users": [ "most_active_users": [
@@ -634,23 +821,33 @@ def get_admin_system_health() -> dict:
ai_cost_summary = get_monthly_spend() ai_cost_summary = get_monthly_spend()
settings = get_openrouter_settings() settings = get_openrouter_settings()
with _connect() as conn: with db_connection() as conn:
# Recent error rate (last 1h)
cutoff_1h = (datetime.now(timezone.utc) - timedelta(hours=1)).isoformat() cutoff_1h = (datetime.now(timezone.utc) - timedelta(hours=1)).isoformat()
error_row = conn.execute( error_sql = (
""" """
SELECT SELECT
COUNT(*) AS total, COUNT(*) AS total,
COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0) AS failed COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0) AS failed
FROM file_history FROM file_history
WHERE created_at >= %s
"""
if is_postgres()
else """
SELECT
COUNT(*) AS total,
COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0) AS failed
FROM file_history
WHERE created_at >= ? WHERE created_at >= ?
""", """
(cutoff_1h,), )
).fetchone() cursor = execute_query(conn, error_sql, (cutoff_1h,))
error_row = row_to_dict(cursor.fetchone())
# DB size db_size_mb = 0
db_path = current_app.config["DATABASE_PATH"] if not is_postgres():
db_size_mb = round(os.path.getsize(db_path) / (1024 * 1024), 2) if os.path.exists(db_path) else 0 db_path = current_app.config.get("DATABASE_PATH", "")
if db_path and os.path.exists(db_path):
db_size_mb = round(os.path.getsize(db_path) / (1024 * 1024), 2)
error_total = int(error_row["total"]) if error_row else 0 error_total = int(error_row["total"]) if error_row else 0
error_failed = int(error_row["failed"]) if error_row else 0 error_failed = int(error_row["failed"]) if error_row else 0
@@ -659,8 +856,11 @@ def get_admin_system_health() -> dict:
"ai_configured": bool(settings.api_key), "ai_configured": bool(settings.api_key),
"ai_model": settings.model, "ai_model": settings.model,
"ai_budget_used_percent": ai_cost_summary["budget_used_percent"], "ai_budget_used_percent": ai_cost_summary["budget_used_percent"],
"error_rate_1h": round((error_failed / error_total) * 100, 1) if error_total > 0 else 0, "error_rate_1h": round((error_failed / error_total) * 100, 1)
if error_total > 0
else 0,
"tasks_last_1h": error_total, "tasks_last_1h": error_total,
"failures_last_1h": error_failed, "failures_last_1h": error_failed,
"database_size_mb": db_size_mb, "database_size_mb": db_size_mb,
"database_type": "postgresql" if is_postgres() else "sqlite",
} }

View File

@@ -1,31 +1,23 @@
"""AI cost tracking service — monitors and limits AI API spending.""" """AI cost tracking service — monitors and limits AI API spending.
Supports both SQLite (development) and PostgreSQL (production).
"""
import logging import logging
import os import os
import sqlite3
from datetime import datetime, timezone from datetime import datetime, timezone
from flask import current_app from flask import current_app
from app.utils.database import db_connection, execute_query, is_postgres, row_to_dict
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Monthly budget in USD — set via environment variable, default $50
AI_MONTHLY_BUDGET = float(os.getenv("AI_MONTHLY_BUDGET", "50.0")) AI_MONTHLY_BUDGET = float(os.getenv("AI_MONTHLY_BUDGET", "50.0"))
# Estimated cost per 1K tokens (adjust based on your model)
COST_PER_1K_INPUT_TOKENS = float(os.getenv("AI_COST_PER_1K_INPUT", "0.0")) COST_PER_1K_INPUT_TOKENS = float(os.getenv("AI_COST_PER_1K_INPUT", "0.0"))
COST_PER_1K_OUTPUT_TOKENS = float(os.getenv("AI_COST_PER_1K_OUTPUT", "0.0")) COST_PER_1K_OUTPUT_TOKENS = float(os.getenv("AI_COST_PER_1K_OUTPUT", "0.0"))
def _connect() -> sqlite3.Connection:
db_path = current_app.config["DATABASE_PATH"]
db_dir = os.path.dirname(db_path)
if db_dir:
os.makedirs(db_dir, exist_ok=True)
connection = sqlite3.connect(db_path)
connection.row_factory = sqlite3.Row
return connection
def _utc_now() -> str: def _utc_now() -> str:
return datetime.now(timezone.utc).isoformat() return datetime.now(timezone.utc).isoformat()
@@ -36,7 +28,26 @@ def _current_month() -> str:
def init_ai_cost_db(): def init_ai_cost_db():
"""Create AI cost tracking table if not exists.""" """Create AI cost tracking table if not exists."""
with _connect() as conn: with db_connection() as conn:
if is_postgres():
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS ai_cost_log (
id SERIAL PRIMARY KEY,
tool TEXT NOT NULL,
model TEXT NOT NULL,
input_tokens INTEGER DEFAULT 0,
output_tokens INTEGER DEFAULT 0,
estimated_cost_usd REAL DEFAULT 0.0,
period_month TEXT NOT NULL,
created_at TEXT NOT NULL
)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_ai_cost_period
ON ai_cost_log(period_month)
""")
else:
conn.executescript( conn.executescript(
""" """
CREATE TABLE IF NOT EXISTS ai_cost_log ( CREATE TABLE IF NOT EXISTS ai_cost_log (
@@ -63,22 +74,41 @@ def log_ai_usage(
output_tokens: int = 0, output_tokens: int = 0,
) -> None: ) -> None:
"""Log an AI API call with token usage.""" """Log an AI API call with token usage."""
estimated_cost = ( estimated_cost = (input_tokens / 1000.0) * COST_PER_1K_INPUT_TOKENS + (
(input_tokens / 1000.0) * COST_PER_1K_INPUT_TOKENS output_tokens / 1000.0
+ (output_tokens / 1000.0) * COST_PER_1K_OUTPUT_TOKENS ) * COST_PER_1K_OUTPUT_TOKENS
)
with _connect() as conn: with db_connection() as conn:
conn.execute( sql = (
"""INSERT INTO ai_cost_log """INSERT INTO ai_cost_log
(tool, model, input_tokens, output_tokens, estimated_cost_usd, period_month, created_at) (tool, model, input_tokens, output_tokens, estimated_cost_usd, period_month, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)""", VALUES (%s, %s, %s, %s, %s, %s, %s)"""
(tool, model, input_tokens, output_tokens, estimated_cost, _current_month(), _utc_now()), if is_postgres()
else """INSERT INTO ai_cost_log
(tool, model, input_tokens, output_tokens, estimated_cost_usd, period_month, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)"""
)
execute_query(
conn,
sql,
(
tool,
model,
input_tokens,
output_tokens,
estimated_cost,
_current_month(),
_utc_now(),
),
) )
logger.info( logger.info(
"AI usage: tool=%s model=%s in=%d out=%d cost=$%.4f", "AI usage: tool=%s model=%s in=%d out=%d cost=$%.4f",
tool, model, input_tokens, output_tokens, estimated_cost, tool,
model,
input_tokens,
output_tokens,
estimated_cost,
) )
@@ -86,17 +116,26 @@ def get_monthly_spend() -> dict:
"""Get the current month's AI spending summary.""" """Get the current month's AI spending summary."""
month = _current_month() month = _current_month()
with _connect() as conn: with db_connection() as conn:
row = conn.execute( sql = (
"""SELECT """SELECT
COUNT(*) as total_calls, COUNT(*) as total_calls,
COALESCE(SUM(input_tokens), 0) as total_input_tokens, COALESCE(SUM(input_tokens), 0) as total_input_tokens,
COALESCE(SUM(output_tokens), 0) as total_output_tokens, COALESCE(SUM(output_tokens), 0) as total_output_tokens,
COALESCE(SUM(estimated_cost_usd), 0.0) as total_cost COALESCE(SUM(estimated_cost_usd), 0.0) as total_cost
FROM ai_cost_log FROM ai_cost_log
WHERE period_month = ?""", WHERE period_month = %s"""
(month,), if is_postgres()
).fetchone() else """SELECT
COUNT(*) as total_calls,
COALESCE(SUM(input_tokens), 0) as total_input_tokens,
COALESCE(SUM(output_tokens), 0) as total_output_tokens,
COALESCE(SUM(estimated_cost_usd), 0.0) as total_cost
FROM ai_cost_log
WHERE period_month = ?"""
)
cursor = execute_query(conn, sql, (month,))
row = row_to_dict(cursor.fetchone())
return { return {
"period": month, "period": month,
@@ -107,7 +146,10 @@ def get_monthly_spend() -> dict:
"budget_usd": AI_MONTHLY_BUDGET, "budget_usd": AI_MONTHLY_BUDGET,
"budget_remaining_usd": round(AI_MONTHLY_BUDGET - row["total_cost"], 4), "budget_remaining_usd": round(AI_MONTHLY_BUDGET - row["total_cost"], 4),
"budget_used_percent": round( "budget_used_percent": round(
(row["total_cost"] / AI_MONTHLY_BUDGET * 100) if AI_MONTHLY_BUDGET > 0 else 0, 1 (row["total_cost"] / AI_MONTHLY_BUDGET * 100)
if AI_MONTHLY_BUDGET > 0
else 0,
1,
), ),
} }
@@ -128,4 +170,5 @@ def check_ai_budget() -> None:
class AiBudgetExceededError(Exception): class AiBudgetExceededError(Exception):
"""Raised when the monthly AI budget is exceeded.""" """Raised when the monthly AI budget is exceeded."""
pass pass

View File

@@ -1,32 +1,39 @@
"""Contact form service — stores messages and sends notification emails.""" """Contact form service — stores messages and sends notification emails.
Supports both SQLite (development) and PostgreSQL (production).
"""
import logging import logging
import os
import sqlite3
from datetime import datetime, timezone from datetime import datetime, timezone
from flask import current_app from flask import current_app
from app.services.email_service import send_email from app.services.email_service import send_email
from app.utils.database import db_connection, execute_query, is_postgres, row_to_dict
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
VALID_CATEGORIES = {"general", "bug", "feature"} VALID_CATEGORIES = {"general", "bug", "feature"}
def _connect() -> sqlite3.Connection:
db_path = current_app.config["DATABASE_PATH"]
db_dir = os.path.dirname(db_path)
if db_dir:
os.makedirs(db_dir, exist_ok=True)
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
return conn
def init_contact_db() -> None: def init_contact_db() -> None:
"""Create the contact_messages table if it doesn't exist.""" """Create the contact_messages table if it doesn't exist."""
conn = _connect() with db_connection() as conn:
try: if is_postgres():
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS contact_messages (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
email TEXT NOT NULL,
category TEXT NOT NULL DEFAULT 'general',
subject TEXT NOT NULL,
message TEXT NOT NULL,
created_at TEXT NOT NULL,
is_read BOOLEAN NOT NULL DEFAULT FALSE
)
""")
else:
conn.execute(""" conn.execute("""
CREATE TABLE IF NOT EXISTS contact_messages ( CREATE TABLE IF NOT EXISTS contact_messages (
id INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -39,33 +46,41 @@ def init_contact_db() -> None:
is_read INTEGER NOT NULL DEFAULT 0 is_read INTEGER NOT NULL DEFAULT 0
) )
""") """)
conn.commit()
finally:
conn.close()
def save_message(name: str, email: str, category: str, subject: str, message: str) -> dict: def save_message(
name: str, email: str, category: str, subject: str, message: str
) -> dict:
"""Persist a contact message and send a notification email.""" """Persist a contact message and send a notification email."""
if category not in VALID_CATEGORIES: if category not in VALID_CATEGORIES:
category = "general" category = "general"
now = datetime.now(timezone.utc).isoformat() now = datetime.now(timezone.utc).isoformat()
conn = _connect()
try:
cursor = conn.execute(
"""INSERT INTO contact_messages (name, email, category, subject, message, created_at)
VALUES (?, ?, ?, ?, ?, ?)""",
(name, email, category, subject, message, now),
)
conn.commit()
msg_id = cursor.lastrowid
finally:
conn.close()
# Send notification email to admin with db_connection() as conn:
sql = (
"""INSERT INTO contact_messages (name, email, category, subject, message, created_at)
VALUES (%s, %s, %s, %s, %s, %s)
RETURNING id"""
if is_postgres()
else """INSERT INTO contact_messages (name, email, category, subject, message, created_at)
VALUES (?, ?, ?, ?, ?, ?)"""
)
cursor = execute_query(
conn, sql, (name, email, category, subject, message, now)
)
if is_postgres():
result = cursor.fetchone()
msg_id = result["id"] if result else None
else:
msg_id = cursor.lastrowid
admin_emails = tuple(current_app.config.get("INTERNAL_ADMIN_EMAILS", ())) admin_emails = tuple(current_app.config.get("INTERNAL_ADMIN_EMAILS", ()))
admin_email = admin_emails[0] if admin_emails else current_app.config.get( admin_email = (
"SMTP_FROM", "noreply@dociva.io" admin_emails[0]
if admin_emails
else current_app.config.get("SMTP_FROM", "noreply@dociva.io")
) )
try: try:
send_email( send_email(
@@ -89,16 +104,19 @@ def save_message(name: str, email: str, category: str, subject: str, message: st
def get_messages(page: int = 1, per_page: int = 20) -> dict: def get_messages(page: int = 1, per_page: int = 20) -> dict:
"""Retrieve paginated contact messages (admin use).""" """Retrieve paginated contact messages (admin use)."""
offset = (page - 1) * per_page offset = (page - 1) * per_page
conn = _connect()
try: with db_connection() as conn:
total = conn.execute("SELECT COUNT(*) FROM contact_messages").fetchone()[0] cursor = execute_query(conn, "SELECT COUNT(*) FROM contact_messages")
rows = conn.execute( total = cursor.fetchone()[0]
"SELECT * FROM contact_messages ORDER BY created_at DESC LIMIT ? OFFSET ?",
(per_page, offset), sql = (
).fetchall() """SELECT * FROM contact_messages ORDER BY created_at DESC LIMIT %s OFFSET %s"""
messages = [dict(r) for r in rows] if is_postgres()
finally: else """SELECT * FROM contact_messages ORDER BY created_at DESC LIMIT ? OFFSET ?"""
conn.close() )
cursor2 = execute_query(conn, sql, (per_page, offset))
rows = cursor2.fetchall()
messages = [row_to_dict(r) for r in rows]
return { return {
"messages": messages, "messages": messages,
@@ -110,13 +128,11 @@ def get_messages(page: int = 1, per_page: int = 20) -> dict:
def mark_read(message_id: int) -> bool: def mark_read(message_id: int) -> bool:
"""Mark a contact message as read.""" """Mark a contact message as read."""
conn = _connect() with db_connection() as conn:
try: sql = (
result = conn.execute( "UPDATE contact_messages SET is_read = TRUE WHERE id = %s"
"UPDATE contact_messages SET is_read = 1 WHERE id = ?", if is_postgres()
(message_id,), else "UPDATE contact_messages SET is_read = 1 WHERE id = ?"
) )
conn.commit() cursor = execute_query(conn, sql, (message_id,))
return result.rowcount > 0 return cursor.rowcount > 0
finally:
conn.close()

View File

@@ -1,24 +1,14 @@
""" """
QuotaService QuotaService
Manages usage quotas and limits for Free, Pro, and Business tiers Manages usage quotas and limits for Free, Pro, and Business tiers
Quota Structure:
- Free: 5 conversions/day, 10MB max file size, no batch processing
- Pro: 100 conversions/day, 100MB max file size, batch processing (5 files)
- Business: Unlimited conversions, 500MB max file size, batch processing (20 files)
Tracks:
- Daily usage (resets at UTC midnight)
- Storage usage
- API rate limits
- Feature access (premium features)
""" """
import logging import logging
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Dict, Optional, Tuple from typing import Dict, Optional, Tuple
from flask import current_app from flask import current_app
from app.services.account_service import _connect, _utc_now from app.utils.database import db_connection, execute_query, is_postgres, row_to_dict
from app.services.account_service import _utc_now
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -26,49 +16,42 @@ logger = logging.getLogger(__name__)
class QuotaLimits: class QuotaLimits:
"""Define quota limits for each tier""" """Define quota limits for each tier"""
# Conversions per day
CONVERSIONS_PER_DAY = { CONVERSIONS_PER_DAY = {
"free": 5, "free": 5,
"pro": 100, "pro": 100,
"business": float("inf"), "business": float("inf"),
} }
# Maximum file size in MB
MAX_FILE_SIZE_MB = { MAX_FILE_SIZE_MB = {
"free": 10, "free": 10,
"pro": 100, "pro": 100,
"business": 500, "business": 500,
} }
# Storage limit in MB (monthly)
STORAGE_LIMIT_MB = { STORAGE_LIMIT_MB = {
"free": 500, "free": 500,
"pro": 5000, "pro": 5000,
"business": float("inf"), "business": float("inf"),
} }
# API rate limit (requests per minute)
API_RATE_LIMIT = { API_RATE_LIMIT = {
"free": 10, "free": 10,
"pro": 60, "pro": 60,
"business": float("inf"), "business": float("inf"),
} }
# Concurrent processing jobs
CONCURRENT_JOBS = { CONCURRENT_JOBS = {
"free": 1, "free": 1,
"pro": 3, "pro": 3,
"business": 10, "business": 10,
} }
# Batch file limit
BATCH_FILE_LIMIT = { BATCH_FILE_LIMIT = {
"free": 1, "free": 1,
"pro": 5, "pro": 5,
"business": 20, "business": 20,
} }
# Premium features (Pro/Business)
PREMIUM_FEATURES = { PREMIUM_FEATURES = {
"free": set(), "free": set(),
"pro": {"batch_processing", "priority_queue", "email_delivery", "api_access"}, "pro": {"batch_processing", "priority_queue", "email_delivery", "api_access"},
@@ -89,9 +72,54 @@ class QuotaService:
@staticmethod @staticmethod
def _ensure_quota_tables(): def _ensure_quota_tables():
"""Create quota tracking tables if they don't exist""" """Create quota tracking tables if they don't exist"""
conn = _connect() with db_connection() as conn:
try: if is_postgres():
# Daily usage tracking cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS daily_usage (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL,
date TEXT NOT NULL,
conversions INTEGER DEFAULT 0,
files_processed INTEGER DEFAULT 0,
total_size_mb REAL DEFAULT 0,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
UNIQUE(user_id, date),
FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS storage_usage (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL,
month TEXT NOT NULL,
total_size_mb REAL DEFAULT 0,
file_count INTEGER DEFAULT 0,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
UNIQUE(user_id, month),
FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS api_requests (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL,
endpoint TEXT NOT NULL,
timestamp TEXT DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS feature_access (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL,
feature TEXT NOT NULL,
accessed_at TEXT DEFAULT CURRENT_TIMESTAMP,
allowed BOOLEAN DEFAULT TRUE,
FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
)
""")
else:
conn.execute(""" conn.execute("""
CREATE TABLE IF NOT EXISTS daily_usage ( CREATE TABLE IF NOT EXISTS daily_usage (
id INTEGER PRIMARY KEY, id INTEGER PRIMARY KEY,
@@ -105,8 +133,6 @@ class QuotaService:
FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
) )
""") """)
# Storage usage tracking
conn.execute(""" conn.execute("""
CREATE TABLE IF NOT EXISTS storage_usage ( CREATE TABLE IF NOT EXISTS storage_usage (
id INTEGER PRIMARY KEY, id INTEGER PRIMARY KEY,
@@ -119,8 +145,6 @@ class QuotaService:
FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
) )
""") """)
# API rate limiting
conn.execute(""" conn.execute("""
CREATE TABLE IF NOT EXISTS api_requests ( CREATE TABLE IF NOT EXISTS api_requests (
id INTEGER PRIMARY KEY, id INTEGER PRIMARY KEY,
@@ -130,8 +154,6 @@ class QuotaService:
FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
) )
""") """)
# Feature access log
conn.execute(""" conn.execute("""
CREATE TABLE IF NOT EXISTS feature_access ( CREATE TABLE IF NOT EXISTS feature_access (
id INTEGER PRIMARY KEY, id INTEGER PRIMARY KEY,
@@ -143,13 +165,7 @@ class QuotaService:
) )
""") """)
conn.commit()
logger.info("Quota tables initialized") logger.info("Quota tables initialized")
except Exception as e:
logger.error(f"Failed to create quota tables: {e}")
raise
finally:
conn.close()
@staticmethod @staticmethod
def init_quota_db(): def init_quota_db():
@@ -159,36 +175,29 @@ class QuotaService:
@staticmethod @staticmethod
def get_user_plan(user_id: int) -> str: def get_user_plan(user_id: int) -> str:
"""Get user's current plan""" """Get user's current plan"""
conn = _connect() with db_connection() as conn:
try: sql = (
row = conn.execute( "SELECT plan FROM users WHERE id = %s"
"SELECT plan FROM users WHERE id = ?", (user_id,) if is_postgres()
).fetchone() else "SELECT plan FROM users WHERE id = ?"
)
cursor = execute_query(conn, sql, (user_id,))
row = row_to_dict(cursor.fetchone())
return row["plan"] if row else "free" return row["plan"] if row else "free"
finally:
conn.close()
@staticmethod @staticmethod
def get_daily_usage(user_id: int, date: Optional[str] = None) -> Dict: def get_daily_usage(user_id: int, date: Optional[str] = None) -> Dict:
"""
Get daily usage for a user
Args:
user_id: User ID
date: Date in YYYY-MM-DD format (defaults to today)
Returns:
Usage stats dict
"""
if not date: if not date:
date = datetime.utcnow().strftime("%Y-%m-%d") date = datetime.utcnow().strftime("%Y-%m-%d")
conn = _connect() with db_connection() as conn:
try: sql = (
row = conn.execute( "SELECT * FROM daily_usage WHERE user_id = %s AND date = %s"
"SELECT * FROM daily_usage WHERE user_id = ? AND date = ?", if is_postgres()
(user_id, date), else "SELECT * FROM daily_usage WHERE user_id = ? AND date = ?"
).fetchone() )
cursor = execute_query(conn, sql, (user_id, date))
row = row_to_dict(cursor.fetchone())
if not row: if not row:
return { return {
@@ -196,33 +205,21 @@ class QuotaService:
"files_processed": 0, "files_processed": 0,
"total_size_mb": 0, "total_size_mb": 0,
} }
return row
return dict(row)
finally:
conn.close()
@staticmethod @staticmethod
def record_conversion(user_id: int, file_size_mb: float) -> bool: def record_conversion(user_id: int, file_size_mb: float) -> bool:
"""
Record a file conversion
Args:
user_id: User ID
file_size_mb: File size in MB
Returns:
True if allowed, False if quota exceeded
"""
plan = QuotaService.get_user_plan(user_id) plan = QuotaService.get_user_plan(user_id)
today = datetime.utcnow().strftime("%Y-%m-%d") today = datetime.utcnow().strftime("%Y-%m-%d")
conn = _connect() with db_connection() as conn:
try: sql = (
# Check daily conversion limit "SELECT conversions FROM daily_usage WHERE user_id = %s AND date = %s"
usage = conn.execute( if is_postgres()
"SELECT conversions FROM daily_usage WHERE user_id = ? AND date = ?", else "SELECT conversions FROM daily_usage WHERE user_id = ? AND date = ?"
(user_id, today), )
).fetchone() cursor = execute_query(conn, sql, (user_id, today))
usage = row_to_dict(cursor.fetchone())
current_conversions = usage["conversions"] if usage else 0 current_conversions = usage["conversions"] if usage else 0
limit = QuotaLimits.CONVERSIONS_PER_DAY[plan] limit = QuotaLimits.CONVERSIONS_PER_DAY[plan]
@@ -231,7 +228,6 @@ class QuotaService:
logger.warning(f"User {user_id} exceeded daily conversion limit") logger.warning(f"User {user_id} exceeded daily conversion limit")
return False return False
# Check file size limit
max_size = QuotaLimits.MAX_FILE_SIZE_MB[plan] max_size = QuotaLimits.MAX_FILE_SIZE_MB[plan]
if file_size_mb > max_size: if file_size_mb > max_size:
logger.warning( logger.warning(
@@ -239,120 +235,95 @@ class QuotaService:
) )
return False return False
# Record the conversion upsert_sql = (
conn.execute(
""" """
INSERT INTO daily_usage (user_id, date, conversions, files_processed, total_size_mb) INSERT INTO daily_usage (user_id, date, conversions, files_processed, total_size_mb)
VALUES (%s, %s, 1, 1, %s)
ON CONFLICT(user_id, date) DO UPDATE SET
conversions = conversions + 1,
files_processed = files_processed + 1,
total_size_mb = total_size_mb + %s
"""
if is_postgres()
else """
INSERT INTO daily_usage (user_id, date, conversions, files_processed, total_size_mb)
VALUES (?, ?, 1, 1, ?) VALUES (?, ?, 1, 1, ?)
ON CONFLICT(user_id, date) DO UPDATE SET ON CONFLICT(user_id, date) DO UPDATE SET
conversions = conversions + 1, conversions = conversions + 1,
files_processed = files_processed + 1, files_processed = files_processed + 1,
total_size_mb = total_size_mb + ? total_size_mb = total_size_mb + ?
""", """
(user_id, today, file_size_mb, file_size_mb), )
execute_query(
conn, upsert_sql, (user_id, today, file_size_mb, file_size_mb)
) )
conn.commit()
logger.info(f"Recorded conversion for user {user_id}: {file_size_mb}MB") logger.info(f"Recorded conversion for user {user_id}: {file_size_mb}MB")
return True return True
except Exception as e:
logger.error(f"Failed to record conversion: {e}")
return False
finally:
conn.close()
@staticmethod @staticmethod
def check_rate_limit(user_id: int) -> Tuple[bool, int]: def check_rate_limit(user_id: int) -> Tuple[bool, int]:
"""
Check if user has exceeded API rate limit
Args:
user_id: User ID
Returns:
(allowed, remaining_requests_in_window) tuple
"""
plan = QuotaService.get_user_plan(user_id) plan = QuotaService.get_user_plan(user_id)
limit = QuotaLimits.API_RATE_LIMIT[plan] limit = QuotaLimits.API_RATE_LIMIT[plan]
if limit == float("inf"): if limit == float("inf"):
return True, -1 # Unlimited return True, -1
# Check requests in last minute
one_minute_ago = (datetime.utcnow() - timedelta(minutes=1)).isoformat() one_minute_ago = (datetime.utcnow() - timedelta(minutes=1)).isoformat()
conn = _connect() with db_connection() as conn:
try: sql = (
count = conn.execute( "SELECT COUNT(*) as count FROM api_requests WHERE user_id = %s AND timestamp > %s"
"SELECT COUNT(*) as count FROM api_requests WHERE user_id = ? AND timestamp > ?", if is_postgres()
(user_id, one_minute_ago), else "SELECT COUNT(*) as count FROM api_requests WHERE user_id = ? AND timestamp > ?"
).fetchone()["count"] )
cursor = execute_query(conn, sql, (user_id, one_minute_ago))
row = row_to_dict(cursor.fetchone())
count = row["count"] if row else 0
if count >= limit: if count >= limit:
return False, 0 return False, 0
# Record this request sql2 = (
conn.execute( "INSERT INTO api_requests (user_id, endpoint) VALUES (%s, %s)"
"INSERT INTO api_requests (user_id, endpoint) VALUES (?, ?)", if is_postgres()
(user_id, "api"), else "INSERT INTO api_requests (user_id, endpoint) VALUES (?, ?)"
) )
conn.commit() execute_query(conn, sql2, (user_id, "api"))
return True, limit - count - 1 return True, limit - count - 1
finally:
conn.close()
@staticmethod @staticmethod
def has_feature(user_id: int, feature: str) -> bool: def has_feature(user_id: int, feature: str) -> bool:
"""
Check if user has access to a premium feature
Args:
user_id: User ID
feature: Feature name (e.g., 'batch_processing')
Returns:
True if user can access feature
"""
plan = QuotaService.get_user_plan(user_id) plan = QuotaService.get_user_plan(user_id)
allowed = feature in QuotaLimits.PREMIUM_FEATURES[plan] allowed = feature in QuotaLimits.PREMIUM_FEATURES[plan]
# Log feature access attempt
conn = _connect()
try: try:
conn.execute( with db_connection() as conn:
"INSERT INTO feature_access (user_id, feature, allowed) VALUES (?, ?, ?)", sql = (
(user_id, feature, allowed), "INSERT INTO feature_access (user_id, feature, allowed) VALUES (%s, %s, %s)"
if is_postgres()
else "INSERT INTO feature_access (user_id, feature, allowed) VALUES (?, ?, ?)"
) )
conn.commit() execute_query(conn, sql, (user_id, feature, allowed))
except Exception as e: except Exception as e:
logger.error(f"Failed to log feature access: {e}") logger.error(f"Failed to log feature access: {e}")
finally:
conn.close()
return allowed return allowed
@staticmethod @staticmethod
def get_quota_status(user_id: int) -> Dict: def get_quota_status(user_id: int) -> Dict:
"""
Get comprehensive quota status for a user
Args:
user_id: User ID
Returns:
Complete quota status dict
"""
plan = QuotaService.get_user_plan(user_id) plan = QuotaService.get_user_plan(user_id)
today = datetime.utcnow().strftime("%Y-%m-%d") today = datetime.utcnow().strftime("%Y-%m-%d")
conn = _connect() with db_connection() as conn:
try: sql = (
# Get daily usage "SELECT conversions FROM daily_usage WHERE user_id = %s AND date = %s"
daily = conn.execute( if is_postgres()
"SELECT conversions FROM daily_usage WHERE user_id = ? AND date = ?", else "SELECT conversions FROM daily_usage WHERE user_id = ? AND date = ?"
(user_id, today), )
).fetchone() cursor = execute_query(conn, sql, (user_id, today))
daily = row_to_dict(cursor.fetchone())
conversions_used = daily["conversions"] if daily else 0 conversions_used = daily["conversions"] if daily else 0
conversions_limit = QuotaLimits.CONVERSIONS_PER_DAY[plan] conversions_limit = QuotaLimits.CONVERSIONS_PER_DAY[plan]
@@ -383,63 +354,42 @@ class QuotaService:
user_id, "email_delivery" user_id, "email_delivery"
), ),
} }
finally:
conn.close()
@staticmethod @staticmethod
def get_monthly_storage_usage(user_id: int, year: int, month: int) -> float: def get_monthly_storage_usage(user_id: int, year: int, month: int) -> float:
"""Get storage usage for a specific month"""
month_key = f"{year}-{month:02d}" month_key = f"{year}-{month:02d}"
conn = _connect() with db_connection() as conn:
try: sql = (
row = conn.execute( "SELECT total_size_mb FROM storage_usage WHERE user_id = %s AND month = %s"
"SELECT total_size_mb FROM storage_usage WHERE user_id = ? AND month = ?", if is_postgres()
(user_id, month_key), else "SELECT total_size_mb FROM storage_usage WHERE user_id = ? AND month = ?"
).fetchone() )
cursor = execute_query(conn, sql, (user_id, month_key))
row = row_to_dict(cursor.fetchone())
return row["total_size_mb"] if row else 0 return row["total_size_mb"] if row else 0
finally:
conn.close()
@staticmethod @staticmethod
def upgrade_plan(user_id: int, new_plan: str) -> bool: def upgrade_plan(user_id: int, new_plan: str) -> bool:
"""
Upgrade user to a new plan
Args:
user_id: User ID
new_plan: New plan ('pro' or 'business')
Returns:
Success status
"""
if new_plan not in QuotaLimits.CONVERSIONS_PER_DAY: if new_plan not in QuotaLimits.CONVERSIONS_PER_DAY:
logger.error(f"Invalid plan: {new_plan}") logger.error(f"Invalid plan: {new_plan}")
return False return False
conn = _connect() with db_connection() as conn:
try: sql = (
conn.execute( "UPDATE users SET plan = %s, updated_at = %s WHERE id = %s"
"UPDATE users SET plan = ?, updated_at = ? WHERE id = ?", if is_postgres()
(new_plan, _utc_now(), user_id), else "UPDATE users SET plan = ?, updated_at = ? WHERE id = ?"
) )
conn.commit() execute_query(conn, sql, (new_plan, _utc_now(), user_id))
logger.info(f"User {user_id} upgraded to {new_plan}") logger.info(f"User {user_id} upgraded to {new_plan}")
return True return True
except Exception as e:
logger.error(f"Failed to upgrade user plan: {e}")
return False
finally:
conn.close()
@staticmethod @staticmethod
def downgrade_plan(user_id: int, new_plan: str = "free") -> bool: def downgrade_plan(user_id: int, new_plan: str = "free") -> bool:
"""Downgrade user to a lower plan"""
return QuotaService.upgrade_plan(user_id, new_plan) return QuotaService.upgrade_plan(user_id, new_plan)
# Convenience functions
def init_quota_db(): def init_quota_db():
return QuotaService.init_quota_db() return QuotaService.init_quota_db()

View File

@@ -1,32 +1,45 @@
"""Rating service — stores and aggregates user ratings per tool.""" """Rating service — stores and aggregates user ratings per tool.
Supports both SQLite (development) and PostgreSQL (production).
"""
import logging import logging
import os
import sqlite3
from datetime import datetime, timezone from datetime import datetime, timezone
from flask import current_app from app.utils.database import db_connection, execute_query, is_postgres, row_to_dict
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def _connect() -> sqlite3.Connection:
"""Create a SQLite connection."""
db_path = current_app.config["DATABASE_PATH"]
db_dir = os.path.dirname(db_path)
if db_dir:
os.makedirs(db_dir, exist_ok=True)
connection = sqlite3.connect(db_path)
connection.row_factory = sqlite3.Row
return connection
def _utc_now() -> str: def _utc_now() -> str:
return datetime.now(timezone.utc).isoformat() return datetime.now(timezone.utc).isoformat()
def init_ratings_db(): def init_ratings_db():
"""Create ratings table if it does not exist.""" """Create ratings table if it does not exist."""
with _connect() as conn: with db_connection() as conn:
if is_postgres():
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS tool_ratings (
id SERIAL PRIMARY KEY,
tool TEXT NOT NULL,
rating INTEGER NOT NULL CHECK(rating BETWEEN 1 AND 5),
feedback TEXT DEFAULT '',
tag TEXT DEFAULT '',
fingerprint TEXT NOT NULL,
created_at TEXT NOT NULL
)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_tool_ratings_tool
ON tool_ratings(tool)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_tool_ratings_fingerprint_tool
ON tool_ratings(fingerprint, tool)
""")
else:
conn.executescript( conn.executescript(
""" """
CREATE TABLE IF NOT EXISTS tool_ratings ( CREATE TABLE IF NOT EXISTS tool_ratings (
@@ -57,37 +70,53 @@ def submit_rating(
) -> None: ) -> None:
"""Store a rating. Limits one rating per fingerprint per tool per day.""" """Store a rating. Limits one rating per fingerprint per tool per day."""
now = _utc_now() now = _utc_now()
today = now[:10] # YYYY-MM-DD today = now[:10]
with _connect() as conn: with db_connection() as conn:
# Check for duplicate rating from same fingerprint today like_sql = "LIKE %s" if is_postgres() else "LIKE ?"
existing = conn.execute( sql = (
"""SELECT id FROM tool_ratings f"""SELECT id FROM tool_ratings
WHERE fingerprint = ? AND tool = ? AND created_at LIKE ? WHERE fingerprint = %s AND tool = %s AND created_at {like_sql}
LIMIT 1""", LIMIT 1"""
(fingerprint, tool, f"{today}%"), if is_postgres()
).fetchone() else f"""SELECT id FROM tool_ratings
WHERE fingerprint = ? AND tool = ? AND created_at {like_sql}
LIMIT 1"""
)
cursor = execute_query(conn, sql, (fingerprint, tool, f"{today}%"))
existing = cursor.fetchone()
if existing: if existing:
# Update existing rating instead of creating duplicate existing = row_to_dict(existing)
conn.execute( update_sql = (
"""UPDATE tool_ratings """UPDATE tool_ratings
SET rating = %s, feedback = %s, tag = %s, created_at = %s
WHERE id = %s"""
if is_postgres()
else """UPDATE tool_ratings
SET rating = ?, feedback = ?, tag = ?, created_at = ? SET rating = ?, feedback = ?, tag = ?, created_at = ?
WHERE id = ?""", WHERE id = ?"""
(rating, feedback, tag, now, existing["id"]), )
execute_query(
conn, update_sql, (rating, feedback, tag, now, existing["id"])
) )
else: else:
conn.execute( insert_sql = (
"""INSERT INTO tool_ratings (tool, rating, feedback, tag, fingerprint, created_at) """INSERT INTO tool_ratings (tool, rating, feedback, tag, fingerprint, created_at)
VALUES (?, ?, ?, ?, ?, ?)""", VALUES (%s, %s, %s, %s, %s, %s)"""
(tool, rating, feedback, tag, fingerprint, now), if is_postgres()
else """INSERT INTO tool_ratings (tool, rating, feedback, tag, fingerprint, created_at)
VALUES (?, ?, ?, ?, ?, ?)"""
)
execute_query(
conn, insert_sql, (tool, rating, feedback, tag, fingerprint, now)
) )
def get_tool_rating_summary(tool: str) -> dict: def get_tool_rating_summary(tool: str) -> dict:
"""Return aggregate rating data for one tool.""" """Return aggregate rating data for one tool."""
with _connect() as conn: with db_connection() as conn:
row = conn.execute( sql = (
"""SELECT """SELECT
COUNT(*) as count, COUNT(*) as count,
COALESCE(AVG(rating), 0) as average, COALESCE(AVG(rating), 0) as average,
@@ -96,9 +125,20 @@ def get_tool_rating_summary(tool: str) -> dict:
COALESCE(SUM(CASE WHEN rating = 3 THEN 1 ELSE 0 END), 0) as star3, COALESCE(SUM(CASE WHEN rating = 3 THEN 1 ELSE 0 END), 0) as star3,
COALESCE(SUM(CASE WHEN rating = 2 THEN 1 ELSE 0 END), 0) as star2, COALESCE(SUM(CASE WHEN rating = 2 THEN 1 ELSE 0 END), 0) as star2,
COALESCE(SUM(CASE WHEN rating = 1 THEN 1 ELSE 0 END), 0) as star1 COALESCE(SUM(CASE WHEN rating = 1 THEN 1 ELSE 0 END), 0) as star1
FROM tool_ratings WHERE tool = ?""", FROM tool_ratings WHERE tool = %s"""
(tool,), if is_postgres()
).fetchone() else """SELECT
COUNT(*) as count,
COALESCE(AVG(rating), 0) as average,
COALESCE(SUM(CASE WHEN rating = 5 THEN 1 ELSE 0 END), 0) as star5,
COALESCE(SUM(CASE WHEN rating = 4 THEN 1 ELSE 0 END), 0) as star4,
COALESCE(SUM(CASE WHEN rating = 3 THEN 1 ELSE 0 END), 0) as star3,
COALESCE(SUM(CASE WHEN rating = 2 THEN 1 ELSE 0 END), 0) as star2,
COALESCE(SUM(CASE WHEN rating = 1 THEN 1 ELSE 0 END), 0) as star1
FROM tool_ratings WHERE tool = ?"""
)
cursor = execute_query(conn, sql, (tool,))
row = row_to_dict(cursor.fetchone())
return { return {
"tool": tool, "tool": tool,
@@ -116,16 +156,16 @@ def get_tool_rating_summary(tool: str) -> dict:
def get_all_ratings_summary() -> list[dict]: def get_all_ratings_summary() -> list[dict]:
"""Return aggregated ratings for all tools that have at least one rating.""" """Return aggregated ratings for all tools that have at least one rating."""
with _connect() as conn: with db_connection() as conn:
rows = conn.execute( sql = """SELECT
"""SELECT
tool, tool,
COUNT(*) as count, COUNT(*) as count,
COALESCE(AVG(rating), 0) as average COALESCE(AVG(rating), 0) as average
FROM tool_ratings FROM tool_ratings
GROUP BY tool GROUP BY tool
ORDER BY count DESC""" ORDER BY count DESC"""
).fetchall() cursor = execute_query(conn, sql)
rows = [row_to_dict(r) for r in cursor.fetchall()]
return [ return [
{ {
@@ -139,15 +179,15 @@ def get_all_ratings_summary() -> list[dict]:
def get_global_rating_summary() -> dict: def get_global_rating_summary() -> dict:
"""Return aggregate rating stats across all rated tools.""" """Return aggregate rating stats across all rated tools."""
with _connect() as conn: with db_connection() as conn:
row = conn.execute( sql = """
"""
SELECT SELECT
COUNT(*) AS count, COUNT(*) AS count,
COALESCE(AVG(rating), 0) AS average COALESCE(AVG(rating), 0) AS average
FROM tool_ratings FROM tool_ratings
""" """
).fetchone() cursor = execute_query(conn, sql)
row = row_to_dict(cursor.fetchone())
return { return {
"rating_count": int(row["count"]) if row else 0, "rating_count": int(row["count"]) if row else 0,

View File

@@ -1,19 +1,25 @@
"""Site assistant service — page-aware AI help plus persistent conversation logging.""" """Site assistant service — page-aware AI help plus persistent conversation logging.
Supports both SQLite (development) and PostgreSQL (production).
"""
import json import json
import logging import logging
import os
import sqlite3
import uuid import uuid
from datetime import datetime, timezone from datetime import datetime, timezone
import requests import requests
from flask import current_app
from app.services.openrouter_config_service import ( from app.services.openrouter_config_service import (
extract_openrouter_text, extract_openrouter_text,
get_openrouter_settings, get_openrouter_settings,
) )
from app.services.ai_cost_service import AiBudgetExceededError, check_ai_budget, log_ai_usage from app.services.ai_cost_service import (
AiBudgetExceededError,
check_ai_budget,
log_ai_usage,
)
from app.utils.database import db_connection, execute_query, is_postgres, row_to_dict
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -21,38 +27,166 @@ MAX_HISTORY_MESSAGES = 8
MAX_MESSAGE_LENGTH = 4000 MAX_MESSAGE_LENGTH = 4000
TOOL_CATALOG = [ TOOL_CATALOG = [
{"slug": "pdf-to-word", "label": "PDF to Word", "summary": "convert PDF files into editable Word documents"}, {
{"slug": "word-to-pdf", "label": "Word to PDF", "summary": "turn DOC or DOCX files into PDF documents"}, "slug": "pdf-to-word",
{"slug": "compress-pdf", "label": "Compress PDF", "summary": "reduce PDF file size while preserving readability"}, "label": "PDF to Word",
{"slug": "merge-pdf", "label": "Merge PDF", "summary": "combine multiple PDF files into one document"}, "summary": "convert PDF files into editable Word documents",
{"slug": "split-pdf", "label": "Split PDF", "summary": "extract ranges or split one PDF into separate pages"}, },
{"slug": "rotate-pdf", "label": "Rotate PDF", "summary": "rotate PDF pages to the correct orientation"}, {
{"slug": "pdf-to-images", "label": "PDF to Images", "summary": "convert each PDF page into PNG or JPG images"}, "slug": "word-to-pdf",
{"slug": "images-to-pdf", "label": "Images to PDF", "summary": "combine multiple images into one PDF"}, "label": "Word to PDF",
{"slug": "watermark-pdf", "label": "Watermark PDF", "summary": "add text watermarks to PDF pages"}, "summary": "turn DOC or DOCX files into PDF documents",
{"slug": "remove-watermark-pdf", "label": "Remove Watermark", "summary": "remove supported text and image-overlay watermarks from PDFs"}, },
{"slug": "protect-pdf", "label": "Protect PDF", "summary": "add password protection to PDF files"}, {
{"slug": "unlock-pdf", "label": "Unlock PDF", "summary": "remove PDF password protection when the password is known"}, "slug": "compress-pdf",
{"slug": "page-numbers", "label": "Page Numbers", "summary": "add page numbers in different positions"}, "label": "Compress PDF",
{"slug": "pdf-editor", "label": "PDF Editor", "summary": "optimize and clean PDF copies"}, "summary": "reduce PDF file size while preserving readability",
{"slug": "pdf-flowchart", "label": "PDF Flowchart", "summary": "analyze PDF procedures and turn them into flowcharts"}, },
{"slug": "pdf-to-excel", "label": "PDF to Excel", "summary": "extract structured table data into spreadsheet files"}, {
{"slug": "html-to-pdf", "label": "HTML to PDF", "summary": "convert HTML documents into PDF"}, "slug": "merge-pdf",
{"slug": "reorder-pdf", "label": "Reorder PDF", "summary": "rearrange PDF pages using a full page order"}, "label": "Merge PDF",
{"slug": "extract-pages", "label": "Extract Pages", "summary": "create a PDF from selected pages"}, "summary": "combine multiple PDF files into one document",
{"slug": "chat-pdf", "label": "Chat with PDF", "summary": "ask questions about one uploaded PDF"}, },
{"slug": "summarize-pdf", "label": "Summarize PDF", "summary": "generate a concise summary of one PDF"}, {
{"slug": "translate-pdf", "label": "Translate PDF", "summary": "translate PDF content into another language"}, "slug": "split-pdf",
{"slug": "extract-tables", "label": "Extract Tables", "summary": "find tables in a PDF and export them"}, "label": "Split PDF",
{"slug": "image-converter", "label": "Image Converter", "summary": "convert images between common formats"}, "summary": "extract ranges or split one PDF into separate pages",
{"slug": "image-resize", "label": "Image Resize", "summary": "resize images to exact dimensions"}, },
{"slug": "compress-image", "label": "Compress Image", "summary": "reduce image file size"}, {
{"slug": "ocr", "label": "OCR", "summary": "extract text from image or scanned PDF content"}, "slug": "rotate-pdf",
{"slug": "remove-background", "label": "Remove Background", "summary": "remove image backgrounds automatically"}, "label": "Rotate PDF",
{"slug": "qr-code", "label": "QR Code", "summary": "generate QR codes from text or URLs"}, "summary": "rotate PDF pages to the correct orientation",
{"slug": "video-to-gif", "label": "Video to GIF", "summary": "convert short videos into GIF animations"}, },
{"slug": "word-counter", "label": "Word Counter", "summary": "count words, characters, and reading metrics"}, {
{"slug": "text-cleaner", "label": "Text Cleaner", "summary": "clean up text spacing and formatting"}, "slug": "pdf-to-images",
"label": "PDF to Images",
"summary": "convert each PDF page into PNG or JPG images",
},
{
"slug": "images-to-pdf",
"label": "Images to PDF",
"summary": "combine multiple images into one PDF",
},
{
"slug": "watermark-pdf",
"label": "Watermark PDF",
"summary": "add text watermarks to PDF pages",
},
{
"slug": "remove-watermark-pdf",
"label": "Remove Watermark",
"summary": "remove supported text and image-overlay watermarks from PDFs",
},
{
"slug": "protect-pdf",
"label": "Protect PDF",
"summary": "add password protection to PDF files",
},
{
"slug": "unlock-pdf",
"label": "Unlock PDF",
"summary": "remove PDF password protection when the password is known",
},
{
"slug": "page-numbers",
"label": "Page Numbers",
"summary": "add page numbers in different positions",
},
{
"slug": "pdf-editor",
"label": "PDF Editor",
"summary": "optimize and clean PDF copies",
},
{
"slug": "pdf-flowchart",
"label": "PDF Flowchart",
"summary": "analyze PDF procedures and turn them into flowcharts",
},
{
"slug": "pdf-to-excel",
"label": "PDF to Excel",
"summary": "extract structured table data into spreadsheet files",
},
{
"slug": "html-to-pdf",
"label": "HTML to PDF",
"summary": "convert HTML documents into PDF",
},
{
"slug": "reorder-pdf",
"label": "Reorder PDF",
"summary": "rearrange PDF pages using a full page order",
},
{
"slug": "extract-pages",
"label": "Extract Pages",
"summary": "create a PDF from selected pages",
},
{
"slug": "chat-pdf",
"label": "Chat with PDF",
"summary": "ask questions about one uploaded PDF",
},
{
"slug": "summarize-pdf",
"label": "Summarize PDF",
"summary": "generate a concise summary of one PDF",
},
{
"slug": "translate-pdf",
"label": "Translate PDF",
"summary": "translate PDF content into another language",
},
{
"slug": "extract-tables",
"label": "Extract Tables",
"summary": "find tables in a PDF and export them",
},
{
"slug": "image-converter",
"label": "Image Converter",
"summary": "convert images between common formats",
},
{
"slug": "image-resize",
"label": "Image Resize",
"summary": "resize images to exact dimensions",
},
{
"slug": "compress-image",
"label": "Compress Image",
"summary": "reduce image file size",
},
{
"slug": "ocr",
"label": "OCR",
"summary": "extract text from image or scanned PDF content",
},
{
"slug": "remove-background",
"label": "Remove Background",
"summary": "remove image backgrounds automatically",
},
{
"slug": "qr-code",
"label": "QR Code",
"summary": "generate QR codes from text or URLs",
},
{
"slug": "video-to-gif",
"label": "Video to GIF",
"summary": "convert short videos into GIF animations",
},
{
"slug": "word-counter",
"label": "Word Counter",
"summary": "count words, characters, and reading metrics",
},
{
"slug": "text-cleaner",
"label": "Text Cleaner",
"summary": "clean up text spacing and formatting",
},
] ]
SYSTEM_PROMPT = """You are the Dociva site assistant. SYSTEM_PROMPT = """You are the Dociva site assistant.
@@ -68,25 +202,55 @@ Rules:
""" """
def _connect() -> sqlite3.Connection:
db_path = current_app.config["DATABASE_PATH"]
db_dir = os.path.dirname(db_path)
if db_dir:
os.makedirs(db_dir, exist_ok=True)
connection = sqlite3.connect(db_path)
connection.row_factory = sqlite3.Row
connection.execute("PRAGMA foreign_keys = ON")
return connection
def _utc_now() -> str: def _utc_now() -> str:
return datetime.now(timezone.utc).isoformat() return datetime.now(timezone.utc).isoformat()
def init_site_assistant_db() -> None: def init_site_assistant_db() -> None:
"""Create assistant conversation tables if they do not exist.""" """Create assistant conversation tables if they do not exist."""
with _connect() as conn: with db_connection() as conn:
if is_postgres():
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS assistant_conversations (
id SERIAL PRIMARY KEY,
session_id TEXT NOT NULL UNIQUE,
user_id INTEGER,
fingerprint TEXT NOT NULL,
tool_slug TEXT DEFAULT '',
page_url TEXT DEFAULT '',
locale TEXT DEFAULT 'en',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS assistant_messages (
id SERIAL PRIMARY KEY,
conversation_id INTEGER NOT NULL,
role TEXT NOT NULL CHECK(role IN ('user', 'assistant', 'system')),
content TEXT NOT NULL,
tool_slug TEXT DEFAULT '',
page_url TEXT DEFAULT '',
locale TEXT DEFAULT 'en',
metadata_json TEXT NOT NULL DEFAULT '{}',
created_at TEXT NOT NULL,
FOREIGN KEY (conversation_id) REFERENCES assistant_conversations(id) ON DELETE CASCADE
)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_assistant_conversations_user_id
ON assistant_conversations(user_id)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_assistant_messages_conversation_id
ON assistant_messages(conversation_id)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_assistant_messages_created_at
ON assistant_messages(created_at)
""")
else:
conn.executescript( conn.executescript(
""" """
CREATE TABLE IF NOT EXISTS assistant_conversations ( CREATE TABLE IF NOT EXISTS assistant_conversations (
@@ -249,8 +413,12 @@ def stream_site_assistant_chat(
check_ai_budget() check_ai_budget()
settings = get_openrouter_settings() settings = get_openrouter_settings()
if not settings.api_key: if not settings.api_key:
logger.error("OPENROUTER_API_KEY is not set — assistant AI unavailable.") logger.error(
raise RuntimeError("AI assistant is temporarily unavailable. Please try again later.") "OPENROUTER_API_KEY is not set — assistant AI unavailable."
)
raise RuntimeError(
"AI assistant is temporarily unavailable. Please try again later."
)
response_model = settings.model response_model = settings.model
messages = _build_ai_messages( messages = _build_ai_messages(
@@ -317,32 +485,60 @@ def _ensure_conversation(
locale: str, locale: str,
) -> int: ) -> int:
now = _utc_now() now = _utc_now()
with _connect() as conn: with db_connection() as conn:
row = conn.execute( sql = (
"SELECT id FROM assistant_conversations WHERE session_id = ?", "SELECT id FROM assistant_conversations WHERE session_id = %s"
(session_id,), if is_postgres()
).fetchone() else "SELECT id FROM assistant_conversations WHERE session_id = ?"
)
cursor = execute_query(conn, sql, (session_id,))
row = row_to_dict(cursor.fetchone())
if row is not None: if row is not None:
conn.execute( update_sql = (
""" """
UPDATE assistant_conversations UPDATE assistant_conversations
SET user_id = %s, fingerprint = %s, tool_slug = %s, page_url = %s, locale = %s, updated_at = %s
WHERE id = %s
"""
if is_postgres()
else """
UPDATE assistant_conversations
SET user_id = ?, fingerprint = ?, tool_slug = ?, page_url = ?, locale = ?, updated_at = ? SET user_id = ?, fingerprint = ?, tool_slug = ?, page_url = ?, locale = ?, updated_at = ?
WHERE id = ? WHERE id = ?
""", """
)
execute_query(
conn,
update_sql,
(user_id, fingerprint, tool_slug, page_url, locale, now, row["id"]), (user_id, fingerprint, tool_slug, page_url, locale, now, row["id"]),
) )
return int(row["id"]) return int(row["id"])
cursor = conn.execute( insert_sql = (
""" """
INSERT INTO assistant_conversations (
session_id, user_id, fingerprint, tool_slug, page_url, locale, created_at, updated_at
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
RETURNING id
"""
if is_postgres()
else """
INSERT INTO assistant_conversations ( INSERT INTO assistant_conversations (
session_id, user_id, fingerprint, tool_slug, page_url, locale, created_at, updated_at session_id, user_id, fingerprint, tool_slug, page_url, locale, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", """
)
cursor2 = execute_query(
conn,
insert_sql,
(session_id, user_id, fingerprint, tool_slug, page_url, locale, now, now), (session_id, user_id, fingerprint, tool_slug, page_url, locale, now, now),
) )
return int(cursor.lastrowid)
if is_postgres():
result = cursor2.fetchone()
return int(result["id"]) if result else 0
return int(cursor2.lastrowid)
def _record_message( def _record_message(
@@ -354,13 +550,23 @@ def _record_message(
locale: str, locale: str,
metadata: dict | None = None, metadata: dict | None = None,
) -> None: ) -> None:
with _connect() as conn: with db_connection() as conn:
conn.execute( sql = (
""" """
INSERT INTO assistant_messages (
conversation_id, role, content, tool_slug, page_url, locale, metadata_json, created_at
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
"""
if is_postgres()
else """
INSERT INTO assistant_messages ( INSERT INTO assistant_messages (
conversation_id, role, content, tool_slug, page_url, locale, metadata_json, created_at conversation_id, role, content, tool_slug, page_url, locale, metadata_json, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", """
)
execute_query(
conn,
sql,
( (
conversation_id, conversation_id,
role, role,
@@ -441,7 +647,9 @@ def _request_ai_reply(
if not settings.api_key: if not settings.api_key:
logger.error("OPENROUTER_API_KEY is not set — assistant AI unavailable.") logger.error("OPENROUTER_API_KEY is not set — assistant AI unavailable.")
raise RuntimeError("AI assistant is temporarily unavailable. Please try again later.") raise RuntimeError(
"AI assistant is temporarily unavailable. Please try again later."
)
messages = _build_ai_messages( messages = _build_ai_messages(
message=message, message=message,

View File

@@ -1,7 +1,7 @@
"""Database abstraction — supports SQLite (dev) and PostgreSQL (production). """Database abstraction — supports SQLite (dev) and PostgreSQL (production).
Usage: Usage:
from app.utils.database import get_connection from app.utils.database import db_connection, adapt_query
The returned connection behaves like a sqlite3.Connection with row_factory set. The returned connection behaves like a sqlite3.Connection with row_factory set.
For PostgreSQL it wraps psycopg2 with RealDictCursor for dict-like rows. For PostgreSQL it wraps psycopg2 with RealDictCursor for dict-like rows.
@@ -10,8 +10,10 @@ Selection logic:
- If DATABASE_URL env var is set (starts with ``postgres``), use PostgreSQL. - If DATABASE_URL env var is set (starts with ``postgres``), use PostgreSQL.
- Otherwise fall back to SQLite via DATABASE_PATH config. - Otherwise fall back to SQLite via DATABASE_PATH config.
""" """
import logging import logging
import os import os
import re
import sqlite3 import sqlite3
from contextlib import contextmanager from contextlib import contextmanager
@@ -23,6 +25,8 @@ _pg_available = False
try: try:
import psycopg2 import psycopg2
import psycopg2.extras import psycopg2.extras
import psycopg2.errors
_pg_available = True _pg_available = True
except ImportError: except ImportError:
pass pass
@@ -35,7 +39,13 @@ def is_postgres() -> bool:
def _sqlite_connect() -> sqlite3.Connection: def _sqlite_connect() -> sqlite3.Connection:
db_path = current_app.config["DATABASE_PATH"] db_path = current_app.config.get("DATABASE_PATH")
if not db_path:
db_path = os.path.join(
os.path.abspath(os.path.join(os.path.dirname(__file__), "..")),
"data",
"dociva.db",
)
db_dir = os.path.dirname(db_path) db_dir = os.path.dirname(db_path)
if db_dir: if db_dir:
os.makedirs(db_dir, exist_ok=True) os.makedirs(db_dir, exist_ok=True)
@@ -50,7 +60,10 @@ def _pg_connect():
if not _pg_available: if not _pg_available:
raise RuntimeError("psycopg2 is not installed — cannot use PostgreSQL.") raise RuntimeError("psycopg2 is not installed — cannot use PostgreSQL.")
db_url = os.getenv("DATABASE_URL", "") db_url = os.getenv("DATABASE_URL", "")
conn = psycopg2.connect(db_url, cursor_factory=psycopg2.extras.RealDictCursor) conn = psycopg2.connect(
db_url,
cursor_factory=psycopg2.extras.RealDictCursor,
)
conn.autocommit = False conn.autocommit = False
return conn return conn
@@ -76,16 +89,94 @@ def db_connection():
conn.close() conn.close()
def adapt_sql(sql: str) -> str: def adapt_query(sql: str, params: tuple = ()) -> tuple:
"""Adapt SQLite SQL to PostgreSQL if needed. """Adapt SQLite SQL and parameters to PostgreSQL if needed.
Converts: Converts:
- INTEGER PRIMARY KEY AUTOINCREMENT -> SERIAL PRIMARY KEY - INTEGER PRIMARY KEY AUTOINCREMENT -> SERIAL PRIMARY KEY
- ? placeholders -> %s placeholders - ? placeholders -> %s placeholders
- params tuple unchanged (psycopg2 accepts tuple with %s)
Returns (adapted_sql, adapted_params).
""" """
if not is_postgres(): if not is_postgres():
return sql return sql, params
sql = sql.replace("INTEGER PRIMARY KEY AUTOINCREMENT", "SERIAL PRIMARY KEY") sql = sql.replace("INTEGER PRIMARY KEY AUTOINCREMENT", "SERIAL PRIMARY KEY")
sql = sql.replace("?", "%s") sql = sql.replace("INTEGER PRIMARY KEY", "SERIAL PRIMARY KEY")
return sql sql = sql.replace("BOOLEAN DEFAULT 1", "BOOLEAN DEFAULT TRUE")
sql = sql.replace("BOOLEAN DEFAULT 0", "BOOLEAN DEFAULT FALSE")
sql = re.sub(r"\?", "%s", sql)
return sql, params
def execute_query(conn, sql: str, params: tuple = ()):
"""Execute a query, adapting SQL for the current database.
Returns the cursor.
"""
adapted_sql, adapted_params = adapt_query(sql, params)
if is_postgres():
cursor = conn.cursor()
cursor.execute(adapted_sql, adapted_params)
return cursor
return conn.execute(adapted_sql, adapted_params)
def get_last_insert_id(conn, cursor=None):
"""Get the last inserted row ID, compatible with both SQLite and PostgreSQL."""
if is_postgres():
if cursor is None:
raise ValueError("cursor is required for PostgreSQL to get last insert ID")
result = cursor.fetchone()
if result:
if isinstance(result, dict):
return result.get("id") or result.get("lastval")
return result[0]
return None
return cursor.lastrowid
def get_integrity_error():
"""Get the appropriate IntegrityError exception for the current database."""
if is_postgres():
if _pg_available:
return psycopg2.IntegrityError
raise RuntimeError("psycopg2 is not installed")
return sqlite3.IntegrityError
def get_row_value(row, key: str):
"""Get a value from a row by key, compatible with both SQLite Row and psycopg2 dict."""
if row is None:
return None
if isinstance(row, dict):
return row.get(key)
return row[key]
def row_to_dict(row):
"""Convert a database row to a plain dict."""
if row is None:
return None
if isinstance(row, dict):
return dict(row)
return dict(row)
def init_tables(conn):
"""Run initialization SQL for all tables, adapting for the current database."""
if is_postgres():
cursor = conn.cursor()
cursor.execute(
"SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'users')"
)
if cursor.fetchone()[0]:
return
else:
cursor = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='users'"
)
if cursor.fetchone():
return

View File

@@ -12,6 +12,24 @@ services:
timeout: 3s timeout: 3s
retries: 5 retries: 5
# --- PostgreSQL ---
postgres:
image: postgres:16-alpine
ports:
- "5432:5432"
environment:
- POSTGRES_DB=dociva
- POSTGRES_USER=dociva
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-dociva_secret_password}
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U dociva -d dociva"]
interval: 10s
timeout: 5s
retries: 5
restart: unless-stopped
# --- Flask Backend --- # --- Flask Backend ---
backend: backend:
build: build:
@@ -26,6 +44,7 @@ services:
- REDIS_URL=redis://redis:6379/0 - REDIS_URL=redis://redis:6379/0
- CELERY_BROKER_URL=redis://redis:6379/0 - CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/1 - CELERY_RESULT_BACKEND=redis://redis:6379/1
- DATABASE_URL=postgresql://dociva:${POSTGRES_PASSWORD:-dociva_secret_password}@postgres:5432/dociva
volumes: volumes:
- ./backend:/app - ./backend:/app
- upload_data:/tmp/uploads - upload_data:/tmp/uploads
@@ -33,6 +52,8 @@ services:
depends_on: depends_on:
redis: redis:
condition: service_healthy condition: service_healthy
postgres:
condition: service_healthy
restart: unless-stopped restart: unless-stopped
# --- Celery Worker --- # --- Celery Worker ---
@@ -52,6 +73,7 @@ services:
- REDIS_URL=redis://redis:6379/0 - REDIS_URL=redis://redis:6379/0
- CELERY_BROKER_URL=redis://redis:6379/0 - CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/1 - CELERY_RESULT_BACKEND=redis://redis:6379/1
- DATABASE_URL=postgresql://dociva:${POSTGRES_PASSWORD:-dociva_secret_password}@postgres:5432/dociva
volumes: volumes:
- ./backend:/app - ./backend:/app
- upload_data:/tmp/uploads - upload_data:/tmp/uploads
@@ -59,6 +81,8 @@ services:
depends_on: depends_on:
redis: redis:
condition: service_healthy condition: service_healthy
postgres:
condition: service_healthy
healthcheck: healthcheck:
test: ["CMD", "celery", "-A", "celery_worker.celery", "inspect", "ping"] test: ["CMD", "celery", "-A", "celery_worker.celery", "inspect", "ping"]
interval: 30s interval: 30s
@@ -82,11 +106,14 @@ services:
- REDIS_URL=redis://redis:6379/0 - REDIS_URL=redis://redis:6379/0
- CELERY_BROKER_URL=redis://redis:6379/0 - CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/1 - CELERY_RESULT_BACKEND=redis://redis:6379/1
- DATABASE_URL=postgresql://dociva:${POSTGRES_PASSWORD:-dociva_secret_password}@postgres:5432/dociva
volumes: volumes:
- ./backend:/app - ./backend:/app
depends_on: depends_on:
redis: redis:
condition: service_healthy condition: service_healthy
postgres:
condition: service_healthy
restart: unless-stopped restart: unless-stopped
# --- React Frontend (Vite Dev) --- # --- React Frontend (Vite Dev) ---
@@ -141,5 +168,6 @@ services:
volumes: volumes:
redis_data: redis_data:
postgres_data:
upload_data: upload_data:
output_data: output_data: