[RFC] Join support in OpenSearch #15185
Description
Is your feature request related to a problem? Please describe
I believe native support for JOIN queries in OpenSearch could help many users, particularly those using OpenSearch for Observability use cases. Below is a proposal for how we can do it in OpenSearch, making the best use of the underlying Lucene and OpenSearch architecture. Whether you think this is a good idea or a bad one, please share your thoughts, along with your opinion on the overall approach we are headed toward.
Describe the solution you'd like
This is a proposal for an execution framework to support JOIN across indexes using the OpenSearch query DSL. The focus of this document is to design the logical components, their contracts, and their interactions.
Assumptions
- Execution framework will not take any decision related to query planning. It assumes Query planner will provide an optimized execution plan for the JOIN query.
- Integration with existing SQL-based JOIN execution libraries such as DataFusion is out of scope (this might be a wrong assumption, but this proposal does not incorporate or compare that alternative; it could be a separate RFC).
- Query languages like SQL and PPL and their corresponding plugins will make use of the new Query DSL format to benefit from native JOIN support.
Motivation for native support
- Better performance and resource utilization over SQL plugin or any plugin based implementation.
- Address the constraints (see the Appendix) of the existing parent-child `join` and `nested` field types.
- No dependence on any other query language to handle join queries.
Proposal
Design considerations
The execution framework should have well-defined logical components. Each component should be modular and must have well-defined abstractions and dependencies. For interoperability, it should be possible to rewrite any component in a different language without impacting other modules, except for the mechanism of communication between them. Each module should be pluggable and extensible to support future use cases, such as data sources other than Lucene, different implementations of join algorithms, serialization protocols, etc.
Performance considerations
Performance optimization for JOIN queries will heavily depend on the efficiency of both the logical and physical plan, which is out of scope for the query execution framework. For the execution framework, the key performance considerations are:
- Minimizing bytes transferred over network: Reducing the volume of rows and bytes transferred per row across the network.
- Efficient implementation of JOIN algorithms: Implementing JOIN algorithms (e.g., hash, merge, index join) ensuring efficient use of CPU, memory, and I/O resources across the cluster to avoid bottlenecks.
- Resource isolation and circuit breakers: Sandboxing resources for different components to prevent resource contention and ensure stable performance under various loads.
- Concurrent execution of independent subtasks wherever applicable.
- Request caching wherever applicable.
For the rest of the proposal, we will use the following two indexes and SQL JOIN query:

| left_index: logs | right_index: instance_details |
|---|---|
| message: text | instance_id: keyword |
| status_code: keyword | region: keyword |
| @timestamp: date | created_at: date |
| instance_id: keyword | |
SQL JOIN query
SELECT
id.region,
l.status_code,
COUNT(*) AS status_code_count
FROM
logs l
JOIN
instance_details id
ON
l.instance_id = id.instance_id
WHERE
l.message LIKE '%error%'
AND l.@timestamp >= NOW() - INTERVAL '1 HOUR'
AND id.created_at >= NOW() - INTERVAL '1 YEAR'
GROUP BY
id.region,
    l.status_code

SQL query output
| region | status_code | status_code_count |
|---|---|---|
| IAD | 500 | 1 |
| ORD | 404 | 2 |
Proposed Query DSL specification
/logs/_search
{
"query": {
"bool": {
"filter": [
{
"range": {
"@timestamp": {
"gte": "now-1h"
}
}
},
{
"match": {
"message": "error"
}
}
]
}
},
"fields": ["instance_id", "status_code"],
"join": {
"right_query": {
"index": "instance_details",
"query": {
"range": {
"created_at": {
"gte": "now-1y"
}
}
},
"fields": ["instance_id", "region"]
},
"type": "inner",
"algorithm": "hash_join", // optional
"condition": {
"left_field": "instance_id",
"right_field": "instance_id",
"comparator": "="
},
"fields": ["region", "status_code"],
"aggs": {
"by_region": {
"terms": {
"field": "region"
},
"aggs": {
"by_status_code": {
"terms": {
"field": "status_code"
},
"aggs": {
"status_code_count": {
"value_count": {
"field": "status_code"
}
}
}
}
}
}
}
}
}

Proposed output:
{
"join_hits": {
"total": 3,
"hits": [
{
"fields": {
"status_code": "500",
"region": "IAD"
}
},
{
"fields": {
"status_code": "404",
"region": "ORD"
}
},
{
"fields": {
"status_code": "404",
"region": "ORD"
}
      }
],
"aggregations": {
"by_region": {
"buckets": [
{
"key": "IAD",
"doc_count": 1,
"by_status_code": {
"buckets": [
{
"key": "500",
"doc_count": 1
}
]
}
},
{
"key": "ORD",
"doc_count": 2,
"by_status_code": {
"buckets": [
{
"key": "404",
"doc_count": 2
}
]
}
}
]
}
}
}
}

Supported Join field type and comparators [P0]:
| Supported Type | Comparator |
|---|---|
| keyword | "=" |
| long/float/double | "<", ">", "<=", ">=", "=" |
| date | "<", ">", "<=", ">=", "=" |
Supported Join type:
Only inner join.
Supported aggregation type:
Bucket aggregations: terms and range aggregations.
Metric aggregations: Some decomposable aggregations will be supported on joined rows, such as min, max, count, sum, and avg.
Future enhancements:
- We may support other types of join like left outer, right outer etc.
- We may support nested join queries, to join across more than 2 indexes, by letting users add a `"join"` in the `"right_query"` section.
- Derived fields may optionally be defined for both the left and right index queries and used in the join condition. Additionally, we may support derived fields that combine fields from the left and right index.
- We may support selecting aggregation buckets from `left_index` and `right_index` as the key and value of the rows to perform the join on.
- We may support complex join conditions using expressions instead of predefined comparators.
- We may support other types of supported bucket and metric aggregations in OpenSearch.
- We may support ordering of final join results.
- We may support scoring and retrieving results from left and right index ordered by their score. On combining join results, we can combine their score using either custom scorer or the way we combine BM25 score across shards at coordinator today.
Components
To understand various components, let’s look at a possible logical plan tree for this query -

1. Source Reader
Purpose
Reads rows from the data source. It can make use of an index or simply scan all rows, depending on the query passed to it. It does not optimize the query but blindly executes the query passed to it at initialization. It must support pagination and must produce rows in batches efficiently.
For the Lucene-based implementation, the SourceReader will have access to the corresponding shard, which is a Lucene index, and will execute the given Lucene query. It will use a customized Collector to collect documents and generate rows with the docID (optionally) and the desired fields to fetch.
Properties
Type: Lucene
Source identifier: Shard ID
Input
Query: lucene query for lucene based implementation
Pagination info: page size
Fields: fields to fetch
Output
Iterator of matching rows. A row is a tuple of <docID, f1, f2, f3>. The output here is the non-serialized form of the iterator; for a Java implementation it will be a new Iterator class with a method like nextPage() which fetches all rows in the next page.
Note: It is the responsibility of the Stream to consume this iterator and perform serialization to send it over the network if needed.
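To make the contract concrete, here is a minimal Python sketch of a paged source reader. The class and method names (`SourceReader`, `next_page`) and the row data are hypothetical stand-ins, not the actual Lucene-backed implementation:

```python
class SourceReader:
    """Reads matching rows from one shard in fixed-size pages (sketch)."""

    def __init__(self, rows, fields, page_size):
        # `rows` stands in for what a Lucene Collector would emit for the
        # shard: each entry is (doc_id, {field: value, ...}).
        self._rows = rows
        self._fields = fields
        self._page_size = page_size
        self._pos = 0

    def next_page(self):
        """Return the next batch of <docID, f1, f2, ...> tuples, or [] when done."""
        page = self._rows[self._pos:self._pos + self._page_size]
        self._pos += len(page)
        return [(doc_id, *(doc[f] for f in self._fields)) for doc_id, doc in page]

# Hypothetical rows for the `logs` index from the running example.
reader = SourceReader(
    rows=[(0, {"instance_id": "i-1", "status_code": "500"}),
          (3, {"instance_id": "i-2", "status_code": "404"})],
    fields=["instance_id", "status_code"],
    page_size=1,
)
```

A Stream would repeatedly call `next_page()` and serialize each batch for transport.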
2. Stream
Purpose
The purpose of a Stream is to transmit rows in batches from one component to another. It makes other components agnostic of the physical location of the rows and independent of each other, making the system modular.
A Stream can employ one of several RPC mechanisms, such as TransportAction-based, gRPC streams, or Netty streams. It is the responsibility of a Stream to serialize rows at the source and deserialize them at the sink. For encoding and decoding, it can choose from various supported protocols like JSON (or Protobuf in the future). It ensures that row order is maintained across serialization and deserialization. At the source, it uses the Iterator interface to fetch the next batch of rows, serializes them, and transmits them to the sink, where they are deserialized so that the consumer can use the same Iterator interface to consume rows.
Properties
RPC mechanism: e.g. gRPC/Transport Actions based.
Protocol: e.g. Proto/Json/Apache Arrow
Sort order key: optional. The stream does not sort rows; this property is a tag that its consumer can rely on.
Input
Iterator of rows
Output
Iterator of rows
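A minimal sketch of the Stream contract, using JSON encoding and assuming an order-preserving transport. The function names and batch framing are illustrative only:

```python
import json

def stream_batches(row_iter, batch_size):
    """Source side: serialize rows from an iterator into JSON-encoded batches."""
    batch = []
    for row in row_iter:
        batch.append(list(row))
        if len(batch) == batch_size:
            yield json.dumps(batch)
            batch = []
    if batch:  # flush the final partial batch
        yield json.dumps(batch)

def consume_batches(encoded_batches):
    """Sink side: deserialize batches back into an ordered iterator of rows."""
    for payload in encoded_batches:
        for row in json.loads(payload):
            yield tuple(row)

# Round-tripping preserves row order, as the Stream contract requires.
rows = [(0, "i-1", "500"), (3, "i-2", "404"), (7, "i-2", "404")]
received = list(consume_batches(stream_batches(iter(rows), batch_size=2)))
```

A real implementation would swap the JSON layer for Protobuf or Arrow and send each payload over the chosen RPC mechanism instead of yielding it locally.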
3. Collator
Purpose
It collates one or more streams into one while maintaining the ordering guarantees of the input streams. If there are no ordering constraints, it simply interleaves the different iterators, ensuring all iterators are consumed at the same rate.
If ordering is specified, there are 2 cases: the input streams are either ordered or unordered. If ordered, it can simply merge the streams while maintaining the order. If the input streams are unordered, it can be a heavy-duty operation, as it needs to fetch all rows from the different streams and sort them.
The Join executor can make efficient use of the Collator and its ordering guarantees while performing the JOIN operation. It can also be used to combine the Iterators of multiple shards of the same index. However, choosing the right collator and the type of join algorithm is part of query planning.
Properties
Sort key and order
Input
List of Iterators
Output
Iterator
4. Join Executor
Purpose
Performs a join on the left and right input streams given a join field. It assumes the left stream is the one with more matching rows; it is the query planner that swaps left with right if necessary, based on cardinality estimation. As part of the query execution plan, the Join executor is initialized with the chosen JOIN algorithm.
Properties
Join algorithm: Hash, SortMerge, Broadcast, Index.
Input
Left Iterator
Right Iterator
Join field
Join condition
Output
Iterator of joined rows with the requested fields
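As an illustration of one of the listed algorithms, here is a hedged sketch of an inner hash join over the running example. The row shapes and instance IDs are made up for illustration; this is not the proposed OpenSearch implementation:

```python
from collections import defaultdict

def hash_join(left_rows, right_rows, left_field, right_field):
    """Inner hash join: build a hash table on the (presumed smaller) right
    input, then probe it with every left row, yielding combined rows."""
    table = defaultdict(list)
    for r in right_rows:
        table[r[right_field]].append(r)
    for l in left_rows:
        for r in table.get(l[left_field], ()):
            yield {**r, **l}  # combined row; left values win on name clashes

# Hypothetical matching rows for the example query.
logs = [{"instance_id": "i-1", "status_code": "500"},
        {"instance_id": "i-2", "status_code": "404"},
        {"instance_id": "i-2", "status_code": "404"}]
instance_details = [{"instance_id": "i-1", "region": "IAD"},
                    {"instance_id": "i-2", "region": "ORD"}]
joined = list(hash_join(logs, instance_details, "instance_id", "instance_id"))
```

With this data the join yields three rows, matching the three `join_hits` in the proposed output above.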
5. Aggregator (Internal)
Purpose
Performs both bucket and metric aggregation on the input Iterator. It transforms the input Iterator into a dummy ValueSource which the BucketCollector of a given aggregation type can use. Instead of a DocIDSetIterator, this customized BucketCollector will deal directly with the input Iterator's rows and will fetch field values using its dummy ValueSource.
This approach has the benefit of easily reusing OpenSearch's rich aggregation framework.
Input
AggregationQuery
Iterator of rows
Output
InternalAggregation
In future, InternalAggregation can use similar protocol for serialization which Streams are using to optimize on performance.
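Functionally, the nested terms/value_count aggregation from the example DSL boils down to bucketing joined rows and counting field values. A minimal stand-in for the real BucketCollector path, with illustrative names:

```python
from collections import Counter

def terms_value_count(rows, bucket_fields, count_field):
    """Bucket rows by bucket_fields and count non-null count_field values,
    mimicking the nested terms -> value_count aggregation in the example."""
    counts = Counter()
    for row in rows:
        if row.get(count_field) is not None:
            counts[tuple(row[f] for f in bucket_fields)] += 1
    return counts

# Hypothetical joined rows matching the proposed output above.
joined = [{"region": "IAD", "status_code": "500"},
          {"region": "ORD", "status_code": "404"},
          {"region": "ORD", "status_code": "404"}]
buckets = terms_value_count(joined, ["region", "status_code"], "status_code")
```

Each key of `buckets` corresponds to one (by_region, by_status_code) bucket pair in the proposed response.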
6. Aggregator (Reduce)
This is the same as the aggregation reduce phase, which reduces the results from different shards at the coordinator.
Note: Fields and their type definition are available to all components.
Physical plan
Let’s use the same query defined above and create a few possible execution plans using the components defined above. Here we assume both the left and right index are entirely present locally. The distributed physical plan accounts for the distributed nature of indexes and shards and builds on top of the local physical plan.
Hash Join
The Join executor will build a hash table on the join key from the right index (assuming the cardinality of the join key on the right is lower than on the left and the hash table fits in memory) and will iterate over all rows of the left index to join.
The hash can be built at index time, just as eager_global_ordinals can be built for keyword fields at index time to optimize cardinality aggregation.
Here a row doesn’t need all projection fields, just the join field and the matching docIDs. This saves bytes transferred over the network: any subsequent component that needs other projection fields can request them lazily using the docIDs and shard identifier. This will be part of the query plan.
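The lazy-fetch idea can be sketched as follows: the join runs over slim `<docID, join_field>` rows, and the remaining projection fields are looked up by docID afterwards. `shard_store` and all names here are hypothetical stand-ins for a shard's stored fields:

```python
def fetch_projection_fields(joined_rows, shard_store, fields):
    """Enrich slim joined rows with projection fields looked up by docID."""
    for row in joined_rows:
        stored = shard_store[row["doc_id"]]  # e.g. a stored-fields lookup
        yield {**row, **{f: stored[f] for f in fields}}

# Only docID + join-derived fields crossed the network during the join.
shard_store = {0: {"status_code": "500", "message": "error x"},
               3: {"status_code": "404", "message": "error y"}}
slim = [{"doc_id": 0, "instance_id": "i-1", "region": "IAD"}]
enriched = list(fetch_projection_fields(slim, shard_store, ["status_code"]))
```

Only the fields needed downstream (here `status_code` for the aggregation) are fetched, and only for rows that survived the join.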
Sort-merge Join
It assumes the rows are ordered by the join key in both the left and right index. This is a memory-efficient algorithm, as the Join executor does not need to keep anything in memory.
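A hedged sketch of the sort-merge algorithm over pre-sorted inputs; for simplicity it takes lists, whereas a streaming version would buffer only the current run of equal keys:

```python
def sort_merge_join(left, right, key):
    """Inner sort-merge join: both inputs must already be sorted by `key`."""
    li, ri = 0, 0
    out = []
    while li < len(left) and ri < len(right):
        lk, rk = left[li][key], right[ri][key]
        if lk < rk:
            li += 1          # advance the side with the smaller key
        elif lk > rk:
            ri += 1
        else:
            # Gather the run of equal keys on the right, then pair it with
            # every left row sharing that key.
            r_start = ri
            while ri < len(right) and right[ri][key] == lk:
                ri += 1
            while li < len(left) and left[li][key] == lk:
                for r in right[r_start:ri]:
                    out.append({**r, **left[li]})
                li += 1
    return out

# Hypothetical rows, pre-sorted by the join key "k".
left = [{"k": 1, "status_code": "500"}, {"k": 2, "status_code": "404"},
        {"k": 2, "status_code": "404"}]
right = [{"k": 1, "region": "IAD"}, {"k": 2, "region": "ORD"}]
```

When index sorting on the join key is enabled, the shard iterators arrive pre-sorted and this single forward pass suffices.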
A possible physical plan:
Let’s assume the left index has 2 shards (SL1, SL2), the right index has 1 shard (SR1), and the matching rows in the left index outnumber those in the right. The Collator will ensure the ordering guarantees while collating SL1 and SL2. The Stream's rows will contain just instance_id and docIDs until the Join. The Aggregator will request the region and status_code fields from the source reader for the given docIDs and shard ID.

Distributed physical plan
Let’s assume the left index has 4 shards (SL1, SL2, SL3, SL4) and the right index has 2 shards (SR1, SR2), with SL1, SL2, SR1 on node 1 and SL3, SL4, SR2 on node 2.
A possible distributed physical plan, converted from the previous physical plan:
- Use `Collator` to collate `SL1, SL2` on node 1 and `SL3, SL4` on node 2.
- Use a broadcast join to broadcast all rows from `SR2` to node 1 and `SR1` to node 2. Alternatively, broadcast the hash tables built from `SR2` and `SR1` to use a hash join, if the hash tables are under the allowed limit.
- Perform the join on node 1 and node 2.
- Aggregate results on node 1 and node 2.
- Reduce results on the coordinator node; let’s assume it’s node 1.

We can derive some of the simple rules for query planner to begin with:
- Always collate all local shards of an index before performing the JOIN. The downside is reduced concurrency: joining smaller shards individually would result in more join operations overall, which can be executed in parallel.
- Always perform the JOIN before aggregation. However, it may be possible to push aggregation down before the JOIN for decomposable aggregation functions like min, max, sum, and count, which can save a lot of bytes transmitted.
- Lazily fetch non-join projection fields.
- Always perform Aggregation(Internal) on same node on which JOIN was performed.
- Just 1 Aggregation(Reduce) and on the coordinator node like it is done today.
- Always perform broadcast join and broadcast smaller table of size less than some threshold.
- Always use sort-merge join when index sort is used on the join key.
- If document routing is based on a JOIN field, make use of it to retrieve only the matching shards.
More details to follow in subsequent sections, covering the low-level design of the above components and other join algorithms.
Fault tolerance
Resource isolation and circuit breakers
Concurrency
Asynchronous execution
Shard allocation algorithm across indexes
Query caching
Appendix
Join using nested and Join field type
When documents have a parent-child/nested relationship, one can use nested fields or a join field, to index and query the documents from a single index.
Caveats
- Documents must have a parent-child relationship, i.e., a one-to-many relationship. Many-to-many relationships aren't supported; if present, they have to be flattened to one-to-many.
- The parent-child relationship must be established at index time and cannot be defined at query time, so it is less flexible.
- All documents are stored in the same index, so parent and child documents cannot be scaled independently.
- When getting, deleting, or updating a document, a routing value must be provided, as both parent and child are expected to reside in the same shard.
Benefits
- Good performance: since parent and child are co-located in the same shard, queries are faster, as no cross-index/shard communication is needed. Also, global ordinals are computed for the join field of parent and child, which optimizes queries even further.
- Aligns with the OpenSearch philosophy that a shard can handle all queries for the documents residing in it, so no explicit changes to query interfaces were required except to support new query types.
I have put down this proposal after a few rounds of discussion with, and help from, @msfroh @jainankitk @harshavamsi @getsaurabh02
Related component
Search:Query Capabilities
Describe alternatives you've considered
No response
Additional context
No response