refactor: claude v1

itqop 2025-11-05 14:13:51 +03:00
parent 7152f4b61e
commit 8facab266d
16 changed files with 677 additions and 199 deletions

3
.gitignore vendored
View File

@ -58,3 +58,6 @@ Thumbs.db
# Документация
docs/_build/
.claude
nul

314
CLAUDE.md Normal file
View File

@ -0,0 +1,314 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Role and Working Principles
You act as a senior Python / ML / AI / DL developer and system architect.
You work on an enterprise-level project (a multi-tier backend) whose base architecture follows the REST template from `rest_template.md`.
You design code strictly according to patterns (Singleton, Repository, Interface, DTO, CRUD, Service, Context, Adapter, etc.).
You write clean production-level Python code (FastAPI, SQLAlchemy 2.x, asyncio, Pydantic v2, PostgreSQL, aiohttp, structlog/loguru).
**Working Rules:**
- Do not add comments unless explicitly requested.
- Always add docstrings to functions, classes, and modules.
- Always follow PEP 8 style and architectural layer isolation (api / service / repositories / models / schemas / interfaces / logger / config / context).
- Prefer typing via `from __future__ import annotations`.
- All dependencies are passed through `AppContext` (DI Singleton pattern).
- Implement logging through the context-aware logger (plain `logger.info("msg")` calls, without structured payloads).
- When creating projects from scratch, rely on the structure from `rest_template.md`.
- Respond strictly to the point, no fluff, like a senior developer during code review.
- All logic in examples must be correct, asynchronous, and production-ready.
- Use only modern library versions.
Your style is minimalistic, precise, clean, and architecturally sound.
## Project Overview
**Dataloader** is an asynchronous FastAPI service for managing and executing long-running ETL tasks via a PostgreSQL-based job queue. The service uses PostgreSQL's `LISTEN/NOTIFY` for efficient worker wakeup, advisory locks for concurrency control, and `SELECT ... FOR UPDATE SKIP LOCKED` for job claiming.
This is a Clean Architecture implementation following the project template `rest_template.md`, built with Python 3.11+, FastAPI, SQLAlchemy 2.0 (async), and asyncpg.
## Development Commands
### Running the Application
```bash
# Install dependencies with Poetry
poetry install
# Run the application
poetry run dataloader
# or
uvicorn dataloader.__main__:main
# The app will start on port 8081 by default (configurable via APP_PORT)
```
### Testing
```bash
# Run all tests
poetry run pytest
# Run specific test file
poetry run pytest tests/integration_tests/v1_api/test_service.py
# Run with verbose output
poetry run pytest -v
# Run integration tests only
poetry run pytest tests/integration_tests/
```
### Database
The database schema is already applied (see `DDL.sql`). The queue uses:
- Table `dl_jobs` - main job queue with statuses: queued, running, succeeded, failed, canceled, lost
- Table `dl_job_events` - audit log of job lifecycle events
- PostgreSQL triggers for `LISTEN/NOTIFY` on job insertion/updates
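These notifications can be consumed from Python with a plain `asyncpg` listener. A minimal sketch, simplified relative to `storage/notify_listener.py` (the channel name `dl_jobs` and the queue-name payload follow the triggers above):
```python
import asyncio
import asyncpg


async def listen_for_jobs(dsn: str, queue: str, wakeup: asyncio.Event) -> None:
    conn = await asyncpg.connect(dsn)

    def on_notify(connection, pid, channel, payload) -> None:
        # The trigger sends the queue name as payload; wake only our queue.
        if payload == queue:
            wakeup.set()

    await conn.add_listener("dl_jobs", on_notify)
    try:
        await asyncio.Future()  # keep the connection open until the task is cancelled
    finally:
        await conn.remove_listener("dl_jobs", on_notify)
        await conn.close()
```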
## Architecture
### High-Level Structure
The codebase follows Clean Architecture with clear separation of concerns:
1. **API Layer** (`src/dataloader/api/`)
- `v1/router.py` - HTTP endpoints for job management
- `v1/service.py` - Business logic layer
- `v1/schemas.py` - Pydantic request/response models
- `os_router.py` - Infrastructure endpoints (`/health`, `/status`) **DO NOT MODIFY**
- `metric_router.py` - Metrics endpoints (BETA) **DO NOT MODIFY**
- `middleware.py` - Request/response logging middleware **DO NOT MODIFY**
2. **Storage Layer** (`src/dataloader/storage/`)
- `repositories.py` - PostgreSQL queue operations using SQLAlchemy ORM
- `db.py` - Database engine and session management
- `notify_listener.py` - PostgreSQL LISTEN/NOTIFY implementation
3. **Worker Layer** (`src/dataloader/workers/`)
- `manager.py` - Manages lifecycle of async worker tasks
- `base.py` - Core worker implementation with claim/heartbeat/execute cycle
- `reaper.py` - Background task to requeue lost jobs (expired leases)
- `pipelines/registry.py` - Pipeline registration and resolution system
- `pipelines/` - Individual pipeline implementations
4. **Logger** (`src/dataloader/logger/`)
- Structured logging with automatic sensitive data masking
- **DO NOT MODIFY** these files - they're from the template
5. **Core** (`src/dataloader/`)
- `__main__.py` - Application entry point
- `config.py` - Pydantic Settings for all configuration
- `context.py` - AppContext singleton for dependency injection
- `base.py` - Base classes and types
- `exceptions.py` - Global exception definitions
### Key Architectural Patterns
#### Job Queue Protocol
Jobs flow through the system via a strict state machine (a claim sketch follows this list):
1. **Enqueue** (`trigger` API) - Creates job in `queued` status
- Idempotent via `idempotency_key`
- PostgreSQL trigger fires `LISTEN/NOTIFY` to wake workers
2. **Claim** (worker) - Worker acquires job atomically
- Uses `FOR UPDATE SKIP LOCKED` to prevent contention
- Sets status to `running`, increments attempt counter
- Attempts PostgreSQL advisory lock on `lock_key`
- If lock fails → job goes back to `queued` with backoff delay
3. **Execute** (worker) - Runs the pipeline with heartbeat
- Heartbeat updates every `DL_HEARTBEAT_SEC` seconds
- Extends `lease_expires_at` to prevent reaper from reclaiming
- Checks `cancel_requested` flag between pipeline chunks
- Pipeline yields between chunks to allow cooperative cancellation
4. **Complete** (worker) - Finalize job status
- **Success**: `status = succeeded`, release advisory lock
- **Failure**:
- If `attempt < max_attempts` → `status = queued` (retry with a linear backoff of `30 * attempt` seconds)
- If `attempt >= max_attempts` → `status = failed`
- **Cancel**: `status = canceled`
- Always releases advisory lock
5. **Reaper** (background) - Recovers lost jobs
- Runs every `DL_REAPER_PERIOD_SEC`
- Finds jobs where `status = running` AND `lease_expires_at < now()`
- Resets them to `queued` for retry
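The claim step can be expressed roughly as the following sketch. It is illustrative only, not the exact query in `QueueRepository.claim_one`; column names follow `DDL.sql`:
```python
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

CLAIM_SQL = text("""
    SELECT job_id, task, args, lock_key, lease_ttl_sec
    FROM dl_jobs
    WHERE queue = :queue AND status = 'queued' AND available_at <= now()
    ORDER BY priority, available_at
    LIMIT 1
    FOR UPDATE SKIP LOCKED
""")


async def claim_next(session: AsyncSession, queue: str) -> dict | None:
    async with session.begin():
        row = (await session.execute(CLAIM_SQL, {"queue": queue})).mappings().first()
        if row is None:
            return None
        # Mark the job as running and extend its lease in the same transaction.
        await session.execute(
            text("""
                UPDATE dl_jobs
                SET status = 'running', attempt = attempt + 1, started_at = now(),
                    heartbeat_at = now(),
                    lease_expires_at = now() + make_interval(secs => lease_ttl_sec)
                WHERE job_id = :job_id
            """),
            {"job_id": row["job_id"]},
        )
        return dict(row)
```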
#### Concurrency Control
The system uses multiple layers of concurrency control (the advisory-lock calls are sketched after this list):
- **`lock_key`**: PostgreSQL advisory lock ensures only one worker processes jobs with the same lock_key
- **`partition_key`**: Logical grouping for job ordering (currently informational)
- **`FOR UPDATE SKIP LOCKED`**: Prevents multiple workers from claiming the same job
- **Async workers**: Multiple workers can run concurrently within a single process
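The `lock_key` layer maps to two SQL calls, mirroring the helpers in `storage/repositories.py`: the text key is hashed to an integer and locked for the current database session:
```python
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession


async def try_advisory_lock(session: AsyncSession, lock_key: str) -> bool:
    r = await session.execute(select(func.pg_try_advisory_lock(func.hashtext(lock_key))))
    return bool(r.scalar())


async def advisory_unlock(session: AsyncSession, lock_key: str) -> None:
    await session.execute(select(func.pg_advisory_unlock(func.hashtext(lock_key))))
```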
#### Worker Configuration
Workers are configured via `WORKERS_JSON` environment variable:
```json
[
{"queue": "load.cbr", "concurrency": 2},
{"queue": "load.sgx", "concurrency": 1}
]
```
This spawns one async worker task per unit of concurrency, i.e. the sum of all `concurrency` values (three tasks in the example above), within the FastAPI process.
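A rough sketch of how the value is turned into worker specs (the actual parsing lives in `WorkerSettings.parsed_workers()`):
```python
import json


def parse_workers(workers_json: str) -> list[dict]:
    items = json.loads(workers_json or "[]")
    return [
        {"queue": str(item["queue"]).strip(), "concurrency": int(item.get("concurrency", 1))}
        for item in items
        if str(item.get("queue", "")).strip()
    ]


specs = parse_workers('[{"queue": "load.cbr", "concurrency": 2}, {"queue": "load.sgx", "concurrency": 1}]')
total_tasks = sum(spec["concurrency"] for spec in specs)  # 3 worker tasks in this example
```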
#### Pipeline System
Pipelines are registered via decorator in `workers/pipelines/`:
```python
from dataloader.workers.pipelines.registry import register
@register("my.task")
async def my_pipeline(args: dict):
    # Process chunk 1
    yield  # Allow heartbeat & cancellation check
    # Process chunk 2
    yield
    # Process chunk 3
```
The `yield` statements enable:
- Heartbeat updates during long operations
- Cooperative cancellation via `cancel_requested` checks
- Progress tracking
All pipelines must be imported in the `load_all()` function of `workers/pipelines/__init__.py`; a minimal sketch of the registry behind `@register` follows.
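For orientation, the registry can be thought of as a task-name-to-coroutine map. A minimal sketch; the real `registry.py` may differ in details:
```python
from typing import AsyncIterator, Callable

_PIPELINES: dict[str, Callable[[dict], AsyncIterator[None]]] = {}


def register(task: str):
    def decorator(fn):
        _PIPELINES[task] = fn
        return fn
    return decorator


def resolve(task: str):
    try:
        return _PIPELINES[task]
    except KeyError:
        raise KeyError(f"No pipeline registered for task {task!r}")
```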
### Application Lifecycle
1. **Startup** (`lifespan` in `api/__init__.py`; a sketch follows this list)
- Initialize logging
- Create database engine and sessionmaker
- Load all pipelines from registry
- Build WorkerManager from `WORKERS_JSON`
- Start all worker tasks and reaper
2. **Runtime**
- FastAPI serves HTTP requests
- Workers poll queue via LISTEN/NOTIFY
- Reaper runs in background
3. **Shutdown** (on SIGTERM)
- Signal all workers to stop via `asyncio.Event`
- Cancel worker tasks and wait for completion
- Cancel reaper task
- Dispose database engine
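Putting the lifecycle together, a hedged `lifespan` sketch; the exact wiring lives in `api/__init__.py`, and the `start()`/`stop()` method names on `WorkerManager` are assumptions:
```python
from contextlib import asynccontextmanager

from fastapi import FastAPI

from dataloader.context import APP_CTX
from dataloader.workers.manager import build_manager_from_env
from dataloader.workers.pipelines import load_all


@asynccontextmanager
async def lifespan(app: FastAPI):
    await APP_CTX.on_startup()          # logging, engine, sessionmaker
    load_all()                          # import and register all pipelines
    manager = build_manager_from_env()  # worker specs from WORKERS_JSON
    await manager.start()
    try:
        yield
    finally:
        await manager.stop()            # assumed shutdown hook
        await APP_CTX.on_shutdown()     # dispose the engine
```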
## Configuration
All configuration is via environment variables (`.env` file or system environment):
### Application Settings
- `APP_HOST` - Server bind address (default: `0.0.0.0`)
- `APP_PORT` - Server port (default: `8081`)
- `DEBUG` - Debug mode (default: `False`)
- `LOCAL` - Local development flag (default: `False`)
### Database Settings
- `PG_HOST`, `PG_PORT`, `PG_USER`, `PG_PASSWORD`, `PG_DATABASE`, `PG_SCHEMA` - PostgreSQL connection
- `PG_POOL_SIZE`, `PG_MAX_OVERFLOW`, `PG_POOL_RECYCLE` - Connection pool configuration
- `DL_DB_DSN` - Optional override for queue database DSN (if different from main DB)
### Worker Settings
- `WORKERS_JSON` - JSON array of worker configurations (required)
- `DL_HEARTBEAT_SEC` - Heartbeat interval (default: `10`)
- `DL_DEFAULT_LEASE_TTL_SEC` - Default lease duration (default: `60`)
- `DL_REAPER_PERIOD_SEC` - Reaper run interval (default: `10`)
- `DL_CLAIM_BACKOFF_SEC` - Backoff when advisory lock fails (default: `15`)
### Logging Settings
- `LOG_PATH`, `LOG_FILE_NAME` - Application log location
- `METRIC_PATH`, `METRIC_FILE_NAME` - Metrics log location
- `AUDIT_LOG_PATH`, `AUDIT_LOG_FILE_NAME` - Audit events log location
## API Endpoints
### Business API (v1)
- `POST /api/v1/jobs/trigger` - Create or get existing job (idempotent; see the client sketch after this list)
- Body: `{queue, task, args?, idempotency_key?, lock_key, partition_key?, priority?, available_at?}`
- Response: `{job_id, status}`
- `GET /api/v1/jobs/{job_id}/status` - Get job status
- Response: `{job_id, status, attempt, started_at?, finished_at?, heartbeat_at?, error?, progress}`
- `POST /api/v1/jobs/{job_id}/cancel` - Request job cancellation (cooperative)
- Response: Same as status endpoint
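A client-side sketch of the trigger/status flow using `aiohttp`; the host, port, and task name are illustrative, and the field names follow the request body above:
```python
import asyncio

import aiohttp


async def trigger_and_poll() -> None:
    async with aiohttp.ClientSession(base_url="http://localhost:8081") as http:
        payload = {
            "queue": "load.cbr",
            "task": "cbr.rates",                      # illustrative task name
            "args": {"date": "2025-11-05"},
            "lock_key": "cbr.rates",
            "idempotency_key": "cbr.rates:2025-11-05",
        }
        async with http.post("/api/v1/jobs/trigger", json=payload) as resp:
            job = await resp.json()                   # {"job_id": ..., "status": ...}
        async with http.get(f"/api/v1/jobs/{job['job_id']}/status") as resp:
            print(await resp.json())


asyncio.run(trigger_and_poll())
```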
### Infrastructure API
- `GET /health` - Health check (no database access, <20ms)
- `GET /status` - Service status with version/uptime
## Development Guidelines
### Adding a New Pipeline
1. Create pipeline file in `src/dataloader/workers/pipelines/`:
```python
from dataloader.workers.pipelines.registry import register
@register("myqueue.mytask")
async def my_task_pipeline(args: dict):
    # Your implementation
    # Use yield between chunks for heartbeat
    yield
```
2. Import in `src/dataloader/workers/pipelines/__init__.py`:
```python
def load_all() -> None:
    from . import noop
    from . import my_task  # Add this line
```
3. Add queue to `.env`:
```
WORKERS_JSON=[{"queue":"myqueue","concurrency":1}]
```
### Idempotent Operations
All pipelines should be idempotent since jobs may be retried (an upsert sketch follows this list):
- Use `idempotency_key` for external API calls
- Use `UPSERT` or `INSERT ... ON CONFLICT` for database writes
- Design pipelines to be safely re-runnable from any point
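An upsert sketch with SQLAlchemy's PostgreSQL dialect; the `rates` table and its columns are hypothetical, only the `ON CONFLICT` pattern matters:
```python
from sqlalchemy import Column, Date, MetaData, Numeric, Table, Text
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.asyncio import AsyncSession

metadata = MetaData()
rates = Table(
    "rates", metadata,
    Column("currency", Text, primary_key=True),
    Column("on_date", Date, primary_key=True),
    Column("value", Numeric, nullable=False),
)


async def upsert_rate(session: AsyncSession, currency: str, on_date, value) -> None:
    stmt = insert(rates).values(currency=currency, on_date=on_date, value=value)
    stmt = stmt.on_conflict_do_update(
        index_elements=["currency", "on_date"],
        set_={"value": stmt.excluded.value},
    )
    await session.execute(stmt)
```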
### Security & Data Masking
The logger automatically masks sensitive fields (defined in `logger/utils.py`):
- Keywords: `password`, `token`, `secret`, `key`, `authorization`, etc.
- Never log credentials directly
- Use structured logging: `logger.info("message", extra={...})`
### Error Handling
- Pipelines should raise exceptions for transient errors (will trigger retry)
- Use `max_attempts` in job creation to control retry limits
- Permanent failures should be logged but not re-raised (mark the job as succeeded and record the error in the events log); see the sketch below
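A hedged sketch of this contract (the `process` helper is a placeholder): an exception that propagates triggers a retry, while known-bad data is logged and skipped:
```python
from dataloader.logger.logger import get_logger
from dataloader.workers.pipelines.registry import register

log = get_logger()


async def process(record: dict) -> None:
    """Hypothetical unit of work; replace with real ETL logic."""


@register("myqueue.mytask")
async def my_task_pipeline(args: dict):
    for record in args.get("records", []):
        try:
            await process(record)
        except ValueError:
            log.warning("skipping malformed record")  # permanent: log, do not fail the job
        except ConnectionError:
            raise                                     # transient: let the queue retry
        yield                                         # heartbeat / cancellation point
```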
### Testing
Integration tests should (see the example after this list):
- Use test fixtures from `tests/conftest.py`
- Test full job lifecycle: trigger → claim → execute → complete
- Test failure scenarios: cancellation, retries, lock contention
- Mock external dependencies, use real database for queue operations
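A minimal integration-test sketch, assuming `pytest-asyncio` and a `client` fixture (an httpx-style `AsyncClient` for the app) defined in `tests/conftest.py`; the fixture name is an assumption:
```python
import pytest


@pytest.mark.asyncio
async def test_trigger_and_status(client):
    resp = await client.post(
        "/api/v1/jobs/trigger",
        json={"queue": "load.cbr", "task": "noop", "lock_key": "noop-test"},
    )
    assert resp.status_code == 200
    job_id = resp.json()["job_id"]

    status = await client.get(f"/api/v1/jobs/{job_id}/status")
    assert status.status_code == 200
    assert status.json()["status"] in {"queued", "running", "succeeded"}
```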
## Important Files to Reference
- `TZ.md` - Full technical specification (Russian)
- `TODO.md` - Implementation progress and next steps
- `rest_template.md` - Project structure template
- `DDL.sql` - Database schema

View File

@ -3,7 +3,7 @@ from __future__ import annotations
from datetime import datetime, timezone
from typing import Any, Optional
from uuid import UUID, uuid4
from uuid import UUID
from pydantic import BaseModel, Field, field_validator
@ -61,10 +61,3 @@ class CancelJobResponse(BaseModel):
"""
job_id: UUID = Field(...)
status: str = Field(...)
def new_job_id() -> UUID:
"""
Возвращает новый UUID для идентификатора задачи.
"""
return uuid4()

View File

@ -2,7 +2,7 @@
from __future__ import annotations
from datetime import datetime, timezone
from typing import Any, Optional
from typing import Optional
from uuid import UUID
from sqlalchemy.ext.asyncio import AsyncSession
@ -11,12 +11,10 @@ from dataloader.api.v1.schemas import (
JobStatusResponse,
TriggerJobRequest,
TriggerJobResponse,
new_job_id,
)
from dataloader.storage.repositories import (
CreateJobRequest,
QueueRepository,
)
from dataloader.api.v1.utils import new_job_id
from dataloader.storage.schemas import CreateJobRequest
from dataloader.storage.repositories import QueueRepository
from dataloader.logger.logger import get_logger

View File

@ -1,2 +1,14 @@
"""Утилиты для API v1."""
# src/dataloader/api/v1/utils.py
from __future__ import annotations
from uuid import UUID, uuid4
def new_job_id() -> UUID:
"""
Генерирует новый UUID для идентификатора задачи.
Возвращает:
UUID для задачи
"""
return uuid4()

View File

@ -77,7 +77,7 @@ class PGSettings(BaseSettings):
user: str = Field(validation_alias="PG_USER", default="postgres")
password: str = Field(validation_alias="PG_PASSWORD", default="")
database: str = Field(validation_alias="PG_DATABASE", default="postgres")
schema_: str = Field(validation_alias="PG_SCHEMA", default="public")
schema_queue: str = Field(validation_alias="PG_SCHEMA_QUEUE", default="public")
use_pool: bool = Field(validation_alias="PG_USE_POOL", default=True)
pool_size: int = Field(validation_alias="PG_POOL_SIZE", default=5)
max_overflow: int = Field(validation_alias="PG_MAX_OVERFLOW", default=10)
@ -93,16 +93,15 @@ class PGSettings(BaseSettings):
return f"postgresql+asyncpg://{self.user}:{self.password}@{self.host}:{self.port}/{self.database}"
class Settings(BaseSettings):
class WorkerSettings(BaseSettings):
"""
Настройки очереди и воркеров.
"""
dl_db_dsn: str = Field(validation_alias="DL_DB_DSN", default="")
workers_json: str = Field(validation_alias="WORKERS_JSON", default="[]")
dl_heartbeat_sec: int = Field(validation_alias="DL_HEARTBEAT_SEC", default=10)
dl_default_lease_ttl_sec: int = Field(validation_alias="DL_DEFAULT_LEASE_TTL_SEC", default=60)
dl_reaper_period_sec: int = Field(validation_alias="DL_REAPER_PERIOD_SEC", default=10)
dl_claim_backoff_sec: int = Field(validation_alias="DL_CLAIM_BACKOFF_SEC", default=15)
heartbeat_sec: int = Field(validation_alias="DL_HEARTBEAT_SEC", default=10)
default_lease_ttl_sec: int = Field(validation_alias="DL_DEFAULT_LEASE_TTL_SEC", default=60)
reaper_period_sec: int = Field(validation_alias="DL_REAPER_PERIOD_SEC", default=10)
claim_backoff_sec: int = Field(validation_alias="DL_CLAIM_BACKOFF_SEC", default=15)
def parsed_workers(self) -> list[dict[str, Any]]:
"""
@ -122,20 +121,13 @@ class Secrets:
app: AppSettings = AppSettings()
log: LogSettings = LogSettings()
pg: PGSettings = PGSettings()
dl: Settings = Settings()
@property
def resolved_dsn(self) -> str:
"""
Возвращает DSN для очереди: DL_DB_DSN или URL из PG.
"""
return self.dl.dl_db_dsn or self.pg.url
worker: WorkerSettings = WorkerSettings()
APP_CONFIG = Secrets()
__all__ = [
"Settings",
"WorkerSettings",
"Secrets",
"APP_CONFIG",
]

View File

@ -1,4 +1,6 @@
# src/dataloader/context.py
from __future__ import annotations
from typing import AsyncGenerator
from logging import Logger
@ -9,6 +11,9 @@ from .logger.context_vars import ContextVarsContainer
class AppContext:
"""
Контекст приложения, хранящий глобальные зависимости (Singleton pattern).
"""
def __init__(self) -> None:
self._engine: AsyncEngine | None = None
self._sessionmaker: async_sessionmaker[AsyncSession] | None = None
@ -16,34 +21,68 @@ class AppContext:
@property
def engine(self) -> AsyncEngine:
"""
Возвращает инициализированный движок БД.
Raises:
RuntimeError: Если движок не инициализирован
"""
if self._engine is None:
raise RuntimeError("Database engine is not initialized.")
return self._engine
@property
def sessionmaker(self) -> async_sessionmaker[AsyncSession]:
"""
Возвращает фабрику сессий БД.
Raises:
RuntimeError: Если sessionmaker не инициализирован
"""
if self._sessionmaker is None:
raise RuntimeError("Sessionmaker is not initialized.")
return self._sessionmaker
def startup(self) -> None:
from .storage.db import create_engine, create_sessionmaker
async def on_startup(self) -> None:
"""
Инициализация контекста при старте приложения.
Настраивает логирование, создаёт движок и sessionmaker.
"""
from .logger.logger import setup_logging
from .storage.engine import create_engine, create_sessionmaker
setup_logging()
self._engine = create_engine(APP_CONFIG.pg.url)
self._sessionmaker = create_sessionmaker(self._engine)
async def shutdown(self) -> None:
async def on_shutdown(self) -> None:
"""
Очистка ресурсов при остановке приложения.
Закрывает соединения с БД.
"""
if self._engine:
await self._engine.dispose()
def get_logger(self, name: str | None = None) -> Logger:
"""Returns a configured logger instance."""
"""
Возвращает настроенный логгер.
Параметры:
name: Имя логгера (опционально)
Возвращает:
Экземпляр Logger
"""
from .logger.logger import get_logger as get_app_logger
return get_app_logger(name)
def get_context_vars_container(self) -> ContextVarsContainer:
"""
Возвращает контейнер контекстных переменных для логирования.
Возвращает:
ContextVarsContainer
"""
return self._context_vars_container
@ -52,8 +91,10 @@ APP_CTX = AppContext()
async def get_session() -> AsyncGenerator[AsyncSession, None]:
"""
FastAPI dependency to get a database session.
Yields a session from the global sessionmaker and ensures it's closed.
FastAPI dependency для получения сессии БД.
Yields:
AsyncSession для работы с БД
"""
async with APP_CTX.sessionmaker() as session:
yield session

View File

@ -48,7 +48,7 @@ def setup_logging():
logger.remove()
logger.add(
sys.stdout,
level=APP_CONFIG.log.log_level.upper(),
level=APP_CONFIG.log.log_lvl,
filter=regular_log_filter,
colorize=True,
)

View File

@ -3,37 +3,19 @@ from __future__ import annotations
from typing import AsyncIterator
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
from dataloader.context import APP_CTX
class Base(DeclarativeBase):
"""
Базовый класс моделей ORM.
"""
pass
def get_engine() -> AsyncEngine:
"""
Возвращает AsyncEngine, инициализированный в контексте приложения.
"""
return APP_CTX.engine
def get_sessionmaker() -> async_sessionmaker[AsyncSession]:
"""
Возвращает фабрику асинхронных сессий.
"""
return APP_CTX.sessionmaker
from sqlalchemy.ext.asyncio import AsyncSession
async def session_scope() -> AsyncIterator[AsyncSession]:
"""
Асинхронный контекст жизненного цикла сессии.
Получает sessionmaker из AppContext.
Yields:
AsyncSession для работы с БД
"""
sm = get_sessionmaker()
from dataloader.context import APP_CTX
sm = APP_CTX.sessionmaker
async with sm() as s:
yield s

View File

@ -0,0 +1,58 @@
# src/dataloader/storage/engine.py
from __future__ import annotations
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine
from dataloader.config import APP_CONFIG
def create_engine(dsn: str) -> AsyncEngine:
"""
Создаёт асинхронный движок SQLAlchemy с поддержкой маппинга логических схем.
Параметры:
dsn: Строка подключения к PostgreSQL
Возвращает:
Настроенный AsyncEngine с schema_translate_map
"""
pg = APP_CONFIG.pg
schema_map = {
"queue": pg.schema_queue,
}
return create_async_engine(
dsn,
echo=False,
pool_size=pg.pool_size if pg.use_pool else 0,
max_overflow=pg.max_overflow if pg.use_pool else 0,
pool_recycle=pg.pool_recycle,
pool_pre_ping=True,
connect_args={
"timeout": pg.connect_timeout,
"command_timeout": pg.command_timeout,
},
execution_options={
"schema_translate_map": schema_map,
},
)
def create_sessionmaker(engine: AsyncEngine) -> async_sessionmaker[AsyncSession]:
"""
Создаёт фабрику асинхронных сессий.
Параметры:
engine: AsyncEngine для создания сессий
Возвращает:
Настроенный async_sessionmaker
"""
return async_sessionmaker(
bind=engine,
class_=AsyncSession,
expire_on_commit=False,
autoflush=False,
autocommit=False,
)

View File

@ -0,0 +1,78 @@
# src/dataloader/storage/models.py
from __future__ import annotations
from datetime import datetime
from typing import Any, Optional
from sqlalchemy import BigInteger, DateTime, Text
from sqlalchemy.dialects.postgresql import ENUM, JSONB, UUID
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
"""
Базовый класс для всех ORM моделей.
"""
pass
dl_status_enum = ENUM(
"queued",
"running",
"succeeded",
"failed",
"canceled",
"lost",
name="dl_status",
create_type=False,
native_enum=True,
)
class DLJob(Base):
"""
Модель таблицы очереди задач dl_jobs.
Использует логическое имя схемы 'queue' для поддержки schema_translate_map.
"""
__tablename__ = "dl_jobs"
__table_args__ = {"schema": "queue"}
job_id: Mapped[str] = mapped_column(UUID(as_uuid=False), primary_key=True)
queue: Mapped[str] = mapped_column(Text, nullable=False)
task: Mapped[str] = mapped_column(Text, nullable=False)
args: Mapped[dict[str, Any]] = mapped_column(JSONB, default=dict, nullable=False)
idempotency_key: Mapped[Optional[str]] = mapped_column(Text, unique=True)
lock_key: Mapped[str] = mapped_column(Text, nullable=False)
partition_key: Mapped[str] = mapped_column(Text, default="", nullable=False)
priority: Mapped[int] = mapped_column(nullable=False, default=100)
available_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
status: Mapped[str] = mapped_column(dl_status_enum, nullable=False, default="queued")
attempt: Mapped[int] = mapped_column(nullable=False, default=0)
max_attempts: Mapped[int] = mapped_column(nullable=False, default=5)
lease_ttl_sec: Mapped[int] = mapped_column(nullable=False, default=60)
lease_expires_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True))
heartbeat_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True))
cancel_requested: Mapped[bool] = mapped_column(nullable=False, default=False)
progress: Mapped[dict[str, Any]] = mapped_column(JSONB, default=dict, nullable=False)
error: Mapped[Optional[str]] = mapped_column(Text)
producer: Mapped[Optional[str]] = mapped_column(Text)
consumer_group: Mapped[Optional[str]] = mapped_column(Text)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
started_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True))
finished_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True))
class DLJobEvent(Base):
"""
Модель таблицы журнала событий dl_job_events.
Использует логическое имя схемы 'queue' для поддержки schema_translate_map.
"""
__tablename__ = "dl_job_events"
__table_args__ = {"schema": "queue"}
event_id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
job_id: Mapped[str] = mapped_column(UUID(as_uuid=False), nullable=False)
queue: Mapped[str] = mapped_column(Text, nullable=False)
ts: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
kind: Mapped[str] = mapped_column(Text, nullable=False)
payload: Mapped[Optional[dict[str, Any]]] = mapped_column(JSONB)

View File

@ -5,14 +5,13 @@ import asyncio
import asyncpg
from typing import Callable, Optional
from dataloader.config import APP_CONFIG
class PGNotifyListener:
"""
Прослушиватель PostgreSQL NOTIFY для канала 'dl_jobs'.
"""
def __init__(self, queue: str, callback: Callable[[], None], stop_event: asyncio.Event):
def __init__(self, dsn: str, queue: str, callback: Callable[[], None], stop_event: asyncio.Event):
self._dsn = dsn
self._queue = queue
self._callback = callback
self._stop = stop_event
@ -24,31 +23,23 @@ class PGNotifyListener:
"""
Запускает прослушивание уведомлений.
"""
dsn = APP_CONFIG.resolved_dsn
# Преобразуем SQLAlchemy DSN в asyncpg DSN
dsn = self._dsn
if dsn.startswith("postgresql+asyncpg://"):
dsn = dsn.replace("postgresql+asyncpg://", "postgresql://")
self._conn = await asyncpg.connect(dsn)
def on_notify(connection, pid, channel, payload):
# Проверяем, что это уведомление для нашей очереди
# payload содержит имя очереди из триггера
# Callback вызывается из потока asyncpg, поэтому используем asyncio.ensure_future
if channel == "dl_jobs" and payload == self._queue:
try:
# Event.set() потокобезопасен
self._callback()
except Exception:
# Игнорируем ошибки в callback, чтобы не сломать listener
pass
self._on_notify_handler = on_notify
# Сначала выполняем LISTEN, затем добавляем listener
await self._conn.execute("LISTEN dl_jobs")
await self._conn.add_listener("dl_jobs", self._on_notify_handler)
# Запускаем задачу для мониторинга соединения
self._task = asyncio.create_task(self._monitor_connection())
async def _monitor_connection(self) -> None:
@ -81,4 +72,3 @@ class PGNotifyListener:
except Exception:
pass
self._conn = None

View File

@ -1,114 +1,19 @@
# src/dataloader/storage/repositories.py
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Optional
from sqlalchemy import BigInteger, String, Text, select, func, update, DateTime
from sqlalchemy.dialects.postgresql import JSONB, ENUM, UUID
from sqlalchemy import func, select, update
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Mapped, mapped_column
from dataloader.storage.db import Base
dl_status_enum = ENUM(
"queued",
"running",
"succeeded",
"failed",
"canceled",
"lost",
name="dl_status",
create_type=False,
native_enum=True,
)
class DLJob(Base):
"""
Модель очереди dl_jobs.
"""
__tablename__ = "dl_jobs"
job_id: Mapped[str] = mapped_column(UUID(as_uuid=False), primary_key=True)
queue: Mapped[str] = mapped_column(Text, nullable=False)
task: Mapped[str] = mapped_column(Text, nullable=False)
args: Mapped[dict[str, Any]] = mapped_column(JSONB, default=dict, nullable=False)
idempotency_key: Mapped[Optional[str]] = mapped_column(Text, unique=True)
lock_key: Mapped[str] = mapped_column(Text, nullable=False)
partition_key: Mapped[str] = mapped_column(Text, default="", nullable=False)
priority: Mapped[int] = mapped_column(nullable=False, default=100)
available_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
status: Mapped[str] = mapped_column(dl_status_enum, nullable=False, default="queued")
attempt: Mapped[int] = mapped_column(nullable=False, default=0)
max_attempts: Mapped[int] = mapped_column(nullable=False, default=5)
lease_ttl_sec: Mapped[int] = mapped_column(nullable=False, default=60)
lease_expires_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True))
heartbeat_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True))
cancel_requested: Mapped[bool] = mapped_column(nullable=False, default=False)
progress: Mapped[dict[str, Any]] = mapped_column(JSONB, default=dict, nullable=False)
error: Mapped[Optional[str]] = mapped_column(Text)
producer: Mapped[Optional[str]] = mapped_column(Text)
consumer_group: Mapped[Optional[str]] = mapped_column(Text)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
started_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True))
finished_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True))
class DLJobEvent(Base):
"""
Модель журнала событий dl_job_events.
"""
__tablename__ = "dl_job_events"
event_id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
job_id: Mapped[str] = mapped_column(UUID(as_uuid=False), nullable=False)
queue: Mapped[str] = mapped_column(Text, nullable=False)
ts: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
kind: Mapped[str] = mapped_column(Text, nullable=False)
payload: Mapped[Optional[dict[str, Any]]] = mapped_column(JSONB)
@dataclass(frozen=True)
class CreateJobRequest:
"""
Параметры постановки задачи.
"""
job_id: str
queue: str
task: str
args: dict[str, Any]
idempotency_key: Optional[str]
lock_key: str
partition_key: str
priority: int
available_at: datetime
max_attempts: int
lease_ttl_sec: int
producer: Optional[str]
consumer_group: Optional[str]
@dataclass(frozen=True)
class JobStatus:
"""
Снимок статуса задачи.
"""
job_id: str
status: str
attempt: int
started_at: Optional[datetime]
finished_at: Optional[datetime]
heartbeat_at: Optional[datetime]
error: Optional[str]
progress: dict[str, Any]
from dataloader.storage.models import DLJob, DLJobEvent
from dataloader.storage.schemas import CreateJobRequest, JobStatus
class QueueRepository:
"""
Репозиторий очереди и событий с полнотой ORM.
Репозиторий для работы с очередью задач и журналом событий.
"""
def __init__(self, session: AsyncSession):
self.s = session
@ -116,6 +21,12 @@ class QueueRepository:
async def create_or_get(self, req: CreateJobRequest) -> tuple[str, str]:
"""
Идемпотентно создаёт запись в очереди и возвращает (job_id, status).
Параметры:
req: DTO с параметрами задачи
Возвращает:
Кортеж (job_id, status)
"""
async with self.s.begin():
if req.idempotency_key:
@ -157,6 +68,12 @@ class QueueRepository:
async def get_status(self, job_id: str) -> Optional[JobStatus]:
"""
Возвращает статус задачи.
Параметры:
job_id: Идентификатор задачи
Возвращает:
DTO JobStatus или None, если задача не найдена
"""
q = select(
DLJob.job_id,
@ -186,6 +103,12 @@ class QueueRepository:
async def cancel(self, job_id: str) -> bool:
"""
Устанавливает флаг отмены для задачи.
Параметры:
job_id: Идентификатор задачи
Возвращает:
True, если задача найдена и флаг установлен
"""
async with self.s.begin():
job = await self._get(job_id)
@ -198,6 +121,13 @@ class QueueRepository:
async def claim_one(self, queue: str, claim_backoff_sec: int) -> Optional[dict[str, Any]]:
"""
Захватывает одну задачу из очереди с учётом блокировок и выставляет running.
Параметры:
queue: Имя очереди
claim_backoff_sec: Время отката при неудаче с advisory lock
Возвращает:
Словарь с данными задачи или None
"""
async with self.s.begin():
q = (
@ -244,7 +174,13 @@ class QueueRepository:
async def heartbeat(self, job_id: str, ttl_sec: int) -> tuple[bool, bool]:
"""
Обновляет heartbeat и продлевает lease.
Возвращает (success, cancel_requested).
Параметры:
job_id: Идентификатор задачи
ttl_sec: TTL аренды в секундах
Возвращает:
Кортеж (success, cancel_requested)
"""
async with self.s.begin():
job = await self._get(job_id)
@ -265,6 +201,9 @@ class QueueRepository:
async def finish_ok(self, job_id: str) -> None:
"""
Помечает задачу как выполненную успешно и снимает advisory-lock.
Параметры:
job_id: Идентификатор задачи
"""
async with self.s.begin():
job = await self._get(job_id)
@ -279,6 +218,11 @@ class QueueRepository:
async def finish_fail_or_retry(self, job_id: str, err: str, is_canceled: bool = False) -> None:
"""
Помечает задачу как failed, canceled или возвращает в очередь с задержкой.
Параметры:
job_id: Идентификатор задачи
err: Текст ошибки
is_canceled: Флаг отмены
"""
async with self.s.begin():
job = await self._get(job_id)
@ -310,6 +254,12 @@ class QueueRepository:
async def requeue_lost(self, now: Optional[datetime] = None) -> list[str]:
"""
Возвращает протухшие running-задачи в очередь.
Параметры:
now: Текущее время (по умолчанию используется текущий момент UTC)
Возвращает:
Список job_id перепоставленных задач
"""
now = now or datetime.now(timezone.utc)
async with self.s.begin():
@ -335,7 +285,13 @@ class QueueRepository:
async def _get(self, job_id: str) -> Optional[DLJob]:
"""
Возвращает ORM-объект задачи.
Возвращает ORM-объект задачи с блокировкой.
Параметры:
job_id: Идентификатор задачи
Возвращает:
ORM модель DLJob или None
"""
r = await self.s.execute(select(DLJob).where(DLJob.job_id == job_id).with_for_update(skip_locked=True))
return r.scalar_one_or_none()
@ -343,6 +299,12 @@ class QueueRepository:
async def _resolve_queue(self, job_id: str) -> str:
"""
Возвращает имя очереди для события.
Параметры:
job_id: Идентификатор задачи
Возвращает:
Имя очереди
"""
r = await self.s.execute(select(DLJob.queue).where(DLJob.job_id == job_id))
v = r.scalar_one_or_none()
@ -351,6 +313,12 @@ class QueueRepository:
async def _append_event(self, job_id: str, queue: str, kind: str, payload: Optional[dict[str, Any]]) -> None:
"""
Добавляет запись в журнал событий.
Параметры:
job_id: Идентификатор задачи
queue: Имя очереди
kind: Тип события
payload: Дополнительные данные события
"""
ev = DLJobEvent(
job_id=job_id,
@ -364,6 +332,12 @@ class QueueRepository:
async def _try_advisory_lock(self, lock_key: str) -> bool:
"""
Пытается получить advisory-lock в Postgres.
Параметры:
lock_key: Ключ блокировки
Возвращает:
True, если блокировка получена
"""
r = await self.s.execute(select(func.pg_try_advisory_lock(func.hashtext(lock_key))))
return bool(r.scalar())
@ -371,5 +345,8 @@ class QueueRepository:
async def _advisory_unlock(self, lock_key: str) -> None:
"""
Снимает advisory-lock в Postgres.
Параметры:
lock_key: Ключ блокировки
"""
await self.s.execute(select(func.pg_advisory_unlock(func.hashtext(lock_key))))

View File

@ -0,0 +1,41 @@
# src/dataloader/storage/schemas.py
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Optional
@dataclass(frozen=True)
class CreateJobRequest:
"""
DTO для создания задачи в очереди.
"""
job_id: str
queue: str
task: str
args: dict[str, Any]
idempotency_key: Optional[str]
lock_key: str
partition_key: str
priority: int
available_at: datetime
max_attempts: int
lease_ttl_sec: int
producer: Optional[str]
consumer_group: Optional[str]
@dataclass(frozen=True)
class JobStatus:
"""
DTO для статуса задачи.
"""
job_id: str
status: str
attempt: int
started_at: Optional[datetime]
finished_at: Optional[datetime]
heartbeat_at: Optional[datetime]
error: Optional[str]
progress: dict[str, Any]

View File

@ -7,8 +7,8 @@ from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import AsyncIterator, Callable, Optional
from dataloader.config import APP_CONFIG
from dataloader.context import APP_CTX
from dataloader.storage.db import get_sessionmaker
from dataloader.storage.repositories import QueueRepository
from dataloader.storage.notify_listener import PGNotifyListener
from dataloader.workers.pipelines.registry import resolve as resolve_pipeline
@ -32,7 +32,7 @@ class PGWorker:
self._cfg = cfg
self._stop = stop_event
self._log = APP_CTX.get_logger()
self._sm = get_sessionmaker()
self._sm = APP_CTX.sessionmaker
self._notify_wakeup = asyncio.Event()
self._listener: Optional[PGNotifyListener] = None
@ -42,11 +42,11 @@ class PGWorker:
"""
self._log.info(f"worker.start queue={self._cfg.queue}")
# Запускаем LISTEN/NOTIFY
self._listener = PGNotifyListener(
self._cfg.queue,
lambda: self._notify_wakeup.set(),
self._stop
dsn=APP_CONFIG.pg.url,
queue=self._cfg.queue,
callback=lambda: self._notify_wakeup.set(),
stop_event=self._stop
)
try:
await self._listener.start()

View File

@ -5,9 +5,8 @@ import asyncio
import contextlib
from dataclasses import dataclass
from dataloader.context import APP_CTX
from dataloader.config import APP_CONFIG
from dataloader.storage.db import get_sessionmaker
from dataloader.context import APP_CTX
from dataloader.workers.base import PGWorker, WorkerConfig
from dataloader.workers.reaper import requeue_lost
@ -36,8 +35,8 @@ class WorkerManager:
"""
Стартует воркеры и фоновую задачу реапера.
"""
hb = int(APP_CONFIG.dl.dl_heartbeat_sec)
backoff = int(APP_CONFIG.dl.dl_claim_backoff_sec)
hb = int(APP_CONFIG.worker.heartbeat_sec)
backoff = int(APP_CONFIG.worker.claim_backoff_sec)
for spec in self._specs:
for i in range(max(1, spec.concurrency)):
@ -75,8 +74,8 @@ class WorkerManager:
"""
Фоновый цикл возврата потерянных задач в очередь.
"""
period = int(APP_CONFIG.dl.dl_reaper_period_sec)
sm = get_sessionmaker()
period = int(APP_CONFIG.worker.reaper_period_sec)
sm = APP_CTX.sessionmaker
while not self._stop.is_set():
try:
async with sm() as s:
@ -96,7 +95,7 @@ def build_manager_from_env() -> WorkerManager:
Собирает WorkerManager из WORKERS_JSON.
"""
specs: list[WorkerSpec] = []
for item in APP_CONFIG.dl.parsed_workers():
for item in APP_CONFIG.worker.parsed_workers():
q = str(item.get("queue", "")).strip()
c = int(item.get("concurrency", 1))
if q: