Skip to content

Commit 238cee6

Browse files
committed
feat(storage): Support domain for nested data types
1 parent 2e59317 commit 238cee6

File tree

6 files changed

+266
-105
lines changed

6 files changed

+266
-105
lines changed

src/query/expression/src/schema.rs

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,77 @@ impl TableSchema {
568568
}
569569
}
570570

571+
// Returns the inner column ids with name,
572+
// tuple data type may have inner fields, like `a.1`, `a:b`
573+
pub fn inner_column_ids_with_name(&self, name: &String) -> Vec<ColumnId> {
574+
fn collect_inner_column_ids(
575+
name: &String,
576+
field_name: &String,
577+
data_type: &TableDataType,
578+
column_ids: &mut Vec<ColumnId>,
579+
next_column_id: &mut ColumnId,
580+
) -> bool {
581+
if name == field_name {
582+
let n = data_type.n_columns();
583+
for i in 0..n {
584+
column_ids.push(*next_column_id + i as u32);
585+
}
586+
return true;
587+
}
588+
589+
if let TableDataType::Tuple {
590+
fields_name,
591+
fields_type,
592+
} = data_type
593+
{
594+
if name.starts_with(field_name) {
595+
for ((i, inner_field_name), inner_field_type) in
596+
fields_name.iter().enumerate().zip(fields_type.iter())
597+
{
598+
let inner_name = format!("{}:{}", field_name, inner_field_name);
599+
if name.starts_with(&inner_name) {
600+
return collect_inner_column_ids(
601+
name,
602+
&inner_name,
603+
inner_field_type,
604+
column_ids,
605+
next_column_id,
606+
);
607+
}
608+
let inner_name = format!("{}:{}", field_name, i + 1);
609+
if name.starts_with(&inner_name) {
610+
return collect_inner_column_ids(
611+
name,
612+
&inner_name,
613+
inner_field_type,
614+
column_ids,
615+
next_column_id,
616+
);
617+
}
618+
*next_column_id += inner_field_type.n_columns() as u32;
619+
}
620+
}
621+
}
622+
false
623+
}
624+
625+
let mut column_ids = Vec::new();
626+
for field in self.fields() {
627+
let mut next_column_id = field.column_id;
628+
if collect_inner_column_ids(
629+
name,
630+
&field.name,
631+
&field.data_type,
632+
&mut column_ids,
633+
&mut next_column_id,
634+
) {
635+
break;
636+
}
637+
}
638+
639+
column_ids
640+
}
641+
571642
fn traverse_paths(
572643
fields: &[TableField],
573644
path: &[FieldIndex],
@@ -996,6 +1067,20 @@ impl TableDataType {
9961067
_ => self.to_string().to_uppercase(),
9971068
}
9981069
}
1070+
1071+
// Returns the number of leaf columns of the TableDataType
1072+
pub fn n_columns(&self) -> usize {
1073+
match self {
1074+
TableDataType::Nullable(box inner_ty)
1075+
| TableDataType::Array(box inner_ty)
1076+
| TableDataType::Map(box inner_ty) => inner_ty.n_columns(),
1077+
TableDataType::Tuple { fields_type, .. } => fields_type
1078+
.iter()
1079+
.map(|inner_ty| inner_ty.n_columns())
1080+
.sum(),
1081+
_ => 1,
1082+
}
1083+
}
9991084
}
10001085

10011086
pub type DataSchemaRef = Arc<DataSchema>;

src/query/expression/src/types.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,19 @@ impl DataType {
236236
_ => self.to_string().to_uppercase(),
237237
}
238238
}
239+
240+
// Returns the number of leaf columns of the DataType
241+
pub fn n_columns(&self) -> usize {
242+
match self {
243+
DataType::Nullable(box inner_ty)
244+
| DataType::Array(box inner_ty)
245+
| DataType::Map(box inner_ty) => inner_ty.n_columns(),
246+
DataType::Tuple(inner_tys) => {
247+
inner_tys.iter().map(|inner_ty| inner_ty.n_columns()).sum()
248+
}
249+
_ => 1,
250+
}
251+
}
239252
}
240253

241254
pub trait ValueType: Debug + Clone + PartialEq + Sized + 'static {

src/query/expression/tests/it/schema.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -526,3 +526,43 @@ fn test_schema_modify_field() -> Result<()> {
526526

527527
Ok(())
528528
}
529+
530+
/// Checks that nested column paths (by field name and by 1-based position)
/// resolve to the expected leaf column ids.
#[test]
fn test_inner_column_ids_with_name() -> Result<()> {
    let fields = vec![
        TableField::new("a", TableDataType::Number(NumberDataType::UInt64)),
        TableField::new("b", TableDataType::Tuple {
            fields_name: vec!["b1".to_string(), "b2".to_string()],
            fields_type: vec![
                TableDataType::Tuple {
                    fields_name: vec!["b11".to_string(), "b12".to_string()],
                    fields_type: vec![TableDataType::Boolean, TableDataType::String],
                },
                TableDataType::Number(NumberDataType::UInt64),
            ],
        }),
        TableField::new(
            "c",
            TableDataType::Array(Box::new(TableDataType::Number(NumberDataType::UInt64))),
        ),
        TableField::new(
            "d",
            TableDataType::Map(Box::new(TableDataType::Tuple {
                fields_name: vec!["key".to_string(), "value".to_string()],
                fields_type: vec![TableDataType::String, TableDataType::String],
            })),
        ),
        TableField::new("e", TableDataType::String),
    ];
    let schema = TableSchema::new(fields);

    // Going through an owned `String` keeps the call valid whether the
    // method takes `&String` or `&str`.
    let ids = |name: &str| schema.inner_column_ids_with_name(&name.to_string());

    assert_eq!(ids("a"), vec![0]);
    assert_eq!(ids("b"), vec![1, 2, 3]);
    assert_eq!(ids("b:b1"), vec![1, 2]);
    assert_eq!(ids("b:1"), vec![1, 2]);
    assert_eq!(ids("b:b1:b11"), vec![1]);
    assert_eq!(ids("b:1:1"), vec![1]);
    assert_eq!(ids("b:b1:b12"), vec![2]);
    assert_eq!(ids("b:1:2"), vec![2]);
    assert_eq!(ids("b:b2"), vec![3]);
    assert_eq!(ids("b:2"), vec![3]);
    assert_eq!(ids("c"), vec![4]);
    assert_eq!(ids("d"), vec![5, 6]);
    assert_eq!(ids("e"), vec![7]);

    Ok(())
}

src/query/storages/common/index/src/page_index.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,14 +162,14 @@ impl PageIndex {
162162
{
163163
let f = &self.cluster_key_fields[idx];
164164

165-
let stats = ColumnStatistics {
165+
let stat = ColumnStatistics {
166166
min: min.clone(),
167167
max: max.clone(),
168168
null_count: 1,
169169
in_memory_size: 0,
170170
distinct_of_values: None,
171171
};
172-
let domain = statistics_to_domain(Some(&stats), f.data_type());
172+
let domain = statistics_to_domain(vec![&stat], f.data_type());
173173
input_domains.insert(f.name().clone(), domain);
174174
}
175175

src/query/storages/common/index/src/range_index.rs

Lines changed: 89 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::collections::HashMap;
16-
1715
use common_exception::Result;
1816
use common_expression::types::nullable::NullableDomain;
1917
use common_expression::types::number::SimpleDomain;
@@ -26,7 +24,6 @@ use common_expression::types::StringType;
2624
use common_expression::types::TimestampType;
2725
use common_expression::types::ValueType;
2826
use common_expression::with_number_mapped_type;
29-
use common_expression::ColumnId;
3027
use common_expression::ConstantFolder;
3128
use common_expression::Domain;
3229
use common_expression::Expr;
@@ -43,7 +40,7 @@ use crate::Index;
4340
pub struct RangeIndex {
4441
expr: Expr<String>,
4542
func_ctx: FunctionContext,
46-
column_ids: HashMap<String, ColumnId>,
43+
schema: TableSchemaRef,
4744
}
4845

4946
impl RangeIndex {
@@ -52,19 +49,10 @@ impl RangeIndex {
5249
expr: &Expr<String>,
5350
schema: TableSchemaRef,
5451
) -> Result<Self> {
55-
let leaf_fields = schema.leaf_fields();
56-
let column_ids = leaf_fields.iter().fold(
57-
HashMap::with_capacity(leaf_fields.len()),
58-
|mut acc, field| {
59-
acc.insert(field.name().clone(), field.column_id());
60-
acc
61-
},
62-
);
63-
6452
Ok(Self {
6553
expr: expr.clone(),
6654
func_ctx,
67-
column_ids,
55+
schema,
6856
})
6957
}
7058

@@ -83,11 +71,13 @@ impl RangeIndex {
8371
.column_refs()
8472
.into_iter()
8573
.map(|(name, ty)| {
86-
let stat = match self.column_ids.get(&name) {
87-
Some(column_id) => stats.get(column_id),
88-
None => None,
89-
};
90-
let domain = statistics_to_domain(stat, &ty);
74+
let column_ids = self.schema.inner_column_ids_with_name(&name);
75+
let stats = column_ids
76+
.iter()
77+
.filter_map(|column_id| stats.get(column_id))
78+
.collect::<_>();
79+
80+
let domain = statistics_to_domain(stats, &ty);
9181
Ok((name, domain))
9282
})
9383
.collect::<Result<_>>()?;
@@ -107,52 +97,92 @@ impl RangeIndex {
10797
}
10898
}
10999

110-
pub fn statistics_to_domain(stat: Option<&ColumnStatistics>, data_type: &DataType) -> Domain {
111-
if stat.is_none() {
100+
pub fn statistics_to_domain(mut stats: Vec<&ColumnStatistics>, data_type: &DataType) -> Domain {
101+
if stats.len() != data_type.n_columns() {
112102
return Domain::full(data_type);
113103
}
114-
let stat = stat.unwrap();
115-
if stat.min.is_null() || stat.max.is_null() {
116-
return Domain::Nullable(NullableDomain {
117-
has_null: true,
118-
value: None,
119-
});
120-
}
121-
with_number_mapped_type!(|NUM_TYPE| match data_type {
122-
DataType::Number(NumberDataType::NUM_TYPE) => {
123-
NumberType::<NUM_TYPE>::upcast_domain(SimpleDomain {
124-
min: NumberType::<NUM_TYPE>::try_downcast_scalar(&stat.min.as_ref()).unwrap(),
125-
max: NumberType::<NUM_TYPE>::try_downcast_scalar(&stat.max.as_ref()).unwrap(),
126-
})
127-
}
128-
DataType::String => Domain::String(StringDomain {
129-
min: StringType::try_downcast_scalar(&stat.min.as_ref())
130-
.unwrap()
131-
.to_vec(),
132-
max: Some(
133-
StringType::try_downcast_scalar(&stat.max.as_ref())
134-
.unwrap()
135-
.to_vec()
136-
),
137-
}),
138-
DataType::Timestamp => TimestampType::upcast_domain(SimpleDomain {
139-
min: TimestampType::try_downcast_scalar(&stat.min.as_ref()).unwrap(),
140-
max: TimestampType::try_downcast_scalar(&stat.max.as_ref()).unwrap(),
141-
}),
142-
DataType::Date => DateType::upcast_domain(SimpleDomain {
143-
min: DateType::try_downcast_scalar(&stat.min.as_ref()).unwrap(),
144-
max: DateType::try_downcast_scalar(&stat.max.as_ref()).unwrap(),
145-
}),
146-
DataType::Nullable(ty) => {
147-
let domain = statistics_to_domain(Some(stat), ty);
104+
match data_type {
105+
DataType::Nullable(box inner_ty) => {
106+
if stats.len() == 1 && (stats[0].min.is_null() || stats[0].max.is_null()) {
107+
return Domain::Nullable(NullableDomain {
108+
has_null: true,
109+
value: None,
110+
});
111+
}
112+
let has_null = if stats.len() == 1 {
113+
stats[0].null_count > 0
114+
} else {
115+
// Only leaf columns have statistics,
116+
// nested columns are treated as having nullable values
117+
true
118+
};
119+
let domain = statistics_to_domain(stats, inner_ty);
148120
Domain::Nullable(NullableDomain {
149-
has_null: stat.null_count > 0,
121+
has_null,
150122
value: Some(Box::new(domain)),
151123
})
152124
}
153-
// Unsupported data type
154-
_ => Domain::full(data_type),
155-
})
125+
DataType::Tuple(inner_tys) => {
126+
let inner_domains = inner_tys
127+
.iter()
128+
.map(|inner_ty| {
129+
let n = inner_ty.n_columns();
130+
let stats = stats.drain(..n).collect();
131+
statistics_to_domain(stats, inner_ty)
132+
})
133+
.collect::<Vec<_>>();
134+
Domain::Tuple(inner_domains)
135+
}
136+
DataType::Array(box inner_ty) => {
137+
let n = inner_ty.n_columns();
138+
let stats = stats.drain(..n).collect();
139+
let inner_domain = statistics_to_domain(stats, inner_ty);
140+
Domain::Array(Some(Box::new(inner_domain)))
141+
}
142+
DataType::Map(box inner_ty) => {
143+
let n = inner_ty.n_columns();
144+
let stats = stats.drain(..n).collect();
145+
let inner_domain = statistics_to_domain(stats, inner_ty);
146+
let kv_domain = inner_domain.as_tuple().unwrap();
147+
Domain::Map(Some((
148+
Box::new(kv_domain[0].clone()),
149+
Box::new(kv_domain[1].clone()),
150+
)))
151+
}
152+
_ => {
153+
let stat = stats[0];
154+
with_number_mapped_type!(|NUM_TYPE| match data_type {
155+
DataType::Number(NumberDataType::NUM_TYPE) => {
156+
NumberType::<NUM_TYPE>::upcast_domain(SimpleDomain {
157+
min: NumberType::<NUM_TYPE>::try_downcast_scalar(&stat.min.as_ref())
158+
.unwrap(),
159+
max: NumberType::<NUM_TYPE>::try_downcast_scalar(&stat.max.as_ref())
160+
.unwrap(),
161+
})
162+
}
163+
DataType::String => Domain::String(StringDomain {
164+
min: StringType::try_downcast_scalar(&stat.min.as_ref())
165+
.unwrap()
166+
.to_vec(),
167+
max: Some(
168+
StringType::try_downcast_scalar(&stat.max.as_ref())
169+
.unwrap()
170+
.to_vec()
171+
),
172+
}),
173+
DataType::Timestamp => TimestampType::upcast_domain(SimpleDomain {
174+
min: TimestampType::try_downcast_scalar(&stat.min.as_ref()).unwrap(),
175+
max: TimestampType::try_downcast_scalar(&stat.max.as_ref()).unwrap(),
176+
}),
177+
DataType::Date => DateType::upcast_domain(SimpleDomain {
178+
min: DateType::try_downcast_scalar(&stat.min.as_ref()).unwrap(),
179+
max: DateType::try_downcast_scalar(&stat.max.as_ref()).unwrap(),
180+
}),
181+
// Unsupported data type
182+
_ => Domain::full(data_type),
183+
})
184+
}
185+
}
156186
}
157187

158188
impl Index for RangeIndex {}

0 commit comments

Comments
 (0)