# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Role and Working Principles

You act as a senior Python / ML / AI / DL developer and system architect. You work in an enterprise-level project (multi-tier backend) with an initial base architecture similar to the REST template from `rest_template.md`. You design code strictly according to patterns (Singleton, Repository, Interface, DTO, CRUD, Service, Context, Adapter, etc.). You write clean production-level Python code (FastAPI, SQLAlchemy 2.x, asyncio, Pydantic v2, PostgreSQL, aiohttp, structlog/loguru).

**Working Rules:**

- Do not add comments unless explicitly requested.
- Always add docstrings to functions, classes, and modules.
- Always follow PEP 8 style and architectural layer isolation (api / service / repositories / models / schemas / interfaces / logger / config / context).
- Prefer typing via `from __future__ import annotations`.
- All dependencies are passed through `AppContext` (DI Singleton pattern).
- Implement logging through the logger with context (`logger.info("msg")` without structures).
- When creating projects from scratch, rely on the structure from `rest_template.md`.
- Respond strictly to the point, no fluff, like a senior developer during code review.
- All logic in examples is correct, asynchronous, and production-ready.
- Use only modern library versions.

Your style is minimalistic, precise, clean, and architecturally sound.

## Project Overview

**Dataloader** is an asynchronous FastAPI service for managing and executing long-running ETL tasks via a PostgreSQL-based job queue. The service uses PostgreSQL's `LISTEN/NOTIFY` for efficient worker wakeup, advisory locks for concurrency control, and `SELECT ... FOR UPDATE SKIP LOCKED` for job claiming.

This is a Clean Architecture implementation following the project template `rest_template.md`, built with Python 3.11+, FastAPI, SQLAlchemy 2.0 (async), and asyncpg.

## Development Commands

### Running the Application

```bash
# Install dependencies with Poetry
poetry install

# Run the application
poetry run dataloader
# or
uvicorn dataloader.__main__:main

# The app will start on port 8081 by default (configurable via APP_PORT)
```

### Testing

```bash
# Run all tests
poetry run pytest

# Run specific test file
poetry run pytest tests/integration_tests/v1_api/test_service.py

# Run with verbose output
poetry run pytest -v

# Run integration tests only
poetry run pytest tests/integration_tests/
```

### Database

The database schema is already applied (see `DDL.sql`). The queue uses:

- Table `dl_jobs` - main job queue with statuses: queued, running, succeeded, failed, canceled, lost
- Table `dl_job_events` - audit log of job lifecycle events
- PostgreSQL triggers for `LISTEN/NOTIFY` on job insertion/updates (see the listener sketch below)
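The actual trigger and channel wiring lives in `DDL.sql` and `storage/notify_listener.py`; the block below is only a minimal sketch of how a worker-side `LISTEN` wakeup can look with asyncpg. The channel name `dl_jobs_notify` and the DSN parameter are assumptions for illustration, not the project's real values.

```python
import asyncio

import asyncpg


async def wait_for_job_notification(dsn: str, channel: str = "dl_jobs_notify") -> None:
    """Block until a NOTIFY arrives on the given channel, then return."""
    wakeup = asyncio.Event()

    def _on_notify(connection, pid, received_channel, payload) -> None:
        # Any payload on the channel means new or updated work in dl_jobs.
        wakeup.set()

    conn = await asyncpg.connect(dsn)
    try:
        # "dl_jobs_notify" is an assumed name; the real channel is defined by the triggers in DDL.sql.
        await conn.add_listener(channel, _on_notify)
        await wakeup.wait()
    finally:
        await conn.remove_listener(channel, _on_notify)
        await conn.close()
```

A production worker loop would typically wrap `wakeup.wait()` in a timeout so a missed notification cannot stall the queue poll.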
## Architecture

### High-Level Structure

The codebase follows Clean Architecture with clear separation of concerns:

1. **API Layer** (`src/dataloader/api/`)
   - `v1/router.py` - HTTP endpoints for job management
   - `v1/service.py` - Business logic layer
   - `v1/schemas.py` - Pydantic request/response models
   - `os_router.py` - Infrastructure endpoints (`/health`, `/status`) **DO NOT MODIFY**
   - `metric_router.py` - Metrics endpoints (BETA) **DO NOT MODIFY**
   - `middleware.py` - Request/response logging middleware **DO NOT MODIFY**

2. **Storage Layer** (`src/dataloader/storage/`)
   - `repositories.py` - PostgreSQL queue operations using SQLAlchemy ORM
   - `db.py` - Database engine and session management
   - `notify_listener.py` - PostgreSQL LISTEN/NOTIFY implementation

3. **Worker Layer** (`src/dataloader/workers/`)
   - `manager.py` - Manages lifecycle of async worker tasks
   - `base.py` - Core worker implementation with claim/heartbeat/execute cycle
   - `reaper.py` - Background task to requeue lost jobs (expired leases)
   - `pipelines/registry.py` - Pipeline registration and resolution system
   - `pipelines/` - Individual pipeline implementations

4. **Logger** (`src/dataloader/logger/`)
   - Structured logging with automatic sensitive data masking
   - **DO NOT MODIFY** these files - they're from the template

5. **Core** (`src/dataloader/`)
   - `__main__.py` - Application entry point
   - `config.py` - Pydantic Settings for all configuration
   - `context.py` - AppContext singleton for dependency injection
   - `base.py` - Base classes and types
   - `exceptions.py` - Global exception definitions

### Key Architectural Patterns

#### Job Queue Protocol

Jobs flow through the system via a strict state machine:

1. **Enqueue** (`trigger` API) - Creates job in `queued` status
   - Idempotent via `idempotency_key`
   - PostgreSQL trigger fires `LISTEN/NOTIFY` to wake workers

2. **Claim** (worker) - Worker acquires job atomically
   - Uses `FOR UPDATE SKIP LOCKED` to prevent contention
   - Sets status to `running`, increments attempt counter
   - Attempts PostgreSQL advisory lock on `lock_key`
   - If lock fails → job goes back to `queued` with backoff delay

3. **Execute** (worker) - Runs the pipeline with heartbeat
   - Heartbeat updates every `DL_HEARTBEAT_SEC` seconds
   - Extends `lease_expires_at` to prevent reaper from reclaiming
   - Checks `cancel_requested` flag between pipeline chunks
   - Pipeline yields between chunks to allow cooperative cancellation

4. **Complete** (worker) - Finalize job status
   - **Success**: `status = succeeded`, release advisory lock
   - **Failure**:
     - If `attempt < max_attempts` → `status = queued` (retry with backoff: `30 * attempt` seconds)
     - If `attempt >= max_attempts` → `status = failed`
   - **Cancel**: `status = canceled`
   - Always releases advisory lock

5. **Reaper** (background) - Recovers lost jobs
   - Runs every `DL_REAPER_PERIOD_SEC`
   - Finds jobs where `status = running` AND `lease_expires_at < now()`
   - Resets them to `queued` for retry

#### Concurrency Control

The system uses multiple layers of concurrency control:

- **`lock_key`**: PostgreSQL advisory lock ensures only one worker processes jobs with the same lock_key
- **`partition_key`**: Logical grouping for job ordering (currently informational)
- **`FOR UPDATE SKIP LOCKED`**: Prevents multiple workers from claiming the same job
- **Async workers**: Multiple workers can run concurrently within a single process

#### Worker Configuration

Workers are configured via the `WORKERS_JSON` environment variable:

```json
[
  {"queue": "load.cbr", "concurrency": 2},
  {"queue": "load.sgx", "concurrency": 1}
]
```

This spawns M async tasks (sum of all concurrency values) within the FastAPI process.
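As an illustration of that fan-out, here is a minimal sketch of turning `WORKERS_JSON` into asyncio tasks. `run_worker` is a hypothetical stand-in for the claim/heartbeat/execute loop in `workers/base.py`; the real wiring lives in `workers/manager.py`.

```python
from __future__ import annotations

import asyncio
import json
from collections.abc import Awaitable, Callable


async def spawn_workers(
    workers_json: str,
    run_worker: Callable[[str, asyncio.Event], Awaitable[None]],
    stop_event: asyncio.Event,
) -> list[asyncio.Task]:
    """Spawn one asyncio task per unit of concurrency declared in WORKERS_JSON."""
    configs = json.loads(workers_json)
    tasks: list[asyncio.Task] = []
    for cfg in configs:
        for _ in range(cfg["concurrency"]):
            # run_worker is a hypothetical coroutine: (queue_name, stop_event) -> None.
            tasks.append(asyncio.create_task(run_worker(cfg["queue"], stop_event)))
    return tasks
```

With the sample configuration above this yields three tasks: two for `load.cbr` and one for `load.sgx`.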
#### Pipeline System

Pipelines are registered via decorator in `workers/pipelines/`:

```python
from dataloader.workers.pipelines.registry import register

@register("my.task")
async def my_pipeline(args: dict):
    # Process chunk 1
    yield  # Allow heartbeat & cancellation check
    # Process chunk 2
    yield
    # Process chunk 3
```

The `yield` statements enable:

- Heartbeat updates during long operations
- Cooperative cancellation via `cancel_requested` checks
- Progress tracking

All pipelines must be imported in the `load_all()` function of `workers/pipelines/__init__.py`.

### Application Lifecycle

1. **Startup** (`lifespan` in `api/__init__.py`)
   - Initialize logging
   - Create database engine and sessionmaker
   - Load all pipelines from registry
   - Build WorkerManager from `WORKERS_JSON`
   - Start all worker tasks and reaper

2. **Runtime**
   - FastAPI serves HTTP requests
   - Workers poll queue via LISTEN/NOTIFY
   - Reaper runs in background

3. **Shutdown** (on SIGTERM)
   - Signal all workers to stop via `asyncio.Event`
   - Cancel worker tasks and wait for completion
   - Cancel reaper task
   - Dispose database engine

## Configuration

All configuration is via environment variables (`.env` file or system environment):

### Application Settings

- `APP_HOST` - Server bind address (default: `0.0.0.0`)
- `APP_PORT` - Server port (default: `8081`)
- `DEBUG` - Debug mode (default: `False`)
- `LOCAL` - Local development flag (default: `False`)

### Database Settings

- `PG_HOST`, `PG_PORT`, `PG_USER`, `PG_PASSWORD`, `PG_DATABASE`, `PG_SCHEMA` - PostgreSQL connection
- `PG_POOL_SIZE`, `PG_MAX_OVERFLOW`, `PG_POOL_RECYCLE` - Connection pool configuration
- `DL_DB_DSN` - Optional override for queue database DSN (if different from main DB)

### Worker Settings

- `WORKERS_JSON` - JSON array of worker configurations (required)
- `DL_HEARTBEAT_SEC` - Heartbeat interval (default: `10`)
- `DL_DEFAULT_LEASE_TTL_SEC` - Default lease duration (default: `60`)
- `DL_REAPER_PERIOD_SEC` - Reaper run interval (default: `10`)
- `DL_CLAIM_BACKOFF_SEC` - Backoff when advisory lock fails (default: `15`)

### Logging Settings

- `LOG_PATH`, `LOG_FILE_NAME` - Application log location
- `METRIC_PATH`, `METRIC_FILE_NAME` - Metrics log location
- `AUDIT_LOG_PATH`, `AUDIT_LOG_FILE_NAME` - Audit events log location

## API Endpoints

### Business API (v1)

- `POST /api/v1/jobs/trigger` - Create or get existing job (idempotent; see the example below)
  - Body: `{queue, task, args?, idempotency_key?, lock_key, partition_key?, priority?, available_at?}`
  - Response: `{job_id, status}`
- `GET /api/v1/jobs/{job_id}/status` - Get job status
  - Response: `{job_id, status, attempt, started_at?, finished_at?, heartbeat_at?, error?, progress}`
- `POST /api/v1/jobs/{job_id}/cancel` - Request job cancellation (cooperative)
  - Response: Same as status endpoint

### Infrastructure API

- `GET /health` - Health check (no database access, <20ms)
- `GET /status` - Service status with version/uptime
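A hedged client-side example of the trigger call, using aiohttp against the documented default port (8081). The queue, task, and argument values are purely illustrative; the task name must match a pipeline registered via `@register`.

```python
import asyncio

import aiohttp


async def trigger_job() -> dict:
    """Enqueue a job, or get the existing one when the idempotency key matches."""
    payload = {
        "queue": "load.cbr",               # illustrative queue name
        "task": "load.cbr",                # must match a registered pipeline
        "args": {"date": "2024-01-01"},
        "idempotency_key": "load.cbr:2024-01-01",
        "lock_key": "load.cbr",
    }
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "http://localhost:8081/api/v1/jobs/trigger", json=payload
        ) as resp:
            resp.raise_for_status()
            return await resp.json()


if __name__ == "__main__":
    print(asyncio.run(trigger_job()))
```

The returned `job_id` can then be polled via `GET /api/v1/jobs/{job_id}/status`.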
## Development Guidelines

### Adding a New Pipeline

1. Create pipeline file in `src/dataloader/workers/pipelines/`:

```python
from dataloader.workers.pipelines.registry import register

@register("myqueue.mytask")
async def my_task_pipeline(args: dict):
    # Your implementation
    # Use yield between chunks for heartbeat
    yield
```

2. Import in `src/dataloader/workers/pipelines/__init__.py`:

```python
def load_all() -> None:
    from . import noop
    from . import my_task  # Add this line
```

3. Add queue to `.env`:

```
WORKERS_JSON=[{"queue":"myqueue","concurrency":1}]
```

### Idempotent Operations

All pipelines should be idempotent since jobs may be retried:

- Use `idempotency_key` for external API calls
- Use `UPSERT` or `INSERT ... ON CONFLICT` for database writes (see the sketch at the end of this file)
- Design pipelines to be safely re-runnable from any point

### Security & Data Masking

The logger automatically masks sensitive fields (defined in `logger/utils.py`):

- Keywords: `password`, `token`, `secret`, `key`, `authorization`, etc.
- Never log credentials directly
- Use structured logging: `logger.info("message", extra={...})`

### Error Handling

- Pipelines should raise exceptions for transient errors (will trigger retry)
- Use `max_attempts` in job creation to control retry limits
- Permanent failures should be logged but not raise (mark job as succeeded but log error in events)

### Testing

Integration tests should:

- Use test fixtures from `tests/conftest.py`
- Test full job lifecycle: trigger → claim → execute → complete
- Test failure scenarios: cancellation, retries, lock contention
- Mock external dependencies, use real database for queue operations

## Important Files to Reference

- `TZ.md` - Full technical specification (Russian)
- `TODO.md` - Implementation progress and next steps
- `rest_template.md` - Project structure template
- `DDL.sql` - Database schema
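To complement the Idempotent Operations guideline above, here is a minimal sketch of an `INSERT ... ON CONFLICT` write with SQLAlchemy 2.0 async. The `rates` table and its columns are hypothetical and exist only to illustrate the pattern.

```python
from __future__ import annotations

from datetime import date
from decimal import Decimal

from sqlalchemy import Column, Date, MetaData, Numeric, String, Table
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.asyncio import AsyncSession

metadata = MetaData()

# Hypothetical target table, used only to illustrate the idempotent-write pattern.
rates = Table(
    "rates",
    metadata,
    Column("currency", String, primary_key=True),
    Column("as_of", Date, primary_key=True),
    Column("value", Numeric, nullable=False),
)


async def upsert_rate(session: AsyncSession, currency: str, as_of: date, value: Decimal) -> None:
    """Write a row so the pipeline chunk is safe to re-run after a retry."""
    stmt = insert(rates).values(currency=currency, as_of=as_of, value=value)
    stmt = stmt.on_conflict_do_update(
        index_elements=[rates.c.currency, rates.c.as_of],
        set_={"value": stmt.excluded.value},
    )
    await session.execute(stmt)
```

On retry the same chunk re-executes and the conflict target makes the write converge instead of duplicating rows.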