feat: Enhance PDF tools with new reorder and watermark removal functionalities

- Added tests for rotating PDFs, removing watermarks, and reordering pages in the backend.
- Implemented frontend logic to read page counts from uploaded PDFs and validate page orders.
- Updated internationalization files to include new strings for reorder and watermark removal features.
- Improved user feedback during page count reading and validation in the Reorder PDF component.
- Ensured that the reorder functionality requires a complete permutation of pages.
This commit is contained in:
Your Name
2026-03-11 14:21:25 +02:00
parent e4e3b9fe2d
commit e06e64f85f
8 changed files with 641 additions and 69 deletions

View File

@@ -2,6 +2,8 @@
import os
import io
import logging
import math
import re
import subprocess
import tempfile
import zipfile
@@ -10,6 +12,42 @@ from PIL import Image
logger = logging.getLogger(__name__)
_TRAILING_TEXT_WATERMARK_RE = re.compile(
rb"q\s*"
rb"0 0 [-+]?\d*\.?\d+ [-+]?\d*\.?\d+ re\s*"
rb"W\s*n\s*"
rb"1 0 0 1 0 0 cm\s*"
rb"BT\s*/[^\s]+\s+[-+]?\d*\.?\d+\s+Tf\s+[-+]?\d*\.?\d+\s+TL\s+ET\s*"
rb"BT\s*/[^\s]+\s+[-+]?\d*\.?\d+\s+Tf\s+[-+]?\d*\.?\d+\s+TL\s+ET\s*"
rb"[-+]?\d*\.?\d+\s+[-+]?\d*\.?\d+\s+[-+]?\d*\.?\d+\s+rg\s*"
rb"/[^\s]+\s+gs\s*"
rb"q\s*[-+]?\d*\.?\d+\s+[-+]?\d*\.?\d+\s+[-+]?\d*\.?\d+\s+[-+]?\d*\.?\d+\s+[-+]?\d*\.?\d+\s+[-+]?\d*\.?\d+\s+cm\s*"
rb"BT\s+1 0 0 1\s+[-+]?\d*\.?\d+\s+[-+]?\d*\.?\d+\s+Tm\s*"
rb".*?"
rb"ET\s*Q\s*Q\s*\Z",
re.DOTALL,
)
_TRAILING_IMAGE_WATERMARK_RE = re.compile(
rb"q\s*"
rb"(?:"
rb"0 0 [-+]?\d*\.?\d+ [-+]?\d*\.?\d+ re\s*"
rb"W\s*n\s*"
rb"1 0 0 1 0 0 cm\s*"
rb"(?:BT\s*/[^\s]+\s+[-+]?\d*\.?\d+\s+Tf\s+[-+]?\d*\.?\d+\s+TL\s+ET\s*)?"
rb")?"
rb"(?:q\s*)?"
rb"(?:/[^\s]+\s+gs\s*)?"
rb"(?P<a>[-+]?\d*\.?\d+)\s+"
rb"(?P<b>[-+]?\d*\.?\d+)\s+"
rb"(?P<c>[-+]?\d*\.?\d+)\s+"
rb"(?P<d>[-+]?\d*\.?\d+)\s+"
rb"(?P<e>[-+]?\d*\.?\d+)\s+"
rb"(?P<f>[-+]?\d*\.?\d+)\s+cm\s*"
rb"/(?P<name>[^\s]+)\s+Do\s*Q(?:\s*Q)?\s*\Z",
re.DOTALL,
)
class PDFToolsError(Exception):
"""Custom exception for PDF tools failures."""
@@ -250,12 +288,7 @@ def rotate_pdf(
if pages == "all":
rotate_indices = set(range(total_pages))
else:
rotate_indices = set()
for part in pages.split(","):
part = part.strip()
page = int(part)
if 1 <= page <= total_pages:
rotate_indices.add(page - 1)
rotate_indices = set(_parse_page_range(pages, total_pages))
rotated_count = 0
for i, page in enumerate(reader.pages):
@@ -715,8 +748,7 @@ def remove_watermark(
output_path: str,
) -> dict:
"""
Attempt to remove text-based watermarks from a PDF by rebuilding pages
without the largest semi-transparent text overlay.
Attempt to remove supported trailing watermark overlays from a PDF.
Args:
input_path: Path to the input PDF
@@ -730,30 +762,51 @@ def remove_watermark(
"""
try:
from PyPDF2 import PdfReader, PdfWriter
import re
from PyPDF2.generic import DecodedStreamObject, NameObject
reader = PdfReader(input_path)
writer = PdfWriter()
total_pages = len(reader.pages)
cleaned_pages = 0
removed_watermarks = 0
for page in reader.pages:
# Extract page content and attempt to remove watermark-like artifacts
# by rebuilding without operations that set very low opacity text
contents = page.get("/Contents")
contents = page.get_contents()
if contents is not None:
# Simple approach: copy page as-is (full removal requires
# content-stream parsing which varies by generator).
pass
cleaned_stream, removed_count = _strip_known_watermarks(
contents.get_data(),
float(page.mediabox.width),
float(page.mediabox.height),
)
if removed_count > 0:
replacement_stream = DecodedStreamObject()
replacement_stream.set_data(cleaned_stream)
page[NameObject("/Contents")] = replacement_stream
cleaned_pages += 1
removed_watermarks += removed_count
writer.add_page(page)
if removed_watermarks == 0:
raise PDFToolsError(
"No removable watermark overlay was detected. "
"Flattened or embedded page-content watermarks are not currently supported."
)
os.makedirs(os.path.dirname(output_path), exist_ok=True)
with open(output_path, "wb") as f:
writer.write(f)
logger.info(f"Remove watermark processed {total_pages} pages")
logger.info(
"Remove watermark cleaned %s watermark block(s) across %s/%s page(s)",
removed_watermarks,
cleaned_pages,
total_pages,
)
return {
"total_pages": total_pages,
"cleaned_pages": cleaned_pages,
"output_size": os.path.getsize(output_path),
}
@@ -763,6 +816,109 @@ def remove_watermark(
raise PDFToolsError(f"Failed to remove watermark: {str(e)}")
def _strip_known_watermarks(
stream_data: bytes,
page_width: float,
page_height: float,
) -> tuple[bytes, int]:
"""Remove supported trailing text or image watermark overlays from a page stream."""
cleaned_stream = stream_data
removed_count = 0
while True:
cleaned_stream, removed_text_count = _strip_known_text_watermarks(cleaned_stream)
cleaned_stream, removed_image_count = _strip_known_image_watermarks(
cleaned_stream,
page_width,
page_height,
)
if removed_text_count == 0 and removed_image_count == 0:
break
removed_count += removed_text_count + removed_image_count
return cleaned_stream, removed_count
def _strip_known_text_watermarks(stream_data: bytes) -> tuple[bytes, int]:
"""Remove trailing text watermark overlays generated by common PDF watermark flows."""
cleaned_stream = stream_data
removed_count = 0
while True:
updated_stream, replacements = _TRAILING_TEXT_WATERMARK_RE.subn(
b"", cleaned_stream, count=1
)
if replacements == 0:
break
cleaned_stream = updated_stream.rstrip(b"\r\n")
removed_count += replacements
return cleaned_stream, removed_count
def _strip_known_image_watermarks(
stream_data: bytes,
page_width: float,
page_height: float,
) -> tuple[bytes, int]:
"""Remove trailing image XObject watermark overlays when they match the supported pattern."""
cleaned_stream = stream_data
removed_count = 0
while True:
match = _TRAILING_IMAGE_WATERMARK_RE.search(cleaned_stream)
if match is None:
break
if not _is_probable_image_watermark(match, page_width, page_height):
break
cleaned_stream = cleaned_stream[:match.start()].rstrip(b"\r\n")
removed_count += 1
return cleaned_stream, removed_count
def _is_probable_image_watermark(
match: re.Match[bytes],
page_width: float,
page_height: float,
) -> bool:
"""Heuristic guardrail so only overlay-style trailing image blocks are stripped."""
name = match.group("name").lower()
if not name.startswith((b"formxob", b"im", b"img", b"image")):
return False
a = float(match.group("a"))
b = float(match.group("b"))
c = float(match.group("c"))
d = float(match.group("d"))
e = float(match.group("e"))
f = float(match.group("f"))
width = math.hypot(a, b)
height = math.hypot(c, d)
if width < 24 or height < 24:
return False
page_area = page_width * page_height
overlay_area = width * height
if page_area <= 0 or overlay_area <= 0:
return False
coverage_ratio = overlay_area / page_area
if coverage_ratio > 0.95:
return False
if e == 0 and f == 0 and coverage_ratio > 0.6:
return False
return True
# ---------------------------------------------------------------------------
# 11. Reorder PDF Pages
# ---------------------------------------------------------------------------
@@ -792,15 +948,7 @@ def reorder_pdf_pages(
writer = PdfWriter()
total_pages = len(reader.pages)
if not page_order:
raise PDFToolsError("No page order specified.")
# Validate all page numbers
for p in page_order:
if p < 1 or p > total_pages:
raise PDFToolsError(
f"Page {p} is out of range. PDF has {total_pages} pages."
)
_validate_full_page_permutation(page_order, total_pages)
# Build new PDF in the requested order
for p in page_order:
@@ -824,6 +972,43 @@ def reorder_pdf_pages(
raise PDFToolsError(f"Failed to reorder PDF pages: {str(e)}")
def _validate_full_page_permutation(page_order: list[int], total_pages: int) -> None:
"""Require reorder requests to provide every page exactly once."""
if not page_order:
raise PDFToolsError("No page order specified.")
out_of_range = sorted({page for page in page_order if page < 1 or page > total_pages})
if out_of_range:
pages = ", ".join(str(page) for page in out_of_range)
raise PDFToolsError(
f"Page order contains out-of-range pages: {pages}. This PDF has {total_pages} pages."
)
duplicates: list[int] = []
seen: set[int] = set()
for page in page_order:
if page in seen and page not in duplicates:
duplicates.append(page)
seen.add(page)
missing = [page for page in range(1, total_pages + 1) if page not in seen]
if duplicates or missing or len(page_order) != total_pages:
details = ["Provide every page exactly once in the new order."]
if duplicates:
details.append(
f"Duplicate pages: {', '.join(str(page) for page in duplicates)}."
)
if missing:
details.append(
f"Missing pages: {', '.join(str(page) for page in missing)}."
)
raise PDFToolsError(
"Invalid page order. "
+ " ".join(details)
+ f" This PDF has {total_pages} pages."
)
# ---------------------------------------------------------------------------
# 12. Extract Pages (explicit extraction to new PDF)
# ---------------------------------------------------------------------------

View File

@@ -0,0 +1,8 @@
....................................................... [100%]
============================== warnings summary ===============================
backend/tests/test_pdf_tools_service.py::TestMergePdfsService::test_merge_file_not_found_raises
C:\Users\ahmed\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.13_qbz5n2kfra8p0\LocalCache\local-packages\Python313\site-packages\PyPDF2\__init__.py:21: DeprecationWarning: PyPDF2 is deprecated. Please move to the pypdf library instead.
warnings.warn(
-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
55 passed, 1 warning in 31.81s

View File

@@ -4,9 +4,13 @@ import pytest
from unittest.mock import patch, MagicMock
from app.services.pdf_tools_service import (
add_watermark,
merge_pdfs,
split_pdf,
PDFToolsError,
remove_watermark,
reorder_pdf_pages,
rotate_pdf,
split_pdf,
)
@@ -71,6 +75,122 @@ class TestSplitPdfService:
except ImportError:
pytest.skip("PyPDF2 not installed")
class TestRotatePdfService:
def test_rotate_range_invalid_format_returns_clear_message(self, app, tmp_path):
"""Should raise a clear error for malformed page specs instead of failing generically."""
with app.app_context():
try:
from PyPDF2 import PdfWriter
input_path = str(tmp_path / 'rotate-source.pdf')
output_path = str(tmp_path / 'rotate-output.pdf')
writer = PdfWriter()
writer.add_blank_page(width=612, height=792)
writer.add_blank_page(width=612, height=792)
with open(input_path, 'wb') as f:
writer.write(f)
with pytest.raises(PDFToolsError, match='Invalid page format'):
rotate_pdf(input_path, output_path, rotation=90, pages='1-two')
except ImportError:
pytest.skip("PyPDF2 not installed")
class TestRemoveWatermarkService:
def test_remove_text_watermark_from_reportlab_overlay(self, app, tmp_path):
"""Should remove text watermarks generated by the platform watermark flow."""
with app.app_context():
try:
from reportlab.pdfgen import canvas
from PyPDF2 import PdfReader
input_path = str(tmp_path / 'source.pdf')
watermarked_path = str(tmp_path / 'watermarked.pdf')
output_path = str(tmp_path / 'cleaned.pdf')
c = canvas.Canvas(input_path)
c.drawString(100, 700, 'Hello world')
c.save()
add_watermark(input_path, watermarked_path, 'CONFIDENTIAL')
result = remove_watermark(watermarked_path, output_path)
extracted_text = PdfReader(output_path).pages[0].extract_text() or ''
assert result['total_pages'] == 1
assert result['cleaned_pages'] == 1
assert result['output_size'] > 0
assert os.path.exists(output_path)
assert 'Hello world' in extracted_text
assert 'CONFIDENTIAL' not in extracted_text
except ImportError:
pytest.skip("PyPDF2/reportlab not installed")
def test_remove_image_watermark_overlay_from_trailing_xobject(self, app, tmp_path):
"""Should remove supported trailing image watermark overlays while preserving page text."""
with app.app_context():
try:
from PIL import Image
from reportlab.pdfgen import canvas
from PyPDF2 import PdfReader, PdfWriter
input_path = str(tmp_path / 'source.pdf')
overlay_path = str(tmp_path / 'overlay.pdf')
watermarked_path = str(tmp_path / 'image-watermarked.pdf')
output_path = str(tmp_path / 'image-cleaned.pdf')
watermark_image_path = str(tmp_path / 'watermark.png')
c = canvas.Canvas(input_path)
c.drawString(100, 700, 'Hello world')
c.save()
Image.new('RGBA', (200, 80), (220, 38, 38, 96)).save(watermark_image_path)
c = canvas.Canvas(overlay_path)
c.drawImage(watermark_image_path, 180, 360, width=240, height=96, mask='auto')
c.save()
base_page = PdfReader(input_path).pages[0]
overlay_page = PdfReader(overlay_path).pages[0]
base_page.merge_page(overlay_page)
writer = PdfWriter()
writer.add_page(base_page)
with open(watermarked_path, 'wb') as f:
writer.write(f)
result = remove_watermark(watermarked_path, output_path)
cleaned_page = PdfReader(output_path).pages[0]
extracted_text = cleaned_page.extract_text() or ''
cleaned_stream = cleaned_page.get_contents().get_data()
assert result['total_pages'] == 1
assert result['cleaned_pages'] == 1
assert 'Hello world' in extracted_text
assert b'/FormXob' not in cleaned_stream
except ImportError:
pytest.skip('PyPDF2/reportlab/Pillow not installed')
def test_remove_watermark_raises_when_no_supported_pattern_found(self, app, tmp_path):
"""Should fail clearly instead of returning an unchanged PDF as success."""
with app.app_context():
try:
from reportlab.pdfgen import canvas
input_path = str(tmp_path / 'plain.pdf')
output_path = str(tmp_path / 'plain_cleaned.pdf')
c = canvas.Canvas(input_path)
c.drawString(72, 720, 'Plain PDF without watermark')
c.save()
with pytest.raises(PDFToolsError, match='No removable watermark overlay'):
remove_watermark(input_path, output_path)
except ImportError:
pytest.skip("reportlab not installed")
def test_split_range_out_of_bounds_includes_total_pages(self, app, tmp_path):
"""Should raise a clear error when requested pages exceed document page count."""
with app.app_context():
@@ -108,4 +228,52 @@ class TestSplitPdfService:
with pytest.raises(PDFToolsError, match='Invalid page format'):
split_pdf(input_path, output_dir, mode='range', pages='1-2-3')
except ImportError:
pytest.skip("PyPDF2 not installed")
pytest.skip("PyPDF2 not installed")
class TestReorderPdfService:
def test_reorder_requires_full_page_permutation(self, app, tmp_path):
"""Should reject duplicates or omissions instead of silently dropping pages."""
with app.app_context():
try:
from PyPDF2 import PdfWriter
input_path = str(tmp_path / 'reorder-source.pdf')
output_path = str(tmp_path / 'reorder-output.pdf')
writer = PdfWriter()
for _ in range(3):
writer.add_blank_page(width=612, height=792)
with open(input_path, 'wb') as f:
writer.write(f)
with pytest.raises(PDFToolsError, match='Provide every page exactly once'):
reorder_pdf_pages(input_path, output_path, [3, 1, 1])
except ImportError:
pytest.skip('PyPDF2 not installed')
def test_reorder_accepts_full_page_permutation(self, app, tmp_path):
"""Should reorder when every page is present exactly once."""
with app.app_context():
try:
from reportlab.pdfgen import canvas
from PyPDF2 import PdfReader
input_path = str(tmp_path / 'reorder-valid-source.pdf')
output_path = str(tmp_path / 'reorder-valid-output.pdf')
c = canvas.Canvas(input_path)
for page_number in range(1, 4):
c.drawString(100, 700, f'Page {page_number}')
c.showPage()
c.save()
result = reorder_pdf_pages(input_path, output_path, [3, 1, 2])
reader = PdfReader(output_path)
assert result['reordered_pages'] == 3
assert 'Page 3' in (reader.pages[0].extract_text() or '')
assert 'Page 1' in (reader.pages[1].extract_text() or '')
assert 'Page 2' in (reader.pages[2].extract_text() or '')
except ImportError:
pytest.skip('PyPDF2/reportlab not installed')

View File

@@ -85,6 +85,54 @@ class TestPdfToolsTaskRoutes:
assert args[3] == 'CONFIDENTIAL'
assert args[4] == 0.3
def test_remove_watermark_dispatches_task(self, client, monkeypatch):
"""Remove watermark route should dispatch the correct Celery task."""
mock_task = MagicMock()
mock_task.id = 'remove-wm-id'
mock_delay = MagicMock(return_value=mock_task)
monkeypatch.setattr('app.routes.pdf_tools.validate_actor_file',
lambda f, allowed_types, actor: ('test.pdf', 'pdf'))
monkeypatch.setattr('app.routes.pdf_tools.generate_safe_path',
lambda ext, folder_type: ('remove-wm-id', '/tmp/test.pdf'))
monkeypatch.setattr('app.routes.pdf_tools.remove_watermark_task.delay', mock_delay)
data = {
'file': (io.BytesIO(b'%PDF-1.4'), 'test.pdf'),
}
response = client.post('/api/pdf-tools/remove-watermark', data=data,
content_type='multipart/form-data')
assert response.status_code == 202
args = mock_delay.call_args[0]
assert args[0] == '/tmp/test.pdf'
assert args[1] == 'remove-wm-id'
assert args[2] == 'test.pdf'
def test_reorder_dispatches_task(self, client, monkeypatch):
"""Reorder route should dispatch with the parsed page order list."""
mock_task = MagicMock()
mock_task.id = 'reorder-id'
mock_delay = MagicMock(return_value=mock_task)
monkeypatch.setattr('app.routes.pdf_tools.validate_actor_file',
lambda f, allowed_types, actor: ('test.pdf', 'pdf'))
monkeypatch.setattr('app.routes.pdf_tools.generate_safe_path',
lambda ext, folder_type: ('reorder-id', '/tmp/test.pdf'))
monkeypatch.setattr('app.routes.pdf_tools.reorder_pdf_task.delay', mock_delay)
data = {
'file': (io.BytesIO(b'%PDF-1.4'), 'test.pdf'),
'page_order': '3,1,2',
}
response = client.post('/api/pdf-tools/reorder', data=data,
content_type='multipart/form-data')
assert response.status_code == 202
args = mock_delay.call_args[0]
assert args[0] == '/tmp/test.pdf'
assert args[1] == 'reorder-id'
assert args[2] == 'test.pdf'
assert args[3] == [3, 1, 2]
def test_protect_dispatches_task(self, client, monkeypatch):
"""Protect route should dispatch with password."""
mock_task = MagicMock()