Add OCR, Background Removal, and PDF Editor features with tests
- Implemented OCR functionality using pytesseract for image and PDF text extraction. - Added Background Removal service using rembg for image processing. - Developed PDF Editor service for applying text annotations to PDF files. - Created corresponding API routes for OCR, Background Removal, and PDF Editor. - Added frontend components for OCR and Background Removal tools. - Integrated feature flagging for new tools, ensuring they are disabled by default. - Implemented comprehensive unit tests for OCR service, PDF editor, and background removal. - Updated documentation to reflect new features and usage instructions. - Added translations for new features in English, Arabic, and French.
This commit is contained in:
@@ -13,6 +13,10 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
ffmpeg \
|
||||
libmagic1 \
|
||||
imagemagick \
|
||||
tesseract-ocr \
|
||||
tesseract-ocr-eng \
|
||||
tesseract-ocr-ara \
|
||||
tesseract-ocr-fra \
|
||||
curl \
|
||||
&& apt-get clean \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
@@ -90,6 +90,9 @@ def create_app(config_name=None):
|
||||
from app.routes.flowchart import flowchart_bp
|
||||
from app.routes.v1.tools import v1_bp
|
||||
from app.routes.config import config_bp
|
||||
from app.routes.ocr import ocr_bp
|
||||
from app.routes.removebg import removebg_bp
|
||||
from app.routes.pdf_editor import pdf_editor_bp
|
||||
|
||||
app.register_blueprint(health_bp, url_prefix="/api")
|
||||
app.register_blueprint(auth_bp, url_prefix="/api/auth")
|
||||
@@ -106,5 +109,8 @@ def create_app(config_name=None):
|
||||
app.register_blueprint(download_bp, url_prefix="/api/download")
|
||||
app.register_blueprint(v1_bp, url_prefix="/api/v1")
|
||||
app.register_blueprint(config_bp, url_prefix="/api/config")
|
||||
app.register_blueprint(ocr_bp, url_prefix="/api/ocr")
|
||||
app.register_blueprint(removebg_bp, url_prefix="/api/remove-bg")
|
||||
app.register_blueprint(pdf_editor_bp, url_prefix="/api/pdf-editor")
|
||||
|
||||
return app
|
||||
|
||||
@@ -32,6 +32,9 @@ def init_celery(app):
|
||||
"app.tasks.video_tasks.*": {"queue": "video"},
|
||||
"app.tasks.pdf_tools_tasks.*": {"queue": "pdf_tools"},
|
||||
"app.tasks.flowchart_tasks.*": {"queue": "flowchart"},
|
||||
"app.tasks.ocr_tasks.*": {"queue": "image"},
|
||||
"app.tasks.removebg_tasks.*": {"queue": "image"},
|
||||
"app.tasks.pdf_editor_tasks.*": {"queue": "pdf_tools"},
|
||||
}
|
||||
|
||||
# Celery Beat — periodic tasks
|
||||
|
||||
134
backend/app/routes/ocr.py
Normal file
134
backend/app/routes/ocr.py
Normal file
@@ -0,0 +1,134 @@
|
||||
"""OCR routes — extract text from images and PDFs."""
|
||||
from flask import Blueprint, request, jsonify, current_app
|
||||
|
||||
from app.extensions import limiter
|
||||
from app.services.policy_service import (
|
||||
assert_quota_available,
|
||||
build_task_tracking_kwargs,
|
||||
PolicyError,
|
||||
record_accepted_usage,
|
||||
resolve_web_actor,
|
||||
validate_actor_file,
|
||||
)
|
||||
from app.services.ocr_service import SUPPORTED_LANGUAGES
|
||||
from app.utils.file_validator import FileValidationError
|
||||
from app.utils.sanitizer import generate_safe_path
|
||||
from app.tasks.ocr_tasks import ocr_image_task, ocr_pdf_task
|
||||
|
||||
ocr_bp = Blueprint("ocr", __name__)
|
||||
|
||||
ALLOWED_IMAGE_TYPES = ["png", "jpg", "jpeg", "webp", "tiff", "bmp"]
|
||||
ALLOWED_OCR_TYPES = ALLOWED_IMAGE_TYPES + ["pdf"]
|
||||
|
||||
|
||||
def _check_feature_flag():
|
||||
"""Return an error response if FEATURE_EDITOR is disabled."""
|
||||
if not current_app.config.get("FEATURE_EDITOR", False):
|
||||
return jsonify({"error": "This feature is not enabled."}), 403
|
||||
return None
|
||||
|
||||
|
||||
@ocr_bp.route("/image", methods=["POST"])
|
||||
@limiter.limit("10/minute")
|
||||
def ocr_image_route():
|
||||
"""Extract text from an image using OCR.
|
||||
|
||||
Accepts: multipart/form-data with:
|
||||
- 'file': Image file
|
||||
- 'lang' (optional): Language code — eng, ara, fra (default: eng)
|
||||
Returns: JSON with task_id for polling
|
||||
"""
|
||||
flag_err = _check_feature_flag()
|
||||
if flag_err:
|
||||
return flag_err
|
||||
|
||||
if "file" not in request.files:
|
||||
return jsonify({"error": "No file provided."}), 400
|
||||
|
||||
file = request.files["file"]
|
||||
lang = request.form.get("lang", "eng").lower()
|
||||
if lang not in SUPPORTED_LANGUAGES:
|
||||
lang = "eng"
|
||||
|
||||
actor = resolve_web_actor()
|
||||
try:
|
||||
assert_quota_available(actor)
|
||||
except PolicyError as e:
|
||||
return jsonify({"error": e.message}), e.status_code
|
||||
|
||||
try:
|
||||
original_filename, ext = validate_actor_file(
|
||||
file, allowed_types=ALLOWED_IMAGE_TYPES, actor=actor
|
||||
)
|
||||
except FileValidationError as e:
|
||||
return jsonify({"error": e.message}), e.code
|
||||
|
||||
task_id, input_path = generate_safe_path(ext, folder_type="upload")
|
||||
file.save(input_path)
|
||||
|
||||
task = ocr_image_task.delay(
|
||||
input_path, task_id, original_filename, lang,
|
||||
**build_task_tracking_kwargs(actor),
|
||||
)
|
||||
record_accepted_usage(actor, "ocr-image", task.id)
|
||||
|
||||
return jsonify({
|
||||
"task_id": task.id,
|
||||
"message": "OCR started. Poll /api/tasks/{task_id}/status for progress.",
|
||||
}), 202
|
||||
|
||||
|
||||
@ocr_bp.route("/pdf", methods=["POST"])
|
||||
@limiter.limit("5/minute")
|
||||
def ocr_pdf_route():
|
||||
"""Extract text from a scanned PDF using OCR.
|
||||
|
||||
Accepts: multipart/form-data with:
|
||||
- 'file': PDF file
|
||||
- 'lang' (optional): Language code — eng, ara, fra (default: eng)
|
||||
Returns: JSON with task_id for polling
|
||||
"""
|
||||
flag_err = _check_feature_flag()
|
||||
if flag_err:
|
||||
return flag_err
|
||||
|
||||
if "file" not in request.files:
|
||||
return jsonify({"error": "No file provided."}), 400
|
||||
|
||||
file = request.files["file"]
|
||||
lang = request.form.get("lang", "eng").lower()
|
||||
if lang not in SUPPORTED_LANGUAGES:
|
||||
lang = "eng"
|
||||
|
||||
actor = resolve_web_actor()
|
||||
try:
|
||||
assert_quota_available(actor)
|
||||
except PolicyError as e:
|
||||
return jsonify({"error": e.message}), e.status_code
|
||||
|
||||
try:
|
||||
original_filename, ext = validate_actor_file(
|
||||
file, allowed_types=["pdf"], actor=actor
|
||||
)
|
||||
except FileValidationError as e:
|
||||
return jsonify({"error": e.message}), e.code
|
||||
|
||||
task_id, input_path = generate_safe_path(ext, folder_type="upload")
|
||||
file.save(input_path)
|
||||
|
||||
task = ocr_pdf_task.delay(
|
||||
input_path, task_id, original_filename, lang,
|
||||
**build_task_tracking_kwargs(actor),
|
||||
)
|
||||
record_accepted_usage(actor, "ocr-pdf", task.id)
|
||||
|
||||
return jsonify({
|
||||
"task_id": task.id,
|
||||
"message": "OCR started. Poll /api/tasks/{task_id}/status for progress.",
|
||||
}), 202
|
||||
|
||||
|
||||
@ocr_bp.route("/languages", methods=["GET"])
|
||||
def ocr_languages_route():
|
||||
"""Return the list of supported OCR languages."""
|
||||
return jsonify({"languages": SUPPORTED_LANGUAGES}), 200
|
||||
80
backend/app/routes/pdf_editor.py
Normal file
80
backend/app/routes/pdf_editor.py
Normal file
@@ -0,0 +1,80 @@
|
||||
"""PDF Editor route — apply text annotations to PDFs."""
|
||||
import json
|
||||
|
||||
from flask import Blueprint, request, jsonify, current_app
|
||||
|
||||
from app.extensions import limiter
|
||||
from app.services.policy_service import (
|
||||
assert_quota_available,
|
||||
build_task_tracking_kwargs,
|
||||
PolicyError,
|
||||
record_accepted_usage,
|
||||
resolve_web_actor,
|
||||
validate_actor_file,
|
||||
)
|
||||
from app.utils.file_validator import FileValidationError
|
||||
from app.utils.sanitizer import generate_safe_path
|
||||
from app.tasks.pdf_editor_tasks import edit_pdf_task
|
||||
|
||||
pdf_editor_bp = Blueprint("pdf_editor", __name__)
|
||||
|
||||
|
||||
@pdf_editor_bp.route("/edit", methods=["POST"])
|
||||
@limiter.limit("10/minute")
|
||||
def edit_pdf_route():
|
||||
"""Apply text annotations to a PDF.
|
||||
|
||||
Accepts: multipart/form-data with:
|
||||
- 'file': PDF file
|
||||
- 'edits': JSON string — array of edit objects
|
||||
Each edit: { type: "text", page: 1, x: 100, y: 200, content: "Hello", fontSize: 14, color: "#000000" }
|
||||
Returns: JSON with task_id for polling
|
||||
"""
|
||||
if not current_app.config.get("FEATURE_EDITOR", False):
|
||||
return jsonify({"error": "This feature is not enabled."}), 403
|
||||
|
||||
if "file" not in request.files:
|
||||
return jsonify({"error": "No file provided."}), 400
|
||||
|
||||
file = request.files["file"]
|
||||
edits_raw = request.form.get("edits", "[]")
|
||||
|
||||
try:
|
||||
edits = json.loads(edits_raw)
|
||||
if not isinstance(edits, list):
|
||||
return jsonify({"error": "Edits must be a JSON array."}), 400
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
return jsonify({"error": "Invalid JSON in 'edits' field."}), 400
|
||||
|
||||
if not edits:
|
||||
return jsonify({"error": "At least one edit is required."}), 400
|
||||
|
||||
if len(edits) > 500:
|
||||
return jsonify({"error": "Maximum 500 edits allowed."}), 400
|
||||
|
||||
actor = resolve_web_actor()
|
||||
try:
|
||||
assert_quota_available(actor)
|
||||
except PolicyError as e:
|
||||
return jsonify({"error": e.message}), e.status_code
|
||||
|
||||
try:
|
||||
original_filename, ext = validate_actor_file(
|
||||
file, allowed_types=["pdf"], actor=actor
|
||||
)
|
||||
except FileValidationError as e:
|
||||
return jsonify({"error": e.message}), e.code
|
||||
|
||||
task_id, input_path = generate_safe_path(ext, folder_type="upload")
|
||||
file.save(input_path)
|
||||
|
||||
task = edit_pdf_task.delay(
|
||||
input_path, task_id, original_filename, edits,
|
||||
**build_task_tracking_kwargs(actor),
|
||||
)
|
||||
record_accepted_usage(actor, "pdf-edit", task.id)
|
||||
|
||||
return jsonify({
|
||||
"task_id": task.id,
|
||||
"message": "PDF editing started. Poll /api/tasks/{task_id}/status for progress.",
|
||||
}), 202
|
||||
64
backend/app/routes/removebg.py
Normal file
64
backend/app/routes/removebg.py
Normal file
@@ -0,0 +1,64 @@
|
||||
"""Background removal route."""
|
||||
from flask import Blueprint, request, jsonify, current_app
|
||||
|
||||
from app.extensions import limiter
|
||||
from app.services.policy_service import (
|
||||
assert_quota_available,
|
||||
build_task_tracking_kwargs,
|
||||
PolicyError,
|
||||
record_accepted_usage,
|
||||
resolve_web_actor,
|
||||
validate_actor_file,
|
||||
)
|
||||
from app.utils.file_validator import FileValidationError
|
||||
from app.utils.sanitizer import generate_safe_path
|
||||
from app.tasks.removebg_tasks import remove_bg_task
|
||||
|
||||
removebg_bp = Blueprint("removebg", __name__)
|
||||
|
||||
ALLOWED_IMAGE_TYPES = ["png", "jpg", "jpeg", "webp"]
|
||||
|
||||
|
||||
@removebg_bp.route("", methods=["POST"])
|
||||
@limiter.limit("5/minute")
|
||||
def remove_bg_route():
|
||||
"""Remove the background from an image.
|
||||
|
||||
Accepts: multipart/form-data with:
|
||||
- 'file': Image file (PNG, JPG, JPEG, WebP)
|
||||
Returns: JSON with task_id for polling
|
||||
"""
|
||||
if not current_app.config.get("FEATURE_EDITOR", False):
|
||||
return jsonify({"error": "This feature is not enabled."}), 403
|
||||
|
||||
if "file" not in request.files:
|
||||
return jsonify({"error": "No file provided."}), 400
|
||||
|
||||
file = request.files["file"]
|
||||
|
||||
actor = resolve_web_actor()
|
||||
try:
|
||||
assert_quota_available(actor)
|
||||
except PolicyError as e:
|
||||
return jsonify({"error": e.message}), e.status_code
|
||||
|
||||
try:
|
||||
original_filename, ext = validate_actor_file(
|
||||
file, allowed_types=ALLOWED_IMAGE_TYPES, actor=actor
|
||||
)
|
||||
except FileValidationError as e:
|
||||
return jsonify({"error": e.message}), e.code
|
||||
|
||||
task_id, input_path = generate_safe_path(ext, folder_type="upload")
|
||||
file.save(input_path)
|
||||
|
||||
task = remove_bg_task.delay(
|
||||
input_path, task_id, original_filename,
|
||||
**build_task_tracking_kwargs(actor),
|
||||
)
|
||||
record_accepted_usage(actor, "remove-bg", task.id)
|
||||
|
||||
return jsonify({
|
||||
"task_id": task.id,
|
||||
"message": "Background removal started. Poll /api/tasks/{task_id}/status for progress.",
|
||||
}), 202
|
||||
121
backend/app/services/ocr_service.py
Normal file
121
backend/app/services/ocr_service.py
Normal file
@@ -0,0 +1,121 @@
|
||||
"""OCR service — extract text from images and PDFs using Tesseract."""
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import tempfile
|
||||
|
||||
from PIL import Image
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class OCRError(Exception):
|
||||
"""Custom exception for OCR failures."""
|
||||
pass
|
||||
|
||||
|
||||
# Tesseract language codes
|
||||
SUPPORTED_LANGUAGES = {
|
||||
"eng": "English",
|
||||
"ara": "Arabic",
|
||||
"fra": "French",
|
||||
}
|
||||
|
||||
DEFAULT_LANG = "eng"
|
||||
|
||||
|
||||
def _get_tesseract_cmd() -> str:
|
||||
"""Return the tesseract binary path."""
|
||||
return os.getenv("TESSERACT_CMD", "tesseract")
|
||||
|
||||
|
||||
def ocr_image(input_path: str, lang: str = DEFAULT_LANG) -> dict:
|
||||
"""Extract text from an image file using Tesseract.
|
||||
|
||||
Args:
|
||||
input_path: Path to the input image.
|
||||
lang: Tesseract language code (e.g. "eng", "ara", "fra").
|
||||
|
||||
Returns:
|
||||
dict with ``text``, ``lang``, ``char_count``.
|
||||
|
||||
Raises:
|
||||
OCRError: If the OCR operation fails.
|
||||
"""
|
||||
if lang not in SUPPORTED_LANGUAGES:
|
||||
lang = DEFAULT_LANG
|
||||
|
||||
try:
|
||||
import pytesseract
|
||||
|
||||
pytesseract.pytesseract.tesseract_cmd = _get_tesseract_cmd()
|
||||
|
||||
with Image.open(input_path) as img:
|
||||
# Convert to RGB if needed (tesseract works best with RGB)
|
||||
if img.mode not in ("RGB", "L"):
|
||||
img = img.convert("RGB")
|
||||
text = pytesseract.image_to_string(img, lang=lang)
|
||||
|
||||
text = text.strip()
|
||||
return {
|
||||
"text": text,
|
||||
"lang": lang,
|
||||
"char_count": len(text),
|
||||
}
|
||||
except ImportError:
|
||||
raise OCRError("pytesseract is not installed.")
|
||||
except Exception as e:
|
||||
raise OCRError(f"OCR failed: {str(e)}")
|
||||
|
||||
|
||||
def ocr_pdf(input_path: str, output_path: str, lang: str = DEFAULT_LANG) -> dict:
|
||||
"""Extract text from a scanned PDF by converting pages to images first.
|
||||
|
||||
Args:
|
||||
input_path: Path to the input PDF.
|
||||
output_path: Path for the output text file.
|
||||
lang: Tesseract language code.
|
||||
|
||||
Returns:
|
||||
dict with ``text``, ``page_count``, ``char_count``.
|
||||
|
||||
Raises:
|
||||
OCRError: If the OCR operation fails.
|
||||
"""
|
||||
if lang not in SUPPORTED_LANGUAGES:
|
||||
lang = DEFAULT_LANG
|
||||
|
||||
try:
|
||||
from pdf2image import convert_from_path
|
||||
import pytesseract
|
||||
|
||||
pytesseract.pytesseract.tesseract_cmd = _get_tesseract_cmd()
|
||||
|
||||
images = convert_from_path(input_path, dpi=300)
|
||||
if not images:
|
||||
raise OCRError("Could not convert PDF to images — file may be empty.")
|
||||
|
||||
all_text = []
|
||||
for i, img in enumerate(images, 1):
|
||||
if img.mode not in ("RGB", "L"):
|
||||
img = img.convert("RGB")
|
||||
page_text = pytesseract.image_to_string(img, lang=lang)
|
||||
all_text.append(f"--- Page {i} ---\n{page_text.strip()}")
|
||||
|
||||
full_text = "\n\n".join(all_text)
|
||||
|
||||
os.makedirs(os.path.dirname(output_path), exist_ok=True)
|
||||
with open(output_path, "w", encoding="utf-8") as f:
|
||||
f.write(full_text)
|
||||
|
||||
return {
|
||||
"text": full_text,
|
||||
"page_count": len(images),
|
||||
"char_count": len(full_text),
|
||||
}
|
||||
except ImportError as e:
|
||||
raise OCRError(f"Missing dependency: {e}")
|
||||
except OCRError:
|
||||
raise
|
||||
except Exception as e:
|
||||
raise OCRError(f"PDF OCR failed: {str(e)}")
|
||||
120
backend/app/services/pdf_editor_service.py
Normal file
120
backend/app/services/pdf_editor_service.py
Normal file
@@ -0,0 +1,120 @@
|
||||
"""PDF Editor service — add text annotations and simple edits to PDFs."""
|
||||
import io
|
||||
import logging
|
||||
import os
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PDFEditorError(Exception):
|
||||
"""Custom exception for PDF editor failures."""
|
||||
pass
|
||||
|
||||
|
||||
def apply_pdf_edits(input_path: str, output_path: str, edits: list[dict]) -> dict:
|
||||
"""Apply a list of edits (text annotations) to an existing PDF.
|
||||
|
||||
Each edit dict can contain:
|
||||
- type: "text"
|
||||
- page: 1-based page number
|
||||
- x, y: position in points from bottom-left
|
||||
- content: text string to place
|
||||
- fontSize: optional, default 12
|
||||
- color: optional hex e.g. "#000000"
|
||||
|
||||
Args:
|
||||
input_path: Path to the source PDF.
|
||||
output_path: Path for the edited PDF.
|
||||
edits: List of edit operation dicts.
|
||||
|
||||
Returns:
|
||||
dict with ``page_count``, ``edits_applied``, ``output_size``.
|
||||
|
||||
Raises:
|
||||
PDFEditorError: If the edit fails.
|
||||
"""
|
||||
if not edits:
|
||||
raise PDFEditorError("No edits provided.")
|
||||
|
||||
os.makedirs(os.path.dirname(output_path), exist_ok=True)
|
||||
|
||||
try:
|
||||
from PyPDF2 import PdfReader, PdfWriter
|
||||
from reportlab.pdfgen import canvas
|
||||
from reportlab.lib.pagesizes import letter
|
||||
from reportlab.lib.colors import HexColor
|
||||
|
||||
reader = PdfReader(input_path)
|
||||
writer = PdfWriter()
|
||||
page_count = len(reader.pages)
|
||||
|
||||
if page_count == 0:
|
||||
raise PDFEditorError("PDF has no pages.")
|
||||
|
||||
# Group edits by page
|
||||
edits_by_page: dict[int, list[dict]] = {}
|
||||
for edit in edits:
|
||||
page_num = int(edit.get("page", 1))
|
||||
if page_num < 1 or page_num > page_count:
|
||||
continue
|
||||
edits_by_page.setdefault(page_num, []).append(edit)
|
||||
|
||||
edits_applied = 0
|
||||
|
||||
for page_idx in range(page_count):
|
||||
page = reader.pages[page_idx]
|
||||
page_num = page_idx + 1
|
||||
page_edits = edits_by_page.get(page_num, [])
|
||||
|
||||
if page_edits:
|
||||
# Get page dimensions
|
||||
media_box = page.mediabox
|
||||
page_width = float(media_box.width)
|
||||
page_height = float(media_box.height)
|
||||
|
||||
# Create overlay with annotations
|
||||
packet = io.BytesIO()
|
||||
c = canvas.Canvas(packet, pagesize=(page_width, page_height))
|
||||
|
||||
for edit in page_edits:
|
||||
edit_type = edit.get("type", "text")
|
||||
if edit_type == "text":
|
||||
x = float(edit.get("x", 72))
|
||||
y = float(edit.get("y", 72))
|
||||
content = str(edit.get("content", ""))
|
||||
font_size = int(edit.get("fontSize", 12))
|
||||
color = str(edit.get("color", "#000000"))
|
||||
|
||||
try:
|
||||
c.setFillColor(HexColor(color))
|
||||
except Exception:
|
||||
c.setFillColor(HexColor("#000000"))
|
||||
|
||||
c.setFont("Helvetica", font_size)
|
||||
c.drawString(x, y, content)
|
||||
edits_applied += 1
|
||||
|
||||
c.save()
|
||||
packet.seek(0)
|
||||
|
||||
overlay_reader = PdfReader(packet)
|
||||
if len(overlay_reader.pages) > 0:
|
||||
page.merge_page(overlay_reader.pages[0])
|
||||
|
||||
writer.add_page(page)
|
||||
|
||||
with open(output_path, "wb") as f:
|
||||
writer.write(f)
|
||||
|
||||
output_size = os.path.getsize(output_path)
|
||||
|
||||
return {
|
||||
"page_count": page_count,
|
||||
"edits_applied": edits_applied,
|
||||
"output_size": output_size,
|
||||
}
|
||||
|
||||
except PDFEditorError:
|
||||
raise
|
||||
except Exception as e:
|
||||
raise PDFEditorError(f"PDF editing failed: {str(e)}")
|
||||
60
backend/app/services/removebg_service.py
Normal file
60
backend/app/services/removebg_service.py
Normal file
@@ -0,0 +1,60 @@
|
||||
"""Background removal service using rembg."""
|
||||
import logging
|
||||
import os
|
||||
|
||||
from PIL import Image
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RemoveBGError(Exception):
|
||||
"""Custom exception for background removal failures."""
|
||||
pass
|
||||
|
||||
|
||||
def remove_background(input_path: str, output_path: str) -> dict:
|
||||
"""Remove the background from an image.
|
||||
|
||||
Args:
|
||||
input_path: Path to the input image.
|
||||
output_path: Path for the output PNG (always PNG — transparency).
|
||||
|
||||
Returns:
|
||||
dict with ``original_size``, ``output_size``, ``width``, ``height``.
|
||||
|
||||
Raises:
|
||||
RemoveBGError: If the operation fails.
|
||||
"""
|
||||
os.makedirs(os.path.dirname(output_path), exist_ok=True)
|
||||
|
||||
try:
|
||||
from rembg import remove as rembg_remove
|
||||
|
||||
with Image.open(input_path) as img:
|
||||
if img.mode != "RGBA":
|
||||
img = img.convert("RGBA")
|
||||
width, height = img.size
|
||||
original_size = os.path.getsize(input_path)
|
||||
|
||||
result = rembg_remove(img)
|
||||
result.save(output_path, format="PNG", optimize=True)
|
||||
|
||||
output_size = os.path.getsize(output_path)
|
||||
|
||||
logger.info(
|
||||
"Background removed: %s → %s (%d → %d bytes)",
|
||||
input_path, output_path, original_size, output_size,
|
||||
)
|
||||
|
||||
return {
|
||||
"original_size": original_size,
|
||||
"output_size": output_size,
|
||||
"width": width,
|
||||
"height": height,
|
||||
}
|
||||
except ImportError:
|
||||
raise RemoveBGError("rembg is not installed.")
|
||||
except (IOError, OSError) as e:
|
||||
raise RemoveBGError(f"Background removal failed: {str(e)}")
|
||||
except Exception as e:
|
||||
raise RemoveBGError(f"Background removal failed: {str(e)}")
|
||||
159
backend/app/tasks/ocr_tasks.py
Normal file
159
backend/app/tasks/ocr_tasks.py
Normal file
@@ -0,0 +1,159 @@
|
||||
"""Celery tasks for OCR processing."""
|
||||
import os
|
||||
import logging
|
||||
|
||||
from flask import current_app
|
||||
|
||||
from app.extensions import celery
|
||||
from app.services.ocr_service import ocr_image, ocr_pdf, OCRError
|
||||
from app.services.storage_service import storage
|
||||
from app.services.task_tracking_service import finalize_task_tracking
|
||||
from app.utils.sanitizer import cleanup_task_files
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _cleanup(task_id: str):
|
||||
cleanup_task_files(task_id, keep_outputs=not storage.use_s3)
|
||||
|
||||
|
||||
def _get_output_dir(task_id: str) -> str:
|
||||
output_dir = os.path.join(current_app.config["OUTPUT_FOLDER"], task_id)
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
return output_dir
|
||||
|
||||
|
||||
def _finalize_task(
|
||||
task_id, user_id, tool, original_filename, result,
|
||||
usage_source, api_key_id, celery_task_id,
|
||||
):
|
||||
finalize_task_tracking(
|
||||
user_id=user_id, tool=tool, original_filename=original_filename,
|
||||
result=result, usage_source=usage_source,
|
||||
api_key_id=api_key_id, celery_task_id=celery_task_id,
|
||||
)
|
||||
_cleanup(task_id)
|
||||
return result
|
||||
|
||||
|
||||
@celery.task(bind=True, name="app.tasks.ocr_tasks.ocr_image_task")
|
||||
def ocr_image_task(
|
||||
self,
|
||||
input_path: str,
|
||||
task_id: str,
|
||||
original_filename: str,
|
||||
lang: str = "eng",
|
||||
user_id: int | None = None,
|
||||
usage_source: str = "web",
|
||||
api_key_id: int | None = None,
|
||||
):
|
||||
"""Async task: Extract text from an image via OCR."""
|
||||
output_dir = _get_output_dir(task_id)
|
||||
output_path = os.path.join(output_dir, f"{task_id}.txt")
|
||||
|
||||
try:
|
||||
self.update_state(state="PROCESSING", meta={"step": "Running OCR on image..."})
|
||||
|
||||
stats = ocr_image(input_path, lang=lang)
|
||||
|
||||
# Write text to file for download
|
||||
with open(output_path, "w", encoding="utf-8") as f:
|
||||
f.write(stats["text"])
|
||||
|
||||
self.update_state(state="PROCESSING", meta={"step": "Uploading result..."})
|
||||
s3_key = storage.upload_file(output_path, task_id, folder="outputs")
|
||||
|
||||
name_without_ext = os.path.splitext(original_filename)[0]
|
||||
download_name = f"{name_without_ext}_ocr.txt"
|
||||
|
||||
download_url = storage.generate_presigned_url(s3_key, original_filename=download_name)
|
||||
|
||||
result = {
|
||||
"status": "completed",
|
||||
"download_url": download_url,
|
||||
"filename": download_name,
|
||||
"text": stats["text"][:5000], # preview (first 5k chars)
|
||||
"char_count": stats["char_count"],
|
||||
"lang": stats["lang"],
|
||||
}
|
||||
|
||||
logger.info("Task %s: OCR image completed (%d chars)", task_id, stats["char_count"])
|
||||
return _finalize_task(
|
||||
task_id, user_id, "ocr-image", original_filename,
|
||||
result, usage_source, api_key_id, self.request.id,
|
||||
)
|
||||
|
||||
except OCRError as e:
|
||||
logger.error("Task %s: OCR error — %s", task_id, e)
|
||||
return _finalize_task(
|
||||
task_id, user_id, "ocr-image", original_filename,
|
||||
{"status": "failed", "error": str(e)},
|
||||
usage_source, api_key_id, self.request.id,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error("Task %s: Unexpected error — %s", task_id, e)
|
||||
return _finalize_task(
|
||||
task_id, user_id, "ocr-image", original_filename,
|
||||
{"status": "failed", "error": "An unexpected error occurred."},
|
||||
usage_source, api_key_id, self.request.id,
|
||||
)
|
||||
|
||||
|
||||
@celery.task(bind=True, name="app.tasks.ocr_tasks.ocr_pdf_task")
|
||||
def ocr_pdf_task(
|
||||
self,
|
||||
input_path: str,
|
||||
task_id: str,
|
||||
original_filename: str,
|
||||
lang: str = "eng",
|
||||
user_id: int | None = None,
|
||||
usage_source: str = "web",
|
||||
api_key_id: int | None = None,
|
||||
):
|
||||
"""Async task: Extract text from a scanned PDF via OCR."""
|
||||
output_dir = _get_output_dir(task_id)
|
||||
output_path = os.path.join(output_dir, f"{task_id}.txt")
|
||||
|
||||
try:
|
||||
self.update_state(state="PROCESSING", meta={"step": "Converting PDF pages & running OCR..."})
|
||||
|
||||
stats = ocr_pdf(input_path, output_path, lang=lang)
|
||||
|
||||
self.update_state(state="PROCESSING", meta={"step": "Uploading result..."})
|
||||
s3_key = storage.upload_file(output_path, task_id, folder="outputs")
|
||||
|
||||
name_without_ext = os.path.splitext(original_filename)[0]
|
||||
download_name = f"{name_without_ext}_ocr.txt"
|
||||
|
||||
download_url = storage.generate_presigned_url(s3_key, original_filename=download_name)
|
||||
|
||||
result = {
|
||||
"status": "completed",
|
||||
"download_url": download_url,
|
||||
"filename": download_name,
|
||||
"text": stats["text"][:5000],
|
||||
"page_count": stats["page_count"],
|
||||
"char_count": stats["char_count"],
|
||||
"lang": lang,
|
||||
}
|
||||
|
||||
logger.info("Task %s: OCR PDF completed (%d pages, %d chars)", task_id, stats["page_count"], stats["char_count"])
|
||||
return _finalize_task(
|
||||
task_id, user_id, "ocr-pdf", original_filename,
|
||||
result, usage_source, api_key_id, self.request.id,
|
||||
)
|
||||
|
||||
except OCRError as e:
|
||||
logger.error("Task %s: OCR error — %s", task_id, e)
|
||||
return _finalize_task(
|
||||
task_id, user_id, "ocr-pdf", original_filename,
|
||||
{"status": "failed", "error": str(e)},
|
||||
usage_source, api_key_id, self.request.id,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error("Task %s: Unexpected error — %s", task_id, e)
|
||||
return _finalize_task(
|
||||
task_id, user_id, "ocr-pdf", original_filename,
|
||||
{"status": "failed", "error": "An unexpected error occurred."},
|
||||
usage_source, api_key_id, self.request.id,
|
||||
)
|
||||
95
backend/app/tasks/pdf_editor_tasks.py
Normal file
95
backend/app/tasks/pdf_editor_tasks.py
Normal file
@@ -0,0 +1,95 @@
|
||||
"""Celery tasks for PDF editing."""
|
||||
import os
|
||||
import logging
|
||||
|
||||
from flask import current_app
|
||||
|
||||
from app.extensions import celery
|
||||
from app.services.pdf_editor_service import apply_pdf_edits, PDFEditorError
|
||||
from app.services.storage_service import storage
|
||||
from app.services.task_tracking_service import finalize_task_tracking
|
||||
from app.utils.sanitizer import cleanup_task_files
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _cleanup(task_id: str):
|
||||
cleanup_task_files(task_id, keep_outputs=not storage.use_s3)
|
||||
|
||||
|
||||
def _get_output_dir(task_id: str) -> str:
|
||||
output_dir = os.path.join(current_app.config["OUTPUT_FOLDER"], task_id)
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
return output_dir
|
||||
|
||||
|
||||
def _finalize_task(
|
||||
task_id, user_id, tool, original_filename, result,
|
||||
usage_source, api_key_id, celery_task_id,
|
||||
):
|
||||
finalize_task_tracking(
|
||||
user_id=user_id, tool=tool, original_filename=original_filename,
|
||||
result=result, usage_source=usage_source,
|
||||
api_key_id=api_key_id, celery_task_id=celery_task_id,
|
||||
)
|
||||
_cleanup(task_id)
|
||||
return result
|
||||
|
||||
|
||||
@celery.task(bind=True, name="app.tasks.pdf_editor_tasks.edit_pdf_task")
|
||||
def edit_pdf_task(
|
||||
self,
|
||||
input_path: str,
|
||||
task_id: str,
|
||||
original_filename: str,
|
||||
edits: list[dict],
|
||||
user_id: int | None = None,
|
||||
usage_source: str = "web",
|
||||
api_key_id: int | None = None,
|
||||
):
|
||||
"""Async task: Apply text annotations to a PDF."""
|
||||
output_dir = _get_output_dir(task_id)
|
||||
output_path = os.path.join(output_dir, f"{task_id}.pdf")
|
||||
|
||||
try:
|
||||
self.update_state(state="PROCESSING", meta={"step": "Applying edits to PDF..."})
|
||||
|
||||
stats = apply_pdf_edits(input_path, output_path, edits)
|
||||
|
||||
self.update_state(state="PROCESSING", meta={"step": "Uploading result..."})
|
||||
s3_key = storage.upload_file(output_path, task_id, folder="outputs")
|
||||
|
||||
name_without_ext = os.path.splitext(original_filename)[0]
|
||||
download_name = f"{name_without_ext}_edited.pdf"
|
||||
|
||||
download_url = storage.generate_presigned_url(s3_key, original_filename=download_name)
|
||||
|
||||
result = {
|
||||
"status": "completed",
|
||||
"download_url": download_url,
|
||||
"filename": download_name,
|
||||
"page_count": stats["page_count"],
|
||||
"edits_applied": stats["edits_applied"],
|
||||
"output_size": stats["output_size"],
|
||||
}
|
||||
|
||||
logger.info("Task %s: PDF edit completed (%d edits)", task_id, stats["edits_applied"])
|
||||
return _finalize_task(
|
||||
task_id, user_id, "pdf-edit", original_filename,
|
||||
result, usage_source, api_key_id, self.request.id,
|
||||
)
|
||||
|
||||
except PDFEditorError as e:
|
||||
logger.error("Task %s: PDF edit error — %s", task_id, e)
|
||||
return _finalize_task(
|
||||
task_id, user_id, "pdf-edit", original_filename,
|
||||
{"status": "failed", "error": str(e)},
|
||||
usage_source, api_key_id, self.request.id,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error("Task %s: Unexpected error — %s", task_id, e)
|
||||
return _finalize_task(
|
||||
task_id, user_id, "pdf-edit", original_filename,
|
||||
{"status": "failed", "error": "An unexpected error occurred."},
|
||||
usage_source, api_key_id, self.request.id,
|
||||
)
|
||||
95
backend/app/tasks/removebg_tasks.py
Normal file
95
backend/app/tasks/removebg_tasks.py
Normal file
@@ -0,0 +1,95 @@
|
||||
"""Celery tasks for background removal."""
|
||||
import os
|
||||
import logging
|
||||
|
||||
from flask import current_app
|
||||
|
||||
from app.extensions import celery
|
||||
from app.services.removebg_service import remove_background, RemoveBGError
|
||||
from app.services.storage_service import storage
|
||||
from app.services.task_tracking_service import finalize_task_tracking
|
||||
from app.utils.sanitizer import cleanup_task_files
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _cleanup(task_id: str):
|
||||
cleanup_task_files(task_id, keep_outputs=not storage.use_s3)
|
||||
|
||||
|
||||
def _get_output_dir(task_id: str) -> str:
|
||||
output_dir = os.path.join(current_app.config["OUTPUT_FOLDER"], task_id)
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
return output_dir
|
||||
|
||||
|
||||
def _finalize_task(
|
||||
task_id, user_id, tool, original_filename, result,
|
||||
usage_source, api_key_id, celery_task_id,
|
||||
):
|
||||
finalize_task_tracking(
|
||||
user_id=user_id, tool=tool, original_filename=original_filename,
|
||||
result=result, usage_source=usage_source,
|
||||
api_key_id=api_key_id, celery_task_id=celery_task_id,
|
||||
)
|
||||
_cleanup(task_id)
|
||||
return result
|
||||
|
||||
|
||||
@celery.task(bind=True, name="app.tasks.removebg_tasks.remove_bg_task")
|
||||
def remove_bg_task(
|
||||
self,
|
||||
input_path: str,
|
||||
task_id: str,
|
||||
original_filename: str,
|
||||
user_id: int | None = None,
|
||||
usage_source: str = "web",
|
||||
api_key_id: int | None = None,
|
||||
):
|
||||
"""Async task: Remove background from an image."""
|
||||
output_dir = _get_output_dir(task_id)
|
||||
output_path = os.path.join(output_dir, f"{task_id}.png")
|
||||
|
||||
try:
|
||||
self.update_state(state="PROCESSING", meta={"step": "Removing background..."})
|
||||
|
||||
stats = remove_background(input_path, output_path)
|
||||
|
||||
self.update_state(state="PROCESSING", meta={"step": "Uploading result..."})
|
||||
s3_key = storage.upload_file(output_path, task_id, folder="outputs")
|
||||
|
||||
name_without_ext = os.path.splitext(original_filename)[0]
|
||||
download_name = f"{name_without_ext}_nobg.png"
|
||||
|
||||
download_url = storage.generate_presigned_url(s3_key, original_filename=download_name)
|
||||
|
||||
result = {
|
||||
"status": "completed",
|
||||
"download_url": download_url,
|
||||
"filename": download_name,
|
||||
"original_size": stats["original_size"],
|
||||
"output_size": stats["output_size"],
|
||||
"width": stats["width"],
|
||||
"height": stats["height"],
|
||||
}
|
||||
|
||||
logger.info("Task %s: Background removal completed", task_id)
|
||||
return _finalize_task(
|
||||
task_id, user_id, "remove-bg", original_filename,
|
||||
result, usage_source, api_key_id, self.request.id,
|
||||
)
|
||||
|
||||
except RemoveBGError as e:
|
||||
logger.error("Task %s: RemoveBG error — %s", task_id, e)
|
||||
return _finalize_task(
|
||||
task_id, user_id, "remove-bg", original_filename,
|
||||
{"status": "failed", "error": str(e)},
|
||||
usage_source, api_key_id, self.request.id,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error("Task %s: Unexpected error — %s", task_id, e)
|
||||
return _finalize_task(
|
||||
task_id, user_id, "remove-bg", original_filename,
|
||||
{"status": "failed", "error": "An unexpected error occurred."},
|
||||
usage_source, api_key_id, self.request.id,
|
||||
)
|
||||
@@ -12,3 +12,6 @@ import app.tasks.video_tasks # noqa: F401
|
||||
import app.tasks.pdf_tools_tasks # noqa: F401
|
||||
import app.tasks.flowchart_tasks # noqa: F401
|
||||
import app.tasks.maintenance_tasks # noqa: F401
|
||||
import app.tasks.ocr_tasks # noqa: F401
|
||||
import app.tasks.removebg_tasks # noqa: F401
|
||||
import app.tasks.pdf_editor_tasks # noqa: F401
|
||||
|
||||
@@ -21,6 +21,13 @@ PyPDF2>=3.0,<4.0
|
||||
reportlab>=4.0,<5.0
|
||||
pdf2image>=1.16,<2.0
|
||||
|
||||
# OCR
|
||||
pytesseract>=0.3.10,<1.0
|
||||
|
||||
# Background Removal
|
||||
rembg>=2.0,<3.0
|
||||
onnxruntime>=1.16,<2.0
|
||||
|
||||
# AWS
|
||||
boto3>=1.34,<2.0
|
||||
|
||||
|
||||
163
backend/tests/test_ocr.py
Normal file
163
backend/tests/test_ocr.py
Normal file
@@ -0,0 +1,163 @@
|
||||
"""Tests for OCR routes — /api/ocr/image, /api/ocr/pdf, /api/ocr/languages."""
|
||||
import io
|
||||
import json
|
||||
import os
|
||||
import tempfile
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from tests.conftest import make_png_bytes, make_pdf_bytes
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Feature flag enforcement
|
||||
# =========================================================================
|
||||
class TestOcrFeatureFlag:
|
||||
def test_ocr_image_disabled_by_default(self, client):
|
||||
"""OCR image should return 403 when FEATURE_EDITOR is off."""
|
||||
data = {"file": (io.BytesIO(make_png_bytes()), "test.png")}
|
||||
response = client.post(
|
||||
"/api/ocr/image",
|
||||
data=data,
|
||||
content_type="multipart/form-data",
|
||||
)
|
||||
assert response.status_code == 403
|
||||
assert "not enabled" in response.get_json()["error"]
|
||||
|
||||
def test_ocr_pdf_disabled_by_default(self, client):
|
||||
"""OCR PDF should return 403 when FEATURE_EDITOR is off."""
|
||||
data = {"file": (io.BytesIO(make_pdf_bytes()), "scan.pdf")}
|
||||
response = client.post(
|
||||
"/api/ocr/pdf",
|
||||
data=data,
|
||||
content_type="multipart/form-data",
|
||||
)
|
||||
assert response.status_code == 403
|
||||
|
||||
def test_languages_always_available(self, client):
|
||||
"""GET /api/ocr/languages should work even when feature is disabled."""
|
||||
response = client.get("/api/ocr/languages")
|
||||
assert response.status_code == 200
|
||||
data = response.get_json()
|
||||
langs = data["languages"]
|
||||
assert "eng" in langs
|
||||
assert "ara" in langs
|
||||
assert "fra" in langs
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Validation
|
||||
# =========================================================================
|
||||
class TestOcrValidation:
|
||||
def test_ocr_image_no_file(self, client, app):
|
||||
"""Should return 400 when no file provided."""
|
||||
app.config["FEATURE_EDITOR"] = True
|
||||
response = client.post("/api/ocr/image")
|
||||
assert response.status_code == 400
|
||||
assert "No file" in response.get_json()["error"]
|
||||
|
||||
def test_ocr_pdf_no_file(self, client, app):
|
||||
"""Should return 400 when no file provided."""
|
||||
app.config["FEATURE_EDITOR"] = True
|
||||
response = client.post("/api/ocr/pdf")
|
||||
assert response.status_code == 400
|
||||
assert "No file" in response.get_json()["error"]
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Success paths
|
||||
# =========================================================================
|
||||
class TestOcrSuccess:
|
||||
def test_ocr_image_success(self, client, app, monkeypatch):
|
||||
"""Should return 202 with task_id when valid image provided."""
|
||||
app.config["FEATURE_EDITOR"] = True
|
||||
mock_task = MagicMock()
|
||||
mock_task.id = "ocr-img-task-1"
|
||||
|
||||
tmp_dir = tempfile.mkdtemp()
|
||||
save_path = os.path.join(tmp_dir, "mock.png")
|
||||
|
||||
monkeypatch.setattr(
|
||||
"app.routes.ocr.validate_actor_file",
|
||||
lambda f, allowed_types, actor: ("test.png", "png"),
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
"app.routes.ocr.generate_safe_path",
|
||||
lambda ext, folder_type: ("mock-id", save_path),
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
"app.routes.ocr.ocr_image_task.delay",
|
||||
MagicMock(return_value=mock_task),
|
||||
)
|
||||
|
||||
data = {"file": (io.BytesIO(make_png_bytes()), "test.png"), "lang": "eng"}
|
||||
response = client.post(
|
||||
"/api/ocr/image",
|
||||
data=data,
|
||||
content_type="multipart/form-data",
|
||||
)
|
||||
assert response.status_code == 202
|
||||
body = response.get_json()
|
||||
assert body["task_id"] == "ocr-img-task-1"
|
||||
|
||||
def test_ocr_pdf_success(self, client, app, monkeypatch):
|
||||
"""Should return 202 with task_id when valid PDF provided."""
|
||||
app.config["FEATURE_EDITOR"] = True
|
||||
mock_task = MagicMock()
|
||||
mock_task.id = "ocr-pdf-task-1"
|
||||
|
||||
tmp_dir = tempfile.mkdtemp()
|
||||
save_path = os.path.join(tmp_dir, "mock.pdf")
|
||||
|
||||
monkeypatch.setattr(
|
||||
"app.routes.ocr.validate_actor_file",
|
||||
lambda f, allowed_types, actor: ("scan.pdf", "pdf"),
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
"app.routes.ocr.generate_safe_path",
|
||||
lambda ext, folder_type: ("mock-id", save_path),
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
"app.routes.ocr.ocr_pdf_task.delay",
|
||||
MagicMock(return_value=mock_task),
|
||||
)
|
||||
|
||||
data = {"file": (io.BytesIO(make_pdf_bytes()), "scan.pdf"), "lang": "ara"}
|
||||
response = client.post(
|
||||
"/api/ocr/pdf",
|
||||
data=data,
|
||||
content_type="multipart/form-data",
|
||||
)
|
||||
assert response.status_code == 202
|
||||
body = response.get_json()
|
||||
assert body["task_id"] == "ocr-pdf-task-1"
|
||||
|
||||
def test_ocr_image_invalid_lang_falls_back(self, client, app, monkeypatch):
|
||||
"""Invalid lang should fall back to 'eng' without error."""
|
||||
app.config["FEATURE_EDITOR"] = True
|
||||
mock_task = MagicMock()
|
||||
mock_task.id = "ocr-lang-task"
|
||||
|
||||
tmp_dir = tempfile.mkdtemp()
|
||||
save_path = os.path.join(tmp_dir, "mock.png")
|
||||
|
||||
monkeypatch.setattr(
|
||||
"app.routes.ocr.validate_actor_file",
|
||||
lambda f, allowed_types, actor: ("test.png", "png"),
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
"app.routes.ocr.generate_safe_path",
|
||||
lambda ext, folder_type: ("mock-id", save_path),
|
||||
)
|
||||
mock_delay = MagicMock(return_value=mock_task)
|
||||
monkeypatch.setattr("app.routes.ocr.ocr_image_task.delay", mock_delay)
|
||||
|
||||
data = {"file": (io.BytesIO(make_png_bytes()), "test.png"), "lang": "invalid"}
|
||||
response = client.post(
|
||||
"/api/ocr/image",
|
||||
data=data,
|
||||
content_type="multipart/form-data",
|
||||
)
|
||||
assert response.status_code == 202
|
||||
# Verify 'eng' was passed to the task
|
||||
call_args = mock_delay.call_args
|
||||
assert call_args[0][3] == "eng" # 4th positional arg is lang
|
||||
66
backend/tests/test_ocr_service.py
Normal file
66
backend/tests/test_ocr_service.py
Normal file
@@ -0,0 +1,66 @@
|
||||
"""Tests for OCR service and PDF editor service — unit tests with mocking."""
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
|
||||
import pytest
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
from app.services.ocr_service import ocr_image, OCRError, SUPPORTED_LANGUAGES
|
||||
|
||||
|
||||
class TestOcrServiceConstants:
|
||||
def test_supported_languages(self):
|
||||
"""Verify the supported languages dict."""
|
||||
assert "eng" in SUPPORTED_LANGUAGES
|
||||
assert "ara" in SUPPORTED_LANGUAGES
|
||||
assert "fra" in SUPPORTED_LANGUAGES
|
||||
assert len(SUPPORTED_LANGUAGES) == 3
|
||||
|
||||
|
||||
class TestOcrImage:
|
||||
def test_ocr_image_success(self):
|
||||
"""Should return text and char_count from image (mocked pytesseract)."""
|
||||
mock_pytesseract = MagicMock()
|
||||
mock_pytesseract.image_to_string.return_value = " Hello World "
|
||||
mock_pytesseract.pytesseract.tesseract_cmd = ""
|
||||
|
||||
mock_img = MagicMock()
|
||||
mock_img.mode = "RGB"
|
||||
mock_img.__enter__ = MagicMock(return_value=mock_img)
|
||||
mock_img.__exit__ = MagicMock(return_value=False)
|
||||
|
||||
with patch.dict(sys.modules, {"pytesseract": mock_pytesseract}):
|
||||
with patch("app.services.ocr_service.Image") as mock_pil:
|
||||
mock_pil.open.return_value = mock_img
|
||||
result = ocr_image("/fake/path.png", lang="eng")
|
||||
|
||||
assert result["text"] == "Hello World"
|
||||
assert result["char_count"] == 11
|
||||
assert result["lang"] == "eng"
|
||||
|
||||
def test_ocr_image_invalid_lang_fallback(self):
|
||||
"""Invalid language should fall back to 'eng'."""
|
||||
mock_pytesseract = MagicMock()
|
||||
mock_pytesseract.image_to_string.return_value = "Test"
|
||||
mock_pytesseract.pytesseract.tesseract_cmd = ""
|
||||
|
||||
mock_img = MagicMock()
|
||||
mock_img.mode = "RGB"
|
||||
mock_img.__enter__ = MagicMock(return_value=mock_img)
|
||||
mock_img.__exit__ = MagicMock(return_value=False)
|
||||
|
||||
with patch.dict(sys.modules, {"pytesseract": mock_pytesseract}):
|
||||
with patch("app.services.ocr_service.Image") as mock_pil:
|
||||
mock_pil.open.return_value = mock_img
|
||||
result = ocr_image("/fake/path.png", lang="zzzz")
|
||||
|
||||
assert result["lang"] == "eng"
|
||||
|
||||
|
||||
class TestPdfEditorService:
|
||||
def test_no_edits_raises(self):
|
||||
"""Should raise PDFEditorError when no edits provided."""
|
||||
from app.services.pdf_editor_service import apply_pdf_edits, PDFEditorError
|
||||
with pytest.raises(PDFEditorError, match="No edits"):
|
||||
apply_pdf_edits("/fake.pdf", "/out.pdf", [])
|
||||
144
backend/tests/test_pdf_editor.py
Normal file
144
backend/tests/test_pdf_editor.py
Normal file
@@ -0,0 +1,144 @@
|
||||
"""Tests for PDF editor route — /api/pdf-editor/edit."""
|
||||
import io
|
||||
import json
|
||||
import os
|
||||
import tempfile
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from tests.conftest import make_pdf_bytes
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Feature flag enforcement
|
||||
# =========================================================================
|
||||
class TestPdfEditorFeatureFlag:
|
||||
def test_pdf_editor_disabled_by_default(self, client):
|
||||
"""Should return 403 when FEATURE_EDITOR is off."""
|
||||
data = {
|
||||
"file": (io.BytesIO(make_pdf_bytes()), "doc.pdf"),
|
||||
"edits": json.dumps([{"type": "text", "page": 1, "x": 100, "y": 200, "content": "Hello"}]),
|
||||
}
|
||||
response = client.post(
|
||||
"/api/pdf-editor/edit",
|
||||
data=data,
|
||||
content_type="multipart/form-data",
|
||||
)
|
||||
assert response.status_code == 403
|
||||
assert "not enabled" in response.get_json()["error"]
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Validation
|
||||
# =========================================================================
|
||||
class TestPdfEditorValidation:
|
||||
def test_pdf_editor_no_file(self, client, app):
|
||||
"""Should return 400 when no file provided."""
|
||||
app.config["FEATURE_EDITOR"] = True
|
||||
response = client.post("/api/pdf-editor/edit")
|
||||
assert response.status_code == 400
|
||||
assert "No file" in response.get_json()["error"]
|
||||
|
||||
def test_pdf_editor_invalid_json(self, client, app):
|
||||
"""Should return 400 when edits is invalid JSON."""
|
||||
app.config["FEATURE_EDITOR"] = True
|
||||
data = {
|
||||
"file": (io.BytesIO(make_pdf_bytes()), "doc.pdf"),
|
||||
"edits": "not valid json{",
|
||||
}
|
||||
response = client.post(
|
||||
"/api/pdf-editor/edit",
|
||||
data=data,
|
||||
content_type="multipart/form-data",
|
||||
)
|
||||
assert response.status_code == 400
|
||||
assert "Invalid JSON" in response.get_json()["error"]
|
||||
|
||||
def test_pdf_editor_edits_not_array(self, client, app):
|
||||
"""Should return 400 when edits is not an array."""
|
||||
app.config["FEATURE_EDITOR"] = True
|
||||
data = {
|
||||
"file": (io.BytesIO(make_pdf_bytes()), "doc.pdf"),
|
||||
"edits": json.dumps({"type": "text"}),
|
||||
}
|
||||
response = client.post(
|
||||
"/api/pdf-editor/edit",
|
||||
data=data,
|
||||
content_type="multipart/form-data",
|
||||
)
|
||||
assert response.status_code == 400
|
||||
assert "JSON array" in response.get_json()["error"]
|
||||
|
||||
def test_pdf_editor_empty_edits(self, client, app):
|
||||
"""Should return 400 when edits array is empty."""
|
||||
app.config["FEATURE_EDITOR"] = True
|
||||
data = {
|
||||
"file": (io.BytesIO(make_pdf_bytes()), "doc.pdf"),
|
||||
"edits": json.dumps([]),
|
||||
}
|
||||
response = client.post(
|
||||
"/api/pdf-editor/edit",
|
||||
data=data,
|
||||
content_type="multipart/form-data",
|
||||
)
|
||||
assert response.status_code == 400
|
||||
assert "At least one edit" in response.get_json()["error"]
|
||||
|
||||
def test_pdf_editor_too_many_edits(self, client, app):
|
||||
"""Should return 400 when more than 500 edits."""
|
||||
app.config["FEATURE_EDITOR"] = True
|
||||
edits = [{"type": "text", "page": 1, "x": 10, "y": 10, "content": "x"}] * 501
|
||||
data = {
|
||||
"file": (io.BytesIO(make_pdf_bytes()), "doc.pdf"),
|
||||
"edits": json.dumps(edits),
|
||||
}
|
||||
response = client.post(
|
||||
"/api/pdf-editor/edit",
|
||||
data=data,
|
||||
content_type="multipart/form-data",
|
||||
)
|
||||
assert response.status_code == 400
|
||||
assert "500" in response.get_json()["error"]
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Success paths
|
||||
# =========================================================================
|
||||
class TestPdfEditorSuccess:
|
||||
def test_pdf_editor_success(self, client, app, monkeypatch):
|
||||
"""Should return 202 with task_id when valid request provided."""
|
||||
app.config["FEATURE_EDITOR"] = True
|
||||
mock_task = MagicMock()
|
||||
mock_task.id = "edit-task-1"
|
||||
|
||||
tmp_dir = tempfile.mkdtemp()
|
||||
save_path = os.path.join(tmp_dir, "mock.pdf")
|
||||
|
||||
monkeypatch.setattr(
|
||||
"app.routes.pdf_editor.validate_actor_file",
|
||||
lambda f, allowed_types, actor: ("doc.pdf", "pdf"),
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
"app.routes.pdf_editor.generate_safe_path",
|
||||
lambda ext, folder_type: ("mock-id", save_path),
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
"app.routes.pdf_editor.edit_pdf_task.delay",
|
||||
MagicMock(return_value=mock_task),
|
||||
)
|
||||
|
||||
edits = [
|
||||
{"type": "text", "page": 1, "x": 100, "y": 200, "content": "Hello World", "fontSize": 14},
|
||||
]
|
||||
data = {
|
||||
"file": (io.BytesIO(make_pdf_bytes()), "doc.pdf"),
|
||||
"edits": json.dumps(edits),
|
||||
}
|
||||
response = client.post(
|
||||
"/api/pdf-editor/edit",
|
||||
data=data,
|
||||
content_type="multipart/form-data",
|
||||
)
|
||||
assert response.status_code == 202
|
||||
body = response.get_json()
|
||||
assert body["task_id"] == "edit-task-1"
|
||||
assert "PDF editing started" in body["message"]
|
||||
73
backend/tests/test_removebg.py
Normal file
73
backend/tests/test_removebg.py
Normal file
@@ -0,0 +1,73 @@
|
||||
"""Tests for background removal route — /api/remove-bg."""
|
||||
import io
|
||||
import os
|
||||
import tempfile
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from tests.conftest import make_png_bytes, make_pdf_bytes
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Feature flag enforcement
|
||||
# =========================================================================
|
||||
class TestRemoveBgFeatureFlag:
|
||||
def test_removebg_disabled_by_default(self, client):
|
||||
"""Should return 403 when FEATURE_EDITOR is off."""
|
||||
data = {"file": (io.BytesIO(make_png_bytes()), "photo.png")}
|
||||
response = client.post(
|
||||
"/api/remove-bg",
|
||||
data=data,
|
||||
content_type="multipart/form-data",
|
||||
)
|
||||
assert response.status_code == 403
|
||||
assert "not enabled" in response.get_json()["error"]
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Validation
|
||||
# =========================================================================
|
||||
class TestRemoveBgValidation:
|
||||
def test_removebg_no_file(self, client, app):
|
||||
"""Should return 400 when no file provided."""
|
||||
app.config["FEATURE_EDITOR"] = True
|
||||
response = client.post("/api/remove-bg")
|
||||
assert response.status_code == 400
|
||||
assert "No file" in response.get_json()["error"]
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Success paths
|
||||
# =========================================================================
|
||||
class TestRemoveBgSuccess:
|
||||
def test_removebg_success(self, client, app, monkeypatch):
|
||||
"""Should return 202 with task_id when valid image provided."""
|
||||
app.config["FEATURE_EDITOR"] = True
|
||||
mock_task = MagicMock()
|
||||
mock_task.id = "rembg-task-1"
|
||||
|
||||
tmp_dir = tempfile.mkdtemp()
|
||||
save_path = os.path.join(tmp_dir, "mock.png")
|
||||
|
||||
monkeypatch.setattr(
|
||||
"app.routes.removebg.validate_actor_file",
|
||||
lambda f, allowed_types, actor: ("photo.png", "png"),
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
"app.routes.removebg.generate_safe_path",
|
||||
lambda ext, folder_type: ("mock-id", save_path),
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
"app.routes.removebg.remove_bg_task.delay",
|
||||
MagicMock(return_value=mock_task),
|
||||
)
|
||||
|
||||
data = {"file": (io.BytesIO(make_png_bytes()), "photo.png")}
|
||||
response = client.post(
|
||||
"/api/remove-bg",
|
||||
data=data,
|
||||
content_type="multipart/form-data",
|
||||
)
|
||||
assert response.status_code == 202
|
||||
body = response.get_json()
|
||||
assert body["task_id"] == "rembg-task-1"
|
||||
assert "Background removal started" in body["message"]
|
||||
Reference in New Issue
Block a user