import numpy as np import matplotlib.pyplot as plt from matplotlib.animation import FuncAnimation import gymnasium as gym from gymnasium import spaces from typing import Any, Dict, Optional, Tuple class WheeledRobotEnv(gym.Env): """ Base gymnasium environment for wheeled robot simulation. This environment provides a foundation for simulating various types of wheeled robots in a 2D plane. The robot state consists of (x, y, theta) representing position and orientation. The environment supports different rendering modes and maintains a history of robot positions for visualization. Attributes: dt (float): Time step for simulation (default: 0.05) max_steps (int): Maximum number of simulation steps (default: 500) state (np.ndarray): Current robot state [x, y, theta] history (list): List of historical states for trajectory visualization observation_space (spaces.Box): Valid observation space bounds action_space (spaces.Space): Valid action space (defined by subclasses) render_mode (str): Rendering mode ("human", "rgb_array", or None) The coordinate system uses: - x: horizontal position - y: vertical position - theta: orientation angle in radians """ metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 20} def __init__(self, render_mode: Optional[str] = None, random_target: bool = True, target_position: Optional[np.ndarray] = None, velocity_tolerance: float = 0.05, speed_limit_enabled: bool = False): super().__init__() self.dt = 0.05 self.max_steps = 400 self.current_step = 0 self.state = np.array([0.0, 0.0, 0.0], dtype=np.float32) self.state_derivative = np.array([0.0, 0.0, 0.0], dtype=np.float32) self.history = [] self.time_history = [] # Target position configuration self.random_target = random_target self.fixed_target = target_position if target_position is not None else np.array([10.0, 5.0, np.pi], dtype=np.float32) self.target_position = np.array([0.0, 0.0, 0.0], dtype=np.float32) # Target reaching parameters self.position_tolerance = 0.2 # meters self.orientation_tolerance = 0.1 # radians self.velocity_tolerance = velocity_tolerance # units/step for "stopped" detection self.target_reached_reward = 100.0 # Speed limiting parameters self.speed_limit_enabled = speed_limit_enabled self._wheel_speed_limits = None # To be set by subclasses # Expanded observation space: [x, y, theta, dx, dy, dtheta, time, target_x, target_y, target_theta] self.observation_space = spaces.Box( low=np.array([-15.0, -10.0, -np.pi, -15.0, -10.0, -10.0, 0.0, -15.0, -10.0, -np.pi], dtype=np.float32), high=np.array([15.0, 10.0, np.pi, 15.0, 10.0, 10.0, 1000.0, 15.0, 10.0, np.pi], dtype=np.float32), dtype=np.float32 ) self.action_space = self._define_action_space() self.render_mode = render_mode self.h_fig = None self.h_ax = None # Default render bounds self.render_xlim = (-5, 5) self.render_ylim = (-2, 8) self.F = self._F() def _apply_wheel_speed_limits(self, wheel_speeds: np.ndarray) -> np.ndarray: """Apply wheel speed limits if enabled""" if not self.speed_limit_enabled or self._wheel_speed_limits is None: return wheel_speeds # Apply symmetric limits to each wheel limited_speeds = np.clip(wheel_speeds.flatten(), -self._wheel_speed_limits, self._wheel_speed_limits) return limited_speeds.reshape(-1, 1) def _define_action_space(self): return spaces.Box(low=-5.0, high=5.0, shape=(3,), dtype=np.float32) @staticmethod def _F(): return np.eye(3) @staticmethod def _R(theta): return np.array([[np.cos(theta), -np.sin(theta), 0.0], [np.sin(theta), np.cos(theta), 0.0], [0, 0, 1]]) def _generate_target_position(self) -> np.ndarray: """Generate a new target position""" if self.random_target: # Generate random target within bounds, avoiding robot start position x = np.random.uniform(-10.0, 10.0) y = np.random.uniform(-5.0, 8.0) theta = np.random.uniform(0, 2*np.pi) # Ensure target is not too close to start position (0,0,0) while np.sqrt(x**2 + y**2) < 2.0: x = np.random.uniform(-10.0, 10.0) y = np.random.uniform(-5.0, 8.0) return np.array([x, y, theta], dtype=np.float32) else: return self.fixed_target.copy() def _is_target_reached(self) -> bool: """Check if robot has reached the target within tolerance AND is stopped""" pos_error = np.abs(self.state[:2] - self.target_position[:2]) angle_error = np.abs(np.mod(self.state[2] - self.target_position[2] + np.pi, 2*np.pi) - np.pi) robot_velocity = np.linalg.norm(self.state_derivative) return bool((pos_error[0] < self.position_tolerance) and (pos_error[1] < self.position_tolerance) and (angle_error < self.orientation_tolerance) and (robot_velocity < self.velocity_tolerance)) def reset(self, *, seed: Optional[int] = None, options: Optional[Dict[str, Any]] = None) -> Tuple[np.ndarray, Dict[str, Any]]: if seed is not None: np.random.seed(seed) self.state = np.array([0.0, 0.0, 0.0], dtype=np.float32) self.state_derivative = np.array([0.0, 0.0, 0.0], dtype=np.float32) self.current_step = 0 self.history = [self.state.copy()] self.time_history = [0.0] # Generate new target position self.target_position = self._generate_target_position() # Include target in observation: [state, state_derivative, time, target] observation = np.concatenate([self.state, self.state_derivative, [self.current_step * self.dt], self.target_position]) return observation.astype(np.float32), {} def step(self, action: np.ndarray) -> Tuple[np.ndarray, float, bool, bool, Dict[str, Any]]: # if isinstance(self.action_space, spaces.Box): # action = np.clip(action, self.action_space.low, self.action_space.high) U = action.reshape(-1, 1) X = self.state.reshape(-1, 1) # Apply wheel speed limits before kinematic conversion U = self._apply_wheel_speed_limits(U) # U represents wheel velocities from controllers # F converts wheel velocities to robot velocities robot_velocities = self.F @ U Xp = X + self._R(X[2, 0]) @ robot_velocities * self.dt previous_state = self.state.copy() self.state = Xp.flatten().astype(np.float32) self.state[2] = np.mod(self.state[2], 2*np.pi) # Wrap theta to [0, 2π] self.state_derivative = ((self.state - previous_state) / self.dt).astype(np.float32) self.current_step += 1 self.history.append(self.state.copy()) self.time_history.append(self.current_step * self.dt) reward = self._compute_reward(action) terminated = self._is_terminated() truncated = self.current_step >= self.max_steps observation = np.concatenate([self.state, self.state_derivative, [self.current_step * self.dt], self.target_position]) return observation.astype(np.float32), reward, terminated, truncated, {} def _compute_reward(self, action: np.ndarray) -> float: # Base action penalty base_reward = -0.01 * np.sum(action**2) # Check if target is reached position_error = np.linalg.norm(self.state[:2] - self.target_position[:2]) orientation_error = abs(self.state[2] - self.target_position[2]) # Handle angle wrapping for orientation error orientation_error = min(orientation_error, 2*np.pi - orientation_error) # Check if robot is both at target AND stopped robot_velocity = np.linalg.norm(self.state_derivative) # Target reached bonus (requires position, orientation AND velocity criteria) if (position_error < self.position_tolerance and orientation_error < self.orientation_tolerance and robot_velocity < self.velocity_tolerance): return base_reward + self.target_reached_reward return base_reward def _is_terminated(self) -> bool: return self._is_target_reached() def set_render_bounds(self, xlim: Tuple[float, float], ylim: Tuple[float, float]) -> None: """ Set the coordinate bounds for rendering. Args: xlim: Tuple of (x_min, x_max) for x-axis limits ylim: Tuple of (y_min, y_max) for y-axis limits """ self.render_xlim = xlim self.render_ylim = ylim def render(self): if self.render_mode == "human": self._render_human() elif self.render_mode == "rgb_array": return self._render_rgb_array() def _render_human(self): if self.h_fig is None: self._create_figure() if len(self.history) > 1 and self.h_ax is not None: history_array = np.array(self.history) time_array = np.array(self.time_history) current_time = self.current_step * self.dt # Clear all subplots for ax in self.h_ax: ax.clear() # Main trajectory plot (left panel) self.h_ax[0].plot(history_array[:, 0], history_array[:, 1], 'b-', alpha=0.7, label='Trajectory') self.h_ax[0].plot(self.state[0], self.state[1], 'ro', markersize=8, label='Current Position') # Draw target position self.h_ax[0].plot(self.target_position[0], self.target_position[1], 'gs', markersize=10, label='Target Position') target_tip_points = self._get_robot_axis_points(self.target_position) self.h_ax[0].plot(target_tip_points[0, :2], target_tip_points[1, :2], 'g--', linewidth=1, alpha=0.7) self.h_ax[0].plot(target_tip_points[0, [0, 2]], target_tip_points[1, [0, 2]], 'g--', linewidth=1, alpha=0.7) tip_points = self._get_robot_axis_points(self.state) self.h_ax[0].plot(tip_points[0, :2], tip_points[1, :2], 'r-', linewidth=2, label='X-axis') self.h_ax[0].plot(tip_points[0, [0, 2]], tip_points[1, [0, 2]], 'g-', linewidth=2, label='Y-axis') self.h_ax[0].set_xlim(*self.render_xlim) self.h_ax[0].set_ylim(*self.render_ylim) self.h_ax[0].set_aspect('equal', adjustable='box') self.h_ax[0].grid(True) self.h_ax[0].set_xlabel('x [m]') self.h_ax[0].set_ylabel('y [m]') self.h_ax[0].set_title(f'Robot Position (Time: {current_time:04.2f}s -- Step: {self.current_step:03d})') # Time-series plots (right panels) labels = ['x [m]', 'y [m]', 'θ [rad]'] for i, (ax, label) in enumerate(zip(self.h_ax[1:], labels)): ax.plot(time_array, history_array[:, i], 'k-', linewidth=1) ax.plot(current_time, self.state[i], 'ro', markersize=6) ax.set_xlabel('Time [s]') ax.set_ylabel(label) ax.grid(True) ax.set_xlim(0, max(current_time + 1, 5)) # Fixed y-axis ranges for time-series plots if i == 0: # x position ax.set_ylim(*self.render_xlim) # Use same range as main plot elif i == 1: # y position ax.set_ylim(*self.render_ylim) # Use same range as main plot elif i == 2: # theta angle ax.set_ylim(0, 2*np.pi) # Fixed range for full rotation plt.tight_layout() plt.pause(self.dt) def _render_rgb_array(self): return None def _create_figure(self): self.h_fig = plt.figure(figsize=(10, 5.5)) # Create multi-panel layout matching original self.h_ax = [ self.h_fig.add_subplot(1, 2, 1), # Main trajectory plot self.h_fig.add_subplot(3, 2, 2), # x vs time self.h_fig.add_subplot(3, 2, 4), # y vs time self.h_fig.add_subplot(3, 2, 6) # theta vs time ] # Configure main trajectory plot self.h_ax[0].set_xlabel('x [m]') self.h_ax[0].set_ylabel('y [m]') self.h_ax[0].set_title('Wheeled Robot') self.h_ax[0].axis('equal') self.h_ax[0].grid(True) self.h_ax[0].set_xlim(*self.render_xlim) self.h_ax[0].set_ylim(*self.render_ylim) # Configure time-series plots labels = ['x [m]', 'y [m]', 'θ [rad]'] for i, (ax, label) in enumerate(zip(self.h_ax[1:], labels)): ax.set_xlabel('Time [s]') ax.set_ylabel(label) ax.grid(True) plt.tight_layout() plt.ion() def _get_robot_axis_points(self, state): tip_points = np.zeros((2, 3), dtype=float) tip_points[:, 0] = state[:2] Rxy = self._R(state[2])[:2, :2] tip_points[:, [1]] = Rxy @ np.array([[0.7], [0]]) + tip_points[:, [0]] tip_points[:, [2]] = Rxy @ np.array([[0], [0.7]]) + tip_points[:, [0]] return tip_points def close(self): if self.h_fig is not None: plt.close(self.h_fig) self.h_fig = None self.h_ax = None class OmniRobotEnv(WheeledRobotEnv): """ Omnidirectional robot environment using three-wheel omnidirectional drive. This environment simulates a robot with three omnidirectional wheels arranged at 120-degree intervals. The robot can move in any direction and rotate simultaneously without changing orientation first. The action space consists of 3 wheel velocities, which are converted to robot motion through the kinematic transformation matrix F. Wheel configuration: - Wheel 1: 0° (front) - Wheel 2: 120° (rear left) - Wheel 3: 240° (rear right) - All wheels at distance l=0.5 from center - Wheel radius r=0.1 Action space: Box(3,) representing wheel velocities [-100.0, 100.0] """ def __init__(self, render_mode: Optional[str] = None, random_target: bool = True, target_position: Optional[np.ndarray] = None, velocity_tolerance: float = 0.05, speed_limit_enabled: bool = False): super().__init__(render_mode, random_target, target_position, velocity_tolerance, speed_limit_enabled) self._wheel_speed_limits = np.array([70.0, 70.0, 70.0]) def _define_action_space(self): return spaces.Box(low=-100.0, high=100.0, shape=(3,), dtype=np.float32) @staticmethod def _F(): a1 = 0.0; b1 = 0.0; l1 = 0.5 a2 = (2/3)*np.pi; b2 = 0.0; l2 = 0.5 a3 = (4/3)*np.pi; b3 = 0.0; l3 = 0.5 r = 0.1 J = np.array([ [1/r*np.sin(a1+b1), -1/r*np.cos(a1+b1), -l1/r*np.cos(b1)], [1/r*np.sin(a2+b2), -1/r*np.cos(a2+b2), -l2/r*np.cos(b2)], [1/r*np.sin(a3+b3), -1/r*np.cos(a3+b3), -l3/r*np.cos(b3)] ]) F = np.linalg.inv(J) return F class MecanumRobotEnv(WheeledRobotEnv): """ Mecanum wheel robot environment using four-wheel mecanum drive. This environment simulates a robot with four mecanum wheels arranged in a rectangular configuration. Each wheel has rollers at 45° angles, allowing the robot to move in any direction and rotate simultaneously. The action space consists of 4 wheel velocities for the mecanum wheels, which are converted to robot motion through the kinematic transformation matrix F using the Moore-Penrose pseudoinverse. Wheel configuration: - Front-left wheel: position (+0.5, +0.5), roller direction (+1, +1)/√2 - Front-right wheel: position (-0.5, +0.5), roller direction (-1, +1)/√2 - Rear-left wheel: position (-0.5, -0.5), roller direction (-1, -1)/√2 - Rear-right wheel: position (+0.5, -0.5), roller direction (+1, -1)/√2 - All wheels have radius r=0.1 Action space: Box(4,) representing wheel velocities [-160.0, 160.0] """ def __init__(self, render_mode: Optional[str] = None, random_target: bool = True, target_position: Optional[np.ndarray] = None, velocity_tolerance: float = 0.05, speed_limit_enabled: bool = False): super().__init__(render_mode, random_target, target_position, velocity_tolerance, speed_limit_enabled) self._wheel_speed_limits = np.array([40.0, 40.0, 40.0, 40.0]) def _define_action_space(self): return spaces.Box(low=-160.0, high=160.0, shape=(4,), dtype=np.float32) @staticmethod def _F(): l1 = np.array([+0.5, +0.5]); f1 = np.array([+1, +1])/np.sqrt(2); d1 = np.array([+1, 0]) l2 = np.array([-0.5, +0.5]); f2 = np.array([-1, +1])/np.sqrt(2); d2 = np.array([+1, 0]) l3 = np.array([-0.5, -0.5]); f3 = np.array([-1, -1])/np.sqrt(2); d3 = np.array([-1, 0]) l4 = np.array([+0.5, -0.5]); f4 = np.array([+1, -1])/np.sqrt(2); d4 = np.array([-1, 0]) r = 0.1 J11 = f1[1]/(r*d1[0]*f1[1]-r*d1[1]*f1[0]) J21 = f2[1]/(r*d2[0]*f2[1]-r*d2[1]*f2[0]) J31 = f3[1]/(r*d3[0]*f3[1]-r*d3[1]*f3[0]) J41 = f4[1]/(r*d4[0]*f4[1]-r*d4[1]*f4[0]) J12 = -f1[0]/(r*d1[0]*f1[1]-r*d1[1]*f1[0]) J22 = -f2[0]/(r*d2[0]*f2[1]-r*d2[1]*f2[0]) J32 = -f3[0]/(r*d3[0]*f3[1]-r*d3[1]*f3[0]) J42 = -f4[0]/(r*d4[0]*f4[1]-r*d4[1]*f4[0]) J13 = (-f1[1]*l1[1]-f1[0]*l1[0])/(r*d1[0]*f1[1]-r*d1[1]*f1[0]) J23 = (-f2[1]*l2[1]-f2[0]*l2[0])/(r*d2[0]*f2[1]-r*d2[1]*f2[0]) J33 = (-f3[1]*l3[1]-f3[0]*l3[0])/(r*d3[0]*f3[1]-r*d3[1]*f3[0]) J43 = (-f4[1]*l4[1]-f4[0]*l4[0])/(r*d4[0]*f4[1]-r*d4[1]*f4[0]) J = np.array([[J11, J12, J13], [J21, J22, J23], [J31, J32, J33], [J41, J42, J43]]) F = np.linalg.pinv(J) return F class TankRobotEnv(WheeledRobotEnv): """ Tank-drive (differential drive) robot environment using two wheels. This environment simulates a robot with two independently controlled wheels on opposite sides. The robot moves forward/backward when both wheels rotate in the same direction, and turns when wheels rotate at different speeds. The robot cannot move sideways and must rotate to change direction. The action space consists of 2 wheel velocities, which are converted to robot motion through the kinematic transformation matrix F using the Moore-Penrose pseudoinverse. Wheel configuration: - Left wheel: contributes to forward motion and rotation - Right wheel: contributes to forward motion and rotation - Wheel base b=1.0 (distance between wheels) - Wheel radius r=0.1 Action space: Box(2,) representing left and right wheel velocities [-100.0, 100.0] """ def __init__(self, render_mode: Optional[str] = None, random_target: bool = True, target_position: Optional[np.ndarray] = None, velocity_tolerance: float = 0.05, speed_limit_enabled: bool = False): super().__init__(render_mode, random_target, target_position, velocity_tolerance, speed_limit_enabled) self._wheel_speed_limits = np.array([50.0, 50.0]) def _define_action_space(self): return spaces.Box(low=-100.0, high=100.0, shape=(2,), dtype=np.float32) @staticmethod def _F(): b = 1 r = 0.1 J = np.array([ [+1/r, 0, -b/(2*r)], [-1/r, 0, -b/(2*r)] ]) F = np.linalg.pinv(J) return F class AckermanRobotEnv(WheeledRobotEnv): """ Ackermann steering robot environment using bicycle kinematics. This environment simulates a car-like robot with Ackermann steering geometry. The robot has a front steering wheel and rear driving wheel, similar to a bicycle model. It moves forward/backward and steers by changing the front wheel angle. Unlike other robot types, this class overrides the step() method to implement bicycle kinematics instead of using the standard F matrix approach. Vehicle parameters: - Wheel radius r=0.1 - Uses bicycle kinematics model - Steering angle affects turning radius Action space: Box(2,) with: - action[0]: driving velocity [-5.0, 5.0] - action[1]: steering angle [-1.0, 1.0] radians The robot cannot move sideways and follows curved paths based on the steering angle and forward velocity. """ def __init__(self, render_mode: Optional[str] = None, random_target: bool = True, target_position: Optional[np.ndarray] = None, velocity_tolerance: float = 0.05, speed_limit_enabled: bool = False): super().__init__(render_mode, random_target, target_position, velocity_tolerance, speed_limit_enabled) # Set hardcoded limits with 10% margin above max observed (velocity: 1.3, steering: 0.6) self._wheel_speed_limits = np.array([4, 2]) def _define_action_space(self): return spaces.Box(low=np.array([-10.0, -1.0]), high=np.array([10.0, 1.0]), dtype=np.float32) def step(self, action: np.ndarray) -> Tuple[np.ndarray, float, bool, bool, Dict[str, Any]]: # if isinstance(self.action_space, spaces.Box): # action = np.clip(action, self.action_space.low, self.action_space.high) # Apply speed limits for Ackerman robot (velocity, steering angle) action = self._apply_wheel_speed_limits(action.reshape(-1, 1)).flatten() r = 0.1 dU = action.reshape(-1, 1) X = self.state.reshape(-1, 1) if dU[1, 0] != 0: dX = np.array([[r*dU[0,0]], [0], [r*dU[0,0]/dU[1,0]]]) else: dX = np.array([[r*dU[0,0]], [0], [0]]) Xp = X + self._R(X[2, 0]) @ dX * self.dt previous_state = self.state.copy() self.state = Xp.flatten().astype(np.float32) self.state[2] = np.mod(self.state[2], 2*np.pi) # Wrap theta to [0, 2π] self.state_derivative = ((self.state - previous_state) / self.dt).astype(np.float32) self.current_step += 1 self.history.append(self.state.copy()) self.time_history.append(self.current_step * self.dt) reward = self._compute_reward(action) terminated = self._is_terminated() truncated = self.current_step >= self.max_steps observation = np.concatenate([self.state, self.state_derivative, [self.current_step * self.dt], self.target_position]) return observation.astype(np.float32), reward, terminated, truncated, {}