reminder-bot/bot/db/base.py

86 lines
2.0 KiB
Python

"""Database engine and session management."""
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
from bot.logging_config import get_logger
logger = get_logger(__name__)
class Base(DeclarativeBase):
    """Declarative base shared by the project's ORM models.

    ``Base.metadata`` is what ``create_tables()`` uses to emit the schema.
    """
# Global engine and session maker.
# Both start as ``None`` and are assigned by init_db(); create_tables() and
# get_session() treat ``None`` as "init_db() has not been called yet".
engine = None
async_session_maker = None
def init_db(db_url: str) -> None:
    """
    Initialize database engine and session maker.

    Creates the async engine for ``db_url`` and a session factory bound to
    it, storing both in the module-level ``engine`` and
    ``async_session_maker`` globals.

    Args:
        db_url: Database connection URL
    """
    global engine, async_session_maker

    # Local import: only needed here, and keeps this block self-contained.
    from sqlalchemy.engine import make_url

    # Connection URLs commonly embed credentials; never write the password
    # to the log. ``render_as_string(hide_password=True)`` masks it.
    safe_url = make_url(db_url).render_as_string(hide_password=True)
    logger.info(f"Initializing database: {safe_url}")

    engine = create_async_engine(
        db_url,
        echo=False,
        pool_pre_ping=True,
    )
    async_session_maker = async_sessionmaker(
        engine,
        class_=AsyncSession,
        expire_on_commit=False,
    )
    logger.info("Database initialized successfully")
async def create_tables() -> None:
    """
    Emit CREATE statements for every table registered on ``Base.metadata``.

    Raises:
        RuntimeError: If the engine has not been set up via init_db().
    """
    if engine is None:
        raise RuntimeError("Database engine not initialized. Call init_db() first.")

    logger.info("Creating database tables...")

    create_all = Base.metadata.create_all
    # run_sync bridges the synchronous metadata API onto the async connection.
    async with engine.begin() as connection:
        await connection.run_sync(create_all)

    logger.info("Database tables created successfully")
async def get_session() -> AsyncGenerator[AsyncSession, None]:
    """
    Get database session.

    Opens a session from the module-level session maker, yields it to the
    caller, and rolls it back if the caller raises while using it.

    Yields:
        AsyncSession instance

    Raises:
        RuntimeError: If init_db() has not been called; raised when the
            generator is first advanced.
    """
    if async_session_maker is None:
        raise RuntimeError("Database session maker not initialized. Call init_db() first.")

    # The ``async with`` block closes the session on exit, so an explicit
    # ``session.close()`` here would only close it a second time.
    async with async_session_maker() as session:
        try:
            yield session
        except Exception:
            # Undo any pending work before propagating the caller's error.
            await session.rollback()
            raise
async def close_db() -> None:
    """
    Close database engine.

    Disposes the engine and resets the module-level ``engine`` and
    ``async_session_maker`` to ``None``, so later calls to create_tables()
    or get_session() raise their "not initialized" RuntimeError instead of
    silently reusing a disposed engine. Does nothing if no engine is set.
    """
    global engine, async_session_maker

    if engine is None:
        return

    logger.info("Closing database connection...")
    await engine.dispose()

    # Drop the references only after a successful dispose, so a failed
    # dispose leaves the state intact for a retry.
    engine = None
    async_session_maker = None
    logger.info("Database connection closed")