1414
1515from collections .abc import Generator
1616from contextlib import contextmanager
17- from typing import Any
17+ from typing import TYPE_CHECKING , Any
1818
1919import numpy as np
2020
@@ -211,13 +211,21 @@ def redis_msggen(size: int) -> tuple[str, Any]:
211211 ROSTopic ,
212212)
213213
214+ if TYPE_CHECKING :
215+ from numpy .typing import NDArray
216+
214217if ROS_AVAILABLE :
215- from rclpy .qos import QoSDurabilityPolicy , QoSHistoryPolicy , QoSProfile , QoSReliabilityPolicy
216- from sensor_msgs .msg import Image as ROSImage
218+ from rclpy .qos import ( # type: ignore[no-untyped-call]
219+ QoSDurabilityPolicy ,
220+ QoSHistoryPolicy ,
221+ QoSProfile ,
222+ QoSReliabilityPolicy ,
223+ )
224+ from sensor_msgs .msg import Image as ROSImage # type: ignore[attr-defined,no-untyped-call]
217225
218226 @contextmanager
219227 def ros_best_effort_pubsub_channel () -> Generator [RawROS , None , None ]:
220- qos = QoSProfile (
228+ qos = QoSProfile ( # type: ignore[no-untyped-call]
221229 reliability = QoSReliabilityPolicy .BEST_EFFORT ,
222230 history = QoSHistoryPolicy .KEEP_LAST ,
223231 durability = QoSDurabilityPolicy .VOLATILE ,
@@ -230,7 +238,7 @@ def ros_best_effort_pubsub_channel() -> Generator[RawROS, None, None]:
230238
231239 @contextmanager
232240 def ros_reliable_pubsub_channel () -> Generator [RawROS , None , None ]:
233- qos = QoSProfile (
241+ qos = QoSProfile ( # type: ignore[no-untyped-call]
234242 reliability = QoSReliabilityPolicy .RELIABLE ,
235243 history = QoSHistoryPolicy .KEEP_LAST ,
236244 durability = QoSDurabilityPolicy .VOLATILE ,
@@ -245,21 +253,21 @@ def ros_msggen(size: int) -> tuple[RawROSTopic, ROSImage]:
245253 import numpy as np
246254
247255 # Create image data
248- data = np .frombuffer (make_data_bytes (size ), dtype = np .uint8 ). reshape ( - 1 )
249- padded_size = ((len (data ) + 2 ) // 3 ) * 3
250- data = np .pad (data , (0 , padded_size - len (data )))
251- pixels = len (data ) // 3
256+ raw_data : NDArray [ np . uint8 ] = np .frombuffer (make_data_bytes (size ), dtype = np .uint8 )
257+ padded_size = ((len (raw_data ) + 2 ) // 3 ) * 3
258+ padded_data : NDArray [ np . uint8 ] = np .pad (raw_data , (0 , padded_size - len (raw_data )))
259+ pixels = len (padded_data ) // 3
252260 height = max (1 , int (pixels ** 0.5 ))
253261 width = pixels // height
254- data = data [: height * width * 3 ]
262+ final_data : NDArray [ np . uint8 ] = padded_data [: height * width * 3 ]
255263
256264 # Create ROS Image message
257- msg = ROSImage ()
265+ msg = ROSImage () # type: ignore[no-untyped-call]
258266 msg .height = height
259267 msg .width = width
260268 msg .encoding = "rgb8"
261269 msg .step = width * 3
262- msg .data = data . tobytes ( )
270+ msg .data = bytes ( final_data )
263271
264272 topic = RawROSTopic (topic = "/benchmark/ros" , ros_type = ROSImage )
265273 return (topic , msg )
@@ -280,7 +288,7 @@ def ros_msggen(size: int) -> tuple[RawROSTopic, ROSImage]:
280288
281289 @contextmanager
282290 def dimos_ros_best_effort_pubsub_channel () -> Generator [DimosROS , None , None ]:
283- qos = QoSProfile (
291+ qos = QoSProfile ( # type: ignore[no-untyped-call]
284292 reliability = QoSReliabilityPolicy .BEST_EFFORT ,
285293 history = QoSHistoryPolicy .KEEP_LAST ,
286294 durability = QoSDurabilityPolicy .VOLATILE ,
@@ -293,7 +301,7 @@ def dimos_ros_best_effort_pubsub_channel() -> Generator[DimosROS, None, None]:
293301
294302 @contextmanager
295303 def dimos_ros_reliable_pubsub_channel () -> Generator [DimosROS , None , None ]:
296- qos = QoSProfile (
304+ qos = QoSProfile ( # type: ignore[no-untyped-call]
297305 reliability = QoSReliabilityPolicy .RELIABLE ,
298306 history = QoSHistoryPolicy .KEEP_LAST ,
299307 durability = QoSDurabilityPolicy .VOLATILE ,
0 commit comments