Skip to content

Commit 026feaa

Browse files
authored
Merge pull request #10710 from leiysky/refine-physical-plan-builder
refactor(planner): Refine lowering procedure for `ScalarExpr`
2 parents 1ccf552 + 92b5760 commit 026feaa

File tree

5 files changed

+242
-55
lines changed

5 files changed

+242
-55
lines changed

src/query/sql/src/executor/physical_plan_builder.rs

Lines changed: 78 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use common_expression::type_check::check_function;
2828
use common_expression::types::DataType;
2929
use common_expression::ConstantFolder;
3030
use common_expression::DataBlock;
31+
use common_expression::DataField;
3132
use common_expression::DataSchemaRefExt;
3233
use common_expression::Expr;
3334
use common_expression::RemoteExpr;
@@ -63,6 +64,7 @@ use crate::optimizer::SExpr;
6364
use crate::plans::AggregateMode;
6465
use crate::plans::AndExpr;
6566
use crate::plans::Exchange;
67+
use crate::plans::JoinType;
6668
use crate::plans::RelOperator;
6769
use crate::plans::ScalarExpr;
6870
use crate::plans::Scan;
@@ -72,6 +74,7 @@ use crate::DerivedColumn;
7274
use crate::Metadata;
7375
use crate::MetadataRef;
7476
use crate::TableInternalColumn;
77+
use crate::TypeCheck;
7578
use crate::DUMMY_COLUMN_INDEX;
7679
use crate::DUMMY_TABLE_INDEX;
7780

@@ -278,8 +281,45 @@ impl PhysicalPlanBuilder {
278281
RelOperator::Join(join) => {
279282
let build_side = self.build(s_expr.child(1)?).await?;
280283
let probe_side = self.build(s_expr.child(0)?).await?;
281-
let build_schema = build_side.output_schema()?;
282-
let probe_schema = probe_side.output_schema()?;
284+
285+
let build_schema = match join.join_type {
286+
JoinType::Left | JoinType::Full => {
287+
let build_schema = build_side.output_schema()?;
288+
// Wrap nullable type for columns in build side.
289+
let build_schema = DataSchemaRefExt::create(
290+
build_schema
291+
.fields()
292+
.iter()
293+
.map(|field| {
294+
DataField::new(field.name(), field.data_type().wrap_nullable())
295+
})
296+
.collect::<Vec<_>>(),
297+
);
298+
build_schema
299+
}
300+
301+
_ => build_side.output_schema()?,
302+
};
303+
304+
let probe_schema = match join.join_type {
305+
JoinType::Right | JoinType::Full => {
306+
let probe_schema = probe_side.output_schema()?;
307+
// Wrap nullable type for columns in probe side.
308+
let probe_schema = DataSchemaRefExt::create(
309+
probe_schema
310+
.fields()
311+
.iter()
312+
.map(|field| {
313+
DataField::new(field.name(), field.data_type().wrap_nullable())
314+
})
315+
.collect::<Vec<_>>(),
316+
);
317+
probe_schema
318+
}
319+
320+
_ => probe_side.output_schema()?,
321+
};
322+
283323
let merged_schema = DataSchemaRefExt::create(
284324
probe_schema
285325
.fields()
@@ -297,12 +337,11 @@ impl PhysicalPlanBuilder {
297337
.right_conditions
298338
.iter()
299339
.map(|scalar| {
300-
let expr =
301-
scalar
302-
.as_expr_with_col_index()?
303-
.project_column_ref(|index| {
304-
build_schema.index_of(&index.to_string()).unwrap()
305-
});
340+
let expr = scalar
341+
.resolve_and_check(build_schema.as_ref())?
342+
.project_column_ref(|index| {
343+
build_schema.index_of(&index.to_string()).unwrap()
344+
});
306345
let (expr, _) = ConstantFolder::fold(
307346
&expr,
308347
self.ctx.get_function_context()?,
@@ -315,12 +354,11 @@ impl PhysicalPlanBuilder {
315354
.left_conditions
316355
.iter()
317356
.map(|scalar| {
318-
let expr =
319-
scalar
320-
.as_expr_with_col_index()?
321-
.project_column_ref(|index| {
322-
probe_schema.index_of(&index.to_string()).unwrap()
323-
});
357+
let expr = scalar
358+
.resolve_and_check(probe_schema.as_ref())?
359+
.project_column_ref(|index| {
360+
probe_schema.index_of(&index.to_string()).unwrap()
361+
});
324362
let (expr, _) = ConstantFolder::fold(
325363
&expr,
326364
self.ctx.get_function_context()?,
@@ -333,12 +371,11 @@ impl PhysicalPlanBuilder {
333371
.non_equi_conditions
334372
.iter()
335373
.map(|scalar| {
336-
let expr =
337-
scalar
338-
.as_expr_with_col_index()?
339-
.project_column_ref(|index| {
340-
merged_schema.index_of(&index.to_string()).unwrap()
341-
});
374+
let expr = scalar
375+
.resolve_and_check(merged_schema.as_ref())?
376+
.project_column_ref(|index| {
377+
merged_schema.index_of(&index.to_string()).unwrap()
378+
});
342379
let (expr, _) = ConstantFolder::fold(
343380
&expr,
344381
self.ctx.get_function_context()?,
@@ -362,12 +399,12 @@ impl PhysicalPlanBuilder {
362399
.items
363400
.iter()
364401
.map(|item| {
365-
let expr =
366-
item.scalar
367-
.as_expr_with_col_index()?
368-
.project_column_ref(|index| {
369-
input_schema.index_of(&index.to_string()).unwrap()
370-
});
402+
let expr = item
403+
.scalar
404+
.resolve_and_check(input_schema.as_ref())?
405+
.project_column_ref(|index| {
406+
input_schema.index_of(&index.to_string()).unwrap()
407+
});
371408
let (expr, _) = ConstantFolder::fold(
372409
&expr,
373410
self.ctx.get_function_context()?,
@@ -394,12 +431,11 @@ impl PhysicalPlanBuilder {
394431
.predicates
395432
.iter()
396433
.map(|scalar| {
397-
let expr =
398-
scalar
399-
.as_expr_with_col_index()?
400-
.project_column_ref(|index| {
401-
input_schema.index_of(&index.to_string()).unwrap()
402-
});
434+
let expr = scalar
435+
.resolve_and_check(input_schema.as_ref())?
436+
.project_column_ref(|index| {
437+
input_schema.index_of(&index.to_string()).unwrap()
438+
});
403439
let expr = cast_expr_to_non_null_boolean(expr)?;
404440
let (expr, _) = ConstantFolder::fold(
405441
&expr,
@@ -678,12 +714,11 @@ impl PhysicalPlanBuilder {
678714
Exchange::Random => FragmentKind::Init,
679715
Exchange::Hash(scalars) => {
680716
for scalar in scalars {
681-
let expr =
682-
scalar
683-
.as_expr_with_col_index()?
684-
.project_column_ref(|index| {
685-
input_schema.index_of(&index.to_string()).unwrap()
686-
});
717+
let expr = scalar
718+
.resolve_and_check(input_schema.as_ref())?
719+
.project_column_ref(|index| {
720+
input_schema.index_of(&index.to_string()).unwrap()
721+
});
687722
let (expr, _) = ConstantFolder::fold(
688723
&expr,
689724
self.ctx.get_function_context()?,
@@ -741,7 +776,7 @@ impl PhysicalPlanBuilder {
741776
left_runtime_filters.insert(
742777
left.0.clone(),
743778
left.1
744-
.as_expr_with_col_index()?
779+
.resolve_and_check(left_schema.as_ref())?
745780
.project_column_ref(|index| {
746781
left_schema.index_of(&index.to_string()).unwrap()
747782
})
@@ -751,7 +786,7 @@ impl PhysicalPlanBuilder {
751786
right.0.clone(),
752787
right
753788
.1
754-
.as_expr_with_col_index()?
789+
.resolve_and_check(right_schema.as_ref())?
755790
.project_column_ref(|index| {
756791
right_schema.index_of(&index.to_string()).unwrap()
757792
})
@@ -778,8 +813,9 @@ impl PhysicalPlanBuilder {
778813
.args
779814
.iter()
780815
.map(|arg| {
781-
let expr =
782-
arg.as_expr_with_col_index()?.project_column_ref(|index| {
816+
let expr = arg
817+
.resolve_and_check(input_schema.as_ref())?
818+
.project_column_ref(|index| {
783819
input_schema.index_of(&index.to_string()).unwrap()
784820
});
785821
let (expr, _) = ConstantFolder::fold(

src/query/sql/src/planner/mod.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,4 @@ pub use metadata::*;
3636
pub use planner::PlanExtras;
3737
pub use planner::Planner;
3838
pub use plans::ScalarExpr;
39-
pub use semantic::normalize_identifier;
40-
pub use semantic::resolve_type_name_by_str;
41-
pub use semantic::validate_function_arg;
42-
pub use semantic::IdentifierNormalizer;
43-
pub use semantic::NameResolutionContext;
39+
pub use semantic::*;

0 commit comments

Comments
 (0)