Go to file
itqop 6364c97f21 chore: add new README 2025-11-05 18:49:23 +03:00
src/dataloader refactor: rename columns 2025-11-05 17:57:42 +03:00
tests tests: add new tests 2025-11-05 18:49:12 +03:00
.coveragerc tests: add tests 2025-11-05 15:00:41 +03:00
.gitignore refactor: claude v1 2025-11-05 14:13:51 +03:00
CLAUDE.md refactor: claude v1 2025-11-05 14:13:51 +03:00
README.md chore: add new README 2025-11-05 18:49:23 +03:00
TODO.md feat: add storage 2025-11-05 01:34:42 +03:00
TZ.md chore: refactor 2025-11-05 16:01:37 +03:00
poetry.lock feat: first 2025-11-05 01:11:41 +03:00
pyproject.toml feat: first 2025-11-05 01:11:41 +03:00
pytest.ini refactor: claude v2 2025-11-05 14:41:56 +03:00
rest_template.md chore: refactor 2025-11-05 16:01:37 +03:00

README.md

Dataloader

Асинхронный сервис постановки и исполнения ETLзадач поверх очереди в PostgreSQL. Предоставляет HTTP API для триггера задач, мониторинга статуса и отмены. Внутри процесса поднимает пул воркеров, которые конкурируют за задачи через SELECT … FOR UPDATE SKIP LOCKED и обрабатывают их с учётом lease/heartbeat и кооперативной отмены. Для пробуждения воркеров используется LISTEN/NOTIFY.

Содержание

  • О проекте
  • Быстрый старт
  • Конфигурация окружения
  • Архитектура и потоки данных
  • Структура проекта
  • Взаимодействие с БД
  • HTTP API
  • Воркеры, пайплайны и добавление новых ETLзадач
  • Логирование, метрики, аудит
  • Тестирование
  • Эксплуатация и масштабирование

О проекте

Сервис решает типовую задачу фоновой обработки задач: один общий пул воркеров, одна очередь в БД, несколько типов задач (пайплайнов), идемпотентность, повторные попытки, контроль конкуренции через advisorylock по lock_key, кооперативная отмена, возврат «потерянных» задач.

Быстрый старт

  1. Установить зависимости (poetry):
    poetry install
    
  2. Подготовить PostgreSQL (см. DDL в DDL.sql) и переменные окружения (см. «Конфигурация»).
  3. Запуск сервиса:
    poetry run python -m dataloader
    
  4. Проверка доступности:
    curl http://localhost:8081/health
    
  5. Пример постановки задачи:
    curl -X POST http://localhost:8081/api/v1/jobs/trigger \
      -H "Content-Type: application/json" \
      -d '{
            "queue": "etl.default",
            "task": "noop",
            "args": {"sleep1": 1, "sleep2": 1, "sleep3": 1},
            "lock_key": "customer:42",
            "priority": 100
          }'
    

Конфигурация окружения

Настройки собираются в src/dataloader/config.py через Pydantic Settings.

  • Приложение (AppSettings):

    • APP_HOST (def: 0.0.0.0)
    • APP_PORT (def: 8081)
    • TIMEZONE (def: Europe/Moscow)
  • Логирование (LogSettings):

    • LOG_PATH, LOG_FILE_NAME, LOG_ROTATION
    • METRIC_PATH, METRIC_FILE_NAME
    • AUDIT_LOG_PATH, AUDIT_LOG_FILE_NAME
    • DEBUG переключает уровень на DEBUG
  • PostgreSQL (PGSettings):

    • PG_HOST, PG_PORT, PG_USER, PG_PASSWORD, PG_DATABASE
    • PG_SCHEMA_QUEUE - схема таблиц очереди (логическая, маппится через schema_translate_map)
    • Параметры пула: PG_USE_POOL, PG_POOL_SIZE, PG_MAX_OVERFLOW, PG_POOL_RECYCLE
    • Таймауты: PG_CONNECT_TIMEOUT, PG_COMMAND_TIMEOUT
  • Воркеры (WorkerSettings):

    • WORKERS_JSON - список конфигураций воркеров, например: [{"queue":"etl.default","concurrency":2}]
    • DL_HEARTBEAT_SEC (def: 10)
    • DL_DEFAULT_LEASE_TTL_SEC (def: 60)
    • DL_REAPER_PERIOD_SEC (def: 10)
    • DL_CLAIM_BACKOFF_SEC (def: 15)

Архитектура и потоки данных

  • HTTP слой: FastAPIприложение (dataloader.api) с v1 API и инфраструктурными маршрутами (/health, /info).
  • Контекст: AppContext инициализирует логирование, AsyncEngine, async_sessionmaker и предоставляет DI (get_session).
  • Очередь: одна таблица dl_jobs и журнал dl_job_events в PostgreSQL. Идемпотентность на уровне idempotency_key. Пробуждение воркеров через триггеры NOTIFY в БД и listener на стороне приложения.
  • Воркеры: WorkerManager поднимает N асинхронных воркеров (PGWorker) на основании WORKERS_JSON. Каждый воркер:
    1. ждёт уведомление (LISTEN/NOTIFY) или таймаут,
    2. пытается «захватить» одну задачу (SELECT … FOR UPDATE SKIP LOCKED),
    3. выставляет running, получает advisorylock по lock_key,
    4. исполняет соответствующий пайплайн с heartbeat,
    5. завершает задачу: succeeded/failed/canceled или возвращает в очередь на ретрай.
  • Реапер: фоновая задача, периодически возвращает «потерянные» runningзадачи в queued.

Структура

src/dataloader/
├── __main__.py              # Запуск uvicorn, lifecycle через FastAPI lifespan
├── config.py                # Pydantic-настройки (app/log/pg/worker)
├── context.py               # AppContext: engine, sessionmaker, логгер, DI
│
├── api/
│   ├── __init__.py          # FastAPI app, middleware, routers, lifespan
│   ├── middleware.py        # Логирование запросов + метрики/аудит
│   ├── os_router.py         # /health, /info (инфраструктурные ручки)
│   ├── metric_router.py     # Примеры метрик (like/dislike)
│   │
│   └── v1/
│       ├── router.py        # /api/v1/jobs: trigger, status, cancel
│       ├── service.py       # Бизнес-логика поверх репозитория
│       ├── schemas.py       # Pydantic DTO API
│       └── utils.py         # Утилиты (генерация UUID и т.д.)
│
├── storage/
│   ├── engine.py            # AsyncEngine, sessionmaker, schema_translate_map
│   ├── notify_listener.py   # asyncpg LISTEN/NOTIFY по каналу dl_jobs
│   │
│   ├── models/
│   │   ├── base.py          # Declarative base
│   │   └── queue.py         # ORM-модели DLJob, DLJobEvent
│   │
│   ├── repositories/
│   │   └── queue.py         # QueueRepository (CRUD операции)
│   │
│   └── schemas/
│       └── queue.py         # CreateJobRequest, JobStatus
│
├── workers/
│   ├── base.py              # PGWorker: главный цикл, heartbeat, вызов пайплайнов
│   ├── manager.py           # WorkerManager: запуск/остановка + reaper
│   ├── reaper.py            # Requeue_lost на базе репозитория
│   │
│   └── pipelines/
│       ├── __init__.py      # Автозагрузка модулей для регистрации
│       ├── registry.py      # Реестр обработчиков задач (@register)
│       └── noop.py          # Пример эталонного пайплайна
│
└── logger/
    └── ...                  # Логирование

Взаимодействие с БД

  • Подключение: postgresql+asyncpg через SQLAlchemy AsyncEngine.
  • Схемы: логическое имя queue маппится на реальную через schema_translate_map (см. engine.py), имя реальной схемы задаётся PG_SCHEMA_QUEUE.
  • DDL: см. DDL.sql. Ключевые элементы:
    • dl_jobs с индексами на claim и runninglease,
    • dl_job_events как журнал событий,
    • триггер notify_job_ready() + LISTEN dl_jobs для пробуждения воркеров.
  • Конкуренция: claim через FOR UPDATE SKIP LOCKED, взаимное исключение по бизнес‑сущности через advisorylock pg_try_advisory_lock(hashtext(lock_key)).
  • Надёжность: atleastonce. Пайплайны должны быть идемпотентны в части записи в целевые таблицы.

HTTP API (v1)

POST /api/v1/jobs/trigger

Постановка задачи в очередь (идемпотентная операция).

Request:

{
    "queue": "load.cbr",                          // обязательно: имя очереди
    "task": "load.cbr.rates",                     // обязательно: имя задачи для registry
    "args": {                                     // опционально: аргументы задачи
        "date": "2025-01-10",
        "currencies": ["USD", "EUR"]
    },
    "idempotency_key": "cbr_2025-01-10",         // опционально: ключ идемпотентности
    "lock_key": "cbr_rates",                      // обязательно: ключ для advisory lock
    "partition_key": "2025-01-10",                // опционально: ключ партиционирования
    "priority": 100,                              // опционально: приоритет (меньше = выше)
    "available_at": "2025-01-10T00:00:00Z",      // опционально: отложенный запуск
    "max_attempts": 3,                            // опционально: макс попыток (def: 5)
    "lease_ttl_sec": 300,                         // опционально: TTL аренды (def: 60)
    "producer": "api-client",                     // опционально: кто поставил
    "consumer_group": "cbr-loaders"               // опционально: группа потребителей
}

Response 200:

{
    "job_id": "550e8400-e29b-41d4-a716-446655440000",
    "status": "queued"
}

Коды ответов:

  • 200 OK - задача создана или уже существует (идемпотентность)
  • 400 Bad Request - невалидные данные
  • 500 Internal Server Error - ошибка сервера

GET /api/v1/jobs/{job_id}/status

Получение статуса задачи.

Response 200:

{
    "job_id": "550e8400-e29b-41d4-a716-446655440000",
    "status": "running",                          // queued/running/succeeded/failed/canceled
    "attempt": 1,                                 // текущая попытка
    "started_at": "2025-01-10T12:00:00Z",        // время первого запуска
    "finished_at": null,                          // время завершения (если есть)
    "heartbeat_at": "2025-01-10T12:01:30Z",      // последний heartbeat
    "error": null,                                // текст ошибки (если есть)
    "progress": {                                 // прогресс выполнения (custom)
        "processed": 500,
        "total": 1000
    }
}

Коды ответов:

  • 200 OK - статус получен
  • 404 Not Found - задача не найдена

POST /api/v1/jobs/{job_id}/cancel

Запрос кооперативной отмены задачи.

Response 200:

{
    "job_id": "550e8400-e29b-41d4-a716-446655440000",
    "status": "running",
    "attempt": 1,
    "started_at": "2025-01-10T12:00:00Z",
    "heartbeat_at": "2025-01-10T12:01:30Z"
}

Поведение:

  • Устанавливает флаг cancel_requested = true в БД
  • Воркер проверяет флаг между yield в пайплайне
  • При обнаружении флага воркер завершает задачу со статусом canceled

Коды ответов:

  • 200 OK - запрос отмены принят
  • 404 Not Found - задача не найдена

Инфраструктурные эндпоинты

GET /health - проверка работоспособности (без БД, < 20ms)

{"status": "healthy"}

GET /info - информация о сервисе

{
    "service": "dataloader",
    "version": "1.0.0",
    "environment": "production"
}

Воркеры, пайплайны и добавление новых ETLзадач

Как работает воркер

  1. Ожидает сигнал (LISTEN/NOTIFY) или таймаут DL_CLAIM_BACKOFF_SEC.
  2. Пытается забрать одну задачу своей очереди: status='queued' AND available_at<=now() с FOR UPDATE SKIP LOCKED.
  3. Переводит в running, увеличивает attempt, выставляет lease_expires_at, делает heartbeat каждые DL_HEARTBEAT_SEC.
  4. Захватывает advisorylock по lock_key (если не получилось - возвращает в queued с бэкоффом).
  5. Выполняет пайплайн (task) с поддержкой итеративных шагов и кооперативной отмены.
  6. По завершении: succeeded или failed/canceled; при ошибках возможны ретраи до max_attempts.

Протокол выполнения задачи (SQL)

Claim (захват задачи):

WITH cte AS (
    SELECT job_id
    FROM dl_jobs
    WHERE status = 'queued'
      AND queue = :queue
      AND available_at <= now()
    ORDER BY priority ASC, created_at ASC
    FOR UPDATE SKIP LOCKED
    LIMIT 1
)
UPDATE dl_jobs j
SET status = 'running',
    started_at = COALESCE(started_at, now()),
    attempt = attempt + 1,
    lease_expires_at = now() + make_interval(secs => j.lease_ttl_sec),
    heartbeat_at = now()
FROM cte
WHERE j.job_id = cte.job_id
RETURNING j.job_id, j.task, j.args, j.lock_key, j.lease_ttl_sec;

Heartbeat (продление аренды):

UPDATE dl_jobs
SET heartbeat_at = now(),
    lease_expires_at = now() + make_interval(secs => :ttl)
WHERE job_id = :job_id AND status = 'running'
RETURNING cancel_requested;

Завершение успешное:

UPDATE dl_jobs
SET status = 'succeeded',
    finished_at = now(),
    lease_expires_at = NULL
WHERE job_id = :job_id;

SELECT pg_advisory_unlock(hashtext(:lock_key));

Завершение с ошибкой (retry):

UPDATE dl_jobs
SET status = CASE WHEN attempt < max_attempts THEN 'queued' ELSE 'failed' END,
    available_at = CASE WHEN attempt < max_attempts
                        THEN now() + make_interval(secs => 30 * attempt)
                        ELSE now() END,
    error = :error_message,
    lease_expires_at = NULL,
    finished_at = CASE WHEN attempt >= max_attempts THEN now() ELSE NULL END
WHERE job_id = :job_id;

SELECT pg_advisory_unlock(hashtext(:lock_key));

Reaper (возврат потерянных задач):

UPDATE dl_jobs
SET status = 'queued',
    available_at = now(),
    lease_expires_at = NULL
WHERE status = 'running'
  AND lease_expires_at IS NOT NULL
  AND lease_expires_at < now()
RETURNING job_id;

Интерфейс пайплайна

Пайплайн - обычная функция, возвращающая одно из:

  • асинхронный генератор шагов (рекомендуется для длинных процессов),
  • корутину,
  • синхронную функцию.

Каждый «yield» в асинхронном генераторе - безопасная точка, где воркер выполнит heartbeat и проверит cancel_requested.

Регистрация нового пайплайна через декоратор @register("task_name") в модуле src/dataloader/workers/pipelines/<your_task>.py.

Пример:

from __future__ import annotations
import asyncio
from typing import AsyncIterator
from dataloader.workers.pipelines.registry import register

@register("load.customers")
async def load_customers(args: dict) -> AsyncIterator[None]:
    # шаг 1  вытягиваем данные
    await some_fetch(args)
    yield
    # шаг 2  пишем в БД идемпотентно (upsert/merge)
    await upsert_customers(args)
    yield
    # шаг 3  пост‑обработка
    await finalize(args)
    yield

Важно:

  • Пайплайны должны быть идемпотентны (повторный запуск не должен ломать данные).
  • Долгие операции разбивайте на шаги с yield, чтобы работал heartbeat и отмена.
  • Для бизнес‑взаимного исключения выбирайте корректный lock_key (например, customer:{id}), чтобы параллельные задачи не конфликтовали.

Добавление ETLзадачи (шаги)

1. Создать пайплайн в src/dataloader/workers/pipelines/:

# src/dataloader/workers/pipelines/load_cbr_rates.py
from __future__ import annotations
from typing import AsyncIterator
from datetime import datetime
from dataloader.workers.pipelines.registry import register

@register("load.cbr.rates")
async def load_cbr_rates(args: dict) -> AsyncIterator[None]:
    """
    Загрузка курсов валют ЦБ РФ.

    Args:
        args: {"date": "YYYY-MM-DD", "currencies": ["USD", "EUR"]}
    """
    date = datetime.fromisoformat(args["date"])
    currencies = args.get("currencies", ["USD", "EUR"])

    # Извлечение данных
    data = await fetch_cbr_rates(date, currencies)
    yield  # Heartbeat checkpoint

    # Трансформация
    transformed = transform_rates(data)
    yield  # Heartbeat checkpoint

    # Идемпотентная загрузка в БД
    async with get_target_session() as session:
        await session.execute(
            insert(CbrRates)
            .values(transformed)
            .on_conflict_do_update(
                index_elements=["date", "currency"],
                set_={"rate": excluded.c.rate, "updated_at": func.now()}
            )
        )
        await session.commit()
    yield

2. Настроить воркеры в .env:

WORKERS_JSON='[{"queue":"load.cbr","concurrency":2}]'

3. Поставить задачу через API:

curl -X POST http://localhost:8081/api/v1/jobs/trigger \
  -H "Content-Type: application/json" \
  -d '{
    "queue": "load.cbr",
    "task": "load.cbr.rates",
    "args": {"date": "2025-01-10", "currencies": ["USD", "EUR"]},
    "lock_key": "cbr_rates_2025-01-10",
    "partition_key": "2025-01-10",
    "priority": 100,
    "max_attempts": 3,
    "lease_ttl_sec": 300
  }'

4. Мониторить выполнение:

# Получить job_id из ответа и проверить статус
curl http://localhost:8081/api/v1/jobs/{job_id}/status

Ключевые моменты:

  • Пайплайн должен быть идемпотентным (повторный запуск не должен ломать данные)
  • Используйте yield после каждого значимого чанка работы для heartbeat
  • lock_key должен обеспечивать взаимное исключение (например, customer:{id})
  • partition_key используется для параллелизации независимых задач

Логирование, метрики, аудит

  • Логи: структурированные, через logger/*. Middleware (api/middleware.py) логирует входящие запросы/исходящие ответы, время обработки, пишет метрики и аудит‑события.
  • Метрики: простые счётчики (пример: likes/dislikes, requests_total, responses_total, duration_ms).
  • Аудит: запись бизнес‑событий начала/окончания обработки запроса.

Тестирование

Структура тестов

tests/
├── conftest.py                     # Глобальные фикстуры (db_engine, db_session, client)
├── integration_tests/              # Интеграционные тесты с реальной БД
│   ├── test_queue_repository.py    # 12 тестов репозитория
│   └── test_api_endpoints.py       # 7 тестов API endpoints
└── unit/                           # Юнит-тесты с моками (92 теста)
    ├── test_config.py              # 30 тестов конфигурации
    ├── test_context.py             # 13 тестов AppContext
    ├── test_api_service.py         # 10 тестов сервисного слоя
    ├── test_notify_listener.py     # 13 тестов LISTEN/NOTIFY
    ├── test_workers_base.py        # 14 тестов PGWorker
    ├── test_workers_manager.py     # 10 тестов WorkerManager
    └── test_pipeline_registry.py   # 5 тестов реестра пайплайнов

Запуск тестов

# Все тесты (111 тестов)
poetry run pytest

# Только юнит-тесты
poetry run pytest tests/unit/ -m unit

# Только интеграционные
poetry run pytest tests/integration_tests/ -m integration

# С покрытием кода
poetry run pytest --cov=dataloader --cov-report=html

# С подробным выводом
poetry run pytest -v -s

Покрытие кода

Текущее покрытие: 91% (788 строк / 715 покрыто)

Name                                      Stmts   Miss   Cover
---------------------------------------------------------------
src/dataloader/config.py                     79      0   100%
src/dataloader/context.py                    39      0   100%
src/dataloader/api/v1/service.py             32      0   100%
src/dataloader/storage/models/queue.py       43      0   100%
src/dataloader/storage/schemas/queue.py      29      0   100%
src/dataloader/storage/notify_listener.py    49      0   100%
src/dataloader/workers/base.py              102      3    97%
src/dataloader/workers/manager.py            64      0   100%
src/dataloader/storage/repositories/queue    130     12    91%
---------------------------------------------------------------
TOTAL                                       788     73    91%

Ключевые тест-сценарии

Интеграционные тесты:

  • Постановка задачи через API → проверка статуса
  • Идемпотентность через idempotency_key
  • Claim задачи → heartbeat → успешное завершение
  • Claim задачи → ошибка → retry → финальный fail
  • Конкуренция воркеров через advisory lock
  • Возврат потерянных задач (reaper)
  • Отмена задачи пользователем

Юнит-тесты:

  • Конфигурация из переменных окружения
  • Создание и управление воркерами
  • LISTEN/NOTIFY механизм
  • Сервисный слой и репозиторий
  • Протокол heartbeat и отмены

Масштабирование

  • Вертикальное: Увеличение concurrency в WORKERS_JSON для существующих воркеров
  • Горизонтальное: Увеличение количества реплик (pods). Очередь в БД и advisory-lock обеспечат корректность при конкуренции между репликами
  • По очередям: Разные deployment'ы для разных очередей с разными ресурсами

Graceful Shutdown

При получении SIGTERM:

  1. Останавливает прием новых HTTP запросов
  2. Сигнализирует воркерам о необходимости завершения
  3. Ждет завершения текущих задач (timeout 30 сек)
  4. Останавливает reaper
  5. Закрывает соединения с БД

Мониторинг

Health Checks:

  • GET /health - проверка работоспособности (без БД, < 20ms)
  • GET /info - информация о версии

Метрики (если включен metric_router):

  • Количество задач по статусам (queued, running, succeeded, failed)
  • Время выполнения задач (p50, p95, p99)
  • Количество активных воркеров
  • Частота ошибок

Логи: Структурированные JSON-логи с уровнями: DEBUG, INFO, WARNING, ERROR, CRITICAL

Ключевые события для алертов:

  • worker.claim.backoff - частые backoff'ы (возможна конкуренция)
  • worker.complete.failed - высокий процент ошибок
  • reaper.requeued - частый возврат потерянных задач (проблемы с lease)
  • api.error - ошибки API

Troubleshooting

Задачи застревают в статусе queued

Симптомы: Задачи не начинают выполняться, остаются в queued.

Возможные причины:

  1. Воркеры не запущены или упали
  2. Нет воркеров для данной очереди в WORKERS_JSON
  3. available_at в будущем

Решение:

# Проверить логи воркеров
docker logs dataloader | grep worker

# Проверить конфигурацию
echo $WORKERS_JSON

# Проверить задачи в БД
SELECT job_id, queue, status, available_at, created_at
FROM dl_jobs
WHERE status = 'queued'
ORDER BY created_at DESC
LIMIT 10;

Задачи часто возвращаются в queued (backoff)

Симптомы: В логах частые события worker.claim.backoff.

Причины:

  • Конкуренция за lock_key: несколько задач с одинаковым lock_key одновременно
  • Advisory lock уже занят другим процессом

Решение:

  • Проверить корректность выбора lock_key (должен быть уникальным для бизнес-сущности)
  • Использовать partition_key для распределения нагрузки
  • Снизить concurrency для данной очереди

Высокий процент failed задач

Симптомы: Много задач завершаются с status = 'failed'.

Диагностика:

SELECT job_id, task, error, attempt, max_attempts
FROM dl_jobs
WHERE status = 'failed'
ORDER BY finished_at DESC
LIMIT 20;

Возможные причины:

  • Ошибки в коде пайплайна
  • Недоступность внешних сервисов
  • Таймауты (превышение lease_ttl_sec)
  • Неверные аргументы в args

Решение:

  • Проверить логи с job_id
  • Увеличить max_attempts для retry
  • Увеличить lease_ttl_sec для долгих операций
  • Исправить код пайплайна

Медленное выполнение задач

Симптомы: Задачи выполняются дольше ожидаемого.

Диагностика:

SELECT
    task,
    AVG(EXTRACT(EPOCH FROM (finished_at - started_at))) as avg_duration_sec,
    COUNT(*) as total
FROM dl_jobs
WHERE status IN ('succeeded', 'failed')
  AND finished_at > NOW() - INTERVAL '1 hour'
GROUP BY task
ORDER BY avg_duration_sec DESC;

Возможные причины:

  • Неоптимальный код пайплайна
  • Медленные внешние сервисы
  • Недостаточно воркеров (concurrency слишком мал)
  • Проблемы с БД (медленные запросы, блокировки)

Решение:

  • Профилировать код пайплайна
  • Увеличить concurrency в WORKERS_JSON
  • Оптимизировать запросы к БД (индексы, batching)
  • Масштабировать горизонтально (больше реплик)

Утечка памяти

Симптомы: Постепенный рост потребления памяти, OOM kills.

Диагностика:

# Мониторинг памяти
kubectl top pods -l app=dataloader --containers

# Проверить логи перед падением
kubectl logs dataloader-xxx --previous

Возможные причины:

  • Накопление объектов в памяти в пайплайне
  • Незакрытые соединения/файлы
  • Утечки в зависимостях

Решение:

  • Использовать context managers (async with) для ресурсов
  • Обрабатывать данные чанками, не загружать всё в память
  • Периодически перезапускать воркеры (restart policy)

Проблемы с LISTEN/NOTIFY

Симптомы: Воркеры не просыпаются сразу после постановки задачи.

Диагностика:

# Проверить логи listener
docker logs dataloader | grep "notify_listener\|LISTEN"

# Проверить триггеры в БД
SELECT * FROM pg_trigger WHERE tgname LIKE 'dl_jobs%';

Возможные причины:

  • Триггеры не созданы или отключены
  • Проблемы с подключением asyncpg
  • Воркер не подписан на канал

Решение:

  • Воркер автоматически fallback'ится на polling при проблемах с LISTEN/NOTIFY
  • Проверить DDL: триггеры dl_jobs_notify_ins и dl_jobs_notify_upd
  • Проверить права пользователя БД на LISTEN/NOTIFY