From 3b5b3c9b9855a65bb8d64e62a8cabd46fe3f082f Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 26 Oct 2021 12:45:31 +0200 Subject: [PATCH] Limit CS Update Task Description Size (#79443) When working with very large clusters, there's various avenues to create very large batches of tasks that can render as strings with O(MB) length. Since we only use these strings for logging and there is limited value in knowing the exact task descriptions of large numbers of tasks it seems reasonable to put a hard limit on the logging here to prevent hard to work with logs and save some memory in extreme cases. --- .../cluster/ClusterStateTaskExecutor.java | 8 +++++- .../cluster/service/TaskBatcher.java | 25 +++++++++++++++---- 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java b/server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java index e93b5cf77592e..f7d4e1dc82c48 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java @@ -7,6 +7,7 @@ */ package org.elasticsearch.cluster; +import org.elasticsearch.common.Strings; import org.elasticsearch.core.Nullable; import java.util.IdentityHashMap; @@ -46,7 +47,12 @@ default void clusterStatePublished(ClusterStatePublicationEvent clusterStatePubl * This allows groupd task description but the submitting source. 
*/ default String describeTasks(List<T> tasks) { - return String.join(", ", tasks.stream().map(t -> (CharSequence)t.toString()).filter(t -> t.length() > 0)::iterator); + final StringBuilder output = new StringBuilder(); + Strings.collectionToDelimitedStringWithLimit( + (Iterable<String>) () -> tasks.stream().map(Object::toString).filter(s -> s.isEmpty() == false).iterator(), + ", ", "", "", 1024, output + ); + return output.toString(); } /** diff --git a/server/src/main/java/org/elasticsearch/cluster/service/TaskBatcher.java b/server/src/main/java/org/elasticsearch/cluster/service/TaskBatcher.java index bc93d9bca9482..78007317db80a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/TaskBatcher.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/TaskBatcher.java @@ -9,6 +9,7 @@ package org.elasticsearch.cluster.service; import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.Strings; import org.elasticsearch.core.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.core.TimeValue; @@ -131,14 +132,28 @@ void runIfNotProcessed(BatchedTask updateTask) { } if (toExecute.isEmpty() == false) { - final String tasksSummary = processTasksBySource.entrySet().stream().map(entry -> { + run(updateTask.batchingKey, toExecute, buildTasksDescription(updateTask, toExecute, processTasksBySource)); + } + } + } + + private static final int MAX_TASK_DESCRIPTION_CHARS = 8 * 1024; + + private String buildTasksDescription(BatchedTask updateTask, + List<BatchedTask> toExecute, + Map<String, List<BatchedTask>> processTasksBySource) { + final StringBuilder output = new StringBuilder(); + Strings.collectionToDelimitedStringWithLimit( + (Iterable<String>) () -> processTasksBySource.entrySet().stream().map(entry -> { String tasks = updateTask.describeTasks(entry.getValue()); return tasks.isEmpty() ?
entry.getKey() : entry.getKey() + "[" + tasks + "]"; - }).reduce((s1, s2) -> s1 + ", " + s2).orElse(""); - - run(updateTask.batchingKey, toExecute, tasksSummary); - } + }).filter(s -> s.isEmpty() == false).iterator(), + ", ", "", "", MAX_TASK_DESCRIPTION_CHARS, output + ); + if (output.length() > MAX_TASK_DESCRIPTION_CHARS) { + output.append(" (").append(toExecute.size()).append(" tasks in total)"); } + return output.toString(); } /**