mito2/sst/parquet/reader.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Parquet reader.

#[cfg(feature = "vector_index")]
use std::collections::BTreeSet;
use std::collections::{HashSet, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::v1::SemanticType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{tracing, warn};
use datafusion_expr::Expr;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow::datatypes::Field;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::DataType;
use futures::StreamExt;
use mito_codec::row_converter::build_primary_key_codec;
use object_store::ObjectStore;
use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions, RowSelection};
use parquet::arrow::async_reader::{ParquetRecordBatchStream, ParquetRecordBatchStreamBuilder};
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData};
use partition::expr::PartitionExpr;
use snafu::ResultExt;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, FileId};
use table::predicate::Predicate;

use crate::cache::index::result_cache::PredicateKey;
use crate::cache::{CacheStrategy, CachedSstMeta};
#[cfg(feature = "vector_index")]
use crate::error::ApplyVectorIndexSnafu;
use crate::error::{ReadDataPartSnafu, ReadParquetSnafu, Result, SerializePartitionExprSnafu};
use crate::metrics::{
    PRECISE_FILTER_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL,
    READ_ROWS_TOTAL, READ_STAGE_ELAPSED,
};
use crate::read::flat_projection::CompactionProjectionMapper;
use crate::read::prune::FlatPruneReader;
use crate::read::{Batch, BatchReader};
use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::{
    BloomFilterIndexApplierRef, BloomFilterIndexApplyMetrics,
};
use crate::sst::index::fulltext_index::applier::{
    FulltextIndexApplierRef, FulltextIndexApplyMetrics,
};
use crate::sst::index::inverted_index::applier::{
    InvertedIndexApplierRef, InvertedIndexApplyMetrics,
};
#[cfg(feature = "vector_index")]
use crate::sst::index::vector_index::applier::VectorIndexApplierRef;
use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
use crate::sst::parquet::async_reader::SstAsyncFileReader;
use crate::sst::parquet::file_range::{
    FileRangeContext, FileRangeContextRef, PartitionFilterContext, PreFilterMode, RangeBase,
};
use crate::sst::parquet::format::{ReadFormat, need_override_sequence};
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::prefilter::{
    PrefilterContextBuilder, execute_prefilter, is_usable_primary_key_filter,
};
use crate::sst::parquet::read_columns::{ParquetReadColumns, build_projection_mask};
use crate::sst::parquet::row_group::ParquetFetchMetrics;
use crate::sst::parquet::row_selection::RowGroupSelection;
use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::tag_maybe_to_dictionary_field;

const INDEX_TYPE_FULLTEXT: &str = "fulltext";
const INDEX_TYPE_INVERTED: &str = "inverted";
const INDEX_TYPE_BLOOM: &str = "bloom filter";
const INDEX_TYPE_VECTOR: &str = "vector";

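/// Handles an index-apply failure: panics in tests so broken indexes surface
/// immediately, and logs a warning in release builds.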
macro_rules! handle_index_error {
    ($err:expr, $file_handle:expr, $index_type:expr) => {
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to apply {} index, region_id: {}, file_id: {}, err: {:?}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id(),
                $err
            );
        } else {
            warn!(
                $err; "Failed to apply {} index, region_id: {}, file_id: {}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id()
            );
        }
    };
}

/// Parquet SST reader builder.
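///
/// Readers are created through the builder API; a minimal usage sketch,
/// assuming `table_dir`, `path_type`, `file_handle`, `object_store`,
/// `predicate`, and `column_ids` are already in scope:
///
/// ```ignore
/// let maybe_reader = ParquetReaderBuilder::new(table_dir, path_type, file_handle, object_store)
///     .predicate(Some(predicate))
///     .projection(Some(column_ids))
///     .cache(CacheStrategy::Disabled)
///     .build()
///     .await?;
/// ```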
pub struct ParquetReaderBuilder {
    /// SST directory.
    table_dir: String,
    /// Path type for generating file paths.
    path_type: PathType,
    file_handle: FileHandle,
    object_store: ObjectStore,
    /// Predicate to push down.
    predicate: Option<Predicate>,
    /// Metadata of columns to read.
    ///
    /// `None` reads all columns. Due to schema change, the projection
    /// can contain columns not in the parquet file.
    projection: Option<Vec<ColumnId>>,
    /// Strategy to cache SST data.
    cache_strategy: CacheStrategy,
    /// Index appliers.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Vector index applier for KNN search.
    #[cfg(feature = "vector_index")]
    vector_index_applier: Option<VectorIndexApplierRef>,
    /// Over-fetched k for vector index scan.
    #[cfg(feature = "vector_index")]
    vector_index_k: Option<usize>,
    /// Expected metadata of the region while reading the SST.
    /// This is usually the latest metadata of the region. The reader uses
    /// it to get the correct column id of a column by name.
    expected_metadata: Option<RegionMetadataRef>,
    /// Whether this reader is for compaction.
    compaction: bool,
    /// Mode to pre-filter columns.
    pre_filter_mode: PreFilterMode,
    /// Whether to decode primary key values eagerly when reading primary key format SSTs.
    decode_primary_key_values: bool,
    page_index_policy: PageIndexPolicy,
}

impl ParquetReaderBuilder {
    /// Returns a new [ParquetReaderBuilder] to read a specific SST.
    pub fn new(
        table_dir: String,
        path_type: PathType,
        file_handle: FileHandle,
        object_store: ObjectStore,
    ) -> ParquetReaderBuilder {
        ParquetReaderBuilder {
            table_dir,
            path_type,
            file_handle,
            object_store,
            predicate: None,
            projection: None,
            cache_strategy: CacheStrategy::Disabled,
            inverted_index_appliers: [None, None],
            bloom_filter_index_appliers: [None, None],
            fulltext_index_appliers: [None, None],
            #[cfg(feature = "vector_index")]
            vector_index_applier: None,
            #[cfg(feature = "vector_index")]
            vector_index_k: None,
            expected_metadata: None,
            compaction: false,
            pre_filter_mode: PreFilterMode::All,
            decode_primary_key_values: false,
            page_index_policy: Default::default(),
        }
    }

    /// Attaches the predicate to the builder.
    #[must_use]
    pub fn predicate(mut self, predicate: Option<Predicate>) -> ParquetReaderBuilder {
        self.predicate = predicate;
        self
    }

    /// Attaches the projection to the builder.
    ///
    /// The reader only applies the projection to fields.
    #[must_use]
    pub fn projection(mut self, projection: Option<Vec<ColumnId>>) -> ParquetReaderBuilder {
        self.projection = projection;
        self
    }

    /// Attaches the cache to the builder.
    #[must_use]
    pub fn cache(mut self, cache: CacheStrategy) -> ParquetReaderBuilder {
        self.cache_strategy = cache;
        self
    }

    /// Attaches the inverted index appliers to the builder.
    #[must_use]
    pub(crate) fn inverted_index_appliers(
        mut self,
        index_appliers: [Option<InvertedIndexApplierRef>; 2],
    ) -> Self {
        self.inverted_index_appliers = index_appliers;
        self
    }

    /// Attaches the bloom filter index appliers to the builder.
    #[must_use]
    pub(crate) fn bloom_filter_index_appliers(
        mut self,
        index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    ) -> Self {
        self.bloom_filter_index_appliers = index_appliers;
        self
    }

    /// Attaches the fulltext index appliers to the builder.
    #[must_use]
    pub(crate) fn fulltext_index_appliers(
        mut self,
        index_appliers: [Option<FulltextIndexApplierRef>; 2],
    ) -> Self {
        self.fulltext_index_appliers = index_appliers;
        self
    }

    /// Attaches the vector index applier to the builder.
    #[cfg(feature = "vector_index")]
    #[must_use]
    pub(crate) fn vector_index_applier(
        mut self,
        applier: Option<VectorIndexApplierRef>,
        k: Option<usize>,
    ) -> Self {
        self.vector_index_applier = applier;
        self.vector_index_k = k;
        self
    }

    /// Attaches the expected metadata to the builder.
    #[must_use]
    pub fn expected_metadata(mut self, expected_metadata: Option<RegionMetadataRef>) -> Self {
        self.expected_metadata = expected_metadata;
        self
    }

    /// Sets the compaction flag.
    #[must_use]
    pub fn compaction(mut self, compaction: bool) -> Self {
        self.compaction = compaction;
        self
    }

    /// Sets the pre-filter mode.
    #[must_use]
    pub(crate) fn pre_filter_mode(mut self, pre_filter_mode: PreFilterMode) -> Self {
        self.pre_filter_mode = pre_filter_mode;
        self
    }

    /// Decodes primary key values eagerly when reading primary key format SSTs.
    #[must_use]
    pub(crate) fn decode_primary_key_values(mut self, decode: bool) -> Self {
        self.decode_primary_key_values = decode;
        self
    }

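    /// Sets the page index policy used when loading parquet metadata.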
    #[must_use]
    pub fn page_index_policy(mut self, page_index_policy: PageIndexPolicy) -> Self {
        self.page_index_policy = page_index_policy;
        self
    }

    /// Builds a [ParquetReader].
    ///
    /// This needs to perform IO operations.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.file_handle.region_id(),
            file_id = %self.file_handle.file_id()
        )
    )]
    pub async fn build(&self) -> Result<Option<ParquetReader>> {
        let mut metrics = ReaderMetrics::default();

        let Some((context, selection)) = self.build_reader_input_inner(&mut metrics).await? else {
            return Ok(None);
        };
        ParquetReader::new(Arc::new(context), selection)
            .await
            .map(Some)
    }

    /// Builds a [FileRangeContext] and collects row groups to read.
    ///
    /// This needs to perform IO operations.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.file_handle.region_id(),
            file_id = %self.file_handle.file_id()
        )
    )]
    pub(crate) async fn build_reader_input(
        &self,
        metrics: &mut ReaderMetrics,
    ) -> Result<Option<(FileRangeContext, RowGroupSelection)>> {
        self.build_reader_input_inner(metrics).await
    }

    async fn build_reader_input_inner(
        &self,
        metrics: &mut ReaderMetrics,
    ) -> Result<Option<(FileRangeContext, RowGroupSelection)>> {
        let start = Instant::now();

        let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
        let file_size = self.file_handle.meta_ref().file_size;

        // Loads parquet metadata of the file.
        let (sst_meta, cache_miss) = self
            .read_parquet_metadata(
                &file_path,
                file_size,
                &mut metrics.metadata_cache_metrics,
                self.page_index_policy,
            )
            .await?;
        let parquet_meta = sst_meta.parquet_metadata();
        let region_meta = sst_meta.region_metadata();
        let region_partition_expr_str = self
            .expected_metadata
            .as_ref()
            .and_then(|meta| meta.partition_expr.as_ref())
            .map(|expr| expr.as_str());
        let (_, is_same_region_partition) = Self::is_same_region_partition(
            region_partition_expr_str,
            self.file_handle.meta_ref().partition_expr.as_ref(),
        )?;
        // Skip auto convert when:
        // - compaction is enabled
        // - the region partition expr is the same as the file partition expr (no need to auto convert)
        let skip_auto_convert = self.compaction && is_same_region_partition;

        // Build a compaction projection helper when:
        // - compaction is enabled
        // - region partition expr differs from file partition expr
        // - flat format is enabled
        // - primary key encoding is sparse
        //
        // This is applied after row-group filtering to align batches with flat output schema
        // before compat handling.
        let compaction_projection_mapper = if self.compaction
            && !is_same_region_partition
            && region_meta.primary_key_encoding == PrimaryKeyEncoding::Sparse
        {
            Some(CompactionProjectionMapper::try_new(&region_meta)?)
        } else {
            None
        };

        let mut read_format = if let Some(column_ids) = &self.projection {
            ReadFormat::new(
                region_meta.clone(),
                Some(column_ids),
                true, // Always reads as flat format.
                Some(parquet_meta.file_metadata().schema_descr().num_columns()),
                &file_path,
                skip_auto_convert,
            )?
        } else {
            // Lists all column ids to read; we always use the expected metadata if possible.
            let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
            let column_ids: Vec<_> = expected_meta
                .column_metadatas
                .iter()
                .map(|col| col.column_id)
                .collect();
            ReadFormat::new(
                region_meta.clone(),
                Some(&column_ids),
                true, // Always reads as flat format.
                Some(parquet_meta.file_metadata().schema_descr().num_columns()),
                &file_path,
                skip_auto_convert,
            )?
        };
        if self.decode_primary_key_values {
            read_format.set_decode_primary_key_values(true);
        }
        if need_override_sequence(&parquet_meta) {
            read_format
                .set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));
        }

        // Computes the projection mask.
        let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
        let parquet_read_cols = ParquetReadColumns::from_deduped_root_indices(
            read_format.projection_indices().iter().copied(),
        );

        let projection_mask = build_projection_mask(&parquet_read_cols, parquet_schema_desc);
        let selection = self
            .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
            .await;

        if selection.is_empty() {
            metrics.build_cost += start.elapsed();
            return Ok(None);
        }

        // Trigger background download if metadata had a cache miss and selection is not empty
        if cache_miss && !selection.is_empty() {
            use crate::cache::file_cache::{FileType, IndexKey};
            let index_key = IndexKey::new(
                self.file_handle.region_id(),
                self.file_handle.file_id().file_id(),
                FileType::Parquet,
            );
            self.cache_strategy.maybe_download_background(
                index_key,
                file_path.clone(),
                self.object_store.clone(),
                file_size,
            );
        }

        let prune_schema = self
            .expected_metadata
            .as_ref()
            .map(|meta| meta.schema.clone())
            .unwrap_or_else(|| region_meta.schema.clone());

        // Create ArrowReaderMetadata for async stream building.
        let arrow_reader_options =
            ArrowReaderOptions::new().with_schema(read_format.arrow_schema().clone());
        let arrow_metadata =
            ArrowReaderMetadata::try_new(parquet_meta.clone(), arrow_reader_options)
                .context(ReadDataPartSnafu)?;

        let filters = if let Some(predicate) = &self.predicate {
            predicate
                .exprs()
                .iter()
                .filter_map(|expr| {
                    SimpleFilterContext::new_opt(
                        &region_meta,
                        self.expected_metadata.as_deref(),
                        expr,
                    )
                })
                .collect::<Vec<_>>()
        } else {
            vec![]
        };

        let dyn_filters = if let Some(predicate) = &self.predicate {
            predicate.dyn_filters().as_ref().clone()
        } else {
            vec![]
        };

        let codec = build_primary_key_codec(read_format.metadata());

        // Extract primary key filters from precomputed filter contexts for prefiltering.
        let primary_key_filters = {
            let pk_filters = filters
                .iter()
                .filter_map(SimpleFilterContext::primary_key_prefilter)
                .collect::<Vec<_>>();
            (!pk_filters.is_empty()).then_some(Arc::new(pk_filters))
        };

        let prefilter_builder = PrefilterContextBuilder::new(
            &read_format,
            &codec,
            primary_key_filters.as_ref(),
            parquet_meta.file_metadata().schema_descr(),
        );

        let reader_builder = RowGroupReaderBuilder {
            file_handle: self.file_handle.clone(),
            file_path,
            parquet_meta,
            arrow_metadata,
            object_store: self.object_store.clone(),
            projection: projection_mask,
            cache_strategy: self.cache_strategy.clone(),
            prefilter_builder,
        };

        let partition_filter = self.build_partition_filter(&read_format, &prune_schema)?;

        let context = FileRangeContext::new(
            reader_builder,
            RangeBase {
                filters,
                dyn_filters,
                read_format,
                expected_metadata: self.expected_metadata.clone(),
                prune_schema,
                codec,
                compat_batch: None,
                compaction_projection_mapper,
                pre_filter_mode: self.pre_filter_mode,
                partition_filter,
            },
        );

        metrics.build_cost += start.elapsed();

        Ok(Some((context, selection)))
    }

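    /// Parses the region partition expression string and compares it with
    /// the file's partition expression. Returns the parsed expression and
    /// whether the two are identical.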
    fn is_same_region_partition(
        region_partition_expr_str: Option<&str>,
        file_partition_expr: Option<&PartitionExpr>,
    ) -> Result<(Option<PartitionExpr>, bool)> {
        let region_partition_expr = match region_partition_expr_str {
            Some(expr_str) => crate::region::parse_partition_expr(Some(expr_str))?,
            None => None,
        };

        let is_same = region_partition_expr.as_ref() == file_partition_expr;
        Ok((region_partition_expr, is_same))
    }

    /// Compares the partition expressions from the expected metadata and the
    /// file metadata, and builds a partition filter if they differ.
    fn build_partition_filter(
        &self,
        read_format: &ReadFormat,
        prune_schema: &Arc<datatypes::schema::Schema>,
    ) -> Result<Option<PartitionFilterContext>> {
        let region_partition_expr_str = self
            .expected_metadata
            .as_ref()
            .and_then(|meta| meta.partition_expr.as_ref());
        let file_partition_expr_ref = self.file_handle.meta_ref().partition_expr.as_ref();

        let (region_partition_expr, is_same_region_partition) = Self::is_same_region_partition(
            region_partition_expr_str.map(|s| s.as_str()),
            file_partition_expr_ref,
        )?;

        if is_same_region_partition {
            return Ok(None);
        }

        let Some(region_partition_expr) = region_partition_expr else {
            return Ok(None);
        };

        // Collect columns referenced by the partition expression.
        let mut referenced_columns = HashSet::new();
        region_partition_expr.collect_column_names(&mut referenced_columns);

        // Build a partition_schema containing only referenced columns.
        let is_flat = read_format.as_flat().is_some();
        let partition_schema = Arc::new(datatypes::schema::Schema::new(
            prune_schema
                .column_schemas()
                .iter()
                .filter(|col| referenced_columns.contains(&col.name))
                .map(|col| {
                    if is_flat
                        && let Some(column_meta) = read_format.metadata().column_by_name(&col.name)
                        && column_meta.semantic_type == SemanticType::Tag
                        && col.data_type.is_string()
                    {
                        let field = Arc::new(Field::new(
                            &col.name,
                            col.data_type.as_arrow_type(),
                            col.is_nullable(),
                        ));
                        let dict_field = tag_maybe_to_dictionary_field(&col.data_type, &field);
                        let mut column = col.clone();
                        column.data_type =
                            ConcreteDataType::from_arrow_type(dict_field.data_type());
                        return column;
                    }

                    col.clone()
                })
                .collect::<Vec<_>>(),
        ));

        let region_partition_physical_expr = region_partition_expr
            .try_as_physical_expr(partition_schema.arrow_schema())
            .context(SerializePartitionExprSnafu)?;

        Ok(Some(PartitionFilterContext {
            region_partition_physical_expr,
            partition_schema,
        }))
    }

    /// Reads parquet metadata of a specific file.
    /// Returns `(fused metadata, cache_miss_flag)`.
    async fn read_parquet_metadata(
        &self,
        file_path: &str,
        file_size: u64,
        cache_metrics: &mut MetadataCacheMetrics,
        page_index_policy: PageIndexPolicy,
    ) -> Result<(Arc<CachedSstMeta>, bool)> {
        let start = Instant::now();
        let _t = READ_STAGE_ELAPSED
            .with_label_values(&["read_parquet_metadata"])
            .start_timer();

        let file_id = self.file_handle.file_id();
        // Tries to get from cache with metrics tracking.
        if let Some(metadata) = self
            .cache_strategy
            .get_sst_meta_data(file_id, cache_metrics, page_index_policy)
            .await
        {
            cache_metrics.metadata_load_cost += start.elapsed();
            return Ok((metadata, false));
        }

        // Cache miss, load metadata directly.
        let mut metadata_loader =
            MetadataLoader::new(self.object_store.clone(), file_path, file_size);
        metadata_loader.with_page_index_policy(page_index_policy);
        let metadata = metadata_loader.load(cache_metrics).await?;

        let metadata = Arc::new(CachedSstMeta::try_new(file_path, metadata)?);
        // Cache the metadata.
        self.cache_strategy
            .put_sst_meta_data(file_id, metadata.clone());

        cache_metrics.metadata_load_cost += start.elapsed();
        Ok((metadata, true))
    }

    /// Computes row groups to read, along with their respective row selections.
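    ///
    /// Pruning proceeds in stages: min-max statistics first, then the
    /// fulltext, inverted, and bloom filter indexes (the coarse fulltext
    /// bloom pass runs only if the fine fulltext pass was not applied),
    /// and finally the vector index when the `vector_index` feature is
    /// enabled. Each stage narrows the selection left by the previous one.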
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.file_handle.region_id(),
            file_id = %self.file_handle.file_id()
        )
    )]
    async fn row_groups_to_read(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        metrics: &mut ReaderFilterMetrics,
    ) -> RowGroupSelection {
        let num_row_groups = parquet_meta.num_row_groups();
        let num_rows = parquet_meta.file_metadata().num_rows();
        if num_row_groups == 0 || num_rows == 0 {
            return RowGroupSelection::default();
        }

        // Let's assume that the number of rows in the first row group
        // can represent the `row_group_size` of the Parquet file.
        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
        if row_group_size == 0 {
            return RowGroupSelection::default();
        }
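        // Under this assumption, a global row id maps to a row group by
        // integer division: e.g. with `row_group_size = 4`, row id 9 falls
        // into row group 9 / 4 = 2 at in-group offset 9 % 4 = 1.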

        metrics.rg_total += num_row_groups;
        metrics.rows_total += num_rows as usize;

        // Compute skip_fields once for all pruning operations
        let skip_fields = self.pre_filter_mode.skip_fields();

        let mut output = self.row_groups_by_minmax(
            read_format,
            parquet_meta,
            row_group_size,
            num_rows as usize,
            metrics,
            skip_fields,
        );
        if output.is_empty() {
            return output;
        }

        let fulltext_filtered = self
            .prune_row_groups_by_fulltext_index(
                row_group_size,
                num_row_groups,
                &mut output,
                metrics,
                skip_fields,
            )
            .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_inverted_index(
            row_group_size,
            num_row_groups,
            &mut output,
            metrics,
            skip_fields,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_bloom_filter(
            row_group_size,
            parquet_meta,
            &mut output,
            metrics,
            skip_fields,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        if !fulltext_filtered {
            self.prune_row_groups_by_fulltext_bloom(
                row_group_size,
                parquet_meta,
                &mut output,
                metrics,
                skip_fields,
            )
            .await;
        }
        #[cfg(feature = "vector_index")]
        {
            self.prune_row_groups_by_vector_index(
                row_group_size,
                num_row_groups,
                &mut output,
                metrics,
            )
            .await;
            if output.is_empty() {
                return output;
            }
        }
        output
    }

    /// Prunes row groups by fulltext index. Returns `true` if any row groups were pruned.
    async fn prune_row_groups_by_fulltext_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let mut pruned = false;
        // If skip_fields is true, only apply the first applier (for tags).
        let appliers = if skip_fields {
            &self.fulltext_index_appliers[..1]
        } else {
            &self.fulltext_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Fast path: return early if the result is in the cache.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                metrics.fulltext_index_cache_hit += 1;
                pruned = true;
                continue;
            }

            // Slow path: apply the index from the file.
            metrics.fulltext_index_cache_miss += 1;
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let apply_res = index_applier
                .apply_fine(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    metrics.fulltext_index_apply_metrics.as_mut(),
                )
                .await;
            let selection = match apply_res {
                Ok(Some(res)) => {
                    RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups)
                }
                Ok(None) => continue,
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                    continue;
                }
            };

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_FULLTEXT,
            );
            pruned = true;
        }
        pruned
    }

    /// Applies the inverted index to prune row groups.
    ///
    /// TODO(zhongzc): Devise a mechanism to force-disable index usage as an
    /// escape route in case of index issues; it could also be used to test
    /// the correctness of the index.
    async fn prune_row_groups_by_inverted_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().inverted_index_available() {
            return false;
        }

        let mut pruned = false;
        // If skip_fields is true, only apply the first applier (for tags).
        let appliers = if skip_fields {
            &self.inverted_index_appliers[..1]
        } else {
            &self.inverted_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Fast path: return early if the result is in the cache.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED);
                metrics.inverted_index_cache_hit += 1;
                pruned = true;
                continue;
            }

            // Slow path: apply the index from the file.
            metrics.inverted_index_cache_miss += 1;
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let apply_res = index_applier
                .apply(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    metrics.inverted_index_apply_metrics.as_mut(),
                )
                .await;
            let selection = match apply_res {
                Ok(apply_output) => RowGroupSelection::from_inverted_index_apply_output(
                    row_group_size,
                    num_row_groups,
                    apply_output,
                ),
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
                    continue;
                }
            };

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_INVERTED,
            );
            pruned = true;
        }
        pruned
    }

    async fn prune_row_groups_by_bloom_filter(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().bloom_filter_index_available() {
            return false;
        }

        let mut pruned = false;
        // If skip_fields is true, only apply the first applier (for tags).
        let appliers = if skip_fields {
            &self.bloom_filter_index_appliers[..1]
        } else {
            &self.bloom_filter_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Fast path: return early if the result is in the cache.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM);
                metrics.bloom_filter_cache_hit += 1;
                pruned = true;
                continue;
            }

            // Slow path: apply the index from the file.
            metrics.bloom_filter_cache_miss += 1;
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
                (
                    rg.num_rows() as usize,
                    // Optimize: only search row groups that are required by `output` and not already stored in `cached`.
                    output.contains_non_empty_row_group(i)
                        && cached
                            .as_ref()
                            .map(|c| !c.contains_row_group(i))
                            .unwrap_or(true),
                )
            });
            let apply_res = index_applier
                .apply(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    rgs,
                    metrics.bloom_filter_apply_metrics.as_mut(),
                )
                .await;
            let mut selection = match apply_res {
                Ok(apply_output) => {
                    RowGroupSelection::from_row_ranges(apply_output, row_group_size)
                }
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM);
                    continue;
                }
            };

            // Newly searched row groups are added to `selection`; concat them with `cached`.
            if let Some(cached) = cached.as_ref() {
                selection.concat(cached);
            }

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_BLOOM,
            );
            pruned = true;
        }
        pruned
    }

    /// Prunes row groups by vector index results.
    #[cfg(feature = "vector_index")]
    async fn prune_row_groups_by_vector_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) {
        let Some(applier) = &self.vector_index_applier else {
            return;
        };
        let Some(k) = self.vector_index_k else {
            return;
        };
        if !self.file_handle.meta_ref().vector_index_available() {
            return;
        }

        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_res = applier
            .apply_with_k(self.file_handle.index_id(), Some(file_size_hint), k)
            .await;
        let row_ids = match apply_res {
            Ok(res) => res.row_offsets,
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_VECTOR);
                return;
            }
        };

        let selection = match vector_selection_from_offsets(row_ids, row_group_size, num_row_groups)
        {
            Ok(selection) => selection,
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_VECTOR);
                return;
            }
        };
        metrics.rows_vector_selected += selection.row_count();
        apply_selection_and_update_metrics(output, &selection, metrics, INDEX_TYPE_VECTOR);
    }

    async fn prune_row_groups_by_fulltext_bloom(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let mut pruned = false;
        // If skip_fields is true, only apply the first applier (for tags).
        let appliers = if skip_fields {
            &self.fulltext_index_appliers[..1]
        } else {
            &self.fulltext_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Fast path: return early if the result is in the cache.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                metrics.fulltext_index_cache_hit += 1;
                pruned = true;
                continue;
            }

            // Slow path: apply the index from the file.
            metrics.fulltext_index_cache_miss += 1;
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
                (
                    rg.num_rows() as usize,
                    // Optimize: only search row groups that are required by `output` and not already stored in `cached`.
                    output.contains_non_empty_row_group(i)
                        && cached
                            .as_ref()
                            .map(|c| !c.contains_row_group(i))
                            .unwrap_or(true),
                )
            });
            let apply_res = index_applier
                .apply_coarse(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    rgs,
                    metrics.fulltext_index_apply_metrics.as_mut(),
                )
                .await;
            let mut selection = match apply_res {
                Ok(Some(apply_output)) => {
                    RowGroupSelection::from_row_ranges(apply_output, row_group_size)
                }
                Ok(None) => continue,
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                    continue;
                }
            };

            // Newly searched row groups are added to `selection`; concat them with `cached`.
            if let Some(cached) = cached.as_ref() {
                selection.concat(cached);
            }

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_FULLTEXT,
            );
            pruned = true;
        }
        pruned
    }

    /// Computes the row group selection after min-max pruning.
    fn row_groups_by_minmax(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        row_group_size: usize,
        total_row_count: usize,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> RowGroupSelection {
        let Some(predicate) = &self.predicate else {
            return RowGroupSelection::new(row_group_size, total_row_count);
        };

        let file_id = self.file_handle.file_id().file_id();
        let index_result_cache = self.cache_strategy.index_result_cache();
        let cached_minmax_key =
            if index_result_cache.is_some() && predicate.dyn_filters().is_empty() {
                // Cache min-max pruning results keyed by predicate expressions. This avoids repeatedly
                // building row-group pruning stats for identical predicates across queries.
                let mut exprs = predicate
                    .exprs()
                    .iter()
                    .map(|expr| format!("{expr:?}"))
                    .collect::<Vec<_>>();
                exprs.sort();
                let schema_version = self
                    .expected_metadata
                    .as_ref()
                    .map(|meta| meta.schema_version)
                    .unwrap_or_else(|| read_format.metadata().schema_version);
                Some(PredicateKey::new_minmax(
                    Arc::new(exprs),
                    schema_version,
                    skip_fields,
                ))
            } else {
                None
            };

        if let Some(index_result_cache) = index_result_cache
            && let Some(predicate_key) = cached_minmax_key.as_ref()
        {
            if let Some(result) = index_result_cache.get(predicate_key, file_id) {
                metrics.minmax_cache_hit += 1;
                let num_row_groups = parquet_meta.num_row_groups();
                metrics.rg_minmax_filtered +=
                    num_row_groups.saturating_sub(result.row_group_count());
                return (*result).clone();
            }

            metrics.minmax_cache_miss += 1;
        }

        let region_meta = read_format.metadata();
        let row_groups = parquet_meta.row_groups();
        let stats = RowGroupPruningStats::new(
            row_groups,
            read_format,
            self.expected_metadata.clone(),
            skip_fields,
        );
        let prune_schema = self
            .expected_metadata
            .as_ref()
            .map(|meta| meta.schema.arrow_schema())
            .unwrap_or_else(|| region_meta.schema.arrow_schema());

        // Here we use the schema of the SST to build the physical expression. If a column
        // in the SST doesn't have the same column id as the column in the expected metadata,
        // we will get `None` statistics for that column.
        let mask = predicate.prune_with_stats(&stats, prune_schema);
        let output = RowGroupSelection::from_full_row_group_ids(
            mask.iter()
                .enumerate()
                .filter_map(|(row_group, keep)| keep.then_some(row_group)),
            row_group_size,
            total_row_count,
        );

        metrics.rg_minmax_filtered += parquet_meta
            .num_row_groups()
            .saturating_sub(output.row_group_count());

        if let Some(index_result_cache) = index_result_cache
            && let Some(predicate_key) = cached_minmax_key
        {
            index_result_cache.put(predicate_key, file_id, Arc::new(output.clone()));
        }

        output
    }

    fn apply_index_result_and_update_cache(
        &self,
        predicate_key: &PredicateKey,
        file_id: FileId,
        result: RowGroupSelection,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        index_type: &str,
    ) {
        apply_selection_and_update_metrics(output, &result, metrics, index_type);

        if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() {
            index_result_cache.put(predicate_key.clone(), file_id, Arc::new(result));
        }
    }
}
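
/// Intersects `output` with an index `result`, records the number of row
/// groups and rows removed in the filter metrics, and replaces `output`
/// with the intersection.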
fn apply_selection_and_update_metrics(
    output: &mut RowGroupSelection,
    result: &RowGroupSelection,
    metrics: &mut ReaderFilterMetrics,
    index_type: &str,
) {
    let intersection = output.intersect(result);

    let row_group_count = output.row_group_count() - intersection.row_group_count();
    let row_count = output.row_count() - intersection.row_count();

    metrics.update_index_metrics(index_type, row_group_count, row_count);

    *output = intersection;
}

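/// Converts row offsets returned by the vector index into a
/// [RowGroupSelection], failing if any offset exceeds `u32::MAX`.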
1241#[cfg(feature = "vector_index")]
1242fn vector_selection_from_offsets(
1243    row_offsets: Vec<u64>,
1244    row_group_size: usize,
1245    num_row_groups: usize,
1246) -> Result<RowGroupSelection> {
1247    let mut row_ids = BTreeSet::new();
1248    for offset in row_offsets {
1249        let row_id = u32::try_from(offset).map_err(|_| {
1250            ApplyVectorIndexSnafu {
1251                reason: format!("Row offset {} exceeds u32::MAX", offset),
1252            }
1253            .build()
1254        })?;
1255        row_ids.insert(row_id);
1256    }
1257    Ok(RowGroupSelection::from_row_ids(
1258        row_ids,
1259        row_group_size,
1260        num_row_groups,
1261    ))
1262}
1263
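/// Returns `true` when every non-empty row group in `required_row_groups`
/// has already been searched, i.e. is present in `cached_row_groups`.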
fn all_required_row_groups_searched(
    required_row_groups: &RowGroupSelection,
    cached_row_groups: &RowGroupSelection,
) -> bool {
    required_row_groups.iter().all(|(rg_id, _)| {
        // A row group with no selected rows does not need to be searched.
        !required_row_groups.contains_non_empty_row_group(*rg_id)
            // The row group is already searched.
            || cached_row_groups.contains_row_group(*rg_id)
    })
}

/// Metrics of filtering row groups and rows.
#[derive(Debug, Default, Clone)]
pub(crate) struct ReaderFilterMetrics {
    /// Number of row groups before filtering.
    pub(crate) rg_total: usize,
    /// Number of row groups filtered by fulltext index.
    pub(crate) rg_fulltext_filtered: usize,
    /// Number of row groups filtered by inverted index.
    pub(crate) rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max index.
    pub(crate) rg_minmax_filtered: usize,
    /// Number of row groups filtered by bloom filter index.
    pub(crate) rg_bloom_filtered: usize,
    /// Number of row groups filtered by vector index.
    pub(crate) rg_vector_filtered: usize,

    /// Number of rows in row group before filtering.
    pub(crate) rows_total: usize,
    /// Number of rows in row group filtered by fulltext index.
    pub(crate) rows_fulltext_filtered: usize,
    /// Number of rows in row group filtered by inverted index.
    pub(crate) rows_inverted_filtered: usize,
    /// Number of rows in row group filtered by bloom filter index.
    pub(crate) rows_bloom_filtered: usize,
    /// Number of rows filtered by vector index.
    pub(crate) rows_vector_filtered: usize,
    /// Number of rows selected by vector index.
    pub(crate) rows_vector_selected: usize,
    /// Number of rows filtered by precise filter.
    pub(crate) rows_precise_filtered: usize,

    /// Number of index result cache hits for fulltext index.
    pub(crate) fulltext_index_cache_hit: usize,
    /// Number of index result cache misses for fulltext index.
    pub(crate) fulltext_index_cache_miss: usize,
    /// Number of index result cache hits for inverted index.
    pub(crate) inverted_index_cache_hit: usize,
    /// Number of index result cache misses for inverted index.
    pub(crate) inverted_index_cache_miss: usize,
    /// Number of index result cache hits for bloom filter index.
    pub(crate) bloom_filter_cache_hit: usize,
    /// Number of index result cache misses for bloom filter index.
    pub(crate) bloom_filter_cache_miss: usize,
    /// Number of index result cache hits for minmax pruning.
    pub(crate) minmax_cache_hit: usize,
    /// Number of index result cache misses for minmax pruning.
    pub(crate) minmax_cache_miss: usize,

    /// Optional metrics for inverted index applier.
    pub(crate) inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
    /// Optional metrics for bloom filter index applier.
    pub(crate) bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
    /// Optional metrics for fulltext index applier.
    pub(crate) fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,

    /// Number of pruner builder cache hits.
    pub(crate) pruner_cache_hit: usize,
    /// Number of pruner builder cache misses.
    pub(crate) pruner_cache_miss: usize,
    /// Duration spent waiting for pruner to build file ranges.
    pub(crate) pruner_prune_cost: Duration,
}

impl ReaderFilterMetrics {
    /// Merges `other` metrics into these metrics.
1341    pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
1342        self.rg_total += other.rg_total;
1343        self.rg_fulltext_filtered += other.rg_fulltext_filtered;
1344        self.rg_inverted_filtered += other.rg_inverted_filtered;
1345        self.rg_minmax_filtered += other.rg_minmax_filtered;
1346        self.rg_bloom_filtered += other.rg_bloom_filtered;
1347        self.rg_vector_filtered += other.rg_vector_filtered;
1348
1349        self.rows_total += other.rows_total;
1350        self.rows_fulltext_filtered += other.rows_fulltext_filtered;
1351        self.rows_inverted_filtered += other.rows_inverted_filtered;
1352        self.rows_bloom_filtered += other.rows_bloom_filtered;
1353        self.rows_vector_filtered += other.rows_vector_filtered;
1354        self.rows_vector_selected += other.rows_vector_selected;
1355        self.rows_precise_filtered += other.rows_precise_filtered;
1356
1357        self.fulltext_index_cache_hit += other.fulltext_index_cache_hit;
1358        self.fulltext_index_cache_miss += other.fulltext_index_cache_miss;
1359        self.inverted_index_cache_hit += other.inverted_index_cache_hit;
1360        self.inverted_index_cache_miss += other.inverted_index_cache_miss;
1361        self.bloom_filter_cache_hit += other.bloom_filter_cache_hit;
1362        self.bloom_filter_cache_miss += other.bloom_filter_cache_miss;
1363        self.minmax_cache_hit += other.minmax_cache_hit;
1364        self.minmax_cache_miss += other.minmax_cache_miss;
1365
1366        self.pruner_cache_hit += other.pruner_cache_hit;
1367        self.pruner_cache_miss += other.pruner_cache_miss;
1368        self.pruner_prune_cost += other.pruner_prune_cost;
1369
1370        // Merge optional applier metrics
1371        if let Some(other_metrics) = &other.inverted_index_apply_metrics {
1372            self.inverted_index_apply_metrics
1373                .get_or_insert_with(Default::default)
1374                .merge_from(other_metrics);
1375        }
1376        if let Some(other_metrics) = &other.bloom_filter_apply_metrics {
1377            self.bloom_filter_apply_metrics
1378                .get_or_insert_with(Default::default)
1379                .merge_from(other_metrics);
1380        }
1381        if let Some(other_metrics) = &other.fulltext_index_apply_metrics {
1382            self.fulltext_index_apply_metrics
1383                .get_or_insert_with(Default::default)
1384                .merge_from(other_metrics);
1385        }
1386    }
1387
1388    /// Reports metrics.
1389    pub(crate) fn observe(&self) {
1390        READ_ROW_GROUPS_TOTAL
1391            .with_label_values(&["before_filtering"])
1392            .inc_by(self.rg_total as u64);
1393        READ_ROW_GROUPS_TOTAL
1394            .with_label_values(&["fulltext_index_filtered"])
1395            .inc_by(self.rg_fulltext_filtered as u64);
1396        READ_ROW_GROUPS_TOTAL
1397            .with_label_values(&["inverted_index_filtered"])
1398            .inc_by(self.rg_inverted_filtered as u64);
1399        READ_ROW_GROUPS_TOTAL
1400            .with_label_values(&["minmax_index_filtered"])
1401            .inc_by(self.rg_minmax_filtered as u64);
1402        READ_ROW_GROUPS_TOTAL
1403            .with_label_values(&["bloom_filter_index_filtered"])
1404            .inc_by(self.rg_bloom_filtered as u64);
1405        READ_ROW_GROUPS_TOTAL
1406            .with_label_values(&["vector_index_filtered"])
1407            .inc_by(self.rg_vector_filtered as u64);
1408
1409        PRECISE_FILTER_ROWS_TOTAL
1410            .with_label_values(&["parquet"])
1411            .inc_by(self.rows_precise_filtered as u64);
1412        READ_ROWS_IN_ROW_GROUP_TOTAL
1413            .with_label_values(&["before_filtering"])
1414            .inc_by(self.rows_total as u64);
1415        READ_ROWS_IN_ROW_GROUP_TOTAL
1416            .with_label_values(&["fulltext_index_filtered"])
1417            .inc_by(self.rows_fulltext_filtered as u64);
1418        READ_ROWS_IN_ROW_GROUP_TOTAL
1419            .with_label_values(&["inverted_index_filtered"])
1420            .inc_by(self.rows_inverted_filtered as u64);
1421        READ_ROWS_IN_ROW_GROUP_TOTAL
1422            .with_label_values(&["bloom_filter_index_filtered"])
1423            .inc_by(self.rows_bloom_filtered as u64);
1424        READ_ROWS_IN_ROW_GROUP_TOTAL
1425            .with_label_values(&["vector_index_filtered"])
1426            .inc_by(self.rows_vector_filtered as u64);
1427    }
1428
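    /// Accumulates filtered row group and row counts for the given index type.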
1429    fn update_index_metrics(&mut self, index_type: &str, row_group_count: usize, row_count: usize) {
1430        match index_type {
1431            INDEX_TYPE_FULLTEXT => {
1432                self.rg_fulltext_filtered += row_group_count;
1433                self.rows_fulltext_filtered += row_count;
1434            }
1435            INDEX_TYPE_INVERTED => {
1436                self.rg_inverted_filtered += row_group_count;
1437                self.rows_inverted_filtered += row_count;
1438            }
1439            INDEX_TYPE_BLOOM => {
1440                self.rg_bloom_filtered += row_group_count;
1441                self.rows_bloom_filtered += row_count;
1442            }
1443            INDEX_TYPE_VECTOR => {
1444                self.rg_vector_filtered += row_group_count;
1445                self.rows_vector_filtered += row_count;
1446            }
1447            _ => {}
1448        }
1449    }
1450}
1451
1452#[cfg(all(test, feature = "vector_index"))]
1453mod vector_index_tests {
1454    use super::*;
1455
1456    #[test]
1457    fn test_vector_selection_from_offsets() {
1458        let row_group_size = 4;
1459        let num_row_groups = 3;
1460        let selection =
1461            vector_selection_from_offsets(vec![0, 1, 5, 9], row_group_size, num_row_groups)
1462                .unwrap();
1463
1464        assert_eq!(selection.row_group_count(), 3);
1465        assert_eq!(selection.row_count(), 4);
1466        assert!(selection.contains_non_empty_row_group(0));
1467        assert!(selection.contains_non_empty_row_group(1));
1468        assert!(selection.contains_non_empty_row_group(2));
1469    }
1470
1471    #[test]
1472    fn test_vector_selection_from_offsets_out_of_range() {
1473        let row_group_size = 4;
1474        let num_row_groups = 2;
1475        let selection = vector_selection_from_offsets(
1476            vec![0, 7, u64::from(u32::MAX) + 1],
1477            row_group_size,
1478            num_row_groups,
1479        );
1480        assert!(selection.is_err());
1481    }
1482
1483    #[test]
1484    fn test_vector_selection_updates_metrics() {
1485        let row_group_size = 4;
1486        let total_rows = 8;
1487        let mut output = RowGroupSelection::new(row_group_size, total_rows);
1488        let selection = vector_selection_from_offsets(vec![1], row_group_size, 2).unwrap();
1489        let mut metrics = ReaderFilterMetrics::default();
1490
1491        apply_selection_and_update_metrics(
1492            &mut output,
1493            &selection,
1494            &mut metrics,
1495            INDEX_TYPE_VECTOR,
1496        );
1497
1498        assert_eq!(metrics.rg_vector_filtered, 1);
1499        assert_eq!(metrics.rows_vector_filtered, 7);
1500        assert_eq!(output.row_count(), 1);
1501    }
1502}
1503
1504/// Metrics for parquet metadata cache operations.
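///
/// The `Debug` impl renders a compact JSON-like object and omits zero-valued
/// counters; an illustrative (not captured) example:
///
/// ```text
/// {"metadata_load_cost":"1.2ms", "cache_miss":1, "num_reads":2, "bytes_read":4096}
/// ```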
1505#[derive(Default, Clone, Copy)]
1506pub(crate) struct MetadataCacheMetrics {
1507    /// Number of memory cache hits for parquet metadata.
1508    pub(crate) mem_cache_hit: usize,
1509    /// Number of file cache hits for parquet metadata.
1510    pub(crate) file_cache_hit: usize,
1511    /// Number of cache misses for parquet metadata.
1512    pub(crate) cache_miss: usize,
1513    /// Duration to load parquet metadata.
1514    pub(crate) metadata_load_cost: Duration,
1515    /// Number of read operations performed.
1516    pub(crate) num_reads: usize,
1517    /// Total bytes read from storage.
1518    pub(crate) bytes_read: u64,
1519}
1520
1521impl std::fmt::Debug for MetadataCacheMetrics {
1522    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1523        let Self {
1524            mem_cache_hit,
1525            file_cache_hit,
1526            cache_miss,
1527            metadata_load_cost,
1528            num_reads,
1529            bytes_read,
1530        } = self;
1531
1532        if self.is_empty() {
1533            return write!(f, "{{}}");
1534        }
1535        write!(f, "{{")?;
1536
1537        write!(f, "\"metadata_load_cost\":\"{:?}\"", metadata_load_cost)?;
1538
1539        if *mem_cache_hit > 0 {
1540            write!(f, ", \"mem_cache_hit\":{}", mem_cache_hit)?;
1541        }
1542        if *file_cache_hit > 0 {
1543            write!(f, ", \"file_cache_hit\":{}", file_cache_hit)?;
1544        }
1545        if *cache_miss > 0 {
1546            write!(f, ", \"cache_miss\":{}", cache_miss)?;
1547        }
1548        if *num_reads > 0 {
1549            write!(f, ", \"num_reads\":{}", num_reads)?;
1550        }
1551        if *bytes_read > 0 {
1552            write!(f, ", \"bytes_read\":{}", bytes_read)?;
1553        }
1554
1555        write!(f, "}}")
1556    }
1557}
1558
1559impl MetadataCacheMetrics {
1560    /// Returns true if the metrics are empty (contain no meaningful data).
1561    pub(crate) fn is_empty(&self) -> bool {
1562        self.metadata_load_cost.is_zero()
1563    }
1564
1565    /// Merges `other` into these metrics.
1566    pub(crate) fn merge_from(&mut self, other: &MetadataCacheMetrics) {
1567        self.mem_cache_hit += other.mem_cache_hit;
1568        self.file_cache_hit += other.file_cache_hit;
1569        self.cache_miss += other.cache_miss;
1570        self.metadata_load_cost += other.metadata_load_cost;
1571        self.num_reads += other.num_reads;
1572        self.bytes_read += other.bytes_read;
1573    }
1574}
1575
1576/// Parquet reader metrics.
1577#[derive(Debug, Default, Clone)]
1578pub struct ReaderMetrics {
1579    /// Filtered row groups and rows metrics.
1580    pub(crate) filter_metrics: ReaderFilterMetrics,
1581    /// Duration to build the parquet reader.
1582    pub(crate) build_cost: Duration,
1583    /// Duration to scan the reader.
1584    pub(crate) scan_cost: Duration,
1585    /// Number of record batches read.
1586    pub(crate) num_record_batches: usize,
1587    /// Number of batches decoded.
1588    pub(crate) num_batches: usize,
1589    /// Number of rows read.
1590    pub(crate) num_rows: usize,
1591    /// Metrics for parquet metadata cache.
1592    pub(crate) metadata_cache_metrics: MetadataCacheMetrics,
1593    /// Optional metrics for page/row group fetch operations.
1594    pub(crate) fetch_metrics: Option<Arc<ParquetFetchMetrics>>,
1595    /// Memory size of metadata loaded for building file ranges.
1596    pub(crate) metadata_mem_size: isize,
1597    /// Number of file range builders created.
1598    pub(crate) num_range_builders: isize,
1599}
1600
1601impl ReaderMetrics {
1602    /// Merges `other` into these metrics.
1603    pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
1604        self.filter_metrics.merge_from(&other.filter_metrics);
1605        self.build_cost += other.build_cost;
1606        self.scan_cost += other.scan_cost;
1607        self.num_record_batches += other.num_record_batches;
1608        self.num_batches += other.num_batches;
1609        self.num_rows += other.num_rows;
1610        self.metadata_cache_metrics
1611            .merge_from(&other.metadata_cache_metrics);
1612        if let Some(other_fetch) = &other.fetch_metrics {
1613            if let Some(self_fetch) = &self.fetch_metrics {
1614                self_fetch.merge_from(other_fetch);
1615            } else {
1616                self.fetch_metrics = Some(other_fetch.clone());
1617            }
1618        }
1619        self.metadata_mem_size += other.metadata_mem_size;
1620        self.num_range_builders += other.num_range_builders;
1621    }
1622
1623    /// Reports total rows.
1624    pub(crate) fn observe_rows(&self, read_type: &str) {
1625        READ_ROWS_TOTAL
1626            .with_label_values(&[read_type])
1627            .inc_by(self.num_rows as u64);
1628    }
1629}
1630
1631/// Builder to build a [ParquetRecordBatchStream] for a row group.
1632pub(crate) struct RowGroupReaderBuilder {
1633    /// SST file to read.
1634    ///
1635    /// Holds the file handle so the file won't be purged while it is being read.
1636    file_handle: FileHandle,
1637    /// Path of the file.
1638    file_path: String,
1639    /// Metadata of the parquet file.
1640    parquet_meta: Arc<ParquetMetaData>,
1641    /// Arrow reader metadata for building async stream.
1642    arrow_metadata: ArrowReaderMetadata,
1643    /// Object store as an Operator.
1644    object_store: ObjectStore,
1645    /// Projection mask.
1646    projection: ProjectionMask,
1647    /// Cache.
1648    cache_strategy: CacheStrategy,
1649    /// Pre-built prefilter state. `None` if prefiltering is not applicable.
1650    prefilter_builder: Option<PrefilterContextBuilder>,
1651}
1652
1653/// Context passed to [RowGroupReaderBuilder::build()] carrying all information
1654/// needed for prefiltering decisions.
1655pub(crate) struct RowGroupBuildContext<'a> {
1656    /// Simple filters pushed down. Used by prefilter on other columns.
1657    #[allow(dead_code)]
1658    pub(crate) filters: &'a [SimpleFilterContext],
1659    /// Whether to skip field filters. Used by prefilter on other columns.
1660    #[allow(dead_code)]
1661    pub(crate) skip_fields: bool,
1662    /// Index of the row group to read.
1663    pub(crate) row_group_idx: usize,
1664    /// Row selection for the row group. `None` means all rows.
1665    pub(crate) row_selection: Option<RowSelection>,
1666    /// Metrics for tracking fetch operations.
1667    pub(crate) fetch_metrics: Option<&'a ParquetFetchMetrics>,
1668}
1669
1670impl RowGroupReaderBuilder {
1671    /// Path of the file to read.
1672    pub(crate) fn file_path(&self) -> &str {
1673        &self.file_path
1674    }
1675
1676    /// Handle of the file to read.
1677    pub(crate) fn file_handle(&self) -> &FileHandle {
1678        &self.file_handle
1679    }
1680
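    /// Returns the parquet metadata of the file to read.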
1681    pub(crate) fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
1682        &self.parquet_meta
1683    }
1684
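    /// Returns the cache strategy used by this builder.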
1685    pub(crate) fn cache_strategy(&self) -> &CacheStrategy {
1686        &self.cache_strategy
1687    }
1688
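    /// Returns true if a primary-key prefilter has been prepared for this builder.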
1689    pub(crate) fn has_flat_primary_key_prefilter(&self) -> bool {
1690        self.prefilter_builder.is_some()
1691    }
1692
1693    /// Builds a [ParquetRecordBatchStream] to read the row group at `row_group_idx`.
1694    ///
1695    /// If prefiltering is applicable (based on `build_ctx`), this performs a two-phase read:
1696    /// 1. Reads only the prefilter columns (e.g. the primary key column) and applies filters to get a refined row selection.
1697    /// 2. Reads the full projection with the refined row selection.
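    ///
    /// A minimal sketch of driving the returned stream (illustrative, not a
    /// doc test; `builder` and `filters` are assumed to exist in scope):
    ///
    /// ```ignore
    /// use futures::StreamExt;
    ///
    /// let ctx = RowGroupBuildContext {
    ///     filters: &filters,   // pushed-down simple filters
    ///     skip_fields: false,
    ///     row_group_idx: 0,
    ///     row_selection: None, // `None` reads every row of the row group
    ///     fetch_metrics: None,
    /// };
    /// let mut stream = builder.build(ctx).await?;
    /// while let Some(record_batch) = stream.next().await {
    ///     let record_batch = record_batch?;
    ///     // consume the converted batch ...
    /// }
    /// ```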
1698    pub(crate) async fn build(
1699        &self,
1700        build_ctx: RowGroupBuildContext<'_>,
1701    ) -> Result<ParquetRecordBatchStream<SstAsyncFileReader>> {
1702        let prefilter_ctx = self.prefilter_builder.as_ref().map(|b| b.build());
1703
1704        let Some(mut prefilter_ctx) = prefilter_ctx else {
1705            // No prefilter applicable, build stream with full projection.
1706            return self
1707                .build_with_projection(
1708                    build_ctx.row_group_idx,
1709                    build_ctx.row_selection,
1710                    self.projection.clone(),
1711                    build_ctx.fetch_metrics,
1712                )
1713                .await;
1714        };
1715
1716        let prefilter_start = Instant::now();
1717        let prefilter_result = execute_prefilter(&mut prefilter_ctx, self, &build_ctx).await?;
1718        if let Some(metrics) = build_ctx.fetch_metrics {
1719            let mut data = metrics.data.lock().unwrap();
1720            data.prefilter_cost += prefilter_start.elapsed();
1721            data.prefilter_filtered_rows += prefilter_result.filtered_rows;
1722        }
1723
1724        let refined_selection = Some(prefilter_result.refined_selection);
1725
1726        self.build_with_projection(
1727            build_ctx.row_group_idx,
1728            refined_selection,
1729            self.projection.clone(),
1730            build_ctx.fetch_metrics,
1731        )
1732        .await
1733    }
1734
1735    /// Builds a [ParquetRecordBatchStream] with a custom projection mask.
1736    pub(crate) async fn build_with_projection(
1737        &self,
1738        row_group_idx: usize,
1739        row_selection: Option<RowSelection>,
1740        projection: ProjectionMask,
1741        fetch_metrics: Option<&ParquetFetchMetrics>,
1742    ) -> Result<ParquetRecordBatchStream<SstAsyncFileReader>> {
1743        // Create async file reader with caching support.
1744        let async_reader = SstAsyncFileReader::new(
1745            self.file_handle.file_id(),
1746            self.file_path.clone(),
1747            self.object_store.clone(),
1748            self.cache_strategy.clone(),
1749            self.parquet_meta.clone(),
1750            row_group_idx,
1751        )
1752        .with_fetch_metrics(fetch_metrics.cloned());
1753
1754        // Build the async stream using ArrowReaderBuilder API.
1755        let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
1756            async_reader,
1757            self.arrow_metadata.clone(),
1758        );
1759        builder = builder
1760            .with_row_groups(vec![row_group_idx])
1761            .with_projection(projection)
1762            .with_batch_size(DEFAULT_READ_BATCH_SIZE);
1763
1764        if let Some(selection) = row_selection {
1765            builder = builder.with_row_selection(selection);
1766        }
1767
1768        let stream = builder.build().context(ReadParquetSnafu {
1769            path: &self.file_path,
1770        })?;
1771
1772        Ok(stream)
1773    }
1774}
1775
1776/// The filter to evaluate, or the result of pruning against the column's default value.
1777pub(crate) enum MaybeFilter {
1778    /// The filter to evaluate.
1779    Filter(SimpleFilterEvaluator),
1780    /// The filter matches the default value.
1781    Matched,
1782    /// The filter is pruned.
1783    Pruned,
1784}
1785
1786/// Context to evaluate the column filter for a parquet file.
1787pub(crate) struct SimpleFilterContext {
1788    /// Filter to evaluate.
1789    filter: MaybeFilter,
1790    /// Id of the column to evaluate.
1791    column_id: ColumnId,
1792    /// Semantic type of the column.
1793    semantic_type: SemanticType,
1794    /// The data type of the column.
1795    data_type: ConcreteDataType,
1796    /// Whether this filter can be applied by flat parquet primary-key prefiltering.
1797    usable_primary_key_filter: bool,
1798}
1799
1800impl SimpleFilterContext {
1801    /// Creates a context for the `expr`.
1802    ///
1803    /// Returns None if the column to filter doesn't exist in the SST metadata or the
1804    /// expected metadata.
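    ///
    /// A minimal sketch mirroring the unit tests below (illustrative; assumes
    /// `sst_meta` contains a tag column named `tag_0`):
    ///
    /// ```ignore
    /// use datafusion_expr::{col, lit};
    ///
    /// let expr = col("tag_0").eq(lit("a"));
    /// // No expected metadata: the SST metadata is authoritative.
    /// let ctx = SimpleFilterContext::new_opt(&sst_meta, None, &expr).unwrap();
    /// assert!(matches!(ctx.filter(), MaybeFilter::Filter(_)));
    /// ```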
1805    pub(crate) fn new_opt(
1806        sst_meta: &RegionMetadataRef,
1807        expected_meta: Option<&RegionMetadata>,
1808        expr: &Expr,
1809    ) -> Option<Self> {
1810        let filter = SimpleFilterEvaluator::try_new(expr)?;
1811        // Parquet PK prefilter always supports the partition column. Only
1812        // PartitionTreeMemtable skips it after partition pruning.
1813        let usable_primary_key_filter =
1814            is_usable_primary_key_filter(sst_meta, expected_meta, &filter);
1815        let (column_metadata, maybe_filter) = match expected_meta {
1816            Some(meta) => {
1817                // Gets the column metadata from the expected metadata.
1818                let column = meta.column_by_name(filter.column_name())?;
1819                // Checks if the column is present in the SST metadata. We still use the
1820                // column from the expected metadata.
1821                match sst_meta.column_by_id(column.column_id) {
1822                    Some(sst_column) => {
1823                        debug_assert_eq!(column.semantic_type, sst_column.semantic_type);
1824
1825                        (column, MaybeFilter::Filter(filter))
1826                    }
1827                    None => {
1828                        // If the column is not present in the SST metadata, we evaluate the filter
1829                        // against the default value of the column.
1830                        // If we can't evaluate the filter, we return None.
1831                        if pruned_by_default(&filter, column)? {
1832                            (column, MaybeFilter::Pruned)
1833                        } else {
1834                            (column, MaybeFilter::Matched)
1835                        }
1836                    }
1837                }
1838            }
1839            None => {
1840                let column = sst_meta.column_by_name(filter.column_name())?;
1841                (column, MaybeFilter::Filter(filter))
1842            }
1843        };
1844
1845        let usable_primary_key_filter =
1846            matches!(maybe_filter, MaybeFilter::Filter(_)) && usable_primary_key_filter;
1847
1848        Some(Self {
1849            filter: maybe_filter,
1850            column_id: column_metadata.column_id,
1851            semantic_type: column_metadata.semantic_type,
1852            data_type: column_metadata.column_schema.data_type.clone(),
1853            usable_primary_key_filter,
1854        })
1855    }
1856
1857    /// Returns the filter to evaluate.
1858    pub(crate) fn filter(&self) -> &MaybeFilter {
1859        &self.filter
1860    }
1861
1862    /// Returns the column id.
1863    pub(crate) fn column_id(&self) -> ColumnId {
1864        self.column_id
1865    }
1866
1867    /// Returns the semantic type of the column.
1868    pub(crate) fn semantic_type(&self) -> SemanticType {
1869        self.semantic_type
1870    }
1871
1872    /// Returns the data type of the column.
1873    pub(crate) fn data_type(&self) -> &ConcreteDataType {
1874        &self.data_type
1875    }
1876
1877    /// Returns whether this filter is eligible for flat parquet PK prefiltering.
1878    pub(crate) fn usable_primary_key_filter(&self) -> bool {
1879        self.usable_primary_key_filter
1880    }
1881
1882    /// Returns the filter evaluator when it is eligible for PK prefiltering.
1883    pub(crate) fn primary_key_prefilter(&self) -> Option<SimpleFilterEvaluator> {
1884        if !self.usable_primary_key_filter {
1885            return None;
1886        }
1887
1888        match &self.filter {
1889            MaybeFilter::Filter(filter) => Some(filter.clone()),
1890            MaybeFilter::Matched | MaybeFilter::Pruned => None,
1891        }
1892    }
1893}
1894
1895/// Prunes a column by its default value: `Some(true)` means the filter can't match the default.
1896/// Returns `None` if we can't create the default value or evaluate the filter.
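/// For example (illustrative): if a column absent from the SST has default `0`
/// and the filter is `col = 5`, the filter can't match the default, so this
/// returns `Some(true)` and the file can be pruned for that filter.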
1897fn pruned_by_default(filter: &SimpleFilterEvaluator, column: &ColumnMetadata) -> Option<bool> {
1898    let value = column.column_schema.create_default().ok().flatten()?;
1899    let scalar_value = value
1900        .try_to_scalar_value(&column.column_schema.data_type)
1901        .ok()?;
1902    let matches = filter.evaluate_scalar(&scalar_value).ok()?;
1903    Some(!matches)
1904}
1905
1906/// Parquet batch reader to read our SST format.
1907pub struct ParquetReader {
1908    /// File range context.
1909    context: FileRangeContextRef,
1910    /// Row group selection to read.
1911    selection: RowGroupSelection,
1912    /// Reader of current row group.
1913    reader: Option<FlatPruneReader>,
1914    /// Metrics for tracking row group fetch operations.
1915    fetch_metrics: ParquetFetchMetrics,
1916}
1917
1918impl ParquetReader {
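    /// Returns the next [RecordBatch], building a reader for the next selected
    /// row group once the current one is exhausted. Returns `Ok(None)` when all
    /// selected row groups have been read.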
1919    #[tracing::instrument(
1920        skip_all,
1921        fields(
1922            region_id = %self.context.reader_builder().file_handle.region_id(),
1923            file_id = %self.context.reader_builder().file_handle.file_id()
1924        )
1925    )]
1926    pub async fn next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
1927        loop {
1928            if let Some(reader) = &mut self.reader {
1929                if let Some(batch) = reader.next_batch().await? {
1930                    return Ok(Some(batch));
1931                }
1932                self.reader = None;
1933                continue;
1934            }
1935
1936            let Some((row_group_idx, row_selection)) = self.selection.pop_first() else {
1937                return Ok(None);
1938            };
1939
1940            let skip_fields = self.context.pre_filter_mode().skip_fields();
1941            let parquet_reader = self
1942                .context
1943                .reader_builder()
1944                .build(self.context.build_context(
1945                    row_group_idx,
1946                    Some(row_selection),
1947                    Some(&self.fetch_metrics),
1948                    skip_fields,
1949                ))
1950                .await?;
1951            self.reader = Some(FlatPruneReader::new_with_row_group_reader(
1952                self.context.clone(),
1953                FlatRowGroupReader::new(self.context.clone(), parquet_reader),
1954                skip_fields,
1955            ));
1956        }
1957    }

1958    /// Creates a new reader.
1959    #[tracing::instrument(
1960        skip_all,
1961        fields(
1962            region_id = %context.reader_builder().file_handle.region_id(),
1963            file_id = %context.reader_builder().file_handle.file_id()
1964        )
1965    )]
1966    pub(crate) async fn new(
1967        context: FileRangeContextRef,
1968        mut selection: RowGroupSelection,
1969    ) -> Result<Self> {
1970        debug_assert!(context.read_format().as_flat().is_some());
1971        let fetch_metrics = ParquetFetchMetrics::default();
1972        let reader = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
1973            let skip_fields = context.pre_filter_mode().skip_fields();
1974            let parquet_reader = context
1975                .reader_builder()
1976                .build(context.build_context(
1977                    row_group_idx,
1978                    Some(row_selection),
1979                    Some(&fetch_metrics),
1980                    skip_fields,
1981                ))
1982                .await?;
1983            Some(FlatPruneReader::new_with_row_group_reader(
1984                context.clone(),
1985                FlatRowGroupReader::new(context.clone(), parquet_reader),
1986                skip_fields,
1987            ))
1988        } else {
1989            None
1990        };
1991
1992        Ok(ParquetReader {
1993            context,
1994            selection,
1995            reader,
1996            fetch_metrics,
1997        })
1998    }
1999
2000    /// Returns the metadata of the SST.
2001    pub fn metadata(&self) -> &RegionMetadataRef {
2002        self.context.read_format().metadata()
2003    }
2004
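    /// Returns the parquet metadata of the underlying SST file.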
2005    pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
2006        self.context.reader_builder().parquet_meta.clone()
2007    }
2008}
2009
2010/// RowGroupReaderContext represents the fields that cannot be shared
2011/// between different `RowGroupReader`s.
2012pub(crate) trait RowGroupReaderContext: Send {
2013    fn read_format(&self) -> &ReadFormat;
2014
2015    fn file_path(&self) -> &str;
2016}
2017
2018impl RowGroupReaderContext for FileRangeContextRef {
2019    fn read_format(&self) -> &ReadFormat {
2020        self.as_ref().read_format()
2021    }
2022
2023    fn file_path(&self) -> &str {
2024        self.as_ref().file_path()
2025    }
2026}
2027
2028/// [RowGroupReader] that reads from [FileRange].
2029pub(crate) type RowGroupReader = RowGroupReaderBase<FileRangeContextRef>;
2030
2031#[allow(dead_code)]
2032impl RowGroupReader {
2033    /// Creates a new reader from file range.
2034    pub(crate) fn new(
2035        context: FileRangeContextRef,
2036        stream: ParquetRecordBatchStream<SstAsyncFileReader>,
2037    ) -> Self {
2038        Self::create(context, stream)
2039    }
2040}
2041
2042/// Reader to read a row group of a parquet file.
2043pub(crate) struct RowGroupReaderBase<T> {
2044    /// Context of [RowGroupReader], adapting it to different underlying implementations.
2045    context: T,
2046    /// Inner parquet record batch stream.
2047    stream: ParquetRecordBatchStream<SstAsyncFileReader>,
2048    /// Buffered batches to return.
2049    batches: VecDeque<Batch>,
2050    /// Local scan metrics.
2051    metrics: ReaderMetrics,
2052    /// Cached sequence array to override sequences.
2053    override_sequence: Option<ArrayRef>,
2054}
2055
2056#[allow(dead_code)]
2057impl<T> RowGroupReaderBase<T>
2058where
2059    T: RowGroupReaderContext,
2060{
2061    /// Creates a new reader to read the primary key format.
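    ///
    /// A usage sketch (illustrative; assumes a `context` and `stream` produced
    /// by a [RowGroupReaderBuilder]):
    ///
    /// ```ignore
    /// let mut reader = RowGroupReader::new(context, stream);
    /// while let Some(batch) = reader.next_batch().await? {
    ///     // each `batch` is a primary key format Batch ...
    /// }
    /// ```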
2062    pub(crate) fn create(context: T, stream: ParquetRecordBatchStream<SstAsyncFileReader>) -> Self {
2063        // The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE.
2064        let override_sequence = context
2065            .read_format()
2066            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);
2067        assert!(context.read_format().as_primary_key().is_some());
2068
2069        Self {
2070            context,
2071            stream,
2072            batches: VecDeque::new(),
2073            metrics: ReaderMetrics::default(),
2074            override_sequence,
2075        }
2076    }
2077
2078    /// Gets the metrics.
2079    pub(crate) fn metrics(&self) -> &ReaderMetrics {
2080        &self.metrics
2081    }
2082
2083    /// Gets [ReadFormat] of underlying reader.
2084    pub(crate) fn read_format(&self) -> &ReadFormat {
2085        self.context.read_format()
2086    }
2087
2088    /// Tries to fetch the next [RecordBatch] from the stream asynchronously.
2089    async fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
2090        match self.stream.next().await.transpose() {
2091            Ok(batch) => Ok(batch),
2092            Err(e) => Err(e).context(ReadParquetSnafu {
2093                path: self.context.file_path(),
2094            }),
2095        }
2096    }
2097
2098    /// Returns the next [Batch].
2099    pub(crate) async fn next_inner(&mut self) -> Result<Option<Batch>> {
2100        let scan_start = Instant::now();
2101        if let Some(batch) = self.batches.pop_front() {
2102            self.metrics.num_rows += batch.num_rows();
2103            self.metrics.scan_cost += scan_start.elapsed();
2104            return Ok(Some(batch));
2105        }
2106
2107        // We need to fetch the next record batch and convert it to batches.
2108        while self.batches.is_empty() {
2109            let Some(record_batch) = self.fetch_next_record_batch().await? else {
2110                self.metrics.scan_cost += scan_start.elapsed();
2111                return Ok(None);
2112            };
2113            self.metrics.num_record_batches += 1;
2114
2115            // Safety: RowGroupReaderBase::create() asserts the format is primary key.
2116            self.context
2117                .read_format()
2118                .as_primary_key()
2119                .unwrap()
2120                .convert_record_batch(
2121                    &record_batch,
2122                    self.override_sequence.as_ref(),
2123                    &mut self.batches,
2124                )?;
2125            self.metrics.num_batches += self.batches.len();
2126        }
2127        let batch = self.batches.pop_front();
2128        self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
2129        self.metrics.scan_cost += scan_start.elapsed();
2130        Ok(batch)
2131    }
2132}
2133
2134#[async_trait::async_trait]
2135impl<T> BatchReader for RowGroupReaderBase<T>
2136where
2137    T: RowGroupReaderContext + Send + Sync,
2138{
2139    async fn next_batch(&mut self) -> Result<Option<Batch>> {
2140        self.next_inner().await
2141    }
2142}
2143
2144/// Reader to read a row group of a parquet file in flat format, returning RecordBatch.
2145pub(crate) struct FlatRowGroupReader {
2146    /// Context for file ranges.
2147    context: FileRangeContextRef,
2148    /// Inner parquet record batch stream.
2149    stream: ParquetRecordBatchStream<SstAsyncFileReader>,
2150    /// Cached sequence array to override sequences.
2151    override_sequence: Option<ArrayRef>,
2152}
2153
2154impl FlatRowGroupReader {
2155    /// Creates a new flat reader from file range.
2156    pub(crate) fn new(
2157        context: FileRangeContextRef,
2158        stream: ParquetRecordBatchStream<SstAsyncFileReader>,
2159    ) -> Self {
2160        // The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE.
2161        let override_sequence = context
2162            .read_format()
2163            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);
2164
2165        Self {
2166            context,
2167            stream,
2168            override_sequence,
2169        }
2170    }
2171
2172    /// Returns the next RecordBatch.
2173    pub(crate) async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
2174        match self.stream.next().await {
2175            Some(batch_result) => {
2176                let record_batch = batch_result.context(ReadParquetSnafu {
2177                    path: self.context.file_path(),
2178                })?;
2179
2180                // Safety: only the flat format uses FlatRowGroupReader.
2181                let flat_format = self.context.read_format().as_flat().unwrap();
2182                let record_batch =
2183                    flat_format.convert_batch(record_batch, self.override_sequence.as_ref())?;
2184                Ok(Some(record_batch))
2185            }
2186            None => Ok(None),
2187        }
2188    }
2189}
2190
2191#[cfg(test)]
2192mod tests {
2193    use std::any::Any;
2194    use std::fmt::{Debug, Formatter};
2195    use std::sync::{Arc, LazyLock};
2196
2197    use datafusion::arrow::datatypes::DataType;
2198    use datafusion_common::ScalarValue;
2199    use datafusion_expr::expr::ScalarFunction;
2200    use datafusion_expr::{
2201        ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
2202        col, lit,
2203    };
2204    use datatypes::arrow::array::{ArrayRef, Int64Array};
2205    use datatypes::arrow::record_batch::RecordBatch;
2206    use datatypes::prelude::ConcreteDataType;
2207    use datatypes::schema::ColumnSchema;
2208    use object_store::services::Memory;
2209    use parquet::arrow::ArrowWriter;
2210    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
2211    use store_api::region_request::PathType;
2212    use table::predicate::Predicate;
2213
2214    use super::*;
2215    use crate::sst::parquet::metadata::MetadataLoader;
2216    use crate::test_util::sst_util::{sst_file_handle, sst_region_metadata};
2217
2218    #[tokio::test(flavor = "current_thread")]
2219    async fn test_minmax_predicate_key_not_built_when_index_result_cache_disabled() {
2220        #[derive(Eq, PartialEq, Hash)]
2221        struct PanicDebugUdf;
2222
2223        impl Debug for PanicDebugUdf {
2224            fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result {
2225                panic!("minmax predicate key should not format exprs when cache is disabled");
2226            }
2227        }
2228
2229        impl ScalarUDFImpl for PanicDebugUdf {
2230            fn as_any(&self) -> &dyn Any {
2231                self
2232            }
2233
2234            fn name(&self) -> &str {
2235                "panic_debug_udf"
2236            }
2237
2238            fn signature(&self) -> &Signature {
2239                static SIGNATURE: LazyLock<Signature> =
2240                    LazyLock::new(|| Signature::variadic_any(Volatility::Immutable));
2241                &SIGNATURE
2242            }
2243
2244            fn return_type(&self, _arg_types: &[DataType]) -> datafusion_common::Result<DataType> {
2245                Ok(DataType::Int64)
2246            }
2247
2248            fn invoke_with_args(
2249                &self,
2250                _args: ScalarFunctionArgs,
2251            ) -> datafusion_common::Result<ColumnarValue> {
2252                Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(1))))
2253            }
2254        }
2255
2256        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
2257        let file_handle = sst_file_handle(0, 1);
2258        let table_dir = "test_table".to_string();
2259        let path_type = PathType::Bare;
2260        let file_path = file_handle.file_path(&table_dir, path_type);
2261
2262        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
2263        let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
2264        let mut parquet_bytes = Vec::new();
2265        let mut writer = ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), None).unwrap();
2266        writer.write(&batch).unwrap();
2267        writer.close().unwrap();
2268        let file_size = parquet_bytes.len() as u64;
2269        object_store.write(&file_path, parquet_bytes).await.unwrap();
2270
2271        let region_metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
2272        let read_format =
2273            ReadFormat::new(region_metadata, None, false, None, &file_path, false).unwrap();
2274
2275        let mut cache_metrics = MetadataCacheMetrics::default();
2276        let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size);
2277        let parquet_meta = loader.load(&mut cache_metrics).await.unwrap();
2278
2279        let udf = Arc::new(ScalarUDF::new_from_impl(PanicDebugUdf));
2280        let predicate = Predicate::new(vec![Expr::ScalarFunction(ScalarFunction::new_udf(
2281            udf,
2282            vec![],
2283        ))]);
2284        let builder = ParquetReaderBuilder::new(table_dir, path_type, file_handle, object_store)
2285            .predicate(Some(predicate))
2286            .cache(CacheStrategy::Disabled);
2287
2288        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
2289        let total_row_count = parquet_meta.file_metadata().num_rows() as usize;
2290        let mut metrics = ReaderFilterMetrics::default();
2291        let selection = builder.row_groups_by_minmax(
2292            &read_format,
2293            &parquet_meta,
2294            row_group_size,
2295            total_row_count,
2296            &mut metrics,
2297            false,
2298        );
2299
2300        assert!(!selection.is_empty());
2301    }
2302
2303    fn expected_metadata_with_reused_tag_name(
2304        old_metadata: &RegionMetadata,
2305    ) -> Arc<RegionMetadata> {
2306        let mut builder = RegionMetadataBuilder::new(old_metadata.region_id);
2307        builder
2308            .push_column_metadata(ColumnMetadata {
2309                column_schema: ColumnSchema::new(
2310                    "tag_0".to_string(),
2311                    ConcreteDataType::string_datatype(),
2312                    true,
2313                ),
2314                semantic_type: SemanticType::Tag,
2315                column_id: 10,
2316            })
2317            .push_column_metadata(ColumnMetadata {
2318                column_schema: ColumnSchema::new(
2319                    "tag_1".to_string(),
2320                    ConcreteDataType::string_datatype(),
2321                    true,
2322                ),
2323                semantic_type: SemanticType::Tag,
2324                column_id: 1,
2325            })
2326            .push_column_metadata(ColumnMetadata {
2327                column_schema: ColumnSchema::new(
2328                    "field_0".to_string(),
2329                    ConcreteDataType::uint64_datatype(),
2330                    true,
2331                ),
2332                semantic_type: SemanticType::Field,
2333                column_id: 2,
2334            })
2335            .push_column_metadata(ColumnMetadata {
2336                column_schema: ColumnSchema::new(
2337                    "ts".to_string(),
2338                    ConcreteDataType::timestamp_millisecond_datatype(),
2339                    false,
2340                ),
2341                semantic_type: SemanticType::Timestamp,
2342                column_id: 3,
2343            })
2344            .primary_key(vec![10, 1]);
2345
2346        Arc::new(builder.build().unwrap())
2347    }
2348
2349    #[test]
2350    fn test_simple_filter_context_marks_usable_primary_key_filter() {
2351        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
2352        let ctx =
2353            SimpleFilterContext::new_opt(&metadata, None, &col("tag_0").eq(lit("a"))).unwrap();
2354
2355        assert!(ctx.usable_primary_key_filter());
2356        assert!(ctx.primary_key_prefilter().is_some());
2357    }
2358
2359    #[test]
2360    fn test_simple_filter_context_skips_non_usable_primary_key_filter() {
2361        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
2362
2363        let field_ctx =
2364            SimpleFilterContext::new_opt(&metadata, None, &col("field_0").eq(lit(1_u64))).unwrap();
2365        assert!(!field_ctx.usable_primary_key_filter());
2366        assert!(field_ctx.primary_key_prefilter().is_none());
2367
2368        let expected_metadata = expected_metadata_with_reused_tag_name(metadata.as_ref());
2369        let mismatched_ctx = SimpleFilterContext::new_opt(
2370            &metadata,
2371            Some(expected_metadata.as_ref()),
2372            &col("tag_0").eq(lit("a")),
2373        )
2374        .unwrap();
2375        assert!(!mismatched_ctx.usable_primary_key_filter());
2376        assert!(mismatched_ctx.primary_key_prefilter().is_none());
2377    }
2378}