"""Asset management service.""" import os from typing import Optional from fastapi import HTTPException, status from sqlalchemy.ext.asyncio import AsyncSession from app.domain.models import Asset, AssetStatus, AssetType from app.infra.s3_client import S3Client from app.repositories.asset_repository import AssetRepository class AssetService: """Service for asset management operations.""" def __init__(self, session: AsyncSession, s3_client: S3Client): """ Initialize asset service. Args: session: Database session s3_client: S3 client instance """ self.asset_repo = AssetRepository(session) self.s3_client = s3_client def _get_asset_type(self, content_type: str) -> AssetType: """Determine asset type from content type.""" if content_type.startswith("image/"): return AssetType.PHOTO elif content_type.startswith("video/"): return AssetType.VIDEO else: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Unsupported content type", ) async def create_upload( self, user_id: str, original_filename: str, content_type: str, size_bytes: int, ) -> tuple[Asset, dict]: """ Create an asset and generate pre-signed upload URL. Args: user_id: Owner user ID original_filename: Original filename content_type: MIME type size_bytes: File size in bytes Returns: Tuple of (asset, presigned_post_data) """ asset_type = self._get_asset_type(content_type) _, ext = os.path.splitext(original_filename) # Create asset record asset = await self.asset_repo.create( user_id=user_id, asset_type=asset_type, original_filename=original_filename, content_type=content_type, size_bytes=size_bytes, storage_key_original="", # Will be set after upload ) # Generate storage key storage_key = self.s3_client.generate_storage_key( user_id=user_id, asset_id=asset.id, prefix="o", extension=ext, ) # Update asset with storage key asset.storage_key_original = storage_key await self.asset_repo.update(asset) # Generate pre-signed POST presigned_post = self.s3_client.generate_presigned_post( storage_key=storage_key, content_type=content_type, max_size=size_bytes, ) return asset, presigned_post async def finalize_upload( self, user_id: str, asset_id: str, etag: Optional[str] = None, sha256: Optional[str] = None, ) -> Asset: """ Finalize upload and mark asset as ready. Args: user_id: User ID asset_id: Asset ID etag: Optional S3 ETag sha256: Optional file SHA256 hash Returns: Updated asset Raises: HTTPException: If asset not found or not authorized """ asset = await self.asset_repo.get_by_id(asset_id) if not asset or asset.user_id != user_id: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Asset not found", ) # Verify file was uploaded if not self.s3_client.object_exists(asset.storage_key_original): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="File not found in storage", ) asset.status = AssetStatus.READY if sha256: asset.sha256 = sha256 await self.asset_repo.update(asset) return asset async def list_assets( self, user_id: str, limit: int = 50, cursor: Optional[str] = None, asset_type: Optional[AssetType] = None, ) -> tuple[list[Asset], Optional[str], bool]: """ List user's assets. Args: user_id: User ID limit: Maximum number of results cursor: Pagination cursor asset_type: Filter by asset type Returns: Tuple of (assets, next_cursor, has_more) """ assets = await self.asset_repo.list_by_user( user_id=user_id, limit=limit + 1, # Fetch one more to check if there are more cursor=cursor, asset_type=asset_type, ) has_more = len(assets) > limit if has_more: assets = assets[:limit] next_cursor = assets[-1].id if has_more and assets else None return assets, next_cursor, has_more async def get_asset(self, user_id: str, asset_id: str) -> Asset: """ Get asset by ID. Args: user_id: User ID asset_id: Asset ID Returns: Asset instance Raises: HTTPException: If asset not found or not authorized """ asset = await self.asset_repo.get_by_id(asset_id) if not asset or asset.user_id != user_id: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Asset not found", ) return asset async def get_download_url( self, user_id: str, asset_id: str, kind: str = "original" ) -> str: """ Get pre-signed download URL for an asset. Args: user_id: User ID asset_id: Asset ID kind: 'original' or 'thumb' Returns: Pre-signed download URL Raises: HTTPException: If asset not found or not authorized """ asset = await self.get_asset(user_id, asset_id) if kind == "thumb": storage_key = asset.storage_key_thumb if not storage_key: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Thumbnail not available", ) else: storage_key = asset.storage_key_original return self.s3_client.generate_presigned_url(storage_key) async def delete_asset(self, user_id: str, asset_id: str) -> Asset: """ Soft delete an asset. Args: user_id: User ID asset_id: Asset ID Returns: Updated asset """ asset = await self.get_asset(user_id, asset_id) return await self.asset_repo.soft_delete(asset) async def restore_asset(self, user_id: str, asset_id: str) -> Asset: """ Restore a soft-deleted asset. Args: user_id: User ID asset_id: Asset ID Returns: Updated asset """ asset = await self.asset_repo.get_by_id(asset_id) if not asset or asset.user_id != user_id: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Asset not found", ) return await self.asset_repo.restore(asset) async def purge_asset(self, user_id: str, asset_id: str) -> None: """ Permanently delete an asset. Args: user_id: User ID asset_id: Asset ID """ asset = await self.asset_repo.get_by_id(asset_id) if not asset or asset.user_id != user_id: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Asset not found", ) if not asset.deleted_at: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Asset must be deleted before purging", ) # Delete from S3 self.s3_client.delete_object(asset.storage_key_original) if asset.storage_key_thumb: self.s3_client.delete_object(asset.storage_key_thumb) # Delete from database await self.asset_repo.delete(asset)