"""Asset management service.""" import os from typing import AsyncIterator, Optional, Tuple import redis from botocore.exceptions import ClientError from fastapi import HTTPException, UploadFile, status from loguru import logger from rq import Queue, Retry from sqlalchemy.ext.asyncio import AsyncSession from app.domain.models import Asset, AssetStatus, AssetType from app.infra.config import get_settings from app.infra.s3_client import S3Client from app.repositories.asset_repository import AssetRepository settings = get_settings() class AssetService: """Service for asset management operations.""" def __init__(self, session: AsyncSession, s3_client: S3Client): """ Initialize asset service. Args: session: Database session s3_client: S3 client instance """ self.asset_repo = AssetRepository(session) self.s3_client = s3_client def _get_asset_type(self, content_type: str) -> AssetType: """Determine asset type from content type.""" if content_type.startswith("image/"): return AssetType.PHOTO elif content_type.startswith("video/"): return AssetType.VIDEO else: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Unsupported content type", ) async def create_upload( self, user_id: str, original_filename: str, content_type: str, size_bytes: int, folder_id: Optional[str] = None, ) -> tuple[Asset, dict]: """ Create an asset and generate pre-signed upload URL. Args: user_id: Owner user ID original_filename: Original filename content_type: MIME type size_bytes: File size in bytes folder_id: Optional folder ID to upload to Returns: Tuple of (asset, presigned_post_data) """ asset_type = self._get_asset_type(content_type) _, ext = os.path.splitext(original_filename) # Create asset record asset = await self.asset_repo.create( user_id=user_id, asset_type=asset_type, original_filename=original_filename, content_type=content_type, size_bytes=size_bytes, storage_key_original="", # Will be set after upload folder_id=folder_id, ) # Generate storage key storage_key = self.s3_client.generate_storage_key( user_id=user_id, asset_id=asset.id, prefix="o", extension=ext, ) # Update asset with storage key asset.storage_key_original = storage_key await self.asset_repo.update(asset) # Generate pre-signed POST presigned_post = self.s3_client.generate_presigned_post( storage_key=storage_key, content_type=content_type, max_size=size_bytes, ) return asset, presigned_post async def upload_file_to_s3( self, user_id: str, asset_id: str, file: UploadFile, ) -> None: """ Upload file content to S3 through backend. Args: user_id: User ID asset_id: Asset ID file: File to upload Raises: HTTPException: If asset not found or not authorized """ # Get asset and verify ownership asset = await self.asset_repo.get_by_id(asset_id) if not asset or asset.user_id != user_id: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Asset not found", ) if not asset.storage_key_original: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Asset has no storage key", ) # Upload file to S3 try: content = await file.read() self.s3_client.put_object( storage_key=asset.storage_key_original, file_data=content, content_type=asset.content_type, ) except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to upload file: {str(e)}", ) async def finalize_upload( self, user_id: str, asset_id: str, etag: Optional[str] = None, sha256: Optional[str] = None, ) -> Asset: """ Finalize upload and mark asset as ready. Enqueues background task for thumbnail generation. Args: user_id: User ID asset_id: Asset ID etag: Optional S3 ETag sha256: Optional file SHA256 hash Returns: Updated asset Raises: HTTPException: If asset not found or not authorized """ asset = await self.asset_repo.get_by_id(asset_id) if not asset or asset.user_id != user_id: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Asset not found", ) # Verify file was uploaded if not self.s3_client.object_exists(asset.storage_key_original): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="File not found in storage", ) asset.status = AssetStatus.READY if sha256: asset.sha256 = sha256 await self.asset_repo.update(asset) # Enqueue thumbnail generation task (background processing with retry) try: redis_conn = redis.from_url(settings.redis_url) thumbnail_queue = Queue("thumbnails", connection=redis_conn) job = thumbnail_queue.enqueue( "app.tasks.thumbnail_tasks.generate_thumbnail_task", asset_id, job_timeout="5m", # 5 minutes timeout per attempt retry=Retry(max=3, interval=[30, 120, 300]), # 3 retries: 30s, 2m, 5m failure_ttl=86400, # Keep failed jobs for 24 hours result_ttl=3600, # Keep results for 1 hour ) logger.info( f"Enqueued thumbnail job {job.id} for asset {asset_id} " f"(max 3 retries with backoff)" ) except Exception as e: # Log error but don't fail the request - thumbnail is not critical logger.error(f"Failed to enqueue thumbnail task for asset {asset_id}: {e}") return asset async def list_assets( self, user_id: str, limit: int = 50, cursor: Optional[str] = None, asset_type: Optional[AssetType] = None, folder_id: Optional[str] = None, ) -> tuple[list[Asset], Optional[str], bool]: """ List user's assets. Args: user_id: User ID limit: Maximum number of results cursor: Pagination cursor asset_type: Filter by asset type folder_id: Filter by folder (None for root) Returns: Tuple of (assets, next_cursor, has_more) """ assets = await self.asset_repo.list_by_user( user_id=user_id, limit=limit + 1, # Fetch one more to check if there are more cursor=cursor, asset_type=asset_type, folder_id=folder_id, ) has_more = len(assets) > limit if has_more: assets = assets[:limit] next_cursor = assets[-1].id if has_more and assets else None return assets, next_cursor, has_more async def get_asset(self, user_id: str, asset_id: str) -> Asset: """ Get asset by ID. Args: user_id: User ID asset_id: Asset ID Returns: Asset instance Raises: HTTPException: If asset not found or not authorized """ asset = await self.asset_repo.get_by_id(asset_id) if not asset or asset.user_id != user_id: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Asset not found", ) return asset async def get_download_url( self, user_id: str, asset_id: str, kind: str = "original" ) -> str: """ Get pre-signed download URL for an asset. Args: user_id: User ID asset_id: Asset ID kind: 'original' or 'thumb' Returns: Pre-signed download URL Raises: HTTPException: If asset not found or not authorized """ asset = await self.get_asset(user_id, asset_id) if kind == "thumb": storage_key = asset.storage_key_thumb if not storage_key: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Thumbnail not available", ) else: storage_key = asset.storage_key_original return self.s3_client.generate_presigned_url(storage_key) async def stream_media( self, user_id: str, asset_id: str, kind: str = "original" ) -> Tuple[AsyncIterator[bytes], str, int]: """ Stream media file content from S3. Args: user_id: User ID asset_id: Asset ID kind: 'original' or 'thumb' Returns: Tuple of (file_stream, content_type, content_length) Raises: HTTPException: If asset not found or not authorized """ asset = await self.get_asset(user_id, asset_id) if kind == "thumb": storage_key = asset.storage_key_thumb content_type = "image/jpeg" # thumbnails are always JPEG if not storage_key: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Thumbnail not available", ) else: storage_key = asset.storage_key_original content_type = asset.content_type # Stream file from S3 try: file_stream, content_length = await self.s3_client.stream_object(storage_key) except ClientError as e: error_code = e.response.get("Error", {}).get("Code", "") if error_code == "NoSuchKey": # File not found in S3 - delete asset from database await self.asset_repo.delete(asset) raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Media file not found in storage", ) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to retrieve media from storage: {error_code}", ) return file_stream, content_type, content_length async def delete_asset(self, user_id: str, asset_id: str) -> None: """ Delete an asset permanently (move files to trash bucket, delete from DB). Args: user_id: User ID asset_id: Asset ID """ asset = await self.get_asset(user_id, asset_id) # Move files to trash bucket in S3 self.s3_client.move_to_trash(asset.storage_key_original) if asset.storage_key_thumb: self.s3_client.move_to_trash(asset.storage_key_thumb) # Delete from database await self.asset_repo.delete(asset)