27 commits
0117d89
feat: vector reducer functions and performance improvements
AlexFilipImproving Mar 11, 2026
5bc847c
Merge branch 'main' into feature/vector-reducer-functions
AlexFilipImproving Mar 11, 2026
29ed9d8
Merge main and regenerate tests
AlexFilipImproving Mar 11, 2026
225c756
Fix changes that git messed up in the merge
AlexFilipImproving Mar 11, 2026
8280227
Merge branch 'main' into feature/vector-reducer-functions
AlexFilipImproving Mar 13, 2026
6f9d638
Fix some integration tests
AlexFilipImproving Mar 13, 2026
32aeebd
Merge branch 'main' into feature/vector-reducer-functions
AlexFilipImproving Mar 16, 2026
1841331
Merge branch 'main' into feature/vector-reducer-functions
AlexFilipImproving Mar 23, 2026
1b72e51
Merge branch 'main' into feature/vector-reducer-functions
AlexFilipImproving Apr 1, 2026
23a5618
Merge branch 'main' into feature/vector-reducer-functions
AlexFilipImproving Apr 10, 2026
ff40960
Remove test regeneration in generate.py
AlexFilipImproving Apr 10, 2026
d8233f7
refactor: move reducer processing to per-record model with self-parsing
AlexFilipImproving Apr 17, 2026
6cda697
Remove subprocess import
AlexFilipImproving Apr 17, 2026
8451a99
Merge branch 'main' into feature/vector-reducer-functions
AlexFilipImproving Apr 20, 2026
11ad3a0
feat: add vector value type support to expr::Value
AlexFilipImproving Mar 11, 2026
4cfd0a4
Fix formatting issue
AlexFilipImproving Mar 16, 2026
743f18c
Formatting
AlexFilipImproving Mar 16, 2026
8b6eed5
Change name from Vector to Array and modify related constructor in Value
AlexFilipImproving Apr 14, 2026
5097428
Format fix
AlexFilipImproving Apr 14, 2026
ebb47be
feat: add RANDOM_SAMPLE reducer function
rileydes-improving Apr 1, 2026
c7ac819
test: pruning RANDOM_SAMPLE unit tests to only essential
rileydes-improving Apr 1, 2026
1fa83d1
format: autoformat changes
rileydes-improving Apr 1, 2026
bbe70f9
refactor(ft_aggregate): running RandomSample initialization logic log…
rileydes-improving Apr 1, 2026
44b91ea
refactor(ft_aggregate_exec_test): update Vector to Array naming
rileydes-improving Apr 15, 2026
82e6ee3
refactor(ft_aggregate_exec): refactor RandomSample based off base cha…
rileydes-improving Apr 23, 2026
08c1dd9
test(aggregate): add RANDOM_SAMPLE error handling tests to compatibil…
rileydes-improving Apr 23, 2026
bae15fe
style: autoformat changes
rileydes-improving Apr 23, 2026
1 change: 1 addition & 0 deletions docs/commands/ft.aggregate.md
@@ -92,3 +92,4 @@ The following reducer functions are available. The reducer functions that take a
| MAX 1 <expression> | The largest numerical value of the expression. |
| AVG 1 <expression> | The numerical average of the values of the expression. |
| STDDEV 1 <expression> | The standard deviation of the values of the expression. |
| RANDOM_SAMPLE 2 <expression> <sample_size> | A random sample of values from the expression using reservoir sampling. Returns an array of up to sample_size elements. |
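
A usage sketch for the new reducer (the `products` index and `price`/`category` fields are hypothetical, mirroring the integration tests later in this PR):

```python
# Hypothetical sketch: sample up to 3 prices per category.
# Assumes a redis-py/valkey-py style client and a "products" index
# with a numeric "price" field and a tag "category" field.
result = client.execute_command(
    "FT.AGGREGATE", "products", "*",
    "LOAD", "2", "price", "category",
    "GROUPBY", "1", "@category",
    "REDUCE", "RANDOM_SAMPLE", "2", "@price", "3", "AS", "price_sample",
)
# Each group row carries "price_sample" as an array of at most 3 values.
```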
Binary file modified integration/compatibility/aggregate-answers.pickle.gz
42 changes: 35 additions & 7 deletions integration/compatibility/generate.py
@@ -150,15 +150,15 @@ def check(self, dialect, *orig_cmd):

def checkall(self, dialect, *orig_cmd, **kwargs):
'''Non-vector commands. Doesn't have support for '*' yet. '''
self.checkvec(self, dialect, orig_cmd, kwargs)
self.check(self, dialect, orig_cmd)
self.checkvec(dialect, *orig_cmd, **kwargs)
self.check(dialect, *orig_cmd)

def test_bad_numeric_data(self, key_type, dialect):
self.setup_data("bad numbers", key_type)
self.check(dialect, f"ft.search {key_type}_idx1", "@n1:[-inf inf]")
self.check(dialect, f"ft.search {key_type}_idx1", "-@n1:[-inf inf]")
self.check(dialect, f"ft.search {key_type}_idx1", "@n2:[-inf inf]")
self.check(dialect, f"ft.search {key_type}_idx1", "-@n2:[-inf inf]")
self.check(dialect, "ft.search", f"{key_type}_idx1", "@n1:[-inf inf]")
self.check(dialect, "ft.search", f"{key_type}_idx1", "-@n1:[-inf inf]")
self.check(dialect, "ft.search", f"{key_type}_idx1", "@n2:[-inf inf]")
self.check(dialect, "ft.search", f"{key_type}_idx1", "-@n2:[-inf inf]")

def test_search_reverse(self, key_type, dialect):
self.setup_data("reverse vector numbers", key_type)
@@ -263,6 +263,34 @@ def test_aggregate_groupby(self, key_type, dialect):
f"ft.aggregate {key_type}_idx1 * load 6 @__key @n1 @n2 @t1 @t2 @t3 groupby 1 @t3 reduce max 1 @n1 as nmax"
)
self.check(dialect, f'ft.aggregate {key_type}_idx1 * load 6 @__key @n1 @n2 @t1 @t2 @t3 groupby 1 @t1 reduce max 1 @n2 as nmax')

def test_aggregate_random_sample_errors(self, key_type, dialect):
"""Test error behavior for RANDOM_SAMPLE edge cases against Redis.

Only cases that Redis rejects are included. Success cases
are omitted because RANDOM_SAMPLE output is non-deterministic
and can't be compared row-by-row against a recorded answer.
"""
self.setup_data("sortable numbers", key_type)

error_cases = [
# Wrong argument counts.
f"ft.aggregate {key_type}_idx1 * load 2 @__key @n1 groupby 1 @t1 reduce random_sample 0 as s",
f"ft.aggregate {key_type}_idx1 * load 2 @__key @n1 groupby 1 @t1 reduce random_sample 1 @n1 as s",
# Non-numeric sample size.
f"ft.aggregate {key_type}_idx1 * load 2 @__key @n1 groupby 1 @t1 reduce random_sample 2 @n1 invalid as s",
# Non-integer sample size.
f"ft.aggregate {key_type}_idx1 * load 2 @__key @n1 groupby 1 @t1 reduce random_sample 2 @n1 1.5 as s",
# Negative sample size.
f"ft.aggregate {key_type}_idx1 * load 2 @__key @n1 groupby 1 @t1 reduce random_sample 2 @n1 -1 as s",
f"ft.aggregate {key_type}_idx1 * load 2 @__key @n1 groupby 1 @t1 reduce random_sample 2 @n1 -5 as s",
# Above cap (1000).
f"ft.aggregate {key_type}_idx1 * load 2 @__key @n1 groupby 1 @t1 reduce random_sample 2 @n1 1001 as s",
f"ft.aggregate {key_type}_idx1 * load 2 @__key @n1 groupby 1 @t1 reduce random_sample 2 @n1 10000 as s",
]
for cmd in error_cases:
self.execute_command(*cmd.split())

def test_aggregate_limit(self, key_type, dialect):
self.setup_data("sortable numbers", key_type)
self.check(dialect, f"ft.aggregate {key_type}_idx1 * load 3 @__key @n1 @n2")
Expand Down Expand Up @@ -463,7 +491,7 @@ def test_search_sortby(self, key_type, dialect):

for sort_key in ["n1", "n2"]:
for direction in ["ASC", "DESC", ""]:
for return_keys in ["", "RETURN 3 @n1 @t1"]:
for return_keys in ["", "RETURN 2 @n1 @t1"]:
for wsk in ["", "WITHSORTKEYS"]:
for limit in ["LIMIT 0 5", "LIMIT 2 3", ""]:
self.check(dialect, f"ft.search {key_type}_idx1 * SORTBY {sort_key} {direction} {return_keys} {limit} {wsk}")
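
The documentation change above describes RANDOM_SAMPLE as reservoir sampling. For reference, a minimal sketch of the classic Algorithm R that this contract implies — an illustration of the technique, not the module's actual implementation:

```python
import random

def reservoir_sample(values, k):
    """Keep a uniform random sample of up to k items from a stream.

    After processing n items, each has probability k/n of being in the
    reservoir, matching the "up to sample_size elements" contract.
    """
    reservoir = []
    for i, value in enumerate(values):
        if i < k:
            reservoir.append(value)  # Fill the reservoir first.
        else:
            # Replace a random slot with probability k / (i + 1).
            j = random.randint(0, i)
            if j < k:
                reservoir[j] = value
    return reservoir
```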
160 changes: 160 additions & 0 deletions integration/test_non_vector.py
@@ -258,6 +258,162 @@ def validate_aggregate_queries(client: Valkey):
)
assert result[0] == 2

def validate_random_sample_queries(client: Valkey):
"""
Test FT.AGGREGATE with RANDOM_SAMPLE reducer.
"""
# 1. Basic RANDOM_SAMPLE functionality
# Use APPLY to create a constant field for grouping all records together
result = client.execute_command(
"FT.AGGREGATE", "products", "@price:[1 1000]",
"LOAD", "1", "price",
"APPLY", "1", "AS", "all",
"GROUPBY", "1", "@all",
"REDUCE", "RANDOM_SAMPLE", "2", "@price", "5", "AS", "sample"
)
assert result[0] == 1
# Result should have a sample field with an array
row = dict(zip(result[1][::2], result[1][1::2]))
assert b'sample' in row
# Sample should be an array (list in Python)
sample = row[b'sample']
assert isinstance(sample, list)
assert len(sample) <= 5 # Should have at most 5 elements

# 2. RANDOM_SAMPLE with various sample sizes
# Sample size smaller than group size
result = client.execute_command(
"FT.AGGREGATE", "products", "@price:[1 100]",
"LOAD", "1", "price",
"APPLY", "1", "AS", "all",
"GROUPBY", "1", "@all",
"REDUCE", "RANDOM_SAMPLE", "2", "@price", "10", "AS", "sample"
)
assert result[0] == 1
row = dict(zip(result[1][::2], result[1][1::2]))
sample = row[b'sample']
assert isinstance(sample, list)
assert len(sample) == 10 # Should have exactly 10 elements

# Sample size larger than group size
result = client.execute_command(
"FT.AGGREGATE", "products", "@price:[1 5]",
"LOAD", "1", "price",
"APPLY", "1", "AS", "all",
"GROUPBY", "1", "@all",
"REDUCE", "RANDOM_SAMPLE", "2", "@price", "100", "AS", "sample"
)
assert result[0] == 1
row = dict(zip(result[1][::2], result[1][1::2]))
sample = row[b'sample']
assert isinstance(sample, list)
assert len(sample) == 5 # Should have all 5 elements

# Sample size = 0 (empty array)
result = client.execute_command(
"FT.AGGREGATE", "products", "@price:[1 100]",
"LOAD", "1", "price",
"APPLY", "1", "AS", "all",
"GROUPBY", "1", "@all",
"REDUCE", "RANDOM_SAMPLE", "2", "@price", "0", "AS", "sample"
)
assert result[0] == 1
row = dict(zip(result[1][::2], result[1][1::2]))
sample = row[b'sample']
assert isinstance(sample, list)
assert len(sample) == 0 # Should be empty

# 3. Multiple RANDOM_SAMPLE reducers in same query
result = client.execute_command(
"FT.AGGREGATE", "products", "@price:[1 100]",
"LOAD", "2", "price", "rating",
"APPLY", "1", "AS", "all",
"GROUPBY", "1", "@all",
"REDUCE", "RANDOM_SAMPLE", "2", "@price", "5", "AS", "price_sample",
"REDUCE", "RANDOM_SAMPLE", "2", "@rating", "5", "AS", "rating_sample"
)
assert result[0] == 1
row = dict(zip(result[1][::2], result[1][1::2]))
assert b'price_sample' in row
assert b'rating_sample' in row
price_sample = row[b'price_sample']
rating_sample = row[b'rating_sample']
assert isinstance(price_sample, list)
assert isinstance(rating_sample, list)
assert len(price_sample) == 5
assert len(rating_sample) == 5
# Samples should be independent (different values)
assert price_sample != rating_sample

# 4. RANDOM_SAMPLE with GROUPBY operations
result = client.execute_command(
"FT.AGGREGATE", "products", "@price:[1 1000]",
"LOAD", "2", "price", "category",
"GROUPBY", "1", "@category",
"REDUCE", "RANDOM_SAMPLE", "2", "@price", "10", "AS", "sample"
)
assert result[0] == 2 # Two categories: electronics and books
for i in range(1, len(result)):
row = dict(zip(result[i][::2], result[i][1::2]))
assert b'category' in row
assert b'sample' in row
sample = row[b'sample']
assert isinstance(sample, list)
assert len(sample) == 10 # Each group should have 10 samples

# 5. RANDOM_SAMPLE with numeric fields
result = client.execute_command(
"FT.AGGREGATE", "products", "@price:[1 100]",
"LOAD", "1", "price",
"APPLY", "1", "AS", "all",
"GROUPBY", "1", "@all",
"REDUCE", "RANDOM_SAMPLE", "2", "@price", "5", "AS", "sample"
)
assert result[0] == 1
row = dict(zip(result[1][::2], result[1][1::2]))
sample = row[b'sample']
assert isinstance(sample, list)
# All values should be numeric strings
for val in sample:
assert isinstance(val, bytes)
float(val) # Should be convertible to float

# 6. RANDOM_SAMPLE with string fields
result = client.execute_command(
"FT.AGGREGATE", "products", "@price:[1 100]",
"LOAD", "1", "category",
"APPLY", "1", "AS", "all",
"GROUPBY", "1", "@all",
"REDUCE", "RANDOM_SAMPLE", "2", "@category", "5", "AS", "sample"
)
assert result[0] == 1
row = dict(zip(result[1][::2], result[1][1::2]))
sample = row[b'sample']
assert isinstance(sample, list)
# All values should be strings
for val in sample:
assert isinstance(val, bytes)
assert val in [b'electronics', b'books']

# 7. RANDOM_SAMPLE with mixed-type fields (numeric and string in same property)
# This tests that RANDOM_SAMPLE handles different types correctly
result = client.execute_command(
"FT.AGGREGATE", "products", "@price:[1 100]",
"LOAD", "2", "price", "rating",
"APPLY", "1", "AS", "all",
"GROUPBY", "1", "@all",
"REDUCE", "RANDOM_SAMPLE", "2", "@price", "3", "AS", "price_sample",
"REDUCE", "RANDOM_SAMPLE", "2", "@rating", "3", "AS", "rating_sample"
)
assert result[0] == 1
row = dict(zip(result[1][::2], result[1][1::2]))
price_sample = row[b'price_sample']
rating_sample = row[b'rating_sample']
assert isinstance(price_sample, list)
assert isinstance(rating_sample, list)
assert len(price_sample) == 3
assert len(rating_sample) == 3

def validate_aggregate_complex_queries(client: Valkey):
"""
Test complex FT.AGGREGATE queries with numeric and tag fields.
@@ -488,6 +644,8 @@ def test_aggregate_complex(self):
for doc in json_docs:
assert client.execute_command(*doc) == b"OK"
validate_aggregate_complex_queries(client)
# Test RANDOM_SAMPLE functionality
validate_random_sample_queries(client)

def test_uningested_multi_field(self):
"""
Expand Down Expand Up @@ -550,6 +708,8 @@ def test_aggregate_complex_cluster(self):
for doc in aggregate_complex_json_docs:
assert cluster_client.execute_command(*doc) == b"OK"
validate_aggregate_complex_queries(cluster_client)
# Test RANDOM_SAMPLE functionality in cluster mode
validate_random_sample_queries(cluster_client)

def test_max_search_keys_fetch_limited(self):
"""
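
The tests above repeatedly convert reply rows with `dict(zip(result[i][::2], result[i][1::2]))`. A small helper capturing that idiom — a sketch assuming the RESP2 flat key/value row shape these tests rely on:

```python
def row_to_dict(row):
    """Convert one FT.AGGREGATE reply row into a field -> value dict.

    Rows arrive as flat lists alternating field names and values, e.g.
    [b"category", b"books", b"sample", [b"10", b"12"]].
    """
    return dict(zip(row[::2], row[1::2]))

# Usage, mirroring the tests above (result[0] is the row count):
# result = client.execute_command("FT.AGGREGATE", ...)
# rows = [row_to_dict(r) for r in result[1:]]
```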
2 changes: 1 addition & 1 deletion src/commands/commands.h
@@ -105,7 +105,7 @@ struct QueryCommand : public query::SearchParameters {
//
// Determine if we need full results or if we can optimize with trimming
//
virtual bool RequiresCompleteResults() const = 0;
bool RequiresCompleteResults() const override = 0;
//
// Called when query completes.
//
7 changes: 6 additions & 1 deletion src/commands/ft.aggregate.json
@@ -263,6 +263,11 @@
"name": "STDDEV",
"type": "pure-token",
"token": "STDDEV"
},
{
"name": "RANDOM_SAMPLE",
"type": "pure-token",
"token": "RANDOM_SAMPLE"
}
]
},
@@ -361,4 +366,4 @@
"module_since": "1.1.0",
"summary": "Performs a search of the specified index. The keys which match the query expression are subjected to further processing as specified"
}
}
}
35 changes: 35 additions & 0 deletions src/commands/ft_aggregate.cc
@@ -124,13 +124,48 @@ absl::Status AggregateParameters::ParseCommand(vmsdk::ArgsIterator &itr) {
return absl::OkStatus();
}

// Forward declaration for recursive serialization
void SerializeValueToResp(ValkeyModuleCtx *ctx, const expr::Value &value);

void SerializeArrayToResp(ValkeyModuleCtx *ctx, const expr::Value::Array &vec) {
ValkeyModule_ReplyWithArray(ctx, vec->size());
for (const auto &elem : *vec) {
SerializeValueToResp(ctx, elem);
}
}

void SerializeValueToResp(ValkeyModuleCtx *ctx, const expr::Value &value) {
if (value.IsArray()) {
SerializeArrayToResp(ctx, value.GetArray());
} else if (value.IsBool()) {
ValkeyModule_ReplyWithLongLong(ctx, value.GetBool() ? 1 : 0);
} else if (value.IsDouble()) {
auto value_str = value.AsString();
ValkeyModule_ReplyWithStringBuffer(ctx, value_str.data(), value_str.size());
} else if (value.IsString()) {
auto value_sv = value.GetStringView();
ValkeyModule_ReplyWithStringBuffer(ctx, value_sv.data(), value_sv.size());
} else {
// Fallback for Nil and unknown types
ValkeyModule_ReplyWithNull(ctx);
}
}

bool ReplyWithValue(ValkeyModuleCtx *ctx,
data_model::AttributeDataType data_type,
std::string_view name, indexes::IndexerType indexer_type,
const expr::Value &value, int dialect) {
if (value.IsNil()) {
return false;
}

// Handle vector values with RESP array serialization
if (value.IsArray()) {
ValkeyModule_ReplyWithSimpleString(ctx, name.data());
SerializeArrayToResp(ctx, value.GetArray());
return true;
}

if (data_type == data_model::AttributeDataType::ATTRIBUTE_DATA_TYPE_HASH) {
ValkeyModule_ReplyWithSimpleString(ctx, name.data());
auto value_sv = value.AsStringView();
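
From the client's perspective, the recursive serialization above means a RANDOM_SAMPLE result arrives as a nested RESP array inside its row. A hypothetical decoded RESP2 reply, with illustrative values only:

```python
# Sketch of a decoded reply for GROUPBY @category with one
# RANDOM_SAMPLE reducer; SerializeArrayToResp emits the sample as a
# nested array, which the client decodes as a Python list.
reply = [
    2,  # number of group rows
    [b"category", b"electronics", b"sample", [b"19.99", b"5.49", b"7.00"]],
    [b"category", b"books", b"sample", [b"12.50", b"3.99"]],
]
```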