From 8facab266d1969bf8d5e19e8542404a649c18984 Mon Sep 17 00:00:00 2001 From: itqop Date: Wed, 5 Nov 2025 14:13:51 +0300 Subject: [PATCH] refactor: claude v1 --- .gitignore | 3 + CLAUDE.md | 314 ++++++++++++++++++++++ src/dataloader/api/v1/schemas.py | 9 +- src/dataloader/api/v1/service.py | 10 +- src/dataloader/api/v1/utils.py | 14 +- src/dataloader/config.py | 24 +- src/dataloader/context.py | 55 +++- src/dataloader/logger/logger.py | 2 +- src/dataloader/storage/db.py | 34 +-- src/dataloader/storage/engine.py | 58 ++++ src/dataloader/storage/models.py | 78 ++++++ src/dataloader/storage/notify_listener.py | 24 +- src/dataloader/storage/repositories.py | 183 ++++++------- src/dataloader/storage/schemas.py | 41 +++ src/dataloader/workers/base.py | 14 +- src/dataloader/workers/manager.py | 13 +- 16 files changed, 677 insertions(+), 199 deletions(-) create mode 100644 CLAUDE.md create mode 100644 src/dataloader/storage/engine.py create mode 100644 src/dataloader/storage/models.py create mode 100644 src/dataloader/storage/schemas.py diff --git a/.gitignore b/.gitignore index 91c1e9f..01b5e2f 100644 --- a/.gitignore +++ b/.gitignore @@ -58,3 +58,6 @@ Thumbs.db # Документация docs/_build/ + +.claude +nul \ No newline at end of file diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..8257569 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,314 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Role and Working Principles + +You act as a senior Python / ML / AI / DL developer and system architect. +You work in an enterprise-level project (multi-tier backend) with initial base architecture similar to the REST template from `rest_template.md`. +You design code strictly according to patterns (Singleton, Repository, Interface, DTO, CRUD, Service, Context, Adapter, etc.). +You write clean production-level Python code (FastAPI, SQLAlchemy 2.x, asyncio, Pydantic v2, PostgreSQL, aiohttp, structlog/loguru). + +**Working Rules:** +- Do not add comments unless explicitly requested. +- Always add docstrings to functions, classes, and modules. +- Always follow PEP 8 style and architectural layer isolation (api / service / repositories / models / schemas / interfaces / logger / config / context). +- Prefer typing via `from __future__ import annotations`. +- All dependencies are passed through `AppContext` (DI Singleton pattern). +- Implement logging through the logger with context (`logger.info("msg")` without structures). +- When creating projects from scratch, rely on the structure from `rest_template.md`. +- Respond strictly to the point, no fluff, like a senior developer during code review. +- All logic in examples is correct, asynchronous, and production-ready. +- Use only modern library versions. + +Your style is minimalistic, precise, clean, and architecturally sound. + +## Project Overview + +**Dataloader** is an asynchronous FastAPI service for managing and executing long-running ETL tasks via a PostgreSQL-based job queue. The service uses PostgreSQL's `LISTEN/NOTIFY` for efficient worker wakeup, advisory locks for concurrency control, and `SELECT ... FOR UPDATE SKIP LOCKED` for job claiming. + +This is a Clean Architecture implementation following the project template `rest_template.md`, built with Python 3.11+, FastAPI, SQLAlchemy 2.0 (async), and asyncpg. 
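
The sketch below illustrates how `FOR UPDATE SKIP LOCKED` and the advisory lock combine during job claiming. It is a simplified, hypothetical example — the helper name `claim_next_job_sketch` is not part of the codebase; the authoritative logic lives in `QueueRepository.claim_one` in `src/dataloader/storage/repositories.py`.

```python
from datetime import datetime, timezone

from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from dataloader.storage.models import DLJob


async def claim_next_job_sketch(session: AsyncSession, queue: str) -> DLJob | None:
    """Illustrative claim of one due job without blocking other workers."""
    now = datetime.now(timezone.utc)
    stmt = (
        select(DLJob)
        .where(DLJob.queue == queue, DLJob.status == "queued", DLJob.available_at <= now)
        .order_by(DLJob.priority, DLJob.available_at)
        .with_for_update(skip_locked=True)  # rows claimed by other workers are skipped
        .limit(1)
    )
    job = (await session.execute(stmt)).scalar_one_or_none()
    if job is None:
        return None
    # Advisory lock serializes jobs that share the same lock_key across workers.
    locked = (
        await session.execute(select(func.pg_try_advisory_lock(func.hashtext(job.lock_key))))
    ).scalar()
    if not locked:
        return None  # the real repository re-queues the job with a backoff instead
    job.status = "running"
    return job
```

In the actual repository the claim, the advisory-lock attempt, the attempt counter, and the lease calculation all happen inside a single transaction, and a failed lock attempt pushes `available_at` forward by `DL_CLAIM_BACKOFF_SEC` before the job is returned to `queued`.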
+ +## Development Commands + +### Running the Application + +```bash +# Install dependencies with Poetry +poetry install + +# Run the application +poetry run dataloader +# or +uvicorn dataloader.__main__:main + +# The app will start on port 8081 by default (configurable via APP_PORT) +``` + +### Testing + +```bash +# Run all tests +poetry run pytest + +# Run specific test file +poetry run pytest tests/integration_tests/v1_api/test_service.py + +# Run with verbose output +poetry run pytest -v + +# Run integration tests only +poetry run pytest tests/integration_tests/ +``` + +### Database + +The database schema is already applied (see `DDL.sql`). The queue uses: +- Table `dl_jobs` - main job queue with statuses: queued, running, succeeded, failed, canceled, lost +- Table `dl_job_events` - audit log of job lifecycle events +- PostgreSQL triggers for `LISTEN/NOTIFY` on job insertion/updates + +## Architecture + +### High-Level Structure + +The codebase follows Clean Architecture with clear separation of concerns: + +1. **API Layer** (`src/dataloader/api/`) + - `v1/router.py` - HTTP endpoints for job management + - `v1/service.py` - Business logic layer + - `v1/schemas.py` - Pydantic request/response models + - `os_router.py` - Infrastructure endpoints (`/health`, `/status`) **DO NOT MODIFY** + - `metric_router.py` - Metrics endpoints (BETA) **DO NOT MODIFY** + - `middleware.py` - Request/response logging middleware **DO NOT MODIFY** + +2. **Storage Layer** (`src/dataloader/storage/`) + - `repositories.py` - PostgreSQL queue operations using SQLAlchemy ORM + - `db.py` - Database engine and session management + - `notify_listener.py` - PostgreSQL LISTEN/NOTIFY implementation + +3. **Worker Layer** (`src/dataloader/workers/`) + - `manager.py` - Manages lifecycle of async worker tasks + - `base.py` - Core worker implementation with claim/heartbeat/execute cycle + - `reaper.py` - Background task to requeue lost jobs (expired leases) + - `pipelines/registry.py` - Pipeline registration and resolution system + - `pipelines/` - Individual pipeline implementations + +4. **Logger** (`src/dataloader/logger/`) + - Structured logging with automatic sensitive data masking + - **DO NOT MODIFY** these files - they're from the template + +5. **Core** (`src/dataloader/`) + - `__main__.py` - Application entry point + - `config.py` - Pydantic Settings for all configuration + - `context.py` - AppContext singleton for dependency injection + - `base.py` - Base classes and types + - `exceptions.py` - Global exception definitions + +### Key Architectural Patterns + +#### Job Queue Protocol + +Jobs flow through the system via a strict state machine: + +1. **Enqueue** (`trigger` API) - Creates job in `queued` status + - Idempotent via `idempotency_key` + - PostgreSQL trigger fires `LISTEN/NOTIFY` to wake workers + +2. **Claim** (worker) - Worker acquires job atomically + - Uses `FOR UPDATE SKIP LOCKED` to prevent contention + - Sets status to `running`, increments attempt counter + - Attempts PostgreSQL advisory lock on `lock_key` + - If lock fails → job goes back to `queued` with backoff delay + +3. **Execute** (worker) - Runs the pipeline with heartbeat + - Heartbeat updates every `DL_HEARTBEAT_SEC` seconds + - Extends `lease_expires_at` to prevent reaper from reclaiming + - Checks `cancel_requested` flag between pipeline chunks + - Pipeline yields between chunks to allow cooperative cancellation + +4. 
**Complete** (worker) - Finalize job status + - **Success**: `status = succeeded`, release advisory lock + - **Failure**: + - If `attempt < max_attempts` → `status = queued` (retry with exponential backoff: 30 * attempt seconds) + - If `attempt >= max_attempts` → `status = failed` + - **Cancel**: `status = canceled` + - Always releases advisory lock + +5. **Reaper** (background) - Recovers lost jobs + - Runs every `DL_REAPER_PERIOD_SEC` + - Finds jobs where `status = running` AND `lease_expires_at < now()` + - Resets them to `queued` for retry + +#### Concurrency Control + +The system uses multiple layers of concurrency control: + +- **`lock_key`**: PostgreSQL advisory lock ensures only one worker processes jobs with the same lock_key +- **`partition_key`**: Logical grouping for job ordering (currently informational) +- **`FOR UPDATE SKIP LOCKED`**: Prevents multiple workers from claiming the same job +- **Async workers**: Multiple workers can run concurrently within a single process + +#### Worker Configuration + +Workers are configured via `WORKERS_JSON` environment variable: + +```json +[ + {"queue": "load.cbr", "concurrency": 2}, + {"queue": "load.sgx", "concurrency": 1} +] +``` + +This spawns M async tasks (sum of all concurrency values) within the FastAPI process. + +#### Pipeline System + +Pipelines are registered via decorator in `workers/pipelines/`: + +```python +from dataloader.workers.pipelines.registry import register + +@register("my.task") +async def my_pipeline(args: dict): + # Process chunk 1 + yield # Allow heartbeat & cancellation check + # Process chunk 2 + yield + # Process chunk 3 +``` + +The `yield` statements enable: +- Heartbeat updates during long operations +- Cooperative cancellation via `cancel_requested` checks +- Progress tracking + +All pipelines must be imported in `workers/pipelines/__init__.py` `load_all()` function. + +### Application Lifecycle + +1. **Startup** (`lifespan` in `api/__init__.py`) + - Initialize logging + - Create database engine and sessionmaker + - Load all pipelines from registry + - Build WorkerManager from `WORKERS_JSON` + - Start all worker tasks and reaper + +2. **Runtime** + - FastAPI serves HTTP requests + - Workers poll queue via LISTEN/NOTIFY + - Reaper runs in background + +3. 
**Shutdown** (on SIGTERM) + - Signal all workers to stop via `asyncio.Event` + - Cancel worker tasks and wait for completion + - Cancel reaper task + - Dispose database engine + +## Configuration + +All configuration is via environment variables (`.env` file or system environment): + +### Application Settings +- `APP_HOST` - Server bind address (default: `0.0.0.0`) +- `APP_PORT` - Server port (default: `8081`) +- `DEBUG` - Debug mode (default: `False`) +- `LOCAL` - Local development flag (default: `False`) + +### Database Settings +- `PG_HOST`, `PG_PORT`, `PG_USER`, `PG_PASSWORD`, `PG_DATABASE`, `PG_SCHEMA` - PostgreSQL connection +- `PG_POOL_SIZE`, `PG_MAX_OVERFLOW`, `PG_POOL_RECYCLE` - Connection pool configuration +- `DL_DB_DSN` - Optional override for queue database DSN (if different from main DB) + +### Worker Settings +- `WORKERS_JSON` - JSON array of worker configurations (required) +- `DL_HEARTBEAT_SEC` - Heartbeat interval (default: `10`) +- `DL_DEFAULT_LEASE_TTL_SEC` - Default lease duration (default: `60`) +- `DL_REAPER_PERIOD_SEC` - Reaper run interval (default: `10`) +- `DL_CLAIM_BACKOFF_SEC` - Backoff when advisory lock fails (default: `15`) + +### Logging Settings +- `LOG_PATH`, `LOG_FILE_NAME` - Application log location +- `METRIC_PATH`, `METRIC_FILE_NAME` - Metrics log location +- `AUDIT_LOG_PATH`, `AUDIT_LOG_FILE_NAME` - Audit events log location + +## API Endpoints + +### Business API (v1) + +- `POST /api/v1/jobs/trigger` - Create or get existing job (idempotent) + - Body: `{queue, task, args?, idempotency_key?, lock_key, partition_key?, priority?, available_at?}` + - Response: `{job_id, status}` + +- `GET /api/v1/jobs/{job_id}/status` - Get job status + - Response: `{job_id, status, attempt, started_at?, finished_at?, heartbeat_at?, error?, progress}` + +- `POST /api/v1/jobs/{job_id}/cancel` - Request job cancellation (cooperative) + - Response: Same as status endpoint + +### Infrastructure API + +- `GET /health` - Health check (no database access, <20ms) +- `GET /status` - Service status with version/uptime + +## Development Guidelines + +### Adding a New Pipeline + +1. Create pipeline file in `src/dataloader/workers/pipelines/`: + ```python + from dataloader.workers.pipelines.registry import register + + @register("myqueue.mytask") + async def my_task_pipeline(args: dict): + # Your implementation + # Use yield between chunks for heartbeat + yield + ``` + +2. Import in `src/dataloader/workers/pipelines/__init__.py`: + ```python + def load_all() -> None: + from . import noop + from . import my_task # Add this line + ``` + +3. Add queue to `.env`: + ``` + WORKERS_JSON=[{"queue":"myqueue","concurrency":1}] + ``` + +### Idempotent Operations + +All pipelines should be idempotent since jobs may be retried: +- Use `idempotency_key` for external API calls +- Use `UPSERT` or `INSERT ... ON CONFLICT` for database writes +- Design pipelines to be safely re-runnable from any point + +### Security & Data Masking + +The logger automatically masks sensitive fields (defined in `logger/utils.py`): +- Keywords: `password`, `token`, `secret`, `key`, `authorization`, etc. 
+- Never log credentials directly +- Use structured logging: `logger.info("message", extra={...})` + +### Error Handling + +- Pipelines should raise exceptions for transient errors (will trigger retry) +- Use `max_attempts` in job creation to control retry limits +- Permanent failures should be logged but not raise (mark job as succeeded but log error in events) + +### Testing + +Integration tests should: +- Use test fixtures from `tests/conftest.py` +- Test full job lifecycle: trigger → claim → execute → complete +- Test failure scenarios: cancellation, retries, lock contention +- Mock external dependencies, use real database for queue operations + +## Important Files to Reference + +- `TZ.md` - Full technical specification (Russian) +- `TODO.md` - Implementation progress and next steps +- `rest_template.md` - Project structure template +- `DDL.sql` - Database schema diff --git a/src/dataloader/api/v1/schemas.py b/src/dataloader/api/v1/schemas.py index 0643eeb..3ec39f0 100644 --- a/src/dataloader/api/v1/schemas.py +++ b/src/dataloader/api/v1/schemas.py @@ -3,7 +3,7 @@ from __future__ import annotations from datetime import datetime, timezone from typing import Any, Optional -from uuid import UUID, uuid4 +from uuid import UUID from pydantic import BaseModel, Field, field_validator @@ -61,10 +61,3 @@ class CancelJobResponse(BaseModel): """ job_id: UUID = Field(...) status: str = Field(...) - - -def new_job_id() -> UUID: - """ - Возвращает новый UUID для идентификатора задачи. - """ - return uuid4() diff --git a/src/dataloader/api/v1/service.py b/src/dataloader/api/v1/service.py index 622e288..3151653 100644 --- a/src/dataloader/api/v1/service.py +++ b/src/dataloader/api/v1/service.py @@ -2,7 +2,7 @@ from __future__ import annotations from datetime import datetime, timezone -from typing import Any, Optional +from typing import Optional from uuid import UUID from sqlalchemy.ext.asyncio import AsyncSession @@ -11,12 +11,10 @@ from dataloader.api.v1.schemas import ( JobStatusResponse, TriggerJobRequest, TriggerJobResponse, - new_job_id, -) -from dataloader.storage.repositories import ( - CreateJobRequest, - QueueRepository, ) +from dataloader.api.v1.utils import new_job_id +from dataloader.storage.schemas import CreateJobRequest +from dataloader.storage.repositories import QueueRepository from dataloader.logger.logger import get_logger diff --git a/src/dataloader/api/v1/utils.py b/src/dataloader/api/v1/utils.py index af722af..8c6731d 100644 --- a/src/dataloader/api/v1/utils.py +++ b/src/dataloader/api/v1/utils.py @@ -1,2 +1,14 @@ -"""Утилиты для API v1.""" +# src/dataloader/api/v1/utils.py +from __future__ import annotations +from uuid import UUID, uuid4 + + +def new_job_id() -> UUID: + """ + Генерирует новый UUID для идентификатора задачи. 
+ + Возвращает: + UUID для задачи + """ + return uuid4() diff --git a/src/dataloader/config.py b/src/dataloader/config.py index 85db588..3a38892 100644 --- a/src/dataloader/config.py +++ b/src/dataloader/config.py @@ -77,7 +77,7 @@ class PGSettings(BaseSettings): user: str = Field(validation_alias="PG_USER", default="postgres") password: str = Field(validation_alias="PG_PASSWORD", default="") database: str = Field(validation_alias="PG_DATABASE", default="postgres") - schema_: str = Field(validation_alias="PG_SCHEMA", default="public") + schema_queue: str = Field(validation_alias="PG_SCHEMA_QUEUE", default="public") use_pool: bool = Field(validation_alias="PG_USE_POOL", default=True) pool_size: int = Field(validation_alias="PG_POOL_SIZE", default=5) max_overflow: int = Field(validation_alias="PG_MAX_OVERFLOW", default=10) @@ -93,16 +93,15 @@ class PGSettings(BaseSettings): return f"postgresql+asyncpg://{self.user}:{self.password}@{self.host}:{self.port}/{self.database}" -class Settings(BaseSettings): +class WorkerSettings(BaseSettings): """ Настройки очереди и воркеров. """ - dl_db_dsn: str = Field(validation_alias="DL_DB_DSN", default="") workers_json: str = Field(validation_alias="WORKERS_JSON", default="[]") - dl_heartbeat_sec: int = Field(validation_alias="DL_HEARTBEAT_SEC", default=10) - dl_default_lease_ttl_sec: int = Field(validation_alias="DL_DEFAULT_LEASE_TTL_SEC", default=60) - dl_reaper_period_sec: int = Field(validation_alias="DL_REAPER_PERIOD_SEC", default=10) - dl_claim_backoff_sec: int = Field(validation_alias="DL_CLAIM_BACKOFF_SEC", default=15) + heartbeat_sec: int = Field(validation_alias="DL_HEARTBEAT_SEC", default=10) + default_lease_ttl_sec: int = Field(validation_alias="DL_DEFAULT_LEASE_TTL_SEC", default=60) + reaper_period_sec: int = Field(validation_alias="DL_REAPER_PERIOD_SEC", default=10) + claim_backoff_sec: int = Field(validation_alias="DL_CLAIM_BACKOFF_SEC", default=15) def parsed_workers(self) -> list[dict[str, Any]]: """ @@ -122,20 +121,13 @@ class Secrets: app: AppSettings = AppSettings() log: LogSettings = LogSettings() pg: PGSettings = PGSettings() - dl: Settings = Settings() - - @property - def resolved_dsn(self) -> str: - """ - Возвращает DSN для очереди: DL_DB_DSN или URL из PG. - """ - return self.dl.dl_db_dsn or self.pg.url + worker: WorkerSettings = WorkerSettings() APP_CONFIG = Secrets() __all__ = [ - "Settings", + "WorkerSettings", "Secrets", "APP_CONFIG", ] diff --git a/src/dataloader/context.py b/src/dataloader/context.py index 8d05762..ac450ec 100644 --- a/src/dataloader/context.py +++ b/src/dataloader/context.py @@ -1,4 +1,6 @@ # src/dataloader/context.py +from __future__ import annotations + from typing import AsyncGenerator from logging import Logger @@ -9,6 +11,9 @@ from .logger.context_vars import ContextVarsContainer class AppContext: + """ + Контекст приложения, хранящий глобальные зависимости (Singleton pattern). + """ def __init__(self) -> None: self._engine: AsyncEngine | None = None self._sessionmaker: async_sessionmaker[AsyncSession] | None = None @@ -16,34 +21,68 @@ class AppContext: @property def engine(self) -> AsyncEngine: + """ + Возвращает инициализированный движок БД. + + Raises: + RuntimeError: Если движок не инициализирован + """ if self._engine is None: raise RuntimeError("Database engine is not initialized.") return self._engine @property def sessionmaker(self) -> async_sessionmaker[AsyncSession]: + """ + Возвращает фабрику сессий БД. 
+ + Raises: + RuntimeError: Если sessionmaker не инициализирован + """ if self._sessionmaker is None: raise RuntimeError("Sessionmaker is not initialized.") return self._sessionmaker - def startup(self) -> None: - from .storage.db import create_engine, create_sessionmaker + async def on_startup(self) -> None: + """ + Инициализация контекста при старте приложения. + Настраивает логирование, создаёт движок и sessionmaker. + """ from .logger.logger import setup_logging - + from .storage.engine import create_engine, create_sessionmaker + setup_logging() self._engine = create_engine(APP_CONFIG.pg.url) self._sessionmaker = create_sessionmaker(self._engine) - async def shutdown(self) -> None: + async def on_shutdown(self) -> None: + """ + Очистка ресурсов при остановке приложения. + Закрывает соединения с БД. + """ if self._engine: await self._engine.dispose() def get_logger(self, name: str | None = None) -> Logger: - """Returns a configured logger instance.""" + """ + Возвращает настроенный логгер. + + Параметры: + name: Имя логгера (опционально) + + Возвращает: + Экземпляр Logger + """ from .logger.logger import get_logger as get_app_logger return get_app_logger(name) def get_context_vars_container(self) -> ContextVarsContainer: + """ + Возвращает контейнер контекстных переменных для логирования. + + Возвращает: + ContextVarsContainer + """ return self._context_vars_container @@ -52,8 +91,10 @@ APP_CTX = AppContext() async def get_session() -> AsyncGenerator[AsyncSession, None]: """ - FastAPI dependency to get a database session. - Yields a session from the global sessionmaker and ensures it's closed. + FastAPI dependency для получения сессии БД. + + Yields: + AsyncSession для работы с БД """ async with APP_CTX.sessionmaker() as session: yield session diff --git a/src/dataloader/logger/logger.py b/src/dataloader/logger/logger.py index b8c06f5..3c94ca7 100644 --- a/src/dataloader/logger/logger.py +++ b/src/dataloader/logger/logger.py @@ -48,7 +48,7 @@ def setup_logging(): logger.remove() logger.add( sys.stdout, - level=APP_CONFIG.log.log_level.upper(), + level=APP_CONFIG.log.log_lvl, filter=regular_log_filter, colorize=True, ) diff --git a/src/dataloader/storage/db.py b/src/dataloader/storage/db.py index 60ed027..19d6140 100644 --- a/src/dataloader/storage/db.py +++ b/src/dataloader/storage/db.py @@ -3,37 +3,19 @@ from __future__ import annotations from typing import AsyncIterator -from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker -from sqlalchemy.orm import DeclarativeBase - -from dataloader.context import APP_CTX - - -class Base(DeclarativeBase): - """ - Базовый класс моделей ORM. - """ - pass - - -def get_engine() -> AsyncEngine: - """ - Возвращает AsyncEngine, инициализированный в контексте приложения. - """ - return APP_CTX.engine - - -def get_sessionmaker() -> async_sessionmaker[AsyncSession]: - """ - Возвращает фабрику асинхронных сессий. - """ - return APP_CTX.sessionmaker +from sqlalchemy.ext.asyncio import AsyncSession async def session_scope() -> AsyncIterator[AsyncSession]: """ Асинхронный контекст жизненного цикла сессии. + Получает sessionmaker из AppContext. 
+ + Yields: + AsyncSession для работы с БД """ - sm = get_sessionmaker() + from dataloader.context import APP_CTX + + sm = APP_CTX.sessionmaker async with sm() as s: yield s diff --git a/src/dataloader/storage/engine.py b/src/dataloader/storage/engine.py new file mode 100644 index 0000000..35912c5 --- /dev/null +++ b/src/dataloader/storage/engine.py @@ -0,0 +1,58 @@ +# src/dataloader/storage/engine.py +from __future__ import annotations + +from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine + +from dataloader.config import APP_CONFIG + + +def create_engine(dsn: str) -> AsyncEngine: + """ + Создаёт асинхронный движок SQLAlchemy с поддержкой маппинга логических схем. + + Параметры: + dsn: Строка подключения к PostgreSQL + + Возвращает: + Настроенный AsyncEngine с schema_translate_map + """ + pg = APP_CONFIG.pg + + schema_map = { + "queue": pg.schema_queue, + } + + return create_async_engine( + dsn, + echo=False, + pool_size=pg.pool_size if pg.use_pool else 0, + max_overflow=pg.max_overflow if pg.use_pool else 0, + pool_recycle=pg.pool_recycle, + pool_pre_ping=True, + connect_args={ + "timeout": pg.connect_timeout, + "command_timeout": pg.command_timeout, + }, + execution_options={ + "schema_translate_map": schema_map, + }, + ) + + +def create_sessionmaker(engine: AsyncEngine) -> async_sessionmaker[AsyncSession]: + """ + Создаёт фабрику асинхронных сессий. + + Параметры: + engine: AsyncEngine для создания сессий + + Возвращает: + Настроенный async_sessionmaker + """ + return async_sessionmaker( + bind=engine, + class_=AsyncSession, + expire_on_commit=False, + autoflush=False, + autocommit=False, + ) diff --git a/src/dataloader/storage/models.py b/src/dataloader/storage/models.py new file mode 100644 index 0000000..9097f71 --- /dev/null +++ b/src/dataloader/storage/models.py @@ -0,0 +1,78 @@ +# src/dataloader/storage/models.py +from __future__ import annotations + +from datetime import datetime +from typing import Any, Optional + +from sqlalchemy import BigInteger, DateTime, Text +from sqlalchemy.dialects.postgresql import ENUM, JSONB, UUID +from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column + + +class Base(DeclarativeBase): + """ + Базовый класс для всех ORM моделей. + """ + pass + + +dl_status_enum = ENUM( + "queued", + "running", + "succeeded", + "failed", + "canceled", + "lost", + name="dl_status", + create_type=False, + native_enum=True, +) + + +class DLJob(Base): + """ + Модель таблицы очереди задач dl_jobs. + Использует логическое имя схемы 'queue' для поддержки schema_translate_map. 
+ """ + __tablename__ = "dl_jobs" + __table_args__ = {"schema": "queue"} + + job_id: Mapped[str] = mapped_column(UUID(as_uuid=False), primary_key=True) + queue: Mapped[str] = mapped_column(Text, nullable=False) + task: Mapped[str] = mapped_column(Text, nullable=False) + args: Mapped[dict[str, Any]] = mapped_column(JSONB, default=dict, nullable=False) + idempotency_key: Mapped[Optional[str]] = mapped_column(Text, unique=True) + lock_key: Mapped[str] = mapped_column(Text, nullable=False) + partition_key: Mapped[str] = mapped_column(Text, default="", nullable=False) + priority: Mapped[int] = mapped_column(nullable=False, default=100) + available_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) + status: Mapped[str] = mapped_column(dl_status_enum, nullable=False, default="queued") + attempt: Mapped[int] = mapped_column(nullable=False, default=0) + max_attempts: Mapped[int] = mapped_column(nullable=False, default=5) + lease_ttl_sec: Mapped[int] = mapped_column(nullable=False, default=60) + lease_expires_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True)) + heartbeat_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True)) + cancel_requested: Mapped[bool] = mapped_column(nullable=False, default=False) + progress: Mapped[dict[str, Any]] = mapped_column(JSONB, default=dict, nullable=False) + error: Mapped[Optional[str]] = mapped_column(Text) + producer: Mapped[Optional[str]] = mapped_column(Text) + consumer_group: Mapped[Optional[str]] = mapped_column(Text) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) + started_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True)) + finished_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True)) + + +class DLJobEvent(Base): + """ + Модель таблицы журнала событий dl_job_events. + Использует логическое имя схемы 'queue' для поддержки schema_translate_map. + """ + __tablename__ = "dl_job_events" + __table_args__ = {"schema": "queue"} + + event_id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) + job_id: Mapped[str] = mapped_column(UUID(as_uuid=False), nullable=False) + queue: Mapped[str] = mapped_column(Text, nullable=False) + ts: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) + kind: Mapped[str] = mapped_column(Text, nullable=False) + payload: Mapped[Optional[dict[str, Any]]] = mapped_column(JSONB) diff --git a/src/dataloader/storage/notify_listener.py b/src/dataloader/storage/notify_listener.py index dcb6177..248eec9 100644 --- a/src/dataloader/storage/notify_listener.py +++ b/src/dataloader/storage/notify_listener.py @@ -5,14 +5,13 @@ import asyncio import asyncpg from typing import Callable, Optional -from dataloader.config import APP_CONFIG - class PGNotifyListener: """ Прослушиватель PostgreSQL NOTIFY для канала 'dl_jobs'. """ - def __init__(self, queue: str, callback: Callable[[], None], stop_event: asyncio.Event): + def __init__(self, dsn: str, queue: str, callback: Callable[[], None], stop_event: asyncio.Event): + self._dsn = dsn self._queue = queue self._callback = callback self._stop = stop_event @@ -24,31 +23,23 @@ class PGNotifyListener: """ Запускает прослушивание уведомлений. 
""" - dsn = APP_CONFIG.resolved_dsn - # Преобразуем SQLAlchemy DSN в asyncpg DSN + dsn = self._dsn if dsn.startswith("postgresql+asyncpg://"): dsn = dsn.replace("postgresql+asyncpg://", "postgresql://") - + self._conn = await asyncpg.connect(dsn) - + def on_notify(connection, pid, channel, payload): - # Проверяем, что это уведомление для нашей очереди - # payload содержит имя очереди из триггера - # Callback вызывается из потока asyncpg, поэтому используем asyncio.ensure_future if channel == "dl_jobs" and payload == self._queue: try: - # Event.set() потокобезопасен self._callback() except Exception: - # Игнорируем ошибки в callback, чтобы не сломать listener pass self._on_notify_handler = on_notify - # Сначала выполняем LISTEN, затем добавляем listener await self._conn.execute("LISTEN dl_jobs") await self._conn.add_listener("dl_jobs", self._on_notify_handler) - - # Запускаем задачу для мониторинга соединения + self._task = asyncio.create_task(self._monitor_connection()) async def _monitor_connection(self) -> None: @@ -70,7 +61,7 @@ class PGNotifyListener: await self._task except asyncio.CancelledError: pass - + if self._conn and self._on_notify_handler: try: await self._conn.remove_listener("dl_jobs", self._on_notify_handler) @@ -81,4 +72,3 @@ class PGNotifyListener: except Exception: pass self._conn = None - diff --git a/src/dataloader/storage/repositories.py b/src/dataloader/storage/repositories.py index aa66fb5..9d448a7 100644 --- a/src/dataloader/storage/repositories.py +++ b/src/dataloader/storage/repositories.py @@ -1,114 +1,19 @@ # src/dataloader/storage/repositories.py from __future__ import annotations -from dataclasses import dataclass from datetime import datetime, timedelta, timezone from typing import Any, Optional -from sqlalchemy import BigInteger, String, Text, select, func, update, DateTime -from sqlalchemy.dialects.postgresql import JSONB, ENUM, UUID +from sqlalchemy import func, select, update from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm import Mapped, mapped_column -from dataloader.storage.db import Base - - -dl_status_enum = ENUM( - "queued", - "running", - "succeeded", - "failed", - "canceled", - "lost", - name="dl_status", - create_type=False, - native_enum=True, -) - - -class DLJob(Base): - """ - Модель очереди dl_jobs. 
- """ - __tablename__ = "dl_jobs" - - job_id: Mapped[str] = mapped_column(UUID(as_uuid=False), primary_key=True) - queue: Mapped[str] = mapped_column(Text, nullable=False) - task: Mapped[str] = mapped_column(Text, nullable=False) - args: Mapped[dict[str, Any]] = mapped_column(JSONB, default=dict, nullable=False) - idempotency_key: Mapped[Optional[str]] = mapped_column(Text, unique=True) - lock_key: Mapped[str] = mapped_column(Text, nullable=False) - partition_key: Mapped[str] = mapped_column(Text, default="", nullable=False) - priority: Mapped[int] = mapped_column(nullable=False, default=100) - available_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) - status: Mapped[str] = mapped_column(dl_status_enum, nullable=False, default="queued") - attempt: Mapped[int] = mapped_column(nullable=False, default=0) - max_attempts: Mapped[int] = mapped_column(nullable=False, default=5) - lease_ttl_sec: Mapped[int] = mapped_column(nullable=False, default=60) - lease_expires_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True)) - heartbeat_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True)) - cancel_requested: Mapped[bool] = mapped_column(nullable=False, default=False) - progress: Mapped[dict[str, Any]] = mapped_column(JSONB, default=dict, nullable=False) - error: Mapped[Optional[str]] = mapped_column(Text) - producer: Mapped[Optional[str]] = mapped_column(Text) - consumer_group: Mapped[Optional[str]] = mapped_column(Text) - created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) - started_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True)) - finished_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True)) - - -class DLJobEvent(Base): - """ - Модель журнала событий dl_job_events. - """ - __tablename__ = "dl_job_events" - - event_id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) - job_id: Mapped[str] = mapped_column(UUID(as_uuid=False), nullable=False) - queue: Mapped[str] = mapped_column(Text, nullable=False) - ts: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) - kind: Mapped[str] = mapped_column(Text, nullable=False) - payload: Mapped[Optional[dict[str, Any]]] = mapped_column(JSONB) - - -@dataclass(frozen=True) -class CreateJobRequest: - """ - Параметры постановки задачи. - """ - job_id: str - queue: str - task: str - args: dict[str, Any] - idempotency_key: Optional[str] - lock_key: str - partition_key: str - priority: int - available_at: datetime - max_attempts: int - lease_ttl_sec: int - producer: Optional[str] - consumer_group: Optional[str] - - -@dataclass(frozen=True) -class JobStatus: - """ - Снимок статуса задачи. - """ - job_id: str - status: str - attempt: int - started_at: Optional[datetime] - finished_at: Optional[datetime] - heartbeat_at: Optional[datetime] - error: Optional[str] - progress: dict[str, Any] +from dataloader.storage.models import DLJob, DLJobEvent +from dataloader.storage.schemas import CreateJobRequest, JobStatus class QueueRepository: """ - Репозиторий очереди и событий с полнотой ORM. + Репозиторий для работы с очередью задач и журналом событий. """ def __init__(self, session: AsyncSession): self.s = session @@ -116,6 +21,12 @@ class QueueRepository: async def create_or_get(self, req: CreateJobRequest) -> tuple[str, str]: """ Идемпотентно создаёт запись в очереди и возвращает (job_id, status). 
+ + Параметры: + req: DTO с параметрами задачи + + Возвращает: + Кортеж (job_id, status) """ async with self.s.begin(): if req.idempotency_key: @@ -157,6 +68,12 @@ class QueueRepository: async def get_status(self, job_id: str) -> Optional[JobStatus]: """ Возвращает статус задачи. + + Параметры: + job_id: Идентификатор задачи + + Возвращает: + DTO JobStatus или None, если задача не найдена """ q = select( DLJob.job_id, @@ -186,6 +103,12 @@ class QueueRepository: async def cancel(self, job_id: str) -> bool: """ Устанавливает флаг отмены для задачи. + + Параметры: + job_id: Идентификатор задачи + + Возвращает: + True, если задача найдена и флаг установлен """ async with self.s.begin(): job = await self._get(job_id) @@ -198,6 +121,13 @@ class QueueRepository: async def claim_one(self, queue: str, claim_backoff_sec: int) -> Optional[dict[str, Any]]: """ Захватывает одну задачу из очереди с учётом блокировок и выставляет running. + + Параметры: + queue: Имя очереди + claim_backoff_sec: Время отката при неудаче с advisory lock + + Возвращает: + Словарь с данными задачи или None """ async with self.s.begin(): q = ( @@ -244,13 +174,19 @@ class QueueRepository: async def heartbeat(self, job_id: str, ttl_sec: int) -> tuple[bool, bool]: """ Обновляет heartbeat и продлевает lease. - Возвращает (success, cancel_requested). + + Параметры: + job_id: Идентификатор задачи + ttl_sec: TTL аренды в секундах + + Возвращает: + Кортеж (success, cancel_requested) """ async with self.s.begin(): job = await self._get(job_id) if not job or job.status != "running": return False, False - + cancel_requested = bool(job.cancel_requested) now = datetime.now(timezone.utc) q = ( @@ -265,6 +201,9 @@ class QueueRepository: async def finish_ok(self, job_id: str) -> None: """ Помечает задачу как выполненную успешно и снимает advisory-lock. + + Параметры: + job_id: Идентификатор задачи """ async with self.s.begin(): job = await self._get(job_id) @@ -279,12 +218,17 @@ class QueueRepository: async def finish_fail_or_retry(self, job_id: str, err: str, is_canceled: bool = False) -> None: """ Помечает задачу как failed, canceled или возвращает в очередь с задержкой. + + Параметры: + job_id: Идентификатор задачи + err: Текст ошибки + is_canceled: Флаг отмены """ async with self.s.begin(): job = await self._get(job_id) if not job: return - + if is_canceled: job.status = "canceled" job.error = err @@ -310,6 +254,12 @@ class QueueRepository: async def requeue_lost(self, now: Optional[datetime] = None) -> list[str]: """ Возвращает протухшие running-задачи в очередь. + + Параметры: + now: Текущее время (по умолчанию используется текущий момент UTC) + + Возвращает: + Список job_id перепоставленных задач """ now = now or datetime.now(timezone.utc) async with self.s.begin(): @@ -335,7 +285,13 @@ class QueueRepository: async def _get(self, job_id: str) -> Optional[DLJob]: """ - Возвращает ORM-объект задачи. + Возвращает ORM-объект задачи с блокировкой. + + Параметры: + job_id: Идентификатор задачи + + Возвращает: + ORM модель DLJob или None """ r = await self.s.execute(select(DLJob).where(DLJob.job_id == job_id).with_for_update(skip_locked=True)) return r.scalar_one_or_none() @@ -343,6 +299,12 @@ class QueueRepository: async def _resolve_queue(self, job_id: str) -> str: """ Возвращает имя очереди для события. 
+ + Параметры: + job_id: Идентификатор задачи + + Возвращает: + Имя очереди """ r = await self.s.execute(select(DLJob.queue).where(DLJob.job_id == job_id)) v = r.scalar_one_or_none() @@ -351,6 +313,12 @@ class QueueRepository: async def _append_event(self, job_id: str, queue: str, kind: str, payload: Optional[dict[str, Any]]) -> None: """ Добавляет запись в журнал событий. + + Параметры: + job_id: Идентификатор задачи + queue: Имя очереди + kind: Тип события + payload: Дополнительные данные события """ ev = DLJobEvent( job_id=job_id, @@ -364,6 +332,12 @@ class QueueRepository: async def _try_advisory_lock(self, lock_key: str) -> bool: """ Пытается получить advisory-lock в Postgres. + + Параметры: + lock_key: Ключ блокировки + + Возвращает: + True, если блокировка получена """ r = await self.s.execute(select(func.pg_try_advisory_lock(func.hashtext(lock_key)))) return bool(r.scalar()) @@ -371,5 +345,8 @@ class QueueRepository: async def _advisory_unlock(self, lock_key: str) -> None: """ Снимает advisory-lock в Postgres. + + Параметры: + lock_key: Ключ блокировки """ await self.s.execute(select(func.pg_advisory_unlock(func.hashtext(lock_key)))) diff --git a/src/dataloader/storage/schemas.py b/src/dataloader/storage/schemas.py new file mode 100644 index 0000000..92a9ddd --- /dev/null +++ b/src/dataloader/storage/schemas.py @@ -0,0 +1,41 @@ +# src/dataloader/storage/schemas.py +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime +from typing import Any, Optional + + +@dataclass(frozen=True) +class CreateJobRequest: + """ + DTO для создания задачи в очереди. + """ + job_id: str + queue: str + task: str + args: dict[str, Any] + idempotency_key: Optional[str] + lock_key: str + partition_key: str + priority: int + available_at: datetime + max_attempts: int + lease_ttl_sec: int + producer: Optional[str] + consumer_group: Optional[str] + + +@dataclass(frozen=True) +class JobStatus: + """ + DTO для статуса задачи. + """ + job_id: str + status: str + attempt: int + started_at: Optional[datetime] + finished_at: Optional[datetime] + heartbeat_at: Optional[datetime] + error: Optional[str] + progress: dict[str, Any] diff --git a/src/dataloader/workers/base.py b/src/dataloader/workers/base.py index 0aec038..d8466a6 100644 --- a/src/dataloader/workers/base.py +++ b/src/dataloader/workers/base.py @@ -7,8 +7,8 @@ from dataclasses import dataclass from datetime import datetime, timedelta, timezone from typing import AsyncIterator, Callable, Optional +from dataloader.config import APP_CONFIG from dataloader.context import APP_CTX -from dataloader.storage.db import get_sessionmaker from dataloader.storage.repositories import QueueRepository from dataloader.storage.notify_listener import PGNotifyListener from dataloader.workers.pipelines.registry import resolve as resolve_pipeline @@ -32,7 +32,7 @@ class PGWorker: self._cfg = cfg self._stop = stop_event self._log = APP_CTX.get_logger() - self._sm = get_sessionmaker() + self._sm = APP_CTX.sessionmaker self._notify_wakeup = asyncio.Event() self._listener: Optional[PGNotifyListener] = None @@ -41,12 +41,12 @@ class PGWorker: Главный цикл: ожидание → claim → исполнение → завершение. 
""" self._log.info(f"worker.start queue={self._cfg.queue}") - - # Запускаем LISTEN/NOTIFY + self._listener = PGNotifyListener( - self._cfg.queue, - lambda: self._notify_wakeup.set(), - self._stop + dsn=APP_CONFIG.pg.url, + queue=self._cfg.queue, + callback=lambda: self._notify_wakeup.set(), + stop_event=self._stop ) try: await self._listener.start() diff --git a/src/dataloader/workers/manager.py b/src/dataloader/workers/manager.py index 6678826..6d4e722 100644 --- a/src/dataloader/workers/manager.py +++ b/src/dataloader/workers/manager.py @@ -5,9 +5,8 @@ import asyncio import contextlib from dataclasses import dataclass -from dataloader.context import APP_CTX from dataloader.config import APP_CONFIG -from dataloader.storage.db import get_sessionmaker +from dataloader.context import APP_CTX from dataloader.workers.base import PGWorker, WorkerConfig from dataloader.workers.reaper import requeue_lost @@ -36,8 +35,8 @@ class WorkerManager: """ Стартует воркеры и фоновую задачу реапера. """ - hb = int(APP_CONFIG.dl.dl_heartbeat_sec) - backoff = int(APP_CONFIG.dl.dl_claim_backoff_sec) + hb = int(APP_CONFIG.worker.heartbeat_sec) + backoff = int(APP_CONFIG.worker.claim_backoff_sec) for spec in self._specs: for i in range(max(1, spec.concurrency)): @@ -75,8 +74,8 @@ class WorkerManager: """ Фоновый цикл возврата потерянных задач в очередь. """ - period = int(APP_CONFIG.dl.dl_reaper_period_sec) - sm = get_sessionmaker() + period = int(APP_CONFIG.worker.reaper_period_sec) + sm = APP_CTX.sessionmaker while not self._stop.is_set(): try: async with sm() as s: @@ -96,7 +95,7 @@ def build_manager_from_env() -> WorkerManager: Собирает WorkerManager из WORKERS_JSON. """ specs: list[WorkerSpec] = [] - for item in APP_CONFIG.dl.parsed_workers(): + for item in APP_CONFIG.worker.parsed_workers(): q = str(item.get("queue", "")).strip() c = int(item.get("concurrency", 1)) if q: