Files
SaaS-PDF/backend/app/services/pdf_ai_service.py
2026-03-30 14:24:18 +02:00

787 lines
25 KiB
Python

"""PDF AI services — Chat, Summarize, Translate, Table Extract."""
import json
import logging
import os
import tempfile
import time
from dataclasses import dataclass
import requests
from app.services.openrouter_config_service import (
extract_openrouter_text,
get_openrouter_settings,
)
logger = logging.getLogger(__name__)
# DeepL defaults; overridable via the DEEPL_API_URL / DEEPL_TIMEOUT_SECONDS env vars.
# The default URL is DeepL's free-tier endpoint.
DEFAULT_DEEPL_API_URL = "https://api-free.deepl.com/v2/translate"
DEFAULT_DEEPL_TIMEOUT_SECONDS = 90
# Maximum characters packed into one translation request chunk.
MAX_TRANSLATION_CHUNK_CHARS = 3500
# Transient provider failures are retried this many times, sleeping
# TRANSLATION_RETRY_DELAY_SECONDS * attempt between tries (linear backoff).
TRANSLATION_RETRY_ATTEMPTS = 3
TRANSLATION_RETRY_DELAY_SECONDS = 2
# Internal lowercase language codes -> human-readable labels (used in prompts/UI).
LANGUAGE_LABELS = {
    "auto": "Auto Detect",
    "en": "English",
    "ar": "Arabic",
    "fr": "French",
    "es": "Spanish",
    "de": "German",
    "zh": "Chinese",
    "ja": "Japanese",
    "ko": "Korean",
    "pt": "Portuguese",
    "ru": "Russian",
    "tr": "Turkish",
    "it": "Italian",
}
# Internal codes -> DeepL target_lang codes (DeepL needs the regional "PT-PT" form).
DEEPL_LANGUAGE_CODES = {
    "ar": "AR",
    "de": "DE",
    "en": "EN",
    "es": "ES",
    "fr": "FR",
    "it": "IT",
    "ja": "JA",
    "ko": "KO",
    "pt": "PT-PT",
    "ru": "RU",
    "tr": "TR",
    "zh": "ZH",
}
# Internal codes -> 3-letter OCR language codes (presumably Tesseract-style;
# confirm against app.services.ocr_service).
OCR_LANGUAGE_CODES = {
    "ar": "ara",
    "en": "eng",
    "fr": "fra",
}
@dataclass(frozen=True)
class DeepLSettings:
    """Immutable DeepL configuration resolved from environment variables."""
    api_key: str  # empty string when DeepL is not configured
    base_url: str  # translate endpoint URL
    timeout_seconds: int  # per-request HTTP timeout
def _normalize_language_code(value: str | None, default: str = "") -> str:
normalized = str(value or "").strip().lower()
return normalized or default
def _language_label(value: str | None) -> str:
    """Map a language code to its display label; unknown codes echo back, blank -> 'Unknown'."""
    code = _normalize_language_code(value)
    if code in LANGUAGE_LABELS:
        return LANGUAGE_LABELS[code]
    return code or "Unknown"
def _get_deepl_settings() -> DeepLSettings:
    """Read DeepL configuration from the environment.

    Returns:
        DeepLSettings with api_key (empty string when DEEPL_API_KEY is unset),
        base_url (falls back to the default endpoint when blank), and
        timeout_seconds (falls back to the default when DEEPL_TIMEOUT_SECONDS
        is missing or not a valid integer).
    """
    api_key = str(os.getenv("DEEPL_API_KEY", "")).strip()
    base_url = (
        str(os.getenv("DEEPL_API_URL", DEFAULT_DEEPL_API_URL)).strip()
        or DEFAULT_DEEPL_API_URL
    )
    # A malformed env value must not crash every AI request with a
    # ValueError; degrade to the default timeout instead.
    try:
        timeout_seconds = int(
            os.getenv("DEEPL_TIMEOUT_SECONDS", DEFAULT_DEEPL_TIMEOUT_SECONDS)
        )
    except (TypeError, ValueError):
        logger.warning(
            "Invalid DEEPL_TIMEOUT_SECONDS value; using default %s",
            DEFAULT_DEEPL_TIMEOUT_SECONDS,
        )
        timeout_seconds = DEFAULT_DEEPL_TIMEOUT_SECONDS
    return DeepLSettings(
        api_key=api_key, base_url=base_url, timeout_seconds=timeout_seconds
    )
class PdfAiError(Exception):
    """Custom exception for PDF AI service failures.

    Attributes:
        user_message: Safe, human-readable message for API consumers.
        error_code: Stable machine-readable identifier for the failure.
        detail: Optional internal diagnostic detail (not user-facing).
    """
    def __init__(
        self,
        user_message: str,
        error_code: str = "PDF_AI_ERROR",
        detail: str | None = None,
    ):
        super().__init__(user_message)
        self.user_message = user_message
        self.error_code = error_code
        self.detail = detail
class RetryableTranslationError(PdfAiError):
    """Error wrapper used for provider failures that should be retried.

    Raised for transient conditions (rate limits, timeouts, 5xx responses)
    so retry loops can distinguish them from permanent failures.
    """
def _translate_with_retry(action, provider_name: str) -> dict:
    """Run *action*, retrying transient provider errors with linear backoff.

    Args:
        action: Zero-argument callable performing one translation request.
        provider_name: Provider label used in warning logs.

    Returns:
        The dict produced by a successful *action* call.

    Raises:
        PdfAiError: re-raised with the last retryable error's code/detail once
            all attempts are exhausted; a generic provider-failed error is the
            safety net if no error was recorded.
    """
    last_error: PdfAiError | None = None
    for attempt in range(TRANSLATION_RETRY_ATTEMPTS):
        try:
            return action()
        except RetryableTranslationError as error:
            last_error = error
            logger.warning(
                "%s translation attempt %s/%s failed with retryable error %s",
                provider_name,
                attempt + 1,
                TRANSLATION_RETRY_ATTEMPTS,
                error.error_code,
            )
            # Back off only when another attempt is still coming.
            if attempt + 1 < TRANSLATION_RETRY_ATTEMPTS:
                time.sleep(TRANSLATION_RETRY_DELAY_SECONDS * (attempt + 1))
    if last_error is not None:
        raise PdfAiError(
            last_error.user_message,
            error_code=last_error.error_code,
            detail=last_error.detail,
        )
    raise PdfAiError(
        "Translation provider failed unexpectedly.",
        error_code="TRANSLATION_PROVIDER_FAILED",
    )
def _estimate_tokens(text: str) -> int:
"""Rough token estimate: ~4 chars per token for English."""
return max(1, len(text) // 4)
def _extract_text_from_pdf(input_path: str, max_pages: int = 50) -> str:
    """Extract text content from a PDF file.

    Tries PyPDF2 text extraction first; when that yields nothing (e.g. a
    scanned/image-only PDF) it falls back to OCR via app.services.ocr_service.

    Args:
        input_path: Filesystem path to the PDF.
        max_pages: Maximum number of pages to read.

    Returns:
        Text with "[Page N]" markers per page, OCR text for scanned PDFs,
        or "" when both extraction paths come up empty.

    Raises:
        PdfAiError: PDF_ENCRYPTED when an empty password cannot unlock the
            file; PDF_TEXT_EXTRACTION_FAILED for any other reader failure.
    """
    try:
        from PyPDF2 import PdfReader
        reader = PdfReader(input_path)
        # decrypt("") handles owner-password-only PDFs; a falsy (0) result
        # means the empty user password was rejected, i.e. a real password
        # is required.
        if reader.is_encrypted and reader.decrypt("") == 0:
            raise PdfAiError(
                "This PDF is password-protected. Please unlock it first.",
                error_code="PDF_ENCRYPTED",
            )
        pages = reader.pages[:max_pages]
        texts = []
        for i, page in enumerate(pages):
            text = page.extract_text() or ""
            if text.strip():
                texts.append(f"[Page {i + 1}]\n{text}")
        extracted = "\n\n".join(texts)
        if extracted.strip():
            return extracted
        # Fall back to OCR for scanned/image-only PDFs instead of failing fast.
        try:
            from app.services.ocr_service import ocr_pdf
            # Reserve a temp path for the OCR output; it is removed in the
            # finally block below.
            with tempfile.NamedTemporaryFile(suffix=".txt", delete=False) as handle:
                ocr_output_path = handle.name
            try:
                data = ocr_pdf(input_path, ocr_output_path, lang="eng")
                ocr_text = str(data.get("text", "")).strip()
                if ocr_text:
                    return ocr_text
            finally:
                if os.path.exists(ocr_output_path):
                    os.unlink(ocr_output_path)
        except Exception as ocr_error:
            # Best-effort fallback: OCR problems degrade to "no text",
            # not a hard failure.
            logger.warning("OCR fallback for PDF text extraction failed: %s", ocr_error)
        return ""
    except PdfAiError:
        raise
    except Exception as e:
        raise PdfAiError(
            "Failed to extract text from PDF.",
            error_code="PDF_TEXT_EXTRACTION_FAILED",
            detail=str(e),
        )
def _call_openrouter(
    system_prompt: str,
    user_message: str,
    max_tokens: int = 1000,
    tool_name: str = "pdf_ai",
) -> str:
    """Send a request to OpenRouter API and return the reply.

    Args:
        system_prompt: System-role instructions for the model.
        user_message: User-role content (document text plus the request).
        max_tokens: Completion token cap passed to the API.
        tool_name: Label recorded in the AI usage log.

    Returns:
        The model's non-empty text reply.

    Raises:
        PdfAiError: for budget, configuration, auth (401), credit (402),
            error-payload, empty-response, and generic request failures.
        RetryableTranslationError: for transient failures (429 rate limit,
            5xx, timeout, connection error) that callers may retry.
    """
    # Budget guard: best-effort — failures of the cost service itself are
    # swallowed; only an actual budget breach blocks the request.
    try:
        from app.services.ai_cost_service import check_ai_budget, AiBudgetExceededError
        check_ai_budget()
    except ImportError:
        pass
    except Exception as error:
        # NOTE(review): matched by class name rather than isinstance —
        # presumably to tolerate module reloads; confirm before changing.
        if error.__class__.__name__ == "AiBudgetExceededError":
            raise PdfAiError(
                "Monthly AI processing budget has been reached. Please try again next month.",
                error_code="AI_BUDGET_EXCEEDED",
            )
        pass  # any other budget-check failure is deliberately ignored
    settings = get_openrouter_settings()
    if not settings.api_key:
        logger.error("OPENROUTER_API_KEY is not set or is a placeholder value.")
        raise PdfAiError(
            "AI features are temporarily unavailable. Our team has been notified.",
            error_code="OPENROUTER_MISSING_API_KEY",
        )
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_message},
    ]
    try:
        response = requests.post(
            settings.base_url,
            headers={
                "Authorization": f"Bearer {settings.api_key}",
                "Content-Type": "application/json",
            },
            json={
                "model": settings.model,
                "messages": messages,
                "max_tokens": max_tokens,
                "temperature": 0.5,
            },
            timeout=60,
        )
        # Defaulting a missing status_code to 200 — presumably for test
        # doubles without that attribute; confirm.
        status_code = getattr(response, "status_code", 200)
        if status_code == 401:
            logger.error("OpenRouter API key is invalid or expired (401).")
            raise PdfAiError(
                "AI features are temporarily unavailable due to a configuration issue. Our team has been notified.",
                error_code="OPENROUTER_UNAUTHORIZED",
            )
        if status_code == 402:
            logger.error("OpenRouter account has insufficient credits (402).")
            raise PdfAiError(
                "AI processing credits have been exhausted. Please try again later.",
                error_code="OPENROUTER_INSUFFICIENT_CREDITS",
            )
        # 429 and 5xx are transient: raise the retryable subtype so
        # translation callers can back off and retry.
        if status_code == 429:
            logger.warning("OpenRouter rate limit reached (429).")
            raise RetryableTranslationError(
                "AI service is experiencing high demand. Please wait a moment and try again.",
                error_code="OPENROUTER_RATE_LIMIT",
            )
        if status_code >= 500:
            logger.error("OpenRouter server error (%s).", status_code)
            raise RetryableTranslationError(
                "AI service provider is experiencing issues. Please try again shortly.",
                error_code="OPENROUTER_SERVER_ERROR",
            )
        response.raise_for_status()
        data = response.json()
        # Handle model-level errors returned inside a 200 response
        if data.get("error"):
            error_msg = (
                data["error"].get("message", "")
                if isinstance(data["error"], dict)
                else str(data["error"])
            )
            logger.error("OpenRouter returned an error payload: %s", error_msg)
            raise PdfAiError(
                "AI service encountered an issue. Please try again.",
                error_code="OPENROUTER_ERROR_PAYLOAD",
                detail=error_msg,
            )
        reply = extract_openrouter_text(data)
        if not reply:
            raise PdfAiError(
                "AI returned an empty response. Please try again.",
                error_code="OPENROUTER_EMPTY_RESPONSE",
            )
        # Log usage; local estimates fill in when the provider omits token
        # counts.
        try:
            from app.services.ai_cost_service import log_ai_usage
            usage = data.get("usage", {})
            log_ai_usage(
                tool=tool_name,
                model=settings.model,
                input_tokens=usage.get("prompt_tokens", _estimate_tokens(user_message)),
                output_tokens=usage.get("completion_tokens", _estimate_tokens(reply)),
            )
        except Exception:
            pass  # Don't fail the request if logging fails
        return reply
    except PdfAiError:
        raise
    except requests.exceptions.Timeout:
        raise RetryableTranslationError(
            "AI service timed out. Please try again.",
            error_code="OPENROUTER_TIMEOUT",
        )
    except requests.exceptions.ConnectionError:
        logger.error("Cannot connect to OpenRouter API at %s", settings.base_url)
        raise RetryableTranslationError(
            "AI service is unreachable. Please try again shortly.",
            error_code="OPENROUTER_CONNECTION_ERROR",
        )
    except requests.exceptions.RequestException as e:
        logger.error("OpenRouter API error: %s", e)
        raise PdfAiError(
            "AI service is temporarily unavailable.",
            error_code="OPENROUTER_REQUEST_ERROR",
            detail=str(e),
        )
def _split_translation_chunks(
    text: str, max_chars: int = MAX_TRANSLATION_CHUNK_CHARS
) -> list[str]:
    """Split extracted PDF text into stable chunks while preserving page markers.

    Paragraph blocks (separated by blank lines) are packed greedily up to
    *max_chars*. Unlike the previous version, a single block longer than
    *max_chars* is hard-split (preferring newline boundaries) so no chunk
    exceeds the provider request limit.

    Args:
        text: Full extracted document text.
        max_chars: Soft cap on characters per chunk.

    Returns:
        Non-empty list of chunks; falls back to [text] when nothing survives
        normalization.
    """

    def _hard_split(block: str) -> list[str]:
        # Break an oversized block at the last newline before the limit,
        # falling back to a plain character cut when there is none.
        pieces: list[str] = []
        remaining = block
        while len(remaining) > max_chars:
            cut = remaining.rfind("\n", 1, max_chars)
            if cut <= 0:
                cut = max_chars
            piece = remaining[:cut].strip()
            if piece:
                pieces.append(piece)
            remaining = remaining[cut:].strip()
        if remaining:
            pieces.append(remaining)
        return pieces

    blocks: list[str] = []
    for raw_block in text.split("\n\n"):
        normalized = raw_block.strip()
        if not normalized:
            continue
        if len(normalized) > max_chars:
            blocks.extend(_hard_split(normalized))
        else:
            blocks.append(normalized)

    chunks: list[str] = []
    current: list[str] = []
    current_length = 0
    for block in blocks:
        block_length = len(block) + 2  # +2 accounts for the "\n\n" joiner
        if current and current_length + block_length > max_chars:
            chunks.append("\n\n".join(current))
            current = [block]
            current_length = block_length
        else:
            current.append(block)
            current_length += block_length
    if current:
        chunks.append("\n\n".join(current))
    return chunks or [text]
def _call_deepl_translate(
    chunk: str, target_language: str, source_language: str | None = None
) -> dict:
    """Translate a chunk with DeepL when premium credentials are configured.

    Args:
        chunk: Plain-text chunk to translate.
        target_language: Internal lowercase code; must map via DEEPL_LANGUAGE_CODES.
        source_language: Optional internal source code; when unmapped or None,
            DeepL auto-detects the source.

    Returns:
        {"translation": str, "provider": "deepl",
         "detected_source_language": lowercase code or ""}.

    Raises:
        RetryableTranslationError: transient failures (timeout, connection,
            429, 5xx) suitable for retry.
        PdfAiError: configuration, unsupported-language, credit/permission,
            or empty-response failures.
    """
    settings = _get_deepl_settings()
    if not settings.api_key:
        raise PdfAiError(
            "DeepL is not configured.",
            error_code="DEEPL_NOT_CONFIGURED",
        )
    target_code = DEEPL_LANGUAGE_CODES.get(_normalize_language_code(target_language))
    if not target_code:
        raise PdfAiError(
            f"Target language '{target_language}' is not supported by the premium translation provider.",
            error_code="DEEPL_UNSUPPORTED_TARGET_LANGUAGE",
        )
    payload: dict[str, object] = {
        "text": [chunk],
        "target_lang": target_code,
        "preserve_formatting": True,
        "tag_handling": "xml",
        "split_sentences": "nonewlines",
    }
    source_code = DEEPL_LANGUAGE_CODES.get(_normalize_language_code(source_language))
    if source_code:
        payload["source_lang"] = source_code
    try:
        response = requests.post(
            settings.base_url,
            headers={
                "Authorization": f"DeepL-Auth-Key {settings.api_key}",
                "Content-Type": "application/json",
            },
            json=payload,
            timeout=settings.timeout_seconds,
        )
    except requests.exceptions.Timeout:
        raise RetryableTranslationError(
            "Premium translation service timed out. Retrying...",
            error_code="DEEPL_TIMEOUT",
        )
    except requests.exceptions.ConnectionError:
        raise RetryableTranslationError(
            "Premium translation service is temporarily unreachable. Retrying...",
            error_code="DEEPL_CONNECTION_ERROR",
        )
    except requests.exceptions.RequestException as error:
        raise PdfAiError(
            "Premium translation service is temporarily unavailable.",
            error_code="DEEPL_REQUEST_ERROR",
            detail=str(error),
        )
    # 429 (rate limit) and 5xx (provider outage) are transient -> retryable.
    if response.status_code == 429:
        raise RetryableTranslationError(
            "Premium translation service is busy. Retrying...",
            error_code="DEEPL_RATE_LIMIT",
        )
    if response.status_code >= 500:
        raise RetryableTranslationError(
            "Premium translation service is experiencing issues. Retrying...",
            error_code="DEEPL_SERVER_ERROR",
        )
    # 456 is DeepL's quota-exceeded status; 403 covers key/permission issues.
    if response.status_code in {403, 456}:
        raise PdfAiError(
            "Premium translation provider credits or permissions need attention.",
            error_code="DEEPL_CREDITS_OR_PERMISSIONS",
        )
    response.raise_for_status()
    data = response.json()
    translations = data.get("translations") or []
    if not translations:
        raise PdfAiError(
            "Premium translation provider returned an empty response.",
            error_code="DEEPL_EMPTY_RESPONSE",
        )
    first = translations[0]
    translated_text = str(first.get("text", "")).strip()
    if not translated_text:
        raise PdfAiError(
            "Premium translation provider returned an empty response.",
            error_code="DEEPL_EMPTY_TEXT",
        )
    return {
        "translation": translated_text,
        "provider": "deepl",
        # DeepL reports codes like "EN"; normalize to the internal lowercase form.
        "detected_source_language": str(first.get("detected_source_language", ""))
        .strip()
        .lower(),
    }
def _call_openrouter_translate(
    chunk: str, target_language: str, source_language: str | None = None
) -> dict:
    """Translate one chunk via the OpenRouter LLM fallback provider."""
    normalized_source = _normalize_language_code(source_language)
    if source_language and normalized_source != "auto":
        source_hint = f"treat {_language_label(source_language)} as the source language"
    else:
        source_hint = "auto-detect the source language"
    system_prompt = (
        "You are a professional document translator. "
        f"Translate the provided PDF content into {_language_label(target_language)}. "
        f"Please {source_hint}. Preserve headings, lists, tables, and page markers. "
        "Return only the translated text."
    )
    translated = _call_openrouter(
        system_prompt,
        chunk,
        max_tokens=2200,
        tool_name="pdf_translate_fallback",
    )
    return {
        "translation": translated,
        "provider": "openrouter",
        "detected_source_language": normalized_source,
    }
def _translate_document_text(
    text: str, target_language: str, source_language: str | None = None
) -> dict:
    """Translate a document chunk-by-chunk, preferring DeepL with an
    OpenRouter fallback.

    Args:
        text: Full extracted document text.
        target_language: Internal lowercase target code.
        source_language: Optional internal source code ("auto" means detect).

    Returns:
        {"translation": joined text, "provider": comma-joined provider names,
         "detected_source_language": str, "chunks_translated": int}.
    """
    chunks = _split_translation_chunks(text)
    translations: list[str] = []
    detected_source_language = _normalize_language_code(source_language)
    if detected_source_language == "auto":
        detected_source_language = ""
    providers_used: list[str] = []
    # Environment-derived settings are loop-invariant; read them once instead
    # of re-reading the environment for every chunk.
    deepl_configured = bool(_get_deepl_settings().api_key)
    for chunk in chunks:
        chunk_result: dict | None = None
        if deepl_configured:
            try:
                # Bind chunk as a default arg to avoid the late-binding
                # closure pitfall if the callable is ever invoked deferred.
                chunk_result = _translate_with_retry(
                    lambda c=chunk: _call_deepl_translate(
                        c, target_language, source_language
                    ),
                    provider_name="DeepL",
                )
            except PdfAiError as deepl_error:
                logger.warning(
                    "DeepL translation failed for chunk; falling back to OpenRouter. code=%s detail=%s",
                    deepl_error.error_code,
                    deepl_error.detail,
                )
        if chunk_result is None:
            chunk_result = _translate_with_retry(
                lambda c=chunk: _call_openrouter_translate(
                    c, target_language, source_language
                ),
                provider_name="OpenRouter",
            )
        translations.append(str(chunk_result["translation"]).strip())
        providers_used.append(str(chunk_result["provider"]))
        # Keep the first detected source language a provider reports.
        if not detected_source_language and chunk_result.get(
            "detected_source_language"
        ):
            detected_source_language = _normalize_language_code(
                chunk_result["detected_source_language"]
            )
    return {
        "translation": "\n\n".join(part for part in translations if part),
        "provider": ", ".join(sorted(set(providers_used))),
        "detected_source_language": detected_source_language,
        "chunks_translated": len(translations),
    }
# ---------------------------------------------------------------------------
# 1. Chat with PDF
# ---------------------------------------------------------------------------
def chat_with_pdf(input_path: str, question: str) -> dict:
    """Answer a question about a PDF document.

    Args:
        input_path: Path to the PDF file.
        question: User's question about the document.

    Returns:
        {"reply": "...", "pages_analyzed": int}

    Raises:
        PdfAiError: for blank questions or PDFs with no extractable text.
    """
    if not (question and question.strip()):
        raise PdfAiError(
            "Please provide a question.", error_code="PDF_AI_INVALID_INPUT"
        )
    document_text = _extract_text_from_pdf(input_path)
    if not document_text.strip():
        raise PdfAiError(
            "Could not extract any text from the PDF.", error_code="PDF_TEXT_EMPTY"
        )
    # Truncate so the prompt fits the model's context window.
    context_limit = 12000
    excerpt = document_text[:context_limit]
    system_prompt = (
        "You are a helpful document assistant. The user has uploaded a PDF document. "
        "Answer questions about the document based only on the content provided. "
        "If the answer is not in the document, say so. "
        "Reply in the same language the user uses."
    )
    answer = _call_openrouter(
        system_prompt,
        f"Document content:\n{excerpt}\n\nQuestion: {question}",
        max_tokens=800,
        tool_name="pdf_chat",
    )
    return {
        "reply": answer,
        "pages_analyzed": document_text.count("[Page "),
    }
# ---------------------------------------------------------------------------
# 2. Summarize PDF
# ---------------------------------------------------------------------------
def summarize_pdf(input_path: str, length: str = "medium") -> dict:
    """Generate a summary of a PDF document.

    Args:
        input_path: Path to the PDF file.
        length: Summary length — "short", "medium", or "long" (unknown values
            fall back to the medium instruction).

    Returns:
        {"summary": "...", "pages_analyzed": int}

    Raises:
        PdfAiError: when no text could be extracted from the PDF.
    """
    document_text = _extract_text_from_pdf(input_path)
    if not document_text.strip():
        raise PdfAiError(
            "Could not extract any text from the PDF.", error_code="PDF_TEXT_EMPTY"
        )
    medium_instruction = "Provide a summary in 1-2 paragraphs covering the main points."
    instructions = {
        "short": "Provide a brief summary in 2-3 sentences.",
        "medium": medium_instruction,
        "long": "Provide a detailed summary covering all key points, arguments, and conclusions.",
    }
    length_instruction = instructions.get(length, medium_instruction)
    # Truncate so the prompt fits the model's context window.
    excerpt = document_text[:12000]
    system_prompt = (
        "You are a professional document summarizer. "
        "Summarize the document accurately and concisely. "
        "Reply in the same language as the document."
    )
    summary = _call_openrouter(
        system_prompt,
        f"{length_instruction}\n\nDocument content:\n{excerpt}",
        max_tokens=1000,
        tool_name="pdf_summarize",
    )
    return {
        "summary": summary,
        "pages_analyzed": document_text.count("[Page "),
    }
# ---------------------------------------------------------------------------
# 3. Translate PDF
# ---------------------------------------------------------------------------
def translate_pdf(
    input_path: str, target_language: str, source_language: str | None = None
) -> dict:
    """Translate the text content of a PDF to another language.

    Args:
        input_path: Path to the PDF file.
        target_language: Target language code (normalized to lowercase).
        source_language: Optional source language code; defaults to "auto".

    Returns:
        Dict with "translation", "pages_analyzed", "target_language",
        "source_language", "detected_source_language", "provider",
        and "chunks_translated".

    Raises:
        PdfAiError: for missing/identical languages or empty PDF text.
    """
    target = _normalize_language_code(target_language)
    source = _normalize_language_code(source_language, default="auto")
    if not target:
        raise PdfAiError(
            "Please specify a target language.", error_code="PDF_AI_INVALID_INPUT"
        )
    # Translating a language into itself is rejected; "auto" is always allowed.
    if source != "auto" and source == target:
        raise PdfAiError(
            "Please choose different source and target languages.",
            error_code="PDF_AI_INVALID_INPUT",
        )
    document_text = _extract_text_from_pdf(input_path)
    if not document_text.strip():
        raise PdfAiError(
            "Could not extract any text from the PDF.", error_code="PDF_TEXT_EMPTY"
        )
    result = _translate_document_text(
        document_text,
        target_language=target,
        source_language=source,
    )
    return {
        "translation": result["translation"],
        "pages_analyzed": document_text.count("[Page "),
        "target_language": target,
        "source_language": source,
        "detected_source_language": result["detected_source_language"],
        "provider": result["provider"],
        "chunks_translated": result["chunks_translated"],
    }
# ---------------------------------------------------------------------------
# 4. Extract Tables from PDF
# ---------------------------------------------------------------------------
def extract_tables(input_path: str) -> dict:
    """
    Extract tables from a PDF and return them as structured data.

    Args:
        input_path: Path to the PDF file

    Returns:
        {"tables": [...], "tables_found": int} where each table entry carries
        its 1-based page number, a running table_index, headers, and rows of
        stringified cells.

    Raises:
        PdfAiError: when no tables are found, tabula-py is not installed,
            or extraction fails for any other reason.
    """
    try:
        import tabula  # type: ignore[import-untyped]
        from PyPDF2 import PdfReader
        # Get total page count so each table can be attributed to its page.
        reader = PdfReader(input_path)
        total_pages = len(reader.pages)
        result_tables = []
        table_index = 0
        for page_num in range(1, total_pages + 1):
            page_tables = tabula.read_pdf(
                input_path, pages=str(page_num), multiple_tables=True, silent=True
            )
            if not page_tables:
                continue
            for df in page_tables:
                if df.empty:
                    continue
                headers = [str(c) for c in df.columns]
                rows = []
                for _, row in df.iterrows():
                    cells = []
                    for col in df.columns:
                        val = row[col]
                        # Missing values (NaN/NaT) are the only values not
                        # equal to themselves; this catches float('nan'),
                        # numpy NaN, and pandas NaT, unlike the previous
                        # string comparison against "nan".
                        if val != val:
                            cells.append("")
                        else:
                            cells.append(str(val))
                    rows.append(cells)
                result_tables.append(
                    {
                        "page": page_num,
                        "table_index": table_index,
                        "headers": headers,
                        "rows": rows,
                    }
                )
                table_index += 1
        if not result_tables:
            raise PdfAiError(
                "No tables found in the PDF. This tool works best with PDFs containing tabular data.",
                error_code="PDF_TABLES_NOT_FOUND",
            )
        # Lazy %-style args so formatting only happens when the level is enabled.
        logger.info("Extracted %s tables from PDF", len(result_tables))
        return {
            "tables": result_tables,
            "tables_found": len(result_tables),
        }
    except PdfAiError:
        raise
    except ImportError:
        raise PdfAiError(
            "tabula-py library is not installed.", error_code="TABULA_NOT_INSTALLED"
        )
    except Exception as e:
        raise PdfAiError(
            "Failed to extract tables.",
            error_code="PDF_TABLE_EXTRACTION_FAILED",
            detail=str(e),
        )