Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dimos/robot/global_planner/algo.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def astar(
costmap: Costmap,
goal: VectorLike,
start: VectorLike = (0.0, 0.0),
cost_threshold: int = 50,
cost_threshold: int = 60,
allow_diagonal: bool = True,
) -> Optional[Path]:
"""
Expand Down
88 changes: 30 additions & 58 deletions dimos/robot/global_planner/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,88 +12,60 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from dataclasses import dataclass
from abc import ABC, abstractmethod
from abc import abstractmethod
from typing import Tuple, Callable

from dimos.robot.robot import Robot
from dimos.types.vector import VectorLike, to_vector
from dimos.types.vector import VectorLike, to_vector, Vector
from dimos.types.path import Path
from dimos.types.costmap import Costmap
from dimos.robot.global_planner.algo import astar
from dimos.utils.logging_config import setup_logger
from nav_msgs import msg
from dimos.web.websocket_vis.types import Visualizable

logger = setup_logger("dimos.robot.unitree.global_planner", level=logging.DEBUG)
logger = setup_logger("dimos.robot.unitree.global_planner")


@dataclass
class Planner(ABC):
robot: Robot
class Planner(Visualizable):
local_nav: Callable[[Path], bool]

@abstractmethod
def plan(self, goal: VectorLike) -> Path: ...

def walk_loop(self, path: Path) -> bool:
"""Navigate through a path of waypoints.

This method now passes the entire path to the local planner at once,
utilizing the waypoint following capabilities.

Args:
path: Path object containing waypoints

Returns:
bool: True if successfully reached the goal, False otherwise
"""
if not path or len(path) == 0:
logger.warning("Cannot follow empty path")
return False

logger.info(f"Following path with {len(path)} waypoints")

# Use the robot's waypoint navigation capability
result = self.robot.navigate_path_local(path)

if not result:
logger.warning("Failed to navigate the path")
return False

logger.info("Successfully reached the goal")
return True

def set_goal(self, goal: VectorLike):
"""Plan and navigate to a goal position.

Args:
goal: Goal position as a vector-like object

Returns:
bool: True if planning and navigation succeeded, False otherwise
"""
goal = to_vector(goal).to_2d()
path = self.plan(goal)
if not path:
logger.warning("No path found to the goal.")
return False

return self.walk_loop(path)
return self.local_nav(path.resample(1.0))


class AstarPlanner(Planner):
def __init__(self, robot: Robot):
super().__init__(robot)
self.costmap = self.robot.ros_control.topic_latest("map", msg.OccupancyGrid)
def __init__(
self,
costmap: Callable[[], Costmap],
base_link: Callable[[], Tuple[Vector, Vector]],
local_nav: Callable[[Vector], bool],
):
super().__init__(local_nav)
self.base_link = base_link
self.costmap = costmap

def start(self):
return self
def plan(self, goal: VectorLike) -> Path:
[pos, rot] = self.base_link()
costmap = self.costmap()
costmap.save_pickle("costmap3.pickle")
smudge = costmap.smudge()

def stop(self):
if hasattr(self, "costmap"):
self.costmap.dispose()
del self.costmap
self.vis("global_costmap", smudge)
self.vis("pos", pos)
self.vis("target", goal)

def plan(self, goal: VectorLike) -> Path:
[pos, rot] = self.robot.ros_control.transform_euler("base_link")
return astar(Costmap.from_msg(self.costmap()).smudge(kernel_size=3), goal, pos)
path = astar(smudge, goal, pos).resample(0.5)

if path:
self.vis("a*", path)
return path
9 changes: 1 addition & 8 deletions dimos/robot/ros_observable_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from nav_msgs import msg
from dimos.utils.logging_config import setup_logger
from dimos.utils.threadpool import get_scheduler
from dimos.types.costmap import Costmap

from rclpy.qos import (
QoSProfile,
Expand Down Expand Up @@ -57,14 +58,6 @@ def to_profile(self) -> QoSProfile:
raise ValueError(f"Unknown QoS enum value: {self}")


# TODO: should go to some shared file, this is copy pasted from ros_control.py
sensor_qos = QoSProfile(
reliability=QoSReliabilityPolicy.BEST_EFFORT,
history=QoSHistoryPolicy.KEEP_LAST,
durability=QoSDurabilityPolicy.VOLATILE,
depth=1,
)

logger = setup_logger("dimos.robot.ros_control.observable_topic")


Expand Down
Loading