
Commit 016f101

Merge pull request #10621 from sundy-li/auto-cast-loading-parquet
feat(query): enable runtime cast transform in loading parquet files
2 parents a4ccd99 + 9e9833a commit 016f101

6 files changed: +159 -20 lines changed
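In effect, this change stops rejecting parquet files whose column types differ from the destination table and instead appends a runtime cast transform to the load pipeline. A minimal, self-contained sketch of the per-column cast-or-pass-through decision (simplified stand-in types, not Databend's real `Expr`/`DataType`):

// Sketch of the decision this PR introduces, with simplified stand-ins
// for Databend's Expr and DataType.
#[derive(Clone, Debug, PartialEq)]
enum DataType {
    Date,
    Timestamp,
    String,
}

#[derive(Debug)]
enum Expr {
    ColumnRef { id: usize, data_type: DataType },
    Cast { expr: Box<Expr>, dest_type: DataType },
}

// Read column `id` as-is when types already match, otherwise wrap the
// column reference in a cast to the destination type.
fn cast_if_needed(id: usize, from: DataType, to: DataType) -> Expr {
    let column = Expr::ColumnRef { id, data_type: from.clone() };
    if from == to {
        column
    } else {
        Expr::Cast { expr: Box::new(column), dest_type: to }
    }
}

fn main() {
    // A parquet DATE column loaded into a TIMESTAMP table now gets a cast
    // instead of a "parquet schema mismatch" error.
    println!("{:?}", cast_if_needed(0, DataType::Date, DataType::Timestamp));
    // Matching types pass through untouched.
    println!("{:?}", cast_if_needed(1, DataType::String, DataType::String));
}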

src/query/pipeline/sources/src/input_formats/impls/input_format_parquet.rs

Lines changed: 12 additions & 11 deletions
@@ -38,7 +38,8 @@ use common_arrow::read_columns_async;
 use common_exception::ErrorCode;
 use common_exception::Result;
 use common_expression::DataBlock;
-use common_expression::TableField;
+use common_expression::DataField;
+use common_expression::DataSchema;
 use common_expression::TableSchema;
 use common_expression::TableSchemaRef;
 use common_meta_app::principal::StageInfo;

@@ -362,7 +363,15 @@ impl BlockBuilderTrait for ParquetBlockBuilder {
     fn deserialize(&mut self, mut batch: Option<RowGroupInMemory>) -> Result<Vec<DataBlock>> {
         if let Some(rg) = batch.as_mut() {
             let chunk = rg.get_arrow_chunk()?;
-            let block = DataBlock::from_arrow_chunk(&chunk, &self.ctx.data_schema())?;
+
+            let fields: Vec<DataField> = rg
+                .fields_to_read
+                .iter()
+                .map(DataField::from)
+                .collect::<Vec<_>>();
+
+            let input_schema = DataSchema::new(fields);
+            let block = DataBlock::from_arrow_chunk(&chunk, &input_schema)?;

             let block_total_rows = block.num_rows();
             let num_rows_per_block = self.ctx.block_compact_thresholds.max_rows_per_block;

@@ -446,20 +455,12 @@ impl AligningStateTrait for AligningState {

 fn get_used_fields(fields: &Vec<Field>, schema: &TableSchemaRef) -> Result<Vec<Field>> {
     let mut read_fields = Vec::with_capacity(fields.len());
-    for (idx, f) in schema.fields().iter().enumerate() {
+    for f in schema.fields().iter() {
         if let Some(m) = fields
             .iter()
             .filter(|c| c.name.eq_ignore_ascii_case(f.name()))
             .last()
         {
-            let tf = TableField::from(m);
-            if tf.data_type().remove_nullable() != f.data_type().remove_nullable() {
-                return Err(ErrorCode::TableSchemaMismatch(format!(
-                    "parquet schema mismatch for field {}(start from 0), expect: {:?}, got {:?}",
-                    idx, f, tf
-                )));
-            }
-
             read_fields.push(m.clone());
         } else {
             return Err(ErrorCode::TableSchemaMismatch(format!(
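For reference, a self-contained sketch of the matching that `get_used_fields` still performs, with `(name, type)` string pairs standing in for the real arrow `Field` type: columns are matched by case-insensitive name with the last match winning, a missing column remains a hard error, and the removed per-field type check is simply gone.

// Sketch of the retained matching logic in get_used_fields.
fn get_used_fields(
    file_fields: &[(String, String)],
    table_field_names: &[String],
) -> Result<Vec<(String, String)>, String> {
    let mut read_fields = Vec::with_capacity(table_field_names.len());
    for name in table_field_names {
        // Case-insensitive name match; the last matching file field wins.
        match file_fields
            .iter()
            .filter(|(n, _)| n.eq_ignore_ascii_case(name))
            .last()
        {
            Some(m) => read_fields.push(m.clone()),
            // A column missing from the file is still a schema mismatch.
            None => return Err(format!("field {name} not found in parquet file")),
        }
    }
    Ok(read_fields)
}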

src/query/service/src/interpreters/interpreter_insert.rs

Lines changed: 37 additions & 2 deletions
@@ -16,6 +16,7 @@ use std::collections::VecDeque;
 use std::io::BufRead;
 use std::io::Cursor;
 use std::ops::Not;
+use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Instant;

@@ -49,6 +50,7 @@ use common_formats::FastFieldDecoderValues;
 use common_io::cursor_ext::ReadBytesExt;
 use common_io::cursor_ext::ReadCheckPointExt;
 use common_meta_app::principal::FileFormatOptions;
+use common_meta_app::principal::StageFileFormatType;
 use common_meta_app::principal::StageInfo;
 use common_pipeline_core::Pipeline;
 use common_pipeline_sources::AsyncSource;

@@ -85,6 +87,7 @@ use crate::interpreters::common::append2table;
 use crate::interpreters::Interpreter;
 use crate::interpreters::InterpreterPtr;
 use crate::pipelines::processors::transforms::TransformAddConstColumns;
+use crate::pipelines::processors::transforms::TransformRuntimeCastSchema;
 use crate::pipelines::processors::TransformResortAddOn;
 use crate::pipelines::PipelineBuildResult;
 use crate::pipelines::SourcePipeBuilder;

@@ -370,17 +373,49 @@ impl Interpreter for InsertInterpreter {
                     1,
                 )?;
             }
-            InsertInputSource::StreamingWithFormat(_, _, input_context) => {
+            InsertInputSource::StreamingWithFormat(format, _, input_context) => {
                 let input_context = input_context.as_ref().expect("must success").clone();
                 input_context
                     .format
                     .exec_stream(input_context.clone(), &mut build_res.main_pipeline)?;
+
+                if Ok(StageFileFormatType::Parquet) == StageFileFormatType::from_str(format) {
+                    let dest_schema = plan.schema();
+                    let func_ctx = self.ctx.get_function_context()?;
+
+                    build_res.main_pipeline.add_transform(
+                        |transform_input_port, transform_output_port| {
+                            TransformRuntimeCastSchema::try_create(
+                                transform_input_port,
+                                transform_output_port,
+                                dest_schema.clone(),
+                                func_ctx,
+                            )
+                        },
+                    )?;
+                }
             }
-            InsertInputSource::StreamingWithFileFormat(_, _, input_context) => {
+            InsertInputSource::StreamingWithFileFormat(format_options, _, input_context) => {
                 let input_context = input_context.as_ref().expect("must success").clone();
                 input_context
                     .format
                     .exec_stream(input_context.clone(), &mut build_res.main_pipeline)?;
+
+                if StageFileFormatType::Parquet == format_options.format {
+                    let dest_schema = plan.schema();
+                    let func_ctx = self.ctx.get_function_context()?;
+
+                    build_res.main_pipeline.add_transform(
+                        |transform_input_port, transform_output_port| {
+                            TransformRuntimeCastSchema::try_create(
+                                transform_input_port,
+                                transform_output_port,
+                                dest_schema.clone(),
+                                func_ctx,
+                            )
+                        },
+                    )?;
+                }
             }
             InsertInputSource::Stage(opts) => {
                 tracing::info!("insert: from stage with options {:?}", opts);
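The gate above appends the cast transform only for parquet input; `StreamingWithFormat` carries the format as a string, hence the `FromStr` comparison. A simplified sketch of that gate (a hypothetical `FileFormat` enum stands in for `StageFileFormatType`):

use std::str::FromStr;

// Hypothetical stand-in for StageFileFormatType.
#[derive(Debug, PartialEq)]
enum FileFormat {
    Parquet,
    Csv,
}

impl FromStr for FileFormat {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "parquet" => Ok(FileFormat::Parquet),
            "csv" => Ok(FileFormat::Csv),
            other => Err(format!("unknown format: {other}")),
        }
    }
}

// Mirrors `Ok(StageFileFormatType::Parquet) == StageFileFormatType::from_str(format)`:
// only a parquet source gets the runtime cast transform appended.
fn needs_runtime_cast(format: &str) -> bool {
    FileFormat::from_str(format) == Ok(FileFormat::Parquet)
}

fn main() {
    assert!(needs_runtime_cast("Parquet"));
    assert!(!needs_runtime_cast("CSV"));
}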

src/query/service/src/pipelines/processors/transforms/mod.rs

Lines changed: 2 additions & 0 deletions
@@ -29,6 +29,7 @@ mod transform_merge_block;
 mod transform_resort_addon;
 mod transform_right_join;
 mod transform_right_semi_anti_join;
+mod transform_runtime_cast_schema;
 mod transform_runtime_filter;

 pub use aggregator::build_partition_bucket;

@@ -84,6 +85,7 @@ pub use transform_right_join::RightJoinCompactor;
 pub use transform_right_join::TransformRightJoin;
 pub use transform_right_semi_anti_join::RightSemiAntiJoinCompactor;
 pub use transform_right_semi_anti_join::TransformRightSemiAntiJoin;
+pub use transform_runtime_cast_schema::TransformRuntimeCastSchema;
 pub use transform_runtime_filter::SinkRuntimeFilterSource;
 pub use transform_runtime_filter::TransformRuntimeFilter;
 pub use transform_sort_merge::SortMergeCompactor;
src/query/service/src/pipelines/processors/transforms/transform_runtime_cast_schema.rs

Lines changed: 100 additions & 0 deletions
@@ -0,0 +1,100 @@
+// Copyright 2021 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use common_exception::Result;
+use common_expression::BlockEntry;
+use common_expression::DataBlock;
+use common_expression::DataSchemaRef;
+use common_expression::Evaluator;
+use common_expression::Expr;
+use common_expression::FunctionContext;
+use common_functions::scalars::BUILTIN_FUNCTIONS;
+
+use crate::pipelines::processors::port::InputPort;
+use crate::pipelines::processors::port::OutputPort;
+use crate::pipelines::processors::processor::ProcessorPtr;
+use crate::pipelines::processors::transforms::transform::Transform;
+use crate::pipelines::processors::transforms::transform::Transformer;
+
+/// TransformRuntimeCastSchema is used to cast a block to the specified schema.
+/// Unlike `TransformCastSchema`, it is applied at runtime.
+pub struct TransformRuntimeCastSchema {
+    func_ctx: FunctionContext,
+    insert_schema: DataSchemaRef,
+}
+
+impl TransformRuntimeCastSchema
+where Self: Transform
+{
+    pub fn try_create(
+        input_port: Arc<InputPort>,
+        output_port: Arc<OutputPort>,
+        insert_schema: DataSchemaRef,
+        func_ctx: FunctionContext,
+    ) -> Result<ProcessorPtr> {
+        Ok(ProcessorPtr::create(Transformer::create(
+            input_port,
+            output_port,
+            Self {
+                func_ctx,
+                insert_schema,
+            },
+        )))
+    }
+}
+
+impl Transform for TransformRuntimeCastSchema {
+    const NAME: &'static str = "CastSchemaTransform";
+
+    fn transform(&mut self, data_block: DataBlock) -> Result<DataBlock> {
+        let exprs: Vec<Expr> = data_block
+            .columns()
+            .iter()
+            .zip(self.insert_schema.fields().iter().enumerate())
+            .map(|(from, (index, to))| {
+                let expr = Expr::ColumnRef {
+                    span: None,
+                    id: index,
+                    data_type: from.data_type.clone(),
+                    display_name: to.name().clone(),
+                };
+                if &from.data_type != to.data_type() {
+                    Expr::Cast {
+                        span: None,
+                        is_try: false,
+                        expr: Box::new(expr),
+                        dest_type: to.data_type().clone(),
+                    }
+                } else {
+                    expr
+                }
+            })
+            .collect();
+
+        let mut columns = Vec::with_capacity(exprs.len());
+        let evaluator = Evaluator::new(&data_block, self.func_ctx, &BUILTIN_FUNCTIONS);
+
+        for (field, expr) in self.insert_schema.fields().iter().zip(exprs.iter()) {
+            let value = evaluator.run(expr)?;
+            let column = BlockEntry {
+                data_type: field.data_type().clone(),
+                value,
+            };
+            columns.push(column);
+        }
+        Ok(DataBlock::new(columns, data_block.num_rows()))
+    }
+}

tests/suites/1_stateful/01_load/01_0000_streaming_load.result

Lines changed: 2 additions & 2 deletions
@@ -12,5 +12,5 @@
 198 2020.0 767
 --parquet less
 199 2020.0 769
---parquet mismatch schema
-1
+--parquet runtime cast schema
+199 2020.0 769

tests/suites/1_stateful/01_load/01_0000_streaming_load.sh

Lines changed: 6 additions & 5 deletions
@@ -87,12 +87,13 @@ echo "--parquet less"
 curl -s -H "insert_sql:insert into ontime_less file_format = (type = Parquet)" -F "upload=@/tmp/ontime_200.parquet" -u root: -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load" > /dev/null 2>&1
 echo "select count(1), avg(Year), sum(DayOfWeek) from ontime_less;" | $MYSQL_CLIENT_CONNECT

-# load parquet with mismatch schema
-echo "--parquet mismatch schema"
-cat $CURDIR/../ddl/ontime.sql | sed 's/ontime/ontime_test_mismatch/g' | sed 's/DATE/VARCHAR/g' | $MYSQL_CLIENT_CONNECT
-curl -s -H "insert_sql:insert into ontime_test_mismatch file_format = (type = Parquet)" -F "upload=@/tmp/ontime_200.parquet" -u root: -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load" | grep -c 'parquet schema mismatch'
+# load parquet with a mismatched schema, which will now be auto cast at runtime
+echo "--parquet runtime cast schema"
+cat $CURDIR/../ddl/ontime.sql | sed 's/ontime/ontime_test_schema_mismatch/g' | sed 's/DATE/TIMESTAMP/g' | $MYSQL_CLIENT_CONNECT
+curl -s -H "insert_sql:insert into ontime_test_schema_mismatch file_format = (type = Parquet)" -F "upload=@/tmp/ontime_200.parquet" -u root: -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load" > /dev/null 2>&1
+echo "select count(1), avg(Year), sum(DayOfWeek) from ontime_test_schema_mismatch;" | $MYSQL_CLIENT_CONNECT


 echo "drop table ontime_streaming_load;" | $MYSQL_CLIENT_CONNECT
-echo "drop table ontime_test_mismatch;" | $MYSQL_CLIENT_CONNECT
+echo "drop table ontime_test_schema_mismatch;" | $MYSQL_CLIENT_CONNECT
 echo "drop table ontime_less;" | $MYSQL_CLIENT_CONNECT
