From 99ed4130cd1d0f09c592e3f2f6520ec6c7258168 Mon Sep 17 00:00:00 2001 From: itqop Date: Thu, 6 Nov 2025 22:16:54 +0300 Subject: [PATCH] fixes --- src/dataloader/config.py | 1 + src/dataloader/storage/repositories/opu.py | 45 ++++++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/src/dataloader/config.py b/src/dataloader/config.py index acad1de..c2349cc 100644 --- a/src/dataloader/config.py +++ b/src/dataloader/config.py @@ -119,6 +119,7 @@ class PGSettings(BaseSettings): schema_quotes: str = Field(validation_alias="PG_SCHEMA_QUOTES", default="public") schema_opu: str = Field(validation_alias="PG_SCHEMA_OPU", default="public") batch_size: int = Field(validation_alias="PG_BATCH_SIZE", default=1000) + deduplicate: bool = Field(validation_alias="PG_DEDUPLICATE", default=False) use_pool: bool = Field(validation_alias="PG_USE_POOL", default=True) pool_size: int = Field(validation_alias="PG_POOL_SIZE", default=5) max_overflow: int = Field(validation_alias="PG_MAX_OVERFLOW", default=10) diff --git a/src/dataloader/storage/repositories/opu.py b/src/dataloader/storage/repositories/opu.py index 2cd2d59..b871dfc 100644 --- a/src/dataloader/storage/repositories/opu.py +++ b/src/dataloader/storage/repositories/opu.py @@ -26,6 +26,45 @@ class OpuRepository: self.s = session self.schema = APP_CONFIG.pg.schema_opu self.batch_size = APP_CONFIG.pg.batch_size + self.deduplicate = APP_CONFIG.pg.deduplicate + + def _deduplicate_records(self, records: Sequence[dict[str, Any]]) -> Sequence[dict[str, Any]]: + """ + Быстро удаляет дубликаты по уникальному индексу, оставляя последнее вхождение. + + Args: + records: Список словарей с данными + + Returns: + Список уникальных записей (или исходный если дедупликация выключена) + """ + if not self.deduplicate: + return records # Возвращаем как есть, без копирования + + if not records: + return [] + + # Ключи уникального индекса для OPU + unique_keys = ( + "object_id", + "desk_nm", + "actdate", + "layer_cd", + "opu_cd", + "opu_lvl", + "opu_prnt_cd", + "object_unit", + ) + + seen = {} + for idx, record in enumerate(records): + # Формируем ключ из уникальных полей + key = tuple(record.get(k) for k in unique_keys) + # Оставляем последнее вхождение (перезаписываем индекс) + seen[key] = idx + + # Возвращаем записи в порядке их последнего появления + return [records[idx] for idx in sorted(seen.values())] async def truncate( self, *, cascade: bool = False, restart_identity: bool = True @@ -77,6 +116,9 @@ class OpuRepository: if batch_size is None: batch_size = self.batch_size + # Дедупликация всех записей перед разбиением на батчи + records = self._deduplicate_records(records) + total_inserted = 0 for i in range(0, len(records), batch_size): @@ -110,6 +152,9 @@ class OpuRepository: if batch_size is None: batch_size = self.batch_size + # Дедупликация всех записей перед разбиением на батчи + records = self._deduplicate_records(records) + update_columns = { c.name for c in BriefDigitalCertificateOpu.__table__.columns