Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
* Buffer bytes to a temporary file on disk. On {@link WritableByteChannel#close() close()}
* upload the entire files contents to Cloud Storage. Delete the temporary file.
* </td>
* <td>gRPC</td>
* <td>gRPC, HTTP</td>
* <td>
* <ol>
* <li>A Resumable Upload Session will be used to upload the file on disk.</li>
Expand Down Expand Up @@ -272,7 +272,7 @@ public static BidiBlobWriteSessionConfig bidiWrite() {
* @since 2.26.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
@TransportCompatibility({Transport.GRPC})
@TransportCompatibility({Transport.GRPC, Transport.HTTP})
public static BlobWriteSessionConfig bufferToTempDirThenUpload() throws IOException {
return bufferToDiskThenUpload(
Paths.get(System.getProperty("java.io.tmpdir"), "google-cloud-storage"));
Expand All @@ -289,7 +289,7 @@ public static BlobWriteSessionConfig bufferToTempDirThenUpload() throws IOExcept
* @since 2.26.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
@TransportCompatibility({Transport.GRPC})
@TransportCompatibility({Transport.GRPC, Transport.HTTP})
public static BufferToDiskThenUpload bufferToDiskThenUpload(Path path) throws IOException {
return bufferToDiskThenUpload(ImmutableList.of(path));
}
Expand All @@ -308,7 +308,7 @@ public static BufferToDiskThenUpload bufferToDiskThenUpload(Path path) throws IO
* @since 2.26.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
@TransportCompatibility({Transport.GRPC})
@TransportCompatibility({Transport.GRPC, Transport.HTTP})
public static BufferToDiskThenUpload bufferToDiskThenUpload(Collection<Path> paths)
throws IOException {
return new BufferToDiskThenUpload(ImmutableList.copyOf(paths), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@
*/
@Immutable
@BetaApi
@TransportCompatibility({Transport.GRPC})
@TransportCompatibility({Transport.GRPC, Transport.HTTP})
public final class BufferToDiskThenUpload extends BlobWriteSessionConfig
implements BlobWriteSessionConfig.GrpcCompatible {
implements BlobWriteSessionConfig.HttpCompatible, BlobWriteSessionConfig.GrpcCompatible {
private static final long serialVersionUID = 9059242302276891867L;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

final class StorageImpl extends BaseService<StorageOptions> implements Storage, StorageInternal {
Expand Down Expand Up @@ -147,7 +148,8 @@ public Blob create(BlobInfo blobInfo, BlobTargetOption... options) {
.setMd5(EMPTY_BYTE_ARRAY_MD5)
.setCrc32c(EMPTY_BYTE_ARRAY_CRC32C)
.build();
return internalCreate(updatedInfo, EMPTY_BYTE_ARRAY, 0, 0, options);
final Opts<ObjectTargetOpt> objectTargetOptOpts = Opts.unwrap(options).resolveFrom(updatedInfo);
return internalCreate(updatedInfo, EMPTY_BYTE_ARRAY, 0, 0, objectTargetOptOpts);
}

@Override
Expand All @@ -161,7 +163,8 @@ public Blob create(BlobInfo blobInfo, byte[] content, BlobTargetOption... option
BaseEncoding.base64()
.encode(Ints.toByteArray(Hashing.crc32c().hashBytes(content).asInt())))
.build();
return internalCreate(updatedInfo, content, 0, content.length, options);
final Opts<ObjectTargetOpt> objectTargetOptOpts = Opts.unwrap(options).resolveFrom(updatedInfo);
return internalCreate(updatedInfo, content, 0, content.length, objectTargetOptOpts);
}

@Override
Expand All @@ -180,7 +183,8 @@ public Blob create(
Ints.toByteArray(
Hashing.crc32c().hashBytes(content, offset, length).asInt())))
.build();
return internalCreate(updatedInfo, content, offset, length, options);
final Opts<ObjectTargetOpt> objectTargetOptOpts = Opts.unwrap(options).resolveFrom(updatedInfo);
return internalCreate(updatedInfo, content, offset, length, objectTargetOptOpts);
}

@Override
Expand All @@ -203,12 +207,11 @@ public Blob create(BlobInfo blobInfo, InputStream content, BlobWriteOption... op

private Blob internalCreate(
BlobInfo info,
final byte[] content,
final byte @NonNull [] content,
final int offset,
final int length,
BlobTargetOption... options) {
Opts<ObjectTargetOpt> opts) {
Preconditions.checkNotNull(content);
Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(info);
final Map<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions();

BlobInfo updated = opts.blobInfoMapper().apply(info.toBuilder()).build();
Expand Down Expand Up @@ -1647,4 +1650,48 @@ public BlobWriteSession blobWriteSession(BlobInfo blobInfo, BlobWriteOption... o
writerFactory.writeSession(this, blobInfo, opts);
return BlobWriteSessions.of(writableByteChannelSession);
}

@Override
public BlobInfo internalCreateFrom(Path path, BlobInfo info, Opts<ObjectTargetOpt> opts)
throws IOException {
if (Files.isDirectory(path)) {
throw new StorageException(0, path + " is a directory");
}
long size = Files.size(path);
if (size == 0L) {
return internalCreate(info, EMPTY_BYTE_ARRAY, 0, 0, opts);
}
final Map<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions();
BlobInfo.Builder builder = info.toBuilder().setMd5(null).setCrc32c(null);
BlobInfo updated = opts.blobInfoMapper().apply(builder).build();
StorageObject encode = codecs.blobInfo().encode(updated);

Supplier<String> uploadIdSupplier =
ResumableMedia.startUploadForBlobInfo(
getOptions(),
updated,
optionsMap,
retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap));
JsonResumableWrite jsonResumableWrite =
JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0);

JsonResumableSession session =
ResumableSession.json(
HttpClientContext.from(storageRpc),
getOptions().asRetryDependencies(),
retryAlgorithmManager.idempotent(),
jsonResumableWrite);
HttpContentRange contentRange =
HttpContentRange.of(ByteRangeSpec.relativeLength(0L, size), size);
ResumableOperationResult<StorageObject> put =
session.put(RewindableContent.of(path), contentRange);
// all exception translation is taken care of down in the JsonResumableSession
StorageObject object = put.getObject();
if (object == null) {
// if by some odd chance the put didn't get the StorageObject, query for it
ResumableOperationResult<StorageObject> query = session.query();
object = query.getObject();
}
return codecs.blobInfo().decode(object);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ public void allDefaults() throws Exception {
}

@Test
@CrossRun.Exclude(transports = Transport.HTTP)
public void bufferToTempDirThenUpload() throws Exception {
StorageOptions options = null;
if (transport == Transport.GRPC) {
Expand All @@ -78,6 +77,12 @@ public void bufferToTempDirThenUpload() throws Exception {
.toBuilder()
.setBlobWriteSessionConfig(BlobWriteSessionConfigs.bufferToTempDirThenUpload())
.build();
} else if (transport == Transport.HTTP) {
options =
((HttpStorageOptions) storage.getOptions())
.toBuilder()
.setBlobWriteSessionConfig(BlobWriteSessionConfigs.bufferToTempDirThenUpload())
.build();
}
assertWithMessage("unable to resolve options").that(options).isNotNull();
//noinspection DataFlowIssue
Expand Down