Initial base

This commit is contained in:
itqop 2024-03-07 16:06:53 +03:00
parent 0279534182
commit 181cee2e8c
19 changed files with 353 additions and 0 deletions

4
app/__init__.py Normal file
View File

@ -0,0 +1,4 @@
from app.app import init_app
app, database = init_app()
from app.handlers import root

23
app/app.py Normal file
View File

@ -0,0 +1,23 @@
from typing import Tuple
from fastapi import FastAPI
from pydantic import MySQLDsn
from config import configs
from db import MySQLConfig, Database
def get_app() -> FastAPI:
app = FastAPI()
return app
def get_config(url: MySQLDsn) -> MySQLConfig:
db_config = MySQLConfig(url=url)
return db_config
def get_database(uri: MySQLConfig) -> Database:
database = Database(uri=uri)
return database
def init_app() -> Tuple[FastAPI, Database]:
app = get_app()
uri = get_config(url=configs.DB_URI)
database = get_database(uri=uri)
return app, database

66
app/handlers.py Normal file
View File

@ -0,0 +1,66 @@
from app import app, database
from db import RequestSchema, UserSchema
from fastapi import HTTPException
@app.get('/')
async def root():
return {
"answer": None
}
@app.post("/get_uuid/")
async def get_uuid(request: RequestSchema):
profile = await get_profile(schema=request)
return {
"username": profile.username,
"uuid": profile.uuid
}
@app.post("/check_subscription/")
async def check_subscription(request: RequestSchema):
profile = await get_profile(schema=request)
subscription_status = await database.check_subscription_by_uuid(user=profile)
return {"has_subscription": subscription_status}
@app.post("/check_ban_status/")
async def check_ban_status(request: RequestSchema):
profile = await get_profile(schema=request)
ban_status = await database.check_ban_status_by_uuid(user=profile)
return {"is_banned": not ban_status}
@app.post("/unban/")
async def unban_user(request: RequestSchema):
profile = await get_profile(schema=request)
status = (await check_ban_status(request=request))['is_banned']
if status:
await database.unban_by_uuid(user=profile)
return {"message": "User unbanned successfully"}
return {"message": "User not banned"}
@app.post("/grant_permissions/")
async def grant_permissions(request: RequestSchema):
profile = await get_profile(schema=request)
status = (await check_subscription(request=request))['has_subscription']
if not status:
await database.grant_permissions_by_uuid(user=profile)
await grant_tab(request=request)
return {"message": "Permissions granted successfully"}
return {"message": "Permissions already granted"}
@app.post("/grant_prefix/")
async def grant_tab(request: RequestSchema):
profile = await get_profile(schema=request)
await database.grant_tab_by_username(user=profile, tab=request.tab)
return {"message": "Tab granted successfully"}
async def get_profile(schema: RequestSchema) -> UserSchema:
uuid = await database.get_uuid_by_username(schema)
if not uuid:
raise HTTPException(status_code=404, detail="User not found")
profile = UserSchema.model_construct(
username=schema.username,
uuid=uuid,
expiry=schema.validated_expiry
)
return profile

21
config.py Normal file
View File

@ -0,0 +1,21 @@
from pydantic_settings import BaseSettings, SettingsConfigDict
from pydantic import PositiveInt, computed_field, MySQLDsn
from pydantic_core import Url
class Configs(BaseSettings):
HOST: str
PORT: PositiveInt
DATABASE: str
USERNAME: str
PASSWORD: str
@computed_field
def DB_URI(self) -> MySQLDsn:
return Url(
f"mysql+aiomysql://{self.USERNAME}:{self.PASSWORD}@{self.HOST}:{self.PORT}/{self.DATABASE}?charset=utf8mb4"
)
model_config = SettingsConfigDict(env_file='.env', env_file_encoding='utf-8')
configs = Configs()

3
db/__init__.py Normal file
View File

@ -0,0 +1,3 @@
from db.sql_schemas import TabUser, LuckpermsUserPermission, LitebansBan, LuckpermsPlayer
from db.database import Database
from db.schemas import UUIDSchema, MySQLConfig, UserSchema, RequestSchema, TabSchema

94
db/database.py Normal file
View File

@ -0,0 +1,94 @@
from sqlalchemy import select
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import sessionmaker
from db import LuckpermsPlayer, LitebansBan, LuckpermsUserPermission, TabUser
from db.schemas import MySQLConfig, UserSchema, RequestSchema, TabSchema
from sqlalchemy.ext.asyncio import AsyncSession
from aiocache import cached, SimpleMemoryCache
from aiocache.serializers import PickleSerializer
class Database:
def __init__(self, uri: MySQLConfig):
self.engine = create_async_engine(str(uri))
self.AsyncSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=self.engine, class_=AsyncSession)
self.uri = uri
@cached(ttl=3600, cache=SimpleMemoryCache, serializer=PickleSerializer())
async def get_uuid_by_username(self, request: RequestSchema):
async with self.AsyncSessionLocal() as session:
async with session.begin():
player = await session.execute(select(LuckpermsPlayer).filter(LuckpermsPlayer.username == request.username))
player = player.scalar_one_or_none()
if player:
return player.uuid
else:
return None
async def unban_by_uuid(self, user: UserSchema):
async with self.AsyncSessionLocal() as session:
bans = await session.execute(
select(LitebansBan).filter(
LitebansBan.uuid == user.uuid,
LitebansBan.active == True
)
)
bans = bans.scalars().all()
for ban in bans:
ban.active = False
ban.removed_by_uuid = "fa87b50a-791b-4c61-a56f-d6cc03df1582"
ban.removed_by_name = "The_MrKroll"
ban.removed_by_reason = "Куплен разбан на сайте"
await session.commit()
async def grant_permissions_by_uuid(self, user: UserSchema):
async with self.AsyncSessionLocal() as session:
permission = LuckpermsUserPermission(
uuid=str(user.uuid),
permission="group.subscribe",
value="1",
server="global",
world="global",
expiry=str(user.expiry),
contexts="{}"
)
session.add(permission)
await session.commit()
async def grant_tab_by_username(self, user: UserSchema, tab: TabSchema):
async with self.AsyncSessionLocal() as session:
prefix = TabUser(
user=user.username,
property=tab.property,
value=tab.value,
expiry=user.expiry
)
session.add(prefix)
await session.commit()
async def check_subscription_by_uuid(self, user: UserSchema) -> bool:
async with self.AsyncSessionLocal() as session:
subscription = await session.execute(
select(LuckpermsUserPermission).filter(
LuckpermsUserPermission.uuid == user.uuid,
LuckpermsUserPermission.permission == "group.subscribe"
)
)
if subscription.scalar():
return True
else:
return False
@cached(ttl=2, cache=SimpleMemoryCache, serializer=PickleSerializer())
async def check_ban_status_by_uuid(self, user: UserSchema) -> bool:
async with self.AsyncSessionLocal() as session:
ban = await session.execute(
select(LitebansBan).filter(
LitebansBan.uuid == user.uuid,
LitebansBan.active == True
)
)
if not ban.scalar():
return True
else:
return False

View File

@ -0,0 +1,5 @@
from pydantic import MySQLDsn
class MySQLConfig(MySQLDsn):
pass

View File

@ -0,0 +1,33 @@
from pydantic import BaseModel, PositiveInt, computed_field, field_validator
from .TabSchema import TabSchema
import datetime
import re
class RequestSchema(BaseModel):
username: str
expiry: PositiveInt = 1
tab: TabSchema = TabSchema()
@field_validator('username')
def validate_username(cls, value):
if not re.fullmatch(r'[a-zA-Z0-9_-]{4,16}', value):
raise ValueError("Invalid username format")
return value
@computed_field(return_type=PositiveInt)
def validated_expiry(self):
if self.expiry > 10**5:
try:
expiry_datetime = datetime.datetime.utcfromtimestamp(self.expiry)
if expiry_datetime <= datetime.datetime.utcnow():
raise ValueError("Expiry timestamp must be in the future")
return self.expiry
except ValueError:
raise ValueError("Invalid timestamp format")
else:
if self.expiry != 0:
now = datetime.datetime.utcnow()
return int((now + datetime.timedelta(days=self.expiry)).timestamp())
else:
return 0

19
db/schemas/TabSchema.py Normal file
View File

@ -0,0 +1,19 @@
from pydantic import BaseModel, field_validator
class TabSchema(BaseModel):
property: str = "tagprefix"
value: str = "&b$ &f"
@field_validator("property")
def validate_property(cls, v):
valid_properties = {"tagprefix", "tagsuffix", "tabprefix", "tabsuffix", "abovename", "belowname"}
if v not in valid_properties:
raise ValueError(f"Invalid property: {v}")
return v
@field_validator("value")
def validate_value_length(cls, v):
if len(v) > 30:
raise ValueError("Value length must be less than or equal to 30 characters")
return v

14
db/schemas/UUIDSchema.py Normal file
View File

@ -0,0 +1,14 @@
from pydantic import BaseModel, field_validator
import uuid
class UUIDSchema(BaseModel):
uuid: str
@field_validator('uuid')
def validate_uuid(cls, value):
try:
uuid_obj = uuid.UUID(value)
except ValueError as e:
raise ValueError("Invalid UUID format") from e
return str(uuid_obj)

7
db/schemas/UserSchema.py Normal file
View File

@ -0,0 +1,7 @@
from pydantic import PositiveInt
from .UUIDSchema import UUIDSchema
class UserSchema(UUIDSchema):
username: str
expiry: PositiveInt = 1

5
db/schemas/__init__.py Normal file
View File

@ -0,0 +1,5 @@
from .TabSchema import TabSchema
from .MySQLConfig import MySQLConfig
from .RequestSchema import RequestSchema
from .UserSchema import UserSchema
from .UUIDSchema import UUIDSchema

3
db/sql_schemas/Base.py Normal file
View File

@ -0,0 +1,3 @@
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()

View File

@ -0,0 +1,13 @@
from sqlalchemy import Column, String, Boolean
from .Base import Base
class LitebansBan(Base):
__tablename__ = 'litebans_bans'
id = Column(String, primary_key=True)
uuid = Column(String, index=True)
active = Column(Boolean)
removed_by_uuid = Column(String)
removed_by_name = Column(String)
removed_by_reason = Column(String)

View File

@ -0,0 +1,10 @@
from sqlalchemy import Column, String
from .Base import Base
class LuckpermsPlayer(Base):
__tablename__ = 'luckperms_players'
uuid = Column(String, unique=True, index=True, primary_key=True)
username = Column(String, unique=True, index=True)
primary_group = Column(String, unique=False, index=True)

View File

@ -0,0 +1,15 @@
from sqlalchemy import Column, String, Integer
from .Base import Base
class LuckpermsUserPermission(Base):
__tablename__ = 'luckperms_user_permissions'
id = Column(Integer, primary_key=True)
uuid = Column(String, index=True)
permission = Column(String)
value = Column(Integer)
server = Column(String)
world = Column(String)
expiry = Column(Integer)
contexts = Column(String)

12
db/sql_schemas/TabUser.py Normal file
View File

@ -0,0 +1,12 @@
from sqlalchemy import Column, String, Integer
from .Base import Base
class TabUser(Base):
__tablename__ = 'tab_users'
id = Column(Integer, primary_key=True)
user = Column(String, index=True)
property = Column(String)
value = Column(String)
expiry = Column(Integer)

View File

@ -0,0 +1,4 @@
from .LitebansBan import LitebansBan
from .LuckpermsPlayer import LuckpermsPlayer
from .LuckpermsUserPermission import LuckpermsUserPermission
from .TabUser import TabUser

2
main.py Normal file
View File

@ -0,0 +1,2 @@
if __name__ == 'main':
from app import app, database