diff --git a/Cargo.lock b/Cargo.lock index 39cb4c7b3834..07a8c9de44a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2330,6 +2330,12 @@ dependencies = [ "parking_lot_core", ] +[[package]] +name = "data-encoding" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" + [[package]] name = "data-url" version = "0.3.1" @@ -3110,15 +3116,17 @@ dependencies = [ [[package]] name = "dimos-viewer" -version = "0.30.0-alpha.5" +version = "0.30.1" dependencies = [ "bincode", "clap", + "futures-util", "mimalloc", - "parking_lot", "rerun", "serde", + "serde_json", "tokio", + "tokio-tungstenite", ] [[package]] @@ -12596,6 +12604,18 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "tokio-tungstenite" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d25a406cddcc431a75d3d9afc6a7c0f7428d4891dd973e4d54c56b46127bf857" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.16" @@ -13013,6 +13033,23 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78122066b0cb818b8afd08f7ed22f7fdbc3e90815035726f0840d0d26c0747a" +[[package]] +name = "tungstenite" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8628dcc84e5a09eb3d8423d6cb682965dea9133204e8fb3efee74c2a0c259442" +dependencies = [ + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand 0.9.2", + "sha1", + "thiserror 2.0.17", + "utf-8", +] + [[package]] name = "twox-hash" version = "2.1.2" @@ -13192,6 +13229,12 @@ dependencies = [ "xmlwriter", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8-ranges" version = "1.0.5" diff --git a/crates/top/rerun/src/commands/entrypoint.rs b/crates/top/rerun/src/commands/entrypoint.rs index 589c3ea49159..9bc7551fbc1a 100644 --- a/crates/top/rerun/src/commands/entrypoint.rs +++ b/crates/top/rerun/src/commands/entrypoint.rs @@ -196,6 +196,17 @@ When persisted, the state will be stored at the following locations: #[clap(long)] pub follow: bool, + /// WebSocket server URL for publishing viewer interaction events + /// (clicks, keyboard teleop commands). + /// + /// The viewer connects as a WebSocket client to this URL and sends + /// JSON events. Used by DimOS for click-to-navigate and WASD teleop. + /// + /// Also configurable via the DIMOS_VIEWER_WS_URL environment variable. + /// CLI flag takes priority over the environment variable. + #[clap(long, default_value = "ws://127.0.0.1:3030/ws")] + pub ws_url: String, + /// The number of compute threads to use. /// /// If zero, the same number of threads as the number of cores will be used. diff --git a/dimos/Cargo.toml b/dimos/Cargo.toml index eb8d1f4951dc..5b4e9217f5bb 100644 --- a/dimos/Cargo.toml +++ b/dimos/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dimos-viewer" -version = "0.30.0-alpha.6" +version = "0.30.1" edition = "2024" rust-version = "1.92" license = "MIT OR Apache-2.0" @@ -26,11 +26,12 @@ rerun = { path = "../crates/top/rerun", default-features = false, features = [ "server", ] } -clap = { workspace = true, features = ["derive"] } bincode.workspace = true +clap.workspace = true +futures-util.workspace = true mimalloc.workspace = true -parking_lot.workspace = true serde = { workspace = true, features = ["derive"] } +serde_json.workspace = true tokio = { workspace = true, features = [ "io-util", "macros", @@ -40,3 +41,7 @@ tokio = { workspace = true, features = [ "sync", "time", ] } +tokio-tungstenite = "0.28.0" + +[lints] +workspace = true diff --git a/dimos/src/interaction/keyboard.rs b/dimos/src/interaction/keyboard.rs index 1e7d51f3c71f..abb5cf28c4a4 100644 --- a/dimos/src/interaction/keyboard.rs +++ b/dimos/src/interaction/keyboard.rs @@ -1,18 +1,14 @@ //! Keyboard handler for WASD movement controls that publish Twist messages. -//! +//! //! Converts keyboard input to robot velocity commands following teleop conventions: //! - WASD/arrows for linear/angular motion //! - QE for strafing //! - Space for emergency stop //! - Shift for speed multiplier -use std::io; -use super::lcm::{LcmPublisher, twist_command}; +use super::ws::WsPublisher; use rerun::external::{egui, re_log}; -/// LCM channel for twist commands (matches DimOS convention) -const CMD_VEL_CHANNEL: &str = "/cmd_vel#geometry_msgs.Twist"; - /// Base speeds for keyboard control const BASE_LINEAR_SPEED: f64 = 0.5; // m/s const BASE_ANGULAR_SPEED: f64 = 0.8; // rad/s @@ -64,10 +60,10 @@ impl KeyState { } } -/// Handles keyboard input and publishes Twist via LCM. +/// Handles keyboard input and publishes Twist via WebSocket. /// Must be activated by clicking the overlay before keys are captured. pub struct KeyboardHandler { - publisher: LcmPublisher, + ws: WsPublisher, state: KeyState, was_active: bool, estop_flash: bool, // true briefly after space pressed @@ -75,16 +71,15 @@ pub struct KeyboardHandler { } impl KeyboardHandler { - /// Create a new keyboard handler with LCM publisher on CMD_VEL_CHANNEL. - pub fn new() -> Result { - let publisher = LcmPublisher::new(CMD_VEL_CHANNEL.to_string())?; - Ok(Self { - publisher, + /// Create a new keyboard handler that publishes twist commands via WebSocket. + pub fn new(ws: WsPublisher) -> Self { + Self { + ws, state: KeyState::new(), was_active: false, estop_flash: false, engaged: false, - }) + } } /// Process keyboard input from egui and publish Twist if keys are held. @@ -99,7 +94,7 @@ impl KeyboardHandler { if !self.engaged { if self.was_active { if let Err(e) = self.publish_stop() { - re_log::warn!("Failed to send stop on disengage: {e:?}"); + re_log::warn!("Failed to send stop on disengage: {e}"); } self.was_active = false; } @@ -113,7 +108,7 @@ impl KeyboardHandler { if ctx.input(|i| i.key_pressed(egui::Key::Space)) { self.state.reset(); if let Err(e) = self.publish_stop() { - re_log::warn!("Failed to send emergency stop: {e:?}"); + re_log::warn!("Failed to send emergency stop: {e}"); } self.was_active = false; self.estop_flash = true; @@ -123,12 +118,12 @@ impl KeyboardHandler { // Publish twist command if keys are active, or stop if just released if self.state.any_active() { if let Err(e) = self.publish_twist() { - re_log::warn!("Failed to publish twist command: {e:?}"); + re_log::warn!("Failed to publish twist command: {e}"); } self.was_active = true; } else if self.was_active { if let Err(e) = self.publish_stop() { - re_log::warn!("Failed to send stop on key release: {e:?}"); + re_log::warn!("Failed to send stop on key release: {e}"); } self.was_active = false; } @@ -188,8 +183,8 @@ impl KeyboardHandler { self.engaged = !self.engaged; if !self.engaged { // Send stop when disengaging - if let Err(e) = self.publish_stop() { - re_log::warn!("Failed to send stop on disengage: {e:?}"); + if let Err(err) = self.publish_stop() { + re_log::warn!("Failed to send stop on disengage: {err}"); } self.state.reset(); self.was_active = false; @@ -204,8 +199,8 @@ impl KeyboardHandler { && ctx.input(|i| i.pointer.primary_clicked()) { self.engaged = false; - if let Err(e) = self.publish_stop() { - re_log::warn!("Failed to send stop on outside click: {e:?}"); + if let Err(err) = self.publish_stop() { + re_log::warn!("Failed to send stop on outside click: {err}"); } self.state.reset(); self.was_active = false; @@ -308,30 +303,26 @@ impl KeyboardHandler { }); } - /// Convert current KeyState to Twist and publish via LCM. - fn publish_twist(&mut self) -> io::Result<()> { + /// Convert current KeyState to Twist and publish via WebSocket. + fn publish_twist(&mut self) -> Result<(), super::ws::SendError> { let (lin_x, lin_y, lin_z, ang_x, ang_y, ang_z) = self.compute_twist(); + self.ws.send_twist(lin_x, lin_y, lin_z, ang_x, ang_y, ang_z)?; - let cmd = twist_command( - [lin_x, lin_y, lin_z], - [ang_x, ang_y, ang_z], - ); - - self.publisher.publish_twist(&cmd)?; - - re_log::trace!( - "Published twist: lin=({:.2},{:.2},{:.2}) ang=({:.2},{:.2},{:.2})", - lin_x, lin_y, lin_z, ang_x, ang_y, ang_z - ); - + if std::env::var("DIMOS_DEBUG").is_ok_and(|v| v == "1") { + eprintln!( + "[DIMOS_DEBUG] Published twist: lin=({:.2},{:.2},{:.2}) ang=({:.2},{:.2},{:.2})", + lin_x, lin_y, lin_z, ang_x, ang_y, ang_z + ); + } Ok(()) } - /// Publish all-zero twist (stop command) - fn publish_stop(&mut self) -> io::Result<()> { - let cmd = twist_command([0.0, 0.0, 0.0], [0.0, 0.0, 0.0]); - self.publisher.publish_twist(&cmd)?; - re_log::debug!("Published stop command"); + /// Publish all-zero twist (stop command) via WebSocket. + fn publish_stop(&mut self) -> Result<(), super::ws::SendError> { + self.ws.send_stop()?; + if std::env::var("DIMOS_DEBUG").is_ok_and(|v| v == "1") { + eprintln!("[DIMOS_DEBUG] Published stop command"); + } Ok(()) } @@ -382,6 +373,23 @@ impl std::fmt::Debug for KeyboardHandler { mod tests { use super::*; + /// Create a dummy WsPublisher for tests (connects to a non-existent server, + /// which is fine — we only test compute_twist, never actually send). + fn test_ws() -> WsPublisher { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { WsPublisher::connect("ws://127.0.0.1:1/test".to_string()) }) + } + + fn handler_with(state: KeyState) -> KeyboardHandler { + KeyboardHandler { + ws: test_ws(), + state, + was_active: false, + estop_flash: false, + engaged: true, + } + } + #[test] fn test_key_state_any_active() { let mut state = KeyState::new(); @@ -401,13 +409,7 @@ mod tests { fn test_wasd_to_twist_mapping() { let mut state = KeyState::new(); state.forward = true; - let handler = KeyboardHandler { - publisher: LcmPublisher::new("/test".to_string()).unwrap(), - state, - was_active: false, - estop_flash: false, - engaged: true, - }; + let handler = handler_with(state); let (lin_x, lin_y, _, _, _, ang_z) = handler.compute_twist(); assert_eq!(lin_x, BASE_LINEAR_SPEED); assert_eq!(lin_y, 0.0); @@ -418,13 +420,7 @@ mod tests { fn test_turn_left_right_mapping() { let mut state = KeyState::new(); state.left = true; - let handler = KeyboardHandler { - publisher: LcmPublisher::new("/test".to_string()).unwrap(), - state, - was_active: false, - estop_flash: false, - engaged: true, - }; + let handler = handler_with(state); let (lin_x, lin_y, _, _, _, ang_z) = handler.compute_twist(); assert_eq!(lin_x, 0.0); assert_eq!(lin_y, 0.0); @@ -432,13 +428,7 @@ mod tests { let mut state = KeyState::new(); state.right = true; - let handler = KeyboardHandler { - publisher: LcmPublisher::new("/test".to_string()).unwrap(), - state, - was_active: false, - estop_flash: false, - engaged: true, - }; + let handler = handler_with(state); let (lin_x, lin_y, _, _, _, ang_z) = handler.compute_twist(); assert_eq!(lin_x, 0.0); assert_eq!(lin_y, 0.0); @@ -449,13 +439,7 @@ mod tests { fn test_strafe_mapping() { let mut state = KeyState::new(); state.strafe_l = true; - let handler = KeyboardHandler { - publisher: LcmPublisher::new("/test".to_string()).unwrap(), - state, - was_active: false, - estop_flash: false, - engaged: true, - }; + let handler = handler_with(state); let (lin_x, lin_y, _, _, _, ang_z) = handler.compute_twist(); assert_eq!(lin_x, 0.0); assert_eq!(lin_y, BASE_LINEAR_SPEED); @@ -463,13 +447,7 @@ mod tests { let mut state = KeyState::new(); state.strafe_r = true; - let handler = KeyboardHandler { - publisher: LcmPublisher::new("/test".to_string()).unwrap(), - state, - was_active: false, - estop_flash: false, - engaged: true, - }; + let handler = handler_with(state); let (lin_x, lin_y, _, _, _, ang_z) = handler.compute_twist(); assert_eq!(lin_x, 0.0); assert_eq!(lin_y, -BASE_LINEAR_SPEED); @@ -481,13 +459,7 @@ mod tests { let mut state = KeyState::new(); state.forward = true; state.fast = true; - let handler = KeyboardHandler { - publisher: LcmPublisher::new("/test".to_string()).unwrap(), - state, - was_active: false, - estop_flash: false, - engaged: true, - }; + let handler = handler_with(state); let (lin_x, lin_y, _, _, _, ang_z) = handler.compute_twist(); assert_eq!(lin_x, BASE_LINEAR_SPEED * FAST_MULTIPLIER); assert_eq!(lin_y, 0.0); @@ -499,13 +471,7 @@ mod tests { let mut state = KeyState::new(); state.forward = true; state.left = true; - let handler = KeyboardHandler { - publisher: LcmPublisher::new("/test".to_string()).unwrap(), - state, - was_active: false, - estop_flash: false, - engaged: true, - }; + let handler = handler_with(state); let (lin_x, lin_y, _, _, _, ang_z) = handler.compute_twist(); assert_eq!(lin_x, BASE_LINEAR_SPEED); assert_eq!(lin_y, 0.0); @@ -528,9 +494,7 @@ mod tests { #[test] fn test_keyboard_handler_creation() { - let handler = KeyboardHandler::new(); - assert!(handler.is_ok()); - let handler = handler.unwrap(); + let handler = KeyboardHandler::new(test_ws()); assert!(!handler.was_active); assert!(!handler.engaged); assert!(!handler.state.any_active()); @@ -541,13 +505,7 @@ mod tests { let mut state = KeyState::new(); state.forward = true; state.backward = true; - let handler = KeyboardHandler { - publisher: LcmPublisher::new("/test".to_string()).unwrap(), - state, - was_active: false, - estop_flash: false, - engaged: true, - }; + let handler = handler_with(state); let (lin_x, lin_y, _, _, _, ang_z) = handler.compute_twist(); assert_eq!(lin_x, 0.0); assert_eq!(lin_y, 0.0); @@ -556,13 +514,7 @@ mod tests { #[test] fn test_compute_twist_all_zeros() { - let handler = KeyboardHandler { - publisher: LcmPublisher::new("/test".to_string()).unwrap(), - state: KeyState::new(), - was_active: false, - estop_flash: false, - engaged: true, - }; + let handler = handler_with(KeyState::new()); let (lin_x, lin_y, lin_z, ang_x, ang_y, ang_z) = handler.compute_twist(); assert_eq!(lin_x, 0.0); assert_eq!(lin_y, 0.0); diff --git a/dimos/src/interaction/lcm.rs b/dimos/src/interaction/lcm.rs deleted file mode 100644 index 6b6b4146a8f5..000000000000 --- a/dimos/src/interaction/lcm.rs +++ /dev/null @@ -1,464 +0,0 @@ -//! LCM (Lightweight Communications and Marshalling) publisher for click events and twist commands. -//! -//! Publishes `geometry_msgs/PointStamped` and `geometry_msgs/Twist` messages over UDP multicast, -//! following the same convention as RViz's `/clicked_point` and `/cmd_vel` topics. -//! -//! ## LCM Wire Protocol (short message) -//! ```text -//! [4B magic "LC02"] [4B seqno] [channel\0] [LCM-encoded payload] -//! ``` -//! -//! ## PointStamped Binary Layout -//! ```text -//! [8B fingerprint hash] [Header (no hash)] [Point (no hash)] -//! -//! Header: -//! [4B seq: i32] [4B stamp.sec: i32] [4B stamp.nsec: i32] -//! [4B frame_id_len: i32 (including null)] [frame_id bytes] [null] -//! -//! Point: -//! [8B x: f64] [8B y: f64] [8B z: f64] -//! ``` -//! -//! ## Twist Binary Layout -//! ```text -//! [8B fingerprint hash] [Twist (no hash)] -//! -//! Twist: -//! Vector3 linear: [8B x: f64] [8B y: f64] [8B z: f64] -//! Vector3 angular: [8B x: f64] [8B y: f64] [8B z: f64] -//! ``` - -use std::net::UdpSocket; -use std::sync::atomic::{AtomicU32, Ordering}; -use std::time::SystemTime; - -/// LCM multicast address and port (default LCM configuration). -const LCM_MULTICAST_ADDR: &str = "239.255.76.67:7667"; - -/// LCM short message magic number: "LC02" in ASCII. -const LCM_MAGIC_SHORT: u32 = 0x4c433032; - -/// Pre-computed fingerprint hash for `geometry_msgs/PointStamped`. -/// -/// Computed from the recursive hash chain: -/// - Time: base=0xde1d24a3a8ecb648 -> rot -> 0xbc3a494751d96c91 -/// - Header: base=0xdbb33f5b4c19b8ea + Time -> rot -> 0x2fdb11453be64af7 -/// - Point: base=0x573f2fdd2f76508f -> rot -> 0xae7e5fba5eeca11e -/// - PointStamped: base=0xf012413a2c8028c2 + Header + Point -> rot -> 0x9cd764738ea629af -const POINT_STAMPED_HASH: u64 = 0x9cd764738ea629af; - -/// Pre-computed fingerprint hash for `geometry_msgs/Twist`. -/// -/// Computed from the recursive hash chain: -/// - Vector3: base=0x573f2fdd2f76508f -> rot -> 0xae7e5fba5eeca11e -/// - Twist: base=0x3a4144772922add7 + Vector3 + Vector3 -> rot -> 0x2e7c07d7cdf7e027 -const TWIST_HASH: u64 = 0x2e7c07d7cdf7e027; - -/// A click event with world-space coordinates and entity info. -#[derive(Debug, Clone)] -pub struct ClickEvent { - pub x: f64, - pub y: f64, - pub z: f64, - /// Rerun entity path (stored in frame_id per our convention). - pub entity_path: String, - /// Unix timestamp in seconds. - pub timestamp_sec: i32, - /// Nanosecond remainder. - pub timestamp_nsec: i32, -} - -/// A velocity command (maps to geometry_msgs/Twist). -#[derive(Debug, Clone)] -pub struct TwistCommand { - pub linear_x: f64, // forward/backward - pub linear_y: f64, // strafe left/right - pub linear_z: f64, // up/down (unused for ground robots) - pub angular_x: f64, // roll (unused) - pub angular_y: f64, // pitch (unused) - pub angular_z: f64, // yaw left/right -} - -/// Encodes a `PointStamped` LCM message (with fingerprint hash prefix). -/// -/// Binary layout: -/// - 8 bytes: fingerprint hash (big-endian i64) -/// - Header (no hash): seq(i32) + stamp.sec(i32) + stamp.nsec(i32) + frame_id(len-prefixed string) -/// - Point (no hash): x(f64) + y(f64) + z(f64) -pub fn encode_point_stamped(event: &ClickEvent) -> Vec { - let frame_id_bytes = event.entity_path.as_bytes(); - // LCM string encoding: i32 length (including null terminator) + bytes + null - let string_len = (frame_id_bytes.len() + 1) as i32; - - // Calculate total size: - // 8 (hash) + 4 (seq) + 4 (sec) + 4 (nsec) + 4 (string_len) + frame_id_bytes + 1 (null) + 24 (3 doubles) - let total_size = 8 + 4 + 4 + 4 + 4 + frame_id_bytes.len() + 1 + 24; - let mut buf = Vec::with_capacity(total_size); - - // Fingerprint hash (big-endian) - buf.extend_from_slice(&POINT_STAMPED_HASH.to_be_bytes()); - - // Header._encodeNoHash: - // seq (i32, big-endian) -- always 0 for click events - buf.extend_from_slice(&0i32.to_be_bytes()); - // stamp.sec (i32) - buf.extend_from_slice(&event.timestamp_sec.to_be_bytes()); - // stamp.nsec (i32) - buf.extend_from_slice(&event.timestamp_nsec.to_be_bytes()); - // frame_id: string = i32 length (incl null) + bytes + null - buf.extend_from_slice(&string_len.to_be_bytes()); - buf.extend_from_slice(frame_id_bytes); - buf.push(0); // null terminator - - // Point._encodeNoHash: - buf.extend_from_slice(&event.x.to_be_bytes()); - buf.extend_from_slice(&event.y.to_be_bytes()); - buf.extend_from_slice(&event.z.to_be_bytes()); - - buf -} - -/// Encodes a `Twist` LCM message (with fingerprint hash prefix). -/// -/// Binary layout: -/// - 8 bytes: fingerprint hash (big-endian u64) -/// - Twist (no hash): linear(Vector3: x,y,z f64) + angular(Vector3: x,y,z f64) -pub fn encode_twist(cmd: &TwistCommand) -> Vec { - // 8 (hash) + 48 (6 doubles) = 56 bytes - let mut buf = Vec::with_capacity(56); - - // Fingerprint hash (big-endian) - buf.extend_from_slice(&TWIST_HASH.to_be_bytes()); - - // Twist._encodeNoHash: - // Vector3 linear: - buf.extend_from_slice(&cmd.linear_x.to_be_bytes()); - buf.extend_from_slice(&cmd.linear_y.to_be_bytes()); - buf.extend_from_slice(&cmd.linear_z.to_be_bytes()); - // Vector3 angular: - buf.extend_from_slice(&cmd.angular_x.to_be_bytes()); - buf.extend_from_slice(&cmd.angular_y.to_be_bytes()); - buf.extend_from_slice(&cmd.angular_z.to_be_bytes()); - - buf -} - -/// Builds a complete LCM UDP packet (short message format). -/// -/// Format: `[4B magic] [4B seqno] [channel\0] [payload]` -pub fn build_lcm_packet(channel: &str, payload: &[u8], seq: u32) -> Vec { - let channel_bytes = channel.as_bytes(); - let total = 4 + 4 + channel_bytes.len() + 1 + payload.len(); - let mut pkt = Vec::with_capacity(total); - - pkt.extend_from_slice(&LCM_MAGIC_SHORT.to_be_bytes()); - pkt.extend_from_slice(&seq.to_be_bytes()); - pkt.extend_from_slice(channel_bytes); - pkt.push(0); // null terminator - pkt.extend_from_slice(payload); - - pkt -} - -/// LCM publisher that sends PointStamped and Twist messages via UDP multicast. -pub struct LcmPublisher { - socket: UdpSocket, - seq: AtomicU32, - channel: String, -} - -impl LcmPublisher { - /// Create a new LCM publisher. - /// - /// `channel` is the LCM channel name, e.g. - /// `"/clicked_point#geometry_msgs.PointStamped"` or - /// `"/cmd_vel#geometry_msgs.Twist"`. - pub fn new(channel: String) -> std::io::Result { - let socket = UdpSocket::bind("0.0.0.0:0")?; - // TTL=0 means local machine only; TTL=1 for same subnet - socket.set_multicast_ttl_v4(0)?; - Ok(Self { - socket, - seq: AtomicU32::new(0), - channel, - }) - } - - /// Publish a click event as a PointStamped LCM message. - pub fn publish(&self, event: &ClickEvent) -> std::io::Result { - let payload = encode_point_stamped(event); - let seq = self.seq.fetch_add(1, Ordering::Relaxed); - let packet = build_lcm_packet(&self.channel, &payload, seq); - self.socket.send_to(&packet, LCM_MULTICAST_ADDR) - } - - /// Publish a twist command as a Twist LCM message. - pub fn publish_twist(&self, cmd: &TwistCommand) -> std::io::Result { - let payload = encode_twist(cmd); - let seq = self.seq.fetch_add(1, Ordering::Relaxed); - let packet = build_lcm_packet(&self.channel, &payload, seq); - self.socket.send_to(&packet, LCM_MULTICAST_ADDR) - } -} - -impl std::fmt::Debug for LcmPublisher { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("LcmPublisher") - .field("channel", &self.channel) - .field("seq", &self.seq.load(Ordering::Relaxed)) - .finish() - } -} - -/// Create a `ClickEvent` from position, entity path, and a millisecond timestamp. -pub fn click_event_from_ms( - position: [f32; 3], - entity_path: &str, - timestamp_ms: u64, -) -> ClickEvent { - let total_secs = (timestamp_ms / 1000) as i32; - let nanos = ((timestamp_ms % 1000) * 1_000_000) as i32; - ClickEvent { - x: position[0] as f64, - y: position[1] as f64, - z: position[2] as f64, - entity_path: entity_path.to_string(), - timestamp_sec: total_secs, - timestamp_nsec: nanos, - } -} - -/// Create a `ClickEvent` from position and entity path, using the current time. -pub fn click_event_now(position: [f32; 3], entity_path: &str) -> ClickEvent { - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap_or_default(); - ClickEvent { - x: position[0] as f64, - y: position[1] as f64, - z: position[2] as f64, - entity_path: entity_path.to_string(), - timestamp_sec: now.as_secs() as i32, - timestamp_nsec: now.subsec_nanos() as i32, - } -} - -/// Create a `TwistCommand` from velocity values. -pub fn twist_command( - linear: [f64; 3], - angular: [f64; 3], -) -> TwistCommand { - TwistCommand { - linear_x: linear[0], - linear_y: linear[1], - linear_z: linear[2], - angular_x: angular[0], - angular_y: angular[1], - angular_z: angular[2], - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_point_stamped_fingerprint() { - fn rot(h: u64) -> u64 { - (h.wrapping_shl(1)).wrapping_add((h >> 63) & 1) - } - let time_hash = rot(0xde1d24a3a8ecb648); - let header_hash = rot(0xdbb33f5b4c19b8ea_u64.wrapping_add(time_hash)); - let point_hash = rot(0x573f2fdd2f76508f); - let ps_hash = - rot(0xf012413a2c8028c2_u64 - .wrapping_add(header_hash) - .wrapping_add(point_hash)); - assert_eq!(ps_hash, POINT_STAMPED_HASH); - } - - #[test] - fn test_twist_fingerprint() { - fn rot(h: u64) -> u64 { - (h.wrapping_shl(1)).wrapping_add((h >> 63) & 1) - } - let vector3_hash = rot(0x573f2fdd2f76508f); - let twist_hash = rot(0x3a4144772922add7_u64 - .wrapping_add(vector3_hash) - .wrapping_add(vector3_hash)); - assert_eq!(twist_hash, TWIST_HASH); - } - - #[test] - fn test_encode_twist_matches_python() { - // Twist(Vector3(0.5, 0.0, 0.0), Vector3(0.0, 0.0, 0.3)) - let cmd = TwistCommand { - linear_x: 0.5, - linear_y: 0.0, - linear_z: 0.0, - angular_x: 0.0, - angular_y: 0.0, - angular_z: 0.3, - }; - - let encoded = encode_twist(&cmd); - - let expected_hex = "2e7c07d7cdf7e0273fe000000000000000000000000000000000000000000000000000000000000000000000000000003fd3333333333333"; - let expected: Vec = (0..expected_hex.len()) - .step_by(2) - .map(|i| u8::from_str_radix(&expected_hex[i..i + 2], 16).unwrap()) - .collect(); - - assert_eq!(encoded, expected, "Encoded bytes must match Python LCM output"); - assert_eq!(encoded.len(), 56, "Encoded length must be 56 bytes"); - } - - #[test] - fn test_encode_twist_zero() { - let cmd = TwistCommand { - linear_x: 0.0, - linear_y: 0.0, - linear_z: 0.0, - angular_x: 0.0, - angular_y: 0.0, - angular_z: 0.0, - }; - let encoded = encode_twist(&cmd); - assert_eq!(encoded.len(), 56); - let hash = u64::from_be_bytes(encoded[0..8].try_into().unwrap()); - assert_eq!(hash, TWIST_HASH); - } - - #[test] - fn test_encode_point_stamped_matches_python() { - let event = ClickEvent { - x: 1.5, - y: 2.5, - z: 3.5, - entity_path: "/world/grid".to_string(), - timestamp_sec: 1234, - timestamp_nsec: 5678, - }; - - let encoded = encode_point_stamped(&event); - - let expected_hex = "9cd764738ea629af00000000000004d20000162e0000000c2f776f726c642f67726964003ff80000000000004004000000000000400c000000000000"; - let expected: Vec = (0..expected_hex.len()) - .step_by(2) - .map(|i| u8::from_str_radix(&expected_hex[i..i + 2], 16).unwrap()) - .collect(); - - assert_eq!(encoded, expected, "Encoded bytes must match Python LCM output"); - } - - #[test] - fn test_encode_empty_frame_id() { - let event = ClickEvent { - x: 0.0, - y: 0.0, - z: 0.0, - entity_path: String::new(), - timestamp_sec: 0, - timestamp_nsec: 0, - }; - let encoded = encode_point_stamped(&event); - assert_eq!(encoded.len(), 49); - let str_len = i32::from_be_bytes([encoded[20], encoded[21], encoded[22], encoded[23]]); - assert_eq!(str_len, 1); - } - - #[test] - fn test_build_lcm_packet_format() { - let payload = vec![0xAA, 0xBB]; - let channel = "/test"; - let packet = build_lcm_packet(channel, &payload, 42); - assert_eq!(&packet[0..4], &LCM_MAGIC_SHORT.to_be_bytes()); - assert_eq!(&packet[4..8], &42u32.to_be_bytes()); - let null_pos = packet[8..].iter().position(|&b| b == 0).unwrap() + 8; - let channel_bytes = &packet[8..null_pos]; - assert_eq!(channel_bytes, b"/test"); - assert_eq!(&packet[null_pos + 1..], &[0xAA, 0xBB]); - } - - #[test] - fn test_build_lcm_packet_with_typed_channel() { - let payload = vec![0x01]; - let channel = "/clicked_point#geometry_msgs.PointStamped"; - let packet = build_lcm_packet(channel, &payload, 0); - let null_pos = packet[8..].iter().position(|&b| b == 0).unwrap() + 8; - let extracted_channel = std::str::from_utf8(&packet[8..null_pos]).unwrap(); - assert_eq!(extracted_channel, channel); - } - - #[test] - fn test_click_event_from_ms() { - let event = click_event_from_ms([1.0, 2.0, 3.0], "/world", 1234567); - assert_eq!(event.timestamp_sec, 1234); - assert_eq!(event.timestamp_nsec, 567_000_000); - assert_eq!(event.x, 1.0f64); - assert_eq!(event.entity_path, "/world"); - } - - #[test] - fn test_click_event_now() { - let event = click_event_now([0.0, 0.0, 0.0], "/test"); - let now_sec = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs() as i32; - assert!((event.timestamp_sec - now_sec).abs() < 10); - } - - #[test] - fn test_twist_command() { - let cmd = twist_command([1.0, 0.0, 0.0], [0.0, 0.0, 0.5]); - assert_eq!(cmd.linear_x, 1.0); - assert_eq!(cmd.angular_z, 0.5); - } - - #[test] - fn test_lcm_publisher_creation() { - let publisher = LcmPublisher::new("/clicked_point#geometry_msgs.PointStamped".to_string()); - assert!(publisher.is_ok()); - let publisher_twist = LcmPublisher::new("/cmd_vel#geometry_msgs.Twist".to_string()); - assert!(publisher_twist.is_ok()); - } - - #[test] - fn test_full_packet_structure() { - let event = ClickEvent { - x: 1.0, - y: 2.0, - z: 3.0, - entity_path: "/world/robot".to_string(), - timestamp_sec: 100, - timestamp_nsec: 200, - }; - let payload = encode_point_stamped(&event); - let channel = "/clicked_point#geometry_msgs.PointStamped"; - let packet = build_lcm_packet(channel, &payload, 7); - let magic = u32::from_be_bytes([packet[0], packet[1], packet[2], packet[3]]); - assert_eq!(magic, LCM_MAGIC_SHORT); - let seqno = u32::from_be_bytes([packet[4], packet[5], packet[6], packet[7]]); - assert_eq!(seqno, 7); - let null_pos = packet[8..].iter().position(|&b| b == 0).unwrap() + 8; - let ch = std::str::from_utf8(&packet[8..null_pos]).unwrap(); - assert_eq!(ch, channel); - let data_start = null_pos + 1; - let hash_bytes: [u8; 8] = packet[data_start..data_start + 8].try_into().unwrap(); - let hash = u64::from_be_bytes(hash_bytes); - assert_eq!(hash, POINT_STAMPED_HASH); - } - - #[test] - fn test_sequence_number_increments() { - let publisher = - LcmPublisher::new("/test#geometry_msgs.PointStamped".to_string()).unwrap(); - assert_eq!(publisher.seq.load(Ordering::Relaxed), 0); - let seq1 = publisher.seq.fetch_add(1, Ordering::Relaxed); - assert_eq!(seq1, 0); - let seq2 = publisher.seq.fetch_add(1, Ordering::Relaxed); - assert_eq!(seq2, 1); - } -} diff --git a/dimos/src/interaction/mod.rs b/dimos/src/interaction/mod.rs index 6fec89890734..76ab58df72d3 100644 --- a/dimos/src/interaction/mod.rs +++ b/dimos/src/interaction/mod.rs @@ -1,9 +1,9 @@ pub mod handle; pub mod keyboard; -pub mod lcm; pub mod protocol; +pub mod ws; pub use handle::InteractionHandle; pub use keyboard::KeyboardHandler; -pub use lcm::{ClickEvent, TwistCommand, LcmPublisher, click_event_from_ms, click_event_now, twist_command}; pub use protocol::ViewerEvent; +pub use ws::{SendError, WsPublisher}; diff --git a/dimos/src/interaction/ws.rs b/dimos/src/interaction/ws.rs new file mode 100644 index 000000000000..a05636b5488b --- /dev/null +++ b/dimos/src/interaction/ws.rs @@ -0,0 +1,240 @@ +//! WebSocket client for remote (non-LCM) mode. +//! +//! When `dimos-viewer` is started with `--connect`, LCM multicast is unavailable +//! across machines. This module connects to a WebSocket server (typically the +//! Python `RerunWebSocketServer` module) and sends click, twist, and stop events +//! as JSON. +//! +//! Message format (JSON objects with a `"type"` discriminant): +//! +//! ```json +//! {"type":"click","x":1.0,"y":2.0,"z":3.0,"entity_path":"/world","timestamp_ms":1234567890} +//! {"type":"twist","linear_x":0.5,"linear_y":0.0,"linear_z":0.0,"angular_x":0.0,"angular_y":0.0,"angular_z":0.8} +//! {"type":"stop"} +//! ``` + +use std::time::Duration; + +use rerun::external::re_log; +use serde::Serialize; +use tokio::sync::mpsc; + +/// Error returned when a WebSocket event cannot be sent. +#[derive(Debug)] +pub enum SendError { + /// The send queue is full; the event was dropped. + QueueFull, + /// Failed to serialize the event to JSON. + Serialize(String), +} + +impl std::fmt::Display for SendError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::QueueFull => write!(f, "send queue full, event dropped"), + Self::Serialize(e) => write!(f, "serialization error: {e}"), + } + } +} + +/// JSON message variants sent over the WebSocket. +#[derive(Debug, Clone, Serialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum WsEvent { + Click { + x: f64, + y: f64, + z: f64, + entity_path: String, + timestamp_ms: u64, + }, + Twist { + linear_x: f64, + linear_y: f64, + linear_z: f64, + angular_x: f64, + angular_y: f64, + angular_z: f64, + }, + Stop, +} + +/// Sends `WsEvent`s (serialized to JSON) to a remote WebSocket server. +/// +/// Maintains a persistent connection with automatic reconnection. The +/// internal sender is `Clone`, so you can hand copies to multiple producers +/// (keyboard handler, click handler, …). +#[derive(Clone)] +pub struct WsPublisher { + tx: mpsc::Sender, +} + +impl WsPublisher { + /// Spawn the WebSocket client task and return a publisher. + /// + /// The client connects to `url` (e.g. `ws://127.0.0.1:3030/ws`) and + /// reconnects automatically whenever the connection drops. + /// + /// This spawns a dedicated background thread with its own tokio runtime, + /// so it works even when called from a non-async context (like the eframe UI). + pub fn connect(url: String) -> Self { + let (tx, rx) = mpsc::channel::(256); + + // Spawn a dedicated thread with its own tokio runtime. + // This allows WsPublisher to work from the eframe UI thread which + // doesn't have a tokio runtime. + std::thread::Builder::new() + .name("ws-publisher".to_string()) + .spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("failed to create tokio runtime for WsPublisher"); + rt.block_on(run_client(url, rx)); + }) + .expect("failed to spawn WsPublisher thread"); + + Self { tx } + } + + /// Publish a click event. + pub fn send_click(&self, x: f64, y: f64, z: f64, entity_path: &str, timestamp_ms: u64) -> Result<(), SendError> { + let event = WsEvent::Click { + x, + y, + z, + entity_path: entity_path.to_string(), + timestamp_ms, + }; + self.broadcast(event) + } + + /// Publish a twist (velocity) command. + pub fn send_twist( + &self, + linear_x: f64, + linear_y: f64, + linear_z: f64, + angular_x: f64, + angular_y: f64, + angular_z: f64, + ) -> Result<(), SendError> { + let event = WsEvent::Twist { + linear_x, + linear_y, + linear_z, + angular_x, + angular_y, + angular_z, + }; + self.broadcast(event) + } + + /// Publish a stop command. + pub fn send_stop(&self) -> Result<(), SendError> { + self.broadcast(WsEvent::Stop) + } + + fn broadcast(&self, event: WsEvent) -> Result<(), SendError> { + let json = serde_json::to_string(&event).map_err(|e| SendError::Serialize(e.to_string()))?; + // Non-blocking: error if the channel is full rather than block the UI thread. + self.tx.try_send(json).map_err(|_| SendError::QueueFull) + } +} + +/// Returns true if `DIMOS_DEBUG` is set to `1`. +fn is_debug() -> bool { + std::env::var("DIMOS_DEBUG").is_ok_and(|v| v == "1") +} + +/// Background task: connect → send → reconnect loop. +async fn run_client(url: String, mut rx: mpsc::Receiver) { + use futures_util::{SinkExt, StreamExt}; + use tokio_tungstenite::{connect_async, tungstenite::Message}; + + let debug = is_debug(); + + loop { + if debug { + eprintln!("[DIMOS_DEBUG] WsPublisher: connecting to {url}"); + } + + match connect_async(&url).await { + Ok((ws_stream, _)) => { + if debug { + eprintln!("[DIMOS_DEBUG] WsPublisher: connected to {url}"); + } + + let (mut writer, mut reader) = ws_stream.split(); + + // Read task: consume incoming frames (ping → auto pong) so the + // server's keepalive pings get answered and the connection stays + // alive. Exits when the server closes or an error occurs. + let debug_read = debug; + let mut read_handle = tokio::spawn(async move { + while let Some(frame) = reader.next().await { + match frame { + Ok(Message::Close(_)) => { + if debug_read { + eprintln!("[DIMOS_DEBUG] WsPublisher: server sent close frame"); + } + break; + } + Err(err) => { + if debug_read { + eprintln!("[DIMOS_DEBUG] WsPublisher: read error: {err}"); + } + break; + } + _ => {} // Ping/Pong handled by tungstenite internally + } + } + }); + + // Write loop: drain the channel into the WebSocket. + let disconnected = loop { + tokio::select! { + msg = rx.recv() => { + match msg { + Some(text) => { + if let Err(err) = writer.send(Message::text(text)).await { + if debug { + eprintln!("[DIMOS_DEBUG] WsPublisher: send error: {err} — reconnecting"); + } + break false; + } + } + None => break true, // rx closed → task is done + } + } + _ = &mut read_handle => { + // Reader exited → server closed the connection. + if debug { + eprintln!("[DIMOS_DEBUG] WsPublisher: server closed connection — reconnecting"); + } + break false; + } + } + }; + + if disconnected { + if debug { + eprintln!("[DIMOS_DEBUG] WsPublisher: channel closed, shutting down"); + } + break; + } + } + Err(err) => { + if debug { + eprintln!("[DIMOS_DEBUG] WsPublisher: connection failed: {err} — retrying in 1s"); + } + } + } + + // Drain any stale commands queued during the disconnect — sending + // outdated velocity commands on reconnect would be dangerous. + while rx.try_recv().is_ok() {} + + tokio::time::sleep(Duration::from_secs(1)).await; + } +} diff --git a/dimos/src/viewer.rs b/dimos/src/viewer.rs index 793e33a20d73..efa078ea4d9d 100644 --- a/dimos/src/viewer.rs +++ b/dimos/src/viewer.rs @@ -1,24 +1,26 @@ -//! DimOS Interactive Viewer — custom Rerun viewer with LCM click-to-navigate and WASD teleop. +//! DimOS Interactive Viewer — custom Rerun viewer with WebSocket click-to-navigate and WASD teleop. //! //! Accepts ALL stock Rerun CLI flags and adds DimOS-specific behavior: -//! - Click-to-navigate: click any entity with a 3D position → PointStamped LCM on /clicked_point -//! - WASD keyboard teleop: click overlay to engage, then WASD publishes Twist on /cmd_vel +//! - Click-to-navigate: click any entity with a 3D position → sends click event via WebSocket +//! - WASD keyboard teleop: click overlay to engage, then WASD publishes Twist via WebSocket use std::rc::Rc; use std::cell::RefCell; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; -use dimos_viewer::interaction::{KeyboardHandler, LcmPublisher, click_event_from_ms}; -use rerun::external::{eframe, egui, re_memory, re_viewer}; +use dimos_viewer::interaction::{KeyboardHandler, WsPublisher}; +use rerun::external::{eframe, egui, re_log, re_memory, re_viewer}; #[global_allocator] static GLOBAL: re_memory::AccountingAllocator = re_memory::AccountingAllocator::new(mimalloc::MiMalloc); -/// LCM channel for click events (follows RViz convention) -const LCM_CHANNEL: &str = "/clicked_point#geometry_msgs.PointStamped"; +/// Default WebSocket server URL +const DEFAULT_WS_URL: &str = "ws://127.0.0.1:3030/ws"; + /// Minimum time between click events (debouncing) const CLICK_DEBOUNCE_MS: u64 = 100; + /// Maximum rapid clicks before logging a warning const RAPID_CLICK_THRESHOLD: usize = 5; @@ -36,9 +38,13 @@ impl eframe::App for DimosApp { } fn save(&mut self, storage: &mut dyn eframe::Storage) { self.inner.save(storage); } + fn clear_color(&self, visuals: &egui::Visuals) -> [f32; 4] { self.inner.clear_color(visuals) } + fn persist_egui_memory(&self) -> bool { self.inner.persist_egui_memory() } + fn auto_save_interval(&self) -> Duration { self.inner.auto_save_interval() } + fn raw_input_hook(&mut self, ctx: &egui::Context, raw_input: &mut egui::RawInput) { self.inner.raw_input_hook(ctx, raw_input); } @@ -48,8 +54,31 @@ fn main() -> Result<(), Box> { let main_thread_token = re_viewer::MainThreadToken::i_promise_i_am_on_the_main_thread(); let build_info = re_viewer::build_info(); - let lcm_publisher = LcmPublisher::new(LCM_CHANNEL.to_string()) - .expect("Failed to create LCM publisher"); + // Parse args (including --ws-url) via Rerun's clap Args, without consuming them. + // We peek at the parsed value, then pass the original args to run_with_app_wrapper + // which will parse them again. + let parsed: rerun::RerunArgs = clap::Parser::parse(); + let ws_url = if std::env::var("DIMOS_VIEWER_WS_URL").is_ok() { + // Env var overrides default but not an explicit CLI flag. + // If the parsed value equals the default, check the env var. + if parsed.ws_url == DEFAULT_WS_URL { + std::env::var("DIMOS_VIEWER_WS_URL").unwrap() + } else { + parsed.ws_url.clone() + } + } else { + parsed.ws_url.clone() + }; + + let debug = std::env::var("DIMOS_DEBUG").is_ok_and(|v| v == "1"); + + // Connect WebSocket publisher for click/keyboard events + let ws_publisher = WsPublisher::connect(ws_url.clone()); + if debug { + eprintln!("[DIMOS_DEBUG] WebSocket client target: {ws_url}"); + } + + let keyboard_handler_ws = ws_publisher.clone(); let last_click_time = Rc::new(RefCell::new( Instant::now() - Duration::from_secs(10) @@ -79,7 +108,7 @@ fn main() -> Result<(), Box> { let mut count = rapid_click_count.borrow_mut(); *count += 1; if *count == RAPID_CLICK_THRESHOLD { - rerun::external::re_log::warn!( + re_log::warn!( "Rapid click detected ({RAPID_CLICK_THRESHOLD} clicks within {CLICK_DEBOUNCE_MS}ms)" ); } @@ -89,26 +118,24 @@ fn main() -> Result<(), Box> { } *last_click_time.borrow_mut() = now; - let ts = SystemTime::now() + let timestamp_ms = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_millis() as u64; - let click = click_event_from_ms( - [pos.x, pos.y, pos.z], + if let Err(err) = ws_publisher.send_click( + pos.x as f64, + pos.y as f64, + pos.z as f64, &entity_path.to_string(), - ts, - ); - - match lcm_publisher.publish(&click) { - Ok(_) => rerun::external::re_log::debug!( - "Nav goal: entity={}, pos=({:.2}, {:.2}, {:.2})", - entity_path, pos.x, pos.y, pos.z - ), - Err(e) => rerun::external::re_log::error!( - "Failed to publish nav goal: {e:?}" - ), + timestamp_ms, + ) { + re_log::warn!("Failed to send click event: {err}"); } + re_log::debug!( + "Click event published: entity={}, pos=({:.2}, {:.2}, {:.2})", + entity_path, pos.x, pos.y, pos.z + ); } re_viewer::SelectionChangeItem::Entity { position: None, .. } => { no_position_count += 1; @@ -118,7 +145,7 @@ fn main() -> Result<(), Box> { } if !has_position && no_position_count > 0 { - rerun::external::re_log::trace!( + re_log::trace!( "Selection change without position ({no_position_count} items) — normal for hover/keyboard nav." ); } @@ -126,9 +153,19 @@ fn main() -> Result<(), Box> { })), }; + if debug { + if let Some(ref connect) = parsed.connect { + match connect.as_deref() { + Some(url) => eprintln!("[DIMOS_DEBUG] gRPC connecting to: {url}"), + None => eprintln!("[DIMOS_DEBUG] gRPC connecting to default (port {})", parsed.port), + } + } else { + eprintln!("[DIMOS_DEBUG] gRPC: starting local server on port {}", parsed.port); + } + } + let wrapper: rerun::AppWrapper = Box::new(move |app| { - let keyboard = KeyboardHandler::new() - .map_err(|e| -> Box { Box::new(e) })?; + let keyboard = KeyboardHandler::new(keyboard_handler_ws.clone()); Ok(Box::new(DimosApp { inner: app, keyboard })) }); diff --git a/dimos/test_ws_client.ts b/dimos/test_ws_client.ts new file mode 100644 index 000000000000..557245c8f4b5 --- /dev/null +++ b/dimos/test_ws_client.ts @@ -0,0 +1,60 @@ +/** + * Deno WebSocket test server for dimos-viewer event stream. + * + * Listens for incoming WebSocket connections from the viewer and logs + * all received events (click, twist, stop). + * + * Run with: + * deno run --allow-net dimos/test_ws_client.ts + * + * Or with a custom port: + * deno run --allow-net dimos/test_ws_client.ts 3030 + */ + +const port = parseInt(Deno.args[0] ?? "3030", 10); + +Deno.serve({ port }, (req) => { + if (req.headers.get("upgrade") === "websocket") { + const { socket, response } = Deno.upgradeWebSocket(req); + + socket.addEventListener("open", () => { + console.log("[connected] dimos-viewer client connected"); + }); + + socket.addEventListener("message", (ev) => { + const ts = new Date().toISOString(); + try { + const msg = JSON.parse(ev.data as string); + if (msg.type === "click") { + console.log( + `[${ts}] click x=${msg.x.toFixed(3)} y=${msg.y.toFixed(3)} z=${msg.z.toFixed(3)} entity="${msg.entity_path}"`, + ); + } else if (msg.type === "twist") { + console.log( + `[${ts}] twist lin=(${msg.linear_x.toFixed(2)}, ${msg.linear_y.toFixed(2)}, ${msg.linear_z.toFixed(2)}) ang=(${msg.angular_x.toFixed(2)}, ${msg.angular_y.toFixed(2)}, ${msg.angular_z.toFixed(2)})`, + ); + } else if (msg.type === "stop") { + console.log(`[${ts}] stop`); + } else { + console.log(`[${ts}] unknown `, msg); + } + } catch { + console.log(`[${ts}] raw:`, ev.data); + } + }); + + socket.addEventListener("close", (ev) => { + console.log(`[disconnected] code=${ev.code} reason=${ev.reason}`); + }); + + socket.addEventListener("error", (ev) => { + console.error(`[error]`, ev); + }); + + return response; + } + return new Response("Not a websocket request", { status: 400 }); +}); + +console.log(`WebSocket test server listening on ws://localhost:${port}/ws`); +console.log(`Start the viewer with: dimos-viewer --ws-url ws://localhost:${port}/ws`); diff --git a/dimos/uv.lock b/dimos/uv.lock new file mode 100644 index 000000000000..f135a581b73b --- /dev/null +++ b/dimos/uv.lock @@ -0,0 +1,8 @@ +version = 1 +revision = 3 +requires-python = ">=3.10" + +[[package]] +name = "dimos-viewer" +version = "0.30.0a4" +source = { editable = "." } diff --git a/docs/websockets.md b/docs/websockets.md new file mode 100644 index 000000000000..ee0c5e7f5edc --- /dev/null +++ b/docs/websockets.md @@ -0,0 +1,151 @@ +# dimos-viewer WebSocket Event Stream + +When `dimos-viewer` is started with `--connect`, LCM multicast is not available +(LCM uses UDP multicast which is limited to the local machine or subnet). Instead, +the viewer starts a WebSocket server that broadcasts click and keyboard events as +JSON to any connected client. + +## Starting the server + +```sh +dimos-viewer --connect [] [--ws-port ] +``` + +| Flag | Default | Description | +|------|---------|-------------| +| `--connect [url]` | — | Enable connect mode. Optionally specify the gRPC proxy URL (defaults to `rerun+http://127.0.0.1:9877/proxy`). | +| `--ws-port ` | `3030` | Port for the WebSocket event server. | + +The WebSocket server listens on: + +``` +ws://0.0.0.0:/ws +``` + +Multiple clients can connect simultaneously. All connected clients receive every +message. The server does not accept any inbound messages from clients. + +## Message format + +All messages are UTF-8 JSON objects with a `"type"` string discriminant. + +### `heartbeat` + +Sent once per second regardless of viewer activity. Useful for connection health +checks and detecting viewer restarts. + +```json +{ + "type": "heartbeat", + "timestamp_ms": 1773869091428 +} +``` + +| Field | Type | Description | +|-------|------|-------------| +| `timestamp_ms` | `u64` | Unix timestamp in milliseconds (from the viewer's system clock). | + +### `click` + +Sent when the user clicks a 3D point in the Rerun viewport. Corresponds to the +`/clicked_point` convention from RViz / `geometry_msgs/PointStamped`. + +```json +{ + "type": "click", + "x": 1.234, + "y": -0.567, + "z": 0.000, + "entity_path": "/world/ground_plane", + "timestamp_ms": 1773869091428 +} +``` + +| Field | Type | Description | +|-------|------|-------------| +| `x` | `f64` | World-space X coordinate (metres). | +| `y` | `f64` | World-space Y coordinate (metres). | +| `z` | `f64` | World-space Z coordinate (metres). | +| `entity_path` | `string` | Rerun entity path of the clicked object. | +| `timestamp_ms` | `u64` | Unix timestamp in milliseconds at the moment of the click. | + +Click events are debounced: a minimum of 100 ms must elapse between successive +published clicks. Rapid clicks within that window are silently dropped (with a +warning logged after 5 consecutive rapid clicks). + +### `twist` + +Sent every frame (~60 Hz) while movement keys are held on the keyboard teleop +overlay. Corresponds to `geometry_msgs/Twist` / `/cmd_vel` convention. + +The keyboard overlay must first be **engaged** by clicking on it (green border = +active). Clicking anywhere outside the overlay disengages it and sends a `stop`. + +```json +{ + "type": "twist", + "linear_x": 0.5, + "linear_y": 0.0, + "linear_z": 0.0, + "angular_x": 0.0, + "angular_y": 0.0, + "angular_z": 0.8 +} +``` + +| Field | Type | Description | +|-------|------|-------------| +| `linear_x` | `f64` | Forward (+) / backward (−) velocity in m/s. | +| `linear_y` | `f64` | Strafe left (+) / right (−) velocity in m/s. | +| `linear_z` | `f64` | Vertical velocity in m/s (always 0 for ground robots). | +| `angular_x` | `f64` | Roll rate in rad/s (always 0). | +| `angular_y` | `f64` | Pitch rate in rad/s (always 0). | +| `angular_z` | `f64` | Yaw left (+) / right (−) rate in rad/s. | + +**Key bindings:** + +| Key | Effect | +|-----|--------| +| `W` / `↑` | Forward (`linear_x = +0.5`) | +| `S` / `↓` | Backward (`linear_x = −0.5`) | +| `A` / `←` | Turn left (`angular_z = +0.8`) | +| `D` / `→` | Turn right (`angular_z = −0.8`) | +| `Q` | Strafe left (`linear_y = +0.5`) | +| `E` | Strafe right (`linear_y = −0.5`) | +| `Shift` | Speed multiplier ×2 | + +Multiple keys can be held simultaneously; their effects are summed. + +### `stop` + +Sent when all movement keys are released, the overlay is disengaged, or `Space` +is pressed (emergency stop). Signals the robot to halt immediately. + +```json +{ + "type": "stop" +} +``` + +No additional fields. Semantically equivalent to a `twist` with all fields zero. + +## Example client (Deno) + +A reference test client is provided at `dimos/test_ws_client.ts`: + +```sh +deno run --allow-net dimos/test_ws_client.ts +# or with a custom address: +deno run --allow-net dimos/test_ws_client.ts ws://192.168.1.10:3030/ws +``` + +## Local mode (no `--connect`) + +Without `--connect`, the viewer uses LCM UDP multicast instead of WebSocket: + +| Channel | Message type | +|---------|-------------| +| `/clicked_point#geometry_msgs.PointStamped` | Click events | +| `/cmd_vel#geometry_msgs.Twist` | Twist / stop commands | + +The WebSocket server is **not** started in this mode. diff --git a/test_keyboard_lcm.py b/test_keyboard_lcm.py deleted file mode 100644 index 9ecbc1e83ea9..000000000000 --- a/test_keyboard_lcm.py +++ /dev/null @@ -1,49 +0,0 @@ -"""Verify dimos-viewer keyboard LCM output matches DimOS Twist expectations. - -Run: python test_keyboard_lcm.py -Requires: dimos-lcm (PYTHONPATH=.../dimos-lcm/generated/python_lcm_msgs) -""" - -import struct -import sys - -def test_twist_encoding(): - """Verify Twist encoding matches Python LCM reference.""" - # This is what the Rust viewer now produces for Twist (no Header) - # TwistCommand { linear_x: 0.5, angular_z: 0.3 } - def rot(h): - return ((h << 1) + ((h >> 63) & 1)) & 0xFFFFFFFFFFFFFFFF - - # Verify hash chain - vector3_hash = rot(0x573f2fdd2f76508f) - twist_hash = rot((0x3a4144772922add7 + vector3_hash + vector3_hash) & 0xFFFFFFFFFFFFFFFF) - assert twist_hash == 0x2e7c07d7cdf7e027, f"Twist hash mismatch: 0x{twist_hash:016x}" - - # Build expected encoding manually: hash + 6 doubles - buf = struct.pack(">q", twist_hash) # 8B fingerprint - buf += struct.pack(">d", 0.5) # linear.x - buf += struct.pack(">d", 0.0) # linear.y - buf += struct.pack(">d", 0.0) # linear.z - buf += struct.pack(">d", 0.0) # angular.x - buf += struct.pack(">d", 0.0) # angular.y - buf += struct.pack(">d", 0.3) # angular.z - assert len(buf) == 56, f"Expected 56 bytes, got {len(buf)}" - - expected_hex = "2e7c07d7cdf7e0273fe000000000000000000000000000000000000000000000000000000000000000000000000000003fd3333333333333" - assert buf.hex() == expected_hex, f"Encoding mismatch:\n got: {buf.hex()}\n expect: {expected_hex}" - - print("PASS: Twist encoding matches Python LCM reference (56 bytes)") - print(f" Channel: /cmd_vel#geometry_msgs.Twist") - print(f" Hash: 0x{twist_hash:016x}") - -def test_channel_name(): - """Verify channel follows DimOS convention.""" - channel = "/cmd_vel#geometry_msgs.Twist" - assert channel.startswith("/cmd_vel"), "Channel must start with /cmd_vel" - assert "#geometry_msgs.Twist" in channel, "Channel must include type suffix" - print(f"PASS: Channel name correct: {channel}") - -if __name__ == "__main__": - test_twist_encoding() - test_channel_name() - print("\nAll tests passed!") diff --git a/uv.lock b/uv.lock index 4acb800535c1..8091c36a1a97 100644 --- a/uv.lock +++ b/uv.lock @@ -4727,7 +4727,7 @@ provides-extras = ["all", "datafusion", "dataplatform", "notebook", "tests"] [[package]] name = "rerun-workspace" -version = "0.28.0a1+dev" +version = "0.30.0a4" source = { virtual = "." } [package.dev-dependencies]