Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1811,20 +1811,83 @@ private boolean remoteLogEnabledAndRemoteCopyEnabled() {
return remoteLogEnabled() && !config().remoteLogCopyDisable();
}

private boolean isSegmentEligibleForDeletion(Optional<LogSegment> nextSegmentOpt, long upperBoundOffset) {
private boolean isSegmentEligibleForDeletion(LogSegment segment, Optional<LogSegment> nextSegmentOpt, long upperBoundOffset) {
boolean allowDeletionDueToLogStartOffsetIncremented = nextSegmentOpt.isPresent() && logStartOffset >= nextSegmentOpt.get().baseOffset();
// Segments are eligible for deletion when:
// 1. they are uploaded to the remote storage
// 2. log-start-offset was incremented higher than the largest offset in the candidate segment
// 3. they have exceeded the remote retention time (retention.ms), even if upload failed
// 4. only local segments size exceeds the remote retention size (retention.bytes), even if upload failed
// Note: when remote log copy is disabled, we will fall back to local log check using retention.ms/bytes
if (remoteLogEnabledAndRemoteCopyEnabled()) {
return (upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage()) ||
allowDeletionDueToLogStartOffsetIncremented;
// Check if segment is uploaded to remote storage
boolean uploadedToRemote = upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage();

// Check if log-start-offset was incremented
if (uploadedToRemote || allowDeletionDueToLogStartOffsetIncremented) {
return true;
}

if (isSegmentEligibleForDeletionByRemoteRetentionTime(segment)) {
return true;
}

if (isSegmentEligibleForDeletionByRemoteRetentionSize(segment)) {
return true;
}

return false;
} else {
return true;
}
}

private boolean isSegmentEligibleForDeletionByRemoteRetentionTime(LogSegment segment) {
long remoteRetentionMs = config().retentionMs;
if (remoteRetentionMs < 0) {
return false;
}

try {
long currentTime = time().milliseconds();
long segmentTimestamp = segment.largestTimestamp();
if (currentTime - segmentTimestamp > remoteRetentionMs) {
logger.info("Segment {} is eligible for deletion due to remote retention time {}ms breach, " +
"even though it hasn't been uploaded to remote storage yet. " +
"This prevents local disk from filling up when remote storage is unavailable.",
segment, remoteRetentionMs);
return true;
}
} catch (IOException e) {
logger.warn("Failed to read largest timestamp for segment {}, skipping time-based retention check. " +
"Error: {}", segment, e.getMessage());
}
return false;
}

private boolean isSegmentEligibleForDeletionByRemoteRetentionSize(LogSegment segment) {
long remoteRetentionSize = config().retentionSize;
if (remoteRetentionSize < 0) {
return false;
}

long onlyLocalSize;
if (highestOffsetInRemoteStorage() < 0) {
onlyLocalSize = size();
} else {
onlyLocalSize = onlyLocalLogSegmentsSize();
}

if (onlyLocalSize > remoteRetentionSize) {
logger.info("Segment {} is eligible for deletion due to remote retention size {} bytes breach. " +
"Only local segments size {} exceeds the limit, even though it hasn't been uploaded to remote storage yet. " +
"This prevents local disk from filling up when remote storage is unavailable.",
segment, remoteRetentionSize, onlyLocalSize);
return true;
}
return false;
}

/**
* Find segments starting from the oldest until the user-supplied predicate is false.
* A final segment that is empty will never be returned.
Expand Down Expand Up @@ -1855,7 +1918,7 @@ public List<LogSegment> deletableSegments(DeletionCondition predicate) throws IO
if (predicateResult && remoteLogEnabled() && nextSegmentOpt.isEmpty() && segment.size() > 0) {
shouldRoll = true;
}
if (predicateResult && !isLastSegmentAndEmpty && isSegmentEligibleForDeletion(nextSegmentOpt, upperBoundOffset)) {
if (predicateResult && !isLastSegmentAndEmpty && isSegmentEligibleForDeletion(segment, nextSegmentOpt, upperBoundOffset)) {
deletable.add(segment);
segmentOpt = nextSegmentOpt;
} else {
Expand Down