mito2/sst/parquet/reader.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Parquet reader.

use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::v1::SemanticType;
use async_trait::async_trait;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, tracing, warn};
use datafusion_expr::Expr;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use mito_codec::row_converter::build_primary_key_codec;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use parquet::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels};
use parquet::file::metadata::ParquetMetaData;
use parquet::format::KeyValue;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, FileId};
use table::predicate::Predicate;

use crate::cache::CacheStrategy;
use crate::cache::index::result_cache::PredicateKey;
use crate::error::{
    ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadDataPartSnafu,
    ReadParquetSnafu, Result,
};
use crate::metrics::{
    PRECISE_FILTER_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL,
    READ_ROWS_TOTAL, READ_STAGE_ELAPSED,
};
use crate::read::prune::{PruneReader, Source};
use crate::read::{Batch, BatchReader};
use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::{
    BloomFilterIndexApplierRef, BloomFilterIndexApplyMetrics,
};
use crate::sst::index::fulltext_index::applier::{
    FulltextIndexApplierRef, FulltextIndexApplyMetrics,
};
use crate::sst::index::inverted_index::applier::{
    InvertedIndexApplierRef, InvertedIndexApplyMetrics,
};
use crate::sst::parquet::file_range::{
    FileRangeContext, FileRangeContextRef, PreFilterMode, RangeBase, row_group_contains_delete,
};
use crate::sst::parquet::format::{ReadFormat, need_override_sequence};
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::{InMemoryRowGroup, ParquetFetchMetrics};
use crate::sst::parquet::row_selection::RowGroupSelection;
use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};

const INDEX_TYPE_FULLTEXT: &str = "fulltext";
const INDEX_TYPE_INVERTED: &str = "inverted";
const INDEX_TYPE_BLOOM: &str = "bloom filter";

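// Logs an index-apply failure and keeps reading without the index; under
// test builds it panics instead so that index bugs surface early.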
macro_rules! handle_index_error {
    ($err:expr, $file_handle:expr, $index_type:expr) => {
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to apply {} index, region_id: {}, file_id: {}, err: {:?}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id(),
                $err
            );
        } else {
            warn!(
                $err; "Failed to apply {} index, region_id: {}, file_id: {}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id()
            );
        }
    };
}

/// Parquet SST reader builder.
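///
/// A minimal usage sketch (`table_dir`, `path_type`, `handle`, `store`,
/// `predicate`, and `projection` are placeholders for values owned by the
/// caller):
///
/// ```ignore
/// let mut reader = ParquetReaderBuilder::new(table_dir, path_type, handle, store)
///     .predicate(Some(predicate))
///     .projection(projection)
///     .build()
///     .await?;
/// ```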
pub struct ParquetReaderBuilder {
    /// SST directory.
    table_dir: String,
    /// Path type for generating file paths.
    path_type: PathType,
    file_handle: FileHandle,
    object_store: ObjectStore,
    /// Predicate to push down.
    predicate: Option<Predicate>,
    /// Metadata of columns to read.
    ///
    /// `None` reads all columns. Due to schema changes, the projection
    /// can contain columns that are not in the parquet file.
    projection: Option<Vec<ColumnId>>,
    /// Strategy to cache SST data.
    cache_strategy: CacheStrategy,
    /// Index appliers.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Expected metadata of the region while reading the SST.
    /// This is usually the latest metadata of the region. The reader uses
    /// it to get the correct column id of a column by name.
    expected_metadata: Option<RegionMetadataRef>,
    /// Whether to use flat format for reading.
    flat_format: bool,
    /// Whether this reader is for compaction.
    compaction: bool,
    /// Mode to pre-filter columns.
    pre_filter_mode: PreFilterMode,
    /// Whether to decode primary key values eagerly when reading primary key format SSTs.
    decode_primary_key_values: bool,
}

impl ParquetReaderBuilder {
    /// Returns a new [ParquetReaderBuilder] to read a specific SST.
    pub fn new(
        table_dir: String,
        path_type: PathType,
        file_handle: FileHandle,
        object_store: ObjectStore,
    ) -> ParquetReaderBuilder {
        ParquetReaderBuilder {
            table_dir,
            path_type,
            file_handle,
            object_store,
            predicate: None,
            projection: None,
            cache_strategy: CacheStrategy::Disabled,
            inverted_index_appliers: [None, None],
            bloom_filter_index_appliers: [None, None],
            fulltext_index_appliers: [None, None],
            expected_metadata: None,
            flat_format: false,
            compaction: false,
            pre_filter_mode: PreFilterMode::All,
            decode_primary_key_values: false,
        }
    }

    /// Attaches the predicate to the builder.
    #[must_use]
    pub fn predicate(mut self, predicate: Option<Predicate>) -> ParquetReaderBuilder {
        self.predicate = predicate;
        self
    }

    /// Attaches the projection to the builder.
    ///
    /// The reader only applies the projection to fields.
    #[must_use]
    pub fn projection(mut self, projection: Option<Vec<ColumnId>>) -> ParquetReaderBuilder {
        self.projection = projection;
        self
    }

    /// Attaches the cache to the builder.
    #[must_use]
    pub fn cache(mut self, cache: CacheStrategy) -> ParquetReaderBuilder {
        self.cache_strategy = cache;
        self
    }

    /// Attaches the inverted index appliers to the builder.
    #[must_use]
    pub(crate) fn inverted_index_appliers(
        mut self,
        index_appliers: [Option<InvertedIndexApplierRef>; 2],
    ) -> Self {
        self.inverted_index_appliers = index_appliers;
        self
    }

    /// Attaches the bloom filter index appliers to the builder.
    #[must_use]
    pub(crate) fn bloom_filter_index_appliers(
        mut self,
        index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    ) -> Self {
        self.bloom_filter_index_appliers = index_appliers;
        self
    }

    /// Attaches the fulltext index appliers to the builder.
    #[must_use]
    pub(crate) fn fulltext_index_appliers(
        mut self,
        index_appliers: [Option<FulltextIndexApplierRef>; 2],
    ) -> Self {
        self.fulltext_index_appliers = index_appliers;
        self
    }

    /// Attaches the expected metadata to the builder.
    #[must_use]
    pub fn expected_metadata(mut self, expected_metadata: Option<RegionMetadataRef>) -> Self {
        self.expected_metadata = expected_metadata;
        self
    }

    /// Sets the flat format flag.
    #[must_use]
    pub fn flat_format(mut self, flat_format: bool) -> Self {
        self.flat_format = flat_format;
        self
    }

    /// Sets the compaction flag.
    #[must_use]
    pub fn compaction(mut self, compaction: bool) -> Self {
        self.compaction = compaction;
        self
    }

    /// Sets the pre-filter mode.
    #[must_use]
    pub(crate) fn pre_filter_mode(mut self, pre_filter_mode: PreFilterMode) -> Self {
        self.pre_filter_mode = pre_filter_mode;
        self
    }

    /// Decodes primary key values eagerly when reading primary key format SSTs.
    #[must_use]
    pub(crate) fn decode_primary_key_values(mut self, decode: bool) -> Self {
        self.decode_primary_key_values = decode;
        self
    }

    /// Builds a [ParquetReader].
    ///
    /// This needs to perform IO operations.
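    ///
    /// A sketch of driving the returned reader (error handling elided):
    ///
    /// ```ignore
    /// let mut reader = builder.build().await?;
    /// while let Some(batch) = reader.next_batch().await? {
    ///     // consume `batch`
    /// }
    /// ```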
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.file_handle.region_id(),
            file_id = %self.file_handle.file_id()
        )
    )]
    pub async fn build(&self) -> Result<ParquetReader> {
        let mut metrics = ReaderMetrics::default();

        let (context, selection) = self.build_reader_input(&mut metrics).await?;
        ParquetReader::new(Arc::new(context), selection).await
    }

    /// Builds a [FileRangeContext] and collects row groups to read.
    ///
    /// This needs to perform IO operations.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.file_handle.region_id(),
            file_id = %self.file_handle.file_id()
        )
    )]
    pub(crate) async fn build_reader_input(
        &self,
        metrics: &mut ReaderMetrics,
    ) -> Result<(FileRangeContext, RowGroupSelection)> {
        let start = Instant::now();

        let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
        let file_size = self.file_handle.meta_ref().file_size;

        // Loads parquet metadata of the file.
        let (parquet_meta, cache_miss) = self
            .read_parquet_metadata(&file_path, file_size, &mut metrics.metadata_cache_metrics)
            .await?;
        // Decodes region metadata.
        let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
        // Gets the metadata stored in the SST.
        let region_meta = Arc::new(Self::get_region_metadata(&file_path, key_value_meta)?);
        let mut read_format = if let Some(column_ids) = &self.projection {
            ReadFormat::new(
                region_meta.clone(),
                Some(column_ids),
                self.flat_format,
                Some(parquet_meta.file_metadata().schema_descr().num_columns()),
                &file_path,
                self.compaction,
            )?
        } else {
            // Lists all column ids to read; we always prefer the expected metadata if available.
            let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
            let column_ids: Vec<_> = expected_meta
                .column_metadatas
                .iter()
                .map(|col| col.column_id)
                .collect();
            ReadFormat::new(
                region_meta.clone(),
                Some(&column_ids),
                self.flat_format,
                Some(parquet_meta.file_metadata().schema_descr().num_columns()),
                &file_path,
                self.compaction,
            )?
        };
        if self.decode_primary_key_values {
            read_format.set_decode_primary_key_values(true);
        }
        if need_override_sequence(&parquet_meta) {
            read_format
                .set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));
        }

        // Computes the projection mask.
        let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
        let indices = read_format.projection_indices();
        // For now we assume there are no nested schemas.
        // TODO(yingwen): Revisit this if we introduce nested types such as JSON type.
        let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied());

        // Computes the field levels.
        let hint = Some(read_format.arrow_schema().fields());
        let field_levels =
            parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
                .context(ReadDataPartSnafu)?;
        let selection = self
            .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
            .await;

        // Triggers a background download if the metadata had a cache miss and the selection is not empty.
        if cache_miss && !selection.is_empty() {
            use crate::cache::file_cache::{FileType, IndexKey};
            let index_key = IndexKey::new(
                self.file_handle.region_id(),
                self.file_handle.file_id().file_id(),
                FileType::Parquet,
            );
            self.cache_strategy.maybe_download_background(
                index_key,
                file_path.clone(),
                self.object_store.clone(),
                file_size,
            );
        }

        let prune_schema = self
            .expected_metadata
            .as_ref()
            .map(|meta| meta.schema.clone())
            .unwrap_or_else(|| region_meta.schema.clone());

        let reader_builder = RowGroupReaderBuilder {
            file_handle: self.file_handle.clone(),
            file_path,
            parquet_meta,
            object_store: self.object_store.clone(),
            projection: projection_mask,
            field_levels,
            cache_strategy: self.cache_strategy.clone(),
        };

        let filters = if let Some(predicate) = &self.predicate {
            predicate
                .exprs()
                .iter()
                .filter_map(|expr| {
                    SimpleFilterContext::new_opt(
                        &region_meta,
                        self.expected_metadata.as_deref(),
                        expr,
                    )
                })
                .collect::<Vec<_>>()
        } else {
            vec![]
        };

        let dyn_filters = if let Some(predicate) = &self.predicate {
            predicate.dyn_filters().clone()
        } else {
            Arc::new(vec![])
        };

        let codec = build_primary_key_codec(read_format.metadata());

        let context = FileRangeContext::new(
            reader_builder,
            RangeBase {
                filters,
                dyn_filters,
                read_format,
                expected_metadata: self.expected_metadata.clone(),
                prune_schema,
                codec,
                compat_batch: None,
                pre_filter_mode: self.pre_filter_mode,
            },
        );

        metrics.build_cost += start.elapsed();

        Ok((context, selection))
    }

    /// Decodes region metadata from the key-value metadata.
    fn get_region_metadata(
        file_path: &str,
        key_value_meta: Option<&Vec<KeyValue>>,
    ) -> Result<RegionMetadata> {
        let key_values = key_value_meta.context(InvalidParquetSnafu {
            file: file_path,
            reason: "missing key value meta",
        })?;
        let meta_value = key_values
            .iter()
            .find(|kv| kv.key == PARQUET_METADATA_KEY)
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("key {} not found", PARQUET_METADATA_KEY),
            })?;
        let json = meta_value
            .value
            .as_ref()
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("No value for key {}", PARQUET_METADATA_KEY),
            })?;

        RegionMetadata::from_json(json).context(InvalidMetadataSnafu)
    }

    /// Reads parquet metadata of a specific file.
    /// Returns (metadata, cache_miss_flag).
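    ///
    /// The lookup order is: the metadata cache via [CacheStrategy] first, then
    /// a direct load from object storage, which also populates the cache.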
    async fn read_parquet_metadata(
        &self,
        file_path: &str,
        file_size: u64,
        cache_metrics: &mut MetadataCacheMetrics,
    ) -> Result<(Arc<ParquetMetaData>, bool)> {
        let start = Instant::now();
        let _t = READ_STAGE_ELAPSED
            .with_label_values(&["read_parquet_metadata"])
            .start_timer();

        let file_id = self.file_handle.file_id();
        // Tries to get from cache with metrics tracking.
        if let Some(metadata) = self
            .cache_strategy
            .get_parquet_meta_data(file_id, cache_metrics)
            .await
        {
            cache_metrics.metadata_load_cost += start.elapsed();
            return Ok((metadata, false));
        }

        // Cache miss, load metadata directly.
        let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size);
        let metadata = metadata_loader.load().await?;

        let metadata = Arc::new(metadata);
        // Cache the metadata.
        self.cache_strategy
            .put_parquet_meta_data(file_id, metadata.clone());

        cache_metrics.metadata_load_cost += start.elapsed();
        Ok((metadata, true))
    }

    /// Computes row groups to read, along with their respective row selections.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.file_handle.region_id(),
            file_id = %self.file_handle.file_id()
        )
    )]
    async fn row_groups_to_read(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        metrics: &mut ReaderFilterMetrics,
    ) -> RowGroupSelection {
        let num_row_groups = parquet_meta.num_row_groups();
        let num_rows = parquet_meta.file_metadata().num_rows();
        if num_row_groups == 0 || num_rows == 0 {
            return RowGroupSelection::default();
        }

        // Let's assume that the number of rows in the first row group
        // can represent the `row_group_size` of the Parquet file.
        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
        if row_group_size == 0 {
            return RowGroupSelection::default();
        }

        metrics.rg_total += num_row_groups;
        metrics.rows_total += num_rows as usize;

        let mut output = RowGroupSelection::new(row_group_size, num_rows as _);

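        // Pruning cascade: min-max statistics first (cheapest), then the
        // fulltext, inverted, and bloom filter indexes. Each step may empty
        // the selection and short-circuit the remaining steps.
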
        // Compute skip_fields once for all pruning operations
        let skip_fields = self.compute_skip_fields(parquet_meta);

        self.prune_row_groups_by_minmax(
            read_format,
            parquet_meta,
            &mut output,
            metrics,
            skip_fields,
        );
        if output.is_empty() {
            return output;
        }

        let fulltext_filtered = self
            .prune_row_groups_by_fulltext_index(
                row_group_size,
                num_row_groups,
                &mut output,
                metrics,
                skip_fields,
            )
            .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_inverted_index(
            row_group_size,
            num_row_groups,
            &mut output,
            metrics,
            skip_fields,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_bloom_filter(
            row_group_size,
            parquet_meta,
            &mut output,
            metrics,
            skip_fields,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        if !fulltext_filtered {
            self.prune_row_groups_by_fulltext_bloom(
                row_group_size,
                parquet_meta,
                &mut output,
                metrics,
                skip_fields,
            )
            .await;
        }
        output
    }

    /// Prunes row groups by the fulltext index. Returns `true` if the index was applied.
    async fn prune_row_groups_by_fulltext_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let mut pruned = false;
        // If skip_fields is true, only apply the first applier (for tags).
        let appliers = if skip_fields {
            &self.fulltext_index_appliers[..1]
        } else {
            &self.fulltext_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Fast path: use the cached result if it covers all required row groups.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                pruned = true;
                continue;
            }

            // Slow path: apply the index from the file.
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let apply_res = index_applier
                .apply_fine(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    metrics.fulltext_index_apply_metrics.as_mut(),
                )
                .await;
            let selection = match apply_res {
                Ok(Some(res)) => {
                    RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups)
                }
                Ok(None) => continue,
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                    continue;
                }
            };

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_FULLTEXT,
            );
            pruned = true;
        }
        pruned
    }

    /// Applies the inverted index to prune row groups.
    ///
    /// TODO(zhongzc): Devise a mechanism to disable index usage as an escape
    /// hatch in case of index issues; it could also be used to test the
    /// correctness of the index.
    async fn prune_row_groups_by_inverted_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().inverted_index_available() {
            return false;
        }

        let mut pruned = false;
        // If skip_fields is true, only apply the first applier (for tags).
        let appliers = if skip_fields {
            &self.inverted_index_appliers[..1]
        } else {
            &self.inverted_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Fast path: use the cached result if it covers all required row groups.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED);
                pruned = true;
                continue;
            }

            // Slow path: apply the index from the file.
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let apply_res = index_applier
                .apply(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    metrics.inverted_index_apply_metrics.as_mut(),
                )
                .await;
            let selection = match apply_res {
                Ok(apply_output) => RowGroupSelection::from_inverted_index_apply_output(
                    row_group_size,
                    num_row_groups,
                    apply_output,
                ),
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
                    continue;
                }
            };

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_INVERTED,
            );
            pruned = true;
        }
        pruned
    }

    async fn prune_row_groups_by_bloom_filter(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().bloom_filter_index_available() {
            return false;
        }

        let mut pruned = false;
        // If skip_fields is true, only apply the first applier (for tags).
        let appliers = if skip_fields {
            &self.bloom_filter_index_appliers[..1]
        } else {
            &self.bloom_filter_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Fast path: use the cached result if it covers all required row groups.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM);
                pruned = true;
                continue;
            }

            // Slow path: apply the index from the file.
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
                (
                    rg.num_rows() as usize,
                    // Optimization: only search the row groups required by `output` and not already stored in `cached`.
                    output.contains_non_empty_row_group(i)
                        && cached
                            .as_ref()
                            .map(|c| !c.contains_row_group(i))
                            .unwrap_or(true),
                )
            });
            let apply_res = index_applier
                .apply(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    rgs,
                    metrics.bloom_filter_apply_metrics.as_mut(),
                )
                .await;
            let mut selection = match apply_res {
                Ok(apply_output) => {
                    RowGroupSelection::from_row_ranges(apply_output, row_group_size)
                }
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM);
                    continue;
                }
            };

            // Newly searched row groups are added to `selection`; concat them with `cached`.
            if let Some(cached) = cached.as_ref() {
                selection.concat(cached);
            }

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_BLOOM,
            );
            pruned = true;
        }
        pruned
    }

    async fn prune_row_groups_by_fulltext_bloom(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let mut pruned = false;
        // If skip_fields is true, only apply the first applier (for tags).
        let appliers = if skip_fields {
            &self.fulltext_index_appliers[..1]
        } else {
            &self.fulltext_index_appliers[..]
        };
        for index_applier in appliers.iter().flatten() {
            let predicate_key = index_applier.predicate_key();
            // Fast path: use the cached result if it covers all required row groups.
            let cached = self
                .cache_strategy
                .index_result_cache()
                .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
            if let Some(result) = cached.as_ref()
                && all_required_row_groups_searched(output, result)
            {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                pruned = true;
                continue;
            }

            // Slow path: apply the index from the file.
            let file_size_hint = self.file_handle.meta_ref().index_file_size();
            let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
                (
                    rg.num_rows() as usize,
                    // Optimization: only search the row groups required by `output` and not already stored in `cached`.
                    output.contains_non_empty_row_group(i)
                        && cached
                            .as_ref()
                            .map(|c| !c.contains_row_group(i))
                            .unwrap_or(true),
                )
            });
            let apply_res = index_applier
                .apply_coarse(
                    self.file_handle.index_id(),
                    Some(file_size_hint),
                    rgs,
                    metrics.fulltext_index_apply_metrics.as_mut(),
                )
                .await;
            let mut selection = match apply_res {
                Ok(Some(apply_output)) => {
                    RowGroupSelection::from_row_ranges(apply_output, row_group_size)
                }
                Ok(None) => continue,
                Err(err) => {
                    handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                    continue;
                }
            };

            // Newly searched row groups are added to `selection`; concat them with `cached`.
            if let Some(cached) = cached.as_ref() {
                selection.concat(cached);
            }

            self.apply_index_result_and_update_cache(
                predicate_key,
                self.file_handle.file_id().file_id(),
                selection,
                output,
                metrics,
                INDEX_TYPE_FULLTEXT,
            );
            pruned = true;
        }
        pruned
    }

    /// Computes whether to skip field columns when building statistics, based on the
    /// [PreFilterMode].
    fn compute_skip_fields(&self, parquet_meta: &ParquetMetaData) -> bool {
        match self.pre_filter_mode {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => {
                // Checks if any row group contains a delete op.
                let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
                (0..parquet_meta.num_row_groups()).any(|rg_idx| {
                    row_group_contains_delete(parquet_meta, rg_idx, &file_path)
                        .inspect_err(|e| {
                            warn!(e; "Failed to decode min value of op_type, fallback to not skipping fields");
                        })
                        .unwrap_or(false)
                })
            }
        }
    }

    /// Prunes row groups by the min-max index.
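    ///
    /// For example, a predicate like `ts > 100` removes any row group whose
    /// max statistic for `ts` is at most 100.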
    fn prune_row_groups_by_minmax(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        skip_fields: bool,
    ) -> bool {
        let Some(predicate) = &self.predicate else {
            return false;
        };

        let row_groups_before = output.row_group_count();

        let region_meta = read_format.metadata();
        let row_groups = parquet_meta.row_groups();
        let stats = RowGroupPruningStats::new(
            row_groups,
            read_format,
            self.expected_metadata.clone(),
            skip_fields,
        );
        let prune_schema = self
            .expected_metadata
            .as_ref()
            .map(|meta| meta.schema.arrow_schema())
            .unwrap_or_else(|| region_meta.schema.arrow_schema());

        // Here we use the schema of the SST to build the physical expression. If a column
        // in the SST doesn't have the same column id as the column in the expected metadata,
        // we will get `None` statistics for that column.
        predicate
            .prune_with_stats(&stats, prune_schema)
            .iter()
            .zip(0..parquet_meta.num_row_groups())
            .for_each(|(mask, row_group)| {
                if !*mask {
                    output.remove_row_group(row_group);
                }
            });

        let row_groups_after = output.row_group_count();
        metrics.rg_minmax_filtered += row_groups_before - row_groups_after;

        true
    }

    fn apply_index_result_and_update_cache(
        &self,
        predicate_key: &PredicateKey,
        file_id: FileId,
        result: RowGroupSelection,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        index_type: &str,
    ) {
        apply_selection_and_update_metrics(output, &result, metrics, index_type);

        if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() {
            index_result_cache.put(predicate_key.clone(), file_id, Arc::new(result));
        }
    }
}

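/// Intersects `output` with an index `result` and records how many row groups
/// and rows the index filtered out under `index_type`.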
fn apply_selection_and_update_metrics(
    output: &mut RowGroupSelection,
    result: &RowGroupSelection,
    metrics: &mut ReaderFilterMetrics,
    index_type: &str,
) {
    let intersection = output.intersect(result);

    let row_group_count = output.row_group_count() - intersection.row_group_count();
    let row_count = output.row_count() - intersection.row_count();

    metrics.update_index_metrics(index_type, row_group_count, row_count);

    *output = intersection;
}

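/// Returns whether every non-empty row group required by the current selection
/// has already been searched and stored in the cached result, so the cached
/// result can be applied without touching the index file again.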
fn all_required_row_groups_searched(
    required_row_groups: &RowGroupSelection,
    cached_row_groups: &RowGroupSelection,
) -> bool {
    required_row_groups.iter().all(|(rg_id, _)| {
        // A row group with no selected rows doesn't need to be searched.
        !required_row_groups.contains_non_empty_row_group(*rg_id)
            // The row group has already been searched.
            || cached_row_groups.contains_row_group(*rg_id)
    })
}

/// Metrics of filtering row groups and rows.
#[derive(Debug, Default, Clone)]
pub(crate) struct ReaderFilterMetrics {
    /// Number of row groups before filtering.
    pub(crate) rg_total: usize,
    /// Number of row groups filtered by fulltext index.
    pub(crate) rg_fulltext_filtered: usize,
    /// Number of row groups filtered by inverted index.
    pub(crate) rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max index.
    pub(crate) rg_minmax_filtered: usize,
    /// Number of row groups filtered by bloom filter index.
    pub(crate) rg_bloom_filtered: usize,

    /// Number of rows in row group before filtering.
    pub(crate) rows_total: usize,
    /// Number of rows in row group filtered by fulltext index.
    pub(crate) rows_fulltext_filtered: usize,
    /// Number of rows in row group filtered by inverted index.
    pub(crate) rows_inverted_filtered: usize,
    /// Number of rows in row group filtered by bloom filter index.
    pub(crate) rows_bloom_filtered: usize,
    /// Number of rows filtered by precise filter.
    pub(crate) rows_precise_filtered: usize,

    /// Optional metrics for inverted index applier.
    pub(crate) inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
    /// Optional metrics for bloom filter index applier.
    pub(crate) bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
    /// Optional metrics for fulltext index applier.
    pub(crate) fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,
}

impl ReaderFilterMetrics {
    /// Adds `other` metrics to this metrics.
    pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
        self.rg_total += other.rg_total;
        self.rg_fulltext_filtered += other.rg_fulltext_filtered;
        self.rg_inverted_filtered += other.rg_inverted_filtered;
        self.rg_minmax_filtered += other.rg_minmax_filtered;
        self.rg_bloom_filtered += other.rg_bloom_filtered;

        self.rows_total += other.rows_total;
        self.rows_fulltext_filtered += other.rows_fulltext_filtered;
        self.rows_inverted_filtered += other.rows_inverted_filtered;
        self.rows_bloom_filtered += other.rows_bloom_filtered;
        self.rows_precise_filtered += other.rows_precise_filtered;

        // Merges optional applier metrics.
        if let Some(other_metrics) = &other.inverted_index_apply_metrics {
            self.inverted_index_apply_metrics
                .get_or_insert_with(Default::default)
                .merge_from(other_metrics);
        }
        if let Some(other_metrics) = &other.bloom_filter_apply_metrics {
            self.bloom_filter_apply_metrics
                .get_or_insert_with(Default::default)
                .merge_from(other_metrics);
        }
        if let Some(other_metrics) = &other.fulltext_index_apply_metrics {
            self.fulltext_index_apply_metrics
                .get_or_insert_with(Default::default)
                .merge_from(other_metrics);
        }
    }

    /// Reports metrics.
    pub(crate) fn observe(&self) {
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_total as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
    }

    fn update_index_metrics(&mut self, index_type: &str, row_group_count: usize, row_count: usize) {
        match index_type {
            INDEX_TYPE_FULLTEXT => {
                self.rg_fulltext_filtered += row_group_count;
                self.rows_fulltext_filtered += row_count;
            }
            INDEX_TYPE_INVERTED => {
                self.rg_inverted_filtered += row_group_count;
                self.rows_inverted_filtered += row_count;
            }
            INDEX_TYPE_BLOOM => {
                self.rg_bloom_filtered += row_group_count;
                self.rows_bloom_filtered += row_count;
            }
            _ => {}
        }
    }
}

/// Metrics for parquet metadata cache operations.
#[derive(Default, Clone, Copy)]
pub(crate) struct MetadataCacheMetrics {
    /// Number of memory cache hits for parquet metadata.
    pub(crate) mem_cache_hit: usize,
    /// Number of file cache hits for parquet metadata.
    pub(crate) file_cache_hit: usize,
    /// Number of cache misses for parquet metadata.
    pub(crate) cache_miss: usize,
    /// Duration to load parquet metadata.
    pub(crate) metadata_load_cost: Duration,
}

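// The Debug output is a compact JSON-like summary that only includes non-zero
// counters, e.g. `{"metadata_load_cost":"1.2ms", "mem_cache_hit":3}`.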
impl std::fmt::Debug for MetadataCacheMetrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            mem_cache_hit,
            file_cache_hit,
            cache_miss,
            metadata_load_cost,
        } = self;

        if self.is_empty() {
            return write!(f, "{{}}");
        }
        write!(f, "{{")?;

        write!(f, "\"metadata_load_cost\":\"{:?}\"", metadata_load_cost)?;

        if *mem_cache_hit > 0 {
            write!(f, ", \"mem_cache_hit\":{}", mem_cache_hit)?;
        }
        if *file_cache_hit > 0 {
            write!(f, ", \"file_cache_hit\":{}", file_cache_hit)?;
        }
        if *cache_miss > 0 {
            write!(f, ", \"cache_miss\":{}", cache_miss)?;
        }

        write!(f, "}}")
    }
}

impl MetadataCacheMetrics {
    /// Returns true if the metrics are empty (contain no meaningful data).
    pub(crate) fn is_empty(&self) -> bool {
        self.metadata_load_cost.is_zero()
    }

    /// Adds `other` metrics to this metrics.
    pub(crate) fn merge_from(&mut self, other: &MetadataCacheMetrics) {
        self.mem_cache_hit += other.mem_cache_hit;
        self.file_cache_hit += other.file_cache_hit;
        self.cache_miss += other.cache_miss;
        self.metadata_load_cost += other.metadata_load_cost;
    }
}

/// Parquet reader metrics.
#[derive(Debug, Default, Clone)]
pub struct ReaderMetrics {
    /// Filtered row groups and rows metrics.
    pub(crate) filter_metrics: ReaderFilterMetrics,
    /// Duration to build the parquet reader.
    pub(crate) build_cost: Duration,
    /// Duration to scan the reader.
    pub(crate) scan_cost: Duration,
    /// Number of record batches read.
    pub(crate) num_record_batches: usize,
    /// Number of batches decoded.
    pub(crate) num_batches: usize,
    /// Number of rows read.
    pub(crate) num_rows: usize,
    /// Metrics for parquet metadata cache.
    pub(crate) metadata_cache_metrics: MetadataCacheMetrics,
    /// Optional metrics for page/row group fetch operations.
    pub(crate) fetch_metrics: Option<Arc<ParquetFetchMetrics>>,
}

impl ReaderMetrics {
    /// Adds `other` metrics to this metrics.
    pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
        self.filter_metrics.merge_from(&other.filter_metrics);
        self.build_cost += other.build_cost;
        self.scan_cost += other.scan_cost;
        self.num_record_batches += other.num_record_batches;
        self.num_batches += other.num_batches;
        self.num_rows += other.num_rows;
        self.metadata_cache_metrics
            .merge_from(&other.metadata_cache_metrics);
        if let Some(other_fetch) = &other.fetch_metrics {
            if let Some(self_fetch) = &self.fetch_metrics {
                self_fetch.merge_from(other_fetch);
            } else {
                self.fetch_metrics = Some(other_fetch.clone());
            }
        }
    }

    /// Reports total rows.
    pub(crate) fn observe_rows(&self, read_type: &str) {
        READ_ROWS_TOTAL
            .with_label_values(&[read_type])
            .inc_by(self.num_rows as u64);
    }
}

/// Builder to build a [ParquetRecordBatchReader] for a row group.
pub(crate) struct RowGroupReaderBuilder {
    /// SST file to read.
    ///
    /// Holds the file handle so the file won't be purged while we read it.
    file_handle: FileHandle,
    /// Path of the file.
    file_path: String,
    /// Metadata of the parquet file.
    parquet_meta: Arc<ParquetMetaData>,
    /// Object store as an Operator.
    object_store: ObjectStore,
    /// Projection mask.
    projection: ProjectionMask,
    /// Field levels to read.
    field_levels: FieldLevels,
    /// Cache.
    cache_strategy: CacheStrategy,
}

impl RowGroupReaderBuilder {
    /// Path of the file to read.
    pub(crate) fn file_path(&self) -> &str {
        &self.file_path
    }

    /// Handle of the file to read.
    pub(crate) fn file_handle(&self) -> &FileHandle {
        &self.file_handle
    }

    pub(crate) fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
        &self.parquet_meta
    }

    pub(crate) fn cache_strategy(&self) -> &CacheStrategy {
        &self.cache_strategy
    }

    /// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`.
    pub(crate) async fn build(
        &self,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
        fetch_metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<ParquetRecordBatchReader> {
        let fetch_start = Instant::now();

        let mut row_group = InMemoryRowGroup::create(
            self.file_handle.region_id(),
            self.file_handle.file_id().file_id(),
            &self.parquet_meta,
            row_group_idx,
            self.cache_strategy.clone(),
            &self.file_path,
            self.object_store.clone(),
        );
        // Fetches data into memory.
        row_group
            .fetch(&self.projection, row_selection.as_ref(), fetch_metrics)
            .await
            .context(ReadParquetSnafu {
                path: &self.file_path,
            })?;

        // Records the total fetch elapsed time.
        if let Some(metrics) = fetch_metrics {
            metrics.data.lock().unwrap().total_fetch_elapsed += fetch_start.elapsed();
        }

        // Builds the parquet reader with the optional row selection.
        ParquetRecordBatchReader::try_new_with_row_groups(
            &self.field_levels,
            &row_group,
            DEFAULT_READ_BATCH_SIZE,
            row_selection,
        )
        .context(ReadParquetSnafu {
            path: &self.file_path,
        })
    }
}

/// The state of a [ParquetReader].
enum ReaderState {
    /// The reader is reading a row group.
    Readable(PruneReader),
    /// The reader is exhausted.
    Exhausted(ReaderMetrics),
}

impl ReaderState {
    /// Returns the metrics of the reader.
    fn metrics(&self) -> ReaderMetrics {
        match self {
            ReaderState::Readable(reader) => reader.metrics(),
            ReaderState::Exhausted(m) => m.clone(),
        }
    }
}

/// The filter to evaluate, or the result of pruning by the default value.
pub(crate) enum MaybeFilter {
    /// The filter to evaluate.
    Filter(SimpleFilterEvaluator),
    /// The filter matches the default value.
    Matched,
    /// The filter is pruned.
    Pruned,
}

/// Context to evaluate the column filter for a parquet file.
pub(crate) struct SimpleFilterContext {
    /// Filter to evaluate.
    filter: MaybeFilter,
    /// Id of the column to evaluate.
    column_id: ColumnId,
    /// Semantic type of the column.
    semantic_type: SemanticType,
    /// The data type of the column.
    data_type: ConcreteDataType,
}

impl SimpleFilterContext {
    /// Creates a context for the `expr`.
    ///
    /// Returns `None` if the column to filter doesn't exist in the SST metadata or the
    /// expected metadata.
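    ///
    /// If the column exists only in the expected metadata (e.g. it was added
    /// after this SST was written), the filter is evaluated against the
    /// column's default value and collapses to [MaybeFilter::Matched] or
    /// [MaybeFilter::Pruned].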
    pub(crate) fn new_opt(
        sst_meta: &RegionMetadataRef,
        expected_meta: Option<&RegionMetadata>,
        expr: &Expr,
    ) -> Option<Self> {
        let filter = SimpleFilterEvaluator::try_new(expr)?;
        let (column_metadata, maybe_filter) = match expected_meta {
            Some(meta) => {
                // Gets the column metadata from the expected metadata.
                let column = meta.column_by_name(filter.column_name())?;
                // Checks if the column is present in the SST metadata. We still use the
                // column from the expected metadata.
                match sst_meta.column_by_id(column.column_id) {
                    Some(sst_column) => {
                        debug_assert_eq!(column.semantic_type, sst_column.semantic_type);

                        (column, MaybeFilter::Filter(filter))
                    }
                    None => {
                        // If the column is not present in the SST metadata, we evaluate the filter
                        // against the default value of the column.
                        // If we can't evaluate the filter, we return None.
                        if pruned_by_default(&filter, column)? {
                            (column, MaybeFilter::Pruned)
                        } else {
                            (column, MaybeFilter::Matched)
                        }
                    }
                }
            }
            None => {
                let column = sst_meta.column_by_name(filter.column_name())?;
                (column, MaybeFilter::Filter(filter))
            }
        };

        Some(Self {
            filter: maybe_filter,
            column_id: column_metadata.column_id,
            semantic_type: column_metadata.semantic_type,
            data_type: column_metadata.column_schema.data_type.clone(),
        })
    }

    /// Returns the filter to evaluate.
    pub(crate) fn filter(&self) -> &MaybeFilter {
        &self.filter
    }

    /// Returns the column id.
    pub(crate) fn column_id(&self) -> ColumnId {
        self.column_id
    }

    /// Returns the semantic type of the column.
    pub(crate) fn semantic_type(&self) -> SemanticType {
        self.semantic_type
    }

    /// Returns the data type of the column.
    pub(crate) fn data_type(&self) -> &ConcreteDataType {
        &self.data_type
    }
}

/// Prunes a column by its default value.
/// Returns `None` if we can't create the default value or evaluate the filter.
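///
/// For example, with a default value of `0` and a filter `col > 10`, the filter
/// doesn't match the default, so the column is pruned and this returns `Some(true)`.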
fn pruned_by_default(filter: &SimpleFilterEvaluator, column: &ColumnMetadata) -> Option<bool> {
    let value = column.column_schema.create_default().ok().flatten()?;
    let scalar_value = value
        .try_to_scalar_value(&column.column_schema.data_type)
        .ok()?;
    let matches = filter.evaluate_scalar(&scalar_value).ok()?;
    Some(!matches)
}

/// Parquet batch reader to read our SST format.
pub struct ParquetReader {
    /// File range context.
    context: FileRangeContextRef,
    /// Row group selection to read.
    selection: RowGroupSelection,
    /// Reader of the current row group.
    reader_state: ReaderState,
    /// Metrics for tracking row group fetch operations.
    fetch_metrics: ParquetFetchMetrics,
}

#[async_trait]
impl BatchReader for ParquetReader {
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.context.reader_builder().file_handle.region_id(),
            file_id = %self.context.reader_builder().file_handle.file_id()
        )
    )]
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        let ReaderState::Readable(reader) = &mut self.reader_state else {
            return Ok(None);
        };

        // We don't collect the elapsed time if the reader returns an error.
        if let Some(batch) = reader.next_batch().await? {
            return Ok(Some(batch));
        }

        // No more items in the current row group; reads the next row group.
1462        while let Some((row_group_idx, row_selection)) = self.selection.pop_first() {
1463            let parquet_reader = self
1464                .context
1465                .reader_builder()
1466                .build(
1467                    row_group_idx,
1468                    Some(row_selection),
1469                    Some(&self.fetch_metrics),
1470                )
1471                .await?;
1472
            // Computes skip_fields for this row group, then resets the parquet reader.
            let skip_fields = self.context.should_skip_fields(row_group_idx);
            reader.reset_source(
                Source::RowGroup(RowGroupReader::new(self.context.clone(), parquet_reader)),
                skip_fields,
            );
            if let Some(batch) = reader.next_batch().await? {
                return Ok(Some(batch));
            }
        }

        // The reader is exhausted.
        self.reader_state = ReaderState::Exhausted(reader.metrics().clone());
        Ok(None)
    }
}

impl Drop for ParquetReader {
    fn drop(&mut self) {
        let metrics = self.reader_state.metrics();
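        // Row groups actually read = rg_total minus the row groups pruned by
        // each index: inverted, min-max (stats), fulltext, and bloom filter.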
        debug!(
            "Read parquet {} {}, range: {:?}, {}/{} row groups, metrics: {:?}",
            self.context.reader_builder().file_handle.region_id(),
            self.context.reader_builder().file_handle.file_id(),
            self.context.reader_builder().file_handle.time_range(),
            metrics.filter_metrics.rg_total
                - metrics.filter_metrics.rg_inverted_filtered
                - metrics.filter_metrics.rg_minmax_filtered
                - metrics.filter_metrics.rg_fulltext_filtered
                - metrics.filter_metrics.rg_bloom_filtered,
            metrics.filter_metrics.rg_total,
            metrics
        );

        // Report metrics.
        READ_STAGE_ELAPSED
            .with_label_values(&["build_parquet_reader"])
            .observe(metrics.build_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_row_groups"])
            .observe(metrics.scan_cost.as_secs_f64());
        metrics.observe_rows("parquet_reader");
        metrics.filter_metrics.observe();
    }
}

impl ParquetReader {
    /// Creates a new reader.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %context.reader_builder().file_handle.region_id(),
            file_id = %context.reader_builder().file_handle.file_id()
        )
    )]
    pub(crate) async fn new(
        context: FileRangeContextRef,
        mut selection: RowGroupSelection,
    ) -> Result<Self> {
        let fetch_metrics = ParquetFetchMetrics::default();
        // Reads the first row group, if any; otherwise the reader starts exhausted.
        let reader_state = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
            let parquet_reader = context
                .reader_builder()
                .build(row_group_idx, Some(row_selection), Some(&fetch_metrics))
                .await?;
            // Computes skip_fields once for this row group.
            let skip_fields = context.should_skip_fields(row_group_idx);
            ReaderState::Readable(PruneReader::new_with_row_group_reader(
                context.clone(),
                RowGroupReader::new(context.clone(), parquet_reader),
                skip_fields,
            ))
        } else {
            ReaderState::Exhausted(ReaderMetrics::default())
        };

        Ok(ParquetReader {
            context,
            selection,
            reader_state,
            fetch_metrics,
        })
    }

    /// Returns the metadata of the SST.
    pub fn metadata(&self) -> &RegionMetadataRef {
        self.context.read_format().metadata()
    }

    /// Returns the parquet metadata of the file.
    pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
        self.context.reader_builder().parquet_meta.clone()
    }
}

/// RowGroupReaderContext represents the fields that can be shared
/// between different `RowGroupReader`s.
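///
/// # Example
///
/// A minimal sketch of a custom implementation, assuming a hypothetical
/// context that owns its [ReadFormat] and file path:
///
/// ```ignore
/// struct MyContext {
///     format: ReadFormat,
///     path: String,
/// }
///
/// impl RowGroupReaderContext for MyContext {
///     fn map_result(
///         &self,
///         result: std::result::Result<Option<RecordBatch>, ArrowError>,
///     ) -> Result<Option<RecordBatch>> {
///         result.context(ArrowReaderSnafu {
///             path: self.path.clone(),
///         })
///     }
///
///     fn read_format(&self) -> &ReadFormat {
///         &self.format
///     }
/// }
/// ```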
pub(crate) trait RowGroupReaderContext: Send {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>>;

    fn read_format(&self) -> &ReadFormat;
}

impl RowGroupReaderContext for FileRangeContextRef {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>> {
        result.context(ArrowReaderSnafu {
            path: self.file_path(),
        })
    }

    fn read_format(&self) -> &ReadFormat {
        self.as_ref().read_format()
    }
}

1595/// [RowGroupReader] that reads from [FileRange].
1596pub(crate) type RowGroupReader = RowGroupReaderBase<FileRangeContextRef>;
1597
1598impl RowGroupReader {
1599    /// Creates a new reader from file range.
1600    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
1601        Self::create(context, reader)
1602    }
1603}
1604
/// Reader to read a row group of a parquet file.
pub(crate) struct RowGroupReaderBase<T> {
    /// Context of the [RowGroupReader], which adapts it to different underlying implementations.
    context: T,
    /// Inner parquet reader.
    reader: ParquetRecordBatchReader,
    /// Buffered batches to return.
    batches: VecDeque<Batch>,
    /// Local scan metrics.
    metrics: ReaderMetrics,
    /// Cached sequence array to override sequences.
    override_sequence: Option<ArrayRef>,
}

impl<T> RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    /// Creates a new reader to read the primary key format.
    pub(crate) fn create(context: T, reader: ParquetRecordBatchReader) -> Self {
        // The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE.
        let override_sequence = context
            .read_format()
            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);
        assert!(context.read_format().as_primary_key().is_some());

        Self {
            context,
            reader,
            batches: VecDeque::new(),
            metrics: ReaderMetrics::default(),
            override_sequence,
        }
    }

    /// Gets the metrics.
    pub(crate) fn metrics(&self) -> &ReaderMetrics {
        &self.metrics
    }

    /// Gets the [ReadFormat] of the underlying reader.
    pub(crate) fn read_format(&self) -> &ReadFormat {
        self.context.read_format()
    }

    /// Tries to fetch the next [RecordBatch] from the reader.
    fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
        self.context.map_result(self.reader.next().transpose())
    }

    /// Returns the next [Batch].
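    ///
    /// Drains batches buffered from the previous record batch first; otherwise
    /// fetches the next record batch and converts it into [Batch]es.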
    pub(crate) fn next_inner(&mut self) -> Result<Option<Batch>> {
        let scan_start = Instant::now();
        if let Some(batch) = self.batches.pop_front() {
            self.metrics.num_rows += batch.num_rows();
            self.metrics.scan_cost += scan_start.elapsed();
            return Ok(Some(batch));
        }

        // We need to fetch the next record batch and convert it to batches.
        while self.batches.is_empty() {
            let Some(record_batch) = self.fetch_next_record_batch()? else {
                self.metrics.scan_cost += scan_start.elapsed();
                return Ok(None);
            };
            self.metrics.num_record_batches += 1;

            // Safety: We ensure the format is primary key in RowGroupReaderBase::create().
            self.context
                .read_format()
                .as_primary_key()
                .unwrap()
                .convert_record_batch(
                    &record_batch,
                    self.override_sequence.as_ref(),
                    &mut self.batches,
                )?;
            self.metrics.num_batches += self.batches.len();
        }
        let batch = self.batches.pop_front();
        self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
        self.metrics.scan_cost += scan_start.elapsed();
        Ok(batch)
    }
}

#[async_trait::async_trait]
impl<T> BatchReader for RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        self.next_inner()
    }
}

/// Reader to read a row group of a parquet file in flat format, returning RecordBatch.
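///
/// # Example
///
/// A hedged usage sketch; `context` and `reader` are assumed to be built by
/// the surrounding reader machinery:
///
/// ```ignore
/// let mut flat_reader = FlatRowGroupReader::new(context, reader);
/// while let Some(record_batch) = flat_reader.next_batch()? {
///     // `record_batch` is already converted to the flat output format.
/// }
/// ```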
pub(crate) struct FlatRowGroupReader {
    /// Context for file ranges.
    context: FileRangeContextRef,
    /// Inner parquet reader.
    reader: ParquetRecordBatchReader,
    /// Cached sequence array to override sequences.
    override_sequence: Option<ArrayRef>,
}

impl FlatRowGroupReader {
    /// Creates a new flat reader from file range.
    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
        // The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE.
        let override_sequence = context
            .read_format()
            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);

        Self {
            context,
            reader,
            override_sequence,
        }
    }

    /// Returns the next RecordBatch.
    pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        match self.reader.next() {
            Some(batch_result) => {
                let record_batch = batch_result.context(ArrowReaderSnafu {
                    path: self.context.file_path(),
                })?;

                // Safety: Only the flat format uses FlatRowGroupReader.
                let flat_format = self.context.read_format().as_flat().unwrap();
                let record_batch =
                    flat_format.convert_batch(record_batch, self.override_sequence.as_ref())?;
                Ok(Some(record_batch))
            }
            None => Ok(None),
        }
    }
}