Skip to content

Commit 4ecd272

Browse files
voonhousyihua
authored and committed
fix: ensure that InlineFS is seeked to the correct offset upon init (#14178)
1 parent bfc236b commit 4ecd272

4 files changed

Lines changed: 298 additions & 0 deletions

File tree

hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/inline/InLineFileSystem.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,29 @@
4343
* - Reading an inlined file at a given offset, length, read it out as if it were an independent file of that length
4444
* - Inlined path is of the form "inlinefs:///path/to/outer/file/<outer_file_scheme>/?start_offset=<start_offset>&length=<length>"
4545
* <p>
46+
* Example:
47+
* <pre>
48+
* inlinefs://tests_7af7f087-c807-4f5e-a759-65fd9c21063b/hudi_multi_fg_pt_v8_mor/.hoodie/metadata/column_stats/
49+
* .col-stats-0001-0_20250429145946675.log.1_1-120-382/local/?start_offset=8036&length=6959
50+
* </pre>
51+
* <p>
52+
* In this example:
53+
* <ul>
54+
* <li>The outer file path is: {@code tests_7af7f087-c807-4f5e-a759-65fd9c21063b/hudi_multi_fg_pt_v8_mor/.hoodie/metadata/
55+
* column_stats/.col-stats-0001-0_20250429145946675.log.1_1-120-382}</li>
56+
* <li>The outer file scheme is: {@code local} (representing local file system)</li>
57+
* <li>The inline content starts at byte offset: {@code 8036}</li>
58+
* <li>The inline content length is: {@code 6959} bytes</li>
59+
* </ul>
60+
* <p>
61+
* When this path is opened, the file system will:
62+
* <ol>
63+
* <li>Extract the outer file path and scheme from the URL</li>
64+
* <li>Open the outer file using the appropriate file system</li>
65+
* <li>Seek to the start_offset (8036 in the example)</li>
66+
* <li>Present the next 'length' bytes (6959 in the example) as if they were an independent file</li>
67+
* </ol>
68+
* <p>
4669
* TODO: The reader/writer may try to use relative paths based on the inlinepath and it may not work. Need to handle
4770
* this gracefully eg. the parquet summary metadata reading. TODO: If this shows promise, also support directly writing
4871
* the inlined file to the underneath file without buffer

hudi-io/src/main/java/org/apache/hudi/storage/inline/InLineFSUtils.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,18 @@ public class InLineFSUtils {
3838
* Get the InlineFS Path for a given scheme and its Path.
3939
* <p>
4040
* Examples:
41+
* <pre>
4142
* Input Path: s3a://file1, origScheme: s3a, startOffset = 20, length = 40
4243
* Output: "inlinefs://file1/s3a/?start_offset=20&length=40"
4344
*
45+
* Real-world example:
46+
* Input Path: tests_7af7f087-c807-4f5e-a759-65fd9c21063b/hudi_multi_fg_pt_v8_mor/.hoodie/metadata/column_stats/
47+
* .col-stats-0001-0_20250429145946675.log.1_1-120-382
48+
* origScheme: local, startOffset = 8036, length = 6959
49+
* Output: "inlinefs://tests_7af7f087-c807-4f5e-a759-65fd9c21063b/hudi_multi_fg_pt_v8_mor/.hoodie/metadata/column_stats/
50+
* .col-stats-0001-0_20250429145946675.log.1_1-120-382/local/?start_offset=8036&length=6959"
51+
* </pre>
52+
*
4453
* @param outerPath The outer file path
4554
* @param origScheme The file scheme
4655
* @param inLineStartOffset Start offset for the inline file

hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/io/InlineSeekableDataInputStream.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,39 @@
1717

1818
import java.io.IOException;
1919

20+
/**
21+
* A seekable data input stream that reads inline content from an outer file at a specific offset and length.
22+
* <p>
23+
* This class is used when reading inline files stored within larger files (e.g., log blocks embedded in Hudi log files).
24+
* It provides a view of a segment of the underlying stream as if it were an independent file.
25+
* <p>
26+
* Example InlineFS URL:
27+
* <pre>
28+
* inlinefs://tests_7af7f087-c807-4f5e-a759-65fd9c21063b/hudi_multi_fg_pt_v8_mor/.hoodie/metadata/column_stats/
29+
* .col-stats-0001-0_20250429145946675.log.1_1-120-382/local/?start_offset=8036&length=6959
30+
* </pre>
31+
* <p>
32+
* Key behaviors:
33+
* <ul>
34+
* <li>Upon initialization, the underlying stream is immediately seeked to the start offset to ensure correct positioning</li>
35+
* <li>{@link #getPos()} returns positions relative to the start offset (0-based from the inline content start)</li>
36+
* <li>{@link #seek(long)} accepts positions relative to the start offset and translates them to absolute positions</li>
37+
* <li>Attempting to seek beyond the length throws an {@link IOException}</li>
38+
* </ul>
39+
*/
2040
public class InlineSeekableDataInputStream
2141
extends TrinoSeekableDataInputStream
2242
{
2343
private final long startOffset;
2444
private final long length;
2545

2646
/**
 * Creates a view over the inline segment of {@code stream} that begins at {@code startOffset}
 * and spans {@code length} bytes.
 * <p>
 * The underlying stream is moved to {@code startOffset} before the constructor returns, so the
 * first read issued through this object starts at the first byte of the inline content.
 *
 * @param stream the underlying stream of the outer file
 * @param startOffset absolute offset in {@code stream} at which the inline content begins
 * @param length number of bytes of inline content
 * @throws IOException if the underlying stream cannot be seeked to {@code startOffset}
 */
public InlineSeekableDataInputStream(TrinoInputStream stream, long startOffset, long length)
        throws IOException
{
    super(stream);
    // Position the outer stream on the first inline byte right away.
    stream.seek(startOffset);
    this.length = length;
    this.startOffset = startOffset;
}
3254

3355
@Override
Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.hudi.io;
15+
16+
import io.trino.filesystem.TrinoInputStream;
17+
import org.junit.jupiter.api.Test;
18+
19+
import java.io.IOException;
20+
import java.nio.charset.StandardCharsets;
21+
22+
import static org.assertj.core.api.Assertions.assertThat;
23+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
24+
25+
class TestInlineSeekableDataInputStream
26+
{
27+
public static final String CONST_STR_FOR_BYTES = "0123456789ABCDEFGHIJ";
28+
29+
@Test
30+
void testStreamIsSeekableToStartOffsetUponInitialization()
31+
throws IOException
32+
{
33+
// Create a test stream with data at various positions
34+
byte[] data = CONST_STR_FOR_BYTES.getBytes(StandardCharsets.UTF_8);
35+
TestTrinoInputStream stream = new TestTrinoInputStream(data);
36+
37+
long startOffset = 5;
38+
long length = 10;
39+
40+
// Initialize InlineSeekableDataInputStream
41+
InlineSeekableDataInputStream inlineStream = new InlineSeekableDataInputStream(stream, startOffset, length);
42+
43+
// Verify the stream was seeked to the startOffset during initialization
44+
assertThat(stream.getPosition()).isEqualTo(startOffset);
45+
46+
// Verify getPos() returns 0 (relative to startOffset)
47+
assertThat(inlineStream.getPos()).isEqualTo(0);
48+
}
49+
50+
@Test
51+
void testGetPosReturnsRelativePosition()
52+
throws IOException
53+
{
54+
byte[] data = CONST_STR_FOR_BYTES.getBytes(StandardCharsets.UTF_8);
55+
TestTrinoInputStream stream = new TestTrinoInputStream(data);
56+
57+
long startOffset = 3;
58+
long length = 10;
59+
60+
InlineSeekableDataInputStream inlineStream = new InlineSeekableDataInputStream(stream, startOffset, length);
61+
62+
// Initially at position 0 (relative)
63+
assertThat(inlineStream.getPos()).isEqualTo(0);
64+
65+
// Seek to position 5 (relative)
66+
inlineStream.seek(5);
67+
assertThat(inlineStream.getPos()).isEqualTo(5);
68+
69+
// Verify underlying stream is at startOffset + 5
70+
assertThat(stream.getPosition()).isEqualTo(startOffset + 5);
71+
}
72+
73+
@Test
74+
void testSeekWithinBounds()
75+
throws IOException
76+
{
77+
byte[] data = CONST_STR_FOR_BYTES.getBytes(StandardCharsets.UTF_8);
78+
TestTrinoInputStream stream = new TestTrinoInputStream(data);
79+
80+
long startOffset = 2;
81+
long length = 8;
82+
83+
InlineSeekableDataInputStream inlineStream = new InlineSeekableDataInputStream(stream, startOffset, length);
84+
85+
// Seek to the middle
86+
inlineStream.seek(4);
87+
assertThat(inlineStream.getPos()).isEqualTo(4);
88+
assertThat(stream.getPosition()).isEqualTo(startOffset + 4);
89+
90+
// Seek to the end (length is exclusive, so seeking to length should work)
91+
inlineStream.seek(8);
92+
assertThat(inlineStream.getPos()).isEqualTo(8);
93+
assertThat(stream.getPosition()).isEqualTo(startOffset + 8);
94+
}
95+
96+
@Test
97+
void testSeekPastLengthThrowsException()
98+
throws IOException
99+
{
100+
byte[] data = CONST_STR_FOR_BYTES.getBytes(StandardCharsets.UTF_8);
101+
TestTrinoInputStream stream = new TestTrinoInputStream(data);
102+
103+
long startOffset = 5;
104+
long length = 10;
105+
106+
InlineSeekableDataInputStream inlineStream = new InlineSeekableDataInputStream(stream, startOffset, length);
107+
108+
// Attempting to seek past the length should throw IOException
109+
assertThatThrownBy(() -> inlineStream.seek(11))
110+
.isInstanceOf(IOException.class)
111+
.hasMessageContaining("Attempting to seek past inline content")
112+
.hasMessageContaining("position to seek to is 11")
113+
.hasMessageContaining("but the length is 10");
114+
}
115+
116+
@Test
117+
void testReadDataAtCorrectOffset()
118+
throws IOException
119+
{
120+
byte[] data = CONST_STR_FOR_BYTES.getBytes(StandardCharsets.UTF_8);
121+
TestTrinoInputStream stream = new TestTrinoInputStream(data);
122+
123+
long startOffset = 5; // Start at '5'
124+
long length = 5; // Read 5 bytes: "56789"
125+
126+
InlineSeekableDataInputStream inlineStream = new InlineSeekableDataInputStream(stream, startOffset, length);
127+
128+
// Read first byte should be '5' (byte at position startOffset)
129+
int firstByte = inlineStream.read();
130+
assertThat(firstByte).isEqualTo('5');
131+
assertThat(inlineStream.getPos()).isEqualTo(1);
132+
133+
// Read next byte should be '6'
134+
int secondByte = inlineStream.read();
135+
assertThat(secondByte).isEqualTo('6');
136+
assertThat(inlineStream.getPos()).isEqualTo(2);
137+
}
138+
139+
@Test
140+
void testSeekAndRead()
141+
throws IOException
142+
{
143+
byte[] data = CONST_STR_FOR_BYTES.getBytes(StandardCharsets.UTF_8);
144+
TestTrinoInputStream stream = new TestTrinoInputStream(data);
145+
146+
long startOffset = 10; // Start at 'A'
147+
long length = 10; // Length covers "ABCDEFGHIJ"
148+
149+
InlineSeekableDataInputStream inlineStream = new InlineSeekableDataInputStream(stream, startOffset, length);
150+
151+
// Seek to position 3 (relative) - should be at 'D' (position 13 absolute)
152+
inlineStream.seek(3);
153+
assertThat(inlineStream.getPos()).isEqualTo(3);
154+
155+
// Read should get 'D'
156+
int readByte = inlineStream.read();
157+
assertThat(readByte).isEqualTo('D');
158+
assertThat(inlineStream.getPos()).isEqualTo(4);
159+
}
160+
161+
@Test
162+
void testZeroLengthInlineStream()
163+
throws IOException
164+
{
165+
byte[] data = "0123456789".getBytes(StandardCharsets.UTF_8);
166+
TestTrinoInputStream stream = new TestTrinoInputStream(data);
167+
168+
long startOffset = 5;
169+
long length = 0;
170+
171+
InlineSeekableDataInputStream inlineStream = new InlineSeekableDataInputStream(stream, startOffset, length);
172+
173+
// Position should be 0
174+
assertThat(inlineStream.getPos()).isEqualTo(0);
175+
176+
// Seeking to any position > 0 should fail
177+
assertThatThrownBy(() -> inlineStream.seek(1))
178+
.isInstanceOf(IOException.class)
179+
.hasMessageContaining("Attempting to seek past inline content");
180+
}
181+
182+
/**
183+
* Test implementation of TrinoInputStream for unit testing.
184+
*/
185+
private static class TestTrinoInputStream
186+
extends TrinoInputStream
187+
{
188+
private final byte[] data;
189+
private int position;
190+
191+
public TestTrinoInputStream(byte[] data)
192+
{
193+
this.data = data;
194+
this.position = 0;
195+
}
196+
197+
@Override
198+
public long getPosition()
199+
{
200+
return position;
201+
}
202+
203+
@Override
204+
public void seek(long position)
205+
throws IOException
206+
{
207+
if (position < 0 || position > data.length) {
208+
throw new IOException("Invalid seek position: " + position);
209+
}
210+
this.position = (int) position;
211+
}
212+
213+
@Override
214+
public int read()
215+
throws IOException
216+
{
217+
if (position >= data.length) {
218+
return -1;
219+
}
220+
return data[position++] & 0xFF;
221+
}
222+
223+
@Override
224+
public int read(byte[] b, int off, int len)
225+
throws IOException
226+
{
227+
if (position >= data.length) {
228+
return -1;
229+
}
230+
int available = data.length - position;
231+
int toRead = Math.min(len, available);
232+
System.arraycopy(data, position, b, off, toRead);
233+
position += toRead;
234+
return toRead;
235+
}
236+
237+
@Override
238+
public void close()
239+
throws IOException
240+
{
241+
// No-op for test
242+
}
243+
}
244+
}

0 commit comments

Comments
 (0)