love-bot/bot/database/db.py

327 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import aiosqlite
from typing import List, Optional
from datetime import date
import os
import logging
from bot.config import settings
from .models import User, Game, Streak, GameChoice
DATABASE_URL = settings.database_name
os.makedirs(os.path.dirname(DATABASE_URL), exist_ok=True)
async def init_db():
"""Инициализирует базу данных и создает таблицы, если их нет."""
async with aiosqlite.connect(DATABASE_URL) as db:
await db.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
telegram_id INTEGER UNIQUE NOT NULL,
username TEXT
)
""")
await db.execute("""
CREATE TABLE IF NOT EXISTS games (
id INTEGER PRIMARY KEY AUTOINCREMENT,
game_date DATE NOT NULL,
player1_id INTEGER NOT NULL,
player1_choice TEXT CHECK(player1_choice IN ('rock', 'scissors', 'paper')),
player2_id INTEGER NOT NULL,
player2_choice TEXT CHECK(player2_choice IN ('rock', 'scissors', 'paper')),
winner_id INTEGER,
is_finished BOOLEAN DEFAULT FALSE,
FOREIGN KEY (player1_id) REFERENCES users (id),
FOREIGN KEY (player2_id) REFERENCES users (id),
FOREIGN KEY (winner_id) REFERENCES users (id),
UNIQUE (game_date, player1_id, player2_id) -- Гарантирует одну игру в день для пары
)
""")
await db.execute("""
CREATE TABLE IF NOT EXISTS streaks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER UNIQUE NOT NULL,
current_streak INTEGER DEFAULT 0,
max_streak INTEGER DEFAULT 0,
FOREIGN KEY (user_id) REFERENCES users (id)
)
""")
await db.commit()
async def add_user(telegram_id: int, username: Optional[str]) -> Optional[User]:
"""Добавляет нового пользователя, если его еще нет. Возвращает объект User."""
async with aiosqlite.connect(DATABASE_URL) as db:
async with db.execute("SELECT id, telegram_id, username FROM users WHERE telegram_id = ?", (telegram_id,)) as cursor:
user_row = await cursor.fetchone()
if user_row:
return User(id=user_row[0], telegram_id=user_row[1], username=user_row[2])
try:
await db.execute(
"INSERT INTO users (telegram_id, username) VALUES (?, ?)",
(telegram_id, username)
)
await db.commit()
cursor = await db.execute("SELECT last_insert_rowid()")
row = await cursor.fetchone()
if not row:
logging.error(f"Could not retrieve last_insert_rowid after inserting user {telegram_id}")
return None
user_id = row[0]
await ensure_streak_record(user_id)
return User(id=user_id, telegram_id=telegram_id, username=username)
except aiosqlite.IntegrityError:
async with db.execute("SELECT id, telegram_id, username FROM users WHERE telegram_id = ?", (telegram_id,)) as cursor:
user_row = await cursor.fetchone()
if user_row:
return User(id=user_row[0], telegram_id=user_row[1], username=user_row[2])
return None
async def get_user_by_telegram_id(telegram_id: int) -> Optional[User]:
"""Ищет пользователя по его Telegram ID."""
async with aiosqlite.connect(DATABASE_URL) as db:
async with db.execute("SELECT id, telegram_id, username FROM users WHERE telegram_id = ?", (telegram_id,)) as cursor:
user_row = await cursor.fetchone()
if user_row:
return User(id=user_row[0], telegram_id=user_row[1], username=user_row[2])
return None
async def get_user_by_id(user_id: int) -> Optional[User]:
"""Ищет пользователя по его ID в базе данных."""
async with aiosqlite.connect(DATABASE_URL) as db:
async with db.execute("SELECT id, telegram_id, username FROM users WHERE id = ?", (user_id,)) as cursor:
user_row = await cursor.fetchone()
if user_row:
return User(id=user_row[0], telegram_id=user_row[1], username=user_row[2])
return None
async def get_all_users() -> List[User]:
"""Возвращает список всех зарегистрированных пользователей."""
users = []
async with aiosqlite.connect(DATABASE_URL) as db:
async with db.execute("SELECT id, telegram_id, username FROM users") as cursor:
async for row in cursor:
users.append(User(id=row[0], telegram_id=row[1], username=row[2]))
return users
# --- Функции для работы со стриками (заглушка ensure_streak_record добавлена для add_user) ---
async def ensure_streak_record(user_id: int) -> Streak:
"""
Гарантирует наличие записи о стриках для пользователя.
Если записи нет, создает ее с нулевыми значениями.
Возвращает объект Streak.
"""
async with aiosqlite.connect(DATABASE_URL) as db:
async with db.execute("SELECT id, user_id, current_streak, max_streak FROM streaks WHERE user_id = ?", (user_id,)) as cursor:
streak_row = await cursor.fetchone()
if streak_row:
return Streak(id=streak_row[0], user_id=streak_row[1], current_streak=streak_row[2], max_streak=streak_row[3])
else:
await db.execute("INSERT INTO streaks (user_id) VALUES (?)", (user_id,))
await db.commit()
async with db.execute("SELECT id, user_id, current_streak, max_streak FROM streaks WHERE user_id = ?", (user_id,)) as new_cursor:
new_streak_row = await new_cursor.fetchone()
if new_streak_row:
return Streak(id=new_streak_row[0], user_id=new_streak_row[1], current_streak=new_streak_row[2], max_streak=new_streak_row[3])
else:
raise Exception(f"Could not create or find streak record for user_id {user_id}")
# --- Функции для работы с играми ---
def _row_to_game(row: Optional[tuple]) -> Optional[Game]:
"""Вспомогательная функция для преобразования строки базы данных в объект Game."""
if row:
return Game(
id=row[0],
game_date=date.fromisoformat(row[1]),
player1_id=row[2],
player1_choice=row[3],
player2_id=row[4],
player2_choice=row[5],
winner_id=row[6],
is_finished=bool(row[7])
)
return None
async def create_or_get_today_game(player1_id: int, player2_id: int) -> Optional[Game]:
"""
Находит или создает игру для указанной пары игроков на СЕГОДНЯШНЮЮ дату.
Возвращает объект Game или None в случае ошибки.
Гарантирует, что player1_id < player2_id для уникальности записи.
"""
today = date.today()
p1, p2 = sorted((player1_id, player2_id))
async with aiosqlite.connect(DATABASE_URL) as db:
async with db.execute(
"""SELECT id, game_date, player1_id, player1_choice, player2_id, player2_choice, winner_id, is_finished
FROM games
WHERE game_date = ? AND player1_id = ? AND player2_id = ?""",
(today, p1, p2)
) as cursor:
game_row = await cursor.fetchone()
if game_row:
return _row_to_game(game_row)
try:
await db.execute(
"""INSERT INTO games (game_date, player1_id, player2_id) VALUES (?, ?, ?)""",
(today, p1, p2)
)
await db.commit()
async with db.execute(
"""SELECT id, game_date, player1_id, player1_choice, player2_id, player2_choice, winner_id, is_finished
FROM games
WHERE game_date = ? AND player1_id = ? AND player2_id = ?""",
(today, p1, p2)
) as new_cursor:
new_game_row = await new_cursor.fetchone()
return _row_to_game(new_game_row)
except aiosqlite.IntegrityError:
async with db.execute(
"""SELECT id, game_date, player1_id, player1_choice, player2_id, player2_choice, winner_id, is_finished
FROM games
WHERE game_date = ? AND player1_id = ? AND player2_id = ?""",
(today, p1, p2)
) as cursor:
game_row = await cursor.fetchone()
return _row_to_game(game_row)
except Exception as e:
print(f"Error creating/getting today's game: {e}")
return None
async def update_game_choice(game_id: int, player_id: int, choice: GameChoice) -> bool:
"""Обновляет выбор игрока в указанной игре."""
async with aiosqlite.connect(DATABASE_URL) as db:
game = await get_game_by_id(game_id)
if not game:
return False
column_to_update = None
if game.player1_id == player_id:
column_to_update = "player1_choice"
elif game.player2_id == player_id:
column_to_update = "player2_choice"
if not column_to_update:
return False
try:
await db.execute(
f"UPDATE games SET {column_to_update} = ? WHERE id = ?",
(choice, game_id)
)
await db.commit()
return True
except Exception as e:
print(f"Error updating game choice: {e}")
return False
async def get_game_by_id(game_id: int) -> Optional[Game]:
"""Получает игру по её ID."""
async with aiosqlite.connect(DATABASE_URL) as db:
async with db.execute(
"""SELECT id, game_date, player1_id, player1_choice, player2_id, player2_choice, winner_id, is_finished
FROM games
WHERE id = ?""",
(game_id,)
) as cursor:
game_row = await cursor.fetchone()
return _row_to_game(game_row)
async def finish_game(game_id: int, winner_id: Optional[int]) -> bool:
"""Отмечает игру как завершенную и указывает победителя (или None для ничьи)."""
async with aiosqlite.connect(DATABASE_URL) as db:
try:
await db.execute(
"UPDATE games SET winner_id = ?, is_finished = TRUE WHERE id = ?",
(winner_id, game_id)
)
await db.commit()
return True
except Exception as e:
print(f"Error finishing game: {e}")
return False
async def get_game_on_date(player1_id: int, player2_id: int, game_date: date) -> Optional[Game]:
"""Получает игру для пары игроков на указанную дату."""
p1, p2 = sorted((player1_id, player2_id))
async with aiosqlite.connect(DATABASE_URL) as db:
async with db.execute(
"""SELECT id, game_date, player1_id, player1_choice, player2_id, player2_choice, winner_id, is_finished
FROM games
WHERE game_date = ? AND player1_id = ? AND player2_id = ?""",
(game_date, p1, p2)
) as cursor:
game_row = await cursor.fetchone()
return _row_to_game(game_row)
# --- Функции для работы со стриками ---
async def get_streak(user_id: int) -> Optional[Streak]:
"""Получает текущую и максимальную серию побед для пользователя."""
return await ensure_streak_record(user_id)
async def update_streak(user_id: int, win: bool):
"""Обновляет серию побед пользователя."""
async with aiosqlite.connect(DATABASE_URL) as db:
current_streak_data = await get_streak(user_id)
if not current_streak_data:
print(f"Error: Streak record not found for user {user_id} during update.")
return
current_streak = current_streak_data.current_streak
max_streak = current_streak_data.max_streak
if win:
current_streak += 1
if current_streak > max_streak:
max_streak = current_streak
else:
current_streak = 0
await db.execute(
"UPDATE streaks SET current_streak = ?, max_streak = ? WHERE user_id = ?",
(current_streak, max_streak, user_id)
)
await db.commit()
# --- Функции для статистики ---
async def get_wins_count(user_id: int) -> int:
"""Возвращает общее количество побед пользователя."""
async with aiosqlite.connect(DATABASE_URL) as db:
async with db.execute("SELECT COUNT(*) FROM games WHERE winner_id = ? AND is_finished = TRUE", (user_id,)) as cursor:
result = await cursor.fetchone()
return result[0] if result else 0
async def get_total_games_count(player1_id: int, player2_id: int) -> int:
"""Возвращает общее количество сыгранных (завершенных) игр между двумя пользователями."""
p1, p2 = sorted((player1_id, player2_id))
async with aiosqlite.connect(DATABASE_URL) as db:
async with db.execute(
"SELECT COUNT(*) FROM games WHERE player1_id = ? AND player2_id = ? AND is_finished = TRUE",
(p1, p2)
) as cursor:
result = await cursor.fetchone()
return result[0] if result else 0
async def get_draw_count(player1_id: int, player2_id: int) -> int:
"""Возвращает общее количество ничьих между двумя пользователями."""
p1, p2 = sorted((player1_id, player2_id))
async with aiosqlite.connect(DATABASE_URL) as db:
async with db.execute(
"SELECT COUNT(*) FROM games WHERE player1_id = ? AND player2_id = ? AND is_finished = TRUE AND winner_id IS NULL",
(p1, p2)
) as cursor:
result = await cursor.fetchone()
return result[0] if result else 0