From 74243fd2589bd21a7b71a14c9c98bd99dc62d88f Mon Sep 17 00:00:00 2001 From: itqop Date: Wed, 5 Nov 2025 20:54:15 +0300 Subject: [PATCH] chore: update README --- README.md | 149 +++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 119 insertions(+), 30 deletions(-) diff --git a/README.md b/README.md index f8e2199..b23692c 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,7 @@ - Взаимодействие с БД - HTTP API - Воркеры, пайплайны и добавление новых ETL‑задач +- Существующие пайплайны - Логирование, метрики, аудит - Тестирование - Эксплуатация и масштабирование @@ -455,6 +456,124 @@ curl http://localhost:8081/api/v1/jobs/{job_id}/status - `partition_key` используется для параллелизации независимых задач +## Существующие пайплайны + +### `load.tenera` - Загрузка котировок + +**Описание:** +Загружает финансовые котировки (биржевые индексы, валюты, сырьевые товары) из SuperTenera API. + +**Источник данных:** +- SuperTenera API - агрегатор котировок из множества источников (CBR, Investing.com, SGX, Bloomberg, TradingView, TradingEconomics) + +**Назначение:** +- Таблицы в схеме `quotes`: + - `quote_section` - разделы котировок (CBR, SGX, и т.д.) + - `quote` - инструменты (тикеры, названия) + - `quote_value` - временные ряды значений (OHLCV + дополнительные метрики) + +**Процесс:** +1. Запрос данных из SuperTenera API +2. Парсинг различных форматов (каждый источник имеет свою структуру) +3. Трансформация в единый формат с преобразованием временных зон +4. UPSERT в БД (идемпотентная вставка/обновление) + +**Пример запуска:** + +```bash +# Настроить воркер в .env +WORKERS_JSON='[{"queue":"load.tenera","concurrency":1}]' + +# Запустить задачу через API +curl -X POST http://localhost:8081/api/v1/jobs/trigger \ + -H "Content-Type: application/json" \ + -d '{ + "queue": "load.tenera", + "task": "load.tenera", + "args": {}, + "lock_key": "tenera_quotes", + "priority": 100, + "max_attempts": 3, + "lease_ttl_sec": 300 + }' +``` + +**Особенности:** +- Поддержка множества форматов временных меток (ISO, Unix timestamp, кастомные форматы) +- Автоматическое преобразование временных зон в Europe/Moscow +- Обработка разных структур данных от различных провайдеров +- UPSERT по композитному ключу (quote_id + date/timestamp) + + +### `load.opu` - Загрузка данных OPU + +**Описание:** +Загружает данные OPU из Gmap2Brief API. Данные выгружаются в виде zstandard-сжатого архива с JSON Lines. + +**Источник данных:** +- Gmap2Brief API - экспорт данных о структуре OPU + +**Назначение:** +- Таблица `brief_digital_certificate_opu` в схеме `opu` (23 поля, композитный PK из 8 полей) + +**Процесс:** +1. Запуск экспорта через API (`/export/opu/start`) +2. Polling статуса экспорта до завершения +3. Скачивание zstandard-архива с JSON Lines +4. TRUNCATE целевой таблицы (полная перезагрузка) +5. Потоковая распаковка архива (64KB чанки) +6. Парсинг JSON Lines и батчевая вставка (по 5000 записей) +7. Преобразование ISO-дат в date/datetime объекты + +**Пример запуска:** + +```bash +# Настроить воркер в .env +WORKERS_JSON='[{"queue":"load.opu","concurrency":1}]' + +# Запустить задачу через API +curl -X POST http://localhost:8081/api/v1/jobs/trigger \ + -H "Content-Type: application/json" \ + -d '{ + "queue": "load.opu", + "task": "load.opu", + "args": {}, + "lock_key": "opu_export", + "priority": 100, + "max_attempts": 2, + "lease_ttl_sec": 600 + }' +``` + +**Особенности:** +- Потоковая обработка больших архивов без полной загрузки в память +- Zstandard декомпрессия с буферизацией неполных строк +- TRUNCATE перед загрузкой (стратегия полной замены данных) +- Батчевая вставка для оптимизации производительности +- Heartbeat после каждого батча для отслеживания прогресса +- Композитный первичный ключ из 8 полей обеспечивает уникальность + + +### `noop` - Тестовый пайплайн + +**Описание:** +Демонстрационный пайплайн для тестирования инфраструктуры очереди. Выполняет серию sleep-операций с heartbeat. + +**Пример запуска:** + +```bash +curl -X POST http://localhost:8081/api/v1/jobs/trigger \ + -H "Content-Type: application/json" \ + -d '{ + "queue": "etl.default", + "task": "noop", + "args": {"sleep1": 2, "sleep2": 3, "sleep3": 1}, + "lock_key": "test_noop", + "priority": 100 + }' +``` + + ## Логирование, метрики, аудит - Логи: структурированные, через `logger/*`. Middleware (`api/middleware.py`) логирует входящие запросы/исходящие ответы, время обработки, пишет метрики и аудит‑события. - Метрики: простые счётчики (пример: likes/dislikes, requests_total, responses_total, duration_ms). @@ -612,11 +731,6 @@ TOTAL 778 35 95.50% **Решение:** ```bash -# Проверить логи воркеров -docker logs dataloader | grep worker - -# Проверить конфигурацию -echo $WORKERS_JSON # Проверить задачи в БД SELECT job_id, queue, status, available_at, created_at @@ -694,37 +808,12 @@ ORDER BY avg_duration_sec DESC; - Оптимизировать запросы к БД (индексы, batching) - Масштабировать горизонтально (больше реплик) -### Утечка памяти - -**Симптомы:** Постепенный рост потребления памяти, OOM kills. - -**Диагностика:** -```bash -# Мониторинг памяти -kubectl top pods -l app=dataloader --containers - -# Проверить логи перед падением -kubectl logs dataloader-xxx --previous -``` - -**Возможные причины:** -- Накопление объектов в памяти в пайплайне -- Незакрытые соединения/файлы -- Утечки в зависимостях - -**Решение:** -- Использовать context managers (`async with`) для ресурсов -- Обрабатывать данные чанками, не загружать всё в память -- Периодически перезапускать воркеры (restart policy) - ### Проблемы с LISTEN/NOTIFY **Симптомы:** Воркеры не просыпаются сразу после постановки задачи. **Диагностика:** ```bash -# Проверить логи listener -docker logs dataloader | grep "notify_listener\|LISTEN" # Проверить триггеры в БД SELECT * FROM pg_trigger WHERE tgname LIKE 'dl_jobs%';