Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,11 @@ public ResultRetryAlgorithm<?> getFor(CreateNotificationConfigRequest req) {
}

public ResultRetryAlgorithm<?> getFor(DeleteBucketRequest req) {
return retryStrategy.getNonidempotentHandler();
return retryStrategy.getIdempotentHandler();
}

public ResultRetryAlgorithm<?> getFor(DeleteHmacKeyRequest req) {
return retryStrategy.getNonidempotentHandler();
return retryStrategy.getIdempotentHandler();
}

public ResultRetryAlgorithm<?> getFor(DeleteNotificationConfigRequest req) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,32 @@ public Blob create(
@Override
public Blob create(BlobInfo blobInfo, InputStream content, BlobWriteOption... options) {
try {
return createFrom(blobInfo, content, options);
} catch (IOException e) {
requireNonNull(blobInfo, "blobInfo must be non null");
InputStream inputStreamParam = firstNonNull(content, new ByteArrayInputStream(ZERO_BYTES));

Opts<ObjectTargetOpt> optsWithDefaults = Opts.unwrap(options).prepend(defaultOpts);
GrpcCallContext grpcCallContext =
optsWithDefaults.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
WriteObjectRequest req = getWriteObjectRequest(blobInfo, optsWithDefaults);
Hasher hasher = Hasher.enabled();
GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext());
UnbufferedWritableByteChannelSession<WriteObjectResponse> session =
ResumableMedia.gapic()
.write()
.byteChannel(storageClient.writeObjectCallable().withDefaultCallContext(merge))
.setByteStringStrategy(ByteStringStrategy.noCopy())
.setHasher(hasher)
.direct()
.unbuffered()
.setRequest(req)
.build();

try (UnbufferedWritableByteChannel c = session.open()) {
ByteStreams.copy(Channels.newChannel(inputStreamParam), c);
}
ApiFuture<WriteObjectResponse> responseApiFuture = session.getResult();
return this.getBlob(responseApiFuture);
} catch (IOException | ApiException e) {
throw StorageException.coalesce(e);
}
}
Expand Down Expand Up @@ -549,17 +573,20 @@ public boolean delete(String bucket, BucketSourceOption... options) {
DeleteBucketRequest.Builder builder =
DeleteBucketRequest.newBuilder().setName(bucketNameCodec.encode(bucket));
DeleteBucketRequest req = opts.deleteBucketsRequest().apply(builder).build();
try {
GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext());
Retrying.run(
getOptions(),
retryAlgorithmManager.getFor(req),
() -> storageClient.deleteBucketCallable().call(req, merge),
Decoder.identity());
return true;
} catch (StorageException e) {
return false;
}
GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext());
return Boolean.TRUE.equals(
Retrying.run(
getOptions(),
retryAlgorithmManager.getFor(req),
() -> {
try {
storageClient.deleteBucketCallable().call(req, merge);
return true;
} catch (NotFoundException e) {
return false;
}
},
Decoder.identity()));
}

@Override
Expand Down Expand Up @@ -760,11 +787,19 @@ public GrpcBlobWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts);
Hasher hasher = Hasher.noop();
// in JSON, the starting of the resumable session happens before the invocation of write can
// happen. Emulate the same thing here.
// 1. create the future
ApiFuture<ResumableWrite> startResumableWrite = startResumableWrite(grpcCallContext, req);
// 2. await the result of the future
ResumableWrite resumableWrite = ApiFutureUtils.await(startResumableWrite);
// 3. wrap the result in another future container before constructing the BlobWriteChannel
ApiFuture<ResumableWrite> wrapped = ApiFutures.immediateFuture(resumableWrite);
return new GrpcBlobWriteChannel(
storageClient.writeObjectCallable(),
getOptions(),
retryAlgorithmManager.idempotent(),
() -> startResumableWrite(grpcCallContext, req),
() -> wrapped,
hasher);
}

Expand Down
Loading