Commit b53e090

ivoson authored and LuciferYang committed
[SPARK-56302][CORE] Free task result memory eagerly during serialization on executor
### What changes were proposed in this pull request?

Eagerly null intermediate objects during task result serialization in `Executor` to reduce peak heap memory usage.

During result serialization in `TaskRunner.run()`, three representations of the result coexist on the heap simultaneously:

1. `value` — the raw task result object from `task.run()`
2. `valueByteBuffer` — the first serialization of the result
3. `serializedDirectResult` — the second serialization, wrapping the above into a `DirectTaskResult`

Each becomes dead as soon as the next is produced, but none were released. This PR nulls each reference as soon as it is no longer needed:

- `value = null` after serializing into `valueByteBuffer`
- `valueByteBuffer = null` and `directResult = null` after re-serializing into `serializedDirectResult`

All changes are confined to the executor side within `TaskRunner.run()`, where the variables are local and not exposed to other components.

### Why are the changes needed?

For tasks returning large results (e.g. `collect()` on large datasets), the redundant copies can roughly triple peak memory during serialization, increasing GC pressure or causing executor OOM. Eagerly freeing dead references lets the GC reclaim memory sooner.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UTs.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code v2.1.88

Closes #55110 from ivoson/free-result-memory-asap.

Lead-authored-by: Tengfei Huang <tengfei.huang@databricks.com>
Co-authored-by: Tengfei Huang <tengfei.h@gmail.com>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
(cherry picked from commit 3851cb5)
Signed-off-by: yangjie01 <yangjie01@baidu.com>
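The serialize-then-null sequence described above can be sketched as a small standalone Scala snippet. All names here (`EagerFreeSketch`, `serialize`, `firstBuffer`, `direct`) are simplified stand-ins for illustration, not the actual Spark serializer or `Executor` internals:

```scala
// Hypothetical sketch of the eager-nulling pattern; `serialize` is a
// placeholder, NOT Spark's SerializerHelper.
object EagerFreeSketch {
  private def serialize(v: Any): Array[Byte] = v.toString.getBytes("UTF-8")

  def run(): Int = {
    var value: Any = Seq.fill(4)("row")              // 1. raw task result (`val` became `var`)
    var firstBuffer: Array[Byte] = serialize(value)  // 2. first serialization
    value = null                                     // raw result is dead once serialized
    var direct: (Array[Byte], String) = (firstBuffer, "accumUpdates") // wrapper, like DirectTaskResult
    val secondBuffer: Array[Byte] = serialize(direct) // 3. second serialization
    // Both references to the first buffer must go: the local var and the
    // wrapper's field alias the same object (mirrors valueByteBuffer/directResult).
    firstBuffer = null
    direct = null
    secondBuffer.length                              // only the final buffer remains live
  }
}
```

Between each step, only one representation of the result stays strongly reachable, so the GC can reclaim the earlier copies during, rather than after, the serialization sequence.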
1 parent a3b502e commit b53e090

File tree

1 file changed: +11 −3 lines

core/src/main/scala/org/apache/spark/executor/Executor.scala (11 additions, 3 deletions)

```diff
@@ -57,6 +57,7 @@ import org.apache.spark.status.api.v1.ThreadStackTrace
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
 import org.apache.spark.util._
 import org.apache.spark.util.ArrayImplicits._
+import org.apache.spark.util.io.ChunkedByteBuffer

 private[spark] object IsolatedSessionState {
   // Authoritative store for all isolated sessions. Sessions are put here when created
@@ -862,7 +863,7 @@ private[spark] class Executor(
       val resources = taskDescription.resources.map { case (rName, addressesAmounts) =>
         rName -> new ResourceInformation(rName, addressesAmounts.keys.toSeq.sorted.toArray)
       }
-      val value = Utils.tryWithSafeFinally {
+      var value: Any = Utils.tryWithSafeFinally {
        val res = task.run(
          taskAttemptId = taskId,
          attemptNumber = taskDescription.attemptNumber,
@@ -917,7 +918,9 @@ private[spark] class Executor(

        val resultSer = env.serializer.newInstance()
        val beforeSerializationNs = System.nanoTime()
-       val valueByteBuffer = SerializerHelper.serializeToChunkedBuffer(resultSer, value)
+       var valueByteBuffer: ChunkedByteBuffer = SerializerHelper.serializeToChunkedBuffer(
+         resultSer, value)
+       value = null // Allow GC to reclaim the raw task result
        val afterSerializationNs = System.nanoTime()

        // Deserialization happens in two parts: first, we deserialize a Task object, which
@@ -961,10 +964,15 @@ private[spark] class Executor(
        val accumUpdates = task.collectAccumulatorUpdates()
        val metricPeaks = metricsPoller.getTaskMetricPeaks(taskId)
        // TODO: do not serialize value twice
-       val directResult = new DirectTaskResult(valueByteBuffer, accumUpdates, metricPeaks)
+       var directResult: DirectTaskResult[Any] = new DirectTaskResult(
+         valueByteBuffer, accumUpdates, metricPeaks)
        // try to estimate a reasonable upper bound of DirectTaskResult serialization
        val serializedDirectResult = SerializerHelper.serializeToChunkedBuffer(ser, directResult,
          valueByteBuffer.size + accumUpdates.size * 32 + metricPeaks.length * 8)
+       // Allow GC to reclaim the first serialization buffer. Both references must be
+       // nulled: the local var and the field inside directResult point to the same object.
+       valueByteBuffer = null
+       directResult = null
        val resultSize = serializedDirectResult.size
        executorSource.METRIC_RESULT_SIZE.inc(resultSize)
```
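The comment in the last hunk, about both references needing to be nulled, can be illustrated with a small sketch. `Wrapper` and `payload` are hypothetical names, not `DirectTaskResult` or any Spark class:

```scala
// Why nulling only the local variable is not enough: the wrapper object
// still holds an alias to the same buffer.
final class Wrapper(var payload: Array[Byte])

object AliasDemo {
  def main(args: Array[String]): Unit = {
    var buf: Array[Byte] = new Array[Byte](8)
    val direct = new Wrapper(buf)
    buf = null                      // local reference cleared...
    assert(direct.payload != null)  // ...but the buffer is still reachable via the wrapper,
    // so the commit also drops the wrapper itself (directResult = null),
    // making the first buffer truly unreachable once the second
    // serialization result is the only thing the code still needs.
  }
}
```

This is why the diff nulls both `valueByteBuffer` (the local variable) and `directResult` (whose field aliases the same `ChunkedByteBuffer`).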