feat: add PDF manipulation tools including Protect, Rotate, Split, Unlock, and Watermark functionalities
- Implemented ProtectPdf component for adding password protection to PDFs. - Implemented RotatePdf component for rotating PDF pages by specified angles. - Implemented SplitPdf component for splitting PDFs into individual pages or specified ranges. - Implemented UnlockPdf component for removing password protection from PDFs. - Implemented WatermarkPdf component for adding custom text watermarks to PDFs. - Updated i18n files to include translations for new tools. - Enhanced HomePage to include links to new PDF tools. - Updated Nginx configuration to improve security with CSP and Permissions-Policy headers. - Updated sitemap generation script to include new tools.
This commit is contained in:
@@ -61,12 +61,14 @@ def create_app(config_name=None):
|
||||
from app.routes.video import video_bp
|
||||
from app.routes.tasks import tasks_bp
|
||||
from app.routes.download import download_bp
|
||||
from app.routes.pdf_tools import pdf_tools_bp
|
||||
|
||||
app.register_blueprint(health_bp, url_prefix="/api")
|
||||
app.register_blueprint(convert_bp, url_prefix="/api/convert")
|
||||
app.register_blueprint(compress_bp, url_prefix="/api/compress")
|
||||
app.register_blueprint(image_bp, url_prefix="/api/image")
|
||||
app.register_blueprint(video_bp, url_prefix="/api/video")
|
||||
app.register_blueprint(pdf_tools_bp, url_prefix="/api/pdf-tools")
|
||||
app.register_blueprint(tasks_bp, url_prefix="/api/tasks")
|
||||
app.register_blueprint(download_bp, url_prefix="/api/download")
|
||||
|
||||
|
||||
@@ -29,6 +29,7 @@ def init_celery(app):
|
||||
"app.tasks.compress_tasks.*": {"queue": "compress"},
|
||||
"app.tasks.image_tasks.*": {"queue": "image"},
|
||||
"app.tasks.video_tasks.*": {"queue": "video"},
|
||||
"app.tasks.pdf_tools_tasks.*": {"queue": "pdf_tools"},
|
||||
}
|
||||
|
||||
class ContextTask(celery.Task):
|
||||
|
||||
427
backend/app/routes/pdf_tools.py
Normal file
427
backend/app/routes/pdf_tools.py
Normal file
@@ -0,0 +1,427 @@
|
||||
"""Extended PDF tool routes — Merge, Split, Rotate, Page Numbers, PDF↔Images, Watermark, Protect/Unlock."""
|
||||
import os
|
||||
import uuid
|
||||
|
||||
from flask import Blueprint, request, jsonify
|
||||
|
||||
from app.extensions import limiter
|
||||
from app.utils.file_validator import validate_file, FileValidationError
|
||||
from app.utils.sanitizer import generate_safe_path
|
||||
from app.tasks.pdf_tools_tasks import (
|
||||
merge_pdfs_task,
|
||||
split_pdf_task,
|
||||
rotate_pdf_task,
|
||||
add_page_numbers_task,
|
||||
pdf_to_images_task,
|
||||
images_to_pdf_task,
|
||||
watermark_pdf_task,
|
||||
protect_pdf_task,
|
||||
unlock_pdf_task,
|
||||
)
|
||||
|
||||
# Blueprint that groups every PDF-tools route defined in this module.
pdf_tools_bp = Blueprint("pdf_tools", __name__)
|
||||
|
||||
# File extensions accepted by the images-to-PDF route.
ALLOWED_IMAGE_TYPES = ["png", "jpg", "jpeg", "webp"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Merge PDFs — POST /api/pdf-tools/merge
|
||||
# ---------------------------------------------------------------------------
|
||||
@pdf_tools_bp.route("/merge", methods=["POST"])
@limiter.limit("10/minute")
def merge_pdfs_route():
    """
    Merge multiple PDF files into one.

    Accepts: multipart/form-data with multiple 'files' fields (PDF),
             between 2 and 20 files.
    Returns: 202 JSON with task_id for polling, or a JSON error with the
             status code reported by file validation (400 for count errors).
    """
    import shutil

    files = request.files.getlist("files")
    if len(files) < 2:
        return jsonify({"error": "Please upload at least 2 PDF files."}), 400

    if len(files) > 20:
        return jsonify({"error": "Maximum 20 files allowed."}), 400

    task_id = str(uuid.uuid4())

    # All uploads of one request share a directory; create it once up front.
    upload_dir = os.path.join("/tmp/uploads", task_id)
    os.makedirs(upload_dir, exist_ok=True)

    input_paths = []
    original_filenames = []

    for f in files:
        try:
            original_filename, ext = validate_file(f, allowed_types=["pdf"])
        except FileValidationError as e:
            # No task will run for this request, so drop the files already
            # written instead of leaving them orphaned on disk.
            shutil.rmtree(upload_dir, ignore_errors=True)
            return jsonify({"error": e.message}), e.code

        # Store under a random name; the client-supplied name is only kept
        # as metadata for the task.
        file_path = os.path.join(upload_dir, f"{uuid.uuid4()}.{ext}")
        f.save(file_path)
        input_paths.append(file_path)
        original_filenames.append(original_filename)

    task = merge_pdfs_task.delay(input_paths, task_id, original_filenames)

    return jsonify({
        "task_id": task.id,
        "message": "Merge started. Poll /api/tasks/{task_id}/status for progress.",
    }), 202
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Split PDF — POST /api/pdf-tools/split
|
||||
# ---------------------------------------------------------------------------
|
||||
@pdf_tools_bp.route("/split", methods=["POST"])
@limiter.limit("10/minute")
def split_pdf_route():
    """
    Queue a job that splits one uploaded PDF.

    Form fields (multipart/form-data):
        - 'file': the PDF to split (required)
        - 'mode' (optional): "all" or "range"; any other value is treated as "all"
        - 'pages' (optional): page spec for range mode, e.g. "1,3,5-8"

    Returns: 202 JSON with task_id for polling, or a JSON error.
    """
    upload = request.files.get("file")
    if upload is None:
        return jsonify({"error": "No file provided."}), 400

    requested_mode = request.form.get("mode", "all")
    split_mode = requested_mode if requested_mode in ("all", "range") else "all"
    page_spec = request.form.get("pages")

    try:
        source_name, extension = validate_file(upload, allowed_types=["pdf"])
    except FileValidationError as err:
        return jsonify({"error": err.message}), err.code

    task_id, saved_path = generate_safe_path(extension, folder_type="upload")
    upload.save(saved_path)

    job = split_pdf_task.delay(saved_path, task_id, source_name, split_mode, page_spec)

    payload = {
        "task_id": job.id,
        "message": "Split started. Poll /api/tasks/{task_id}/status for progress.",
    }
    return jsonify(payload), 202
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Rotate PDF — POST /api/pdf-tools/rotate
|
||||
# ---------------------------------------------------------------------------
|
||||
@pdf_tools_bp.route("/rotate", methods=["POST"])
@limiter.limit("10/minute")
def rotate_pdf_route():
    """
    Queue a job that rotates pages of one uploaded PDF.

    Form fields (multipart/form-data):
        - 'file': the PDF to rotate (required)
        - 'rotation': 90, 180, or 270 degrees; a missing or non-integer
          value falls back to 90, any other integer is rejected with 400
        - 'pages' (optional): "all" or comma-separated page numbers (default: "all")

    Returns: 202 JSON with task_id for polling, or a JSON error.
    """
    upload = request.files.get("file")
    if upload is None:
        return jsonify({"error": "No file provided."}), 400

    raw_rotation = request.form.get("rotation", 90)
    try:
        degrees = int(raw_rotation)
    except ValueError:
        degrees = 90

    if degrees not in (90, 180, 270):
        return jsonify({"error": "Rotation must be 90, 180, or 270 degrees."}), 400

    page_selection = request.form.get("pages", "all")

    try:
        source_name, extension = validate_file(upload, allowed_types=["pdf"])
    except FileValidationError as err:
        return jsonify({"error": err.message}), err.code

    task_id, saved_path = generate_safe_path(extension, folder_type="upload")
    upload.save(saved_path)

    job = rotate_pdf_task.delay(saved_path, task_id, source_name, degrees, page_selection)

    payload = {
        "task_id": job.id,
        "message": "Rotation started. Poll /api/tasks/{task_id}/status for progress.",
    }
    return jsonify(payload), 202
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Add Page Numbers — POST /api/pdf-tools/page-numbers
|
||||
# ---------------------------------------------------------------------------
|
||||
@pdf_tools_bp.route("/page-numbers", methods=["POST"])
@limiter.limit("10/minute")
def add_page_numbers_route():
    """
    Queue a job that stamps page numbers onto one uploaded PDF.

    Form fields (multipart/form-data):
        - 'file': the PDF to number (required)
        - 'position' (optional): "bottom-center", "bottom-right", "bottom-left",
          "top-center", "top-right", "top-left"; anything else falls back
          to "bottom-center"
        - 'start_number' (optional): first number to print, clamped to at
          least 1; a non-integer value falls back to 1

    Returns: 202 JSON with task_id for polling, or a JSON error.
    """
    upload = request.files.get("file")
    if upload is None:
        return jsonify({"error": "No file provided."}), 400

    allowed_positions = (
        "bottom-center", "bottom-right", "bottom-left",
        "top-center", "top-right", "top-left",
    )
    requested_position = request.form.get("position", "bottom-center")
    if requested_position in allowed_positions:
        position = requested_position
    else:
        position = "bottom-center"

    raw_start = request.form.get("start_number", 1)
    try:
        first_number = max(1, int(raw_start))
    except ValueError:
        first_number = 1

    try:
        source_name, extension = validate_file(upload, allowed_types=["pdf"])
    except FileValidationError as err:
        return jsonify({"error": err.message}), err.code

    task_id, saved_path = generate_safe_path(extension, folder_type="upload")
    upload.save(saved_path)

    job = add_page_numbers_task.delay(
        saved_path, task_id, source_name, position, first_number
    )

    payload = {
        "task_id": job.id,
        "message": "Page numbering started. Poll /api/tasks/{task_id}/status for progress.",
    }
    return jsonify(payload), 202
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# PDF to Images — POST /api/pdf-tools/pdf-to-images
|
||||
# ---------------------------------------------------------------------------
|
||||
@pdf_tools_bp.route("/pdf-to-images", methods=["POST"])
@limiter.limit("10/minute")
def pdf_to_images_route():
    """
    Queue a job that renders the pages of one uploaded PDF as images.

    Form fields (multipart/form-data):
        - 'file': the PDF to convert (required)
        - 'format' (optional): "png" or "jpg", case-insensitive; anything
          else falls back to "png"
        - 'dpi' (optional): resolution, clamped to 72-600; a non-integer
          value falls back to 200

    Returns: 202 JSON with task_id for polling, or a JSON error.
    """
    upload = request.files.get("file")
    if upload is None:
        return jsonify({"error": "No file provided."}), 400

    image_format = request.form.get("format", "png").lower()
    if image_format not in ("png", "jpg"):
        image_format = "png"

    raw_dpi = request.form.get("dpi", 200)
    try:
        resolution = max(72, min(600, int(raw_dpi)))
    except ValueError:
        resolution = 200

    try:
        source_name, extension = validate_file(upload, allowed_types=["pdf"])
    except FileValidationError as err:
        return jsonify({"error": err.message}), err.code

    task_id, saved_path = generate_safe_path(extension, folder_type="upload")
    upload.save(saved_path)

    job = pdf_to_images_task.delay(
        saved_path, task_id, source_name, image_format, resolution
    )

    payload = {
        "task_id": job.id,
        "message": "Conversion started. Poll /api/tasks/{task_id}/status for progress.",
    }
    return jsonify(payload), 202
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Images to PDF — POST /api/pdf-tools/images-to-pdf
|
||||
# ---------------------------------------------------------------------------
|
||||
@pdf_tools_bp.route("/images-to-pdf", methods=["POST"])
@limiter.limit("10/minute")
def images_to_pdf_route():
    """
    Convert multiple images to a single PDF.

    Accepts: multipart/form-data with multiple 'files' fields (images of a
             type listed in ALLOWED_IMAGE_TYPES), between 1 and 50 files.
    Returns: 202 JSON with task_id for polling, or a JSON error with the
             status code reported by file validation (400 for count errors).
    """
    import shutil

    files = request.files.getlist("files")
    if not files:
        return jsonify({"error": "Please upload at least 1 image."}), 400

    if len(files) > 50:
        return jsonify({"error": "Maximum 50 images allowed."}), 400

    task_id = str(uuid.uuid4())

    # All uploads of one request share a directory; create it once up front.
    upload_dir = os.path.join("/tmp/uploads", task_id)
    os.makedirs(upload_dir, exist_ok=True)

    input_paths = []
    original_filenames = []

    for f in files:
        try:
            original_filename, ext = validate_file(f, allowed_types=ALLOWED_IMAGE_TYPES)
        except FileValidationError as e:
            # No task will run for this request, so drop the files already
            # written instead of leaving them orphaned on disk.
            shutil.rmtree(upload_dir, ignore_errors=True)
            return jsonify({"error": e.message}), e.code

        # Store under a random name; the client-supplied name is only kept
        # as metadata for the task.
        file_path = os.path.join(upload_dir, f"{uuid.uuid4()}.{ext}")
        f.save(file_path)
        input_paths.append(file_path)
        original_filenames.append(original_filename)

    task = images_to_pdf_task.delay(input_paths, task_id, original_filenames)

    return jsonify({
        "task_id": task.id,
        "message": "Conversion started. Poll /api/tasks/{task_id}/status for progress.",
    }), 202
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Watermark PDF — POST /api/pdf-tools/watermark
|
||||
# ---------------------------------------------------------------------------
|
||||
@pdf_tools_bp.route("/watermark", methods=["POST"])
@limiter.limit("10/minute")
def watermark_pdf_route():
    """
    Queue a job that overlays a text watermark on one uploaded PDF.

    Form fields (multipart/form-data):
        - 'file': the PDF to watermark (required)
        - 'text': watermark text, required, at most 100 characters after
          surrounding whitespace is trimmed
        - 'opacity' (optional): clamped to 0.1-1.0; a non-numeric value
          falls back to 0.3

    Returns: 202 JSON with task_id for polling, or a JSON error.
    """
    upload = request.files.get("file")
    if upload is None:
        return jsonify({"error": "No file provided."}), 400

    label = request.form.get("text", "").strip()
    if not label:
        return jsonify({"error": "Watermark text is required."}), 400
    if len(label) > 100:
        return jsonify({"error": "Watermark text must be 100 characters or less."}), 400

    raw_opacity = request.form.get("opacity", 0.3)
    try:
        alpha = max(0.1, min(1.0, float(raw_opacity)))
    except ValueError:
        alpha = 0.3

    try:
        source_name, extension = validate_file(upload, allowed_types=["pdf"])
    except FileValidationError as err:
        return jsonify({"error": err.message}), err.code

    task_id, saved_path = generate_safe_path(extension, folder_type="upload")
    upload.save(saved_path)

    job = watermark_pdf_task.delay(
        saved_path, task_id, source_name, label, alpha
    )

    payload = {
        "task_id": job.id,
        "message": "Watermarking started. Poll /api/tasks/{task_id}/status for progress.",
    }
    return jsonify(payload), 202
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Protect PDF — POST /api/pdf-tools/protect
|
||||
# ---------------------------------------------------------------------------
|
||||
@pdf_tools_bp.route("/protect", methods=["POST"])
@limiter.limit("10/minute")
def protect_pdf_route():
    """
    Add password protection to a PDF.

    Accepts: multipart/form-data with:
        - 'file': PDF file
        - 'password': Password to set (at least 4 characters; used exactly
          as submitted, including any leading/trailing whitespace)
    Returns: 202 JSON with task_id for polling, or a JSON error.
    """
    if "file" not in request.files:
        return jsonify({"error": "No file provided."}), 400

    file = request.files["file"]

    # Keep the password verbatim. Stripping it would lock the PDF with a
    # different secret than the one the user typed.
    password = request.form.get("password", "")

    # Empty and whitespace-only passwords are still rejected.
    if not password.strip():
        return jsonify({"error": "Password is required."}), 400

    if len(password) < 4:
        return jsonify({"error": "Password must be at least 4 characters."}), 400

    try:
        original_filename, ext = validate_file(file, allowed_types=["pdf"])
    except FileValidationError as e:
        return jsonify({"error": e.message}), e.code

    task_id, input_path = generate_safe_path(ext, folder_type="upload")
    file.save(input_path)

    # NOTE(review): the password is passed as a plain task argument, so it
    # reaches whatever transport the task queue uses. Confirm that is acceptable.
    task = protect_pdf_task.delay(input_path, task_id, original_filename, password)

    return jsonify({
        "task_id": task.id,
        "message": "Protection started. Poll /api/tasks/{task_id}/status for progress.",
    }), 202
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Unlock PDF — POST /api/pdf-tools/unlock
|
||||
# ---------------------------------------------------------------------------
|
||||
@pdf_tools_bp.route("/unlock", methods=["POST"])
@limiter.limit("10/minute")
def unlock_pdf_route():
    """
    Remove password protection from a PDF.

    Accepts: multipart/form-data with:
        - 'file': PDF file
        - 'password': Current password of the PDF (used exactly as
          submitted, including any leading/trailing whitespace)
    Returns: 202 JSON with task_id for polling, or a JSON error.
    """
    if "file" not in request.files:
        return jsonify({"error": "No file provided."}), 400

    file = request.files["file"]

    # Keep the password verbatim. A real password may begin or end with
    # whitespace, and stripping it would make the unlock fail.
    password = request.form.get("password", "")

    # Empty and whitespace-only passwords are still rejected.
    if not password.strip():
        return jsonify({"error": "Password is required."}), 400

    try:
        original_filename, ext = validate_file(file, allowed_types=["pdf"])
    except FileValidationError as e:
        return jsonify({"error": e.message}), e.code

    task_id, input_path = generate_safe_path(ext, folder_type="upload")
    file.save(input_path)

    # NOTE(review): the password is passed as a plain task argument, so it
    # reaches whatever transport the task queue uses. Confirm that is acceptable.
    task = unlock_pdf_task.delay(input_path, task_id, original_filename, password)

    return jsonify({
        "task_id": task.id,
        "message": "Unlock started. Poll /api/tasks/{task_id}/status for progress.",
    }), 202
|
||||
652
backend/app/services/pdf_tools_service.py
Normal file
652
backend/app/services/pdf_tools_service.py
Normal file
@@ -0,0 +1,652 @@
|
||||
"""Extended PDF tools service — Merge, Split, Rotate, Page Numbers, PDF↔Images, Watermark."""
|
||||
import os
|
||||
import io
|
||||
import logging
|
||||
import subprocess
|
||||
import tempfile
|
||||
import zipfile
|
||||
|
||||
from PIL import Image
|
||||
|
||||
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PDFToolsError(Exception):
    """Raised by the PDF tools service when an operation cannot be completed."""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 1. Merge PDFs
|
||||
# ---------------------------------------------------------------------------
|
||||
def merge_pdfs(input_paths: list[str], output_path: str) -> dict:
    """
    Merge multiple PDF files into a single PDF.

    Pages are appended in the order of ``input_paths``.

    Args:
        input_paths: List of paths to PDF files (in order)
        output_path: Path for the merged output PDF

    Returns:
        dict with total_pages, files_merged and output_size (bytes)

    Raises:
        PDFToolsError: If an input file is missing or the merge fails
    """
    try:
        from PyPDF2 import PdfReader, PdfWriter

        writer = PdfWriter()
        total_pages = 0

        for path in input_paths:
            if not os.path.exists(path):
                # Report only the basename so server paths are not leaked.
                raise PDFToolsError(f"File not found: {os.path.basename(path)}")
            reader = PdfReader(path)
            for page in reader.pages:
                writer.add_page(page)
                total_pages += 1

        # dirname() is "" for a bare filename, and os.makedirs("") raises.
        output_dir = os.path.dirname(output_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        with open(output_path, "wb") as f:
            writer.write(f)

        output_size = os.path.getsize(output_path)
        logger.info(f"Merged {len(input_paths)} PDFs → {total_pages} pages ({output_size} bytes)")

        return {
            "total_pages": total_pages,
            "files_merged": len(input_paths),
            "output_size": output_size,
        }

    except PDFToolsError:
        raise
    except Exception as e:
        # Chain the cause so the original traceback stays available in logs.
        raise PDFToolsError(f"Failed to merge PDFs: {str(e)}") from e
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 2. Split PDF
|
||||
# ---------------------------------------------------------------------------
|
||||
def split_pdf(
    input_path: str,
    output_dir: str,
    mode: str = "all",
    pages: str | None = None,
) -> dict:
    """
    Write selected pages of a PDF as one-page files and bundle them in a ZIP.

    Each extracted page is saved as ``page_<n>.pdf`` inside ``output_dir``;
    the files are then collected into ``split_pages.zip`` in the same
    directory.

    Args:
        input_path: Path to the input PDF
        output_dir: Directory that receives the page files and the ZIP
        mode: "range" to extract the pages named by ``pages``; any other
            value, or "range" without a page spec, extracts every page
        pages: Page specification for range mode, e.g. "1,3,5-8"

    Returns:
        dict with total_pages, extracted_pages, output_size (ZIP bytes)
        and zip_path

    Raises:
        PDFToolsError: If the PDF has no pages or the split fails
    """
    try:
        from PyPDF2 import PdfReader, PdfWriter

        os.makedirs(output_dir, exist_ok=True)
        source = PdfReader(input_path)
        page_total = len(source.pages)

        if page_total == 0:
            raise PDFToolsError("PDF has no pages.")

        # Work out the 0-based indices of the pages to extract.
        use_range = mode == "range" and pages
        selected = (
            _parse_page_range(pages, page_total)
            if use_range
            else list(range(page_total))
        )

        written = []
        for index in selected:
            single = PdfWriter()
            single.add_page(source.pages[index])

            target = os.path.join(output_dir, f"page_{index + 1}.pdf")
            with open(target, "wb") as handle:
                single.write(handle)
            written.append(target)

        # Bundle the page files flat (no directory prefix) into one archive.
        archive_path = os.path.join(output_dir, "split_pages.zip")
        with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as archive:
            for page_file in written:
                archive.write(page_file, os.path.basename(page_file))

        logger.info(f"Split PDF: {page_total} pages → {len(written)} files")

        summary = {
            "total_pages": page_total,
            "extracted_pages": len(written),
            "output_size": os.path.getsize(archive_path),
            "zip_path": archive_path,
        }
        return summary

    except PDFToolsError:
        raise
    except Exception as exc:
        raise PDFToolsError(f"Failed to split PDF: {str(exc)}")
|
||||
|
||||
|
||||
def _parse_page_range(spec: str, total: int) -> list[int]:
|
||||
"""Parse a page specification like '1,3,5-8' into 0-based indices."""
|
||||
indices = set()
|
||||
for part in spec.split(","):
|
||||
part = part.strip()
|
||||
if "-" in part:
|
||||
start_s, end_s = part.split("-", 1)
|
||||
start = max(1, int(start_s.strip()))
|
||||
end = min(total, int(end_s.strip()))
|
||||
indices.update(range(start - 1, end))
|
||||
else:
|
||||
page = int(part)
|
||||
if 1 <= page <= total:
|
||||
indices.add(page - 1)
|
||||
if not indices:
|
||||
raise PDFToolsError("No valid pages specified.")
|
||||
return sorted(indices)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 3. Rotate PDF
|
||||
# ---------------------------------------------------------------------------
|
||||
def rotate_pdf(
    input_path: str,
    output_path: str,
    rotation: int = 90,
    pages: str = "all",
) -> dict:
    """
    Rotate pages in a PDF.

    Every page of the input is written to the output; only the selected
    pages are rotated.

    Args:
        input_path: Path to the input PDF
        output_path: Path for the rotated output PDF
        rotation: Degrees to rotate (90, 180, 270)
        pages: "all" or comma-separated page numbers (1-based). Numbers
            outside the document and blank entries are ignored.

    Returns:
        dict with total_pages, rotated_pages, rotation and output_size (bytes)

    Raises:
        PDFToolsError: If the rotation value or a page entry is invalid,
            or if rotation fails
    """
    if rotation not in (90, 180, 270):
        raise PDFToolsError("Rotation must be 90, 180, or 270 degrees.")

    try:
        from PyPDF2 import PdfReader, PdfWriter

        reader = PdfReader(input_path)
        writer = PdfWriter()
        total_pages = len(reader.pages)

        # Determine which pages to rotate (0-based indices)
        if pages == "all":
            rotate_indices = set(range(total_pages))
        else:
            rotate_indices = set()
            for raw_part in pages.split(","):
                part = raw_part.strip()
                if not part:
                    # Tolerate "1,,3" and trailing commas.
                    continue
                try:
                    page_number = int(part)
                except ValueError as e:
                    raise PDFToolsError(f"Invalid page number: '{part}'") from e
                if 1 <= page_number <= total_pages:
                    rotate_indices.add(page_number - 1)

        rotated_count = 0
        for i, page in enumerate(reader.pages):
            if i in rotate_indices:
                page.rotate(rotation)
                rotated_count += 1
            writer.add_page(page)

        # dirname() is "" for a bare filename, and os.makedirs("") raises.
        output_dir = os.path.dirname(output_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        with open(output_path, "wb") as f:
            writer.write(f)

        logger.info(f"Rotated {rotated_count}/{total_pages} pages by {rotation}°")

        return {
            "total_pages": total_pages,
            "rotated_pages": rotated_count,
            "rotation": rotation,
            "output_size": os.path.getsize(output_path),
        }

    except PDFToolsError:
        raise
    except Exception as e:
        # Chain the cause so the original traceback stays available in logs.
        raise PDFToolsError(f"Failed to rotate PDF: {str(e)}") from e
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 4. Add Page Numbers
|
||||
# ---------------------------------------------------------------------------
|
||||
def add_page_numbers(
    input_path: str,
    output_path: str,
    position: str = "bottom-center",
    start_number: int = 1,
) -> dict:
    """
    Add page numbers to a PDF.

    For each page a one-page overlay of the same size is drawn with
    reportlab and merged onto the original page.

    Args:
        input_path: Path to the input PDF
        output_path: Path for the numbered output PDF
        position: Number position — "bottom-center", "bottom-right", "bottom-left",
                  "top-center", "top-right", "top-left"
        start_number: Number printed on the first page; later pages count up

    Returns:
        dict with total_pages and output_size (bytes)

    Raises:
        PDFToolsError: If numbering fails
    """
    try:
        from PyPDF2 import PdfReader, PdfWriter
        from reportlab.pdfgen import canvas

        reader = PdfReader(input_path)
        writer = PdfWriter()
        total_pages = len(reader.pages)

        for i, page in enumerate(reader.pages):
            page_num = start_number + i
            page_width = float(page.mediabox.width)
            page_height = float(page.mediabox.height)

            # Create an in-memory overlay page carrying only the number
            packet = io.BytesIO()
            c = canvas.Canvas(packet, pagesize=(page_width, page_height))
            c.setFont("Helvetica", 10)

            # The text is centred horizontally on the returned x coordinate
            x, y = _get_number_position(position, page_width, page_height)
            c.drawCentredString(x, y, str(page_num))
            c.save()
            packet.seek(0)

            # Merge overlay onto original page
            overlay = PdfReader(packet)
            page.merge_page(overlay.pages[0])
            writer.add_page(page)

        # dirname() is "" for a bare filename, and os.makedirs("") raises.
        output_dir = os.path.dirname(output_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        with open(output_path, "wb") as f:
            writer.write(f)

        logger.info(f"Added page numbers to {total_pages} pages")

        return {
            "total_pages": total_pages,
            "output_size": os.path.getsize(output_path),
        }

    except PDFToolsError:
        raise
    except Exception as e:
        # Chain the cause so the original traceback stays available in logs.
        raise PDFToolsError(f"Failed to add page numbers: {str(e)}") from e
|
||||
|
||||
|
||||
def _get_number_position(
|
||||
position: str, page_width: float, page_height: float
|
||||
) -> tuple[float, float]:
|
||||
"""Calculate x, y coordinates for the page number text."""
|
||||
margin = 30 # points from edge
|
||||
|
||||
positions = {
|
||||
"bottom-center": (page_width / 2, margin),
|
||||
"bottom-right": (page_width - margin, margin),
|
||||
"bottom-left": (margin, margin),
|
||||
"top-center": (page_width / 2, page_height - margin),
|
||||
"top-right": (page_width - margin, page_height - margin),
|
||||
"top-left": (margin, page_height - margin),
|
||||
}
|
||||
|
||||
return positions.get(position, positions["bottom-center"])
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 5. PDF to Images
|
||||
# ---------------------------------------------------------------------------
|
||||
def pdf_to_images(
    input_path: str,
    output_dir: str,
    output_format: str = "png",
    dpi: int = 200,
) -> dict:
    """
    Render every page of a PDF to an image file and bundle the images in a ZIP.

    Images are saved as ``page_<n>.<format>`` inside ``output_dir`` and then
    collected into ``pdf_images.zip`` in the same directory.

    Args:
        input_path: Path to the input PDF
        output_dir: Directory that receives the images and the ZIP
        output_format: "png" or "jpg" ("jpeg" is treated as "jpg"; any
            other value falls back to "png")
        dpi: Resolution, clamped to 72-600

    Returns:
        dict with page_count, format, dpi, output_size (ZIP bytes) and zip_path

    Raises:
        PDFToolsError: If pdf2image is unavailable or conversion fails
    """
    # Normalise the requested format to one of the two supported names.
    if output_format == "jpeg":
        output_format = "jpg"
    elif output_format not in ("png", "jpg"):
        output_format = "png"

    capped_dpi = min(600, dpi)
    dpi = max(72, capped_dpi)

    try:
        from pdf2image import convert_from_path

        os.makedirs(output_dir, exist_ok=True)

        rendered = convert_from_path(input_path, dpi=dpi)
        saved = []

        for number, picture in enumerate(rendered, start=1):
            target = os.path.join(output_dir, f"page_{number}.{output_format}")

            if output_format != "jpg":
                picture.save(target, "PNG", optimize=True)
            else:
                # JPEG has no alpha channel: flatten onto a white background.
                if picture.mode in ("RGBA", "P", "LA"):
                    backdrop = Image.new("RGB", picture.size, (255, 255, 255))
                    if picture.mode == "P":
                        picture = picture.convert("RGBA")
                    alpha = picture.split()[-1] if "A" in picture.mode else None
                    backdrop.paste(picture, mask=alpha)
                    picture = backdrop
                picture.save(target, "JPEG", quality=90, optimize=True)

            saved.append(target)

        # Bundle the images flat (no directory prefix) into one archive.
        archive_path = os.path.join(output_dir, "pdf_images.zip")
        with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as archive:
            for image_file in saved:
                archive.write(image_file, os.path.basename(image_file))

        logger.info(f"PDF→Images: {len(rendered)} pages → {output_format.upper()} @ {dpi} DPI")

        summary = {
            "page_count": len(rendered),
            "format": output_format,
            "dpi": dpi,
            "output_size": os.path.getsize(archive_path),
            "zip_path": archive_path,
        }
        return summary

    except ImportError:
        raise PDFToolsError(
            "pdf2image is not installed. Install it with: pip install pdf2image"
        )
    except Exception as exc:
        raise PDFToolsError(f"Failed to convert PDF to images: {str(exc)}")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 6. Images to PDF
|
||||
# ---------------------------------------------------------------------------
|
||||
def images_to_pdf(input_paths: list[str], output_path: str) -> dict:
    """
    Combine multiple images into a single PDF, one image per page.

    Images with transparency or a palette are flattened onto a white
    background; everything else is converted to RGB.

    Args:
        input_paths: List of paths to image files (in order)
        output_path: Path for the output PDF

    Returns:
        dict with page_count and output_size (bytes)

    Raises:
        PDFToolsError: If an image is missing, no images are given, or
            conversion fails
    """
    images = []
    try:
        # dirname() is "" for a bare filename, and os.makedirs("") raises.
        output_dir = os.path.dirname(output_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

        for path in input_paths:
            if not os.path.exists(path):
                # Report only the basename so server paths are not leaked.
                raise PDFToolsError(f"Image not found: {os.path.basename(path)}")

            # The context manager closes the source file handle; the RGB
            # page built below is a separate in-memory image.
            with Image.open(path) as src:
                if src.mode in ("RGBA", "P", "LA"):
                    # Flatten transparency onto a white background.
                    layered = src.convert("RGBA") if src.mode == "P" else src
                    page_img = Image.new("RGB", src.size, (255, 255, 255))
                    page_img.paste(layered, mask=layered.split()[-1])
                else:
                    # convert() returns a new image, also for RGB input.
                    page_img = src.convert("RGB")
            images.append(page_img)

        if not images:
            raise PDFToolsError("No valid images provided.")

        # Save all images as a single PDF
        images[0].save(
            output_path,
            "PDF",
            save_all=True,
            append_images=images[1:],
            resolution=150,
        )

        output_size = os.path.getsize(output_path)
        logger.info(f"Images→PDF: {len(input_paths)} images → {output_size} bytes")

        return {
            "page_count": len(input_paths),
            "output_size": output_size,
        }

    except PDFToolsError:
        raise
    except Exception as e:
        # Chain the cause so the original traceback stays available in logs.
        raise PDFToolsError(f"Failed to create PDF from images: {str(e)}") from e
    finally:
        # Release image memory on success and on every error path.
        for img in images:
            img.close()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 7. Watermark PDF
|
||||
# ---------------------------------------------------------------------------
|
||||
def add_watermark(
    input_path: str,
    output_path: str,
    watermark_text: str,
    opacity: float = 0.3,
    font_size: int = 50,
    rotation: int = 45,
) -> dict:
    """
    Add a text watermark to every page of a PDF.

    The text is drawn in grey Helvetica, rotated about the centre of an
    overlay the size of the page's media box, and merged onto the page.

    Args:
        input_path: Path to the input PDF
        output_path: Path for the watermarked output PDF
        watermark_text: Text to use as watermark
        opacity: Watermark opacity (0.0-1.0)
        font_size: Font size for watermark text
        rotation: Rotation angle in degrees

    Returns:
        dict with total_pages and output_size

    Raises:
        PDFToolsError: If watermarking fails
    """
    try:
        from PyPDF2 import PdfReader, PdfWriter
        from reportlab.pdfgen import canvas
        from reportlab.lib.colors import Color

        reader = PdfReader(input_path)
        writer = PdfWriter()
        total_pages = len(reader.pages)

        # One overlay per distinct page size: most documents use a single
        # size, so the overlay is rendered once instead of once per page.
        # The reader (not just its page) is stored so it stays alive.
        overlays = {}

        for page in reader.pages:
            page_width = float(page.mediabox.width)
            page_height = float(page.mediabox.height)
            page_size = (page_width, page_height)

            overlay = overlays.get(page_size)
            if overlay is None:
                # Create watermark overlay
                packet = io.BytesIO()
                c = canvas.Canvas(packet, pagesize=page_size)

                # Set watermark properties
                c.setFont("Helvetica", font_size)
                c.setFillColor(Color(0.5, 0.5, 0.5, alpha=opacity))

                # Draw rotated watermark text at center
                # NOTE(review): centring assumes the media box origin is
                # (0, 0) — confirm for PDFs with an offset media box.
                c.saveState()
                c.translate(page_width / 2, page_height / 2)
                c.rotate(rotation)
                c.drawCentredString(0, 0, watermark_text)
                c.restoreState()

                c.save()
                packet.seek(0)

                overlay = PdfReader(packet)
                overlays[page_size] = overlay

            page.merge_page(overlay.pages[0])
            writer.add_page(page)

        # dirname is "" for a bare filename, and os.makedirs("") raises.
        output_dir = os.path.dirname(output_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        with open(output_path, "wb") as f:
            writer.write(f)

        logger.info(f"Added watermark '{watermark_text}' to {total_pages} pages")

        return {
            "total_pages": total_pages,
            "output_size": os.path.getsize(output_path),
        }

    except PDFToolsError:
        raise
    except Exception as e:
        raise PDFToolsError(f"Failed to add watermark: {str(e)}") from e
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 8. Protect PDF (add password)
|
||||
# ---------------------------------------------------------------------------
|
||||
def protect_pdf(
    input_path: str,
    output_path: str,
    password: str,
) -> dict:
    """
    Add password protection to a PDF.

    Every page of the input is copied into a new document, which is then
    encrypted with the given password and written to output_path.

    Args:
        input_path: Path to the input PDF
        output_path: Path for the protected output PDF
        password: Password to set (must not be empty)

    Returns:
        dict with total_pages and output_size

    Raises:
        PDFToolsError: If the password is empty or protection fails
    """
    try:
        from PyPDF2 import PdfReader, PdfWriter

        # An empty password would produce a file that opens without a
        # prompt, which is not what a "protect" request asks for.
        if not password:
            raise PDFToolsError("Password is required.")

        reader = PdfReader(input_path)
        writer = PdfWriter()
        total_pages = len(reader.pages)

        for page in reader.pages:
            writer.add_page(page)

        writer.encrypt(password)

        # dirname is "" for a bare filename, and os.makedirs("") raises.
        output_dir = os.path.dirname(output_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        with open(output_path, "wb") as f:
            writer.write(f)

        logger.info(f"Protected PDF with password ({total_pages} pages)")

        return {
            "total_pages": total_pages,
            "output_size": os.path.getsize(output_path),
        }

    except PDFToolsError:
        # Already a user-facing message; do not wrap it a second time.
        raise
    except Exception as e:
        raise PDFToolsError(f"Failed to protect PDF: {str(e)}") from e
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 9. Unlock PDF (remove password)
|
||||
# ---------------------------------------------------------------------------
|
||||
def unlock_pdf(
    input_path: str,
    output_path: str,
    password: str,
) -> dict:
    """
    Remove password protection from a PDF.

    The input is decrypted with the given password and its pages are copied
    into a new, unencrypted document written to output_path.

    Args:
        input_path: Path to the input PDF
        output_path: Path for the unlocked output PDF
        password: Current password of the PDF

    Returns:
        dict with total_pages and output_size

    Raises:
        PDFToolsError: If the PDF is not encrypted, the password is
            rejected, or unlock fails
    """
    try:
        from PyPDF2 import PdfReader, PdfWriter

        reader = PdfReader(input_path)

        if not reader.is_encrypted:
            raise PDFToolsError("PDF is not password-protected.")
        # decrypt() returns a falsy value when the password does not match.
        if not reader.decrypt(password):
            raise PDFToolsError("Incorrect password.")

        writer = PdfWriter()
        total_pages = len(reader.pages)

        for page in reader.pages:
            writer.add_page(page)

        # dirname is "" for a bare filename, and os.makedirs("") raises.
        output_dir = os.path.dirname(output_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        with open(output_path, "wb") as f:
            writer.write(f)

        logger.info(f"Unlocked PDF ({total_pages} pages)")

        return {
            "total_pages": total_pages,
            "output_size": os.path.getsize(output_path),
        }

    except PDFToolsError:
        raise
    except Exception as e:
        raise PDFToolsError(f"Failed to unlock PDF: {str(e)}") from e
|
||||
438
backend/app/tasks/pdf_tools_tasks.py
Normal file
438
backend/app/tasks/pdf_tools_tasks.py
Normal file
@@ -0,0 +1,438 @@
|
||||
"""Celery tasks for extended PDF tools (merge, split, rotate, etc.)."""
|
||||
import os
|
||||
import logging
|
||||
|
||||
from app.extensions import celery
|
||||
from app.services.pdf_tools_service import (
|
||||
merge_pdfs,
|
||||
split_pdf,
|
||||
rotate_pdf,
|
||||
add_page_numbers,
|
||||
pdf_to_images,
|
||||
images_to_pdf,
|
||||
add_watermark,
|
||||
protect_pdf,
|
||||
unlock_pdf,
|
||||
PDFToolsError,
|
||||
)
|
||||
from app.services.storage_service import storage
|
||||
from app.utils.sanitizer import cleanup_task_files
|
||||
|
||||
|
||||
def _cleanup(task_id: str):
    """Remove the task's files, keeping outputs unless S3 storage is in use."""
    keep_local_outputs = not storage.use_s3
    cleanup_task_files(task_id, keep_outputs=keep_local_outputs)
|
||||
|
||||
|
||||
# Module-level logger, named after this module, used by every task below.
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Merge PDFs
|
||||
# ---------------------------------------------------------------------------
|
||||
@celery.task(bind=True, name="app.tasks.pdf_tools_tasks.merge_pdfs_task")
def merge_pdfs_task(
    self, input_paths: list[str], task_id: str, original_filenames: list[str]
):
    """Async task: Merge multiple PDFs into one.

    Args:
        input_paths: Paths of the PDFs to merge, forwarded to merge_pdfs.
        task_id: Identifier used for the output directory, upload and cleanup.
        original_filenames: Accepted but not used by this task.

    Returns:
        dict with status "completed", download_url, filename, total_pages,
        files_merged and output_size on success; dict with status "failed"
        and an error message otherwise.
    """
    output_dir = os.path.join("/tmp/outputs", task_id)
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, f"{task_id}_merged.pdf")

    try:
        self.update_state(state="PROCESSING", meta={"step": "Merging PDFs..."})
        stats = merge_pdfs(input_paths, output_path)

        self.update_state(state="PROCESSING", meta={"step": "Uploading result..."})
        s3_key = storage.upload_file(output_path, task_id, folder="outputs")
        download_name = "merged.pdf"
        download_url = storage.generate_presigned_url(s3_key, original_filename=download_name)

        result = {
            "status": "completed",
            "download_url": download_url,
            "filename": download_name,
            "total_pages": stats["total_pages"],
            "files_merged": stats["files_merged"],
            "output_size": stats["output_size"],
        }

        _cleanup(task_id)
        logger.info(f"Task {task_id}: Merge completed — {stats['files_merged']} files, {stats['total_pages']} pages")
        return result

    except PDFToolsError as e:
        logger.error(f"Task {task_id}: Merge error — {e}")
        _cleanup(task_id)
        return {"status": "failed", "error": str(e)}
    except Exception as e:
        # logger.exception logs at ERROR level and appends the traceback,
        # which is the only diagnostic available for an unexpected failure.
        logger.exception(f"Task {task_id}: Unexpected error — {e}")
        _cleanup(task_id)
        return {"status": "failed", "error": "An unexpected error occurred."}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Split PDF
|
||||
# ---------------------------------------------------------------------------
|
||||
@celery.task(bind=True, name="app.tasks.pdf_tools_tasks.split_pdf_task")
def split_pdf_task(
    self, input_path: str, task_id: str, original_filename: str,
    mode: str = "all", pages: str | None = None,
):
    """Async task: Split a PDF and deliver the result as a ZIP archive.

    Args:
        input_path: Path of the PDF to split.
        task_id: Identifier used for the output directory, upload and cleanup.
        original_filename: Source filename; its stem names the download.
        mode: Split mode, forwarded unchanged to split_pdf.
        pages: Page selection, forwarded unchanged to split_pdf.

    Returns:
        dict with status "completed", download_url, filename, total_pages,
        extracted_pages and output_size on success; dict with status
        "failed" and an error message otherwise.
    """
    output_dir = os.path.join("/tmp/outputs", task_id)

    try:
        self.update_state(state="PROCESSING", meta={"step": "Splitting PDF..."})
        stats = split_pdf(input_path, output_dir, mode=mode, pages=pages)

        self.update_state(state="PROCESSING", meta={"step": "Uploading result..."})
        zip_path = stats["zip_path"]
        s3_key = storage.upload_file(zip_path, task_id, folder="outputs")

        name_without_ext = os.path.splitext(original_filename)[0]
        download_name = f"{name_without_ext}_split.zip"
        download_url = storage.generate_presigned_url(s3_key, original_filename=download_name)

        result = {
            "status": "completed",
            "download_url": download_url,
            "filename": download_name,
            "total_pages": stats["total_pages"],
            "extracted_pages": stats["extracted_pages"],
            "output_size": stats["output_size"],
        }

        _cleanup(task_id)
        logger.info(f"Task {task_id}: Split completed — {stats['extracted_pages']} pages extracted")
        return result

    except PDFToolsError as e:
        logger.error(f"Task {task_id}: Split error — {e}")
        _cleanup(task_id)
        return {"status": "failed", "error": str(e)}
    except Exception as e:
        # logger.exception logs at ERROR level and appends the traceback,
        # which is the only diagnostic available for an unexpected failure.
        logger.exception(f"Task {task_id}: Unexpected error — {e}")
        _cleanup(task_id)
        return {"status": "failed", "error": "An unexpected error occurred."}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Rotate PDF
|
||||
# ---------------------------------------------------------------------------
|
||||
@celery.task(bind=True, name="app.tasks.pdf_tools_tasks.rotate_pdf_task")
def rotate_pdf_task(
    self, input_path: str, task_id: str, original_filename: str,
    rotation: int = 90, pages: str = "all",
):
    """Async task: Rotate pages in a PDF.

    Args:
        input_path: Path of the PDF to rotate.
        task_id: Identifier used for the output directory, upload and cleanup.
        original_filename: Source filename; its stem names the download.
        rotation: Rotation angle, forwarded unchanged to rotate_pdf.
        pages: Page selection, forwarded unchanged to rotate_pdf.

    Returns:
        dict with status "completed", download_url, filename, total_pages,
        rotated_pages, rotation and output_size on success; dict with
        status "failed" and an error message otherwise.
    """
    output_dir = os.path.join("/tmp/outputs", task_id)
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, f"{task_id}_rotated.pdf")

    try:
        self.update_state(state="PROCESSING", meta={"step": f"Rotating PDF by {rotation}°..."})
        stats = rotate_pdf(input_path, output_path, rotation=rotation, pages=pages)

        self.update_state(state="PROCESSING", meta={"step": "Uploading result..."})
        s3_key = storage.upload_file(output_path, task_id, folder="outputs")

        name_without_ext = os.path.splitext(original_filename)[0]
        download_name = f"{name_without_ext}_rotated.pdf"
        download_url = storage.generate_presigned_url(s3_key, original_filename=download_name)

        result = {
            "status": "completed",
            "download_url": download_url,
            "filename": download_name,
            "total_pages": stats["total_pages"],
            "rotated_pages": stats["rotated_pages"],
            "rotation": stats["rotation"],
            "output_size": stats["output_size"],
        }

        _cleanup(task_id)
        logger.info(f"Task {task_id}: Rotate completed — {stats['rotated_pages']} pages")
        return result

    except PDFToolsError as e:
        logger.error(f"Task {task_id}: Rotate error — {e}")
        _cleanup(task_id)
        return {"status": "failed", "error": str(e)}
    except Exception as e:
        # logger.exception logs at ERROR level and appends the traceback,
        # which is the only diagnostic available for an unexpected failure.
        logger.exception(f"Task {task_id}: Unexpected error — {e}")
        _cleanup(task_id)
        return {"status": "failed", "error": "An unexpected error occurred."}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Add Page Numbers
|
||||
# ---------------------------------------------------------------------------
|
||||
@celery.task(bind=True, name="app.tasks.pdf_tools_tasks.add_page_numbers_task")
def add_page_numbers_task(
    self, input_path: str, task_id: str, original_filename: str,
    position: str = "bottom-center", start_number: int = 1,
):
    """Async task: Add page numbers to a PDF.

    Args:
        input_path: Path of the PDF to number.
        task_id: Identifier used for the output directory, upload and cleanup.
        original_filename: Source filename; its stem names the download.
        position: Number placement, forwarded unchanged to add_page_numbers.
        start_number: First number, forwarded unchanged to add_page_numbers.

    Returns:
        dict with status "completed", download_url, filename, total_pages
        and output_size on success; dict with status "failed" and an error
        message otherwise.
    """
    output_dir = os.path.join("/tmp/outputs", task_id)
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, f"{task_id}_numbered.pdf")

    try:
        self.update_state(state="PROCESSING", meta={"step": "Adding page numbers..."})
        stats = add_page_numbers(input_path, output_path, position=position, start_number=start_number)

        self.update_state(state="PROCESSING", meta={"step": "Uploading result..."})
        s3_key = storage.upload_file(output_path, task_id, folder="outputs")

        name_without_ext = os.path.splitext(original_filename)[0]
        download_name = f"{name_without_ext}_numbered.pdf"
        download_url = storage.generate_presigned_url(s3_key, original_filename=download_name)

        result = {
            "status": "completed",
            "download_url": download_url,
            "filename": download_name,
            "total_pages": stats["total_pages"],
            "output_size": stats["output_size"],
        }

        _cleanup(task_id)
        logger.info(f"Task {task_id}: Page numbers added to {stats['total_pages']} pages")
        return result

    except PDFToolsError as e:
        logger.error(f"Task {task_id}: Page numbers error — {e}")
        _cleanup(task_id)
        return {"status": "failed", "error": str(e)}
    except Exception as e:
        # logger.exception logs at ERROR level and appends the traceback,
        # which is the only diagnostic available for an unexpected failure.
        logger.exception(f"Task {task_id}: Unexpected error — {e}")
        _cleanup(task_id)
        return {"status": "failed", "error": "An unexpected error occurred."}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# PDF to Images
|
||||
# ---------------------------------------------------------------------------
|
||||
@celery.task(bind=True, name="app.tasks.pdf_tools_tasks.pdf_to_images_task")
def pdf_to_images_task(
    self, input_path: str, task_id: str, original_filename: str,
    output_format: str = "png", dpi: int = 200,
):
    """Async task: Convert PDF pages to images, delivered as a ZIP archive.

    Args:
        input_path: Path of the PDF to convert.
        task_id: Identifier used for the output directory, upload and cleanup.
        original_filename: Source filename; its stem names the download.
        output_format: Image format, forwarded unchanged to pdf_to_images.
        dpi: Render resolution, forwarded unchanged to pdf_to_images.

    Returns:
        dict with status "completed", download_url, filename, page_count,
        format, dpi and output_size on success; dict with status "failed"
        and an error message otherwise.
    """
    output_dir = os.path.join("/tmp/outputs", task_id)

    try:
        self.update_state(state="PROCESSING", meta={"step": "Converting PDF to images..."})
        stats = pdf_to_images(input_path, output_dir, output_format=output_format, dpi=dpi)

        self.update_state(state="PROCESSING", meta={"step": "Uploading result..."})
        zip_path = stats["zip_path"]
        s3_key = storage.upload_file(zip_path, task_id, folder="outputs")

        name_without_ext = os.path.splitext(original_filename)[0]
        download_name = f"{name_without_ext}_images.zip"
        download_url = storage.generate_presigned_url(s3_key, original_filename=download_name)

        result = {
            "status": "completed",
            "download_url": download_url,
            "filename": download_name,
            "page_count": stats["page_count"],
            "format": stats["format"],
            "dpi": stats["dpi"],
            "output_size": stats["output_size"],
        }

        _cleanup(task_id)
        logger.info(f"Task {task_id}: PDF→Images completed — {stats['page_count']} pages")
        return result

    except PDFToolsError as e:
        logger.error(f"Task {task_id}: PDF→Images error — {e}")
        _cleanup(task_id)
        return {"status": "failed", "error": str(e)}
    except Exception as e:
        # logger.exception logs at ERROR level and appends the traceback,
        # which is the only diagnostic available for an unexpected failure.
        logger.exception(f"Task {task_id}: Unexpected error — {e}")
        _cleanup(task_id)
        return {"status": "failed", "error": "An unexpected error occurred."}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Images to PDF
|
||||
# ---------------------------------------------------------------------------
|
||||
@celery.task(bind=True, name="app.tasks.pdf_tools_tasks.images_to_pdf_task")
def images_to_pdf_task(
    self, input_paths: list[str], task_id: str, original_filenames: list[str]
):
    """Async task: Combine images into a PDF.

    Args:
        input_paths: Paths of the images, forwarded to images_to_pdf.
        task_id: Identifier used for the output directory, upload and cleanup.
        original_filenames: Accepted but not used by this task.

    Returns:
        dict with status "completed", download_url, filename, page_count
        and output_size on success; dict with status "failed" and an error
        message otherwise.
    """
    output_dir = os.path.join("/tmp/outputs", task_id)
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, f"{task_id}_images.pdf")

    try:
        self.update_state(state="PROCESSING", meta={"step": "Creating PDF from images..."})
        stats = images_to_pdf(input_paths, output_path)

        self.update_state(state="PROCESSING", meta={"step": "Uploading result..."})
        s3_key = storage.upload_file(output_path, task_id, folder="outputs")
        download_name = "images_combined.pdf"
        download_url = storage.generate_presigned_url(s3_key, original_filename=download_name)

        result = {
            "status": "completed",
            "download_url": download_url,
            "filename": download_name,
            "page_count": stats["page_count"],
            "output_size": stats["output_size"],
        }

        _cleanup(task_id)
        logger.info(f"Task {task_id}: Images→PDF completed — {stats['page_count']} pages")
        return result

    except PDFToolsError as e:
        logger.error(f"Task {task_id}: Images→PDF error — {e}")
        _cleanup(task_id)
        return {"status": "failed", "error": str(e)}
    except Exception as e:
        # logger.exception logs at ERROR level and appends the traceback,
        # which is the only diagnostic available for an unexpected failure.
        logger.exception(f"Task {task_id}: Unexpected error — {e}")
        _cleanup(task_id)
        return {"status": "failed", "error": "An unexpected error occurred."}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Watermark PDF
|
||||
# ---------------------------------------------------------------------------
|
||||
@celery.task(bind=True, name="app.tasks.pdf_tools_tasks.watermark_pdf_task")
def watermark_pdf_task(
    self, input_path: str, task_id: str, original_filename: str,
    watermark_text: str, opacity: float = 0.3,
):
    """Async task: Add watermark to a PDF.

    Args:
        input_path: Path of the PDF to watermark.
        task_id: Identifier used for the output directory, upload and cleanup.
        original_filename: Source filename; its stem names the download.
        watermark_text: Watermark text, forwarded unchanged to add_watermark.
        opacity: Watermark opacity, forwarded unchanged to add_watermark.

    Returns:
        dict with status "completed", download_url, filename, total_pages
        and output_size on success; dict with status "failed" and an error
        message otherwise.
    """
    output_dir = os.path.join("/tmp/outputs", task_id)
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, f"{task_id}_watermarked.pdf")

    try:
        self.update_state(state="PROCESSING", meta={"step": "Adding watermark..."})
        stats = add_watermark(input_path, output_path, watermark_text=watermark_text, opacity=opacity)

        self.update_state(state="PROCESSING", meta={"step": "Uploading result..."})
        s3_key = storage.upload_file(output_path, task_id, folder="outputs")

        name_without_ext = os.path.splitext(original_filename)[0]
        download_name = f"{name_without_ext}_watermarked.pdf"
        download_url = storage.generate_presigned_url(s3_key, original_filename=download_name)

        result = {
            "status": "completed",
            "download_url": download_url,
            "filename": download_name,
            "total_pages": stats["total_pages"],
            "output_size": stats["output_size"],
        }

        _cleanup(task_id)
        logger.info(f"Task {task_id}: Watermark added")
        return result

    except PDFToolsError as e:
        logger.error(f"Task {task_id}: Watermark error — {e}")
        _cleanup(task_id)
        return {"status": "failed", "error": str(e)}
    except Exception as e:
        # logger.exception logs at ERROR level and appends the traceback,
        # which is the only diagnostic available for an unexpected failure.
        logger.exception(f"Task {task_id}: Unexpected error — {e}")
        _cleanup(task_id)
        return {"status": "failed", "error": "An unexpected error occurred."}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Protect PDF
|
||||
# ---------------------------------------------------------------------------
|
||||
@celery.task(bind=True, name="app.tasks.pdf_tools_tasks.protect_pdf_task")
def protect_pdf_task(
    self, input_path: str, task_id: str, original_filename: str,
    password: str,
):
    """Async task: Add password protection to a PDF.

    Args:
        input_path: Path of the PDF to protect.
        task_id: Identifier used for the output directory, upload and cleanup.
        original_filename: Source filename; its stem names the download.
        password: Password to set, forwarded unchanged to protect_pdf.

    Returns:
        dict with status "completed", download_url, filename, total_pages
        and output_size on success; dict with status "failed" and an error
        message otherwise.
    """
    # NOTE(review): the password arrives as a plain task argument, so it is
    # presumably serialized into the broker message — confirm the broker and
    # result backend are trusted and not logged.
    output_dir = os.path.join("/tmp/outputs", task_id)
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, f"{task_id}_protected.pdf")

    try:
        self.update_state(state="PROCESSING", meta={"step": "Protecting PDF..."})
        stats = protect_pdf(input_path, output_path, password=password)

        self.update_state(state="PROCESSING", meta={"step": "Uploading result..."})
        s3_key = storage.upload_file(output_path, task_id, folder="outputs")

        name_without_ext = os.path.splitext(original_filename)[0]
        download_name = f"{name_without_ext}_protected.pdf"
        download_url = storage.generate_presigned_url(s3_key, original_filename=download_name)

        result = {
            "status": "completed",
            "download_url": download_url,
            "filename": download_name,
            "total_pages": stats["total_pages"],
            "output_size": stats["output_size"],
        }

        _cleanup(task_id)
        logger.info(f"Task {task_id}: PDF protected")
        return result

    except PDFToolsError as e:
        logger.error(f"Task {task_id}: Protect error — {e}")
        _cleanup(task_id)
        return {"status": "failed", "error": str(e)}
    except Exception as e:
        # logger.exception logs at ERROR level and appends the traceback,
        # which is the only diagnostic available for an unexpected failure.
        logger.exception(f"Task {task_id}: Unexpected error — {e}")
        _cleanup(task_id)
        return {"status": "failed", "error": "An unexpected error occurred."}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Unlock PDF
|
||||
# ---------------------------------------------------------------------------
|
||||
@celery.task(bind=True, name="app.tasks.pdf_tools_tasks.unlock_pdf_task")
def unlock_pdf_task(
    self, input_path: str, task_id: str, original_filename: str,
    password: str,
):
    """Async task: Remove password from a PDF.

    Args:
        input_path: Path of the encrypted PDF.
        task_id: Identifier used for the output directory, upload and cleanup.
        original_filename: Source filename; its stem names the download.
        password: Current password, forwarded unchanged to unlock_pdf.

    Returns:
        dict with status "completed", download_url, filename, total_pages
        and output_size on success; dict with status "failed" and an error
        message otherwise.
    """
    # NOTE(review): the password arrives as a plain task argument, so it is
    # presumably serialized into the broker message — confirm the broker and
    # result backend are trusted and not logged.
    output_dir = os.path.join("/tmp/outputs", task_id)
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, f"{task_id}_unlocked.pdf")

    try:
        self.update_state(state="PROCESSING", meta={"step": "Unlocking PDF..."})
        stats = unlock_pdf(input_path, output_path, password=password)

        self.update_state(state="PROCESSING", meta={"step": "Uploading result..."})
        s3_key = storage.upload_file(output_path, task_id, folder="outputs")

        name_without_ext = os.path.splitext(original_filename)[0]
        download_name = f"{name_without_ext}_unlocked.pdf"
        download_url = storage.generate_presigned_url(s3_key, original_filename=download_name)

        result = {
            "status": "completed",
            "download_url": download_url,
            "filename": download_name,
            "total_pages": stats["total_pages"],
            "output_size": stats["output_size"],
        }

        _cleanup(task_id)
        logger.info(f"Task {task_id}: PDF unlocked")
        return result

    except PDFToolsError as e:
        logger.error(f"Task {task_id}: Unlock error — {e}")
        _cleanup(task_id)
        return {"status": "failed", "error": str(e)}
    except Exception as e:
        # logger.exception logs at ERROR level and appends the traceback,
        # which is the only diagnostic available for an unexpected failure.
        logger.exception(f"Task {task_id}: Unexpected error — {e}")
        _cleanup(task_id)
        return {"status": "failed", "error": "An unexpected error occurred."}
|
||||
Reference in New Issue
Block a user