Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions src/query/catalog/src/plan/internal_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,10 @@ pub struct InternalColumnMeta {
pub offsets: Option<RoaringTreemap>,
pub base_block_ids: Option<Scalar>,
pub inner: Option<BlockMetaInfoPtr>,
// The search matched rows and optional scores in the block.
pub matched_rows: Option<Vec<(usize, Option<F32>)>>,
// The search matched rows in the block (aligned with `matched_scores` when present).
pub matched_rows: Option<Vec<usize>>,
// Optional scores for the matched rows.
pub matched_scores: Option<Vec<F32>>,
// The vector topn rows and scores in the block.
pub vector_scores: Option<Vec<(usize, F32)>>,
}
Expand Down Expand Up @@ -280,24 +282,25 @@ impl InternalColumn {
InternalColumnType::SearchMatched => {
assert!(meta.matched_rows.is_some());
let matched_rows = meta.matched_rows.as_ref().unwrap();

let mut bitmap = MutableBitmap::from_len_zeroed(num_rows);
for (idx, _) in matched_rows.iter() {
debug_assert!(*idx < bitmap.len());
for idx in matched_rows.iter() {
debug_assert!(*idx < num_rows);
bitmap.set(*idx, true);
}
Column::Boolean(bitmap.into()).into()
}
InternalColumnType::SearchScore => {
assert!(meta.matched_rows.is_some());
assert!(meta.matched_scores.is_some());
let matched_rows = meta.matched_rows.as_ref().unwrap();
let matched_scores = meta.matched_scores.as_ref().unwrap();
debug_assert_eq!(matched_rows.len(), matched_scores.len());

let mut scores = vec![F32::from(0_f32); num_rows];
for (idx, score) in matched_rows.iter() {
debug_assert!(*idx < scores.len());
for (idx, score) in matched_rows.iter().zip(matched_scores.iter()) {
debug_assert!(*idx < num_rows);
if let Some(val) = scores.get_mut(*idx) {
debug_assert!(score.is_some());
*val = F32::from(*score.unwrap());
*val = *score;
}
}
Float32Type::from_data(scores).into()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,10 @@ async fn test_fuse_do_refresh_inverted_index() -> Result<()> {
)
.await?;
assert!(matched_rows.is_some());
let matched_rows = matched_rows.unwrap();
let (matched_rows, _) = matched_rows.unwrap();
assert_eq!(matched_rows.len(), ids.len());
for (matched_row, id) in matched_rows.iter().zip(ids.iter()) {
assert_eq!(matched_row.0, *id);
assert_eq!(matched_row, id);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ fn expected_data_block(
base_block_ids: None,
inner: None,
matched_rows: block_meta.matched_rows.clone(),
matched_scores: block_meta.matched_scores.clone(),
vector_scores: block_meta.vector_scores.clone(),
};
for internal_column in internal_columns {
Expand Down
6 changes: 4 additions & 2 deletions src/query/storages/common/pruner/src/block_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ pub struct BlockMetaIndex {
pub block_location: String,
pub segment_location: String,
pub snapshot_location: Option<String>,
// The search matched rows and optional scores in the block.
pub matched_rows: Option<Vec<(usize, Option<F32>)>>,
// The search matched rows in the block (aligned with `matched_scores` when present).
pub matched_rows: Option<Vec<usize>>,
// Optional scores for the matched rows.
pub matched_scores: Option<Vec<F32>>,
// The vector topn rows and scores in the block.
pub vector_scores: Option<Vec<(usize, F32)>>,
// The optional meta of virtual columns.
Expand Down
48 changes: 16 additions & 32 deletions src/query/storages/common/pruner/src/topn_pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,16 +237,13 @@ impl TopNPruner {

let mut score_stats = Vec::new();
for (pos, (index, _)) in metas.iter().enumerate() {
let Some(rows) = &index.matched_rows else {
continue;
let Some(scores) = &index.matched_scores else {
return Ok(metas);
};
if rows.is_empty() {
continue;
}
let Some((min_score, max_score)) = block_score_range(rows) else {
let Some((min_score, max_score)) = block_score_range(scores) else {
return Ok(metas);
};
score_stats.push((pos, min_score, max_score, rows.len()));
score_stats.push((pos, min_score, max_score, scores.len()));
}

if score_stats.is_empty() {
Expand Down Expand Up @@ -302,29 +299,22 @@ fn index_match_count(index: &BlockMetaIndex) -> usize {
0
}

fn block_score_range(rows: &[(usize, Option<F32>)]) -> Option<(F32, F32)> {
let mut min_score: Option<F32> = None;
let mut max_score: Option<F32> = None;
for (_, score) in rows {
let score = (*score)?;
min_score = Some(match min_score {
Some(current) => current.min(score),
None => score,
});
max_score = Some(match max_score {
Some(current) => current.max(score),
None => score,
});
fn block_score_range(scores: &[F32]) -> Option<(F32, F32)> {
if scores.is_empty() {
return None;
}
min_score.zip(max_score)
// Scores are arranged in descending order,
// so we can directly get the maximum and minimum score.
let max_score = scores[0];
let min_score = scores[scores.len() - 1];
Some((min_score, max_score))
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use databend_common_expression::types::number::NumberDataType;
use databend_common_expression::types::number::F32;
use databend_common_expression::types::DataType;
use databend_common_expression::ColumnId;
use databend_common_expression::Scalar;
Expand Down Expand Up @@ -454,7 +444,7 @@ mod tests {
let matched = if matched_rows == 0 {
None
} else {
Some((0..matched_rows).map(|row| (row, None)).collect::<Vec<_>>())
Some((0..matched_rows).collect::<Vec<_>>())
};

let index = BlockMetaIndex {
Expand All @@ -467,6 +457,7 @@ mod tests {
segment_location: "segment".to_string(),
snapshot_location: None,
matched_rows: matched,
matched_scores: None,
vector_scores: None,
virtual_block_meta: None,
};
Expand All @@ -482,15 +473,8 @@ mod tests {
scores: &[f32],
) -> (BlockMetaIndex, Arc<BlockMeta>) {
let (mut index, meta) = build_block(column_id, block_id, min, max, scores.len());
let matched_rows = scores
.iter()
.enumerate()
.map(|(row, score)| {
let ordered: F32 = (*score).into();
(row, Some(ordered))
})
.collect();
index.matched_rows = Some(matched_rows);
let matched_scores = scores.iter().map(|v| (*v).into()).collect();
index.matched_scores = Some(matched_scores);
(index, meta)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl InvertedIndexReader {
index_record: &IndexRecordOption,
fuzziness: &Option<u8>,
index_loc: &str,
) -> Result<Option<Vec<(usize, Option<F32>)>>> {
) -> Result<Option<(Vec<usize>, Option<Vec<F32>>)>> {
let start = Instant::now();

let matched_rows = self
Expand Down Expand Up @@ -113,7 +113,7 @@ impl InvertedIndexReader {
field_ids: &HashSet<u32>,
index_record: &IndexRecordOption,
fuzziness: &Option<u8>,
) -> Result<Option<Vec<(usize, Option<F32>)>>> {
) -> Result<Option<(Vec<usize>, Option<Vec<F32>>)>> {
// read index meta.
let inverted_index_meta = load_inverted_index_meta(self.dal.clone(), index_path).await?;
let version = inverted_index_meta.version;
Expand Down Expand Up @@ -158,7 +158,7 @@ impl InvertedIndexReader {
query: Box<dyn Query>,
version: usize,
inverted_index_meta_map: HashMap<String, SingleColumnMeta>,
) -> Result<Option<Vec<(usize, Option<F32>)>>> {
) -> Result<Option<(Vec<usize>, Option<Vec<F32>>)>> {
let directory = load_inverted_index_directory(
settings,
index_path,
Expand All @@ -173,30 +173,33 @@ impl InvertedIndexReader {
let reader = index.reader()?;
let searcher = reader.searcher();

let matched_rows = if self.has_score {
let (matched_rows, matched_scores) = if self.has_score {
let collector = TopDocs::with_limit(self.row_count as usize);
let docs = searcher.search(&query, &collector)?;

let mut matched_rows = Vec::with_capacity(docs.len());
let mut matched_scores = Vec::with_capacity(docs.len());
for (score, doc_addr) in docs {
let doc_id = doc_addr.doc_id as usize;
let score = F32::from(score);
matched_rows.push((doc_id, Some(score)));
matched_rows.push(doc_id);
matched_scores.push(score);
}
matched_rows
(matched_rows, Some(matched_scores))
} else {
let collector = DocSetCollector;
let docs = searcher.search(&query, &collector)?;

let mut matched_rows = Vec::with_capacity(docs.len());
for doc_addr in docs {
let doc_id = doc_addr.doc_id as usize;
matched_rows.push((doc_id, None));
matched_rows.push(doc_id);
}
matched_rows
(matched_rows, None)
};

if !matched_rows.is_empty() {
Ok(Some(matched_rows))
Ok(Some((matched_rows, matched_scores)))
} else {
Ok(None)
}
Expand Down Expand Up @@ -242,7 +245,7 @@ impl InvertedIndexReader {
index_record: &IndexRecordOption,
fuzziness: &Option<u8>,
mut inverted_index_meta_map: HashMap<String, SingleColumnMeta>,
) -> Result<Option<Vec<(usize, Option<F32>)>>> {
) -> Result<Option<(Vec<usize>, Option<Vec<F32>>)>> {
// 1. read fst and term files.
let mut columns = Vec::with_capacity(field_ids.len() * 2);
for field_id in field_ids {
Expand Down Expand Up @@ -479,19 +482,25 @@ impl InvertedIndexReader {

if let Some(matched_doc_ids) = matched_doc_ids {
if !matched_doc_ids.is_empty() {
let mut matched_rows = Vec::with_capacity(matched_doc_ids.len() as usize);
if self.has_score {
let (matched_rows, matched_scores) = if self.has_score {
let scores =
collector.calculate_scores(query.box_clone(), &matched_doc_ids, None)?;
for (doc_id, score) in matched_doc_ids.into_iter().zip(scores.into_iter()) {
matched_rows.push((doc_id as usize, Some(score)));
}
let mut rows_scores = matched_doc_ids
.into_iter()
.zip(scores.into_iter())
.map(|(doc_id, score)| (doc_id as usize, score))
.collect::<Vec<_>>();
rows_scores.sort_by(|a, b| b.1.cmp(&a.1));
let (matched_rows, matched_scores) = rows_scores.into_iter().unzip();
(matched_rows, Some(matched_scores))
} else {
let mut matched_rows = Vec::with_capacity(matched_doc_ids.len() as usize);
for doc_id in matched_doc_ids.into_iter() {
matched_rows.push((doc_id as usize, None));
matched_rows.push(doc_id as usize);
}
}
return Ok(Some(matched_rows));
(matched_rows, None)
};
return Ok(Some((matched_rows, matched_scores)));
}
}
Ok(None)
Expand Down
50 changes: 40 additions & 10 deletions src/query/storages/fuse/src/operations/read/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,46 @@ pub(crate) fn add_data_block_meta(
let block_meta = fuse_part.block_meta_index().unwrap();

// Transform matched_rows indices from block-level to page-level
let matched_rows = block_meta.matched_rows.clone().map(|matched_rows| {
if let Some(offsets) = &offsets {
matched_rows
.into_iter()
.filter(|(idx, _)| offsets.contains(*idx as u64))
.map(|(idx, score)| ((offsets.rank(idx as u64) - 1) as usize, score))
.collect::<Vec<_>>()
} else {
matched_rows
let (matched_rows, matched_scores) = if let Some(offsets) = &offsets {
match (
block_meta.matched_rows.clone(),
block_meta.matched_scores.clone(),
) {
(Some(rows), Some(scores)) => {
debug_assert_eq!(rows.len(), scores.len());
let mut filtered_rows = Vec::with_capacity(rows.len());
let mut filtered_scores = Vec::with_capacity(scores.len());
for (idx, score) in rows.into_iter().zip(scores.into_iter()) {
if offsets.contains(idx as u64) {
let rank = offsets.rank(idx as u64);
debug_assert!(rank > 0);
let new_idx = (rank - 1) as usize;
filtered_rows.push(new_idx);
filtered_scores.push(score);
}
}
(Some(filtered_rows), Some(filtered_scores))
}
(Some(rows), None) => {
let mut filtered_rows = Vec::with_capacity(rows.len());
for idx in rows.into_iter() {
if offsets.contains(idx as u64) {
let rank = offsets.rank(idx as u64);
debug_assert!(rank > 0);
let new_idx = (rank - 1) as usize;
filtered_rows.push(new_idx);
}
}
(Some(filtered_rows), None)
}
(None, _) => (None, None),
}
});
} else {
(
block_meta.matched_rows.clone(),
block_meta.matched_scores.clone(),
)
};

// Transform vector_scores indices from block-level to page-level
let vector_scores = block_meta.vector_scores.clone().map(|vector_scores| {
Expand All @@ -105,6 +134,7 @@ pub(crate) fn add_data_block_meta(
base_block_ids,
inner: meta,
matched_rows,
matched_scores,
vector_scores,
};
meta = Some(Box::new(internal_column_meta));
Expand Down
Loading