# ITCloud - Полный Аудит Безопасности и Рекомендации **Дата:** 2026-01-05 **Аудитор:** Claude Code (Senior Security & Architecture Review) **Версия:** 1.0 --- ## 📋 EXECUTIVE SUMMARY Проведен комплексный аудит проекта ITCloud (облачное хранилище фото/видео). Проект демонстрирует хорошую архитектуру (Clean Architecture) и некоторые правильные практики безопасности, но имеет **критические уязвимости** и **отсутствуют ключевые функции**. ### Статус готовности к продакшену: ❌ НЕ ГОТОВ **Критические блокеры:** 1. ❌ Отсутствует rate limiting (защита от brute force) 2. ❌ Нет soft delete/корзины (указано в требованиях) 3. ❌ Отсутствует управление квотами хранилища 4. ❌ Нет механизма отзыва токенов 5. ❌ Отсутствует стратегия бэкапов БД 6. ❌ Минимальное покрытие тестами (<10%) **Оценка времени до готовности к продакшену: 6-8 недель** --- ## 🔴 1. КРИТИЧЕСКИЕ УЯЗВИМОСТИ БЕЗОПАСНОСТИ ### 1.1 Отсутствует Rate Limiting **Серьезность: КРИТИЧЕСКАЯ** 🔴 **Расположение:** Все API endpoints **Файлы:** `backend/src/app/main.py` **Проблема:** - Нет ограничений на количество запросов - Атакующий может делать неограниченное количество попыток логина - Возможна DoS атака через перегрузку API **Риски:** - Brute force атаки на пароли пользователей - Credential stuffing атаки - Исчерпание ресурсов сервера - Спам загрузок файлов **Решение:** ```python # Установить: pip install slowapi from slowapi import Limiter, _rate_limit_exceeded_handler from slowapi.util import get_remote_address from slowapi.errors import RateLimitExceeded limiter = Limiter(key_func=get_remote_address) app.state.limiter = limiter app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) # Для каждого endpoint: @router.post("/login") @limiter.limit("5/minute") # 5 попыток в минуту async def login(...): ... @router.post("/uploads/create") @limiter.limit("100/hour") # 100 загрузок в час async def create_upload(...): ... ``` **Рекомендуемые лимиты:** - `/auth/login`: 5 запросов/минуту - `/auth/register`: 3 запроса/час - `/uploads/create`: 100 запросов/час - Остальные endpoints: 1000 запросов/час --- ### 1.2 JWT Secret по умолчанию слабый **Серьезность: КРИТИЧЕСКАЯ** 🔴 **Расположение:** `backend/.env.example:18` **Проблема:** ```env JWT_SECRET=your-secret-key-change-this-in-production ``` Если пользователь задеплоит с дефолтным секретом, токены можно подделать. **Решение:** 1. Добавить валидацию при старте: ```python # В config.py if settings.APP_ENV == "prod" and settings.JWT_SECRET in [ "your-secret-key-change-this-in-production", "changeme", "secret", ]: raise ValueError("КРИТИЧЕСКАЯ ОШИБКА: Используется слабый JWT_SECRET в продакшене!") ``` 2. Генерировать случайный секрет при деплое: ```python import secrets print(secrets.token_urlsafe(64)) ``` --- ### 1.3 Отсутствует ограничение размера при чтении файла в память **Серьезность: ВЫСОКАЯ** 🔴 **Расположение:** `backend/src/app/services/asset_service.py:138-182` **Проблема:** ```python # Строка 151 file_data = await file.read() # Читает ВЕСЬ файл в память! ``` Атакующий может загрузить огромный файл и вызвать OOM (Out Of Memory). **Решение:** Стримить файл чанками в S3: ```python async def upload_file_to_s3( self, user_id: str, asset_id: str, file: UploadFile, ) -> Asset: asset = await self.asset_repo.get_by_id(asset_id) if not asset or asset.user_id != user_id: raise HTTPException(status_code=404, detail="Asset not found") # Проверить размер ПЕРЕД чтением if file.size and file.size > self.config.max_upload_size_bytes: raise HTTPException(status_code=413, detail="File too large") # Стрим в S3 чанками try: await self.s3_client.upload_fileobj_streaming( file.file, # file-like object self.config.media_bucket, asset.storage_key_original, content_type=asset.content_type, ) except Exception as e: logger.error(f"S3 upload failed: {e}") raise HTTPException(status_code=500, detail="Upload failed") ``` Добавить в `S3Client`: ```python async def upload_fileobj_streaming( self, fileobj, bucket: str, key: str, content_type: str, chunk_size: int = 8 * 1024 * 1024, # 8MB chunks ): """Stream file to S3 in chunks""" try: await asyncio.to_thread( self.client.upload_fileobj, fileobj, bucket, key, ExtraArgs={ 'ContentType': content_type, 'ServerSideEncryption': 'AES256', # Важно! }, Config=boto3.s3.transfer.TransferConfig( multipart_threshold=chunk_size, multipart_chunksize=chunk_size, ), ) except ClientError as e: logger.error(f"S3 upload error: {e}") raise ``` --- ### 1.4 Отсутствует CSRF защита **Серьезность: ВЫСОКАЯ** 🟡 (частично смягчено Bearer токенами) **Расположение:** `backend/src/app/main.py` **Текущее состояние:** - Используются Bearer токены в заголовках → CSRF не критичен - НО: если в будущем перейдете на cookie-based auth → станет критично **Рекомендация на будущее:** ```python from fastapi_csrf_protect import CsrfProtect @app.post("/api/v1/assets/{asset_id}/delete") async def delete_asset( asset_id: str, csrf_protect: CsrfProtect = Depends(), ): await csrf_protect.validate_csrf(request) # ... ``` --- ### 1.5 Небезопасная генерация share токенов **Серьезность: СРЕДНЯЯ** 🟡 **Расположение:** `backend/src/app/repositories/share_repository.py:25-27` **Проблема:** ```python token = secrets.token_urlsafe(32) # Хорошо! # НО нет проверки на коллизии ``` **Решение:** ```python async def create( self, owner_user_id: str, asset_id: Optional[str] = None, album_id: Optional[str] = None, expires_at: Optional[datetime] = None, password_hash: Optional[str] = None, ) -> Share: # Генерировать с проверкой уникальности max_attempts = 5 for attempt in range(max_attempts): token = secrets.token_urlsafe(32) # Проверить, что токен не существует existing = await self.session.execute( select(Share).where(Share.token == token) ) if not existing.scalar_one_or_none(): break else: raise RuntimeError("Failed to generate unique share token") share = Share( owner_user_id=owner_user_id, asset_id=asset_id, album_id=album_id, token=token, expires_at=expires_at, password_hash=password_hash, ) self.session.add(share) await self.session.flush() await self.session.refresh(share) return share ``` --- ## 🟡 2. ПРОБЛЕМЫ АУТЕНТИФИКАЦИИ И АВТОРИЗАЦИИ ### 2.1 Отсутствует механизм отзыва токенов **Серьезность: ВЫСОКАЯ** 🔴 **Расположение:** `backend/src/app/infra/security.py` **Проблема:** - JWT токены включают `jti` (token ID), но нет механизма отзыва - Если токен скомпрометирован, он остается валидным до истечения срока (15 минут для access, 14 дней для refresh) - Пользователь не может "выйти со всех устройств" **Решение с Redis:** ```python # В infra/redis_client.py from redis.asyncio import Redis class TokenBlacklist: def __init__(self, redis: Redis): self.redis = redis async def revoke_token(self, jti: str, ttl_seconds: int): """Добавить токен в blacklist""" await self.redis.setex(f"blacklist:{jti}", ttl_seconds, "1") async def is_revoked(self, jti: str) -> bool: """Проверить, отозван ли токен""" return await self.redis.exists(f"blacklist:{jti}") > 0 # В security.py async def get_current_user( token: str = Depends(oauth2_scheme), db: AsyncSession = Depends(get_db), blacklist: TokenBlacklist = Depends(get_token_blacklist), ) -> User: try: payload = jwt.decode(token, settings.JWT_SECRET, algorithms=["HS256"]) jti: str = payload.get("jti") # Проверить blacklist if await blacklist.is_revoked(jti): raise HTTPException(status_code=401, detail="Token has been revoked") # ... остальная логика ``` **Добавить endpoint:** ```python @router.post("/auth/logout") async def logout( current_user: CurrentUser, token: str = Depends(oauth2_scheme), blacklist: TokenBlacklist = Depends(get_token_blacklist), ): """Отозвать текущий токен""" payload = jwt.decode(token, settings.JWT_SECRET, algorithms=["HS256"]) jti = payload.get("jti") exp = payload.get("exp") ttl = exp - int(datetime.utcnow().timestamp()) await blacklist.revoke_token(jti, ttl) return {"message": "Logged out successfully"} ``` --- ### 2.2 Нет refresh token rotation **Серьезность: СРЕДНЯЯ** 🟡 **Расположение:** `backend/src/app/api/v1/auth.py` **Проблема:** - Нет endpoint для обновления access token через refresh token - Пользователи должны логиниться заново каждые 15 минут **Решение:** ```python @router.post("/auth/refresh", response_model=AuthTokens) async def refresh_token( refresh_token: str = Body(..., embed=True), db: AsyncSession = Depends(get_db), blacklist: TokenBlacklist = Depends(get_token_blacklist), ): """Обновить access token используя refresh token""" try: # Декодировать refresh token payload = jwt.decode( refresh_token, settings.JWT_SECRET, algorithms=["HS256"] ) # Проверить тип токена if payload.get("type") != "refresh": raise HTTPException(status_code=401, detail="Invalid token type") # Проверить blacklist jti = payload.get("jti") if await blacklist.is_revoked(jti): raise HTTPException(status_code=401, detail="Token has been revoked") user_id = payload.get("sub") # Получить пользователя user_repo = UserRepository(db) user = await user_repo.get_by_id(user_id) if not user or not user.is_active: raise HTTPException(status_code=401, detail="User not found") # ВАЖНО: Отозвать старый refresh token (rotation) exp = payload.get("exp") ttl = exp - int(datetime.utcnow().timestamp()) await blacklist.revoke_token(jti, ttl) # Сгенерировать новые токены auth_service = AuthService(db) tokens = auth_service.create_tokens(user) return tokens except jwt.ExpiredSignatureError: raise HTTPException(status_code=401, detail="Token expired") except jwt.JWTError: raise HTTPException(status_code=401, detail="Invalid token") ``` **Frontend:** ```typescript // В api.ts async refreshAccessToken(): Promise { const refreshToken = localStorage.getItem('refresh_token'); if (!refreshToken) { throw new Error('No refresh token'); } const { data } = await this.client.post('/auth/refresh', { refresh_token: refreshToken, }); localStorage.setItem('access_token', data.access_token); localStorage.setItem('refresh_token', data.refresh_token); return data; } // Добавить в interceptor this.client.interceptors.response.use( (response) => response, async (error) => { const originalRequest = error.config; if (error.response?.status === 401 && !originalRequest._retry) { originalRequest._retry = true; try { await this.refreshAccessToken(); return this.client(originalRequest); } catch (refreshError) { // Refresh failed, redirect to login localStorage.removeItem('access_token'); localStorage.removeItem('refresh_token'); if (!['/login', '/register'].includes(window.location.pathname)) { window.location.href = '/login'; } return Promise.reject(refreshError); } } return Promise.reject(error); } ); ``` --- ### 2.3 Слабая валидация паролей **Серьезность: СРЕДНЯЯ** 🟡 **Расположение:** `backend/src/app/api/schemas.py:16` **Текущее состояние:** ```python password: str = Field(min_length=8) ``` Можно установить пароль "12345678" ❌ **Решение:** ```python import re from pydantic import field_validator class UserRegisterRequest(BaseModel): email: EmailStr password: str = Field(min_length=8, max_length=128) @field_validator("password") @classmethod def validate_password_strength(cls, v: str) -> str: """Проверка сложности пароля""" if len(v) < 8: raise ValueError("Password must be at least 8 characters") # Проверки has_upper = re.search(r'[A-Z]', v) has_lower = re.search(r'[a-z]', v) has_digit = re.search(r'\d', v) has_special = re.search(r'[!@#$%^&*(),.?":{}|<>]', v) checks_passed = sum([ bool(has_upper), bool(has_lower), bool(has_digit), bool(has_special), ]) if checks_passed < 3: raise ValueError( "Password must contain at least 3 of: " "uppercase letter, lowercase letter, digit, special character" ) # Проверка на распространенные пароли common_passwords = [ "password", "12345678", "qwerty123", "admin123" ] if v.lower() in common_passwords: raise ValueError("This password is too common") return v ``` --- ### 2.4 Нет блокировки аккаунта после неудачных попыток **Серьезность: СРЕДНЯЯ** 🟡 **Расположение:** `backend/src/app/services/auth_service.py:55-84` **Проблема:** Можно делать бесконечные попытки логина (если нет rate limiting) **Решение с Redis:** ```python class LoginAttemptTracker: def __init__(self, redis: Redis): self.redis = redis self.max_attempts = 5 self.lockout_duration = 900 # 15 минут async def record_failed_attempt(self, email: str): """Записать неудачную попытку""" key = f"login_attempts:{email}" attempts = await self.redis.incr(key) if attempts == 1: # Установить TTL на первую попытку await self.redis.expire(key, 3600) # 1 час if attempts >= self.max_attempts: # Заблокировать аккаунт await self.redis.setex( f"account_locked:{email}", self.lockout_duration, "1" ) async def clear_attempts(self, email: str): """Очистить счетчик после успешного логина""" await self.redis.delete(f"login_attempts:{email}") async def is_locked(self, email: str) -> bool: """Проверить, заблокирован ли аккаунт""" return await self.redis.exists(f"account_locked:{email}") > 0 async def get_lockout_remaining(self, email: str) -> int: """Получить оставшееся время блокировки (секунды)""" return await self.redis.ttl(f"account_locked:{email}") # В auth.py @router.post("/login", response_model=AuthResponse) async def login( data: UserLoginRequest, session: DatabaseSession, tracker: LoginAttemptTracker = Depends(get_login_tracker), ): # Проверить блокировку if await tracker.is_locked(data.email): remaining = await tracker.get_lockout_remaining(data.email) raise HTTPException( status_code=429, detail=f"Account locked due to too many failed attempts. " f"Try again in {remaining // 60} minutes." ) auth_service = AuthService(session) try: user, tokens = await auth_service.login(data.email, data.password) # Успешный логин - очистить счетчик await tracker.clear_attempts(data.email) return AuthResponse(user=user, tokens=tokens) except HTTPException as e: if e.status_code == 401: # Неудачная попытка await tracker.record_failed_attempt(data.email) raise ``` --- ## 🔍 3. ПРОБЛЕМЫ ВАЛИДАЦИИ ВХОДНЫХ ДАННЫХ ### 3.1 Недостаточная валидация Content-Type **Серьезность: СРЕДНЯЯ** 🟡 **Расположение:** `backend/src/app/api/schemas.py:85-91` **Текущее состояние:** ```python @field_validator("content_type") @classmethod def validate_content_type(cls, v: str) -> str: if not (v.startswith("image/") or v.startswith("video/")): raise ValueError("Only image/* and video/* content types are supported") return v ``` Принимает ЛЮБОЙ image/* или video/* тип, включая опасные (SVG с JS, вредоносные кодеки) **Решение - whitelist:** ```python ALLOWED_IMAGE_TYPES = { "image/jpeg", "image/jpg", "image/png", "image/gif", "image/webp", "image/heic", "image/heif", } ALLOWED_VIDEO_TYPES = { "video/mp4", "video/mpeg", "video/quicktime", # .mov "video/x-msvideo", # .avi "video/x-matroska", # .mkv "video/webm", } @field_validator("content_type") @classmethod def validate_content_type(cls, v: str) -> str: v = v.lower().strip() if v not in ALLOWED_IMAGE_TYPES and v not in ALLOWED_VIDEO_TYPES: raise ValueError( f"Content type '{v}' not supported. " f"Allowed: {', '.join(ALLOWED_IMAGE_TYPES | ALLOWED_VIDEO_TYPES)}" ) return v ``` --- ### 3.2 Отсутствует проверка "магических байтов" **Серьезность: СРЕДНЯЯ** 🟡 **Расположение:** Upload flow **Проблема:** Полагаемся на content-type от клиента. Вредоносный файл может притвориться изображением. **Решение с python-magic:** ```bash pip install python-magic-bin # Windows pip install python-magic # Linux/Mac ``` ```python import magic async def verify_file_type(file: UploadFile, expected_type: str) -> bool: """Проверить реальный тип файла по магическим байтам""" # Прочитать первые 2048 байт header = await file.read(2048) await file.seek(0) # Вернуться в начало # Определить MIME type mime = magic.from_buffer(header, mime=True) # Проверить соответствие if expected_type.startswith("image/"): return mime in ALLOWED_IMAGE_TYPES elif expected_type.startswith("video/"): return mime in ALLOWED_VIDEO_TYPES return False # В asset_service.py async def create_upload( self, user_id: str, original_filename: str, content_type: str, size_bytes: int, folder_id: Optional[str] = None, ) -> tuple[Asset, dict]: # ... существующий код ... # Добавить в метаданные для проверки при finalize asset.metadata = { "expected_content_type": content_type, "needs_verification": True, } async def finalize_upload( self, user_id: str, asset_id: str, etag: Optional[str] = None, sha256: Optional[str] = None, ) -> Asset: # ... существующий код ... # Проверить магические байты если требуется if asset.metadata.get("needs_verification"): # Загрузить первые байты из S3 try: response = await self.s3_client.get_object_range( self.config.media_bucket, asset.storage_key_original, bytes_range=(0, 2047), ) header = response['Body'].read() mime = magic.from_buffer(header, mime=True) expected = asset.metadata.get("expected_content_type") # Проверить соответствие if (expected.startswith("image/") and mime not in ALLOWED_IMAGE_TYPES) or \ (expected.startswith("video/") and mime not in ALLOWED_VIDEO_TYPES): # Удалить файл из S3 await self.s3_client.delete_object( self.config.media_bucket, asset.storage_key_original ) asset.status = AssetStatus.FAILED await self.asset_repo.update(asset) raise HTTPException( status_code=400, detail="File type verification failed" ) except Exception as e: logger.warning(f"File verification failed: {e}") ``` --- ### 3.3 Защита от Path Traversal реализована ✅ **Серьезность: N/A (ХОРОШО)** ✅ **Расположение:** `backend/src/app/services/asset_service.py:23-50` **Статус:** Правильно реализовано: - Используется `os.path.basename()` - Удаляются path separators и null bytes - Ограничение длины имени файла Это **правильная** реализация. Оставить как есть. --- ## 🗄️ 4. БЕЗОПАСНОСТЬ S3 ХРАНИЛИЩА ### 4.1 Отсутствует шифрование S3 объектов **Серьезность: СРЕДНЯЯ-ВЫСОКАЯ** 🟡 **Расположение:** `backend/src/app/infra/s3_client.py` **Проблема:** Файлы хранятся в S3 без шифрования на стороне сервера. **Решение:** ```python # Во ВСЕХ методах загрузки добавить ServerSideEncryption async def upload_object( self, bucket: str, key: str, data: bytes, content_type: str, ) -> None: try: await asyncio.to_thread( self.client.put_object, Bucket=bucket, Key=key, Body=data, ContentType=content_type, ServerSideEncryption='AES256', # ← ДОБАВИТЬ ) except ClientError as e: logger.error(f"S3 upload error: {e}") raise # Для presigned URLs тоже нужно добавить def generate_presigned_post( self, bucket: str, key: str, content_type: str, max_size_bytes: int, expires_in: int = 600, ) -> dict: try: return self.client.generate_presigned_post( Bucket=bucket, Key=key, Fields={ "Content-Type": content_type, "x-amz-server-side-encryption": "AES256", # ← ДОБАВИТЬ }, Conditions=[ {"Content-Type": content_type}, ["content-length-range", 1, max_size_bytes], {"x-amz-server-side-encryption": "AES256"}, # ← ДОБАВИТЬ ], ExpiresIn=expires_in, ) except ClientError as e: logger.error(f"Presigned POST generation error: {e}") raise ``` **Важно:** Также настроить bucket policy: ```json { "Version": "2012-10-17", "Statement": [ { "Sid": "DenyUnencryptedObjectUploads", "Effect": "Deny", "Principal": "*", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::your-bucket-name/*", "Condition": { "StringNotEquals": { "s3:x-amz-server-side-encryption": "AES256" } } } ] } ``` --- ### 4.2 Hardcoded bucket name для корзины **Серьезность: НИЗКАЯ** 🟢 **Расположение:** `backend/src/app/infra/s3_client.py:28, :152` **Проблема:** ```python TRASH_BUCKET = "itcloud-trash" # Hardcoded ``` **Решение:** ```python # В config.py class Settings(BaseSettings): # ... existing ... MEDIA_BUCKET: str TRASH_BUCKET: str = "itcloud-trash" # default, но можно переопределить # В s3_client.py def __init__(self, config: Settings): self.config = config # ... existing ... self.trash_bucket = config.TRASH_BUCKET # Использовать из конфига ``` --- ### 4.3 Слишком длинный TTL для pre-signed URLs **Серьезность: НИЗКАЯ** 🟢 **Расположение:** `backend/src/app/infra/config.py:43` **Текущее:** ```python SIGNED_URL_TTL_SECONDS: int = 600 # 10 минут ``` **Рекомендация:** Для продакшена уменьшить до 300 секунд (5 минут) для чувствительного контента: ```python SIGNED_URL_TTL_SECONDS: int = Field(default=300, ge=60, le=3600) ``` --- ## 🔐 5. ДОПОЛНИТЕЛЬНЫЕ РЕКОМЕНДАЦИИ ПО БЕЗОПАСНОСТИ ### 5.1 CORS Headers слишком permissive **Серьезность: СРЕДНЯЯ** 🟡 **Расположение:** `backend/src/app/main.py:34-37` **Проблема:** ```python allow_headers=["*"], # Разрешены ВСЕ заголовки expose_headers=["*"], # Экспортируются ВСЕ заголовки ``` **Решение:** ```python app.add_middleware( CORSMiddleware, allow_origins=settings.CORS_ORIGINS.split(","), allow_credentials=True, allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"], # Whitelist конкретных заголовков allow_headers=[ "Authorization", "Content-Type", "X-Requested-With", "Accept", ], # Экспортировать только нужные expose_headers=[ "Content-Length", "Content-Type", "X-Total-Count", ], max_age=3600, ) ``` --- ### 5.2 Добавить Security Headers **Серьезность: СРЕДНЯЯ** 🟡 **Расположение:** `backend/src/app/main.py` **Добавить middleware для security headers:** ```python from starlette.middleware.base import BaseHTTPMiddleware class SecurityHeadersMiddleware(BaseHTTPMiddleware): async def dispatch(self, request, call_next): response = await call_next(request) # Защита от clickjacking response.headers["X-Frame-Options"] = "DENY" # Защита от XSS response.headers["X-Content-Type-Options"] = "nosniff" # Защита от XSS для старых браузеров response.headers["X-XSS-Protection"] = "1; mode=block" # Content Security Policy response.headers["Content-Security-Policy"] = ( "default-src 'self'; " "img-src 'self' data: https:; " "script-src 'self'; " "style-src 'self' 'unsafe-inline';" ) # HSTS (если используется HTTPS) if request.url.scheme == "https": response.headers["Strict-Transport-Security"] = ( "max-age=31536000; includeSubDomains" ) # Referrer Policy response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin" # Permissions Policy response.headers["Permissions-Policy"] = ( "geolocation=(), microphone=(), camera=()" ) return response app.add_middleware(SecurityHeadersMiddleware) ``` --- ### 5.3 Токены в localStorage (Frontend) **Серьезность: СРЕДНЯЯ** 🟡 **Расположение:** `frontend/src/services/api.ts:30-34, :62-64` **Проблема:** localStorage доступен для XSS атак. **Альтернативы:** 1. **httpOnly cookies** для refresh token (лучше всего) 2. **Memory storage** для access token (но теряется при refresh страницы) **Компромиссное решение:** ```typescript // Хранить refresh token в httpOnly cookie (настроить на бэкенде) // Access token держать в памяти class TokenManager { private accessToken: string | null = null; setAccessToken(token: string) { this.accessToken = token; // Также можно в sessionStorage (лучше чем localStorage) sessionStorage.setItem('access_token', token); } getAccessToken(): string | null { if (this.accessToken) return this.accessToken; return sessionStorage.getItem('access_token'); } clearTokens() { this.accessToken = null; sessionStorage.removeItem('access_token'); // Refresh token будет удален через httpOnly cookie с бэкенда } } ``` --- ## 🏗️ 6. АРХИТЕКТУРНЫЕ УЛУЧШЕНИЯ ### 6.1 Отсутствует Soft Delete для Assets ❌ **Серьезность: КРИТИЧЕСКАЯ** 🔴 **Расположение:** `backend/src/app/domain/models.py` **Проблема:** В спецификации (CLAUDE.md) указано: > "Soft delete to trash with restore capability" Но в модели Asset нет поля `deleted_at`: ```python class Asset(Base): __tablename__ = "assets" # ... fields ... # НЕТ: deleted_at: Mapped[Optional[datetime]] = mapped_column(DateTime) ``` **Решение:** 1. **Создать миграцию:** ```bash cd backend alembic revision -m "add_soft_delete_to_assets" ``` 2. **Добавить в миграцию:** ```python def upgrade() -> None: op.add_column('assets', sa.Column('deleted_at', sa.DateTime(), nullable=True)) op.create_index('ix_assets_deleted_at', 'assets', ['deleted_at']) def downgrade() -> None: op.drop_index('ix_assets_deleted_at', table_name='assets') op.drop_column('assets', 'deleted_at') ``` 3. **Обновить модель:** ```python from typing import Optional from datetime import datetime from sqlalchemy.orm import Mapped, mapped_column from sqlalchemy import DateTime class Asset(Base): __tablename__ = "assets" # ... existing fields ... deleted_at: Mapped[Optional[datetime]] = mapped_column( DateTime, nullable=True, index=True, default=None ) ``` 4. **Добавить методы в AssetRepository:** ```python async def soft_delete(self, asset: Asset) -> Asset: """Мягкое удаление (в корзину)""" asset.deleted_at = datetime.utcnow() await self.session.flush() await self.session.refresh(asset) return asset async def restore(self, asset: Asset) -> Asset: """Восстановить из корзины""" asset.deleted_at = None await self.session.flush() await self.session.refresh(asset) return asset async def list_trash( self, user_id: str, limit: int = 50, cursor: Optional[str] = None, ) -> list[Asset]: """Список удаленных файлов""" query = ( select(Asset) .where(Asset.user_id == user_id) .where(Asset.deleted_at.isnot(None)) .order_by(Asset.deleted_at.desc()) ) if cursor: query = query.where(Asset.id < cursor) query = query.limit(limit + 1) result = await self.session.execute(query) return list(result.scalars().all()) async def hard_delete(self, asset: Asset) -> None: """Полное удаление (безвозвратно)""" await self.session.delete(asset) await self.session.flush() ``` 5. **Добавить endpoints:** ```python # В assets.py @router.get("/trash", response_model=AssetListResponse) async def list_trash( current_user: CurrentUser, session: DatabaseSession, s3_client: S3ClientDep, limit: int = Query(50, ge=1, le=100), cursor: Optional[str] = Query(None), ): """Список файлов в корзине""" asset_service = AssetService(session, s3_client) assets, next_cursor, has_more = await asset_service.list_trash( user_id=current_user.id, limit=limit, cursor=cursor, ) return AssetListResponse( items=assets, next_cursor=next_cursor, has_more=has_more, ) @router.post("/{asset_id}/restore", response_model=AssetResponse) async def restore_asset( asset_id: str, current_user: CurrentUser, session: DatabaseSession, s3_client: S3ClientDep, ): """Восстановить файл из корзины""" asset_service = AssetService(session, s3_client) asset = await asset_service.restore_asset( user_id=current_user.id, asset_id=asset_id, ) return asset @router.delete("/{asset_id}/purge") async def purge_asset( asset_id: str, current_user: CurrentUser, session: DatabaseSession, s3_client: S3ClientDep, ): """Удалить файл безвозвратно из корзины""" asset_service = AssetService(session, s3_client) await asset_service.purge_asset( user_id=current_user.id, asset_id=asset_id, ) return {"message": "Asset permanently deleted"} ``` 6. **Обновить существующий DELETE endpoint:** ```python @router.delete("/{asset_id}") async def delete_asset( asset_id: str, current_user: CurrentUser, session: DatabaseSession, s3_client: S3ClientDep, ): """Переместить файл в корзину (мягкое удаление)""" asset_service = AssetService(session, s3_client) await asset_service.soft_delete_asset( user_id=current_user.id, asset_id=asset_id, ) return {"message": "Asset moved to trash"} ``` 7. **Обновить list_assets чтобы НЕ показывать удаленные:** ```python # В asset_repository.py async def list_by_user( self, user_id: str, folder_id: Optional[str] = None, limit: int = 50, cursor: Optional[str] = None, ) -> list[Asset]: query = ( select(Asset) .where(Asset.user_id == user_id) .where(Asset.deleted_at.is_(None)) # ← ДОБАВИТЬ .where(Asset.status == AssetStatus.READY) ) # ... rest of logic ``` 8. **Background job для очистки старых файлов:** ```python # В tasks/cleanup_tasks.py from datetime import datetime, timedelta async def cleanup_old_trash(): """Удалить файлы, которые в корзине >30 дней""" async with get_db_session() as session: thirty_days_ago = datetime.utcnow() - timedelta(days=30) assets = await session.execute( select(Asset).where( Asset.deleted_at < thirty_days_ago ) ) for asset in assets.scalars(): # Удалить из S3 await s3_client.delete_object(bucket, asset.storage_key_original) if asset.storage_key_thumb: await s3_client.delete_object(bucket, asset.storage_key_thumb) # Удалить из БД await session.delete(asset) await session.commit() ``` --- ### 6.2 Отсутствуют индексы в БД **Серьезность: СРЕДНЯЯ** 🟡 **Расположение:** `backend/src/app/domain/models.py` **Проблема:** Медленные запросы при больших объемах данных. **Решение - добавить составные индексы:** ```python # В models.py from sqlalchemy import Index class Asset(Base): __tablename__ = "assets" # ... fields ... # Составные индексы для часто используемых запросов __table_args__ = ( Index( 'ix_assets_user_folder_created', 'user_id', 'folder_id', 'created_at' ), Index( 'ix_assets_user_status', 'user_id', 'status' ), Index( 'ix_assets_user_deleted', 'user_id', 'deleted_at' ), ) class Share(Base): __tablename__ = "shares" # ... fields ... __table_args__ = ( Index( 'ix_shares_token_expires', 'token', 'expires_at' ), Index( 'ix_shares_owner_created', 'owner_user_id', 'created_at' ), ) class Folder(Base): __tablename__ = "folders" # ... fields ... __table_args__ = ( Index( 'ix_folders_user_parent', 'user_id', 'parent_folder_id' ), ) ``` **Создать миграцию:** ```bash alembic revision -m "add_composite_indexes" ``` --- ### 6.3 ZIP создается в памяти **Серьезность: ВЫСОКАЯ** 🔴 **Расположение:** `backend/src/app/services/batch_operations_service.py:206` **Проблема:** ```python zip_buffer = io.BytesIO() # Весь ZIP в памяти! with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file: # Добавление файлов... ``` При скачивании 10GB файлов → OOM. **Решение - стримить через temp file:** ```python from app.services.batch_operations_service import temp_file_manager async def download_assets_batch( self, user_id: str, asset_ids: list[str], ) -> tuple[str, bytes]: """Скачать несколько файлов как ZIP архив""" # Использовать временный файл вместо памяти async with temp_file_manager(suffix='.zip') as temp_path: # Создать ZIP в temp файле with zipfile.ZipFile(temp_path, 'w', zipfile.ZIP_DEFLATED) as zip_file: for asset_id in asset_ids: asset = await self.asset_repo.get_by_id(asset_id) if not asset or asset.user_id != user_id: continue # Стримить файл из S3 try: response = await asyncio.to_thread( self.s3_client.client.get_object, Bucket=self.config.media_bucket, Key=asset.storage_key_original, ) # Читать чанками with response['Body'] as stream: # Генерировать уникальное имя файла base_name = sanitize_filename(asset.original_filename) unique_name = self._get_unique_filename( zip_file, base_name ) # Записать в ZIP chunk by chunk with zip_file.open(unique_name, 'w') as dest: while True: chunk = stream.read(8 * 1024 * 1024) # 8MB if not chunk: break dest.write(chunk) except Exception as e: logger.warning(f"Failed to add asset {asset_id}: {e}") continue # Прочитать готовый ZIP with open(temp_path, 'rb') as f: zip_data = f.read() filename = f"assets_batch_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.zip" return filename, zip_data # Но ЛУЧШЕ стримить ответ напрямую без чтения в память: from starlette.responses import FileResponse @router.post("/batch/download") async def download_batch( data: BatchDownloadRequest, current_user: CurrentUser, session: DatabaseSession, ): """Скачать файлы как ZIP (streaming response)""" # Создать temp ZIP async with temp_file_manager(suffix='.zip') as temp_path: batch_service = BatchOperationsService(session, s3_client) # Создать ZIP в temp файле (не загружая в память) await batch_service.create_zip_file( user_id=current_user.id, asset_ids=data.asset_ids, output_path=temp_path, ) # Вернуть как streaming response filename = f"assets_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.zip" return FileResponse( path=temp_path, media_type='application/zip', filename=filename, background=BackgroundTask(os.unlink, temp_path), # Удалить после отправки ) ``` --- ### 6.4 Смешанные обязанности в FolderService **Серьезность: СРЕДНЯЯ** 🟡 **Расположение:** `backend/src/app/services/folder_service.py:195-223` **Проблема:** ```python # Строка 207 # TODO: Should use AssetService methods here asset.folder_id = None await self.asset_repo.update(asset) ``` FolderService напрямую работает с AssetRepository, нарушая SRP. **Решение:** ```python # Создать Orchestration Service class FolderManagementService: """Оркестрирует операции над папками и связанными ресурсами""" def __init__( self, session: AsyncSession, s3_client: S3Client, ): self.folder_service = FolderService(session) self.asset_service = AssetService(session, s3_client) async def delete_folder_with_contents( self, user_id: str, folder_id: str, delete_assets: bool = False, ) -> None: """ Удалить папку и обработать связанные assets. Args: user_id: ID пользователя folder_id: ID папки delete_assets: True - удалить assets, False - переместить в root """ # Получить все assets в папке и подпапках assets = await self.asset_service.list_assets_in_folder_recursive( user_id=user_id, folder_id=folder_id, ) if delete_assets: # Удалить все assets (soft delete) for asset in assets: await self.asset_service.soft_delete_asset( user_id=user_id, asset_id=asset.id, ) else: # Переместить в root for asset in assets: await self.asset_service.move_asset( user_id=user_id, asset_id=asset.id, target_folder_id=None, ) # Удалить папку await self.folder_service.delete_folder( user_id=user_id, folder_id=folder_id, ) ``` --- ### 6.5 Отсутствуют Foreign Keys в моделях **Серьезность: СРЕДНЯЯ** 🟡 **Расположение:** `backend/src/app/domain/models.py` **Проблема:** Нет явных relationship и foreign key constraints. **Решение:** ```python from sqlalchemy import ForeignKey from sqlalchemy.orm import relationship class Asset(Base): __tablename__ = "assets" # ... existing fields ... user_id: Mapped[str] = mapped_column( String(36), ForeignKey("users.id", ondelete="CASCADE"), index=True ) folder_id: Mapped[Optional[str]] = mapped_column( String(36), ForeignKey("folders.id", ondelete="SET NULL"), nullable=True, index=True ) # Relationships user: Mapped["User"] = relationship("User", back_populates="assets") folder: Mapped[Optional["Folder"]] = relationship("Folder", back_populates="assets") class Folder(Base): __tablename__ = "folders" # ... existing fields ... user_id: Mapped[str] = mapped_column( String(36), ForeignKey("users.id", ondelete="CASCADE"), index=True ) parent_folder_id: Mapped[Optional[str]] = mapped_column( String(36), ForeignKey("folders.id", ondelete="CASCADE"), nullable=True, index=True ) # Relationships user: Mapped["User"] = relationship("User", back_populates="folders") parent: Mapped[Optional["Folder"]] = relationship( "Folder", remote_side="Folder.id", back_populates="children" ) children: Mapped[list["Folder"]] = relationship( "Folder", back_populates="parent", cascade="all, delete-orphan" ) assets: Mapped[list["Asset"]] = relationship( "Asset", back_populates="folder", cascade="all, delete-orphan" ) class User(Base): __tablename__ = "users" # ... existing fields ... # Relationships assets: Mapped[list["Asset"]] = relationship( "Asset", back_populates="user", cascade="all, delete-orphan" ) folders: Mapped[list["Folder"]] = relationship( "Folder", back_populates="user", cascade="all, delete-orphan" ) ``` **Миграция:** ```bash alembic revision -m "add_foreign_keys_and_relationships" ``` --- ## 📦 7. ОТСУТСТВУЮЩИЕ КЛЮЧЕВЫЕ ФУНКЦИИ ### 7.1 Password Reset Flow **Приоритет: ВЫСОКИЙ** 🔴 **Что нужно:** 1. Endpoint `/auth/forgot-password` - отправить email с токеном 2. Endpoint `/auth/reset-password` - сбросить пароль по токену 3. Email сервис (SMTP) 4. Токены сброса пароля с expiration **Пример реализации:** ```python # В models.py class PasswordResetToken(Base): __tablename__ = "password_reset_tokens" id: Mapped[str] = mapped_column(String(36), primary_key=True, default=lambda: str(uuid4())) user_id: Mapped[str] = mapped_column(String(36), ForeignKey("users.id"), index=True) token: Mapped[str] = mapped_column(String(64), unique=True, index=True) expires_at: Mapped[datetime] = mapped_column(DateTime) used_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True) created_at: Mapped[datetime] = mapped_column(DateTime, default=func.now()) # В auth_service.py async def request_password_reset(self, email: str) -> None: """Создать токен сброса пароля и отправить email""" user = await self.user_repo.get_by_email(email) if not user: # НЕ раскрывать что пользователь не найден (безопасность) logger.info(f"Password reset requested for non-existent email: {email}") return # Создать токен token = secrets.token_urlsafe(32) expires_at = datetime.utcnow() + timedelta(hours=1) reset_token = PasswordResetToken( user_id=user.id, token=token, expires_at=expires_at, ) self.session.add(reset_token) await self.session.commit() # Отправить email reset_url = f"{settings.FRONTEND_URL}/reset-password?token={token}" await email_service.send_password_reset(user.email, reset_url) async def reset_password(self, token: str, new_password: str) -> None: """Сбросить пароль по токену""" # Найти токен result = await self.session.execute( select(PasswordResetToken).where( PasswordResetToken.token == token, PasswordResetToken.used_at.is_(None), PasswordResetToken.expires_at > datetime.utcnow(), ) ) reset_token = result.scalar_one_or_none() if not reset_token: raise HTTPException(status_code=400, detail="Invalid or expired token") # Обновить пароль user = await self.user_repo.get_by_id(reset_token.user_id) password_hash = self.password_context.hash(new_password) user.password_hash = password_hash # Пометить токен как использованный reset_token.used_at = datetime.utcnow() await self.session.commit() # Endpoints @router.post("/auth/forgot-password") async def forgot_password( data: ForgotPasswordRequest, session: DatabaseSession, ): """Запросить сброс пароля""" auth_service = AuthService(session) await auth_service.request_password_reset(data.email) # Всегда возвращать успех (не раскрывать существование email) return {"message": "If email exists, reset link has been sent"} @router.post("/auth/reset-password") async def reset_password( data: ResetPasswordRequest, session: DatabaseSession, ): """Сбросить пароль""" auth_service = AuthService(session) await auth_service.reset_password(data.token, data.new_password) return {"message": "Password reset successful"} ``` --- ### 7.2 Storage Quota Management **Приоритет: ВЫСОКИЙ** 🔴 **Что нужно:** 1. Поле `storage_quota_bytes` и `storage_used_bytes` в User 2. Проверка квоты перед загрузкой 3. Endpoint для получения статистики использования 4. Background job для пересчета использования **Реализация:** ```python # В models.py class User(Base): __tablename__ = "users" # ... existing fields ... storage_quota_bytes: Mapped[int] = mapped_column( BigInteger, default=10 * 1024 * 1024 * 1024 # 10GB по умолчанию ) storage_used_bytes: Mapped[int] = mapped_column( BigInteger, default=0, index=True ) # В asset_service.py async def create_upload( self, user_id: str, original_filename: str, content_type: str, size_bytes: int, folder_id: Optional[str] = None, ) -> tuple[Asset, dict]: # Проверить квоту ДО создания upload user = await self.user_repo.get_by_id(user_id) if user.storage_used_bytes + size_bytes > user.storage_quota_bytes: remaining = user.storage_quota_bytes - user.storage_used_bytes raise HTTPException( status_code=413, detail=f"Storage quota exceeded. " f"Available: {remaining / 1024 / 1024:.2f} MB, " f"Required: {size_bytes / 1024 / 1024:.2f} MB" ) # ... existing code ... async def finalize_upload( self, user_id: str, asset_id: str, etag: Optional[str] = None, sha256: Optional[str] = None, ) -> Asset: # ... existing code ... # Обновить использованное место user = await self.user_repo.get_by_id(user_id) user.storage_used_bytes += asset.size_bytes await self.session.commit() return asset async def soft_delete_asset( self, user_id: str, asset_id: str, ) -> Asset: # ... existing code ... # НЕ освобождать место при soft delete # Место освободится при purge return asset async def purge_asset( self, user_id: str, asset_id: str, ) -> None: """Безвозвратно удалить asset""" asset = await self.asset_repo.get_by_id(asset_id) if not asset or asset.user_id != user_id: raise HTTPException(status_code=404, detail="Asset not found") # Удалить из S3 await self.s3_client.delete_object( self.config.media_bucket, asset.storage_key_original ) if asset.storage_key_thumb: await self.s3_client.delete_object( self.config.media_bucket, asset.storage_key_thumb ) # Освободить место в квоте user = await self.user_repo.get_by_id(user_id) user.storage_used_bytes = max(0, user.storage_used_bytes - asset.size_bytes) # Удалить из БД await self.asset_repo.hard_delete(asset) await self.session.commit() # Endpoint для статистики @router.get("/users/me/storage", response_model=StorageStatsResponse) async def get_storage_stats( current_user: CurrentUser, ): """Получить статистику использования хранилища""" return StorageStatsResponse( quota_bytes=current_user.storage_quota_bytes, used_bytes=current_user.storage_used_bytes, available_bytes=current_user.storage_quota_bytes - current_user.storage_used_bytes, percentage_used=round( (current_user.storage_used_bytes / current_user.storage_quota_bytes) * 100, 2 ), ) ``` --- ### 7.3 EXIF Metadata Extraction **Приоритет: СРЕДНЯЯ** 🟡 **Реализация:** ```bash pip install pillow pillow-heif # Для изображений pip install ffmpeg-python # Для видео ``` ```python # В tasks/thumbnail_tasks.py from PIL import Image from PIL.ExifTags import TAGS import ffmpeg def extract_image_metadata(file_path: str) -> dict: """Извлечь EXIF из изображения""" try: with Image.open(file_path) as img: exif_data = img._getexif() if not exif_data: return {} metadata = {} for tag_id, value in exif_data.items(): tag = TAGS.get(tag_id, tag_id) metadata[tag] = value return { "width": img.width, "height": img.height, "captured_at": metadata.get("DateTimeOriginal"), "camera_make": metadata.get("Make"), "camera_model": metadata.get("Model"), "gps_latitude": metadata.get("GPSLatitude"), "gps_longitude": metadata.get("GPSLongitude"), } except Exception as e: logger.error(f"EXIF extraction failed: {e}") return {} def extract_video_metadata(file_path: str) -> dict: """Извлечь метаданные из видео""" try: probe = ffmpeg.probe(file_path) video_stream = next( s for s in probe['streams'] if s['codec_type'] == 'video' ) return { "width": int(video_stream.get('width', 0)), "height": int(video_stream.get('height', 0)), "duration_sec": float(probe['format'].get('duration', 0)), "codec": video_stream.get('codec_name'), "bitrate": int(probe['format'].get('bit_rate', 0)), } except Exception as e: logger.error(f"Video metadata extraction failed: {e}") return {} @celery_app.task def generate_thumbnail_and_extract_metadata(asset_id: str): """Генерировать thumbnail И извлечь metadata""" # ... existing thumbnail generation ... # Извлечь metadata if asset.type == AssetType.PHOTO: metadata = extract_image_metadata(temp_file_path) elif asset.type == AssetType.VIDEO: metadata = extract_video_metadata(temp_file_path) # Обновить asset if metadata.get("width"): asset.width = metadata["width"] if metadata.get("height"): asset.height = metadata["height"] if metadata.get("captured_at"): asset.captured_at = parse_datetime(metadata["captured_at"]) if metadata.get("duration_sec"): asset.duration_sec = metadata["duration_sec"] db.commit() ``` --- ### 7.4 Asset Search **Приоритет: ВЫСОКИЙ** 🔴 **Простая реализация с PostgreSQL full-text search:** ```python # В asset_repository.py async def search_assets( self, user_id: str, query: str, limit: int = 50, ) -> list[Asset]: """Поиск assets по имени файла""" search_query = ( select(Asset) .where(Asset.user_id == user_id) .where(Asset.deleted_at.is_(None)) .where(Asset.status == AssetStatus.READY) .where(Asset.original_filename.ilike(f"%{query}%")) .order_by(Asset.created_at.desc()) .limit(limit) ) result = await self.session.execute(search_query) return list(result.scalars().all()) # Endpoint @router.get("/assets/search", response_model=AssetListResponse) async def search_assets( q: str = Query(..., min_length=1), current_user: CurrentUser, session: DatabaseSession, s3_client: S3ClientDep, limit: int = Query(50, ge=1, le=100), ): """Поиск файлов по имени""" asset_service = AssetService(session, s3_client) assets = await asset_service.search_assets( user_id=current_user.id, query=q, limit=limit, ) return AssetListResponse( items=assets, next_cursor=None, has_more=False, ) ``` --- ### 7.5 Database Backup Strategy **Приоритет: КРИТИЧЕСКИЙ** 🔴 **Для SQLite:** ```bash #!/bin/bash # backup_db.sh BACKUP_DIR="/backups" DB_PATH="/app/data/app.db" TIMESTAMP=$(date +%Y%m%d_%H%M%S) BACKUP_FILE="$BACKUP_DIR/backup_$TIMESTAMP.db" # Создать backup sqlite3 $DB_PATH ".backup '$BACKUP_FILE'" # Сжать gzip $BACKUP_FILE # Загрузить в S3 aws s3 cp "${BACKUP_FILE}.gz" "s3://your-backup-bucket/database/" # Удалить старые локальные backups (старше 7 дней) find $BACKUP_DIR -name "backup_*.db.gz" -mtime +7 -delete # Удалить старые S3 backups (старше 30 дней) aws s3 ls "s3://your-backup-bucket/database/" | \ awk '{print $4}' | \ while read file; do # ... deletion logic done ``` **Cron job:** ```cron # Каждые 6 часов 0 */6 * * * /app/scripts/backup_db.sh >> /var/log/backup.log 2>&1 ``` **Для PostgreSQL:** ```bash #!/bin/bash # pg_backup.sh TIMESTAMP=$(date +%Y%m%d_%H%M%S) BACKUP_FILE="backup_$TIMESTAMP.sql.gz" # pg_dump с сжатием pg_dump -h $DB_HOST -U $DB_USER -d $DB_NAME | gzip > "/tmp/$BACKUP_FILE" # Загрузить в S3 aws s3 cp "/tmp/$BACKUP_FILE" "s3://your-backup-bucket/database/" # Cleanup rm "/tmp/$BACKUP_FILE" ``` --- ## 🧪 8. ТЕСТИРОВАНИЕ ### 8.1 Текущее покрытие: <10% ❌ **Найдено только:** `backend/tests/test_security.py` **Что КРИТИЧЕСКИ нужно:** 1. **Unit тесты для services:** ```python # tests/unit/services/test_asset_service.py import pytest from unittest.mock import AsyncMock, MagicMock @pytest.mark.asyncio async def test_create_upload_exceeds_quota(): """Тест: загрузка превышает квоту""" user = User( id="user1", storage_quota_bytes=1000, storage_used_bytes=900, ) asset_service = AssetService(mock_session, mock_s3) with pytest.raises(HTTPException) as exc: await asset_service.create_upload( user_id=user.id, original_filename="large.jpg", content_type="image/jpeg", size_bytes=200, # 900 + 200 > 1000 ) assert exc.value.status_code == 413 assert "quota exceeded" in exc.value.detail.lower() ``` 2. **Integration тесты для API:** ```python # tests/integration/test_auth_api.py from httpx import AsyncClient @pytest.mark.asyncio async def test_login_rate_limiting(client: AsyncClient): """Тест: rate limiting блокирует brute force""" # 5 неудачных попыток for i in range(5): response = await client.post("/api/v1/auth/login", json={ "email": "test@example.com", "password": "wrong" }) assert response.status_code == 401 # 6-я попытка должна быть заблокирована response = await client.post("/api/v1/auth/login", json={ "email": "test@example.com", "password": "wrong" }) assert response.status_code == 429 ``` 3. **Security тесты:** ```python # tests/security/test_path_traversal.py @pytest.mark.asyncio async def test_filename_sanitization(): """Тест: path traversal защита""" malicious_filenames = [ "../../../etc/passwd", "..\\..\\..\\windows\\system32", "test\x00.jpg.exe", "folder/../../../secret.txt", ] for filename in malicious_filenames: sanitized = sanitize_filename(filename) # Проверить, что путь не содержит separators assert "/" not in sanitized assert "\\" not in sanitized assert "\x00" not in sanitized assert ".." not in sanitized ``` 4. **Load тесты:** ```python # tests/load/test_upload_performance.py from locust import HttpUser, task, between class UploadUser(HttpUser): wait_time = between(1, 3) @task def upload_file(self): # Симулировать загрузку файла files = {'file': ('test.jpg', b'fake image data', 'image/jpeg')} self.client.post("/api/v1/uploads/create", files=files) ``` **Цель:** Достичь минимум 70% code coverage --- ## 📊 9. ДОПОЛНИТЕЛЬНЫЕ РЕКОМЕНДАЦИИ ### 9.1 Frontend Type Safety **Проблема:** Много `any` типов в `frontend/src/services/api.ts` **Решение:** ```typescript // Заменить все any на конкретные типы export interface Folder { id: string; user_id: string; name: string; parent_folder_id: string | null; created_at: string; updated_at: string; } export interface FolderListResponse { items: Folder[]; } export interface FolderBreadcrumb { id: string; name: string; } // В api.ts async listFolders( parentFolderId?: string | null, all: boolean = false ): Promise { // ... } async getFolder(folderId: string): Promise { // ... } async getFolderBreadcrumbs(folderId: string): Promise { // ... } ``` --- ### 9.2 Logging Strategy **Добавить structured logging:** ```python # В config.py import sys from loguru import logger # Настроить loguru logger.remove() # Удалить дефолтный handler if settings.APP_ENV == "prod": # Production: JSON формат для парсинга logger.add( sys.stdout, format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} | {message}", level="INFO", serialize=True, # JSON output ) else: # Development: красивый формат logger.add( sys.stdout, format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}", level="DEBUG", colorize=True, ) # Логировать в файл logger.add( "logs/app.log", rotation="100 MB", retention="30 days", compression="zip", level="INFO", ) # Middleware для логирования всех requests from starlette.middleware.base import BaseHTTPMiddleware import time class RequestLoggingMiddleware(BaseHTTPMiddleware): async def dispatch(self, request, call_next): start_time = time.time() # Логировать request logger.info( f"Request started", extra={ "method": request.method, "path": request.url.path, "client": request.client.host if request.client else None, } ) try: response = await call_next(request) # Логировать response duration = time.time() - start_time logger.info( f"Request completed", extra={ "method": request.method, "path": request.url.path, "status_code": response.status_code, "duration_ms": round(duration * 1000, 2), } ) return response except Exception as e: duration = time.time() - start_time logger.error( f"Request failed", extra={ "method": request.method, "path": request.url.path, "error": str(e), "duration_ms": round(duration * 1000, 2), } ) raise app.add_middleware(RequestLoggingMiddleware) ``` --- ### 9.3 Environment Validation **Проверять критичные env vars при старте:** ```python # В main.py from app.infra.config import settings def validate_production_config(): """Проверить конфигурацию для продакшена""" errors = [] if settings.APP_ENV == "prod": # Проверить JWT secret if settings.JWT_SECRET in ["your-secret-key-change-this-in-production", "changeme"]: errors.append("JWT_SECRET must be changed in production") # Проверить S3 конфигурацию if not settings.S3_ENDPOINT_URL or "localhost" in settings.S3_ENDPOINT_URL: errors.append("S3_ENDPOINT_URL must be set to production S3") # Проверить database if "sqlite" in settings.DATABASE_URL.lower(): logger.warning("Using SQLite in production - consider PostgreSQL") # Проверить CORS if "*" in settings.CORS_ORIGINS: errors.append("CORS_ORIGINS should not contain '*' in production") if errors: logger.error("Production configuration errors:") for error in errors: logger.error(f" - {error}") raise ValueError("Invalid production configuration") @app.on_event("startup") async def startup(): validate_production_config() logger.info(f"Application started in {settings.APP_ENV} mode") ``` --- ## 🎯 10. ROADMAP ПО ПРИОРИТЕТАМ ### ФАЗА 1: Критическая безопасность (1-2 недели) 🔴 **Блокирует продакшен** 1. ✅ Implement rate limiting (slowapi) 2. ✅ Add token revocation mechanism (Redis blacklist) 3. ✅ Implement refresh token rotation 4. ✅ Add storage quota management 5. ✅ Enable S3 server-side encryption 6. ✅ Fix memory exhaustion in file uploads (streaming) 7. ✅ Add strong password validation 8. ✅ Implement account lockout after failed attempts 9. ✅ Reduce error message verbosity 10. ✅ Add security headers middleware **Результат:** Приложение защищено от базовых атак --- ### ФАЗА 2: Core Features (2-3 недели) 🟡 **Необходимо для MVP** 1. ✅ Implement soft delete / trash functionality (SPEC REQUIREMENT) 2. ✅ Add password reset flow 3. ✅ Implement multipart upload для больших файлов 4. ✅ Add asset search by filename 5. ✅ Extract EXIF metadata (captured_at, dimensions) 6. ✅ Add database indexes для производительности 7. ✅ Fix ZIP creation (stream instead of memory) 8. ✅ Add foreign key constraints **Результат:** Полнофункциональное MVP --- ### ФАЗА 3: Production Readiness (2-3 недели) 🟢 **DevOps и мониторинг** 1. ✅ Comprehensive test suite (70%+ coverage) 2. ✅ Database backup automation 3. ✅ Monitoring & alerting setup (Prometheus + Grafana) 4. ✅ Error tracking (Sentry integration) 5. ✅ CI/CD pipeline (GitHub Actions) 6. ✅ Environment configuration validation 7. ✅ Structured logging (loguru + JSON) 8. ✅ Documentation completion 9. ✅ Load testing (Locust) 10. ✅ Security penetration testing **Результат:** Готово к продакшену --- ### ФАЗА 4: Enhancements (постоянно) 💚 **Улучшения UX** 1. Video transcoding & adaptive streaming (HLS) 2. Albums feature 3. Tags system 4. Advanced search & filters (date range, type, size) 5. Share analytics & permissions 6. Image optimization (WebP conversion) 7. Duplicate detection (SHA256 deduplication) 8. Two-factor authentication (TOTP) 9. Email verification 10. Shared albums (collaborative) **Результат:** Feature-rich продукт --- ## 📋 11. SUMMARY: КРИТИЧЕСКИЕ ДЕЙСТВИЯ ### Что сделать ПРЯМО СЕЙЧАС перед продакшеном: 1. **Установить rate limiting** - защита от brute force 2. **Реализовать soft delete** - требование спецификации 3. **Добавить storage quota** - предотвратить злоупотребление 4. **Включить S3 encryption** - защита данных в покое 5. **Стримить файлы** - предотвратить OOM 6. **Добавить тесты** - минимум 50% coverage 7. **Настроить backups** - защита от потери данных 8. **Валидировать env vars** - предотвратить misconfig ### Оценка времени: - **Минимум для продакшена:** 4 недели (Фаза 1 + критичное из Фазы 2) - **Полное MVP:** 6-8 недель (Фазы 1-3) - **Production-grade:** 10-12 недель (все фазы) --- ## 🎓 ЗАКЛЮЧЕНИЕ Проект **ITCloud** демонстрирует хорошую архитектурную основу и правильные практики в некоторых областях: ✅ **Что сделано хорошо:** - Clean Architecture с четким разделением слоев - Безопасное хеширование паролей (Argon2) - Защита от path traversal - JWT аутентификация - Pre-signed URLs для S3 - Async/await паттерны - Docker setup ❌ **Критические проблемы:** - Отсутствует rate limiting (BRUTE FORCE VULNERABLE) - Нет soft delete (SPEC VIOLATION) - Нет storage quota (ABUSE VULNERABLE) - Memory exhaustion риски - Минимальное тестирование - Нет token revocation **ИТОГОВАЯ ОЦЕНКА:** 6/10 (хорошая база, но не готов к продакшену) **РЕКОМЕНДАЦИЯ:** Выполнить Фазу 1 и критичные элементы Фазы 2 перед любым публичным запуском. --- **Дата аудита:** 2026-01-05 **Версия документа:** 1.0 **Следующий аудит:** После внедрения рекомендаций Фазы 1