diff --git a/src/dataloader/config.py b/src/dataloader/config.py
index 8831cd9..acad1de 100644
--- a/src/dataloader/config.py
+++ b/src/dataloader/config.py
@@ -118,6 +118,7 @@ class PGSettings(BaseSettings):
     schema_queue: str = Field(validation_alias="PG_SCHEMA_QUEUE", default="public")
     schema_quotes: str = Field(validation_alias="PG_SCHEMA_QUOTES", default="public")
     schema_opu: str = Field(validation_alias="PG_SCHEMA_OPU", default="public")
+    batch_size: int = Field(validation_alias="PG_BATCH_SIZE", default=1000)
     use_pool: bool = Field(validation_alias="PG_USE_POOL", default=True)
     pool_size: int = Field(validation_alias="PG_POOL_SIZE", default=5)
     max_overflow: int = Field(validation_alias="PG_MAX_OVERFLOW", default=10)
diff --git a/src/dataloader/storage/repositories/opu.py b/src/dataloader/storage/repositories/opu.py
index 189a093..2cd2d59 100644
--- a/src/dataloader/storage/repositories/opu.py
+++ b/src/dataloader/storage/repositories/opu.py
@@ -5,10 +5,11 @@ from __future__ import annotations
 from collections.abc import Sequence
 from typing import Any
 
 from sqlalchemy import text
 from sqlalchemy.dialects.postgresql import insert as pg_insert
 from sqlalchemy.ext.asyncio import AsyncSession
 
+from dataloader.config import APP_CONFIG
 from dataloader.storage.models import BriefDigitalCertificateOpu
 
 
@@ -23,25 +24,49 @@ class OpuRepository:
             session: Asynchronous SQLAlchemy session
         """
         self.s = session
+        self.schema = APP_CONFIG.pg.schema_opu
+        self.batch_size = APP_CONFIG.pg.batch_size
 
-    async def truncate(self) -> None:
+    async def truncate(
+        self, *, cascade: bool = False, restart_identity: bool = True
+    ) -> None:
         """
-        Clears the brief_digital_certificate_opu table.
+        Quickly clears the table, honoring the configured schema name with safe PostgreSQL identifier quoting.
 
-        Uses TRUNCATE for fast cleanup.
+        Args:
+            cascade: appends CASCADE
+            restart_identity: appends RESTART IDENTITY
         """
-        table_name = BriefDigitalCertificateOpu.__tablename__
-        schema = BriefDigitalCertificateOpu.__table_args__[0]["schema"]
-        full_table_name = f"{schema}.{table_name}"
+        table = BriefDigitalCertificateOpu.__table__
 
-        await self.s.execute(text(f"TRUNCATE TABLE {full_table_name}"))
+        def quote_ident(name: str) -> str:
+            """Escapes embedded double quotes and wraps the name in double quotes."""
+            return '"' + name.replace('"', '""') + '"'
 
-    async def bulk_insert(self, records: Sequence[dict[str, Any]]) -> int:
+        schema_quoted = quote_ident(self.schema)
+        table_quoted = quote_ident(table.name)
+        full_table_name = f"{schema_quoted}.{table_quoted}"
+
+        opts = []
+        if restart_identity:
+            opts.append("RESTART IDENTITY")
+        if cascade:
+            opts.append("CASCADE")
+
+        suffix = f" {' '.join(opts)}" if opts else ""
+
+        await self.s.execute(text(f"TRUNCATE TABLE {full_table_name}{suffix}"))
+        await self.s.commit()
+
+    async def bulk_insert(
+        self, records: Sequence[dict[str, Any]], batch_size: int | None = None
+    ) -> int:
         """
-        Bulk insert of records into the table.
+        Bulk insert of records into the table, in batches.
 
         Args:
             records: List of dicts with the data to insert
+            batch_size: Batch size (default: PG_BATCH_SIZE from the config)
 
         Returns:
             Number of inserted records
@@ -49,19 +74,32 @@ class OpuRepository:
         if not records:
             return 0
 
-        async with self.s.begin_nested():
-            stmt = pg_insert(BriefDigitalCertificateOpu).values(records)
-            await self.s.execute(stmt)
-            await self.s.flush()
+        if batch_size is None:
+            batch_size = self.batch_size
 
-        return len(records)
+        total_inserted = 0
 
-    async def bulk_upsert(self, records: Sequence[dict[str, Any]]) -> int:
+        for i in range(0, len(records), batch_size):
+            batch = records[i : i + batch_size]
+
+            async with self.s.begin_nested():
+                stmt = pg_insert(BriefDigitalCertificateOpu).values(batch)
+                await self.s.execute(stmt)
+                await self.s.flush()
+
+            total_inserted += len(batch)
+
+        return total_inserted
+
+    async def bulk_upsert(
+        self, records: Sequence[dict[str, Any]], batch_size: int | None = None
+    ) -> int:
         """
-        Bulk insert/update of records (UPSERT).
+        Bulk insert/update of records (UPSERT), in batches.
 
         Args:
             records: List of dicts with the data
+            batch_size: Batch size (default: PG_BATCH_SIZE from the config)
 
         Returns:
             Number of processed records
@@ -69,6 +107,9 @@ class OpuRepository:
         if not records:
             return 0
 
+        if batch_size is None:
+            batch_size = self.batch_size
+
         update_columns = {
             c.name
             for c in BriefDigitalCertificateOpu.__table__.columns
@@ -76,25 +117,32 @@ class OpuRepository:
             and c.name not in {"wf_load_id", "wf_load_dttm", "wf_row_id"}
         }
 
-        insert_stmt = pg_insert(BriefDigitalCertificateOpu).values(records)
-        update_cols = {col: insert_stmt.excluded[col] for col in update_columns}
+        total_upserted = 0
 
-        stmt = insert_stmt.on_conflict_do_update(
-            index_elements=[
-                "object_id",
-                "desk_nm",
-                "actdate",
-                "layer_cd",
-                "opu_cd",
-                "opu_lvl",
-                "opu_prnt_cd",
-                "object_unit",
-            ],
-            set_=update_cols,
-        )
+        for i in range(0, len(records), batch_size):
+            batch = records[i : i + batch_size]
 
-        async with self.s.begin_nested():
-            await self.s.execute(stmt)
-            await self.s.flush()
+            insert_stmt = pg_insert(BriefDigitalCertificateOpu).values(batch)
+            update_cols = {col: insert_stmt.excluded[col] for col in update_columns}
 
-        return len(records)
+            stmt = insert_stmt.on_conflict_do_update(
+                index_elements=[
+                    "object_id",
+                    "desk_nm",
+                    "actdate",
+                    "layer_cd",
+                    "opu_cd",
+                    "opu_lvl",
+                    "opu_prnt_cd",
+                    "object_unit",
+                ],
+                set_=update_cols,
+            )
+
+            async with self.s.begin_nested():
+                await self.s.execute(stmt)
+                await self.s.flush()
+
+            total_upserted += len(batch)
+
+        return total_upserted
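
Below is a minimal usage sketch (not part of the diff) showing how the batched API above could be driven end to end. The `create_async_engine`/`async_sessionmaker` wiring, the DSN, and the `reload_opu` helper are illustrative assumptions; in the service, the session, schema, and default batch size come from the application's own factories configured via `PGSettings` (`PG_SCHEMA_OPU`, `PG_BATCH_SIZE`).

```python
# Illustrative sketch only: exercises OpuRepository's batched API from this diff.
# The engine/session wiring and the DSN below are assumptions, not part of the change.
import asyncio

from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

from dataloader.storage.repositories.opu import OpuRepository


async def reload_opu(records: list[dict]) -> None:
    # Hypothetical DSN; the real engine would be built from PGSettings.
    engine = create_async_engine("postgresql+asyncpg://user:pass@localhost:5432/db")
    session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

    async with session_factory() as session:
        repo = OpuRepository(session)

        # Full reload: TRUNCATE ... RESTART IDENTITY (commits internally),
        # then upsert in batches; batch_size falls back to PG_BATCH_SIZE if omitted.
        await repo.truncate(restart_identity=True)
        processed = await repo.bulk_upsert(records, batch_size=500)
        await session.commit()
        print(f"processed {processed} rows")

    await engine.dispose()


if __name__ == "__main__":
    asyncio.run(reload_opu([]))
```

One implication of the commit inside `truncate()` worth noting for reviewers: the emptied table becomes visible before the batches start, so a failure mid-upsert leaves a partially loaded table rather than rolling back to the previous contents.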