1#[cfg(feature = "vector_index")]
18use std::collections::BTreeSet;
19use std::collections::{HashSet, VecDeque};
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22
23use api::v1::SemanticType;
24use common_recordbatch::filter::SimpleFilterEvaluator;
25use common_telemetry::{tracing, warn};
26use datafusion_expr::Expr;
27use datatypes::arrow::array::ArrayRef;
28use datatypes::arrow::datatypes::Field;
29use datatypes::arrow::record_batch::RecordBatch;
30use datatypes::data_type::ConcreteDataType;
31use datatypes::prelude::DataType;
32use futures::StreamExt;
33use mito_codec::row_converter::build_primary_key_codec;
34use object_store::ObjectStore;
35use parquet::arrow::ProjectionMask;
36use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions, RowSelection};
37use parquet::arrow::async_reader::{ParquetRecordBatchStream, ParquetRecordBatchStreamBuilder};
38use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData};
39use partition::expr::PartitionExpr;
40use snafu::ResultExt;
41use store_api::codec::PrimaryKeyEncoding;
42use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
43use store_api::region_request::PathType;
44use store_api::storage::{ColumnId, FileId};
45use table::predicate::Predicate;
46
47use crate::cache::index::result_cache::PredicateKey;
48use crate::cache::{CacheStrategy, CachedSstMeta};
49#[cfg(feature = "vector_index")]
50use crate::error::ApplyVectorIndexSnafu;
51use crate::error::{ReadDataPartSnafu, ReadParquetSnafu, Result, SerializePartitionExprSnafu};
52use crate::metrics::{
53 PRECISE_FILTER_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL,
54 READ_ROWS_TOTAL, READ_STAGE_ELAPSED,
55};
56use crate::read::flat_projection::CompactionProjectionMapper;
57use crate::read::prune::FlatPruneReader;
58use crate::read::{Batch, BatchReader};
59use crate::sst::file::FileHandle;
60use crate::sst::index::bloom_filter::applier::{
61 BloomFilterIndexApplierRef, BloomFilterIndexApplyMetrics,
62};
63use crate::sst::index::fulltext_index::applier::{
64 FulltextIndexApplierRef, FulltextIndexApplyMetrics,
65};
66use crate::sst::index::inverted_index::applier::{
67 InvertedIndexApplierRef, InvertedIndexApplyMetrics,
68};
69#[cfg(feature = "vector_index")]
70use crate::sst::index::vector_index::applier::VectorIndexApplierRef;
71use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
72use crate::sst::parquet::async_reader::SstAsyncFileReader;
73use crate::sst::parquet::file_range::{
74 FileRangeContext, FileRangeContextRef, PartitionFilterContext, PreFilterMode, RangeBase,
75};
76use crate::sst::parquet::format::{ReadFormat, need_override_sequence};
77use crate::sst::parquet::metadata::MetadataLoader;
78use crate::sst::parquet::prefilter::{
79 PrefilterContextBuilder, execute_prefilter, is_usable_primary_key_filter,
80};
81use crate::sst::parquet::read_columns::{ParquetReadColumns, build_projection_mask};
82use crate::sst::parquet::row_group::ParquetFetchMetrics;
83use crate::sst::parquet::row_selection::RowGroupSelection;
84use crate::sst::parquet::stats::RowGroupPruningStats;
85use crate::sst::tag_maybe_to_dictionary_field;
86
// Index type labels used in error/log messages emitted by `handle_index_error!`.
const INDEX_TYPE_FULLTEXT: &str = "fulltext";
const INDEX_TYPE_INVERTED: &str = "inverted";
const INDEX_TYPE_BLOOM: &str = "bloom filter";
const INDEX_TYPE_VECTOR: &str = "vector";
91
/// Handles a failure while applying an index to a file.
///
/// In test builds (`cfg(test)` or the `test` feature) it panics so index
/// errors cannot go unnoticed; in production builds it only logs a warning,
/// because index pruning is a best-effort optimization and the read can
/// proceed without it.
macro_rules! handle_index_error {
    ($err:expr, $file_handle:expr, $index_type:expr) => {
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to apply {} index, region_id: {}, file_id: {}, err: {:?}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id(),
                $err
            );
        } else {
            warn!(
                $err; "Failed to apply {} index, region_id: {}, file_id: {}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id()
            );
        }
    };
}
112
/// Builder that prepares everything needed to read one SST parquet file:
/// metadata loading, projection, predicate/index based row-group pruning,
/// and finally a [`ParquetReader`].
pub struct ParquetReaderBuilder {
    /// Directory of the table data that contains the file.
    table_dir: String,
    /// Path layout used to resolve the file path under `table_dir`.
    path_type: PathType,
    /// Handle of the SST file to read.
    file_handle: FileHandle,
    /// Object store to read the file (and its indexes) from.
    object_store: ObjectStore,
    /// Optional pushdown predicate, used for min-max pruning and row filtering.
    predicate: Option<Predicate>,
    /// Column ids to read; `None` reads all columns of the expected metadata.
    projection: Option<Vec<ColumnId>>,
    /// Cache strategy for SST metadata, index results and file downloads.
    cache_strategy: CacheStrategy,
    /// Inverted index appliers. Two slots; when the pre-filter mode skips
    /// field columns only slot 0 is consulted (slot 1 presumably holds
    /// field-column appliers — see the `skip_fields` handling).
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    /// Bloom filter index appliers; same two-slot convention as above.
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    /// Fulltext index appliers; same two-slot convention as above.
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Optional vector index applier.
    #[cfg(feature = "vector_index")]
    vector_index_applier: Option<VectorIndexApplierRef>,
    /// Top-k value for the vector index search.
    #[cfg(feature = "vector_index")]
    vector_index_k: Option<usize>,
    /// Expected (latest) region metadata; used for schema compatibility and
    /// partition-expression comparison. `None` means use the file's metadata.
    expected_metadata: Option<RegionMetadataRef>,
    /// Whether this reader serves a compaction.
    compaction: bool,
    /// Controls which column categories are pre-filtered during the read.
    pre_filter_mode: PreFilterMode,
    /// Whether to decode primary key values while reading.
    decode_primary_key_values: bool,
    /// Policy for loading parquet page indexes.
    page_index_policy: PageIndexPolicy,
}
152
153impl ParquetReaderBuilder {
154 pub fn new(
156 table_dir: String,
157 path_type: PathType,
158 file_handle: FileHandle,
159 object_store: ObjectStore,
160 ) -> ParquetReaderBuilder {
161 ParquetReaderBuilder {
162 table_dir,
163 path_type,
164 file_handle,
165 object_store,
166 predicate: None,
167 projection: None,
168 cache_strategy: CacheStrategy::Disabled,
169 inverted_index_appliers: [None, None],
170 bloom_filter_index_appliers: [None, None],
171 fulltext_index_appliers: [None, None],
172 #[cfg(feature = "vector_index")]
173 vector_index_applier: None,
174 #[cfg(feature = "vector_index")]
175 vector_index_k: None,
176 expected_metadata: None,
177 compaction: false,
178 pre_filter_mode: PreFilterMode::All,
179 decode_primary_key_values: false,
180 page_index_policy: Default::default(),
181 }
182 }
183
    /// Attaches a pushdown predicate used for row-group pruning and row filtering.
    #[must_use]
    pub fn predicate(mut self, predicate: Option<Predicate>) -> ParquetReaderBuilder {
        self.predicate = predicate;
        self
    }
190
    /// Sets the column ids to read; `None` reads all columns of the expected
    /// metadata (or the file's region metadata when no expected metadata is set).
    #[must_use]
    pub fn projection(mut self, projection: Option<Vec<ColumnId>>) -> ParquetReaderBuilder {
        self.projection = projection;
        self
    }
199
    /// Sets the cache strategy used for SST metadata, index results and downloads.
    #[must_use]
    pub fn cache(mut self, cache: CacheStrategy) -> ParquetReaderBuilder {
        self.cache_strategy = cache;
        self
    }
206
    /// Sets the inverted index appliers (two slots; only the first is used when
    /// the pre-filter mode skips field columns).
    #[must_use]
    pub(crate) fn inverted_index_appliers(
        mut self,
        index_appliers: [Option<InvertedIndexApplierRef>; 2],
    ) -> Self {
        self.inverted_index_appliers = index_appliers;
        self
    }
216
    /// Sets the bloom filter index appliers (two slots; only the first is used
    /// when the pre-filter mode skips field columns).
    #[must_use]
    pub(crate) fn bloom_filter_index_appliers(
        mut self,
        index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    ) -> Self {
        self.bloom_filter_index_appliers = index_appliers;
        self
    }
226
    /// Sets the fulltext index appliers (two slots; only the first is used when
    /// the pre-filter mode skips field columns).
    #[must_use]
    pub(crate) fn fulltext_index_appliers(
        mut self,
        index_appliers: [Option<FulltextIndexApplierRef>; 2],
    ) -> Self {
        self.fulltext_index_appliers = index_appliers;
        self
    }
236
    /// Sets the vector index applier and the top-k value for the vector search.
    /// Pruning only runs when both the applier and `k` are set.
    #[cfg(feature = "vector_index")]
    #[must_use]
    pub(crate) fn vector_index_applier(
        mut self,
        applier: Option<VectorIndexApplierRef>,
        k: Option<usize>,
    ) -> Self {
        self.vector_index_applier = applier;
        self.vector_index_k = k;
        self
    }
249
    /// Sets the expected (latest) region metadata, used for schema compatibility
    /// and for comparing partition expressions against the file's metadata.
    #[must_use]
    pub fn expected_metadata(mut self, expected_metadata: Option<RegionMetadataRef>) -> Self {
        self.expected_metadata = expected_metadata;
        self
    }
256
    /// Marks whether this reader serves a compaction.
    #[must_use]
    pub fn compaction(mut self, compaction: bool) -> Self {
        self.compaction = compaction;
        self
    }
263
    /// Sets the pre-filter mode that controls which column categories are
    /// pre-filtered (e.g. whether field columns are skipped).
    #[must_use]
    pub(crate) fn pre_filter_mode(mut self, pre_filter_mode: PreFilterMode) -> Self {
        self.pre_filter_mode = pre_filter_mode;
        self
    }
270
    /// Sets whether to decode primary key values while reading.
    #[must_use]
    pub(crate) fn decode_primary_key_values(mut self, decode: bool) -> Self {
        self.decode_primary_key_values = decode;
        self
    }
277
    /// Sets the policy for loading parquet page indexes.
    #[must_use]
    pub fn page_index_policy(mut self, page_index_policy: PageIndexPolicy) -> Self {
        self.page_index_policy = page_index_policy;
        self
    }
283
    /// Builds a [`ParquetReader`] for the file.
    ///
    /// Returns `Ok(None)` when every row group is pruned out and there is
    /// nothing to read.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.file_handle.region_id(),
            file_id = %self.file_handle.file_id()
        )
    )]
    pub async fn build(&self) -> Result<Option<ParquetReader>> {
        let mut metrics = ReaderMetrics::default();

        let Some((context, selection)) = self.build_reader_input_inner(&mut metrics).await? else {
            return Ok(None);
        };
        ParquetReader::new(Arc::new(context), selection)
            .await
            .map(Some)
    }
304
    /// Builds the [`FileRangeContext`] and row-group selection for the file
    /// without constructing a reader.
    ///
    /// Returns `Ok(None)` when all row groups are pruned out.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.file_handle.region_id(),
            file_id = %self.file_handle.file_id()
        )
    )]
    pub(crate) async fn build_reader_input(
        &self,
        metrics: &mut ReaderMetrics,
    ) -> Result<Option<(FileRangeContext, RowGroupSelection)>> {
        self.build_reader_input_inner(metrics).await
    }
321
    /// Shared implementation for [`Self::build`] and [`Self::build_reader_input`].
    ///
    /// Loads the SST metadata, decides the read format and projection, prunes
    /// row groups via predicates and indexes, and assembles the
    /// [`FileRangeContext`]. Returns `Ok(None)` when nothing is left to read.
    async fn build_reader_input_inner(
        &self,
        metrics: &mut ReaderMetrics,
    ) -> Result<Option<(FileRangeContext, RowGroupSelection)>> {
        let start = Instant::now();

        let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
        let file_size = self.file_handle.meta_ref().file_size;

        // Loads both the parquet footer and the region metadata stored with it,
        // preferring the cache. `cache_miss` is true when object storage was hit.
        let (sst_meta, cache_miss) = self
            .read_parquet_metadata(
                &file_path,
                file_size,
                &mut metrics.metadata_cache_metrics,
                self.page_index_policy,
            )
            .await?;
        let parquet_meta = sst_meta.parquet_metadata();
        let region_meta = sst_meta.region_metadata();
        // Compare the region's partition expression with the one recorded in
        // the file to detect files written under a different partitioning.
        let region_partition_expr_str = self
            .expected_metadata
            .as_ref()
            .and_then(|meta| meta.partition_expr.as_ref())
            .map(|expr| expr.as_str());
        let (_, is_same_region_partition) = Self::is_same_region_partition(
            region_partition_expr_str,
            self.file_handle.meta_ref().partition_expr.as_ref(),
        )?;
        let skip_auto_convert = self.compaction && is_same_region_partition;

        // Compacting a file whose partitioning differs, with sparse primary key
        // encoding, needs a special projection mapper.
        let compaction_projection_mapper = if self.compaction
            && !is_same_region_partition
            && region_meta.primary_key_encoding == PrimaryKeyEncoding::Sparse
        {
            Some(CompactionProjectionMapper::try_new(&region_meta)?)
        } else {
            None
        };

        // Build the read format from the explicit projection, or default to all
        // columns of the expected metadata (falling back to the file's metadata).
        let mut read_format = if let Some(column_ids) = &self.projection {
            ReadFormat::new(
                region_meta.clone(),
                Some(column_ids),
                true, Some(parquet_meta.file_metadata().schema_descr().num_columns()),
                &file_path,
                skip_auto_convert,
            )?
        } else {
            let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
            let column_ids: Vec<_> = expected_meta
                .column_metadatas
                .iter()
                .map(|col| col.column_id)
                .collect();
            ReadFormat::new(
                region_meta.clone(),
                Some(&column_ids),
                true, Some(parquet_meta.file_metadata().schema_descr().num_columns()),
                &file_path,
                skip_auto_convert,
            )?
        };
        if self.decode_primary_key_values {
            read_format.set_decode_primary_key_values(true);
        }
        if need_override_sequence(&parquet_meta) {
            read_format
                .set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));
        }

        let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
        let parquet_read_cols = ParquetReadColumns::from_deduped_root_indices(
            read_format.projection_indices().iter().copied(),
        );

        let projection_mask = build_projection_mask(&parquet_read_cols, parquet_schema_desc);
        // Run the full pruning pipeline (min-max + indexes).
        let selection = self
            .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
            .await;

        if selection.is_empty() {
            metrics.build_cost += start.elapsed();
            return Ok(None);
        }

        // The metadata cache missed and we will actually read the file, so kick
        // off a background download into the file cache.
        // NOTE(review): `!selection.is_empty()` is always true here because of
        // the early return above.
        if cache_miss && !selection.is_empty() {
            use crate::cache::file_cache::{FileType, IndexKey};
            let index_key = IndexKey::new(
                self.file_handle.region_id(),
                self.file_handle.file_id().file_id(),
                FileType::Parquet,
            );
            self.cache_strategy.maybe_download_background(
                index_key,
                file_path.clone(),
                self.object_store.clone(),
                file_size,
            );
        }

        let prune_schema = self
            .expected_metadata
            .as_ref()
            .map(|meta| meta.schema.clone())
            .unwrap_or_else(|| region_meta.schema.clone());

        let arrow_reader_options =
            ArrowReaderOptions::new().with_schema(read_format.arrow_schema().clone());
        let arrow_metadata =
            ArrowReaderMetadata::try_new(parquet_meta.clone(), arrow_reader_options)
                .context(ReadDataPartSnafu)?;

        // Convert the predicate exprs into simple per-column filter contexts;
        // exprs that cannot be simplified are dropped here.
        let filters = if let Some(predicate) = &self.predicate {
            predicate
                .exprs()
                .iter()
                .filter_map(|expr| {
                    SimpleFilterContext::new_opt(
                        &region_meta,
                        self.expected_metadata.as_deref(),
                        expr,
                    )
                })
                .collect::<Vec<_>>()
        } else {
            vec![]
        };

        let dyn_filters = if let Some(predicate) = &self.predicate {
            predicate.dyn_filters().as_ref().clone()
        } else {
            vec![]
        };

        let codec = build_primary_key_codec(read_format.metadata());

        // Collect the subset of filters usable as primary-key prefilters;
        // `None` when there are none, so downstream can skip the work.
        let primary_key_filters = {
            let pk_filters = filters
                .iter()
                .filter_map(SimpleFilterContext::primary_key_prefilter)
                .collect::<Vec<_>>();
            (!pk_filters.is_empty()).then_some(Arc::new(pk_filters))
        };

        let prefilter_builder = PrefilterContextBuilder::new(
            &read_format,
            &codec,
            primary_key_filters.as_ref(),
            parquet_meta.file_metadata().schema_descr(),
        );

        let reader_builder = RowGroupReaderBuilder {
            file_handle: self.file_handle.clone(),
            file_path,
            parquet_meta,
            arrow_metadata,
            object_store: self.object_store.clone(),
            projection: projection_mask,
            cache_strategy: self.cache_strategy.clone(),
            prefilter_builder,
        };

        let partition_filter = self.build_partition_filter(&read_format, &prune_schema)?;

        let context = FileRangeContext::new(
            reader_builder,
            RangeBase {
                filters,
                dyn_filters,
                read_format,
                expected_metadata: self.expected_metadata.clone(),
                prune_schema,
                codec,
                compat_batch: None,
                compaction_projection_mapper,
                pre_filter_mode: self.pre_filter_mode,
                partition_filter,
            },
        );

        metrics.build_cost += start.elapsed();

        Ok(Some((context, selection)))
    }
525
526 fn is_same_region_partition(
527 region_partition_expr_str: Option<&str>,
528 file_partition_expr: Option<&PartitionExpr>,
529 ) -> Result<(Option<PartitionExpr>, bool)> {
530 let region_partition_expr = match region_partition_expr_str {
531 Some(expr_str) => crate::region::parse_partition_expr(Some(expr_str))?,
532 None => None,
533 };
534
535 let is_same = region_partition_expr.as_ref() == file_partition_expr;
536 Ok((region_partition_expr, is_same))
537 }
538
    /// Builds a filter context that evaluates the region's partition expression
    /// against rows of this file.
    ///
    /// Returns `Ok(None)` when no filtering is needed: the file was written
    /// under the same partition expression, or the region has no partition
    /// expression to evaluate. Presumably this filters out rows that belong to
    /// other regions after a repartition — confirm with callers.
    fn build_partition_filter(
        &self,
        read_format: &ReadFormat,
        prune_schema: &Arc<datatypes::schema::Schema>,
    ) -> Result<Option<PartitionFilterContext>> {
        let region_partition_expr_str = self
            .expected_metadata
            .as_ref()
            .and_then(|meta| meta.partition_expr.as_ref());
        let file_partition_expr_ref = self.file_handle.meta_ref().partition_expr.as_ref();

        let (region_partition_expr, is_same_region_partition) = Self::is_same_region_partition(
            region_partition_expr_str.map(|s| s.as_str()),
            file_partition_expr_ref,
        )?;

        // Same expression: every row in the file already belongs to this region.
        if is_same_region_partition {
            return Ok(None);
        }

        // No region partition expression to evaluate.
        let Some(region_partition_expr) = region_partition_expr else {
            return Ok(None);
        };

        // Restrict the schema to just the columns the expression references.
        let mut referenced_columns = HashSet::new();
        region_partition_expr.collect_column_names(&mut referenced_columns);

        let is_flat = read_format.as_flat().is_some();
        let partition_schema = Arc::new(datatypes::schema::Schema::new(
            prune_schema
                .column_schemas()
                .iter()
                .filter(|col| referenced_columns.contains(&col.name))
                .map(|col| {
                    // In the flat format, string tag columns may be read as
                    // dictionary arrays; mirror that encoding in the filter
                    // schema so the physical expr sees matching types.
                    if is_flat
                        && let Some(column_meta) = read_format.metadata().column_by_name(&col.name)
                        && column_meta.semantic_type == SemanticType::Tag
                        && col.data_type.is_string()
                    {
                        let field = Arc::new(Field::new(
                            &col.name,
                            col.data_type.as_arrow_type(),
                            col.is_nullable(),
                        ));
                        let dict_field = tag_maybe_to_dictionary_field(&col.data_type, &field);
                        let mut column = col.clone();
                        column.data_type =
                            ConcreteDataType::from_arrow_type(dict_field.data_type());
                        return column;
                    }

                    col.clone()
                })
                .collect::<Vec<_>>(),
        ));

        let region_partition_physical_expr = region_partition_expr
            .try_as_physical_expr(partition_schema.arrow_schema())
            .context(SerializePartitionExprSnafu)?;

        Ok(Some(PartitionFilterContext {
            region_partition_physical_expr,
            partition_schema,
        }))
    }
608
    /// Loads the SST metadata (parquet footer + region metadata) for the file,
    /// preferring the metadata cache.
    ///
    /// Returns the metadata and a flag that is `true` when the cache missed and
    /// the metadata had to be fetched from object storage.
    async fn read_parquet_metadata(
        &self,
        file_path: &str,
        file_size: u64,
        cache_metrics: &mut MetadataCacheMetrics,
        page_index_policy: PageIndexPolicy,
    ) -> Result<(Arc<CachedSstMeta>, bool)> {
        let start = Instant::now();
        let _t = READ_STAGE_ELAPSED
            .with_label_values(&["read_parquet_metadata"])
            .start_timer();

        let file_id = self.file_handle.file_id();
        // Fast path: serve from the SST metadata cache.
        if let Some(metadata) = self
            .cache_strategy
            .get_sst_meta_data(file_id, cache_metrics, page_index_policy)
            .await
        {
            cache_metrics.metadata_load_cost += start.elapsed();
            return Ok((metadata, false));
        }

        // Slow path: read the footer (and page indexes, per policy) from
        // object storage, then populate the cache for later readers.
        let mut metadata_loader =
            MetadataLoader::new(self.object_store.clone(), file_path, file_size);
        metadata_loader.with_page_index_policy(page_index_policy);
        let metadata = metadata_loader.load(cache_metrics).await?;

        let metadata = Arc::new(CachedSstMeta::try_new(file_path, metadata)?);
        self.cache_strategy
            .put_sst_meta_data(file_id, metadata.clone());

        cache_metrics.metadata_load_cost += start.elapsed();
        Ok((metadata, true))
    }
648
    /// Computes the row groups (and rows within them) to read by running the
    /// pruning pipeline: min-max statistics first, then the fulltext, inverted,
    /// bloom filter and (optionally) vector indexes. Each stage intersects with
    /// the running selection and the pipeline short-circuits once it is empty.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.file_handle.region_id(),
            file_id = %self.file_handle.file_id()
        )
    )]
    async fn row_groups_to_read(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        metrics: &mut ReaderFilterMetrics,
    ) -> RowGroupSelection {
        let num_row_groups = parquet_meta.num_row_groups();
        let num_rows = parquet_meta.file_metadata().num_rows();
        if num_row_groups == 0 || num_rows == 0 {
            return RowGroupSelection::default();
        }

        // Uses the first row group's size as the uniform row-group size;
        // assumes all row groups except possibly the last share it — TODO
        // confirm the writer guarantees this.
        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
        if row_group_size == 0 {
            return RowGroupSelection::default();
        }

        metrics.rg_total += num_row_groups;
        metrics.rows_total += num_rows as usize;

        let skip_fields = self.pre_filter_mode.skip_fields();

        // Stage 1: min-max statistics pruning.
        let mut output = self.row_groups_by_minmax(
            read_format,
            parquet_meta,
            row_group_size,
            num_rows as usize,
            metrics,
            skip_fields,
        );
        if output.is_empty() {
            return output;
        }

        // Stage 2: fine-grained fulltext index; remember whether it pruned so
        // the coarse fulltext bloom stage can be skipped later.
        let fulltext_filtered = self
            .prune_row_groups_by_fulltext_index(
                row_group_size,
                num_row_groups,
                &mut output,
                metrics,
                skip_fields,
            )
            .await;
        if output.is_empty() {
            return output;
        }

        // Stage 3: inverted index.
        self.prune_row_groups_by_inverted_index(
            row_group_size,
            num_row_groups,
            &mut output,
            metrics,
            skip_fields,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        // Stage 4: bloom filter index.
        self.prune_row_groups_by_bloom_filter(
            row_group_size,
            parquet_meta,
            &mut output,
            metrics,
            skip_fields,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        // Stage 5: coarse fulltext (bloom) pruning, only when the fine-grained
        // fulltext stage did not already prune.
        if !fulltext_filtered {
            self.prune_row_groups_by_fulltext_bloom(
                row_group_size,
                parquet_meta,
                &mut output,
                metrics,
                skip_fields,
            )
            .await;
        }
        // Stage 6 (feature-gated): vector index top-k pruning.
        #[cfg(feature = "vector_index")]
        {
            self.prune_row_groups_by_vector_index(
                row_group_size,
                num_row_groups,
                &mut output,
                metrics,
            )
            .await;
            if output.is_empty() {
                return output;
            }
        }
        output
    }
756
    /// Prunes row groups with the fulltext index (fine-grained search).
    ///
    /// Reuses cached index results when they cover every row group still
    /// required; otherwise runs the appliers and caches their output.
    /// Returns `true` if any applier narrowed the selection.
    async fn prune_row_groups_by_fulltext_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let mut pruned = false;
        // When field columns are skipped, only consult the first applier slot.
        let appliers = if skip_fields {
            &self.fulltext_index_appliers[..1]
        } else {
            &self.fulltext_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Cache hit covering all required row groups: apply it directly.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                metrics.fulltext_index_cache_hit += 1;
                pruned = true;
                continue;
            }

            metrics.fulltext_index_cache_miss += 1;
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let apply_res = index_applier
                .apply_fine(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    metrics.fulltext_index_apply_metrics.as_mut(),
                )
                .await;
            let selection = match apply_res {
                Ok(Some(res)) => {
                    RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups)
                }
                // No result for this predicate: leave the selection unchanged.
                Ok(None) => continue,
                // Index failures are best-effort: log (or panic in tests) and
                // keep reading without this index.
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                    continue;
                }
            };

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_FULLTEXT,
            );
            pruned = true;
        }
        pruned
    }
826
    /// Prunes row groups with the inverted index.
    ///
    /// Reuses cached index results when they cover every row group still
    /// required; otherwise runs the appliers and caches their output.
    /// Returns `true` if any applier narrowed the selection.
    async fn prune_row_groups_by_inverted_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().inverted_index_available() {
            return false;
        }

        let mut pruned = false;
        // When field columns are skipped, only consult the first applier slot.
        let appliers = if skip_fields {
            &self.inverted_index_appliers[..1]
        } else {
            &self.inverted_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Cache hit covering all required row groups: apply it directly.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED);
                metrics.inverted_index_cache_hit += 1;
                pruned = true;
                continue;
            }

            metrics.inverted_index_cache_miss += 1;
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let apply_res = index_applier
                .apply(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    metrics.inverted_index_apply_metrics.as_mut(),
                )
                .await;
            let selection = match apply_res {
                Ok(apply_output) => RowGroupSelection::from_inverted_index_apply_output(
                    row_group_size,
                    num_row_groups,
                    apply_output,
                ),
                // Index failures are best-effort: log (or panic in tests) and
                // keep reading without this index.
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
                    continue;
                }
            };

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_INVERTED,
            );
            pruned = true;
        }
        pruned
    }
901
    /// Prunes row groups with the bloom filter index.
    ///
    /// Uses cached results where available; only row groups that are still
    /// required and not already covered by the cached result are searched, and
    /// the partial cache entry is merged with the fresh output before caching.
    /// Returns `true` if any applier narrowed the selection.
    async fn prune_row_groups_by_bloom_filter(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().bloom_filter_index_available() {
            return false;
        }

        let mut pruned = false;
        // When field columns are skipped, only consult the first applier slot.
        let appliers = if skip_fields {
            &self.bloom_filter_index_appliers[..1]
        } else {
            &self.bloom_filter_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Cache hit covering all required row groups: apply it directly.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM);
                metrics.bloom_filter_cache_hit += 1;
                pruned = true;
                continue;
            }

            metrics.bloom_filter_cache_miss += 1;
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            // Per row group: its row count, plus whether it still needs to be
            // searched (required by `output` and not covered by the cache).
            let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
                (
                    rg.num_rows() as usize,
                    output.contains_non_empty_row_group(i)
                        && cached
                            .as_ref()
                            .map(|c| !c.contains_row_group(i))
                            .unwrap_or(true),
                )
            });
            let apply_res = index_applier
                .apply(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    rgs,
                    metrics.bloom_filter_apply_metrics.as_mut(),
                )
                .await;
            let mut selection = match apply_res {
                Ok(apply_output) => {
                    RowGroupSelection::from_row_ranges(apply_output, row_group_size)
                }
                // Index failures are best-effort: log (or panic in tests) and
                // keep reading without this index.
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM);
                    continue;
                }
            };

            // Merge the partial cached result with the freshly computed one so
            // the cached entry stays complete.
            if let Some(cached) = cached.as_ref() {
                selection.concat(cached);
            }

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_BLOOM,
            );
            pruned = true;
        }
        pruned
    }
986
    /// Prunes row groups to the top-k rows returned by the vector index.
    ///
    /// No-op unless a vector index applier and `k` are configured and the file
    /// has a vector index available.
    #[cfg(feature = "vector_index")]
    async fn prune_row_groups_by_vector_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) {
        let Some(applier) = &self.vector_index_applier else {
            return;
        };
        let Some(k) = self.vector_index_k else {
            return;
        };
        if !self.file_handle.meta_ref().vector_index_available() {
            return;
        }

        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_res = applier
            .apply_with_k(self.file_handle.index_id(), Some(file_size_hint), k)
            .await;
        let row_ids = match apply_res {
            Ok(res) => res.row_offsets,
            // Index failures are best-effort: log (or panic in tests) and keep
            // reading without this index.
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_VECTOR);
                return;
            }
        };

        let selection = match vector_selection_from_offsets(row_ids, row_group_size, num_row_groups)
        {
            Ok(selection) => selection,
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_VECTOR);
                return;
            }
        };
        metrics.rows_vector_selected += selection.row_count();
        apply_selection_and_update_metrics(output, &selection, metrics, INDEX_TYPE_VECTOR);
    }
1029
    /// Prunes row groups with the coarse (bloom-based) fulltext index; only run
    /// when the fine-grained fulltext search did not already prune.
    ///
    /// Mirrors the bloom filter stage: cached results are reused, only the
    /// still-required and uncached row groups are searched, and partial cache
    /// entries are merged before caching. Returns `true` if any applier
    /// narrowed the selection.
    async fn prune_row_groups_by_fulltext_bloom(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let mut pruned = false;
        // When field columns are skipped, only consult the first applier slot.
        let appliers = if skip_fields {
            &self.fulltext_index_appliers[..1]
        } else {
            &self.fulltext_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Cache hit covering all required row groups: apply it directly.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                metrics.fulltext_index_cache_hit += 1;
                pruned = true;
                continue;
            }

            metrics.fulltext_index_cache_miss += 1;
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            // Per row group: its row count, plus whether it still needs to be
            // searched (required by `output` and not covered by the cache).
            let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
                (
                    rg.num_rows() as usize,
                    output.contains_non_empty_row_group(i)
                        && cached
                            .as_ref()
                            .map(|c| !c.contains_row_group(i))
                            .unwrap_or(true),
                )
            });
            let apply_res = index_applier
                .apply_coarse(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    rgs,
                    metrics.fulltext_index_apply_metrics.as_mut(),
                )
                .await;
            let mut selection = match apply_res {
                Ok(Some(apply_output)) => {
                    RowGroupSelection::from_row_ranges(apply_output, row_group_size)
                }
                // No result for this predicate: leave the selection unchanged.
                Ok(None) => continue,
                // Index failures are best-effort: log (or panic in tests) and
                // keep reading without this index.
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                    continue;
                }
            };

            // Merge the partial cached result with the freshly computed one so
            // the cached entry stays complete.
            if let Some(cached) = cached.as_ref() {
                selection.concat(cached);
            }

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_FULLTEXT,
            );
            pruned = true;
        }
        pruned
    }
1115
    /// Prunes row groups with min-max statistics from the parquet metadata,
    /// consulting and maintaining a cached result keyed by the predicate exprs.
    fn row_groups_by_minmax(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        row_group_size: usize,
        total_row_count: usize,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> RowGroupSelection {
        // No predicate: keep every row group.
        let Some(predicate) = &self.predicate else {
            return RowGroupSelection::new(row_group_size, total_row_count);
        };

        let file_id = self.file_handle.file_id().file_id();
        let index_result_cache = self.cache_strategy.index_result_cache();
        // Min-max results are only cacheable for static predicates: dynamic
        // filters may evaluate differently between queries.
        let cached_minmax_key =
            if index_result_cache.is_some() && predicate.dyn_filters().is_empty() {
                // Sort the rendered exprs so the cache key does not depend on
                // the order the exprs happen to arrive in.
                let mut exprs = predicate
                    .exprs()
                    .iter()
                    .map(|expr| format!("{expr:?}"))
                    .collect::<Vec<_>>();
                exprs.sort();
                let schema_version = self
                    .expected_metadata
                    .as_ref()
                    .map(|meta| meta.schema_version)
                    .unwrap_or_else(|| read_format.metadata().schema_version);
                Some(PredicateKey::new_minmax(
                    Arc::new(exprs),
                    schema_version,
                    skip_fields,
                ))
            } else {
                None
            };

        // Cache hit: return the cached selection directly.
        if let Some(index_result_cache) = index_result_cache
            && let Some(predicate_key) = cached_minmax_key.as_ref()
        {
            if let Some(result) = index_result_cache.get(predicate_key, file_id) {
                metrics.minmax_cache_hit += 1;
                let num_row_groups = parquet_meta.num_row_groups();
                metrics.rg_minmax_filtered +=
                    num_row_groups.saturating_sub(result.row_group_count());
                return (*result).clone();
            }

            metrics.minmax_cache_miss += 1;
        }

        let region_meta = read_format.metadata();
        let row_groups = parquet_meta.row_groups();
        let stats = RowGroupPruningStats::new(
            row_groups,
            read_format,
            self.expected_metadata.clone(),
            skip_fields,
        );
        // Prune against the expected schema when present, since the predicate
        // refers to the region's current columns.
        let prune_schema = self
            .expected_metadata
            .as_ref()
            .map(|meta| meta.schema.arrow_schema())
            .unwrap_or_else(|| region_meta.schema.arrow_schema());

        // `mask[i]` tells whether row group `i` may contain matching rows.
        let mask = predicate.prune_with_stats(&stats, prune_schema);
        let output = RowGroupSelection::from_full_row_group_ids(
            mask.iter()
                .enumerate()
                .filter_map(|(row_group, keep)| keep.then_some(row_group)),
            row_group_size,
            total_row_count,
        );

        metrics.rg_minmax_filtered += parquet_meta
            .num_row_groups()
            .saturating_sub(output.row_group_count());

        // Populate the cache for later readers of the same file/predicate.
        if let Some(index_result_cache) = index_result_cache
            && let Some(predicate_key) = cached_minmax_key
        {
            index_result_cache.put(predicate_key, file_id, Arc::new(output.clone()));
        }

        output
    }
1208
1209 fn apply_index_result_and_update_cache(
1210 &self,
1211 predicate_key: &PredicateKey,
1212 file_id: FileId,
1213 result: RowGroupSelection,
1214 output: &mut RowGroupSelection,
1215 metrics: &mut ReaderFilterMetrics,
1216 index_type: &str,
1217 ) {
1218 apply_selection_and_update_metrics(output, &result, metrics, index_type);
1219
1220 if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() {
1221 index_result_cache.put(predicate_key.clone(), file_id, Arc::new(result));
1222 }
1223 }
1224}
1225fn apply_selection_and_update_metrics(
1226 output: &mut RowGroupSelection,
1227 result: &RowGroupSelection,
1228 metrics: &mut ReaderFilterMetrics,
1229 index_type: &str,
1230) {
1231 let intersection = output.intersect(result);
1232
1233 let row_group_count = output.row_group_count() - intersection.row_group_count();
1234 let row_count = output.row_count() - intersection.row_count();
1235
1236 metrics.update_index_metrics(index_type, row_group_count, row_count);
1237
1238 *output = intersection;
1239}
1240
/// Converts absolute row offsets returned by the vector index into a
/// [RowGroupSelection].
///
/// Returns an error as soon as an offset does not fit into `u32` (the row id
/// type used by [RowGroupSelection]).
#[cfg(feature = "vector_index")]
fn vector_selection_from_offsets(
    row_offsets: Vec<u64>,
    row_group_size: usize,
    num_row_groups: usize,
) -> Result<RowGroupSelection> {
    // Fallible collect: short-circuits on the first offset that overflows u32,
    // matching the previous manual loop's behavior.
    let row_ids = row_offsets
        .into_iter()
        .map(|offset| {
            u32::try_from(offset).map_err(|_| {
                ApplyVectorIndexSnafu {
                    reason: format!("Row offset {} exceeds u32::MAX", offset),
                }
                .build()
            })
        })
        .collect::<Result<BTreeSet<_>>>()?;
    Ok(RowGroupSelection::from_row_ids(
        row_ids,
        row_group_size,
        num_row_groups,
    ))
}
1263
1264fn all_required_row_groups_searched(
1265 required_row_groups: &RowGroupSelection,
1266 cached_row_groups: &RowGroupSelection,
1267) -> bool {
1268 required_row_groups.iter().all(|(rg_id, _)| {
1269 !required_row_groups.contains_non_empty_row_group(*rg_id)
1271 || cached_row_groups.contains_row_group(*rg_id)
1273 })
1274}
1275
/// Metrics about rows and row groups pruned while filtering an SST read.
#[derive(Debug, Default, Clone)]
pub(crate) struct ReaderFilterMetrics {
    /// Number of row groups before filtering.
    pub(crate) rg_total: usize,
    /// Number of row groups filtered by the fulltext index.
    pub(crate) rg_fulltext_filtered: usize,
    /// Number of row groups filtered by the inverted index.
    pub(crate) rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max statistics.
    pub(crate) rg_minmax_filtered: usize,
    /// Number of row groups filtered by the bloom filter index.
    pub(crate) rg_bloom_filtered: usize,
    /// Number of row groups filtered by the vector index.
    pub(crate) rg_vector_filtered: usize,

    /// Number of rows before filtering.
    pub(crate) rows_total: usize,
    /// Number of rows filtered by the fulltext index.
    pub(crate) rows_fulltext_filtered: usize,
    /// Number of rows filtered by the inverted index.
    pub(crate) rows_inverted_filtered: usize,
    /// Number of rows filtered by the bloom filter index.
    pub(crate) rows_bloom_filtered: usize,
    /// Number of rows filtered by the vector index.
    pub(crate) rows_vector_filtered: usize,
    /// Number of rows selected by the vector index.
    pub(crate) rows_vector_selected: usize,
    /// Number of rows filtered by precise (row-level) filtering.
    pub(crate) rows_precise_filtered: usize,

    /// Cache hits while applying the fulltext index.
    pub(crate) fulltext_index_cache_hit: usize,
    /// Cache misses while applying the fulltext index.
    pub(crate) fulltext_index_cache_miss: usize,
    /// Cache hits while applying the inverted index.
    pub(crate) inverted_index_cache_hit: usize,
    /// Cache misses while applying the inverted index.
    pub(crate) inverted_index_cache_miss: usize,
    /// Cache hits while applying the bloom filter index.
    pub(crate) bloom_filter_cache_hit: usize,
    /// Cache misses while applying the bloom filter index.
    pub(crate) bloom_filter_cache_miss: usize,
    /// Cache hits for cached min-max pruning results.
    pub(crate) minmax_cache_hit: usize,
    /// Cache misses for cached min-max pruning results.
    pub(crate) minmax_cache_miss: usize,

    /// Detailed metrics of applying the inverted index, if it was applied.
    pub(crate) inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
    /// Detailed metrics of applying the bloom filter index, if it was applied.
    pub(crate) bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
    /// Detailed metrics of applying the fulltext index, if it was applied.
    pub(crate) fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,

    // NOTE(review): presumably these track a pruner result cache and the time
    // spent pruning — confirm against the pruner implementation.
    /// Pruner cache hits.
    pub(crate) pruner_cache_hit: usize,
    /// Pruner cache misses.
    pub(crate) pruner_cache_miss: usize,
    /// Time spent pruning.
    pub(crate) pruner_prune_cost: Duration,
}
1338
impl ReaderFilterMetrics {
    /// Adds metrics from `other` to this one, field by field.
    pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
        self.rg_total += other.rg_total;
        self.rg_fulltext_filtered += other.rg_fulltext_filtered;
        self.rg_inverted_filtered += other.rg_inverted_filtered;
        self.rg_minmax_filtered += other.rg_minmax_filtered;
        self.rg_bloom_filtered += other.rg_bloom_filtered;
        self.rg_vector_filtered += other.rg_vector_filtered;

        self.rows_total += other.rows_total;
        self.rows_fulltext_filtered += other.rows_fulltext_filtered;
        self.rows_inverted_filtered += other.rows_inverted_filtered;
        self.rows_bloom_filtered += other.rows_bloom_filtered;
        self.rows_vector_filtered += other.rows_vector_filtered;
        self.rows_vector_selected += other.rows_vector_selected;
        self.rows_precise_filtered += other.rows_precise_filtered;

        self.fulltext_index_cache_hit += other.fulltext_index_cache_hit;
        self.fulltext_index_cache_miss += other.fulltext_index_cache_miss;
        self.inverted_index_cache_hit += other.inverted_index_cache_hit;
        self.inverted_index_cache_miss += other.inverted_index_cache_miss;
        self.bloom_filter_cache_hit += other.bloom_filter_cache_hit;
        self.bloom_filter_cache_miss += other.bloom_filter_cache_miss;
        self.minmax_cache_hit += other.minmax_cache_hit;
        self.minmax_cache_miss += other.minmax_cache_miss;

        self.pruner_cache_hit += other.pruner_cache_hit;
        self.pruner_cache_miss += other.pruner_cache_miss;
        self.pruner_prune_cost += other.pruner_prune_cost;

        // Optional apply metrics are created lazily so merging a populated
        // side into an empty one preserves the data.
        if let Some(other_metrics) = &other.inverted_index_apply_metrics {
            self.inverted_index_apply_metrics
                .get_or_insert_with(Default::default)
                .merge_from(other_metrics);
        }
        if let Some(other_metrics) = &other.bloom_filter_apply_metrics {
            self.bloom_filter_apply_metrics
                .get_or_insert_with(Default::default)
                .merge_from(other_metrics);
        }
        if let Some(other_metrics) = &other.fulltext_index_apply_metrics {
            self.fulltext_index_apply_metrics
                .get_or_insert_with(Default::default)
                .merge_from(other_metrics);
        }
    }

    /// Reports the accumulated counters to the global metric vectors.
    pub(crate) fn observe(&self) {
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["vector_index_filtered"])
            .inc_by(self.rg_vector_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_total as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["vector_index_filtered"])
            .inc_by(self.rows_vector_filtered as u64);
    }

    /// Accumulates pruned row group and row counts for the given index type.
    fn update_index_metrics(&mut self, index_type: &str, row_group_count: usize, row_count: usize) {
        match index_type {
            INDEX_TYPE_FULLTEXT => {
                self.rg_fulltext_filtered += row_group_count;
                self.rows_fulltext_filtered += row_count;
            }
            INDEX_TYPE_INVERTED => {
                self.rg_inverted_filtered += row_group_count;
                self.rows_inverted_filtered += row_count;
            }
            INDEX_TYPE_BLOOM => {
                self.rg_bloom_filtered += row_group_count;
                self.rows_bloom_filtered += row_count;
            }
            INDEX_TYPE_VECTOR => {
                self.rg_vector_filtered += row_group_count;
                self.rows_vector_filtered += row_count;
            }
            // Unknown index types are ignored silently.
            _ => {}
        }
    }
}
1451
#[cfg(all(test, feature = "vector_index"))]
mod vector_index_tests {
    use super::*;

    /// Offsets spread over three row groups (size 4) should produce a
    /// selection with one entry per touched row group and one row per offset.
    #[test]
    fn test_vector_selection_from_offsets() {
        let row_group_size = 4;
        let num_row_groups = 3;
        let selection =
            vector_selection_from_offsets(vec![0, 1, 5, 9], row_group_size, num_row_groups)
                .unwrap();

        assert_eq!(selection.row_group_count(), 3);
        assert_eq!(selection.row_count(), 4);
        assert!(selection.contains_non_empty_row_group(0));
        assert!(selection.contains_non_empty_row_group(1));
        assert!(selection.contains_non_empty_row_group(2));
    }

    /// An offset larger than `u32::MAX` must be rejected with an error.
    #[test]
    fn test_vector_selection_from_offsets_out_of_range() {
        let row_group_size = 4;
        let num_row_groups = 2;
        let selection = vector_selection_from_offsets(
            vec![0, 7, u64::from(u32::MAX) + 1],
            row_group_size,
            num_row_groups,
        );
        assert!(selection.is_err());
    }

    /// Applying a one-row selection to an 8-row full selection should prune
    /// one row group entirely and 7 of the 8 rows, and record both counts.
    #[test]
    fn test_vector_selection_updates_metrics() {
        let row_group_size = 4;
        let total_rows = 8;
        let mut output = RowGroupSelection::new(row_group_size, total_rows);
        let selection = vector_selection_from_offsets(vec![1], row_group_size, 2).unwrap();
        let mut metrics = ReaderFilterMetrics::default();

        apply_selection_and_update_metrics(
            &mut output,
            &selection,
            &mut metrics,
            INDEX_TYPE_VECTOR,
        );

        assert_eq!(metrics.rg_vector_filtered, 1);
        assert_eq!(metrics.rows_vector_filtered, 7);
        assert_eq!(output.row_count(), 1);
    }
}
1503
/// Metrics about loading parquet metadata and the caches involved.
#[derive(Default, Clone, Copy)]
pub(crate) struct MetadataCacheMetrics {
    /// Metadata loads served from the in-memory cache.
    pub(crate) mem_cache_hit: usize,
    /// Metadata loads served from the file cache.
    pub(crate) file_cache_hit: usize,
    /// Metadata loads that missed all caches.
    pub(crate) cache_miss: usize,
    /// Total time spent loading metadata.
    pub(crate) metadata_load_cost: Duration,
    /// Number of read operations performed to load metadata.
    pub(crate) num_reads: usize,
    /// Number of bytes read to load metadata.
    pub(crate) bytes_read: u64,
}
1520
1521impl std::fmt::Debug for MetadataCacheMetrics {
1522 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1523 let Self {
1524 mem_cache_hit,
1525 file_cache_hit,
1526 cache_miss,
1527 metadata_load_cost,
1528 num_reads,
1529 bytes_read,
1530 } = self;
1531
1532 if self.is_empty() {
1533 return write!(f, "{{}}");
1534 }
1535 write!(f, "{{")?;
1536
1537 write!(f, "\"metadata_load_cost\":\"{:?}\"", metadata_load_cost)?;
1538
1539 if *mem_cache_hit > 0 {
1540 write!(f, ", \"mem_cache_hit\":{}", mem_cache_hit)?;
1541 }
1542 if *file_cache_hit > 0 {
1543 write!(f, ", \"file_cache_hit\":{}", file_cache_hit)?;
1544 }
1545 if *cache_miss > 0 {
1546 write!(f, ", \"cache_miss\":{}", cache_miss)?;
1547 }
1548 if *num_reads > 0 {
1549 write!(f, ", \"num_reads\":{}", num_reads)?;
1550 }
1551 if *bytes_read > 0 {
1552 write!(f, ", \"bytes_read\":{}", bytes_read)?;
1553 }
1554
1555 write!(f, "}}")
1556 }
1557}
1558
1559impl MetadataCacheMetrics {
1560 pub(crate) fn is_empty(&self) -> bool {
1562 self.metadata_load_cost.is_zero()
1563 }
1564
1565 pub(crate) fn merge_from(&mut self, other: &MetadataCacheMetrics) {
1567 self.mem_cache_hit += other.mem_cache_hit;
1568 self.file_cache_hit += other.file_cache_hit;
1569 self.cache_miss += other.cache_miss;
1570 self.metadata_load_cost += other.metadata_load_cost;
1571 self.num_reads += other.num_reads;
1572 self.bytes_read += other.bytes_read;
1573 }
1574}
1575
/// Metrics of a parquet reader.
#[derive(Debug, Default, Clone)]
pub struct ReaderMetrics {
    /// Filter-related metrics for readers.
    pub(crate) filter_metrics: ReaderFilterMetrics,
    /// Duration to build the parquet reader.
    pub(crate) build_cost: Duration,
    /// Duration to scan the reader.
    pub(crate) scan_cost: Duration,
    /// Number of arrow record batches read from parquet.
    pub(crate) num_record_batches: usize,
    /// Number of batches decoded from the record batches.
    pub(crate) num_batches: usize,
    /// Number of rows read.
    pub(crate) num_rows: usize,
    /// Metrics of loading parquet metadata.
    pub(crate) metadata_cache_metrics: MetadataCacheMetrics,
    /// Shared fetch metrics, present when the reader tracks parquet fetches.
    pub(crate) fetch_metrics: Option<Arc<ParquetFetchMetrics>>,
    // NOTE(review): signed because these look like deltas that can be
    // decremented elsewhere — confirm against the sites that update them.
    /// Memory size of parquet metadata.
    pub(crate) metadata_mem_size: isize,
    /// Number of range builders.
    pub(crate) num_range_builders: isize,
}
1600
1601impl ReaderMetrics {
1602 pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
1604 self.filter_metrics.merge_from(&other.filter_metrics);
1605 self.build_cost += other.build_cost;
1606 self.scan_cost += other.scan_cost;
1607 self.num_record_batches += other.num_record_batches;
1608 self.num_batches += other.num_batches;
1609 self.num_rows += other.num_rows;
1610 self.metadata_cache_metrics
1611 .merge_from(&other.metadata_cache_metrics);
1612 if let Some(other_fetch) = &other.fetch_metrics {
1613 if let Some(self_fetch) = &self.fetch_metrics {
1614 self_fetch.merge_from(other_fetch);
1615 } else {
1616 self.fetch_metrics = Some(other_fetch.clone());
1617 }
1618 }
1619 self.metadata_mem_size += other.metadata_mem_size;
1620 self.num_range_builders += other.num_range_builders;
1621 }
1622
1623 pub(crate) fn observe_rows(&self, read_type: &str) {
1625 READ_ROWS_TOTAL
1626 .with_label_values(&[read_type])
1627 .inc_by(self.num_rows as u64);
1628 }
1629}
1630
/// Builder to create readers over individual row groups of an SST file.
pub(crate) struct RowGroupReaderBuilder {
    /// Handle of the SST file to read.
    file_handle: FileHandle,
    /// Full path of the SST file.
    file_path: String,
    /// Parquet metadata of the file.
    parquet_meta: Arc<ParquetMetaData>,
    /// Arrow reader metadata derived from the parquet metadata.
    arrow_metadata: ArrowReaderMetadata,
    /// Object store to read the file from.
    object_store: ObjectStore,
    /// Column projection applied to every row group read.
    projection: ProjectionMask,
    /// Cache strategy for data and metadata.
    cache_strategy: CacheStrategy,
    /// Optional builder of a primary-key prefilter run before the main read.
    prefilter_builder: Option<PrefilterContextBuilder>,
}
1652
/// Per-row-group inputs used when building a row group reader.
pub(crate) struct RowGroupBuildContext<'a> {
    /// Simple filters extracted from the predicate.
    #[allow(dead_code)]
    pub(crate) filters: &'a [SimpleFilterContext],
    /// Whether field columns are skipped during the first read pass.
    #[allow(dead_code)]
    pub(crate) skip_fields: bool,
    /// Index of the row group to read.
    pub(crate) row_group_idx: usize,
    /// Optional row selection within the row group.
    pub(crate) row_selection: Option<RowSelection>,
    /// Optional metrics sink for fetch/prefilter costs.
    pub(crate) fetch_metrics: Option<&'a ParquetFetchMetrics>,
}
1669
1670impl RowGroupReaderBuilder {
1671 pub(crate) fn file_path(&self) -> &str {
1673 &self.file_path
1674 }
1675
1676 pub(crate) fn file_handle(&self) -> &FileHandle {
1678 &self.file_handle
1679 }
1680
1681 pub(crate) fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
1682 &self.parquet_meta
1683 }
1684
1685 pub(crate) fn cache_strategy(&self) -> &CacheStrategy {
1686 &self.cache_strategy
1687 }
1688
1689 pub(crate) fn has_flat_primary_key_prefilter(&self) -> bool {
1690 self.prefilter_builder.is_some()
1691 }
1692
1693 pub(crate) async fn build(
1699 &self,
1700 build_ctx: RowGroupBuildContext<'_>,
1701 ) -> Result<ParquetRecordBatchStream<SstAsyncFileReader>> {
1702 let prefilter_ctx = self.prefilter_builder.as_ref().map(|b| b.build());
1703
1704 let Some(mut prefilter_ctx) = prefilter_ctx else {
1705 return self
1707 .build_with_projection(
1708 build_ctx.row_group_idx,
1709 build_ctx.row_selection,
1710 self.projection.clone(),
1711 build_ctx.fetch_metrics,
1712 )
1713 .await;
1714 };
1715
1716 let prefilter_start = Instant::now();
1717 let prefilter_result = execute_prefilter(&mut prefilter_ctx, self, &build_ctx).await?;
1718 if let Some(metrics) = build_ctx.fetch_metrics {
1719 let mut data = metrics.data.lock().unwrap();
1720 data.prefilter_cost += prefilter_start.elapsed();
1721 data.prefilter_filtered_rows += prefilter_result.filtered_rows;
1722 }
1723
1724 let refined_selection = Some(prefilter_result.refined_selection);
1725
1726 self.build_with_projection(
1727 build_ctx.row_group_idx,
1728 refined_selection,
1729 self.projection.clone(),
1730 build_ctx.fetch_metrics,
1731 )
1732 .await
1733 }
1734
1735 pub(crate) async fn build_with_projection(
1737 &self,
1738 row_group_idx: usize,
1739 row_selection: Option<RowSelection>,
1740 projection: ProjectionMask,
1741 fetch_metrics: Option<&ParquetFetchMetrics>,
1742 ) -> Result<ParquetRecordBatchStream<SstAsyncFileReader>> {
1743 let async_reader = SstAsyncFileReader::new(
1745 self.file_handle.file_id(),
1746 self.file_path.clone(),
1747 self.object_store.clone(),
1748 self.cache_strategy.clone(),
1749 self.parquet_meta.clone(),
1750 row_group_idx,
1751 )
1752 .with_fetch_metrics(fetch_metrics.cloned());
1753
1754 let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
1756 async_reader,
1757 self.arrow_metadata.clone(),
1758 );
1759 builder = builder
1760 .with_row_groups(vec![row_group_idx])
1761 .with_projection(projection)
1762 .with_batch_size(DEFAULT_READ_BATCH_SIZE);
1763
1764 if let Some(selection) = row_selection {
1765 builder = builder.with_row_selection(selection);
1766 }
1767
1768 let stream = builder.build().context(ReadParquetSnafu {
1769 path: &self.file_path,
1770 })?;
1771
1772 Ok(stream)
1773 }
1774}
1775
/// A filter that may still need evaluation, or has already been resolved
/// against the metadata.
pub(crate) enum MaybeFilter {
    /// The filter still needs to be evaluated against the data.
    Filter(SimpleFilterEvaluator),
    /// The column is absent from the SST and its default value satisfies the
    /// filter, so all rows match.
    Matched,
    /// The column is absent from the SST and its default value fails the
    /// filter, so all rows can be pruned.
    Pruned,
}
1785
/// A simple filter together with the metadata of the column it applies to.
pub(crate) struct SimpleFilterContext {
    /// The (possibly pre-resolved) filter.
    filter: MaybeFilter,
    /// Id of the column the filter targets.
    column_id: ColumnId,
    /// Semantic type (tag/field/timestamp) of the column.
    semantic_type: SemanticType,
    /// Data type of the column.
    data_type: ConcreteDataType,
    /// Whether the filter can be used to prefilter by primary key.
    usable_primary_key_filter: bool,
}
1799
impl SimpleFilterContext {
    /// Creates a filter context from `expr` if it is a supported simple filter.
    ///
    /// Returns `None` when:
    /// - `expr` cannot be converted into a [SimpleFilterEvaluator];
    /// - the referenced column is missing from `expected_meta` (or from
    ///   `sst_meta` when no expected metadata is given);
    /// - the column is absent from the SST and its default value cannot be
    ///   created or evaluated (the `?` on `pruned_by_default` drops the whole
    ///   filter in that case).
    pub(crate) fn new_opt(
        sst_meta: &RegionMetadataRef,
        expected_meta: Option<&RegionMetadata>,
        expr: &Expr,
    ) -> Option<Self> {
        let filter = SimpleFilterEvaluator::try_new(expr)?;
        // Computed before `filter` is moved into `MaybeFilter::Filter` below.
        let usable_primary_key_filter =
            is_usable_primary_key_filter(sst_meta, expected_meta, &filter);
        let (column_metadata, maybe_filter) = match expected_meta {
            Some(meta) => {
                // Look the column up in the expected (latest) metadata first,
                // then check whether the SST actually stores it.
                let column = meta.column_by_name(filter.column_name())?;
                match sst_meta.column_by_id(column.column_id) {
                    Some(sst_column) => {
                        debug_assert_eq!(column.semantic_type, sst_column.semantic_type);

                        (column, MaybeFilter::Filter(filter))
                    }
                    None => {
                        // Column not in the SST: resolve the filter against the
                        // column's default value once, up front.
                        if pruned_by_default(&filter, column)? {
                            (column, MaybeFilter::Pruned)
                        } else {
                            (column, MaybeFilter::Matched)
                        }
                    }
                }
            }
            None => {
                let column = sst_meta.column_by_name(filter.column_name())?;
                (column, MaybeFilter::Filter(filter))
            }
        };

        // A pre-resolved filter (Matched/Pruned) can never be used as a
        // primary key prefilter.
        let usable_primary_key_filter =
            matches!(maybe_filter, MaybeFilter::Filter(_)) && usable_primary_key_filter;

        Some(Self {
            filter: maybe_filter,
            column_id: column_metadata.column_id,
            semantic_type: column_metadata.semantic_type,
            data_type: column_metadata.column_schema.data_type.clone(),
            usable_primary_key_filter,
        })
    }

    /// Returns the (possibly pre-resolved) filter.
    pub(crate) fn filter(&self) -> &MaybeFilter {
        &self.filter
    }

    /// Returns the id of the column to filter.
    pub(crate) fn column_id(&self) -> ColumnId {
        self.column_id
    }

    /// Returns the semantic type of the column.
    pub(crate) fn semantic_type(&self) -> SemanticType {
        self.semantic_type
    }

    /// Returns the data type of the column.
    pub(crate) fn data_type(&self) -> &ConcreteDataType {
        &self.data_type
    }

    /// Whether this filter can be used to prefilter by primary key.
    pub(crate) fn usable_primary_key_filter(&self) -> bool {
        self.usable_primary_key_filter
    }

    /// Returns a clone of the evaluator for primary key prefiltering, or
    /// `None` if the filter is unusable for that purpose or pre-resolved.
    pub(crate) fn primary_key_prefilter(&self) -> Option<SimpleFilterEvaluator> {
        if !self.usable_primary_key_filter {
            return None;
        }

        match &self.filter {
            MaybeFilter::Filter(filter) => Some(filter.clone()),
            MaybeFilter::Matched | MaybeFilter::Pruned => None,
        }
    }
}
1894
1895fn pruned_by_default(filter: &SimpleFilterEvaluator, column: &ColumnMetadata) -> Option<bool> {
1898 let value = column.column_schema.create_default().ok().flatten()?;
1899 let scalar_value = value
1900 .try_to_scalar_value(&column.column_schema.data_type)
1901 .ok()?;
1902 let matches = filter.evaluate_scalar(&scalar_value).ok()?;
1903 Some(!matches)
1904}
1905
/// Reader that returns record batches from the selected row groups of an SST.
pub struct ParquetReader {
    /// File range context shared with the row group readers.
    context: FileRangeContextRef,
    /// Remaining row groups (and row selections) to read.
    selection: RowGroupSelection,
    /// Reader of the current row group; `None` when none is open.
    reader: Option<FlatPruneReader>,
    /// Metrics of fetching data from the parquet file.
    fetch_metrics: ParquetFetchMetrics,
}
1917
impl ParquetReader {
    /// Returns the next [RecordBatch], advancing to the next selected row
    /// group whenever the current one is exhausted. Returns `Ok(None)` when
    /// all selected row groups have been read.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.context.reader_builder().file_handle.region_id(),
            file_id = %self.context.reader_builder().file_handle.file_id()
        )
    )]
    pub async fn next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
        loop {
            // Drain the current row group reader first.
            if let Some(reader) = &mut self.reader {
                if let Some(batch) = reader.next_batch().await? {
                    return Ok(Some(batch));
                }
                self.reader = None;
                continue;
            }

            // Open the next selected row group, if any remain.
            let Some((row_group_idx, row_selection)) = self.selection.pop_first() else {
                return Ok(None);
            };

            let skip_fields = self.context.pre_filter_mode().skip_fields();
            let parquet_reader = self
                .context
                .reader_builder()
                .build(self.context.build_context(
                    row_group_idx,
                    Some(row_selection),
                    Some(&self.fetch_metrics),
                    skip_fields,
                ))
                .await?;
            self.reader = Some(FlatPruneReader::new_with_row_group_reader(
                self.context.clone(),
                FlatRowGroupReader::new(self.context.clone(), parquet_reader),
                skip_fields,
            ));
        }
    }
    /// Creates a reader over the row groups in `selection`, eagerly opening
    /// the first selected row group. Only supports the flat read format.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %context.reader_builder().file_handle.region_id(),
            file_id = %context.reader_builder().file_handle.file_id()
        )
    )]
    pub(crate) async fn new(
        context: FileRangeContextRef,
        mut selection: RowGroupSelection,
    ) -> Result<Self> {
        debug_assert!(context.read_format().as_flat().is_some());
        let fetch_metrics = ParquetFetchMetrics::default();
        // Open the first selected row group up front (if any), mirroring the
        // per-row-group setup in `next_record_batch`.
        let reader = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
            let skip_fields = context.pre_filter_mode().skip_fields();
            let parquet_reader = context
                .reader_builder()
                .build(context.build_context(
                    row_group_idx,
                    Some(row_selection),
                    Some(&fetch_metrics),
                    skip_fields,
                ))
                .await?;
            Some(FlatPruneReader::new_with_row_group_reader(
                context.clone(),
                FlatRowGroupReader::new(context.clone(), parquet_reader),
                skip_fields,
            ))
        } else {
            None
        };

        Ok(ParquetReader {
            context,
            selection,
            reader,
            fetch_metrics,
        })
    }

    /// Returns the region metadata of the SST.
    pub fn metadata(&self) -> &RegionMetadataRef {
        self.context.read_format().metadata()
    }

    /// Returns the parquet metadata of the SST.
    pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
        self.context.reader_builder().parquet_meta.clone()
    }
}
2009
/// Context required by a row group reader to convert and report batches.
pub(crate) trait RowGroupReaderContext: Send {
    /// Format used to interpret batches read from the SST.
    fn read_format(&self) -> &ReadFormat;

    /// Path of the file being read; used in error reports.
    fn file_path(&self) -> &str;
}
2017
// Delegates to the underlying file range context.
impl RowGroupReaderContext for FileRangeContextRef {
    fn read_format(&self) -> &ReadFormat {
        self.as_ref().read_format()
    }

    fn file_path(&self) -> &str {
        self.as_ref().file_path()
    }
}
2027
/// Reader over a single row group of an SST in the primary key read format.
pub(crate) type RowGroupReader = RowGroupReaderBase<FileRangeContextRef>;
2030
#[allow(dead_code)]
impl RowGroupReader {
    /// Creates a reader from the file range `context` and parquet `stream`.
    pub(crate) fn new(
        context: FileRangeContextRef,
        stream: ParquetRecordBatchStream<SstAsyncFileReader>,
    ) -> Self {
        Self::create(context, stream)
    }
}
2041
/// Base reader that decodes [Batch]es from the record batches of one row group.
pub(crate) struct RowGroupReaderBase<T> {
    /// Context providing the read format and file path.
    context: T,
    /// Stream of raw record batches from the row group.
    stream: ParquetRecordBatchStream<SstAsyncFileReader>,
    /// Batches decoded from the last record batch, pending return.
    batches: VecDeque<Batch>,
    /// Local scan metrics.
    metrics: ReaderMetrics,
    /// Optional array used to override sequence values in decoded batches.
    override_sequence: Option<ArrayRef>,
}
2055
#[allow(dead_code)]
impl<T> RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    /// Creates a reader over `stream`; panics if the context's read format is
    /// not the primary key format.
    pub(crate) fn create(context: T, stream: ParquetRecordBatchStream<SstAsyncFileReader>) -> Self {
        let override_sequence = context
            .read_format()
            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);
        assert!(context.read_format().as_primary_key().is_some());

        Self {
            context,
            stream,
            batches: VecDeque::new(),
            metrics: ReaderMetrics::default(),
            override_sequence,
        }
    }

    /// Returns the local metrics of this reader.
    pub(crate) fn metrics(&self) -> &ReaderMetrics {
        &self.metrics
    }

    /// Returns the read format of the SST.
    pub(crate) fn read_format(&self) -> &ReadFormat {
        self.context.read_format()
    }

    /// Pulls the next raw [RecordBatch] from the underlying stream.
    async fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
        match self.stream.next().await.transpose() {
            Ok(batch) => Ok(batch),
            Err(e) => Err(e).context(ReadParquetSnafu {
                path: self.context.file_path(),
            }),
        }
    }

    /// Returns the next decoded [Batch], fetching and converting further
    /// record batches as needed. Scan cost and row counts are accumulated in
    /// the local metrics on every exit path.
    pub(crate) async fn next_inner(&mut self) -> Result<Option<Batch>> {
        let scan_start = Instant::now();
        // Serve from already-decoded batches first.
        if let Some(batch) = self.batches.pop_front() {
            self.metrics.num_rows += batch.num_rows();
            self.metrics.scan_cost += scan_start.elapsed();
            return Ok(Some(batch));
        }

        // A record batch may decode into zero batches, so loop until we have
        // at least one or the stream ends.
        while self.batches.is_empty() {
            let Some(record_batch) = self.fetch_next_record_batch().await? else {
                self.metrics.scan_cost += scan_start.elapsed();
                return Ok(None);
            };
            self.metrics.num_record_batches += 1;

            self.context
                .read_format()
                .as_primary_key()
                .unwrap()
                .convert_record_batch(
                    &record_batch,
                    self.override_sequence.as_ref(),
                    &mut self.batches,
                )?;
            self.metrics.num_batches += self.batches.len();
        }
        let batch = self.batches.pop_front();
        self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
        self.metrics.scan_cost += scan_start.elapsed();
        Ok(batch)
    }
}
2133
#[async_trait::async_trait]
impl<T> BatchReader for RowGroupReaderBase<T>
where
    T: RowGroupReaderContext + Send + Sync,
{
    /// Delegates to [RowGroupReaderBase::next_inner].
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        self.next_inner().await
    }
}
2143
/// Reader that converts a row group's record batches into the flat format.
pub(crate) struct FlatRowGroupReader {
    /// Context providing the flat read format and file path.
    context: FileRangeContextRef,
    /// Stream of raw record batches from the row group.
    stream: ParquetRecordBatchStream<SstAsyncFileReader>,
    /// Optional array used to override sequence values in converted batches.
    override_sequence: Option<ArrayRef>,
}
2153
2154impl FlatRowGroupReader {
2155 pub(crate) fn new(
2157 context: FileRangeContextRef,
2158 stream: ParquetRecordBatchStream<SstAsyncFileReader>,
2159 ) -> Self {
2160 let override_sequence = context
2162 .read_format()
2163 .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);
2164
2165 Self {
2166 context,
2167 stream,
2168 override_sequence,
2169 }
2170 }
2171
2172 pub(crate) async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
2174 match self.stream.next().await {
2175 Some(batch_result) => {
2176 let record_batch = batch_result.context(ReadParquetSnafu {
2177 path: self.context.file_path(),
2178 })?;
2179
2180 let flat_format = self.context.read_format().as_flat().unwrap();
2182 let record_batch =
2183 flat_format.convert_batch(record_batch, self.override_sequence.as_ref())?;
2184 Ok(Some(record_batch))
2185 }
2186 None => Ok(None),
2187 }
2188 }
2189}
2190
#[cfg(test)]
mod tests {
    use std::any::Any;
    use std::fmt::{Debug, Formatter};
    use std::sync::{Arc, LazyLock};

    use datafusion::arrow::datatypes::DataType;
    use datafusion_common::ScalarValue;
    use datafusion_expr::expr::ScalarFunction;
    use datafusion_expr::{
        ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
        col, lit,
    };
    use datatypes::arrow::array::{ArrayRef, Int64Array};
    use datatypes::arrow::record_batch::RecordBatch;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use object_store::services::Memory;
    use parquet::arrow::ArrowWriter;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::region_request::PathType;
    use table::predicate::Predicate;

    use super::*;
    use crate::sst::parquet::metadata::MetadataLoader;
    use crate::test_util::sst_util::{sst_file_handle, sst_region_metadata};

    /// Verifies min-max pruning does not build (and thus `Debug`-format) a
    /// predicate cache key when the index result cache is disabled: the UDF
    /// below panics in its `Debug` impl, so any attempt to format the
    /// expression for a cache key fails this test.
    #[tokio::test(flavor = "current_thread")]
    async fn test_minmax_predicate_key_not_built_when_index_result_cache_disabled() {
        #[derive(Eq, PartialEq, Hash)]
        struct PanicDebugUdf;

        impl Debug for PanicDebugUdf {
            fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result {
                panic!("minmax predicate key should not format exprs when cache is disabled");
            }
        }

        impl ScalarUDFImpl for PanicDebugUdf {
            fn as_any(&self) -> &dyn Any {
                self
            }

            fn name(&self) -> &str {
                "panic_debug_udf"
            }

            fn signature(&self) -> &Signature {
                static SIGNATURE: LazyLock<Signature> =
                    LazyLock::new(|| Signature::variadic_any(Volatility::Immutable));
                &SIGNATURE
            }

            fn return_type(&self, _arg_types: &[DataType]) -> datafusion_common::Result<DataType> {
                Ok(DataType::Int64)
            }

            fn invoke_with_args(
                &self,
                _args: ScalarFunctionArgs,
            ) -> datafusion_common::Result<ColumnarValue> {
                Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(1))))
            }
        }

        // Write a small single-column parquet file into an in-memory store.
        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
        let file_handle = sst_file_handle(0, 1);
        let table_dir = "test_table".to_string();
        let path_type = PathType::Bare;
        let file_path = file_handle.file_path(&table_dir, path_type);

        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
        let mut parquet_bytes = Vec::new();
        let mut writer = ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let file_size = parquet_bytes.len() as u64;
        object_store.write(&file_path, parquet_bytes).await.unwrap();

        let region_metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
        let read_format =
            ReadFormat::new(region_metadata, None, false, None, &file_path, false).unwrap();

        let mut cache_metrics = MetadataCacheMetrics::default();
        let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size);
        let parquet_meta = loader.load(&mut cache_metrics).await.unwrap();

        // The predicate wraps the panicking UDF; pruning must succeed without
        // ever formatting it because the cache strategy is `Disabled`.
        let udf = Arc::new(ScalarUDF::new_from_impl(PanicDebugUdf));
        let predicate = Predicate::new(vec![Expr::ScalarFunction(ScalarFunction::new_udf(
            udf,
            vec![],
        ))]);
        let builder = ParquetReaderBuilder::new(table_dir, path_type, file_handle, object_store)
            .predicate(Some(predicate))
            .cache(CacheStrategy::Disabled);

        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
        let total_row_count = parquet_meta.file_metadata().num_rows() as usize;
        let mut metrics = ReaderFilterMetrics::default();
        let selection = builder.row_groups_by_minmax(
            &read_format,
            &parquet_meta,
            row_group_size,
            total_row_count,
            &mut metrics,
            false,
        );

        assert!(!selection.is_empty());
    }

    /// Builds an "expected" metadata where a column named `tag_0` exists but
    /// carries column id 10, so it does not correspond to the SST's column
    /// with the same name.
    fn expected_metadata_with_reused_tag_name(
        old_metadata: &RegionMetadata,
    ) -> Arc<RegionMetadata> {
        let mut builder = RegionMetadataBuilder::new(old_metadata.region_id);
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_0".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 10,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_1".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_0".to_string(),
                    ConcreteDataType::uint64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 3,
            })
            .primary_key(vec![10, 1]);

        Arc::new(builder.build().unwrap())
    }

    /// A tag filter against the SST's own metadata should be usable as a
    /// primary key prefilter.
    #[test]
    fn test_simple_filter_context_marks_usable_primary_key_filter() {
        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
        let ctx =
            SimpleFilterContext::new_opt(&metadata, None, &col("tag_0").eq(lit("a"))).unwrap();

        assert!(ctx.usable_primary_key_filter());
        assert!(ctx.primary_key_prefilter().is_some());
    }

    /// Field filters and filters whose column id differs between expected and
    /// SST metadata must not be used as primary key prefilters.
    #[test]
    fn test_simple_filter_context_skips_non_usable_primary_key_filter() {
        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());

        let field_ctx =
            SimpleFilterContext::new_opt(&metadata, None, &col("field_0").eq(lit(1_u64))).unwrap();
        assert!(!field_ctx.usable_primary_key_filter());
        assert!(field_ctx.primary_key_prefilter().is_none());

        let expected_metadata = expected_metadata_with_reused_tag_name(metadata.as_ref());
        let mismatched_ctx = SimpleFilterContext::new_opt(
            &metadata,
            Some(expected_metadata.as_ref()),
            &col("tag_0").eq(lit("a")),
        )
        .unwrap();
        assert!(!mismatched_ctx.usable_primary_key_filter());
        assert!(mismatched_ctx.primary_key_prefilter().is_none());
    }
}