refactor: clean code

This commit is contained in:
itqop 2025-10-18 16:10:17 +03:00
parent f788127716
commit 9501ea92fe
59 changed files with 826 additions and 594 deletions

View File

@ -1,18 +1,14 @@
"""Entry point for hubgw application."""
import uvicorn
from hubgw.main import create_app
def main():
"""Main entry point."""
app = create_app()
uvicorn.run(
app,
host="0.0.0.0",
port=8080,
log_level="info"
)
uvicorn.run(app, host="0.0.0.0", port=8080, log_level="info")
if __name__ == "__main__":

View File

@ -1,21 +1,22 @@
"""Dependency providers for FastAPI."""
from collections.abc import AsyncGenerator
from fastapi import Depends, HTTPException, Header
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Annotated
from fastapi import Depends, Header, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from hubgw.context import AppContext
from hubgw.services.audit_service import AuditService
from hubgw.services.cooldowns_service import CooldownsService
from hubgw.services.homes_service import HomesService
from hubgw.services.kits_service import KitsService
from hubgw.services.cooldowns_service import CooldownsService
from hubgw.services.luckperms_service import LuckPermsService
from hubgw.services.punishments_service import PunishmentsService
from hubgw.services.teleport_history_service import TeleportHistoryService
from hubgw.services.users_service import UserService
from hubgw.services.warps_service import WarpsService
from hubgw.services.whitelist_service import WhitelistService
from hubgw.services.punishments_service import PunishmentsService
from hubgw.services.audit_service import AuditService
from hubgw.services.luckperms_service import LuckPermsService
from hubgw.services.teleport_history_service import TeleportHistoryService
async def get_context() -> AppContext:
@ -41,59 +42,82 @@ async def get_azuriom_session(
async def verify_api_key(
x_api_key: Annotated[str, Header(alias="X-API-Key")],
context: Annotated[AppContext, Depends(get_context)]
context: Annotated[AppContext, Depends(get_context)],
) -> str:
"""Verify API key."""
if x_api_key != context.settings.security.api_key:
raise HTTPException(status_code=401, detail=f"Invalid API key, {x_api_key=}, {context.settings.security.api_key=}")
raise HTTPException(
status_code=401,
detail=f"Invalid API key, {x_api_key=}, {context.settings.security.api_key=}",
)
return x_api_key
def get_homes_service(session: Annotated[AsyncSession, Depends(get_session)]) -> HomesService:
def get_homes_service(
session: Annotated[AsyncSession, Depends(get_session)]
) -> HomesService:
"""Get homes service."""
return HomesService(session)
def get_kits_service(session: Annotated[AsyncSession, Depends(get_session)]) -> KitsService:
def get_kits_service(
session: Annotated[AsyncSession, Depends(get_session)]
) -> KitsService:
"""Get kits service."""
return KitsService(session)
def get_cooldowns_service(session: Annotated[AsyncSession, Depends(get_session)]) -> CooldownsService:
def get_cooldowns_service(
session: Annotated[AsyncSession, Depends(get_session)]
) -> CooldownsService:
"""Get cooldowns service."""
return CooldownsService(session)
def get_warps_service(session: Annotated[AsyncSession, Depends(get_session)]) -> WarpsService:
def get_warps_service(
session: Annotated[AsyncSession, Depends(get_session)]
) -> WarpsService:
"""Get warps service."""
return WarpsService(session)
def get_whitelist_service(session: Annotated[AsyncSession, Depends(get_session)]) -> WhitelistService:
def get_whitelist_service(
session: Annotated[AsyncSession, Depends(get_session)]
) -> WhitelistService:
"""Get whitelist service."""
return WhitelistService(session)
def get_punishments_service(session: Annotated[AsyncSession, Depends(get_session)]) -> PunishmentsService:
def get_punishments_service(
session: Annotated[AsyncSession, Depends(get_session)]
) -> PunishmentsService:
"""Get punishments service."""
return PunishmentsService(session)
def get_audit_service(session: Annotated[AsyncSession, Depends(get_session)]) -> AuditService:
def get_audit_service(
session: Annotated[AsyncSession, Depends(get_session)]
) -> AuditService:
"""Get audit service."""
return AuditService(session)
def get_luckperms_service(session: Annotated[AsyncSession, Depends(get_session)]) -> LuckPermsService:
def get_luckperms_service(
session: Annotated[AsyncSession, Depends(get_session)]
) -> LuckPermsService:
"""Get luckperms service."""
return LuckPermsService(session)
def get_teleport_history_service(session: Annotated[AsyncSession, Depends(get_session)]) -> TeleportHistoryService:
def get_teleport_history_service(
session: Annotated[AsyncSession, Depends(get_session)]
) -> TeleportHistoryService:
"""Get teleport history service."""
return TeleportHistoryService(session)
def get_user_service(session: Annotated[AsyncSession, Depends(get_azuriom_session)]) -> UserService:
def get_user_service(
session: Annotated[AsyncSession, Depends(get_azuriom_session)]
) -> UserService:
"""Get user service."""
return UserService(session)

View File

@ -1,12 +1,13 @@
"""Audit endpoints."""
from fastapi import APIRouter, Depends, HTTPException
from typing import Annotated
from fastapi import APIRouter, Depends
from hubgw.api.deps import get_audit_service, verify_api_key
from hubgw.services.audit_service import AuditService
from hubgw.schemas.audit import CommandAuditRequest, CommandAuditResponse
from hubgw.core.errors import AppError, create_http_exception
from hubgw.schemas.audit import CommandAuditRequest, CommandAuditResponse
from hubgw.services.audit_service import AuditService
router = APIRouter()
@ -15,7 +16,7 @@ router = APIRouter()
async def log_command(
request: CommandAuditRequest,
service: Annotated[AuditService, Depends(get_audit_service)],
_: Annotated[str, Depends(verify_api_key)]
_: Annotated[str, Depends(verify_api_key)],
):
"""Log command execution for audit."""
try:

View File

@ -1,13 +1,14 @@
"""Cooldowns endpoints."""
from fastapi import APIRouter, Depends, HTTPException, Query
from typing import Annotated
from uuid import UUID
from fastapi import APIRouter, Depends
from hubgw.api.deps import get_cooldowns_service, verify_api_key
from hubgw.services.cooldowns_service import CooldownsService
from hubgw.schemas.cooldowns import CooldownCheckRequest, CooldownCheckResponse, CooldownCreate
from hubgw.core.errors import AppError, create_http_exception
from hubgw.schemas.cooldowns import (CooldownCheckRequest,
CooldownCheckResponse, CooldownCreate)
from hubgw.services.cooldowns_service import CooldownsService
router = APIRouter()
@ -16,7 +17,7 @@ router = APIRouter()
async def check_cooldown(
request: CooldownCheckRequest,
service: Annotated[CooldownsService, Depends(get_cooldowns_service)],
_: Annotated[str, Depends(verify_api_key)]
_: Annotated[str, Depends(verify_api_key)],
):
"""Check cooldown status."""
try:
@ -29,7 +30,7 @@ async def check_cooldown(
async def create_cooldown(
request: CooldownCreate,
service: Annotated[CooldownsService, Depends(get_cooldowns_service)],
_: Annotated[str, Depends(verify_api_key)]
_: Annotated[str, Depends(verify_api_key)],
):
"""Create cooldown."""
try:

View File

@ -1,13 +1,14 @@
"""Homes endpoints."""
from fastapi import APIRouter, Depends, HTTPException
from typing import Annotated
from uuid import UUID
from fastapi import APIRouter, Depends
from hubgw.api.deps import get_homes_service, verify_api_key
from hubgw.services.homes_service import HomesService
from hubgw.schemas.homes import HomeUpsertRequest, HomeGetRequest, Home, HomeGetResponse, HomeListResponse
from hubgw.core.errors import AppError, create_http_exception
from hubgw.schemas.homes import (Home, HomeGetRequest, HomeGetResponse,
HomeListResponse, HomeUpsertRequest)
from hubgw.services.homes_service import HomesService
router = APIRouter()
@ -16,7 +17,7 @@ router = APIRouter()
async def upsert_home(
request: HomeUpsertRequest,
service: Annotated[HomesService, Depends(get_homes_service)],
_: Annotated[str, Depends(verify_api_key)]
_: Annotated[str, Depends(verify_api_key)],
):
"""Upsert home."""
try:
@ -29,7 +30,7 @@ async def upsert_home(
async def get_home(
request: HomeGetRequest,
service: Annotated[HomesService, Depends(get_homes_service)],
_: Annotated[str, Depends(verify_api_key)]
_: Annotated[str, Depends(verify_api_key)],
):
"""Get home."""
try:
@ -42,7 +43,7 @@ async def get_home(
async def list_homes(
player_uuid: str,
service: Annotated[HomesService, Depends(get_homes_service)],
_: Annotated[str, Depends(verify_api_key)]
_: Annotated[str, Depends(verify_api_key)],
):
"""List homes for player."""
try:

View File

@ -1,12 +1,13 @@
"""Kits endpoints."""
from fastapi import APIRouter, Depends, HTTPException
from typing import Annotated
from fastapi import APIRouter, Depends
from hubgw.api.deps import get_kits_service, verify_api_key
from hubgw.services.kits_service import KitsService
from hubgw.schemas.kits import KitClaimRequest, KitClaimResponse
from hubgw.core.errors import AppError, create_http_exception
from hubgw.schemas.kits import KitClaimRequest, KitClaimResponse
from hubgw.services.kits_service import KitsService
router = APIRouter()
@ -15,7 +16,7 @@ router = APIRouter()
async def claim_kit(
request: KitClaimRequest,
service: Annotated[KitsService, Depends(get_kits_service)],
_: Annotated[str, Depends(verify_api_key)]
_: Annotated[str, Depends(verify_api_key)],
):
"""Claim kit."""
try:

View File

@ -1,16 +1,18 @@
"""LuckPerms endpoints."""
from fastapi import APIRouter, Depends, HTTPException
from typing import Annotated
from hubgw.api.deps import get_luckperms_service, get_user_service, verify_api_key
from fastapi import APIRouter, Depends
from hubgw.api.deps import (get_luckperms_service, get_user_service,
verify_api_key)
from hubgw.core.errors import AppError, create_http_exception
from hubgw.schemas.luckperms import (LuckPermsGroup, LuckPermsPlayer,
LuckPermsPlayerCreateRequest,
LuckPermsPlayerWithPermissions,
LuckPermsUserPermission)
from hubgw.services.luckperms_service import LuckPermsService
from hubgw.services.users_service import UserService
from hubgw.schemas.luckperms import (
LuckPermsPlayer, LuckPermsGroup, LuckPermsUserPermission,
LuckPermsPlayerWithPermissions, LuckPermsPlayerCreateRequest
)
from hubgw.core.errors import AppError, create_http_exception
router = APIRouter()
@ -19,7 +21,7 @@ router = APIRouter()
async def get_player(
uuid: str,
service: Annotated[LuckPermsService, Depends(get_luckperms_service)],
_: Annotated[str, Depends(verify_api_key)]
_: Annotated[str, Depends(verify_api_key)],
):
"""Get player by UUID."""
try:
@ -32,7 +34,7 @@ async def get_player(
async def get_player_by_username(
username: str,
service: Annotated[LuckPermsService, Depends(get_luckperms_service)],
_: Annotated[str, Depends(verify_api_key)]
_: Annotated[str, Depends(verify_api_key)],
):
"""Get player by username."""
try:
@ -45,7 +47,7 @@ async def get_player_by_username(
async def get_group(
name: str,
service: Annotated[LuckPermsService, Depends(get_luckperms_service)],
_: Annotated[str, Depends(verify_api_key)]
_: Annotated[str, Depends(verify_api_key)],
):
"""Get group by name."""
try:
@ -58,7 +60,7 @@ async def get_group(
async def get_user_permissions(
uuid: str,
service: Annotated[LuckPermsService, Depends(get_luckperms_service)],
_: Annotated[str, Depends(verify_api_key)]
_: Annotated[str, Depends(verify_api_key)],
):
"""Get user permissions."""
try:
@ -67,11 +69,13 @@ async def get_user_permissions(
raise create_http_exception(e)
@router.get("/players/{uuid}/with-permissions", response_model=LuckPermsPlayerWithPermissions)
@router.get(
"/players/{uuid}/with-permissions", response_model=LuckPermsPlayerWithPermissions
)
async def get_player_with_permissions(
uuid: str,
service: Annotated[LuckPermsService, Depends(get_luckperms_service)],
_: Annotated[str, Depends(verify_api_key)]
_: Annotated[str, Depends(verify_api_key)],
):
"""Get player with permissions."""
try:
@ -85,7 +89,7 @@ async def create_player(
request: LuckPermsPlayerCreateRequest,
luckperms_service: Annotated[LuckPermsService, Depends(get_luckperms_service)],
user_service: Annotated[UserService, Depends(get_user_service)],
_: Annotated[str, Depends(verify_api_key)]
_: Annotated[str, Depends(verify_api_key)],
):
"""Create a new player in LuckPerms."""
try:

View File

@ -1,16 +1,17 @@
"""Punishments endpoints."""
from fastapi import APIRouter, Depends, HTTPException
from typing import Annotated
from uuid import UUID
from fastapi import APIRouter, Depends
from hubgw.api.deps import get_punishments_service, verify_api_key
from hubgw.services.punishments_service import PunishmentsService
from hubgw.schemas.punishments import (
PunishmentCreateRequest, PunishmentRevokeRequest, PunishmentQuery,
PunishmentBase, PunishmentListResponse, ActiveBanStatusResponse, ActiveMuteStatusResponse
)
from hubgw.core.errors import AppError, create_http_exception
from hubgw.schemas.punishments import (ActiveBanStatusResponse,
ActiveMuteStatusResponse,
PunishmentBase, PunishmentCreateRequest,
PunishmentListResponse, PunishmentQuery,
PunishmentRevokeRequest)
from hubgw.services.punishments_service import PunishmentsService
router = APIRouter()
@ -19,7 +20,7 @@ router = APIRouter()
async def create_punishment(
request: PunishmentCreateRequest,
service: Annotated[PunishmentsService, Depends(get_punishments_service)],
_: Annotated[str, Depends(verify_api_key)]
_: Annotated[str, Depends(verify_api_key)],
):
"""Create punishment."""
try:
@ -32,7 +33,7 @@ async def create_punishment(
async def revoke_punishment(
request: PunishmentRevokeRequest,
service: Annotated[PunishmentsService, Depends(get_punishments_service)],
_: Annotated[str, Depends(verify_api_key)]
_: Annotated[str, Depends(verify_api_key)],
):
"""Revoke punishment."""
try:
@ -45,7 +46,7 @@ async def revoke_punishment(
async def query_punishments(
query: PunishmentQuery,
service: Annotated[PunishmentsService, Depends(get_punishments_service)],
_: Annotated[str, Depends(verify_api_key)]
_: Annotated[str, Depends(verify_api_key)],
):
"""Query punishments."""
try:
@ -58,7 +59,7 @@ async def query_punishments(
async def get_ban_status(
player_uuid: str,
service: Annotated[PunishmentsService, Depends(get_punishments_service)],
_: Annotated[str, Depends(verify_api_key)]
_: Annotated[str, Depends(verify_api_key)],
):
"""Get active ban status for player."""
try:
@ -71,7 +72,7 @@ async def get_ban_status(
async def get_mute_status(
player_uuid: str,
service: Annotated[PunishmentsService, Depends(get_punishments_service)],
_: Annotated[str, Depends(verify_api_key)]
_: Annotated[str, Depends(verify_api_key)],
):
"""Get active mute status for player."""
try:

View File

@ -1,7 +1,10 @@
"""Main API v1 router."""
from fastapi import APIRouter
from hubgw.api.v1 import health, homes, kits, cooldowns, warps, whitelist, punishments, audit, luckperms, teleport_history, users
from hubgw.api.v1 import (audit, cooldowns, health, homes, kits, luckperms,
punishments, teleport_history, users, warps,
whitelist)
api_router = APIRouter()
@ -12,8 +15,12 @@ api_router.include_router(kits.router, prefix="/kits", tags=["kits"])
api_router.include_router(cooldowns.router, prefix="/cooldowns", tags=["cooldowns"])
api_router.include_router(warps.router, prefix="/warps", tags=["warps"])
api_router.include_router(whitelist.router, prefix="/whitelist", tags=["whitelist"])
api_router.include_router(punishments.router, prefix="/punishments", tags=["punishments"])
api_router.include_router(
punishments.router, prefix="/punishments", tags=["punishments"]
)
api_router.include_router(audit.router, prefix="/audit", tags=["audit"])
api_router.include_router(luckperms.router, prefix="/luckperms", tags=["luckperms"])
api_router.include_router(teleport_history.router, prefix="/teleport-history", tags=["teleport-history"])
api_router.include_router(
teleport_history.router, prefix="/teleport-history", tags=["teleport-history"]
)
api_router.include_router(users.router, prefix="/users", tags=["users"])

View File

@ -1,12 +1,14 @@
"""Teleport History endpoints."""
from fastapi import APIRouter, Depends, HTTPException, Query
from typing import Annotated
from fastapi import APIRouter, Depends, Query
from hubgw.api.deps import get_teleport_history_service, verify_api_key
from hubgw.services.teleport_history_service import TeleportHistoryService
from hubgw.schemas.teleport_history import TeleportHistoryCreate, TeleportHistory
from hubgw.core.errors import AppError, create_http_exception
from hubgw.schemas.teleport_history import (TeleportHistory,
TeleportHistoryCreate)
from hubgw.services.teleport_history_service import TeleportHistoryService
router = APIRouter()
@ -15,7 +17,7 @@ router = APIRouter()
async def create_teleport(
request: TeleportHistoryCreate,
service: Annotated[TeleportHistoryService, Depends(get_teleport_history_service)],
_: Annotated[str, Depends(verify_api_key)]
_: Annotated[str, Depends(verify_api_key)],
):
"""Create teleport history entry."""
try:
@ -29,10 +31,12 @@ async def list_player_teleports(
player_uuid: str,
service: Annotated[TeleportHistoryService, Depends(get_teleport_history_service)],
_: Annotated[str, Depends(verify_api_key)],
limit: int = Query(100, ge=1, le=1000, description="Maximum number of entries to return")
limit: int = Query(
100, ge=1, le=1000, description="Maximum number of entries to return"
),
):
"""List teleport history for player."""
try:
return await service.list_player_teleports(player_uuid, limit)
except AppError as e:
raise create_http_exception(e)
raise create_http_exception(e)

View File

@ -1,12 +1,13 @@
"""User endpoints."""
from fastapi import APIRouter, Depends
from typing import Annotated
from fastapi import APIRouter, Depends
from hubgw.api.deps import get_user_service, verify_api_key
from hubgw.services.users_service import UserService
from hubgw.schemas.users import GetUserGameIdResponse
from hubgw.core.errors import AppError, create_http_exception
from hubgw.schemas.users import GetUserGameIdResponse
from hubgw.services.users_service import UserService
router = APIRouter()
@ -15,7 +16,7 @@ router = APIRouter()
async def get_user_game_id(
name: str,
service: Annotated[UserService, Depends(get_user_service)],
_: Annotated[str, Depends(verify_api_key)]
_: Annotated[str, Depends(verify_api_key)],
):
"""Get game ID by user name."""
try:

View File

@ -1,15 +1,16 @@
"""Warps endpoints."""
from fastapi import APIRouter, Depends, HTTPException
from typing import Annotated
from fastapi import APIRouter, Depends
from hubgw.api.deps import get_warps_service, verify_api_key
from hubgw.services.warps_service import WarpsService
from hubgw.schemas.warps import (
WarpCreateRequest, WarpUpdateRequest, WarpDeleteRequest, WarpGetRequest,
Warp, WarpGetResponse, WarpQuery, WarpListResponse
)
from hubgw.core.errors import AppError, create_http_exception
from hubgw.schemas.warps import (Warp, WarpCreateRequest, WarpDeleteRequest,
WarpGetRequest, WarpGetResponse,
WarpListResponse, WarpQuery,
WarpUpdateRequest)
from hubgw.services.warps_service import WarpsService
router = APIRouter()
@ -18,7 +19,7 @@ router = APIRouter()
async def create_warp(
request: WarpCreateRequest,
service: Annotated[WarpsService, Depends(get_warps_service)],
_: Annotated[str, Depends(verify_api_key)]
_: Annotated[str, Depends(verify_api_key)],
):
"""Create warp."""
try:
@ -31,7 +32,7 @@ async def create_warp(
async def update_warp(
request: WarpUpdateRequest,
service: Annotated[WarpsService, Depends(get_warps_service)],
_: Annotated[str, Depends(verify_api_key)]
_: Annotated[str, Depends(verify_api_key)],
):
"""Update warp."""
try:
@ -44,7 +45,7 @@ async def update_warp(
async def delete_warp(
request: WarpDeleteRequest,
service: Annotated[WarpsService, Depends(get_warps_service)],
_: Annotated[str, Depends(verify_api_key)]
_: Annotated[str, Depends(verify_api_key)],
):
"""Delete warp."""
try:
@ -57,7 +58,7 @@ async def delete_warp(
async def get_warp(
request: WarpGetRequest,
service: Annotated[WarpsService, Depends(get_warps_service)],
_: Annotated[str, Depends(verify_api_key)]
_: Annotated[str, Depends(verify_api_key)],
):
"""Get warp."""
try:
@ -70,7 +71,7 @@ async def get_warp(
async def list_warps(
query: WarpQuery,
service: Annotated[WarpsService, Depends(get_warps_service)],
_: Annotated[str, Depends(verify_api_key)]
_: Annotated[str, Depends(verify_api_key)],
):
"""List warps."""
try:

View File

@ -1,17 +1,19 @@
"""Whitelist endpoints."""
from fastapi import APIRouter, Depends
from typing import Annotated, Optional
from typing import Annotated
from hubgw.api.deps import get_whitelist_service, get_luckperms_service, get_user_service, verify_api_key
from hubgw.services.whitelist_service import WhitelistService
from fastapi import APIRouter, Depends
from hubgw.api.deps import (get_luckperms_service, get_user_service,
get_whitelist_service, verify_api_key)
from hubgw.core.errors import AppError, create_http_exception
from hubgw.schemas.whitelist import (WhitelistAddRequest,
WhitelistCheckRequest,
WhitelistCheckResponse, WhitelistEntry,
WhitelistListResponse, WhitelistRemoveRequest)
from hubgw.services.luckperms_service import LuckPermsService
from hubgw.services.users_service import UserService
from hubgw.schemas.whitelist import (
WhitelistAddRequest, WhitelistRemoveRequest, WhitelistCheckRequest,
WhitelistEntry, WhitelistCheckResponse, WhitelistListResponse, WhitelistQuery
)
from hubgw.core.errors import AppError, create_http_exception
from hubgw.services.whitelist_service import WhitelistService
router = APIRouter()
@ -22,7 +24,7 @@ async def add_player(
service: Annotated[WhitelistService, Depends(get_whitelist_service)],
luckperms_service: Annotated[LuckPermsService, Depends(get_luckperms_service)],
user_service: Annotated[UserService, Depends(get_user_service)],
_: Annotated[str, Depends(verify_api_key)]
_: Annotated[str, Depends(verify_api_key)],
):
"""Add player to whitelist."""
try:
@ -35,7 +37,7 @@ async def add_player(
async def remove_player(
request: WhitelistRemoveRequest,
service: Annotated[WhitelistService, Depends(get_whitelist_service)],
_: Annotated[str, Depends(verify_api_key)]
_: Annotated[str, Depends(verify_api_key)],
):
"""Remove player from whitelist."""
try:
@ -48,7 +50,7 @@ async def remove_player(
async def check_player(
request: WhitelistCheckRequest,
service: Annotated[WhitelistService, Depends(get_whitelist_service)],
_: Annotated[str, Depends(verify_api_key)]
_: Annotated[str, Depends(verify_api_key)],
):
"""Check if player is whitelisted."""
try:
@ -60,7 +62,7 @@ async def check_player(
@router.get("/", response_model=WhitelistListResponse)
async def list_players(
service: Annotated[WhitelistService, Depends(get_whitelist_service)],
_: Annotated[str, Depends(verify_api_key)]
_: Annotated[str, Depends(verify_api_key)],
):
"""List all whitelisted players with optional filters and pagination."""
try:
@ -72,7 +74,7 @@ async def list_players(
@router.get("/count")
async def get_count(
service: Annotated[WhitelistService, Depends(get_whitelist_service)],
_: Annotated[str, Depends(verify_api_key)]
_: Annotated[str, Depends(verify_api_key)],
):
"""Get total count of whitelisted players."""
try:
@ -80,4 +82,3 @@ async def get_count(
return {"total": count}
except AppError as e:
raise create_http_exception(e)

View File

@ -2,8 +2,8 @@
from collections.abc import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.pool import NullPool
from sqlalchemy.ext.asyncio import (AsyncSession, async_sessionmaker,
create_async_engine)
from hubgw.core.config import APP_CONFIG
from hubgw.models.base import Base
@ -121,7 +121,7 @@ class AppContext(metaclass=Singleton):
self._engine = None
except Exception as e:
print(f"Error disposing engine: {e}")
try:
if self._azuriom_engine is not None:
await self._azuriom_engine.dispose()
@ -134,4 +134,4 @@ APP_CTX = AppContext()
__all__ = [
"APP_CTX",
]
]

View File

@ -1,6 +1,7 @@
"""Core module for hubgw."""
from .config import APP_CONFIG
__all__ = [
"APP_CONFIG",
]
]

View File

@ -1,69 +1,68 @@
"""Application configuration using Pydantic Settings."""
from typing import Optional
from urllib.parse import quote_plus
from dotenv import load_dotenv
from pydantic import Field, computed_field
from pydantic_settings import BaseSettings
from typing import Optional
from urllib.parse import quote_plus
load_dotenv()
class DatabaseSettings(BaseSettings):
"""Database configuration settings."""
host: str = Field(
default="localhost",
validation_alias="DATABASE__HOST",
description="Database host"
description="Database host",
)
port: int = Field(
default=5432,
validation_alias="DATABASE__PORT",
ge=1,
le=65535,
description="Database port"
description="Database port",
)
user: str = Field(
default="user",
validation_alias="DATABASE__USER",
description="Database user"
default="user", validation_alias="DATABASE__USER", description="Database user"
)
password: str = Field(
default="pass",
validation_alias="DATABASE__PASSWORD",
description="Database password"
description="Database password",
)
database: str = Field(
default="hubgw",
validation_alias="DATABASE__DATABASE",
description="Database name"
description="Database name",
)
azuriom_database: str = Field(
default="azuriom",
validation_alias="DATABASE__AZURIOM_DATABASE",
description="Azuriom database name"
description="Azuriom database name",
)
pool_size: int = Field(
default=10,
validation_alias="DATABASE__POOL_SIZE",
ge=1,
le=100,
description="Database connection pool size"
description="Database connection pool size",
)
max_overflow: int = Field(
default=10,
validation_alias="DATABASE__MAX_OVERFLOW",
ge=0,
le=100,
description="Maximum number of overflow connections"
description="Maximum number of overflow connections",
)
echo: bool = Field(
default=False,
validation_alias="DATABASE__ECHO",
description="Enable SQLAlchemy query logging"
description="Enable SQLAlchemy query logging",
)
@computed_field
@property
def dsn(self) -> str:
@ -72,7 +71,7 @@ class DatabaseSettings(BaseSettings):
f"postgresql+asyncpg://{self.user}:{quote_plus(self.password)}"
f"@{self.host}:{self.port}/{self.database}"
)
@computed_field
@property
def azuriom_dsn(self) -> str:
@ -85,55 +84,50 @@ class DatabaseSettings(BaseSettings):
class SecuritySettings(BaseSettings):
"""Security configuration settings."""
api_key: str = Field(
default="your-api-key",
validation_alias="SECURITY__API_KEY",
min_length=8,
description="API key for authentication"
description="API key for authentication",
)
rate_limit_per_min: Optional[int] = Field(
default=None,
validation_alias="SECURITY__RATE_LIMIT_PER_MIN",
ge=1,
description="Rate limit per minute (None = disabled)"
description="Rate limit per minute (None = disabled)",
)
class AppSettings(BaseSettings):
"""Application settings."""
env: str = Field(
default="dev",
validation_alias="APP__ENV",
description="Application environment (dev/prod/test)"
description="Application environment (dev/prod/test)",
)
host: str = Field(
default="0.0.0.0",
validation_alias="APP__HOST",
description="Application host"
default="0.0.0.0", validation_alias="APP__HOST", description="Application host"
)
port: int = Field(
default=8080,
validation_alias="APP__PORT",
ge=1,
le=65535,
description="Application port"
description="Application port",
)
log_level: str = Field(
default="INFO",
validation_alias="APP__LOG_LEVEL",
description="Logging level"
default="INFO", validation_alias="APP__LOG_LEVEL", description="Logging level"
)
class Secrets():
class Secrets:
"""Main configuration container with all settings."""
app: AppSettings = AppSettings()
database: DatabaseSettings = DatabaseSettings()
security: SecuritySettings = SecuritySettings()
APP_CONFIG = Secrets()

View File

@ -1,17 +1,18 @@
"""Custom exceptions and error handlers."""
from fastapi import HTTPException
from typing import Any, Dict, Optional
from fastapi import HTTPException
class AppError(Exception):
"""Base application error."""
def __init__(
self,
message: str,
code: str = "app_error",
details: Optional[Dict[str, Any]] = None
details: Optional[Dict[str, Any]] = None,
):
self.message = message
self.code = code
@ -21,28 +22,28 @@ class AppError(Exception):
class ValidationError(AppError):
"""Validation error."""
def __init__(self, message: str, details: Optional[Dict[str, Any]] = None):
super().__init__(message, "invalid_state", details)
class NotFoundError(AppError):
"""Not found error."""
def __init__(self, message: str, details: Optional[Dict[str, Any]] = None):
super().__init__(message, "not_found", details)
class AlreadyExistsError(AppError):
"""Already exists error."""
def __init__(self, message: str, details: Optional[Dict[str, Any]] = None):
super().__init__(message, "already_exists", details)
class CooldownActiveError(AppError):
"""Cooldown active error."""
def __init__(self, message: str, details: Optional[Dict[str, Any]] = None):
super().__init__(message, "cooldown_active", details)
@ -54,12 +55,8 @@ def create_http_exception(error: AppError) -> HTTPException:
status_code = 404
elif isinstance(error, AlreadyExistsError):
status_code = 409
return HTTPException(
status_code=status_code,
detail={
"message": error.message,
"code": error.code,
"details": error.details
}
detail={"message": error.message, "code": error.code, "details": error.details},
)

View File

@ -1,25 +1,27 @@
"""Logging configuration using loguru."""
import sys
from loguru import logger
from hubgw.core.config import APP_CONFIG
def setup_logging():
"""Setup loguru logging configuration."""
settings = APP_CONFIG.app
# Remove default handler
logger.remove()
# Add console handler
logger.add(
sys.stdout,
level=settings.log_level,
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
colorize=True
colorize=True,
)
# Add file handler for production
if settings.env == "prod":
logger.add(
@ -28,5 +30,5 @@ def setup_logging():
format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
rotation="1 day",
retention="30 days",
compression="zip"
compression="zip",
)

View File

@ -1,9 +1,12 @@
"""FastAPI application factory."""
from contextlib import asynccontextmanager
from fastapi import FastAPI
from hubgw.core.logging import setup_logging
from hubgw.context import AppContext
from hubgw.api.v1.router import api_router
from hubgw.context import AppContext
from hubgw.core.logging import setup_logging
@asynccontextmanager
@ -21,11 +24,11 @@ def create_app() -> FastAPI:
title="HubGW",
description="FastAPI Gateway for HubMC",
version="0.1.0",
lifespan=lifespan
lifespan=lifespan,
)
setup_logging()
app.include_router(api_router, prefix="/api/v1")
return app

View File

@ -1,12 +1,14 @@
"""Base model class for SQLAlchemy."""
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy import Column, DateTime, func
from datetime import datetime
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
"""Base class for all models."""
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
updated_at = Column(
DateTime(timezone=True), server_default=func.now(), onupdate=func.now()
)

View File

@ -1,7 +1,9 @@
"""Cooldown model."""
from sqlalchemy import Column, String, DateTime, Integer, ForeignKey, UniqueConstraint
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy import (Column, DateTime, ForeignKey, Integer, String,
UniqueConstraint)
from sqlalchemy.dialects.postgresql import JSONB, UUID
from hubgw.models.base import Base
@ -10,11 +12,20 @@ class Cooldown(Base):
__tablename__ = "hub_cooldowns"
__table_args__ = (
UniqueConstraint('player_uuid', 'cooldown_type', name='idx_hub_cooldowns_player_type'),
UniqueConstraint(
"player_uuid", "cooldown_type", name="idx_hub_cooldowns_player_type"
),
)
id = Column(UUID(as_uuid=True), primary_key=True, server_default="gen_random_uuid()")
player_uuid = Column(String(36), ForeignKey("luckperms_players.uuid", ondelete="CASCADE"), nullable=False, index=True)
id = Column(
UUID(as_uuid=True), primary_key=True, server_default="gen_random_uuid()"
)
player_uuid = Column(
String(36),
ForeignKey("luckperms_players.uuid", ondelete="CASCADE"),
nullable=False,
index=True,
)
cooldown_type = Column(String(50), nullable=False)
expires_at = Column(DateTime(timezone=True), nullable=False, index=True)
cooldown_seconds = Column(Integer, nullable=False)

View File

@ -1,7 +1,9 @@
"""Home model."""
from sqlalchemy import Column, Float, String, Boolean, ForeignKey, REAL, Text, UniqueConstraint
from sqlalchemy import (REAL, Boolean, Column, Float, ForeignKey, String, Text,
UniqueConstraint)
from sqlalchemy.dialects.postgresql import UUID
from hubgw.models.base import Base
@ -10,11 +12,18 @@ class Home(Base):
__tablename__ = "hub_homes"
__table_args__ = (
UniqueConstraint('player_uuid', 'name', name='idx_hub_homes_player_name'),
UniqueConstraint("player_uuid", "name", name="idx_hub_homes_player_name"),
)
id = Column(UUID(as_uuid=True), primary_key=True, server_default="gen_random_uuid()")
player_uuid = Column(String(36), ForeignKey("luckperms_players.uuid", ondelete="CASCADE"), nullable=False, index=True)
id = Column(
UUID(as_uuid=True), primary_key=True, server_default="gen_random_uuid()"
)
player_uuid = Column(
String(36),
ForeignKey("luckperms_players.uuid", ondelete="CASCADE"),
nullable=False,
index=True,
)
name = Column(String(255), nullable=False)
world = Column(Text, nullable=False)
x = Column(Float, nullable=False)

View File

@ -1,14 +1,7 @@
"""LuckPerms models."""
from sqlalchemy import (
Column,
String,
Boolean,
BigInteger,
ForeignKey,
Integer,
Index,
)
from sqlalchemy import (BigInteger, Boolean, Column, ForeignKey, Index,
Integer, String)
from sqlalchemy.orm import relationship
from hubgw.models.base import Base
@ -23,7 +16,9 @@ class LuckPermsPlayer(Base):
username = Column(String(16), nullable=False, index=True)
primary_group = Column(String(36), nullable=False)
permissions = relationship("LuckPermsUserPermission", back_populates="player", cascade="all, delete-orphan")
permissions = relationship(
"LuckPermsUserPermission", back_populates="player", cascade="all, delete-orphan"
)
class LuckPermsGroup(Base):
@ -39,11 +34,22 @@ class LuckPermsUserPermission(Base):
__tablename__ = "luckperms_user_permissions"
__table_args__ = (
Index('idx_luckperms_user_permissions_lookup', 'uuid', 'permission', 'server', 'world'),
Index(
"idx_luckperms_user_permissions_lookup",
"uuid",
"permission",
"server",
"world",
),
)
id = Column(Integer, primary_key=True, autoincrement=True)
uuid = Column(String(36), ForeignKey("luckperms_players.uuid", ondelete="CASCADE"), nullable=False, index=True)
uuid = Column(
String(36),
ForeignKey("luckperms_players.uuid", ondelete="CASCADE"),
nullable=False,
index=True,
)
permission = Column(String(200), nullable=False)
value = Column(Boolean, nullable=False)
server = Column(String(36))
@ -51,4 +57,4 @@ class LuckPermsUserPermission(Base):
expiry = Column(BigInteger)
contexts = Column(String(200))
player = relationship("LuckPermsPlayer", back_populates="permissions")
player = relationship("LuckPermsPlayer", back_populates="permissions")

View File

@ -1,7 +1,9 @@
"""Punishment model."""
from sqlalchemy import Column, String, DateTime, Boolean, Text, ForeignKey, CheckConstraint, Index
from sqlalchemy.dialects.postgresql import UUID, INET
from sqlalchemy import (Boolean, CheckConstraint, Column, DateTime, ForeignKey,
Index, String, Text)
from sqlalchemy.dialects.postgresql import INET, UUID
from hubgw.models.base import Base
@ -12,25 +14,52 @@ class Punishment(Base):
__table_args__ = (
CheckConstraint(
"punishment_type IN ('BAN', 'MUTE', 'KICK', 'WARN', 'TEMPBAN', 'TEMPMUTE')",
name='check_punishment_type'
name="check_punishment_type",
),
Index(
"idx_hub_punishments_player_active",
"player_uuid",
"is_active",
postgresql_where=Column("is_active"),
),
Index(
"idx_hub_punishments_type_active",
"punishment_type",
"is_active",
postgresql_where=Column("is_active"),
),
Index(
"idx_hub_punishments_player_ip",
"player_ip",
postgresql_where=Column("player_ip") != None,
),
Index('idx_hub_punishments_player_active', 'player_uuid', 'is_active', postgresql_where=Column('is_active')),
Index('idx_hub_punishments_type_active', 'punishment_type', 'is_active', postgresql_where=Column('is_active')),
Index('idx_hub_punishments_player_ip', 'player_ip', postgresql_where=Column('player_ip') != None),
)
id = Column(UUID(as_uuid=True), primary_key=True, server_default="gen_random_uuid()")
player_uuid = Column(String(36), ForeignKey("luckperms_players.uuid", ondelete="CASCADE"), nullable=False, index=True)
id = Column(
UUID(as_uuid=True), primary_key=True, server_default="gen_random_uuid()"
)
player_uuid = Column(
String(36),
ForeignKey("luckperms_players.uuid", ondelete="CASCADE"),
nullable=False,
index=True,
)
player_name = Column(String(255), nullable=False)
player_ip = Column(INET)
punishment_type = Column(String(50), nullable=False)
reason = Column(Text, nullable=False)
staff_uuid = Column(String(36), ForeignKey("luckperms_players.uuid", ondelete="SET NULL"), nullable=False)
staff_uuid = Column(
String(36),
ForeignKey("luckperms_players.uuid", ondelete="SET NULL"),
nullable=False,
)
staff_name = Column(String(255), nullable=False)
expires_at = Column(DateTime(timezone=True))
is_active = Column(Boolean, default=True)
revoked_at = Column(DateTime(timezone=True))
revoked_by = Column(String(36), ForeignKey("luckperms_players.uuid", ondelete="SET NULL"))
revoked_by = Column(
String(36), ForeignKey("luckperms_players.uuid", ondelete="SET NULL")
)
revoked_reason = Column(Text)
evidence_url = Column(Text)
notes = Column(Text)

View File

@ -1,8 +1,9 @@
"""Teleport History model."""
from sqlalchemy import Column, String, Float, ForeignKey, Text, DateTime
from sqlalchemy import Column, DateTime, Float, ForeignKey, String, Text
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.sql import func
from hubgw.models.base import Base
@ -11,8 +12,15 @@ class TeleportHistory(Base):
__tablename__ = "hub_teleport_history"
id = Column(UUID(as_uuid=True), primary_key=True, server_default="gen_random_uuid()")
player_uuid = Column(String(36), ForeignKey("luckperms_players.uuid", ondelete="CASCADE"), nullable=False, index=True)
id = Column(
UUID(as_uuid=True), primary_key=True, server_default="gen_random_uuid()"
)
player_uuid = Column(
String(36),
ForeignKey("luckperms_players.uuid", ondelete="CASCADE"),
nullable=False,
index=True,
)
from_world = Column(Text)
from_x = Column(Float) # DOUBLE PRECISION
from_y = Column(Float) # DOUBLE PRECISION
@ -23,4 +31,6 @@ class TeleportHistory(Base):
to_z = Column(Float, nullable=False) # DOUBLE PRECISION
tp_type = Column(String(50), nullable=False)
target_name = Column(String(255))
created_at = Column(DateTime(timezone=True), server_default=func.now(), nullable=False)
created_at = Column(
DateTime(timezone=True), server_default=func.now(), nullable=False
)

View File

@ -1,7 +1,8 @@
"""User model."""
from sqlalchemy import Column, Integer, String, DateTime, Boolean, Numeric, ForeignKey, Index
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy import (Boolean, Column, DateTime, ForeignKey, Index, Integer,
Numeric, String)
from hubgw.models.base import Base
@ -18,7 +19,7 @@ class User(Base):
role_id = Column(Integer, ForeignKey("roles.id"), nullable=False)
money = Column(Numeric(14, 2), default=0)
game_id = Column(String(191), nullable=True)
avatar = Column(String, nullable=True) # TEXT поле
avatar = Column(String, nullable=True) # TEXT поле
access_token = Column(String(191), nullable=True)
two_factor_secret = Column(String(191), nullable=True)
two_factor_recovery_codes = Column(String(191), nullable=True)
@ -31,6 +32,4 @@ class User(Base):
deleted_at = Column(DateTime(timezone=True), nullable=True)
password_changed_at = Column(DateTime(timezone=True), nullable=True)
__table_args__ = (
Index('idx_users_email', 'email', unique=True),
)
__table_args__ = (Index("idx_users_email", "email", unique=True),)

View File

@ -1,7 +1,8 @@
"""Warp model."""
from sqlalchemy import Column, String, Float, Boolean, REAL, Text, Index
from sqlalchemy import REAL, Boolean, Column, Float, Index, String, Text
from sqlalchemy.dialects.postgresql import UUID
from hubgw.models.base import Base
@ -10,11 +11,15 @@ class Warp(Base):
__tablename__ = "hub_warps"
__table_args__ = (
Index('idx_hub_warps_world', 'world'),
Index('idx_hub_warps_public', 'is_public', postgresql_where=Column('is_public')),
Index("idx_hub_warps_world", "world"),
Index(
"idx_hub_warps_public", "is_public", postgresql_where=Column("is_public")
),
)
id = Column(UUID(as_uuid=True), primary_key=True, server_default="gen_random_uuid()")
id = Column(
UUID(as_uuid=True), primary_key=True, server_default="gen_random_uuid()"
)
name = Column(String(255), nullable=False, unique=True, index=True)
world = Column(Text, nullable=False)
x = Column(Float, nullable=False)

View File

@ -1,7 +1,8 @@
"""Whitelist model."""
from sqlalchemy import Column, String, DateTime, Boolean, ForeignKey, Index
from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Index, String
from sqlalchemy.dialects.postgresql import UUID
from hubgw.models.base import Base
@ -10,13 +11,27 @@ class WhitelistEntry(Base):
__tablename__ = "hub_whitelist"
__table_args__ = (
Index('idx_hub_whitelist_active', 'is_active', postgresql_where=Column('is_active')),
Index('idx_hub_whitelist_expires', 'expires_at', postgresql_where=Column('expires_at') != None),
Index(
"idx_hub_whitelist_active",
"is_active",
postgresql_where=Column("is_active"),
),
Index(
"idx_hub_whitelist_expires",
"expires_at",
postgresql_where=Column("expires_at") != None,
),
)
id = Column(UUID(as_uuid=True), primary_key=True, server_default="gen_random_uuid()")
id = Column(
UUID(as_uuid=True), primary_key=True, server_default="gen_random_uuid()"
)
player_name = Column(String(255), nullable=False, unique=True, index=True)
player_uuid = Column(String(36), ForeignKey("luckperms_players.uuid", ondelete="SET NULL"), index=True)
player_uuid = Column(
String(36),
ForeignKey("luckperms_players.uuid", ondelete="SET NULL"),
index=True,
)
added_by = Column(String(255), nullable=False)
added_at = Column(DateTime(timezone=True), nullable=False)
expires_at = Column(DateTime(timezone=True))

View File

@ -1,10 +1,11 @@
"""Cooldowns repository."""
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, insert, update, delete, func
from typing import Optional, List
from uuid import UUID
from datetime import datetime, timedelta
from typing import List, Optional
from uuid import UUID
from sqlalchemy import delete, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from hubgw.models.cooldown import Cooldown
from hubgw.schemas.cooldowns import CooldownCreate, CooldownQuery
@ -25,7 +26,7 @@ class CooldownsRepository:
cooldown_type=request.cooldown_type,
expires_at=expires_at,
cooldown_seconds=request.cooldown_seconds,
metadata=request.metadata
metadata=request.metadata,
)
self.session.add(cooldown)
await self.session.commit()
@ -38,12 +39,14 @@ class CooldownsRepository:
result = await self.session.execute(stmt)
return result.scalar_one_or_none()
async def get_active_by_type(self, player_uuid: str, cooldown_type: str) -> Optional[Cooldown]:
async def get_active_by_type(
self, player_uuid: str, cooldown_type: str
) -> Optional[Cooldown]:
"""Get active cooldown by type."""
stmt = select(Cooldown).where(
Cooldown.player_uuid == player_uuid,
Cooldown.cooldown_type == cooldown_type,
Cooldown.expires_at > func.now()
Cooldown.expires_at > func.now(),
)
result = await self.session.execute(stmt)
return result.scalar_one_or_none()
@ -87,7 +90,9 @@ class CooldownsRepository:
# Apply pagination
offset = (query.page - 1) * query.size
stmt = stmt.offset(offset).limit(query.size).order_by(Cooldown.expires_at.desc())
stmt = (
stmt.offset(offset).limit(query.size).order_by(Cooldown.expires_at.desc())
)
result = await self.session.execute(stmt)
cooldowns = list(result.scalars().all())

View File

@ -1,11 +1,12 @@
"""Homes repository."""
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update, delete, func
from sqlalchemy.dialects.postgresql import insert
from typing import List, Optional
from uuid import UUID
from sqlalchemy import delete, func, select
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.asyncio import AsyncSession
from hubgw.models.home import Home
from hubgw.schemas.homes import HomeCreate, HomeQuery
@ -27,10 +28,10 @@ class HomesRepository:
z=request.z,
yaw=request.yaw,
pitch=request.pitch,
is_public=request.is_public
is_public=request.is_public,
)
stmt = stmt.on_conflict_do_update(
index_elements=['player_uuid', 'name'],
index_elements=["player_uuid", "name"],
set_={
"world": stmt.excluded.world,
"x": stmt.excluded.x,
@ -39,7 +40,7 @@ class HomesRepository:
"yaw": stmt.excluded.yaw,
"pitch": stmt.excluded.pitch,
"is_public": stmt.excluded.is_public,
}
},
).returning(Home)
result = await self.session.execute(stmt)
await self.session.commit()
@ -51,12 +52,11 @@ class HomesRepository:
result = await self.session.execute(stmt)
return result.scalar_one_or_none()
async def get_by_player_and_name(self, player_uuid: str, name: str) -> Optional[Home]:
async def get_by_player_and_name(
self, player_uuid: str, name: str
) -> Optional[Home]:
"""Get home by player and name."""
stmt = select(Home).where(
Home.player_uuid == player_uuid,
Home.name == name
)
stmt = select(Home).where(Home.player_uuid == player_uuid, Home.name == name)
result = await self.session.execute(stmt)
return result.scalar_one_or_none()

View File

@ -1,10 +1,11 @@
"""Kits repository."""
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, delete, func
from typing import Optional, List
from uuid import UUID
from datetime import datetime, timedelta
from typing import List, Optional
from uuid import UUID
from sqlalchemy import delete, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from hubgw.models.cooldown import Cooldown
from hubgw.schemas.kits import KitCooldownQuery
@ -13,7 +14,7 @@ from hubgw.schemas.kits import KitCooldownQuery
class KitsRepository:
"""
Kits repository for database operations.
NOTE: This repository manages cooldowns for kits, not the kits themselves.
Consider refactoring to move this logic to CooldownsRepository to follow SRP.
"""
@ -21,24 +22,28 @@ class KitsRepository:
def __init__(self, session: AsyncSession):
self.session = session
async def get_active_cooldown(self, player_uuid: UUID, kit_name: str) -> Optional[Cooldown]:
async def get_active_cooldown(
self, player_uuid: UUID, kit_name: str
) -> Optional[Cooldown]:
"""Check if player has active cooldown for kit."""
stmt = select(Cooldown).where(
Cooldown.player_uuid == str(player_uuid),
Cooldown.cooldown_type == f"kit_{kit_name}",
Cooldown.expires_at > func.now()
Cooldown.expires_at > func.now(),
)
result = await self.session.execute(stmt)
return result.scalar_one_or_none()
async def create_cooldown(self, player_uuid: UUID, kit_name: str, cooldown_seconds: int) -> Cooldown:
async def create_cooldown(
self, player_uuid: UUID, kit_name: str, cooldown_seconds: int
) -> Cooldown:
"""Create cooldown for kit."""
expires_at = datetime.utcnow() + timedelta(seconds=cooldown_seconds)
cooldown = Cooldown(
player_uuid=str(player_uuid),
cooldown_type=f"kit_{kit_name}",
expires_at=expires_at,
cooldown_seconds=cooldown_seconds
cooldown_seconds=cooldown_seconds,
)
self.session.add(cooldown)
await self.session.commit()
@ -48,8 +53,7 @@ class KitsRepository:
async def get_cooldown_by_id(self, cooldown_id: UUID) -> Optional[Cooldown]:
"""Get kit cooldown by id."""
stmt = select(Cooldown).where(
Cooldown.id == cooldown_id,
Cooldown.cooldown_type.like("kit_%")
Cooldown.id == cooldown_id, Cooldown.cooldown_type.like("kit_%")
)
result = await self.session.execute(stmt)
return result.scalar_one_or_none()
@ -57,25 +61,32 @@ class KitsRepository:
async def delete_cooldown_by_id(self, cooldown_id: UUID) -> bool:
"""Delete kit cooldown by id."""
stmt = delete(Cooldown).where(
Cooldown.id == cooldown_id,
Cooldown.cooldown_type.like("kit_%")
Cooldown.id == cooldown_id, Cooldown.cooldown_type.like("kit_%")
)
result = await self.session.execute(stmt)
await self.session.commit()
return result.rowcount > 0
async def query_cooldowns(self, query: KitCooldownQuery) -> tuple[List[Cooldown], int]:
async def query_cooldowns(
self, query: KitCooldownQuery
) -> tuple[List[Cooldown], int]:
"""Query kit cooldowns with filters and pagination."""
stmt = select(Cooldown).where(Cooldown.cooldown_type.like("kit_%"))
count_stmt = select(func.count(Cooldown.id)).where(Cooldown.cooldown_type.like("kit_%"))
count_stmt = select(func.count(Cooldown.id)).where(
Cooldown.cooldown_type.like("kit_%")
)
# Apply filters
if query.player_uuid:
stmt = stmt.where(Cooldown.player_uuid == str(query.player_uuid))
count_stmt = count_stmt.where(Cooldown.player_uuid == str(query.player_uuid))
count_stmt = count_stmt.where(
Cooldown.player_uuid == str(query.player_uuid)
)
if query.kit_name:
stmt = stmt.where(Cooldown.cooldown_type == f"kit_{query.kit_name}")
count_stmt = count_stmt.where(Cooldown.cooldown_type == f"kit_{query.kit_name}")
count_stmt = count_stmt.where(
Cooldown.cooldown_type == f"kit_{query.kit_name}"
)
if query.is_active is not None:
if query.is_active:
stmt = stmt.where(Cooldown.expires_at > func.now())
@ -90,7 +101,9 @@ class KitsRepository:
# Apply pagination
offset = (query.page - 1) * query.size
stmt = stmt.offset(offset).limit(query.size).order_by(Cooldown.expires_at.desc())
stmt = (
stmt.offset(offset).limit(query.size).order_by(Cooldown.expires_at.desc())
)
result = await self.session.execute(stmt)
cooldowns = list(result.scalars().all())

View File

@ -1,46 +1,54 @@
"""LuckPerms repository."""
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from typing import List, Optional
from hubgw.models.luckperms import LuckPermsPlayer, LuckPermsGroup, LuckPermsUserPermission
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from hubgw.models.luckperms import (LuckPermsGroup, LuckPermsPlayer,
LuckPermsUserPermission)
class LuckPermsRepository:
"""LuckPerms repository for database operations."""
def __init__(self, session: AsyncSession):
self.session = session
async def get_player(self, uuid: str) -> Optional[LuckPermsPlayer]:
"""Get player by UUID."""
stmt = select(LuckPermsPlayer).where(LuckPermsPlayer.uuid == uuid)
result = await self.session.execute(stmt)
return result.scalar_one_or_none()
async def get_player_by_username(self, username: str) -> Optional[LuckPermsPlayer]:
"""Get player by username."""
stmt = select(LuckPermsPlayer).where(LuckPermsPlayer.username == username)
result = await self.session.execute(stmt)
return result.scalar_one_or_none()
async def get_group(self, name: str) -> Optional[LuckPermsGroup]:
"""Get group by name."""
stmt = select(LuckPermsGroup).where(LuckPermsGroup.name == name)
result = await self.session.execute(stmt)
return result.scalar_one_or_none()
async def get_user_permissions(self, uuid: str) -> List[LuckPermsUserPermission]:
"""Get user permissions."""
stmt = select(LuckPermsUserPermission).where(LuckPermsUserPermission.uuid == uuid)
stmt = select(LuckPermsUserPermission).where(
LuckPermsUserPermission.uuid == uuid
)
result = await self.session.execute(stmt)
return list(result.scalars().all())
async def create_player(self, uuid: str, username: str, primary_group: str) -> LuckPermsPlayer:
async def create_player(
self, uuid: str, username: str, primary_group: str
) -> LuckPermsPlayer:
"""Create a new player in LuckPerms."""
player = LuckPermsPlayer(uuid=uuid, username=username, primary_group=primary_group)
player = LuckPermsPlayer(
uuid=uuid, username=username, primary_group=primary_group
)
self.session.add(player)
await self.session.commit()
await self.session.refresh(player)
return player
return player

View File

@ -1,21 +1,23 @@
"""Punishments repository."""
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, insert, update, func
from typing import List, Optional
from uuid import UUID
from datetime import datetime
from typing import List, Optional
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from hubgw.models.punishment import Punishment
from hubgw.schemas.punishments import PunishmentCreateRequest, PunishmentRevokeRequest, PunishmentQuery
from hubgw.schemas.punishments import (PunishmentCreateRequest,
PunishmentQuery,
PunishmentRevokeRequest)
class PunishmentsRepository:
"""Punishments repository for database operations."""
def __init__(self, session: AsyncSession):
self.session = session
async def create(self, request: PunishmentCreateRequest) -> Punishment:
"""Create punishment."""
punishment = Punishment(
@ -28,80 +30,86 @@ class PunishmentsRepository:
staff_name=request.staff_name,
expires_at=request.expires_at,
evidence_url=request.evidence_url,
notes=request.notes
notes=request.notes,
)
self.session.add(punishment)
await self.session.commit()
await self.session.refresh(punishment)
return punishment
async def revoke(self, request: PunishmentRevokeRequest) -> Optional[Punishment]:
"""Revoke punishment."""
stmt = select(Punishment).where(Punishment.id == request.punishment_id)
result = await self.session.execute(stmt)
punishment = result.scalar_one_or_none()
if not punishment:
return None
punishment.is_active = False
punishment.revoked_at = datetime.utcnow()
punishment.revoked_by = request.revoked_by
punishment.revoked_reason = request.revoked_reason
await self.session.commit()
await self.session.refresh(punishment)
return punishment
async def query(self, query: PunishmentQuery) -> tuple[List[Punishment], int]:
"""Query punishments with filters and pagination."""
stmt = select(Punishment)
count_stmt = select(func.count(Punishment.id))
# Apply filters
if query.player_uuid:
stmt = stmt.where(Punishment.player_uuid == query.player_uuid)
count_stmt = count_stmt.where(Punishment.player_uuid == query.player_uuid)
if query.punishment_type:
stmt = stmt.where(Punishment.punishment_type == query.punishment_type)
count_stmt = count_stmt.where(Punishment.punishment_type == query.punishment_type)
count_stmt = count_stmt.where(
Punishment.punishment_type == query.punishment_type
)
if query.is_active is not None:
stmt = stmt.where(Punishment.is_active == query.is_active)
count_stmt = count_stmt.where(Punishment.is_active == query.is_active)
# Get total count
count_result = await self.session.execute(count_stmt)
total = count_result.scalar()
# Apply pagination
offset = (query.page - 1) * query.size
stmt = stmt.offset(offset).limit(query.size).order_by(Punishment.created_at.desc())
stmt = (
stmt.offset(offset).limit(query.size).order_by(Punishment.created_at.desc())
)
result = await self.session.execute(stmt)
punishments = list(result.scalars().all())
return punishments, total
async def get_active_ban(self, player_uuid: str) -> Optional[Punishment]:
"""Get active ban for player."""
stmt = select(Punishment).where(
Punishment.player_uuid == player_uuid,
Punishment.punishment_type.in_(['BAN', 'TEMPBAN']),
Punishment.punishment_type.in_(["BAN", "TEMPBAN"]),
Punishment.is_active == True,
(Punishment.expires_at.is_(None)) | (Punishment.expires_at > datetime.utcnow())
(Punishment.expires_at.is_(None))
| (Punishment.expires_at > datetime.utcnow()),
)
result = await self.session.execute(stmt)
return result.scalar_one_or_none()
async def get_active_mute(self, player_uuid: str) -> Optional[Punishment]:
"""Get active mute for player."""
stmt = select(Punishment).where(
Punishment.player_uuid == player_uuid,
Punishment.punishment_type.in_(['MUTE', 'TEMPMUTE']),
Punishment.punishment_type.in_(["MUTE", "TEMPMUTE"]),
Punishment.is_active == True,
(Punishment.expires_at.is_(None)) | (Punishment.expires_at > datetime.utcnow())
(Punishment.expires_at.is_(None))
| (Punishment.expires_at > datetime.utcnow()),
)
result = await self.session.execute(stmt)
return result.scalar_one_or_none()

View File

@ -1,12 +1,14 @@
"""Teleport History repository."""
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from typing import List, Optional
from uuid import UUID
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from hubgw.models.teleport_history import TeleportHistory
from hubgw.schemas.teleport_history import TeleportHistoryCreate, TeleportHistoryQuery
from hubgw.schemas.teleport_history import (TeleportHistoryCreate,
TeleportHistoryQuery)
class TeleportHistoryRepository:
@ -29,7 +31,9 @@ class TeleportHistoryRepository:
result = await self.session.execute(stmt)
return result.scalar_one_or_none()
async def query(self, query: TeleportHistoryQuery) -> tuple[List[TeleportHistory], int]:
async def query(
self, query: TeleportHistoryQuery
) -> tuple[List[TeleportHistory], int]:
"""Query teleport history entries with filters and pagination."""
stmt = select(TeleportHistory)
count_stmt = select(func.count(TeleportHistory.id))
@ -37,13 +41,17 @@ class TeleportHistoryRepository:
# Apply filters
if query.player_uuid:
stmt = stmt.where(TeleportHistory.player_uuid == query.player_uuid)
count_stmt = count_stmt.where(TeleportHistory.player_uuid == query.player_uuid)
count_stmt = count_stmt.where(
TeleportHistory.player_uuid == query.player_uuid
)
if query.tp_type:
stmt = stmt.where(TeleportHistory.tp_type == query.tp_type)
count_stmt = count_stmt.where(TeleportHistory.tp_type == query.tp_type)
if query.from_world:
stmt = stmt.where(TeleportHistory.from_world == query.from_world)
count_stmt = count_stmt.where(TeleportHistory.from_world == query.from_world)
count_stmt = count_stmt.where(
TeleportHistory.from_world == query.from_world
)
if query.to_world:
stmt = stmt.where(TeleportHistory.to_world == query.to_world)
count_stmt = count_stmt.where(TeleportHistory.to_world == query.to_world)
@ -54,9 +62,13 @@ class TeleportHistoryRepository:
# Apply pagination
offset = (query.page - 1) * query.size
stmt = stmt.offset(offset).limit(query.size).order_by(TeleportHistory.created_at.desc())
stmt = (
stmt.offset(offset)
.limit(query.size)
.order_by(TeleportHistory.created_at.desc())
)
result = await self.session.execute(stmt)
entries = list(result.scalars().all())
return entries, total
return entries, total

View File

@ -1,18 +1,19 @@
"""User repository."""
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from typing import Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from hubgw.models.users import User
class UserRepository:
"""User repository for database operations."""
def __init__(self, session: AsyncSession):
self.session = session
async def get_game_id_by_name(self, name: str) -> Optional[str]:
"""Get game_id by user name."""
stmt = select(User.game_id).where(User.name == name)

View File

@ -1,12 +1,13 @@
"""Warps repository."""
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, insert, update, delete, func
from typing import List, Optional
from uuid import UUID
from sqlalchemy import delete, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from hubgw.models.warp import Warp
from hubgw.schemas.warps import WarpCreateRequest, WarpUpdateRequest, WarpQuery
from hubgw.schemas.warps import WarpCreateRequest, WarpQuery, WarpUpdateRequest
class WarpsRepository:
@ -26,7 +27,7 @@ class WarpsRepository:
yaw=request.yaw,
pitch=request.pitch,
is_public=request.is_public,
description=request.description
description=request.description,
)
self.session.add(warp)
await self.session.commit()
@ -54,7 +55,7 @@ class WarpsRepository:
if not warp:
return None
update_data = request.model_dump(exclude_unset=True, exclude={'name'})
update_data = request.model_dump(exclude_unset=True, exclude={"name"})
for field, value in update_data.items():
setattr(warp, field, value)

View File

@ -1,13 +1,15 @@
"""Whitelist repository."""
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, delete, func, and_
from typing import List, Optional
from uuid import UUID
from datetime import datetime
from sqlalchemy import and_, delete, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from hubgw.models.whitelist import WhitelistEntry
from hubgw.schemas.whitelist import WhitelistAddRequest, WhitelistCheckRequest, WhitelistQuery, WhitelistRemoveRequest
from hubgw.schemas.whitelist import (WhitelistAddRequest,
WhitelistCheckRequest, WhitelistQuery,
WhitelistRemoveRequest)
class WhitelistRepository:
@ -25,7 +27,7 @@ class WhitelistRepository:
added_at=request.added_at,
expires_at=request.expires_at,
is_active=request.is_active,
reason=request.reason
reason=request.reason,
)
self.session.add(entry)
await self.session.commit()
@ -63,14 +65,22 @@ class WhitelistRepository:
count_stmt = select(func.count(WhitelistEntry.id))
if query.player_name:
stmt = stmt.where(WhitelistEntry.player_name.ilike(f"%{query.player_name}%"))
count_stmt = count_stmt.where(WhitelistEntry.player_name.ilike(f"%{query.player_name}%"))
stmt = stmt.where(
WhitelistEntry.player_name.ilike(f"%{query.player_name}%")
)
count_stmt = count_stmt.where(
WhitelistEntry.player_name.ilike(f"%{query.player_name}%")
)
if query.player_uuid:
stmt = stmt.where(WhitelistEntry.player_uuid == query.player_uuid)
count_stmt = count_stmt.where(WhitelistEntry.player_uuid == query.player_uuid)
count_stmt = count_stmt.where(
WhitelistEntry.player_uuid == query.player_uuid
)
if query.added_by:
stmt = stmt.where(WhitelistEntry.added_by.ilike(f"%{query.added_by}%"))
count_stmt = count_stmt.where(WhitelistEntry.added_by.ilike(f"%{query.added_by}%"))
count_stmt = count_stmt.where(
WhitelistEntry.added_by.ilike(f"%{query.added_by}%")
)
if query.is_active is not None:
stmt = stmt.where(WhitelistEntry.is_active == query.is_active)
count_stmt = count_stmt.where(WhitelistEntry.is_active == query.is_active)
@ -79,20 +89,23 @@ class WhitelistRepository:
total = count_result.scalar_one()
offset = (query.page - 1) * query.size
stmt = stmt.offset(offset).limit(query.size).order_by(WhitelistEntry.added_at.desc())
stmt = (
stmt.offset(offset)
.limit(query.size)
.order_by(WhitelistEntry.added_at.desc())
)
result = await self.session.execute(stmt)
entries = list(result.scalars().all())
return entries, total
async def check(self, request: WhitelistCheckRequest) -> Optional[WhitelistEntry]:
"""Check if player is whitelisted."""
stmt = select(WhitelistEntry).where(
and_(
WhitelistEntry.player_name == request.player_name,
WhitelistEntry.is_active == True
WhitelistEntry.is_active == True,
)
)
result = await self.session.execute(stmt)
@ -100,7 +113,9 @@ class WhitelistRepository:
async def remove(self, request: WhitelistRemoveRequest) -> bool:
"""Remove player from whitelist."""
stmt = delete(WhitelistEntry).where(WhitelistEntry.player_name == request.player_name)
stmt = delete(WhitelistEntry).where(
WhitelistEntry.player_name == request.player_name
)
result = await self.session.execute(stmt)
await self.session.commit()
return result.rowcount > 0

View File

@ -1,14 +1,15 @@
"""Audit schemas."""
from pydantic import BaseModel
from typing import List, Dict, Any
from datetime import datetime
from typing import List
from uuid import UUID
from pydantic import BaseModel
class CommandAuditRequest(BaseModel):
"""Command audit request schema."""
player_uuid: UUID
player_name: str
command: str
@ -19,5 +20,5 @@ class CommandAuditRequest(BaseModel):
class CommandAuditResponse(BaseModel):
"""Command audit response schema."""
accepted: int

View File

@ -1,21 +1,21 @@
"""Common schemas."""
from pydantic import BaseModel
from typing import Optional
from datetime import datetime
from uuid import UUID
from typing import Optional
from pydantic import BaseModel
class BaseSchema(BaseModel):
"""Base schema with common fields."""
created_at: datetime
updated_at: datetime
class ErrorResponse(BaseModel):
"""Error response schema."""
message: str
code: str
details: Optional[dict] = None
@ -23,10 +23,10 @@ class ErrorResponse(BaseModel):
class PaginationParams(BaseModel):
"""Pagination parameters."""
page: int = 1
size: int = 20
@property
def offset(self) -> int:
return (self.page - 1) * self.size
@ -34,7 +34,7 @@ class PaginationParams(BaseModel):
class PaginatedResponse(BaseModel):
"""Paginated response schema."""
items: list
total: int
page: int

View File

@ -1,22 +1,24 @@
"""Cooldown schemas."""
from pydantic import BaseModel
from uuid import UUID
from datetime import datetime
from typing import Optional
from uuid import UUID
from pydantic import BaseModel
from hubgw.schemas.common import BaseSchema, PaginationParams
class CooldownCheckRequest(BaseModel):
"""Cooldown check request schema."""
player_uuid: str
cooldown_type: str
class CooldownCheckResponse(BaseModel):
"""Cooldown check response schema."""
is_active: bool
expires_at: Optional[datetime] = None
remaining_seconds: Optional[int] = None
@ -24,7 +26,7 @@ class CooldownCheckResponse(BaseModel):
class CooldownBase(BaseModel):
"""Base cooldown schema."""
cooldown_type: str
expires_at: datetime
cooldown_seconds: int
@ -33,13 +35,13 @@ class CooldownBase(BaseModel):
class CooldownCreate(CooldownBase):
"""Cooldown creation schema."""
player_uuid: str
class Cooldown(CooldownBase, BaseSchema):
"""Cooldown schema."""
id: UUID
player_uuid: str

View File

@ -1,15 +1,16 @@
"""Home schemas."""
from pydantic import BaseModel
from typing import Optional
from datetime import datetime
from uuid import UUID
from pydantic import BaseModel
from hubgw.schemas.common import BaseSchema, PaginationParams
class HomeBase(BaseModel):
"""Base home schema."""
name: str
world: str
x: float
@ -22,13 +23,13 @@ class HomeBase(BaseModel):
class HomeCreate(HomeBase):
"""Home creation schema."""
player_uuid: str
class HomeUpdate(BaseModel):
"""Home update schema."""
name: Optional[str] = None
world: Optional[str] = None
x: Optional[float] = None
@ -41,33 +42,33 @@ class HomeUpdate(BaseModel):
class HomeUpsertRequest(HomeBase):
"""Home upsert request schema."""
player_uuid: str
class Home(HomeBase, BaseSchema):
"""Home response schema."""
id: UUID
player_uuid: str
class HomeGetRequest(BaseModel):
"""Home get request schema."""
player_uuid: str
name: str
class HomeGetResponse(Home):
"""Home get response schema."""
pass
class HomeListResponse(BaseModel):
"""Home list response schema."""
homes: list[Home]
total: int

View File

@ -1,21 +1,23 @@
"""Kit schemas."""
from pydantic import BaseModel
from uuid import UUID
from typing import Optional
from uuid import UUID
from pydantic import BaseModel
from hubgw.schemas.common import PaginationParams
class KitClaimRequest(BaseModel):
"""Kit claim request schema."""
player_uuid: UUID
kit_name: str
class KitClaimResponse(BaseModel):
"""Kit claim response schema."""
success: bool
message: str
cooldown_remaining: Optional[int] = None

View File

@ -1,13 +1,15 @@
"""LuckPerms schemas."""
from pydantic import BaseModel
from typing import Optional
from pydantic import BaseModel
from hubgw.schemas.common import BaseSchema
class LuckPermsPlayer(BaseSchema):
"""LuckPerms Player schema."""
uuid: str
username: str
primary_group: str
@ -15,13 +17,13 @@ class LuckPermsPlayer(BaseSchema):
class LuckPermsGroup(BaseSchema):
"""LuckPerms Group schema."""
name: str
class LuckPermsUserPermission(BaseSchema):
"""LuckPerms User Permission schema."""
id: int
uuid: str
permission: str
@ -34,12 +36,12 @@ class LuckPermsUserPermission(BaseSchema):
class LuckPermsPlayerWithPermissions(LuckPermsPlayer):
"""LuckPerms Player with permissions schema."""
permissions: list[LuckPermsUserPermission] = []
class LuckPermsPlayerCreateRequest(BaseModel):
"""Request schema for creating a LuckPerms player."""
username: str
primary_group: str = "default"
primary_group: str = "default"

View File

@ -1,17 +1,18 @@
"""Punishment schemas."""
from pydantic import BaseModel
from typing import Optional
from datetime import datetime
from ipaddress import IPv4Address
from typing import Optional
from uuid import UUID
from pydantic import BaseModel
from hubgw.schemas.common import BaseSchema
from ipaddress import IPv4Address
class PunishmentCreateRequest(BaseModel):
"""Punishment create request schema."""
player_uuid: str
player_name: str
player_ip: Optional[IPv4Address] = None
@ -26,7 +27,7 @@ class PunishmentCreateRequest(BaseModel):
class PunishmentRevokeRequest(BaseModel):
"""Punishment revoke request schema."""
punishment_id: UUID
revoked_by: str
revoked_reason: str
@ -34,7 +35,7 @@ class PunishmentRevokeRequest(BaseModel):
class PunishmentQuery(BaseModel):
"""Punishment query schema."""
player_uuid: Optional[str] = None
punishment_type: Optional[str] = None
is_active: Optional[bool] = None
@ -44,7 +45,7 @@ class PunishmentQuery(BaseModel):
class PunishmentBase(BaseSchema):
"""Base punishment schema."""
id: UUID
player_uuid: str
player_name: str
@ -64,7 +65,7 @@ class PunishmentBase(BaseSchema):
class PunishmentListResponse(BaseModel):
"""Punishment list response schema."""
punishments: list[PunishmentBase]
total: int
page: int
@ -74,13 +75,13 @@ class PunishmentListResponse(BaseModel):
class ActiveBanStatusResponse(BaseModel):
"""Active ban status response schema."""
is_banned: bool
punishment: Optional[PunishmentBase] = None
class ActiveMuteStatusResponse(BaseModel):
"""Active mute status response schema."""
is_muted: bool
punishment: Optional[PunishmentBase] = None

View File

@ -1,15 +1,17 @@
"""Teleport History schemas."""
from pydantic import BaseModel
from typing import Optional
from datetime import datetime
from typing import Optional
from uuid import UUID
from pydantic import BaseModel
from hubgw.schemas.common import BaseSchema, PaginationParams
class TeleportHistoryBase(BaseModel):
"""Base teleport history schema."""
from_world: Optional[str] = None
from_x: Optional[float] = None
from_y: Optional[float] = None
@ -24,13 +26,13 @@ class TeleportHistoryBase(BaseModel):
class TeleportHistoryCreate(TeleportHistoryBase):
"""Teleport history creation schema."""
player_uuid: str
class TeleportHistory(TeleportHistoryBase, BaseSchema):
"""Teleport history schema."""
id: UUID
player_uuid: str
created_at: datetime
@ -42,4 +44,4 @@ class TeleportHistoryQuery(PaginationParams):
player_uuid: Optional[str] = None
tp_type: Optional[str] = None
from_world: Optional[str] = None
to_world: Optional[str] = None
to_world: Optional[str] = None

View File

@ -1,10 +1,10 @@
"""User schemas."""
from pydantic import BaseModel
from typing import Optional
class GetUserGameIdResponse(BaseModel):
"""Response schema for getting user's game ID."""
game_id: str

View File

@ -1,15 +1,16 @@
"""Warp schemas."""
from pydantic import BaseModel
from typing import Optional
from datetime import datetime
from uuid import UUID
from pydantic import BaseModel
from hubgw.schemas.common import BaseSchema, PaginationParams
class WarpBase(BaseModel):
"""Base warp schema."""
name: str
world: str
x: float
@ -23,13 +24,13 @@ class WarpBase(BaseModel):
class WarpCreateRequest(WarpBase):
"""Warp create request schema."""
pass
class WarpUpdateRequest(BaseModel):
"""Warp update request schema."""
name: str
world: Optional[str] = None
x: Optional[float] = None
@ -43,31 +44,31 @@ class WarpUpdateRequest(BaseModel):
class WarpDeleteRequest(BaseModel):
"""Warp delete request schema."""
name: str
class WarpGetRequest(BaseModel):
"""Warp get request schema."""
name: str
class Warp(WarpBase, BaseSchema):
"""Warp response schema."""
id: UUID
class WarpGetResponse(Warp):
"""Warp get response schema."""
pass
class WarpQuery(PaginationParams):
"""Warp query schema."""
name: Optional[str] = None
world: Optional[str] = None
is_public: Optional[bool] = None
@ -75,7 +76,7 @@ class WarpQuery(PaginationParams):
class WarpListResponse(BaseModel):
"""Warp list response schema."""
warps: list[Warp]
total: int
page: int

View File

@ -1,14 +1,15 @@
"""Whitelist schemas."""
from pydantic import BaseModel, ConfigDict
from typing import Optional
from datetime import datetime
from typing import Optional
from uuid import UUID
from pydantic import BaseModel, ConfigDict
class WhitelistAddRequest(BaseModel):
"""Whitelist add request schema."""
player_name: str
player_uuid: str
added_by: str
@ -20,25 +21,26 @@ class WhitelistAddRequest(BaseModel):
class WhitelistRemoveRequest(BaseModel):
"""Whitelist remove request schema."""
player_name: str
class WhitelistCheckRequest(BaseModel):
"""Whitelist check request schema."""
player_name: str
class WhitelistCheckResponse(BaseModel):
"""Whitelist check response schema."""
is_whitelisted: bool
player_uuid: Optional[str] = None
class WhitelistEntry(BaseModel):
"""Whitelist entry schema."""
model_config = ConfigDict(from_attributes=True)
id: UUID
@ -53,7 +55,7 @@ class WhitelistEntry(BaseModel):
class WhitelistListResponse(BaseModel):
"""Whitelist list response schema."""
entries: list[WhitelistEntry]
total: int

View File

@ -1,15 +1,16 @@
"""Audit service."""
from sqlalchemy.ext.asyncio import AsyncSession
from hubgw.schemas.audit import CommandAuditRequest, CommandAuditResponse
class AuditService:
"""Audit service for business logic."""
def __init__(self, session: AsyncSession):
self.session = session
async def log_command(self, request: CommandAuditRequest) -> CommandAuditResponse:
"""Log command execution for audit."""
# In a real implementation, this would store the command in an audit table

View File

@ -1,39 +1,44 @@
"""Cooldowns service."""
from sqlalchemy.ext.asyncio import AsyncSession
from uuid import UUID
from datetime import datetime
from sqlalchemy.ext.asyncio import AsyncSession
from hubgw.repositories.cooldowns_repo import CooldownsRepository
from hubgw.schemas.cooldowns import CooldownCheckRequest, CooldownCheckResponse, CooldownCreate
from hubgw.schemas.cooldowns import (CooldownCheckRequest,
CooldownCheckResponse, CooldownCreate)
class CooldownsService:
"""Cooldowns service for business logic."""
def __init__(self, session: AsyncSession):
self.repo = CooldownsRepository(session)
async def check_cooldown(self, request: CooldownCheckRequest) -> CooldownCheckResponse:
async def check_cooldown(
self, request: CooldownCheckRequest
) -> CooldownCheckResponse:
"""Check cooldown status."""
cooldown = await self.repo.check(request.player_uuid, request.cooldown_type)
if not cooldown:
return CooldownCheckResponse(is_active=False)
remaining_seconds = int((cooldown.expires_at - datetime.utcnow()).total_seconds())
remaining_seconds = int(
(cooldown.expires_at - datetime.utcnow()).total_seconds()
)
return CooldownCheckResponse(
is_active=remaining_seconds > 0,
expires_at=cooldown.expires_at,
remaining_seconds=max(0, remaining_seconds)
remaining_seconds=max(0, remaining_seconds),
)
async def create_cooldown(self, request: CooldownCreate) -> None:
"""Create new cooldown."""
await self.repo.create(
player_uuid=request.player_uuid,
cooldown_type=request.cooldown_type,
cooldown_seconds=request.cooldown_seconds,
metadata=request.metadata
metadata=request.metadata,
)

View File

@ -1,30 +1,32 @@
"""Homes service."""
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List
from uuid import UUID
from hubgw.repositories.homes_repo import HomesRepository
from hubgw.schemas.homes import HomeUpsertRequest, HomeGetRequest, Home, HomeGetResponse, HomeListResponse
from sqlalchemy.ext.asyncio import AsyncSession
from hubgw.core.errors import NotFoundError
from hubgw.repositories.homes_repo import HomesRepository
from hubgw.schemas.homes import (Home, HomeGetRequest, HomeGetResponse,
HomeListResponse, HomeUpsertRequest)
class HomesService:
"""Homes service for business logic."""
def __init__(self, session: AsyncSession):
self.repo = HomesRepository(session)
async def upsert_home(self, request: HomeUpsertRequest) -> Home:
"""Upsert home with business logic."""
return await self.repo.upsert(request)
async def get_home(self, request: HomeGetRequest) -> HomeGetResponse:
"""Get home with business logic."""
home = await self.repo.get_by_request(request)
if not home:
raise NotFoundError(f"Home '{request.name}' not found for player {request.player_uuid}")
raise NotFoundError(
f"Home '{request.name}' not found for player {request.player_uuid}"
)
return HomeGetResponse(
id=home.id,
player_uuid=home.player_uuid,
@ -37,13 +39,13 @@ class HomesService:
pitch=home.pitch,
is_public=home.is_public,
created_at=home.created_at,
updated_at=home.updated_at
updated_at=home.updated_at,
)
async def list_homes(self, player_uuid: str) -> HomeListResponse:
"""List homes with business logic."""
homes = await self.repo.list_by_player(player_uuid)
home_list = [
Home(
id=home.id,
@ -57,9 +59,9 @@ class HomesService:
pitch=home.pitch,
is_public=home.is_public,
created_at=home.created_at,
updated_at=home.updated_at
updated_at=home.updated_at,
)
for home in homes
]
return HomeListResponse(homes=home_list, total=len(home_list))

View File

@ -1,38 +1,41 @@
"""Kits service."""
from sqlalchemy.ext.asyncio import AsyncSession
from uuid import UUID
from datetime import datetime, timedelta
from datetime import datetime
from sqlalchemy.ext.asyncio import AsyncSession
from hubgw.core.errors import CooldownActiveError
from hubgw.repositories.kits_repo import KitsRepository
from hubgw.schemas.kits import KitClaimRequest, KitClaimResponse
from hubgw.core.errors import CooldownActiveError
class KitsService:
"""Kits service for business logic."""
def __init__(self, session: AsyncSession):
self.repo = KitsRepository(session)
async def claim_kit(self, request: KitClaimRequest) -> KitClaimResponse:
"""Claim kit with cooldown logic."""
# Check if player has active cooldown
cooldown = await self.repo.check_cooldown(request.player_uuid, request.kit_name)
if cooldown:
remaining_seconds = int((cooldown.expires_at - datetime.utcnow()).total_seconds())
remaining_seconds = int(
(cooldown.expires_at - datetime.utcnow()).total_seconds()
)
if remaining_seconds > 0:
raise CooldownActiveError(
f"Kit '{request.kit_name}' is on cooldown for {remaining_seconds} seconds",
{"cooldown_remaining": remaining_seconds}
{"cooldown_remaining": remaining_seconds},
)
# Create cooldown (assuming 1 hour cooldown for all kits)
cooldown_seconds = 3600 # 1 hour
await self.repo.create_cooldown(request.player_uuid, request.kit_name, cooldown_seconds)
return KitClaimResponse(
success=True,
message=f"Kit '{request.kit_name}' claimed successfully"
await self.repo.create_cooldown(
request.player_uuid, request.kit_name, cooldown_seconds
)
return KitClaimResponse(
success=True, message=f"Kit '{request.kit_name}' claimed successfully"
)

View File

@ -1,75 +1,83 @@
"""LuckPerms service."""
from loguru import logger
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List, TYPE_CHECKING
from typing import List
from sqlalchemy.ext.asyncio import AsyncSession
from hubgw.core.errors import AlreadyExistsError, NotFoundError
from hubgw.repositories.luckperms_repo import LuckPermsRepository
from hubgw.schemas.luckperms import (
LuckPermsPlayer, LuckPermsGroup, LuckPermsUserPermission,
LuckPermsPlayerWithPermissions, LuckPermsPlayerCreateRequest
)
from hubgw.core.errors import NotFoundError, AlreadyExistsError
from hubgw.schemas.luckperms import (LuckPermsGroup, LuckPermsPlayer,
LuckPermsPlayerCreateRequest,
LuckPermsPlayerWithPermissions,
LuckPermsUserPermission)
from hubgw.services.users_service import UserService
class LuckPermsService:
"""LuckPerms service for business logic."""
def __init__(self, session: AsyncSession):
self.repo = LuckPermsRepository(session)
async def get_player(self, uuid: str) -> LuckPermsPlayer:
"""Get player by UUID."""
player = await self.repo.get_player(uuid)
if not player:
raise NotFoundError(f"Player {uuid} not found")
return LuckPermsPlayer.model_validate(player)
async def get_player_by_username(self, username: str) -> LuckPermsPlayer:
"""Get player by username."""
player = await self.repo.get_player_by_username(username)
if not player:
raise NotFoundError(f"Player {username} not found")
return LuckPermsPlayer.model_validate(player)
async def get_group(self, name: str) -> LuckPermsGroup:
"""Get group by name."""
group = await self.repo.get_group(name)
if not group:
raise NotFoundError(f"Group {name} not found")
return LuckPermsGroup.model_validate(group)
async def get_user_permissions(self, uuid: str) -> List[LuckPermsUserPermission]:
"""Get user permissions."""
permissions = await self.repo.get_user_permissions(uuid)
return [LuckPermsUserPermission.model_validate(perm) for perm in permissions]
async def get_player_with_permissions(self, uuid: str) -> LuckPermsPlayerWithPermissions:
async def get_player_with_permissions(
self, uuid: str
) -> LuckPermsPlayerWithPermissions:
"""Get player with permissions."""
player = await self.repo.get_player(uuid)
if not player:
raise NotFoundError(f"Player {uuid} not found")
permissions = await self.repo.get_user_permissions(uuid)
permission_schemas = [LuckPermsUserPermission.model_validate(perm) for perm in permissions]
return LuckPermsPlayerWithPermissions.model_validate(player, permissions=permission_schemas)
async def create_player(self, request: LuckPermsPlayerCreateRequest, user_service: UserService) -> LuckPermsPlayer:
permission_schemas = [
LuckPermsUserPermission.model_validate(perm) for perm in permissions
]
return LuckPermsPlayerWithPermissions.model_validate(
player, permissions=permission_schemas
)
async def create_player(
self, request: LuckPermsPlayerCreateRequest, user_service: UserService
) -> LuckPermsPlayer:
"""Create a new player in LuckPerms."""
existing_player = await self.repo.get_player_by_username(request.username)
if existing_player:
raise AlreadyExistsError(f"Player with username {request.username} already exists")
raise AlreadyExistsError(
f"Player with username {request.username} already exists"
)
user_game_id = await user_service.get_game_id_by_name(request.username)
uuid = user_game_id.game_id
created_player = await self.repo.create_player(
uuid=uuid,
username=request.username,
primary_group=request.primary_group
uuid=uuid, username=request.username, primary_group=request.primary_group
)
return created_player

View File

@ -1,27 +1,30 @@
"""Punishments service."""
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List
from uuid import UUID
from hubgw.repositories.punishments_repo import PunishmentsRepository
from hubgw.schemas.punishments import (
PunishmentCreateRequest, PunishmentRevokeRequest, PunishmentQuery,
PunishmentBase, PunishmentListResponse, ActiveBanStatusResponse, ActiveMuteStatusResponse
)
from sqlalchemy.ext.asyncio import AsyncSession
from hubgw.core.errors import NotFoundError
from hubgw.repositories.punishments_repo import PunishmentsRepository
from hubgw.schemas.punishments import (ActiveBanStatusResponse,
ActiveMuteStatusResponse,
PunishmentBase, PunishmentCreateRequest,
PunishmentListResponse, PunishmentQuery,
PunishmentRevokeRequest)
class PunishmentsService:
"""Punishments service for business logic."""
def __init__(self, session: AsyncSession):
self.repo = PunishmentsRepository(session)
async def create_punishment(self, request: PunishmentCreateRequest) -> PunishmentBase:
async def create_punishment(
self, request: PunishmentCreateRequest
) -> PunishmentBase:
"""Create punishment with business logic."""
punishment = await self.repo.create(request)
return PunishmentBase(
id=punishment.id,
player_uuid=punishment.player_uuid,
@ -38,15 +41,17 @@ class PunishmentsService:
revoked_by=punishment.revoked_by,
revoked_reason=punishment.revoked_reason,
evidence_url=punishment.evidence_url,
notes=punishment.notes
notes=punishment.notes,
)
async def revoke_punishment(self, request: PunishmentRevokeRequest) -> PunishmentBase:
async def revoke_punishment(
self, request: PunishmentRevokeRequest
) -> PunishmentBase:
"""Revoke punishment with business logic."""
punishment = await self.repo.revoke(request)
if not punishment:
raise NotFoundError(f"Punishment {request.punishment_id} not found")
return PunishmentBase(
id=punishment.id,
player_uuid=punishment.player_uuid,
@ -63,13 +68,13 @@ class PunishmentsService:
revoked_by=punishment.revoked_by,
revoked_reason=punishment.revoked_reason,
evidence_url=punishment.evidence_url,
notes=punishment.notes
notes=punishment.notes,
)
async def query_punishments(self, query: PunishmentQuery) -> PunishmentListResponse:
"""Query punishments with business logic."""
punishments, total = await self.repo.query(query)
punishment_list = [
PunishmentBase(
id=p.id,
@ -87,28 +92,28 @@ class PunishmentsService:
revoked_by=p.revoked_by,
revoked_reason=p.revoked_reason,
evidence_url=p.evidence_url,
notes=p.notes
notes=p.notes,
)
for p in punishments
]
pages = (total + query.size - 1) // query.size
return PunishmentListResponse(
punishments=punishment_list,
total=total,
page=query.page,
size=query.size,
pages=pages
pages=pages,
)
async def get_active_ban_status(self, player_uuid: UUID) -> ActiveBanStatusResponse:
"""Get active ban status for player."""
ban = await self.repo.get_active_ban(player_uuid)
if not ban:
return ActiveBanStatusResponse(is_banned=False)
punishment = PunishmentBase(
id=ban.id,
player_uuid=ban.player_uuid,
@ -125,18 +130,20 @@ class PunishmentsService:
revoked_by=ban.revoked_by,
revoked_reason=ban.revoked_reason,
evidence_url=ban.evidence_url,
notes=ban.notes
notes=ban.notes,
)
return ActiveBanStatusResponse(is_banned=True, punishment=punishment)
async def get_active_mute_status(self, player_uuid: UUID) -> ActiveMuteStatusResponse:
async def get_active_mute_status(
self, player_uuid: UUID
) -> ActiveMuteStatusResponse:
"""Get active mute status for player."""
mute = await self.repo.get_active_mute(player_uuid)
if not mute:
return ActiveMuteStatusResponse(is_muted=False)
punishment = PunishmentBase(
id=mute.id,
player_uuid=mute.player_uuid,
@ -153,7 +160,7 @@ class PunishmentsService:
revoked_by=mute.revoked_by,
revoked_reason=mute.revoked_reason,
evidence_url=mute.evidence_url,
notes=mute.notes
notes=mute.notes,
)
return ActiveMuteStatusResponse(is_muted=True, punishment=punishment)

View File

@ -1,18 +1,20 @@
"""Teleport History service."""
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List
from sqlalchemy.ext.asyncio import AsyncSession
from hubgw.repositories.teleport_history_repo import TeleportHistoryRepository
from hubgw.schemas.teleport_history import TeleportHistoryCreate, TeleportHistory
from hubgw.schemas.teleport_history import (TeleportHistory,
TeleportHistoryCreate)
class TeleportHistoryService:
"""Teleport History service for business logic."""
def __init__(self, session: AsyncSession):
self.repo = TeleportHistoryRepository(session)
async def create_teleport(self, request: TeleportHistoryCreate) -> TeleportHistory:
"""Create teleport history entry."""
entry = await self.repo.create(request)
@ -29,10 +31,12 @@ class TeleportHistoryService:
to_z=entry.to_z,
tp_type=entry.tp_type,
target_name=entry.target_name,
created_at=entry.created_at
created_at=entry.created_at,
)
async def list_player_teleports(self, player_uuid: str, limit: int = 100) -> List[TeleportHistory]:
async def list_player_teleports(
self, player_uuid: str, limit: int = 100
) -> List[TeleportHistory]:
"""List teleport history for player."""
entries = await self.repo.list_by_player(player_uuid, limit)
return [
@ -49,7 +53,7 @@ class TeleportHistoryService:
to_z=entry.to_z,
tp_type=entry.tp_type,
target_name=entry.target_name,
created_at=entry.created_at
created_at=entry.created_at,
)
for entry in entries
]
]

View File

@ -2,17 +2,17 @@
from sqlalchemy.ext.asyncio import AsyncSession
from hubgw.core.errors import NotFoundError
from hubgw.repositories.users_repo import UserRepository
from hubgw.schemas.users import GetUserGameIdResponse
from hubgw.core.errors import NotFoundError
class UserService:
"""User service for business logic."""
def __init__(self, session: AsyncSession):
self.repo = UserRepository(session)
async def get_game_id_by_name(self, name: str) -> GetUserGameIdResponse:
"""Get game_id by user name."""
game_id = await self.repo.get_game_id_by_name(name)

View File

@ -1,51 +1,51 @@
"""Warps service."""
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List
from sqlalchemy.ext.asyncio import AsyncSession
from hubgw.core.errors import AlreadyExistsError, NotFoundError
from hubgw.repositories.warps_repo import WarpsRepository
from hubgw.schemas.warps import (
WarpCreateRequest, WarpUpdateRequest, WarpDeleteRequest, WarpGetRequest,
Warp, WarpGetResponse, WarpQuery, WarpListResponse
)
from hubgw.core.errors import NotFoundError, AlreadyExistsError
from hubgw.schemas.warps import (Warp, WarpCreateRequest, WarpDeleteRequest,
WarpGetRequest, WarpGetResponse,
WarpListResponse, WarpQuery,
WarpUpdateRequest)
class WarpsService:
"""Warps service for business logic."""
def __init__(self, session: AsyncSession):
self.repo = WarpsRepository(session)
async def create_warp(self, request: WarpCreateRequest) -> Warp:
"""Create warp with business logic."""
# Check if warp with same name already exists
existing = await self.repo.get_by_request(WarpGetRequest(name=request.name))
if existing:
raise AlreadyExistsError(f"Warp '{request.name}' already exists")
return await self.repo.create(request)
async def update_warp(self, request: WarpUpdateRequest) -> Warp:
"""Update warp with business logic."""
warp = await self.repo.update(request)
if not warp:
raise NotFoundError(f"Warp '{request.name}' not found")
return warp
async def delete_warp(self, request: WarpDeleteRequest) -> None:
"""Delete warp with business logic."""
success = await self.repo.delete(request)
if not success:
raise NotFoundError(f"Warp '{request.name}' not found")
async def get_warp(self, request: WarpGetRequest) -> WarpGetResponse:
"""Get warp with business logic."""
warp = await self.repo.get_by_request(request)
if not warp:
raise NotFoundError(f"Warp '{request.name}' not found")
return WarpGetResponse(
id=warp.id,
name=warp.name,
@ -58,13 +58,13 @@ class WarpsService:
is_public=warp.is_public,
description=warp.description,
created_at=warp.created_at,
updated_at=warp.updated_at
updated_at=warp.updated_at,
)
async def list_warps(self, query: WarpQuery) -> WarpListResponse:
"""List warps with business logic."""
warps, total = await self.repo.list(query)
warp_list = [
Warp(
id=warp.id,
@ -78,17 +78,13 @@ class WarpsService:
is_public=warp.is_public,
description=warp.description,
created_at=warp.created_at,
updated_at=warp.updated_at
updated_at=warp.updated_at,
)
for warp in warps
]
pages = (total + query.size - 1) // query.size
return WarpListResponse(
warps=warp_list,
total=total,
page=query.page,
size=query.size,
pages=pages
warps=warp_list, total=total, page=query.page, size=query.size, pages=pages
)

View File

@ -1,44 +1,49 @@
"""Whitelist service."""
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List
from hubgw.repositories.whitelist_repo import WhitelistRepository
from hubgw.schemas.whitelist import (
WhitelistAddRequest, WhitelistRemoveRequest, WhitelistCheckRequest,
WhitelistEntry as SchemaWhitelistEntry, WhitelistCheckResponse, WhitelistListResponse, WhitelistQuery
)
from hubgw.schemas.luckperms import LuckPermsPlayerCreateRequest
from sqlalchemy.ext.asyncio import AsyncSession
from hubgw.core.errors import AlreadyExistsError, NotFoundError
from hubgw.repositories.whitelist_repo import WhitelistRepository
from hubgw.schemas.luckperms import LuckPermsPlayerCreateRequest
from hubgw.schemas.whitelist import (WhitelistAddRequest,
WhitelistCheckRequest,
WhitelistCheckResponse)
from hubgw.schemas.whitelist import WhitelistEntry as SchemaWhitelistEntry
from hubgw.schemas.whitelist import (WhitelistListResponse, WhitelistQuery,
WhitelistRemoveRequest)
from hubgw.services.luckperms_service import LuckPermsService
from hubgw.services.users_service import UserService
class WhitelistService:
"""Whitelist service for business logic."""
def __init__(self, session: AsyncSession):
self.repo = WhitelistRepository(session)
async def add_player(
self,
self,
request: WhitelistAddRequest,
luckperms_service: LuckPermsService,
user_service: UserService
user_service: UserService,
) -> SchemaWhitelistEntry:
try:
luckperms_create_request = LuckPermsPlayerCreateRequest(
username=request.player_name,
primary_group="default"
username=request.player_name, primary_group="default"
)
await luckperms_service.create_player(
luckperms_create_request, user_service
)
await luckperms_service.create_player(luckperms_create_request, user_service)
except AlreadyExistsError:
pass
existing = await self.repo.get_by_player_name(request.player_name)
if existing:
if existing.is_active:
raise AlreadyExistsError(f"Player '{request.player_name}' is already whitelisted")
raise AlreadyExistsError(
f"Player '{request.player_name}' is already whitelisted"
)
else:
existing.player_uuid = request.player_uuid
existing.added_by = request.added_by
@ -51,37 +56,35 @@ class WhitelistService:
created_entry = await self.repo.create(request)
return SchemaWhitelistEntry.model_validate(created_entry)
async def remove_player(self, request: WhitelistRemoveRequest) -> None:
success = await self.repo.delete_by_player_name(request.player_name)
if not success:
raise NotFoundError(f"Player '{request.player_name}' not found in whitelist")
async def check_player(self, request: WhitelistCheckRequest) -> WhitelistCheckResponse:
raise NotFoundError(
f"Player '{request.player_name}' not found in whitelist"
)
async def check_player(
self, request: WhitelistCheckRequest
) -> WhitelistCheckResponse:
entry = await self.repo.check(request)
return WhitelistCheckResponse(
is_whitelisted=entry is not None,
player_uuid=entry.player_uuid if entry else None
player_uuid=entry.player_uuid if entry else None,
)
async def list_players(self) -> WhitelistListResponse:
entries = await self.repo.list_all()
total = len(entries)
entry_list = [
SchemaWhitelistEntry.model_validate(entry)
for entry in entries
]
entry_list = [SchemaWhitelistEntry.model_validate(entry) for entry in entries]
return WhitelistListResponse(entries=entry_list, total=total)
async def query_players(self, query: WhitelistQuery) -> WhitelistListResponse:
entries, total = await self.repo.query(query)
entry_list = [
SchemaWhitelistEntry.model_validate(entry)
for entry in entries
]
entry_list = [SchemaWhitelistEntry.model_validate(entry) for entry in entries]
return WhitelistListResponse(entries=entry_list, total=total)

View File

@ -1,21 +1,21 @@
"""Pagination utility functions."""
from typing import List, TypeVar, Generic
from math import ceil
from typing import Generic, List, TypeVar
T = TypeVar('T')
T = TypeVar("T")
class PaginatedResult(Generic[T]):
"""Paginated result container."""
def __init__(self, items: List[T], total: int, page: int, size: int):
self.items = items
self.total = total
self.page = page
self.size = size
self.pages = ceil(total / size) if size > 0 else 0
def to_dict(self) -> dict:
"""Convert to dictionary."""
return {
@ -23,7 +23,7 @@ class PaginatedResult(Generic[T]):
"total": self.total,
"page": self.page,
"size": self.size,
"pages": self.pages
"pages": self.pages,
}