diff --git a/docs/changelog/88538.yaml b/docs/changelog/88538.yaml new file mode 100644 index 0000000000000..1d0498e59c3d0 --- /dev/null +++ b/docs/changelog/88538.yaml @@ -0,0 +1,6 @@ +pr: 88538 +summary: Improve EQL Sequence circuit breaker precision +area: EQL +type: bug +issues: + - 88300 diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceMatcher.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceMatcher.java index 550c65da64d3a..8510f8a2debd0 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceMatcher.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceMatcher.java @@ -82,7 +82,10 @@ public void clear() { private final Stats stats = new Stats(); private boolean headLimit = false; - private long totalRamBytesUsed = 0; + + // circuit breaker accounting + private long prevRamBytesUsedInFlight = 0; + private long prevRamBytesUsedCompleted = 0; @SuppressWarnings("rawtypes") public SequenceMatcher(int stages, boolean descending, TimeValue maxSpan, Limit limit, CircuitBreaker circuitBreaker) { @@ -114,9 +117,6 @@ private void trackSequence(Sequence sequence) { * Returns false if the process needs to be stopped. */ boolean match(int stage, Iterable> hits) { - long ramBytesUsedInFlight = ramBytesUsedInFlight(); - long ramBytesUsedCompleted = ramBytesUsedCompleted(); - for (Tuple tuple : hits) { KeyAndOrdinal ko = tuple.v1(); HitReference hit = tuple.v2(); @@ -145,7 +145,7 @@ boolean match(int stage, Iterable> hits) { log.trace("{}", stats); matched = true; } - trackMemory(ramBytesUsedInFlight, ramBytesUsedCompleted); + trackMemory(); return matched; } @@ -305,22 +305,20 @@ public void clear() { clearCircuitBreaker(); } - private long ramBytesUsedInFlight() { + // protected for testing purposes + protected long ramBytesUsedInFlight() { return RamUsageEstimator.sizeOf(keyToSequences) + RamUsageEstimator.sizeOf(stageToKeys); } - private long ramBytesUsedCompleted() { + // protected for testing purposes + protected long ramBytesUsedCompleted() { return RamUsageEstimator.sizeOfCollection(completed); } - private void addMemory(long bytes, String label) { - totalRamBytesUsed += bytes; - circuitBreaker.addEstimateBytesAndMaybeBreak(bytes, label); - } - private void clearCircuitBreaker() { - circuitBreaker.addWithoutBreaking(-totalRamBytesUsed); - totalRamBytesUsed = 0; + circuitBreaker.addWithoutBreaking(-prevRamBytesUsedInFlight - prevRamBytesUsedCompleted); + prevRamBytesUsedInFlight = 0; + prevRamBytesUsedCompleted = 0; } // The method is called at the end of match() which is called for every sub query in the sequence query @@ -328,11 +326,14 @@ private void clearCircuitBreaker() { // expensive, so we just calculate the difference in bytes of the total memory that the matcher's // structure occupy for the in-flight tracking of sequences, as well as for the list of completed // sequences. - private void trackMemory(long prevRamBytesUsedInflight, long prevRamBytesUsedCompleted) { - long bytesDiff = ramBytesUsedInFlight() - prevRamBytesUsedInflight; - addMemory(bytesDiff, CB_INFLIGHT_LABEL); - bytesDiff = ramBytesUsedCompleted() - prevRamBytesUsedCompleted; - addMemory(bytesDiff, CB_COMPLETED_LABEL); + private void trackMemory() { + long newRamBytesUsedInFlight = ramBytesUsedInFlight(); + circuitBreaker.addEstimateBytesAndMaybeBreak(newRamBytesUsedInFlight - prevRamBytesUsedInFlight, CB_INFLIGHT_LABEL); + prevRamBytesUsedInFlight = newRamBytesUsedInFlight; + + long newRamBytesUsedCompleted = ramBytesUsedCompleted(); + circuitBreaker.addEstimateBytesAndMaybeBreak(newRamBytesUsedCompleted - prevRamBytesUsedCompleted, CB_COMPLETED_LABEL); + prevRamBytesUsedCompleted = newRamBytesUsedCompleted; } @Override diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/sequence/CircuitBreakerTests.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/sequence/CircuitBreakerTests.java index ef56f5c160604..7787f3e6ef171 100644 --- a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/sequence/CircuitBreakerTests.java +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/sequence/CircuitBreakerTests.java @@ -132,27 +132,7 @@ public void fetchHits(Iterable> refs, ActionListener> criteria = new ArrayList<>(stages); - - for (int i = 0; i < stages; i++) { - final int j = i; - criteria.add( - new Criterion<>( - i, - new BoxedQueryRequest( - () -> SearchSourceBuilder.searchSource().size(10).query(matchAllQuery()).terminateAfter(j), - "@timestamp", - emptyList(), - emptySet() - ), - keyExtractors, - tsExtractor, - null, - implicitTbExtractor, - false - ) - ); - } + List> criteria = buildCriteria(stages); SequenceMatcher matcher = new SequenceMatcher(stages, false, TimeValue.MINUS_ONE, null, CIRCUIT_BREAKER); TumblingWindow window = new TumblingWindow(client, criteria, null, matcher); @@ -187,8 +167,10 @@ public void testCircuitBreakerSequenceMatcher() { assertEquals("sequence_inflight", e.getMessage()); // Break on second iteration - SequenceMatcher matcher2 = new SequenceMatcher(stages, false, TimeValue.MINUS_ONE, null, new EqlTestCircuitBreaker(15000)); + EqlTestCircuitBreaker breaker = new EqlTestCircuitBreaker(15000); + SequenceMatcher matcher2 = new SequenceMatcher(stages, false, TimeValue.MINUS_ONE, null, breaker); matcher2.match(0, hits); + assertEquals(matcher2.ramBytesUsedInFlight() + matcher2.ramBytesUsedCompleted(), breaker.ramBytesUsed); e = expectThrows(CircuitBreakingException.class, () -> matcher2.match(0, hits)); assertEquals("sequence_inflight", e.getMessage()); @@ -210,92 +192,18 @@ public void testMemoryClearedOnShardsException() { } private void assertMemoryCleared(int sequenceFiltersCount, BiFunction esClientSupplier) { - final int SEARCH_REQUESTS_EXPECTED_COUNT = 2; - List eqlBreakerSettings = Collections.singletonList( - new BreakerSettings( - CIRCUIT_BREAKER_NAME, - CIRCUIT_BREAKER_LIMIT, - CIRCUIT_BREAKER_OVERHEAD, - CircuitBreaker.Type.MEMORY, - CircuitBreaker.Durability.TRANSIENT - ) - ); + final int searchRequestsExpectedCount = 2; try ( CircuitBreakerService service = new HierarchyCircuitBreakerService( Settings.EMPTY, - eqlBreakerSettings, + breakerSettings(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) ); - ESMockClient esClient = esClientSupplier.apply(service.getBreaker(CIRCUIT_BREAKER_NAME), SEARCH_REQUESTS_EXPECTED_COUNT); + ESMockClient esClient = esClientSupplier.apply(service.getBreaker(CIRCUIT_BREAKER_NAME), searchRequestsExpectedCount); ) { CircuitBreaker eqlCircuitBreaker = service.getBreaker(CIRCUIT_BREAKER_NAME); - EqlConfiguration eqlConfiguration = new EqlConfiguration( - new String[] { "test" }, - org.elasticsearch.xpack.ql.util.DateUtils.UTC, - "nobody", - "cluster", - null, - emptyMap(), - null, - TimeValue.timeValueSeconds(30), - null, - 123, - "", - new TaskId("test", 123), - new EqlSearchTask( - randomLong(), - "transport", - EqlSearchAction.NAME, - "", - null, - emptyMap(), - emptyMap(), - new AsyncExecutionId("", new TaskId(randomAlphaOfLength(10), 1)), - TimeValue.timeValueDays(5) - ), - x -> Collections.emptySet() - ); - IndexResolver indexResolver = new IndexResolver( - esClient, - "cluster", - DefaultDataTypeRegistry.INSTANCE, - () -> { return emptySet(); } - ); - EqlSession eqlSession = new EqlSession( - esClient, - eqlConfiguration, - indexResolver, - new PreAnalyzer(), - new PostAnalyzer(), - new EqlFunctionRegistry(), - new Verifier(new Metrics()), - new Optimizer(), - new Planner(), - eqlCircuitBreaker - ); - QueryClient eqlClient = new PITAwareQueryClient(eqlSession); - List> criteria = new ArrayList<>(sequenceFiltersCount); - - for (int i = 0; i < sequenceFiltersCount; i++) { - final int j = i; - criteria.add( - new Criterion<>( - i, - new BoxedQueryRequest( - () -> SearchSourceBuilder.searchSource().size(10).query(matchAllQuery()).terminateAfter(j), - "@timestamp", - emptyList(), - emptySet() - ), - keyExtractors, - tsExtractor, - null, - implicitTbExtractor, - false - ) - ); - } - + QueryClient eqlClient = buildQueryClient(esClient, eqlCircuitBreaker); + List> criteria = buildCriteria(sequenceFiltersCount); SequenceMatcher matcher = new SequenceMatcher(sequenceFiltersCount, false, TimeValue.MINUS_ONE, null, eqlCircuitBreaker); TumblingWindow window = new TumblingWindow(eqlClient, criteria, null, matcher); window.execute(wrap(p -> {}, ex -> {})); @@ -306,6 +214,112 @@ private void assertMemoryCleared(int sequenceFiltersCount, BiFunction> criteria = buildCriteria(sequenceFiltersCount); + + SequenceMatcher matcher = new SequenceMatcher(sequenceFiltersCount, false, TimeValue.MINUS_ONE, null, eqlCircuitBreaker); + TumblingWindow window = new TumblingWindow(eqlClient, criteria, null, matcher); + window.execute(wrap(p -> fail(), ex -> assertTrue(ex instanceof CircuitBreakingException))); + } + } + + private List breakerSettings() { + List eqlBreakerSettings = Collections.singletonList( + new BreakerSettings( + CIRCUIT_BREAKER_NAME, + CIRCUIT_BREAKER_LIMIT, + CIRCUIT_BREAKER_OVERHEAD, + CircuitBreaker.Type.MEMORY, + CircuitBreaker.Durability.TRANSIENT + ) + ); + return eqlBreakerSettings; + } + + private List> buildCriteria(int sequenceFiltersCount) { + List> criteria = new ArrayList<>(sequenceFiltersCount); + for (int i = 0; i < sequenceFiltersCount; i++) { + final int j = i; + criteria.add( + new Criterion<>( + i, + new BoxedQueryRequest( + () -> SearchSourceBuilder.searchSource().size(10).query(matchAllQuery()).terminateAfter(j), + "@timestamp", + emptyList(), + emptySet() + ), + keyExtractors, + tsExtractor, + null, + implicitTbExtractor, + false + ) + ); + } + return criteria; + } + + private QueryClient buildQueryClient(ESMockClient esClient, CircuitBreaker eqlCircuitBreaker) { + EqlConfiguration eqlConfiguration = new EqlConfiguration( + new String[] { "test" }, + org.elasticsearch.xpack.ql.util.DateUtils.UTC, + "nobody", + "cluster", + null, + emptyMap(), + null, + TimeValue.timeValueSeconds(30), + null, + 123, + "", + new TaskId("test", 123), + new EqlSearchTask( + randomLong(), + "transport", + EqlSearchAction.NAME, + "", + null, + emptyMap(), + emptyMap(), + new AsyncExecutionId("", new TaskId(randomAlphaOfLength(10), 1)), + TimeValue.timeValueDays(5) + ), + x -> Collections.emptySet() + ); + IndexResolver indexResolver = new IndexResolver(esClient, "cluster", DefaultDataTypeRegistry.INSTANCE, Collections::emptySet); + EqlSession eqlSession = new EqlSession( + esClient, + eqlConfiguration, + indexResolver, + new PreAnalyzer(), + new PostAnalyzer(), + new EqlFunctionRegistry(), + new Verifier(new Metrics()), + new Optimizer(), + new Planner(), + eqlCircuitBreaker + ); + return new PITAwareQueryClient(eqlSession); + } + /** * A type of internal Node client that deals with three types of requests: open PIT, close PIT and SearchRequest. * This class is used by {@code CircuitBreakerTests#testMemoryClearedOnSuccessfulRequest()} and