feat: add new methonds

This commit is contained in:
itqop 2025-10-16 00:49:45 +03:00
parent 637fc7516b
commit 6141e0d1bb
26 changed files with 407 additions and 570 deletions

View File

@ -7,16 +7,39 @@ from uuid import UUID
from datetime import datetime, timedelta
from hubgw.models.cooldown import Cooldown
from hubgw.schemas.cooldowns import CooldownCreate, CooldownQuery
class CooldownsRepository:
"""Cooldowns repository for database operations."""
def __init__(self, session: AsyncSession):
self.session = session
async def check(self, player_uuid: str, cooldown_type: str) -> Optional[Cooldown]:
"""Check if cooldown is active."""
async def create(self, request: CooldownCreate) -> Cooldown:
"""Create new cooldown."""
expires_at = datetime.utcnow() + timedelta(seconds=request.cooldown_seconds)
cooldown = Cooldown(
player_uuid=request.player_uuid,
cooldown_type=request.cooldown_type,
expires_at=expires_at,
cooldown_seconds=request.cooldown_seconds,
metadata=request.metadata
)
self.session.add(cooldown)
await self.session.commit()
await self.session.refresh(cooldown)
return cooldown
async def get_by_id(self, entry_id: UUID) -> Optional[Cooldown]:
"""Get cooldown by id."""
stmt = select(Cooldown).where(Cooldown.id == entry_id)
result = await self.session.execute(stmt)
return result.scalar_one_or_none()
async def get_active_by_type(self, player_uuid: str, cooldown_type: str) -> Optional[Cooldown]:
"""Get active cooldown by type."""
stmt = select(Cooldown).where(
Cooldown.player_uuid == player_uuid,
Cooldown.cooldown_type == cooldown_type,
@ -24,28 +47,49 @@ class CooldownsRepository:
)
result = await self.session.execute(stmt)
return result.scalar_one_or_none()
async def create(self, player_uuid: str, cooldown_type: str, cooldown_seconds: int, metadata: Optional[dict] = None) -> Cooldown:
"""Create new cooldown."""
expires_at = datetime.utcnow() + timedelta(seconds=cooldown_seconds)
cooldown = Cooldown(
player_uuid=player_uuid,
cooldown_type=cooldown_type,
expires_at=expires_at,
cooldown_seconds=cooldown_seconds,
metadata=metadata
)
self.session.add(cooldown)
async def update(self, cooldown: Cooldown) -> Cooldown:
"""Update cooldown."""
await self.session.commit()
await self.session.refresh(cooldown)
return cooldown
async def list_by_player(self, player_uuid: str) -> List[Cooldown]:
"""List active cooldowns by player."""
stmt = select(Cooldown).where(
Cooldown.player_uuid == player_uuid,
Cooldown.expires_at > func.now()
)
async def delete_by_id(self, entry_id: UUID) -> bool:
"""Delete cooldown by id."""
stmt = delete(Cooldown).where(Cooldown.id == entry_id)
result = await self.session.execute(stmt)
return list(result.scalars().all())
await self.session.commit()
return result.rowcount > 0
async def query(self, query: CooldownQuery) -> tuple[List[Cooldown], int]:
"""Query cooldowns with filters and pagination."""
stmt = select(Cooldown)
count_stmt = select(func.count(Cooldown.id))
# Apply filters
if query.player_uuid:
stmt = stmt.where(Cooldown.player_uuid == query.player_uuid)
count_stmt = count_stmt.where(Cooldown.player_uuid == query.player_uuid)
if query.cooldown_type:
stmt = stmt.where(Cooldown.cooldown_type == query.cooldown_type)
count_stmt = count_stmt.where(Cooldown.cooldown_type == query.cooldown_type)
if query.is_active is not None:
if query.is_active:
stmt = stmt.where(Cooldown.expires_at > func.now())
count_stmt = count_stmt.where(Cooldown.expires_at > func.now())
else:
stmt = stmt.where(Cooldown.expires_at <= func.now())
count_stmt = count_stmt.where(Cooldown.expires_at <= func.now())
# Get total count
count_result = await self.session.execute(count_stmt)
total = count_result.scalar()
# Apply pagination
offset = (query.page - 1) * query.size
stmt = stmt.offset(offset).limit(query.size).order_by(Cooldown.expires_at.desc())
result = await self.session.execute(stmt)
cooldowns = list(result.scalars().all())
return cooldowns, total

View File

@ -1,23 +1,23 @@
"""Homes repository."""
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update, delete
from sqlalchemy import select, update, delete, func
from sqlalchemy.dialects.postgresql import insert
from typing import List, Optional
from uuid import UUID
from hubgw.models.home import Home
from hubgw.schemas.homes import HomeUpsertRequest, HomeGetRequest
from hubgw.schemas.homes import HomeCreate, HomeQuery
class HomesRepository:
"""Homes repository for database operations."""
def __init__(self, session: AsyncSession):
self.session = session
async def upsert(self, request: HomeUpsertRequest) -> Home:
"""Upsert home."""
async def create(self, request: HomeCreate) -> Home:
"""Create or update home (upsert)."""
stmt = insert(Home).values(
player_uuid=request.player_uuid,
name=request.name,
@ -31,38 +31,76 @@ class HomesRepository:
)
stmt = stmt.on_conflict_do_update(
index_elements=['player_uuid', 'name'],
set_=dict(
world=stmt.excluded.world,
x=stmt.excluded.x,
y=stmt.excluded.y,
z=stmt.excluded.z,
yaw=stmt.excluded.yaw,
pitch=stmt.excluded.pitch,
is_public=stmt.excluded.is_public
)
)
set_={
"world": stmt.excluded.world,
"x": stmt.excluded.x,
"y": stmt.excluded.y,
"z": stmt.excluded.z,
"yaw": stmt.excluded.yaw,
"pitch": stmt.excluded.pitch,
"is_public": stmt.excluded.is_public,
}
).returning(Home)
result = await self.session.execute(stmt)
await self.session.commit()
# Get the upserted home
stmt = select(Home).where(
Home.player_uuid == request.player_uuid,
Home.name == request.name
)
result = await self.session.execute(stmt)
return result.scalar_one()
async def get_by_request(self, request: HomeGetRequest) -> Optional[Home]:
"""Get home by request."""
async def get_by_id(self, home_id: UUID) -> Optional[Home]:
"""Get home by id."""
stmt = select(Home).where(Home.id == home_id)
result = await self.session.execute(stmt)
return result.scalar_one_or_none()
async def get_by_player_and_name(self, player_uuid: str, name: str) -> Optional[Home]:
"""Get home by player and name."""
stmt = select(Home).where(
Home.player_uuid == request.player_uuid,
Home.name == request.name
Home.player_uuid == player_uuid,
Home.name == name
)
result = await self.session.execute(stmt)
return result.scalar_one_or_none()
async def list_by_player(self, player_uuid: str) -> List[Home]:
"""List homes by player UUID."""
stmt = select(Home).where(Home.player_uuid == player_uuid)
async def update(self, home: Home) -> Home:
"""Update home."""
await self.session.commit()
await self.session.refresh(home)
return home
async def delete_by_id(self, home_id: UUID) -> bool:
"""Delete home by id."""
stmt = delete(Home).where(Home.id == home_id)
result = await self.session.execute(stmt)
return list(result.scalars().all())
await self.session.commit()
return result.rowcount > 0
async def query(self, query: HomeQuery) -> tuple[List[Home], int]:
"""Query homes with filters and pagination."""
stmt = select(Home)
count_stmt = select(func.count(Home.id))
# Apply filters
if query.player_uuid:
stmt = stmt.where(Home.player_uuid == query.player_uuid)
count_stmt = count_stmt.where(Home.player_uuid == query.player_uuid)
if query.name:
stmt = stmt.where(Home.name.ilike(f"%{query.name}%"))
count_stmt = count_stmt.where(Home.name.ilike(f"%{query.name}%"))
if query.world:
stmt = stmt.where(Home.world == query.world)
count_stmt = count_stmt.where(Home.world == query.world)
if query.is_public is not None:
stmt = stmt.where(Home.is_public == query.is_public)
count_stmt = count_stmt.where(Home.is_public == query.is_public)
# Get total count
count_result = await self.session.execute(count_stmt)
total = count_result.scalar()
# Apply pagination
offset = (query.page - 1) * query.size
stmt = stmt.offset(offset).limit(query.size).order_by(Home.name)
result = await self.session.execute(stmt)
homes = list(result.scalars().all())
return homes, total

View File

@ -1,37 +1,98 @@
"""Kits repository."""
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from typing import Optional
from sqlalchemy import select, delete, func
from typing import Optional, List
from uuid import UUID
from datetime import datetime, timedelta
from hubgw.models.cooldown import Cooldown
from hubgw.schemas.kits import KitCooldownQuery
class KitsRepository:
"""Kits repository for database operations."""
"""
Kits repository for database operations.
NOTE: This repository manages cooldowns for kits, not the kits themselves.
Consider refactoring to move this logic to CooldownsRepository to follow SRP.
"""
def __init__(self, session: AsyncSession):
self.session = session
async def check_cooldown(self, player_uuid: UUID, kit_name: str) -> Optional[Cooldown]:
async def get_active_cooldown(self, player_uuid: UUID, kit_name: str) -> Optional[Cooldown]:
"""Check if player has active cooldown for kit."""
stmt = select(Cooldown).where(
Cooldown.player_uuid == player_uuid,
Cooldown.key == f"kit_{kit_name}",
Cooldown.player_uuid == str(player_uuid),
Cooldown.cooldown_type == f"kit_{kit_name}",
Cooldown.expires_at > func.now()
)
result = await self.session.execute(stmt)
return result.scalar_one_or_none()
async def create_cooldown(self, player_uuid: UUID, kit_name: str, cooldown_seconds: int) -> Cooldown:
"""Create cooldown for kit."""
expires_at = datetime.utcnow() + timedelta(seconds=cooldown_seconds)
cooldown = Cooldown(
key=f"kit_{kit_name}",
player_uuid=player_uuid,
player_uuid=str(player_uuid),
cooldown_type=f"kit_{kit_name}",
expires_at=expires_at,
cooldown_seconds=cooldown_seconds
)
self.session.add(cooldown)
await self.session.commit()
await self.session.refresh(cooldown)
return cooldown
async def get_cooldown_by_id(self, cooldown_id: UUID) -> Optional[Cooldown]:
"""Get kit cooldown by id."""
stmt = select(Cooldown).where(
Cooldown.id == cooldown_id,
Cooldown.cooldown_type.like("kit_%")
)
result = await self.session.execute(stmt)
return result.scalar_one_or_none()
async def delete_cooldown_by_id(self, cooldown_id: UUID) -> bool:
"""Delete kit cooldown by id."""
stmt = delete(Cooldown).where(
Cooldown.id == cooldown_id,
Cooldown.cooldown_type.like("kit_%")
)
result = await self.session.execute(stmt)
await self.session.commit()
return result.rowcount > 0
async def query_cooldowns(self, query: KitCooldownQuery) -> tuple[List[Cooldown], int]:
"""Query kit cooldowns with filters and pagination."""
stmt = select(Cooldown).where(Cooldown.cooldown_type.like("kit_%"))
count_stmt = select(func.count(Cooldown.id)).where(Cooldown.cooldown_type.like("kit_%"))
# Apply filters
if query.player_uuid:
stmt = stmt.where(Cooldown.player_uuid == str(query.player_uuid))
count_stmt = count_stmt.where(Cooldown.player_uuid == str(query.player_uuid))
if query.kit_name:
stmt = stmt.where(Cooldown.cooldown_type == f"kit_{query.kit_name}")
count_stmt = count_stmt.where(Cooldown.cooldown_type == f"kit_{query.kit_name}")
if query.is_active is not None:
if query.is_active:
stmt = stmt.where(Cooldown.expires_at > func.now())
count_stmt = count_stmt.where(Cooldown.expires_at > func.now())
else:
stmt = stmt.where(Cooldown.expires_at <= func.now())
count_stmt = count_stmt.where(Cooldown.expires_at <= func.now())
# Get total count
count_result = await self.session.execute(count_stmt)
total = count_result.scalar()
# Apply pagination
offset = (query.page - 1) * query.size
stmt = stmt.offset(offset).limit(query.size).order_by(Cooldown.expires_at.desc())
result = await self.session.execute(stmt)
cooldowns = list(result.scalars().all())
return cooldowns, total

View File

@ -1,19 +1,20 @@
"""Teleport History repository."""
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from typing import List
from sqlalchemy import select, func
from typing import List, Optional
from uuid import UUID
from hubgw.models.teleport_history import TeleportHistory
from hubgw.schemas.teleport_history import TeleportHistoryCreate
from hubgw.schemas.teleport_history import TeleportHistoryCreate, TeleportHistoryQuery
class TeleportHistoryRepository:
"""Teleport History repository for database operations."""
def __init__(self, session: AsyncSession):
self.session = session
async def create(self, request: TeleportHistoryCreate) -> TeleportHistory:
"""Create teleport history entry."""
entry = TeleportHistory(**request.model_dump())
@ -21,11 +22,41 @@ class TeleportHistoryRepository:
await self.session.commit()
await self.session.refresh(entry)
return entry
async def list_by_player(self, player_uuid: str, limit: int = 100) -> List[TeleportHistory]:
"""List teleport history by player."""
stmt = select(TeleportHistory).where(
TeleportHistory.player_uuid == player_uuid
).order_by(TeleportHistory.created_at.desc()).limit(limit)
async def get_by_id(self, entry_id: UUID) -> Optional[TeleportHistory]:
"""Get teleport history entry by id."""
stmt = select(TeleportHistory).where(TeleportHistory.id == entry_id)
result = await self.session.execute(stmt)
return list(result.scalars().all())
return result.scalar_one_or_none()
async def query(self, query: TeleportHistoryQuery) -> tuple[List[TeleportHistory], int]:
"""Query teleport history entries with filters and pagination."""
stmt = select(TeleportHistory)
count_stmt = select(func.count(TeleportHistory.id))
# Apply filters
if query.player_uuid:
stmt = stmt.where(TeleportHistory.player_uuid == query.player_uuid)
count_stmt = count_stmt.where(TeleportHistory.player_uuid == query.player_uuid)
if query.tp_type:
stmt = stmt.where(TeleportHistory.tp_type == query.tp_type)
count_stmt = count_stmt.where(TeleportHistory.tp_type == query.tp_type)
if query.from_world:
stmt = stmt.where(TeleportHistory.from_world == query.from_world)
count_stmt = count_stmt.where(TeleportHistory.from_world == query.from_world)
if query.to_world:
stmt = stmt.where(TeleportHistory.to_world == query.to_world)
count_stmt = count_stmt.where(TeleportHistory.to_world == query.to_world)
# Get total count
count_result = await self.session.execute(count_stmt)
total = count_result.scalar()
# Apply pagination
offset = (query.page - 1) * query.size
stmt = stmt.offset(offset).limit(query.size).order_by(TeleportHistory.created_at.desc())
result = await self.session.execute(stmt)
entries = list(result.scalars().all())
return entries, total

View File

@ -6,15 +6,15 @@ from typing import List, Optional
from uuid import UUID
from hubgw.models.warp import Warp
from hubgw.schemas.warps import WarpCreateRequest, WarpUpdateRequest, WarpDeleteRequest, WarpGetRequest, WarpListQuery
from hubgw.schemas.warps import WarpCreateRequest, WarpUpdateRequest, WarpQuery
class WarpsRepository:
"""Warps repository for database operations."""
def __init__(self, session: AsyncSession):
self.session = session
async def create(self, request: WarpCreateRequest) -> Warp:
"""Create warp."""
warp = Warp(
@ -32,60 +32,75 @@ class WarpsRepository:
await self.session.commit()
await self.session.refresh(warp)
return warp
async def get_by_id(self, warp_id: UUID) -> Optional[Warp]:
"""Get warp by id."""
stmt = select(Warp).where(Warp.id == warp_id)
result = await self.session.execute(stmt)
return result.scalar_one_or_none()
async def get_by_name(self, name: str) -> Optional[Warp]:
"""Get warp by name."""
stmt = select(Warp).where(Warp.name == name)
result = await self.session.execute(stmt)
return result.scalar_one_or_none()
async def update(self, request: WarpUpdateRequest) -> Optional[Warp]:
"""Update warp."""
stmt = select(Warp).where(Warp.name == request.name)
result = await self.session.execute(stmt)
warp = result.scalar_one_or_none()
if not warp:
return None
update_data = request.model_dump(exclude_unset=True, exclude={'name'})
for field, value in update_data.items():
setattr(warp, field, value)
await self.session.commit()
await self.session.refresh(warp)
return warp
async def delete(self, request: WarpDeleteRequest) -> bool:
"""Delete warp."""
stmt = delete(Warp).where(Warp.name == request.name)
async def delete_by_id(self, warp_id: UUID) -> bool:
"""Delete warp by id."""
stmt = delete(Warp).where(Warp.id == warp_id)
result = await self.session.execute(stmt)
await self.session.commit()
return result.rowcount > 0
async def get_by_request(self, request: WarpGetRequest) -> Optional[Warp]:
"""Get warp by request."""
stmt = select(Warp).where(Warp.name == request.name)
async def delete_by_name(self, name: str) -> bool:
"""Delete warp by name."""
stmt = delete(Warp).where(Warp.name == name)
result = await self.session.execute(stmt)
return result.scalar_one_or_none()
async def list(self, query: WarpListQuery) -> tuple[List[Warp], int]:
"""List warps with pagination."""
await self.session.commit()
return result.rowcount > 0
async def query(self, query: WarpQuery) -> tuple[List[Warp], int]:
"""Query warps with filters and pagination."""
stmt = select(Warp)
count_stmt = select(func.count(Warp.id))
# Apply filters
if query.name:
stmt = stmt.where(Warp.name.ilike(f"%{query.name}%"))
count_stmt = count_stmt.where(Warp.name.ilike(f"%{query.name}%"))
if query.world:
stmt = stmt.where(Warp.world == query.world)
count_stmt = count_stmt.where(Warp.world == query.world)
if query.is_public is not None:
stmt = stmt.where(Warp.is_public == query.is_public)
count_stmt = count_stmt.where(Warp.is_public == query.is_public)
# Get total count
count_result = await self.session.execute(count_stmt)
total = count_result.scalar()
# Apply pagination
offset = (query.page - 1) * query.size
stmt = stmt.offset(offset).limit(query.size)
stmt = stmt.offset(offset).limit(query.size).order_by(Warp.name)
result = await self.session.execute(stmt)
warps = list(result.scalars().all())
return warps, total

View File

@ -1,22 +1,22 @@
"""Whitelist repository."""
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, insert, delete, func
from sqlalchemy import select, insert, delete, func, update
from typing import List, Optional
from uuid import UUID
from datetime import datetime
from hubgw.models.whitelist import WhitelistEntry
from hubgw.schemas.whitelist import WhitelistAddRequest, WhitelistRemoveRequest, WhitelistCheckRequest
from hubgw.schemas.whitelist import WhitelistAddRequest, WhitelistQuery
class WhitelistRepository:
"""Whitelist repository for database operations."""
def __init__(self, session: AsyncSession):
self.session = session
async def add(self, request: WhitelistAddRequest) -> WhitelistEntry:
async def create(self, request: WhitelistAddRequest) -> WhitelistEntry:
"""Add player to whitelist."""
entry = WhitelistEntry(
player_name=request.player_name,
@ -31,28 +31,60 @@ class WhitelistRepository:
await self.session.commit()
await self.session.refresh(entry)
return entry
async def remove(self, request: WhitelistRemoveRequest) -> bool:
"""Remove player from whitelist."""
stmt = delete(WhitelistEntry).where(WhitelistEntry.player_name == request.player_name)
async def get_by_id(self, entry_id: int) -> Optional[WhitelistEntry]:
"""Get whitelist entry by id."""
stmt = select(WhitelistEntry).where(WhitelistEntry.id == entry_id)
result = await self.session.execute(stmt)
return result.scalar_one_or_none()
async def get_by_player_name(self, player_name: str) -> Optional[WhitelistEntry]:
"""Get whitelist entry by player name."""
stmt = select(WhitelistEntry).where(WhitelistEntry.player_name == player_name)
result = await self.session.execute(stmt)
return result.scalar_one_or_none()
async def update(self, entry: WhitelistEntry) -> WhitelistEntry:
"""Update whitelist entry."""
await self.session.commit()
await self.session.refresh(entry)
return entry
async def delete_by_id(self, entry_id: int) -> bool:
"""Delete whitelist entry by id."""
stmt = delete(WhitelistEntry).where(WhitelistEntry.id == entry_id)
result = await self.session.execute(stmt)
await self.session.commit()
return result.rowcount > 0
async def check(self, request: WhitelistCheckRequest) -> Optional[WhitelistEntry]:
"""Check if player is whitelisted."""
stmt = select(WhitelistEntry).where(WhitelistEntry.player_name == request.player_name)
async def query(self, query: WhitelistQuery) -> tuple[List[WhitelistEntry], int]:
"""Query whitelist entries with filters and pagination."""
stmt = select(WhitelistEntry)
count_stmt = select(func.count(WhitelistEntry.id))
# Apply filters
if query.player_name:
stmt = stmt.where(WhitelistEntry.player_name.ilike(f"%{query.player_name}%"))
count_stmt = count_stmt.where(WhitelistEntry.player_name.ilike(f"%{query.player_name}%"))
if query.player_uuid:
stmt = stmt.where(WhitelistEntry.player_uuid == query.player_uuid)
count_stmt = count_stmt.where(WhitelistEntry.player_uuid == query.player_uuid)
if query.added_by:
stmt = stmt.where(WhitelistEntry.added_by.ilike(f"%{query.added_by}%"))
count_stmt = count_stmt.where(WhitelistEntry.added_by.ilike(f"%{query.added_by}%"))
if query.is_active is not None:
stmt = stmt.where(WhitelistEntry.is_active == query.is_active)
count_stmt = count_stmt.where(WhitelistEntry.is_active == query.is_active)
# Get total count
count_result = await self.session.execute(count_stmt)
total = count_result.scalar()
# Apply pagination
offset = (query.page - 1) * query.size
stmt = stmt.offset(offset).limit(query.size).order_by(WhitelistEntry.player_name)
result = await self.session.execute(stmt)
return result.scalar_one_or_none()
async def list_all(self) -> List[WhitelistEntry]:
"""List all whitelist entries."""
stmt = select(WhitelistEntry).order_by(WhitelistEntry.player_name)
result = await self.session.execute(stmt)
return list(result.scalars().all())
async def count(self) -> int:
"""Get total whitelist count."""
stmt = select(func.count(WhitelistEntry.id))
result = await self.session.execute(stmt)
return result.scalar()
entries = list(result.scalars().all())
return entries, total

View File

@ -4,7 +4,7 @@ from pydantic import BaseModel
from uuid import UUID
from datetime import datetime
from typing import Optional
from hubgw.schemas.common import BaseSchema
from hubgw.schemas.common import BaseSchema, PaginationParams
class CooldownCheckRequest(BaseModel):
@ -42,3 +42,11 @@ class Cooldown(CooldownBase, BaseSchema):
id: UUID
player_uuid: str
class CooldownQuery(PaginationParams):
"""Cooldown query schema."""
player_uuid: Optional[str] = None
cooldown_type: Optional[str] = None
is_active: Optional[bool] = None

View File

@ -4,7 +4,7 @@ from pydantic import BaseModel
from typing import Optional
from datetime import datetime
from uuid import UUID
from hubgw.schemas.common import BaseSchema
from hubgw.schemas.common import BaseSchema, PaginationParams
class HomeBase(BaseModel):
@ -70,3 +70,12 @@ class HomeListResponse(BaseModel):
homes: list[Home]
total: int
class HomeQuery(PaginationParams):
"""Home query schema."""
player_uuid: Optional[str] = None
name: Optional[str] = None
world: Optional[str] = None
is_public: Optional[bool] = None

View File

@ -3,6 +3,7 @@
from pydantic import BaseModel
from uuid import UUID
from typing import Optional
from hubgw.schemas.common import PaginationParams
class KitClaimRequest(BaseModel):
@ -18,3 +19,11 @@ class KitClaimResponse(BaseModel):
success: bool
message: str
cooldown_remaining: Optional[int] = None
class KitCooldownQuery(PaginationParams):
"""Kit cooldown query schema."""
player_uuid: Optional[UUID] = None
kit_name: Optional[str] = None
is_active: Optional[bool] = None

View File

@ -4,7 +4,7 @@ from pydantic import BaseModel
from typing import Optional
from datetime import datetime
from uuid import UUID
from hubgw.schemas.common import BaseSchema
from hubgw.schemas.common import BaseSchema, PaginationParams
class TeleportHistoryBase(BaseModel):
@ -33,4 +33,13 @@ class TeleportHistory(TeleportHistoryBase, BaseSchema):
id: UUID
player_uuid: str
created_at: datetime
created_at: datetime
class TeleportHistoryQuery(PaginationParams):
"""Teleport history query schema."""
player_uuid: Optional[str] = None
tp_type: Optional[str] = None
from_world: Optional[str] = None
to_world: Optional[str] = None

View File

@ -4,7 +4,7 @@ from pydantic import BaseModel
from typing import Optional
from datetime import datetime
from uuid import UUID
from hubgw.schemas.common import BaseSchema
from hubgw.schemas.common import BaseSchema, PaginationParams
class WarpBase(BaseModel):
@ -65,13 +65,12 @@ class WarpGetResponse(Warp):
pass
class WarpListQuery(BaseModel):
"""Warp list query schema."""
class WarpQuery(PaginationParams):
"""Warp query schema."""
page: int = 1
size: int = 20
name: Optional[str] = None
world: Optional[str] = None
is_public: Optional[int] = None
is_public: Optional[bool] = None
class WarpListResponse(BaseModel):

View File

@ -4,7 +4,7 @@ from pydantic import BaseModel
from typing import Optional
from datetime import datetime
from uuid import UUID
from hubgw.schemas.common import BaseSchema
from hubgw.schemas.common import BaseSchema, PaginationParams
class WhitelistAddRequest(BaseModel):
@ -56,3 +56,12 @@ class WhitelistListResponse(BaseModel):
entries: list[WhitelistEntry]
total: int
class WhitelistQuery(PaginationParams):
"""Whitelist query schema."""
player_name: Optional[str] = None
player_uuid: Optional[str] = None
added_by: Optional[str] = None
is_active: Optional[bool] = None

View File

View File

@ -1,19 +0,0 @@
"""Tests for audit endpoints."""
import pytest
from fastapi.testclient import TestClient
from uuid import uuid4
from datetime import datetime
def test_log_command_unauthorized(test_client: TestClient):
"""Test log command without API key."""
response = test_client.post("/api/v1/audit/commands", json={
"player_uuid": str(uuid4()),
"player_name": "test_player",
"command": "tp",
"arguments": ["player2"],
"server": "hub",
"timestamp": datetime.utcnow().isoformat()
})
assert response.status_code == 401

View File

@ -1,22 +0,0 @@
"""Tests for cooldowns endpoints."""
import pytest
from fastapi.testclient import TestClient
from uuid import uuid4
def test_check_cooldown_unauthorized(test_client: TestClient):
"""Test check cooldown without API key."""
response = test_client.post("/api/v1/cooldowns/check", json={
"player_uuid": str(uuid4()),
"key": "test_key"
})
assert response.status_code == 401
def test_touch_cooldown_unauthorized(test_client: TestClient):
"""Test touch cooldown without API key."""
response = test_client.put("/api/v1/cooldowns/touch?seconds=60", json={
"key": "test_key"
})
assert response.status_code == 401

View File

@ -1,11 +0,0 @@
"""Tests for health endpoints."""
import pytest
from fastapi.testclient import TestClient
def test_health_check(test_client: TestClient):
"""Test health check endpoint."""
response = test_client.get("/api/v1/health/")
assert response.status_code == 200
assert response.json() == {"status": "ok"}

View File

@ -1,33 +0,0 @@
"""Tests for homes endpoints."""
import pytest
from fastapi.testclient import TestClient
from uuid import uuid4
def test_upsert_home_unauthorized(test_client: TestClient):
"""Test upsert home without API key."""
response = test_client.put("/api/v1/homes/", json={
"player_uuid": str(uuid4()),
"name": "test_home",
"world": "world",
"x": 0.0,
"y": 64.0,
"z": 0.0
})
assert response.status_code == 401
def test_get_home_unauthorized(test_client: TestClient):
"""Test get home without API key."""
response = test_client.post("/api/v1/homes/get", json={
"player_uuid": str(uuid4()),
"name": "test_home"
})
assert response.status_code == 401
def test_list_homes_unauthorized(test_client: TestClient):
"""Test list homes without API key."""
response = test_client.get(f"/api/v1/homes/{uuid4()}")
assert response.status_code == 401

View File

@ -1,14 +0,0 @@
"""Tests for kits endpoints."""
import pytest
from fastapi.testclient import TestClient
from uuid import uuid4
def test_claim_kit_unauthorized(test_client: TestClient):
"""Test claim kit without API key."""
response = test_client.post("/api/v1/kits/claim", json={
"player_uuid": str(uuid4()),
"kit_name": "starter"
})
assert response.status_code == 401

View File

@ -1,49 +0,0 @@
"""Tests for punishments endpoints."""
import pytest
from fastapi.testclient import TestClient
from uuid import uuid4
def test_create_punishment_unauthorized(test_client: TestClient):
"""Test create punishment without API key."""
response = test_client.post("/api/v1/punishments/", json={
"player_uuid": str(uuid4()),
"player_name": "test_player",
"punishment_type": "ban",
"reason": "Test ban",
"staff_uuid": str(uuid4()),
"staff_name": "admin"
})
assert response.status_code == 401
def test_revoke_punishment_unauthorized(test_client: TestClient):
"""Test revoke punishment without API key."""
response = test_client.post("/api/v1/punishments/revoke", json={
"punishment_id": str(uuid4()),
"revoked_by": str(uuid4()),
"revoked_reason": "Test revoke"
})
assert response.status_code == 401
def test_query_punishments_unauthorized(test_client: TestClient):
"""Test query punishments without API key."""
response = test_client.post("/api/v1/punishments/query", json={
"page": 1,
"size": 20
})
assert response.status_code == 401
def test_get_ban_status_unauthorized(test_client: TestClient):
"""Test get ban status without API key."""
response = test_client.get(f"/api/v1/punishments/ban/{uuid4()}")
assert response.status_code == 401
def test_get_mute_status_unauthorized(test_client: TestClient):
"""Test get mute status without API key."""
response = test_client.get(f"/api/v1/punishments/mute/{uuid4()}")
assert response.status_code == 401

View File

@ -1,50 +0,0 @@
"""Tests for warps endpoints."""
import pytest
from fastapi.testclient import TestClient
def test_create_warp_unauthorized(test_client: TestClient):
"""Test create warp without API key."""
response = test_client.post("/api/v1/warps/", json={
"name": "test_warp",
"world": "world",
"x": 0.0,
"y": 64.0,
"z": 0.0
})
assert response.status_code == 401
def test_update_warp_unauthorized(test_client: TestClient):
"""Test update warp without API key."""
response = test_client.patch("/api/v1/warps/", json={
"name": "test_warp",
"world": "world_nether"
})
assert response.status_code == 401
def test_delete_warp_unauthorized(test_client: TestClient):
"""Test delete warp without API key."""
response = test_client.delete("/api/v1/warps/", json={
"name": "test_warp"
})
assert response.status_code == 401
def test_get_warp_unauthorized(test_client: TestClient):
"""Test get warp without API key."""
response = test_client.post("/api/v1/warps/get", json={
"name": "test_warp"
})
assert response.status_code == 401
def test_list_warps_unauthorized(test_client: TestClient):
"""Test list warps without API key."""
response = test_client.post("/api/v1/warps/list", json={
"page": 1,
"size": 20
})
assert response.status_code == 401

View File

@ -1,37 +0,0 @@
"""Tests for whitelist endpoints."""
import pytest
from fastapi.testclient import TestClient
from uuid import uuid4
def test_add_player_unauthorized(test_client: TestClient):
"""Test add player without API key."""
response = test_client.post("/api/v1/whitelist/add", json={
"player_name": "test_player",
"player_uuid": str(uuid4()),
"added_by": "admin"
})
assert response.status_code == 401
def test_remove_player_unauthorized(test_client: TestClient):
"""Test remove player without API key."""
response = test_client.post("/api/v1/whitelist/remove", json={
"player_name": "test_player"
})
assert response.status_code == 401
def test_check_player_unauthorized(test_client: TestClient):
"""Test check player without API key."""
response = test_client.post("/api/v1/whitelist/check", json={
"player_name": "test_player"
})
assert response.status_code == 401
def test_list_players_unauthorized(test_client: TestClient):
"""Test list players without API key."""
response = test_client.get("/api/v1/whitelist/")
assert response.status_code == 401

View File

@ -1,48 +0,0 @@
"""Tests for homes repository."""
import pytest
from uuid import uuid4
from hubgw.repositories.homes_repo import HomesRepository
from hubgw.schemas.homes import HomeUpsertRequest, HomeGetRequest
@pytest.mark.asyncio
async def test_upsert_home(test_session):
"""Test upsert home."""
repo = HomesRepository(test_session)
request = HomeUpsertRequest(
player_uuid=uuid4(),
name="test_home",
world="world",
x=0.0,
y=64.0,
z=0.0
)
home = await repo.upsert(request)
assert home.name == "test_home"
assert home.player_uuid == request.player_uuid
@pytest.mark.asyncio
async def test_get_home_not_found(test_session):
"""Test get home when not found."""
repo = HomesRepository(test_session)
request = HomeGetRequest(
player_uuid=uuid4(),
name="nonexistent_home"
)
home = await repo.get_by_request(request)
assert home is None
@pytest.mark.asyncio
async def test_list_homes_empty(test_session):
"""Test list homes when empty."""
repo = HomesRepository(test_session)
homes = await repo.list_by_player(uuid4())
assert homes == []

View File

@ -1,56 +0,0 @@
"""Tests for punishments repository."""
import pytest
from uuid import uuid4
from datetime import datetime
from hubgw.repositories.punishments_repo import PunishmentsRepository
from hubgw.schemas.punishments import PunishmentCreateRequest, PunishmentRevokeRequest, PunishmentQuery
@pytest.mark.asyncio
async def test_create_punishment(test_session):
"""Test create punishment."""
repo = PunishmentsRepository(test_session)
request = PunishmentCreateRequest(
player_uuid=uuid4(),
player_name="test_player",
punishment_type="ban",
reason="Test ban",
staff_uuid=uuid4(),
staff_name="admin"
)
punishment = await repo.create(request)
assert punishment.player_name == "test_player"
assert punishment.punishment_type == "ban"
@pytest.mark.asyncio
async def test_query_punishments_empty(test_session):
"""Test query punishments when empty."""
repo = PunishmentsRepository(test_session)
query = PunishmentQuery(page=1, size=20)
punishments, total = await repo.query(query)
assert punishments == []
assert total == 0
@pytest.mark.asyncio
async def test_get_active_ban_not_found(test_session):
"""Test get active ban when not found."""
repo = PunishmentsRepository(test_session)
ban = await repo.get_active_ban(uuid4())
assert ban is None
@pytest.mark.asyncio
async def test_get_active_mute_not_found(test_session):
"""Test get active mute when not found."""
repo = PunishmentsRepository(test_session)
mute = await repo.get_active_mute(uuid4())
assert mute is None

View File

@ -1,46 +0,0 @@
"""Tests for warps repository."""
import pytest
from hubgw.repositories.warps_repo import WarpsRepository
from hubgw.schemas.warps import WarpCreateRequest, WarpUpdateRequest, WarpDeleteRequest, WarpGetRequest, WarpListQuery
@pytest.mark.asyncio
async def test_create_warp(test_session):
"""Test create warp."""
repo = WarpsRepository(test_session)
request = WarpCreateRequest(
name="test_warp",
world="world",
x=0.0,
y=64.0,
z=0.0
)
warp = await repo.create(request)
assert warp.name == "test_warp"
assert warp.world == "world"
@pytest.mark.asyncio
async def test_get_warp_not_found(test_session):
"""Test get warp when not found."""
repo = WarpsRepository(test_session)
request = WarpGetRequest(name="nonexistent_warp")
warp = await repo.get_by_request(request)
assert warp is None
@pytest.mark.asyncio
async def test_list_warps_empty(test_session):
"""Test list warps when empty."""
repo = WarpsRepository(test_session)
query = WarpListQuery(page=1, size=20)
warps, total = await repo.list(query)
assert warps == []
assert total == 0

View File

@ -1,42 +0,0 @@
"""Tests for whitelist repository."""
import pytest
from uuid import uuid4
from hubgw.repositories.whitelist_repo import WhitelistRepository
from hubgw.schemas.whitelist import WhitelistAddRequest, WhitelistRemoveRequest, WhitelistCheckRequest
@pytest.mark.asyncio
async def test_add_player(test_session):
"""Test add player to whitelist."""
repo = WhitelistRepository(test_session)
request = WhitelistAddRequest(
player_name="test_player",
player_uuid=uuid4(),
added_by="admin"
)
entry = await repo.add(request)
assert entry.player_name == "test_player"
assert entry.added_by == "admin"
@pytest.mark.asyncio
async def test_check_player_not_found(test_session):
"""Test check player when not found."""
repo = WhitelistRepository(test_session)
request = WhitelistCheckRequest(player_name="nonexistent_player")
entry = await repo.check(request)
assert entry is None
@pytest.mark.asyncio
async def test_list_players_empty(test_session):
"""Test list players when empty."""
repo = WhitelistRepository(test_session)
entries = await repo.list_all()
assert entries == []

0
tests/unit/__init__.py Normal file
View File