feat: Initialize frontend with React, Vite, and Tailwind CSS
- Set up the main entry point for the React application.
- Create About, Home, NotFound, Privacy, and Terms pages with SEO support.
- Implement an API service for file uploads and task management.
- Add global styles using Tailwind CSS.
- Create utility functions for SEO and text processing.
- Configure Vite for development and production builds.
- Set up the Nginx configuration for serving the frontend and backend.
- Add scripts for cleanup of expired files and sitemap generation.
- Implement a deployment script for the production environment.
This commit is contained in:
1
backend/app/services/__init__.py
Normal file
1
backend/app/services/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Backend application services."""
|
||||
109
backend/app/services/compress_service.py
Normal file
109
backend/app/services/compress_service.py
Normal file
@@ -0,0 +1,109 @@
|
||||
"""PDF compression service using Ghostscript."""
|
||||
import os
|
||||
import subprocess
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PDFCompressionError(Exception):
|
||||
"""Custom exception for PDF compression failures."""
|
||||
pass
|
||||
|
||||
|
||||
# Ghostscript quality presets
|
||||
QUALITY_PRESETS = {
|
||||
"low": "/screen", # 72 dpi — smallest file, lowest quality
|
||||
"medium": "/ebook", # 150 dpi — good balance (default)
|
||||
"high": "/printer", # 300 dpi — high quality, moderate compression
|
||||
}
|
||||
|
||||
|
||||
def compress_pdf(
|
||||
input_path: str, output_path: str, quality: str = "medium"
|
||||
) -> dict:
|
||||
"""
|
||||
Compress a PDF file using Ghostscript.
|
||||
|
||||
Args:
|
||||
input_path: Path to the input PDF file
|
||||
output_path: Path for the compressed output file
|
||||
quality: Compression quality — "low", "medium", or "high"
|
||||
|
||||
Returns:
|
||||
dict with original_size, compressed_size, reduction_percent
|
||||
|
||||
Raises:
|
||||
PDFCompressionError: If compression fails
|
||||
"""
|
||||
if quality not in QUALITY_PRESETS:
|
||||
quality = "medium"
|
||||
|
||||
gs_quality = QUALITY_PRESETS[quality]
|
||||
|
||||
# Ensure output directory exists
|
||||
os.makedirs(os.path.dirname(output_path), exist_ok=True)
|
||||
|
||||
cmd = [
|
||||
"gs",
|
||||
"-sDEVICE=pdfwrite",
|
||||
"-dCompatibilityLevel=1.4",
|
||||
f"-dPDFSETTINGS={gs_quality}",
|
||||
"-dNOPAUSE",
|
||||
"-dQUIET",
|
||||
"-dBATCH",
|
||||
"-dColorImageResolution=150",
|
||||
"-dGrayImageResolution=150",
|
||||
"-dMonoImageResolution=150",
|
||||
f"-sOutputFile={output_path}",
|
||||
input_path,
|
||||
]
|
||||
|
||||
try:
|
||||
original_size = os.path.getsize(input_path)
|
||||
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=120,
|
||||
)
|
||||
|
||||
if result.returncode != 0:
|
||||
logger.error(f"Ghostscript compression failed: {result.stderr}")
|
||||
raise PDFCompressionError(
|
||||
f"Compression failed: {result.stderr or 'Unknown error'}"
|
||||
)
|
||||
|
||||
if not os.path.exists(output_path):
|
||||
raise PDFCompressionError("Compressed file was not created.")
|
||||
|
||||
compressed_size = os.path.getsize(output_path)
|
||||
|
||||
# If compressed file is larger, keep original
|
||||
if compressed_size >= original_size:
|
||||
import shutil
|
||||
shutil.copy2(input_path, output_path)
|
||||
compressed_size = original_size
|
||||
|
||||
reduction = (
|
||||
((original_size - compressed_size) / original_size) * 100
|
||||
if original_size > 0
|
||||
else 0
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"PDF compression: {original_size} → {compressed_size} "
|
||||
f"({reduction:.1f}% reduction)"
|
||||
)
|
||||
|
||||
return {
|
||||
"original_size": original_size,
|
||||
"compressed_size": compressed_size,
|
||||
"reduction_percent": round(reduction, 1),
|
||||
}
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
raise PDFCompressionError("Compression timed out. File may be too large.")
|
||||
except FileNotFoundError:
|
||||
raise PDFCompressionError("Ghostscript is not installed on the server.")
|
||||
169
backend/app/services/image_service.py
Normal file
169
backend/app/services/image_service.py
Normal file
@@ -0,0 +1,169 @@
|
||||
"""Image processing service using Pillow."""
|
||||
import os
|
||||
import logging
|
||||
|
||||
from PIL import Image
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ImageProcessingError(Exception):
|
||||
"""Custom exception for image processing failures."""
|
||||
pass
|
||||
|
||||
|
||||
# Supported format mappings
|
||||
FORMAT_MAP = {
|
||||
"jpg": "JPEG",
|
||||
"jpeg": "JPEG",
|
||||
"png": "PNG",
|
||||
"webp": "WEBP",
|
||||
}
|
||||
|
||||
|
||||
def convert_image(
    input_path: str,
    output_path: str,
    output_format: str,
    quality: int = 85,
) -> dict:
    """
    Convert an image to a different format.

    Args:
        input_path: Path to the input image
        output_path: Path for the output image
        output_format: Target format ("jpg", "png", "webp")
        quality: Output quality 1-100 (for lossy formats)

    Returns:
        dict with original_size, converted_size, dimensions

    Raises:
        ImageProcessingError: If conversion fails
    """
    fmt = output_format.lower()
    if fmt not in FORMAT_MAP:
        supported = ", ".join(FORMAT_MAP.keys())
        raise ImageProcessingError(
            f"Unsupported output format: {fmt}. Supported: {supported}"
        )

    target = FORMAT_MAP[fmt]
    os.makedirs(os.path.dirname(output_path), exist_ok=True)

    try:
        source_bytes = os.path.getsize(input_path)

        # Re-encoding through Pillow strips any malicious payloads embedded
        # in the original file.
        with Image.open(input_path) as img:
            # JPEG has no alpha channel: composite transparent / palette
            # images onto a white background first.
            if target == "JPEG" and img.mode in ("RGBA", "P", "LA"):
                canvas = Image.new("RGB", img.size, (255, 255, 255))
                src = img.convert("RGBA") if img.mode == "P" else img
                alpha_mask = src.split()[-1] if "A" in src.mode else None
                canvas.paste(src, mask=alpha_mask)
                img = canvas

            width, height = img.size

            # Build save options per target format.
            save_kwargs = {}
            if target in ("JPEG", "WEBP"):
                save_kwargs = {
                    "quality": max(1, min(100, quality)),
                    "optimize": True,
                }
            elif target == "PNG":
                save_kwargs = {"optimize": True}

            img.save(output_path, format=target, **save_kwargs)

        converted_bytes = os.path.getsize(output_path)

        logger.info(
            f"Image conversion: {input_path} → {fmt} "
            f"({source_bytes} → {converted_bytes})"
        )

        return {
            "original_size": source_bytes,
            "converted_size": converted_bytes,
            "width": width,
            "height": height,
            "format": fmt,
        }

    except (IOError, OSError, Image.DecompressionBombError) as e:
        raise ImageProcessingError(f"Image processing failed: {str(e)}")
|
||||
|
||||
|
||||
def resize_image(
    input_path: str,
    output_path: str,
    width: int | None = None,
    height: int | None = None,
    quality: int = 85,
) -> dict:
    """
    Resize an image while maintaining aspect ratio.

    Args:
        input_path: Path to the input image
        output_path: Path for the resized image
        width: Target width (None to auto-calculate from height)
        height: Target height (None to auto-calculate from width)
        quality: Output quality 1-100

    Returns:
        dict with original and new dimensions

    Raises:
        ImageProcessingError: If resize fails
    """
    if width is None and height is None:
        raise ImageProcessingError("At least one of width or height must be specified.")

    # Skip makedirs when the output path has no directory component —
    # os.makedirs("") raises FileNotFoundError.
    out_dir = os.path.dirname(output_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    try:
        with Image.open(input_path) as img:
            orig_width, orig_height = img.size

            # Calculate the missing dimension to maintain aspect ratio.
            # Clamp to at least 1 px: extreme aspect ratios could otherwise
            # round down to 0 and make Image.resize raise a ValueError that
            # the except clause below would not catch.
            if width and not height:
                ratio = width / orig_width
                height = max(1, int(orig_height * ratio))
            elif height and not width:
                ratio = height / orig_height
                width = max(1, int(orig_width * ratio))

            # Resize using high-quality resampling
            resized = img.resize((width, height), Image.Resampling.LANCZOS)

            # Detect target format from the output extension (default PNG)
            ext = os.path.splitext(output_path)[1].lower().strip(".")
            pil_format = FORMAT_MAP.get(ext, "PNG")

            save_kwargs = {"optimize": True}
            if pil_format in ("JPEG", "WEBP"):
                # Clamp quality into Pillow's valid 1-100 range
                # (consistent with convert_image).
                save_kwargs["quality"] = max(1, min(100, quality))
                # Handle RGBA for JPEG (no alpha channel): composite onto white
                if resized.mode in ("RGBA", "P", "LA"):
                    background = Image.new("RGB", resized.size, (255, 255, 255))
                    if resized.mode == "P":
                        resized = resized.convert("RGBA")
                    background.paste(
                        resized, mask=resized.split()[-1] if "A" in resized.mode else None
                    )
                    resized = background

            resized.save(output_path, format=pil_format, **save_kwargs)

            return {
                "original_width": orig_width,
                "original_height": orig_height,
                "new_width": width,
                "new_height": height,
            }

    except (IOError, OSError, Image.DecompressionBombError) as e:
        raise ImageProcessingError(f"Image resize failed: {str(e)}")
|
||||
170
backend/app/services/pdf_service.py
Normal file
170
backend/app/services/pdf_service.py
Normal file
@@ -0,0 +1,170 @@
|
||||
"""PDF conversion service using LibreOffice headless."""
|
||||
import os
|
||||
import subprocess
|
||||
import logging
|
||||
import tempfile
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PDFConversionError(Exception):
|
||||
"""Custom exception for PDF conversion failures."""
|
||||
pass
|
||||
|
||||
|
||||
def pdf_to_word(input_path: str, output_dir: str) -> str:
|
||||
"""
|
||||
Convert a PDF file to Word (DOCX) format using LibreOffice headless.
|
||||
|
||||
Args:
|
||||
input_path: Path to the input PDF file
|
||||
output_dir: Directory for the output file
|
||||
|
||||
Returns:
|
||||
Path to the converted DOCX file
|
||||
|
||||
Raises:
|
||||
PDFConversionError: If conversion fails
|
||||
"""
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
|
||||
# Use a unique user profile per process to avoid lock conflicts
|
||||
user_install_dir = tempfile.mkdtemp(prefix="lo_pdf2word_")
|
||||
|
||||
cmd = [
|
||||
"soffice",
|
||||
"--headless",
|
||||
"--norestore",
|
||||
f"-env:UserInstallation=file://{user_install_dir}",
|
||||
"--infilter=writer_pdf_import",
|
||||
"--convert-to", "docx",
|
||||
"--outdir", output_dir,
|
||||
input_path,
|
||||
]
|
||||
|
||||
try:
|
||||
logger.info(f"Running LibreOffice PDF→Word: {' '.join(cmd)}")
|
||||
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=120, # 2 minute timeout
|
||||
env={**os.environ, "HOME": user_install_dir},
|
||||
)
|
||||
|
||||
logger.info(f"LibreOffice stdout: {result.stdout}")
|
||||
logger.info(f"LibreOffice stderr: {result.stderr}")
|
||||
logger.info(f"LibreOffice returncode: {result.returncode}")
|
||||
|
||||
# LibreOffice names output based on input filename
|
||||
input_basename = os.path.splitext(os.path.basename(input_path))[0]
|
||||
output_path = os.path.join(output_dir, f"{input_basename}.docx")
|
||||
|
||||
# Check output file first — LibreOffice may return non-zero
|
||||
# due to harmless warnings (e.g. javaldx) even on success
|
||||
if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
|
||||
logger.info(f"PDF→Word conversion successful: {output_path}")
|
||||
return output_path
|
||||
|
||||
# No output file — now treat as real error
|
||||
if result.returncode != 0:
|
||||
# Filter out known harmless warnings
|
||||
stderr = result.stderr or ""
|
||||
real_errors = [
|
||||
line for line in stderr.strip().splitlines()
|
||||
if not line.startswith("Warning: failed to launch javaldx")
|
||||
]
|
||||
error_msg = "\n".join(real_errors) if real_errors else stderr
|
||||
logger.error(f"LibreOffice PDF→Word failed: {error_msg}")
|
||||
raise PDFConversionError(
|
||||
f"Conversion failed: {error_msg or 'Unknown error'}"
|
||||
)
|
||||
|
||||
# Return code 0 but no output file
|
||||
files_in_dir = os.listdir(output_dir) if os.path.exists(output_dir) else []
|
||||
logger.error(
|
||||
f"Expected output not found at {output_path}. "
|
||||
f"Files in output dir: {files_in_dir}"
|
||||
)
|
||||
raise PDFConversionError("Output file was not created.")
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
raise PDFConversionError("Conversion timed out. File may be too large.")
|
||||
except FileNotFoundError:
|
||||
raise PDFConversionError("LibreOffice is not installed on the server.")
|
||||
finally:
|
||||
# Cleanup temporary user profile
|
||||
import shutil
|
||||
shutil.rmtree(user_install_dir, ignore_errors=True)
|
||||
|
||||
|
||||
def word_to_pdf(input_path: str, output_dir: str) -> str:
|
||||
"""
|
||||
Convert a Word (DOC/DOCX) file to PDF format using LibreOffice headless.
|
||||
|
||||
Args:
|
||||
input_path: Path to the input Word file
|
||||
output_dir: Directory for the output file
|
||||
|
||||
Returns:
|
||||
Path to the converted PDF file
|
||||
|
||||
Raises:
|
||||
PDFConversionError: If conversion fails
|
||||
"""
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
|
||||
# Use a unique user profile per process to avoid lock conflicts
|
||||
user_install_dir = tempfile.mkdtemp(prefix="lo_word2pdf_")
|
||||
|
||||
cmd = [
|
||||
"soffice",
|
||||
"--headless",
|
||||
"--norestore",
|
||||
f"-env:UserInstallation=file://{user_install_dir}",
|
||||
"--convert-to", "pdf",
|
||||
"--outdir", output_dir,
|
||||
input_path,
|
||||
]
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=120,
|
||||
env={**os.environ, "HOME": user_install_dir},
|
||||
)
|
||||
|
||||
input_basename = os.path.splitext(os.path.basename(input_path))[0]
|
||||
output_path = os.path.join(output_dir, f"{input_basename}.pdf")
|
||||
|
||||
# Check output file first — LibreOffice may return non-zero
|
||||
# due to harmless warnings (e.g. javaldx) even on success
|
||||
if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
|
||||
logger.info(f"Word→PDF conversion successful: {output_path}")
|
||||
return output_path
|
||||
|
||||
if result.returncode != 0:
|
||||
stderr = result.stderr or ""
|
||||
real_errors = [
|
||||
line for line in stderr.strip().splitlines()
|
||||
if not line.startswith("Warning: failed to launch javaldx")
|
||||
]
|
||||
error_msg = "\n".join(real_errors) if real_errors else stderr
|
||||
logger.error(f"LibreOffice Word→PDF failed: {error_msg}")
|
||||
raise PDFConversionError(
|
||||
f"Conversion failed: {error_msg or 'Unknown error'}"
|
||||
)
|
||||
|
||||
raise PDFConversionError("Output file was not created.")
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
raise PDFConversionError("Conversion timed out. File may be too large.")
|
||||
except FileNotFoundError:
|
||||
raise PDFConversionError("LibreOffice is not installed on the server.")
|
||||
finally:
|
||||
# Cleanup temporary user profile
|
||||
import shutil
|
||||
shutil.rmtree(user_install_dir, ignore_errors=True)
|
||||
154
backend/app/services/storage_service.py
Normal file
154
backend/app/services/storage_service.py
Normal file
@@ -0,0 +1,154 @@
|
||||
"""Storage service — S3 in production, local files in development."""
|
||||
import os
|
||||
import shutil
|
||||
import logging
|
||||
|
||||
from flask import current_app
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _is_s3_configured() -> bool:
|
||||
"""Check if AWS S3 credentials are provided."""
|
||||
key = current_app.config.get("AWS_ACCESS_KEY_ID")
|
||||
secret = current_app.config.get("AWS_SECRET_ACCESS_KEY")
|
||||
return bool(key and secret and key.strip() and secret.strip())
|
||||
|
||||
|
||||
class StorageService:
    """Handle file storage — uses S3 when configured, local filesystem otherwise."""

    def __init__(self):
        # Lazily-created boto3 client; see the `client` property.
        self._client = None

    @property
    def use_s3(self) -> bool:
        # Re-evaluated on every access so config changes take effect immediately.
        return _is_s3_configured()

    @property
    def client(self):
        """Lazy-initialize S3 client (only when S3 is configured)."""
        if self._client is None:
            # Imported locally so boto3 is only required in S3 mode.
            import boto3
            self._client = boto3.client(
                "s3",
                region_name=current_app.config["AWS_S3_REGION"],
                aws_access_key_id=current_app.config["AWS_ACCESS_KEY_ID"],
                aws_secret_access_key=current_app.config["AWS_SECRET_ACCESS_KEY"],
            )
        return self._client

    @property
    def bucket(self):
        # Bucket name from app config; raises KeyError if unset.
        return current_app.config["AWS_S3_BUCKET"]

    def upload_file(self, local_path: str, task_id: str, folder: str = "outputs") -> str:
        """
        Upload / store a file.

        In S3 mode: uploads to S3 bucket.
        In local mode: copies file to the outputs directory.

        Returns:
            S3 key or local relative path (used as identifier)
        """
        filename = os.path.basename(local_path)
        # Key layout: "<folder>/<task_id>/<filename>" — parsed back apart by
        # generate_presigned_url() and file_exists(). (Fixed: the filename
        # segment was previously a garbled literal.)
        key = f"{folder}/{task_id}/{filename}"

        if self.use_s3:
            from botocore.exceptions import ClientError
            try:
                self.client.upload_file(local_path, self.bucket, key)
                return key
            except ClientError as e:
                raise RuntimeError(f"Failed to upload file to S3: {e}")
        else:
            # Local mode — keep file in the outputs directory
            output_dir = current_app.config["OUTPUT_FOLDER"]
            dest_dir = os.path.join(output_dir, task_id)
            os.makedirs(dest_dir, exist_ok=True)
            dest_path = os.path.join(dest_dir, filename)

            # Avoid copying a file onto itself when it is already in place.
            if os.path.abspath(local_path) != os.path.abspath(dest_path):
                shutil.copy2(local_path, dest_path)

            logger.info(f"[Local] Stored file: {dest_path}")
            return key

    def generate_presigned_url(
        self, s3_key: str, expiry: int | None = None, original_filename: str | None = None
    ) -> str:
        """
        Generate a download URL.

        S3 mode: presigned URL.
        Local mode: /api/download/<task_id>/<filename>
        """
        if self.use_s3:
            from botocore.exceptions import ClientError
            if expiry is None:
                expiry = current_app.config.get("FILE_EXPIRY_SECONDS", 1800)

            params = {
                "Bucket": self.bucket,
                "Key": s3_key,
            }
            if original_filename:
                params["ResponseContentDisposition"] = (
                    f'attachment; filename="{original_filename}"'
                )
            try:
                url = self.client.generate_presigned_url(
                    "get_object",
                    Params=params,
                    ExpiresIn=expiry,
                )
                return url
            except ClientError as e:
                raise RuntimeError(f"Failed to generate presigned URL: {e}")
        else:
            # Local mode — return path to Flask download route
            task_id, filename = self._split_key(s3_key)
            download_name = original_filename or filename
            # Percent-encode path and query segments so filenames containing
            # spaces, "&", "?" etc. produce a valid URL. Plain names are
            # unchanged by quote().
            from urllib.parse import quote
            return f"/api/download/{task_id}/{quote(filename)}?name={quote(download_name)}"

    @staticmethod
    def _split_key(s3_key: str) -> tuple[str, str]:
        """Split a storage key into (task_id, filename).

        Expected layout is "outputs/<task_id>/<filename>"; shorter keys fall
        back to treating the first segment as the task id.
        """
        parts = s3_key.strip("/").split("/")
        if len(parts) >= 3:
            return parts[1], parts[2]
        return parts[0], parts[-1]

    def delete_file(self, s3_key: str):
        """Delete a file from S3 (no-op in local mode)."""
        if self.use_s3:
            from botocore.exceptions import ClientError
            try:
                self.client.delete_object(Bucket=self.bucket, Key=s3_key)
            except ClientError:
                # Deliberate best-effort: deletion failures are ignored
                # (expired files are swept separately).
                pass

    def file_exists(self, s3_key: str) -> bool:
        """Check if a file exists."""
        if self.use_s3:
            from botocore.exceptions import ClientError
            try:
                self.client.head_object(Bucket=self.bucket, Key=s3_key)
                return True
            except ClientError:
                return False
        else:
            task_id, filename = self._split_key(s3_key)
            output_dir = current_app.config["OUTPUT_FOLDER"]
            return os.path.isfile(os.path.join(output_dir, task_id, filename))


# Singleton instance
storage = StorageService()
|
||||
176
backend/app/services/video_service.py
Normal file
176
backend/app/services/video_service.py
Normal file
@@ -0,0 +1,176 @@
|
||||
"""Video to GIF conversion service using ffmpeg."""
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class VideoProcessingError(Exception):
|
||||
"""Custom exception for video processing failures."""
|
||||
pass
|
||||
|
||||
|
||||
# Safety constraints
|
||||
MAX_DURATION = 15 # seconds
|
||||
MAX_WIDTH = 640 # pixels
|
||||
MAX_FPS = 20
|
||||
DEFAULT_FPS = 10
|
||||
DEFAULT_WIDTH = 480
|
||||
|
||||
|
||||
def video_to_gif(
|
||||
input_path: str,
|
||||
output_path: str,
|
||||
start_time: float = 0,
|
||||
duration: float = 5,
|
||||
fps: int = DEFAULT_FPS,
|
||||
width: int = DEFAULT_WIDTH,
|
||||
) -> dict:
|
||||
"""
|
||||
Convert a video clip to an animated GIF using ffmpeg.
|
||||
|
||||
Args:
|
||||
input_path: Path to the input video (MP4/WebM)
|
||||
output_path: Path for the output GIF
|
||||
start_time: Start time in seconds
|
||||
duration: Duration in seconds (max 15)
|
||||
fps: Frames per second (max 20)
|
||||
width: Output width in pixels (max 640)
|
||||
|
||||
Returns:
|
||||
dict with output_size, duration, fps, dimensions
|
||||
|
||||
Raises:
|
||||
VideoProcessingError: If conversion fails
|
||||
"""
|
||||
# Sanitize numeric parameters (prevent injection)
|
||||
start_time = max(0, float(start_time))
|
||||
duration = max(0.5, min(MAX_DURATION, float(duration)))
|
||||
fps = max(1, min(MAX_FPS, int(fps)))
|
||||
width = max(100, min(MAX_WIDTH, int(width)))
|
||||
|
||||
os.makedirs(os.path.dirname(output_path), exist_ok=True)
|
||||
|
||||
# Two-pass palette approach for high-quality GIF
|
||||
palette_path = output_path + ".palette.png"
|
||||
|
||||
try:
|
||||
# Pass 1: Generate optimized palette
|
||||
palette_cmd = [
|
||||
"ffmpeg",
|
||||
"-y",
|
||||
"-ss", str(start_time),
|
||||
"-t", str(duration),
|
||||
"-i", input_path,
|
||||
"-vf", f"fps={fps},scale={width}:-1:flags=lanczos,palettegen=stats_mode=diff",
|
||||
palette_path,
|
||||
]
|
||||
|
||||
result = subprocess.run(
|
||||
palette_cmd,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=60,
|
||||
)
|
||||
|
||||
if result.returncode != 0:
|
||||
logger.error(f"ffmpeg palette generation failed: {result.stderr}")
|
||||
raise VideoProcessingError("Failed to process video for GIF creation.")
|
||||
|
||||
# Pass 2: Create GIF using palette
|
||||
gif_cmd = [
|
||||
"ffmpeg",
|
||||
"-y",
|
||||
"-ss", str(start_time),
|
||||
"-t", str(duration),
|
||||
"-i", input_path,
|
||||
"-i", palette_path,
|
||||
"-lavfi", f"fps={fps},scale={width}:-1:flags=lanczos [x]; [x][1:v] paletteuse=dither=bayer:bayer_scale=5",
|
||||
output_path,
|
||||
]
|
||||
|
||||
result = subprocess.run(
|
||||
gif_cmd,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=120,
|
||||
)
|
||||
|
||||
if result.returncode != 0:
|
||||
logger.error(f"ffmpeg GIF creation failed: {result.stderr}")
|
||||
raise VideoProcessingError("Failed to create GIF from video.")
|
||||
|
||||
if not os.path.exists(output_path):
|
||||
raise VideoProcessingError("GIF file was not created.")
|
||||
|
||||
output_size = os.path.getsize(output_path)
|
||||
|
||||
# Get actual output dimensions
|
||||
actual_width, actual_height = _get_gif_dimensions(output_path)
|
||||
|
||||
logger.info(
|
||||
f"Video→GIF: {input_path} → {output_path} "
|
||||
f"({output_size} bytes, {duration}s, {fps}fps, {actual_width}x{actual_height})"
|
||||
)
|
||||
|
||||
return {
|
||||
"output_size": output_size,
|
||||
"duration": duration,
|
||||
"fps": fps,
|
||||
"width": actual_width,
|
||||
"height": actual_height,
|
||||
}
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
raise VideoProcessingError("GIF creation timed out. Video may be too large.")
|
||||
except FileNotFoundError:
|
||||
raise VideoProcessingError("ffmpeg is not installed on the server.")
|
||||
finally:
|
||||
# Cleanup palette file
|
||||
if os.path.exists(palette_path):
|
||||
os.remove(palette_path)
|
||||
|
||||
|
||||
def get_video_duration(input_path: str) -> float:
|
||||
"""Get the duration of a video file in seconds."""
|
||||
cmd = [
|
||||
"ffprobe",
|
||||
"-v", "error",
|
||||
"-show_entries", "format=duration",
|
||||
"-of", "default=noprint_wrappers=1:nokey=1",
|
||||
input_path,
|
||||
]
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
cmd, capture_output=True, text=True, timeout=10
|
||||
)
|
||||
return float(result.stdout.strip())
|
||||
except (subprocess.TimeoutExpired, ValueError):
|
||||
return 0.0
|
||||
|
||||
|
||||
def _get_gif_dimensions(gif_path: str) -> tuple[int, int]:
|
||||
"""Get GIF dimensions using ffprobe."""
|
||||
cmd = [
|
||||
"ffprobe",
|
||||
"-v", "error",
|
||||
"-select_streams", "v:0",
|
||||
"-show_entries", "stream=width,height",
|
||||
"-of", "csv=p=0",
|
||||
gif_path,
|
||||
]
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
cmd, capture_output=True, text=True, timeout=10
|
||||
)
|
||||
parts = result.stdout.strip().split(",")
|
||||
if len(parts) == 2:
|
||||
return int(parts[0]), int(parts[1])
|
||||
except (subprocess.TimeoutExpired, ValueError):
|
||||
pass
|
||||
|
||||
return 0, 0
|
||||
Reference in New Issue
Block a user