2626import reactivex as rx
2727from reactivex import operators as ops
2828from reactivex .disposable import Disposable
29+ import rerun as rr
2930
3031import dimos .core .colors as colors
3132from dimos .core .resource import Resource
@@ -137,20 +138,21 @@ def __str__(self) -> str:
137138 )
138139
139140
140- class Out (Stream [T ]):
141+ class Out (Stream [T ], ObservableMixin [ T ] ):
141142 _transport : Transport # type: ignore[type-arg]
142143
143144 def __init__ (self , * argv , ** kwargs ) -> None : # type: ignore[no-untyped-def]
144145 super ().__init__ (* argv , ** kwargs )
146+ self ._rerun_config : dict | None = None # type: ignore[type-arg]
147+ self ._rerun_last_log : float = 0.0
145148
146149 @property
147150 def transport (self ) -> Transport [T ]:
148151 return self ._transport
149152
150153 @transport .setter
151154 def transport (self , value : Transport [T ]) -> None :
152- # just for type checking
153- ...
155+ self ._transport = value
154156
155157 @property
156158 def state (self ) -> State :
@@ -173,8 +175,76 @@ def publish(self, msg) -> None: # type: ignore[no-untyped-def]
173175 if not hasattr (self , "_transport" ) or self ._transport is None :
174176 logger .warning (f"Trying to publish on Out { self } without a transport" )
175177 return
178+
179+ # Log to Rerun directly if configured
180+ if self ._rerun_config is not None :
181+ self ._log_to_rerun (msg )
182+
176183 self ._transport .broadcast (self , msg )
177184
185+ def subscribe (self , cb ) -> Callable [[], None ]: # type: ignore[no-untyped-def]
186+ """Subscribe to this output stream.
187+
188+ Args:
189+ cb: Callback function to receive messages
190+
191+ Returns:
192+ Unsubscribe function
193+ """
194+ return self .transport .subscribe (cb , self ) # type: ignore[arg-type, func-returns-value, no-any-return]
195+
196+ def autolog_to_rerun (
197+ self ,
198+ entity_path : str ,
199+ rate_limit : float | None = None ,
200+ ** rerun_kwargs : Any ,
201+ ) -> None :
202+ """Configure this output to auto-log to Rerun (fire-and-forget).
203+
204+ Call once in start() - messages auto-logged when published.
205+
206+ Args:
207+ entity_path: Rerun entity path (e.g., "world/map")
208+ rate_limit: Max Hz to log (None = unlimited)
209+ **rerun_kwargs: Passed to msg.to_rerun() for rendering config
210+ (e.g., radii=0.02, colormap="turbo", colors=[255,0,0])
211+
212+ Example:
213+ def start(self):
214+ super().start()
215+ # Just declare it - fire and forget!
216+ self.global_map.autolog_to_rerun("world/map", rate_limit=5.0, radii=0.02)
217+ """
218+ self ._rerun_config = {
219+ "entity_path" : entity_path ,
220+ "rate_limit" : rate_limit ,
221+ "rerun_kwargs" : rerun_kwargs ,
222+ }
223+ self ._rerun_last_log = 0.0
224+
225+ def _log_to_rerun (self , msg : T ) -> None :
226+ """Log message to Rerun with rate limiting."""
227+ if not hasattr (msg , "to_rerun" ):
228+ return
229+
230+ if self ._rerun_config is None :
231+ return
232+
233+ import time
234+
235+ config = self ._rerun_config
236+
237+ # Rate limiting
238+ if config ["rate_limit" ] is not None :
239+ now = time .monotonic ()
240+ min_interval = 1.0 / config ["rate_limit" ]
241+ if now - self ._rerun_last_log < min_interval :
242+ return # Skip - too soon
243+ self ._rerun_last_log = now
244+
245+ rerun_data = msg .to_rerun (** config ["rerun_kwargs" ])
246+ rr .log (config ["entity_path" ], rerun_data )
247+
178248
179249class RemoteStream (Stream [T ]):
180250 @property
0 commit comments