itcloud/SECURITY_AND_IMPROVEMENTS_A...

2361 lines
72 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# ITCloud - Полный Аудит Безопасности и Рекомендации
**Дата:** 2026-01-05
**Аудитор:** Claude Code (Senior Security & Architecture Review)
**Версия:** 1.0
---
## 📋 EXECUTIVE SUMMARY
Проведен комплексный аудит проекта ITCloud (облачное хранилище фото/видео). Проект демонстрирует хорошую архитектуру (Clean Architecture) и некоторые правильные практики безопасности, но имеет **критические уязвимости** и **отсутствуют ключевые функции**.
### Статус готовности к продакшену: ❌ НЕ ГОТОВ
**Критические блокеры:**
1. ❌ Отсутствует rate limiting (защита от brute force)
2. ❌ Нет soft delete/корзины (указано в требованиях)
3. ❌ Отсутствует управление квотами хранилища
4. ❌ Нет механизма отзыва токенов
5. ❌ Отсутствует стратегия бэкапов БД
6. ❌ Минимальное покрытие тестами (<10%)
**Оценка времени до готовности к продакшену: 6-8 недель**
---
## 🔴 1. КРИТИЧЕСКИЕ УЯЗВИМОСТИ БЕЗОПАСНОСТИ
### 1.1 Отсутствует Rate Limiting
**Серьезность: КРИТИЧЕСКАЯ** 🔴
**Расположение:** Все API endpoints
**Файлы:** `backend/src/app/main.py`
**Проблема:**
- Нет ограничений на количество запросов
- Атакующий может делать неограниченное количество попыток логина
- Возможна DoS атака через перегрузку API
**Риски:**
- Brute force атаки на пароли пользователей
- Credential stuffing атаки
- Исчерпание ресурсов сервера
- Спам загрузок файлов
**Решение:**
```python
# Установить: pip install slowapi
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# Для каждого endpoint:
@router.post("/login")
@limiter.limit("5/minute") # 5 попыток в минуту
async def login(...):
...
@router.post("/uploads/create")
@limiter.limit("100/hour") # 100 загрузок в час
async def create_upload(...):
...
```
**Рекомендуемые лимиты:**
- `/auth/login`: 5 запросов/минуту
- `/auth/register`: 3 запроса/час
- `/uploads/create`: 100 запросов/час
- Остальные endpoints: 1000 запросов/час
---
### 1.2 JWT Secret по умолчанию слабый
**Серьезность: КРИТИЧЕСКАЯ** 🔴
**Расположение:** `backend/.env.example:18`
**Проблема:**
```env
JWT_SECRET=your-secret-key-change-this-in-production
```
Если пользователь задеплоит с дефолтным секретом, токены можно подделать.
**Решение:**
1. Добавить валидацию при старте:
```python
# В config.py
if settings.APP_ENV == "prod" and settings.JWT_SECRET in [
"your-secret-key-change-this-in-production",
"changeme",
"secret",
]:
raise ValueError("КРИТИЧЕСКАЯ ОШИБКА: Используется слабый JWT_SECRET в продакшене!")
```
2. Генерировать случайный секрет при деплое:
```python
import secrets
print(secrets.token_urlsafe(64))
```
---
### 1.3 Отсутствует ограничение размера при чтении файла в память
**Серьезность: ВЫСОКАЯ** 🔴
**Расположение:** `backend/src/app/services/asset_service.py:138-182`
**Проблема:**
```python
# Строка 151
file_data = await file.read() # Читает ВЕСЬ файл в память!
```
Атакующий может загрузить огромный файл и вызвать OOM (Out Of Memory).
**Решение:**
Стримить файл чанками в S3:
```python
async def upload_file_to_s3(
self,
user_id: str,
asset_id: str,
file: UploadFile,
) -> Asset:
asset = await self.asset_repo.get_by_id(asset_id)
if not asset or asset.user_id != user_id:
raise HTTPException(status_code=404, detail="Asset not found")
# Проверить размер ПЕРЕД чтением
if file.size and file.size > self.config.max_upload_size_bytes:
raise HTTPException(status_code=413, detail="File too large")
# Стрим в S3 чанками
try:
await self.s3_client.upload_fileobj_streaming(
file.file, # file-like object
self.config.media_bucket,
asset.storage_key_original,
content_type=asset.content_type,
)
except Exception as e:
logger.error(f"S3 upload failed: {e}")
raise HTTPException(status_code=500, detail="Upload failed")
```
Добавить в `S3Client`:
```python
async def upload_fileobj_streaming(
self,
fileobj,
bucket: str,
key: str,
content_type: str,
chunk_size: int = 8 * 1024 * 1024, # 8MB chunks
):
"""Stream file to S3 in chunks"""
try:
await asyncio.to_thread(
self.client.upload_fileobj,
fileobj,
bucket,
key,
ExtraArgs={
'ContentType': content_type,
'ServerSideEncryption': 'AES256', # Важно!
},
Config=boto3.s3.transfer.TransferConfig(
multipart_threshold=chunk_size,
multipart_chunksize=chunk_size,
),
)
except ClientError as e:
logger.error(f"S3 upload error: {e}")
raise
```
---
### 1.4 Отсутствует CSRF защита
**Серьезность: ВЫСОКАЯ** 🟡 (частично смягчено Bearer токенами)
**Расположение:** `backend/src/app/main.py`
**Текущее состояние:**
- Используются Bearer токены в заголовках CSRF не критичен
- НО: если в будущем перейдете на cookie-based auth станет критично
**Рекомендация на будущее:**
```python
from fastapi_csrf_protect import CsrfProtect
@app.post("/api/v1/assets/{asset_id}/delete")
async def delete_asset(
asset_id: str,
csrf_protect: CsrfProtect = Depends(),
):
await csrf_protect.validate_csrf(request)
# ...
```
---
### 1.5 Небезопасная генерация share токенов
**Серьезность: СРЕДНЯЯ** 🟡
**Расположение:** `backend/src/app/repositories/share_repository.py:25-27`
**Проблема:**
```python
token = secrets.token_urlsafe(32) # Хорошо!
# НО нет проверки на коллизии
```
**Решение:**
```python
async def create(
self,
owner_user_id: str,
asset_id: Optional[str] = None,
album_id: Optional[str] = None,
expires_at: Optional[datetime] = None,
password_hash: Optional[str] = None,
) -> Share:
# Генерировать с проверкой уникальности
max_attempts = 5
for attempt in range(max_attempts):
token = secrets.token_urlsafe(32)
# Проверить, что токен не существует
existing = await self.session.execute(
select(Share).where(Share.token == token)
)
if not existing.scalar_one_or_none():
break
else:
raise RuntimeError("Failed to generate unique share token")
share = Share(
owner_user_id=owner_user_id,
asset_id=asset_id,
album_id=album_id,
token=token,
expires_at=expires_at,
password_hash=password_hash,
)
self.session.add(share)
await self.session.flush()
await self.session.refresh(share)
return share
```
---
## 🟡 2. ПРОБЛЕМЫ АУТЕНТИФИКАЦИИ И АВТОРИЗАЦИИ
### 2.1 Отсутствует механизм отзыва токенов
**Серьезность: ВЫСОКАЯ** 🔴
**Расположение:** `backend/src/app/infra/security.py`
**Проблема:**
- JWT токены включают `jti` (token ID), но нет механизма отзыва
- Если токен скомпрометирован, он остается валидным до истечения срока (15 минут для access, 14 дней для refresh)
- Пользователь не может "выйти со всех устройств"
**Решение с Redis:**
```python
# В infra/redis_client.py
from redis.asyncio import Redis
class TokenBlacklist:
def __init__(self, redis: Redis):
self.redis = redis
async def revoke_token(self, jti: str, ttl_seconds: int):
"""Добавить токен в blacklist"""
await self.redis.setex(f"blacklist:{jti}", ttl_seconds, "1")
async def is_revoked(self, jti: str) -> bool:
"""Проверить, отозван ли токен"""
return await self.redis.exists(f"blacklist:{jti}") > 0
# В security.py
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_db),
blacklist: TokenBlacklist = Depends(get_token_blacklist),
) -> User:
try:
payload = jwt.decode(token, settings.JWT_SECRET, algorithms=["HS256"])
jti: str = payload.get("jti")
# Проверить blacklist
if await blacklist.is_revoked(jti):
raise HTTPException(status_code=401, detail="Token has been revoked")
# ... остальная логика
```
**Добавить endpoint:**
```python
@router.post("/auth/logout")
async def logout(
current_user: CurrentUser,
token: str = Depends(oauth2_scheme),
blacklist: TokenBlacklist = Depends(get_token_blacklist),
):
"""Отозвать текущий токен"""
payload = jwt.decode(token, settings.JWT_SECRET, algorithms=["HS256"])
jti = payload.get("jti")
exp = payload.get("exp")
ttl = exp - int(datetime.utcnow().timestamp())
await blacklist.revoke_token(jti, ttl)
return {"message": "Logged out successfully"}
```
---
### 2.2 Нет refresh token rotation
**Серьезность: СРЕДНЯЯ** 🟡
**Расположение:** `backend/src/app/api/v1/auth.py`
**Проблема:**
- Нет endpoint для обновления access token через refresh token
- Пользователи должны логиниться заново каждые 15 минут
**Решение:**
```python
@router.post("/auth/refresh", response_model=AuthTokens)
async def refresh_token(
refresh_token: str = Body(..., embed=True),
db: AsyncSession = Depends(get_db),
blacklist: TokenBlacklist = Depends(get_token_blacklist),
):
"""Обновить access token используя refresh token"""
try:
# Декодировать refresh token
payload = jwt.decode(
refresh_token,
settings.JWT_SECRET,
algorithms=["HS256"]
)
# Проверить тип токена
if payload.get("type") != "refresh":
raise HTTPException(status_code=401, detail="Invalid token type")
# Проверить blacklist
jti = payload.get("jti")
if await blacklist.is_revoked(jti):
raise HTTPException(status_code=401, detail="Token has been revoked")
user_id = payload.get("sub")
# Получить пользователя
user_repo = UserRepository(db)
user = await user_repo.get_by_id(user_id)
if not user or not user.is_active:
raise HTTPException(status_code=401, detail="User not found")
# ВАЖНО: Отозвать старый refresh token (rotation)
exp = payload.get("exp")
ttl = exp - int(datetime.utcnow().timestamp())
await blacklist.revoke_token(jti, ttl)
# Сгенерировать новые токены
auth_service = AuthService(db)
tokens = auth_service.create_tokens(user)
return tokens
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token expired")
except jwt.JWTError:
raise HTTPException(status_code=401, detail="Invalid token")
```
**Frontend:**
```typescript
// В api.ts
async refreshAccessToken(): Promise<AuthTokens> {
const refreshToken = localStorage.getItem('refresh_token');
if (!refreshToken) {
throw new Error('No refresh token');
}
const { data } = await this.client.post('/auth/refresh', {
refresh_token: refreshToken,
});
localStorage.setItem('access_token', data.access_token);
localStorage.setItem('refresh_token', data.refresh_token);
return data;
}
// Добавить в interceptor
this.client.interceptors.response.use(
(response) => response,
async (error) => {
const originalRequest = error.config;
if (error.response?.status === 401 && !originalRequest._retry) {
originalRequest._retry = true;
try {
await this.refreshAccessToken();
return this.client(originalRequest);
} catch (refreshError) {
// Refresh failed, redirect to login
localStorage.removeItem('access_token');
localStorage.removeItem('refresh_token');
if (!['/login', '/register'].includes(window.location.pathname)) {
window.location.href = '/login';
}
return Promise.reject(refreshError);
}
}
return Promise.reject(error);
}
);
```
---
### 2.3 Слабая валидация паролей
**Серьезность: СРЕДНЯЯ** 🟡
**Расположение:** `backend/src/app/api/schemas.py:16`
**Текущее состояние:**
```python
password: str = Field(min_length=8)
```
Можно установить пароль "12345678"
**Решение:**
```python
import re
from pydantic import field_validator
class UserRegisterRequest(BaseModel):
email: EmailStr
password: str = Field(min_length=8, max_length=128)
@field_validator("password")
@classmethod
def validate_password_strength(cls, v: str) -> str:
"""Проверка сложности пароля"""
if len(v) < 8:
raise ValueError("Password must be at least 8 characters")
# Проверки
has_upper = re.search(r'[A-Z]', v)
has_lower = re.search(r'[a-z]', v)
has_digit = re.search(r'\d', v)
has_special = re.search(r'[!@#$%^&*(),.?":{}|<>]', v)
checks_passed = sum([
bool(has_upper),
bool(has_lower),
bool(has_digit),
bool(has_special),
])
if checks_passed < 3:
raise ValueError(
"Password must contain at least 3 of: "
"uppercase letter, lowercase letter, digit, special character"
)
# Проверка на распространенные пароли
common_passwords = [
"password", "12345678", "qwerty123", "admin123"
]
if v.lower() in common_passwords:
raise ValueError("This password is too common")
return v
```
---
### 2.4 Нет блокировки аккаунта после неудачных попыток
**Серьезность: СРЕДНЯЯ** 🟡
**Расположение:** `backend/src/app/services/auth_service.py:55-84`
**Проблема:**
Можно делать бесконечные попытки логина (если нет rate limiting)
**Решение с Redis:**
```python
class LoginAttemptTracker:
def __init__(self, redis: Redis):
self.redis = redis
self.max_attempts = 5
self.lockout_duration = 900 # 15 минут
async def record_failed_attempt(self, email: str):
"""Записать неудачную попытку"""
key = f"login_attempts:{email}"
attempts = await self.redis.incr(key)
if attempts == 1:
# Установить TTL на первую попытку
await self.redis.expire(key, 3600) # 1 час
if attempts >= self.max_attempts:
# Заблокировать аккаунт
await self.redis.setex(
f"account_locked:{email}",
self.lockout_duration,
"1"
)
async def clear_attempts(self, email: str):
"""Очистить счетчик после успешного логина"""
await self.redis.delete(f"login_attempts:{email}")
async def is_locked(self, email: str) -> bool:
"""Проверить, заблокирован ли аккаунт"""
return await self.redis.exists(f"account_locked:{email}") > 0
async def get_lockout_remaining(self, email: str) -> int:
"""Получить оставшееся время блокировки (секунды)"""
return await self.redis.ttl(f"account_locked:{email}")
# В auth.py
@router.post("/login", response_model=AuthResponse)
async def login(
data: UserLoginRequest,
session: DatabaseSession,
tracker: LoginAttemptTracker = Depends(get_login_tracker),
):
# Проверить блокировку
if await tracker.is_locked(data.email):
remaining = await tracker.get_lockout_remaining(data.email)
raise HTTPException(
status_code=429,
detail=f"Account locked due to too many failed attempts. "
f"Try again in {remaining // 60} minutes."
)
auth_service = AuthService(session)
try:
user, tokens = await auth_service.login(data.email, data.password)
# Успешный логин - очистить счетчик
await tracker.clear_attempts(data.email)
return AuthResponse(user=user, tokens=tokens)
except HTTPException as e:
if e.status_code == 401:
# Неудачная попытка
await tracker.record_failed_attempt(data.email)
raise
```
---
## 🔍 3. ПРОБЛЕМЫ ВАЛИДАЦИИ ВХОДНЫХ ДАННЫХ
### 3.1 Недостаточная валидация Content-Type
**Серьезность: СРЕДНЯЯ** 🟡
**Расположение:** `backend/src/app/api/schemas.py:85-91`
**Текущее состояние:**
```python
@field_validator("content_type")
@classmethod
def validate_content_type(cls, v: str) -> str:
if not (v.startswith("image/") or v.startswith("video/")):
raise ValueError("Only image/* and video/* content types are supported")
return v
```
Принимает ЛЮБОЙ image/* или video/* тип, включая опасные (SVG с JS, вредоносные кодеки)
**Решение - whitelist:**
```python
ALLOWED_IMAGE_TYPES = {
"image/jpeg",
"image/jpg",
"image/png",
"image/gif",
"image/webp",
"image/heic",
"image/heif",
}
ALLOWED_VIDEO_TYPES = {
"video/mp4",
"video/mpeg",
"video/quicktime", # .mov
"video/x-msvideo", # .avi
"video/x-matroska", # .mkv
"video/webm",
}
@field_validator("content_type")
@classmethod
def validate_content_type(cls, v: str) -> str:
v = v.lower().strip()
if v not in ALLOWED_IMAGE_TYPES and v not in ALLOWED_VIDEO_TYPES:
raise ValueError(
f"Content type '{v}' not supported. "
f"Allowed: {', '.join(ALLOWED_IMAGE_TYPES | ALLOWED_VIDEO_TYPES)}"
)
return v
```
---
### 3.2 Отсутствует проверка "магических байтов"
**Серьезность: СРЕДНЯЯ** 🟡
**Расположение:** Upload flow
**Проблема:**
Полагаемся на content-type от клиента. Вредоносный файл может притвориться изображением.
**Решение с python-magic:**
```bash
pip install python-magic-bin # Windows
pip install python-magic # Linux/Mac
```
```python
import magic
async def verify_file_type(file: UploadFile, expected_type: str) -> bool:
"""Проверить реальный тип файла по магическим байтам"""
# Прочитать первые 2048 байт
header = await file.read(2048)
await file.seek(0) # Вернуться в начало
# Определить MIME type
mime = magic.from_buffer(header, mime=True)
# Проверить соответствие
if expected_type.startswith("image/"):
return mime in ALLOWED_IMAGE_TYPES
elif expected_type.startswith("video/"):
return mime in ALLOWED_VIDEO_TYPES
return False
# В asset_service.py
async def create_upload(
self,
user_id: str,
original_filename: str,
content_type: str,
size_bytes: int,
folder_id: Optional[str] = None,
) -> tuple[Asset, dict]:
# ... существующий код ...
# Добавить в метаданные для проверки при finalize
asset.metadata = {
"expected_content_type": content_type,
"needs_verification": True,
}
async def finalize_upload(
self,
user_id: str,
asset_id: str,
etag: Optional[str] = None,
sha256: Optional[str] = None,
) -> Asset:
# ... существующий код ...
# Проверить магические байты если требуется
if asset.metadata.get("needs_verification"):
# Загрузить первые байты из S3
try:
response = await self.s3_client.get_object_range(
self.config.media_bucket,
asset.storage_key_original,
bytes_range=(0, 2047),
)
header = response['Body'].read()
mime = magic.from_buffer(header, mime=True)
expected = asset.metadata.get("expected_content_type")
# Проверить соответствие
if (expected.startswith("image/") and mime not in ALLOWED_IMAGE_TYPES) or \
(expected.startswith("video/") and mime not in ALLOWED_VIDEO_TYPES):
# Удалить файл из S3
await self.s3_client.delete_object(
self.config.media_bucket,
asset.storage_key_original
)
asset.status = AssetStatus.FAILED
await self.asset_repo.update(asset)
raise HTTPException(
status_code=400,
detail="File type verification failed"
)
except Exception as e:
logger.warning(f"File verification failed: {e}")
```
---
### 3.3 Защита от Path Traversal реализована ✅
**Серьезность: N/A (ХОРОШО)**
**Расположение:** `backend/src/app/services/asset_service.py:23-50`
**Статус:** Правильно реализовано:
- Используется `os.path.basename()`
- Удаляются path separators и null bytes
- Ограничение длины имени файла
Это **правильная** реализация. Оставить как есть.
---
## 🗄️ 4. БЕЗОПАСНОСТЬ S3 ХРАНИЛИЩА
### 4.1 Отсутствует шифрование S3 объектов
**Серьезность: СРЕДНЯЯ-ВЫСОКАЯ** 🟡
**Расположение:** `backend/src/app/infra/s3_client.py`
**Проблема:**
Файлы хранятся в S3 без шифрования на стороне сервера.
**Решение:**
```python
# Во ВСЕХ методах загрузки добавить ServerSideEncryption
async def upload_object(
self,
bucket: str,
key: str,
data: bytes,
content_type: str,
) -> None:
try:
await asyncio.to_thread(
self.client.put_object,
Bucket=bucket,
Key=key,
Body=data,
ContentType=content_type,
ServerSideEncryption='AES256', # ← ДОБАВИТЬ
)
except ClientError as e:
logger.error(f"S3 upload error: {e}")
raise
# Для presigned URLs тоже нужно добавить
def generate_presigned_post(
self,
bucket: str,
key: str,
content_type: str,
max_size_bytes: int,
expires_in: int = 600,
) -> dict:
try:
return self.client.generate_presigned_post(
Bucket=bucket,
Key=key,
Fields={
"Content-Type": content_type,
"x-amz-server-side-encryption": "AES256", # ← ДОБАВИТЬ
},
Conditions=[
{"Content-Type": content_type},
["content-length-range", 1, max_size_bytes],
{"x-amz-server-side-encryption": "AES256"}, # ← ДОБАВИТЬ
],
ExpiresIn=expires_in,
)
except ClientError as e:
logger.error(f"Presigned POST generation error: {e}")
raise
```
**Важно:** Также настроить bucket policy:
```json
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DenyUnencryptedObjectUploads",
"Effect": "Deny",
"Principal": "*",
"Action": "s3:PutObject",
"Resource": "arn:aws:s3:::your-bucket-name/*",
"Condition": {
"StringNotEquals": {
"s3:x-amz-server-side-encryption": "AES256"
}
}
}
]
}
```
---
### 4.2 Hardcoded bucket name для корзины
**Серьезность: НИЗКАЯ** 🟢
**Расположение:** `backend/src/app/infra/s3_client.py:28, :152`
**Проблема:**
```python
TRASH_BUCKET = "itcloud-trash" # Hardcoded
```
**Решение:**
```python
# В config.py
class Settings(BaseSettings):
# ... existing ...
MEDIA_BUCKET: str
TRASH_BUCKET: str = "itcloud-trash" # default, но можно переопределить
# В s3_client.py
def __init__(self, config: Settings):
self.config = config
# ... existing ...
self.trash_bucket = config.TRASH_BUCKET # Использовать из конфига
```
---
### 4.3 Слишком длинный TTL для pre-signed URLs
**Серьезность: НИЗКАЯ** 🟢
**Расположение:** `backend/src/app/infra/config.py:43`
**Текущее:**
```python
SIGNED_URL_TTL_SECONDS: int = 600 # 10 минут
```
**Рекомендация:**
Для продакшена уменьшить до 300 секунд (5 минут) для чувствительного контента:
```python
SIGNED_URL_TTL_SECONDS: int = Field(default=300, ge=60, le=3600)
```
---
## 🔐 5. ДОПОЛНИТЕЛЬНЫЕ РЕКОМЕНДАЦИИ ПО БЕЗОПАСНОСТИ
### 5.1 CORS Headers слишком permissive
**Серьезность: СРЕДНЯЯ** 🟡
**Расположение:** `backend/src/app/main.py:34-37`
**Проблема:**
```python
allow_headers=["*"], # Разрешены ВСЕ заголовки
expose_headers=["*"], # Экспортируются ВСЕ заголовки
```
**Решение:**
```python
app.add_middleware(
CORSMiddleware,
allow_origins=settings.CORS_ORIGINS.split(","),
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
# Whitelist конкретных заголовков
allow_headers=[
"Authorization",
"Content-Type",
"X-Requested-With",
"Accept",
],
# Экспортировать только нужные
expose_headers=[
"Content-Length",
"Content-Type",
"X-Total-Count",
],
max_age=3600,
)
```
---
### 5.2 Добавить Security Headers
**Серьезность: СРЕДНЯЯ** 🟡
**Расположение:** `backend/src/app/main.py`
**Добавить middleware для security headers:**
```python
from starlette.middleware.base import BaseHTTPMiddleware
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
response = await call_next(request)
# Защита от clickjacking
response.headers["X-Frame-Options"] = "DENY"
# Защита от XSS
response.headers["X-Content-Type-Options"] = "nosniff"
# Защита от XSS для старых браузеров
response.headers["X-XSS-Protection"] = "1; mode=block"
# Content Security Policy
response.headers["Content-Security-Policy"] = (
"default-src 'self'; "
"img-src 'self' data: https:; "
"script-src 'self'; "
"style-src 'self' 'unsafe-inline';"
)
# HSTS (если используется HTTPS)
if request.url.scheme == "https":
response.headers["Strict-Transport-Security"] = (
"max-age=31536000; includeSubDomains"
)
# Referrer Policy
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
# Permissions Policy
response.headers["Permissions-Policy"] = (
"geolocation=(), microphone=(), camera=()"
)
return response
app.add_middleware(SecurityHeadersMiddleware)
```
---
### 5.3 Токены в localStorage (Frontend)
**Серьезность: СРЕДНЯЯ** 🟡
**Расположение:** `frontend/src/services/api.ts:30-34, :62-64`
**Проблема:**
localStorage доступен для XSS атак.
**Альтернативы:**
1. **httpOnly cookies** для refresh token (лучше всего)
2. **Memory storage** для access token (но теряется при refresh страницы)
**Компромиссное решение:**
```typescript
// Хранить refresh token в httpOnly cookie (настроить на бэкенде)
// Access token держать в памяти
class TokenManager {
private accessToken: string | null = null;
setAccessToken(token: string) {
this.accessToken = token;
// Также можно в sessionStorage (лучше чем localStorage)
sessionStorage.setItem('access_token', token);
}
getAccessToken(): string | null {
if (this.accessToken) return this.accessToken;
return sessionStorage.getItem('access_token');
}
clearTokens() {
this.accessToken = null;
sessionStorage.removeItem('access_token');
// Refresh token будет удален через httpOnly cookie с бэкенда
}
}
```
---
## 🏗️ 6. АРХИТЕКТУРНЫЕ УЛУЧШЕНИЯ
### 6.1 Отсутствует Soft Delete для Assets ❌
**Серьезность: КРИТИЧЕСКАЯ** 🔴
**Расположение:** `backend/src/app/domain/models.py`
**Проблема:**
В спецификации (CLAUDE.md) указано:
> "Soft delete to trash with restore capability"
Но в модели Asset нет поля `deleted_at`:
```python
class Asset(Base):
__tablename__ = "assets"
# ... fields ...
# НЕТ: deleted_at: Mapped[Optional[datetime]] = mapped_column(DateTime)
```
**Решение:**
1. **Создать миграцию:**
```bash
cd backend
alembic revision -m "add_soft_delete_to_assets"
```
2. **Добавить в миграцию:**
```python
def upgrade() -> None:
op.add_column('assets', sa.Column('deleted_at', sa.DateTime(), nullable=True))
op.create_index('ix_assets_deleted_at', 'assets', ['deleted_at'])
def downgrade() -> None:
op.drop_index('ix_assets_deleted_at', table_name='assets')
op.drop_column('assets', 'deleted_at')
```
3. **Обновить модель:**
```python
from typing import Optional
from datetime import datetime
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy import DateTime
class Asset(Base):
__tablename__ = "assets"
# ... existing fields ...
deleted_at: Mapped[Optional[datetime]] = mapped_column(
DateTime, nullable=True, index=True, default=None
)
```
4. **Добавить методы в AssetRepository:**
```python
async def soft_delete(self, asset: Asset) -> Asset:
"""Мягкое удаление (в корзину)"""
asset.deleted_at = datetime.utcnow()
await self.session.flush()
await self.session.refresh(asset)
return asset
async def restore(self, asset: Asset) -> Asset:
"""Восстановить из корзины"""
asset.deleted_at = None
await self.session.flush()
await self.session.refresh(asset)
return asset
async def list_trash(
self,
user_id: str,
limit: int = 50,
cursor: Optional[str] = None,
) -> list[Asset]:
"""Список удаленных файлов"""
query = (
select(Asset)
.where(Asset.user_id == user_id)
.where(Asset.deleted_at.isnot(None))
.order_by(Asset.deleted_at.desc())
)
if cursor:
query = query.where(Asset.id < cursor)
query = query.limit(limit + 1)
result = await self.session.execute(query)
return list(result.scalars().all())
async def hard_delete(self, asset: Asset) -> None:
"""Полное удаление (безвозвратно)"""
await self.session.delete(asset)
await self.session.flush()
```
5. **Добавить endpoints:**
```python
# В assets.py
@router.get("/trash", response_model=AssetListResponse)
async def list_trash(
current_user: CurrentUser,
session: DatabaseSession,
s3_client: S3ClientDep,
limit: int = Query(50, ge=1, le=100),
cursor: Optional[str] = Query(None),
):
"""Список файлов в корзине"""
asset_service = AssetService(session, s3_client)
assets, next_cursor, has_more = await asset_service.list_trash(
user_id=current_user.id,
limit=limit,
cursor=cursor,
)
return AssetListResponse(
items=assets,
next_cursor=next_cursor,
has_more=has_more,
)
@router.post("/{asset_id}/restore", response_model=AssetResponse)
async def restore_asset(
asset_id: str,
current_user: CurrentUser,
session: DatabaseSession,
s3_client: S3ClientDep,
):
"""Восстановить файл из корзины"""
asset_service = AssetService(session, s3_client)
asset = await asset_service.restore_asset(
user_id=current_user.id,
asset_id=asset_id,
)
return asset
@router.delete("/{asset_id}/purge")
async def purge_asset(
asset_id: str,
current_user: CurrentUser,
session: DatabaseSession,
s3_client: S3ClientDep,
):
"""Удалить файл безвозвратно из корзины"""
asset_service = AssetService(session, s3_client)
await asset_service.purge_asset(
user_id=current_user.id,
asset_id=asset_id,
)
return {"message": "Asset permanently deleted"}
```
6. **Обновить существующий DELETE endpoint:**
```python
@router.delete("/{asset_id}")
async def delete_asset(
asset_id: str,
current_user: CurrentUser,
session: DatabaseSession,
s3_client: S3ClientDep,
):
"""Переместить файл в корзину (мягкое удаление)"""
asset_service = AssetService(session, s3_client)
await asset_service.soft_delete_asset(
user_id=current_user.id,
asset_id=asset_id,
)
return {"message": "Asset moved to trash"}
```
7. **Обновить list_assets чтобы НЕ показывать удаленные:**
```python
# В asset_repository.py
async def list_by_user(
self,
user_id: str,
folder_id: Optional[str] = None,
limit: int = 50,
cursor: Optional[str] = None,
) -> list[Asset]:
query = (
select(Asset)
.where(Asset.user_id == user_id)
.where(Asset.deleted_at.is_(None)) # ← ДОБАВИТЬ
.where(Asset.status == AssetStatus.READY)
)
# ... rest of logic
```
8. **Background job для очистки старых файлов:**
```python
# В tasks/cleanup_tasks.py
from datetime import datetime, timedelta
async def cleanup_old_trash():
"""Удалить файлы, которые в корзине >30 дней"""
async with get_db_session() as session:
thirty_days_ago = datetime.utcnow() - timedelta(days=30)
assets = await session.execute(
select(Asset).where(
Asset.deleted_at < thirty_days_ago
)
)
for asset in assets.scalars():
# Удалить из S3
await s3_client.delete_object(bucket, asset.storage_key_original)
if asset.storage_key_thumb:
await s3_client.delete_object(bucket, asset.storage_key_thumb)
# Удалить из БД
await session.delete(asset)
await session.commit()
```
---
### 6.2 Отсутствуют индексы в БД
**Серьезность: СРЕДНЯЯ** 🟡
**Расположение:** `backend/src/app/domain/models.py`
**Проблема:**
Медленные запросы при больших объемах данных.
**Решение - добавить составные индексы:**
```python
# В models.py
from sqlalchemy import Index
class Asset(Base):
__tablename__ = "assets"
# ... fields ...
# Составные индексы для часто используемых запросов
__table_args__ = (
Index(
'ix_assets_user_folder_created',
'user_id', 'folder_id', 'created_at'
),
Index(
'ix_assets_user_status',
'user_id', 'status'
),
Index(
'ix_assets_user_deleted',
'user_id', 'deleted_at'
),
)
class Share(Base):
__tablename__ = "shares"
# ... fields ...
__table_args__ = (
Index(
'ix_shares_token_expires',
'token', 'expires_at'
),
Index(
'ix_shares_owner_created',
'owner_user_id', 'created_at'
),
)
class Folder(Base):
__tablename__ = "folders"
# ... fields ...
__table_args__ = (
Index(
'ix_folders_user_parent',
'user_id', 'parent_folder_id'
),
)
```
**Создать миграцию:**
```bash
alembic revision -m "add_composite_indexes"
```
---
### 6.3 ZIP создается в памяти
**Серьезность: ВЫСОКАЯ** 🔴
**Расположение:** `backend/src/app/services/batch_operations_service.py:206`
**Проблема:**
```python
zip_buffer = io.BytesIO() # Весь ZIP в памяти!
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
# Добавление файлов...
```
При скачивании 10GB файлов OOM.
**Решение - стримить через temp file:**
```python
from app.services.batch_operations_service import temp_file_manager
async def download_assets_batch(
self,
user_id: str,
asset_ids: list[str],
) -> tuple[str, bytes]:
"""Скачать несколько файлов как ZIP архив"""
# Использовать временный файл вместо памяти
async with temp_file_manager(suffix='.zip') as temp_path:
# Создать ZIP в temp файле
with zipfile.ZipFile(temp_path, 'w', zipfile.ZIP_DEFLATED) as zip_file:
for asset_id in asset_ids:
asset = await self.asset_repo.get_by_id(asset_id)
if not asset or asset.user_id != user_id:
continue
# Стримить файл из S3
try:
response = await asyncio.to_thread(
self.s3_client.client.get_object,
Bucket=self.config.media_bucket,
Key=asset.storage_key_original,
)
# Читать чанками
with response['Body'] as stream:
# Генерировать уникальное имя файла
base_name = sanitize_filename(asset.original_filename)
unique_name = self._get_unique_filename(
zip_file,
base_name
)
# Записать в ZIP chunk by chunk
with zip_file.open(unique_name, 'w') as dest:
while True:
chunk = stream.read(8 * 1024 * 1024) # 8MB
if not chunk:
break
dest.write(chunk)
except Exception as e:
logger.warning(f"Failed to add asset {asset_id}: {e}")
continue
# Прочитать готовый ZIP
with open(temp_path, 'rb') as f:
zip_data = f.read()
filename = f"assets_batch_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.zip"
return filename, zip_data
# Но ЛУЧШЕ стримить ответ напрямую без чтения в память:
from starlette.responses import FileResponse
@router.post("/batch/download")
async def download_batch(
data: BatchDownloadRequest,
current_user: CurrentUser,
session: DatabaseSession,
):
"""Скачать файлы как ZIP (streaming response)"""
# Создать temp ZIP
async with temp_file_manager(suffix='.zip') as temp_path:
batch_service = BatchOperationsService(session, s3_client)
# Создать ZIP в temp файле (не загружая в память)
await batch_service.create_zip_file(
user_id=current_user.id,
asset_ids=data.asset_ids,
output_path=temp_path,
)
# Вернуть как streaming response
filename = f"assets_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.zip"
return FileResponse(
path=temp_path,
media_type='application/zip',
filename=filename,
background=BackgroundTask(os.unlink, temp_path), # Удалить после отправки
)
```
---
### 6.4 Смешанные обязанности в FolderService
**Серьезность: СРЕДНЯЯ** 🟡
**Расположение:** `backend/src/app/services/folder_service.py:195-223`
**Проблема:**
```python
# Строка 207
# TODO: Should use AssetService methods here
asset.folder_id = None
await self.asset_repo.update(asset)
```
FolderService напрямую работает с AssetRepository, нарушая SRP.
**Решение:**
```python
# Создать Orchestration Service
class FolderManagementService:
"""Оркестрирует операции над папками и связанными ресурсами"""
def __init__(
self,
session: AsyncSession,
s3_client: S3Client,
):
self.folder_service = FolderService(session)
self.asset_service = AssetService(session, s3_client)
async def delete_folder_with_contents(
self,
user_id: str,
folder_id: str,
delete_assets: bool = False,
) -> None:
"""
Удалить папку и обработать связанные assets.
Args:
user_id: ID пользователя
folder_id: ID папки
delete_assets: True - удалить assets, False - переместить в root
"""
# Получить все assets в папке и подпапках
assets = await self.asset_service.list_assets_in_folder_recursive(
user_id=user_id,
folder_id=folder_id,
)
if delete_assets:
# Удалить все assets (soft delete)
for asset in assets:
await self.asset_service.soft_delete_asset(
user_id=user_id,
asset_id=asset.id,
)
else:
# Переместить в root
for asset in assets:
await self.asset_service.move_asset(
user_id=user_id,
asset_id=asset.id,
target_folder_id=None,
)
# Удалить папку
await self.folder_service.delete_folder(
user_id=user_id,
folder_id=folder_id,
)
```
---
### 6.5 Отсутствуют Foreign Keys в моделях
**Серьезность: СРЕДНЯЯ** 🟡
**Расположение:** `backend/src/app/domain/models.py`
**Проблема:**
Нет явных relationship и foreign key constraints.
**Решение:**
```python
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
class Asset(Base):
__tablename__ = "assets"
# ... existing fields ...
user_id: Mapped[str] = mapped_column(
String(36),
ForeignKey("users.id", ondelete="CASCADE"),
index=True
)
folder_id: Mapped[Optional[str]] = mapped_column(
String(36),
ForeignKey("folders.id", ondelete="SET NULL"),
nullable=True,
index=True
)
# Relationships
user: Mapped["User"] = relationship("User", back_populates="assets")
folder: Mapped[Optional["Folder"]] = relationship("Folder", back_populates="assets")
class Folder(Base):
__tablename__ = "folders"
# ... existing fields ...
user_id: Mapped[str] = mapped_column(
String(36),
ForeignKey("users.id", ondelete="CASCADE"),
index=True
)
parent_folder_id: Mapped[Optional[str]] = mapped_column(
String(36),
ForeignKey("folders.id", ondelete="CASCADE"),
nullable=True,
index=True
)
# Relationships
user: Mapped["User"] = relationship("User", back_populates="folders")
parent: Mapped[Optional["Folder"]] = relationship(
"Folder",
remote_side="Folder.id",
back_populates="children"
)
children: Mapped[list["Folder"]] = relationship(
"Folder",
back_populates="parent",
cascade="all, delete-orphan"
)
assets: Mapped[list["Asset"]] = relationship(
"Asset",
back_populates="folder",
cascade="all, delete-orphan"
)
class User(Base):
__tablename__ = "users"
# ... existing fields ...
# Relationships
assets: Mapped[list["Asset"]] = relationship(
"Asset",
back_populates="user",
cascade="all, delete-orphan"
)
folders: Mapped[list["Folder"]] = relationship(
"Folder",
back_populates="user",
cascade="all, delete-orphan"
)
```
**Миграция:**
```bash
alembic revision -m "add_foreign_keys_and_relationships"
```
---
## 📦 7. ОТСУТСТВУЮЩИЕ КЛЮЧЕВЫЕ ФУНКЦИИ
### 7.1 Password Reset Flow
**Приоритет: ВЫСОКИЙ** 🔴
**Что нужно:**
1. Endpoint `/auth/forgot-password` - отправить email с токеном
2. Endpoint `/auth/reset-password` - сбросить пароль по токену
3. Email сервис (SMTP)
4. Токены сброса пароля с expiration
**Пример реализации:**
```python
# В models.py
class PasswordResetToken(Base):
__tablename__ = "password_reset_tokens"
id: Mapped[str] = mapped_column(String(36), primary_key=True, default=lambda: str(uuid4()))
user_id: Mapped[str] = mapped_column(String(36), ForeignKey("users.id"), index=True)
token: Mapped[str] = mapped_column(String(64), unique=True, index=True)
expires_at: Mapped[datetime] = mapped_column(DateTime)
used_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)
created_at: Mapped[datetime] = mapped_column(DateTime, default=func.now())
# В auth_service.py
async def request_password_reset(self, email: str) -> None:
"""Создать токен сброса пароля и отправить email"""
user = await self.user_repo.get_by_email(email)
if not user:
# НЕ раскрывать что пользователь не найден (безопасность)
logger.info(f"Password reset requested for non-existent email: {email}")
return
# Создать токен
token = secrets.token_urlsafe(32)
expires_at = datetime.utcnow() + timedelta(hours=1)
reset_token = PasswordResetToken(
user_id=user.id,
token=token,
expires_at=expires_at,
)
self.session.add(reset_token)
await self.session.commit()
# Отправить email
reset_url = f"{settings.FRONTEND_URL}/reset-password?token={token}"
await email_service.send_password_reset(user.email, reset_url)
async def reset_password(self, token: str, new_password: str) -> None:
"""Сбросить пароль по токену"""
# Найти токен
result = await self.session.execute(
select(PasswordResetToken).where(
PasswordResetToken.token == token,
PasswordResetToken.used_at.is_(None),
PasswordResetToken.expires_at > datetime.utcnow(),
)
)
reset_token = result.scalar_one_or_none()
if not reset_token:
raise HTTPException(status_code=400, detail="Invalid or expired token")
# Обновить пароль
user = await self.user_repo.get_by_id(reset_token.user_id)
password_hash = self.password_context.hash(new_password)
user.password_hash = password_hash
# Пометить токен как использованный
reset_token.used_at = datetime.utcnow()
await self.session.commit()
# Endpoints
@router.post("/auth/forgot-password")
async def forgot_password(
data: ForgotPasswordRequest,
session: DatabaseSession,
):
"""Запросить сброс пароля"""
auth_service = AuthService(session)
await auth_service.request_password_reset(data.email)
# Всегда возвращать успех (не раскрывать существование email)
return {"message": "If email exists, reset link has been sent"}
@router.post("/auth/reset-password")
async def reset_password(
data: ResetPasswordRequest,
session: DatabaseSession,
):
"""Сбросить пароль"""
auth_service = AuthService(session)
await auth_service.reset_password(data.token, data.new_password)
return {"message": "Password reset successful"}
```
---
### 7.2 Storage Quota Management
**Приоритет: ВЫСОКИЙ** 🔴
**Что нужно:**
1. Поле `storage_quota_bytes` и `storage_used_bytes` в User
2. Проверка квоты перед загрузкой
3. Endpoint для получения статистики использования
4. Background job для пересчета использования
**Реализация:**
```python
# В models.py
class User(Base):
__tablename__ = "users"
# ... existing fields ...
storage_quota_bytes: Mapped[int] = mapped_column(
BigInteger,
default=10 * 1024 * 1024 * 1024 # 10GB по умолчанию
)
storage_used_bytes: Mapped[int] = mapped_column(
BigInteger,
default=0,
index=True
)
# В asset_service.py
async def create_upload(
self,
user_id: str,
original_filename: str,
content_type: str,
size_bytes: int,
folder_id: Optional[str] = None,
) -> tuple[Asset, dict]:
# Проверить квоту ДО создания upload
user = await self.user_repo.get_by_id(user_id)
if user.storage_used_bytes + size_bytes > user.storage_quota_bytes:
remaining = user.storage_quota_bytes - user.storage_used_bytes
raise HTTPException(
status_code=413,
detail=f"Storage quota exceeded. "
f"Available: {remaining / 1024 / 1024:.2f} MB, "
f"Required: {size_bytes / 1024 / 1024:.2f} MB"
)
# ... existing code ...
async def finalize_upload(
self,
user_id: str,
asset_id: str,
etag: Optional[str] = None,
sha256: Optional[str] = None,
) -> Asset:
# ... existing code ...
# Обновить использованное место
user = await self.user_repo.get_by_id(user_id)
user.storage_used_bytes += asset.size_bytes
await self.session.commit()
return asset
async def soft_delete_asset(
self,
user_id: str,
asset_id: str,
) -> Asset:
# ... existing code ...
# НЕ освобождать место при soft delete
# Место освободится при purge
return asset
async def purge_asset(
self,
user_id: str,
asset_id: str,
) -> None:
"""Безвозвратно удалить asset"""
asset = await self.asset_repo.get_by_id(asset_id)
if not asset or asset.user_id != user_id:
raise HTTPException(status_code=404, detail="Asset not found")
# Удалить из S3
await self.s3_client.delete_object(
self.config.media_bucket,
asset.storage_key_original
)
if asset.storage_key_thumb:
await self.s3_client.delete_object(
self.config.media_bucket,
asset.storage_key_thumb
)
# Освободить место в квоте
user = await self.user_repo.get_by_id(user_id)
user.storage_used_bytes = max(0, user.storage_used_bytes - asset.size_bytes)
# Удалить из БД
await self.asset_repo.hard_delete(asset)
await self.session.commit()
# Endpoint для статистики
@router.get("/users/me/storage", response_model=StorageStatsResponse)
async def get_storage_stats(
current_user: CurrentUser,
):
"""Получить статистику использования хранилища"""
return StorageStatsResponse(
quota_bytes=current_user.storage_quota_bytes,
used_bytes=current_user.storage_used_bytes,
available_bytes=current_user.storage_quota_bytes - current_user.storage_used_bytes,
percentage_used=round(
(current_user.storage_used_bytes / current_user.storage_quota_bytes) * 100,
2
),
)
```
---
### 7.3 EXIF Metadata Extraction
**Приоритет: СРЕДНЯЯ** 🟡
**Реализация:**
```bash
pip install pillow pillow-heif # Для изображений
pip install ffmpeg-python # Для видео
```
```python
# В tasks/thumbnail_tasks.py
from PIL import Image
from PIL.ExifTags import TAGS
import ffmpeg
def extract_image_metadata(file_path: str) -> dict:
"""Извлечь EXIF из изображения"""
try:
with Image.open(file_path) as img:
exif_data = img._getexif()
if not exif_data:
return {}
metadata = {}
for tag_id, value in exif_data.items():
tag = TAGS.get(tag_id, tag_id)
metadata[tag] = value
return {
"width": img.width,
"height": img.height,
"captured_at": metadata.get("DateTimeOriginal"),
"camera_make": metadata.get("Make"),
"camera_model": metadata.get("Model"),
"gps_latitude": metadata.get("GPSLatitude"),
"gps_longitude": metadata.get("GPSLongitude"),
}
except Exception as e:
logger.error(f"EXIF extraction failed: {e}")
return {}
def extract_video_metadata(file_path: str) -> dict:
"""Извлечь метаданные из видео"""
try:
probe = ffmpeg.probe(file_path)
video_stream = next(
s for s in probe['streams'] if s['codec_type'] == 'video'
)
return {
"width": int(video_stream.get('width', 0)),
"height": int(video_stream.get('height', 0)),
"duration_sec": float(probe['format'].get('duration', 0)),
"codec": video_stream.get('codec_name'),
"bitrate": int(probe['format'].get('bit_rate', 0)),
}
except Exception as e:
logger.error(f"Video metadata extraction failed: {e}")
return {}
@celery_app.task
def generate_thumbnail_and_extract_metadata(asset_id: str):
"""Генерировать thumbnail И извлечь metadata"""
# ... existing thumbnail generation ...
# Извлечь metadata
if asset.type == AssetType.PHOTO:
metadata = extract_image_metadata(temp_file_path)
elif asset.type == AssetType.VIDEO:
metadata = extract_video_metadata(temp_file_path)
# Обновить asset
if metadata.get("width"):
asset.width = metadata["width"]
if metadata.get("height"):
asset.height = metadata["height"]
if metadata.get("captured_at"):
asset.captured_at = parse_datetime(metadata["captured_at"])
if metadata.get("duration_sec"):
asset.duration_sec = metadata["duration_sec"]
db.commit()
```
---
### 7.4 Asset Search
**Приоритет: ВЫСОКИЙ** 🔴
**Простая реализация с PostgreSQL full-text search:**
```python
# В asset_repository.py
async def search_assets(
self,
user_id: str,
query: str,
limit: int = 50,
) -> list[Asset]:
"""Поиск assets по имени файла"""
search_query = (
select(Asset)
.where(Asset.user_id == user_id)
.where(Asset.deleted_at.is_(None))
.where(Asset.status == AssetStatus.READY)
.where(Asset.original_filename.ilike(f"%{query}%"))
.order_by(Asset.created_at.desc())
.limit(limit)
)
result = await self.session.execute(search_query)
return list(result.scalars().all())
# Endpoint
@router.get("/assets/search", response_model=AssetListResponse)
async def search_assets(
q: str = Query(..., min_length=1),
current_user: CurrentUser,
session: DatabaseSession,
s3_client: S3ClientDep,
limit: int = Query(50, ge=1, le=100),
):
"""Поиск файлов по имени"""
asset_service = AssetService(session, s3_client)
assets = await asset_service.search_assets(
user_id=current_user.id,
query=q,
limit=limit,
)
return AssetListResponse(
items=assets,
next_cursor=None,
has_more=False,
)
```
---
### 7.5 Database Backup Strategy
**Приоритет: КРИТИЧЕСКИЙ** 🔴
**Для SQLite:**
```bash
#!/bin/bash
# backup_db.sh
BACKUP_DIR="/backups"
DB_PATH="/app/data/app.db"
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
BACKUP_FILE="$BACKUP_DIR/backup_$TIMESTAMP.db"
# Создать backup
sqlite3 $DB_PATH ".backup '$BACKUP_FILE'"
# Сжать
gzip $BACKUP_FILE
# Загрузить в S3
aws s3 cp "${BACKUP_FILE}.gz" "s3://your-backup-bucket/database/"
# Удалить старые локальные backups (старше 7 дней)
find $BACKUP_DIR -name "backup_*.db.gz" -mtime +7 -delete
# Удалить старые S3 backups (старше 30 дней)
aws s3 ls "s3://your-backup-bucket/database/" | \
awk '{print $4}' | \
while read file; do
# ... deletion logic
done
```
**Cron job:**
```cron
# Каждые 6 часов
0 */6 * * * /app/scripts/backup_db.sh >> /var/log/backup.log 2>&1
```
**Для PostgreSQL:**
```bash
#!/bin/bash
# pg_backup.sh
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
BACKUP_FILE="backup_$TIMESTAMP.sql.gz"
# pg_dump с сжатием
pg_dump -h $DB_HOST -U $DB_USER -d $DB_NAME | gzip > "/tmp/$BACKUP_FILE"
# Загрузить в S3
aws s3 cp "/tmp/$BACKUP_FILE" "s3://your-backup-bucket/database/"
# Cleanup
rm "/tmp/$BACKUP_FILE"
```
---
## 🧪 8. ТЕСТИРОВАНИЕ
### 8.1 Текущее покрытие: <10% ❌
**Найдено только:** `backend/tests/test_security.py`
**Что КРИТИЧЕСКИ нужно:**
1. **Unit тесты для services:**
```python
# tests/unit/services/test_asset_service.py
import pytest
from unittest.mock import AsyncMock, MagicMock
@pytest.mark.asyncio
async def test_create_upload_exceeds_quota():
"""Тест: загрузка превышает квоту"""
user = User(
id="user1",
storage_quota_bytes=1000,
storage_used_bytes=900,
)
asset_service = AssetService(mock_session, mock_s3)
with pytest.raises(HTTPException) as exc:
await asset_service.create_upload(
user_id=user.id,
original_filename="large.jpg",
content_type="image/jpeg",
size_bytes=200, # 900 + 200 > 1000
)
assert exc.value.status_code == 413
assert "quota exceeded" in exc.value.detail.lower()
```
2. **Integration тесты для API:**
```python
# tests/integration/test_auth_api.py
from httpx import AsyncClient
@pytest.mark.asyncio
async def test_login_rate_limiting(client: AsyncClient):
"""Тест: rate limiting блокирует brute force"""
# 5 неудачных попыток
for i in range(5):
response = await client.post("/api/v1/auth/login", json={
"email": "test@example.com",
"password": "wrong"
})
assert response.status_code == 401
# 6-я попытка должна быть заблокирована
response = await client.post("/api/v1/auth/login", json={
"email": "test@example.com",
"password": "wrong"
})
assert response.status_code == 429
```
3. **Security тесты:**
```python
# tests/security/test_path_traversal.py
@pytest.mark.asyncio
async def test_filename_sanitization():
"""Тест: path traversal защита"""
malicious_filenames = [
"../../../etc/passwd",
"..\\..\\..\\windows\\system32",
"test\x00.jpg.exe",
"folder/../../../secret.txt",
]
for filename in malicious_filenames:
sanitized = sanitize_filename(filename)
# Проверить, что путь не содержит separators
assert "/" not in sanitized
assert "\\" not in sanitized
assert "\x00" not in sanitized
assert ".." not in sanitized
```
4. **Load тесты:**
```python
# tests/load/test_upload_performance.py
from locust import HttpUser, task, between
class UploadUser(HttpUser):
wait_time = between(1, 3)
@task
def upload_file(self):
# Симулировать загрузку файла
files = {'file': ('test.jpg', b'fake image data', 'image/jpeg')}
self.client.post("/api/v1/uploads/create", files=files)
```
**Цель:** Достичь минимум 70% code coverage
---
## 📊 9. ДОПОЛНИТЕЛЬНЫЕ РЕКОМЕНДАЦИИ
### 9.1 Frontend Type Safety
**Проблема:** Много `any` типов в `frontend/src/services/api.ts`
**Решение:**
```typescript
// Заменить все any на конкретные типы
export interface Folder {
id: string;
user_id: string;
name: string;
parent_folder_id: string | null;
created_at: string;
updated_at: string;
}
export interface FolderListResponse {
items: Folder[];
}
export interface FolderBreadcrumb {
id: string;
name: string;
}
// В api.ts
async listFolders(
parentFolderId?: string | null,
all: boolean = false
): Promise<FolderListResponse> {
// ...
}
async getFolder(folderId: string): Promise<Folder> {
// ...
}
async getFolderBreadcrumbs(folderId: string): Promise<FolderBreadcrumb[]> {
// ...
}
```
---
### 9.2 Logging Strategy
**Добавить structured logging:**
```python
# В config.py
import sys
from loguru import logger
# Настроить loguru
logger.remove() # Удалить дефолтный handler
if settings.APP_ENV == "prod":
# Production: JSON формат для парсинга
logger.add(
sys.stdout,
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} | {message}",
level="INFO",
serialize=True, # JSON output
)
else:
# Development: красивый формат
logger.add(
sys.stdout,
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
level="DEBUG",
colorize=True,
)
# Логировать в файл
logger.add(
"logs/app.log",
rotation="100 MB",
retention="30 days",
compression="zip",
level="INFO",
)
# Middleware для логирования всех requests
from starlette.middleware.base import BaseHTTPMiddleware
import time
class RequestLoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
start_time = time.time()
# Логировать request
logger.info(
f"Request started",
extra={
"method": request.method,
"path": request.url.path,
"client": request.client.host if request.client else None,
}
)
try:
response = await call_next(request)
# Логировать response
duration = time.time() - start_time
logger.info(
f"Request completed",
extra={
"method": request.method,
"path": request.url.path,
"status_code": response.status_code,
"duration_ms": round(duration * 1000, 2),
}
)
return response
except Exception as e:
duration = time.time() - start_time
logger.error(
f"Request failed",
extra={
"method": request.method,
"path": request.url.path,
"error": str(e),
"duration_ms": round(duration * 1000, 2),
}
)
raise
app.add_middleware(RequestLoggingMiddleware)
```
---
### 9.3 Environment Validation
**Проверять критичные env vars при старте:**
```python
# В main.py
from app.infra.config import settings
def validate_production_config():
"""Проверить конфигурацию для продакшена"""
errors = []
if settings.APP_ENV == "prod":
# Проверить JWT secret
if settings.JWT_SECRET in ["your-secret-key-change-this-in-production", "changeme"]:
errors.append("JWT_SECRET must be changed in production")
# Проверить S3 конфигурацию
if not settings.S3_ENDPOINT_URL or "localhost" in settings.S3_ENDPOINT_URL:
errors.append("S3_ENDPOINT_URL must be set to production S3")
# Проверить database
if "sqlite" in settings.DATABASE_URL.lower():
logger.warning("Using SQLite in production - consider PostgreSQL")
# Проверить CORS
if "*" in settings.CORS_ORIGINS:
errors.append("CORS_ORIGINS should not contain '*' in production")
if errors:
logger.error("Production configuration errors:")
for error in errors:
logger.error(f" - {error}")
raise ValueError("Invalid production configuration")
@app.on_event("startup")
async def startup():
validate_production_config()
logger.info(f"Application started in {settings.APP_ENV} mode")
```
---
## 🎯 10. ROADMAP ПО ПРИОРИТЕТАМ
### ФАЗА 1: Критическая безопасность (1-2 недели) 🔴
**Блокирует продакшен**
1. Implement rate limiting (slowapi)
2. Add token revocation mechanism (Redis blacklist)
3. Implement refresh token rotation
4. Add storage quota management
5. Enable S3 server-side encryption
6. Fix memory exhaustion in file uploads (streaming)
7. Add strong password validation
8. Implement account lockout after failed attempts
9. Reduce error message verbosity
10. Add security headers middleware
**Результат:** Приложение защищено от базовых атак
---
### ФАЗА 2: Core Features (2-3 недели) 🟡
**Необходимо для MVP**
1. Implement soft delete / trash functionality (SPEC REQUIREMENT)
2. Add password reset flow
3. Implement multipart upload для больших файлов
4. Add asset search by filename
5. Extract EXIF metadata (captured_at, dimensions)
6. Add database indexes для производительности
7. Fix ZIP creation (stream instead of memory)
8. Add foreign key constraints
**Результат:** Полнофункциональное MVP
---
### ФАЗА 3: Production Readiness (2-3 недели) 🟢
**DevOps и мониторинг**
1. Comprehensive test suite (70%+ coverage)
2. Database backup automation
3. Monitoring & alerting setup (Prometheus + Grafana)
4. Error tracking (Sentry integration)
5. CI/CD pipeline (GitHub Actions)
6. Environment configuration validation
7. Structured logging (loguru + JSON)
8. Documentation completion
9. Load testing (Locust)
10. Security penetration testing
**Результат:** Готово к продакшену
---
### ФАЗА 4: Enhancements (постоянно) 💚
**Улучшения UX**
1. Video transcoding & adaptive streaming (HLS)
2. Albums feature
3. Tags system
4. Advanced search & filters (date range, type, size)
5. Share analytics & permissions
6. Image optimization (WebP conversion)
7. Duplicate detection (SHA256 deduplication)
8. Two-factor authentication (TOTP)
9. Email verification
10. Shared albums (collaborative)
**Результат:** Feature-rich продукт
---
## 📋 11. SUMMARY: КРИТИЧЕСКИЕ ДЕЙСТВИЯ
### Что сделать ПРЯМО СЕЙЧАС перед продакшеном:
1. **Установить rate limiting** - защита от brute force
2. **Реализовать soft delete** - требование спецификации
3. **Добавить storage quota** - предотвратить злоупотребление
4. **Включить S3 encryption** - защита данных в покое
5. **Стримить файлы** - предотвратить OOM
6. **Добавить тесты** - минимум 50% coverage
7. **Настроить backups** - защита от потери данных
8. **Валидировать env vars** - предотвратить misconfig
### Оценка времени:
- **Минимум для продакшена:** 4 недели (Фаза 1 + критичное из Фазы 2)
- **Полное MVP:** 6-8 недель (Фазы 1-3)
- **Production-grade:** 10-12 недель (все фазы)
---
## 🎓 ЗАКЛЮЧЕНИЕ
Проект **ITCloud** демонстрирует хорошую архитектурную основу и правильные практики в некоторых областях:
**Что сделано хорошо:**
- Clean Architecture с четким разделением слоев
- Безопасное хеширование паролей (Argon2)
- Защита от path traversal
- JWT аутентификация
- Pre-signed URLs для S3
- Async/await паттерны
- Docker setup
**Критические проблемы:**
- Отсутствует rate limiting (BRUTE FORCE VULNERABLE)
- Нет soft delete (SPEC VIOLATION)
- Нет storage quota (ABUSE VULNERABLE)
- Memory exhaustion риски
- Минимальное тестирование
- Нет token revocation
**ИТОГОВАЯ ОЦЕНКА:** 6/10 (хорошая база, но не готов к продакшену)
**РЕКОМЕНДАЦИЯ:** Выполнить Фазу 1 и критичные элементы Фазы 2 перед любым публичным запуском.
---
**Дата аудита:** 2026-01-05
**Версия документа:** 1.0
**Следующий аудит:** После внедрения рекомендаций Фазы 1