mts-mouse/n2_new.py

377 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import time
import heapq
class Cell:
def __init__(self, x, y):
self.x = x
self.y = y
self.walls = {'N': None, 'E': None, 'S': None, 'W': None}
self.visited = False
def get_wall_code(self):
up = self.walls['N']
right = self.walls['E']
down = self.walls['S']
left = self.walls['W']
walls = (
1 if up else 0,
1 if right else 0,
1 if down else 0,
1 if left else 0
)
walls_to_code = {
(0, 0, 0, 0): 0,
(0, 0, 0, 1): 1,
(1, 0, 0, 0): 2,
(0, 1, 0, 0): 3,
(0, 0, 1, 0): 4,
(0, 0, 1, 1): 5,
(0, 1, 1, 0): 6,
(1, 1, 0, 0): 7,
(1, 0, 0, 1): 8,
(0, 1, 0, 1): 9,
(1, 0, 1, 0): 10,
(1, 1, 1, 0): 11,
(1, 1, 0, 1): 12,
(1, 0, 1, 1): 13,
(0, 1, 1, 1): 14,
(1, 1, 1, 1): 15
}
return walls_to_code.get(walls, 0)
class Maze:
class ExitDFS(Exception):
pass
def __init__(self):
self.size = 16
self.restart()
self.grid = [[Cell(x, y) for y in range(self.size)] for x in range(self.size)]
self.cells_to_ignore = {(7, 8)}
def restart(self):
#self.grid = [[Cell(x, y) for y in range(self.size)] for x in range(self.size)]
self.start_x = 0
self.start_y = 0
def get_cell(self, x, y):
return self.grid[x][y]
def is_within_bounds(self, x, y):
return 0 <= x < self.size and 0 <= y < self.size
def to_array(self):
maze_array = []
for y in range(self.size - 1, -1, -1):
row = []
for x in range(self.size):
cell = self.get_cell(x, y)
row.append(cell.get_wall_code())
maze_array.append(row)
return maze_array
class Robot:
def __init__(self, token: str, maze: Maze):
self.token = token
self.api_url = 'http://127.0.0.1:8801/api/v1'
self.maze = maze
self.restart()
def restart(self):
self.x = self.maze.start_x
self.y = self.maze.start_y
self.orientation = 'N' # 'N', 'E', 'S', 'W'
self.visited_cells = set()
self.move_count = 0
def get_sensor_data(self):
time.sleep(0.2)
response = requests.get(f'{self.api_url}/robot-cells/sensor-data', params={'token': self.token})
return response.json()
def move_forward(self):
requests.post(f'{self.api_url}/robot-cells/forward', params={'token': self.token})
time.sleep(0.2)
self.update_position('forward')
def turn_left(self):
requests.post(f'{self.api_url}/robot-cells/left', params={'token': self.token})
time.sleep(0.2)
self.update_orientation('left')
def turn_right(self):
requests.post(f'{self.api_url}/robot-cells/right', params={'token': self.token})
time.sleep(0.2)
self.update_orientation('right')
def update_orientation(self, turn):
directions = ['N', 'E', 'S', 'W']
idx = directions.index(self.orientation)
if turn == 'left':
self.orientation = directions[(idx - 1) % 4]
elif turn == 'right':
self.orientation = directions[(idx + 1) % 4]
def update_position(self, move):
dx, dy = 0, 0
if self.orientation == 'N':
dy = 1 if move == 'forward' else -1
elif self.orientation == 'E':
dx = 1 if move == 'forward' else -1
elif self.orientation == 'S':
dy = -1 if move == 'forward' else 1
elif self.orientation == 'W':
dx = -1 if move == 'forward' else 1
self.x += dx
self.y += dy
self.move_count += 1
def explore(self):
try:
self._dfs(self.x, self.y)
except self.maze.ExitDFS:
pass
def _dfs(self, x, y):
if (x, y) in self.maze.cells_to_ignore:
return
cell = self.maze.get_cell(x, y)
if cell.visited:
return
cell.visited = True
self.visited_cells.add((x, y))
sensor_data = self.get_sensor_data()
threshold = 100
walls = {}
walls[self.orientation_to_dir('N')] = sensor_data['front_distance'] < threshold
walls[self.orientation_to_dir('E')] = sensor_data['right_side_distance'] < threshold
walls[self.orientation_to_dir('W')] = sensor_data['left_side_distance'] < threshold
walls[self.orientation_to_dir('S')] = sensor_data['back_distance'] < threshold
cell.walls.update(walls)
possible_directions = []
for direction in ['N', 'E', 'S', 'W']:
if not cell.walls[direction]:
nx, ny = self.get_next_position(x, y, direction)
if self.maze.is_within_bounds(nx, ny) and (nx, ny) not in self.visited_cells:
if (nx, ny) not in self.maze.cells_to_ignore:
possible_directions.append(direction)
for direction in possible_directions:
nx, ny = self.get_next_position(x, y, direction)
self.move_to(direction)
self._dfs(nx, ny)
self.move_to(self.opposite_direction(direction))
def get_next_position(self, x, y, direction):
dx, dy = 0, 0
if direction == 'N':
dy = 1
elif direction == 'E':
dx = 1
elif direction == 'S':
dy = -1
elif direction == 'W':
dx = -1
return x + dx, y + dy
def move_to(self, direction):
turns = self.calculate_turns(self.orientation, direction)
for turn in turns:
if turn == 'left':
self.turn_left()
elif turn == 'right':
self.turn_right()
self.move_forward()
def calculate_turns(self, current_orientation, target_orientation):
directions = ['N', 'E', 'S', 'W']
idx_current = directions.index(current_orientation)
idx_target = directions.index(target_orientation)
delta = (idx_target - idx_current) % 4
if delta == 0:
return []
elif delta == 1:
return ['right']
elif delta == 2:
return ['right', 'right']
elif delta == 3:
return ['left']
def opposite_direction(self, direction):
opposites = {'N': 'S', 'E': 'W', 'S': 'N', 'W': 'E'}
return opposites[direction]
def orientation_to_dir(self, relative_direction):
directions = ['N', 'E', 'S', 'W']
idx = directions.index(self.orientation)
if relative_direction == 'N':
return directions[idx]
elif relative_direction == 'E':
return directions[(idx + 1) % 4]
elif relative_direction == 'S':
return directions[(idx + 2) % 4]
elif relative_direction == 'W':
return directions[(idx - 1) % 4]
def restart_maze(self):
response = requests.post(f"{self.api_url}/maze/restart", params={'token': self.token})
time.sleep(0.5)
if response.status_code == 200:
print("Лабиринт перезапущен.")
else:
print(f"Ошибка при перезапуске лабиринта: {response.text}")
def find_shortest_path_to_target(self, target_x, target_y):
start_state = (self.x, self.y, self.orientation)
queue = []
heapq.heappush(queue, (0, 0, start_state, []))
visited = set()
while queue:
f, g, (x, y, orientation), path = heapq.heappop(queue)
if (x, y, orientation) in visited:
continue
visited.add((x, y, orientation))
if (x, y) == (target_x, target_y):
print(f"Найден путь до цели ({target_x}, {target_y})")
return path
cell = self.maze.get_cell(x, y)
for action in ['forward', 'left_forward', 'right_forward']:
if action == 'forward':
next_orientation = orientation
dx, dy = self.get_direction_delta(orientation)
nx, ny = x + dx, y + dy
action_cost = 1
elif action == 'left_forward':
next_orientation = self.turn_orientation(orientation, 'left')
dx, dy = self.get_direction_delta(next_orientation)
nx, ny = x + dx, y + dy
action_cost = 2
elif action == 'right_forward':
next_orientation = self.turn_orientation(orientation, 'right')
dx, dy = self.get_direction_delta(next_orientation)
nx, ny = x + dx, y + dy
action_cost = 2
if not self.maze.is_within_bounds(nx, ny):
continue
next_cell = self.maze.get_cell(nx, ny)
wall = cell.walls.get(next_orientation)
if wall is None or wall:
continue
h = self.distance_to_target(nx, ny, target_x, target_y)
heapq.heappush(queue, (
g + action_cost + h,
g + action_cost,
(nx, ny, next_orientation),
path + [(action, next_orientation)]
))
print(f"Путь до цели ({target_x}, {target_y}) не найден.")
return None
def execute_path(self, path):
for action, orientation in path:
if action == 'forward':
self.move_forward()
elif action == 'left_forward':
self.turn_left()
self.move_forward()
elif action == 'right_forward':
self.turn_right()
self.move_forward()
self.orientation = orientation
print(f"Робот прибыл к цели на позиции ({self.x}, {self.y})")
def get_direction_delta(self, orientation):
if orientation == 'N':
return 0, 1
elif orientation == 'E':
return 1, 0
elif orientation == 'S':
return 0, -1
elif orientation == 'W':
return -1, 0
def turn_orientation(self, current_orientation, turn):
directions = ['N', 'E', 'S', 'W']
idx = directions.index(current_orientation)
if turn == 'left':
return directions[(idx - 1) % 4]
elif turn == 'right':
return directions[(idx + 1) % 4]
def distance_to_target(self, x, y, target_x, target_y):
return abs(x - target_x) + abs(y - target_y)
def restart(robot: Robot, maze: Maze):
robot.restart()
robot.restart_maze()
maze.restart()
def start_once(robot: Robot, matrix_check: bool=True, score_check: bool=True):
robot.explore()
print_results(robot=robot, matrix_check=matrix_check, score_check=score_check)
restart(robot, robot.maze)
path = robot.find_shortest_path_to_target(7, 8)
if path:
robot.execute_path(path)
else:
print("Не удалось найти путь до клетки (7, 8).")
def print_results(robot: Robot, matrix_check: bool=False, score_check: bool=True):
maze_array = robot.maze.to_array()
if matrix_check:
for row in maze_array:
print(row)
if not score_check:
return
response = requests.post(
f'{robot.api_url}/matrix/send',
params={'token': robot.token},
json=maze_array
)
if response.status_code == 200:
response_data = response.json()
score = response_data.get('Score')
if score is not None:
print('Отправка матрицы завершена' + f', Score: {score}')
else:
print('Отправка матрицы завершена, но Score отсутствует в ответе:', response_data)
else:
print('Ошибка при отправке матрицы, статус код:', response.status_code)
def main(num_att=1):
token = 'token'
maze = Maze()
robot = Robot(token=token, maze=maze)
times = time.time()
for i in range(num_att):
start_time = time.time()
print(f'Попытка {i + 1}')
start_once(robot=robot)
print(f'Время {time.time() - start_time} секунд')
print(f'Общее время {time.time() - times} секунд (лимит - {60 * 15})')
if __name__ == '__main__':
main(num_att=1)