From 996b7aa64189fef8d31152bbf546c48f1c7f4535 Mon Sep 17 00:00:00 2001 From: Luigi Dell'Aquila Date: Mon, 11 Jul 2022 17:53:45 +0200 Subject: [PATCH 1/5] Improve EQL sequence circuit breaker precision --- .../execution/sequence/SequenceMatcher.java | 24 ++++++++++++------- .../sequence/CircuitBreakerTests.java | 4 +++- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceMatcher.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceMatcher.java index 550c65da64d3a..5557b3eb70a9c 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceMatcher.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceMatcher.java @@ -82,7 +82,12 @@ public void clear() { private final Stats stats = new Stats(); private boolean headLimit = false; + + // ---------- CIRCUIT BREAKER ----------- + private long totalRamBytesUsed = 0; + private long prevRamBytesUsedInFlight = 0; + private long prevRamBytesUsedCompleted = 0; @SuppressWarnings("rawtypes") public SequenceMatcher(int stages, boolean descending, TimeValue maxSpan, Limit limit, CircuitBreaker circuitBreaker) { @@ -114,9 +119,6 @@ private void trackSequence(Sequence sequence) { * Returns false if the process needs to be stopped. */ boolean match(int stage, Iterable> hits) { - long ramBytesUsedInFlight = ramBytesUsedInFlight(); - long ramBytesUsedCompleted = ramBytesUsedCompleted(); - for (Tuple tuple : hits) { KeyAndOrdinal ko = tuple.v1(); HitReference hit = tuple.v2(); @@ -145,7 +147,7 @@ boolean match(int stage, Iterable> hits) { log.trace("{}", stats); matched = true; } - trackMemory(ramBytesUsedInFlight, ramBytesUsedCompleted); + trackMemory(); return matched; } @@ -305,21 +307,23 @@ public void clear() { clearCircuitBreaker(); } - private long ramBytesUsedInFlight() { + protected long ramBytesUsedInFlight() { return RamUsageEstimator.sizeOf(keyToSequences) + RamUsageEstimator.sizeOf(stageToKeys); } - private long ramBytesUsedCompleted() { + protected long ramBytesUsedCompleted() { return RamUsageEstimator.sizeOfCollection(completed); } private void addMemory(long bytes, String label) { - totalRamBytesUsed += bytes; circuitBreaker.addEstimateBytesAndMaybeBreak(bytes, label); + totalRamBytesUsed += bytes; } private void clearCircuitBreaker() { circuitBreaker.addWithoutBreaking(-totalRamBytesUsed); + prevRamBytesUsedInFlight = 0; + prevRamBytesUsedCompleted = 0; totalRamBytesUsed = 0; } @@ -328,11 +332,13 @@ private void clearCircuitBreaker() { // expensive, so we just calculate the difference in bytes of the total memory that the matcher's // structure occupy for the in-flight tracking of sequences, as well as for the list of completed // sequences. - private void trackMemory(long prevRamBytesUsedInflight, long prevRamBytesUsedCompleted) { - long bytesDiff = ramBytesUsedInFlight() - prevRamBytesUsedInflight; + private void trackMemory() { + long bytesDiff = ramBytesUsedInFlight() - prevRamBytesUsedInFlight; addMemory(bytesDiff, CB_INFLIGHT_LABEL); + prevRamBytesUsedInFlight += bytesDiff; bytesDiff = ramBytesUsedCompleted() - prevRamBytesUsedCompleted; addMemory(bytesDiff, CB_COMPLETED_LABEL); + prevRamBytesUsedCompleted += bytesDiff; } @Override diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/sequence/CircuitBreakerTests.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/sequence/CircuitBreakerTests.java index ef56f5c160604..3db882eedccd2 100644 --- a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/sequence/CircuitBreakerTests.java +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/sequence/CircuitBreakerTests.java @@ -187,8 +187,10 @@ public void testCircuitBreakerSequenceMatcher() { assertEquals("sequence_inflight", e.getMessage()); // Break on second iteration - SequenceMatcher matcher2 = new SequenceMatcher(stages, false, TimeValue.MINUS_ONE, null, new EqlTestCircuitBreaker(15000)); + EqlTestCircuitBreaker breaker = new EqlTestCircuitBreaker(15000); + SequenceMatcher matcher2 = new SequenceMatcher(stages, false, TimeValue.MINUS_ONE, null, breaker); matcher2.match(0, hits); + assertEquals(matcher2.ramBytesUsedInFlight() + matcher2.ramBytesUsedCompleted(), breaker.ramBytesUsed); e = expectThrows(CircuitBreakingException.class, () -> matcher2.match(0, hits)); assertEquals("sequence_inflight", e.getMessage()); From 25c8c99d889b6ba59080bed6f3dc5c2d0f3ce649 Mon Sep 17 00:00:00 2001 From: Luigi Dell'Aquila Date: Thu, 14 Jul 2022 14:25:13 +0200 Subject: [PATCH 2/5] Update docs/changelog/88538.yaml --- docs/changelog/88538.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 docs/changelog/88538.yaml diff --git a/docs/changelog/88538.yaml b/docs/changelog/88538.yaml new file mode 100644 index 0000000000000..1d0498e59c3d0 --- /dev/null +++ b/docs/changelog/88538.yaml @@ -0,0 +1,6 @@ +pr: 88538 +summary: Improve EQL Sequence circuit breaker precision +area: EQL +type: bug +issues: + - 88300 From 13a307da8b9bddd0143f21eb5d7669e546099506 Mon Sep 17 00:00:00 2001 From: Luigi Dell'Aquila Date: Wed, 27 Jul 2022 17:58:24 +0200 Subject: [PATCH 3/5] Add tests and implement review suggestions --- .../execution/sequence/SequenceMatcher.java | 26 +-- .../sequence/CircuitBreakerTests.java | 192 ++++++++++-------- 2 files changed, 118 insertions(+), 100 deletions(-) diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceMatcher.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceMatcher.java index 5557b3eb70a9c..2c821646fe9f9 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceMatcher.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceMatcher.java @@ -83,9 +83,7 @@ public void clear() { private boolean headLimit = false; - // ---------- CIRCUIT BREAKER ----------- - - private long totalRamBytesUsed = 0; + // circuit breaker accounting private long prevRamBytesUsedInFlight = 0; private long prevRamBytesUsedCompleted = 0; @@ -317,14 +315,17 @@ protected long ramBytesUsedCompleted() { private void addMemory(long bytes, String label) { circuitBreaker.addEstimateBytesAndMaybeBreak(bytes, label); - totalRamBytesUsed += bytes; + if (CB_COMPLETED_LABEL.equals(label)) { + prevRamBytesUsedCompleted += bytes; + } else { + prevRamBytesUsedInFlight += bytes; + } } private void clearCircuitBreaker() { - circuitBreaker.addWithoutBreaking(-totalRamBytesUsed); + circuitBreaker.addWithoutBreaking(-prevRamBytesUsedInFlight - prevRamBytesUsedCompleted); prevRamBytesUsedInFlight = 0; prevRamBytesUsedCompleted = 0; - totalRamBytesUsed = 0; } // The method is called at the end of match() which is called for every sub query in the sequence query @@ -333,12 +334,13 @@ private void clearCircuitBreaker() { // structure occupy for the in-flight tracking of sequences, as well as for the list of completed // sequences. private void trackMemory() { - long bytesDiff = ramBytesUsedInFlight() - prevRamBytesUsedInFlight; - addMemory(bytesDiff, CB_INFLIGHT_LABEL); - prevRamBytesUsedInFlight += bytesDiff; - bytesDiff = ramBytesUsedCompleted() - prevRamBytesUsedCompleted; - addMemory(bytesDiff, CB_COMPLETED_LABEL); - prevRamBytesUsedCompleted += bytesDiff; + long newRamBytesUsedInFlight = ramBytesUsedInFlight(); + addMemory(newRamBytesUsedInFlight - prevRamBytesUsedInFlight, CB_INFLIGHT_LABEL); + prevRamBytesUsedInFlight = newRamBytesUsedInFlight; + + long newRamBytesUsedCompleted = ramBytesUsedCompleted(); + addMemory(newRamBytesUsedCompleted - prevRamBytesUsedCompleted, CB_COMPLETED_LABEL); + prevRamBytesUsedCompleted = newRamBytesUsedCompleted; } @Override diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/sequence/CircuitBreakerTests.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/sequence/CircuitBreakerTests.java index 3db882eedccd2..50400adeda2d4 100644 --- a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/sequence/CircuitBreakerTests.java +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/sequence/CircuitBreakerTests.java @@ -132,27 +132,7 @@ public void fetchHits(Iterable> refs, ActionListener> criteria = new ArrayList<>(stages); - - for (int i = 0; i < stages; i++) { - final int j = i; - criteria.add( - new Criterion<>( - i, - new BoxedQueryRequest( - () -> SearchSourceBuilder.searchSource().size(10).query(matchAllQuery()).terminateAfter(j), - "@timestamp", - emptyList(), - emptySet() - ), - keyExtractors, - tsExtractor, - null, - implicitTbExtractor, - false - ) - ); - } + List> criteria = buildCriteria(stages); SequenceMatcher matcher = new SequenceMatcher(stages, false, TimeValue.MINUS_ONE, null, CIRCUIT_BREAKER); TumblingWindow window = new TumblingWindow(client, criteria, null, matcher); @@ -231,73 +211,8 @@ private void assertMemoryCleared(int sequenceFiltersCount, BiFunction Collections.emptySet() - ); - IndexResolver indexResolver = new IndexResolver( - esClient, - "cluster", - DefaultDataTypeRegistry.INSTANCE, - () -> { return emptySet(); } - ); - EqlSession eqlSession = new EqlSession( - esClient, - eqlConfiguration, - indexResolver, - new PreAnalyzer(), - new PostAnalyzer(), - new EqlFunctionRegistry(), - new Verifier(new Metrics()), - new Optimizer(), - new Planner(), - eqlCircuitBreaker - ); - QueryClient eqlClient = new PITAwareQueryClient(eqlSession); - List> criteria = new ArrayList<>(sequenceFiltersCount); - - for (int i = 0; i < sequenceFiltersCount; i++) { - final int j = i; - criteria.add( - new Criterion<>( - i, - new BoxedQueryRequest( - () -> SearchSourceBuilder.searchSource().size(10).query(matchAllQuery()).terminateAfter(j), - "@timestamp", - emptyList(), - emptySet() - ), - keyExtractors, - tsExtractor, - null, - implicitTbExtractor, - false - ) - ); - } - + QueryClient eqlClient = buildQueryClient(esClient, eqlCircuitBreaker); + List> criteria = buildCriteria(sequenceFiltersCount); SequenceMatcher matcher = new SequenceMatcher(sequenceFiltersCount, false, TimeValue.MINUS_ONE, null, eqlCircuitBreaker); TumblingWindow window = new TumblingWindow(eqlClient, criteria, null, matcher); window.execute(wrap(p -> {}, ex -> {})); @@ -308,6 +223,107 @@ private void assertMemoryCleared(int sequenceFiltersCount, BiFunction eqlBreakerSettings = Collections.singletonList( + new BreakerSettings( + CIRCUIT_BREAKER_NAME, + CIRCUIT_BREAKER_LIMIT, + CIRCUIT_BREAKER_OVERHEAD, + CircuitBreaker.Type.MEMORY, + CircuitBreaker.Durability.TRANSIENT + ) + ); + + // let the parent circuit breaker fail, setting its limit to zero + Settings settings = Settings.builder().put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), 0).build(); + + try ( + CircuitBreakerService service = new HierarchyCircuitBreakerService( + settings, + eqlBreakerSettings, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); + ESMockClient esClient = new SuccessfulESMockClient(service.getBreaker(CIRCUIT_BREAKER_NAME), SEARCH_REQUESTS_EXPECTED_COUNT); + ) { + CircuitBreaker eqlCircuitBreaker = service.getBreaker(CIRCUIT_BREAKER_NAME); + QueryClient eqlClient = buildQueryClient(esClient, eqlCircuitBreaker); + List> criteria = buildCriteria(sequenceFiltersCount); + + SequenceMatcher matcher = new SequenceMatcher(sequenceFiltersCount, false, TimeValue.MINUS_ONE, null, eqlCircuitBreaker); + TumblingWindow window = new TumblingWindow(eqlClient, criteria, null, matcher); + window.execute(wrap(p -> fail(), ex -> assertTrue(ex instanceof CircuitBreakingException))); + } + } + + private List> buildCriteria(int sequenceFiltersCount) { + List> criteria = new ArrayList<>(sequenceFiltersCount); + for (int i = 0; i < sequenceFiltersCount; i++) { + final int j = i; + criteria.add( + new Criterion<>( + i, + new BoxedQueryRequest( + () -> SearchSourceBuilder.searchSource().size(10).query(matchAllQuery()).terminateAfter(j), + "@timestamp", + emptyList(), + emptySet() + ), + keyExtractors, + tsExtractor, + null, + implicitTbExtractor, + false + ) + ); + } + return criteria; + } + + private QueryClient buildQueryClient(ESMockClient esClient, CircuitBreaker eqlCircuitBreaker) { + EqlConfiguration eqlConfiguration = new EqlConfiguration( + new String[] { "test" }, + org.elasticsearch.xpack.ql.util.DateUtils.UTC, + "nobody", + "cluster", + null, + emptyMap(), + null, + TimeValue.timeValueSeconds(30), + null, + 123, + "", + new TaskId("test", 123), + new EqlSearchTask( + randomLong(), + "transport", + EqlSearchAction.NAME, + "", + null, + emptyMap(), + emptyMap(), + new AsyncExecutionId("", new TaskId(randomAlphaOfLength(10), 1)), + TimeValue.timeValueDays(5) + ), + x -> Collections.emptySet() + ); + IndexResolver indexResolver = new IndexResolver(esClient, "cluster", DefaultDataTypeRegistry.INSTANCE, Collections::emptySet); + EqlSession eqlSession = new EqlSession( + esClient, + eqlConfiguration, + indexResolver, + new PreAnalyzer(), + new PostAnalyzer(), + new EqlFunctionRegistry(), + new Verifier(new Metrics()), + new Optimizer(), + new Planner(), + eqlCircuitBreaker + ); + return new PITAwareQueryClient(eqlSession); + } + /** * A type of internal Node client that deals with three types of requests: open PIT, close PIT and SearchRequest. * This class is used by {@code CircuitBreakerTests#testMemoryClearedOnSuccessfulRequest()} and From 1b275db11ec8e1bde877480327e2a9b0852f50e5 Mon Sep 17 00:00:00 2001 From: Luigi Dell'Aquila Date: Thu, 28 Jul 2022 14:49:57 +0200 Subject: [PATCH 4/5] Implement review suggestions --- .../execution/sequence/SequenceMatcher.java | 15 ++---- .../sequence/CircuitBreakerTests.java | 46 +++++++++---------- 2 files changed, 25 insertions(+), 36 deletions(-) diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceMatcher.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceMatcher.java index 2c821646fe9f9..8510f8a2debd0 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceMatcher.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceMatcher.java @@ -305,23 +305,16 @@ public void clear() { clearCircuitBreaker(); } + // protected for testing purposes protected long ramBytesUsedInFlight() { return RamUsageEstimator.sizeOf(keyToSequences) + RamUsageEstimator.sizeOf(stageToKeys); } + // protected for testing purposes protected long ramBytesUsedCompleted() { return RamUsageEstimator.sizeOfCollection(completed); } - private void addMemory(long bytes, String label) { - circuitBreaker.addEstimateBytesAndMaybeBreak(bytes, label); - if (CB_COMPLETED_LABEL.equals(label)) { - prevRamBytesUsedCompleted += bytes; - } else { - prevRamBytesUsedInFlight += bytes; - } - } - private void clearCircuitBreaker() { circuitBreaker.addWithoutBreaking(-prevRamBytesUsedInFlight - prevRamBytesUsedCompleted); prevRamBytesUsedInFlight = 0; @@ -335,11 +328,11 @@ private void clearCircuitBreaker() { // sequences. private void trackMemory() { long newRamBytesUsedInFlight = ramBytesUsedInFlight(); - addMemory(newRamBytesUsedInFlight - prevRamBytesUsedInFlight, CB_INFLIGHT_LABEL); + circuitBreaker.addEstimateBytesAndMaybeBreak(newRamBytesUsedInFlight - prevRamBytesUsedInFlight, CB_INFLIGHT_LABEL); prevRamBytesUsedInFlight = newRamBytesUsedInFlight; long newRamBytesUsedCompleted = ramBytesUsedCompleted(); - addMemory(newRamBytesUsedCompleted - prevRamBytesUsedCompleted, CB_COMPLETED_LABEL); + circuitBreaker.addEstimateBytesAndMaybeBreak(newRamBytesUsedCompleted - prevRamBytesUsedCompleted, CB_COMPLETED_LABEL); prevRamBytesUsedCompleted = newRamBytesUsedCompleted; } diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/sequence/CircuitBreakerTests.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/sequence/CircuitBreakerTests.java index 50400adeda2d4..e0503d476bc3d 100644 --- a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/sequence/CircuitBreakerTests.java +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/sequence/CircuitBreakerTests.java @@ -192,23 +192,14 @@ public void testMemoryClearedOnShardsException() { } private void assertMemoryCleared(int sequenceFiltersCount, BiFunction esClientSupplier) { - final int SEARCH_REQUESTS_EXPECTED_COUNT = 2; - List eqlBreakerSettings = Collections.singletonList( - new BreakerSettings( - CIRCUIT_BREAKER_NAME, - CIRCUIT_BREAKER_LIMIT, - CIRCUIT_BREAKER_OVERHEAD, - CircuitBreaker.Type.MEMORY, - CircuitBreaker.Durability.TRANSIENT - ) - ); + final int searchRequestsExpectedCount = 2; try ( CircuitBreakerService service = new HierarchyCircuitBreakerService( Settings.EMPTY, - eqlBreakerSettings, + breakerSettings(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) ); - ESMockClient esClient = esClientSupplier.apply(service.getBreaker(CIRCUIT_BREAKER_NAME), SEARCH_REQUESTS_EXPECTED_COUNT); + ESMockClient esClient = esClientSupplier.apply(service.getBreaker(CIRCUIT_BREAKER_NAME), searchRequestsExpectedCount); ) { CircuitBreaker eqlCircuitBreaker = service.getBreaker(CIRCUIT_BREAKER_NAME); QueryClient eqlClient = buildQueryClient(esClient, eqlCircuitBreaker); @@ -223,18 +214,10 @@ private void assertMemoryCleared(int sequenceFiltersCount, BiFunction eqlBreakerSettings = Collections.singletonList( - new BreakerSettings( - CIRCUIT_BREAKER_NAME, - CIRCUIT_BREAKER_LIMIT, - CIRCUIT_BREAKER_OVERHEAD, - CircuitBreaker.Type.MEMORY, - CircuitBreaker.Durability.TRANSIENT - ) - ); + final int sequenceFiltersCount = 2; + final int searchRequestsExpectedCount = 2; // let the parent circuit breaker fail, setting its limit to zero Settings settings = Settings.builder().put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), 0).build(); @@ -242,10 +225,10 @@ public void testParentCircuitBreakerOnClean() { try ( CircuitBreakerService service = new HierarchyCircuitBreakerService( settings, - eqlBreakerSettings, + breakerSettings(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) ); - ESMockClient esClient = new SuccessfulESMockClient(service.getBreaker(CIRCUIT_BREAKER_NAME), SEARCH_REQUESTS_EXPECTED_COUNT); + ESMockClient esClient = new SuccessfulESMockClient(service.getBreaker(CIRCUIT_BREAKER_NAME), searchRequestsExpectedCount); ) { CircuitBreaker eqlCircuitBreaker = service.getBreaker(CIRCUIT_BREAKER_NAME); QueryClient eqlClient = buildQueryClient(esClient, eqlCircuitBreaker); @@ -257,6 +240,19 @@ public void testParentCircuitBreakerOnClean() { } } + private List breakerSettings() { + List eqlBreakerSettings = Collections.singletonList( + new BreakerSettings( + CIRCUIT_BREAKER_NAME, + CIRCUIT_BREAKER_LIMIT, + CIRCUIT_BREAKER_OVERHEAD, + CircuitBreaker.Type.MEMORY, + CircuitBreaker.Durability.TRANSIENT + ) + ); + return eqlBreakerSettings; + } + private List> buildCriteria(int sequenceFiltersCount) { List> criteria = new ArrayList<>(sequenceFiltersCount); for (int i = 0; i < sequenceFiltersCount; i++) { From a94977265e47faf82c0d9a6798f15a7e8f75bc75 Mon Sep 17 00:00:00 2001 From: Luigi Dell'Aquila Date: Fri, 29 Jul 2022 09:20:27 +0200 Subject: [PATCH 5/5] Rename test method --- .../xpack/eql/execution/sequence/CircuitBreakerTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/sequence/CircuitBreakerTests.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/sequence/CircuitBreakerTests.java index e0503d476bc3d..7787f3e6ef171 100644 --- a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/sequence/CircuitBreakerTests.java +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/sequence/CircuitBreakerTests.java @@ -215,7 +215,7 @@ private void assertMemoryCleared(int sequenceFiltersCount, BiFunction