- Migrate all service files from hardcoded SQLite to dual SQLite/PostgreSQL support - Add PostgreSQL service to docker-compose.yml - Create database abstraction layer (database.py) with execute_query, row_to_dict helpers - Update all 7 service files: account, rating, contact, ai_cost, quota, site_assistant, admin - Add new admin endpoint /database-stats for table size and row count visualization - Add database_type field to system health endpoint - Update .env.example with proper PostgreSQL connection string
867 lines
32 KiB
Python
867 lines
32 KiB
Python
"""Internal admin aggregation helpers for operational dashboards.

Supports both SQLite (development) and PostgreSQL (production).
"""
|
|
|
|
import json
|
|
import os
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
from flask import current_app
|
|
|
|
from app.services.account_service import is_allowlisted_admin_email, normalize_role
|
|
from app.services.ai_cost_service import get_monthly_spend
|
|
from app.services.contact_service import mark_read
|
|
from app.services.rating_service import get_global_rating_summary
|
|
from app.utils.database import db_connection, execute_query, is_postgres, row_to_dict
|
|
|
|
|
|
def _parse_metadata(raw_value: str | None) -> dict:
|
|
if not raw_value:
|
|
return {}
|
|
try:
|
|
parsed = json.loads(raw_value)
|
|
except json.JSONDecodeError:
|
|
return {}
|
|
return parsed if isinstance(parsed, dict) else {}
|
|
|
|
|
|
def get_admin_overview(limit_recent: int = 8, top_tools_limit: int = 6) -> dict:
    """Aggregate the headline metrics for the admin dashboard in one call.

    Args:
        limit_recent: Row cap for the "recent" lists (failures, users,
            contact messages).
        top_tools_limit: Row cap for the top-tools ranking.

    Returns:
        Dict with keys ``users``, ``processing``, ``ratings``, ``ai_cost``,
        ``contacts``, ``top_tools``, ``recent_failures``, ``recent_users``.
    """
    # created_at appears to be stored as an ISO-8601 string, so comparing
    # against this cutoff string selects the last 24 hours.
    cutoff_24h = (datetime.now(timezone.utc) - timedelta(days=1)).isoformat()
    ai_cost_summary = get_monthly_spend()

    with db_connection() as conn:
        # Plan breakdown of all registered users (same SQL on both backends).
        users_sql = """
            SELECT
                COUNT(*) AS total_users,
                COALESCE(SUM(CASE WHEN plan = 'pro' THEN 1 ELSE 0 END), 0) AS pro_users,
                COALESCE(SUM(CASE WHEN plan = 'free' THEN 1 ELSE 0 END), 0) AS free_users
            FROM users
        """
        cursor = execute_query(conn, users_sql)
        users_row = row_to_dict(cursor.fetchone())

        # File-processing totals. The two variants differ only in the
        # parameter placeholder: %s for PostgreSQL, ? for SQLite.
        history_sql = (
            """
            SELECT
                COUNT(*) AS total_files_processed,
                COALESCE(SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END), 0) AS completed_files,
                COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0) AS failed_files,
                COALESCE(SUM(CASE WHEN created_at >= %s THEN 1 ELSE 0 END), 0) AS files_last_24h
            FROM file_history
            """
            if is_postgres()
            else """
            SELECT
                COUNT(*) AS total_files_processed,
                COALESCE(SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END), 0) AS completed_files,
                COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0) AS failed_files,
                COALESCE(SUM(CASE WHEN created_at >= ? THEN 1 ELSE 0 END), 0) AS files_last_24h
            FROM file_history
            """
        )
        cursor2 = execute_query(conn, history_sql, (cutoff_24h,))
        history_row = row_to_dict(cursor2.fetchone())

        # Most-used tools with their failure counts.
        top_tools_sql = (
            """
            SELECT
                tool,
                COUNT(*) AS total_runs,
                COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0) AS failed_runs
            FROM file_history
            GROUP BY tool
            ORDER BY total_runs DESC, tool ASC
            LIMIT %s
            """
            if is_postgres()
            else """
            SELECT
                tool,
                COUNT(*) AS total_runs,
                COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0) AS failed_runs
            FROM file_history
            GROUP BY tool
            ORDER BY total_runs DESC, tool ASC
            LIMIT ?
            """
        )
        cursor3 = execute_query(conn, top_tools_sql, (top_tools_limit,))
        top_tools_rows = [row_to_dict(r) for r in cursor3.fetchall()]

        # Latest failed runs, joined with the owning user's email
        # (LEFT JOIN: email is NULL for anonymous/deleted users).
        failure_sql = (
            """
            SELECT
                file_history.id,
                file_history.user_id,
                file_history.tool,
                file_history.original_filename,
                file_history.metadata_json,
                file_history.created_at,
                users.email
            FROM file_history
            LEFT JOIN users ON users.id = file_history.user_id
            WHERE file_history.status = 'failed'
            ORDER BY file_history.created_at DESC
            LIMIT %s
            """
            if is_postgres()
            else """
            SELECT
                file_history.id,
                file_history.user_id,
                file_history.tool,
                file_history.original_filename,
                file_history.metadata_json,
                file_history.created_at,
                users.email
            FROM file_history
            LEFT JOIN users ON users.id = file_history.user_id
            WHERE file_history.status = 'failed'
            ORDER BY file_history.created_at DESC
            LIMIT ?
            """
        )
        cursor4 = execute_query(conn, failure_sql, (limit_recent,))
        failure_rows = [row_to_dict(r) for r in cursor4.fetchall()]

        # Newest accounts plus per-user task and active API-key counts.
        recent_user_sql = (
            """
            SELECT
                users.id,
                users.email,
                users.plan,
                users.created_at,
                COALESCE((SELECT COUNT(*) FROM file_history WHERE file_history.user_id = users.id), 0) AS total_tasks,
                COALESCE((SELECT COUNT(*) FROM api_keys WHERE api_keys.user_id = users.id AND api_keys.revoked_at IS NULL), 0) AS active_api_keys
            FROM users
            ORDER BY users.created_at DESC
            LIMIT %s
            """
            if is_postgres()
            else """
            SELECT
                users.id,
                users.email,
                users.plan,
                users.created_at,
                COALESCE((SELECT COUNT(*) FROM file_history WHERE file_history.user_id = users.id), 0) AS total_tasks,
                COALESCE((SELECT COUNT(*) FROM api_keys WHERE api_keys.user_id = users.id AND api_keys.revoked_at IS NULL), 0) AS active_api_keys
            FROM users
            ORDER BY users.created_at DESC
            LIMIT ?
            """
        )
        cursor5 = execute_query(conn, recent_user_sql, (limit_recent,))
        recent_user_rows = [row_to_dict(r) for r in cursor5.fetchall()]

        # Contact-message totals. SQLite stores booleans as 0/1 while
        # PostgreSQL has a native boolean, hence the different WHERE tests.
        contact_sql = (
            """
            SELECT
                COUNT(*) AS total_messages,
                COALESCE(SUM(CASE WHEN is_read = FALSE THEN 1 ELSE 0 END), 0) AS unread_messages
            FROM contact_messages
            """
            if is_postgres()
            else """
            SELECT
                COUNT(*) AS total_messages,
                COALESCE(SUM(CASE WHEN is_read = 0 THEN 1 ELSE 0 END), 0) AS unread_messages
            FROM contact_messages
            """
        )
        cursor6 = execute_query(conn, contact_sql)
        contact_row = row_to_dict(cursor6.fetchone())

        # Newest contact messages for the inbox preview.
        recent_contact_sql = (
            """
            SELECT id, name, email, category, subject, message, created_at, is_read
            FROM contact_messages
            ORDER BY created_at DESC
            LIMIT %s
            """
            if is_postgres()
            else """
            SELECT id, name, email, category, subject, message, created_at, is_read
            FROM contact_messages
            ORDER BY created_at DESC
            LIMIT ?
            """
        )
        cursor7 = execute_query(conn, recent_contact_sql, (limit_recent,))
        recent_contact_rows = [row_to_dict(r) for r in cursor7.fetchall()]

    total_processed = int(history_row["total_files_processed"]) if history_row else 0
    completed_files = int(history_row["completed_files"]) if history_row else 0
    # Success rate as a percentage; 0.0 when nothing has been processed yet.
    success_rate = (
        round((completed_files / total_processed) * 100, 1) if total_processed else 0.0
    )

    return {
        "users": {
            "total": int(users_row["total_users"]) if users_row else 0,
            "pro": int(users_row["pro_users"]) if users_row else 0,
            "free": int(users_row["free_users"]) if users_row else 0,
        },
        "processing": {
            "total_files_processed": total_processed,
            "completed_files": completed_files,
            "failed_files": int(history_row["failed_files"]) if history_row else 0,
            "files_last_24h": int(history_row["files_last_24h"]) if history_row else 0,
            "success_rate": success_rate,
        },
        "ratings": get_global_rating_summary(),
        "ai_cost": {
            "month": ai_cost_summary["period"],
            "total_usd": ai_cost_summary["total_cost_usd"],
            "budget_usd": ai_cost_summary["budget_usd"],
            "percent_used": ai_cost_summary["budget_used_percent"],
        },
        "contacts": {
            "total_messages": int(contact_row["total_messages"]) if contact_row else 0,
            "unread_messages": int(contact_row["unread_messages"])
            if contact_row
            else 0,
            "recent": [
                {
                    "id": row["id"],
                    "name": row["name"],
                    "email": row["email"],
                    "category": row["category"],
                    "subject": row["subject"],
                    "message": row["message"],
                    "created_at": row["created_at"],
                    "is_read": bool(row["is_read"]),
                }
                for row in recent_contact_rows
            ],
        },
        "top_tools": [
            {
                "tool": row["tool"],
                "total_runs": int(row["total_runs"]),
                "failed_runs": int(row["failed_runs"]),
            }
            for row in top_tools_rows
        ],
        "recent_failures": [
            {
                "id": row["id"],
                "user_id": row["user_id"],
                "email": row["email"],
                "tool": row["tool"],
                "original_filename": row["original_filename"],
                "created_at": row["created_at"],
                # metadata_json is a raw JSON string; parse defensively.
                "metadata": _parse_metadata(row["metadata_json"]),
            }
            for row in failure_rows
        ],
        "recent_users": [
            {
                "id": row["id"],
                "email": row["email"],
                "plan": row["plan"],
                "created_at": row["created_at"],
                "total_tasks": int(row["total_tasks"]),
                "active_api_keys": int(row["active_api_keys"]),
            }
            for row in recent_user_rows
        ],
    }
|
|
|
|
|
|
def list_admin_users(limit: int = 25, query: str = "") -> list[dict]:
    """Return up to ``limit`` users with per-user activity counts.

    Args:
        limit: Maximum number of rows to return.
        query: Optional case-insensitive substring filter on the email.

    Returns:
        List of dicts, newest accounts first, each including task counts
        and the effective role.
    """
    normalized_query = query.strip().lower()
    sql = """
        SELECT
            users.id,
            users.email,
            users.plan,
            users.role,
            users.created_at,
            COALESCE((SELECT COUNT(*) FROM file_history WHERE file_history.user_id = users.id), 0) AS total_tasks,
            COALESCE((SELECT COUNT(*) FROM file_history WHERE file_history.user_id = users.id AND file_history.status = 'completed'), 0) AS completed_tasks,
            COALESCE((SELECT COUNT(*) FROM file_history WHERE file_history.user_id = users.id AND file_history.status = 'failed'), 0) AS failed_tasks,
            COALESCE((SELECT COUNT(*) FROM api_keys WHERE api_keys.user_id = users.id AND api_keys.revoked_at IS NULL), 0) AS active_api_keys
        FROM users
    """
    params: list[object] = []
    if normalized_query:
        # Placeholder style differs per backend: %s (PostgreSQL) vs ? (SQLite).
        sql += (
            " WHERE LOWER(users.email) LIKE %s"
            if is_postgres()
            else " WHERE LOWER(users.email) LIKE ?"
        )
        params.append(f"%{normalized_query}%")
    sql += (
        " ORDER BY users.created_at DESC LIMIT %s"
        if is_postgres()
        else " ORDER BY users.created_at DESC LIMIT ?"
    )
    params.append(limit)

    with db_connection() as conn:
        cursor = execute_query(conn, sql, tuple(params))
        rows = cursor.fetchall()
        rows = [row_to_dict(r) for r in rows]

    return [
        {
            "id": row["id"],
            "email": row["email"],
            "plan": row["plan"],
            # Allowlisted emails are always reported as admins, regardless
            # of the role column stored in the database.
            "role": "admin"
            if is_allowlisted_admin_email(row["email"])
            else normalize_role(row["role"]),
            "is_allowlisted_admin": is_allowlisted_admin_email(row["email"]),
            "created_at": row["created_at"],
            "total_tasks": int(row["total_tasks"]),
            "completed_tasks": int(row["completed_tasks"]),
            "failed_tasks": int(row["failed_tasks"]),
            "active_api_keys": int(row["active_api_keys"]),
        }
        for row in rows
    ]
|
|
|
|
|
|
def list_admin_contacts(page: int = 1, per_page: int = 20) -> dict:
    """Return a paginated listing of contact messages, newest first.

    Args:
        page: 1-based page number; values below 1 are clamped to 1.
        per_page: Page size, clamped to the range 1..100.

    Returns:
        Dict with ``items`` (the page of messages), ``page``, ``per_page``,
        ``total`` and ``unread`` counts.
    """
    safe_page = max(1, page)
    safe_per_page = max(1, min(per_page, 100))
    offset = (safe_page - 1) * safe_per_page

    with db_connection() as conn:
        # Totals; SQLite stores booleans as 0/1, PostgreSQL natively.
        total_sql = (
            "SELECT COUNT(*) AS total, COALESCE(SUM(CASE WHEN is_read = FALSE THEN 1 ELSE 0 END), 0) AS unread FROM contact_messages"
            if is_postgres()
            else "SELECT COUNT(*) AS total, COALESCE(SUM(CASE WHEN is_read = 0 THEN 1 ELSE 0 END), 0) AS unread FROM contact_messages"
        )
        cursor = execute_query(conn, total_sql)
        total_row = row_to_dict(cursor.fetchone())

        # Page of rows; variants differ only in placeholder style.
        rows_sql = (
            """
            SELECT id, name, email, category, subject, message, created_at, is_read
            FROM contact_messages
            ORDER BY created_at DESC
            LIMIT %s OFFSET %s
            """
            if is_postgres()
            else """
            SELECT id, name, email, category, subject, message, created_at, is_read
            FROM contact_messages
            ORDER BY created_at DESC
            LIMIT ? OFFSET ?
            """
        )
        cursor2 = execute_query(conn, rows_sql, (safe_per_page, offset))
        rows = [row_to_dict(r) for r in cursor2.fetchall()]

    return {
        "items": [
            {
                "id": row["id"],
                "name": row["name"],
                "email": row["email"],
                "category": row["category"],
                "subject": row["subject"],
                "message": row["message"],
                "created_at": row["created_at"],
                # Normalize 0/1 (SQLite) or bool (PostgreSQL) to bool.
                "is_read": bool(row["is_read"]),
            }
            for row in rows
        ],
        "page": safe_page,
        "per_page": safe_per_page,
        "total": int(total_row["total"]) if total_row else 0,
        "unread": int(total_row["unread"]) if total_row else 0,
    }
|
|
|
|
|
|
def mark_admin_contact_read(message_id: int) -> bool:
    """Mark a contact message as read.

    Thin wrapper over ``contact_service.mark_read``; returns its result
    (presumably whether a row was updated — confirm in contact_service).
    """
    return mark_read(message_id)
|
|
|
|
|
|
def _ensure_plan_interest_table():
    """Create plan_interest_clicks table if it does not exist.

    Called lazily before reads/writes so no migration step is required.
    Also creates an index on created_at for the time-windowed queries.
    """
    with db_connection() as conn:
        if is_postgres():
            # PostgreSQL connections need an explicit cursor for DDL.
            cursor = conn.cursor()
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS plan_interest_clicks (
                    id SERIAL PRIMARY KEY,
                    user_id INTEGER,
                    plan TEXT NOT NULL,
                    billing TEXT NOT NULL DEFAULT 'monthly',
                    created_at TEXT NOT NULL
                )
            """)
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_plan_interest_created ON plan_interest_clicks(created_at)
            """)
        else:
            # sqlite3 connections can execute statements directly.
            conn.execute("""
                CREATE TABLE IF NOT EXISTS plan_interest_clicks (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id INTEGER,
                    plan TEXT NOT NULL,
                    billing TEXT NOT NULL DEFAULT 'monthly',
                    created_at TEXT NOT NULL
                )
            """)
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_plan_interest_created ON plan_interest_clicks(created_at)"
            )
|
|
|
|
|
|
def record_plan_interest_click(
    user_id: int | None, plan: str, billing: str = "monthly"
) -> None:
    """Record a click on a pricing plan button.

    Args:
        user_id: Clicking user's id, or None for anonymous visitors.
        plan: Identifier of the clicked plan.
        billing: Billing cycle shown at click time (defaults to "monthly").
    """
    # Table is created lazily on first use.
    _ensure_plan_interest_table()
    # Store the timestamp as an ISO-8601 UTC string, matching the TEXT column.
    now = datetime.now(timezone.utc).isoformat()
    with db_connection() as conn:
        # Placeholder style differs per backend: %s (PostgreSQL) vs ? (SQLite).
        sql = (
            "INSERT INTO plan_interest_clicks (user_id, plan, billing, created_at) VALUES (%s, %s, %s, %s)"
            if is_postgres()
            else "INSERT INTO plan_interest_clicks (user_id, plan, billing, created_at) VALUES (?, ?, ?, ?)"
        )
        execute_query(conn, sql, (user_id, plan, billing, now))
|
|
|
|
|
|
def get_plan_interest_summary() -> dict:
    """Return summary of paid plan button clicks.

    Returns:
        Dict with overall totals, 7/30-day windowed counts, a per-plan
        breakdown, and the 20 most recent clicks (with user email if known).
    """
    # Table is created lazily on first use.
    _ensure_plan_interest_table()
    # ISO-8601 cutoffs compared against the TEXT created_at column.
    cutoff_7d = (datetime.now(timezone.utc) - timedelta(days=7)).isoformat()
    cutoff_30d = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat()

    with db_connection() as conn:
        # Overall totals; the two variants differ only in placeholder style.
        total_sql = (
            """
            SELECT
                COUNT(*) AS total_clicks,
                COUNT(DISTINCT user_id) AS unique_users,
                COALESCE(SUM(CASE WHEN created_at >= %s THEN 1 ELSE 0 END), 0) AS clicks_last_7d,
                COALESCE(SUM(CASE WHEN created_at >= %s THEN 1 ELSE 0 END), 0) AS clicks_last_30d
            FROM plan_interest_clicks
            """
            if is_postgres()
            else """
            SELECT
                COUNT(*) AS total_clicks,
                COUNT(DISTINCT user_id) AS unique_users,
                COALESCE(SUM(CASE WHEN created_at >= ? THEN 1 ELSE 0 END), 0) AS clicks_last_7d,
                COALESCE(SUM(CASE WHEN created_at >= ? THEN 1 ELSE 0 END), 0) AS clicks_last_30d
            FROM plan_interest_clicks
            """
        )
        cursor = execute_query(conn, total_sql, (cutoff_7d, cutoff_30d))
        total_row = row_to_dict(cursor.fetchone())

        # Clicks broken down by plan and billing cycle.
        by_plan_sql = """
            SELECT plan, billing, COUNT(*) AS clicks
            FROM plan_interest_clicks
            GROUP BY plan, billing
            ORDER BY clicks DESC
        """
        cursor2 = execute_query(conn, by_plan_sql)
        by_plan_rows = [row_to_dict(r) for r in cursor2.fetchall()]

        # Most recent clicks; LEFT JOIN so anonymous clicks (NULL user_id)
        # are kept with a NULL email.
        recent_sql = """
            SELECT
                plan_interest_clicks.id,
                plan_interest_clicks.user_id,
                plan_interest_clicks.plan,
                plan_interest_clicks.billing,
                plan_interest_clicks.created_at,
                users.email
            FROM plan_interest_clicks
            LEFT JOIN users ON users.id = plan_interest_clicks.user_id
            ORDER BY plan_interest_clicks.created_at DESC
            LIMIT 20
        """
        cursor3 = execute_query(conn, recent_sql)
        recent_rows = [row_to_dict(r) for r in cursor3.fetchall()]

    return {
        "total_clicks": int(total_row["total_clicks"]) if total_row else 0,
        "unique_users": int(total_row["unique_users"]) if total_row else 0,
        "clicks_last_7d": int(total_row["clicks_last_7d"]) if total_row else 0,
        "clicks_last_30d": int(total_row["clicks_last_30d"]) if total_row else 0,
        "by_plan": [
            {
                "plan": row["plan"],
                "billing": row["billing"],
                "clicks": int(row["clicks"]),
            }
            for row in by_plan_rows
        ],
        "recent": [
            {
                "id": row["id"],
                "user_id": row["user_id"],
                "email": row["email"],
                "plan": row["plan"],
                "billing": row["billing"],
                "created_at": row["created_at"],
            }
            for row in recent_rows
        ],
    }
|
|
|
|
|
|
def get_admin_ratings_detail(
    page: int = 1, per_page: int = 20, tool_filter: str = ""
) -> dict:
    """Return detailed ratings list with feedback for the admin dashboard.

    Args:
        page: 1-based page number; values below 1 are clamped to 1.
        per_page: Page size, clamped to the range 1..100.
        tool_filter: Optional exact tool name to restrict the listing.

    Returns:
        Dict with the paginated ``items``, pagination fields (``page``,
        ``per_page``, ``total``), and per-tool ``tool_summaries``
        (count, average rating, positive/negative counts).
    """
    safe_page = max(1, page)
    safe_per_page = max(1, min(per_page, 100))
    offset = (safe_page - 1) * safe_per_page

    with db_connection() as conn:
        # Total row count, honoring the optional tool filter.
        count_sql = "SELECT COUNT(*) AS total FROM tool_ratings"
        count_params: list[object] = []
        if tool_filter:
            # Placeholder style differs: %s (PostgreSQL) vs ? (SQLite).
            count_sql += " WHERE tool = %s" if is_postgres() else " WHERE tool = ?"
            count_params.append(tool_filter)

        cursor = execute_query(conn, count_sql, tuple(count_params))
        total_row = row_to_dict(cursor.fetchone())

        # Page of ratings, newest first.
        sql = """
            SELECT id, tool, rating, feedback, tag, fingerprint, created_at
            FROM tool_ratings
        """
        params: list[object] = []
        if tool_filter:
            sql += " WHERE tool = %s" if is_postgres() else " WHERE tool = ?"
            params.append(tool_filter)
        sql += (
            " ORDER BY created_at DESC LIMIT %s OFFSET %s"
            if is_postgres()
            else " ORDER BY created_at DESC LIMIT ? OFFSET ?"
        )
        params.extend([safe_per_page, offset])

        cursor2 = execute_query(conn, sql, tuple(params))
        rows = [row_to_dict(r) for r in cursor2.fetchall()]

        # Unfiltered per-tool aggregates for the summary strip.
        summary_sql = """
            SELECT
                tool,
                COUNT(*) AS count,
                COALESCE(AVG(rating), 0) AS average,
                COALESCE(SUM(CASE WHEN rating >= 4 THEN 1 ELSE 0 END), 0) AS positive,
                COALESCE(SUM(CASE WHEN rating <= 2 THEN 1 ELSE 0 END), 0) AS negative
            FROM tool_ratings
            GROUP BY tool
            ORDER BY count DESC
        """
        cursor3 = execute_query(conn, summary_sql)
        summary_rows = [row_to_dict(r) for r in cursor3.fetchall()]

    return {
        "items": [
            {
                "id": row["id"],
                "tool": row["tool"],
                "rating": int(row["rating"]),
                "feedback": row["feedback"] or "",
                "tag": row["tag"] or "",
                "created_at": row["created_at"],
            }
            for row in rows
        ],
        "page": safe_page,
        "per_page": safe_per_page,
        "total": int(total_row["total"]) if total_row else 0,
        "tool_summaries": [
            {
                "tool": row["tool"],
                "count": int(row["count"]),
                # PostgreSQL drivers return AVG() as decimal.Decimal; coerce
                # to float before rounding so the value is JSON-serializable
                # and identical in shape on both backends.
                "average": round(float(row["average"]), 1),
                "positive": int(row["positive"]),
                "negative": int(row["negative"]),
            }
            for row in summary_rows
        ],
    }
|
|
|
|
|
|
def get_admin_tool_analytics() -> dict:
    """Return detailed per-tool usage analytics for the admin dashboard.

    Returns:
        Dict with ``tools`` (lifetime and 24h/7d/30d windowed run counts
        plus success rate per tool), ``daily_usage`` (daily totals for the
        last 30 days) and ``common_errors`` (most frequent failure
        payloads of the last 30 days).
    """
    # ISO-8601 cutoffs; created_at appears to be stored as text, so string
    # comparison works for the windowed counts.
    cutoff_24h = (datetime.now(timezone.utc) - timedelta(days=1)).isoformat()
    cutoff_7d = (datetime.now(timezone.utc) - timedelta(days=7)).isoformat()
    cutoff_30d = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat()

    with db_connection() as conn:
        # Per-tool totals; variants differ only in placeholder style.
        tool_sql = (
            """
            SELECT
                tool,
                COUNT(*) AS total_runs,
                COALESCE(SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END), 0) AS completed,
                COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0) AS failed,
                COALESCE(SUM(CASE WHEN created_at >= %s THEN 1 ELSE 0 END), 0) AS runs_24h,
                COALESCE(SUM(CASE WHEN created_at >= %s THEN 1 ELSE 0 END), 0) AS runs_7d,
                COALESCE(SUM(CASE WHEN created_at >= %s THEN 1 ELSE 0 END), 0) AS runs_30d,
                COUNT(DISTINCT user_id) AS unique_users
            FROM file_history
            GROUP BY tool
            ORDER BY total_runs DESC
            """
            if is_postgres()
            else """
            SELECT
                tool,
                COUNT(*) AS total_runs,
                COALESCE(SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END), 0) AS completed,
                COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0) AS failed,
                COALESCE(SUM(CASE WHEN created_at >= ? THEN 1 ELSE 0 END), 0) AS runs_24h,
                COALESCE(SUM(CASE WHEN created_at >= ? THEN 1 ELSE 0 END), 0) AS runs_7d,
                COALESCE(SUM(CASE WHEN created_at >= ? THEN 1 ELSE 0 END), 0) AS runs_30d,
                COUNT(DISTINCT user_id) AS unique_users
            FROM file_history
            GROUP BY tool
            ORDER BY total_runs DESC
            """
        )
        cursor = execute_query(conn, tool_sql, (cutoff_24h, cutoff_7d, cutoff_30d))
        tool_rows = [row_to_dict(r) for r in cursor.fetchall()]

        # Daily totals over the last 30 days. Each backend needs its own
        # date-truncation syntax: ::date cast vs the DATE() function.
        daily_sql = (
            """
            SELECT
                created_at::date AS day,
                COUNT(*) AS total,
                COALESCE(SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END), 0) AS completed,
                COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0) AS failed
            FROM file_history
            WHERE created_at >= %s
            GROUP BY created_at::date
            ORDER BY day ASC
            """
            if is_postgres()
            else """
            SELECT
                DATE(created_at) AS day,
                COUNT(*) AS total,
                COALESCE(SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END), 0) AS completed,
                COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0) AS failed
            FROM file_history
            WHERE created_at >= ?
            GROUP BY DATE(created_at)
            ORDER BY day ASC
            """
        )
        cursor2 = execute_query(conn, daily_sql, (cutoff_30d,))
        daily_rows = [row_to_dict(r) for r in cursor2.fetchall()]

        # Failed runs grouped by raw metadata payload, so identical error
        # payloads aggregate into one row with an occurrence count.
        error_sql = (
            """
            SELECT
                tool,
                metadata_json,
                COUNT(*) AS occurrences
            FROM file_history
            WHERE status = 'failed' AND created_at >= %s
            GROUP BY tool, metadata_json
            ORDER BY occurrences DESC
            LIMIT 15
            """
            if is_postgres()
            else """
            SELECT
                tool,
                metadata_json,
                COUNT(*) AS occurrences
            FROM file_history
            WHERE status = 'failed' AND created_at >= ?
            GROUP BY tool, metadata_json
            ORDER BY occurrences DESC
            LIMIT 15
            """
        )
        cursor3 = execute_query(conn, error_sql, (cutoff_30d,))
        error_rows = [row_to_dict(r) for r in cursor3.fetchall()]

    return {
        "tools": [
            {
                "tool": row["tool"],
                "total_runs": int(row["total_runs"]),
                "completed": int(row["completed"]),
                "failed": int(row["failed"]),
                # Percentage of completed runs; 0 when the tool never ran.
                "success_rate": round(
                    (int(row["completed"]) / int(row["total_runs"])) * 100, 1
                )
                if int(row["total_runs"]) > 0
                else 0,
                "runs_24h": int(row["runs_24h"]),
                "runs_7d": int(row["runs_7d"]),
                "runs_30d": int(row["runs_30d"]),
                "unique_users": int(row["unique_users"]),
            }
            for row in tool_rows
        ],
        "daily_usage": [
            {
                # str() normalizes date objects (PostgreSQL) and strings
                # (SQLite) to one representation.
                "day": str(row["day"]),
                "total": int(row["total"]),
                "completed": int(row["completed"]),
                "failed": int(row["failed"]),
            }
            for row in daily_rows
        ],
        "common_errors": [
            {
                "tool": row["tool"],
                # Pull the "error" field out of the JSON metadata payload.
                "error": _parse_metadata(row["metadata_json"]).get(
                    "error", "Unknown error"
                ),
                "occurrences": int(row["occurrences"]),
            }
            for row in error_rows
        ],
    }
|
|
|
|
|
|
def get_admin_user_registration_stats() -> dict:
    """Return user registration trends and breakdown.

    Returns:
        Dict with total/pro/free user counts, 7- and 30-day new-user
        counts, daily registrations over the last 30 days, and the ten
        most active users by task count.
    """
    # ISO-8601 cutoffs compared against the text created_at column.
    cutoff_7d = (datetime.now(timezone.utc) - timedelta(days=7)).isoformat()
    cutoff_30d = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat()

    with db_connection() as conn:
        # Headline counts; variants differ only in placeholder style.
        totals_sql = (
            """
            SELECT
                COUNT(*) AS total,
                COALESCE(SUM(CASE WHEN created_at >= %s THEN 1 ELSE 0 END), 0) AS last_7d,
                COALESCE(SUM(CASE WHEN created_at >= %s THEN 1 ELSE 0 END), 0) AS last_30d,
                COALESCE(SUM(CASE WHEN plan = 'pro' THEN 1 ELSE 0 END), 0) AS pro_count,
                COALESCE(SUM(CASE WHEN plan = 'free' THEN 1 ELSE 0 END), 0) AS free_count
            FROM users
            """
            if is_postgres()
            else """
            SELECT
                COUNT(*) AS total,
                COALESCE(SUM(CASE WHEN created_at >= ? THEN 1 ELSE 0 END), 0) AS last_7d,
                COALESCE(SUM(CASE WHEN created_at >= ? THEN 1 ELSE 0 END), 0) AS last_30d,
                COALESCE(SUM(CASE WHEN plan = 'pro' THEN 1 ELSE 0 END), 0) AS pro_count,
                COALESCE(SUM(CASE WHEN plan = 'free' THEN 1 ELSE 0 END), 0) AS free_count
            FROM users
            """
        )
        cursor = execute_query(conn, totals_sql, (cutoff_7d, cutoff_30d))
        totals_row = row_to_dict(cursor.fetchone())

        # Daily registration counts; per-backend date-truncation syntax.
        daily_sql = (
            """
            SELECT created_at::date AS day, COUNT(*) AS registrations
            FROM users
            WHERE created_at >= %s
            GROUP BY created_at::date
            ORDER BY day ASC
            """
            if is_postgres()
            else """
            SELECT DATE(created_at) AS day, COUNT(*) AS registrations
            FROM users
            WHERE created_at >= ?
            GROUP BY DATE(created_at)
            ORDER BY day ASC
            """
        )
        cursor2 = execute_query(conn, daily_sql, (cutoff_30d,))
        daily_rows = [row_to_dict(r) for r in cursor2.fetchall()]

        # Top users by number of tasks; INNER JOIN excludes users with
        # no file_history rows.
        active_sql = """
            SELECT
                users.id,
                users.email,
                users.plan,
                users.created_at,
                COUNT(file_history.id) AS total_tasks
            FROM users
            JOIN file_history ON file_history.user_id = users.id
            GROUP BY users.id
            ORDER BY total_tasks DESC
            LIMIT 10
        """
        cursor3 = execute_query(conn, active_sql)
        active_rows = [row_to_dict(r) for r in cursor3.fetchall()]

    return {
        "total_users": int(totals_row["total"]) if totals_row else 0,
        "new_last_7d": int(totals_row["last_7d"]) if totals_row else 0,
        "new_last_30d": int(totals_row["last_30d"]) if totals_row else 0,
        "pro_users": int(totals_row["pro_count"]) if totals_row else 0,
        "free_users": int(totals_row["free_count"]) if totals_row else 0,
        "daily_registrations": [
            # str() normalizes date objects (PostgreSQL) and text (SQLite).
            {"day": str(row["day"]), "count": int(row["registrations"])}
            for row in daily_rows
        ],
        "most_active_users": [
            {
                "id": row["id"],
                "email": row["email"],
                "plan": row["plan"],
                "created_at": row["created_at"],
                "total_tasks": int(row["total_tasks"]),
            }
            for row in active_rows
        ],
    }
|
|
|
|
|
|
def get_admin_system_health() -> dict:
    """Return system health indicators for the admin dashboard.

    Returns:
        Dict with AI configuration/budget status, last-hour task and
        failure counts plus error rate, database size (SQLite only) and
        the active database backend type.
    """
    # Local import — presumably avoids a circular import at module load
    # time; confirm against openrouter_config_service's imports.
    from app.services.openrouter_config_service import get_openrouter_settings

    ai_cost_summary = get_monthly_spend()
    settings = get_openrouter_settings()

    with db_connection() as conn:
        cutoff_1h = (datetime.now(timezone.utc) - timedelta(hours=1)).isoformat()
        # Last-hour totals; variants differ only in placeholder style.
        error_sql = (
            """
            SELECT
                COUNT(*) AS total,
                COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0) AS failed
            FROM file_history
            WHERE created_at >= %s
            """
            if is_postgres()
            else """
            SELECT
                COUNT(*) AS total,
                COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0) AS failed
            FROM file_history
            WHERE created_at >= ?
            """
        )
        cursor = execute_query(conn, error_sql, (cutoff_1h,))
        error_row = row_to_dict(cursor.fetchone())

    # File size is only meaningful for the file-backed SQLite database;
    # reported as 0 for PostgreSQL.
    db_size_mb = 0
    if not is_postgres():
        db_path = current_app.config.get("DATABASE_PATH", "")
        if db_path and os.path.exists(db_path):
            db_size_mb = round(os.path.getsize(db_path) / (1024 * 1024), 2)

    error_total = int(error_row["total"]) if error_row else 0
    error_failed = int(error_row["failed"]) if error_row else 0

    return {
        "ai_configured": bool(settings.api_key),
        "ai_model": settings.model,
        "ai_budget_used_percent": ai_cost_summary["budget_used_percent"],
        # Failure percentage over the last hour; 0 when nothing ran.
        "error_rate_1h": round((error_failed / error_total) * 100, 1)
        if error_total > 0
        else 0,
        "tasks_last_1h": error_total,
        "failures_last_1h": error_failed,
        "database_size_mb": db_size_mb,
        "database_type": "postgresql" if is_postgres() else "sqlite",
    }
|