Unify task status error schema and classify PDF AI failures

This commit is contained in:
Ahmed Bakr Ali
2026-03-24 23:30:46 +02:00
parent a6e0cab0b3
commit b2a7678848
5 changed files with 232 additions and 33 deletions

View File

@@ -54,6 +54,57 @@ def _remember_download_alias(actor, download_task_id: str | None):
) )
def _normalized_error_payload(task_id: str, error_code: str, user_message: str, trace_id: str | None = None) -> dict:
"""Return unified error payload for task status responses."""
payload = {
"error_code": error_code,
"user_message": user_message,
"task_id": task_id,
}
if trace_id:
payload["trace_id"] = trace_id
return payload
def _infer_failure_error(task_id: str, info) -> dict:
    """Classify Celery failure information into a normalized error payload.

    Args:
        task_id: Identifier of the failed task; echoed back in the payload.
        info: Failure information for the task. Either a dict that already
            carries truthy ``error_code`` and ``user_message`` entries, or any
            other object whose ``str()`` describes the failure (for example an
            exception instance).

    Returns:
        The dict produced by ``_normalized_error_payload``. Pre-classified
        dicts are passed through (including an optional ``trace_id``);
        anything else is classified by inspecting its lower-cased text and
        falls back to ``TASK_FAILURE``.
    """
    import re  # function-scope import: the module's import block is not part of this block

    # Already classified upstream: forward the fields unchanged.
    if isinstance(info, dict) and info.get("error_code") and info.get("user_message"):
        return _normalized_error_payload(
            task_id=task_id,
            error_code=info["error_code"],
            user_message=info["user_message"],
            trace_id=info.get("trace_id"),
        )

    message = str(info) if info else "Task failed."
    lowered = message.lower()

    def _mentions_status(code: str) -> bool:
        # Match the HTTP status only when it is not part of a longer digit run,
        # so text such as "1401ms" or "port 4290" is not mistaken for a
        # 401/429 response.
        return re.search(rf"(?<!\d){code}(?!\d)", lowered) is not None

    if "notregistered" in lowered or "received unregistered task" in lowered:
        return _normalized_error_payload(
            task_id,
            "CELERY_NOT_REGISTERED",
            "Task worker is temporarily unavailable. Please retry in a moment.",
        )

    if "openrouter" in lowered:
        if _mentions_status("401"):
            return _normalized_error_payload(
                task_id,
                "OPENROUTER_UNAUTHORIZED",
                "AI features are temporarily unavailable due to a configuration issue.",
            )
        if _mentions_status("429"):
            return _normalized_error_payload(
                task_id,
                "OPENROUTER_RATE_LIMIT",
                "AI service is currently busy. Please try again shortly.",
            )

    # Nothing recognizable: report a generic failure without echoing the raw message.
    return _normalized_error_payload(
        task_id,
        "TASK_FAILURE",
        "Task processing failed. Please retry.",
    )
@tasks_bp.route("/<task_id>/status", methods=["GET"]) @tasks_bp.route("/<task_id>/status", methods=["GET"])
@limiter.limit("300/minute", override_defaults=True) @limiter.limit("300/minute", override_defaults=True)
def get_task_status(task_id: str): def get_task_status(task_id: str):
@@ -91,8 +142,15 @@ def get_task_status(task_id: str):
task_result = result.result or {} task_result = result.result or {}
_remember_download_alias(actor, _extract_download_task_id(task_result.get("download_url"))) _remember_download_alias(actor, _extract_download_task_id(task_result.get("download_url")))
response["result"] = task_result response["result"] = task_result
if task_result.get("status") == "failed":
response["error"] = _normalized_error_payload(
task_id=task_id,
error_code=task_result.get("error_code", "TASK_FAILURE"),
user_message=task_result.get("user_message", task_result.get("error", "Task processing failed.")),
trace_id=task_result.get("trace_id"),
)
elif result.state == "FAILURE": elif result.state == "FAILURE":
response["error"] = str(result.info) if result.info else "Task failed." response["error"] = _infer_failure_error(task_id, result.info)
return jsonify(response) return jsonify(response)

View File

@@ -11,7 +11,16 @@ logger = logging.getLogger(__name__)
class PdfAiError(Exception): class PdfAiError(Exception):
"""Custom exception for PDF AI service failures.""" """Custom exception for PDF AI service failures."""
pass def __init__(
self,
user_message: str,
error_code: str = "PDF_AI_ERROR",
detail: str | None = None,
):
super().__init__(user_message)
self.user_message = user_message
self.error_code = error_code
self.detail = detail
def _estimate_tokens(text: str) -> int: def _estimate_tokens(text: str) -> int:
@@ -33,7 +42,11 @@ def _extract_text_from_pdf(input_path: str, max_pages: int = 50) -> str:
texts.append(f"[Page {i + 1}]\n{text}") texts.append(f"[Page {i + 1}]\n{text}")
return "\n\n".join(texts) return "\n\n".join(texts)
except Exception as e: except Exception as e:
raise PdfAiError(f"Failed to extract text from PDF: {str(e)}") raise PdfAiError(
"Failed to extract text from PDF.",
error_code="PDF_TEXT_EXTRACTION_FAILED",
detail=str(e),
)
def _call_openrouter( def _call_openrouter(
@@ -48,7 +61,10 @@ def _call_openrouter(
from app.services.ai_cost_service import check_ai_budget, AiBudgetExceededError from app.services.ai_cost_service import check_ai_budget, AiBudgetExceededError
check_ai_budget() check_ai_budget()
except AiBudgetExceededError: except AiBudgetExceededError:
raise PdfAiError("Monthly AI processing budget has been reached. Please try again next month.") raise PdfAiError(
"Monthly AI processing budget has been reached. Please try again next month.",
error_code="AI_BUDGET_EXCEEDED",
)
except Exception: except Exception:
pass # Don't block if cost service unavailable pass # Don't block if cost service unavailable
@@ -57,7 +73,8 @@ def _call_openrouter(
if not settings.api_key: if not settings.api_key:
logger.error("OPENROUTER_API_KEY is not set or is a placeholder value.") logger.error("OPENROUTER_API_KEY is not set or is a placeholder value.")
raise PdfAiError( raise PdfAiError(
"AI features are temporarily unavailable. Our team has been notified." "AI features are temporarily unavailable. Our team has been notified.",
error_code="OPENROUTER_MISSING_API_KEY",
) )
messages = [ messages = [
@@ -84,25 +101,29 @@ def _call_openrouter(
if response.status_code == 401: if response.status_code == 401:
logger.error("OpenRouter API key is invalid or expired (401).") logger.error("OpenRouter API key is invalid or expired (401).")
raise PdfAiError( raise PdfAiError(
"AI features are temporarily unavailable due to a configuration issue. Our team has been notified." "AI features are temporarily unavailable due to a configuration issue. Our team has been notified.",
error_code="OPENROUTER_UNAUTHORIZED",
) )
if response.status_code == 402: if response.status_code == 402:
logger.error("OpenRouter account has insufficient credits (402).") logger.error("OpenRouter account has insufficient credits (402).")
raise PdfAiError( raise PdfAiError(
"AI processing credits have been exhausted. Please try again later." "AI processing credits have been exhausted. Please try again later.",
error_code="OPENROUTER_INSUFFICIENT_CREDITS",
) )
if response.status_code == 429: if response.status_code == 429:
logger.warning("OpenRouter rate limit reached (429).") logger.warning("OpenRouter rate limit reached (429).")
raise PdfAiError( raise PdfAiError(
"AI service is experiencing high demand. Please wait a moment and try again." "AI service is experiencing high demand. Please wait a moment and try again.",
error_code="OPENROUTER_RATE_LIMIT",
) )
if response.status_code >= 500: if response.status_code >= 500:
logger.error("OpenRouter server error (%s).", response.status_code) logger.error("OpenRouter server error (%s).", response.status_code)
raise PdfAiError( raise PdfAiError(
"AI service provider is experiencing issues. Please try again shortly." "AI service provider is experiencing issues. Please try again shortly.",
error_code="OPENROUTER_SERVER_ERROR",
) )
response.raise_for_status() response.raise_for_status()
@@ -112,7 +133,11 @@ def _call_openrouter(
if data.get("error"): if data.get("error"):
error_msg = data["error"].get("message", "") if isinstance(data["error"], dict) else str(data["error"]) error_msg = data["error"].get("message", "") if isinstance(data["error"], dict) else str(data["error"])
logger.error("OpenRouter returned an error payload: %s", error_msg) logger.error("OpenRouter returned an error payload: %s", error_msg)
raise PdfAiError("AI service encountered an issue. Please try again.") raise PdfAiError(
"AI service encountered an issue. Please try again.",
error_code="OPENROUTER_ERROR_PAYLOAD",
detail=error_msg,
)
reply = ( reply = (
data.get("choices", [{}])[0] data.get("choices", [{}])[0]
@@ -122,7 +147,10 @@ def _call_openrouter(
) )
if not reply: if not reply:
raise PdfAiError("AI returned an empty response. Please try again.") raise PdfAiError(
"AI returned an empty response. Please try again.",
error_code="OPENROUTER_EMPTY_RESPONSE",
)
# Log usage # Log usage
try: try:
@@ -142,13 +170,23 @@ def _call_openrouter(
except PdfAiError: except PdfAiError:
raise raise
except requests.exceptions.Timeout: except requests.exceptions.Timeout:
raise PdfAiError("AI service timed out. Please try again.") raise PdfAiError(
"AI service timed out. Please try again.",
error_code="OPENROUTER_TIMEOUT",
)
except requests.exceptions.ConnectionError: except requests.exceptions.ConnectionError:
logger.error("Cannot connect to OpenRouter API at %s", settings.base_url) logger.error("Cannot connect to OpenRouter API at %s", settings.base_url)
raise PdfAiError("AI service is unreachable. Please try again shortly.") raise PdfAiError(
"AI service is unreachable. Please try again shortly.",
error_code="OPENROUTER_CONNECTION_ERROR",
)
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
logger.error("OpenRouter API error: %s", e) logger.error("OpenRouter API error: %s", e)
raise PdfAiError("AI service is temporarily unavailable.") raise PdfAiError(
"AI service is temporarily unavailable.",
error_code="OPENROUTER_REQUEST_ERROR",
detail=str(e),
)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -166,11 +204,11 @@ def chat_with_pdf(input_path: str, question: str) -> dict:
{"reply": "...", "pages_analyzed": int} {"reply": "...", "pages_analyzed": int}
""" """
if not question or not question.strip(): if not question or not question.strip():
raise PdfAiError("Please provide a question.") raise PdfAiError("Please provide a question.", error_code="PDF_AI_INVALID_INPUT")
text = _extract_text_from_pdf(input_path) text = _extract_text_from_pdf(input_path)
if not text.strip(): if not text.strip():
raise PdfAiError("Could not extract any text from the PDF.") raise PdfAiError("Could not extract any text from the PDF.", error_code="PDF_TEXT_EMPTY")
# Truncate to fit context window # Truncate to fit context window
max_chars = 12000 max_chars = 12000
@@ -206,7 +244,7 @@ def summarize_pdf(input_path: str, length: str = "medium") -> dict:
""" """
text = _extract_text_from_pdf(input_path) text = _extract_text_from_pdf(input_path)
if not text.strip(): if not text.strip():
raise PdfAiError("Could not extract any text from the PDF.") raise PdfAiError("Could not extract any text from the PDF.", error_code="PDF_TEXT_EMPTY")
length_instruction = { length_instruction = {
"short": "Provide a brief summary in 2-3 sentences.", "short": "Provide a brief summary in 2-3 sentences.",
@@ -245,11 +283,11 @@ def translate_pdf(input_path: str, target_language: str) -> dict:
{"translation": "...", "pages_analyzed": int, "target_language": str} {"translation": "...", "pages_analyzed": int, "target_language": str}
""" """
if not target_language or not target_language.strip(): if not target_language or not target_language.strip():
raise PdfAiError("Please specify a target language.") raise PdfAiError("Please specify a target language.", error_code="PDF_AI_INVALID_INPUT")
text = _extract_text_from_pdf(input_path) text = _extract_text_from_pdf(input_path)
if not text.strip(): if not text.strip():
raise PdfAiError("Could not extract any text from the PDF.") raise PdfAiError("Could not extract any text from the PDF.", error_code="PDF_TEXT_EMPTY")
max_chars = 10000 max_chars = 10000
truncated = text[:max_chars] truncated = text[:max_chars]
@@ -325,7 +363,8 @@ def extract_tables(input_path: str) -> dict:
if not result_tables: if not result_tables:
raise PdfAiError( raise PdfAiError(
"No tables found in the PDF. This tool works best with PDFs containing tabular data." "No tables found in the PDF. This tool works best with PDFs containing tabular data.",
error_code="PDF_TABLES_NOT_FOUND",
) )
logger.info(f"Extracted {len(result_tables)} tables from PDF") logger.info(f"Extracted {len(result_tables)} tables from PDF")
@@ -338,6 +377,10 @@ def extract_tables(input_path: str) -> dict:
except PdfAiError: except PdfAiError:
raise raise
except ImportError: except ImportError:
raise PdfAiError("tabula-py library is not installed.") raise PdfAiError("tabula-py library is not installed.", error_code="TABULA_NOT_INSTALLED")
except Exception as e: except Exception as e:
raise PdfAiError(f"Failed to extract tables: {str(e)}") raise PdfAiError(
"Failed to extract tables.",
error_code="PDF_TABLE_EXTRACTION_FAILED",
detail=str(e),
)

View File

@@ -1,6 +1,7 @@
"""Celery tasks for PDF AI tools — Chat, Summarize, Translate, Table Extract.""" """Celery tasks for PDF AI tools — Chat, Summarize, Translate, Table Extract."""
import os import os
import logging import logging
import json
from flask import current_app from flask import current_app
@@ -22,6 +23,35 @@ def _cleanup(task_id: str):
cleanup_task_files(task_id, keep_outputs=False) cleanup_task_files(task_id, keep_outputs=False)
def _build_pdf_ai_error_payload(task_id: str, error: PdfAiError, tool: str) -> dict:
    """Turn a PdfAiError into a failed-task result dict and log it as JSON.

    The returned dict always has ``status`` ("failed"), ``error_code``,
    ``user_message`` and ``task_id``; ``detail`` is added only when the error
    carries a truthy one. A single JSON-encoded record describing the failure
    (including ``tool``) is logged at ERROR level.
    """
    # getattr defaults keep this working for errors lacking the classified attributes.
    error_code = getattr(error, "error_code", "PDF_AI_ERROR")
    user_message = getattr(error, "user_message", str(error)) or "AI processing failed."
    detail = getattr(error, "detail", None)

    payload = {
        "status": "failed",
        "error_code": error_code,
        "user_message": user_message,
        "task_id": task_id,
    }
    if detail:
        payload["detail"] = detail

    # The log record always includes "detail" (possibly null), unlike the payload.
    log_record = {
        "event": "pdf_ai_task_failed",
        "tool": tool,
        "task_id": task_id,
        "error_code": error_code,
        "user_message": user_message,
        "detail": detail,
    }
    logger.error(json.dumps(log_record, ensure_ascii=False))

    return payload
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Chat with PDF # Chat with PDF
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -59,8 +89,7 @@ def chat_with_pdf_task(
return result return result
except PdfAiError as e: except PdfAiError as e:
logger.error(f"Task {task_id}: {e}") result = _build_pdf_ai_error_payload(task_id, e, "chat-pdf")
result = {"status": "failed", "error": str(e)}
finalize_task_tracking( finalize_task_tracking(
user_id=user_id, tool="chat-pdf", user_id=user_id, tool="chat-pdf",
original_filename=original_filename, result=result, original_filename=original_filename, result=result,
@@ -120,8 +149,7 @@ def summarize_pdf_task(
return result return result
except PdfAiError as e: except PdfAiError as e:
logger.error(f"Task {task_id}: {e}") result = _build_pdf_ai_error_payload(task_id, e, "summarize-pdf")
result = {"status": "failed", "error": str(e)}
finalize_task_tracking( finalize_task_tracking(
user_id=user_id, tool="summarize-pdf", user_id=user_id, tool="summarize-pdf",
original_filename=original_filename, result=result, original_filename=original_filename, result=result,
@@ -182,8 +210,7 @@ def translate_pdf_task(
return result return result
except PdfAiError as e: except PdfAiError as e:
logger.error(f"Task {task_id}: {e}") result = _build_pdf_ai_error_payload(task_id, e, "translate-pdf")
result = {"status": "failed", "error": str(e)}
finalize_task_tracking( finalize_task_tracking(
user_id=user_id, tool="translate-pdf", user_id=user_id, tool="translate-pdf",
original_filename=original_filename, result=result, original_filename=original_filename, result=result,
@@ -242,8 +269,7 @@ def extract_tables_task(
return result return result
except PdfAiError as e: except PdfAiError as e:
logger.error(f"Task {task_id}: {e}") result = _build_pdf_ai_error_payload(task_id, e, "extract-tables")
result = {"status": "failed", "error": str(e)}
finalize_task_tracking( finalize_task_tracking(
user_id=user_id, tool="extract-tables", user_id=user_id, tool="extract-tables",
original_filename=original_filename, result=result, original_filename=original_filename, result=result,

View File

@@ -0,0 +1,32 @@
"""Tests for PDF AI Celery task error payloads."""
from app.services.pdf_ai_service import PdfAiError
from app.tasks.pdf_ai_tasks import _build_pdf_ai_error_payload
def test_build_pdf_ai_error_payload_contains_classified_fields():
    """Should expose the classified fields needed for task status normalization."""
    user_message = "AI service is experiencing high demand. Please wait a moment and try again."
    error = PdfAiError(user_message, error_code="OPENROUTER_RATE_LIMIT")

    payload = _build_pdf_ai_error_payload("task-123", error, "chat-pdf")

    assert payload["status"] == "failed"
    assert payload["error_code"] == "OPENROUTER_RATE_LIMIT"
    # Pin the exact text rather than mere key presence, so a regression that
    # replaces or blanks the message is caught.
    assert payload["user_message"] == user_message
    assert payload["task_id"] == "task-123"
    # No detail was supplied, so the optional key must be absent.
    assert "detail" not in payload
def test_build_pdf_ai_error_payload_includes_detail_when_available():
    """Should carry the error's detail string through to the payload."""
    failure = PdfAiError(
        "Failed to extract text from PDF.",
        detail="EOF marker not found",
        error_code="PDF_TEXT_EXTRACTION_FAILED",
    )

    result = _build_pdf_ai_error_payload("task-456", failure, "summarize-pdf")

    assert result["error_code"] == "PDF_TEXT_EXTRACTION_FAILED"
    assert result["detail"] == "EOF marker not found"

View File

@@ -88,7 +88,7 @@ class TestTaskStatus:
assert has_task_access(user['id'], 'web', 'local-download-id') is True assert has_task_access(user['id'], 'web', 'local-download-id') is True
def test_failure_task(self, client, monkeypatch): def test_failure_task(self, client, monkeypatch):
"""Should return FAILURE state with error message.""" """Should return FAILURE state with normalized error payload."""
mock_result = MagicMock() mock_result = MagicMock()
mock_result.state = 'FAILURE' mock_result.state = 'FAILURE'
mock_result.info = Exception('Conversion failed due to corrupt PDF.') mock_result.info = Exception('Conversion failed due to corrupt PDF.')
@@ -102,10 +102,50 @@ class TestTaskStatus:
assert response.status_code == 200 assert response.status_code == 200
data = response.get_json() data = response.get_json()
assert data['state'] == 'FAILURE' assert data['state'] == 'FAILURE'
assert 'error' in data assert data['error']['error_code'] == 'TASK_FAILURE'
assert 'user_message' in data['error']
assert data['error']['task_id'] == 'failed-id'
def test_failure_task_unregistered_maps_to_specific_code(self, client):
    """Should report CELERY_NOT_REGISTERED for an unregistered-task failure."""
    # MagicMock keyword arguments configure attributes on the mock.
    failed_result = MagicMock(
        state='FAILURE',
        info=Exception("Received unregistered task of type 'app.tasks.missing_task'."),
    )

    # Grant this session access to the task id so the status route serves it.
    with client.session_transaction() as session:
        session[TASK_ACCESS_SESSION_KEY] = ['missing-task-id']

    with patch('app.routes.tasks.AsyncResult', return_value=failed_result):
        response = client.get('/api/tasks/missing-task-id/status')

    assert response.status_code == 200
    body = response.get_json()
    assert body['error']['error_code'] == 'CELERY_NOT_REGISTERED'
def test_success_failed_result_returns_normalized_error(self, client):
    """Should surface a normalized error when a SUCCESS result reports status 'failed'."""
    failed_payload = {
        'status': 'failed',
        'error_code': 'OPENROUTER_RATE_LIMIT',
        'user_message': 'AI service is experiencing high demand.',
    }
    # MagicMock keyword arguments configure attributes on the mock.
    finished_result = MagicMock(state='SUCCESS', result=failed_payload)

    # Grant this session access to the task id so the status route serves it.
    with client.session_transaction() as session:
        session[TASK_ACCESS_SESSION_KEY] = ['ai-failed-id']

    with patch('app.routes.tasks.AsyncResult', return_value=finished_result):
        response = client.get('/api/tasks/ai-failed-id/status')

    assert response.status_code == 200
    body = response.get_json()
    assert body['state'] == 'SUCCESS'
    assert body['error']['error_code'] == 'OPENROUTER_RATE_LIMIT'
    assert body['error']['task_id'] == 'ai-failed-id'
def test_unknown_task_without_access_returns_404(self, client): def test_unknown_task_without_access_returns_404(self, client):
"""Should not expose task state without session or API ownership.""" """Should not expose task state without session or API ownership."""
response = client.get('/api/tasks/unknown-task/status') response = client.get('/api/tasks/unknown-task/status')
assert response.status_code == 404 assert response.status_code == 404