Skip to content

Commit 33a2e95

Browse files
abhijeet-dhumal authored and ntkathole committed
perf: Optimize timestamp conversion in _convert_rows_to_protobuf
Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
1 parent f5f5e50 commit 33a2e95

File tree

2 files changed

+181
-11
lines changed

2 files changed

+181
-11
lines changed

sdk/python/feast/utils.py

Lines changed: 11 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -1436,27 +1436,27 @@ def _convert_rows_to_protobuf(
14361436
requested_features: List[str],
14371437
read_rows: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]],
14381438
) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[ValueProto]]]:
1439-
# Pre-calculate the length to avoid repeated calculations
14401439
n_rows = len(read_rows)
14411440

1442-
# Create single instances of commonly used values
14431441
null_value = ValueProto()
14441442
null_status = FieldStatus.NOT_FOUND
1445-
null_timestamp = Timestamp()
14461443
present_status = FieldStatus.PRESENT
14471444

1445+
# Pre-compute timestamps once per entity (not per feature)
1446+
# This reduces O(features * entities) to O(entities) for timestamp conversion
1447+
row_timestamps = []
1448+
for row_ts, _ in read_rows:
1449+
ts_proto = Timestamp()
1450+
if row_ts is not None:
1451+
ts_proto.FromDatetime(row_ts)
1452+
row_timestamps.append(ts_proto)
1453+
14481454
requested_features_vectors = []
14491455
for feature_name in requested_features:
1450-
ts_vector = [null_timestamp] * n_rows
1456+
ts_vector = list(row_timestamps) # Shallow copy of pre-computed timestamps
14511457
status_vector = [null_status] * n_rows
14521458
value_vector = [null_value] * n_rows
1453-
for idx, read_row in enumerate(read_rows):
1454-
row_ts_proto = Timestamp()
1455-
row_ts, feature_data = read_row
1456-
# TODO (Ly): reuse whatever timestamp if row_ts is None?
1457-
if row_ts is not None:
1458-
row_ts_proto.FromDatetime(row_ts)
1459-
ts_vector[idx] = row_ts_proto
1459+
for idx, (_, feature_data) in enumerate(read_rows):
14601460
if (feature_data is not None) and (feature_name in feature_data):
14611461
status_vector[idx] = present_status
14621462
value_vector[idx] = feature_data[feature_name]
Lines changed: 170 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,170 @@
1+
"""
2+
Tests for feast.utils module.
3+
4+
These unit tests cover the _convert_rows_to_protobuf function which is critical
5+
for online feature retrieval performance. The function converts raw database
6+
rows to protobuf format for the serving response.
7+
"""
8+
9+
from datetime import datetime, timezone
10+
11+
from feast.protos.feast.serving.ServingService_pb2 import FieldStatus
12+
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
13+
from feast.utils import _convert_rows_to_protobuf
14+
15+
16+
class TestConvertRowsToProtobuf:
17+
"""Tests for _convert_rows_to_protobuf function."""
18+
19+
def test_basic_conversion(self):
20+
"""Test basic conversion with single feature and entity."""
21+
timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
22+
value = ValueProto(float_val=1.5)
23+
24+
read_rows = [(timestamp, {"feature_1": value})]
25+
requested_features = ["feature_1"]
26+
27+
result = _convert_rows_to_protobuf(requested_features, read_rows)
28+
29+
assert len(result) == 1
30+
ts_vector, status_vector, value_vector = result[0]
31+
assert len(ts_vector) == 1
32+
assert ts_vector[0].seconds == int(timestamp.timestamp())
33+
assert value_vector[0] == value
34+
35+
def test_multiple_features_same_timestamp(self):
36+
"""Test that multiple features share the same pre-computed timestamp.
37+
38+
This verifies the optimization: timestamps are computed once per entity,
39+
not once per feature per entity.
40+
"""
41+
timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
42+
value1 = ValueProto(float_val=1.0)
43+
value2 = ValueProto(float_val=2.0)
44+
45+
read_rows = [(timestamp, {"feature_1": value1, "feature_2": value2})]
46+
requested_features = ["feature_1", "feature_2"]
47+
48+
result = _convert_rows_to_protobuf(requested_features, read_rows)
49+
50+
assert len(result) == 2
51+
ts1 = result[0][0][0]
52+
ts2 = result[1][0][0]
53+
assert ts1.seconds == ts2.seconds
54+
assert ts1.seconds == int(timestamp.timestamp())
55+
56+
def test_multiple_entities(self):
57+
"""Test conversion with multiple entities having different timestamps."""
58+
ts1 = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
59+
ts2 = datetime(2024, 1, 2, 12, 0, 0, tzinfo=timezone.utc)
60+
61+
read_rows = [
62+
(ts1, {"feature_1": ValueProto(float_val=1.0)}),
63+
(ts2, {"feature_1": ValueProto(float_val=2.0)}),
64+
]
65+
requested_features = ["feature_1"]
66+
67+
result = _convert_rows_to_protobuf(requested_features, read_rows)
68+
69+
assert len(result) == 1
70+
ts_vector, status_vector, value_vector = result[0]
71+
assert len(ts_vector) == 2
72+
assert ts_vector[0].seconds == int(ts1.timestamp())
73+
assert ts_vector[1].seconds == int(ts2.timestamp())
74+
75+
def test_null_timestamp_handling(self):
76+
"""Test that null timestamps produce empty Timestamp proto."""
77+
read_rows = [
78+
(None, {"feature_1": ValueProto(float_val=1.0)}),
79+
(
80+
datetime(2024, 1, 1, tzinfo=timezone.utc),
81+
{"feature_1": ValueProto(float_val=2.0)},
82+
),
83+
]
84+
requested_features = ["feature_1"]
85+
86+
result = _convert_rows_to_protobuf(requested_features, read_rows)
87+
88+
ts_vector = result[0][0]
89+
assert ts_vector[0].seconds == 0 # Null timestamp -> empty proto
90+
assert ts_vector[1].seconds != 0 # Valid timestamp
91+
92+
def test_missing_feature_data(self):
93+
"""Test handling of missing feature data (None row)."""
94+
timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
95+
96+
read_rows = [
97+
(timestamp, {"feature_1": ValueProto(float_val=1.0)}),
98+
(timestamp, None), # No feature data for this entity
99+
]
100+
requested_features = ["feature_1"]
101+
102+
result = _convert_rows_to_protobuf(requested_features, read_rows)
103+
104+
ts_vector, status_vector, value_vector = result[0]
105+
assert len(ts_vector) == 2
106+
assert status_vector[0] == FieldStatus.PRESENT
107+
assert status_vector[1] == FieldStatus.NOT_FOUND
108+
109+
def test_feature_not_in_row(self):
110+
"""Test handling when requested feature is not in the row's data."""
111+
timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
112+
113+
read_rows = [
114+
(timestamp, {"feature_1": ValueProto(float_val=1.0)}),
115+
]
116+
requested_features = ["feature_1", "feature_2"] # feature_2 not in data
117+
118+
result = _convert_rows_to_protobuf(requested_features, read_rows)
119+
120+
assert len(result) == 2
121+
# feature_1 is present
122+
assert result[0][1][0] == FieldStatus.PRESENT
123+
# feature_2 is not found
124+
assert result[1][1][0] == FieldStatus.NOT_FOUND
125+
126+
def test_empty_inputs(self):
127+
"""Test handling of empty inputs."""
128+
# Empty rows
129+
result = _convert_rows_to_protobuf(["feature_1"], [])
130+
assert len(result) == 1
131+
assert len(result[0][0]) == 0 # Empty ts_vector
132+
133+
# Empty features
134+
timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
135+
result = _convert_rows_to_protobuf([], [(timestamp, {"f": ValueProto()})])
136+
assert len(result) == 0
137+
138+
def test_large_scale_correctness(self):
139+
"""Test correctness with large number of features and entities.
140+
141+
This test verifies that the optimized implementation produces correct
142+
results at scale (50 features x 500 entities = 25,000 data points).
143+
"""
144+
timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
145+
num_entities = 500
146+
num_features = 50
147+
148+
feature_data = {
149+
f"feature_{i}": ValueProto(float_val=float(i)) for i in range(num_features)
150+
}
151+
read_rows = [(timestamp, feature_data.copy()) for _ in range(num_entities)]
152+
requested_features = [f"feature_{i}" for i in range(num_features)]
153+
154+
result = _convert_rows_to_protobuf(requested_features, read_rows)
155+
156+
# Verify structure
157+
assert len(result) == num_features
158+
for feature_idx, (ts_vector, status_vector, value_vector) in enumerate(result):
159+
assert len(ts_vector) == num_entities
160+
assert len(status_vector) == num_entities
161+
assert len(value_vector) == num_entities
162+
163+
# Verify all timestamps are the same (pre-computed once)
164+
expected_ts = int(timestamp.timestamp())
165+
for ts in ts_vector:
166+
assert ts.seconds == expected_ts
167+
168+
# Verify all statuses are PRESENT
169+
for status in status_vector:
170+
assert status == FieldStatus.PRESENT

0 commit comments

Comments
 (0)