Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lang/java/avro/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -250,5 +250,10 @@
<artifactId>hamcrest-library</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
Expand Down Expand Up @@ -72,6 +73,8 @@ public class DataFileWriter<D> implements Closeable, Flushable {

private byte[] sync; // 16 random bytes
private int syncInterval = DataFileConstants.DEFAULT_SYNC_INTERVAL;
private Function<OutputStream, BinaryEncoder> initEncoder = out -> new EncoderFactory().directBinaryEncoder(out,
null);

private boolean isOpen;
private Codec codec;
Expand Down Expand Up @@ -129,6 +132,17 @@ public DataFileWriter<D> setSyncInterval(int syncInterval) {
return this;
}

/**
 * Allows setting a different encoder than the default DirectBinaryEncoder.
 * <p>
 * The function is invoked once per {@code create}/{@code appendTo} when the
 * block buffer is initialized, and receives that buffer as its argument. Must
 * be called before the writer is opened; encoders created by
 * {@link EncoderFactory} (e.g. {@code blockingDirectBinaryEncoder}) are the
 * intended inputs.
 *
 * @param initEncoderFunc Function to create a binary encoder; must not be null
 * @return this DataFileWriter
 * @throws NullPointerException if {@code initEncoderFunc} is null
 */
public DataFileWriter<D> setEncoder(Function<OutputStream, BinaryEncoder> initEncoderFunc) {
  // Fail fast here rather than with an obscure NPE later inside init().
  if (initEncoderFunc == null) {
    throw new NullPointerException("initEncoderFunc must not be null");
  }
  this.initEncoder = initEncoderFunc;
  return this;
}

/** Open a new file for data matching a schema with a random sync. */
public DataFileWriter<D> create(Schema schema, File file) throws IOException {
SyncableFileOutputStream sfos = new SyncableFileOutputStream(file);
Expand Down Expand Up @@ -241,7 +255,7 @@ private void init(OutputStream outs) throws IOException {
this.vout = efactory.directBinaryEncoder(out, null);
dout.setSchema(schema);
buffer = new NonCopyingByteArrayOutputStream(Math.min((int) (syncInterval * 1.25), Integer.MAX_VALUE / 2 - 1));
this.bufOut = efactory.directBinaryEncoder(buffer, null);
this.bufOut = this.initEncoder.apply(buffer);
if (this.codec == null) {
this.codec = CodecFactory.nullCodec().createInstance();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro.io;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;

/**
 * An {@link Encoder} for Avro's binary encoding that does not buffer output.
 * <p/>
 * This encoder does not buffer writes, in contrast to
 * {@link BufferedBinaryEncoder}. However, it is lighter-weight and useful when
 * the buffering of BufferedBinaryEncoder is not desired because you buffer at a
 * different level, or the Encoder is very short-lived.
 * </p>
 * The BlockingDirectBinaryEncoder encodes the byte size of each Map and Array
 * block in addition to the item count. This allows a reader to postpone
 * decoding a block, or to skip over it entirely.
 * <p/>
 * To construct, use
 * {@link EncoderFactory#blockingDirectBinaryEncoder(OutputStream, BinaryEncoder)}
 * <p/>
 * {@link BlockingDirectBinaryEncoder} instances returned by this method are not
 * thread-safe
 *
 * @see BinaryEncoder
 * @see EncoderFactory
 * @see Encoder
 * @see Decoder
 */
public class BlockingDirectBinaryEncoder extends DirectBinaryEncoder {
  // Reusable scratch buffers, one per nesting depth: buffers.get(i) is the
  // buffer that collects the block written at depth i. Grown lazily in
  // startBlock(), never shrunk, so deeply nested writes reuse allocations.
  private final ArrayList<BufferOutputStream> buffers;

  // The streams that were current ("out") before each startBlock(); the most
  // recent one is restored by the matching endBlock().
  private final ArrayDeque<OutputStream> stashedBuffers;

  // Current array/map nesting depth; 0 means output goes straight downstream.
  private int depth = 0;

  // Item counts announced via setItemCount(), one entry per open block.
  private final ArrayDeque<Long> blockItemCounts;

  /**
   * Create a writer that sends its output to the underlying stream
   * <code>out</code>.
   *
   * @param out The Outputstream to write to
   */
  public BlockingDirectBinaryEncoder(OutputStream out) {
    super(out);
    this.buffers = new ArrayList<>();
    this.stashedBuffers = new ArrayDeque<>();
    this.blockItemCounts = new ArrayDeque<>();
  }

  // Redirect all subsequent writes into a per-depth scratch buffer so the
  // block's byte size is known before anything reaches the real output.
  private void startBlock() {
    stashedBuffers.push(out);
    if (this.buffers.size() <= depth) {
      this.buffers.add(new BufferOutputStream());
    }
    BufferOutputStream buf = buffers.get(depth);
    // reset() reuses the backing array; safe because endBlock() at this depth
    // has already copied any previous contents downstream.
    buf.reset();
    this.depth += 1;
    this.out = buf;
  }

  // Restore the previous output stream, then emit the buffered block as
  // (negative item count, byte size, bytes) per the Avro spec's blocked
  // array/map encoding. An empty block (count 0) emits nothing here.
  private void endBlock() {
    if (depth == 0) {
      throw new RuntimeException("Called endBlock, while not buffering a block");
    }
    this.depth -= 1;
    out = stashedBuffers.pop();
    BufferOutputStream buffer = this.buffers.get(depth);
    long blockItemCount = blockItemCounts.pop();
    if (blockItemCount > 0) {
      try {
        // Make it negative, so the reader knows that the number of bytes is coming
        writeLong(-blockItemCount);
        writeLong(buffer.size());
        writeFixed(buffer.toBufferWithoutCopy());
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }

  @Override
  public void setItemCount(long itemCount) throws IOException {
    // NOTE(review): pushed once per setItemCount() call but popped exactly once
    // per endBlock() — assumes callers announce each block's count exactly once
    // between writeArrayStart/writeArrayEnd (as DatumWriter does); a caller
    // invoking setItemCount() per chunk would unbalance the deque. Confirm.
    blockItemCounts.push(itemCount);
  }

  @Override
  public void writeArrayStart() throws IOException {
    startBlock();
  }

  @Override
  public void writeArrayEnd() throws IOException {
    endBlock();
    // Writes another zero to indicate that this is the last block
    super.writeArrayEnd();
  }

  @Override
  public void writeMapStart() throws IOException {
    startBlock();
  }

  @Override
  public void writeMapEnd() throws IOException {
    endBlock();
    // Writes another zero to indicate that this is the last block
    super.writeMapEnd();
  }

  // ByteArrayOutputStream whose internal array can be exposed without copying;
  // the returned ByteBuffer is only valid until the next reset()/write.
  private static class BufferOutputStream extends ByteArrayOutputStream {
    BufferOutputStream() {
    }

    // Wraps the live internal buffer (buf, count are protected fields of
    // ByteArrayOutputStream) — no defensive copy by design.
    ByteBuffer toBufferWithoutCopy() {
      return ByteBuffer.wrap(buf, 0, count);
    }

  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,28 +27,28 @@
* This encoder does not buffer writes, and as a result is slower than
* {@link BufferedBinaryEncoder}. However, it is lighter-weight and useful when
* the buffering in BufferedBinaryEncoder is not desired and/or the Encoder is
* very short lived.
* very short-lived.
* <p/>
* To construct, use
* {@link EncoderFactory#directBinaryEncoder(OutputStream, BinaryEncoder)}
* <p/>
* DirectBinaryEncoder is not thread-safe
*
*
* @see BinaryEncoder
* @see EncoderFactory
* @see Encoder
* @see Decoder
*/
public class DirectBinaryEncoder extends BinaryEncoder {
private OutputStream out;
protected OutputStream out;
// the buffer is used for writing floats, doubles, and large longs.
private final byte[] buf = new byte[12];

/**
* Create a writer that sends its output to the underlying stream
* <code>out</code>.
**/
DirectBinaryEncoder(OutputStream out) {
protected DirectBinaryEncoder(OutputStream out) {
configure(out);
}

Expand All @@ -69,8 +69,8 @@ public void writeBoolean(boolean b) throws IOException {
}

/*
* buffering is slower for ints that encode to just 1 or two bytes, and and
* faster for large ones. (Sun JRE 1.6u22, x64 -server)
* buffering is slower for ints that encode to just 1 or two bytes, and faster
* for large ones. (Sun JRE 1.6u22, x64 -server)
*/
@Override
public void writeInt(int n) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,49 @@ public BinaryEncoder directBinaryEncoder(OutputStream out, BinaryEncoder reuse)
}
}

/**
 * Creates or reinitializes a {@link BlockingDirectBinaryEncoder} writing to the
 * given OutputStream. When <i>reuse</i> is an existing
 * BlockingDirectBinaryEncoder it is reconfigured and returned; otherwise a new
 * instance is constructed, so callers must use the returned encoder.
 * <p/>
 * The returned {@link BinaryEncoder} does not buffer its output; calling
 * {@link Encoder#flush()} simply flushes the wrapped OutputStream.
 * <p/>
 * Unlike the plain direct encoder, the {@link BlockingDirectBinaryEncoder}
 * writes the byte size of every array and map block, enabling efficient
 * skipping on read.
 * <p/>
 * Unbuffered writes can be significantly slower than the buffered encoders
 * returned by {@link #binaryEncoder(OutputStream, BinaryEncoder)}; prefer the
 * 'direct' variants only when buffering semantics are problematic or the
 * encoder is too short-lived for a buffer to pay off.
 * <p/>
 * {@link BinaryEncoder} instances returned by this method are not thread-safe.
 *
 * @param out The OutputStream to initialize to. Cannot be null.
 * @param reuse The BinaryEncoder to <i>attempt</i> to reuse given the factory
 *              configuration. An incompatible implementation causes a new
 *              instance to be returned. If null, a new instance is returned.
 * @return A BinaryEncoder that uses <i>out</i> as its data output; either a
 *         new instance or <i>reuse</i> reconfigured to use <i>out</i>.
 * @see BlockingDirectBinaryEncoder
 * @see Encoder
 */
public BinaryEncoder blockingDirectBinaryEncoder(OutputStream out, BinaryEncoder reuse) {
  // Exact-class check (not instanceof): only reuse the precise implementation.
  if (reuse != null && reuse.getClass().equals(BlockingDirectBinaryEncoder.class)) {
    return ((DirectBinaryEncoder) reuse).configure(out);
  }
  return new BlockingDirectBinaryEncoder(out);
}

/**
* Creates or reinitializes a {@link BinaryEncoder} with the OutputStream
* provided as the destination for written data. If <i>reuse</i> is provided, an
Expand Down
45 changes: 30 additions & 15 deletions lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.Function;
import java.util.stream.Stream;

import org.apache.avro.file.CodecFactory;
Expand All @@ -40,7 +42,9 @@
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.util.RandomData;

import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -93,22 +97,32 @@ private File makeFile(CodecFactory codec) {
@ParameterizedTest
@MethodSource("codecs")
public void runTestsInOrder(CodecFactory codec) throws Exception {
  // Exercise the whole suite once per encoder implementation. JUnit's
  // MethodSource does not combine cleanly with a second parameter, hence
  // the manual loop instead of a cartesian parameter source.
  List<Function<OutputStream, BinaryEncoder>> encoderFactories = new ArrayList<>();
  encoderFactories.add(stream -> new EncoderFactory().directBinaryEncoder(stream, null));
  encoderFactories.add(stream -> new EncoderFactory().blockingDirectBinaryEncoder(stream, null));

  for (Function<OutputStream, BinaryEncoder> encoderFactory : encoderFactories) {
    LOG.info("Running with codec: {}", codec);
    testGenericWrite(codec, encoderFactory);
    testGenericRead(codec);
    testSplits(codec);
    testSyncDiscovery(codec);
    testGenericAppend(codec, encoderFactory);
    testReadWithHeader(codec);
    testFSync(codec, encoderFactory, false);
    testFSync(codec, encoderFactory, true);
  }
}

private void testGenericWrite(CodecFactory codec) throws IOException {
private void testGenericWrite(CodecFactory codec, Function<OutputStream, BinaryEncoder> encoderFunc)
throws IOException {
DataFileWriter<Object> writer = new DataFileWriter<>(new GenericDatumWriter<>()).setSyncInterval(100);
if (codec != null) {
writer.setCodec(codec);
}
writer.setEncoder(encoderFunc);
writer.create(SCHEMA, makeFile(codec));
try {
int count = 0;
Expand Down Expand Up @@ -210,10 +224,12 @@ private void testSyncDiscovery(CodecFactory codec) throws IOException {
}
}

private void testGenericAppend(CodecFactory codec) throws IOException {
private void testGenericAppend(CodecFactory codec, Function<OutputStream, BinaryEncoder> encoderFunc)
throws IOException {
File file = makeFile(codec);
long start = file.length();
try (DataFileWriter<Object> writer = new DataFileWriter<>(new GenericDatumWriter<>()).appendTo(file)) {
writer.setEncoder(encoderFunc);
for (Object datum : new RandomData(SCHEMA, COUNT, SEED + 1)) {
writer.append(datum);
}
Expand Down Expand Up @@ -254,11 +270,8 @@ private void testReadWithHeader(CodecFactory codec) throws IOException {
assertEquals(validPos, sin.tell(), "Should not move from sync point on reopen");
assertNotNull(readerFalse.next(), "Should be able to reopen at sync point");
}

}

}

}

@Test
Expand Down Expand Up @@ -306,8 +319,10 @@ public void testFlushCount() throws IOException {
assertTrue(out.flushCount < currentCount && out.flushCount >= flushCounter);
}

private void testFSync(CodecFactory codec, boolean useFile) throws IOException {
private void testFSync(CodecFactory codec, Function<OutputStream, BinaryEncoder> encoderFunc, boolean useFile)
throws IOException {
try (DataFileWriter<Object> writer = new DataFileWriter<>(new GenericDatumWriter<>())) {
writer.setEncoder(encoderFunc);
writer.setFlushOnEveryBlock(false);
TestingByteArrayOutputStream out = new TestingByteArrayOutputStream();
if (useFile) {
Expand Down
Loading