# hubmc-datagw/src/hubgw/context.py
#
# 41 lines
# 1.2 KiB
# Python

"""Application context singleton."""
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine
from hubgw.core.config import APP_CONFIG
class AppContext:
    """Application context singleton.

    Every ``AppContext()`` call returns the same object. The instance holds
    the application settings together with the async database engine and the
    session factory bound to it; the latter two are ``None`` until
    ``startup()`` has run and are reset to ``None`` by ``shutdown()``.
    """

    # The single shared instance, created by the first ``AppContext()`` call.
    _instance = None

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        """Set up default state the first time the singleton is constructed."""
        # __init__ runs on every ``AppContext()`` call even though __new__
        # returns the same object, so the ``initialized`` marker prevents the
        # engine and session factory from being reset on later calls.
        if not hasattr(self, 'initialized'):
            self.settings = APP_CONFIG
            self.engine: AsyncEngine | None = None
            self.session_factory: async_sessionmaker | None = None
            self.initialized = True

    async def startup(self):
        """Initialize database engine and session factory.

        Calling this while an engine already exists is a no-op, so a repeated
        call cannot orphan the existing engine and its connection pool.
        """
        if self.engine is not None:
            return

        db = self.settings.database
        self.engine = create_async_engine(
            db.dsn,
            pool_size=db.pool_size,
            max_overflow=db.max_overflow,
            echo=db.echo,
        )
        self.session_factory = async_sessionmaker(
            self.engine,
            expire_on_commit=False,
        )

    async def shutdown(self):
        """Close database engine.

        Safe to call when ``startup()`` never ran and safe to call repeatedly:
        the engine is disposed at most once and both the engine and the
        session factory are reset to ``None`` afterwards.
        """
        engine = self.engine
        if engine is None:
            return

        # Drop the references before awaiting so the context is left in the
        # "not started" state even if dispose() raises.
        self.engine = None
        self.session_factory = None
        await engine.dispose()