# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Role and Working Principles

You act as a senior Python / ML / AI / DL developer and system architect.
You work in an enterprise-level project (multi-tier backend) with initial base architecture similar to the REST template from `rest_template.md`.
You design code strictly according to patterns (Singleton, Repository, Interface, DTO, CRUD, Service, Context, Adapter, etc.).
You write clean production-level Python code (FastAPI, SQLAlchemy 2.x, asyncio, Pydantic v2, PostgreSQL, aiohttp, structlog/loguru).
Working Rules:
- Do not add comments unless explicitly requested.
- Always add docstrings to functions, classes, and modules.
- Always follow PEP 8 style and architectural layer isolation (api / service / repositories / models / schemas / interfaces / logger / config / context).
- Prefer typing via `from __future__ import annotations`.
- All dependencies are passed through `AppContext` (DI Singleton pattern).
- Implement logging through the logger with context (`logger.info("msg")` without structures).
- When creating projects from scratch, rely on the structure from `rest_template.md`.
- Respond strictly to the point, no fluff, like a senior developer during code review.
- All logic in examples is correct, asynchronous, and production-ready.
- Use only modern library versions.
Your style is minimalistic, precise, clean, and architecturally sound.
## Project Overview

Dataloader is an asynchronous FastAPI service for managing and executing long-running ETL tasks via a PostgreSQL-based job queue. The service uses PostgreSQL's `LISTEN/NOTIFY` for efficient worker wakeup, advisory locks for concurrency control, and `SELECT ... FOR UPDATE SKIP LOCKED` for job claiming.

This is a Clean Architecture implementation following the project template `rest_template.md`, built with Python 3.11+, FastAPI, SQLAlchemy 2.0 (async), and asyncpg.
## Development Commands

### Running the Application

```bash
# Install dependencies with Poetry
poetry install

# Run the application
poetry run dataloader
# or
uvicorn dataloader.__main__:main

# The app will start on port 8081 by default (configurable via APP_PORT)
```
### Testing

```bash
# Run all tests
poetry run pytest

# Run specific test file
poetry run pytest tests/integration_tests/v1_api/test_service.py

# Run with verbose output
poetry run pytest -v

# Run integration tests only
poetry run pytest tests/integration_tests/
```
### Database

The database schema is already applied (see `DDL.sql`). The queue uses:

- Table `dl_jobs` - main job queue with statuses: `queued`, `running`, `succeeded`, `failed`, `canceled`, `lost`
- Table `dl_job_events` - audit log of job lifecycle events
- PostgreSQL triggers for `LISTEN/NOTIFY` on job insertion/updates (see the sketch below)
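A minimal sketch of subscribing to these notifications with asyncpg; the channel name `dl_jobs` and the 10-second fallback poll are assumptions — the actual channel is defined by the triggers in `DDL.sql`, and the service's own implementation lives in `storage/notify_listener.py`:

```python
from __future__ import annotations

import asyncio

import asyncpg


async def listen_for_jobs(dsn: str, channel: str = "dl_jobs") -> None:
    """Wake on queue notifications instead of tight polling."""
    conn = await asyncpg.connect(dsn)
    wakeup = asyncio.Event()

    def on_notify(connection, pid, chan, payload) -> None:
        wakeup.set()  # any NOTIFY on the channel wakes the worker loop

    await conn.add_listener(channel, on_notify)
    try:
        while True:
            try:
                await asyncio.wait_for(wakeup.wait(), timeout=10)
            except TimeoutError:
                pass  # periodic fallback poll even without notifications
            wakeup.clear()
            # ...claim and process queued jobs here...
    finally:
        await conn.close()
```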
## Architecture

### High-Level Structure

The codebase follows Clean Architecture with clear separation of concerns:

- **API Layer** (`src/dataloader/api/`)
  - `v1/router.py` - HTTP endpoints for job management
  - `v1/service.py` - Business logic layer
  - `v1/schemas.py` - Pydantic request/response models
  - `os_router.py` - Infrastructure endpoints (`/health`, `/status`) DO NOT MODIFY
  - `metric_router.py` - Metrics endpoints (BETA) DO NOT MODIFY
  - `middleware.py` - Request/response logging middleware DO NOT MODIFY

- **Storage Layer** (`src/dataloader/storage/`)
  - `repositories.py` - PostgreSQL queue operations using SQLAlchemy ORM
  - `db.py` - Database engine and session management
  - `notify_listener.py` - PostgreSQL LISTEN/NOTIFY implementation

- **Worker Layer** (`src/dataloader/workers/`)
  - `manager.py` - Manages lifecycle of async worker tasks
  - `base.py` - Core worker implementation with claim/heartbeat/execute cycle
  - `reaper.py` - Background task to requeue lost jobs (expired leases)
  - `pipelines/registry.py` - Pipeline registration and resolution system
  - `pipelines/` - Individual pipeline implementations

- **Logger** (`src/dataloader/logger/`)
  - Structured logging with automatic sensitive data masking
  - DO NOT MODIFY these files - they're from the template

- **Core** (`src/dataloader/`)
  - `__main__.py` - Application entry point
  - `config.py` - Pydantic Settings for all configuration
  - `context.py` - AppContext singleton for dependency injection
  - `base.py` - Base classes and types
  - `exceptions.py` - Global exception definitions
### Key Architectural Patterns

#### Job Queue Protocol

Jobs flow through the system via a strict state machine:

- **Enqueue** (`trigger` API) - Creates job in `queued` status
  - Idempotent via `idempotency_key`
  - PostgreSQL trigger fires `LISTEN/NOTIFY` to wake workers

- **Claim** (worker) - Worker acquires job atomically
  - Uses `FOR UPDATE SKIP LOCKED` to prevent contention
  - Sets status to `running`, increments attempt counter
  - Attempts PostgreSQL advisory lock on `lock_key`
  - If lock fails → job goes back to `queued` with backoff delay

- **Execute** (worker) - Runs the pipeline with heartbeat
  - Heartbeat updates every `DL_HEARTBEAT_SEC` seconds
  - Extends `lease_expires_at` to prevent reaper from reclaiming
  - Checks `cancel_requested` flag between pipeline chunks
  - Pipeline yields between chunks to allow cooperative cancellation

- **Complete** (worker) - Finalize job status
  - Success: `status = succeeded`, release advisory lock
  - Failure:
    - If `attempt < max_attempts` → `status = queued` (retry with linear backoff: `30 * attempt` seconds)
    - If `attempt >= max_attempts` → `status = failed`
  - Cancel: `status = canceled`
  - Always releases advisory lock

- **Reaper** (background) - Recovers lost jobs
  - Runs every `DL_REAPER_PERIOD_SEC`
  - Finds jobs where `status = running` AND `lease_expires_at < now()`
  - Resets them to `queued` for retry
#### Concurrency Control

The system uses multiple layers of concurrency control (a claim sketch follows this list):

- `lock_key`: PostgreSQL advisory lock ensures only one worker processes jobs with the same `lock_key`
- `partition_key`: Logical grouping for job ordering (currently informational)
- `FOR UPDATE SKIP LOCKED`: Prevents multiple workers from claiming the same job
- Async workers: Multiple workers can run concurrently within a single process
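A minimal sketch of the claim step under these locking layers, written with SQLAlchemy Core text queries. The column names (`job_id`, `queue`, `status`, `priority`, `available_at`, `lock_key`, `attempt`, `started_at`, `lease_expires_at`), the `hashtext`-based advisory-lock key, and the hard-coded backoff/lease intervals are assumptions for illustration; the authoritative implementation lives in `storage/repositories.py` and `DDL.sql`.

```python
from __future__ import annotations

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession


async def claim_next_job(session: AsyncSession, queue: str) -> dict | None:
    """Claim one queued job atomically; return its row or None (caller commits)."""
    row = (
        await session.execute(
            text(
                """
                SELECT job_id, lock_key
                FROM dl_jobs
                WHERE queue = :queue AND status = 'queued' AND available_at <= now()
                ORDER BY priority, available_at
                LIMIT 1
                FOR UPDATE SKIP LOCKED
                """
            ),
            {"queue": queue},
        )
    ).mappings().first()
    if row is None:
        return None

    locked = (
        await session.execute(
            text("SELECT pg_try_advisory_lock(hashtext(CAST(:lock_key AS text)))"),
            {"lock_key": row["lock_key"]},
        )
    ).scalar_one()
    if not locked:
        # Another worker holds the advisory lock: push the job back with a backoff delay.
        await session.execute(
            text(
                "UPDATE dl_jobs SET status = 'queued', "
                "available_at = now() + interval '15 seconds' "  # DL_CLAIM_BACKOFF_SEC default
                "WHERE job_id = :job_id"
            ),
            {"job_id": row["job_id"]},
        )
        return None

    await session.execute(
        text(
            "UPDATE dl_jobs SET status = 'running', attempt = attempt + 1, "
            "started_at = now(), lease_expires_at = now() + interval '60 seconds' "  # DL_DEFAULT_LEASE_TTL_SEC default
            "WHERE job_id = :job_id"
        ),
        {"job_id": row["job_id"]},
    )
    return dict(row)
```

After a successful claim, the worker runs the pipeline and, on completion, releases the advisory lock (e.g. via `pg_advisory_unlock`), matching the "always releases advisory lock" rule above.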
#### Worker Configuration

Workers are configured via the `WORKERS_JSON` environment variable:

```json
[
  {"queue": "load.cbr", "concurrency": 2},
  {"queue": "load.sgx", "concurrency": 1}
]
```

This spawns one async worker task per unit of concurrency (the sum of all `concurrency` values, three in the example above) within the FastAPI process.
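A minimal sketch of how this variable can be parsed into worker specs, assuming a plain dataclass; the project's actual parsing lives in `config.py` and may be modeled differently.

```python
from __future__ import annotations

import json
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class WorkerSpec:
    """One queue and the number of concurrent worker tasks to spawn for it."""

    queue: str
    concurrency: int


def parse_workers_json(raw: str | None = None) -> list[WorkerSpec]:
    """Parse the WORKERS_JSON environment variable into worker specs."""
    raw = raw if raw is not None else os.environ.get("WORKERS_JSON", "[]")
    return [
        WorkerSpec(queue=item["queue"], concurrency=int(item.get("concurrency", 1)))
        for item in json.loads(raw)
    ]
```

For the example above this yields two specs and a total of three worker tasks.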
#### Pipeline System

Pipelines are registered via decorator in `workers/pipelines/`:

```python
from dataloader.workers.pipelines.registry import register


@register("my.task")
async def my_pipeline(args: dict):
    # Process chunk 1
    yield  # Allow heartbeat & cancellation check
    # Process chunk 2
    yield
    # Process chunk 3
```

The `yield` statements enable:

- Heartbeat updates during long operations
- Cooperative cancellation via `cancel_requested` checks
- Progress tracking

All pipelines must be imported in the `load_all()` function in `workers/pipelines/__init__.py` (a registry sketch follows).
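Conceptually, the registry is a task-name-to-pipeline mapping; a minimal sketch under that assumption (the real `pipelines/registry.py` may track additional metadata):

```python
from __future__ import annotations

from collections.abc import AsyncGenerator, Callable

Pipeline = Callable[[dict], AsyncGenerator[None, None]]

_REGISTRY: dict[str, Pipeline] = {}


def register(task: str) -> Callable[[Pipeline], Pipeline]:
    """Register an async-generator pipeline under its task name."""

    def decorator(fn: Pipeline) -> Pipeline:
        _REGISTRY[task] = fn
        return fn

    return decorator


def resolve(task: str) -> Pipeline:
    """Return the pipeline registered for a claimed job's task name."""
    return _REGISTRY[task]
```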
### Application Lifecycle

- **Startup** (`lifespan` in `api/__init__.py`; sketched after this list)
  - Initialize logging
  - Create database engine and sessionmaker
  - Load all pipelines from registry
  - Build WorkerManager from `WORKERS_JSON`
  - Start all worker tasks and reaper

- **Runtime**
  - FastAPI serves HTTP requests
  - Workers poll queue via LISTEN/NOTIFY
  - Reaper runs in background

- **Shutdown** (on SIGTERM)
  - Signal all workers to stop via `asyncio.Event`
  - Cancel worker tasks and wait for completion
  - Cancel reaper task
  - Dispose database engine
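A minimal, self-contained sketch of this signal-then-cancel pattern; the placeholder `worker_loop` stands in for the real worker and reaper tasks, and the actual `lifespan` also wires logging, the engine, pipelines, and the WorkerManager.

```python
from __future__ import annotations

import asyncio
from contextlib import asynccontextmanager

from fastapi import FastAPI


async def worker_loop(stop_event: asyncio.Event) -> None:
    """Placeholder worker: the real service waits on LISTEN/NOTIFY and claims jobs."""
    while not stop_event.is_set():
        await asyncio.sleep(1)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start background tasks on startup and stop them cooperatively on shutdown."""
    stop_event = asyncio.Event()
    tasks = [asyncio.create_task(worker_loop(stop_event)) for _ in range(3)]
    try:
        yield
    finally:
        stop_event.set()   # signal workers to stop
        for task in tasks:
            task.cancel()  # then cancel anything still waiting
        await asyncio.gather(*tasks, return_exceptions=True)


app = FastAPI(lifespan=lifespan)
```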
## Configuration

All configuration is via environment variables (`.env` file or system environment). A Pydantic Settings sketch follows the lists below.

### Application Settings

- `APP_HOST` - Server bind address (default: `0.0.0.0`)
- `APP_PORT` - Server port (default: `8081`)
- `DEBUG` - Debug mode (default: `False`)
- `LOCAL` - Local development flag (default: `False`)

### Database Settings

- `PG_HOST`, `PG_PORT`, `PG_USER`, `PG_PASSWORD`, `PG_DATABASE`, `PG_SCHEMA` - PostgreSQL connection
- `PG_POOL_SIZE`, `PG_MAX_OVERFLOW`, `PG_POOL_RECYCLE` - Connection pool configuration
- `DL_DB_DSN` - Optional override for queue database DSN (if different from main DB)

### Worker Settings

- `WORKERS_JSON` - JSON array of worker configurations (required)
- `DL_HEARTBEAT_SEC` - Heartbeat interval (default: `10`)
- `DL_DEFAULT_LEASE_TTL_SEC` - Default lease duration (default: `60`)
- `DL_REAPER_PERIOD_SEC` - Reaper run interval (default: `10`)
- `DL_CLAIM_BACKOFF_SEC` - Backoff when advisory lock fails (default: `15`)

### Logging Settings

- `LOG_PATH`, `LOG_FILE_NAME` - Application log location
- `METRIC_PATH`, `METRIC_FILE_NAME` - Metrics log location
- `AUDIT_LOG_PATH`, `AUDIT_LOG_FILE_NAME` - Audit events log location
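A minimal sketch of how a subset of these variables maps onto Pydantic v2 settings; the full model lives in `config.py`.

```python
from __future__ import annotations

from pydantic_settings import BaseSettings, SettingsConfigDict


class AppSettings(BaseSettings):
    """Illustrative subset of the environment-driven configuration."""

    model_config = SettingsConfigDict(env_file=".env", extra="ignore")

    APP_HOST: str = "0.0.0.0"
    APP_PORT: int = 8081
    DEBUG: bool = False

    WORKERS_JSON: str = "[]"
    DL_HEARTBEAT_SEC: int = 10
    DL_DEFAULT_LEASE_TTL_SEC: int = 60
    DL_REAPER_PERIOD_SEC: int = 10
    DL_CLAIM_BACKOFF_SEC: int = 15
```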
## API Endpoints

### Business API (v1)

- `POST /api/v1/jobs/trigger` - Create or get an existing job (idempotent; see the client sketch after this list)
  - Body: `{queue, task, args?, idempotency_key?, lock_key, partition_key?, priority?, available_at?}`
  - Response: `{job_id, status}`

- `GET /api/v1/jobs/{job_id}/status` - Get job status
  - Response: `{job_id, status, attempt, started_at?, finished_at?, heartbeat_at?, error?, progress}`

- `POST /api/v1/jobs/{job_id}/cancel` - Request job cancellation (cooperative)
  - Response: Same as the status endpoint
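A minimal client-side sketch of the trigger-then-poll flow with aiohttp, assuming the default port; the queue, task, and argument values are illustrative only.

```python
from __future__ import annotations

import asyncio

import aiohttp

BASE_URL = "http://localhost:8081/api/v1"


async def trigger_and_poll() -> None:
    """Trigger a job idempotently, then read its status once."""
    async with aiohttp.ClientSession() as session:
        async with session.post(
            f"{BASE_URL}/jobs/trigger",
            json={
                "queue": "load.cbr",
                "task": "my.task",
                "args": {"date": "2024-01-01"},
                "idempotency_key": "load.cbr:2024-01-01",
                "lock_key": "cbr",
            },
        ) as resp:
            job = await resp.json()

        async with session.get(f"{BASE_URL}/jobs/{job['job_id']}/status") as resp:
            print(await resp.json())


asyncio.run(trigger_and_poll())
```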
### Infrastructure API

- `GET /health` - Health check (no database access, <20ms)
- `GET /status` - Service status with version/uptime
## Development Guidelines

### Adding a New Pipeline

- Create a pipeline file in `src/dataloader/workers/pipelines/`:

  ```python
  from dataloader.workers.pipelines.registry import register

  @register("myqueue.mytask")
  async def my_task_pipeline(args: dict):
      # Your implementation
      # Use yield between chunks for heartbeat
      yield
  ```

- Import it in `src/dataloader/workers/pipelines/__init__.py`:

  ```python
  def load_all() -> None:
      from . import noop
      from . import my_task  # Add this line
  ```

- Add the queue to `.env`:

  ```
  WORKERS_JSON=[{"queue":"myqueue","concurrency":1}]
  ```
### Idempotent Operations

All pipelines should be idempotent since jobs may be retried:

- Use `idempotency_key` for external API calls
- Use `UPSERT` or `INSERT ... ON CONFLICT` for database writes (see the sketch below)
- Design pipelines to be safely re-runnable from any point
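A minimal sketch of an idempotent write using SQLAlchemy's PostgreSQL `insert ... on_conflict_do_update`; the `rates` table and its columns are hypothetical and only illustrate the pattern.

```python
from __future__ import annotations

import datetime
import decimal

from sqlalchemy import Column, Date, MetaData, Numeric, String, Table
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.asyncio import AsyncSession

metadata = MetaData()

rates = Table(  # hypothetical target table
    "rates",
    metadata,
    Column("currency", String, primary_key=True),
    Column("rate_date", Date, primary_key=True),
    Column("value", Numeric, nullable=False),
)


async def upsert_rate(
    session: AsyncSession,
    currency: str,
    rate_date: datetime.date,
    value: decimal.Decimal,
) -> None:
    """Insert a rate or update the existing row, so a retried pipeline stays safe."""
    stmt = insert(rates).values(currency=currency, rate_date=rate_date, value=value)
    upsert = stmt.on_conflict_do_update(
        index_elements=[rates.c.currency, rates.c.rate_date],
        set_={"value": stmt.excluded.value},
    )
    await session.execute(upsert)
```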
### Security & Data Masking

The logger automatically masks sensitive fields (defined in `logger/utils.py`):

- Keywords: `password`, `token`, `secret`, `key`, `authorization`, etc.
- Never log credentials directly
- Use structured logging: `logger.info("message", extra={...})`
### Error Handling

- Pipelines should raise exceptions for transient errors (this triggers a retry; see the sketch after this list)
- Use `max_attempts` in job creation to control retry limits
- Permanent failures should be logged but not raised (mark the job as succeeded but log the error in events)
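A minimal sketch of this convention inside a pipeline; `fetch_batch` and `record_failure` are hypothetical stand-ins for real pipeline code and event logging.

```python
from __future__ import annotations

from dataloader.workers.pipelines.registry import register


async def fetch_batch(args: dict) -> list[dict]:
    """Hypothetical I/O helper; a real pipeline would call an external source."""
    return []


async def record_failure(args: dict, error: str) -> None:
    """Hypothetical helper that records a permanent failure for the job events log."""


@register("load.example")
async def load_example(args: dict):
    """Raise on transient errors (triggers retry); record permanent ones and finish cleanly."""
    try:
        batch = await fetch_batch(args)
    except TimeoutError:
        raise  # transient: the worker requeues the job with backoff
    except ValueError as exc:
        await record_failure(args, error=str(exc))  # permanent: log, do not raise
        return
    for _row in batch:
        pass  # process the chunk
    yield  # heartbeat / cancellation point between chunks
```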
### Testing

Integration tests should (see the sketch after this list):

- Use test fixtures from `tests/conftest.py`
- Test the full job lifecycle: trigger → claim → execute → complete
- Test failure scenarios: cancellation, retries, lock contention
- Mock external dependencies; use the real database for queue operations
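A minimal sketch of a lifecycle test over the ASGI app with httpx, assuming pytest-asyncio and an importable `app` (the import path and payload are assumptions; real tests should use the fixtures from `tests/conftest.py`).

```python
from __future__ import annotations

import httpx
import pytest

from dataloader.api import app  # assumed import path for the FastAPI app


@pytest.mark.asyncio
async def test_trigger_then_status() -> None:
    """Trigger a job and verify it is visible through the status endpoint."""
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
        resp = await client.post(
            "/api/v1/jobs/trigger",
            json={"queue": "load.cbr", "task": "noop", "lock_key": "test"},
        )
        assert resp.status_code == 200
        job_id = resp.json()["job_id"]

        status = await client.get(f"/api/v1/jobs/{job_id}/status")
        assert status.status_code == 200
        assert status.json()["status"] in {"queued", "running", "succeeded"}
```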
## Important Files to Reference

- `TZ.md` - Full technical specification (Russian)
- `TODO.md` - Implementation progress and next steps
- `rest_template.md` - Project structure template
- `DDL.sql` - Database schema