Skip to content

Commit b1069f8

Browse files
committed
Make sure to close IndexInput on exception
Signed-off-by: Sachin Kale <sachinpkale@gmail.com>
1 parent b04e72a commit b1069f8

1 file changed

Lines changed: 70 additions & 64 deletions

File tree

server/src/main/java/org/opensearch/index/store/RemoteDirectory.java

Lines changed: 70 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -398,75 +398,81 @@ private void uploadBlob(
398398
long expectedChecksum = calculateChecksumOfChecksum(from, src);
399399
long contentLength;
400400
IndexInput indexInput = from.openInput(src, ioContext);
401-
contentLength = indexInput.length();
402-
boolean remoteIntegrityEnabled = false;
403-
if (getBlobContainer() instanceof AsyncMultiStreamBlobContainer) {
404-
remoteIntegrityEnabled = ((AsyncMultiStreamBlobContainer) getBlobContainer()).remoteIntegrityCheckSupported();
405-
}
406-
lowPriorityUpload = lowPriorityUpload || contentLength > ByteSizeUnit.GB.toBytes(15);
407-
RemoteTransferContainer.OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier;
408-
409-
if (lowPriorityUpload) {
410-
offsetRangeInputStreamSupplier = (size, position) -> lowPriorityUploadRateLimiter.apply(
411-
new OffsetRangeIndexInputStream(indexInput.clone(), size, position)
412-
);
413-
} else {
414-
offsetRangeInputStreamSupplier = (size, position) -> uploadRateLimiter.apply(
415-
new OffsetRangeIndexInputStream(indexInput.clone(), size, position)
416-
);
417-
}
418-
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
419-
src,
420-
remoteFileName,
421-
contentLength,
422-
true,
423-
lowPriorityUpload ? WritePriority.LOW : WritePriority.NORMAL,
424-
offsetRangeInputStreamSupplier,
425-
expectedChecksum,
426-
remoteIntegrityEnabled
427-
);
428-
ActionListener<Void> completionListener = ActionListener.wrap(resp -> {
429-
try {
430-
postUploadRunner.run();
431-
listener.onResponse(null);
432-
} catch (Exception e) {
433-
logger.error(() -> new ParameterizedMessage("Exception in segment postUpload for file [{}]", src), e);
434-
listener.onFailure(e);
435-
}
436-
}, ex -> {
437-
logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", src), ex);
438-
IOException corruptIndexException = ExceptionsHelper.unwrapCorruption(ex);
439-
if (corruptIndexException != null) {
440-
listener.onFailure(corruptIndexException);
441-
return;
442-
}
443-
Throwable throwable = ExceptionsHelper.unwrap(ex, CorruptFileException.class);
444-
if (throwable != null) {
445-
CorruptFileException corruptFileException = (CorruptFileException) throwable;
446-
listener.onFailure(new CorruptIndexException(corruptFileException.getMessage(), corruptFileException.getFileName()));
447-
return;
401+
try {
402+
contentLength = indexInput.length();
403+
boolean remoteIntegrityEnabled = false;
404+
if (getBlobContainer() instanceof AsyncMultiStreamBlobContainer) {
405+
remoteIntegrityEnabled = ((AsyncMultiStreamBlobContainer) getBlobContainer()).remoteIntegrityCheckSupported();
448406
}
449-
listener.onFailure(ex);
450-
});
407+
lowPriorityUpload = lowPriorityUpload || contentLength > ByteSizeUnit.GB.toBytes(15);
408+
RemoteTransferContainer.OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier;
451409

452-
completionListener = ActionListener.runBefore(completionListener, () -> {
453-
try {
454-
remoteTransferContainer.close();
455-
} catch (Exception e) {
456-
logger.warn("Error occurred while closing streams", e);
410+
if (lowPriorityUpload) {
411+
offsetRangeInputStreamSupplier = (size, position) -> lowPriorityUploadRateLimiter.apply(
412+
new OffsetRangeIndexInputStream(indexInput.clone(), size, position)
413+
);
414+
} else {
415+
offsetRangeInputStreamSupplier = (size, position) -> uploadRateLimiter.apply(
416+
new OffsetRangeIndexInputStream(indexInput.clone(), size, position)
417+
);
457418
}
458-
});
419+
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
420+
src,
421+
remoteFileName,
422+
contentLength,
423+
true,
424+
lowPriorityUpload ? WritePriority.LOW : WritePriority.NORMAL,
425+
offsetRangeInputStreamSupplier,
426+
expectedChecksum,
427+
remoteIntegrityEnabled
428+
);
429+
ActionListener<Void> completionListener = ActionListener.wrap(resp -> {
430+
try {
431+
postUploadRunner.run();
432+
listener.onResponse(null);
433+
} catch (Exception e) {
434+
logger.error(() -> new ParameterizedMessage("Exception in segment postUpload for file [{}]", src), e);
435+
listener.onFailure(e);
436+
}
437+
}, ex -> {
438+
logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", src), ex);
439+
IOException corruptIndexException = ExceptionsHelper.unwrapCorruption(ex);
440+
if (corruptIndexException != null) {
441+
listener.onFailure(corruptIndexException);
442+
return;
443+
}
444+
Throwable throwable = ExceptionsHelper.unwrap(ex, CorruptFileException.class);
445+
if (throwable != null) {
446+
CorruptFileException corruptFileException = (CorruptFileException) throwable;
447+
listener.onFailure(new CorruptIndexException(corruptFileException.getMessage(), corruptFileException.getFileName()));
448+
return;
449+
}
450+
listener.onFailure(ex);
451+
});
459452

460-
completionListener = ActionListener.runAfter(completionListener, () -> {
461-
try {
462-
indexInput.close();
463-
} catch (IOException e) {
464-
logger.warn("Error occurred while closing index input", e);
465-
}
466-
});
453+
completionListener = ActionListener.runBefore(completionListener, () -> {
454+
try {
455+
remoteTransferContainer.close();
456+
} catch (Exception e) {
457+
logger.warn("Error occurred while closing streams", e);
458+
}
459+
});
467460

468-
WriteContext writeContext = remoteTransferContainer.createWriteContext();
469-
((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(writeContext, completionListener);
461+
completionListener = ActionListener.runAfter(completionListener, () -> {
462+
try {
463+
indexInput.close();
464+
} catch (IOException e) {
465+
logger.warn("Error occurred while closing index input", e);
466+
}
467+
});
468+
469+
WriteContext writeContext = remoteTransferContainer.createWriteContext();
470+
((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(writeContext, completionListener);
471+
} catch(Exception e) {
472+
logger.warn("Exception while calling asyncBlobUpload, closing IndexInput to avoid leak");
473+
indexInput.close();
474+
throw e;
475+
}
470476
}
471477

472478
private long calculateChecksumOfChecksum(Directory directory, String file) throws IOException {

0 commit comments

Comments
 (0)