Skip to content

Commit e5dd14d

Browse files
committed
added rate limiting and backpressure to pointcloud publishing
1 parent 3b8c818 commit e5dd14d

2 files changed

Lines changed: 51 additions & 11 deletions

File tree

dimos/hardware/sensors/camera/realsense/camera.py

Lines changed: 50 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import cv2
2121
import numpy as np
2222
import pyrealsense2 as rs
23+
import reactivex as rx
24+
from reactivex.disposable import Disposable
2325
from scipy.spatial.transform import Rotation
2426

2527
from dimos.core import Module, ModuleConfig, Out, rpc
@@ -29,6 +31,7 @@
2931
from dimos.msgs.sensor_msgs import CameraInfo
3032
from dimos.msgs.sensor_msgs.Image import Image, ImageFormat
3133
from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2
34+
from dimos.utils.reactive import backpressure
3235

3336
from dimos.robot.foxglove_bridge import FoxgloveBridge
3437

@@ -55,6 +58,8 @@ class RealSenseCameraConfig(ModuleConfig):
5558
serial_number: str | None = None
5659
align_depth_to_color: bool = True
5760
enable_depth: bool = True
61+
enable_pointcloud: bool = False
62+
pointcloud_fps: float = 5.0
5863

5964

6065
class RealSenseCamera(Module[RealSenseCameraConfig]):
@@ -78,6 +83,11 @@ def __init__(self, *args, **kwargs) -> None: # type: ignore[no-untyped-def]
7883
self._depth_camera_info: CameraInfo | None = None
7984
self._depth_scale: float = 0.001
8085
self._color_to_depth_extrinsics: rs.extrinsics | None = None
86+
# Pointcloud generation state
87+
self._latest_color_img: Image | None = None
88+
self._latest_depth_img: Image | None = None
89+
self._pointcloud_lock = threading.Lock()
90+
self._pointcloud_disposable: Disposable | None = None
8191

8292
@property
8393
def _camera_link(self) -> str:
@@ -145,6 +155,13 @@ def start(self) -> str:
145155
self._thread = threading.Thread(target=self._capture_loop, daemon=True)
146156
self._thread.start()
147157

158+
if self.config.enable_pointcloud and self.config.enable_depth:
159+
interval_sec = 1.0 / self.config.pointcloud_fps
160+
self._pointcloud_disposable = backpressure(rx.interval(interval_sec)).subscribe(
161+
on_next=lambda _: self._generate_pointcloud(),
162+
on_error=lambda e: print(f"Pointcloud error: {e}"),
163+
)
164+
148165
return "started"
149166

150167
def _build_camera_info(self) -> None:
@@ -281,15 +298,11 @@ def _capture_loop(self) -> None:
281298
self._depth_camera_info.ts = ts
282299
self.depth_camera_info.publish(self._depth_camera_info)
283300

284-
# Create and publish colored pointcloud
285-
if color_img is not None and depth_img is not None and self._color_camera_info:
286-
pcd = PointCloud2.from_rgbd(
287-
color_image=color_img,
288-
depth_image=depth_img,
289-
camera_info=self._color_camera_info,
290-
depth_scale=self._depth_scale,
291-
)
292-
self.pointcloud.publish(pcd)
301+
# Store latest images for pointcloud generation
302+
if self.config.enable_pointcloud and color_img is not None and depth_img is not None:
303+
with self._pointcloud_lock:
304+
self._latest_color_img = color_img
305+
self._latest_depth_img = depth_img
293306

294307
# Publish TF
295308
self._publish_tf(ts)
@@ -353,10 +366,35 @@ def _publish_tf(self, ts: float) -> None:
353366

354367
self.tf.publish(*transforms)
355368

369+
def _generate_pointcloud(self) -> None:
370+
"""Generate and publish pointcloud from latest images (called by rx.interval)."""
371+
with self._pointcloud_lock:
372+
color_img = self._latest_color_img
373+
depth_img = self._latest_depth_img
374+
375+
if color_img is None or depth_img is None or self._color_camera_info is None:
376+
return
377+
378+
try:
379+
pcd = PointCloud2.from_rgbd(
380+
color_image=color_img,
381+
depth_image=depth_img,
382+
camera_info=self._color_camera_info,
383+
depth_scale=self._depth_scale,
384+
)
385+
self.pointcloud.publish(pcd)
386+
except Exception as e:
387+
print(f"Pointcloud generation error: {e}")
388+
356389
@rpc
357390
def stop(self) -> None:
358391
self._running = False
359392

393+
# Stop pointcloud generation
394+
if self._pointcloud_disposable:
395+
self._pointcloud_disposable.dispose()
396+
self._pointcloud_disposable = None
397+
360398
# Stop pipeline first to unblock wait_for_frames()
361399
if self._pipeline:
362400
try:
@@ -375,6 +413,8 @@ def stop(self) -> None:
375413
self._profile = None
376414
self._align = None
377415
self._color_to_depth_extrinsics = None
416+
self._latest_color_img = None
417+
self._latest_depth_img = None
378418
super().stop()
379419

380420
@rpc
@@ -394,7 +434,7 @@ def main() -> None:
394434
dimos = ModuleCoordinator(n=2)
395435
dimos.start()
396436

397-
camera = dimos.deploy(RealSenseCamera)
437+
camera = dimos.deploy(RealSenseCamera, enable_pointcloud=True, pointcloud_fps=5.0)
398438
foxglove_bridge = FoxgloveBridge()
399439
foxglove_bridge.start()
400440

dimos/msgs/sensor_msgs/PointCloud2.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ def from_rgbd(
171171
depth_image: Image,
172172
camera_info: CameraInfo,
173173
depth_scale: float = 1.0,
174-
depth_trunc: float = 20.0,
174+
depth_trunc: float = 5.0,
175175
) -> PointCloud2:
176176
"""Create PointCloud2 from RGB and depth Image messages.
177177

0 commit comments

Comments
 (0)