Skip to content

Commit 1d6aa22

Browse files
alexlin2leshypaul-nechifor
authored
Less bandwidth usage on LCM, bug fixed with navigation (#559)
* timestamped find_closest takes tolerance * initial implementation of frame alignment mechanism * timestamped alignment finished * tests passing * rpc stress test * work on correct event wait * correct faster RPC replies & timeout errors * merge bugfix * typo * Fix event loop leak (#547) * fix: event loop leak * CI code cleanup --------- Co-authored-by: paul-nechifor <1262969+paul-nechifor@users.noreply.github.com> * spatial memory and tracker bug fix * make depth_colorized optional * various bug fix and added depth16 * added timestamp alignment to object tracker, which is currently the only thing that need aligned frames * added nav_only cpu_only run file * passing tests --------- Co-authored-by: lesh <lesh@sysphere.org> Co-authored-by: Paul Nechifor <paul@nechifor.net> Co-authored-by: paul-nechifor <1262969+paul-nechifor@users.noreply.github.com> Former-commit-id: 85967f9
1 parent 9ba3e45 commit 1d6aa22

18 files changed

Lines changed: 1129 additions & 145 deletions

File tree

dimos/core/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def __getattr__(self, name: str):
5454

5555
if name in self.rpcs:
5656
return lambda *args, **kwargs: self.rpc.call_sync(
57-
f"{self.remote_name}/{name}", (args, kwargs), timeout=2.0
57+
f"{self.remote_name}/{name}", (args, kwargs)
5858
)
5959

6060
# return super().__getattr__(name)

dimos/core/test_rpcstress.py

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
# Copyright 2025 Dimensional Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import threading
16+
import time
17+
18+
from dimos.core import In, Module, Out, rpc
19+
20+
21+
class Counter(Module):
22+
current_count: int = 0
23+
24+
count_stream: Out[int] = None
25+
26+
def __init__(self):
27+
super().__init__()
28+
self.current_count = 0
29+
30+
@rpc
31+
def increment(self):
32+
"""Increment the counter and publish the new value."""
33+
self.current_count += 1
34+
self.count_stream.publish(self.current_count)
35+
return self.current_count
36+
37+
38+
class CounterValidator(Module):
39+
"""Calls counter.increment() as fast as possible and validates no numbers are skipped."""
40+
41+
count_in: In[int] = None
42+
43+
def __init__(self, increment_func):
44+
super().__init__()
45+
self.increment_func = increment_func
46+
self.last_seen = 0
47+
self.missing_numbers = []
48+
self.running = False
49+
self.call_thread = None
50+
self.call_count = 0
51+
self.total_latency = 0.0
52+
self.call_start_time = None
53+
self.waiting_for_response = False
54+
55+
@rpc
56+
def start(self):
57+
"""Start the validator."""
58+
self.count_in.subscribe(self._on_count_received)
59+
self.running = True
60+
self.call_thread = threading.Thread(target=self._call_loop)
61+
self.call_thread.start()
62+
63+
@rpc
64+
def stop(self):
65+
"""Stop the validator."""
66+
self.running = False
67+
if self.call_thread:
68+
self.call_thread.join()
69+
70+
def _on_count_received(self, count: int):
71+
"""Check if we received all numbers in sequence and trigger next call."""
72+
# Calculate round trip time
73+
if self.call_start_time:
74+
latency = time.time() - self.call_start_time
75+
self.total_latency += latency
76+
77+
if count != self.last_seen + 1:
78+
for missing in range(self.last_seen + 1, count):
79+
self.missing_numbers.append(missing)
80+
print(f"[VALIDATOR] Missing number detected: {missing}")
81+
self.last_seen = count
82+
83+
# Signal that we can make the next call
84+
self.waiting_for_response = False
85+
86+
def _call_loop(self):
87+
"""Call increment only after receiving response from previous call."""
88+
while self.running:
89+
if not self.waiting_for_response:
90+
try:
91+
self.waiting_for_response = True
92+
self.call_start_time = time.time()
93+
result = self.increment_func()
94+
call_time = time.time() - self.call_start_time
95+
self.call_count += 1
96+
if self.call_count % 100 == 0:
97+
avg_latency = (
98+
self.total_latency / self.call_count if self.call_count > 0 else 0
99+
)
100+
print(
101+
f"[VALIDATOR] Made {self.call_count} calls, last result: {result}, RPC call time: {call_time * 1000:.2f}ms, avg RTT: {avg_latency * 1000:.2f}ms"
102+
)
103+
except Exception as e:
104+
print(f"[VALIDATOR] Error calling increment: {e}")
105+
self.waiting_for_response = False
106+
time.sleep(0.001) # Small delay on error
107+
else:
108+
# Don't sleep - busy wait for maximum speed
109+
pass
110+
111+
@rpc
112+
def get_stats(self):
113+
"""Get validation statistics."""
114+
avg_latency = self.total_latency / self.call_count if self.call_count > 0 else 0
115+
return {
116+
"call_count": self.call_count,
117+
"last_seen": self.last_seen,
118+
"missing_count": len(self.missing_numbers),
119+
"missing_numbers": self.missing_numbers[:10] if self.missing_numbers else [],
120+
"avg_rtt_ms": avg_latency * 1000,
121+
"calls_per_sec": self.call_count / 10.0 if self.call_count > 0 else 0,
122+
}
123+
124+
125+
if __name__ == "__main__":
126+
import dimos.core as core
127+
from dimos.core import pLCMTransport
128+
129+
# Start dimos with 2 workers
130+
client = core.start(2)
131+
132+
# Deploy counter module
133+
counter = client.deploy(Counter)
134+
counter.count_stream.transport = pLCMTransport("/counter_stream")
135+
136+
# Deploy validator module with increment function
137+
validator = client.deploy(CounterValidator, counter.increment)
138+
validator.count_in.transport = pLCMTransport("/counter_stream")
139+
140+
# Connect validator to counter's output
141+
validator.count_in.connect(counter.count_stream)
142+
143+
# Start modules
144+
validator.start()
145+
146+
print("[MAIN] Counter and validator started. Running for 10 seconds...")
147+
148+
# Test direct RPC speed for comparison
149+
print("\n[MAIN] Testing direct RPC call speed for 1 second...")
150+
start = time.time()
151+
direct_count = 0
152+
while time.time() - start < 1.0:
153+
counter.increment()
154+
direct_count += 1
155+
print(f"[MAIN] Direct RPC calls per second: {direct_count}")
156+
157+
# Run for 10 seconds
158+
time.sleep(10)
159+
160+
# Get stats before stopping
161+
stats = validator.get_stats()
162+
print(f"\n[MAIN] Final statistics:")
163+
print(f" - Total calls made: {stats['call_count']}")
164+
print(f" - Last number seen: {stats['last_seen']}")
165+
print(f" - Missing numbers: {stats['missing_count']}")
166+
print(f" - Average RTT: {stats['avg_rtt_ms']:.2f}ms")
167+
print(f" - Calls per second: {stats['calls_per_sec']:.1f}")
168+
if stats["missing_numbers"]:
169+
print(f" - First missing numbers: {stats['missing_numbers']}")
170+
171+
# Stop modules
172+
validator.stop()
173+
174+
# Shutdown dimos
175+
client.shutdown()
176+
177+
print("[MAIN] Test complete.")

dimos/msgs/sensor_msgs/Image.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ class ImageFormat(Enum):
3737
GRAY = "GRAY" # 8-bit Grayscale
3838
GRAY16 = "GRAY16" # 16-bit Grayscale
3939
DEPTH = "DEPTH" # 32-bit Float Depth
40+
DEPTH16 = "DEPTH16" # 16-bit Integer Depth (millimeters)
4041

4142

4243
@dataclass
@@ -169,6 +170,8 @@ def to_opencv(self) -> np.ndarray:
169170
return self.data
170171
elif self.format == ImageFormat.DEPTH:
171172
return self.data # Depth images are already in the correct format
173+
elif self.format == ImageFormat.DEPTH16:
174+
return self.data # 16-bit depth images are already in the correct format
172175
else:
173176
raise ValueError(f"Unsupported format conversion: {self.format}")
174177

@@ -373,6 +376,11 @@ def _get_lcm_encoding(self) -> str:
373376
return "32FC1"
374377
elif self.dtype == np.float64:
375378
return "64FC1"
379+
elif self.format == ImageFormat.DEPTH16:
380+
if self.dtype == np.uint16:
381+
return "16UC1" # 16-bit unsigned depth
382+
elif self.dtype == np.int16:
383+
return "16SC1" # 16-bit signed depth
376384

377385
raise ValueError(
378386
f"Cannot determine LCM encoding for format={self.format}, dtype={self.dtype}"
@@ -393,6 +401,9 @@ def _parse_encoding(encoding: str) -> dict:
393401
"32FC1": {"format": ImageFormat.DEPTH, "dtype": np.float32, "channels": 1},
394402
"32FC3": {"format": ImageFormat.RGB, "dtype": np.float32, "channels": 3},
395403
"64FC1": {"format": ImageFormat.DEPTH, "dtype": np.float64, "channels": 1},
404+
# 16-bit depth encodings
405+
"16UC1": {"format": ImageFormat.DEPTH16, "dtype": np.uint16, "channels": 1},
406+
"16SC1": {"format": ImageFormat.DEPTH16, "dtype": np.int16, "channels": 1},
396407
}
397408

398409
if encoding not in encoding_map:

dimos/navigation/bt_navigator/navigator.py

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -218,16 +218,28 @@ def _transform_goal_to_odom_frame(self, goal: PoseStamped) -> Optional[PoseStamp
218218
return goal
219219

220220
try:
221-
transform = self.tf.get(
222-
parent_frame=odom_frame,
223-
child_frame=goal.frame_id,
224-
time_point=goal.ts,
225-
time_tolerance=1.0,
226-
)
221+
transform = None
222+
max_retries = 3
223+
224+
for attempt in range(max_retries):
225+
transform = self.tf.get(
226+
parent_frame=odom_frame,
227+
child_frame=goal.frame_id,
228+
)
229+
230+
if transform:
231+
break
227232

228-
if not transform:
229-
logger.error(f"Could not find transform from '{goal.frame_id}' to '{odom_frame}'")
230-
return None
233+
if attempt < max_retries - 1:
234+
logger.warning(
235+
f"Transform attempt {attempt + 1}/{max_retries} failed, retrying..."
236+
)
237+
time.sleep(1.0)
238+
else:
239+
logger.error(
240+
f"Could not find transform from '{goal.frame_id}' to '{odom_frame}' after {max_retries} attempts"
241+
)
242+
return None
231243

232244
pose = apply_transform(goal, transform)
233245
transformed_goal = PoseStamped(

dimos/navigation/frontier_exploration/wavefront_frontier_goal_selector.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ def __init__(
107107
lookahead_distance: float = 5.0,
108108
max_explored_distance: float = 10.0,
109109
info_gain_threshold: float = 0.03,
110-
num_no_gain_attempts: int = 4,
110+
num_no_gain_attempts: int = 2,
111111
goal_timeout: float = 15.0,
112112
**kwargs,
113113
):
@@ -639,7 +639,8 @@ def get_exploration_goal(
639639
logger.info(
640640
f"No information gain for {self.no_gain_counter} consecutive attempts"
641641
)
642-
self.reset_exploration_session()
642+
self.no_gain_counter = 0 # Reset counter when stopping due to no gain
643+
self.stop_exploration()
643644
return None
644645
else:
645646
self.no_gain_counter = 0
@@ -724,6 +725,7 @@ def stop_exploration(self) -> bool:
724725
return False
725726

726727
self.exploration_active = False
728+
self.no_gain_counter = 0 # Reset counter when exploration stops
727729
self.stop_event.set()
728730

729731
if self.exploration_thread and self.exploration_thread.is_alive():

dimos/navigation/global_planner/planner.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ def plan(self, goal: Pose) -> Optional[Path]:
204204

205205
# Get current position from odometry
206206
robot_pos = self.latest_odom.position
207-
costmap = self.latest_costmap.inflate(0.1).gradient(max_distance=1.0)
207+
costmap = self.latest_costmap.inflate(0.2).gradient(max_distance=1.5)
208208

209209
# Run A* planning
210210
path = astar(costmap, goal.position, robot_pos)

0 commit comments

Comments
 (0)