1212# See the License for the specific language governing permissions and
1313# limitations under the License.
1414
15- from collections .abc import Callable
15+ from collections .abc import Callable , Generator
1616from dataclasses import dataclass , field
17- import queue
1817import time
18+ from typing import Any
1919
2020import reactivex as rx
2121from reactivex import operators as ops
22- from reactivex .disposable import Disposable
2322from reactivex .observable import Observable
2423
25- from dimos import spec
26- from dimos .agents import Output , Reducer , Stream , skill # type: ignore[attr-defined]
24+ from dimos .agents import Output , Reducer , Stream , skill
2725from dimos .core import Module , ModuleConfig , Out , rpc
2826from dimos .hardware .sensors .camera .spec import CameraHardware
2927from dimos .hardware .sensors .camera .webcam import Webcam
3028from dimos .msgs .geometry_msgs import Quaternion , Transform , Vector3
31- from dimos .msgs .sensor_msgs import Image
3229from dimos .msgs .sensor_msgs .CameraInfo import CameraInfo
3330from dimos .msgs .sensor_msgs .Image import Image , sharpness_barrier
34- from dimos .spec import perception as spec # type: ignore[no-redef]
31+ from dimos .spec import perception
32+ from dimos .utils .reactive import iter_observable
3533
3634
37- def default_transform (): # type: ignore[no-untyped-def]
35+ def default_transform () -> Transform :
3836 return Transform (
3937 translation = Vector3 (0.0 , 0.0 , 0.0 ),
4038 rotation = Quaternion (0.0 , 0.0 , 0.0 , 1.0 ),
@@ -47,81 +45,52 @@ def default_transform(): # type: ignore[no-untyped-def]
4745class CameraModuleConfig (ModuleConfig ):
4846 frame_id : str = "camera_link"
4947 transform : Transform | None = field (default_factory = default_transform )
50- hardware : Callable [[], CameraHardware ] | CameraHardware = Webcam # type: ignore[type-arg]
51- frequency : float = 5.0
48+ hardware : Callable [[], CameraHardware [ Any ]] | CameraHardware [ Any ] = Webcam
49+ frequency : float = 0.0 # Hz, 0 means no limit
5250
5351
54- class CameraModule (Module [CameraModuleConfig ], spec .Camera ):
52+ class CameraModule (Module [CameraModuleConfig ], perception .Camera ):
5553 color_image : Out [Image ]
5654 camera_info : Out [CameraInfo ]
5755
58- hardware : CameraHardware = None # type: ignore[assignment, type-arg]
59- _module_subscription : Disposable | None = None
60- _camera_info_subscription : Disposable | None = None
61- _skill_stream : Observable [Image ] | None = None
56+ hardware : CameraHardware [Any ]
6257
6358 config : CameraModuleConfig
6459 default_config = CameraModuleConfig
6560
66- def __init__ (self , * args , ** kwargs ): # type: ignore[no-untyped-def]
61+ def __init__ (self , * args : Any , ** kwargs : Any ) -> None :
6762 super ().__init__ (* args , ** kwargs )
6863
69- @property
70- def hardware_camera_info (self ) -> CameraInfo :
71- return self .hardware .camera_info
72-
7364 @rpc
74- def start (self ): # type: ignore[no-untyped-def]
65+ def start (self ) -> None :
7566 if callable (self .config .hardware ):
7667 self .hardware = self .config .hardware ()
7768 else :
7869 self .hardware = self .config .hardware
7970
80- if self ._module_subscription :
81- return "already started"
82-
83- stream = self .hardware .image_stream ().pipe (sharpness_barrier (self .config .frequency )) # type: ignore[attr-defined]
84- self ._disposables .add (stream .subscribe (self .color_image .publish ))
85-
86- # camera_info_stream = self.camera_info_stream(frequency=5.0)
87-
88- def publish_info (camera_info : CameraInfo ) -> None :
89- self .camera_info .publish (camera_info )
90-
91- if self .config .transform is None :
92- return
93-
94- camera_link = self .config .transform
95- camera_link .ts = camera_info .ts
96- camera_optical = Transform (
97- translation = Vector3 (0.0 , 0.0 , 0.0 ),
98- rotation = Quaternion (- 0.5 , 0.5 , - 0.5 , 0.5 ),
99- frame_id = "camera_link" ,
100- child_frame_id = "camera_optical" ,
101- ts = camera_link .ts ,
102- )
103-
104- self .tf .publish (camera_link , camera_optical )
71+ stream = self .hardware .image_stream ()
10572
106- self . _camera_info_subscription = self .camera_info_stream (). subscribe ( publish_info ) # type: ignore[assignment]
107- self . _module_subscription = stream .subscribe ( self .color_image . publish ) # type: ignore[attr-defined]
73+ if self .config . frequency > 0 :
74+ stream = stream .pipe ( sharpness_barrier ( self .config . frequency ))
10875
109- @skill (stream = Stream .passive , output = Output .image , reducer = Reducer .latest ) # type: ignore[arg-type]
110- def video_stream (self ) -> Image : # type: ignore[misc]
111- """implicit video stream skill"""
112- _queue = queue .Queue (maxsize = 1 ) # type: ignore[var-annotated]
113- self .hardware .image_stream ().subscribe (_queue .put )
76+ self ._disposables .add (
77+ stream .subscribe (self .color_image .publish ),
78+ )
11479
115- yield from iter (_queue .get , None )
80+ self ._disposables .add (
81+ rx .interval (1.0 ).subscribe (lambda _ : self .publish_metadata ()),
82+ )
11683
117- def publish_info (self , camera_info : CameraInfo ) -> None :
84+ def publish_metadata (self ) -> None :
85+ camera_info = self .hardware .camera_info .with_ts (time .time ())
11886 self .camera_info .publish (camera_info )
11987
120- if self .config .transform is None :
88+ if not self .config .transform :
12189 return
12290
12391 camera_link = self .config .transform
12492 camera_link .ts = camera_info .ts
93+
12594 camera_optical = Transform (
12695 translation = Vector3 (0.0 , 0.0 , 0.0 ),
12796 rotation = Quaternion (- 0.5 , 0.5 , - 0.5 , 0.5 ),
@@ -132,21 +101,13 @@ def publish_info(self, camera_info: CameraInfo) -> None:
132101
133102 self .tf .publish (camera_link , camera_optical )
134103
135- def camera_info_stream (self , frequency : float = 1.0 ) -> Observable [CameraInfo ]:
136- def camera_info (_ ) -> CameraInfo : # type: ignore[no-untyped-def]
137- self .hardware .camera_info .ts = time .time ()
138- return self .hardware .camera_info
139-
140- return rx .interval (1.0 / frequency ).pipe (ops .map (camera_info ))
141-
142- def stop (self ): # type: ignore[no-untyped-def]
143- if self ._module_subscription :
144- self ._module_subscription .dispose ()
145- self ._module_subscription = None
146- if self ._camera_info_subscription :
147- self ._camera_info_subscription .dispose ()
148- self ._camera_info_subscription = None
149- # Also stop the hardware if it has a stop method
104+ # actually skills should support on_demand passive skills so we don't emit this periodically
105+ # but just provide the latest frame on demand
106+ @skill (stream = Stream .passive , output = Output .image , reducer = Reducer .latest ) # type: ignore[arg-type]
107+ def video_stream (self ) -> Generator [Image , None , None ]:
108+ yield from iter_observable (self .hardware .image_stream ().pipe (ops .sample (1.0 )))
109+
110+ def stop (self ) -> None :
150111 if self .hardware and hasattr (self .hardware , "stop" ):
151112 self .hardware .stop ()
152113 super ().stop ()
0 commit comments