Merge pull request #4 from aborayan2022/codex/update-task-status-error-structure
Unify task status error schema and classify PDF AI failures
This commit is contained in:
@@ -54,6 +54,57 @@ def _remember_download_alias(actor, download_task_id: str | None):
|
||||
)
|
||||
|
||||
|
||||
def _normalized_error_payload(task_id: str, error_code: str, user_message: str, trace_id: str | None = None) -> dict:
|
||||
"""Return unified error payload for task status responses."""
|
||||
payload = {
|
||||
"error_code": error_code,
|
||||
"user_message": user_message,
|
||||
"task_id": task_id,
|
||||
}
|
||||
if trace_id:
|
||||
payload["trace_id"] = trace_id
|
||||
return payload
|
||||
|
||||
|
||||
def _infer_failure_error(task_id: str, info) -> dict:
    """Classify celery failure information into a normalized payload.

    Resolution order:

    1. ``info`` is a dict with truthy ``error_code`` and ``user_message``
       entries: they are passed through, together with ``trace_id`` if set.
    2. ``info`` is any other object (typically the raised exception) that
       exposes non-empty string ``error_code`` and ``user_message``
       attributes: those are used, so an already-classified exception is not
       collapsed into the generic ``TASK_FAILURE`` code.
    3. Otherwise ``str(info)`` is matched case-insensitively against known
       failure signatures (unregistered task, OpenRouter 401 / 429).
    4. Anything else is reported as ``TASK_FAILURE``.

    For cases 3 and 4 the raw failure text is never copied into the payload;
    only the fixed messages below are returned.

    Args:
        task_id: Identifier of the failed task, echoed back in the payload.
        info: Failure information for the task; may be a dict, an exception
            instance, any other object, or a falsy value.

    Returns:
        The dict produced by ``_normalized_error_payload``.
    """
    if isinstance(info, dict):
        code = info.get("error_code")
        text = info.get("user_message")
        if code and text:
            return _normalized_error_payload(
                task_id=task_id,
                error_code=code,
                user_message=text,
                trace_id=info.get("trace_id"),
            )
    else:
        code = getattr(info, "error_code", None)
        text = getattr(info, "user_message", None)
        # Require real, non-empty strings: objects that answer every attribute
        # lookup (e.g. mocks) must not be mistaken for a classified error.
        if isinstance(code, str) and code and isinstance(text, str) and text:
            trace = getattr(info, "trace_id", None)
            return _normalized_error_payload(
                task_id=task_id,
                error_code=code,
                user_message=text,
                trace_id=trace if isinstance(trace, str) else None,
            )

    # A falsy ``info`` carries no text to inspect, so no signature can match.
    lowered = str(info).lower() if info else ""

    if "notregistered" in lowered or "received unregistered task" in lowered:
        return _normalized_error_payload(
            task_id,
            "CELERY_NOT_REGISTERED",
            "Task worker is temporarily unavailable. Please retry in a moment.",
        )
    if "openrouter" in lowered and "401" in lowered:
        return _normalized_error_payload(
            task_id,
            "OPENROUTER_UNAUTHORIZED",
            "AI features are temporarily unavailable due to a configuration issue.",
        )
    if "openrouter" in lowered and "429" in lowered:
        return _normalized_error_payload(
            task_id,
            "OPENROUTER_RATE_LIMIT",
            "AI service is currently busy. Please try again shortly.",
        )

    return _normalized_error_payload(
        task_id,
        "TASK_FAILURE",
        "Task processing failed. Please retry.",
    )
|
||||
|
||||
|
||||
@tasks_bp.route("/<task_id>/status", methods=["GET"])
|
||||
@limiter.limit("300/minute", override_defaults=True)
|
||||
def get_task_status(task_id: str):
|
||||
@@ -91,8 +142,15 @@ def get_task_status(task_id: str):
|
||||
task_result = result.result or {}
|
||||
_remember_download_alias(actor, _extract_download_task_id(task_result.get("download_url")))
|
||||
response["result"] = task_result
|
||||
if task_result.get("status") == "failed":
|
||||
response["error"] = _normalized_error_payload(
|
||||
task_id=task_id,
|
||||
error_code=task_result.get("error_code", "TASK_FAILURE"),
|
||||
user_message=task_result.get("user_message", task_result.get("error", "Task processing failed.")),
|
||||
trace_id=task_result.get("trace_id"),
|
||||
)
|
||||
|
||||
elif result.state == "FAILURE":
|
||||
response["error"] = str(result.info) if result.info else "Task failed."
|
||||
response["error"] = _infer_failure_error(task_id, result.info)
|
||||
|
||||
return jsonify(response)
|
||||
|
||||
@@ -11,7 +11,16 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
class PdfAiError(Exception):
|
||||
"""Custom exception for PDF AI service failures."""
|
||||
pass
|
||||
def __init__(
|
||||
self,
|
||||
user_message: str,
|
||||
error_code: str = "PDF_AI_ERROR",
|
||||
detail: str | None = None,
|
||||
):
|
||||
super().__init__(user_message)
|
||||
self.user_message = user_message
|
||||
self.error_code = error_code
|
||||
self.detail = detail
|
||||
|
||||
|
||||
def _estimate_tokens(text: str) -> int:
|
||||
@@ -33,7 +42,11 @@ def _extract_text_from_pdf(input_path: str, max_pages: int = 50) -> str:
|
||||
texts.append(f"[Page {i + 1}]\n{text}")
|
||||
return "\n\n".join(texts)
|
||||
except Exception as e:
|
||||
raise PdfAiError(f"Failed to extract text from PDF: {str(e)}")
|
||||
raise PdfAiError(
|
||||
"Failed to extract text from PDF.",
|
||||
error_code="PDF_TEXT_EXTRACTION_FAILED",
|
||||
detail=str(e),
|
||||
)
|
||||
|
||||
|
||||
def _call_openrouter(
|
||||
@@ -48,7 +61,10 @@ def _call_openrouter(
|
||||
from app.services.ai_cost_service import check_ai_budget, AiBudgetExceededError
|
||||
check_ai_budget()
|
||||
except AiBudgetExceededError:
|
||||
raise PdfAiError("Monthly AI processing budget has been reached. Please try again next month.")
|
||||
raise PdfAiError(
|
||||
"Monthly AI processing budget has been reached. Please try again next month.",
|
||||
error_code="AI_BUDGET_EXCEEDED",
|
||||
)
|
||||
except Exception:
|
||||
pass # Don't block if cost service unavailable
|
||||
|
||||
@@ -57,7 +73,8 @@ def _call_openrouter(
|
||||
if not settings.api_key:
|
||||
logger.error("OPENROUTER_API_KEY is not set or is a placeholder value.")
|
||||
raise PdfAiError(
|
||||
"AI features are temporarily unavailable. Our team has been notified."
|
||||
"AI features are temporarily unavailable. Our team has been notified.",
|
||||
error_code="OPENROUTER_MISSING_API_KEY",
|
||||
)
|
||||
|
||||
messages = [
|
||||
@@ -84,25 +101,29 @@ def _call_openrouter(
|
||||
if response.status_code == 401:
|
||||
logger.error("OpenRouter API key is invalid or expired (401).")
|
||||
raise PdfAiError(
|
||||
"AI features are temporarily unavailable due to a configuration issue. Our team has been notified."
|
||||
"AI features are temporarily unavailable due to a configuration issue. Our team has been notified.",
|
||||
error_code="OPENROUTER_UNAUTHORIZED",
|
||||
)
|
||||
|
||||
if response.status_code == 402:
|
||||
logger.error("OpenRouter account has insufficient credits (402).")
|
||||
raise PdfAiError(
|
||||
"AI processing credits have been exhausted. Please try again later."
|
||||
"AI processing credits have been exhausted. Please try again later.",
|
||||
error_code="OPENROUTER_INSUFFICIENT_CREDITS",
|
||||
)
|
||||
|
||||
if response.status_code == 429:
|
||||
logger.warning("OpenRouter rate limit reached (429).")
|
||||
raise PdfAiError(
|
||||
"AI service is experiencing high demand. Please wait a moment and try again."
|
||||
"AI service is experiencing high demand. Please wait a moment and try again.",
|
||||
error_code="OPENROUTER_RATE_LIMIT",
|
||||
)
|
||||
|
||||
if response.status_code >= 500:
|
||||
logger.error("OpenRouter server error (%s).", response.status_code)
|
||||
raise PdfAiError(
|
||||
"AI service provider is experiencing issues. Please try again shortly."
|
||||
"AI service provider is experiencing issues. Please try again shortly.",
|
||||
error_code="OPENROUTER_SERVER_ERROR",
|
||||
)
|
||||
|
||||
response.raise_for_status()
|
||||
@@ -112,7 +133,11 @@ def _call_openrouter(
|
||||
if data.get("error"):
|
||||
error_msg = data["error"].get("message", "") if isinstance(data["error"], dict) else str(data["error"])
|
||||
logger.error("OpenRouter returned an error payload: %s", error_msg)
|
||||
raise PdfAiError("AI service encountered an issue. Please try again.")
|
||||
raise PdfAiError(
|
||||
"AI service encountered an issue. Please try again.",
|
||||
error_code="OPENROUTER_ERROR_PAYLOAD",
|
||||
detail=error_msg,
|
||||
)
|
||||
|
||||
reply = (
|
||||
data.get("choices", [{}])[0]
|
||||
@@ -122,7 +147,10 @@ def _call_openrouter(
|
||||
)
|
||||
|
||||
if not reply:
|
||||
raise PdfAiError("AI returned an empty response. Please try again.")
|
||||
raise PdfAiError(
|
||||
"AI returned an empty response. Please try again.",
|
||||
error_code="OPENROUTER_EMPTY_RESPONSE",
|
||||
)
|
||||
|
||||
# Log usage
|
||||
try:
|
||||
@@ -142,13 +170,23 @@ def _call_openrouter(
|
||||
except PdfAiError:
|
||||
raise
|
||||
except requests.exceptions.Timeout:
|
||||
raise PdfAiError("AI service timed out. Please try again.")
|
||||
raise PdfAiError(
|
||||
"AI service timed out. Please try again.",
|
||||
error_code="OPENROUTER_TIMEOUT",
|
||||
)
|
||||
except requests.exceptions.ConnectionError:
|
||||
logger.error("Cannot connect to OpenRouter API at %s", settings.base_url)
|
||||
raise PdfAiError("AI service is unreachable. Please try again shortly.")
|
||||
raise PdfAiError(
|
||||
"AI service is unreachable. Please try again shortly.",
|
||||
error_code="OPENROUTER_CONNECTION_ERROR",
|
||||
)
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.error("OpenRouter API error: %s", e)
|
||||
raise PdfAiError("AI service is temporarily unavailable.")
|
||||
raise PdfAiError(
|
||||
"AI service is temporarily unavailable.",
|
||||
error_code="OPENROUTER_REQUEST_ERROR",
|
||||
detail=str(e),
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -166,11 +204,11 @@ def chat_with_pdf(input_path: str, question: str) -> dict:
|
||||
{"reply": "...", "pages_analyzed": int}
|
||||
"""
|
||||
if not question or not question.strip():
|
||||
raise PdfAiError("Please provide a question.")
|
||||
raise PdfAiError("Please provide a question.", error_code="PDF_AI_INVALID_INPUT")
|
||||
|
||||
text = _extract_text_from_pdf(input_path)
|
||||
if not text.strip():
|
||||
raise PdfAiError("Could not extract any text from the PDF.")
|
||||
raise PdfAiError("Could not extract any text from the PDF.", error_code="PDF_TEXT_EMPTY")
|
||||
|
||||
# Truncate to fit context window
|
||||
max_chars = 12000
|
||||
@@ -206,7 +244,7 @@ def summarize_pdf(input_path: str, length: str = "medium") -> dict:
|
||||
"""
|
||||
text = _extract_text_from_pdf(input_path)
|
||||
if not text.strip():
|
||||
raise PdfAiError("Could not extract any text from the PDF.")
|
||||
raise PdfAiError("Could not extract any text from the PDF.", error_code="PDF_TEXT_EMPTY")
|
||||
|
||||
length_instruction = {
|
||||
"short": "Provide a brief summary in 2-3 sentences.",
|
||||
@@ -245,11 +283,11 @@ def translate_pdf(input_path: str, target_language: str) -> dict:
|
||||
{"translation": "...", "pages_analyzed": int, "target_language": str}
|
||||
"""
|
||||
if not target_language or not target_language.strip():
|
||||
raise PdfAiError("Please specify a target language.")
|
||||
raise PdfAiError("Please specify a target language.", error_code="PDF_AI_INVALID_INPUT")
|
||||
|
||||
text = _extract_text_from_pdf(input_path)
|
||||
if not text.strip():
|
||||
raise PdfAiError("Could not extract any text from the PDF.")
|
||||
raise PdfAiError("Could not extract any text from the PDF.", error_code="PDF_TEXT_EMPTY")
|
||||
|
||||
max_chars = 10000
|
||||
truncated = text[:max_chars]
|
||||
@@ -325,7 +363,8 @@ def extract_tables(input_path: str) -> dict:
|
||||
|
||||
if not result_tables:
|
||||
raise PdfAiError(
|
||||
"No tables found in the PDF. This tool works best with PDFs containing tabular data."
|
||||
"No tables found in the PDF. This tool works best with PDFs containing tabular data.",
|
||||
error_code="PDF_TABLES_NOT_FOUND",
|
||||
)
|
||||
|
||||
logger.info(f"Extracted {len(result_tables)} tables from PDF")
|
||||
@@ -338,6 +377,10 @@ def extract_tables(input_path: str) -> dict:
|
||||
except PdfAiError:
|
||||
raise
|
||||
except ImportError:
|
||||
raise PdfAiError("tabula-py library is not installed.")
|
||||
raise PdfAiError("tabula-py library is not installed.", error_code="TABULA_NOT_INSTALLED")
|
||||
except Exception as e:
|
||||
raise PdfAiError(f"Failed to extract tables: {str(e)}")
|
||||
raise PdfAiError(
|
||||
"Failed to extract tables.",
|
||||
error_code="PDF_TABLE_EXTRACTION_FAILED",
|
||||
detail=str(e),
|
||||
)
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
"""Celery tasks for PDF AI tools — Chat, Summarize, Translate, Table Extract."""
|
||||
import os
|
||||
import logging
|
||||
import json
|
||||
|
||||
from flask import current_app
|
||||
|
||||
@@ -22,6 +23,35 @@ def _cleanup(task_id: str):
|
||||
cleanup_task_files(task_id, keep_outputs=False)
|
||||
|
||||
|
||||
def _build_pdf_ai_error_payload(task_id: str, error: PdfAiError, tool: str) -> dict:
    """Turn a PDF AI failure into a task result dict and log it as JSON.

    Args:
        task_id: Identifier of the failing task; copied into the payload and
            the log record.
        error: The caught error. ``error_code``, ``user_message`` and
            ``detail`` are read with ``getattr`` so objects lacking them fall
            back to ``"PDF_AI_ERROR"``, ``str(error)`` and ``None``.
        tool: Tool name recorded in the log entry only.

    Returns:
        A dict with ``status`` set to ``"failed"``, plus ``error_code``,
        ``user_message`` and ``task_id``; ``detail`` is added only when it is
        truthy.
    """
    error_code = getattr(error, "error_code", "PDF_AI_ERROR")
    # An empty message is replaced by a generic one.
    user_message = getattr(error, "user_message", str(error)) or "AI processing failed."
    detail = getattr(error, "detail", None)

    payload = {
        "status": "failed",
        "error_code": error_code,
        "user_message": user_message,
        "task_id": task_id,
    }
    if detail:
        payload["detail"] = detail

    # One JSON document per failure; ``detail`` is always logged, even when
    # it is None and therefore absent from the returned payload.
    log_record = {
        "event": "pdf_ai_task_failed",
        "tool": tool,
        "task_id": task_id,
        "error_code": error_code,
        "user_message": user_message,
        "detail": detail,
    }
    logger.error(json.dumps(log_record, ensure_ascii=False))
    return payload
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Chat with PDF
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -59,8 +89,7 @@ def chat_with_pdf_task(
|
||||
return result
|
||||
|
||||
except PdfAiError as e:
|
||||
logger.error(f"Task {task_id}: {e}")
|
||||
result = {"status": "failed", "error": str(e)}
|
||||
result = _build_pdf_ai_error_payload(task_id, e, "chat-pdf")
|
||||
finalize_task_tracking(
|
||||
user_id=user_id, tool="chat-pdf",
|
||||
original_filename=original_filename, result=result,
|
||||
@@ -120,8 +149,7 @@ def summarize_pdf_task(
|
||||
return result
|
||||
|
||||
except PdfAiError as e:
|
||||
logger.error(f"Task {task_id}: {e}")
|
||||
result = {"status": "failed", "error": str(e)}
|
||||
result = _build_pdf_ai_error_payload(task_id, e, "summarize-pdf")
|
||||
finalize_task_tracking(
|
||||
user_id=user_id, tool="summarize-pdf",
|
||||
original_filename=original_filename, result=result,
|
||||
@@ -182,8 +210,7 @@ def translate_pdf_task(
|
||||
return result
|
||||
|
||||
except PdfAiError as e:
|
||||
logger.error(f"Task {task_id}: {e}")
|
||||
result = {"status": "failed", "error": str(e)}
|
||||
result = _build_pdf_ai_error_payload(task_id, e, "translate-pdf")
|
||||
finalize_task_tracking(
|
||||
user_id=user_id, tool="translate-pdf",
|
||||
original_filename=original_filename, result=result,
|
||||
@@ -242,8 +269,7 @@ def extract_tables_task(
|
||||
return result
|
||||
|
||||
except PdfAiError as e:
|
||||
logger.error(f"Task {task_id}: {e}")
|
||||
result = {"status": "failed", "error": str(e)}
|
||||
result = _build_pdf_ai_error_payload(task_id, e, "extract-tables")
|
||||
finalize_task_tracking(
|
||||
user_id=user_id, tool="extract-tables",
|
||||
original_filename=original_filename, result=result,
|
||||
|
||||
32
backend/tests/test_pdf_ai_tasks.py
Normal file
32
backend/tests/test_pdf_ai_tasks.py
Normal file
@@ -0,0 +1,32 @@
|
||||
"""Tests for PDF AI Celery task error payloads."""
|
||||
from app.services.pdf_ai_service import PdfAiError
|
||||
from app.tasks.pdf_ai_tasks import _build_pdf_ai_error_payload
|
||||
|
||||
|
||||
def test_build_pdf_ai_error_payload_contains_classified_fields():
    """Should include error_code and user_message for task status normalization.

    The message is compared by value: a bare key-presence check cannot fail,
    because the builder always sets ``user_message``. The test also pins that
    no ``detail`` key is emitted when the error carries no detail.
    """
    message = "AI service is experiencing high demand. Please wait a moment and try again."
    error = PdfAiError(message, error_code="OPENROUTER_RATE_LIMIT")

    payload = _build_pdf_ai_error_payload("task-123", error, "chat-pdf")

    assert payload["status"] == "failed"
    assert payload["error_code"] == "OPENROUTER_RATE_LIMIT"
    assert payload["user_message"] == message
    assert payload["task_id"] == "task-123"
    assert "detail" not in payload
|
||||
|
||||
|
||||
def test_build_pdf_ai_error_payload_includes_detail_when_available():
    """A detail string supplied on the error must be carried into the payload."""
    extraction_error = PdfAiError(
        "Failed to extract text from PDF.",
        error_code="PDF_TEXT_EXTRACTION_FAILED",
        detail="EOF marker not found",
    )

    built = _build_pdf_ai_error_payload("task-456", extraction_error, "summarize-pdf")

    assert built["error_code"] == "PDF_TEXT_EXTRACTION_FAILED"
    assert built["detail"] == "EOF marker not found"
|
||||
@@ -88,7 +88,7 @@ class TestTaskStatus:
|
||||
assert has_task_access(user['id'], 'web', 'local-download-id') is True
|
||||
|
||||
def test_failure_task(self, client, monkeypatch):
|
||||
"""Should return FAILURE state with error message."""
|
||||
"""Should return FAILURE state with normalized error payload."""
|
||||
mock_result = MagicMock()
|
||||
mock_result.state = 'FAILURE'
|
||||
mock_result.info = Exception('Conversion failed due to corrupt PDF.')
|
||||
@@ -102,7 +102,47 @@ class TestTaskStatus:
|
||||
assert response.status_code == 200
|
||||
data = response.get_json()
|
||||
assert data['state'] == 'FAILURE'
|
||||
assert 'error' in data
|
||||
assert data['error']['error_code'] == 'TASK_FAILURE'
|
||||
assert 'user_message' in data['error']
|
||||
assert data['error']['task_id'] == 'failed-id'
|
||||
|
||||
def test_failure_task_unregistered_maps_to_specific_code(self, client):
    """An unregistered-task failure must be reported as CELERY_NOT_REGISTERED."""
    failed_result = MagicMock(
        state='FAILURE',
        info=Exception("Received unregistered task of type 'app.tasks.missing_task'."),
    )

    # Grant this session access to the task before requesting its status.
    with client.session_transaction() as session:
        session[TASK_ACCESS_SESSION_KEY] = ['missing-task-id']

    with patch('app.routes.tasks.AsyncResult', return_value=failed_result):
        response = client.get('/api/tasks/missing-task-id/status')

    assert response.status_code == 200
    body = response.get_json()
    assert body['error']['error_code'] == 'CELERY_NOT_REGISTERED'
|
||||
|
||||
def test_success_failed_result_returns_normalized_error(self, client):
    """A SUCCESS task whose result says ``failed`` must expose a normalized error."""
    task_level_failure = {
        'status': 'failed',
        'error_code': 'OPENROUTER_RATE_LIMIT',
        'user_message': 'AI service is experiencing high demand.',
    }
    finished = MagicMock(state='SUCCESS', result=task_level_failure)

    # Grant this session access to the task before requesting its status.
    with client.session_transaction() as session:
        session[TASK_ACCESS_SESSION_KEY] = ['ai-failed-id']

    with patch('app.routes.tasks.AsyncResult', return_value=finished):
        response = client.get('/api/tasks/ai-failed-id/status')

    assert response.status_code == 200
    body = response.get_json()
    assert body['state'] == 'SUCCESS'
    assert body['error']['error_code'] == 'OPENROUTER_RATE_LIMIT'
    assert body['error']['task_id'] == 'ai-failed-id'
|
||||
|
||||
def test_unknown_task_without_access_returns_404(self, client):
|
||||
"""Should not expose task state without session or API ownership."""
|
||||
|
||||
Reference in New Issue
Block a user