72 KiB
ITCloud - Полный Аудит Безопасности и Рекомендации
Дата: 2026-01-05 Аудитор: Claude Code (Senior Security & Architecture Review) Версия: 1.0
📋 EXECUTIVE SUMMARY
Проведен комплексный аудит проекта ITCloud (облачное хранилище фото/видео). Проект демонстрирует хорошую архитектуру (Clean Architecture) и некоторые правильные практики безопасности, но имеет критические уязвимости и отсутствуют ключевые функции.
Статус готовности к продакшену: ❌ НЕ ГОТОВ
Критические блокеры:
- ❌ Отсутствует rate limiting (защита от brute force)
- ❌ Нет soft delete/корзины (указано в требованиях)
- ❌ Отсутствует управление квотами хранилища
- ❌ Нет механизма отзыва токенов
- ❌ Отсутствует стратегия бэкапов БД
- ❌ Минимальное покрытие тестами (<10%)
Оценка времени до готовности к продакшену: 6-8 недель
🔴 1. КРИТИЧЕСКИЕ УЯЗВИМОСТИ БЕЗОПАСНОСТИ
1.1 Отсутствует Rate Limiting
Серьезность: КРИТИЧЕСКАЯ 🔴
Расположение: Все API endpoints
Файлы: backend/src/app/main.py
Проблема:
- Нет ограничений на количество запросов
- Атакующий может делать неограниченное количество попыток логина
- Возможна DoS атака через перегрузку API
Риски:
- Brute force атаки на пароли пользователей
- Credential stuffing атаки
- Исчерпание ресурсов сервера
- Спам загрузок файлов
Решение:
# Установить: pip install slowapi
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# Для каждого endpoint:
@router.post("/login")
@limiter.limit("5/minute") # 5 попыток в минуту
async def login(...):
...
@router.post("/uploads/create")
@limiter.limit("100/hour") # 100 загрузок в час
async def create_upload(...):
...
Рекомендуемые лимиты:
/auth/login: 5 запросов/минуту/auth/register: 3 запроса/час/uploads/create: 100 запросов/час- Остальные endpoints: 1000 запросов/час
1.2 JWT Secret по умолчанию слабый
Серьезность: КРИТИЧЕСКАЯ 🔴
Расположение: backend/.env.example:18
Проблема:
JWT_SECRET=your-secret-key-change-this-in-production
Если пользователь задеплоит с дефолтным секретом, токены можно подделать.
Решение:
- Добавить валидацию при старте:
# В config.py
if settings.APP_ENV == "prod" and settings.JWT_SECRET in [
"your-secret-key-change-this-in-production",
"changeme",
"secret",
]:
raise ValueError("КРИТИЧЕСКАЯ ОШИБКА: Используется слабый JWT_SECRET в продакшене!")
- Генерировать случайный секрет при деплое:
import secrets
print(secrets.token_urlsafe(64))
1.3 Отсутствует ограничение размера при чтении файла в память
Серьезность: ВЫСОКАЯ 🔴
Расположение: backend/src/app/services/asset_service.py:138-182
Проблема:
# Строка 151
file_data = await file.read() # Читает ВЕСЬ файл в память!
Атакующий может загрузить огромный файл и вызвать OOM (Out Of Memory).
Решение: Стримить файл чанками в S3:
async def upload_file_to_s3(
self,
user_id: str,
asset_id: str,
file: UploadFile,
) -> Asset:
asset = await self.asset_repo.get_by_id(asset_id)
if not asset or asset.user_id != user_id:
raise HTTPException(status_code=404, detail="Asset not found")
# Проверить размер ПЕРЕД чтением
if file.size and file.size > self.config.max_upload_size_bytes:
raise HTTPException(status_code=413, detail="File too large")
# Стрим в S3 чанками
try:
await self.s3_client.upload_fileobj_streaming(
file.file, # file-like object
self.config.media_bucket,
asset.storage_key_original,
content_type=asset.content_type,
)
except Exception as e:
logger.error(f"S3 upload failed: {e}")
raise HTTPException(status_code=500, detail="Upload failed")
Добавить в S3Client:
async def upload_fileobj_streaming(
self,
fileobj,
bucket: str,
key: str,
content_type: str,
chunk_size: int = 8 * 1024 * 1024, # 8MB chunks
):
"""Stream file to S3 in chunks"""
try:
await asyncio.to_thread(
self.client.upload_fileobj,
fileobj,
bucket,
key,
ExtraArgs={
'ContentType': content_type,
'ServerSideEncryption': 'AES256', # Важно!
},
Config=boto3.s3.transfer.TransferConfig(
multipart_threshold=chunk_size,
multipart_chunksize=chunk_size,
),
)
except ClientError as e:
logger.error(f"S3 upload error: {e}")
raise
1.4 Отсутствует CSRF защита
Серьезность: ВЫСОКАЯ 🟡 (частично смягчено Bearer токенами)
Расположение: backend/src/app/main.py
Текущее состояние:
- Используются Bearer токены в заголовках → CSRF не критичен
- НО: если в будущем перейдете на cookie-based auth → станет критично
Рекомендация на будущее:
from fastapi_csrf_protect import CsrfProtect
@app.post("/api/v1/assets/{asset_id}/delete")
async def delete_asset(
asset_id: str,
csrf_protect: CsrfProtect = Depends(),
):
await csrf_protect.validate_csrf(request)
# ...
1.5 Небезопасная генерация share токенов
Серьезность: СРЕДНЯЯ 🟡
Расположение: backend/src/app/repositories/share_repository.py:25-27
Проблема:
token = secrets.token_urlsafe(32) # Хорошо!
# НО нет проверки на коллизии
Решение:
async def create(
self,
owner_user_id: str,
asset_id: Optional[str] = None,
album_id: Optional[str] = None,
expires_at: Optional[datetime] = None,
password_hash: Optional[str] = None,
) -> Share:
# Генерировать с проверкой уникальности
max_attempts = 5
for attempt in range(max_attempts):
token = secrets.token_urlsafe(32)
# Проверить, что токен не существует
existing = await self.session.execute(
select(Share).where(Share.token == token)
)
if not existing.scalar_one_or_none():
break
else:
raise RuntimeError("Failed to generate unique share token")
share = Share(
owner_user_id=owner_user_id,
asset_id=asset_id,
album_id=album_id,
token=token,
expires_at=expires_at,
password_hash=password_hash,
)
self.session.add(share)
await self.session.flush()
await self.session.refresh(share)
return share
🟡 2. ПРОБЛЕМЫ АУТЕНТИФИКАЦИИ И АВТОРИЗАЦИИ
2.1 Отсутствует механизм отзыва токенов
Серьезность: ВЫСОКАЯ 🔴
Расположение: backend/src/app/infra/security.py
Проблема:
- JWT токены включают
jti(token ID), но нет механизма отзыва - Если токен скомпрометирован, он остается валидным до истечения срока (15 минут для access, 14 дней для refresh)
- Пользователь не может "выйти со всех устройств"
Решение с Redis:
# В infra/redis_client.py
from redis.asyncio import Redis
class TokenBlacklist:
def __init__(self, redis: Redis):
self.redis = redis
async def revoke_token(self, jti: str, ttl_seconds: int):
"""Добавить токен в blacklist"""
await self.redis.setex(f"blacklist:{jti}", ttl_seconds, "1")
async def is_revoked(self, jti: str) -> bool:
"""Проверить, отозван ли токен"""
return await self.redis.exists(f"blacklist:{jti}") > 0
# В security.py
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_db),
blacklist: TokenBlacklist = Depends(get_token_blacklist),
) -> User:
try:
payload = jwt.decode(token, settings.JWT_SECRET, algorithms=["HS256"])
jti: str = payload.get("jti")
# Проверить blacklist
if await blacklist.is_revoked(jti):
raise HTTPException(status_code=401, detail="Token has been revoked")
# ... остальная логика
Добавить endpoint:
@router.post("/auth/logout")
async def logout(
current_user: CurrentUser,
token: str = Depends(oauth2_scheme),
blacklist: TokenBlacklist = Depends(get_token_blacklist),
):
"""Отозвать текущий токен"""
payload = jwt.decode(token, settings.JWT_SECRET, algorithms=["HS256"])
jti = payload.get("jti")
exp = payload.get("exp")
ttl = exp - int(datetime.utcnow().timestamp())
await blacklist.revoke_token(jti, ttl)
return {"message": "Logged out successfully"}
2.2 Нет refresh token rotation
Серьезность: СРЕДНЯЯ 🟡
Расположение: backend/src/app/api/v1/auth.py
Проблема:
- Нет endpoint для обновления access token через refresh token
- Пользователи должны логиниться заново каждые 15 минут
Решение:
@router.post("/auth/refresh", response_model=AuthTokens)
async def refresh_token(
refresh_token: str = Body(..., embed=True),
db: AsyncSession = Depends(get_db),
blacklist: TokenBlacklist = Depends(get_token_blacklist),
):
"""Обновить access token используя refresh token"""
try:
# Декодировать refresh token
payload = jwt.decode(
refresh_token,
settings.JWT_SECRET,
algorithms=["HS256"]
)
# Проверить тип токена
if payload.get("type") != "refresh":
raise HTTPException(status_code=401, detail="Invalid token type")
# Проверить blacklist
jti = payload.get("jti")
if await blacklist.is_revoked(jti):
raise HTTPException(status_code=401, detail="Token has been revoked")
user_id = payload.get("sub")
# Получить пользователя
user_repo = UserRepository(db)
user = await user_repo.get_by_id(user_id)
if not user or not user.is_active:
raise HTTPException(status_code=401, detail="User not found")
# ВАЖНО: Отозвать старый refresh token (rotation)
exp = payload.get("exp")
ttl = exp - int(datetime.utcnow().timestamp())
await blacklist.revoke_token(jti, ttl)
# Сгенерировать новые токены
auth_service = AuthService(db)
tokens = auth_service.create_tokens(user)
return tokens
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token expired")
except jwt.JWTError:
raise HTTPException(status_code=401, detail="Invalid token")
Frontend:
// В api.ts
async refreshAccessToken(): Promise<AuthTokens> {
const refreshToken = localStorage.getItem('refresh_token');
if (!refreshToken) {
throw new Error('No refresh token');
}
const { data } = await this.client.post('/auth/refresh', {
refresh_token: refreshToken,
});
localStorage.setItem('access_token', data.access_token);
localStorage.setItem('refresh_token', data.refresh_token);
return data;
}
// Добавить в interceptor
this.client.interceptors.response.use(
(response) => response,
async (error) => {
const originalRequest = error.config;
if (error.response?.status === 401 && !originalRequest._retry) {
originalRequest._retry = true;
try {
await this.refreshAccessToken();
return this.client(originalRequest);
} catch (refreshError) {
// Refresh failed, redirect to login
localStorage.removeItem('access_token');
localStorage.removeItem('refresh_token');
if (!['/login', '/register'].includes(window.location.pathname)) {
window.location.href = '/login';
}
return Promise.reject(refreshError);
}
}
return Promise.reject(error);
}
);
2.3 Слабая валидация паролей
Серьезность: СРЕДНЯЯ 🟡
Расположение: backend/src/app/api/schemas.py:16
Текущее состояние:
password: str = Field(min_length=8)
Можно установить пароль "12345678" ❌
Решение:
import re
from pydantic import field_validator
class UserRegisterRequest(BaseModel):
email: EmailStr
password: str = Field(min_length=8, max_length=128)
@field_validator("password")
@classmethod
def validate_password_strength(cls, v: str) -> str:
"""Проверка сложности пароля"""
if len(v) < 8:
raise ValueError("Password must be at least 8 characters")
# Проверки
has_upper = re.search(r'[A-Z]', v)
has_lower = re.search(r'[a-z]', v)
has_digit = re.search(r'\d', v)
has_special = re.search(r'[!@#$%^&*(),.?":{}|<>]', v)
checks_passed = sum([
bool(has_upper),
bool(has_lower),
bool(has_digit),
bool(has_special),
])
if checks_passed < 3:
raise ValueError(
"Password must contain at least 3 of: "
"uppercase letter, lowercase letter, digit, special character"
)
# Проверка на распространенные пароли
common_passwords = [
"password", "12345678", "qwerty123", "admin123"
]
if v.lower() in common_passwords:
raise ValueError("This password is too common")
return v
2.4 Нет блокировки аккаунта после неудачных попыток
Серьезность: СРЕДНЯЯ 🟡
Расположение: backend/src/app/services/auth_service.py:55-84
Проблема: Можно делать бесконечные попытки логина (если нет rate limiting)
Решение с Redis:
class LoginAttemptTracker:
def __init__(self, redis: Redis):
self.redis = redis
self.max_attempts = 5
self.lockout_duration = 900 # 15 минут
async def record_failed_attempt(self, email: str):
"""Записать неудачную попытку"""
key = f"login_attempts:{email}"
attempts = await self.redis.incr(key)
if attempts == 1:
# Установить TTL на первую попытку
await self.redis.expire(key, 3600) # 1 час
if attempts >= self.max_attempts:
# Заблокировать аккаунт
await self.redis.setex(
f"account_locked:{email}",
self.lockout_duration,
"1"
)
async def clear_attempts(self, email: str):
"""Очистить счетчик после успешного логина"""
await self.redis.delete(f"login_attempts:{email}")
async def is_locked(self, email: str) -> bool:
"""Проверить, заблокирован ли аккаунт"""
return await self.redis.exists(f"account_locked:{email}") > 0
async def get_lockout_remaining(self, email: str) -> int:
"""Получить оставшееся время блокировки (секунды)"""
return await self.redis.ttl(f"account_locked:{email}")
# В auth.py
@router.post("/login", response_model=AuthResponse)
async def login(
data: UserLoginRequest,
session: DatabaseSession,
tracker: LoginAttemptTracker = Depends(get_login_tracker),
):
# Проверить блокировку
if await tracker.is_locked(data.email):
remaining = await tracker.get_lockout_remaining(data.email)
raise HTTPException(
status_code=429,
detail=f"Account locked due to too many failed attempts. "
f"Try again in {remaining // 60} minutes."
)
auth_service = AuthService(session)
try:
user, tokens = await auth_service.login(data.email, data.password)
# Успешный логин - очистить счетчик
await tracker.clear_attempts(data.email)
return AuthResponse(user=user, tokens=tokens)
except HTTPException as e:
if e.status_code == 401:
# Неудачная попытка
await tracker.record_failed_attempt(data.email)
raise
🔍 3. ПРОБЛЕМЫ ВАЛИДАЦИИ ВХОДНЫХ ДАННЫХ
3.1 Недостаточная валидация Content-Type
Серьезность: СРЕДНЯЯ 🟡
Расположение: backend/src/app/api/schemas.py:85-91
Текущее состояние:
@field_validator("content_type")
@classmethod
def validate_content_type(cls, v: str) -> str:
if not (v.startswith("image/") or v.startswith("video/")):
raise ValueError("Only image/* and video/* content types are supported")
return v
Принимает ЛЮБОЙ image/* или video/* тип, включая опасные (SVG с JS, вредоносные кодеки)
Решение - whitelist:
ALLOWED_IMAGE_TYPES = {
"image/jpeg",
"image/jpg",
"image/png",
"image/gif",
"image/webp",
"image/heic",
"image/heif",
}
ALLOWED_VIDEO_TYPES = {
"video/mp4",
"video/mpeg",
"video/quicktime", # .mov
"video/x-msvideo", # .avi
"video/x-matroska", # .mkv
"video/webm",
}
@field_validator("content_type")
@classmethod
def validate_content_type(cls, v: str) -> str:
v = v.lower().strip()
if v not in ALLOWED_IMAGE_TYPES and v not in ALLOWED_VIDEO_TYPES:
raise ValueError(
f"Content type '{v}' not supported. "
f"Allowed: {', '.join(ALLOWED_IMAGE_TYPES | ALLOWED_VIDEO_TYPES)}"
)
return v
3.2 Отсутствует проверка "магических байтов"
Серьезность: СРЕДНЯЯ 🟡 Расположение: Upload flow
Проблема: Полагаемся на content-type от клиента. Вредоносный файл может притвориться изображением.
Решение с python-magic:
pip install python-magic-bin # Windows
pip install python-magic # Linux/Mac
import magic
async def verify_file_type(file: UploadFile, expected_type: str) -> bool:
"""Проверить реальный тип файла по магическим байтам"""
# Прочитать первые 2048 байт
header = await file.read(2048)
await file.seek(0) # Вернуться в начало
# Определить MIME type
mime = magic.from_buffer(header, mime=True)
# Проверить соответствие
if expected_type.startswith("image/"):
return mime in ALLOWED_IMAGE_TYPES
elif expected_type.startswith("video/"):
return mime in ALLOWED_VIDEO_TYPES
return False
# В asset_service.py
async def create_upload(
self,
user_id: str,
original_filename: str,
content_type: str,
size_bytes: int,
folder_id: Optional[str] = None,
) -> tuple[Asset, dict]:
# ... существующий код ...
# Добавить в метаданные для проверки при finalize
asset.metadata = {
"expected_content_type": content_type,
"needs_verification": True,
}
async def finalize_upload(
self,
user_id: str,
asset_id: str,
etag: Optional[str] = None,
sha256: Optional[str] = None,
) -> Asset:
# ... существующий код ...
# Проверить магические байты если требуется
if asset.metadata.get("needs_verification"):
# Загрузить первые байты из S3
try:
response = await self.s3_client.get_object_range(
self.config.media_bucket,
asset.storage_key_original,
bytes_range=(0, 2047),
)
header = response['Body'].read()
mime = magic.from_buffer(header, mime=True)
expected = asset.metadata.get("expected_content_type")
# Проверить соответствие
if (expected.startswith("image/") and mime not in ALLOWED_IMAGE_TYPES) or \
(expected.startswith("video/") and mime not in ALLOWED_VIDEO_TYPES):
# Удалить файл из S3
await self.s3_client.delete_object(
self.config.media_bucket,
asset.storage_key_original
)
asset.status = AssetStatus.FAILED
await self.asset_repo.update(asset)
raise HTTPException(
status_code=400,
detail="File type verification failed"
)
except Exception as e:
logger.warning(f"File verification failed: {e}")
3.3 Защита от Path Traversal реализована ✅
Серьезность: N/A (ХОРОШО) ✅
Расположение: backend/src/app/services/asset_service.py:23-50
Статус: Правильно реализовано:
- Используется
os.path.basename() - Удаляются path separators и null bytes
- Ограничение длины имени файла
Это правильная реализация. Оставить как есть.
🗄️ 4. БЕЗОПАСНОСТЬ S3 ХРАНИЛИЩА
4.1 Отсутствует шифрование S3 объектов
Серьезность: СРЕДНЯЯ-ВЫСОКАЯ 🟡
Расположение: backend/src/app/infra/s3_client.py
Проблема: Файлы хранятся в S3 без шифрования на стороне сервера.
Решение:
# Во ВСЕХ методах загрузки добавить ServerSideEncryption
async def upload_object(
self,
bucket: str,
key: str,
data: bytes,
content_type: str,
) -> None:
try:
await asyncio.to_thread(
self.client.put_object,
Bucket=bucket,
Key=key,
Body=data,
ContentType=content_type,
ServerSideEncryption='AES256', # ← ДОБАВИТЬ
)
except ClientError as e:
logger.error(f"S3 upload error: {e}")
raise
# Для presigned URLs тоже нужно добавить
def generate_presigned_post(
self,
bucket: str,
key: str,
content_type: str,
max_size_bytes: int,
expires_in: int = 600,
) -> dict:
try:
return self.client.generate_presigned_post(
Bucket=bucket,
Key=key,
Fields={
"Content-Type": content_type,
"x-amz-server-side-encryption": "AES256", # ← ДОБАВИТЬ
},
Conditions=[
{"Content-Type": content_type},
["content-length-range", 1, max_size_bytes],
{"x-amz-server-side-encryption": "AES256"}, # ← ДОБАВИТЬ
],
ExpiresIn=expires_in,
)
except ClientError as e:
logger.error(f"Presigned POST generation error: {e}")
raise
Важно: Также настроить bucket policy:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DenyUnencryptedObjectUploads",
"Effect": "Deny",
"Principal": "*",
"Action": "s3:PutObject",
"Resource": "arn:aws:s3:::your-bucket-name/*",
"Condition": {
"StringNotEquals": {
"s3:x-amz-server-side-encryption": "AES256"
}
}
}
]
}
4.2 Hardcoded bucket name для корзины
Серьезность: НИЗКАЯ 🟢
Расположение: backend/src/app/infra/s3_client.py:28, :152
Проблема:
TRASH_BUCKET = "itcloud-trash" # Hardcoded
Решение:
# В config.py
class Settings(BaseSettings):
# ... existing ...
MEDIA_BUCKET: str
TRASH_BUCKET: str = "itcloud-trash" # default, но можно переопределить
# В s3_client.py
def __init__(self, config: Settings):
self.config = config
# ... existing ...
self.trash_bucket = config.TRASH_BUCKET # Использовать из конфига
4.3 Слишком длинный TTL для pre-signed URLs
Серьезность: НИЗКАЯ 🟢
Расположение: backend/src/app/infra/config.py:43
Текущее:
SIGNED_URL_TTL_SECONDS: int = 600 # 10 минут
Рекомендация: Для продакшена уменьшить до 300 секунд (5 минут) для чувствительного контента:
SIGNED_URL_TTL_SECONDS: int = Field(default=300, ge=60, le=3600)
🔐 5. ДОПОЛНИТЕЛЬНЫЕ РЕКОМЕНДАЦИИ ПО БЕЗОПАСНОСТИ
5.1 CORS Headers слишком permissive
Серьезность: СРЕДНЯЯ 🟡
Расположение: backend/src/app/main.py:34-37
Проблема:
allow_headers=["*"], # Разрешены ВСЕ заголовки
expose_headers=["*"], # Экспортируются ВСЕ заголовки
Решение:
app.add_middleware(
CORSMiddleware,
allow_origins=settings.CORS_ORIGINS.split(","),
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
# Whitelist конкретных заголовков
allow_headers=[
"Authorization",
"Content-Type",
"X-Requested-With",
"Accept",
],
# Экспортировать только нужные
expose_headers=[
"Content-Length",
"Content-Type",
"X-Total-Count",
],
max_age=3600,
)
5.2 Добавить Security Headers
Серьезность: СРЕДНЯЯ 🟡
Расположение: backend/src/app/main.py
Добавить middleware для security headers:
from starlette.middleware.base import BaseHTTPMiddleware
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
response = await call_next(request)
# Защита от clickjacking
response.headers["X-Frame-Options"] = "DENY"
# Защита от XSS
response.headers["X-Content-Type-Options"] = "nosniff"
# Защита от XSS для старых браузеров
response.headers["X-XSS-Protection"] = "1; mode=block"
# Content Security Policy
response.headers["Content-Security-Policy"] = (
"default-src 'self'; "
"img-src 'self' data: https:; "
"script-src 'self'; "
"style-src 'self' 'unsafe-inline';"
)
# HSTS (если используется HTTPS)
if request.url.scheme == "https":
response.headers["Strict-Transport-Security"] = (
"max-age=31536000; includeSubDomains"
)
# Referrer Policy
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
# Permissions Policy
response.headers["Permissions-Policy"] = (
"geolocation=(), microphone=(), camera=()"
)
return response
app.add_middleware(SecurityHeadersMiddleware)
5.3 Токены в localStorage (Frontend)
Серьезность: СРЕДНЯЯ 🟡
Расположение: frontend/src/services/api.ts:30-34, :62-64
Проблема: localStorage доступен для XSS атак.
Альтернативы:
- httpOnly cookies для refresh token (лучше всего)
- Memory storage для access token (но теряется при refresh страницы)
Компромиссное решение:
// Хранить refresh token в httpOnly cookie (настроить на бэкенде)
// Access token держать в памяти
class TokenManager {
private accessToken: string | null = null;
setAccessToken(token: string) {
this.accessToken = token;
// Также можно в sessionStorage (лучше чем localStorage)
sessionStorage.setItem('access_token', token);
}
getAccessToken(): string | null {
if (this.accessToken) return this.accessToken;
return sessionStorage.getItem('access_token');
}
clearTokens() {
this.accessToken = null;
sessionStorage.removeItem('access_token');
// Refresh token будет удален через httpOnly cookie с бэкенда
}
}
🏗️ 6. АРХИТЕКТУРНЫЕ УЛУЧШЕНИЯ
6.1 Отсутствует Soft Delete для Assets ❌
Серьезность: КРИТИЧЕСКАЯ 🔴
Расположение: backend/src/app/domain/models.py
Проблема: В спецификации (CLAUDE.md) указано:
"Soft delete to trash with restore capability"
Но в модели Asset нет поля deleted_at:
class Asset(Base):
__tablename__ = "assets"
# ... fields ...
# НЕТ: deleted_at: Mapped[Optional[datetime]] = mapped_column(DateTime)
Решение:
- Создать миграцию:
cd backend
alembic revision -m "add_soft_delete_to_assets"
- Добавить в миграцию:
def upgrade() -> None:
op.add_column('assets', sa.Column('deleted_at', sa.DateTime(), nullable=True))
op.create_index('ix_assets_deleted_at', 'assets', ['deleted_at'])
def downgrade() -> None:
op.drop_index('ix_assets_deleted_at', table_name='assets')
op.drop_column('assets', 'deleted_at')
- Обновить модель:
from typing import Optional
from datetime import datetime
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy import DateTime
class Asset(Base):
__tablename__ = "assets"
# ... existing fields ...
deleted_at: Mapped[Optional[datetime]] = mapped_column(
DateTime, nullable=True, index=True, default=None
)
- Добавить методы в AssetRepository:
async def soft_delete(self, asset: Asset) -> Asset:
"""Мягкое удаление (в корзину)"""
asset.deleted_at = datetime.utcnow()
await self.session.flush()
await self.session.refresh(asset)
return asset
async def restore(self, asset: Asset) -> Asset:
"""Восстановить из корзины"""
asset.deleted_at = None
await self.session.flush()
await self.session.refresh(asset)
return asset
async def list_trash(
self,
user_id: str,
limit: int = 50,
cursor: Optional[str] = None,
) -> list[Asset]:
"""Список удаленных файлов"""
query = (
select(Asset)
.where(Asset.user_id == user_id)
.where(Asset.deleted_at.isnot(None))
.order_by(Asset.deleted_at.desc())
)
if cursor:
query = query.where(Asset.id < cursor)
query = query.limit(limit + 1)
result = await self.session.execute(query)
return list(result.scalars().all())
async def hard_delete(self, asset: Asset) -> None:
"""Полное удаление (безвозвратно)"""
await self.session.delete(asset)
await self.session.flush()
- Добавить endpoints:
# В assets.py
@router.get("/trash", response_model=AssetListResponse)
async def list_trash(
current_user: CurrentUser,
session: DatabaseSession,
s3_client: S3ClientDep,
limit: int = Query(50, ge=1, le=100),
cursor: Optional[str] = Query(None),
):
"""Список файлов в корзине"""
asset_service = AssetService(session, s3_client)
assets, next_cursor, has_more = await asset_service.list_trash(
user_id=current_user.id,
limit=limit,
cursor=cursor,
)
return AssetListResponse(
items=assets,
next_cursor=next_cursor,
has_more=has_more,
)
@router.post("/{asset_id}/restore", response_model=AssetResponse)
async def restore_asset(
asset_id: str,
current_user: CurrentUser,
session: DatabaseSession,
s3_client: S3ClientDep,
):
"""Восстановить файл из корзины"""
asset_service = AssetService(session, s3_client)
asset = await asset_service.restore_asset(
user_id=current_user.id,
asset_id=asset_id,
)
return asset
@router.delete("/{asset_id}/purge")
async def purge_asset(
asset_id: str,
current_user: CurrentUser,
session: DatabaseSession,
s3_client: S3ClientDep,
):
"""Удалить файл безвозвратно из корзины"""
asset_service = AssetService(session, s3_client)
await asset_service.purge_asset(
user_id=current_user.id,
asset_id=asset_id,
)
return {"message": "Asset permanently deleted"}
- Обновить существующий DELETE endpoint:
@router.delete("/{asset_id}")
async def delete_asset(
asset_id: str,
current_user: CurrentUser,
session: DatabaseSession,
s3_client: S3ClientDep,
):
"""Переместить файл в корзину (мягкое удаление)"""
asset_service = AssetService(session, s3_client)
await asset_service.soft_delete_asset(
user_id=current_user.id,
asset_id=asset_id,
)
return {"message": "Asset moved to trash"}
- Обновить list_assets чтобы НЕ показывать удаленные:
# В asset_repository.py
async def list_by_user(
self,
user_id: str,
folder_id: Optional[str] = None,
limit: int = 50,
cursor: Optional[str] = None,
) -> list[Asset]:
query = (
select(Asset)
.where(Asset.user_id == user_id)
.where(Asset.deleted_at.is_(None)) # ← ДОБАВИТЬ
.where(Asset.status == AssetStatus.READY)
)
# ... rest of logic
- Background job для очистки старых файлов:
# В tasks/cleanup_tasks.py
from datetime import datetime, timedelta
async def cleanup_old_trash():
"""Удалить файлы, которые в корзине >30 дней"""
async with get_db_session() as session:
thirty_days_ago = datetime.utcnow() - timedelta(days=30)
assets = await session.execute(
select(Asset).where(
Asset.deleted_at < thirty_days_ago
)
)
for asset in assets.scalars():
# Удалить из S3
await s3_client.delete_object(bucket, asset.storage_key_original)
if asset.storage_key_thumb:
await s3_client.delete_object(bucket, asset.storage_key_thumb)
# Удалить из БД
await session.delete(asset)
await session.commit()
6.2 Отсутствуют индексы в БД
Серьезность: СРЕДНЯЯ 🟡
Расположение: backend/src/app/domain/models.py
Проблема: Медленные запросы при больших объемах данных.
Решение - добавить составные индексы:
# В models.py
from sqlalchemy import Index
class Asset(Base):
__tablename__ = "assets"
# ... fields ...
# Составные индексы для часто используемых запросов
__table_args__ = (
Index(
'ix_assets_user_folder_created',
'user_id', 'folder_id', 'created_at'
),
Index(
'ix_assets_user_status',
'user_id', 'status'
),
Index(
'ix_assets_user_deleted',
'user_id', 'deleted_at'
),
)
class Share(Base):
__tablename__ = "shares"
# ... fields ...
__table_args__ = (
Index(
'ix_shares_token_expires',
'token', 'expires_at'
),
Index(
'ix_shares_owner_created',
'owner_user_id', 'created_at'
),
)
class Folder(Base):
__tablename__ = "folders"
# ... fields ...
__table_args__ = (
Index(
'ix_folders_user_parent',
'user_id', 'parent_folder_id'
),
)
Создать миграцию:
alembic revision -m "add_composite_indexes"
6.3 ZIP создается в памяти
Серьезность: ВЫСОКАЯ 🔴
Расположение: backend/src/app/services/batch_operations_service.py:206
Проблема:
zip_buffer = io.BytesIO() # Весь ZIP в памяти!
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
# Добавление файлов...
При скачивании 10GB файлов → OOM.
Решение - стримить через temp file:
from app.services.batch_operations_service import temp_file_manager
async def download_assets_batch(
self,
user_id: str,
asset_ids: list[str],
) -> tuple[str, bytes]:
"""Скачать несколько файлов как ZIP архив"""
# Использовать временный файл вместо памяти
async with temp_file_manager(suffix='.zip') as temp_path:
# Создать ZIP в temp файле
with zipfile.ZipFile(temp_path, 'w', zipfile.ZIP_DEFLATED) as zip_file:
for asset_id in asset_ids:
asset = await self.asset_repo.get_by_id(asset_id)
if not asset or asset.user_id != user_id:
continue
# Стримить файл из S3
try:
response = await asyncio.to_thread(
self.s3_client.client.get_object,
Bucket=self.config.media_bucket,
Key=asset.storage_key_original,
)
# Читать чанками
with response['Body'] as stream:
# Генерировать уникальное имя файла
base_name = sanitize_filename(asset.original_filename)
unique_name = self._get_unique_filename(
zip_file,
base_name
)
# Записать в ZIP chunk by chunk
with zip_file.open(unique_name, 'w') as dest:
while True:
chunk = stream.read(8 * 1024 * 1024) # 8MB
if not chunk:
break
dest.write(chunk)
except Exception as e:
logger.warning(f"Failed to add asset {asset_id}: {e}")
continue
# Прочитать готовый ZIP
with open(temp_path, 'rb') as f:
zip_data = f.read()
filename = f"assets_batch_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.zip"
return filename, zip_data
# Но ЛУЧШЕ стримить ответ напрямую без чтения в память:
from starlette.responses import FileResponse
@router.post("/batch/download")
async def download_batch(
data: BatchDownloadRequest,
current_user: CurrentUser,
session: DatabaseSession,
):
"""Скачать файлы как ZIP (streaming response)"""
# Создать temp ZIP
async with temp_file_manager(suffix='.zip') as temp_path:
batch_service = BatchOperationsService(session, s3_client)
# Создать ZIP в temp файле (не загружая в память)
await batch_service.create_zip_file(
user_id=current_user.id,
asset_ids=data.asset_ids,
output_path=temp_path,
)
# Вернуть как streaming response
filename = f"assets_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.zip"
return FileResponse(
path=temp_path,
media_type='application/zip',
filename=filename,
background=BackgroundTask(os.unlink, temp_path), # Удалить после отправки
)
6.4 Смешанные обязанности в FolderService
Серьезность: СРЕДНЯЯ 🟡
Расположение: backend/src/app/services/folder_service.py:195-223
Проблема:
# Строка 207
# TODO: Should use AssetService methods here
asset.folder_id = None
await self.asset_repo.update(asset)
FolderService напрямую работает с AssetRepository, нарушая SRP.
Решение:
# Создать Orchestration Service
class FolderManagementService:
"""Оркестрирует операции над папками и связанными ресурсами"""
def __init__(
self,
session: AsyncSession,
s3_client: S3Client,
):
self.folder_service = FolderService(session)
self.asset_service = AssetService(session, s3_client)
async def delete_folder_with_contents(
self,
user_id: str,
folder_id: str,
delete_assets: bool = False,
) -> None:
"""
Удалить папку и обработать связанные assets.
Args:
user_id: ID пользователя
folder_id: ID папки
delete_assets: True - удалить assets, False - переместить в root
"""
# Получить все assets в папке и подпапках
assets = await self.asset_service.list_assets_in_folder_recursive(
user_id=user_id,
folder_id=folder_id,
)
if delete_assets:
# Удалить все assets (soft delete)
for asset in assets:
await self.asset_service.soft_delete_asset(
user_id=user_id,
asset_id=asset.id,
)
else:
# Переместить в root
for asset in assets:
await self.asset_service.move_asset(
user_id=user_id,
asset_id=asset.id,
target_folder_id=None,
)
# Удалить папку
await self.folder_service.delete_folder(
user_id=user_id,
folder_id=folder_id,
)
6.5 Отсутствуют Foreign Keys в моделях
Серьезность: СРЕДНЯЯ 🟡
Расположение: backend/src/app/domain/models.py
Проблема: Нет явных relationship и foreign key constraints.
Решение:
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
class Asset(Base):
__tablename__ = "assets"
# ... existing fields ...
user_id: Mapped[str] = mapped_column(
String(36),
ForeignKey("users.id", ondelete="CASCADE"),
index=True
)
folder_id: Mapped[Optional[str]] = mapped_column(
String(36),
ForeignKey("folders.id", ondelete="SET NULL"),
nullable=True,
index=True
)
# Relationships
user: Mapped["User"] = relationship("User", back_populates="assets")
folder: Mapped[Optional["Folder"]] = relationship("Folder", back_populates="assets")
class Folder(Base):
__tablename__ = "folders"
# ... existing fields ...
user_id: Mapped[str] = mapped_column(
String(36),
ForeignKey("users.id", ondelete="CASCADE"),
index=True
)
parent_folder_id: Mapped[Optional[str]] = mapped_column(
String(36),
ForeignKey("folders.id", ondelete="CASCADE"),
nullable=True,
index=True
)
# Relationships
user: Mapped["User"] = relationship("User", back_populates="folders")
parent: Mapped[Optional["Folder"]] = relationship(
"Folder",
remote_side="Folder.id",
back_populates="children"
)
children: Mapped[list["Folder"]] = relationship(
"Folder",
back_populates="parent",
cascade="all, delete-orphan"
)
assets: Mapped[list["Asset"]] = relationship(
"Asset",
back_populates="folder",
cascade="all, delete-orphan"
)
class User(Base):
__tablename__ = "users"
# ... existing fields ...
# Relationships
assets: Mapped[list["Asset"]] = relationship(
"Asset",
back_populates="user",
cascade="all, delete-orphan"
)
folders: Mapped[list["Folder"]] = relationship(
"Folder",
back_populates="user",
cascade="all, delete-orphan"
)
Миграция:
alembic revision -m "add_foreign_keys_and_relationships"
📦 7. ОТСУТСТВУЮЩИЕ КЛЮЧЕВЫЕ ФУНКЦИИ
7.1 Password Reset Flow
Приоритет: ВЫСОКИЙ 🔴
Что нужно:
- Endpoint
/auth/forgot-password- отправить email с токеном - Endpoint
/auth/reset-password- сбросить пароль по токену - Email сервис (SMTP)
- Токены сброса пароля с expiration
Пример реализации:
# В models.py
class PasswordResetToken(Base):
__tablename__ = "password_reset_tokens"
id: Mapped[str] = mapped_column(String(36), primary_key=True, default=lambda: str(uuid4()))
user_id: Mapped[str] = mapped_column(String(36), ForeignKey("users.id"), index=True)
token: Mapped[str] = mapped_column(String(64), unique=True, index=True)
expires_at: Mapped[datetime] = mapped_column(DateTime)
used_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)
created_at: Mapped[datetime] = mapped_column(DateTime, default=func.now())
# В auth_service.py
async def request_password_reset(self, email: str) -> None:
"""Создать токен сброса пароля и отправить email"""
user = await self.user_repo.get_by_email(email)
if not user:
# НЕ раскрывать что пользователь не найден (безопасность)
logger.info(f"Password reset requested for non-existent email: {email}")
return
# Создать токен
token = secrets.token_urlsafe(32)
expires_at = datetime.utcnow() + timedelta(hours=1)
reset_token = PasswordResetToken(
user_id=user.id,
token=token,
expires_at=expires_at,
)
self.session.add(reset_token)
await self.session.commit()
# Отправить email
reset_url = f"{settings.FRONTEND_URL}/reset-password?token={token}"
await email_service.send_password_reset(user.email, reset_url)
async def reset_password(self, token: str, new_password: str) -> None:
"""Сбросить пароль по токену"""
# Найти токен
result = await self.session.execute(
select(PasswordResetToken).where(
PasswordResetToken.token == token,
PasswordResetToken.used_at.is_(None),
PasswordResetToken.expires_at > datetime.utcnow(),
)
)
reset_token = result.scalar_one_or_none()
if not reset_token:
raise HTTPException(status_code=400, detail="Invalid or expired token")
# Обновить пароль
user = await self.user_repo.get_by_id(reset_token.user_id)
password_hash = self.password_context.hash(new_password)
user.password_hash = password_hash
# Пометить токен как использованный
reset_token.used_at = datetime.utcnow()
await self.session.commit()
# Endpoints
@router.post("/auth/forgot-password")
async def forgot_password(
data: ForgotPasswordRequest,
session: DatabaseSession,
):
"""Запросить сброс пароля"""
auth_service = AuthService(session)
await auth_service.request_password_reset(data.email)
# Всегда возвращать успех (не раскрывать существование email)
return {"message": "If email exists, reset link has been sent"}
@router.post("/auth/reset-password")
async def reset_password(
data: ResetPasswordRequest,
session: DatabaseSession,
):
"""Сбросить пароль"""
auth_service = AuthService(session)
await auth_service.reset_password(data.token, data.new_password)
return {"message": "Password reset successful"}
7.2 Storage Quota Management
Приоритет: ВЫСОКИЙ 🔴
Что нужно:
- Поле
storage_quota_bytesиstorage_used_bytesв User - Проверка квоты перед загрузкой
- Endpoint для получения статистики использования
- Background job для пересчета использования
Реализация:
# В models.py
class User(Base):
__tablename__ = "users"
# ... existing fields ...
storage_quota_bytes: Mapped[int] = mapped_column(
BigInteger,
default=10 * 1024 * 1024 * 1024 # 10GB по умолчанию
)
storage_used_bytes: Mapped[int] = mapped_column(
BigInteger,
default=0,
index=True
)
# В asset_service.py
async def create_upload(
self,
user_id: str,
original_filename: str,
content_type: str,
size_bytes: int,
folder_id: Optional[str] = None,
) -> tuple[Asset, dict]:
# Проверить квоту ДО создания upload
user = await self.user_repo.get_by_id(user_id)
if user.storage_used_bytes + size_bytes > user.storage_quota_bytes:
remaining = user.storage_quota_bytes - user.storage_used_bytes
raise HTTPException(
status_code=413,
detail=f"Storage quota exceeded. "
f"Available: {remaining / 1024 / 1024:.2f} MB, "
f"Required: {size_bytes / 1024 / 1024:.2f} MB"
)
# ... existing code ...
async def finalize_upload(
self,
user_id: str,
asset_id: str,
etag: Optional[str] = None,
sha256: Optional[str] = None,
) -> Asset:
# ... existing code ...
# Обновить использованное место
user = await self.user_repo.get_by_id(user_id)
user.storage_used_bytes += asset.size_bytes
await self.session.commit()
return asset
async def soft_delete_asset(
self,
user_id: str,
asset_id: str,
) -> Asset:
# ... existing code ...
# НЕ освобождать место при soft delete
# Место освободится при purge
return asset
async def purge_asset(
self,
user_id: str,
asset_id: str,
) -> None:
"""Безвозвратно удалить asset"""
asset = await self.asset_repo.get_by_id(asset_id)
if not asset or asset.user_id != user_id:
raise HTTPException(status_code=404, detail="Asset not found")
# Удалить из S3
await self.s3_client.delete_object(
self.config.media_bucket,
asset.storage_key_original
)
if asset.storage_key_thumb:
await self.s3_client.delete_object(
self.config.media_bucket,
asset.storage_key_thumb
)
# Освободить место в квоте
user = await self.user_repo.get_by_id(user_id)
user.storage_used_bytes = max(0, user.storage_used_bytes - asset.size_bytes)
# Удалить из БД
await self.asset_repo.hard_delete(asset)
await self.session.commit()
# Endpoint для статистики
@router.get("/users/me/storage", response_model=StorageStatsResponse)
async def get_storage_stats(
current_user: CurrentUser,
):
"""Получить статистику использования хранилища"""
return StorageStatsResponse(
quota_bytes=current_user.storage_quota_bytes,
used_bytes=current_user.storage_used_bytes,
available_bytes=current_user.storage_quota_bytes - current_user.storage_used_bytes,
percentage_used=round(
(current_user.storage_used_bytes / current_user.storage_quota_bytes) * 100,
2
),
)
7.3 EXIF Metadata Extraction
Приоритет: СРЕДНЯЯ 🟡
Реализация:
pip install pillow pillow-heif # Для изображений
pip install ffmpeg-python # Для видео
# В tasks/thumbnail_tasks.py
from PIL import Image
from PIL.ExifTags import TAGS
import ffmpeg
def extract_image_metadata(file_path: str) -> dict:
"""Извлечь EXIF из изображения"""
try:
with Image.open(file_path) as img:
exif_data = img._getexif()
if not exif_data:
return {}
metadata = {}
for tag_id, value in exif_data.items():
tag = TAGS.get(tag_id, tag_id)
metadata[tag] = value
return {
"width": img.width,
"height": img.height,
"captured_at": metadata.get("DateTimeOriginal"),
"camera_make": metadata.get("Make"),
"camera_model": metadata.get("Model"),
"gps_latitude": metadata.get("GPSLatitude"),
"gps_longitude": metadata.get("GPSLongitude"),
}
except Exception as e:
logger.error(f"EXIF extraction failed: {e}")
return {}
def extract_video_metadata(file_path: str) -> dict:
"""Извлечь метаданные из видео"""
try:
probe = ffmpeg.probe(file_path)
video_stream = next(
s for s in probe['streams'] if s['codec_type'] == 'video'
)
return {
"width": int(video_stream.get('width', 0)),
"height": int(video_stream.get('height', 0)),
"duration_sec": float(probe['format'].get('duration', 0)),
"codec": video_stream.get('codec_name'),
"bitrate": int(probe['format'].get('bit_rate', 0)),
}
except Exception as e:
logger.error(f"Video metadata extraction failed: {e}")
return {}
@celery_app.task
def generate_thumbnail_and_extract_metadata(asset_id: str):
"""Генерировать thumbnail И извлечь metadata"""
# ... existing thumbnail generation ...
# Извлечь metadata
if asset.type == AssetType.PHOTO:
metadata = extract_image_metadata(temp_file_path)
elif asset.type == AssetType.VIDEO:
metadata = extract_video_metadata(temp_file_path)
# Обновить asset
if metadata.get("width"):
asset.width = metadata["width"]
if metadata.get("height"):
asset.height = metadata["height"]
if metadata.get("captured_at"):
asset.captured_at = parse_datetime(metadata["captured_at"])
if metadata.get("duration_sec"):
asset.duration_sec = metadata["duration_sec"]
db.commit()
7.4 Asset Search
Приоритет: ВЫСОКИЙ 🔴
Простая реализация с PostgreSQL full-text search:
# В asset_repository.py
async def search_assets(
self,
user_id: str,
query: str,
limit: int = 50,
) -> list[Asset]:
"""Поиск assets по имени файла"""
search_query = (
select(Asset)
.where(Asset.user_id == user_id)
.where(Asset.deleted_at.is_(None))
.where(Asset.status == AssetStatus.READY)
.where(Asset.original_filename.ilike(f"%{query}%"))
.order_by(Asset.created_at.desc())
.limit(limit)
)
result = await self.session.execute(search_query)
return list(result.scalars().all())
# Endpoint
@router.get("/assets/search", response_model=AssetListResponse)
async def search_assets(
q: str = Query(..., min_length=1),
current_user: CurrentUser,
session: DatabaseSession,
s3_client: S3ClientDep,
limit: int = Query(50, ge=1, le=100),
):
"""Поиск файлов по имени"""
asset_service = AssetService(session, s3_client)
assets = await asset_service.search_assets(
user_id=current_user.id,
query=q,
limit=limit,
)
return AssetListResponse(
items=assets,
next_cursor=None,
has_more=False,
)
7.5 Database Backup Strategy
Приоритет: КРИТИЧЕСКИЙ 🔴
Для SQLite:
#!/bin/bash
# backup_db.sh
BACKUP_DIR="/backups"
DB_PATH="/app/data/app.db"
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
BACKUP_FILE="$BACKUP_DIR/backup_$TIMESTAMP.db"
# Создать backup
sqlite3 $DB_PATH ".backup '$BACKUP_FILE'"
# Сжать
gzip $BACKUP_FILE
# Загрузить в S3
aws s3 cp "${BACKUP_FILE}.gz" "s3://your-backup-bucket/database/"
# Удалить старые локальные backups (старше 7 дней)
find $BACKUP_DIR -name "backup_*.db.gz" -mtime +7 -delete
# Удалить старые S3 backups (старше 30 дней)
aws s3 ls "s3://your-backup-bucket/database/" | \
awk '{print $4}' | \
while read file; do
# ... deletion logic
done
Cron job:
# Каждые 6 часов
0 */6 * * * /app/scripts/backup_db.sh >> /var/log/backup.log 2>&1
Для PostgreSQL:
#!/bin/bash
# pg_backup.sh
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
BACKUP_FILE="backup_$TIMESTAMP.sql.gz"
# pg_dump с сжатием
pg_dump -h $DB_HOST -U $DB_USER -d $DB_NAME | gzip > "/tmp/$BACKUP_FILE"
# Загрузить в S3
aws s3 cp "/tmp/$BACKUP_FILE" "s3://your-backup-bucket/database/"
# Cleanup
rm "/tmp/$BACKUP_FILE"
🧪 8. ТЕСТИРОВАНИЕ
8.1 Текущее покрытие: <10% ❌
Найдено только: backend/tests/test_security.py
Что КРИТИЧЕСКИ нужно:
- Unit тесты для services:
# tests/unit/services/test_asset_service.py
import pytest
from unittest.mock import AsyncMock, MagicMock
@pytest.mark.asyncio
async def test_create_upload_exceeds_quota():
"""Тест: загрузка превышает квоту"""
user = User(
id="user1",
storage_quota_bytes=1000,
storage_used_bytes=900,
)
asset_service = AssetService(mock_session, mock_s3)
with pytest.raises(HTTPException) as exc:
await asset_service.create_upload(
user_id=user.id,
original_filename="large.jpg",
content_type="image/jpeg",
size_bytes=200, # 900 + 200 > 1000
)
assert exc.value.status_code == 413
assert "quota exceeded" in exc.value.detail.lower()
- Integration тесты для API:
# tests/integration/test_auth_api.py
from httpx import AsyncClient
@pytest.mark.asyncio
async def test_login_rate_limiting(client: AsyncClient):
"""Тест: rate limiting блокирует brute force"""
# 5 неудачных попыток
for i in range(5):
response = await client.post("/api/v1/auth/login", json={
"email": "test@example.com",
"password": "wrong"
})
assert response.status_code == 401
# 6-я попытка должна быть заблокирована
response = await client.post("/api/v1/auth/login", json={
"email": "test@example.com",
"password": "wrong"
})
assert response.status_code == 429
- Security тесты:
# tests/security/test_path_traversal.py
@pytest.mark.asyncio
async def test_filename_sanitization():
"""Тест: path traversal защита"""
malicious_filenames = [
"../../../etc/passwd",
"..\\..\\..\\windows\\system32",
"test\x00.jpg.exe",
"folder/../../../secret.txt",
]
for filename in malicious_filenames:
sanitized = sanitize_filename(filename)
# Проверить, что путь не содержит separators
assert "/" not in sanitized
assert "\\" not in sanitized
assert "\x00" not in sanitized
assert ".." not in sanitized
- Load тесты:
# tests/load/test_upload_performance.py
from locust import HttpUser, task, between
class UploadUser(HttpUser):
wait_time = between(1, 3)
@task
def upload_file(self):
# Симулировать загрузку файла
files = {'file': ('test.jpg', b'fake image data', 'image/jpeg')}
self.client.post("/api/v1/uploads/create", files=files)
Цель: Достичь минимум 70% code coverage
📊 9. ДОПОЛНИТЕЛЬНЫЕ РЕКОМЕНДАЦИИ
9.1 Frontend Type Safety
Проблема: Много any типов в frontend/src/services/api.ts
Решение:
// Заменить все any на конкретные типы
export interface Folder {
id: string;
user_id: string;
name: string;
parent_folder_id: string | null;
created_at: string;
updated_at: string;
}
export interface FolderListResponse {
items: Folder[];
}
export interface FolderBreadcrumb {
id: string;
name: string;
}
// В api.ts
async listFolders(
parentFolderId?: string | null,
all: boolean = false
): Promise<FolderListResponse> {
// ...
}
async getFolder(folderId: string): Promise<Folder> {
// ...
}
async getFolderBreadcrumbs(folderId: string): Promise<FolderBreadcrumb[]> {
// ...
}
9.2 Logging Strategy
Добавить structured logging:
# В config.py
import sys
from loguru import logger
# Настроить loguru
logger.remove() # Удалить дефолтный handler
if settings.APP_ENV == "prod":
# Production: JSON формат для парсинга
logger.add(
sys.stdout,
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} | {message}",
level="INFO",
serialize=True, # JSON output
)
else:
# Development: красивый формат
logger.add(
sys.stdout,
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
level="DEBUG",
colorize=True,
)
# Логировать в файл
logger.add(
"logs/app.log",
rotation="100 MB",
retention="30 days",
compression="zip",
level="INFO",
)
# Middleware для логирования всех requests
from starlette.middleware.base import BaseHTTPMiddleware
import time
class RequestLoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
start_time = time.time()
# Логировать request
logger.info(
f"Request started",
extra={
"method": request.method,
"path": request.url.path,
"client": request.client.host if request.client else None,
}
)
try:
response = await call_next(request)
# Логировать response
duration = time.time() - start_time
logger.info(
f"Request completed",
extra={
"method": request.method,
"path": request.url.path,
"status_code": response.status_code,
"duration_ms": round(duration * 1000, 2),
}
)
return response
except Exception as e:
duration = time.time() - start_time
logger.error(
f"Request failed",
extra={
"method": request.method,
"path": request.url.path,
"error": str(e),
"duration_ms": round(duration * 1000, 2),
}
)
raise
app.add_middleware(RequestLoggingMiddleware)
9.3 Environment Validation
Проверять критичные env vars при старте:
# В main.py
from app.infra.config import settings
def validate_production_config():
"""Проверить конфигурацию для продакшена"""
errors = []
if settings.APP_ENV == "prod":
# Проверить JWT secret
if settings.JWT_SECRET in ["your-secret-key-change-this-in-production", "changeme"]:
errors.append("JWT_SECRET must be changed in production")
# Проверить S3 конфигурацию
if not settings.S3_ENDPOINT_URL or "localhost" in settings.S3_ENDPOINT_URL:
errors.append("S3_ENDPOINT_URL must be set to production S3")
# Проверить database
if "sqlite" in settings.DATABASE_URL.lower():
logger.warning("Using SQLite in production - consider PostgreSQL")
# Проверить CORS
if "*" in settings.CORS_ORIGINS:
errors.append("CORS_ORIGINS should not contain '*' in production")
if errors:
logger.error("Production configuration errors:")
for error in errors:
logger.error(f" - {error}")
raise ValueError("Invalid production configuration")
@app.on_event("startup")
async def startup():
validate_production_config()
logger.info(f"Application started in {settings.APP_ENV} mode")
🎯 10. ROADMAP ПО ПРИОРИТЕТАМ
ФАЗА 1: Критическая безопасность (1-2 недели) 🔴
Блокирует продакшен
- ✅ Implement rate limiting (slowapi)
- ✅ Add token revocation mechanism (Redis blacklist)
- ✅ Implement refresh token rotation
- ✅ Add storage quota management
- ✅ Enable S3 server-side encryption
- ✅ Fix memory exhaustion in file uploads (streaming)
- ✅ Add strong password validation
- ✅ Implement account lockout after failed attempts
- ✅ Reduce error message verbosity
- ✅ Add security headers middleware
Результат: Приложение защищено от базовых атак
ФАЗА 2: Core Features (2-3 недели) 🟡
Необходимо для MVP
- ✅ Implement soft delete / trash functionality (SPEC REQUIREMENT)
- ✅ Add password reset flow
- ✅ Implement multipart upload для больших файлов
- ✅ Add asset search by filename
- ✅ Extract EXIF metadata (captured_at, dimensions)
- ✅ Add database indexes для производительности
- ✅ Fix ZIP creation (stream instead of memory)
- ✅ Add foreign key constraints
Результат: Полнофункциональное MVP
ФАЗА 3: Production Readiness (2-3 недели) 🟢
DevOps и мониторинг
- ✅ Comprehensive test suite (70%+ coverage)
- ✅ Database backup automation
- ✅ Monitoring & alerting setup (Prometheus + Grafana)
- ✅ Error tracking (Sentry integration)
- ✅ CI/CD pipeline (GitHub Actions)
- ✅ Environment configuration validation
- ✅ Structured logging (loguru + JSON)
- ✅ Documentation completion
- ✅ Load testing (Locust)
- ✅ Security penetration testing
Результат: Готово к продакшену
ФАЗА 4: Enhancements (постоянно) 💚
Улучшения UX
- Video transcoding & adaptive streaming (HLS)
- Albums feature
- Tags system
- Advanced search & filters (date range, type, size)
- Share analytics & permissions
- Image optimization (WebP conversion)
- Duplicate detection (SHA256 deduplication)
- Two-factor authentication (TOTP)
- Email verification
- Shared albums (collaborative)
Результат: Feature-rich продукт
📋 11. SUMMARY: КРИТИЧЕСКИЕ ДЕЙСТВИЯ
Что сделать ПРЯМО СЕЙЧАС перед продакшеном:
- Установить rate limiting - защита от brute force
- Реализовать soft delete - требование спецификации
- Добавить storage quota - предотвратить злоупотребление
- Включить S3 encryption - защита данных в покое
- Стримить файлы - предотвратить OOM
- Добавить тесты - минимум 50% coverage
- Настроить backups - защита от потери данных
- Валидировать env vars - предотвратить misconfig
Оценка времени:
- Минимум для продакшена: 4 недели (Фаза 1 + критичное из Фазы 2)
- Полное MVP: 6-8 недель (Фазы 1-3)
- Production-grade: 10-12 недель (все фазы)
🎓 ЗАКЛЮЧЕНИЕ
Проект ITCloud демонстрирует хорошую архитектурную основу и правильные практики в некоторых областях:
✅ Что сделано хорошо:
- Clean Architecture с четким разделением слоев
- Безопасное хеширование паролей (Argon2)
- Защита от path traversal
- JWT аутентификация
- Pre-signed URLs для S3
- Async/await паттерны
- Docker setup
❌ Критические проблемы:
- Отсутствует rate limiting (BRUTE FORCE VULNERABLE)
- Нет soft delete (SPEC VIOLATION)
- Нет storage quota (ABUSE VULNERABLE)
- Memory exhaustion риски
- Минимальное тестирование
- Нет token revocation
ИТОГОВАЯ ОЦЕНКА: 6/10 (хорошая база, но не готов к продакшену)
РЕКОМЕНДАЦИЯ: Выполнить Фазу 1 и критичные элементы Фазы 2 перед любым публичным запуском.
Дата аудита: 2026-01-05 Версия документа: 1.0 Следующий аудит: После внедрения рекомендаций Фазы 1