itcloud/backend/src/app/services/batch_operations_service.py

305 lines
9.3 KiB
Python

"""Batch operations service for bulk asset management."""
import io
import os
import tempfile
import zipfile
from contextlib import contextmanager
from pathlib import Path
from typing import AsyncIterator, Optional
from fastapi import HTTPException, status
from loguru import logger
from sqlalchemy.ext.asyncio import AsyncSession
from app.domain.models import Asset
from app.infra.s3_client import S3Client
from app.repositories.asset_repository import AssetRepository
from app.repositories.folder_repository import FolderRepository
from app.services.asset_service import sanitize_filename
@contextmanager
def temp_file_manager():
    """
    Context manager that deletes registered temp files on exit (DRY principle).

    Yields:
        A list; append temp file paths to it and each one is removed when
        the context exits — on success or on error.

    Usage:
        with temp_file_manager() as temp_files:
            temp_files.append(path)
            # Files automatically deleted on exit
    """
    registered: list = []
    try:
        yield registered
    finally:
        # Best-effort cleanup: one failed unlink must not stop the rest.
        for file_path in registered:
            try:
                Path(file_path).unlink(missing_ok=True)
                logger.debug(f"Cleaned up temp file: {file_path}")
            except Exception as e:
                logger.warning(f"Failed to cleanup temp file {file_path}: {e}")
class BatchOperationsService:
    """
    Service for batch asset operations (SOLID: Single Responsibility).

    Handles bulk delete, move, and download operations. ZIP archives are
    built fully in memory (``io.BytesIO``), so callers should keep batch
    sizes bounded — see the limit applied in :meth:`download_folder`.

    NOTE(review): the S3 calls used here appear to be synchronous and will
    block the event loop while they run — confirm, and consider offloading
    to a worker thread for large batches.
    """

    def __init__(self, session: AsyncSession, s3_client: S3Client):
        """
        Initialize batch operations service.

        Args:
            session: Async database session shared by both repositories
            s3_client: S3 client instance for object storage operations
        """
        self.asset_repo = AssetRepository(session)
        self.folder_repo = FolderRepository(session)
        self.s3_client = s3_client

    async def delete_assets_batch(
        self,
        user_id: str,
        asset_ids: list[str],
    ) -> dict:
        """
        Delete multiple assets (move files to trash bucket, delete DB rows).

        Deletion is deliberately best-effort per asset: a failure on one
        asset is logged and counted, and processing continues.

        Args:
            user_id: User ID (ownership enforced by the repository lookup)
            asset_ids: List of asset IDs to delete

        Returns:
            Dict with ``deleted``, ``failed`` and ``total`` counts. ``total``
            is the number of IDs requested, which may exceed
            ``deleted + failed`` when some IDs were unknown or not owned.

        Raises:
            HTTPException: 400 if no IDs given, 404 if none are accessible
        """
        if not asset_ids:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="No asset IDs provided",
            )

        # Ownership check: the repository only returns assets owned by user_id.
        assets = await self.asset_repo.get_by_ids(user_id, asset_ids)
        if not assets:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="No assets found or permission denied",
            )

        deleted_count = 0
        failed_count = 0
        for asset in assets:
            try:
                # Move storage objects to trash first; only delete the DB
                # row once the storage moves succeeded.
                self.s3_client.move_to_trash(asset.storage_key_original)
                if asset.storage_key_thumb:
                    self.s3_client.move_to_trash(asset.storage_key_thumb)
                await self.asset_repo.delete(asset)
                deleted_count += 1
            except Exception as e:
                # Deliberate best-effort: record and continue with the rest.
                logger.error(f"Failed to delete asset {asset.id}: {e}")
                failed_count += 1

        return {
            "deleted": deleted_count,
            "failed": failed_count,
            "total": len(asset_ids),
        }

    async def move_assets_batch(
        self,
        user_id: str,
        asset_ids: list[str],
        target_folder_id: Optional[str],
    ) -> dict:
        """
        Move multiple assets to a folder.

        Args:
            user_id: User ID
            asset_ids: List of asset IDs to move
            target_folder_id: Target folder ID (None moves assets to root)

        Returns:
            Dict with ``moved`` (rows actually updated) and ``requested``
            (IDs asked for) counts.

        Raises:
            HTTPException: 400 if no IDs given, 404 if the target folder is
                missing or owned by someone else
        """
        if not asset_ids:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="No asset IDs provided",
            )

        # Validate the target folder unless moving to root (None).
        if target_folder_id:
            folder = await self.folder_repo.get_by_id(target_folder_id)
            if not folder or folder.user_id != user_id:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="Target folder not found",
                )

        # Single batched UPDATE; ownership is enforced inside the repository.
        updated_count = await self.asset_repo.update_folder_batch(
            user_id=user_id,
            asset_ids=asset_ids,
            folder_id=target_folder_id,
        )
        return {
            "moved": updated_count,
            "requested": len(asset_ids),
        }

    @staticmethod
    def _unique_member_name(original_filename: str, used_names: set) -> str:
        """
        Sanitize a filename for ZIP membership and de-duplicate it.

        Appends ``_1``, ``_2``, ... before the extension until the name is
        unique within *used_names*. Mutates *used_names* by adding the
        chosen name.
        """
        base_name = sanitize_filename(original_filename)
        unique_name = base_name
        counter = 1
        while unique_name in used_names:
            name, ext = os.path.splitext(base_name)
            unique_name = f"{name}_{counter}{ext}"
            counter += 1
        used_names.add(unique_name)
        return unique_name

    async def download_assets_batch(
        self,
        user_id: str,
        asset_ids: list[str],
    ) -> tuple[bytes, str]:
        """
        Download multiple assets as a ZIP archive.

        The archive is built fully in memory, so batch sizes must stay
        bounded by the caller. Assets that cannot be fetched or added are
        skipped (logged), so the archive may contain fewer files than
        requested — an all-failures batch yields an empty archive.

        Args:
            user_id: User ID
            asset_ids: List of asset IDs to download

        Returns:
            Tuple of (zip_data, filename)

        Raises:
            HTTPException: 400 if no IDs given, 404 if none are accessible,
                500 if building the archive itself fails
        """
        if not asset_ids:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="No asset IDs provided",
            )

        # Ownership check: repository only returns assets owned by user_id.
        assets = await self.asset_repo.get_by_ids(user_id, asset_ids)
        if not assets:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="No assets found or permission denied",
            )

        zip_buffer = io.BytesIO()
        try:
            with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
                # Track member names to avoid duplicates inside the archive.
                used_names: set = set()
                for asset in assets:
                    try:
                        # Fetch the original object from S3.
                        response = self.s3_client.client.get_object(
                            Bucket=self.s3_client.bucket,
                            Key=asset.storage_key_original,
                        )
                        file_data = response["Body"].read()
                        # Sanitized + de-duplicated name prevents path
                        # traversal and silent overwrites inside the ZIP.
                        unique_name = self._unique_member_name(
                            asset.original_filename, used_names
                        )
                        zip_file.writestr(unique_name, file_data)
                        logger.debug(f"Added {unique_name} to ZIP archive")
                    except Exception as e:
                        # Best-effort: skip this asset, keep the rest.
                        logger.error(f"Failed to add asset {asset.id} to ZIP: {e}")
        except Exception as e:
            logger.exception(f"Failed to create ZIP archive: {e}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to create archive",
            ) from e

        zip_data = zip_buffer.getvalue()
        filename = f"download_{len(assets)}_files.zip"
        return zip_data, filename

    async def download_folder(
        self,
        user_id: str,
        folder_id: str,
    ) -> tuple[bytes, str]:
        """
        Download all assets in a folder as a ZIP archive.

        Args:
            user_id: User ID
            folder_id: Folder ID

        Returns:
            Tuple of (zip_data, filename) — filename is the sanitized
            folder name with a ``.zip`` suffix.

        Raises:
            HTTPException: 404 if the folder is missing, not owned by the
                user, or empty
        """
        # Verify folder ownership before listing its contents.
        folder = await self.folder_repo.get_by_id(folder_id)
        if not folder or folder.user_id != user_id:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Folder not found",
            )

        # Cap the listing: the archive is built in memory.
        assets = await self.asset_repo.list_by_folder(
            user_id=user_id,
            folder_id=folder_id,
            limit=1000,  # Reasonable limit to prevent memory issues
        )
        if not assets:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Folder is empty",
            )

        # Reuse the batch download path for fetching and archiving.
        asset_ids = [asset.id for asset in assets]
        zip_data, _ = await self.download_assets_batch(user_id, asset_ids)

        # Sanitize the folder name for the same reason asset names are
        # sanitized: it ends up in the download filename sent to the client.
        filename = f"{sanitize_filename(folder.name)}.zip"
        return zip_data, filename