Commit 251f80c

Unified TimeSeriesStore with pluggable backends, global rewrite of timed event storage (#1080)
* embeddings not generic, lsp added to dev deps

* Fix return type annotations for reactive pipe operators
  - quality_barrier: Callable[[Observable[T]], Observable[T]]
  - sharpness_barrier: Callable[[Observable[Image]], Observable[Image]]

* sketching a new embedding db

* benchmark prints loss heatmap

* typing

* get_data supports nested paths

* sensor storage sketch and pickle implementation

* Implement SensorStore convenience methods with drift-free streaming
  - Implement find_closest(), first_timestamp(), iterate(), iterate_ts(), iterate_realtime() methods using abstract _iter_items/_find_closest_timestamp
  - Add scheduler-based stream() with absolute time reference to prevent timing drift during long playback (ported from replay.py)
  - Move imports to top of file, add proper typing throughout
  - Fix pickledir.py mypy error (pickle.load returns Any)

* Add SqliteStore backend for sensor data
  - Single-file SQLite storage with indexed timestamp queries
  - BLOB storage for pickled sensor data
  - INSERT OR REPLACE for duplicate timestamp handling
  - Supports multiple tables per database (different sensors)
  - Added to parametrized tests (15 tests across 3 backends)

* Add PostgresStore backend for sensor data
  - PostgresStore implements SensorStore[T] + Resource for lifecycle management
  - Multiple stores can share same database with different tables
  - Tables created automatically on first save
  - Tests are optional: skip gracefully if PostgreSQL not available
  - Added psycopg2-binary and types-psycopg2 dependencies
  - Includes reset_db() helper for simple migrations (drop/recreate)

* Add SQL identifier validation and fix mypy issues
  - Validate table/database names in SqliteStore and PostgresStore using regex (alphanumeric/underscore, not starting with digit)
  - Fix Transform.to_pose() return type using TYPE_CHECKING import
  - Add return type annotation to TF.get_pose()
  - Fix ambiguous doclink in transports.md

* Make SqliteStore use get_data/get_data_dir like PickleDirStore

  SqliteStore now accepts a name (e.g. "recordings/lidar") that gets resolved via get_data_dir to data/recordings/lidar.db. Still supports absolute paths and :memory: for backward compatibility.

* Require T to be Timestamped subclass in SensorStore
  - Add TypeVar bound: T = TypeVar("T", bound=Timestamped)
  - Simplify save() to always use data.ts (no more optional timestamp)
  - Update tests to use SampleData(Timestamped) instead of strings
  - SqliteStore accepts str | Path for backward compatibility

* consolidating old replay and new sensor store

* Add LegacyPickleStore backend for TimedSensorReplay compatibility

* legacy sensor store implemented, replay shim implemented

* replaced legacy replay with new system

* Remove import-untyped type ignores for consistency with dev

* Restore import-untyped type ignores for local mypy compatibility

  Required when cupy/contact_graspnet are installed locally without type stubs.

* record/replay on modules

* Fix mypy errors: add psycopg2 stubs and xacro type ignore
  - Add import-untyped to xacro type: ignore comment in mesh_utils.py
  - Remove unused record/replay RPC methods from ModuleBase

* Decouple SensorStore from Timestamped type constraint

  Remove the Timestamped bound from SensorStore's TypeVar, enabling storage of arbitrary data types. Timestamps are now provided explicitly via save(ts, data), with Timestamped convenience methods (save_ts, pipe_save_ts, consume_stream_ts) as opt-in helpers. iterate_realtime() and stream() now use stored timestamps instead of data.ts.

* CI code cleanup

* Fix ruff errors in embedding models

  Fix import sorting in treid.py and suppress B027 for optional warmup() in base.py.

* Remove unused EmbeddingModel.warmup method

* Remove unused warmup(), fix consume_stream callers

  Remove dead EmbeddingModel.warmup() method. Update go2.py to use consume_stream_ts() for the new SensorStore API.

* Rename SensorStore methods: Timestamped as default, raw for explicit ts

  save/pipe_save/consume_stream now work with Timestamped data by default. save_raw/pipe_save_raw/consume_stream_raw take explicit timestamps for non-Timestamped data.

* Rename SensorStore to TimeSeriesStore

* Add _delete to all backends, fix find_closest and add/get/prune_old

  Implement _delete for InMemoryStore, SqliteStore, PickleDirStore, PostgresStore (LegacyPickleStore raises NotImplementedError). Fix find_closest docstring placement and add get/add/prune_old convenience methods.

* Add collection API to TimeSeriesStore, rewrite InMemoryStore with SortedKeyList

  Replace InMemoryStore's dict + sorted-cache (O(n log n) rebuild on every write) with SortedKeyList for O(log n) insert, delete, and range queries. Add collection methods to TimeSeriesStore base: __len__, __iter__, last/last_timestamp, start_ts/end_ts, time_range, duration, find_before/find_after, slice_by_time. Implement backing abstract methods (_count, _last_timestamp, _find_before, _find_after) in all five backends. Performance benchmarks confirm InMemoryStore matches TimestampedCollection on 100k items.

* Rename memory/sensor to memory/timeseries, extract InMemoryStore to inmemory.py

  Rename the module to better reflect its purpose. Extract InMemoryStore from base.py into its own file (inmemory.py) to keep base.py focused on the abstract TimeSeriesStore class. Update all internal and external imports.

* Simplify TimeSeriesStore: bound T to Timestamped, remove _raw methods, store T directly
  - Bound T to Timestamped; no more raw/non-Timestamped data paths
  - Removed save_raw, pipe_save_raw, consume_stream_raw
  - InMemoryStore stores T directly in SortedKeyList (no _Entry wrapper)
  - Removed duplicate-check on insert (same semantics as TimestampedCollection)
  - Performance now at parity with TimestampedCollection

* tests reorganization

* replacing memory store in tf.py

* Fix mypy types: remove base get/add (TBuffer overrides), add None guards in tests

* Replace TimestampedCollection with InMemoryStore, remove dead code
  - Delete TimestampedCollection class (replaced by InMemoryStore)
  - Rewrite TimestampedBufferCollection to inherit InMemoryStore
  - Remove TBuffer_old dead code from tf.py
  - Fix prune_old mutation-during-iteration bug in base.py
  - Break circular import with TYPE_CHECKING guard in base.py
  - Update Image.py to use public API instead of _items access
  - Update tests to use InMemoryStore directly

* delete legacy code

* removed small comment

* test fix, psql mypy

* fix(memory): import ModuleConfig from concrete module to fix mypy error

  Import Module and ModuleConfig from dimos.core.module instead of the lazy-loaded dimos.core namespace, which mypy sees as type Any.
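The "absolute time reference" mentioned for stream() can be illustrated with a minimal sketch. This is not the code from the commit: it uses a plain generator with an injectable clock and sleep function instead of a reactive scheduler, and the name `replay_realtime` and its parameters are hypothetical. The point is only that each deadline is computed from a single anchor taken at the first item, so a late wake-up shortens the next wait instead of accumulating.

```python
import time
from collections.abc import Callable, Iterable, Iterator
from typing import TypeVar

T = TypeVar("T")


def replay_realtime(
    items: Iterable[tuple[float, T]],
    speed: float = 1.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> Iterator[tuple[float, T]]:
    """Yield (ts, item) pairs paced by their recorded timestamps.

    Hypothetical helper for illustration; not part of the dimos API.
    """
    start_wall = None
    first_ts = 0.0
    for ts, item in items:
        if start_wall is None:
            # Anchor the wall clock and the recording clock once.
            start_wall, first_ts = clock(), ts
        # Absolute deadline for this item, independent of earlier sleeps.
        deadline = start_wall + (ts - first_ts) / speed
        delay = deadline - clock()
        if delay > 0:
            sleep(delay)
        yield ts, item
```

A relative scheme that sleeps for `ts - previous_ts` after each item would add every oversleep and all per-item processing time to the total playback duration; anchoring to one start time avoids that.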
1 parent ad956e7 commit 251f80c
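The commit message describes validating table and database names with a regex (alphanumeric/underscore, not starting with a digit). A rough sketch of that idea follows, assuming ASCII-only identifiers; the function names and the table schema are hypothetical and only loosely follow the message's mention of timestamp keys and BLOB payloads. Validation matters here because SQL placeholders cannot bind identifiers, so the table name has to be interpolated into the statement text.

```python
import re
import sqlite3

# ASCII letters, digits and underscores; must not start with a digit.
_IDENTIFIER_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def validate_identifier(name: str) -> str:
    """Return name unchanged if it is a safe SQL identifier, else raise."""
    if not _IDENTIFIER_RE.fullmatch(name):
        raise ValueError(f"invalid SQL identifier: {name!r}")
    return name


def create_table(conn: sqlite3.Connection, table: str) -> None:
    """Create a per-sensor table (hypothetical schema for illustration)."""
    table = validate_identifier(table)
    # Safe to interpolate only because the name was validated above.
    conn.execute(
        f"CREATE TABLE IF NOT EXISTS {table} (ts REAL PRIMARY KEY, data BLOB)"
    )
```

Using `fullmatch` rather than `match` with a trailing `$` avoids accepting a name that ends in a newline.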

39 files changed

Lines changed: 2656 additions & 733 deletions

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -66,4 +66,5 @@ yolo11n.pt
 *mobileclip*
 /results
 
+CLAUDE.MD
 /assets/teleop_certs/

dimos/core/stream.py

Lines changed: 1 addition & 1 deletion
@@ -69,7 +69,7 @@ def _subscribe(observer, scheduler=None): # type: ignore[no-untyped-def]
 
     # default return is backpressured because most
     # use cases will want this by default
-    def observable(self):  # type: ignore[no-untyped-def]
+    def observable(self) -> Observable[T]:
         return backpressure(self.pure_observable())
 

dimos/core/testing.py

Lines changed: 0 additions & 1 deletion
@@ -79,6 +79,5 @@ def odomloop(self) -> None:
         self.odometry.publish(odom)
 
         lidarmsg = next(lidariter)
-        lidarmsg.pubtime = time.perf_counter()  # type: ignore[union-attr]
         self.lidar.publish(lidarmsg)
         time.sleep(0.1)

dimos/mapping/pointclouds/test_occupancy_speed.py

Lines changed: 3 additions & 4 deletions
@@ -20,19 +20,18 @@
 from dimos.mapping.pointclouds.occupancy import OCCUPANCY_ALGOS
 from dimos.mapping.voxels import VoxelGridMapper
 from dimos.utils.cli.plot import bar
-from dimos.utils.data import _get_data_dir, get_data
+from dimos.utils.data import get_data, get_data_dir
 from dimos.utils.testing import TimedSensorReplay
 
 
 @pytest.mark.tool
 def test_build_map():
     mapper = VoxelGridMapper(publish_interval=-1)
 
-    for ts, frame in TimedSensorReplay("unitree_go2_bigoffice/lidar").iterate_duration():
-        print(ts, frame)
+    for _ts, frame in TimedSensorReplay("unitree_go2_bigoffice/lidar").iterate():
         mapper.add_frame(frame)
 
-    pickle_file = _get_data_dir() / "unitree_go2_bigoffice_map.pickle"
+    pickle_file = get_data_dir() / "unitree_go2_bigoffice_map.pickle"
     global_pcd = mapper.get_global_pointcloud2()
 
     with open(pickle_file, "wb") as f:

dimos/memory/embedding.py

Lines changed: 105 additions & 0 deletions
@@ -0,0 +1,105 @@
+# Copyright 2026 Dimensional Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from collections.abc import Callable
+from dataclasses import dataclass, field
+from typing import cast
+
+import reactivex as rx
+from reactivex import operators as ops
+from reactivex.observable import Observable
+
+from dimos.core import In, rpc
+from dimos.core.module import Module, ModuleConfig
+from dimos.models.embedding.base import Embedding, EmbeddingModel
+from dimos.models.embedding.clip import CLIPModel
+from dimos.msgs.geometry_msgs import PoseStamped
+from dimos.msgs.nav_msgs import OccupancyGrid
+from dimos.msgs.sensor_msgs import Image
+from dimos.msgs.sensor_msgs.Image import Image, sharpness_barrier
+from dimos.utils.reactive import getter_hot
+
+
+@dataclass
+class Config(ModuleConfig):
+    embedding_model: EmbeddingModel = field(default_factory=CLIPModel)
+
+
+@dataclass
+class SpatialEntry:
+    image: Image
+    pose: PoseStamped
+
+
+@dataclass
+class SpatialEmbedding(SpatialEntry):
+    embedding: Embedding
+
+
+class EmbeddingMemory(Module[Config]):
+    default_config = Config
+    config: Config
+    color_image: In[Image]
+    global_costmap: In[OccupancyGrid]
+
+    _costmap_getter: Callable[[], OccupancyGrid] | None = None
+
+    def get_costmap(self) -> OccupancyGrid:
+        if self._costmap_getter is None:
+            self._costmap_getter = getter_hot(self.global_costmap.pure_observable())
+            self._disposables.add(self._costmap_getter)
+        return self._costmap_getter()
+
+    @rpc
+    def query_costmap(self, text: str) -> OccupancyGrid:
+        costmap = self.get_costmap()
+        # overlay costmap with embedding heat
+        return costmap
+
+    @rpc
+    def start(self) -> None:
+        # would be cool if this sharpness_barrier was somehow self-calibrating
+        #
+        # we need a Governor system, sharpness_barrier frequency shouldn't
+        # be a fixed float but an observable that adjusts based on downstream load
+        #
+        # (also voxel size for mapper for example would benefit from this)
+        self.color_image.pure_observable().pipe(
+            sharpness_barrier(0.5),
+            ops.flat_map(self._try_create_spatial_entry),
+            ops.map(self._embed_spatial_entry),
+            ops.map(self._store_spatial_entry),
+        ).subscribe(print)
+
+    def _try_create_spatial_entry(self, img: Image) -> Observable[SpatialEntry]:
+        pose = self.tf.get_pose("world", "base_link")
+        if not pose:
+            return rx.empty()
+        return rx.of(SpatialEntry(image=img, pose=pose))
+
+    def _embed_spatial_entry(self, spatial_entry: SpatialEntry) -> SpatialEmbedding:
+        embedding = cast("Embedding", self.config.embedding_model.embed(spatial_entry.image))
+        return SpatialEmbedding(
+            image=spatial_entry.image,
+            pose=spatial_entry.pose,
+            embedding=embedding,
+        )
+
+    def _store_spatial_entry(self, spatial_embedding: SpatialEmbedding) -> SpatialEmbedding:
+        return spatial_embedding
+
+    def query_text(self, query: str) -> list[SpatialEmbedding]:
+        self.config.embedding_model.embed_text(query)
+        results: list[SpatialEmbedding] = []
+        return results

dimos/memory/test_embedding.py

Lines changed: 53 additions & 0 deletions
@@ -0,0 +1,53 @@
+# Copyright 2026 Dimensional Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from dimos.memory.embedding import EmbeddingMemory, SpatialEntry
+from dimos.msgs.geometry_msgs import PoseStamped
+from dimos.utils.data import get_data
+from dimos.utils.testing import TimedSensorReplay
+
+dir_name = "unitree_go2_bigoffice"
+
+
+@pytest.mark.skip
+def test_embed_frame() -> None:
+    """Test embedding a single frame."""
+    # Load a frame from recorded data
+    video = TimedSensorReplay(get_data(dir_name) / "video")
+    frame = video.find_closest_seek(10)
+
+    # Create memory and embed
+    memory = EmbeddingMemory()
+
+    try:
+        # Create a spatial entry with dummy pose (no TF needed for this test)
+        dummy_pose = PoseStamped(
+            position=[0, 0, 0],
+            orientation=[0, 0, 0, 1],  # identity quaternion
+        )
+        spatial_entry = SpatialEntry(image=frame, pose=dummy_pose)
+
+        # Embed the frame
+        result = memory._embed_spatial_entry(spatial_entry)
+
+        # Verify
+        assert result is not None
+        assert result.embedding is not None
+        assert result.embedding.vector is not None
+        print(f"Embedding shape: {result.embedding.vector.shape}")
+        print(f"Embedding vector (first 5): {result.embedding.vector[:5]}")
+    finally:
+        memory.stop()

Lines changed: 41 additions & 0 deletions
@@ -0,0 +1,41 @@
+# Copyright 2025-2026 Dimensional Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Time series storage and replay."""
+
+from dimos.memory.timeseries.base import TimeSeriesStore
+from dimos.memory.timeseries.inmemory import InMemoryStore
+from dimos.memory.timeseries.pickledir import PickleDirStore
+from dimos.memory.timeseries.sqlite import SqliteStore
+
+
+def __getattr__(name: str):  # type: ignore[no-untyped-def]
+    if name == "PostgresStore":
+        from dimos.memory.timeseries.postgres import PostgresStore
+
+        return PostgresStore
+    if name == "reset_db":
+        from dimos.memory.timeseries.postgres import reset_db
+
+        return reset_db
+    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
+
+
+__all__ = [
+    "InMemoryStore",
+    "PickleDirStore",
+    "PostgresStore",
+    "SqliteStore",
+    "TimeSeriesStore",
+    "reset_db",
+]
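The module-level `__getattr__` in this hunk defers importing the PostgreSQL backend until `PostgresStore` or `reset_db` is first accessed, presumably so that the package can be imported without the PostgreSQL driver. The mechanism (PEP 562) can be tried in isolation with the sketch below. Everything in it is hypothetical scaffolding: it builds a throwaway module object, resolves names from a lookup table, and caches the result on the module, which the code in the diff does not do.

```python
import importlib
import types


def make_lazy_package(name: str, lazy: dict[str, tuple[str, str]]) -> types.ModuleType:
    """Build a module whose listed attributes are imported on first access.

    `lazy` maps an exported name to (module path, attribute name).
    Hypothetical helper for illustration only.
    """
    pkg = types.ModuleType(name)

    def _lazy_getattr(attr: str) -> object:
        try:
            module_name, target = lazy[attr]
        except KeyError:
            raise AttributeError(
                f"module {name!r} has no attribute {attr!r}"
            ) from None
        value = getattr(importlib.import_module(module_name), target)
        # Cache on the module so later lookups bypass __getattr__ entirely.
        setattr(pkg, attr, value)
        return value

    # Module attribute lookup falls back to a __getattr__ found in the
    # module's own namespace when normal lookup fails.
    pkg.__getattr__ = _lazy_getattr
    return pkg
```

For example, `make_lazy_package("demo", {"JSONDecoder": ("json.decoder", "JSONDecoder")}).JSONDecoder` triggers the import of `json.decoder` only at that attribute access. Raising `AttributeError` for unknown names, as the diff also does, keeps `hasattr()` and `from package import name` behaving normally.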
