dataloader/src/dataloader/storage/repositories/opu.py

149 lines
4.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Репозиторий для работы с данными OPU."""
from __future__ import annotations
from collections.abc import Sequence
from typing import Any
from sqlalchemy import DDL, text
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
from dataloader.config import APP_CONFIG
from dataloader.storage.models import BriefDigitalCertificateOpu
class OpuRepository:
    """Repository for the ``brief_digital_certificate_opu`` table.

    Wraps an async SQLAlchemy session and offers three operations on the
    table mapped by ``BriefDigitalCertificateOpu``: TRUNCATE, batched INSERT
    and batched UPSERT (PostgreSQL ``INSERT ... ON CONFLICT DO UPDATE``).
    """

    # Columns that form the conflict target of the upsert.
    # NOTE(review): PostgreSQL requires a unique index/constraint covering
    # exactly these columns -- confirm against the table definition.
    _CONFLICT_COLUMNS = (
        "object_id",
        "desk_nm",
        "actdate",
        "layer_cd",
        "opu_cd",
        "opu_lvl",
        "opu_prnt_cd",
        "object_unit",
    )

    # Non-key columns that an upsert leaves untouched on an existing row.
    _UPSERT_PRESERVED_COLUMNS = frozenset(
        {"wf_load_id", "wf_load_dttm", "wf_row_id"}
    )

    def __init__(self, session: AsyncSession):
        """Initialise the repository.

        Args:
            session: Async SQLAlchemy session used for every statement.
        """
        self.s = session
        # Schema name and default batch size come from application config.
        self.schema = APP_CONFIG.pg.schema_opu
        self.batch_size = APP_CONFIG.pg.batch_size

    @staticmethod
    def _quote_ident(name: str) -> str:
        """Return *name* as a double-quoted SQL identifier.

        Embedded double quotes are doubled. Plain concatenation is used
        instead of an f-string because backslash escapes inside f-string
        replacement fields are a syntax error before Python 3.12.
        """
        return '"' + name.replace('"', '""') + '"'

    def _resolve_batch_size(self, batch_size: int | None) -> int:
        """Pick the effective batch size and reject non-positive values.

        Args:
            batch_size: Explicit size, or ``None`` to use ``self.batch_size``.

        Returns:
            The batch size to use.

        Raises:
            ValueError: If the effective size is zero or negative. A negative
                step would make ``range()`` empty and silently skip all rows.
        """
        size = self.batch_size if batch_size is None else batch_size
        if size <= 0:
            raise ValueError(
                f"batch_size must be a positive integer, got {size!r}"
            )
        return size

    async def truncate(
        self, *, cascade: bool = False, restart_identity: bool = True
    ) -> None:
        """Empty the table with ``TRUNCATE TABLE`` and commit the session.

        The table is addressed as ``"<schema>"."<table>"`` using the
        configured schema and the mapped table name, both quoted.

        Args:
            cascade: Append ``CASCADE`` to the statement.
            restart_identity: Append ``RESTART IDENTITY`` to the statement.
        """
        table = BriefDigitalCertificateOpu.__table__
        full_table_name = (
            f"{self._quote_ident(self.schema)}.{self._quote_ident(table.name)}"
        )

        opts = []
        if restart_identity:
            opts.append("RESTART IDENTITY")
        if cascade:
            opts.append("CASCADE")
        suffix = f" {' '.join(opts)}" if opts else ""

        await self.s.execute(text(f"TRUNCATE TABLE {full_table_name}{suffix}"))
        await self.s.commit()

    async def bulk_insert(
        self, records: Sequence[dict[str, Any]], batch_size: int | None = None
    ) -> int:
        """Insert records in batches, each batch inside a nested transaction.

        The session is flushed after every batch but never committed here.

        Args:
            records: Row dictionaries to insert.
            batch_size: Rows per statement; defaults to ``self.batch_size``.

        Returns:
            Number of records submitted (sum of batch lengths); 0 when
            ``records`` is empty.

        Raises:
            ValueError: If the effective batch size is not positive.
        """
        if not records:
            return 0
        size = self._resolve_batch_size(batch_size)

        total_inserted = 0
        for start in range(0, len(records), size):
            batch = records[start : start + size]
            async with self.s.begin_nested():
                stmt = pg_insert(BriefDigitalCertificateOpu).values(batch)
                await self.s.execute(stmt)
                await self.s.flush()
            total_inserted += len(batch)
        return total_inserted

    async def bulk_upsert(
        self, records: Sequence[dict[str, Any]], batch_size: int | None = None
    ) -> int:
        """Insert-or-update records in batches (``ON CONFLICT DO UPDATE``).

        On a conflict over ``_CONFLICT_COLUMNS`` every column is overwritten
        with the incoming value, except primary-key columns and the columns
        listed in ``_UPSERT_PRESERVED_COLUMNS``. Each batch runs inside a
        nested transaction and is flushed; nothing is committed here.

        Args:
            records: Row dictionaries to upsert.
            batch_size: Rows per statement; defaults to ``self.batch_size``.

        Returns:
            Number of records submitted (sum of batch lengths); 0 when
            ``records`` is empty.

        Raises:
            ValueError: If the effective batch size is not positive.
        """
        if not records:
            return 0
        size = self._resolve_batch_size(batch_size)

        # A list in table column order keeps the generated SET clause stable
        # between runs (iteration order of a set of strings is not).
        update_columns = [
            c.name
            for c in BriefDigitalCertificateOpu.__table__.columns
            if not c.primary_key
            and c.name not in self._UPSERT_PRESERVED_COLUMNS
        ]

        total_upserted = 0
        for start in range(0, len(records), size):
            batch = records[start : start + size]
            insert_stmt = pg_insert(BriefDigitalCertificateOpu).values(batch)
            # ``excluded`` refers to the row proposed for insertion, so it
            # has to be taken from this batch's own statement.
            update_cols = {
                col: insert_stmt.excluded[col] for col in update_columns
            }
            stmt = insert_stmt.on_conflict_do_update(
                index_elements=list(self._CONFLICT_COLUMNS),
                set_=update_cols,
            )
            async with self.s.begin_nested():
                await self.s.execute(stmt)
                await self.s.flush()
            total_upserted += len(batch)
        return total_upserted