feat: Add admin user management endpoints and project events timeline

- Add create user, delete user, update plan, update role endpoints
- Add project events timeline endpoint for chronological activity view
- Add admin@dociva.io to INTERNAL_ADMIN_EMAILS
- Fix datetime imports in admin routes
This commit is contained in:
Your Name
2026-03-31 23:59:35 +02:00
parent 890adf480a
commit c59db300d0

View File

@@ -1,9 +1,12 @@
"""Internal admin endpoints secured by authenticated admin sessions.""" """Internal admin endpoints secured by authenticated admin sessions."""
from datetime import datetime, timedelta, timezone
from flask import Blueprint, jsonify, request from flask import Blueprint, jsonify, request
from app.extensions import limiter from app.extensions import limiter
from app.services.account_service import ( from app.services.account_service import (
create_user,
get_user_by_id, get_user_by_id,
is_user_admin, is_user_admin,
set_user_role, set_user_role,
@@ -323,3 +326,218 @@ def admin_database_stats_route():
"table_count": len(tables), "table_count": len(tables),
} }
), 200 ), 200
@admin_bp.route("/users/create", methods=["POST"])
@limiter.limit("30/hour")
def admin_create_user_route():
"""Create a new user (admin only)."""
auth_error = _require_admin_session()
if auth_error:
return auth_error
data = request.get_json(silent=True) or {}
email = str(data.get("email", "")).strip().lower()
password = str(data.get("password", ""))
plan = str(data.get("plan", "free")).strip().lower()
role = str(data.get("role", "user")).strip().lower()
if not email or not password:
return jsonify({"error": "Email and password are required."}), 400
if len(password) < 8:
return jsonify({"error": "Password must be at least 8 characters."}), 400
try:
user = create_user(email, password)
if plan == "pro":
update_user_plan(user["id"], "pro")
if role == "admin":
set_user_role(user["id"], "admin")
return jsonify({"message": "User created.", "user": user}), 201
except ValueError as e:
return jsonify({"error": str(e)}), 400
@admin_bp.route("/users/<int:user_id>", methods=["DELETE"])
@limiter.limit("30/hour")
def admin_delete_user_route(user_id):
"""Delete a user (admin only)."""
auth_error = _require_admin_session()
if auth_error:
return auth_error
current_user_id = get_current_user_id()
if user_id == current_user_id:
return jsonify({"error": "Cannot delete your own account."}), 400
from app.utils.database import db_connection, execute_query, is_postgres
with db_connection() as conn:
sql = (
"DELETE FROM users WHERE id = %s"
if is_postgres()
else "DELETE FROM users WHERE id = ?"
)
cursor = execute_query(conn, sql, (user_id,))
if cursor.rowcount > 0:
return jsonify({"message": "User deleted."}), 200
return jsonify({"error": "User not found."}), 404
@admin_bp.route("/users/<int:user_id>/plan", methods=["PUT"])
@limiter.limit("60/hour")
def admin_update_user_plan_route(user_id):
"""Update a user's plan (admin only)."""
auth_error = _require_admin_session()
if auth_error:
return auth_error
data = request.get_json(silent=True) or {}
plan = str(data.get("plan", "free")).strip().lower()
try:
user = update_user_plan(user_id, plan)
if user:
return jsonify({"message": "Plan updated.", "user": user}), 200
return jsonify({"error": "User not found."}), 404
except ValueError as e:
return jsonify({"error": str(e)}), 400
@admin_bp.route("/users/<int:user_id>/role", methods=["PUT"])
@limiter.limit("60/hour")
def admin_update_user_role_route(user_id):
"""Update a user's role (admin only)."""
auth_error = _require_admin_session()
if auth_error:
return auth_error
data = request.get_json(silent=True) or {}
role = str(data.get("role", "user")).strip().lower()
try:
user = set_user_role(user_id, role)
if user:
return jsonify({"message": "Role updated.", "user": user}), 200
return jsonify({"error": "User not found."}), 404
except ValueError as e:
return jsonify({"error": str(e)}), 400
@admin_bp.route("/project-events", methods=["GET"])
@limiter.limit("60/hour")
def admin_project_events_route():
"""Return a chronological timeline of important project events."""
auth_error = _require_admin_session()
if auth_error:
return auth_error
from datetime import timedelta
from app.utils.database import (
db_connection,
execute_query,
is_postgres,
row_to_dict,
)
days = request.args.get("days", 30, type=int)
cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat()
events = []
with db_connection() as conn:
user_sql = (
"""
SELECT created_at AS event_time, 'user_registered' AS event_type,
email AS detail, id AS entity_id
FROM users WHERE created_at >= %s
"""
if is_postgres()
else """
SELECT created_at AS event_time, 'user_registered' AS event_type,
email AS detail, id AS entity_id
FROM users WHERE created_at >= ?
"""
)
cursor = execute_query(conn, user_sql, (cutoff,))
for row in cursor.fetchall():
row = row_to_dict(row)
events.append(
{
"time": row["event_time"],
"type": "user_registered",
"detail": row["detail"],
"entity_id": row["entity_id"],
}
)
file_sql = (
"""
SELECT created_at AS event_time,
CASE WHEN status = 'completed' THEN 'file_processed' ELSE 'file_failed' END AS event_type,
COALESCE(original_filename, tool) AS detail,
id AS entity_id
FROM file_history WHERE created_at >= %s
"""
if is_postgres()
else """
SELECT created_at AS event_time,
CASE WHEN status = 'completed' THEN 'file_processed' ELSE 'file_failed' END AS event_type,
COALESCE(original_filename, tool) AS detail,
id AS entity_id
FROM file_history WHERE created_at >= ?
"""
)
cursor2 = execute_query(conn, file_sql, (cutoff,))
for row in cursor2.fetchall():
row = row_to_dict(row)
events.append(
{
"time": row["event_time"],
"type": row["event_type"],
"detail": row["detail"],
"entity_id": row["entity_id"],
}
)
contact_sql = (
"""
SELECT created_at AS event_time, 'contact_message' AS event_type,
subject AS detail, id AS entity_id
FROM contact_messages WHERE created_at >= %s
"""
if is_postgres()
else """
SELECT created_at AS event_time, 'contact_message' AS event_type,
subject AS detail, id AS entity_id
FROM contact_messages WHERE created_at >= ?
"""
)
cursor3 = execute_query(conn, contact_sql, (cutoff,))
for row in cursor3.fetchall():
row = row_to_dict(row)
events.append(
{
"time": row["event_time"],
"type": "contact_message",
"detail": row["detail"],
"entity_id": row["entity_id"],
}
)
events.sort(key=lambda e: e["time"], reverse=True)
summary = {}
for e in events:
t = e["type"]
summary[t] = summary.get(t, 0) + 1
return jsonify(
{
"events": events[:200],
"summary": summary,
"total_events": len(events),
"period_days": days,
}
), 200