"""Database engine and session management.""" from typing import AsyncGenerator from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker from sqlalchemy.orm import DeclarativeBase from bot.logging_config import get_logger logger = get_logger(__name__) class Base(DeclarativeBase): """Base class for all ORM models.""" pass # Global engine and session maker engine = None async_session_maker = None def init_db(db_url: str) -> None: """ Initialize database engine and session maker. Args: db_url: Database connection URL """ global engine, async_session_maker logger.info(f"Initializing database: {db_url}") engine = create_async_engine( db_url, echo=False, pool_pre_ping=True, ) async_session_maker = async_sessionmaker( engine, class_=AsyncSession, expire_on_commit=False, ) logger.info("Database initialized successfully") async def create_tables() -> None: """Create all tables in the database.""" if engine is None: raise RuntimeError("Database engine not initialized. Call init_db() first.") logger.info("Creating database tables...") async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) logger.info("Database tables created successfully") async def get_session() -> AsyncGenerator[AsyncSession, None]: """ Get database session. Yields: AsyncSession instance """ if async_session_maker is None: raise RuntimeError("Database session maker not initialized. Call init_db() first.") async with async_session_maker() as session: try: yield session except Exception: await session.rollback() raise finally: await session.close() async def close_db() -> None: """Close database engine.""" global engine if engine: logger.info("Closing database connection...") await engine.dispose() logger.info("Database connection closed")