Files
SaaS-PDF/backend/app/utils/database.py
Your Name 030418f6db feat: Add PostgreSQL support and enhance admin dashboard
- Migrate all service files from hardcoded SQLite to dual SQLite/PostgreSQL support
- Add PostgreSQL service to docker-compose.yml
- Create database abstraction layer (database.py) with execute_query, row_to_dict helpers
- Update all 7 service files: account, rating, contact, ai_cost, quota, site_assistant, admin
- Add new admin endpoint /database-stats for table size and row count visualization
- Add database_type field to system health endpoint
- Update .env.example with proper PostgreSQL connection string
2026-03-31 21:51:45 +02:00

183 lines
5.1 KiB
Python

"""Database abstraction — supports SQLite (dev) and PostgreSQL (production).

Usage:
    from app.utils.database import db_connection, adapt_query

For SQLite the returned connection is a sqlite3.Connection with ``row_factory``
set to ``sqlite3.Row``. For PostgreSQL it is a psycopg2 connection created with
``RealDictCursor`` so rows are dict-like; statements are run through a cursor
there, so use ``execute_query`` for code that must work on both backends.

Selection logic:
- If the DATABASE_URL env var is set and starts with ``postgres``, use PostgreSQL.
- Otherwise fall back to SQLite via the DATABASE_PATH config.
"""
import logging
import os
import re
import sqlite3
from contextlib import contextmanager
from flask import current_app
logger = logging.getLogger(__name__)
_pg_available = False
try:
import psycopg2
import psycopg2.extras
import psycopg2.errors
_pg_available = True
except ImportError:
pass
def is_postgres() -> bool:
    """Tell whether PostgreSQL is the configured backend.

    The decision is made on every call from the ``DATABASE_URL`` environment
    variable: any value beginning with ``postgres`` selects PostgreSQL; an
    unset or differently-prefixed value does not.
    """
    return os.environ.get("DATABASE_URL", "").startswith("postgres")
def _sqlite_connect() -> sqlite3.Connection:
    """Open a SQLite connection for the current Flask app.

    The file location comes from the app's ``DATABASE_PATH`` config entry; when
    that is missing or empty, ``data/dociva.db`` under the parent of this
    module's directory is used. The containing directory is created if needed.

    Returns:
        A connection whose rows are ``sqlite3.Row`` objects and which has
        foreign-key enforcement switched on.
    """
    db_path = current_app.config.get("DATABASE_PATH") or os.path.join(
        os.path.abspath(os.path.join(os.path.dirname(__file__), "..")),
        "data",
        "dociva.db",
    )

    # A bare filename has no directory component; nothing to create then.
    parent_dir = os.path.dirname(db_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    connection = sqlite3.connect(db_path)
    connection.row_factory = sqlite3.Row
    # SQLite leaves foreign-key enforcement off unless asked per connection.
    connection.execute("PRAGMA foreign_keys = ON")
    return connection
def _pg_connect():
    """Open a psycopg2 connection to the server named by ``DATABASE_URL``.

    Cursors created from the connection are ``RealDictCursor`` instances, and
    autocommit is explicitly disabled so callers control transactions.

    Raises:
        RuntimeError: psycopg2 could not be imported when this module loaded.
    """
    if not _pg_available:
        raise RuntimeError("psycopg2 is not installed — cannot use PostgreSQL.")

    dsn = os.getenv("DATABASE_URL", "")
    connection = psycopg2.connect(dsn, cursor_factory=psycopg2.extras.RealDictCursor)
    connection.autocommit = False
    return connection
def get_connection():
    """Open a new connection on whichever backend ``is_postgres()`` selects.

    Returns:
        A psycopg2 connection when PostgreSQL is configured, otherwise a
        ``sqlite3.Connection``.
    """
    return _pg_connect() if is_postgres() else _sqlite_connect()
@contextmanager
def db_connection():
    """Yield a fresh connection inside a commit-or-rollback transaction scope.

    On a clean exit from the ``with`` body the transaction is committed. If the
    body (or the commit itself) raises an ``Exception``, the transaction is
    rolled back and the exception propagates. The connection is closed in
    every case.
    """
    connection = get_connection()
    try:
        try:
            yield connection
            # Commit stays inside the guarded region so a failing commit
            # is rolled back as well.
            connection.commit()
        except Exception:
            connection.rollback()
            raise
    finally:
        connection.close()
# Matches a single-quoted literal, a double-quoted identifier, or a bare "?".
# Quoted runs are listed first so a "?" inside them is consumed as part of the
# quoted text and never matched on its own. A doubled quote ('' or "") simply
# shows up as two adjacent quoted runs, which is handled the same way.
_QMARK_OR_QUOTED = re.compile(r"'[^']*'|\"[^\"]*\"|\?")


def adapt_query(sql: str, params: tuple = ()) -> tuple:
    """Adapt SQLite-flavoured SQL to PostgreSQL when PostgreSQL is active.

    When ``is_postgres()`` is false the inputs are returned untouched.
    Otherwise the following textual rewrites are applied:

    - ``INTEGER PRIMARY KEY AUTOINCREMENT`` / ``INTEGER PRIMARY KEY``
      -> ``SERIAL PRIMARY KEY``
    - ``BOOLEAN DEFAULT 1`` / ``BOOLEAN DEFAULT 0``
      -> ``BOOLEAN DEFAULT TRUE`` / ``BOOLEAN DEFAULT FALSE``
    - ``?`` placeholders -> ``%s`` placeholders, except where the ``?`` sits
      inside a quoted string literal or quoted identifier (rewriting those
      would alter the literal and add a placeholder with no matching
      parameter).

    NOTE: literal ``%`` characters are passed through unchanged. psycopg2
    treats ``%`` as a format marker when parameters are supplied, so bind
    patterns such as LIKE wildcards as parameters rather than inlining them.

    Args:
        sql: SQL text written with SQLite conventions.
        params: Positional parameters; returned as-is.

    Returns:
        ``(adapted_sql, params)``.
    """
    if not is_postgres():
        return sql, params

    # Order matters: the longer AUTOINCREMENT form must be rewritten before the
    # shorter one, otherwise a dangling "AUTOINCREMENT" would be left behind.
    ddl_rewrites = (
        ("INTEGER PRIMARY KEY AUTOINCREMENT", "SERIAL PRIMARY KEY"),
        ("INTEGER PRIMARY KEY", "SERIAL PRIMARY KEY"),
        ("BOOLEAN DEFAULT 1", "BOOLEAN DEFAULT TRUE"),
        ("BOOLEAN DEFAULT 0", "BOOLEAN DEFAULT FALSE"),
    )
    for sqlite_form, pg_form in ddl_rewrites:
        sql = sql.replace(sqlite_form, pg_form)

    def _swap_placeholder(match):
        token = match.group(0)
        # Quoted text is copied through verbatim; only a bare "?" is converted.
        return "%s" if token == "?" else token

    sql = _QMARK_OR_QUOTED.sub(_swap_placeholder, sql)
    return sql, params
def execute_query(conn, sql: str, params: tuple = ()):
    """Run one statement on ``conn`` after adapting it for the active backend.

    The SQL is first passed through ``adapt_query``. On SQLite the statement
    is executed directly on the connection; on PostgreSQL a cursor is opened
    from the connection and the statement is executed on it.

    Returns:
        The cursor holding the statement's results.
    """
    query, args = adapt_query(sql, params)

    if not is_postgres():
        return conn.execute(query, args)

    pg_cursor = conn.cursor()
    pg_cursor.execute(query, args)
    return pg_cursor
def get_last_insert_id(conn, cursor=None):
    """Return the ID of the most recently inserted row on either backend.

    PostgreSQL: the ID is read from the next row of ``cursor`` (so the
    statement executed on it must produce one, e.g. via ``RETURNING id``).
    For dict rows the ``id`` key is used, falling back to ``lastval``; for
    sequence rows the first column is used. ``None`` is returned when the
    cursor yields no row.

    SQLite: ``cursor.lastrowid`` is returned. When no cursor is supplied the
    value is obtained from ``SELECT last_insert_rowid()`` on ``conn`` instead
    of failing on the ``None`` default.

    Args:
        conn: Open database connection (used for the SQLite fallback).
        cursor: Cursor on which the INSERT was executed. Required for
            PostgreSQL, optional for SQLite.

    Raises:
        ValueError: PostgreSQL is active and ``cursor`` is ``None``.
    """
    if is_postgres():
        if cursor is None:
            raise ValueError("cursor is required for PostgreSQL to get last insert ID")
        row = cursor.fetchone()
        if not row:
            return None
        if isinstance(row, dict):
            return row.get("id") or row.get("lastval")
        return row[0]

    if cursor is not None:
        return cursor.lastrowid

    # No cursor given: ask SQLite for the connection-wide last insert rowid.
    return conn.execute("SELECT last_insert_rowid()").fetchone()[0]
def get_integrity_error():
    """Return the ``IntegrityError`` class of the active database driver.

    Returns:
        ``psycopg2.IntegrityError`` when PostgreSQL is configured, otherwise
        ``sqlite3.IntegrityError``.

    Raises:
        RuntimeError: PostgreSQL is configured but psycopg2 is not importable.
    """
    if not is_postgres():
        return sqlite3.IntegrityError
    if not _pg_available:
        raise RuntimeError("psycopg2 is not installed")
    return psycopg2.IntegrityError
def get_row_value(row, key: str):
    """Fetch column ``key`` from a result row of either backend.

    Args:
        row: A dict-like row, an index-by-name row such as ``sqlite3.Row``,
            or ``None``.
        key: Column name to read.

    Returns:
        ``None`` when ``row`` is ``None``. For dict rows, the value or ``None``
        if the key is absent. For any other row type, ``row[key]`` (which
        raises if the column does not exist).
    """
    if row is None:
        return None
    return row.get(key) if isinstance(row, dict) else row[key]
def row_to_dict(row):
    """Convert a database row to a plain ``dict``.

    Works for any row that ``dict()`` accepts, which covers both dict-like
    rows and ``sqlite3.Row``; the former type-specific branch did exactly the
    same thing on both paths and has been folded away.

    Args:
        row: A result row, or ``None``.

    Returns:
        A new ``dict`` copy of the row, or ``None`` when ``row`` is ``None``.
    """
    return None if row is None else dict(row)
def init_tables(conn):
    """Probe whether the ``users`` table already exists on the active backend.

    PostgreSQL is checked through ``information_schema.tables``; SQLite through
    ``sqlite_master``. The function returns ``None`` whether or not the table
    is present -- the body visible here issues no DDL itself.

    Args:
        conn: Open connection for the active backend.
    """
    if is_postgres():
        cursor = conn.cursor()
        cursor.execute(
            "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'users')"
        )
        row = cursor.fetchone()
        # Connections built by _pg_connect use RealDictCursor, whose rows are
        # dicts keyed by column name, so positional row[0] raises KeyError.
        # Read the single value in a way that works for dict and tuple rows.
        if isinstance(row, dict):
            table_present = next(iter(row.values()))
        else:
            table_present = row[0]
        if table_present:
            return
    else:
        cursor = conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='users'"
        )
        if cursor.fetchone():
            return