Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -373,17 +374,31 @@ public void writeBlobAtomic(String blobName, InputStream inputStream, long blobS
}

@Override
public DeleteResult delete() {
public DeleteResult delete() throws IOException {
PlainActionFuture<DeleteResult> future = new PlainActionFuture<>();
deleteAsync(future);
return future.actionGet();
return getFutureValue(future);
}

@Override
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) {
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
PlainActionFuture<Void> future = new PlainActionFuture<>();
deleteBlobsAsyncIgnoringIfNotExists(blobNames, future);
future.actionGet();
getFutureValue(future);
}

private <T> T getFutureValue(PlainActionFuture<T> future) throws IOException {
try {
return future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Future got interrupted", e);
} catch (ExecutionException e) {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
throw new RuntimeException(e.getCause());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1947,6 +1947,116 @@ public void onFailure(Exception e) {
assertEquals(simulatedFailure, exceptionRef.get().getCause());
}

public void testDeleteWithInterruptedException() throws Exception {
final String bucketName = randomAlphaOfLengthBetween(1, 10);
final BlobPath blobPath = new BlobPath();
final S3BlobStore blobStore = mock(S3BlobStore.class);
when(blobStore.bucket()).thenReturn(bucketName);
when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());

final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class);
final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class);
when(blobStore.asyncClientReference()).thenReturn(asyncClientReference);
when(asyncClientReference.get()).thenReturn(AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, s3AsyncClient, null));

// Mock the list operation to block indefinitely
final ListObjectsV2Publisher listPublisher = mock(ListObjectsV2Publisher.class);
doAnswer(invocation -> {
Thread.currentThread().interrupt();
return null;
}).when(listPublisher).subscribe(ArgumentMatchers.<Subscriber<ListObjectsV2Response>>any());

when(s3AsyncClient.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenReturn(listPublisher);

final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);

IllegalStateException e = expectThrows(IllegalStateException.class, blobContainer::delete);
assertEquals("Future got interrupted", e.getMessage());
assertTrue(Thread.interrupted()); // Clear interrupted state
}

public void testDeleteWithExecutionException() throws Exception {
final String bucketName = randomAlphaOfLengthBetween(1, 10);
final BlobPath blobPath = new BlobPath();
final S3BlobStore blobStore = mock(S3BlobStore.class);
when(blobStore.bucket()).thenReturn(bucketName);
when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());

final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class);
final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class);
when(blobStore.asyncClientReference()).thenReturn(asyncClientReference);
when(asyncClientReference.get()).thenReturn(AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, s3AsyncClient, null));

RuntimeException simulatedError = new RuntimeException("Simulated error");
final ListObjectsV2Publisher listPublisher = mock(ListObjectsV2Publisher.class);
doAnswer(invocation -> {
Subscriber<? super ListObjectsV2Response> subscriber = invocation.getArgument(0);
subscriber.onError(simulatedError);
return null;
}).when(listPublisher).subscribe(ArgumentMatchers.<Subscriber<ListObjectsV2Response>>any());

when(s3AsyncClient.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenReturn(listPublisher);

final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);

IOException e = expectThrows(IOException.class, blobContainer::delete);
assertEquals("Failed to list objects for deletion", e.getMessage());
assertEquals(simulatedError, e.getCause());
}

public void testDeleteBlobsIgnoringIfNotExistsWithInterruptedException() throws Exception {
final String bucketName = randomAlphaOfLengthBetween(1, 10);
final BlobPath blobPath = new BlobPath();
final S3BlobStore blobStore = mock(S3BlobStore.class);
when(blobStore.bucket()).thenReturn(bucketName);
when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());
when(blobStore.getBulkDeletesSize()).thenReturn(5);

final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class);
final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class);
when(blobStore.asyncClientReference()).thenReturn(asyncClientReference);
when(asyncClientReference.get()).thenReturn(AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, s3AsyncClient, null));

// Mock deleteObjects to block indefinitely
when(s3AsyncClient.deleteObjects(any(DeleteObjectsRequest.class))).thenAnswer(invocation -> {
Thread.currentThread().interrupt();
return null;
});

final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
List<String> blobNames = Arrays.asList("test1", "test2");

IllegalStateException e = expectThrows(IllegalStateException.class, () -> blobContainer.deleteBlobsIgnoringIfNotExists(blobNames));
assertEquals("Future got interrupted", e.getMessage());
assertTrue(Thread.interrupted()); // Clear interrupted state
}

public void testDeleteBlobsIgnoringIfNotExistsWithExecutionException() throws Exception {
final String bucketName = randomAlphaOfLengthBetween(1, 10);
final BlobPath blobPath = new BlobPath();
final S3BlobStore blobStore = mock(S3BlobStore.class);
when(blobStore.bucket()).thenReturn(bucketName);
when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());
when(blobStore.getBulkDeletesSize()).thenReturn(5);

final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class);
final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class);
when(blobStore.asyncClientReference()).thenReturn(asyncClientReference);
when(asyncClientReference.get()).thenReturn(AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, s3AsyncClient, null));

RuntimeException simulatedError = new RuntimeException("Simulated delete error");
CompletableFuture<DeleteObjectsResponse> failedFuture = new CompletableFuture<>();
failedFuture.completeExceptionally(simulatedError);
when(s3AsyncClient.deleteObjects(any(DeleteObjectsRequest.class))).thenReturn(failedFuture);

final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
List<String> blobNames = Arrays.asList("test1", "test2");

IOException e = expectThrows(IOException.class, () -> blobContainer.deleteBlobsIgnoringIfNotExists(blobNames));
assertEquals("Failed to delete blobs " + blobNames, e.getMessage());
assertEquals(simulatedError, e.getCause().getCause());
}

private void mockObjectResponse(S3AsyncClient s3AsyncClient, String bucketName, String blobName, int objectSize) {

final InputStream inputStream = new ByteArrayInputStream(randomByteArrayOfLength(objectSize));
Expand Down