diff --git a/backend/Dockerfile b/backend/Dockerfile
index 6a88ae9..ebb8337 100644
--- a/backend/Dockerfile
+++ b/backend/Dockerfile
@@ -5,6 +5,7 @@ WORKDIR /app
 # Install system dependencies
 RUN apt-get update && apt-get install -y \
     gcc \
+    ffmpeg \
     && rm -rf /var/lib/apt/lists/*
 
 # Install poetry
diff --git a/backend/pyproject.toml b/backend/pyproject.toml
index be6bf5b..189c928 100644
--- a/backend/pyproject.toml
+++ b/backend/pyproject.toml
@@ -25,6 +25,7 @@ redis = "^5.0.1"
 rq = "^1.16.1"
 pillow = "^10.2.0"
 python-magic = "^0.4.27"
+ffmpeg-python = "^0.2.0"
 loguru = "^0.7.2"
 httpx = "^0.26.0"
 cryptography = "^46.0.3"
diff --git a/backend/src/app/services/asset_service.py b/backend/src/app/services/asset_service.py
index ff2be5e..e6dc3e4 100644
--- a/backend/src/app/services/asset_service.py
+++ b/backend/src/app/services/asset_service.py
@@ -3,14 +3,20 @@
 import os
 from typing import AsyncIterator, Optional, Tuple
 
+import redis
 from botocore.exceptions import ClientError
 from fastapi import HTTPException, UploadFile, status
+from loguru import logger
+from rq import Queue, Retry
 from sqlalchemy.ext.asyncio import AsyncSession
 
 from app.domain.models import Asset, AssetStatus, AssetType
+from app.infra.config import get_settings
 from app.infra.s3_client import S3Client
 from app.repositories.asset_repository import AssetRepository
 
+settings = get_settings()
+
 
 class AssetService:
     """Service for asset management operations."""
@@ -146,6 +152,8 @@ class AssetService:
         """
         Finalize upload and mark asset as ready.
 
+        Enqueues background task for thumbnail generation.
+
         Args:
             user_id: User ID
             asset_id: Asset ID
@@ -177,6 +185,27 @@
         asset.sha256 = sha256
 
         await self.asset_repo.update(asset)
+
+        # Enqueue thumbnail generation task (background processing with retry)
+        try:
+            redis_conn = redis.from_url(settings.redis_url)
+            thumbnail_queue = Queue("thumbnails", connection=redis_conn)
+            job = thumbnail_queue.enqueue(
+                "app.tasks.thumbnail_tasks.generate_thumbnail_task",
+                asset_id,
+                job_timeout="5m",  # 5 minutes timeout per attempt
+                retry=Retry(max=3, interval=[30, 120, 300]),  # 3 retries: 30s, 2m, 5m
+                failure_ttl=86400,  # Keep failed jobs for 24 hours
+                result_ttl=3600,  # Keep results for 1 hour
+            )
+            logger.info(
+                f"Enqueued thumbnail job {job.id} for asset {asset_id} "
+                f"(max 3 retries with backoff)"
+            )
+        except Exception as e:
+            # Log error but don't fail the request - thumbnail is not critical
+            logger.error(f"Failed to enqueue thumbnail task for asset {asset_id}: {e}")
+
         return asset
 
     async def list_assets(
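The enqueue call above degrades gracefully: if Redis is unreachable, the upload still finalizes and only the thumbnail is skipped. A minimal sketch (not part of the diff) of exercising the same enqueue path from a Python shell, assuming a reachable dev Redis at the URL shown and using a placeholder asset id:

```python
import redis
from rq import Queue, Retry

# Assumed local/dev Redis URL; in the service it comes from settings.redis_url.
redis_conn = redis.from_url("redis://localhost:6379/0")
queue = Queue("thumbnails", connection=redis_conn)

job = queue.enqueue(
    "app.tasks.thumbnail_tasks.generate_thumbnail_task",
    "00000000-0000-0000-0000-000000000000",  # placeholder asset id
    job_timeout="5m",
    retry=Retry(max=3, interval=[30, 120, 300]),  # same backoff schedule as the service
)
print(job.id, job.get_status())
```

With this Retry configuration, a job that raises is re-run by RQ after 30 s, then 2 min, then 5 min before it lands in the failed job registry.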
diff --git a/backend/src/app/services/thumbnail_service.py b/backend/src/app/services/thumbnail_service.py
new file mode 100644
index 0000000..882d51e
--- /dev/null
+++ b/backend/src/app/services/thumbnail_service.py
@@ -0,0 +1,178 @@
+"""Thumbnail generation service."""
+
+import io
+import tempfile
+from pathlib import Path
+from typing import Tuple
+
+import ffmpeg
+from PIL import Image
+from loguru import logger
+
+
+class ThumbnailService:
+    """
+    Service for generating image and video thumbnails.
+
+    Follows SOLID principles:
+    - Single Responsibility: Only thumbnail generation
+    - Open/Closed: Easy to extend with new formats
+    - Dependency Inversion: Works with bytes, not specific storages
+    """
+
+    # Configuration constants
+    THUMBNAIL_SIZE = (512, 512)
+    JPEG_QUALITY = 85
+    VIDEO_FRAME_TIME = 1.0  # Extract frame at 1 second
+
+    def generate_thumbnail(self, content_type: str, file_data: bytes) -> bytes:
+        """
+        Generate thumbnail based on content type (facade pattern).
+
+        Args:
+            content_type: MIME type (e.g., 'image/jpeg', 'video/mp4')
+            file_data: Original file content
+
+        Returns:
+            Thumbnail as JPEG bytes
+
+        Raises:
+            ValueError: If content type is unsupported
+            RuntimeError: If thumbnail generation fails
+        """
+        if content_type.startswith("image/"):
+            return self._generate_image_thumbnail(file_data)
+        elif content_type.startswith("video/"):
+            return self._generate_video_thumbnail(file_data)
+        else:
+            raise ValueError(f"Unsupported content type: {content_type}")
+
+    def _generate_image_thumbnail(self, image_data: bytes) -> bytes:
+        """
+        Generate thumbnail from image data.
+
+        Args:
+            image_data: Original image bytes
+
+        Returns:
+            Thumbnail as JPEG bytes
+        """
+        try:
+            # Load image
+            image = Image.open(io.BytesIO(image_data))
+
+            # Convert to RGB if needed (handle PNG with transparency, etc.)
+            if image.mode in ("RGBA", "LA", "P", "CMYK"):
+                # Create white background for transparency
+                if image.mode in ("RGBA", "LA"):
+                    background = Image.new("RGB", image.size, (255, 255, 255))
+                    if image.mode == "RGBA":
+                        background.paste(image, mask=image.split()[-1])  # Use alpha channel
+                    else:
+                        background.paste(image)
+                    image = background
+                else:
+                    image = image.convert("RGB")
+
+            # Resize and crop to square
+            thumbnail = self._resize_and_crop_square(image, self.THUMBNAIL_SIZE[0])
+
+            # Save to bytes
+            output = io.BytesIO()
+            thumbnail.save(output, format="JPEG", quality=self.JPEG_QUALITY, optimize=True)
+            return output.getvalue()
+
+        except Exception as e:
+            logger.error(f"Failed to generate image thumbnail: {e}")
+            raise RuntimeError(f"Image thumbnail generation failed: {e}")
+
+    def _generate_video_thumbnail(self, video_data: bytes) -> bytes:
+        """
+        Generate thumbnail from video data.
+
+        Extracts frame at configured time and creates thumbnail.
+
+        Args:
+            video_data: Original video bytes
+
+        Returns:
+            Thumbnail as JPEG bytes
+        """
+        # Use temporary files for ffmpeg processing
+        with tempfile.NamedTemporaryFile(
+            suffix=".mp4", delete=False
+        ) as temp_video, tempfile.NamedTemporaryFile(
+            suffix=".jpg", delete=False
+        ) as temp_thumb:
+            try:
+                # Write video data to temp file
+                temp_video.write(video_data)
+                temp_video.flush()
+                temp_video_path = temp_video.name
+                temp_thumb_path = temp_thumb.name
+
+                # Extract frame using ffmpeg
+                (
+                    ffmpeg.input(temp_video_path, ss=self.VIDEO_FRAME_TIME)
+                    .filter("scale", self.THUMBNAIL_SIZE[0], self.THUMBNAIL_SIZE[1])
+                    .output(temp_thumb_path, vframes=1, format="image2", vcodec="mjpeg")
+                    .overwrite_output()
+                    .run(capture_stdout=True, capture_stderr=True, quiet=True)
+                )
+
+                # Read generated thumbnail
+                with open(temp_thumb_path, "rb") as f:
+                    frame_data = f.read()
+
+                # Process frame to ensure it's properly cropped square
+                image = Image.open(io.BytesIO(frame_data))
+                thumbnail = self._resize_and_crop_square(image, self.THUMBNAIL_SIZE[0])
+
+                # Save to bytes
+                output = io.BytesIO()
+                thumbnail.save(
+                    output, format="JPEG", quality=self.JPEG_QUALITY, optimize=True
+                )
+                return output.getvalue()
+
+            except ffmpeg.Error as e:
+                stderr = e.stderr.decode() if e.stderr else "No error output"
+                logger.error(f"FFmpeg error: {stderr}")
+                raise RuntimeError(f"Video thumbnail generation failed: {stderr}")
+            except Exception as e:
+                logger.error(f"Failed to generate video thumbnail: {e}")
+                raise RuntimeError(f"Video thumbnail generation failed: {e}")
+            finally:
+                # Cleanup temporary files
+                Path(temp_video_path).unlink(missing_ok=True)
+                Path(temp_thumb_path).unlink(missing_ok=True)
+
+    def _resize_and_crop_square(self, image: Image.Image, size: int) -> Image.Image:
+        """
+        Resize and crop image to square (DRY method).
+
+        Maintains aspect ratio and crops to center square.
+
+        Args:
+            image: PIL Image object
+            size: Target size (width and height)
+
+        Returns:
+            Resized and cropped square image
+        """
+        # Calculate dimensions to make a center-cropped square
+        width, height = image.size
+        target_size = min(width, height)
+
+        # Crop to square (center crop)
+        left = (width - target_size) // 2
+        top = (height - target_size) // 2
+        right = left + target_size
+        bottom = top + target_size
+
+        image = image.crop((left, top, right, bottom))
+
+        # Resize to target size
+        image = image.resize((size, size), Image.Resampling.LANCZOS)
+
+        return image
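Because ThumbnailService works purely on bytes, it can be sanity-checked locally without S3 or the queue. A quick, illustrative sketch (not part of the diff; file names are placeholders, and the video branch additionally needs the ffmpeg binary on PATH):

```python
from app.services.thumbnail_service import ThumbnailService

service = ThumbnailService()

# Any local test image; the content type must match the file.
with open("sample.png", "rb") as src:
    thumb_bytes = service.generate_thumbnail("image/png", src.read())

# Output is always a 512x512 JPEG.
with open("sample_thumb.jpg", "wb") as dst:
    dst.write(thumb_bytes)

print(f"thumbnail: {len(thumb_bytes)} bytes")
```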
diff --git a/backend/src/app/tasks/__init__.py b/backend/src/app/tasks/__init__.py
new file mode 100644
index 0000000..71283a1
--- /dev/null
+++ b/backend/src/app/tasks/__init__.py
@@ -0,0 +1 @@
+"""Background tasks package."""
diff --git a/backend/src/app/tasks/thumbnail_tasks.py b/backend/src/app/tasks/thumbnail_tasks.py
new file mode 100644
index 0000000..b80ca3c
--- /dev/null
+++ b/backend/src/app/tasks/thumbnail_tasks.py
@@ -0,0 +1,117 @@
+"""Background tasks for thumbnail generation."""
+
+import os
+from typing import Tuple
+
+from loguru import logger
+from sqlalchemy import create_engine
+from sqlalchemy.orm import Session
+
+from app.domain.models import Asset
+from app.infra.config import get_settings
+from app.infra.s3_client import S3Client
+from app.services.thumbnail_service import ThumbnailService
+
+settings = get_settings()
+
+
+def generate_thumbnail_task(asset_id: str) -> Tuple[bool, str]:
+    """
+    Background task to generate thumbnail for an asset.
+
+    This task:
+    1. Fetches asset from database
+    2. Downloads original file from S3
+    3. Generates thumbnail
+    4. Uploads thumbnail to S3
+    5. Updates asset record with thumbnail storage key
+
+    Args:
+        asset_id: ID of the asset to process
+
+    Returns:
+        Tuple of (success: bool, message: str)
+    """
+    logger.info(f"[THUMBNAIL] Starting thumbnail generation for asset {asset_id}")
+
+    # Create synchronous DB session (RQ workers are sync)
+    engine = create_engine(
+        settings.database_url.replace("sqlite+aiosqlite", "sqlite"),  # swap in the sync sqlite driver
+        echo=False,
+    )
+
+    with Session(engine) as session:
+        try:
+            # Fetch asset
+            asset = session.query(Asset).filter(Asset.id == asset_id).first()
+            if not asset:
+                error_msg = f"Asset {asset_id} not found"
+                logger.error(f"[THUMBNAIL] {error_msg}")
+                return False, error_msg
+
+            logger.info(
+                f"[THUMBNAIL] Processing {asset.type} asset: {asset.original_filename}"
+            )
+
+            # Initialize services
+            s3_client = S3Client()
+            thumbnail_service = ThumbnailService()
+
+            # Download original file from S3
+            logger.debug(f"[THUMBNAIL] Downloading from S3: {asset.storage_key_original}")
+            try:
+                response = s3_client.client.get_object(
+                    Bucket=s3_client.bucket, Key=asset.storage_key_original
+                )
+                file_data = response["Body"].read()
+            except Exception as e:
+                error_msg = f"Failed to download file from S3: {e}"
+                logger.error(f"[THUMBNAIL] {error_msg}")
+                return False, error_msg
+
+            # Generate thumbnail
+            logger.debug(f"[THUMBNAIL] Generating thumbnail for {asset.content_type}")
+            try:
+                thumbnail_data = thumbnail_service.generate_thumbnail(
+                    content_type=asset.content_type, file_data=file_data
+                )
+            except Exception as e:
+                error_msg = f"Failed to generate thumbnail: {e}"
+                logger.error(f"[THUMBNAIL] {error_msg}")
+                return False, error_msg
+
+            # Generate storage key for thumbnail
+            _, ext = os.path.splitext(asset.original_filename)
+            storage_key_thumb = s3_client.generate_storage_key(
+                user_id=asset.user_id,
+                asset_id=asset.id,
+                prefix="t",
+                extension=".jpg",  # Always JPEG for thumbnails
+            )
+
+            # Upload thumbnail to S3
+            logger.debug(f"[THUMBNAIL] Uploading to S3: {storage_key_thumb}")
+            try:
+                s3_client.put_object(
+                    storage_key=storage_key_thumb,
+                    file_data=thumbnail_data,
+                    content_type="image/jpeg",
+                )
+            except Exception as e:
+                error_msg = f"Failed to upload thumbnail to S3: {e}"
+                logger.error(f"[THUMBNAIL] {error_msg}")
+                return False, error_msg
+
+            # Update asset record
+            asset.storage_key_thumb = storage_key_thumb
+            session.commit()
+
+            success_msg = f"Thumbnail generated successfully for {asset.original_filename}"
+            logger.info(f"[THUMBNAIL] {success_msg}")
+            return True, success_msg
+
+        except Exception as e:
+            session.rollback()
+            error_msg = f"Unexpected error: {e}"
+            logger.exception(f"[THUMBNAIL] {error_msg}")
+            return False, error_msg
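Because generate_thumbnail_task is a plain function that reports failures via its (success, message) return value, it can also be run synchronously for debugging, bypassing RQ entirely. A sketch, assuming it is executed where the app package and its settings are importable (for example inside the backend container) and given a real asset id:

```python
from app.tasks.thumbnail_tasks import generate_thumbnail_task

# Replace with an actual asset id; the task logs each step with a [THUMBNAIL] prefix.
ok, message = generate_thumbnail_task("00000000-0000-0000-0000-000000000000")
print(ok, message)
```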
diff --git a/backend/src/worker.py b/backend/src/worker.py
new file mode 100644
index 0000000..c51b3d9
--- /dev/null
+++ b/backend/src/worker.py
@@ -0,0 +1,35 @@
+"""RQ Worker for processing background tasks."""
+
+import redis
+from loguru import logger
+from rq import Worker, Queue, Connection
+
+from app.infra.config import get_settings
+
+settings = get_settings()
+
+
+def main():
+    """Start RQ worker to process thumbnail generation tasks."""
+    logger.info("Starting RQ worker for thumbnail generation...")
+
+    # Parse Redis URL
+    redis_url = settings.redis_url
+    logger.info(f"Connecting to Redis: {redis_url}")
+
+    # Connect to Redis
+    redis_conn = redis.from_url(redis_url)
+
+    # Listen to 'thumbnails' queue
+    with Connection(redis_conn):
+        worker = Worker(
+            [Queue("thumbnails")],
+            connection=redis_conn,
+            name="thumbnail-worker",
+        )
+        logger.info("Worker started, listening for tasks...")
+        worker.work()
+
+
+if __name__ == "__main__":
+    main()
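For monitoring, queue depth and worker state can be inspected with the RQ API from any environment that can reach the same Redis instance. A small sketch (not part of the diff) with an assumed dev URL:

```python
import redis
from rq import Queue, Worker

redis_conn = redis.from_url("redis://localhost:6379/0")  # assumed dev Redis URL

queue = Queue("thumbnails", connection=redis_conn)
print("queued jobs:", queue.count)

for worker in Worker.all(connection=redis_conn):
    print("worker:", worker.name, "state:", worker.get_state())
```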
diff --git a/docker-compose.yml b/docker-compose.yml
index 2809963..d5a024f 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -109,6 +109,7 @@ services:
 
   redis:
     image: redis:7-alpine
+    command: redis-server --appendonly yes --appendfsync everysec --save 60 1000
     expose:
       - "6379"
     volumes:
@@ -121,5 +122,29 @@
     networks:
       - itcloud-net
 
+  worker:
+    build: ./backend
+    environment:
+      - APP_ENV=dev
+      - DATABASE_URL=sqlite+aiosqlite:////app/data/app.db
+      - S3_ENDPOINT_URL=http://minio:9000
+      - S3_REGION=us-east-1
+      - S3_ACCESS_KEY_ID=${MINIO_ROOT_USER:-minioadmin}
+      - S3_SECRET_ACCESS_KEY=${MINIO_ROOT_PASSWORD:-minioadmin}
+      - MEDIA_BUCKET=itcloud-media
+      - REDIS_URL=redis://redis:6379/0
+    volumes:
+      - ./backend/src:/app/src
+      - backend-data:/app/data
+    depends_on:
+      minio:
+        condition: service_healthy
+      redis:
+        condition: service_healthy
+    command: python /app/src/worker.py
+    restart: unless-stopped
+    networks:
+      - itcloud-net
+
 volumes:
   redis-data:
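To try the whole path locally, the worker can be rebuilt and started with `docker compose up -d --build worker` and its logs followed with `docker compose logs -f worker` while an asset is uploaded. A verification sketch (not part of the diff) that lists objects in the MinIO bucket afterwards, using the dev credentials from the compose file; the endpoint assumes MinIO's port is published to the host, otherwise run it inside a container on itcloud-net against http://minio:9000:

```python
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",  # assumed host-published MinIO port
    aws_access_key_id="minioadmin",        # compose dev defaults, not production values
    aws_secret_access_key="minioadmin",
    region_name="us-east-1",
)

# Thumbnails are uploaded as JPEGs under the key produced by generate_storage_key(prefix="t").
response = s3.list_objects_v2(Bucket="itcloud-media")
for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])
```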