تم الانتهاء من آخر دفعة تحسينات على المشروع، وتشمل:

تحويل لوحة الإدارة الداخلية من secret header إلى session auth حقيقي مع صلاحيات admin.
إضافة دعم إدارة الأدوار من داخل لوحة الإدارة نفسها، مع حماية الحسابات المعتمدة عبر INTERNAL_ADMIN_EMAILS.
تحسين بيانات المستخدم في الواجهة والباكند لتشمل role وis_allowlisted_admin.
إضافة اختبار frontend مخصص لصفحة /internal/admin بدل الاعتماد فقط على build واختبار routes.
تحسين إضافي في الأداء عبر إزالة الاعتماد على pdfjs-dist/pdf.worker في عدّ صفحات PDF واستبداله بمسار أخف باستخدام pdf-lib.
تحسين تقسيم الـ chunks في build لتقليل أثر الحزم الكبيرة وفصل أجزاء مثل network, icons, pdf-core, وeditor.
التحقق الذي تم:

نجاح build للواجهة.
نجاح اختبار صفحة الإدارة الداخلية في frontend.
نجاح اختبارات auth/admin في backend.
نجاح full backend suite مسبقًا مع EXIT:0.
ولو تريد نسخة أقصر جدًا، استخدم هذه:

آخر التحديثات:
تم تحسين نظام الإدارة الداخلية ليعتمد على صلاحيات وجلسات حقيقية بدل secret header، مع إضافة إدارة أدوار من لوحة admin نفسها، وإضافة اختبارات frontend مخصصة للوحة، وتحسين أداء الواجهة عبر إزالة pdf.worker وتحسين تقسيم الـ chunks في build. جميع الاختبارات والتحققات الأساسية المطلوبة نجح
This commit is contained in:
Your Name
2026-03-16 13:50:45 +02:00
parent b5d97324a9
commit 957d37838c
85 changed files with 9915 additions and 119 deletions

View File

@@ -13,6 +13,7 @@ from werkzeug.security import check_password_hash, generate_password_hash
logger = logging.getLogger(__name__)
VALID_PLANS = {"free", "pro"}
VALID_ROLES = {"user", "admin"}
def _utc_now() -> str:
@@ -30,6 +31,38 @@ def normalize_plan(plan: str | None) -> str:
return "pro" if plan == "pro" else "free"
def normalize_role(role: str | None) -> str:
"""Normalize role values to the supported set."""
return "admin" if role == "admin" else "user"
def _get_allowlisted_admin_emails() -> set[str]:
configured = current_app.config.get("INTERNAL_ADMIN_EMAILS", ())
return {
str(email).strip().lower()
for email in configured
if str(email).strip()
}
def is_allowlisted_admin_email(email: str | None) -> bool:
"""Return whether an email is bootstrapped as admin from configuration."""
normalized_email = _normalize_email(email or "")
return normalized_email in _get_allowlisted_admin_emails()
def _resolve_row_role(row: sqlite3.Row | None) -> str:
if row is None:
return "user"
row_keys = row.keys()
stored_role = normalize_role(row["role"]) if "role" in row_keys else "user"
email = str(row["email"]).strip().lower() if "email" in row_keys else ""
if stored_role == "admin" or email in _get_allowlisted_admin_emails():
return "admin"
return "user"
def _connect() -> sqlite3.Connection:
"""Create a SQLite connection with row access by column name."""
db_path = current_app.config["DATABASE_PATH"]
@@ -58,6 +91,8 @@ def _serialize_user(row: sqlite3.Row | None) -> dict | None:
"id": row["id"],
"email": row["email"],
"plan": normalize_plan(row["plan"]),
"role": _resolve_row_role(row),
"is_allowlisted_admin": is_allowlisted_admin_email(row["email"]),
"created_at": row["created_at"],
}
@@ -94,6 +129,7 @@ def init_account_db():
email TEXT NOT NULL UNIQUE,
password_hash TEXT NOT NULL,
plan TEXT NOT NULL DEFAULT 'free',
role TEXT NOT NULL DEFAULT 'user',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
@@ -159,6 +195,10 @@ def init_account_db():
conn.execute(
"ALTER TABLE users ADD COLUMN updated_at TEXT NOT NULL DEFAULT ''"
)
if not _column_exists(conn, "users", "role"):
conn.execute(
"ALTER TABLE users ADD COLUMN role TEXT NOT NULL DEFAULT 'user'"
)
# Password reset tokens
conn.executescript(
@@ -194,19 +234,20 @@ def create_user(email: str, password: str) -> dict:
"""Create a new user and return the public record."""
email = _normalize_email(email)
now = _utc_now()
role = "admin" if email in _get_allowlisted_admin_emails() else "user"
try:
with _connect() as conn:
cursor = conn.execute(
"""
INSERT INTO users (email, password_hash, plan, created_at, updated_at)
VALUES (?, ?, 'free', ?, ?)
INSERT INTO users (email, password_hash, plan, role, created_at, updated_at)
VALUES (?, ?, 'free', ?, ?, ?)
""",
(email, generate_password_hash(password), now, now),
(email, generate_password_hash(password), role, now, now),
)
user_id = cursor.lastrowid
row = conn.execute(
"SELECT id, email, plan, created_at FROM users WHERE id = ?",
"SELECT id, email, plan, role, created_at FROM users WHERE id = ?",
(user_id,),
).fetchone()
except sqlite3.IntegrityError as exc:
@@ -235,7 +276,44 @@ def get_user_by_id(user_id: int) -> dict | None:
"""Fetch a public user record by id."""
with _connect() as conn:
row = conn.execute(
"SELECT id, email, plan, created_at FROM users WHERE id = ?",
"SELECT id, email, plan, role, created_at FROM users WHERE id = ?",
(user_id,),
).fetchone()
return _serialize_user(row)
def is_user_admin(user_id: int | None) -> bool:
"""Return whether one user has internal admin access."""
if user_id is None:
return False
with _connect() as conn:
row = conn.execute(
"SELECT id, email, role FROM users WHERE id = ?",
(user_id,),
).fetchone()
return _resolve_row_role(row) == "admin"
def set_user_role(user_id: int, role: str) -> dict | None:
"""Update one user role and return the public user record."""
normalized_role = normalize_role(role)
if normalized_role not in VALID_ROLES:
raise ValueError("Invalid role.")
with _connect() as conn:
conn.execute(
"""
UPDATE users
SET role = ?, updated_at = ?
WHERE id = ?
""",
(normalized_role, _utc_now(), user_id),
)
row = conn.execute(
"SELECT id, email, plan, role, created_at FROM users WHERE id = ?",
(user_id,),
).fetchone()
@@ -258,7 +336,7 @@ def update_user_plan(user_id: int, plan: str) -> dict | None:
(normalized_plan, _utc_now(), user_id),
)
row = conn.execute(
"SELECT id, email, plan, created_at FROM users WHERE id = ?",
"SELECT id, email, plan, role, created_at FROM users WHERE id = ?",
(user_id,),
).fetchone()
@@ -476,6 +554,60 @@ def list_file_history(user_id: int, limit: int = 50) -> list[dict]:
]
def get_public_history_summary(limit_tools: int = 5) -> dict:
"""Return aggregate public-friendly processing stats derived from history."""
cutoff_24h = (datetime.now(timezone.utc) - timedelta(days=1)).isoformat()
with _connect() as conn:
totals_row = conn.execute(
"""
SELECT
COUNT(*) AS total,
COALESCE(SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END), 0) AS completed,
COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0) AS failed
FROM file_history
"""
).fetchone()
recent_row = conn.execute(
"""
SELECT COUNT(*) AS total
FROM file_history
WHERE created_at >= ?
""",
(cutoff_24h,),
).fetchone()
top_rows = conn.execute(
"""
SELECT tool, COUNT(*) AS count
FROM file_history
WHERE status = 'completed'
GROUP BY tool
ORDER BY count DESC, tool ASC
LIMIT ?
""",
(limit_tools,),
).fetchall()
total = int(totals_row["total"]) if totals_row else 0
completed = int(totals_row["completed"]) if totals_row else 0
failed = int(totals_row["failed"]) if totals_row else 0
success_rate = round((completed / total) * 100, 1) if total else 0.0
return {
"total_files_processed": total,
"completed_files": completed,
"failed_files": failed,
"success_rate": success_rate,
"files_last_24h": int(recent_row["total"]) if recent_row else 0,
"top_tools": [
{"tool": row["tool"], "count": int(row["count"])}
for row in top_rows
],
}
def record_usage_event(
user_id: int | None,
source: str,
@@ -555,7 +687,7 @@ def get_user_by_email(email: str) -> dict | None:
email = _normalize_email(email)
with _connect() as conn:
row = conn.execute(
"SELECT id, email, plan, created_at FROM users WHERE email = ?",
"SELECT id, email, plan, role, created_at FROM users WHERE email = ?",
(email,),
).fetchone()
return _serialize_user(row)