Skip to content

Commit 801cef6

Browse files
Merge pull request #801 from dimensionalOS/rpc-fixes-merge
- implements exception passing to RPC clients (if there is an exception on the server, it will be raised on the caller side) - stress tests for all transport protocols (we expect 10k messages < sec, 100% reliability expected across all transports) - stress tests for RPC (1k rpc calls < sec, 100% reliability expected) - SHM actually fails this - has 25% reliability on RPC tests due to how it handles message reception, this is an issue that means we can't use SHM across the board in dimos - moves rpc call threadpool from LCMRPC to the rpc base so it's used cross-protocol - shares a single cpp LCM instance/loop per process for all instantiated transports - investigated kernel networking stats and LCM cpp implementation, optimized details around this to get LCM to perform much better
2 parents 888fe6c + 9618e03 commit 801cef6

23 files changed

+910
-526
lines changed

dimos/agents2/spec.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,9 @@ def start(self) -> None:
162162
super().start()
163163

164164
def stop(self) -> None:
165+
if hasattr(self, "transport") and self.transport:
166+
self.transport.stop() # type: ignore[attr-defined]
167+
self.transport = None # type: ignore[assignment]
165168
super().stop()
166169

167170
@rpc

dimos/core/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
pLCMTransport,
2020
pSHMTransport,
2121
)
22-
from dimos.protocol.rpc.lcmrpc import LCMRPC
22+
from dimos.protocol.rpc import LCMRPC
2323
from dimos.protocol.rpc.spec import RPCSpec
2424
from dimos.protocol.tf import LCMTF, TF, PubSubTF, TFConfig, TFSpec
2525
from dimos.utils.actor_registry import ActorRegistry

dimos/core/module.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
from dimos.core.resource import Resource
3434
from dimos.core.rpc_client import RpcCall
3535
from dimos.core.stream import In, Out, RemoteIn, RemoteOut, Transport
36-
from dimos.protocol.rpc import LCMRPC, RPCSpec # type: ignore[attr-defined]
36+
from dimos.protocol.rpc import LCMRPC, RPCSpec
3737
from dimos.protocol.service import Configurable # type: ignore[attr-defined]
3838
from dimos.protocol.skill.skill import SkillContainer
3939
from dimos.protocol.tf import LCMTF, TFSpec

dimos/core/rpc_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from collections.abc import Callable
1616
from typing import Any
1717

18-
from dimos.protocol.rpc.lcmrpc import LCMRPC
18+
from dimos.protocol.rpc import LCMRPC
1919
from dimos.utils.logging_config import setup_logger
2020

2121
logger = setup_logger()

dimos/protocol/pubsub/lcmpubsub.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ class LCMPubSubBase(LCMService, PubSub[Topic, Any]):
6464
_callbacks: dict[str, list[Callable[[Any], None]]]
6565

6666
def __init__(self, **kwargs) -> None: # type: ignore[no-untyped-def]
67-
LCMService.__init__(self, **kwargs)
6867
super().__init__(**kwargs)
6968
self._callbacks = {}
7069

@@ -73,6 +72,7 @@ def publish(self, topic: Topic, message: bytes) -> None:
7372
if self.l is None:
7473
logger.error("Tried to publish after LCM was closed")
7574
return
75+
7676
self.l.publish(str(topic), message)
7777

7878
def subscribe(
@@ -88,6 +88,9 @@ def noop() -> None:
8888

8989
lcm_subscription = self.l.subscribe(str(topic), lambda _, msg: callback(msg, topic))
9090

91+
# Set queue capacity to 10000 to handle high-volume bursts
92+
lcm_subscription.set_queue_capacity(10000)
93+
9194
def unsubscribe() -> None:
9295
if self.l is None:
9396
return

dimos/protocol/pubsub/test_spec.py

Lines changed: 50 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import pytest
2424

2525
from dimos.msgs.geometry_msgs import Vector3
26+
from dimos.protocol.pubsub.lcmpubsub import LCM, Topic
2627
from dimos.protocol.pubsub.memory import Memory
2728

2829

@@ -61,27 +62,21 @@ def redis_context():
6162
print("Redis not available")
6263

6364

64-
try:
65-
from dimos.protocol.pubsub.lcmpubsub import LCM, Topic
65+
@contextmanager
66+
def lcm_context():
67+
lcm_pubsub = LCM(autoconf=True)
68+
lcm_pubsub.start()
69+
yield lcm_pubsub
70+
lcm_pubsub.stop()
6671

67-
@contextmanager
68-
def lcm_context():
69-
lcm_pubsub = LCM(autoconf=True)
70-
lcm_pubsub.start()
71-
yield lcm_pubsub
72-
lcm_pubsub.stop()
7372

74-
testdata.append(
75-
(
76-
lcm_context,
77-
Topic(topic="/test_topic", lcm_type=Vector3),
78-
[Vector3(1, 2, 3), Vector3(4, 5, 6), Vector3(7, 8, 9)], # Using Vector3 as mock data,
79-
)
73+
testdata.append(
74+
(
75+
lcm_context,
76+
Topic(topic="/test_topic", lcm_type=Vector3),
77+
[Vector3(1, 2, 3), Vector3(4, 5, 6), Vector3(7, 8, 9)], # Using Vector3 as mock data,
8078
)
81-
82-
except (ConnectionError, ImportError):
83-
# either redis is not installed or the server is not running
84-
print("LCM not available")
79+
)
8580

8681

8782
from dimos.protocol.pubsub.shmpubsub import PickleSharedMemory
@@ -263,3 +258,40 @@ async def consume_messages() -> None:
263258
# Verify all messages were received in order
264259
assert len(received_messages) == len(messages_to_send)
265260
assert received_messages == messages_to_send
261+
262+
263+
@pytest.mark.parametrize("pubsub_context, topic, values", testdata)
264+
def test_high_volume_messages(pubsub_context, topic, values) -> None:
265+
"""Test that all 5000 messages are received correctly."""
266+
with pubsub_context() as x:
267+
# Create a list to capture received messages
268+
received_messages = []
269+
last_message_time = [time.time()] # Use list to allow modification in callback
270+
271+
# Define callback function
272+
def callback(message, topic) -> None:
273+
received_messages.append(message)
274+
last_message_time[0] = time.time()
275+
276+
# Subscribe to the topic
277+
x.subscribe(topic, callback)
278+
279+
# Publish 10000 messages
280+
num_messages = 10000
281+
for _ in range(num_messages):
282+
x.publish(topic, values[0])
283+
284+
# Wait until no messages received for 0.5 seconds
285+
timeout = 1.0 # Maximum time to wait
286+
stable_duration = 0.1 # Time without new messages to consider done
287+
start_time = time.time()
288+
289+
while time.time() - start_time < timeout:
290+
if time.time() - last_message_time[0] >= stable_duration:
291+
break
292+
time.sleep(0.1)
293+
294+
# Capture count and clear list to avoid printing huge list on failure
295+
received_len = len(received_messages)
296+
received_messages.clear()
297+
assert received_len == num_messages, f"Expected {num_messages} messages, got {received_len}"

dimos/protocol/rpc/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from dimos.protocol.rpc.lcmrpc import LCMRPC
15+
from dimos.protocol.rpc.pubsubrpc import LCMRPC, ShmRPC
1616
from dimos.protocol.rpc.spec import RPCClient, RPCServer, RPCSpec
17+
18+
__all__ = ["LCMRPC", "RPCClient", "RPCServer", "RPCSpec", "ShmRPC"]

dimos/protocol/rpc/lcmrpc.py

Lines changed: 0 additions & 27 deletions
This file was deleted.

0 commit comments

Comments
 (0)