387 lines
13 KiB
Python
387 lines
13 KiB
Python
"""PDF AI services — Chat, Summarize, Translate, Table Extract."""
|
|
import json
|
|
import logging
|
|
|
|
import requests
|
|
|
|
from app.services.openrouter_config_service import get_openrouter_settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class PdfAiError(Exception):
|
|
"""Custom exception for PDF AI service failures."""
|
|
def __init__(
|
|
self,
|
|
user_message: str,
|
|
error_code: str = "PDF_AI_ERROR",
|
|
detail: str | None = None,
|
|
):
|
|
super().__init__(user_message)
|
|
self.user_message = user_message
|
|
self.error_code = error_code
|
|
self.detail = detail
|
|
|
|
|
|
def _estimate_tokens(text: str) -> int:
|
|
"""Rough token estimate: ~4 chars per token for English."""
|
|
return max(1, len(text) // 4)
|
|
|
|
|
|
def _extract_text_from_pdf(input_path: str, max_pages: int = 50) -> str:
|
|
"""Extract text content from a PDF file."""
|
|
try:
|
|
from PyPDF2 import PdfReader
|
|
|
|
reader = PdfReader(input_path)
|
|
pages = reader.pages[:max_pages]
|
|
texts = []
|
|
for i, page in enumerate(pages):
|
|
text = page.extract_text() or ""
|
|
if text.strip():
|
|
texts.append(f"[Page {i + 1}]\n{text}")
|
|
return "\n\n".join(texts)
|
|
except Exception as e:
|
|
raise PdfAiError(
|
|
"Failed to extract text from PDF.",
|
|
error_code="PDF_TEXT_EXTRACTION_FAILED",
|
|
detail=str(e),
|
|
)
|
|
|
|
|
|
def _call_openrouter(
|
|
system_prompt: str,
|
|
user_message: str,
|
|
max_tokens: int = 1000,
|
|
tool_name: str = "pdf_ai",
|
|
) -> str:
|
|
"""Send a request to OpenRouter API and return the reply."""
|
|
# Budget guard
|
|
try:
|
|
from app.services.ai_cost_service import check_ai_budget, AiBudgetExceededError
|
|
check_ai_budget()
|
|
except AiBudgetExceededError:
|
|
raise PdfAiError(
|
|
"Monthly AI processing budget has been reached. Please try again next month.",
|
|
error_code="AI_BUDGET_EXCEEDED",
|
|
)
|
|
except Exception:
|
|
pass # Don't block if cost service unavailable
|
|
|
|
settings = get_openrouter_settings()
|
|
|
|
if not settings.api_key:
|
|
logger.error("OPENROUTER_API_KEY is not set or is a placeholder value.")
|
|
raise PdfAiError(
|
|
"AI features are temporarily unavailable. Our team has been notified.",
|
|
error_code="OPENROUTER_MISSING_API_KEY",
|
|
)
|
|
|
|
messages = [
|
|
{"role": "system", "content": system_prompt},
|
|
{"role": "user", "content": user_message},
|
|
]
|
|
|
|
try:
|
|
response = requests.post(
|
|
settings.base_url,
|
|
headers={
|
|
"Authorization": f"Bearer {settings.api_key}",
|
|
"Content-Type": "application/json",
|
|
},
|
|
json={
|
|
"model": settings.model,
|
|
"messages": messages,
|
|
"max_tokens": max_tokens,
|
|
"temperature": 0.5,
|
|
},
|
|
timeout=60,
|
|
)
|
|
|
|
if response.status_code == 401:
|
|
logger.error("OpenRouter API key is invalid or expired (401).")
|
|
raise PdfAiError(
|
|
"AI features are temporarily unavailable due to a configuration issue. Our team has been notified.",
|
|
error_code="OPENROUTER_UNAUTHORIZED",
|
|
)
|
|
|
|
if response.status_code == 402:
|
|
logger.error("OpenRouter account has insufficient credits (402).")
|
|
raise PdfAiError(
|
|
"AI processing credits have been exhausted. Please try again later.",
|
|
error_code="OPENROUTER_INSUFFICIENT_CREDITS",
|
|
)
|
|
|
|
if response.status_code == 429:
|
|
logger.warning("OpenRouter rate limit reached (429).")
|
|
raise PdfAiError(
|
|
"AI service is experiencing high demand. Please wait a moment and try again.",
|
|
error_code="OPENROUTER_RATE_LIMIT",
|
|
)
|
|
|
|
if response.status_code >= 500:
|
|
logger.error("OpenRouter server error (%s).", response.status_code)
|
|
raise PdfAiError(
|
|
"AI service provider is experiencing issues. Please try again shortly.",
|
|
error_code="OPENROUTER_SERVER_ERROR",
|
|
)
|
|
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
|
|
# Handle model-level errors returned inside a 200 response
|
|
if data.get("error"):
|
|
error_msg = data["error"].get("message", "") if isinstance(data["error"], dict) else str(data["error"])
|
|
logger.error("OpenRouter returned an error payload: %s", error_msg)
|
|
raise PdfAiError(
|
|
"AI service encountered an issue. Please try again.",
|
|
error_code="OPENROUTER_ERROR_PAYLOAD",
|
|
detail=error_msg,
|
|
)
|
|
|
|
reply = (
|
|
data.get("choices", [{}])[0]
|
|
.get("message", {})
|
|
.get("content", "")
|
|
.strip()
|
|
)
|
|
|
|
if not reply:
|
|
raise PdfAiError(
|
|
"AI returned an empty response. Please try again.",
|
|
error_code="OPENROUTER_EMPTY_RESPONSE",
|
|
)
|
|
|
|
# Log usage
|
|
try:
|
|
from app.services.ai_cost_service import log_ai_usage
|
|
usage = data.get("usage", {})
|
|
log_ai_usage(
|
|
tool=tool_name,
|
|
model=settings.model,
|
|
input_tokens=usage.get("prompt_tokens", _estimate_tokens(user_message)),
|
|
output_tokens=usage.get("completion_tokens", _estimate_tokens(reply)),
|
|
)
|
|
except Exception:
|
|
pass # Don't fail the request if logging fails
|
|
|
|
return reply
|
|
|
|
except PdfAiError:
|
|
raise
|
|
except requests.exceptions.Timeout:
|
|
raise PdfAiError(
|
|
"AI service timed out. Please try again.",
|
|
error_code="OPENROUTER_TIMEOUT",
|
|
)
|
|
except requests.exceptions.ConnectionError:
|
|
logger.error("Cannot connect to OpenRouter API at %s", settings.base_url)
|
|
raise PdfAiError(
|
|
"AI service is unreachable. Please try again shortly.",
|
|
error_code="OPENROUTER_CONNECTION_ERROR",
|
|
)
|
|
except requests.exceptions.RequestException as e:
|
|
logger.error("OpenRouter API error: %s", e)
|
|
raise PdfAiError(
|
|
"AI service is temporarily unavailable.",
|
|
error_code="OPENROUTER_REQUEST_ERROR",
|
|
detail=str(e),
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 1. Chat with PDF
|
|
# ---------------------------------------------------------------------------
|
|
def chat_with_pdf(input_path: str, question: str) -> dict:
|
|
"""
|
|
Answer a question about a PDF document.
|
|
|
|
Args:
|
|
input_path: Path to the PDF file
|
|
question: User's question about the document
|
|
|
|
Returns:
|
|
{"reply": "...", "pages_analyzed": int}
|
|
"""
|
|
if not question or not question.strip():
|
|
raise PdfAiError("Please provide a question.", error_code="PDF_AI_INVALID_INPUT")
|
|
|
|
text = _extract_text_from_pdf(input_path)
|
|
if not text.strip():
|
|
raise PdfAiError("Could not extract any text from the PDF.", error_code="PDF_TEXT_EMPTY")
|
|
|
|
# Truncate to fit context window
|
|
max_chars = 12000
|
|
truncated = text[:max_chars]
|
|
|
|
system_prompt = (
|
|
"You are a helpful document assistant. The user has uploaded a PDF document. "
|
|
"Answer questions about the document based only on the content provided. "
|
|
"If the answer is not in the document, say so. "
|
|
"Reply in the same language the user uses."
|
|
)
|
|
|
|
user_msg = f"Document content:\n{truncated}\n\nQuestion: {question}"
|
|
reply = _call_openrouter(system_prompt, user_msg, max_tokens=800, tool_name="pdf_chat")
|
|
|
|
page_count = text.count("[Page ")
|
|
return {"reply": reply, "pages_analyzed": page_count}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 2. Summarize PDF
|
|
# ---------------------------------------------------------------------------
|
|
def summarize_pdf(input_path: str, length: str = "medium") -> dict:
|
|
"""
|
|
Generate a summary of a PDF document.
|
|
|
|
Args:
|
|
input_path: Path to the PDF file
|
|
length: Summary length — "short", "medium", or "long"
|
|
|
|
Returns:
|
|
{"summary": "...", "pages_analyzed": int}
|
|
"""
|
|
text = _extract_text_from_pdf(input_path)
|
|
if not text.strip():
|
|
raise PdfAiError("Could not extract any text from the PDF.", error_code="PDF_TEXT_EMPTY")
|
|
|
|
length_instruction = {
|
|
"short": "Provide a brief summary in 2-3 sentences.",
|
|
"medium": "Provide a summary in 1-2 paragraphs covering the main points.",
|
|
"long": "Provide a detailed summary covering all key points, arguments, and conclusions.",
|
|
}.get(length, "Provide a summary in 1-2 paragraphs covering the main points.")
|
|
|
|
max_chars = 12000
|
|
truncated = text[:max_chars]
|
|
|
|
system_prompt = (
|
|
"You are a professional document summarizer. "
|
|
"Summarize the document accurately and concisely. "
|
|
"Reply in the same language as the document."
|
|
)
|
|
|
|
user_msg = f"{length_instruction}\n\nDocument content:\n{truncated}"
|
|
summary = _call_openrouter(system_prompt, user_msg, max_tokens=1000, tool_name="pdf_summarize")
|
|
|
|
page_count = text.count("[Page ")
|
|
return {"summary": summary, "pages_analyzed": page_count}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 3. Translate PDF
|
|
# ---------------------------------------------------------------------------
|
|
def translate_pdf(input_path: str, target_language: str) -> dict:
|
|
"""
|
|
Translate the text content of a PDF to another language.
|
|
|
|
Args:
|
|
input_path: Path to the PDF file
|
|
target_language: Target language name (e.g. "English", "Arabic", "French")
|
|
|
|
Returns:
|
|
{"translation": "...", "pages_analyzed": int, "target_language": str}
|
|
"""
|
|
if not target_language or not target_language.strip():
|
|
raise PdfAiError("Please specify a target language.", error_code="PDF_AI_INVALID_INPUT")
|
|
|
|
text = _extract_text_from_pdf(input_path)
|
|
if not text.strip():
|
|
raise PdfAiError("Could not extract any text from the PDF.", error_code="PDF_TEXT_EMPTY")
|
|
|
|
max_chars = 10000
|
|
truncated = text[:max_chars]
|
|
|
|
system_prompt = (
|
|
f"You are a professional translator. Translate the following document "
|
|
f"content into {target_language}. Preserve the original formatting and "
|
|
f"structure as much as possible. Only output the translation, nothing else."
|
|
)
|
|
|
|
translation = _call_openrouter(system_prompt, truncated, max_tokens=2000, tool_name="pdf_translate")
|
|
|
|
page_count = text.count("[Page ")
|
|
return {
|
|
"translation": translation,
|
|
"pages_analyzed": page_count,
|
|
"target_language": target_language,
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 4. Extract Tables from PDF
|
|
# ---------------------------------------------------------------------------
|
|
def extract_tables(input_path: str) -> dict:
|
|
"""
|
|
Extract tables from a PDF and return them as structured data.
|
|
|
|
Args:
|
|
input_path: Path to the PDF file
|
|
|
|
Returns:
|
|
{"tables": [...], "tables_found": int}
|
|
"""
|
|
try:
|
|
import tabula # type: ignore[import-untyped]
|
|
from PyPDF2 import PdfReader
|
|
|
|
# Get total page count
|
|
reader = PdfReader(input_path)
|
|
total_pages = len(reader.pages)
|
|
|
|
result_tables = []
|
|
table_index = 0
|
|
|
|
for page_num in range(1, total_pages + 1):
|
|
page_tables = tabula.read_pdf(
|
|
input_path, pages=str(page_num), multiple_tables=True, silent=True
|
|
)
|
|
if not page_tables:
|
|
continue
|
|
for df in page_tables:
|
|
if df.empty:
|
|
continue
|
|
headers = [str(c) for c in df.columns]
|
|
rows = []
|
|
for _, row in df.iterrows():
|
|
cells = []
|
|
for col in df.columns:
|
|
val = row[col]
|
|
if isinstance(val, float) and str(val) == "nan":
|
|
cells.append("")
|
|
else:
|
|
cells.append(str(val))
|
|
rows.append(cells)
|
|
|
|
result_tables.append({
|
|
"page": page_num,
|
|
"table_index": table_index,
|
|
"headers": headers,
|
|
"rows": rows,
|
|
})
|
|
table_index += 1
|
|
|
|
if not result_tables:
|
|
raise PdfAiError(
|
|
"No tables found in the PDF. This tool works best with PDFs containing tabular data.",
|
|
error_code="PDF_TABLES_NOT_FOUND",
|
|
)
|
|
|
|
logger.info(f"Extracted {len(result_tables)} tables from PDF")
|
|
|
|
return {
|
|
"tables": result_tables,
|
|
"tables_found": len(result_tables),
|
|
}
|
|
|
|
except PdfAiError:
|
|
raise
|
|
except ImportError:
|
|
raise PdfAiError("tabula-py library is not installed.", error_code="TABULA_NOT_INSTALLED")
|
|
except Exception as e:
|
|
raise PdfAiError(
|
|
"Failed to extract tables.",
|
|
error_code="PDF_TABLE_EXTRACTION_FAILED",
|
|
detail=str(e),
|
|
)
|