add redis temp

This commit is contained in:
itqop 2025-12-31 00:19:33 +03:00
parent a3615df165
commit 561c4e7ed1
8 changed files with 387 additions and 0 deletions

View File

@ -5,6 +5,7 @@ WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
gcc \
ffmpeg \
&& rm -rf /var/lib/apt/lists/*
# Install poetry

View File

@ -25,6 +25,7 @@ redis = "^5.0.1"
rq = "^1.16.1"
pillow = "^10.2.0"
python-magic = "^0.4.27"
ffmpeg-python = "^0.2.0"
loguru = "^0.7.2"
httpx = "^0.26.0"
cryptography = "^46.0.3"

View File

@ -3,14 +3,20 @@
import os
from typing import AsyncIterator, Optional, Tuple
import redis
from botocore.exceptions import ClientError
from fastapi import HTTPException, UploadFile, status
from loguru import logger
from rq import Queue, Retry
from sqlalchemy.ext.asyncio import AsyncSession
from app.domain.models import Asset, AssetStatus, AssetType
from app.infra.config import get_settings
from app.infra.s3_client import S3Client
from app.repositories.asset_repository import AssetRepository
settings = get_settings()
class AssetService:
"""Service for asset management operations."""
@ -146,6 +152,8 @@ class AssetService:
"""
Finalize upload and mark asset as ready.
Enqueues background task for thumbnail generation.
Args:
user_id: User ID
asset_id: Asset ID
@ -177,6 +185,27 @@ class AssetService:
asset.sha256 = sha256
await self.asset_repo.update(asset)
# Enqueue thumbnail generation task (background processing with retry)
try:
redis_conn = redis.from_url(settings.redis_url)
thumbnail_queue = Queue("thumbnails", connection=redis_conn)
job = thumbnail_queue.enqueue(
"app.tasks.thumbnail_tasks.generate_thumbnail_task",
asset_id,
job_timeout="5m", # 5 minutes timeout per attempt
retry=Retry(max=3, interval=[30, 120, 300]), # 3 retries: 30s, 2m, 5m
failure_ttl=86400, # Keep failed jobs for 24 hours
result_ttl=3600, # Keep results for 1 hour
)
logger.info(
f"Enqueued thumbnail job {job.id} for asset {asset_id} "
f"(max 3 retries with backoff)"
)
except Exception as e:
# Log error but don't fail the request - thumbnail is not critical
logger.error(f"Failed to enqueue thumbnail task for asset {asset_id}: {e}")
return asset
async def list_assets(

View File

@ -0,0 +1,178 @@
"""Thumbnail generation service."""
import io
import tempfile
from pathlib import Path
from typing import Tuple
import ffmpeg
from PIL import Image
from loguru import logger
class ThumbnailService:
"""
Service for generating image and video thumbnails.
Follows SOLID principles:
- Single Responsibility: Only thumbnail generation
- Open/Closed: Easy to extend with new formats
- Dependency Inversion: Works with bytes, not specific storages
"""
# Configuration constants
THUMBNAIL_SIZE = (512, 512)
JPEG_QUALITY = 85
VIDEO_FRAME_TIME = 1.0 # Extract frame at 1 second
def generate_thumbnail(self, content_type: str, file_data: bytes) -> bytes:
"""
Generate thumbnail based on content type (facade pattern).
Args:
content_type: MIME type (e.g., 'image/jpeg', 'video/mp4')
file_data: Original file content
Returns:
Thumbnail as JPEG bytes
Raises:
ValueError: If content type is unsupported
RuntimeError: If thumbnail generation fails
"""
if content_type.startswith("image/"):
return self._generate_image_thumbnail(file_data)
elif content_type.startswith("video/"):
return self._generate_video_thumbnail(file_data)
else:
raise ValueError(f"Unsupported content type: {content_type}")
def _generate_image_thumbnail(self, image_data: bytes) -> bytes:
"""
Generate thumbnail from image data.
Args:
image_data: Original image bytes
Returns:
Thumbnail as JPEG bytes
"""
try:
# Load image
image = Image.open(io.BytesIO(image_data))
# Convert to RGB if needed (handle PNG with transparency, etc.)
if image.mode in ("RGBA", "LA", "P", "CMYK"):
# Create white background for transparency
if image.mode in ("RGBA", "LA"):
background = Image.new("RGB", image.size, (255, 255, 255))
if image.mode == "RGBA":
background.paste(image, mask=image.split()[-1]) # Use alpha channel
else:
background.paste(image)
image = background
else:
image = image.convert("RGB")
# Resize and crop to square
thumbnail = self._resize_and_crop_square(image, self.THUMBNAIL_SIZE[0])
# Save to bytes
output = io.BytesIO()
thumbnail.save(output, format="JPEG", quality=self.JPEG_QUALITY, optimize=True)
return output.getvalue()
except Exception as e:
logger.error(f"Failed to generate image thumbnail: {e}")
raise RuntimeError(f"Image thumbnail generation failed: {e}")
def _generate_video_thumbnail(self, video_data: bytes) -> bytes:
"""
Generate thumbnail from video data.
Extracts frame at configured time and creates thumbnail.
Args:
video_data: Original video bytes
Returns:
Thumbnail as JPEG bytes
"""
# Use temporary files for ffmpeg processing
with tempfile.NamedTemporaryFile(
suffix=".mp4", delete=False
) as temp_video, tempfile.NamedTemporaryFile(
suffix=".jpg", delete=False
) as temp_thumb:
try:
# Write video data to temp file
temp_video.write(video_data)
temp_video.flush()
temp_video_path = temp_video.name
temp_thumb_path = temp_thumb.name
# Extract frame using ffmpeg
(
ffmpeg.input(temp_video_path, ss=self.VIDEO_FRAME_TIME)
.filter("scale", self.THUMBNAIL_SIZE[0], self.THUMBNAIL_SIZE[1])
.output(temp_thumb_path, vframes=1, format="image2", vcodec="mjpeg")
.overwrite_output()
.run(capture_stdout=True, capture_stderr=True, quiet=True)
)
# Read generated thumbnail
with open(temp_thumb_path, "rb") as f:
frame_data = f.read()
# Process frame to ensure it's properly cropped square
image = Image.open(io.BytesIO(frame_data))
thumbnail = self._resize_and_crop_square(image, self.THUMBNAIL_SIZE[0])
# Save to bytes
output = io.BytesIO()
thumbnail.save(
output, format="JPEG", quality=self.JPEG_QUALITY, optimize=True
)
return output.getvalue()
except ffmpeg.Error as e:
stderr = e.stderr.decode() if e.stderr else "No error output"
logger.error(f"FFmpeg error: {stderr}")
raise RuntimeError(f"Video thumbnail generation failed: {stderr}")
except Exception as e:
logger.error(f"Failed to generate video thumbnail: {e}")
raise RuntimeError(f"Video thumbnail generation failed: {e}")
finally:
# Cleanup temporary files
Path(temp_video_path).unlink(missing_ok=True)
Path(temp_thumb_path).unlink(missing_ok=True)
def _resize_and_crop_square(self, image: Image.Image, size: int) -> Image.Image:
"""
Resize and crop image to square (DRY method).
Maintains aspect ratio and crops to center square.
Args:
image: PIL Image object
size: Target size (width and height)
Returns:
Resized and cropped square image
"""
# Calculate dimensions to make a center-cropped square
width, height = image.size
target_size = min(width, height)
# Crop to square (center crop)
left = (width - target_size) // 2
top = (height - target_size) // 2
right = left + target_size
bottom = top + target_size
image = image.crop((left, top, right, bottom))
# Resize to target size
image = image.resize((size, size), Image.Resampling.LANCZOS)
return image

View File

@ -0,0 +1 @@
"""Background tasks package."""

View File

@ -0,0 +1,117 @@
"""Background tasks for thumbnail generation."""
import os
from typing import Tuple
from loguru import logger
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from app.domain.models import Asset
from app.infra.config import get_settings
from app.infra.s3_client import S3Client
from app.services.thumbnail_service import ThumbnailService
settings = get_settings()
def generate_thumbnail_task(asset_id: str) -> Tuple[bool, str]:
"""
Background task to generate thumbnail for an asset.
This task:
1. Fetches asset from database
2. Downloads original file from S3
3. Generates thumbnail
4. Uploads thumbnail to S3
5. Updates asset record with thumbnail storage key
Args:
asset_id: ID of the asset to process
Returns:
Tuple of (success: bool, message: str)
"""
logger.info(f"[THUMBNAIL] Starting thumbnail generation for asset {asset_id}")
# Create synchronous DB session (RQ workers are sync)
engine = create_engine(
settings.database_url.replace("sqlite+aiosqlite://", "sqlite:///"),
echo=False,
)
with Session(engine) as session:
try:
# Fetch asset
asset = session.query(Asset).filter(Asset.id == asset_id).first()
if not asset:
error_msg = f"Asset {asset_id} not found"
logger.error(f"[THUMBNAIL] {error_msg}")
return False, error_msg
logger.info(
f"[THUMBNAIL] Processing {asset.type} asset: {asset.original_filename}"
)
# Initialize services
s3_client = S3Client()
thumbnail_service = ThumbnailService()
# Download original file from S3
logger.debug(f"[THUMBNAIL] Downloading from S3: {asset.storage_key_original}")
try:
response = s3_client.client.get_object(
Bucket=s3_client.bucket, Key=asset.storage_key_original
)
file_data = response["Body"].read()
except Exception as e:
error_msg = f"Failed to download file from S3: {e}"
logger.error(f"[THUMBNAIL] {error_msg}")
return False, error_msg
# Generate thumbnail
logger.debug(f"[THUMBNAIL] Generating thumbnail for {asset.content_type}")
try:
thumbnail_data = thumbnail_service.generate_thumbnail(
content_type=asset.content_type, file_data=file_data
)
except Exception as e:
error_msg = f"Failed to generate thumbnail: {e}"
logger.error(f"[THUMBNAIL] {error_msg}")
return False, error_msg
# Generate storage key for thumbnail
_, ext = os.path.splitext(asset.original_filename)
storage_key_thumb = s3_client.generate_storage_key(
user_id=asset.user_id,
asset_id=asset.id,
prefix="t",
extension=".jpg", # Always JPEG for thumbnails
)
# Upload thumbnail to S3
logger.debug(f"[THUMBNAIL] Uploading to S3: {storage_key_thumb}")
try:
s3_client.put_object(
storage_key=storage_key_thumb,
file_data=thumbnail_data,
content_type="image/jpeg",
)
except Exception as e:
error_msg = f"Failed to upload thumbnail to S3: {e}"
logger.error(f"[THUMBNAIL] {error_msg}")
return False, error_msg
# Update asset record
asset.storage_key_thumb = storage_key_thumb
session.commit()
success_msg = f"Thumbnail generated successfully for {asset.original_filename}"
logger.info(f"[THUMBNAIL] {success_msg}")
return True, success_msg
except Exception as e:
session.rollback()
error_msg = f"Unexpected error: {e}"
logger.exception(f"[THUMBNAIL] {error_msg}")
return False, error_msg

35
backend/src/worker.py Normal file
View File

@ -0,0 +1,35 @@
"""RQ Worker for processing background tasks."""
import redis
from loguru import logger
from rq import Worker, Queue, Connection
from app.infra.config import get_settings
settings = get_settings()
def main():
"""Start RQ worker to process thumbnail generation tasks."""
logger.info("Starting RQ worker for thumbnail generation...")
# Parse Redis URL
redis_url = settings.redis_url
logger.info(f"Connecting to Redis: {redis_url}")
# Connect to Redis
redis_conn = redis.from_url(redis_url)
# Listen to 'thumbnails' queue
with Connection(redis_conn):
worker = Worker(
[Queue("thumbnails")],
connection=redis_conn,
name="thumbnail-worker",
)
logger.info("Worker started, listening for tasks...")
worker.work()
if __name__ == "__main__":
main()

View File

@ -109,6 +109,7 @@ services:
redis:
image: redis:7-alpine
command: redis-server --appendonly yes --appendfsync everysec --save 60 1000
expose:
- "6379"
volumes:
@ -121,5 +122,29 @@ services:
networks:
- itcloud-net
worker:
build: ./backend
environment:
- APP_ENV=dev
- DATABASE_URL=sqlite+aiosqlite:////app/data/app.db
- S3_ENDPOINT_URL=http://minio:9000
- S3_REGION=us-east-1
- S3_ACCESS_KEY_ID=${MINIO_ROOT_USER:-minioadmin}
- S3_SECRET_ACCESS_KEY=${MINIO_ROOT_PASSWORD:-minioadmin}
- MEDIA_BUCKET=itcloud-media
- REDIS_URL=redis://redis:6379/0
volumes:
- ./backend/src:/app/src
- backend-data:/app/data
depends_on:
minio:
condition: service_healthy
redis:
condition: service_healthy
command: python /app/src/worker.py
restart: unless-stopped
networks:
- itcloud-net
volumes:
redis-data: