itcloud/SECURITY_AND_IMPROVEMENTS_A...

72 KiB
Raw Permalink Blame History

ITCloud - Полный Аудит Безопасности и Рекомендации

Дата: 2026-01-05 Аудитор: Claude Code (Senior Security & Architecture Review) Версия: 1.0


📋 EXECUTIVE SUMMARY

Проведен комплексный аудит проекта ITCloud (облачное хранилище фото/видео). Проект демонстрирует хорошую архитектуру (Clean Architecture) и некоторые правильные практики безопасности, но имеет критические уязвимости и отсутствуют ключевые функции.

Статус готовности к продакшену: НЕ ГОТОВ

Критические блокеры:

  1. Отсутствует rate limiting (защита от brute force)
  2. Нет soft delete/корзины (указано в требованиях)
  3. Отсутствует управление квотами хранилища
  4. Нет механизма отзыва токенов
  5. Отсутствует стратегия бэкапов БД
  6. Минимальное покрытие тестами (<10%)

Оценка времени до готовности к продакшену: 6-8 недель


🔴 1. КРИТИЧЕСКИЕ УЯЗВИМОСТИ БЕЗОПАСНОСТИ

1.1 Отсутствует Rate Limiting

Серьезность: КРИТИЧЕСКАЯ 🔴 Расположение: Все API endpoints Файлы: backend/src/app/main.py

Проблема:

  • Нет ограничений на количество запросов
  • Атакующий может делать неограниченное количество попыток логина
  • Возможна DoS атака через перегрузку API

Риски:

  • Brute force атаки на пароли пользователей
  • Credential stuffing атаки
  • Исчерпание ресурсов сервера
  • Спам загрузок файлов

Решение:

# Установить: pip install slowapi
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# Для каждого endpoint:
@router.post("/login")
@limiter.limit("5/minute")  # 5 попыток в минуту
async def login(...):
    ...

@router.post("/uploads/create")
@limiter.limit("100/hour")  # 100 загрузок в час
async def create_upload(...):
    ...

Рекомендуемые лимиты:

  • /auth/login: 5 запросов/минуту
  • /auth/register: 3 запроса/час
  • /uploads/create: 100 запросов/час
  • Остальные endpoints: 1000 запросов/час

1.2 JWT Secret по умолчанию слабый

Серьезность: КРИТИЧЕСКАЯ 🔴 Расположение: backend/.env.example:18

Проблема:

JWT_SECRET=your-secret-key-change-this-in-production

Если пользователь задеплоит с дефолтным секретом, токены можно подделать.

Решение:

  1. Добавить валидацию при старте:
# В config.py
if settings.APP_ENV == "prod" and settings.JWT_SECRET in [
    "your-secret-key-change-this-in-production",
    "changeme",
    "secret",
]:
    raise ValueError("КРИТИЧЕСКАЯ ОШИБКА: Используется слабый JWT_SECRET в продакшене!")
  1. Генерировать случайный секрет при деплое:
import secrets
print(secrets.token_urlsafe(64))

1.3 Отсутствует ограничение размера при чтении файла в память

Серьезность: ВЫСОКАЯ 🔴 Расположение: backend/src/app/services/asset_service.py:138-182

Проблема:

# Строка 151
file_data = await file.read()  # Читает ВЕСЬ файл в память!

Атакующий может загрузить огромный файл и вызвать OOM (Out Of Memory).

Решение: Стримить файл чанками в S3:

async def upload_file_to_s3(
    self,
    user_id: str,
    asset_id: str,
    file: UploadFile,
) -> Asset:
    asset = await self.asset_repo.get_by_id(asset_id)
    if not asset or asset.user_id != user_id:
        raise HTTPException(status_code=404, detail="Asset not found")

    # Проверить размер ПЕРЕД чтением
    if file.size and file.size > self.config.max_upload_size_bytes:
        raise HTTPException(status_code=413, detail="File too large")

    # Стрим в S3 чанками
    try:
        await self.s3_client.upload_fileobj_streaming(
            file.file,  # file-like object
            self.config.media_bucket,
            asset.storage_key_original,
            content_type=asset.content_type,
        )
    except Exception as e:
        logger.error(f"S3 upload failed: {e}")
        raise HTTPException(status_code=500, detail="Upload failed")

Добавить в S3Client:

async def upload_fileobj_streaming(
    self,
    fileobj,
    bucket: str,
    key: str,
    content_type: str,
    chunk_size: int = 8 * 1024 * 1024,  # 8MB chunks
):
    """Stream file to S3 in chunks"""
    try:
        await asyncio.to_thread(
            self.client.upload_fileobj,
            fileobj,
            bucket,
            key,
            ExtraArgs={
                'ContentType': content_type,
                'ServerSideEncryption': 'AES256',  # Важно!
            },
            Config=boto3.s3.transfer.TransferConfig(
                multipart_threshold=chunk_size,
                multipart_chunksize=chunk_size,
            ),
        )
    except ClientError as e:
        logger.error(f"S3 upload error: {e}")
        raise

1.4 Отсутствует CSRF защита

Серьезность: ВЫСОКАЯ 🟡 (частично смягчено Bearer токенами) Расположение: backend/src/app/main.py

Текущее состояние:

  • Используются Bearer токены в заголовках → CSRF не критичен
  • НО: если в будущем перейдете на cookie-based auth → станет критично

Рекомендация на будущее:

from fastapi_csrf_protect import CsrfProtect

@app.post("/api/v1/assets/{asset_id}/delete")
async def delete_asset(
    asset_id: str,
    csrf_protect: CsrfProtect = Depends(),
):
    await csrf_protect.validate_csrf(request)
    # ...

1.5 Небезопасная генерация share токенов

Серьезность: СРЕДНЯЯ 🟡 Расположение: backend/src/app/repositories/share_repository.py:25-27

Проблема:

token = secrets.token_urlsafe(32)  # Хорошо!
# НО нет проверки на коллизии

Решение:

async def create(
    self,
    owner_user_id: str,
    asset_id: Optional[str] = None,
    album_id: Optional[str] = None,
    expires_at: Optional[datetime] = None,
    password_hash: Optional[str] = None,
) -> Share:
    # Генерировать с проверкой уникальности
    max_attempts = 5
    for attempt in range(max_attempts):
        token = secrets.token_urlsafe(32)
        # Проверить, что токен не существует
        existing = await self.session.execute(
            select(Share).where(Share.token == token)
        )
        if not existing.scalar_one_or_none():
            break
    else:
        raise RuntimeError("Failed to generate unique share token")

    share = Share(
        owner_user_id=owner_user_id,
        asset_id=asset_id,
        album_id=album_id,
        token=token,
        expires_at=expires_at,
        password_hash=password_hash,
    )
    self.session.add(share)
    await self.session.flush()
    await self.session.refresh(share)
    return share

🟡 2. ПРОБЛЕМЫ АУТЕНТИФИКАЦИИ И АВТОРИЗАЦИИ

2.1 Отсутствует механизм отзыва токенов

Серьезность: ВЫСОКАЯ 🔴 Расположение: backend/src/app/infra/security.py

Проблема:

  • JWT токены включают jti (token ID), но нет механизма отзыва
  • Если токен скомпрометирован, он остается валидным до истечения срока (15 минут для access, 14 дней для refresh)
  • Пользователь не может "выйти со всех устройств"

Решение с Redis:

# В infra/redis_client.py
from redis.asyncio import Redis

class TokenBlacklist:
    def __init__(self, redis: Redis):
        self.redis = redis

    async def revoke_token(self, jti: str, ttl_seconds: int):
        """Добавить токен в blacklist"""
        await self.redis.setex(f"blacklist:{jti}", ttl_seconds, "1")

    async def is_revoked(self, jti: str) -> bool:
        """Проверить, отозван ли токен"""
        return await self.redis.exists(f"blacklist:{jti}") > 0

# В security.py
async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
    blacklist: TokenBlacklist = Depends(get_token_blacklist),
) -> User:
    try:
        payload = jwt.decode(token, settings.JWT_SECRET, algorithms=["HS256"])
        jti: str = payload.get("jti")

        # Проверить blacklist
        if await blacklist.is_revoked(jti):
            raise HTTPException(status_code=401, detail="Token has been revoked")

        # ... остальная логика

Добавить endpoint:

@router.post("/auth/logout")
async def logout(
    current_user: CurrentUser,
    token: str = Depends(oauth2_scheme),
    blacklist: TokenBlacklist = Depends(get_token_blacklist),
):
    """Отозвать текущий токен"""
    payload = jwt.decode(token, settings.JWT_SECRET, algorithms=["HS256"])
    jti = payload.get("jti")
    exp = payload.get("exp")
    ttl = exp - int(datetime.utcnow().timestamp())

    await blacklist.revoke_token(jti, ttl)
    return {"message": "Logged out successfully"}

2.2 Нет refresh token rotation

Серьезность: СРЕДНЯЯ 🟡 Расположение: backend/src/app/api/v1/auth.py

Проблема:

  • Нет endpoint для обновления access token через refresh token
  • Пользователи должны логиниться заново каждые 15 минут

Решение:

@router.post("/auth/refresh", response_model=AuthTokens)
async def refresh_token(
    refresh_token: str = Body(..., embed=True),
    db: AsyncSession = Depends(get_db),
    blacklist: TokenBlacklist = Depends(get_token_blacklist),
):
    """Обновить access token используя refresh token"""
    try:
        # Декодировать refresh token
        payload = jwt.decode(
            refresh_token,
            settings.JWT_SECRET,
            algorithms=["HS256"]
        )

        # Проверить тип токена
        if payload.get("type") != "refresh":
            raise HTTPException(status_code=401, detail="Invalid token type")

        # Проверить blacklist
        jti = payload.get("jti")
        if await blacklist.is_revoked(jti):
            raise HTTPException(status_code=401, detail="Token has been revoked")

        user_id = payload.get("sub")

        # Получить пользователя
        user_repo = UserRepository(db)
        user = await user_repo.get_by_id(user_id)
        if not user or not user.is_active:
            raise HTTPException(status_code=401, detail="User not found")

        # ВАЖНО: Отозвать старый refresh token (rotation)
        exp = payload.get("exp")
        ttl = exp - int(datetime.utcnow().timestamp())
        await blacklist.revoke_token(jti, ttl)

        # Сгенерировать новые токены
        auth_service = AuthService(db)
        tokens = auth_service.create_tokens(user)

        return tokens

    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.JWTError:
        raise HTTPException(status_code=401, detail="Invalid token")

Frontend:

// В api.ts
async refreshAccessToken(): Promise<AuthTokens> {
  const refreshToken = localStorage.getItem('refresh_token');
  if (!refreshToken) {
    throw new Error('No refresh token');
  }

  const { data } = await this.client.post('/auth/refresh', {
    refresh_token: refreshToken,
  });

  localStorage.setItem('access_token', data.access_token);
  localStorage.setItem('refresh_token', data.refresh_token);

  return data;
}

// Добавить в interceptor
this.client.interceptors.response.use(
  (response) => response,
  async (error) => {
    const originalRequest = error.config;

    if (error.response?.status === 401 && !originalRequest._retry) {
      originalRequest._retry = true;

      try {
        await this.refreshAccessToken();
        return this.client(originalRequest);
      } catch (refreshError) {
        // Refresh failed, redirect to login
        localStorage.removeItem('access_token');
        localStorage.removeItem('refresh_token');
        if (!['/login', '/register'].includes(window.location.pathname)) {
          window.location.href = '/login';
        }
        return Promise.reject(refreshError);
      }
    }

    return Promise.reject(error);
  }
);

2.3 Слабая валидация паролей

Серьезность: СРЕДНЯЯ 🟡 Расположение: backend/src/app/api/schemas.py:16

Текущее состояние:

password: str = Field(min_length=8)

Можно установить пароль "12345678"

Решение:

import re
from pydantic import field_validator

class UserRegisterRequest(BaseModel):
    email: EmailStr
    password: str = Field(min_length=8, max_length=128)

    @field_validator("password")
    @classmethod
    def validate_password_strength(cls, v: str) -> str:
        """Проверка сложности пароля"""
        if len(v) < 8:
            raise ValueError("Password must be at least 8 characters")

        # Проверки
        has_upper = re.search(r'[A-Z]', v)
        has_lower = re.search(r'[a-z]', v)
        has_digit = re.search(r'\d', v)
        has_special = re.search(r'[!@#$%^&*(),.?":{}|<>]', v)

        checks_passed = sum([
            bool(has_upper),
            bool(has_lower),
            bool(has_digit),
            bool(has_special),
        ])

        if checks_passed < 3:
            raise ValueError(
                "Password must contain at least 3 of: "
                "uppercase letter, lowercase letter, digit, special character"
            )

        # Проверка на распространенные пароли
        common_passwords = [
            "password", "12345678", "qwerty123", "admin123"
        ]
        if v.lower() in common_passwords:
            raise ValueError("This password is too common")

        return v

2.4 Нет блокировки аккаунта после неудачных попыток

Серьезность: СРЕДНЯЯ 🟡 Расположение: backend/src/app/services/auth_service.py:55-84

Проблема: Можно делать бесконечные попытки логина (если нет rate limiting)

Решение с Redis:

class LoginAttemptTracker:
    def __init__(self, redis: Redis):
        self.redis = redis
        self.max_attempts = 5
        self.lockout_duration = 900  # 15 минут

    async def record_failed_attempt(self, email: str):
        """Записать неудачную попытку"""
        key = f"login_attempts:{email}"
        attempts = await self.redis.incr(key)

        if attempts == 1:
            # Установить TTL на первую попытку
            await self.redis.expire(key, 3600)  # 1 час

        if attempts >= self.max_attempts:
            # Заблокировать аккаунт
            await self.redis.setex(
                f"account_locked:{email}",
                self.lockout_duration,
                "1"
            )

    async def clear_attempts(self, email: str):
        """Очистить счетчик после успешного логина"""
        await self.redis.delete(f"login_attempts:{email}")

    async def is_locked(self, email: str) -> bool:
        """Проверить, заблокирован ли аккаунт"""
        return await self.redis.exists(f"account_locked:{email}") > 0

    async def get_lockout_remaining(self, email: str) -> int:
        """Получить оставшееся время блокировки (секунды)"""
        return await self.redis.ttl(f"account_locked:{email}")

# В auth.py
@router.post("/login", response_model=AuthResponse)
async def login(
    data: UserLoginRequest,
    session: DatabaseSession,
    tracker: LoginAttemptTracker = Depends(get_login_tracker),
):
    # Проверить блокировку
    if await tracker.is_locked(data.email):
        remaining = await tracker.get_lockout_remaining(data.email)
        raise HTTPException(
            status_code=429,
            detail=f"Account locked due to too many failed attempts. "
                   f"Try again in {remaining // 60} minutes."
        )

    auth_service = AuthService(session)

    try:
        user, tokens = await auth_service.login(data.email, data.password)

        # Успешный логин - очистить счетчик
        await tracker.clear_attempts(data.email)

        return AuthResponse(user=user, tokens=tokens)

    except HTTPException as e:
        if e.status_code == 401:
            # Неудачная попытка
            await tracker.record_failed_attempt(data.email)
        raise

🔍 3. ПРОБЛЕМЫ ВАЛИДАЦИИ ВХОДНЫХ ДАННЫХ

3.1 Недостаточная валидация Content-Type

Серьезность: СРЕДНЯЯ 🟡 Расположение: backend/src/app/api/schemas.py:85-91

Текущее состояние:

@field_validator("content_type")
@classmethod
def validate_content_type(cls, v: str) -> str:
    if not (v.startswith("image/") or v.startswith("video/")):
        raise ValueError("Only image/* and video/* content types are supported")
    return v

Принимает ЛЮБОЙ image/* или video/* тип, включая опасные (SVG с JS, вредоносные кодеки)

Решение - whitelist:

ALLOWED_IMAGE_TYPES = {
    "image/jpeg",
    "image/jpg",
    "image/png",
    "image/gif",
    "image/webp",
    "image/heic",
    "image/heif",
}

ALLOWED_VIDEO_TYPES = {
    "video/mp4",
    "video/mpeg",
    "video/quicktime",  # .mov
    "video/x-msvideo",  # .avi
    "video/x-matroska", # .mkv
    "video/webm",
}

@field_validator("content_type")
@classmethod
def validate_content_type(cls, v: str) -> str:
    v = v.lower().strip()

    if v not in ALLOWED_IMAGE_TYPES and v not in ALLOWED_VIDEO_TYPES:
        raise ValueError(
            f"Content type '{v}' not supported. "
            f"Allowed: {', '.join(ALLOWED_IMAGE_TYPES | ALLOWED_VIDEO_TYPES)}"
        )

    return v

3.2 Отсутствует проверка "магических байтов"

Серьезность: СРЕДНЯЯ 🟡 Расположение: Upload flow

Проблема: Полагаемся на content-type от клиента. Вредоносный файл может притвориться изображением.

Решение с python-magic:

pip install python-magic-bin  # Windows
pip install python-magic       # Linux/Mac
import magic

async def verify_file_type(file: UploadFile, expected_type: str) -> bool:
    """Проверить реальный тип файла по магическим байтам"""
    # Прочитать первые 2048 байт
    header = await file.read(2048)
    await file.seek(0)  # Вернуться в начало

    # Определить MIME type
    mime = magic.from_buffer(header, mime=True)

    # Проверить соответствие
    if expected_type.startswith("image/"):
        return mime in ALLOWED_IMAGE_TYPES
    elif expected_type.startswith("video/"):
        return mime in ALLOWED_VIDEO_TYPES

    return False

# В asset_service.py
async def create_upload(
    self,
    user_id: str,
    original_filename: str,
    content_type: str,
    size_bytes: int,
    folder_id: Optional[str] = None,
) -> tuple[Asset, dict]:
    # ... существующий код ...

    # Добавить в метаданные для проверки при finalize
    asset.metadata = {
        "expected_content_type": content_type,
        "needs_verification": True,
    }

async def finalize_upload(
    self,
    user_id: str,
    asset_id: str,
    etag: Optional[str] = None,
    sha256: Optional[str] = None,
) -> Asset:
    # ... существующий код ...

    # Проверить магические байты если требуется
    if asset.metadata.get("needs_verification"):
        # Загрузить первые байты из S3
        try:
            response = await self.s3_client.get_object_range(
                self.config.media_bucket,
                asset.storage_key_original,
                bytes_range=(0, 2047),
            )
            header = response['Body'].read()

            mime = magic.from_buffer(header, mime=True)
            expected = asset.metadata.get("expected_content_type")

            # Проверить соответствие
            if (expected.startswith("image/") and mime not in ALLOWED_IMAGE_TYPES) or \
               (expected.startswith("video/") and mime not in ALLOWED_VIDEO_TYPES):
                # Удалить файл из S3
                await self.s3_client.delete_object(
                    self.config.media_bucket,
                    asset.storage_key_original
                )
                asset.status = AssetStatus.FAILED
                await self.asset_repo.update(asset)
                raise HTTPException(
                    status_code=400,
                    detail="File type verification failed"
                )
        except Exception as e:
            logger.warning(f"File verification failed: {e}")

3.3 Защита от Path Traversal реализована

Серьезность: N/A (ХОРОШО) Расположение: backend/src/app/services/asset_service.py:23-50

Статус: Правильно реализовано:

  • Используется os.path.basename()
  • Удаляются path separators и null bytes
  • Ограничение длины имени файла

Это правильная реализация. Оставить как есть.


🗄️ 4. БЕЗОПАСНОСТЬ S3 ХРАНИЛИЩА

4.1 Отсутствует шифрование S3 объектов

Серьезность: СРЕДНЯЯ-ВЫСОКАЯ 🟡 Расположение: backend/src/app/infra/s3_client.py

Проблема: Файлы хранятся в S3 без шифрования на стороне сервера.

Решение:

# Во ВСЕХ методах загрузки добавить ServerSideEncryption
async def upload_object(
    self,
    bucket: str,
    key: str,
    data: bytes,
    content_type: str,
) -> None:
    try:
        await asyncio.to_thread(
            self.client.put_object,
            Bucket=bucket,
            Key=key,
            Body=data,
            ContentType=content_type,
            ServerSideEncryption='AES256',  # ← ДОБАВИТЬ
        )
    except ClientError as e:
        logger.error(f"S3 upload error: {e}")
        raise

# Для presigned URLs тоже нужно добавить
def generate_presigned_post(
    self,
    bucket: str,
    key: str,
    content_type: str,
    max_size_bytes: int,
    expires_in: int = 600,
) -> dict:
    try:
        return self.client.generate_presigned_post(
            Bucket=bucket,
            Key=key,
            Fields={
                "Content-Type": content_type,
                "x-amz-server-side-encryption": "AES256",  # ← ДОБАВИТЬ
            },
            Conditions=[
                {"Content-Type": content_type},
                ["content-length-range", 1, max_size_bytes],
                {"x-amz-server-side-encryption": "AES256"},  # ← ДОБАВИТЬ
            ],
            ExpiresIn=expires_in,
        )
    except ClientError as e:
        logger.error(f"Presigned POST generation error: {e}")
        raise

Важно: Также настроить bucket policy:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DenyUnencryptedObjectUploads",
      "Effect": "Deny",
      "Principal": "*",
      "Action": "s3:PutObject",
      "Resource": "arn:aws:s3:::your-bucket-name/*",
      "Condition": {
        "StringNotEquals": {
          "s3:x-amz-server-side-encryption": "AES256"
        }
      }
    }
  ]
}

4.2 Hardcoded bucket name для корзины

Серьезность: НИЗКАЯ 🟢 Расположение: backend/src/app/infra/s3_client.py:28, :152

Проблема:

TRASH_BUCKET = "itcloud-trash"  # Hardcoded

Решение:

# В config.py
class Settings(BaseSettings):
    # ... existing ...
    MEDIA_BUCKET: str
    TRASH_BUCKET: str = "itcloud-trash"  # default, но можно переопределить

# В s3_client.py
def __init__(self, config: Settings):
    self.config = config
    # ... existing ...
    self.trash_bucket = config.TRASH_BUCKET  # Использовать из конфига

4.3 Слишком длинный TTL для pre-signed URLs

Серьезность: НИЗКАЯ 🟢 Расположение: backend/src/app/infra/config.py:43

Текущее:

SIGNED_URL_TTL_SECONDS: int = 600  # 10 минут

Рекомендация: Для продакшена уменьшить до 300 секунд (5 минут) для чувствительного контента:

SIGNED_URL_TTL_SECONDS: int = Field(default=300, ge=60, le=3600)

🔐 5. ДОПОЛНИТЕЛЬНЫЕ РЕКОМЕНДАЦИИ ПО БЕЗОПАСНОСТИ

5.1 CORS Headers слишком permissive

Серьезность: СРЕДНЯЯ 🟡 Расположение: backend/src/app/main.py:34-37

Проблема:

allow_headers=["*"],   # Разрешены ВСЕ заголовки
expose_headers=["*"],  # Экспортируются ВСЕ заголовки

Решение:

app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.CORS_ORIGINS.split(","),
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
    # Whitelist конкретных заголовков
    allow_headers=[
        "Authorization",
        "Content-Type",
        "X-Requested-With",
        "Accept",
    ],
    # Экспортировать только нужные
    expose_headers=[
        "Content-Length",
        "Content-Type",
        "X-Total-Count",
    ],
    max_age=3600,
)

5.2 Добавить Security Headers

Серьезность: СРЕДНЯЯ 🟡 Расположение: backend/src/app/main.py

Добавить middleware для security headers:

from starlette.middleware.base import BaseHTTPMiddleware

class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        response = await call_next(request)

        # Защита от clickjacking
        response.headers["X-Frame-Options"] = "DENY"

        # Защита от XSS
        response.headers["X-Content-Type-Options"] = "nosniff"

        # Защита от XSS для старых браузеров
        response.headers["X-XSS-Protection"] = "1; mode=block"

        # Content Security Policy
        response.headers["Content-Security-Policy"] = (
            "default-src 'self'; "
            "img-src 'self' data: https:; "
            "script-src 'self'; "
            "style-src 'self' 'unsafe-inline';"
        )

        # HSTS (если используется HTTPS)
        if request.url.scheme == "https":
            response.headers["Strict-Transport-Security"] = (
                "max-age=31536000; includeSubDomains"
            )

        # Referrer Policy
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"

        # Permissions Policy
        response.headers["Permissions-Policy"] = (
            "geolocation=(), microphone=(), camera=()"
        )

        return response

app.add_middleware(SecurityHeadersMiddleware)

5.3 Токены в localStorage (Frontend)

Серьезность: СРЕДНЯЯ 🟡 Расположение: frontend/src/services/api.ts:30-34, :62-64

Проблема: localStorage доступен для XSS атак.

Альтернативы:

  1. httpOnly cookies для refresh token (лучше всего)
  2. Memory storage для access token (но теряется при refresh страницы)

Компромиссное решение:

// Хранить refresh token в httpOnly cookie (настроить на бэкенде)
// Access token держать в памяти

class TokenManager {
  private accessToken: string | null = null;

  setAccessToken(token: string) {
    this.accessToken = token;
    // Также можно в sessionStorage (лучше чем localStorage)
    sessionStorage.setItem('access_token', token);
  }

  getAccessToken(): string | null {
    if (this.accessToken) return this.accessToken;
    return sessionStorage.getItem('access_token');
  }

  clearTokens() {
    this.accessToken = null;
    sessionStorage.removeItem('access_token');
    // Refresh token будет удален через httpOnly cookie с бэкенда
  }
}

🏗️ 6. АРХИТЕКТУРНЫЕ УЛУЧШЕНИЯ

6.1 Отсутствует Soft Delete для Assets

Серьезность: КРИТИЧЕСКАЯ 🔴 Расположение: backend/src/app/domain/models.py

Проблема: В спецификации (CLAUDE.md) указано:

"Soft delete to trash with restore capability"

Но в модели Asset нет поля deleted_at:

class Asset(Base):
    __tablename__ = "assets"
    # ... fields ...
    # НЕТ: deleted_at: Mapped[Optional[datetime]] = mapped_column(DateTime)

Решение:

  1. Создать миграцию:
cd backend
alembic revision -m "add_soft_delete_to_assets"
  1. Добавить в миграцию:
def upgrade() -> None:
    op.add_column('assets', sa.Column('deleted_at', sa.DateTime(), nullable=True))
    op.create_index('ix_assets_deleted_at', 'assets', ['deleted_at'])

def downgrade() -> None:
    op.drop_index('ix_assets_deleted_at', table_name='assets')
    op.drop_column('assets', 'deleted_at')
  1. Обновить модель:
from typing import Optional
from datetime import datetime
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy import DateTime

class Asset(Base):
    __tablename__ = "assets"

    # ... existing fields ...

    deleted_at: Mapped[Optional[datetime]] = mapped_column(
        DateTime, nullable=True, index=True, default=None
    )
  1. Добавить методы в AssetRepository:
async def soft_delete(self, asset: Asset) -> Asset:
    """Мягкое удаление (в корзину)"""
    asset.deleted_at = datetime.utcnow()
    await self.session.flush()
    await self.session.refresh(asset)
    return asset

async def restore(self, asset: Asset) -> Asset:
    """Восстановить из корзины"""
    asset.deleted_at = None
    await self.session.flush()
    await self.session.refresh(asset)
    return asset

async def list_trash(
    self,
    user_id: str,
    limit: int = 50,
    cursor: Optional[str] = None,
) -> list[Asset]:
    """Список удаленных файлов"""
    query = (
        select(Asset)
        .where(Asset.user_id == user_id)
        .where(Asset.deleted_at.isnot(None))
        .order_by(Asset.deleted_at.desc())
    )

    if cursor:
        query = query.where(Asset.id < cursor)

    query = query.limit(limit + 1)
    result = await self.session.execute(query)
    return list(result.scalars().all())

async def hard_delete(self, asset: Asset) -> None:
    """Полное удаление (безвозвратно)"""
    await self.session.delete(asset)
    await self.session.flush()
  1. Добавить endpoints:
# В assets.py

@router.get("/trash", response_model=AssetListResponse)
async def list_trash(
    current_user: CurrentUser,
    session: DatabaseSession,
    s3_client: S3ClientDep,
    limit: int = Query(50, ge=1, le=100),
    cursor: Optional[str] = Query(None),
):
    """Список файлов в корзине"""
    asset_service = AssetService(session, s3_client)
    assets, next_cursor, has_more = await asset_service.list_trash(
        user_id=current_user.id,
        limit=limit,
        cursor=cursor,
    )
    return AssetListResponse(
        items=assets,
        next_cursor=next_cursor,
        has_more=has_more,
    )

@router.post("/{asset_id}/restore", response_model=AssetResponse)
async def restore_asset(
    asset_id: str,
    current_user: CurrentUser,
    session: DatabaseSession,
    s3_client: S3ClientDep,
):
    """Восстановить файл из корзины"""
    asset_service = AssetService(session, s3_client)
    asset = await asset_service.restore_asset(
        user_id=current_user.id,
        asset_id=asset_id,
    )
    return asset

@router.delete("/{asset_id}/purge")
async def purge_asset(
    asset_id: str,
    current_user: CurrentUser,
    session: DatabaseSession,
    s3_client: S3ClientDep,
):
    """Удалить файл безвозвратно из корзины"""
    asset_service = AssetService(session, s3_client)
    await asset_service.purge_asset(
        user_id=current_user.id,
        asset_id=asset_id,
    )
    return {"message": "Asset permanently deleted"}
  1. Обновить существующий DELETE endpoint:
@router.delete("/{asset_id}")
async def delete_asset(
    asset_id: str,
    current_user: CurrentUser,
    session: DatabaseSession,
    s3_client: S3ClientDep,
):
    """Переместить файл в корзину (мягкое удаление)"""
    asset_service = AssetService(session, s3_client)
    await asset_service.soft_delete_asset(
        user_id=current_user.id,
        asset_id=asset_id,
    )
    return {"message": "Asset moved to trash"}
  1. Обновить list_assets чтобы НЕ показывать удаленные:
# В asset_repository.py
async def list_by_user(
    self,
    user_id: str,
    folder_id: Optional[str] = None,
    limit: int = 50,
    cursor: Optional[str] = None,
) -> list[Asset]:
    query = (
        select(Asset)
        .where(Asset.user_id == user_id)
        .where(Asset.deleted_at.is_(None))  # ← ДОБАВИТЬ
        .where(Asset.status == AssetStatus.READY)
    )
    # ... rest of logic
  1. Background job для очистки старых файлов:
# В tasks/cleanup_tasks.py
from datetime import datetime, timedelta

async def cleanup_old_trash():
    """Удалить файлы, которые в корзине >30 дней"""
    async with get_db_session() as session:
        thirty_days_ago = datetime.utcnow() - timedelta(days=30)

        assets = await session.execute(
            select(Asset).where(
                Asset.deleted_at < thirty_days_ago
            )
        )

        for asset in assets.scalars():
            # Удалить из S3
            await s3_client.delete_object(bucket, asset.storage_key_original)
            if asset.storage_key_thumb:
                await s3_client.delete_object(bucket, asset.storage_key_thumb)

            # Удалить из БД
            await session.delete(asset)

        await session.commit()

6.2 Отсутствуют индексы в БД

Серьезность: СРЕДНЯЯ 🟡 Расположение: backend/src/app/domain/models.py

Проблема: Медленные запросы при больших объемах данных.

Решение - добавить составные индексы:

# В models.py
from sqlalchemy import Index

class Asset(Base):
    __tablename__ = "assets"

    # ... fields ...

    # Составные индексы для часто используемых запросов
    __table_args__ = (
        Index(
            'ix_assets_user_folder_created',
            'user_id', 'folder_id', 'created_at'
        ),
        Index(
            'ix_assets_user_status',
            'user_id', 'status'
        ),
        Index(
            'ix_assets_user_deleted',
            'user_id', 'deleted_at'
        ),
    )

class Share(Base):
    __tablename__ = "shares"

    # ... fields ...

    __table_args__ = (
        Index(
            'ix_shares_token_expires',
            'token', 'expires_at'
        ),
        Index(
            'ix_shares_owner_created',
            'owner_user_id', 'created_at'
        ),
    )

class Folder(Base):
    __tablename__ = "folders"

    # ... fields ...

    __table_args__ = (
        Index(
            'ix_folders_user_parent',
            'user_id', 'parent_folder_id'
        ),
    )

Создать миграцию:

alembic revision -m "add_composite_indexes"

6.3 ZIP создается в памяти

Серьезность: ВЫСОКАЯ 🔴 Расположение: backend/src/app/services/batch_operations_service.py:206

Проблема:

zip_buffer = io.BytesIO()  # Весь ZIP в памяти!
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
    # Добавление файлов...

При скачивании 10GB файлов → OOM.

Решение - стримить через temp file:

from app.services.batch_operations_service import temp_file_manager

async def download_assets_batch(
    self,
    user_id: str,
    asset_ids: list[str],
) -> tuple[str, bytes]:
    """Скачать несколько файлов как ZIP архив"""

    # Использовать временный файл вместо памяти
    async with temp_file_manager(suffix='.zip') as temp_path:
        # Создать ZIP в temp файле
        with zipfile.ZipFile(temp_path, 'w', zipfile.ZIP_DEFLATED) as zip_file:
            for asset_id in asset_ids:
                asset = await self.asset_repo.get_by_id(asset_id)
                if not asset or asset.user_id != user_id:
                    continue

                # Стримить файл из S3
                try:
                    response = await asyncio.to_thread(
                        self.s3_client.client.get_object,
                        Bucket=self.config.media_bucket,
                        Key=asset.storage_key_original,
                    )

                    # Читать чанками
                    with response['Body'] as stream:
                        # Генерировать уникальное имя файла
                        base_name = sanitize_filename(asset.original_filename)
                        unique_name = self._get_unique_filename(
                            zip_file,
                            base_name
                        )

                        # Записать в ZIP chunk by chunk
                        with zip_file.open(unique_name, 'w') as dest:
                            while True:
                                chunk = stream.read(8 * 1024 * 1024)  # 8MB
                                if not chunk:
                                    break
                                dest.write(chunk)

                except Exception as e:
                    logger.warning(f"Failed to add asset {asset_id}: {e}")
                    continue

        # Прочитать готовый ZIP
        with open(temp_path, 'rb') as f:
            zip_data = f.read()

        filename = f"assets_batch_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.zip"
        return filename, zip_data

# Но ЛУЧШЕ стримить ответ напрямую без чтения в память:
from starlette.responses import FileResponse

@router.post("/batch/download")
async def download_batch(
    data: BatchDownloadRequest,
    current_user: CurrentUser,
    session: DatabaseSession,
):
    """Скачать файлы как ZIP (streaming response)"""

    # Создать temp ZIP
    async with temp_file_manager(suffix='.zip') as temp_path:
        batch_service = BatchOperationsService(session, s3_client)

        # Создать ZIP в temp файле (не загружая в память)
        await batch_service.create_zip_file(
            user_id=current_user.id,
            asset_ids=data.asset_ids,
            output_path=temp_path,
        )

        # Вернуть как streaming response
        filename = f"assets_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.zip"
        return FileResponse(
            path=temp_path,
            media_type='application/zip',
            filename=filename,
            background=BackgroundTask(os.unlink, temp_path),  # Удалить после отправки
        )

6.4 Смешанные обязанности в FolderService

Серьезность: СРЕДНЯЯ 🟡 Расположение: backend/src/app/services/folder_service.py:195-223

Проблема:

# Строка 207
# TODO: Should use AssetService methods here
asset.folder_id = None
await self.asset_repo.update(asset)

FolderService напрямую работает с AssetRepository, нарушая SRP.

Решение:

# Создать Orchestration Service
class FolderManagementService:
    """Оркестрирует операции над папками и связанными ресурсами"""

    def __init__(
        self,
        session: AsyncSession,
        s3_client: S3Client,
    ):
        self.folder_service = FolderService(session)
        self.asset_service = AssetService(session, s3_client)

    async def delete_folder_with_contents(
        self,
        user_id: str,
        folder_id: str,
        delete_assets: bool = False,
    ) -> None:
        """
        Удалить папку и обработать связанные assets.

        Args:
            user_id: ID пользователя
            folder_id: ID папки
            delete_assets: True - удалить assets, False - переместить в root
        """
        # Получить все assets в папке и подпапках
        assets = await self.asset_service.list_assets_in_folder_recursive(
            user_id=user_id,
            folder_id=folder_id,
        )

        if delete_assets:
            # Удалить все assets (soft delete)
            for asset in assets:
                await self.asset_service.soft_delete_asset(
                    user_id=user_id,
                    asset_id=asset.id,
                )
        else:
            # Переместить в root
            for asset in assets:
                await self.asset_service.move_asset(
                    user_id=user_id,
                    asset_id=asset.id,
                    target_folder_id=None,
                )

        # Удалить папку
        await self.folder_service.delete_folder(
            user_id=user_id,
            folder_id=folder_id,
        )

6.5 Отсутствуют Foreign Keys в моделях

Серьезность: СРЕДНЯЯ 🟡 Расположение: backend/src/app/domain/models.py

Проблема: Нет явных relationship и foreign key constraints.

Решение:

from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship

class Asset(Base):
    __tablename__ = "assets"

    # ... existing fields ...

    user_id: Mapped[str] = mapped_column(
        String(36),
        ForeignKey("users.id", ondelete="CASCADE"),
        index=True
    )

    folder_id: Mapped[Optional[str]] = mapped_column(
        String(36),
        ForeignKey("folders.id", ondelete="SET NULL"),
        nullable=True,
        index=True
    )

    # Relationships
    user: Mapped["User"] = relationship("User", back_populates="assets")
    folder: Mapped[Optional["Folder"]] = relationship("Folder", back_populates="assets")


class Folder(Base):
    __tablename__ = "folders"

    # ... existing fields ...

    user_id: Mapped[str] = mapped_column(
        String(36),
        ForeignKey("users.id", ondelete="CASCADE"),
        index=True
    )

    parent_folder_id: Mapped[Optional[str]] = mapped_column(
        String(36),
        ForeignKey("folders.id", ondelete="CASCADE"),
        nullable=True,
        index=True
    )

    # Relationships
    user: Mapped["User"] = relationship("User", back_populates="folders")
    parent: Mapped[Optional["Folder"]] = relationship(
        "Folder",
        remote_side="Folder.id",
        back_populates="children"
    )
    children: Mapped[list["Folder"]] = relationship(
        "Folder",
        back_populates="parent",
        cascade="all, delete-orphan"
    )
    assets: Mapped[list["Asset"]] = relationship(
        "Asset",
        back_populates="folder",
        cascade="all, delete-orphan"
    )


class User(Base):
    __tablename__ = "users"

    # ... existing fields ...

    # Relationships
    assets: Mapped[list["Asset"]] = relationship(
        "Asset",
        back_populates="user",
        cascade="all, delete-orphan"
    )
    folders: Mapped[list["Folder"]] = relationship(
        "Folder",
        back_populates="user",
        cascade="all, delete-orphan"
    )

Миграция:

alembic revision -m "add_foreign_keys_and_relationships"

📦 7. ОТСУТСТВУЮЩИЕ КЛЮЧЕВЫЕ ФУНКЦИИ

7.1 Password Reset Flow

Приоритет: ВЫСОКИЙ 🔴

Что нужно:

  1. Endpoint /auth/forgot-password - отправить email с токеном
  2. Endpoint /auth/reset-password - сбросить пароль по токену
  3. Email сервис (SMTP)
  4. Токены сброса пароля с expiration

Пример реализации:

# В models.py
class PasswordResetToken(Base):
    __tablename__ = "password_reset_tokens"

    id: Mapped[str] = mapped_column(String(36), primary_key=True, default=lambda: str(uuid4()))
    user_id: Mapped[str] = mapped_column(String(36), ForeignKey("users.id"), index=True)
    token: Mapped[str] = mapped_column(String(64), unique=True, index=True)
    expires_at: Mapped[datetime] = mapped_column(DateTime)
    used_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)
    created_at: Mapped[datetime] = mapped_column(DateTime, default=func.now())

# В auth_service.py
async def request_password_reset(self, email: str) -> None:
    """Создать токен сброса пароля и отправить email"""
    user = await self.user_repo.get_by_email(email)
    if not user:
        # НЕ раскрывать что пользователь не найден (безопасность)
        logger.info(f"Password reset requested for non-existent email: {email}")
        return

    # Создать токен
    token = secrets.token_urlsafe(32)
    expires_at = datetime.utcnow() + timedelta(hours=1)

    reset_token = PasswordResetToken(
        user_id=user.id,
        token=token,
        expires_at=expires_at,
    )
    self.session.add(reset_token)
    await self.session.commit()

    # Отправить email
    reset_url = f"{settings.FRONTEND_URL}/reset-password?token={token}"
    await email_service.send_password_reset(user.email, reset_url)

async def reset_password(self, token: str, new_password: str) -> None:
    """Сбросить пароль по токену"""
    # Найти токен
    result = await self.session.execute(
        select(PasswordResetToken).where(
            PasswordResetToken.token == token,
            PasswordResetToken.used_at.is_(None),
            PasswordResetToken.expires_at > datetime.utcnow(),
        )
    )
    reset_token = result.scalar_one_or_none()

    if not reset_token:
        raise HTTPException(status_code=400, detail="Invalid or expired token")

    # Обновить пароль
    user = await self.user_repo.get_by_id(reset_token.user_id)
    password_hash = self.password_context.hash(new_password)
    user.password_hash = password_hash

    # Пометить токен как использованный
    reset_token.used_at = datetime.utcnow()

    await self.session.commit()

# Endpoints
@router.post("/auth/forgot-password")
async def forgot_password(
    data: ForgotPasswordRequest,
    session: DatabaseSession,
):
    """Запросить сброс пароля"""
    auth_service = AuthService(session)
    await auth_service.request_password_reset(data.email)

    # Всегда возвращать успех (не раскрывать существование email)
    return {"message": "If email exists, reset link has been sent"}

@router.post("/auth/reset-password")
async def reset_password(
    data: ResetPasswordRequest,
    session: DatabaseSession,
):
    """Сбросить пароль"""
    auth_service = AuthService(session)
    await auth_service.reset_password(data.token, data.new_password)
    return {"message": "Password reset successful"}

7.2 Storage Quota Management

Приоритет: ВЫСОКИЙ 🔴

Что нужно:

  1. Поле storage_quota_bytes и storage_used_bytes в User
  2. Проверка квоты перед загрузкой
  3. Endpoint для получения статистики использования
  4. Background job для пересчета использования

Реализация:

# В models.py
class User(Base):
    __tablename__ = "users"

    # ... existing fields ...

    storage_quota_bytes: Mapped[int] = mapped_column(
        BigInteger,
        default=10 * 1024 * 1024 * 1024  # 10GB по умолчанию
    )
    storage_used_bytes: Mapped[int] = mapped_column(
        BigInteger,
        default=0,
        index=True
    )

# В asset_service.py
async def create_upload(
    self,
    user_id: str,
    original_filename: str,
    content_type: str,
    size_bytes: int,
    folder_id: Optional[str] = None,
) -> tuple[Asset, dict]:
    # Проверить квоту ДО создания upload
    user = await self.user_repo.get_by_id(user_id)

    if user.storage_used_bytes + size_bytes > user.storage_quota_bytes:
        remaining = user.storage_quota_bytes - user.storage_used_bytes
        raise HTTPException(
            status_code=413,
            detail=f"Storage quota exceeded. "
                   f"Available: {remaining / 1024 / 1024:.2f} MB, "
                   f"Required: {size_bytes / 1024 / 1024:.2f} MB"
        )

    # ... existing code ...

async def finalize_upload(
    self,
    user_id: str,
    asset_id: str,
    etag: Optional[str] = None,
    sha256: Optional[str] = None,
) -> Asset:
    # ... existing code ...

    # Обновить использованное место
    user = await self.user_repo.get_by_id(user_id)
    user.storage_used_bytes += asset.size_bytes
    await self.session.commit()

    return asset

async def soft_delete_asset(
    self,
    user_id: str,
    asset_id: str,
) -> Asset:
    # ... existing code ...

    # НЕ освобождать место при soft delete
    # Место освободится при purge

    return asset

async def purge_asset(
    self,
    user_id: str,
    asset_id: str,
) -> None:
    """Безвозвратно удалить asset"""
    asset = await self.asset_repo.get_by_id(asset_id)
    if not asset or asset.user_id != user_id:
        raise HTTPException(status_code=404, detail="Asset not found")

    # Удалить из S3
    await self.s3_client.delete_object(
        self.config.media_bucket,
        asset.storage_key_original
    )
    if asset.storage_key_thumb:
        await self.s3_client.delete_object(
            self.config.media_bucket,
            asset.storage_key_thumb
        )

    # Освободить место в квоте
    user = await self.user_repo.get_by_id(user_id)
    user.storage_used_bytes = max(0, user.storage_used_bytes - asset.size_bytes)

    # Удалить из БД
    await self.asset_repo.hard_delete(asset)

    await self.session.commit()

# Endpoint для статистики
@router.get("/users/me/storage", response_model=StorageStatsResponse)
async def get_storage_stats(
    current_user: CurrentUser,
):
    """Получить статистику использования хранилища"""
    return StorageStatsResponse(
        quota_bytes=current_user.storage_quota_bytes,
        used_bytes=current_user.storage_used_bytes,
        available_bytes=current_user.storage_quota_bytes - current_user.storage_used_bytes,
        percentage_used=round(
            (current_user.storage_used_bytes / current_user.storage_quota_bytes) * 100,
            2
        ),
    )

7.3 EXIF Metadata Extraction

Приоритет: СРЕДНЯЯ 🟡

Реализация:

pip install pillow pillow-heif  # Для изображений
pip install ffmpeg-python       # Для видео
# В tasks/thumbnail_tasks.py
from PIL import Image
from PIL.ExifTags import TAGS
import ffmpeg

def extract_image_metadata(file_path: str) -> dict:
    """Извлечь EXIF из изображения"""
    try:
        with Image.open(file_path) as img:
            exif_data = img._getexif()
            if not exif_data:
                return {}

            metadata = {}
            for tag_id, value in exif_data.items():
                tag = TAGS.get(tag_id, tag_id)
                metadata[tag] = value

            return {
                "width": img.width,
                "height": img.height,
                "captured_at": metadata.get("DateTimeOriginal"),
                "camera_make": metadata.get("Make"),
                "camera_model": metadata.get("Model"),
                "gps_latitude": metadata.get("GPSLatitude"),
                "gps_longitude": metadata.get("GPSLongitude"),
            }
    except Exception as e:
        logger.error(f"EXIF extraction failed: {e}")
        return {}

def extract_video_metadata(file_path: str) -> dict:
    """Извлечь метаданные из видео"""
    try:
        probe = ffmpeg.probe(file_path)
        video_stream = next(
            s for s in probe['streams'] if s['codec_type'] == 'video'
        )

        return {
            "width": int(video_stream.get('width', 0)),
            "height": int(video_stream.get('height', 0)),
            "duration_sec": float(probe['format'].get('duration', 0)),
            "codec": video_stream.get('codec_name'),
            "bitrate": int(probe['format'].get('bit_rate', 0)),
        }
    except Exception as e:
        logger.error(f"Video metadata extraction failed: {e}")
        return {}

@celery_app.task
def generate_thumbnail_and_extract_metadata(asset_id: str):
    """Генерировать thumbnail И извлечь metadata"""
    # ... existing thumbnail generation ...

    # Извлечь metadata
    if asset.type == AssetType.PHOTO:
        metadata = extract_image_metadata(temp_file_path)
    elif asset.type == AssetType.VIDEO:
        metadata = extract_video_metadata(temp_file_path)

    # Обновить asset
    if metadata.get("width"):
        asset.width = metadata["width"]
    if metadata.get("height"):
        asset.height = metadata["height"]
    if metadata.get("captured_at"):
        asset.captured_at = parse_datetime(metadata["captured_at"])
    if metadata.get("duration_sec"):
        asset.duration_sec = metadata["duration_sec"]

    db.commit()

Приоритет: ВЫСОКИЙ 🔴

Простая реализация с PostgreSQL full-text search:

# В asset_repository.py
async def search_assets(
    self,
    user_id: str,
    query: str,
    limit: int = 50,
) -> list[Asset]:
    """Поиск assets по имени файла"""
    search_query = (
        select(Asset)
        .where(Asset.user_id == user_id)
        .where(Asset.deleted_at.is_(None))
        .where(Asset.status == AssetStatus.READY)
        .where(Asset.original_filename.ilike(f"%{query}%"))
        .order_by(Asset.created_at.desc())
        .limit(limit)
    )

    result = await self.session.execute(search_query)
    return list(result.scalars().all())

# Endpoint
@router.get("/assets/search", response_model=AssetListResponse)
async def search_assets(
    q: str = Query(..., min_length=1),
    current_user: CurrentUser,
    session: DatabaseSession,
    s3_client: S3ClientDep,
    limit: int = Query(50, ge=1, le=100),
):
    """Поиск файлов по имени"""
    asset_service = AssetService(session, s3_client)
    assets = await asset_service.search_assets(
        user_id=current_user.id,
        query=q,
        limit=limit,
    )
    return AssetListResponse(
        items=assets,
        next_cursor=None,
        has_more=False,
    )

7.5 Database Backup Strategy

Приоритет: КРИТИЧЕСКИЙ 🔴

Для SQLite:

#!/bin/bash
# backup_db.sh

BACKUP_DIR="/backups"
DB_PATH="/app/data/app.db"
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
BACKUP_FILE="$BACKUP_DIR/backup_$TIMESTAMP.db"

# Создать backup
sqlite3 $DB_PATH ".backup '$BACKUP_FILE'"

# Сжать
gzip $BACKUP_FILE

# Загрузить в S3
aws s3 cp "${BACKUP_FILE}.gz" "s3://your-backup-bucket/database/"

# Удалить старые локальные backups (старше 7 дней)
find $BACKUP_DIR -name "backup_*.db.gz" -mtime +7 -delete

# Удалить старые S3 backups (старше 30 дней)
aws s3 ls "s3://your-backup-bucket/database/" | \
  awk '{print $4}' | \
  while read file; do
    # ... deletion logic
  done

Cron job:

# Каждые 6 часов
0 */6 * * * /app/scripts/backup_db.sh >> /var/log/backup.log 2>&1

Для PostgreSQL:

#!/bin/bash
# pg_backup.sh

TIMESTAMP=$(date +%Y%m%d_%H%M%S)
BACKUP_FILE="backup_$TIMESTAMP.sql.gz"

# pg_dump с сжатием
pg_dump -h $DB_HOST -U $DB_USER -d $DB_NAME | gzip > "/tmp/$BACKUP_FILE"

# Загрузить в S3
aws s3 cp "/tmp/$BACKUP_FILE" "s3://your-backup-bucket/database/"

# Cleanup
rm "/tmp/$BACKUP_FILE"

🧪 8. ТЕСТИРОВАНИЕ

8.1 Текущее покрытие: <10%

Найдено только: backend/tests/test_security.py

Что КРИТИЧЕСКИ нужно:

  1. Unit тесты для services:
# tests/unit/services/test_asset_service.py
import pytest
from unittest.mock import AsyncMock, MagicMock

@pytest.mark.asyncio
async def test_create_upload_exceeds_quota():
    """Тест: загрузка превышает квоту"""
    user = User(
        id="user1",
        storage_quota_bytes=1000,
        storage_used_bytes=900,
    )

    asset_service = AssetService(mock_session, mock_s3)

    with pytest.raises(HTTPException) as exc:
        await asset_service.create_upload(
            user_id=user.id,
            original_filename="large.jpg",
            content_type="image/jpeg",
            size_bytes=200,  # 900 + 200 > 1000
        )

    assert exc.value.status_code == 413
    assert "quota exceeded" in exc.value.detail.lower()
  1. Integration тесты для API:
# tests/integration/test_auth_api.py
from httpx import AsyncClient

@pytest.mark.asyncio
async def test_login_rate_limiting(client: AsyncClient):
    """Тест: rate limiting блокирует brute force"""

    # 5 неудачных попыток
    for i in range(5):
        response = await client.post("/api/v1/auth/login", json={
            "email": "test@example.com",
            "password": "wrong"
        })
        assert response.status_code == 401

    # 6-я попытка должна быть заблокирована
    response = await client.post("/api/v1/auth/login", json={
        "email": "test@example.com",
        "password": "wrong"
    })
    assert response.status_code == 429
  1. Security тесты:
# tests/security/test_path_traversal.py
@pytest.mark.asyncio
async def test_filename_sanitization():
    """Тест: path traversal защита"""
    malicious_filenames = [
        "../../../etc/passwd",
        "..\\..\\..\\windows\\system32",
        "test\x00.jpg.exe",
        "folder/../../../secret.txt",
    ]

    for filename in malicious_filenames:
        sanitized = sanitize_filename(filename)

        # Проверить, что путь не содержит separators
        assert "/" not in sanitized
        assert "\\" not in sanitized
        assert "\x00" not in sanitized
        assert ".." not in sanitized
  1. Load тесты:
# tests/load/test_upload_performance.py
from locust import HttpUser, task, between

class UploadUser(HttpUser):
    wait_time = between(1, 3)

    @task
    def upload_file(self):
        # Симулировать загрузку файла
        files = {'file': ('test.jpg', b'fake image data', 'image/jpeg')}
        self.client.post("/api/v1/uploads/create", files=files)

Цель: Достичь минимум 70% code coverage


📊 9. ДОПОЛНИТЕЛЬНЫЕ РЕКОМЕНДАЦИИ

9.1 Frontend Type Safety

Проблема: Много any типов в frontend/src/services/api.ts

Решение:

// Заменить все any на конкретные типы

export interface Folder {
  id: string;
  user_id: string;
  name: string;
  parent_folder_id: string | null;
  created_at: string;
  updated_at: string;
}

export interface FolderListResponse {
  items: Folder[];
}

export interface FolderBreadcrumb {
  id: string;
  name: string;
}

// В api.ts
async listFolders(
  parentFolderId?: string | null,
  all: boolean = false
): Promise<FolderListResponse> {
  // ...
}

async getFolder(folderId: string): Promise<Folder> {
  // ...
}

async getFolderBreadcrumbs(folderId: string): Promise<FolderBreadcrumb[]> {
  // ...
}

9.2 Logging Strategy

Добавить structured logging:

# В config.py
import sys
from loguru import logger

# Настроить loguru
logger.remove()  # Удалить дефолтный handler

if settings.APP_ENV == "prod":
    # Production: JSON формат для парсинга
    logger.add(
        sys.stdout,
        format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} | {message}",
        level="INFO",
        serialize=True,  # JSON output
    )
else:
    # Development: красивый формат
    logger.add(
        sys.stdout,
        format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
        level="DEBUG",
        colorize=True,
    )

# Логировать в файл
logger.add(
    "logs/app.log",
    rotation="100 MB",
    retention="30 days",
    compression="zip",
    level="INFO",
)

# Middleware для логирования всех requests
from starlette.middleware.base import BaseHTTPMiddleware
import time

class RequestLoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        start_time = time.time()

        # Логировать request
        logger.info(
            f"Request started",
            extra={
                "method": request.method,
                "path": request.url.path,
                "client": request.client.host if request.client else None,
            }
        )

        try:
            response = await call_next(request)

            # Логировать response
            duration = time.time() - start_time
            logger.info(
                f"Request completed",
                extra={
                    "method": request.method,
                    "path": request.url.path,
                    "status_code": response.status_code,
                    "duration_ms": round(duration * 1000, 2),
                }
            )

            return response

        except Exception as e:
            duration = time.time() - start_time
            logger.error(
                f"Request failed",
                extra={
                    "method": request.method,
                    "path": request.url.path,
                    "error": str(e),
                    "duration_ms": round(duration * 1000, 2),
                }
            )
            raise

app.add_middleware(RequestLoggingMiddleware)

9.3 Environment Validation

Проверять критичные env vars при старте:

# В main.py
from app.infra.config import settings

def validate_production_config():
    """Проверить конфигурацию для продакшена"""
    errors = []

    if settings.APP_ENV == "prod":
        # Проверить JWT secret
        if settings.JWT_SECRET in ["your-secret-key-change-this-in-production", "changeme"]:
            errors.append("JWT_SECRET must be changed in production")

        # Проверить S3 конфигурацию
        if not settings.S3_ENDPOINT_URL or "localhost" in settings.S3_ENDPOINT_URL:
            errors.append("S3_ENDPOINT_URL must be set to production S3")

        # Проверить database
        if "sqlite" in settings.DATABASE_URL.lower():
            logger.warning("Using SQLite in production - consider PostgreSQL")

        # Проверить CORS
        if "*" in settings.CORS_ORIGINS:
            errors.append("CORS_ORIGINS should not contain '*' in production")

    if errors:
        logger.error("Production configuration errors:")
        for error in errors:
            logger.error(f"  - {error}")
        raise ValueError("Invalid production configuration")

@app.on_event("startup")
async def startup():
    validate_production_config()
    logger.info(f"Application started in {settings.APP_ENV} mode")

🎯 10. ROADMAP ПО ПРИОРИТЕТАМ

ФАЗА 1: Критическая безопасность (1-2 недели) 🔴

Блокирует продакшен

  1. Implement rate limiting (slowapi)
  2. Add token revocation mechanism (Redis blacklist)
  3. Implement refresh token rotation
  4. Add storage quota management
  5. Enable S3 server-side encryption
  6. Fix memory exhaustion in file uploads (streaming)
  7. Add strong password validation
  8. Implement account lockout after failed attempts
  9. Reduce error message verbosity
  10. Add security headers middleware

Результат: Приложение защищено от базовых атак


ФАЗА 2: Core Features (2-3 недели) 🟡

Необходимо для MVP

  1. Implement soft delete / trash functionality (SPEC REQUIREMENT)
  2. Add password reset flow
  3. Implement multipart upload для больших файлов
  4. Add asset search by filename
  5. Extract EXIF metadata (captured_at, dimensions)
  6. Add database indexes для производительности
  7. Fix ZIP creation (stream instead of memory)
  8. Add foreign key constraints

Результат: Полнофункциональное MVP


ФАЗА 3: Production Readiness (2-3 недели) 🟢

DevOps и мониторинг

  1. Comprehensive test suite (70%+ coverage)
  2. Database backup automation
  3. Monitoring & alerting setup (Prometheus + Grafana)
  4. Error tracking (Sentry integration)
  5. CI/CD pipeline (GitHub Actions)
  6. Environment configuration validation
  7. Structured logging (loguru + JSON)
  8. Documentation completion
  9. Load testing (Locust)
  10. Security penetration testing

Результат: Готово к продакшену


ФАЗА 4: Enhancements (постоянно) 💚

Улучшения UX

  1. Video transcoding & adaptive streaming (HLS)
  2. Albums feature
  3. Tags system
  4. Advanced search & filters (date range, type, size)
  5. Share analytics & permissions
  6. Image optimization (WebP conversion)
  7. Duplicate detection (SHA256 deduplication)
  8. Two-factor authentication (TOTP)
  9. Email verification
  10. Shared albums (collaborative)

Результат: Feature-rich продукт


📋 11. SUMMARY: КРИТИЧЕСКИЕ ДЕЙСТВИЯ

Что сделать ПРЯМО СЕЙЧАС перед продакшеном:

  1. Установить rate limiting - защита от brute force
  2. Реализовать soft delete - требование спецификации
  3. Добавить storage quota - предотвратить злоупотребление
  4. Включить S3 encryption - защита данных в покое
  5. Стримить файлы - предотвратить OOM
  6. Добавить тесты - минимум 50% coverage
  7. Настроить backups - защита от потери данных
  8. Валидировать env vars - предотвратить misconfig

Оценка времени:

  • Минимум для продакшена: 4 недели (Фаза 1 + критичное из Фазы 2)
  • Полное MVP: 6-8 недель (Фазы 1-3)
  • Production-grade: 10-12 недель (все фазы)

🎓 ЗАКЛЮЧЕНИЕ

Проект ITCloud демонстрирует хорошую архитектурную основу и правильные практики в некоторых областях:

Что сделано хорошо:

  • Clean Architecture с четким разделением слоев
  • Безопасное хеширование паролей (Argon2)
  • Защита от path traversal
  • JWT аутентификация
  • Pre-signed URLs для S3
  • Async/await паттерны
  • Docker setup

Критические проблемы:

  • Отсутствует rate limiting (BRUTE FORCE VULNERABLE)
  • Нет soft delete (SPEC VIOLATION)
  • Нет storage quota (ABUSE VULNERABLE)
  • Memory exhaustion риски
  • Минимальное тестирование
  • Нет token revocation

ИТОГОВАЯ ОЦЕНКА: 6/10 (хорошая база, но не готов к продакшену)

РЕКОМЕНДАЦИЯ: Выполнить Фазу 1 и критичные элементы Фазы 2 перед любым публичным запуском.


Дата аудита: 2026-01-05 Версия документа: 1.0 Следующий аудит: После внедрения рекомендаций Фазы 1