Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Changed
- Move Randomness from server to libs/common ([#20570](https://github.com/opensearch-project/OpenSearch/pull/20570))
- Use env variable (OPENSEARCH_FIPS_MODE) to enable opensearch to run in FIPS enforced mode instead of checking for existence of bcFIPS jars ([#20625](https://github.com/opensearch-project/OpenSearch/pull/20625))
- Update streaming flag to use search request context ([#20530](https://github.com/opensearch-project/OpenSearch/pull/20530))

### Fixed
- Fix flaky test failures in ShardsLimitAllocationDeciderIT ([#20375](https://github.com/opensearch-project/OpenSearch/pull/20375))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,10 @@ public final SearchRequest getRequest() {
return request;
}

protected SearchRequestContext getSearchRequestContext() {
return searchRequestContext;
}

protected final SearchResponse buildSearchResponse(
InternalSearchResponse internalSearchResponse,
ShardSearchFailure[] failures,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class SearchRequestContext {
private final SearchRequest searchRequest;
private final LinkedBlockingQueue<TaskResourceInfo> phaseResourceUsage;
private final Supplier<TaskResourceInfo> taskResourceUsageSupplier;
private boolean streamingRequest;

SearchRequestContext(
final SearchRequestOperationsListener searchRequestOperationsListener,
Expand Down Expand Up @@ -156,6 +157,14 @@ void setSuccessfulSearchShardIndices(Set<Index> successfulSearchShardIndices) {
public Set<Index> getSuccessfulSearchShardIndices() {
return successfulSearchShardIndices;
}

void setStreamingRequest(boolean streamingRequest) {
this.streamingRequest = streamingRequest;
}

public boolean isStreamingRequest() {
return streamingRequest;
}
}

enum ShardStatsFieldNames {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ void successfulShardExecution(SearchShardIterator shardsIt) {
try {
shardResultsConsumed.set(true);
if (streamResultsReceived.get() == streamResultsConsumeCallback.get()) {
if (streamResultsReceived.get() > 0) {
getSearchRequestContext().setStreamingRequest(true);
}
getLogger().debug("Stream results consumption has called back, let shard consumption callback trigger onPhaseDone");
onPhaseDone();
} else {
Expand All @@ -180,6 +183,7 @@ private void successfulStreamExecution() {
try {
if (streamResultsReceived.get() == streamResultsConsumeCallback.incrementAndGet()) {
if (shardResultsConsumed.get()) {
getSearchRequestContext().setStreamingRequest(true);
getLogger().debug("Stream consumption trigger onPhaseDone");
onPhaseDone();
}
Expand Down
Loading