Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dimos/robot/global_planner/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class Planner(Visualizable):
@abstractmethod
def plan(self, goal: VectorLike) -> Path: ...

def set_goal(self, goal: VectorLike, goal_theta: float = 0.0, stop_event: Optional[threading.Event] = None):
def set_goal(self, goal: VectorLike, goal_theta: Optional[float] = None, stop_event: Optional[threading.Event] = None):
goal = to_vector(goal).to_2d()
path = self.plan(goal)
if not path:
Expand Down
40 changes: 6 additions & 34 deletions dimos/robot/local_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
visualize_local_planner_state
)

from dimos.types.vector import VectorLike, to_tuple
from dimos.types.vector import VectorLike, Vector, to_tuple
from dimos.types.path import Path
from nav_msgs.msg import OccupancyGrid

Expand Down Expand Up @@ -97,35 +97,6 @@ def __init__(self,
# topics
self.local_costmap = self.robot.ros_control.topic_latest("/local_costmap/costmap", OccupancyGrid)

def _transform_orientation_to_odom(self, orientation: float, source_frame: str) -> float:
"""Transform an orientation angle from source frame to odom frame.

Args:
orientation: Orientation angle in radians (in source frame)
source_frame: The source frame of the orientation

Returns:
float: Transformed orientation in odom frame, or original if transform fails
"""
# If already in odom frame, no need to transform
if source_frame == "odom":
return orientation

try:
# Use existing transform_euler_rot to get the rotation offset between frames
rot_offset = self.robot.ros_control.transform_euler_rot(source_frame, "odom")
if rot_offset is not None:
# Add the yaw component of the offset to transform the orientation
transformed_orientation = normalize_angle(orientation + rot_offset[2])
logger.info(f"Orientation transformed from {source_frame} to odom: {transformed_orientation:.2f} rad")
return transformed_orientation
else:
logger.warning(f"Could not transform orientation from {source_frame} to odom frame. Using as is.")
return orientation
except Exception as e:
logger.warning(f"Error transforming orientation: {e}. Using orientation as is.")
return orientation

def set_goal(self, goal_xy: VectorLike, frame: str = "odom", goal_theta: Optional[float] = None):
"""Set a single goal position, converting to odom frame if necessary.
This clears any existing waypoints being followed.
Expand Down Expand Up @@ -158,8 +129,8 @@ def set_goal(self, goal_xy: VectorLike, frame: str = "odom", goal_theta: Optiona

# Set goal orientation if provided
if goal_theta is not None:
self.goal_theta = self._transform_orientation_to_odom(goal_theta, frame)
logger.info(f"Goal orientation set in odom frame: {self.goal_theta:.2f} rad")
transformed_rot = self.robot.ros_control.transform_rot(Vector(0.0, 0.0, goal_theta), source_frame=frame, target_frame="odom")
self.goal_theta = transformed_rot[2]

def set_goal_waypoints(self, waypoints: Path, frame: str = "map", goal_theta: Optional[float] = None):
"""Sets a path of waypoints for the robot to follow.
Expand Down Expand Up @@ -201,8 +172,8 @@ def set_goal_waypoints(self, waypoints: Path, frame: str = "map", goal_theta: Op

# Set goal orientation if provided
if goal_theta is not None:
self.goal_theta = self._transform_orientation_to_odom(goal_theta, frame)
logger.info(f"Final waypoint orientation set in odom frame: {self.goal_theta:.2f} rad")
transformed_rot = self.robot.ros_control.transform_rot(Vector(0.0, 0.0, goal_theta), source_frame=frame, target_frame="odom")
self.goal_theta = transformed_rot[2]

def _update_waypoint_target(self, robot_pos_np: np.ndarray) -> bool:
"""Helper function to manage waypoint progression and update the target goal.
Expand Down Expand Up @@ -463,6 +434,7 @@ def update_visualization(self) -> np.ndarray:
grid_origin=grid_origin,
robot_pose=robot_pose,
goal_xy=goal_xy, # Current target (lookahead or final)
goal_theta=self.goal_theta, # Pass goal orientation if available
visualization_size=self.visualization_size,
robot_width=self.robot_width,
robot_length=self.robot_length,
Expand Down
66 changes: 56 additions & 10 deletions dimos/robot/ros_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,23 +53,23 @@ def tf_buffer(self) -> Buffer:

return self._tf_buffer

def transform_euler_pos(self, child_frame: str, parent_frame: str = "map", timeout: float = 1.0):
return to_euler_pos(self.transform(child_frame, parent_frame, timeout))
def transform_euler_pos(self, source_frame: str, target_frame: str = "map", timeout: float = 1.0):
return to_euler_pos(self.transform(source_frame, target_frame, timeout))

def transform_euler_rot(self, child_frame: str, parent_frame: str = "map", timeout: float = 1.0):
return to_euler_rot(self.transform(child_frame, parent_frame, timeout))
def transform_euler_rot(self, source_frame: str, target_frame: str = "map", timeout: float = 1.0):
return to_euler_rot(self.transform(source_frame, target_frame, timeout))

def transform_euler(self, child_frame: str, parent_frame: str = "map", timeout: float = 1.0):
res = self.transform(child_frame, parent_frame, timeout)
def transform_euler(self, source_frame: str, target_frame: str = "map", timeout: float = 1.0):
res = self.transform(source_frame, target_frame, timeout)
return to_euler(res)

def transform(
self, child_frame: str, parent_frame: str = "map", timeout: float = 1.0
self, source_frame: str, target_frame: str = "map", timeout: float = 1.0
) -> Optional[TransformStamped]:
try:
transform = self.tf_buffer.lookup_transform(
parent_frame,
child_frame,
target_frame,
source_frame,
rclpy.time.Time(),
rclpy.duration.Duration(seconds=timeout),
)
Expand All @@ -83,7 +83,7 @@ def transform(
return None

def transform_point(self, point: Vector, source_frame: str, target_frame: str = "map", timeout: float = 1.0):
"""Transform a point from child_frame to parent_frame.
"""Transform a point from source_frame to target_frame.

Args:
point: The point to transform (x, y, z)
Expand Down Expand Up @@ -119,6 +119,8 @@ def transform_point(self, point: Vector, source_frame: str, target_frame: str =
except (tf2_ros.LookupException, tf2_ros.ConnectivityException, tf2_ros.ExtrapolationException) as e:
logger.error(f"Transform from {source_frame} to {target_frame} failed: {e}")
return None



def transform_path(self, path: Path, source_frame: str, target_frame: str = "map", timeout: float = 1.0):
"""Transform a path from source_frame to target_frame.
Expand All @@ -138,3 +140,47 @@ def transform_path(self, path: Path, source_frame: str, target_frame: str = "map
if transformed_point is not None:
transformed_path.append(transformed_point)
return transformed_path

def transform_rot(self, rotation: Vector, source_frame: str, target_frame: str = "map", timeout: float = 1.0):
"""Transform a rotation from source_frame to target_frame.

Args:
rotation: The rotation to transform as Euler angles (x, y, z) in radians
source_frame: The source frame of the rotation
target_frame: The target frame to transform to
timeout: Time to wait for the transform to become available (seconds)

Returns:
The transformed rotation as a Vector of Euler angles (x, y, z), or None if the transform failed
"""
try:
# Wait for transform to become available
self.tf_buffer.can_transform(
target_frame, source_frame, rclpy.time.Time(), rclpy.duration.Duration(seconds=timeout)
)

# Create a rotation matrix from the input Euler angles
input_rotation = R.from_euler('xyz', rotation, degrees=False)

# Get the transform from source to target frame
transform = self.transform(source_frame, target_frame, timeout)
if transform is None:
return None

# Extract the rotation from the transform
q = transform.transform.rotation
transform_rotation = R.from_quat([q.x, q.y, q.z, q.w])

# Compose the rotations
# The resulting rotation is the composition of the transform rotation and input rotation
result_rotation = transform_rotation * input_rotation

# Convert back to Euler angles
euler_angles = result_rotation.as_euler('xyz', degrees=False)

# Return as Vector type
return Vector(euler_angles)

except (tf2_ros.LookupException, tf2_ros.ConnectivityException, tf2_ros.ExtrapolationException) as e:
logger.error(f"Transform rotation from {source_frame} to {target_frame} failed: {e}")
return None
29 changes: 29 additions & 0 deletions dimos/utils/ros_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def visualize_local_planner_state(
robot_length: float = 0.7,
map_size_meters: float = 10.0,
goal_xy: Optional[Tuple[float, float]] = None,
goal_theta: Optional[float] = None,
histogram: Optional[np.ndarray] = None,
selected_direction: Optional[float] = None,
waypoints: Optional['Path'] = None,
Expand All @@ -95,6 +96,7 @@ def visualize_local_planner_state(
robot_length: Length of the robot in meters
map_size_meters: Size of the map to visualize in meters
goal_xy: Optional tuple (x, y) of the goal position in the odom frame
goal_theta: Optional goal orientation in radians (in odom frame)
histogram: Optional numpy array of the VFH histogram
selected_direction: Optional selected direction angle in radians
waypoints: Optional Path object containing waypoints to visualize
Expand Down Expand Up @@ -250,6 +252,33 @@ def visualize_local_planner_state(
cv2.circle(vis_img, (goal_img_x, goal_img_y), 5, (0, 255, 0), -1) # Green circle
cv2.circle(vis_img, (goal_img_x, goal_img_y), 8, (0, 0, 0), 1) # Black outline

# Draw goal orientation
if goal_theta is not None and goal_xy is not None:
# For waypoint mode, only draw orientation at the final waypoint
if waypoints is not None and len(waypoints) > 0:
# Use the final waypoint position
final_waypoint = waypoints[-1]
goal_x, goal_y = final_waypoint[0], final_waypoint[1]
else:
# Use the current goal position
goal_x, goal_y = goal_xy

goal_rel_x_map = goal_x - robot_x
goal_rel_y_map = goal_y - robot_y
goal_img_x = int(center_x + goal_rel_x_map * scale)
goal_img_y = int(center_y - goal_rel_y_map * scale) # Flip y-axis

# Calculate goal orientation vector direction in visualization frame
# goal_theta is already in odom frame, need to adjust for visualization orientation
goal_dir_length = 30 # Length of direction indicator in pixels
goal_dir_end_x = int(goal_img_x + goal_dir_length * math.cos(goal_theta))
goal_dir_end_y = int(goal_img_y - goal_dir_length * math.sin(goal_theta)) # Flip y-axis

# Draw goal orientation arrow
if 0 <= goal_img_x < vis_size and 0 <= goal_img_y < vis_size:
cv2.arrowedLine(vis_img, (goal_img_x, goal_img_y), (goal_dir_end_x, goal_dir_end_y),
(255, 0, 255), 4) # Magenta arrow

# Add scale bar
scale_bar_length_px = int(1.0 * scale)
scale_bar_x = vis_size - scale_bar_length_px - 10
Expand Down