ميزة: إضافة مكوني ProcedureSelection و StepProgress لأداة مخططات التدفق بصيغة PDF

- تنفيذ مكون ProcedureSelection لتمكين المستخدمين من اختيار الإجراءات من قائمة، وإدارة الاختيارات، ومعالجة الإجراءات المرفوضة.

- إنشاء مكون StepProgress لعرض تقدم معالج متعدد الخطوات بشكل مرئي.

- تعريف أنواع مشتركة للإجراءات، وخطوات التدفق، ورسائل الدردشة في ملف types.ts.

- إضافة اختبارات وحدة لخطافات useFileUpload و useTaskPolling لضمان الأداء السليم ومعالجة الأخطاء.

- تنفيذ اختبارات واجهة برمجة التطبيقات (API) للتحقق من تنسيقات نقاط النهاية وضمان اتساق ربط الواجهة الأمامية بالخلفية.
This commit is contained in:
Your Name
2026-03-06 17:16:09 +02:00
parent 2e97741d60
commit cfbcc8bd79
62 changed files with 10567 additions and 101 deletions

View File

@@ -22,7 +22,8 @@ WORKDIR /app
# Copy requirements first for Docker layer caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
RUN pip install --no-cache-dir -r requirements.txt \
&& python -c "import PyPDF2; print('PyPDF2 OK')"
# Copy application code
COPY . .

View File

@@ -62,6 +62,7 @@ def create_app(config_name=None):
from app.routes.tasks import tasks_bp
from app.routes.download import download_bp
from app.routes.pdf_tools import pdf_tools_bp
from app.routes.flowchart import flowchart_bp
app.register_blueprint(health_bp, url_prefix="/api")
app.register_blueprint(convert_bp, url_prefix="/api/convert")
@@ -69,6 +70,7 @@ def create_app(config_name=None):
app.register_blueprint(image_bp, url_prefix="/api/image")
app.register_blueprint(video_bp, url_prefix="/api/video")
app.register_blueprint(pdf_tools_bp, url_prefix="/api/pdf-tools")
app.register_blueprint(flowchart_bp, url_prefix="/api/flowchart")
app.register_blueprint(tasks_bp, url_prefix="/api/tasks")
app.register_blueprint(download_bp, url_prefix="/api/download")

View File

@@ -0,0 +1,103 @@
"""Flowchart route — POST /api/flowchart/extract, /chat, /generate-manual."""
import logging
from flask import Blueprint, request, jsonify
from app.extensions import limiter
from app.utils.file_validator import validate_file, FileValidationError
from app.utils.sanitizer import generate_safe_path
from app.tasks.flowchart_tasks import extract_flowchart_task
logger = logging.getLogger(__name__)
flowchart_bp = Blueprint("flowchart", __name__)
@flowchart_bp.route("/extract", methods=["POST"])
@limiter.limit("10/minute")
def extract_flowchart_route():
    """Kick off asynchronous procedure extraction for an uploaded PDF.

    Accepts multipart/form-data with a single 'file' field (PDF).
    Returns 202 with a JSON body containing ``task_id`` for polling.
    """
    uploaded = request.files.get("file")
    if uploaded is None:
        return jsonify({"error": "No file uploaded."}), 400
    try:
        original_filename, ext = validate_file(uploaded, allowed_types=["pdf"])
    except FileValidationError as exc:
        return jsonify({"error": exc.message}), exc.code
    task_id, input_path = generate_safe_path(ext)
    uploaded.save(input_path)
    job = extract_flowchart_task.delay(input_path, task_id, original_filename)
    return (
        jsonify({
            "task_id": job.id,
            "message": "Flowchart extraction started.",
        }),
        202,
    )
@flowchart_bp.route("/chat", methods=["POST"])
@limiter.limit("20/minute")
def flowchart_chat_route():
    """
    AI chat endpoint for flowchart improvement suggestions.

    Accepts JSON: { message, flow_id, flow_data }
    Returns JSON: { reply, updated_flow? }

    Always answers 200 with a reply string — AI failures degrade to a
    friendly apology instead of an error status.
    """
    data = request.get_json(silent=True)
    if not data or not data.get("message"):
        return jsonify({"error": "Message is required."}), 400
    # Cap the message so one request cannot inflate the AI prompt unboundedly.
    message = str(data["message"])[:2000]
    flow_data = data.get("flow_data")
    try:
        # Lazy import keeps route registration independent of AI-service config.
        from app.services.ai_chat_service import chat_about_flowchart
        result = chat_about_flowchart(message, flow_data)
        return jsonify(result), 200
    except Exception:
        # logger.exception records the full traceback (logger.error did not),
        # which is needed to diagnose failures inside the AI pipeline.
        logger.exception("Flowchart chat error")
        return jsonify({"reply": "Sorry, I couldn't process your request. Please try again."}), 200
@flowchart_bp.route("/generate-manual", methods=["POST"])
@limiter.limit("10/minute")
def generate_manual_flowchart_route():
    """
    Generate a flowchart from manually specified procedure data.

    Accepts JSON: { title, description, pages } where ``pages`` is a list of
    {"text": "..."} dicts (bare strings are also tolerated).
    Returns JSON: { flowchart }
    """
    import zlib  # stdlib; local import matches this module's lazy-import style

    data = request.get_json(silent=True)
    if not data or not data.get("title"):
        return jsonify({"error": "Title is required."}), 400
    title = str(data["title"])[:200]
    description = str(data.get("description", ""))[:500]
    raw_pages = data.get("pages", [])
    if not isinstance(raw_pages, list):
        raw_pages = []
    from app.services.flowchart_service import generate_flowchart
    # Build a synthetic procedure. The builtin hash() is salted per process
    # (PYTHONHASHSEED), so the same title would yield a different ID after
    # every worker restart; CRC32 is deterministic across processes.
    procedure = {
        "id": f"manual-{zlib.crc32(title.encode('utf-8')) % 100000}",
        "title": title,
        "description": description,
        "pages": list(range(1, len(raw_pages) + 1)),
    }
    pages_data = []
    for i, page in enumerate(raw_pages):
        # Accept {"text": ...} entries; fall back to treating the entry itself
        # as the text so bare strings don't crash on .get().
        text = page.get("text", "") if isinstance(page, dict) else page
        pages_data.append({"page": i + 1, "text": str(text)[:5000]})
    flowchart = generate_flowchart(procedure, pages_data)
    return jsonify({"flowchart": flowchart}), 200

View File

@@ -93,6 +93,11 @@ def split_pdf_route():
if mode not in ("all", "range"):
mode = "all"
if mode == "range" and (not pages or not pages.strip()):
return jsonify({
"error": "Please specify which pages to extract (e.g. 1,3,5-8)."
}), 400
try:
original_filename, ext = validate_file(file, allowed_types=["pdf"])
except FileValidationError as e:

View File

@@ -3,11 +3,13 @@ from flask import Blueprint, jsonify
from celery.result import AsyncResult
from app.extensions import celery
from app.middleware.rate_limiter import limiter
tasks_bp = Blueprint("tasks", __name__)
@tasks_bp.route("/<task_id>/status", methods=["GET"])
@limiter.limit("300/minute", override_defaults=True)
def get_task_status(task_id: str):
"""
Get the status of an async task.

View File

@@ -0,0 +1,142 @@
"""AI Chat Service — OpenRouter integration for flowchart improvement."""
import os
import json
import logging
import requests
logger = logging.getLogger(__name__)
# Configuration
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "")
OPENROUTER_MODEL = os.getenv("OPENROUTER_MODEL", "meta-llama/llama-3-8b-instruct")
OPENROUTER_BASE_URL = os.getenv(
"OPENROUTER_BASE_URL", "https://openrouter.ai/api/v1/chat/completions"
)
SYSTEM_PROMPT = """You are a flowchart improvement assistant. You help users improve their flowcharts by:
1. Suggesting better step titles and descriptions
2. Identifying missing steps or decision points
3. Recommending better flow structure
4. Simplifying complex flows
When the user asks you to modify the flowchart, respond with your suggestion in plain text.
Keep responses concise and actionable. Reply in the same language the user uses."""
def chat_about_flowchart(message: str, flow_data: dict | None = None) -> dict:
    """Ask the AI for improvement suggestions about a flowchart.

    Args:
        message: User message.
        flow_data: Current flowchart data (optional).

    Returns:
        ``{"reply": "...", "updated_flow": None}`` — falls back to a
        heuristic reply when no API key is configured or the call fails.
    """
    if not OPENROUTER_API_KEY:
        return {
            "reply": _fallback_response(message, flow_data),
            "updated_flow": None,
        }
    # Summarize the current flowchart (if any) as extra prompt context.
    context = ""
    if flow_data:
        summary = [
            f"- [{step.get('type', 'process')}] {step.get('title', '')}"
            for step in flow_data.get("steps", [])
        ]
        context = (
            f"\nCurrent flowchart: {flow_data.get('title', 'Untitled')}\n"
            f"Steps:\n" + "\n".join(summary)
        )
    request_body = {
        "model": OPENROUTER_MODEL,
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": f"{message}{context}"},
        ],
        "max_tokens": 500,
        "temperature": 0.7,
    }
    auth_headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
    }
    try:
        response = requests.post(
            OPENROUTER_BASE_URL,
            headers=auth_headers,
            json=request_body,
            timeout=30,
        )
        response.raise_for_status()
        body = response.json()
        reply = (
            body.get("choices", [{}])[0]
            .get("message", {})
            .get("content", "")
            .strip()
        )
        return {
            "reply": reply or "I couldn't generate a response. Please try again.",
            "updated_flow": None,
        }
    except requests.exceptions.Timeout:
        logger.warning("OpenRouter API timeout")
        return {
            "reply": "The AI service is taking too long. Please try again.",
            "updated_flow": None,
        }
    except Exception as e:
        logger.error(f"OpenRouter API error: {e}")
        return {
            "reply": _fallback_response(message, flow_data),
            "updated_flow": None,
        }
def _fallback_response(message: str, flow_data: dict | None) -> str:
"""Provide a helpful response when the AI API is unavailable."""
msg_lower = message.lower()
if flow_data:
steps = flow_data.get("steps", [])
title = flow_data.get("title", "your flowchart")
step_count = len(steps)
decision_count = sum(1 for s in steps if s.get("type") == "decision")
if any(
w in msg_lower for w in ["simplify", "reduce", "shorter", "بسط", "اختصر"]
):
return (
f"Your flowchart '{title}' has {step_count} steps. "
f"To simplify, consider merging consecutive process steps "
f"that perform related actions into a single step."
)
if any(
w in msg_lower for w in ["missing", "add", "more", "ناقص", "أضف"]
):
return (
f"Your flowchart has {decision_count} decision points. "
f"Consider adding error handling or validation steps "
f"between critical process nodes."
)
return (
f"Your flowchart '{title}' contains {step_count} steps "
f"({decision_count} decisions). To get AI-powered suggestions, "
f"please configure the OPENROUTER_API_KEY environment variable."
)
return (
"AI chat requires the OPENROUTER_API_KEY to be configured. "
"Please set up the environment variable for full AI functionality."
)

View File

@@ -0,0 +1,410 @@
"""Flowchart service — Extract procedures from PDF and generate flowchart data."""
import os
import re
import json
import logging
logger = logging.getLogger(__name__)
class FlowchartError(Exception):
    """Raised for failures in the flowchart pipeline (PDF text extraction,
    procedure identification, or flowchart generation)."""
    pass
# ---------------------------------------------------------------------------
# Heuristic keywords that signal procedural content
# ---------------------------------------------------------------------------
# Lowercase keywords matched against lowercased headings/page text.
_PROCEDURE_KEYWORDS = [
    "procedure", "protocol", "checklist", "sequence", "instruction",
    "steps", "process", "workflow", "troubleshoot", "maintenance",
    "startup", "shutdown", "emergency", "inspection", "replacement",
    "installation", "calibration", "operation", "safety", "guide",
]

# Matches a step/list marker at the start of a line.
_STEP_PATTERNS = re.compile(
    r"(?:^|\n)\s*(?:"
    r"(?:step\s*\d+)|"      # Step 1, Step 2 …
    r"(?:\d+[\.\)]\s+)|"    # 1. or 1) …
    r"(?:[a-z][\.\)]\s+)|"  # a. or a) …
    r"(?:•\s)|"             # bullet •
    r"(?:-\s)|"             # dash -
    r"(?:✓\s)"              # checkmark ✓
    r")",
    re.IGNORECASE,
)

# Words suggesting a step is a yes/no check — classified as a decision node.
_DECISION_KEYWORDS = re.compile(
    r"\b(?:if|whether|check|verify|confirm|decide|inspect|compare|ensure|"
    r"is\s+\w+\s*\?|does|should|can)\b",
    re.IGNORECASE,
)
def extract_text_from_pdf(input_path: str) -> list[dict]:
    """Read every page of a PDF and return its text.

    Returns:
        List of dicts: [{"page": 1, "text": "..."}, ...] (1-based pages).

    Raises:
        FlowchartError: if the file is missing or cannot be parsed.
    """
    try:
        from PyPDF2 import PdfReader
        if not os.path.exists(input_path):
            raise FlowchartError(f"File not found: {input_path}")
        document = PdfReader(input_path)
        # extract_text() may return None for image-only pages; normalise to "".
        return [
            {"page": number, "text": (page.extract_text() or "").strip()}
            for number, page in enumerate(document.pages, start=1)
        ]
    except FlowchartError:
        raise
    except Exception as e:
        raise FlowchartError(f"Failed to extract text from PDF: {str(e)}")
def identify_procedures(pages: list[dict]) -> list[dict]:
    """
    Analyse extracted PDF text and identify procedures/sections.

    Uses heuristic analysis:
        1. Look for headings (lines in UPPER CASE or short bold-like lines)
        2. Match procedure keywords
        3. Group consecutive pages under the same heading

    Args:
        pages: Output of extract_text_from_pdf — [{"page": int, "text": str}, ...].

    Returns:
        List of procedures: [
            {
                "id": "proc-1",
                "title": "Emergency Shutdown Protocol",
                "description": "Extracted first paragraph...",
                "pages": [8, 9],
                "step_count": 6
            },
            ...
        ]
        Each procedure also carries a private "_text" field (accumulated raw
        text) that extract_and_generate strips before returning to callers.
    """
    procedures = []
    current_proc = None   # procedure currently being accumulated across pages
    proc_counter = 0      # monotonically increasing id suffix
    for page_data in pages:
        text = page_data["text"]
        page_num = page_data["page"]
        if not text:
            # Image-only / empty pages carry no signal; skip.
            continue
        lines = text.split("\n")
        heading_candidates = []
        for line in lines:
            stripped = line.strip()
            if not stripped:
                continue
            # Heading heuristic: short line, mostly uppercase or title-like,
            # or containing a procedure keyword; trailing comma rules out a
            # sentence fragment wrapped mid-clause.
            is_heading = (
                len(stripped) < 80
                and (
                    stripped.isupper()
                    or (stripped == stripped.title() and len(stripped.split()) <= 8)
                    or any(kw in stripped.lower() for kw in _PROCEDURE_KEYWORDS)
                )
                and not stripped.endswith(",")
            )
            if is_heading:
                heading_candidates.append(stripped)
        # Check if this page has procedural content
        has_steps = bool(_STEP_PATTERNS.search(text))
        has_keywords = any(kw in text.lower() for kw in _PROCEDURE_KEYWORDS)
        if heading_candidates and (has_steps or has_keywords):
            # First candidate is taken as the page's dominant heading.
            best_heading = heading_candidates[0]
            # Check if this is a continuation of the current procedure
            if current_proc and _is_continuation(current_proc["title"], best_heading, text):
                current_proc["pages"].append(page_num)
                current_proc["_text"] += "\n" + text
            else:
                # Save previous procedure before starting a new one
                if current_proc:
                    _finalize_procedure(current_proc)
                    procedures.append(current_proc)
                proc_counter += 1
                first_paragraph = _extract_first_paragraph(text, best_heading)
                current_proc = {
                    "id": f"proc-{proc_counter}",
                    "title": _clean_title(best_heading),
                    "description": first_paragraph,
                    "pages": [page_num],
                    "_text": text,
                }
        elif current_proc and has_steps:
            # Continuation — same procedure spills onto the next page
            # without repeating its heading.
            current_proc["pages"].append(page_num)
            current_proc["_text"] += "\n" + text
    # Don't forget the last one
    if current_proc:
        _finalize_procedure(current_proc)
        procedures.append(current_proc)
    # If no procedures found via headings, try splitting by page with step content
    if not procedures:
        procedures = _fallback_extraction(pages)
    return procedures
def generate_flowchart(procedure: dict, page_texts: list[dict]) -> dict:
    """Build flowchart nodes and connections for one procedure.

    Args:
        procedure: Procedure dict with id, title, pages.
        page_texts: All page text data.

    Returns:
        Flowchart dict: {
            "id": "flow-<proc id>",
            "procedureId": "...",
            "title": "...",
            "steps": [ {id, type, title, description, connections}, ... ]
        }
    """
    # Collect only the text belonging to this procedure's pages.
    relevant = set(procedure["pages"])
    combined = "".join(
        entry["text"] + "\n" for entry in page_texts if entry["page"] in relevant
    )
    return {
        "id": f"flow-{procedure['id']}",
        "procedureId": procedure["id"],
        "title": procedure["title"],
        "steps": _extract_steps_from_text(combined, procedure["title"]),
    }
def extract_and_generate(input_path: str) -> dict:
    """Run the full pipeline: extract text → identify procedures → flowcharts.

    Returns:
        {
            "procedures": [...],
            "flowcharts": [...],
            "total_pages": int,
            "pages": [...]
        }
    """
    pages = extract_text_from_pdf(input_path)
    procedures = identify_procedures(pages)
    flowcharts = [generate_flowchart(proc, pages) for proc in procedures]
    # "_text" is an internal accumulator — strip it before returning.
    for proc in procedures:
        proc.pop("_text", None)
    return {
        "procedures": procedures,
        "flowcharts": flowcharts,
        "total_pages": len(pages),
        "pages": pages,
    }
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _is_continuation(current_title: str, new_heading: str, text: str) -> bool:
"""Check if a page is a continuation of the current procedure."""
continued_markers = ["(continued)", "(cont.)", "(cont'd)"]
heading_lower = new_heading.lower()
# Explicit continuation marker
if any(m in heading_lower for m in continued_markers):
return True
# Same title repeated
if current_title.lower().rstrip() in heading_lower:
return True
return False
def _clean_title(title: str) -> str:
"""Clean up a procedure title."""
# Remove continuation markers
title = re.sub(r"\s*\(continued\).*", "", title, flags=re.IGNORECASE)
title = re.sub(r"\s*\(cont[\.\']?d?\).*", "", title, flags=re.IGNORECASE)
# Remove leading numbers like "3.1"
title = re.sub(r"^\d+[\.\)]\s*", "", title)
title = re.sub(r"^\d+\.\d+\s*", "", title)
return title.strip()
def _extract_first_paragraph(text: str, heading: str) -> str:
"""Extract the first meaningful paragraph after a heading."""
idx = text.find(heading)
if idx >= 0:
after_heading = text[idx + len(heading):].strip()
else:
after_heading = text.strip()
lines = after_heading.split("\n")
paragraph = []
for line in lines:
stripped = line.strip()
if not stripped:
if paragraph:
break
continue
if stripped.isupper() and len(stripped) > 10:
break
paragraph.append(stripped)
desc = " ".join(paragraph)[:200]
return desc if desc else "Procedural content extracted from document."
def _finalize_procedure(proc: dict):
    """Derive ``step_count`` from the procedure's accumulated raw text."""
    hits = _STEP_PATTERNS.findall(proc.get("_text", ""))
    # Every procedure is reported as having at least two steps.
    proc["step_count"] = max(len(hits), 2)
def _fallback_extraction(pages: list[dict]) -> list[dict]:
    """Detect step-like pages when no heading-based procedures were found."""
    found = []
    for entry in pages:
        text = entry["text"]
        # Only pages with recognizable step markers become procedures.
        if not text or not _STEP_PATTERNS.search(text):
            continue
        first_line = text.split("\n")[0].strip()[:60]
        found.append({
            "id": f"proc-{len(found) + 1}",
            "title": first_line or f"Procedure (Page {entry['page']})",
            "description": text[:150].strip(),
            "pages": [entry["page"]],
            "step_count": len(_STEP_PATTERNS.findall(text)),
        })
    return found
def _extract_steps_from_text(text: str, procedure_title: str) -> list[dict]:
    """
    Parse text into flowchart steps (nodes).

    Strategy:
        1. Split text by numbered/bulleted lines
        2. Classify each as process or decision
        3. Add start/end nodes
        4. Wire connections linearly (each node points at the next)

    Returns:
        List of node dicts: {id, type, title, description, connections}.
    """
    # Compiled once per call; matching is anchored at the stripped line start.
    step_start_re = re.compile(
        r"^\s*(?:\d+[\.\)]\s+|[a-z][\.\)]\s+|•\s|-\s|✓\s|step\s*\d+)",
        re.IGNORECASE,
    )
    marker_strip_re = re.compile(
        r"^\s*(?:\d+[\.\)]\s*|[a-z][\.\)]\s*|•\s*|-\s*|✓\s*|step\s*\d+[:\.\)]\s*)",
        re.IGNORECASE,
    )
    raw_steps = []
    current_step_lines = []
    for line in text.split("\n"):
        stripped = line.strip()
        if not stripped:
            continue
        if step_start_re.match(stripped):
            # A marker line starts a new step; flush the previous one.
            if current_step_lines:
                raw_steps.append(" ".join(current_step_lines))
            current_step_lines = [marker_strip_re.sub("", stripped)]
        elif current_step_lines:
            # Continuation line of the current step.
            current_step_lines.append(stripped)
    if current_step_lines:
        raw_steps.append(" ".join(current_step_lines))
    # Limit to a reasonable number of steps to keep charts readable.
    raw_steps = raw_steps[:15]
    nodes = []
    step_id = 1
    # Start node. BUGFIX: previously its connections were [] when no steps
    # were parsed, leaving the start node disconnected from the end node.
    # The next node always exists (first step, or the end node), so always
    # link forward.
    nodes.append({
        "id": str(step_id),
        "type": "start",
        "title": f"Begin: {procedure_title[:40]}",
        "description": "Start of procedure",
        "connections": [str(step_id + 1)],
    })
    for step_text in raw_steps:
        step_id += 1
        # A step phrased as a check/question renders as a decision diamond.
        is_decision = bool(_DECISION_KEYWORDS.search(step_text))
        nodes.append({
            "id": str(step_id),
            "type": "decision" if is_decision else "process",
            "title": step_text[:60],
            "description": step_text[:150],
            # Linear wiring: each step points at the following node (the next
            # step, or the end node after the last step).
            "connections": [str(step_id + 1)],
        })
    # End node
    step_id += 1
    nodes.append({
        "id": str(step_id),
        "type": "end",
        "title": "Procedure Complete",
        "description": "End of procedure",
        "connections": [],
    })
    return nodes

View File

@@ -140,20 +140,75 @@ def split_pdf(
def _parse_page_range(spec: str, total: int) -> list[int]:
"""Parse a page specification like '1,3,5-8' into 0-based indices."""
if not spec or not spec.strip():
raise PDFToolsError("Please specify at least one page (e.g. 1,3,5-8).")
indices = set()
for part in spec.split(","):
part = part.strip()
invalid_tokens = []
out_of_range_tokens = []
for raw_part in spec.split(","):
part = raw_part.strip()
if not part:
continue
if "-" in part:
if part.count("-") != 1:
invalid_tokens.append(part)
continue
start_s, end_s = part.split("-", 1)
start = max(1, int(start_s.strip()))
end = min(total, int(end_s.strip()))
start_s = start_s.strip()
end_s = end_s.strip()
if not start_s.isdigit() or not end_s.isdigit():
invalid_tokens.append(part)
continue
start = int(start_s)
end = int(end_s)
if start > end:
invalid_tokens.append(part)
continue
if start < 1 or end > total:
out_of_range_tokens.append(f"{start}-{end}")
continue
indices.update(range(start - 1, end))
else:
if not part.isdigit():
invalid_tokens.append(part)
continue
page = int(part)
if 1 <= page <= total:
indices.add(page - 1)
if page < 1 or page > total:
out_of_range_tokens.append(str(page))
continue
indices.add(page - 1)
if invalid_tokens:
tokens = ", ".join(invalid_tokens)
raise PDFToolsError(
f"Invalid page format: {tokens}. Use a format like 1,3,5-8."
)
if out_of_range_tokens:
tokens = ", ".join(out_of_range_tokens)
page_word = "page" if total == 1 else "pages"
raise PDFToolsError(
f"Selected pages ({tokens}) are out of range. This PDF has only {total} {page_word}."
)
if not indices:
raise PDFToolsError("No valid pages specified.")
page_word = "page" if total == 1 else "pages"
raise PDFToolsError(
f"No pages selected. This PDF has {total} {page_word}."
)
return sorted(indices)

View File

@@ -0,0 +1,79 @@
"""Celery tasks for PDF-to-Flowchart extraction and generation."""
import os
import json
import logging
from app.extensions import celery
from app.services.flowchart_service import extract_and_generate, FlowchartError
from app.services.storage_service import storage
from app.utils.sanitizer import cleanup_task_files
logger = logging.getLogger(__name__)
def _cleanup(task_id: str):
    # Remove the task's temp files; keep local output copies only when S3 is
    # not in use (clients then download straight from local disk).
    cleanup_task_files(task_id, keep_outputs=not storage.use_s3)
@celery.task(bind=True, name="app.tasks.flowchart_tasks.extract_flowchart_task")
def extract_flowchart_task(
    self, input_path: str, task_id: str, original_filename: str
):
    """
    Async task: Extract procedures from PDF and generate flowcharts.

    Args:
        input_path: Path of the uploaded PDF on local disk.
        task_id: Unique task identifier, reused for temp output paths.
        original_filename: User-supplied filename (informational).

    Returns:
        {"status": "completed", ...} with procedures/flowcharts/pages, or
        {"status": "failed", "error": "..."} on failure.
    """
    output_dir = os.path.join("/tmp/outputs", task_id)
    os.makedirs(output_dir, exist_ok=True)
    try:
        self.update_state(
            state="PROCESSING",
            meta={"step": "Extracting text from PDF..."},
        )
        result = extract_and_generate(input_path)
        self.update_state(
            state="PROCESSING",
            meta={"step": "Saving flowchart data..."},
        )
        # Save flowchart JSON to a file and upload
        output_path = os.path.join(output_dir, f"{task_id}_flowcharts.json")
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(result, f, ensure_ascii=False, indent=2)
        s3_key = storage.upload_file(output_path, task_id, folder="outputs")
        download_url = storage.generate_presigned_url(
            s3_key, original_filename="flowcharts.json"
        )
        final_result = {
            "status": "completed",
            "download_url": download_url,
            "filename": "flowcharts.json",
            "procedures": result["procedures"],
            "flowcharts": result["flowcharts"],
            "pages": result["pages"],
            "total_pages": result["total_pages"],
            "procedures_count": len(result["procedures"]),
        }
        _cleanup(task_id)
        logger.info(
            f"Task {task_id}: Flowchart extraction completed — "
            f"{len(result['procedures'])} procedures, "
            f"{result['total_pages']} pages"
        )
        return final_result
    except FlowchartError as e:
        # Expected domain failure (unreadable PDF, etc.) — message is safe
        # to surface to the client.
        logger.error(f"Task {task_id}: Flowchart error — {e}")
        _cleanup(task_id)
        return {"status": "failed", "error": str(e)}
    except Exception:
        # Unexpected failure: logger.exception records the full traceback
        # (plain logger.error lost it); return only a generic message.
        logger.exception(f"Task {task_id}: Unexpected error")
        _cleanup(task_id)
        return {"status": "failed", "error": "An unexpected error occurred."}

View File

@@ -1,7 +1,12 @@
"""File validation utilities — multi-layer security checks."""
import os
import magic
try:
import magic
HAS_MAGIC = True
except (ImportError, OSError):
HAS_MAGIC = False
from flask import current_app
from werkzeug.utils import secure_filename
@@ -72,18 +77,19 @@ def validate_file(file_storage, allowed_types: list[str] | None = None):
if file_size == 0:
raise FileValidationError("File is empty.")
# Layer 4: Check MIME type using magic bytes
# Layer 4: Check MIME type using magic bytes (if libmagic is available)
file_header = file_storage.read(8192)
file_storage.seek(0)
detected_mime = magic.from_buffer(file_header, mime=True)
expected_mimes = valid_extensions.get(ext, [])
if HAS_MAGIC:
detected_mime = magic.from_buffer(file_header, mime=True)
expected_mimes = valid_extensions.get(ext, [])
if detected_mime not in expected_mimes:
raise FileValidationError(
f"File content does not match extension '.{ext}'. "
f"Detected type: {detected_mime}"
)
if detected_mime not in expected_mimes:
raise FileValidationError(
f"File content does not match extension '.{ext}'. "
f"Detected type: {detected_mime}"
)
# Layer 5: Additional content checks for specific types
if ext == "pdf":

View File

@@ -66,6 +66,13 @@ class BaseConfig:
RATELIMIT_STORAGE_URI = os.getenv("REDIS_URL", "redis://redis:6379/0")
RATELIMIT_DEFAULT = "100/hour"
# OpenRouter AI
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "")
OPENROUTER_MODEL = os.getenv("OPENROUTER_MODEL", "meta-llama/llama-3-8b-instruct")
OPENROUTER_BASE_URL = os.getenv(
"OPENROUTER_BASE_URL", "https://openrouter.ai/api/v1/chat/completions"
)
class DevelopmentConfig(BaseConfig):
"""Development configuration."""
@@ -88,6 +95,15 @@ class TestingConfig(BaseConfig):
UPLOAD_FOLDER = "/tmp/test_uploads"
OUTPUT_FOLDER = "/tmp/test_outputs"
# Disable Redis-backed rate limiting; use in-memory instead
RATELIMIT_STORAGE_URI = "memory://"
RATELIMIT_ENABLED = False
# Use in-memory transport for Celery so tests don't need Redis
CELERY_BROKER_URL = "memory://"
CELERY_RESULT_BACKEND = "cache+memory://"
REDIS_URL = "memory://"
config = {
"development": DevelopmentConfig,

View File

@@ -24,9 +24,18 @@ pdf2image>=1.16,<2.0
# AWS
boto3>=1.34,<2.0
# HTTP Client
requests>=2.31,<3.0
# Security
werkzeug>=3.0,<4.0
# Testing
pytest>=8.0,<9.0
pytest-flask>=1.3,<2.0
pytest>=7.4.0
pytest-cov>=4.1.0
pytest-mock>=3.11.0
requests-mock>=1.11.0
fakeredis>=2.18.0
httpx>=0.24.0

View File

@@ -1,5 +1,8 @@
import io
import os
import shutil
import pytest
from unittest.mock import patch, MagicMock
from app import create_app
@@ -7,12 +10,22 @@ from app import create_app
def app():
"""Create application for testing."""
os.environ['FLASK_ENV'] = 'testing'
app = create_app()
app = create_app('testing')
app.config.update({
'TESTING': True,
'UPLOAD_FOLDER': '/tmp/test_uploads',
'OUTPUT_FOLDER': '/tmp/test_outputs',
})
# Create temp directories
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
os.makedirs(app.config['OUTPUT_FOLDER'], exist_ok=True)
yield app
# Cleanup temp directories
shutil.rmtree(app.config['UPLOAD_FOLDER'], ignore_errors=True)
shutil.rmtree(app.config['OUTPUT_FOLDER'], ignore_errors=True)
@pytest.fixture
def client(app):
@@ -24,3 +37,72 @@ def client(app):
def runner(app):
"""Flask test CLI runner."""
return app.test_cli_runner()
# ---------------------------------------------------------------------------
# Helpers: Create realistic test files with valid magic bytes
# ---------------------------------------------------------------------------
def make_pdf_bytes() -> bytes:
    """Return a minimal single-page PDF document as bytes."""
    header = b"%PDF-1.4\n"
    objects = (
        b"1 0 obj<</Type/Catalog/Pages 2 0 R>>endobj\n"
        b"2 0 obj<</Type/Pages/Count 1/Kids[3 0 R]>>endobj\n"
        b"3 0 obj<</Type/Page/MediaBox[0 0 612 792]/Parent 2 0 R>>endobj\n"
    )
    xref_table = (
        b"xref\n0 4\n"
        b"0000000000 65535 f \n"
        b"0000000009 00000 n \n"
        b"0000000058 00000 n \n"
        b"0000000115 00000 n \n"
    )
    trailer = b"trailer<</Root 1 0 R/Size 4>>\nstartxref\n190\n%%EOF"
    return header + objects + xref_table + trailer
def make_png_bytes() -> bytes:
    """Return a minimal valid 1x1 PNG image as bytes."""
    signature = b"\x89PNG\r\n\x1a\n"
    # IHDR: 1x1 pixel, 8-bit RGB
    ihdr = (
        b"\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01"
        b"\x08\x02\x00\x00\x00\x90wS\xde"
    )
    idat = b"\x00\x00\x00\x0cIDATx\x9cc\xf8\x0f\x00\x00\x01\x01\x00\x05\x18\xd8N"
    iend = b"\x00\x00\x00\x00IEND\xaeB`\x82"
    return signature + ihdr + idat + iend
def make_jpeg_bytes() -> bytes:
    """Return a minimal JPEG (SOI + JFIF APP0 header + EOI) as bytes."""
    soi_and_app0 = (
        b"\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00"
    )
    eoi = b"\xff\xd9"
    return soi_and_app0 + eoi
@pytest.fixture
def pdf_file():
    """(BytesIO, filename) pair for uploading a valid PDF in tests."""
    return io.BytesIO(make_pdf_bytes()), 'test.pdf'


@pytest.fixture
def png_file():
    """(BytesIO, filename) pair for uploading a valid PNG in tests."""
    return io.BytesIO(make_png_bytes()), 'test.png'


@pytest.fixture
def mock_celery_task():
    """Mock a Celery AsyncResult for task dispatch tests."""
    mock_task = MagicMock()
    # Fixed ID so tests can assert on the value a route returns.
    mock_task.id = 'test-task-id-12345'
    return mock_task


@pytest.fixture
def mock_magic():
    """Mock python-magic so MIME detection can be controlled per test."""
    with patch('app.utils.file_validator.magic') as mock_m:
        yield mock_m

View File

@@ -0,0 +1,74 @@
"""Tests for PDF compression service."""
import os
from unittest.mock import patch, MagicMock
import pytest
from app.services.compress_service import compress_pdf, PDFCompressionError
class TestCompressService:
    """Unit tests for app.services.compress_service.compress_pdf.

    Ghostscript is never invoked: subprocess.run is mocked, so input/output
    files are pre-created on disk before each call.
    """

    def test_compress_pdf_invalid_quality_defaults(self, app):
        """Invalid quality should default to medium."""
        with app.app_context():
            with patch('app.services.compress_service.subprocess.run') as mock_run:
                mock_run.return_value = MagicMock(returncode=0, stderr='')
                # Create temp input file
                input_path = '/tmp/test_compress_input.pdf'
                output_path = '/tmp/test_compress_output.pdf'
                os.makedirs(os.path.dirname(output_path), exist_ok=True)
                with open(input_path, 'wb') as f:
                    f.write(b'%PDF-1.4 test')
                # Pre-create the output since the mocked subprocess won't.
                with open(output_path, 'wb') as f:
                    f.write(b'%PDF-1.4 compressed')
                result = compress_pdf(input_path, output_path, quality="invalid")
                # Should have used "medium" default (/ebook)
                cmd_args = mock_run.call_args[0][0]
                assert any('/ebook' in str(arg) for arg in cmd_args)
                # Cleanup
                os.unlink(input_path)
                os.unlink(output_path)

    def test_compress_pdf_returns_stats(self, app):
        """Should return original_size, compressed_size, reduction_percent."""
        with app.app_context():
            input_path = '/tmp/test_stats_input.pdf'
            output_path = '/tmp/test_stats_output.pdf'
            # Create input (100 bytes)
            with open(input_path, 'wb') as f:
                f.write(b'%PDF-1.4' + b'\x00' * 92)
            with patch('app.services.compress_service.subprocess.run') as mock_run:
                mock_run.return_value = MagicMock(returncode=0, stderr='')
                # Create smaller output (50 bytes)
                with open(output_path, 'wb') as f:
                    f.write(b'%PDF-1.4' + b'\x00' * 42)
                result = compress_pdf(input_path, output_path, 'medium')
                assert 'original_size' in result
                assert 'compressed_size' in result
                assert result['original_size'] == 100
                assert result['compressed_size'] == 50
            os.unlink(input_path)
            os.unlink(output_path)

    def test_compress_pdf_gs_failure_raises(self, app):
        """Should raise PDFCompressionError when Ghostscript fails."""
        with app.app_context():
            input_path = '/tmp/test_fail_input.pdf'
            output_path = '/tmp/test_fail_output.pdf'
            with open(input_path, 'wb') as f:
                f.write(b'%PDF-1.4 test')
            with patch('app.services.compress_service.subprocess.run') as mock_run:
                # Non-zero return code simulates a Ghostscript failure.
                mock_run.return_value = MagicMock(
                    returncode=1, stderr='Error processing PDF'
                )
                with pytest.raises(PDFCompressionError):
                    compress_pdf(input_path, output_path, 'medium')
            os.unlink(input_path)

View File

@@ -0,0 +1,74 @@
"""Tests for PDF compression Celery tasks."""
import io
from unittest.mock import MagicMock
class TestCompressTaskRoute:
    """Route-level tests for POST /api/compress/pdf.

    Validation, path generation, and Celery dispatch are monkeypatched so the
    tests exercise only the route's request handling and response contract.
    """

    def test_compress_pdf_no_file(self, client):
        """POST /api/compress/pdf without file should return 400."""
        response = client.post('/api/compress/pdf')
        assert response.status_code == 400
        assert response.get_json()['error'] == 'No file provided.'

    def test_compress_pdf_with_quality(self, client, monkeypatch):
        """Should accept quality parameter (low/medium/high)."""
        mock_task = MagicMock()
        mock_task.id = 'compress-task-id'
        monkeypatch.setattr(
            'app.routes.compress.validate_file',
            lambda f, allowed_types: ('test.pdf', 'pdf'),
        )
        monkeypatch.setattr(
            'app.routes.compress.generate_safe_path',
            lambda ext, folder_type: ('compress-task-id', '/tmp/test.pdf'),
        )
        monkeypatch.setattr(
            'app.routes.compress.compress_pdf_task.delay',
            MagicMock(return_value=mock_task),
        )
        data = {
            'file': (io.BytesIO(b'%PDF-1.4'), 'test.pdf'),
            'quality': 'high',
        }
        response = client.post(
            '/api/compress/pdf',
            data=data,
            content_type='multipart/form-data',
        )
        assert response.status_code == 202
        assert response.get_json()['task_id'] == 'compress-task-id'

    def test_compress_pdf_invalid_quality_defaults(self, client, monkeypatch):
        """Invalid quality should default to medium."""
        mock_task = MagicMock()
        mock_task.id = 'compress-default-id'
        mock_delay = MagicMock(return_value=mock_task)
        monkeypatch.setattr(
            'app.routes.compress.validate_file',
            lambda f, allowed_types: ('test.pdf', 'pdf'),
        )
        monkeypatch.setattr(
            'app.routes.compress.generate_safe_path',
            lambda ext, folder_type: ('id', '/tmp/test.pdf'),
        )
        monkeypatch.setattr(
            'app.routes.compress.compress_pdf_task.delay',
            mock_delay,
        )
        data = {
            'file': (io.BytesIO(b'%PDF-1.4'), 'test.pdf'),
            'quality': 'ultra',  # invalid
        }
        response = client.post(
            '/api/compress/pdf',
            data=data,
            content_type='multipart/form-data',
        )
        assert response.status_code == 202
        # The route defaults invalid quality to "medium"
        # (quality is the 4th positional argument passed to the task).
        call_args = mock_delay.call_args[0]
        assert call_args[3] == 'medium'

View File

@@ -0,0 +1,72 @@
"""Tests for file conversion Celery task routes."""
import io
from unittest.mock import MagicMock
class TestConvertTaskRoutes:
    """Route tests for the PDF<->Word conversion endpoints."""

    def test_pdf_to_word_success(self, client, monkeypatch):
        """Should return 202 with task_id for valid PDF upload."""
        queued = MagicMock()
        queued.id = 'convert-pdf-word-id'
        monkeypatch.setattr(
            'app.routes.convert.validate_file',
            lambda f, allowed_types: ('document.pdf', 'pdf'),
        )
        monkeypatch.setattr(
            'app.routes.convert.generate_safe_path',
            lambda ext, folder_type: ('convert-pdf-word-id', '/tmp/test.pdf'),
        )
        monkeypatch.setattr(
            'app.routes.convert.convert_pdf_to_word.delay',
            MagicMock(return_value=queued),
        )
        resp = client.post(
            '/api/convert/pdf-to-word',
            data={'file': (io.BytesIO(b'%PDF-1.4'), 'document.pdf')},
            content_type='multipart/form-data',
        )
        assert resp.status_code == 202
        payload = resp.get_json()
        assert payload['task_id'] == 'convert-pdf-word-id'
        assert 'message' in payload

    def test_word_to_pdf_success(self, client, monkeypatch):
        """Should return 202 with task_id for valid Word upload."""
        queued = MagicMock()
        queued.id = 'convert-word-pdf-id'
        monkeypatch.setattr(
            'app.routes.convert.validate_file',
            lambda f, allowed_types: ('report.docx', 'docx'),
        )
        monkeypatch.setattr(
            'app.routes.convert.generate_safe_path',
            lambda ext, folder_type: ('convert-word-pdf-id', '/tmp/test.docx'),
        )
        monkeypatch.setattr(
            'app.routes.convert.convert_word_to_pdf.delay',
            MagicMock(return_value=queued),
        )
        resp = client.post(
            '/api/convert/word-to-pdf',
            data={'file': (io.BytesIO(b'PK\x03\x04'), 'report.docx')},
            content_type='multipart/form-data',
        )
        assert resp.status_code == 202
        assert resp.get_json()['task_id'] == 'convert-word-pdf-id'

    def test_pdf_to_word_no_file(self, client):
        """Should return 400 when no file provided."""
        assert client.post('/api/convert/pdf-to-word').status_code == 400

    def test_word_to_pdf_no_file(self, client):
        """Should return 400 when no file provided."""
        assert client.post('/api/convert/word-to-pdf').status_code == 400

View File

@@ -0,0 +1,49 @@
"""Tests for file download route."""
import os
class TestDownload:
    """Tests for GET /api/download/<task_id>/<filename>."""

    def test_download_nonexistent_file(self, client):
        """Should return 404 for missing file."""
        response = client.get('/api/download/some-task-id/output.pdf')
        assert response.status_code == 404

    def test_download_path_traversal_task_id(self, client):
        """Should reject task_id with path traversal characters."""
        response = client.get('/api/download/../etc/output.pdf')
        # Flask will handle this — either 400 or 404
        assert response.status_code in (400, 404)

    def test_download_path_traversal_filename(self, client):
        """Should reject filename with path traversal characters."""
        response = client.get('/api/download/valid-id/../../etc/passwd')
        assert response.status_code in (400, 404)

    def test_download_valid_file(self, client, app):
        """Should serve file if it exists."""
        task_id = 'test-download-id'
        filename = 'output.pdf'
        # Create the file in the output directory
        output_dir = os.path.join(app.config['OUTPUT_FOLDER'], task_id)
        os.makedirs(output_dir, exist_ok=True)
        file_path = os.path.join(output_dir, filename)
        with open(file_path, 'wb') as f:
            f.write(b'%PDF-1.4 test content')
        # BUG FIX: the URL previously contained the literal text "(unknown)"
        # instead of the filename, so the request could never hit the file
        # that was just created and the 200 assertion always failed.
        response = client.get(f'/api/download/{task_id}/{filename}')
        assert response.status_code == 200
        assert response.data == b'%PDF-1.4 test content'

    def test_download_with_custom_name(self, client, app):
        """Should use the ?name= parameter as download filename."""
        task_id = 'test-name-id'
        filename = 'output.pdf'
        output_dir = os.path.join(app.config['OUTPUT_FOLDER'], task_id)
        os.makedirs(output_dir, exist_ok=True)
        with open(os.path.join(output_dir, filename), 'wb') as f:
            f.write(b'%PDF-1.4')
        # BUG FIX: same "(unknown)" placeholder replaced with {filename}.
        response = client.get(f'/api/download/{task_id}/{filename}?name=my-document.pdf')
        assert response.status_code == 200

View File

@@ -0,0 +1,108 @@
"""Tests for file validation utility."""
import io
from unittest.mock import patch, MagicMock
from app.utils.file_validator import validate_file, FileValidationError
import pytest
class TestFileValidator:
    """Unit tests for app.utils.file_validator.validate_file.

    Each test builds a MagicMock "upload" whose seek/tell/read are wired to a
    real io.BytesIO so the validator can measure size and sniff magic bytes.
    """

    def test_no_file_raises(self, app):
        """Should raise when no file provided."""
        with app.app_context():
            with pytest.raises(FileValidationError, match="No file provided"):
                validate_file(None, allowed_types=["pdf"])

    def test_empty_filename_raises(self, app):
        """Should raise when filename is empty."""
        with app.app_context():
            mock_file = MagicMock()
            mock_file.filename = ''
            with pytest.raises(FileValidationError, match="No file provided"):
                validate_file(mock_file, allowed_types=["pdf"])

    def test_wrong_extension_raises(self, app):
        """Should raise when file extension is not allowed."""
        with app.app_context():
            mock_file = MagicMock()
            mock_file.filename = 'test.exe'
            with pytest.raises(FileValidationError, match="not allowed"):
                validate_file(mock_file, allowed_types=["pdf"])

    def test_empty_file_raises(self, app):
        """Should raise when file is empty (0 bytes)."""
        with app.app_context():
            content = io.BytesIO(b'')
            mock_file = MagicMock()
            mock_file.filename = 'test.pdf'
            mock_file.seek = content.seek
            mock_file.tell = content.tell
            mock_file.read = content.read
            with pytest.raises(FileValidationError, match="empty"):
                validate_file(mock_file, allowed_types=["pdf"])

    def test_valid_pdf_passes(self, app):
        """Should accept valid PDF file with correct magic bytes."""
        with app.app_context():
            # Padding past 8 KiB so the validator has a full sniff buffer.
            pdf_bytes = b'%PDF-1.4 test content' + b'\x00' * 8192
            content = io.BytesIO(pdf_bytes)
            mock_file = MagicMock()
            mock_file.filename = 'document.pdf'
            mock_file.seek = content.seek
            mock_file.tell = content.tell
            mock_file.read = content.read
            with patch('app.utils.file_validator.magic') as mock_magic:
                mock_magic.from_buffer.return_value = 'application/pdf'
                filename, ext = validate_file(mock_file, allowed_types=["pdf"])
                assert filename == 'document.pdf'
                assert ext == 'pdf'

    def test_mime_mismatch_raises(self, app):
        """Should raise when MIME type doesn't match extension."""
        with app.app_context():
            content = io.BytesIO(b'not a real pdf' + b'\x00' * 8192)
            mock_file = MagicMock()
            mock_file.filename = 'fake.pdf'
            mock_file.seek = content.seek
            mock_file.tell = content.tell
            mock_file.read = content.read
            with patch('app.utils.file_validator.magic') as mock_magic:
                mock_magic.from_buffer.return_value = 'text/plain'
                with pytest.raises(FileValidationError, match="does not match"):
                    validate_file(mock_file, allowed_types=["pdf"])

    def test_file_too_large_raises(self, app):
        """Should raise when file exceeds size limit."""
        with app.app_context():
            # Create a file larger than the PDF size limit (20MB)
            large_content = io.BytesIO(b'%PDF-1.4' + b'\x00' * (21 * 1024 * 1024))
            mock_file = MagicMock()
            mock_file.filename = 'large.pdf'
            mock_file.seek = large_content.seek
            mock_file.tell = large_content.tell
            mock_file.read = large_content.read
            with pytest.raises(FileValidationError, match="too large"):
                validate_file(mock_file, allowed_types=["pdf"])

    def test_dangerous_pdf_raises(self, app):
        """Should raise when PDF contains dangerous patterns."""
        with app.app_context():
            # /JavaScript is one of the patterns the validator flags as unsafe.
            pdf_bytes = b'%PDF-1.4 /JavaScript evil_code' + b'\x00' * 8192
            content = io.BytesIO(pdf_bytes)
            mock_file = MagicMock()
            mock_file.filename = 'evil.pdf'
            mock_file.seek = content.seek
            mock_file.tell = content.tell
            mock_file.read = content.read
            with patch('app.utils.file_validator.magic') as mock_magic:
                mock_magic.from_buffer.return_value = 'application/pdf'
                with pytest.raises(FileValidationError, match="unsafe"):
                    validate_file(mock_file, allowed_types=["pdf"])

View File

@@ -0,0 +1,50 @@
"""Tests for image processing service."""
import os
from unittest.mock import patch, MagicMock
import pytest
from app.services.image_service import convert_image, ImageProcessingError
class TestImageService:
    """Unit tests for app.services.image_service.convert_image."""

    def test_convert_invalid_format_raises(self, app):
        """Should raise for unsupported output format."""
        with app.app_context():
            with pytest.raises(ImageProcessingError, match="Unsupported"):
                convert_image('/tmp/test.png', '/tmp/out.bmp', 'bmp')

    def test_convert_image_success(self, app, tmp_path):
        """Should convert an image and return stats."""
        from PIL import Image as PILImage
        with app.app_context():
            # Write a genuine PNG so the service exercises real file I/O.
            src = str(tmp_path / 'input.png')
            dst = str(tmp_path / 'output.jpg')
            PILImage.new('RGB', (100, 100), color='red').save(src, 'PNG')
            stats = convert_image(src, dst, 'jpg', quality=85)
            assert stats['width'] == 100
            assert stats['height'] == 100
            assert stats['format'] == 'jpg'
            assert stats['original_size'] > 0
            assert stats['converted_size'] > 0
            assert os.path.exists(dst)

    def test_convert_rgba_to_jpeg(self, app, tmp_path):
        """Should handle RGBA to JPEG conversion (strip alpha)."""
        from PIL import Image as PILImage
        with app.app_context():
            src = str(tmp_path / 'input_rgba.png')
            dst = str(tmp_path / 'output.jpg')
            PILImage.new('RGBA', (50, 50), color=(255, 0, 0, 128)).save(src, 'PNG')
            stats = convert_image(src, dst, 'jpg', quality=85)
            assert stats['format'] == 'jpg'
            assert os.path.exists(dst)

View File

@@ -0,0 +1,115 @@
"""Tests for image processing Celery task routes."""
import io
from unittest.mock import MagicMock
class TestImageTaskRoutes:
    """Route tests for /api/image/convert and /api/image/resize."""

    def test_convert_image_success(self, client, monkeypatch):
        """Should return 202 with task_id for valid image conversion request."""
        mock_task = MagicMock()
        mock_task.id = 'img-convert-id'
        monkeypatch.setattr(
            'app.routes.image.validate_file',
            lambda f, allowed_types: ('photo.png', 'png'),
        )
        monkeypatch.setattr(
            'app.routes.image.generate_safe_path',
            lambda ext, folder_type: ('img-convert-id', '/tmp/test.png'),
        )
        monkeypatch.setattr(
            'app.routes.image.convert_image_task.delay',
            MagicMock(return_value=mock_task),
        )
        data = {
            'file': (io.BytesIO(b'\x89PNG\r\n'), 'photo.png'),
            'format': 'jpg',
            'quality': '85',
        }
        response = client.post(
            '/api/image/convert',
            data=data,
            content_type='multipart/form-data',
        )
        assert response.status_code == 202
        assert response.get_json()['task_id'] == 'img-convert-id'

    def test_convert_image_invalid_format(self, client):
        """Should return 400 for unsupported output format."""
        data = {
            'file': (io.BytesIO(b'\x89PNG\r\n'), 'photo.png'),
            'format': 'bmp',
        }
        response = client.post(
            '/api/image/convert',
            data=data,
            content_type='multipart/form-data',
        )
        assert response.status_code == 400
        assert 'format' in response.get_json()['error'].lower()

    def test_resize_image_success(self, client, monkeypatch):
        """Should return 202 with task_id for valid resize request."""
        mock_task = MagicMock()
        mock_task.id = 'img-resize-id'
        monkeypatch.setattr(
            'app.routes.image.validate_file',
            lambda f, allowed_types: ('photo.jpg', 'jpg'),
        )
        monkeypatch.setattr(
            'app.routes.image.generate_safe_path',
            lambda ext, folder_type: ('img-resize-id', '/tmp/test.jpg'),
        )
        monkeypatch.setattr(
            'app.routes.image.resize_image_task.delay',
            MagicMock(return_value=mock_task),
        )
        data = {
            'file': (io.BytesIO(b'\xff\xd8\xff'), 'photo.jpg'),
            'width': '800',
            'height': '600',
        }
        response = client.post(
            '/api/image/resize',
            data=data,
            content_type='multipart/form-data',
        )
        assert response.status_code == 202
        assert response.get_json()['task_id'] == 'img-resize-id'

    def test_resize_image_no_dimensions(self, client, monkeypatch):
        """Should return 400 when both width and height are missing."""
        monkeypatch.setattr(
            'app.routes.image.validate_file',
            lambda f, allowed_types: ('photo.jpg', 'jpg'),
        )
        data = {
            'file': (io.BytesIO(b'\xff\xd8\xff'), 'photo.jpg'),
        }
        response = client.post(
            '/api/image/resize',
            data=data,
            content_type='multipart/form-data',
        )
        assert response.status_code == 400
        # Error wording may mention either missing dimension.
        assert 'width' in response.get_json()['error'].lower() or 'height' in response.get_json()['error'].lower()

    def test_resize_image_invalid_width(self, client, monkeypatch):
        """Should return 400 for width out of range."""
        monkeypatch.setattr(
            'app.routes.image.validate_file',
            lambda f, allowed_types: ('photo.jpg', 'jpg'),
        )
        data = {
            'file': (io.BytesIO(b'\xff\xd8\xff'), 'photo.jpg'),
            'width': '20000',
        }
        response = client.post(
            '/api/image/resize',
            data=data,
            content_type='multipart/form-data',
        )
        assert response.status_code == 400

207
backend/tests/test_load.py Normal file
View File

@@ -0,0 +1,207 @@
"""
Concurrent / load tests — verify the API handles multiple simultaneous
requests without race conditions or resource leaks.
These tests do NOT require Redis or Celery; every external call is mocked.
"""
import io
import threading
from unittest.mock import MagicMock, patch
# ---------------------------------------------------------------------------
# Rapid sequential requests — baseline stability
# ---------------------------------------------------------------------------
class TestRapidSequential:
    """Rapid single-threaded request bursts must stay stable."""

    def test_100_health_requests(self, client):
        """100 back-to-back /health requests must all return 200."""
        for _ in range(100):
            resp = client.get('/api/health')
            assert resp.status_code == 200

    def test_rapid_no_file_errors_are_safe(self, client):
        """50 rapid requests that each produce a 400 must not leak state."""
        for _ in range(50):
            resp = client.post('/api/compress/pdf')
            assert resp.status_code == 400
            assert resp.get_json()['error']
# ---------------------------------------------------------------------------
# Concurrent requests — 10 simultaneous threads, each with its own client
# ---------------------------------------------------------------------------
class TestConcurrentRequests:
    """Concurrent-request tests: each thread gets its own test client.

    All mock patches are applied ONCE, before threads start, because
    unittest.mock patching mutates global module state and is not
    thread-safe to apply/remove concurrently.
    """

    def test_10_concurrent_health(self, app):
        """10 threads hitting /health simultaneously must all get 200."""
        results: list[int] = []
        errors: list[Exception] = []
        lock = threading.Lock()

        def worker():
            try:
                with app.test_client() as c:
                    r = c.get('/api/health')
                with lock:
                    results.append(r.status_code)
            except Exception as exc:
                with lock:
                    errors.append(exc)

        threads = [threading.Thread(target=worker) for _ in range(10)]
        for t in threads:
            t.start()
        for t in threads:
            t.join(timeout=10)
        assert not errors, f"Threads raised: {errors}"
        assert results.count(200) == 10

    def test_concurrent_compress_uploads(self, app):
        """5 concurrent compress requests each return 202 without deadlocks.

        Patches are applied ONCE outside threads to avoid thread-safety issues
        with unittest.mock's global state."""
        task_ids: list[str] = []
        errors: list[Exception] = []
        lock = threading.Lock()
        # Use a counter-based side_effect so the shared mock returns distinct ids
        counter = [0]

        def make_task():
            with lock:
                n = counter[0]
                counter[0] += 1
            t = MagicMock()
            t.id = f'task-thread-{n}'
            return t

        # Apply all patches BEFORE threads start — avoids concurrent patch/unpatch
        # FIX: 'tid-x' was written as an f-string with no placeholders (F541).
        with patch('app.routes.compress.validate_file', return_value=('t.pdf', 'pdf')), \
             patch('app.routes.compress.generate_safe_path',
                   side_effect=lambda ext, folder_type: ('tid-x', '/tmp/up/t.pdf')), \
             patch('werkzeug.datastructures.file_storage.FileStorage.save'), \
             patch('app.routes.compress.compress_pdf_task.delay',
                   side_effect=lambda *a, **kw: make_task()):

            def worker():
                try:
                    with app.test_client() as c:
                        r = c.post(
                            '/api/compress/pdf',
                            data={'file': (io.BytesIO(b'%PDF-1.4'), 'test.pdf')},
                            content_type='multipart/form-data',
                        )
                    with lock:
                        if r.status_code == 202:
                            task_ids.append(r.get_json()['task_id'])
                        else:
                            errors.append(
                                AssertionError(
                                    f"expected 202, got {r.status_code}: {r.data}"
                                )
                            )
                except Exception as exc:
                    with lock:
                        errors.append(exc)

            threads = [threading.Thread(target=worker) for _ in range(5)]
            for t in threads:
                t.start()
            for t in threads:
                t.join(timeout=15)
        assert not errors, f"Errors in threads: {errors}"
        assert len(task_ids) == 5
        assert len(set(task_ids)) == 5, "task_ids must be unique per request"

    def test_concurrent_pdf_tools_requests(self, app):
        """3 concurrent split-PDF requests must not interfere with each other.

        Patches applied once outside threads for thread safety."""
        statuses: list[int] = []
        errors: list[Exception] = []
        lock = threading.Lock()
        with patch('app.routes.pdf_tools.validate_file', return_value=('t.pdf', 'pdf')), \
             patch('app.routes.pdf_tools.generate_safe_path',
                   side_effect=lambda ext, folder_type: ('split-x', '/tmp/up/t.pdf')), \
             patch('werkzeug.datastructures.file_storage.FileStorage.save'), \
             patch('app.routes.pdf_tools.split_pdf_task.delay',
                   return_value=MagicMock(id='split-task')):

            def worker():
                try:
                    with app.test_client() as c:
                        r = c.post(
                            '/api/pdf-tools/split',
                            data={
                                'file': (io.BytesIO(b'%PDF-1.4'), 'test.pdf'),
                                'mode': 'all',
                            },
                            content_type='multipart/form-data',
                        )
                    with lock:
                        statuses.append(r.status_code)
                except Exception as exc:
                    with lock:
                        errors.append(exc)

            threads = [threading.Thread(target=worker) for _ in range(3)]
            for t in threads:
                t.start()
            for t in threads:
                t.join(timeout=15)
        assert not errors, f"Errors in threads: {errors}"
        assert all(s == 202 for s in statuses), f"Got statuses: {statuses}"
# ---------------------------------------------------------------------------
# File-size enforcement
# ---------------------------------------------------------------------------
class TestFileSizeLimits:
    """Verify MAX_CONTENT_LENGTH enforcement on uploads."""

    def test_compress_rejects_oversized_request(self, client, app):
        """Requests exceeding MAX_CONTENT_LENGTH must be rejected (413)."""
        original = app.config['MAX_CONTENT_LENGTH']
        try:
            # Set 1-byte limit so any real file triggers it
            app.config['MAX_CONTENT_LENGTH'] = 1
            oversized = io.BytesIO(b'%PDF-1.4' + b'x' * 2048)
            r = client.post(
                '/api/compress/pdf',
                data={'file': (oversized, 'huge.pdf')},
                content_type='multipart/form-data',
            )
            assert r.status_code in (400, 413), (
                f"Expected 400 or 413 for oversized file, got {r.status_code}"
            )
        finally:
            # Always restore the original limit so other tests are unaffected.
            app.config['MAX_CONTENT_LENGTH'] = original

    def test_normal_size_file_is_accepted(self, client, monkeypatch):
        """A file within the size limit reaches the route logic."""
        monkeypatch.setattr(
            'app.routes.compress.validate_file',
            lambda f, allowed_types: ('t.pdf', 'pdf'),
        )
        monkeypatch.setattr(
            'app.routes.compress.generate_safe_path',
            lambda ext, folder_type: ('tid', '/tmp/test_uploads/tid/t.pdf'),
        )
        # Stub FileStorage.save so nothing is written to disk.
        monkeypatch.setattr(
            'werkzeug.datastructures.file_storage.FileStorage.save',
            lambda self, dst, buffer_size=16384: None,
        )
        mock_task = MagicMock()
        mock_task.id = 'size-ok-task'
        monkeypatch.setattr(
            'app.routes.compress.compress_pdf_task.delay',
            MagicMock(return_value=mock_task),
        )
        small_pdf = io.BytesIO(b'%PDF-1.4 small')
        r = client.post(
            '/api/compress/pdf',
            data={'file': (small_pdf, 'small.pdf')},
            content_type='multipart/form-data',
        )
        assert r.status_code == 202

View File

@@ -0,0 +1,64 @@
"""Tests for PDF conversion service (pdf_to_word, word_to_pdf)."""
import os
from unittest.mock import patch, MagicMock
import pytest
from app.services.pdf_service import pdf_to_word, PDFConversionError
class TestPdfService:
    """Unit tests for app.services.pdf_service.pdf_to_word (LibreOffice wrapper)."""

    def test_pdf_to_word_creates_output_dir(self, app):
        """Should create output directory if it doesn't exist."""
        with app.app_context():
            input_path = '/tmp/test_pdf_svc_input.pdf'
            output_dir = '/tmp/test_pdf_svc_output'
            output_filename = os.path.join(output_dir, 'test_pdf_svc_input.docx')
            with open(input_path, 'wb') as f:
                f.write(b'%PDF-1.4 test')
            with patch('app.services.pdf_service.subprocess.run') as mock_run:
                mock_run.return_value = MagicMock(
                    returncode=0, stdout='', stderr=''
                )
                # Simulate LibreOffice creating the output file
                os.makedirs(output_dir, exist_ok=True)
                with open(output_filename, 'wb') as f:
                    f.write(b'PK\x03\x04 fake docx')
                result = pdf_to_word(input_path, output_dir)
                assert result == output_filename
            os.unlink(input_path)
            import shutil
            shutil.rmtree(output_dir, ignore_errors=True)

    def test_pdf_to_word_timeout_raises(self, app):
        """Should raise error on LibreOffice timeout."""
        with app.app_context():
            import subprocess
            input_path = '/tmp/test_pdf_timeout.pdf'
            with open(input_path, 'wb') as f:
                f.write(b'%PDF-1.4 test')
            with patch('app.services.pdf_service.subprocess.run') as mock_run:
                # TimeoutExpired must surface as the service's own error type.
                mock_run.side_effect = subprocess.TimeoutExpired(cmd='soffice', timeout=120)
                with pytest.raises(PDFConversionError, match="timed out"):
                    pdf_to_word(input_path, '/tmp/timeout_output')
            os.unlink(input_path)

    def test_pdf_to_word_not_installed_raises(self, app):
        """Should raise error when LibreOffice is not installed."""
        with app.app_context():
            input_path = '/tmp/test_pdf_noinstall.pdf'
            with open(input_path, 'wb') as f:
                f.write(b'%PDF-1.4 test')
            with patch('app.services.pdf_service.subprocess.run') as mock_run:
                # FileNotFoundError from subprocess means soffice is missing.
                mock_run.side_effect = FileNotFoundError()
                with pytest.raises(PDFConversionError, match="not installed"):
                    pdf_to_word(input_path, '/tmp/noinstall_output')
            os.unlink(input_path)

View File

@@ -0,0 +1,531 @@
"""Tests for ALL PDF tools routes — Merge, Split, Rotate, Page Numbers, PDF↔Images, Watermark, Protect, Unlock."""
import io
import os
import tempfile
from unittest.mock import patch, MagicMock
# =========================================================================
# Helper: create mock for validate_file + celery task
# =========================================================================
def _mock_validate_and_task(monkeypatch, task_module_path, task_name):
    """Shared helper: mock validate_file to pass, mock the celery task,
    and ensure file.save() works by using a real temp directory.

    Args:
        monkeypatch: pytest monkeypatch fixture.
        task_module_path: dotted module path hosting the route helpers and
            Celery task (e.g. 'app.routes.pdf_tools').
        task_name: attribute name of the Celery task inside that module.

    Returns:
        (mock_task, mock_delay) so callers can inspect the enqueue call.
    """
    mock_task = MagicMock()
    mock_task.id = 'mock-task-id'
    # Create a real temp dir so file.save() works
    tmp_dir = tempfile.mkdtemp()
    save_path = os.path.join(tmp_dir, 'mock.pdf')
    # BUG FIX: task_module_path was previously accepted but ignored — the
    # patch targets hard-coded 'app.routes.pdf_tools'. Use the parameter so
    # the helper works for any route module (all current callers pass the
    # same value, so behavior is unchanged).
    monkeypatch.setattr(
        f'{task_module_path}.validate_file',
        lambda f, allowed_types: ('test.pdf', 'pdf'),
    )
    monkeypatch.setattr(
        f'{task_module_path}.generate_safe_path',
        lambda ext, folder_type: ('mock-task-id', save_path),
    )
    # Mock the celery task delay
    mock_delay = MagicMock(return_value=mock_task)
    monkeypatch.setattr(f'{task_module_path}.{task_name}.delay', mock_delay)
    return mock_task, mock_delay
# =========================================================================
# 1. Merge PDFs — POST /api/pdf-tools/merge
# =========================================================================
class TestMergePdfs:
    """Route tests for POST /api/pdf-tools/merge."""

    def test_merge_no_files(self, client):
        """Should return 400 when no files provided."""
        response = client.post('/api/pdf-tools/merge')
        assert response.status_code == 400
        data = response.get_json()
        assert 'error' in data

    def test_merge_single_file(self, client):
        """Should return 400 when only one file provided."""
        data = {'files': (io.BytesIO(b'%PDF-1.4 test'), 'test.pdf')}
        response = client.post(
            '/api/pdf-tools/merge',
            data=data,
            content_type='multipart/form-data',
        )
        assert response.status_code == 400
        assert 'at least 2' in response.get_json()['error'].lower()

    def test_merge_success(self, client, monkeypatch):
        """Should return 202 with task_id when valid PDFs provided."""
        mock_task = MagicMock()
        mock_task.id = 'merge-task-id'
        monkeypatch.setattr(
            'app.routes.pdf_tools.validate_file',
            lambda f, allowed_types: ('test.pdf', 'pdf'),
        )
        monkeypatch.setattr(
            'app.routes.pdf_tools.merge_pdfs_task.delay',
            MagicMock(return_value=mock_task),
        )
        # Mock os.makedirs and FileStorage.save so nothing touches disk
        monkeypatch.setattr('app.routes.pdf_tools.os.makedirs', lambda *a, **kw: None)
        monkeypatch.setattr(
            'werkzeug.datastructures.file_storage.FileStorage.save',
            lambda self, dst, buffer_size=16384: None,
        )
        data = {
            'files': [
                (io.BytesIO(b'%PDF-1.4 file1'), 'a.pdf'),
                (io.BytesIO(b'%PDF-1.4 file2'), 'b.pdf'),
            ]
        }
        response = client.post(
            '/api/pdf-tools/merge',
            data=data,
            content_type='multipart/form-data',
        )
        assert response.status_code == 202
        body = response.get_json()
        assert body['task_id'] == 'merge-task-id'
        assert 'message' in body

    def test_merge_too_many_files(self, client, monkeypatch):
        """Should return 400 when more than 20 files provided."""
        monkeypatch.setattr(
            'app.routes.pdf_tools.validate_file',
            lambda f, allowed_types: ('test.pdf', 'pdf'),
        )
        # 21 files — one over the route's documented 20-file cap.
        data = {
            'files': [
                (io.BytesIO(b'%PDF-1.4'), f'file{i}.pdf')
                for i in range(21)
            ]
        }
        response = client.post(
            '/api/pdf-tools/merge',
            data=data,
            content_type='multipart/form-data',
        )
        assert response.status_code == 400
        assert '20' in response.get_json()['error']
# =========================================================================
# 2. Split PDF — POST /api/pdf-tools/split
# =========================================================================
class TestSplitPdf:
    """Route tests for POST /api/pdf-tools/split."""

    def test_split_no_file(self, client):
        """Should return 400 when no file provided."""
        response = client.post('/api/pdf-tools/split')
        assert response.status_code == 400
        data = response.get_json()
        assert data['error'] == 'No file provided.'

    def test_split_success_all_mode(self, client, monkeypatch):
        """Should accept file and return 202 with mode=all."""
        mock_task, mock_delay = _mock_validate_and_task(
            monkeypatch, 'app.routes.pdf_tools', 'split_pdf_task'
        )
        data = {
            'file': (io.BytesIO(b'%PDF-1.4 test'), 'test.pdf'),
            'mode': 'all',
        }
        response = client.post(
            '/api/pdf-tools/split',
            data=data,
            content_type='multipart/form-data',
        )
        assert response.status_code == 202
        body = response.get_json()
        assert body['task_id'] == 'mock-task-id'

    def test_split_success_range_mode(self, client, monkeypatch):
        """Should accept file with mode=range and pages."""
        mock_task, mock_delay = _mock_validate_and_task(
            monkeypatch, 'app.routes.pdf_tools', 'split_pdf_task'
        )
        data = {
            'file': (io.BytesIO(b'%PDF-1.4 test'), 'test.pdf'),
            'mode': 'range',
            'pages': '1,3,5-8',
        }
        response = client.post(
            '/api/pdf-tools/split',
            data=data,
            content_type='multipart/form-data',
        )
        assert response.status_code == 202
        mock_delay.assert_called_once()
        # Verify pages parameter was passed
        call_args = mock_delay.call_args
        assert call_args[0][4] == '1,3,5-8'  # pages arg

    def test_split_range_mode_requires_pages(self, client, monkeypatch):
        """Should return 400 when range mode is selected without pages."""
        monkeypatch.setattr(
            'app.routes.pdf_tools.validate_file',
            lambda f, allowed_types: ('test.pdf', 'pdf'),
        )
        data = {
            'file': (io.BytesIO(b'%PDF-1.4 test'), 'test.pdf'),
            'mode': 'range',
        }
        response = client.post(
            '/api/pdf-tools/split',
            data=data,
            content_type='multipart/form-data',
        )
        assert response.status_code == 400
        assert 'specify which pages to extract' in response.get_json()['error'].lower()
# =========================================================================
# 3. Rotate PDF — POST /api/pdf-tools/rotate
# =========================================================================
class TestRotatePdf:
    """Route tests for POST /api/pdf-tools/rotate."""

    def test_rotate_no_file(self, client):
        """A rotate request without an uploaded file is rejected."""
        resp = client.post('/api/pdf-tools/rotate')
        assert resp.status_code == 400

    def test_rotate_invalid_degrees(self, client, monkeypatch):
        """Should reject invalid rotation angles."""
        monkeypatch.setattr(
            'app.routes.pdf_tools.validate_file',
            lambda f, allowed_types: ('test.pdf', 'pdf'),
        )
        form = {
            'file': (io.BytesIO(b'%PDF-1.4'), 'test.pdf'),
            'rotation': '45',
        }
        resp = client.post(
            '/api/pdf-tools/rotate',
            data=form,
            content_type='multipart/form-data',
        )
        assert resp.status_code == 400
        assert '90, 180, or 270' in resp.get_json()['error']

    def test_rotate_success(self, client, monkeypatch):
        """A valid angle queues the rotate task and returns 202."""
        _task, _delay = _mock_validate_and_task(
            monkeypatch, 'app.routes.pdf_tools', 'rotate_pdf_task'
        )
        form = {
            'file': (io.BytesIO(b'%PDF-1.4'), 'test.pdf'),
            'rotation': '90',
            'pages': 'all',
        }
        resp = client.post(
            '/api/pdf-tools/rotate',
            data=form,
            content_type='multipart/form-data',
        )
        assert resp.status_code == 202
        assert resp.get_json()['task_id'] == 'mock-task-id'
# =========================================================================
# 4. Page Numbers — POST /api/pdf-tools/page-numbers
# =========================================================================
class TestAddPageNumbers:
    """Route tests for POST /api/pdf-tools/page-numbers."""

    def test_page_numbers_no_file(self, client):
        """Missing upload is rejected with 400."""
        resp = client.post('/api/pdf-tools/page-numbers')
        assert resp.status_code == 400

    def test_page_numbers_success(self, client, monkeypatch):
        """A valid request queues the task and returns 202."""
        _task, _delay = _mock_validate_and_task(
            monkeypatch, 'app.routes.pdf_tools', 'add_page_numbers_task'
        )
        form = {
            'file': (io.BytesIO(b'%PDF-1.4'), 'test.pdf'),
            'position': 'bottom-center',
            'start_number': '1',
        }
        resp = client.post(
            '/api/pdf-tools/page-numbers',
            data=form,
            content_type='multipart/form-data',
        )
        assert resp.status_code == 202

    def test_page_numbers_invalid_position_defaults(self, client, monkeypatch):
        """Invalid position should default to bottom-center."""
        _task, delay_mock = _mock_validate_and_task(
            monkeypatch, 'app.routes.pdf_tools', 'add_page_numbers_task'
        )
        form = {
            'file': (io.BytesIO(b'%PDF-1.4'), 'test.pdf'),
            'position': 'invalid-position',
        }
        resp = client.post(
            '/api/pdf-tools/page-numbers',
            data=form,
            content_type='multipart/form-data',
        )
        assert resp.status_code == 202
        # The route must have substituted the default 'bottom-center'.
        positional_args = delay_mock.call_args[0]
        assert positional_args[3] == 'bottom-center'
# =========================================================================
# 5. PDF to Images — POST /api/pdf-tools/pdf-to-images
# =========================================================================
class TestPdfToImages:
    """Route tests for POST /api/pdf-tools/pdf-to-images."""

    def test_pdf_to_images_no_file(self, client):
        """Missing upload is rejected with 400."""
        resp = client.post('/api/pdf-tools/pdf-to-images')
        assert resp.status_code == 400

    def test_pdf_to_images_success(self, client, monkeypatch):
        """A valid request queues the task and returns 202."""
        _task, _delay = _mock_validate_and_task(
            monkeypatch, 'app.routes.pdf_tools', 'pdf_to_images_task'
        )
        form = {
            'file': (io.BytesIO(b'%PDF-1.4'), 'test.pdf'),
            'format': 'png',
            'dpi': '200',
        }
        resp = client.post(
            '/api/pdf-tools/pdf-to-images',
            data=form,
            content_type='multipart/form-data',
        )
        assert resp.status_code == 202

    def test_pdf_to_images_invalid_format_defaults(self, client, monkeypatch):
        """Invalid format should default to png."""
        _task, delay_mock = _mock_validate_and_task(
            monkeypatch, 'app.routes.pdf_tools', 'pdf_to_images_task'
        )
        form = {
            'file': (io.BytesIO(b'%PDF-1.4'), 'test.pdf'),
            'format': 'bmp',
        }
        resp = client.post(
            '/api/pdf-tools/pdf-to-images',
            data=form,
            content_type='multipart/form-data',
        )
        assert resp.status_code == 202
        positional_args = delay_mock.call_args[0]
        assert positional_args[3] == 'png'  # default format
# =========================================================================
# 6. Images to PDF — POST /api/pdf-tools/images-to-pdf
# =========================================================================
class TestImagesToPdf:
    """Route tests for POST /api/pdf-tools/images-to-pdf."""

    def test_images_to_pdf_no_files(self, client):
        """Request without any uploaded images should return 400."""
        response = client.post('/api/pdf-tools/images-to-pdf')
        assert response.status_code == 400

    def test_images_to_pdf_success(self, client, monkeypatch):
        """Valid image uploads should queue the task and return 202."""
        mock_task = MagicMock()
        mock_task.id = 'images-task-id'
        monkeypatch.setattr(
            'app.routes.pdf_tools.validate_file',
            lambda f, allowed_types: ('test.png', 'png'),
        )
        monkeypatch.setattr(
            'app.routes.pdf_tools.images_to_pdf_task.delay',
            MagicMock(return_value=mock_task),
        )
        # Mock os.makedirs and FileStorage.save so nothing touches disk
        monkeypatch.setattr('app.routes.pdf_tools.os.makedirs', lambda *a, **kw: None)
        monkeypatch.setattr(
            'werkzeug.datastructures.file_storage.FileStorage.save',
            lambda self, dst, buffer_size=16384: None,
        )
        data = {
            'files': [
                (io.BytesIO(b'\x89PNG\r\n'), 'img1.png'),
                (io.BytesIO(b'\x89PNG\r\n'), 'img2.png'),
            ]
        }
        response = client.post(
            '/api/pdf-tools/images-to-pdf',
            data=data,
            content_type='multipart/form-data',
        )
        assert response.status_code == 202
        assert response.get_json()['task_id'] == 'images-task-id'

    def test_images_to_pdf_too_many(self, client, monkeypatch):
        """More than 50 images should be rejected with 400."""
        monkeypatch.setattr(
            'app.routes.pdf_tools.validate_file',
            lambda f, allowed_types: ('test.png', 'png'),
        )
        # 51 files — one over the route's 50-image cap.
        data = {
            'files': [
                (io.BytesIO(b'\x89PNG\r\n'), f'img{i}.png')
                for i in range(51)
            ]
        }
        response = client.post(
            '/api/pdf-tools/images-to-pdf',
            data=data,
            content_type='multipart/form-data',
        )
        assert response.status_code == 400
        assert '50' in response.get_json()['error']
# =========================================================================
# 7. Watermark PDF — POST /api/pdf-tools/watermark
# =========================================================================
class TestWatermarkPdf:
    """POST /api/pdf-tools/watermark."""

    ENDPOINT = '/api/pdf-tools/watermark'

    @staticmethod
    def _patch_validate(monkeypatch):
        """Bypass real upload validation so tests hit the param checks."""
        monkeypatch.setattr(
            'app.routes.pdf_tools.validate_file',
            lambda f, allowed_types: ('test.pdf', 'pdf'),
        )

    def _post(self, client, **fields):
        """POST a minimal PDF upload plus extra form fields."""
        payload = {'file': (io.BytesIO(b'%PDF-1.4'), 'test.pdf'), **fields}
        return client.post(self.ENDPOINT, data=payload,
                           content_type='multipart/form-data')

    def test_watermark_no_file(self, client):
        """Missing file upload → 400."""
        assert client.post(self.ENDPOINT).status_code == 400

    def test_watermark_no_text(self, client, monkeypatch):
        """Empty watermark text → 400 with a 'required' error."""
        self._patch_validate(monkeypatch)
        resp = self._post(client, text='')
        assert resp.status_code == 400
        assert 'required' in resp.get_json()['error'].lower()

    def test_watermark_text_too_long(self, client, monkeypatch):
        """Text longer than 100 characters → 400 mentioning the limit."""
        self._patch_validate(monkeypatch)
        resp = self._post(client, text='x' * 101)
        assert resp.status_code == 400
        assert '100' in resp.get_json()['error']

    def test_watermark_success(self, client, monkeypatch):
        """Valid text and opacity should enqueue the watermark task (202)."""
        _mock_validate_and_task(
            monkeypatch, 'app.routes.pdf_tools', 'watermark_pdf_task'
        )
        resp = self._post(client, text='CONFIDENTIAL', opacity='0.5')
        assert resp.status_code == 202
# =========================================================================
# 8. Protect PDF — POST /api/pdf-tools/protect
# =========================================================================
class TestProtectPdf:
    """POST /api/pdf-tools/protect."""

    ENDPOINT = '/api/pdf-tools/protect'

    @staticmethod
    def _patch_validate(monkeypatch):
        """Bypass real upload validation so tests hit the password checks."""
        monkeypatch.setattr(
            'app.routes.pdf_tools.validate_file',
            lambda f, allowed_types: ('test.pdf', 'pdf'),
        )

    def _post(self, client, password):
        """POST a minimal PDF upload with the given password field."""
        payload = {
            'file': (io.BytesIO(b'%PDF-1.4'), 'test.pdf'),
            'password': password,
        }
        return client.post(self.ENDPOINT, data=payload,
                           content_type='multipart/form-data')

    def test_protect_no_file(self, client):
        """Missing file upload → 400."""
        assert client.post(self.ENDPOINT).status_code == 400

    def test_protect_no_password(self, client, monkeypatch):
        """Empty password → 400 with a 'required' message."""
        self._patch_validate(monkeypatch)
        resp = self._post(client, '')
        assert resp.status_code == 400
        assert 'required' in resp.get_json()['error'].lower()

    def test_protect_short_password(self, client, monkeypatch):
        """Passwords shorter than 4 characters are rejected."""
        self._patch_validate(monkeypatch)
        resp = self._post(client, 'abc')
        assert resp.status_code == 400
        assert '4 characters' in resp.get_json()['error']

    def test_protect_success(self, client, monkeypatch):
        """A valid password enqueues the protect task (202)."""
        _mock_validate_and_task(
            monkeypatch, 'app.routes.pdf_tools', 'protect_pdf_task'
        )
        resp = self._post(client, 'secret1234')
        assert resp.status_code == 202
# =========================================================================
# 9. Unlock PDF — POST /api/pdf-tools/unlock
# =========================================================================
class TestUnlockPdf:
    """POST /api/pdf-tools/unlock."""

    ENDPOINT = '/api/pdf-tools/unlock'

    def _post(self, client, password):
        """POST a minimal PDF upload with the given password field."""
        payload = {
            'file': (io.BytesIO(b'%PDF-1.4'), 'test.pdf'),
            'password': password,
        }
        return client.post(self.ENDPOINT, data=payload,
                           content_type='multipart/form-data')

    def test_unlock_no_file(self, client):
        """Missing file upload → 400."""
        assert client.post(self.ENDPOINT).status_code == 400

    def test_unlock_no_password(self, client, monkeypatch):
        """Empty password → 400."""
        monkeypatch.setattr(
            'app.routes.pdf_tools.validate_file',
            lambda f, allowed_types: ('test.pdf', 'pdf'),
        )
        assert self._post(client, '').status_code == 400

    def test_unlock_success(self, client, monkeypatch):
        """A password enqueues the unlock task (202)."""
        _mock_validate_and_task(
            monkeypatch, 'app.routes.pdf_tools', 'unlock_pdf_task'
        )
        assert self._post(client, 'mypassword').status_code == 202

View File

@@ -0,0 +1,111 @@
"""Tests for PDF tools service — Merge, Split, Rotate, etc."""
import os
import pytest
from unittest.mock import patch, MagicMock
from app.services.pdf_tools_service import (
merge_pdfs,
split_pdf,
PDFToolsError,
)
class TestMergePdfsService:
    """Unit tests for merge_pdfs()."""

    def test_merge_file_not_found_raises(self, app):
        """Should raise PDFToolsError when an input file doesn't exist."""
        with app.app_context():
            with pytest.raises(PDFToolsError, match="not found"):
                merge_pdfs(
                    ['/tmp/nonexistent1.pdf', '/tmp/nonexistent2.pdf'],
                    '/tmp/merged_output.pdf',
                )

    def test_merge_success(self, app, tmp_path):
        """Should merge two single-page PDFs into a 2-page output.

        Uses pytest.importorskip so only a genuinely missing PyPDF2 skips
        the test; the previous try/except ImportError wrapped the whole
        body and would have silently skipped an ImportError raised from
        inside merge_pdfs itself.
        """
        pypdf2 = pytest.importorskip("PyPDF2")
        with app.app_context():
            pdf1 = str(tmp_path / 'a.pdf')
            pdf2 = str(tmp_path / 'b.pdf')
            for path in (pdf1, pdf2):
                writer = pypdf2.PdfWriter()
                writer.add_blank_page(width=612, height=792)
                with open(path, 'wb') as f:
                    writer.write(f)
            output = str(tmp_path / 'merged.pdf')
            result = merge_pdfs([pdf1, pdf2], output)
            assert result['total_pages'] == 2
            assert result['files_merged'] == 2
            assert result['output_size'] > 0
            assert os.path.exists(output)
class TestSplitPdfService:
    """Unit tests for split_pdf()."""

    @staticmethod
    def _make_pdf(path, pages):
        """Write a blank PDF with *pages* US-letter pages to *path*.

        Uses pytest.importorskip so only a genuinely missing PyPDF2 skips
        the test; the previous try/except ImportError wrapped the whole
        test body and would also have swallowed ImportErrors raised from
        inside split_pdf.
        """
        pypdf2 = pytest.importorskip("PyPDF2")
        writer = pypdf2.PdfWriter()
        for _ in range(pages):
            writer.add_blank_page(width=612, height=792)
        with open(path, 'wb') as f:
            writer.write(f)

    def test_split_all_pages(self, app, tmp_path):
        """mode='all' should extract every page and zip the results."""
        with app.app_context():
            input_path = str(tmp_path / 'multi.pdf')
            self._make_pdf(input_path, 3)
            output_dir = str(tmp_path / 'split_output')
            result = split_pdf(input_path, output_dir, mode='all')
            assert result['total_pages'] == 3
            assert result['extracted_pages'] == 3
            assert os.path.exists(result['zip_path'])

    def test_split_range_out_of_bounds_includes_total_pages(self, app, tmp_path):
        """Requesting pages past the end should name the real page count."""
        with app.app_context():
            input_path = str(tmp_path / 'single-page.pdf')
            self._make_pdf(input_path, 1)
            output_dir = str(tmp_path / 'split_output')
            with pytest.raises(PDFToolsError, match='has only 1 page'):
                split_pdf(input_path, output_dir, mode='range', pages='1-2')

    def test_split_range_invalid_format_returns_clear_message(self, app, tmp_path):
        """Malformed ranges like '1-2-3' should raise a clear format error."""
        with app.app_context():
            input_path = str(tmp_path / 'two-pages.pdf')
            self._make_pdf(input_path, 2)
            output_dir = str(tmp_path / 'split_output')
            with pytest.raises(PDFToolsError, match='Invalid page format'):
                split_pdf(input_path, output_dir, mode='range', pages='1-2-3')

View File

@@ -0,0 +1,176 @@
"""Tests for PDF tools Celery task routes — ensures frontend→backend request formats work."""
import io
from unittest.mock import MagicMock
class TestPdfToolsTaskRoutes:
    """
    These tests verify that the backend route accepts the exact request format
    the frontend sends, processes parameters correctly, and dispatches the
    appropriate Celery task.

    The per-route monkeypatch boilerplate (validate_file, generate_safe_path,
    <task>.delay) was identical across all seven tests, so it lives in
    _patch_route(); each test keeps only its route-specific form fields and
    dispatch-argument assertions.
    """

    @staticmethod
    def _patch_route(monkeypatch, task_attr, task_id):
        """Patch validate_file, generate_safe_path and the named Celery task.

        Returns the mocked .delay so tests can inspect dispatch arguments.
        """
        mock_task = MagicMock()
        mock_task.id = task_id
        mock_delay = MagicMock(return_value=mock_task)
        monkeypatch.setattr('app.routes.pdf_tools.validate_file',
                            lambda f, allowed_types: ('test.pdf', 'pdf'))
        monkeypatch.setattr('app.routes.pdf_tools.generate_safe_path',
                            lambda ext, folder_type: (task_id, '/tmp/test.pdf'))
        monkeypatch.setattr(f'app.routes.pdf_tools.{task_attr}.delay', mock_delay)
        return mock_delay

    @staticmethod
    def _post(client, endpoint, **fields):
        """POST a minimal PDF upload plus extra form fields to *endpoint*."""
        data = {'file': (io.BytesIO(b'%PDF-1.4'), 'test.pdf'), **fields}
        return client.post(endpoint, data=data,
                           content_type='multipart/form-data')

    def test_split_dispatches_task(self, client, monkeypatch):
        """Split route should dispatch split_pdf_task with correct params."""
        mock_delay = self._patch_route(monkeypatch, 'split_pdf_task', 'split-id')
        response = self._post(client, '/api/pdf-tools/split',
                              mode='range', pages='1-5')
        assert response.status_code == 202
        # Task signature: (input_path, task_id, filename, mode, pages)
        args = mock_delay.call_args[0]
        assert args[3] == 'range'
        assert args[4] == '1-5'

    def test_rotate_dispatches_task(self, client, monkeypatch):
        """Rotate route should dispatch with rotation (int) and pages."""
        mock_delay = self._patch_route(monkeypatch, 'rotate_pdf_task', 'rotate-id')
        # Frontend sends rotation/pages as form strings.
        response = self._post(client, '/api/pdf-tools/rotate',
                              rotation='180', pages='all')
        assert response.status_code == 202
        args = mock_delay.call_args[0]
        assert args[3] == 180  # rotation coerced to int by the route
        assert args[4] == 'all'

    def test_watermark_dispatches_task(self, client, monkeypatch):
        """Watermark route should dispatch with text and opacity (float)."""
        mock_delay = self._patch_route(monkeypatch, 'watermark_pdf_task', 'wm-id')
        # Frontend sends opacity as a decimal string.
        response = self._post(client, '/api/pdf-tools/watermark',
                              text='CONFIDENTIAL', opacity='0.3')
        assert response.status_code == 202
        args = mock_delay.call_args[0]
        assert args[3] == 'CONFIDENTIAL'
        assert args[4] == 0.3

    def test_protect_dispatches_task(self, client, monkeypatch):
        """Protect route should dispatch with the password."""
        mock_delay = self._patch_route(monkeypatch, 'protect_pdf_task', 'protect-id')
        response = self._post(client, '/api/pdf-tools/protect',
                              password='mySecret123')
        assert response.status_code == 202
        assert mock_delay.call_args[0][3] == 'mySecret123'

    def test_unlock_dispatches_task(self, client, monkeypatch):
        """Unlock route should dispatch with the password."""
        self._patch_route(monkeypatch, 'unlock_pdf_task', 'unlock-id')
        response = self._post(client, '/api/pdf-tools/unlock',
                              password='oldPassword')
        assert response.status_code == 202

    def test_page_numbers_dispatches_task(self, client, monkeypatch):
        """Page-numbers route should dispatch with position and start_number."""
        mock_delay = self._patch_route(monkeypatch, 'add_page_numbers_task', 'pn-id')
        response = self._post(client, '/api/pdf-tools/page-numbers',
                              position='top-right', start_number='5')
        assert response.status_code == 202
        args = mock_delay.call_args[0]
        assert args[3] == 'top-right'
        assert args[4] == 5

    def test_pdf_to_images_dispatches_task(self, client, monkeypatch):
        """PDF-to-images route should dispatch with format and dpi (int)."""
        mock_delay = self._patch_route(monkeypatch, 'pdf_to_images_task', 'p2i-id')
        response = self._post(client, '/api/pdf-tools/pdf-to-images',
                              format='jpg', dpi='300')
        assert response.status_code == 202
        args = mock_delay.call_args[0]
        assert args[3] == 'jpg'
        assert args[4] == 300

View File

@@ -0,0 +1,101 @@
"""Tests for rate limiting middleware."""
import pytest
from app import create_app
@pytest.fixture
def rate_limited_app(tmp_path):
    """App with rate limiting ENABLED.

    TestingConfig sets RATELIMIT_ENABLED=False so the other 116 tests are
    never throttled. Here we force the extension's internal flag back to
    True *after* init_app so the decorator limits are enforced.

    Yields:
        Flask app whose per-route @limiter.limit decorators are active,
        backed by in-memory limiter storage (no Redis required).
    """
    app = create_app('testing')
    app.config.update({
        'TESTING': True,
        'RATELIMIT_STORAGE_URI': 'memory://',
        'UPLOAD_FOLDER': str(tmp_path / 'uploads'),
        'OUTPUT_FOLDER': str(tmp_path / 'outputs'),
    })
    import os
    os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
    os.makedirs(app.config['OUTPUT_FOLDER'], exist_ok=True)
    # flask-limiter 3.x returns from init_app immediately when
    # RATELIMIT_ENABLED=False (TestingConfig default), so `initialized`
    # stays False and no limits are enforced. We override the config key
    # and call init_app a SECOND time so the extension fully initialises.
    # It is safe to call twice — flask-limiter guards against duplicate
    # before_request hook registration via app.extensions["limiter"].
    from app.extensions import limiter as _limiter
    app.config['RATELIMIT_ENABLED'] = True
    _limiter.init_app(app)  # second call — now RATELIMIT_ENABLED=True
    yield app
    # Restore so other tests are unaffected: the limiter object is a
    # module-level singleton shared by every app the test session creates.
    _limiter.enabled = False
    _limiter.initialized = False
@pytest.fixture
def rate_limited_client(rate_limited_app):
    """Flask test client bound to the rate-limiting-enabled app fixture."""
    return rate_limited_app.test_client()
class TestRateLimiter:
    """Smoke tests against the DEFAULT client (rate limiting disabled)."""

    def test_health_endpoint_not_rate_limited(self, client):
        """Health endpoint should handle many rapid requests."""
        for _ in range(20):
            response = client.get('/api/health')
            assert response.status_code == 200

    def test_rate_limit_header_present(self, client):
        """Health responds 200 under the default, non-rate-limited client."""
        # NOTE(review): despite the test name, this only asserts a 200 —
        # no rate-limit headers are checked; with the limiter disabled in
        # TestingConfig none would be emitted. Rename or extend if header
        # coverage is actually wanted.
        response = client.get('/api/health')
        assert response.status_code == 200
class TestRateLimitEnforcement:
    """Verify that per-route rate limits actually trigger (429) when exceeded."""

    @staticmethod
    def _hit_until_blocked(client, endpoint, attempts=15):
        """POST *endpoint* repeatedly; return True as soon as a 429 is seen."""
        for _ in range(attempts):
            if client.post(endpoint).status_code == 429:
                return True
        return False

    def test_compress_rate_limit_triggers(self, rate_limited_client):
        """
        POST /api/compress/pdf has @limiter.limit("10/minute").
        After 10 requests (each returns 400 for missing file, but the limiter
        still counts them), the 11th must get 429 Too Many Requests.
        """
        blocked = self._hit_until_blocked(rate_limited_client, '/api/compress/pdf')
        assert blocked, (
            "Expected a 429 Too Many Requests after exceeding 10/minute "
            "on /api/compress/pdf"
        )

    def test_convert_pdf_to_word_rate_limit(self, rate_limited_client):
        """POST /api/convert/pdf-to-word is also rate-limited."""
        blocked = self._hit_until_blocked(
            rate_limited_client, '/api/convert/pdf-to-word'
        )
        assert blocked, "Rate limit not enforced on /api/convert/pdf-to-word"

    def test_different_endpoints_have_independent_limits(self, rate_limited_client):
        """
        Exhausting the limit on /compress/pdf must not affect /api/health,
        which has no rate limit.
        """
        # Exhaust the compress limit first.
        for _ in range(15):
            rate_limited_client.post('/api/compress/pdf')
        # Health should still respond normally.
        assert rate_limited_client.get('/api/health').status_code == 200

View File

@@ -0,0 +1,74 @@
"""Tests for sanitizer utilities — generate_safe_path, get_output_path, cleanup."""
import os
from app.utils.sanitizer import generate_safe_path, get_output_path, cleanup_task_files
class TestGenerateSafePath:
    """Unit tests for generate_safe_path()."""

    def test_returns_tuple(self, app):
        """Should return a (task_id, file_path) pair of strings."""
        with app.app_context():
            task_id, file_path = generate_safe_path('pdf', folder_type='upload')
        assert isinstance(task_id, str)
        assert isinstance(file_path, str)

    def test_uuid_in_path(self, app):
        """The generated path should embed the UUID task_id."""
        with app.app_context():
            task_id, file_path = generate_safe_path('pdf')
        assert task_id in file_path

    def test_correct_extension(self, app):
        """The path should end with the requested extension."""
        with app.app_context():
            _, file_path = generate_safe_path('docx')
        assert file_path.endswith('.docx')

    def test_upload_folder(self, app):
        """folder_type='upload' should root the path in UPLOAD_FOLDER."""
        with app.app_context():
            _, file_path = generate_safe_path('pdf', folder_type='upload')
        assert app.config['UPLOAD_FOLDER'] in file_path

    def test_output_folder(self, app):
        """folder_type='output' should root the path in OUTPUT_FOLDER."""
        with app.app_context():
            _, file_path = generate_safe_path('pdf', folder_type='output')
        assert app.config['OUTPUT_FOLDER'] in file_path
class TestGetOutputPath:
    """Unit tests for get_output_path()."""

    def test_returns_correct_path(self, app):
        """Path should live in OUTPUT_FOLDER and carry task_id + extension."""
        with app.app_context():
            result = get_output_path('my-task-id', 'pdf')
        assert 'my-task-id' in result
        assert result.endswith('.pdf')
        assert app.config['OUTPUT_FOLDER'] in result
class TestCleanupTaskFiles:
    """Unit tests for cleanup_task_files()."""

    @staticmethod
    def _make_dir_with_file(base, task_id, filename):
        """Create <base>/<task_id>/<filename> and return the directory path."""
        target_dir = os.path.join(base, task_id)
        os.makedirs(target_dir, exist_ok=True)
        with open(os.path.join(target_dir, filename), 'w') as f:
            f.write('test')
        return target_dir

    def test_cleanup_removes_upload_dir(self, app):
        """Should remove the task's upload directory and its contents."""
        with app.app_context():
            upload_dir = self._make_dir_with_file(
                app.config['UPLOAD_FOLDER'], 'cleanup-test-id', 'test.pdf'
            )
            cleanup_task_files('cleanup-test-id')
        assert not os.path.exists(upload_dir)

    def test_cleanup_keeps_outputs_when_requested(self, app):
        """keep_outputs=True should leave the output directory intact."""
        with app.app_context():
            output_dir = self._make_dir_with_file(
                app.config['OUTPUT_FOLDER'], 'keep-output-id', 'out.pdf'
            )
            cleanup_task_files('keep-output-id', keep_outputs=True)
        assert os.path.exists(output_dir)

View File

@@ -0,0 +1,56 @@
"""Tests for storage service — local mode (S3 not configured in tests)."""
import os
from app.services.storage_service import StorageService
class TestStorageServiceLocal:
    """Storage service tests in local (non-S3) mode."""

    def test_use_s3_false_in_test(self, app):
        """S3 should not be configured in the test environment."""
        with app.app_context():
            svc = StorageService()
            assert svc.use_s3 is False

    def test_upload_file_local(self, app, tmp_path):
        """Should copy the file into the outputs directory in local mode.

        Uses tmp_path instead of a hard-coded /tmp file: the original
        os.unlink at the end never ran when an assertion failed (leaking
        the file), and a fixed path can collide under parallel test runs.
        """
        with app.app_context():
            svc = StorageService()
            task_id = 'local-upload-test'
            input_path = str(tmp_path / 'test_storage_input.pdf')
            with open(input_path, 'wb') as f:
                f.write(b'%PDF-1.4 test')
            key = svc.upload_file(input_path, task_id)
            assert task_id in key
            assert 'test_storage_input.pdf' in key

    def test_generate_presigned_url_local(self, app):
        """In local mode should return a /api/download/... URL."""
        with app.app_context():
            svc = StorageService()
            url = svc.generate_presigned_url(
                'outputs/task-123/output.pdf',
                original_filename='my-doc.pdf',
            )
            assert '/api/download/task-123/output.pdf' in url
            assert 'name=my-doc.pdf' in url

    def test_file_exists_local(self, app):
        """Should check file existence on the local filesystem."""
        with app.app_context():
            svc = StorageService()
            # Non-existent file
            assert svc.file_exists('outputs/nonexistent/file.pdf') is False
            # Create existing file
            task_id = 'exists-test'
            output_dir = os.path.join(app.config['OUTPUT_FOLDER'], task_id)
            os.makedirs(output_dir, exist_ok=True)
            with open(os.path.join(output_dir, 'test.pdf'), 'w') as f:
                f.write('test')
            assert svc.file_exists(f'outputs/{task_id}/test.pdf') is True

View File

@@ -0,0 +1,66 @@
"""Tests for task status polling route."""
from unittest.mock import patch, MagicMock
class TestTaskStatus:
    """GET /api/tasks/<id>/status — polling a Celery AsyncResult.

    Every original test requested the `monkeypatch` fixture but never used
    it (patching is done via unittest.mock.patch); the dead parameter is
    removed and the shared AsyncResult patching lives in _status().
    """

    @staticmethod
    def _status(client, result, task_id):
        """Patch AsyncResult to return *result* and GET the status route."""
        with patch('app.routes.tasks.AsyncResult', return_value=result):
            return client.get(f'/api/tasks/{task_id}/status')

    def test_pending_task(self, client):
        """Should return PENDING state for a queued task."""
        mock_result = MagicMock()
        mock_result.state = 'PENDING'
        mock_result.info = None
        response = self._status(client, mock_result, 'test-task-id')
        assert response.status_code == 200
        data = response.get_json()
        assert data['task_id'] == 'test-task-id'
        assert data['state'] == 'PENDING'
        assert 'progress' in data

    def test_processing_task(self, client):
        """Should return PROCESSING state with step info."""
        mock_result = MagicMock()
        mock_result.state = 'PROCESSING'
        mock_result.info = {'step': 'Converting page 3 of 10...'}
        response = self._status(client, mock_result, 'processing-id')
        assert response.status_code == 200
        data = response.get_json()
        assert data['state'] == 'PROCESSING'
        assert data['progress'] == 'Converting page 3 of 10...'

    def test_success_task(self, client):
        """Should return SUCCESS state with result data."""
        mock_result = MagicMock()
        mock_result.state = 'SUCCESS'
        mock_result.result = {
            'status': 'completed',
            'download_url': '/api/download/task-id/output.pdf',
            'filename': 'output.pdf',
        }
        response = self._status(client, mock_result, 'success-id')
        assert response.status_code == 200
        data = response.get_json()
        assert data['state'] == 'SUCCESS'
        assert data['result']['status'] == 'completed'
        assert 'download_url' in data['result']

    def test_failure_task(self, client):
        """Should return FAILURE state with an error message."""
        mock_result = MagicMock()
        mock_result.state = 'FAILURE'
        mock_result.info = Exception('Conversion failed due to corrupt PDF.')
        response = self._status(client, mock_result, 'failed-id')
        assert response.status_code == 200
        data = response.get_json()
        assert data['state'] == 'FAILURE'
        assert 'error' in data

View File

@@ -1,19 +1,21 @@
"""Tests for text utility functions."""
import sys
import os
# Add backend to path so we can import utils directly
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from app.utils.file_validator import validate_file
"""Tests for general utility functions."""
from app.utils.sanitizer import generate_safe_path
def test_generate_safe_path(app):
    """generate_safe_path should produce a UUID-based path.

    NOTE(review): this span contained interleaved old- and new-version
    lines (unresolved diff residue showing both the legacy
    generate_safe_path('uploads', 'test.pdf') call and the current
    (task_id, path) API). Reconstructed here as the current API only.
    """
    with app.app_context():
        task_id, path = generate_safe_path('pdf', folder_type='upload')
        assert task_id in path
        assert path.endswith('.pdf')
        # Should contain a UUID directory
        parts = path.replace('\\', '/').split('/')
        assert len(parts) >= 3  # /tmp/test_uploads / uuid / filename.pdf
def test_generate_safe_path_unique(app):
    """Every invocation must mint a distinct task_id."""
    with app.app_context():
        first, _ = generate_safe_path('pdf')
        second, _ = generate_safe_path('pdf')
    assert first != second

151
backend/tests/test_video.py Normal file
View File

@@ -0,0 +1,151 @@
"""Tests for video processing routes — Video to GIF."""
import io
from unittest.mock import MagicMock
class TestVideoToGif:
    """POST /api/video/to-gif — parameter validation and task dispatch.

    The seven original tests repeated the same validate_file patch and the
    same multipart POST scaffolding; that now lives in _patch_validate()
    and _post(), leaving each test only its route-specific fields and
    assertions.
    """

    ENDPOINT = '/api/video/to-gif'
    # Minimal bytes beginning with an MP4 'ftyp' box marker.
    FAKE_MP4 = b'\x00\x00\x00\x1cftyp'

    @staticmethod
    def _patch_validate(monkeypatch):
        """Bypass real upload validation so tests exercise param checks."""
        monkeypatch.setattr(
            'app.routes.video.validate_file',
            lambda f, allowed_types: ('test.mp4', 'mp4'),
        )

    def _post(self, client, **fields):
        """POST a minimal fake MP4 upload plus the given form fields."""
        data = {'file': (io.BytesIO(self.FAKE_MP4), 'test.mp4'), **fields}
        return client.post(self.ENDPOINT, data=data,
                           content_type='multipart/form-data')

    def test_to_gif_no_file(self, client):
        """POST /api/video/to-gif without file should return 400."""
        response = client.post(self.ENDPOINT)
        assert response.status_code == 400
        assert response.get_json()['error'] == 'No file provided.'

    def test_to_gif_invalid_params(self, client, monkeypatch):
        """Should return 400 for non-numeric parameters."""
        self._patch_validate(monkeypatch)
        response = self._post(client, start_time='abc')
        assert response.status_code == 400
        assert 'numeric' in response.get_json()['error'].lower()

    def test_to_gif_negative_start(self, client, monkeypatch):
        """Should reject negative start time."""
        self._patch_validate(monkeypatch)
        response = self._post(client, start_time='-5', duration='5',
                              fps='10', width='480')
        assert response.status_code == 400

    def test_to_gif_duration_too_long(self, client, monkeypatch):
        """Should reject duration > 15 seconds."""
        self._patch_validate(monkeypatch)
        response = self._post(client, start_time='0', duration='20',
                              fps='10', width='480')
        assert response.status_code == 400
        assert '15' in response.get_json()['error']

    def test_to_gif_fps_out_of_range(self, client, monkeypatch):
        """Should reject FPS > 20."""
        self._patch_validate(monkeypatch)
        response = self._post(client, start_time='0', duration='5',
                              fps='30', width='480')
        assert response.status_code == 400

    def test_to_gif_width_out_of_range(self, client, monkeypatch):
        """Should reject width > 640."""
        self._patch_validate(monkeypatch)
        response = self._post(client, start_time='0', duration='5',
                              fps='10', width='1000')
        assert response.status_code == 400

    def test_to_gif_success(self, client, monkeypatch):
        """Should return 202 with valid parameters."""
        mock_task = MagicMock()
        mock_task.id = 'gif-task-id'
        self._patch_validate(monkeypatch)
        monkeypatch.setattr(
            'app.routes.video.generate_safe_path',
            lambda ext, folder_type: (
                'gif-task-id', '/tmp/test_uploads/gif-task-id/test.mp4'
            ),
        )
        monkeypatch.setattr(
            'app.routes.video.create_gif_task.delay',
            MagicMock(return_value=mock_task),
        )
        # Mock FileStorage.save so nothing touches disk
        monkeypatch.setattr(
            'werkzeug.datastructures.file_storage.FileStorage.save',
            lambda self, dst, buffer_size=16384: None,
        )
        response = self._post(client, start_time='0', duration='5',
                              fps='10', width='480')
        assert response.status_code == 202
        body = response.get_json()
        assert body['task_id'] == 'gif-task-id'
        assert 'message' in body

View File

@@ -0,0 +1,37 @@
"""Tests for video processing service."""
import os
from unittest.mock import patch, MagicMock
import pytest
from app.services.video_service import video_to_gif, VideoProcessingError
class TestVideoService:
    """Unit tests for video_to_gif()."""

    def test_sanitizes_parameters(self, app):
        """Out-of-range parameters should be clamped rather than crash."""
        with app.app_context():
            with patch('app.services.video_service.subprocess.run') as mock_run:
                mock_run.return_value = MagicMock(returncode=1, stderr='test error')
                # Even with crazy params, it should clamp them; the mocked
                # ffmpeg failure then surfaces as VideoProcessingError.
                with pytest.raises(VideoProcessingError):
                    video_to_gif(
                        '/tmp/test.mp4', '/tmp/out.gif',
                        start_time=-10, duration=100,
                        fps=50, width=2000,
                    )

    def test_ffmpeg_palette_failure_raises(self, app, tmp_path):
        """Should raise when ffmpeg palette generation fails.

        Writes the fake input under tmp_path so pytest cleans it up even
        when an assertion fails — the original hard-coded /tmp file was
        only unlinked on the success path and leaked otherwise.
        """
        with app.app_context():
            input_path = str(tmp_path / 'test_vid_fail.mp4')
            with open(input_path, 'wb') as f:
                f.write(b'\x00\x00\x00\x1cftyp')
            with patch('app.services.video_service.subprocess.run') as mock_run:
                mock_run.return_value = MagicMock(
                    returncode=1, stderr='Invalid video'
                )
                with pytest.raises(VideoProcessingError):
                    video_to_gif(input_path, str(tmp_path / 'fail_out.gif'))

View File

@@ -0,0 +1,83 @@
"""Tests for video task routes — Video to GIF."""
import io
from unittest.mock import MagicMock
class TestVideoTaskRoutes:
def test_video_to_gif_dispatches_task(self, client, monkeypatch):
    """Should dispatch create_gif_task with correct parameters."""
    fake_task = MagicMock()
    fake_task.id = 'gif-task-id'
    fake_delay = MagicMock(return_value=fake_task)
    monkeypatch.setattr(
        'app.routes.video.validate_file',
        lambda f, allowed_types: ('video.mp4', 'mp4'),
    )
    monkeypatch.setattr(
        'app.routes.video.generate_safe_path',
        lambda ext, folder_type: ('gif-task-id', '/tmp/test.mp4'),
    )
    monkeypatch.setattr('app.routes.video.create_gif_task.delay', fake_delay)
    # Simulate exact frontend request format
    payload = {
        'file': (io.BytesIO(b'\x00\x00\x00\x1cftyp'), 'video.mp4'),
        'start_time': '2.5',
        'duration': '5',
        'fps': '10',
        'width': '480',
    }
    response = client.post('/api/video/to-gif', data=payload,
                           content_type='multipart/form-data')
    assert response.status_code == 202
    assert response.get_json()['task_id'] == 'gif-task-id'
    # Verify task arguments match what the route sends
    input_path, task_id, original_filename = fake_delay.call_args[0][:3]
    assert input_path == '/tmp/test.mp4'
    assert task_id == 'gif-task-id'
    assert original_filename == 'video.mp4'
def test_video_to_gif_default_params(self, client, monkeypatch):
    """Should fall back to default GIF parameters when none are sent."""
    fake_task = MagicMock()
    fake_task.id = 'gif-default-id'
    fake_delay = MagicMock(return_value=fake_task)
    monkeypatch.setattr(
        'app.routes.video.validate_file',
        lambda f, allowed_types: ('video.mp4', 'mp4'),
    )
    monkeypatch.setattr(
        'app.routes.video.generate_safe_path',
        lambda ext, folder_type: ('gif-default-id', '/tmp/test.mp4'),
    )
    monkeypatch.setattr('app.routes.video.create_gif_task.delay', fake_delay)
    # Only send the file — no optional form fields.
    response = client.post(
        '/api/video/to-gif',
        data={'file': (io.BytesIO(b'\x00\x00\x00\x1cftyp'), 'video.mp4')},
        content_type='multipart/form-data',
    )
    assert response.status_code == 202
    # Defaults: start_time=0, duration=5, fps=10, width=480
    dispatched = fake_delay.call_args[0]
    assert dispatched[3] == 0    # start_time
    assert dispatched[4] == 5    # duration
    assert dispatched[5] == 10   # fps
    assert dispatched[6] == 480  # width