From b2a76788486b3880e957245ad5986a223079cc96 Mon Sep 17 00:00:00 2001 From: Ahmed Bakr Ali <119736744+aborayan2022@users.noreply.github.com> Date: Tue, 24 Mar 2026 23:30:46 +0200 Subject: [PATCH] Unify task status error schema and classify PDF AI failures --- backend/app/routes/tasks.py | 60 +++++++++++++++++- backend/app/services/pdf_ai_service.py | 85 +++++++++++++++++++------- backend/app/tasks/pdf_ai_tasks.py | 42 ++++++++++--- backend/tests/test_pdf_ai_tasks.py | 32 ++++++++++ backend/tests/test_tasks_route.py | 46 +++++++++++++- 5 files changed, 232 insertions(+), 33 deletions(-) create mode 100644 backend/tests/test_pdf_ai_tasks.py diff --git a/backend/app/routes/tasks.py b/backend/app/routes/tasks.py index 74b07bc..9517609 100644 --- a/backend/app/routes/tasks.py +++ b/backend/app/routes/tasks.py @@ -54,6 +54,57 @@ def _remember_download_alias(actor, download_task_id: str | None): ) +def _normalized_error_payload(task_id: str, error_code: str, user_message: str, trace_id: str | None = None) -> dict: + """Return unified error payload for task status responses.""" + payload = { + "error_code": error_code, + "user_message": user_message, + "task_id": task_id, + } + if trace_id: + payload["trace_id"] = trace_id + return payload + + +def _infer_failure_error(task_id: str, info) -> dict: + """Classify celery failure information into a normalized payload.""" + if isinstance(info, dict) and info.get("error_code") and info.get("user_message"): + return _normalized_error_payload( + task_id=task_id, + error_code=info["error_code"], + user_message=info["user_message"], + trace_id=info.get("trace_id"), + ) + + message = str(info) if info else "Task failed." + lowered = message.lower() + + if "notregistered" in lowered or "received unregistered task" in lowered: + return _normalized_error_payload( + task_id, + "CELERY_NOT_REGISTERED", + "Task worker is temporarily unavailable. Please retry in a moment.", + ) + if "openrouter" in lowered and "401" in lowered: + return _normalized_error_payload( + task_id, + "OPENROUTER_UNAUTHORIZED", + "AI features are temporarily unavailable due to a configuration issue.", + ) + if "openrouter" in lowered and "429" in lowered: + return _normalized_error_payload( + task_id, + "OPENROUTER_RATE_LIMIT", + "AI service is currently busy. Please try again shortly.", + ) + + return _normalized_error_payload( + task_id, + "TASK_FAILURE", + "Task processing failed. Please retry.", + ) + + @tasks_bp.route("//status", methods=["GET"]) @limiter.limit("300/minute", override_defaults=True) def get_task_status(task_id: str): @@ -91,8 +142,15 @@ def get_task_status(task_id: str): task_result = result.result or {} _remember_download_alias(actor, _extract_download_task_id(task_result.get("download_url"))) response["result"] = task_result + if task_result.get("status") == "failed": + response["error"] = _normalized_error_payload( + task_id=task_id, + error_code=task_result.get("error_code", "TASK_FAILURE"), + user_message=task_result.get("user_message", task_result.get("error", "Task processing failed.")), + trace_id=task_result.get("trace_id"), + ) elif result.state == "FAILURE": - response["error"] = str(result.info) if result.info else "Task failed." + response["error"] = _infer_failure_error(task_id, result.info) return jsonify(response) diff --git a/backend/app/services/pdf_ai_service.py b/backend/app/services/pdf_ai_service.py index 5e05b74..c3a5806 100644 --- a/backend/app/services/pdf_ai_service.py +++ b/backend/app/services/pdf_ai_service.py @@ -11,7 +11,16 @@ logger = logging.getLogger(__name__) class PdfAiError(Exception): """Custom exception for PDF AI service failures.""" - pass + def __init__( + self, + user_message: str, + error_code: str = "PDF_AI_ERROR", + detail: str | None = None, + ): + super().__init__(user_message) + self.user_message = user_message + self.error_code = error_code + self.detail = detail def _estimate_tokens(text: str) -> int: @@ -33,7 +42,11 @@ def _extract_text_from_pdf(input_path: str, max_pages: int = 50) -> str: texts.append(f"[Page {i + 1}]\n{text}") return "\n\n".join(texts) except Exception as e: - raise PdfAiError(f"Failed to extract text from PDF: {str(e)}") + raise PdfAiError( + "Failed to extract text from PDF.", + error_code="PDF_TEXT_EXTRACTION_FAILED", + detail=str(e), + ) def _call_openrouter( @@ -48,7 +61,10 @@ def _call_openrouter( from app.services.ai_cost_service import check_ai_budget, AiBudgetExceededError check_ai_budget() except AiBudgetExceededError: - raise PdfAiError("Monthly AI processing budget has been reached. Please try again next month.") + raise PdfAiError( + "Monthly AI processing budget has been reached. Please try again next month.", + error_code="AI_BUDGET_EXCEEDED", + ) except Exception: pass # Don't block if cost service unavailable @@ -57,7 +73,8 @@ def _call_openrouter( if not settings.api_key: logger.error("OPENROUTER_API_KEY is not set or is a placeholder value.") raise PdfAiError( - "AI features are temporarily unavailable. Our team has been notified." + "AI features are temporarily unavailable. Our team has been notified.", + error_code="OPENROUTER_MISSING_API_KEY", ) messages = [ @@ -84,25 +101,29 @@ def _call_openrouter( if response.status_code == 401: logger.error("OpenRouter API key is invalid or expired (401).") raise PdfAiError( - "AI features are temporarily unavailable due to a configuration issue. Our team has been notified." + "AI features are temporarily unavailable due to a configuration issue. Our team has been notified.", + error_code="OPENROUTER_UNAUTHORIZED", ) if response.status_code == 402: logger.error("OpenRouter account has insufficient credits (402).") raise PdfAiError( - "AI processing credits have been exhausted. Please try again later." + "AI processing credits have been exhausted. Please try again later.", + error_code="OPENROUTER_INSUFFICIENT_CREDITS", ) if response.status_code == 429: logger.warning("OpenRouter rate limit reached (429).") raise PdfAiError( - "AI service is experiencing high demand. Please wait a moment and try again." + "AI service is experiencing high demand. Please wait a moment and try again.", + error_code="OPENROUTER_RATE_LIMIT", ) if response.status_code >= 500: logger.error("OpenRouter server error (%s).", response.status_code) raise PdfAiError( - "AI service provider is experiencing issues. Please try again shortly." + "AI service provider is experiencing issues. Please try again shortly.", + error_code="OPENROUTER_SERVER_ERROR", ) response.raise_for_status() @@ -112,7 +133,11 @@ def _call_openrouter( if data.get("error"): error_msg = data["error"].get("message", "") if isinstance(data["error"], dict) else str(data["error"]) logger.error("OpenRouter returned an error payload: %s", error_msg) - raise PdfAiError("AI service encountered an issue. Please try again.") + raise PdfAiError( + "AI service encountered an issue. Please try again.", + error_code="OPENROUTER_ERROR_PAYLOAD", + detail=error_msg, + ) reply = ( data.get("choices", [{}])[0] @@ -122,7 +147,10 @@ def _call_openrouter( ) if not reply: - raise PdfAiError("AI returned an empty response. Please try again.") + raise PdfAiError( + "AI returned an empty response. Please try again.", + error_code="OPENROUTER_EMPTY_RESPONSE", + ) # Log usage try: @@ -142,13 +170,23 @@ def _call_openrouter( except PdfAiError: raise except requests.exceptions.Timeout: - raise PdfAiError("AI service timed out. Please try again.") + raise PdfAiError( + "AI service timed out. Please try again.", + error_code="OPENROUTER_TIMEOUT", + ) except requests.exceptions.ConnectionError: logger.error("Cannot connect to OpenRouter API at %s", settings.base_url) - raise PdfAiError("AI service is unreachable. Please try again shortly.") + raise PdfAiError( + "AI service is unreachable. Please try again shortly.", + error_code="OPENROUTER_CONNECTION_ERROR", + ) except requests.exceptions.RequestException as e: logger.error("OpenRouter API error: %s", e) - raise PdfAiError("AI service is temporarily unavailable.") + raise PdfAiError( + "AI service is temporarily unavailable.", + error_code="OPENROUTER_REQUEST_ERROR", + detail=str(e), + ) # --------------------------------------------------------------------------- @@ -166,11 +204,11 @@ def chat_with_pdf(input_path: str, question: str) -> dict: {"reply": "...", "pages_analyzed": int} """ if not question or not question.strip(): - raise PdfAiError("Please provide a question.") + raise PdfAiError("Please provide a question.", error_code="PDF_AI_INVALID_INPUT") text = _extract_text_from_pdf(input_path) if not text.strip(): - raise PdfAiError("Could not extract any text from the PDF.") + raise PdfAiError("Could not extract any text from the PDF.", error_code="PDF_TEXT_EMPTY") # Truncate to fit context window max_chars = 12000 @@ -206,7 +244,7 @@ def summarize_pdf(input_path: str, length: str = "medium") -> dict: """ text = _extract_text_from_pdf(input_path) if not text.strip(): - raise PdfAiError("Could not extract any text from the PDF.") + raise PdfAiError("Could not extract any text from the PDF.", error_code="PDF_TEXT_EMPTY") length_instruction = { "short": "Provide a brief summary in 2-3 sentences.", @@ -245,11 +283,11 @@ def translate_pdf(input_path: str, target_language: str) -> dict: {"translation": "...", "pages_analyzed": int, "target_language": str} """ if not target_language or not target_language.strip(): - raise PdfAiError("Please specify a target language.") + raise PdfAiError("Please specify a target language.", error_code="PDF_AI_INVALID_INPUT") text = _extract_text_from_pdf(input_path) if not text.strip(): - raise PdfAiError("Could not extract any text from the PDF.") + raise PdfAiError("Could not extract any text from the PDF.", error_code="PDF_TEXT_EMPTY") max_chars = 10000 truncated = text[:max_chars] @@ -325,7 +363,8 @@ def extract_tables(input_path: str) -> dict: if not result_tables: raise PdfAiError( - "No tables found in the PDF. This tool works best with PDFs containing tabular data." + "No tables found in the PDF. This tool works best with PDFs containing tabular data.", + error_code="PDF_TABLES_NOT_FOUND", ) logger.info(f"Extracted {len(result_tables)} tables from PDF") @@ -338,6 +377,10 @@ def extract_tables(input_path: str) -> dict: except PdfAiError: raise except ImportError: - raise PdfAiError("tabula-py library is not installed.") + raise PdfAiError("tabula-py library is not installed.", error_code="TABULA_NOT_INSTALLED") except Exception as e: - raise PdfAiError(f"Failed to extract tables: {str(e)}") + raise PdfAiError( + "Failed to extract tables.", + error_code="PDF_TABLE_EXTRACTION_FAILED", + detail=str(e), + ) diff --git a/backend/app/tasks/pdf_ai_tasks.py b/backend/app/tasks/pdf_ai_tasks.py index 28901cd..a1df423 100644 --- a/backend/app/tasks/pdf_ai_tasks.py +++ b/backend/app/tasks/pdf_ai_tasks.py @@ -1,6 +1,7 @@ """Celery tasks for PDF AI tools — Chat, Summarize, Translate, Table Extract.""" import os import logging +import json from flask import current_app @@ -22,6 +23,35 @@ def _cleanup(task_id: str): cleanup_task_files(task_id, keep_outputs=False) +def _build_pdf_ai_error_payload(task_id: str, error: PdfAiError, tool: str) -> dict: + """Build a normalized error payload for AI tasks and emit structured logs.""" + payload = { + "status": "failed", + "error_code": getattr(error, "error_code", "PDF_AI_ERROR"), + "user_message": getattr(error, "user_message", str(error)) or "AI processing failed.", + "task_id": task_id, + } + + detail = getattr(error, "detail", None) + if detail: + payload["detail"] = detail + + logger.error( + json.dumps( + { + "event": "pdf_ai_task_failed", + "tool": tool, + "task_id": task_id, + "error_code": payload["error_code"], + "user_message": payload["user_message"], + "detail": detail, + }, + ensure_ascii=False, + ) + ) + return payload + + # --------------------------------------------------------------------------- # Chat with PDF # --------------------------------------------------------------------------- @@ -59,8 +89,7 @@ def chat_with_pdf_task( return result except PdfAiError as e: - logger.error(f"Task {task_id}: {e}") - result = {"status": "failed", "error": str(e)} + result = _build_pdf_ai_error_payload(task_id, e, "chat-pdf") finalize_task_tracking( user_id=user_id, tool="chat-pdf", original_filename=original_filename, result=result, @@ -120,8 +149,7 @@ def summarize_pdf_task( return result except PdfAiError as e: - logger.error(f"Task {task_id}: {e}") - result = {"status": "failed", "error": str(e)} + result = _build_pdf_ai_error_payload(task_id, e, "summarize-pdf") finalize_task_tracking( user_id=user_id, tool="summarize-pdf", original_filename=original_filename, result=result, @@ -182,8 +210,7 @@ def translate_pdf_task( return result except PdfAiError as e: - logger.error(f"Task {task_id}: {e}") - result = {"status": "failed", "error": str(e)} + result = _build_pdf_ai_error_payload(task_id, e, "translate-pdf") finalize_task_tracking( user_id=user_id, tool="translate-pdf", original_filename=original_filename, result=result, @@ -242,8 +269,7 @@ def extract_tables_task( return result except PdfAiError as e: - logger.error(f"Task {task_id}: {e}") - result = {"status": "failed", "error": str(e)} + result = _build_pdf_ai_error_payload(task_id, e, "extract-tables") finalize_task_tracking( user_id=user_id, tool="extract-tables", original_filename=original_filename, result=result, diff --git a/backend/tests/test_pdf_ai_tasks.py b/backend/tests/test_pdf_ai_tasks.py new file mode 100644 index 0000000..3eceeb8 --- /dev/null +++ b/backend/tests/test_pdf_ai_tasks.py @@ -0,0 +1,32 @@ +"""Tests for PDF AI Celery task error payloads.""" +from app.services.pdf_ai_service import PdfAiError +from app.tasks.pdf_ai_tasks import _build_pdf_ai_error_payload + + +def test_build_pdf_ai_error_payload_contains_classified_fields(): + """Should include error_code and user_message for task status normalization.""" + error = PdfAiError( + "AI service is experiencing high demand. Please wait a moment and try again.", + error_code="OPENROUTER_RATE_LIMIT", + ) + + payload = _build_pdf_ai_error_payload("task-123", error, "chat-pdf") + + assert payload["status"] == "failed" + assert payload["error_code"] == "OPENROUTER_RATE_LIMIT" + assert "user_message" in payload + assert payload["task_id"] == "task-123" + + +def test_build_pdf_ai_error_payload_includes_detail_when_available(): + """Should preserve machine-searchable detail context when provided.""" + error = PdfAiError( + "Failed to extract text from PDF.", + error_code="PDF_TEXT_EXTRACTION_FAILED", + detail="EOF marker not found", + ) + + payload = _build_pdf_ai_error_payload("task-456", error, "summarize-pdf") + + assert payload["error_code"] == "PDF_TEXT_EXTRACTION_FAILED" + assert payload["detail"] == "EOF marker not found" diff --git a/backend/tests/test_tasks_route.py b/backend/tests/test_tasks_route.py index 1869f1e..b06e861 100644 --- a/backend/tests/test_tasks_route.py +++ b/backend/tests/test_tasks_route.py @@ -88,7 +88,7 @@ class TestTaskStatus: assert has_task_access(user['id'], 'web', 'local-download-id') is True def test_failure_task(self, client, monkeypatch): - """Should return FAILURE state with error message.""" + """Should return FAILURE state with normalized error payload.""" mock_result = MagicMock() mock_result.state = 'FAILURE' mock_result.info = Exception('Conversion failed due to corrupt PDF.') @@ -102,10 +102,50 @@ class TestTaskStatus: assert response.status_code == 200 data = response.get_json() assert data['state'] == 'FAILURE' - assert 'error' in data + assert data['error']['error_code'] == 'TASK_FAILURE' + assert 'user_message' in data['error'] + assert data['error']['task_id'] == 'failed-id' + + def test_failure_task_unregistered_maps_to_specific_code(self, client): + """Should classify unregistered celery task errors.""" + mock_result = MagicMock() + mock_result.state = 'FAILURE' + mock_result.info = Exception("Received unregistered task of type 'app.tasks.missing_task'.") + + with client.session_transaction() as session: + session[TASK_ACCESS_SESSION_KEY] = ['missing-task-id'] + + with patch('app.routes.tasks.AsyncResult', return_value=mock_result): + response = client.get('/api/tasks/missing-task-id/status') + + assert response.status_code == 200 + data = response.get_json() + assert data['error']['error_code'] == 'CELERY_NOT_REGISTERED' + + def test_success_failed_result_returns_normalized_error(self, client): + """Should normalize task-level failure payload returned inside SUCCESS state.""" + mock_result = MagicMock() + mock_result.state = 'SUCCESS' + mock_result.result = { + 'status': 'failed', + 'error_code': 'OPENROUTER_RATE_LIMIT', + 'user_message': 'AI service is experiencing high demand.', + } + + with client.session_transaction() as session: + session[TASK_ACCESS_SESSION_KEY] = ['ai-failed-id'] + + with patch('app.routes.tasks.AsyncResult', return_value=mock_result): + response = client.get('/api/tasks/ai-failed-id/status') + + assert response.status_code == 200 + data = response.get_json() + assert data['state'] == 'SUCCESS' + assert data['error']['error_code'] == 'OPENROUTER_RATE_LIMIT' + assert data['error']['task_id'] == 'ai-failed-id' def test_unknown_task_without_access_returns_404(self, client): """Should not expose task state without session or API ownership.""" response = client.get('/api/tasks/unknown-task/status') - assert response.status_code == 404 \ No newline at end of file + assert response.status_code == 404