Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
62f13e1
feat: Implement SORTBY in FT.SEARCH. All previous commits squashed.
AlexFilipImproving Jan 28, 2026
eb99755
Merge branch 'main' into feature/sortby-implementation
AlexFilipImproving Jan 28, 2026
7ddc6bb
Add RequiresCompleteResults override
AlexFilipImproving Jan 28, 2026
047d3b5
Generate new answers file
AlexFilipImproving Jan 28, 2026
3a9d620
Fix some integration tests and generate new batch of answers
AlexFilipImproving Feb 3, 2026
a3c972c
Bug fix and put TODO back
AlexFilipImproving Feb 3, 2026
9fe989e
Regenerate aggregate answers
AlexFilipImproving Feb 3, 2026
739cdeb
Move from ToNumeric to To and regenerate answers
AlexFilipImproving Feb 3, 2026
7b3ba25
Add sortby identifier to json response
AlexFilipImproving Feb 5, 2026
2969d47
Regenerate answers
AlexFilipImproving Feb 5, 2026
4cf2fc6
Merge branch 'main' into feature/sortby-implementation
AlexFilipImproving Feb 5, 2026
ce5d657
Combine ProcessNeighborsForReply and ProcessNonVectorNeighborsForReply
AlexFilipImproving Feb 6, 2026
eb5d778
Pipe sort attribute through pipeline to make sure that it actually sorts
AlexFilipImproving Feb 7, 2026
0bd2368
Fix compatibility tests for SORTBY
AlexFilipImproving Feb 9, 2026
f81ac71
Add WITHSORTKEYS
AlexFilipImproving Feb 9, 2026
0e80f27
Merge branch 'main' into feature/sortby-implementation
AlexFilipImproving Feb 9, 2026
5ec285b
Fix failing test
AlexFilipImproving Feb 9, 2026
5fe9004
Merge branch 'feature/sortby-implementation' of github.com:Bit-Quill/…
AlexFilipImproving Feb 9, 2026
f9c4123
Fix tests
AlexFilipImproving Feb 9, 2026
5dbb631
Fix build errors
AlexFilipImproving Feb 9, 2026
a0378cd
Fix syntax
AlexFilipImproving Feb 9, 2026
20106f2
Remove unnecessary include
AlexFilipImproving Feb 9, 2026
8922e26
Remove secondary sorting
AlexFilipImproving Feb 9, 2026
cb01e7a
Regenerate answers
AlexFilipImproving Feb 10, 2026
9cc9c78
Minor test fix
AlexFilipImproving Feb 10, 2026
ca40b0d
Merge branch 'main' into feature/sortby-implementation
AlexFilipImproving Feb 11, 2026
8930298
Fix compile problem
AlexFilipImproving Feb 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions COMMANDS.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ FT.SEARCH <index> <query>
[TIMEOUT <timeout>]
[PARAMS nargs <name> <value> [ <name> <value> ...]]
[LIMIT <offset> <num>]
[SORTBY <field> [ASC|DESC]]
[DIALECT <dialect>]
```

Expand All @@ -165,6 +166,7 @@ Performs a search of the specified index. The keys which match the query express
- **PARAMS \<count\> \<name1\> \<value1\> \<name2\> \<value2\> ...** (optional): `count` is the number of arguments, i.e., twice the number of name-value pairs. See the query string for usage details.
- **RETURN \<count\> \<field1\> \<field2\> ...** (optional): `count` is the number of fields to return. Specifies the fields you want to retrieve from your documents, along with any aliases for the returned values. By default, all fields are returned unless the NOCONTENT option is set, in which case no fields are returned. If `count` is set to 0, it behaves the same as NOCONTENT.
- **LIMIT \<offset\> \<count\>** (optional): Lets you choose a portion of the result. The first `<offset>` keys are skipped and only a maximum of `<count>` keys are included. The default is LIMIT 0 10, which returns at most 10 keys.
- **SORTBY \<field\> [ASC|DESC]** (optional): Sorts the results by the specified indexed field. The field must be a numeric or tag field that is indexed. ASC sorts in ascending order (default), DESC sorts in descending order. Documents with missing values for the sort field are placed at the end of the results.
- **DIALECT \<dialect\>** (optional): Specifies your dialect. The only supported dialect is 2\.

**RESPONSE**
Expand Down
Binary file modified integration/compatibility/aggregate-answers.pickle.gz
Binary file not shown.
17 changes: 13 additions & 4 deletions integration/compatibility/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ def setup_class(cls):
if cls.ANSWER_FILE_NAME is None:
raise NotImplementedError("Subclass must define ANSWER_FILE_NAME")

os.system("docker remove Generate-search || true")
if os.system("docker run --name Generate-search -p 6380:6379 redis/redis-stack-server &") != 0:
if os.system("docker run --rm -d --name Generate-search -p 6380:6379 redis/redis-stack-server") != 0:
print("Failed to start Redis Stack server, please check your Docker setup.")
sys.exit(1)
print("Started Generate-search server")
Expand All @@ -66,7 +65,6 @@ def setup_class(cls):
def teardown_class(cls):
print("Stopping Generate-search server")
os.system("docker stop Generate-search")
os.system("docker remove Generate-search")
print("Dumping ", len(cls.answers), " answers")
with gzip.open(cls.ANSWER_FILE_NAME, "wb") as answer_file:
pickle.dump(cls.answers, answer_file)
Expand Down Expand Up @@ -155,7 +153,7 @@ def checkall(self, dialect, *orig_cmd, **kwargs):
self.checkvec(self, dialect, orig_cmd, kwargs)
self.check(self, dialect, orig_cmd)

'''
'''
def test_bad_numeric_data(self, key_type, dialect):
self.setup_data("bad numbers", key_type)
self.check(dialect, f"ft.search {key_type}_idx1", "@n1:[-inf inf]")
Expand Down Expand Up @@ -460,3 +458,14 @@ def test_aggregate_dyadic_ops(self, key_type, dialect):
"as",
"nn",
)

def test_search_sortby(self, key_type, dialect):
    """Exercise FT.SEARCH SORTBY across every combination of sort field,
    direction, RETURN clause, WITHSORTKEYS flag and LIMIT window."""
    self.setup_data("sortable numbers", key_type)

    fields = ["n1", "n2"]
    orders = ["ASC", "DESC", ""]
    returns = ["", "RETURN 3 @n1 @t1"]
    sortkey_flags = ["", "WITHSORTKEYS"]
    limits = ["LIMIT 0 5", "LIMIT 2 3", ""]
    # Nested comprehension enumerates combinations in the same order the
    # original nested for-loops did.
    combos = [
        (f, o, r, w, l)
        for f in fields
        for o in orders
        for r in returns
        for w in sortkey_flags
        for l in limits
    ]
    for field, order, ret, flag, window in combos:
        self.check(dialect, f"ft.search {key_type}_idx1 * SORTBY {field} {order} {ret} {window} {flag}")

58 changes: 47 additions & 11 deletions integration/compatibility_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,47 @@ def parse_value(x, key_type):
raise
return result

def unpack_search_result(rs, key_type):
def result_has_sortkeys(rs):
    """Detect if a search result actually contains sort keys by checking the format.

    With sort keys: [count, key1, #sortkey1, [fields1], key2, #sortkey2, [fields2], ...]
    Without sort keys: [count, key1, [fields1], key2, [fields2], ...]

    The sort key is a bytes/string that starts with '#' (or '$' in some Redis
    versions), and fields are always a list, so inspecting the element after
    the first key disambiguates the two layouts.
    """
    # Need at least [count, key1, <sortkey-or-fields>] to tell the formats apart.
    if len(rs) < 3:
        return False
    marker = rs[2]
    # startswith accepts a tuple of prefixes, so each type needs only one check.
    if isinstance(marker, bytes):
        return marker.startswith((b'#', b'$'))
    if isinstance(marker, str):
        return marker.startswith(('#', '$'))
    # A list (the fields array) or any other type means no sort keys.
    return False

def unpack_search_result(rs, key_type, has_sortkeys=False):
    """Convert an FT.SEARCH reply into a list of {field: value} row dicts.

    Reply layout with sort keys:    [count, key1, sortkey1, [fields1], ...]
    Reply layout without sort keys: [count, key1, [fields1], ...]

    The leading count element is always skipped, sort keys (when present) are
    discarded, and each fields array of alternating name/value entries is
    decoded via parse_field/parse_value. Each row carries its document key
    under the reserved "__key" entry.
    """
    rows = []
    # Each document occupies 3 reply elements with sort keys, 2 without;
    # the fields list is always the last element of the group.
    step = 3 if has_sortkeys else 2
    for i in range(1, len(rs), step):
        key = rs[i]
        value = rs[i + step - 1]
        row = {"__key": key}
        for j in range(0, len(value), 2):
            row[parse_field(value[j], key_type)] = parse_value(value[j + 1], key_type)
        rows += [row]
    return rows

def unpack_agg_result(rs, key_type):
Expand All @@ -123,7 +154,12 @@ def unpack_agg_result(rs, key_type):

def unpack_result(cmd, key_type, rs, sortkeys):
if "ft.search" in cmd[0].lower():
out = unpack_search_result(rs, key_type)
# Detect if the result actually has sort keys by checking the format,
# not just whether WITHSORTKEYS is in the command. This handles cases
# where the expected result (from pickle) may not have sort keys even
# if the command requested them.
has_sortkeys = result_has_sortkeys(rs)
out = unpack_search_result(rs, key_type, has_sortkeys)
else:
out = unpack_agg_result(rs, key_type)
#
Expand Down Expand Up @@ -242,7 +278,7 @@ def compare_results(expected, results):
sortkeys = [cmd[ix+2+i][1:] for i in range(count)]
elif 'sortby' in cmd:
ix = cmd.index('sortby')
count = int(cmd[ix+1])
count = int(cmd[ix+1]) if cmd[0] != 'ft.search' else 1
# Grab the fields after the count, stripping any leading '@'
sortkeys = [cmd[ix+2+i][1 if cmd[ix+2+i].startswith("@") else 0:] for i in range(count)]
for f in ['asc', 'desc', 'ASC', 'DESC']:
Expand Down
19 changes: 10 additions & 9 deletions src/attribute_data_type.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,22 +77,23 @@ bool HashHasRecord(ValkeyModuleKey *key, absl::string_view identifier) {
}

absl::StatusOr<RecordsMap> HashAttributeDataType::FetchAllRecords(
ValkeyModuleCtx *ctx, const std::string &vector_identifier,
ValkeyModuleCtx *ctx, const std::optional<std::string> &vector_identifier,
[[maybe_unused]] ValkeyModuleKey *open_key, absl::string_view key,
const absl::flat_hash_set<absl::string_view> &identifiers) const {
vmsdk::VerifyMainThread();
auto key_str = vmsdk::MakeUniqueValkeyString(key);
auto key_obj =
vmsdk::MakeUniqueValkeyOpenKey(ctx, key_str.get(), VALKEYMODULE_READ);
if (!key_obj) {
return absl::NotFoundError(
absl::StrCat("No such record with key: `", vector_identifier, "`"));
return absl::NotFoundError(absl::StrCat(
"No such record with key: `", vector_identifier.value_or(""), "`"));
}
// Only check for vector_identifier if it's not empty (vector queries)
if (!vector_identifier.empty() &&
!HashHasRecord(key_obj.get(), vector_identifier)) {
if (vector_identifier.has_value() &&
!HashHasRecord(key_obj.get(), vector_identifier.value())) {
return absl::NotFoundError(absl::StrCat("No such record with identifier: `",
vector_identifier, "`"));
vector_identifier.value_or(""),
"`"));
}
vmsdk::UniqueValkeyScanCursor cursor = vmsdk::MakeUniqueValkeyScanCursor();
HashScanCallbackData callback_data{identifiers};
Expand Down Expand Up @@ -177,13 +178,13 @@ absl::StatusOr<vmsdk::UniqueValkeyString> JsonAttributeDataType::GetRecord(
}

absl::StatusOr<RecordsMap> JsonAttributeDataType::FetchAllRecords(
ValkeyModuleCtx *ctx, const std::string &vector_identifier,
ValkeyModuleCtx *ctx, const std::optional<std::string> &vector_identifier,
ValkeyModuleKey *open_key, absl::string_view key,
const absl::flat_hash_set<absl::string_view> &identifiers) const {
// First, validate that a JSON object exists for the given key using the
// vector identifier.
VMSDK_RETURN_IF_ERROR(
GetJsonRecord(ctx, open_key, key, vector_identifier, nullptr));
VMSDK_RETURN_IF_ERROR(GetJsonRecord(ctx, open_key, key,
vector_identifier.value_or(""), nullptr));
RecordsMap key_value_content;
for (const auto &identifier : identifiers) {
auto str = GetRecord(ctx, open_key, key, identifier);
Expand Down
6 changes: 3 additions & 3 deletions src/attribute_data_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class AttributeDataType {
VALKEYMODULE_NOTIFY_EVICTED;
};
virtual absl::StatusOr<RecordsMap> FetchAllRecords(
ValkeyModuleCtx *ctx, const std::string &vector_identifier,
ValkeyModuleCtx *ctx, const std::optional<std::string> &vector_identifier,
ValkeyModuleKey *open_key, absl::string_view key,
const absl::flat_hash_set<absl::string_view> &identifiers) const = 0;
virtual data_model::AttributeDataType ToProto() const = 0;
Expand All @@ -94,7 +94,7 @@ class HashAttributeDataType : public AttributeDataType {
}
inline std::string ToString() const override { return "HASH"; }
absl::StatusOr<RecordsMap> FetchAllRecords(
ValkeyModuleCtx *ctx, const std::string &vector_identifier,
ValkeyModuleCtx *ctx, const std::optional<std::string> &vector_identifier,
ValkeyModuleKey *open_key, absl::string_view key,
const absl::flat_hash_set<absl::string_view> &identifiers) const override;
bool IsProperType(ValkeyModuleKey *key) const override {
Expand All @@ -120,7 +120,7 @@ class JsonAttributeDataType : public AttributeDataType {
}
inline std::string ToString() const override { return "JSON"; }
absl::StatusOr<RecordsMap> FetchAllRecords(
ValkeyModuleCtx *ctx, const std::string &vector_identifier,
ValkeyModuleCtx *ctx, const std::optional<std::string> &vector_identifier,
ValkeyModuleKey *open_key, absl::string_view key,
const absl::flat_hash_set<absl::string_view> &identifiers) const override;
bool IsProperType(ValkeyModuleKey *key) const override {
Expand Down
15 changes: 11 additions & 4 deletions src/commands/commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include "fanout.h"
#include "ft_create_parser.h"
#include "ft_search_parser.h"
#include "src/acl.h"
#include "src/commands/ft_search.h"
#include "src/coordinator/metadata_manager.h"
Expand Down Expand Up @@ -129,9 +130,9 @@ absl::Status QueryCommand::Execute(ValkeyModuleCtx *ctx,
uint32_t db_num = ValkeyModule_GetSelectedDb(ctx);
parameters->db_num = db_num;

VMSDK_ASSIGN_OR_RETURN(parameters->index_schema,
SchemaManager::Instance().GetIndexSchema(
db_num, parameters->index_schema_name));
VMSDK_ASSIGN_OR_RETURN(
parameters->index_schema,
schema_manager.GetIndexSchema(db_num, parameters->index_schema_name));
VMSDK_RETURN_IF_ERROR(
vmsdk::ParseParamValue(itr, parameters->parse_vars.query_string));
VMSDK_RETURN_IF_ERROR(parameters->ParseCommand(itr));
Expand Down Expand Up @@ -218,11 +219,17 @@ absl::Status QueryCommand::Execute(ValkeyModuleCtx *ctx,
parameters->index_schema->GetVersion());
}

// Extract sortby parameter if this is a SearchCommand
std::optional<query::SortByParameter> sortby_param = std::nullopt;
if (auto *search_cmd = dynamic_cast<SearchCommand *>(parameters.get())) {
sortby_param = search_cmd->sortby;
}

return query::fanout::PerformSearchFanoutAsync(
ctx, search_targets,
ValkeySearch::Instance().GetCoordinatorClientPool(),
std::move(parameters), ValkeySearch::Instance().GetReaderThreadPool(),
std::move(on_done_callback));
std::move(on_done_callback), sortby_param);
}
return query::SearchAsync(
std::move(parameters), ValkeySearch::Instance().GetReaderThreadPool(),
Expand Down
16 changes: 8 additions & 8 deletions src/commands/ft_aggregate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ absl::Status AggregateParameters::ParseCommand(vmsdk::ArgsIterator &itr) {
RealIndexInterface real_index_interface(index_schema);
parse_vars_.index_interface_ = &real_index_interface;

VMSDK_RETURN_IF_ERROR(PreParseQueryString(*this));
VMSDK_RETURN_IF_ERROR(PreParseQueryString());
// Ensure that key is first value if it gets included...
CHECK(AddRecordAttribute("__key", "__key", indexes::IndexerType::kNone) == 0);
auto score_sv = vmsdk::ToStringView(score_as.get());
Expand All @@ -112,7 +112,7 @@ absl::Status AggregateParameters::ParseCommand(vmsdk::ArgsIterator &itr) {
limit.number = std::numeric_limits<uint64_t>::max(); // Override default of
// 10 from search

VMSDK_RETURN_IF_ERROR(PostParseQueryString(*this));
VMSDK_RETURN_IF_ERROR(PostParseQueryString());
VMSDK_RETURN_IF_ERROR(VerifyQueryString(*this));
VMSDK_RETURN_IF_ERROR(ManipulateReturnsClause(*this));

Expand Down Expand Up @@ -175,16 +175,16 @@ absl::StatusOr<std::pair<size_t, size_t>> ProcessNeighborsForProcessing(
size_t key_index = 0, scores_index = 0;

if (parameters.IsVectorQuery()) {
auto identifier =
auto vector_identifier =
parameters.index_schema->GetIdentifier(parameters.attribute_alias);
if (!identifier.ok()) {
if (!vector_identifier.ok()) {
++Metrics::GetStats().query_failed_requests_cnt;
return identifier.status();
return vector_identifier.status();
}

query::ProcessNeighborsForReply(
ctx, parameters.index_schema->GetAttributeDataType(), neighbors,
parameters, identifier.value());
parameters, vector_identifier.value());

if (parameters.load_key) {
key_index = parameters.AddRecordAttribute("__key", "__key",
Expand All @@ -196,9 +196,9 @@ absl::StatusOr<std::pair<size_t, size_t>> ProcessNeighborsForProcessing(
indexes::IndexerType::kNone);
}
} else {
query::ProcessNonVectorNeighborsForReply(
query::ProcessNeighborsForReply(
ctx, parameters.index_schema->GetAttributeDataType(), neighbors,
parameters);
parameters, std::nullopt);

if (parameters.load_key) {
key_index = parameters.AddRecordAttribute("__key", "__key",
Expand Down
Loading
Loading