mito2/sst/parquet/
reader.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Parquet reader.
16
17use std::collections::VecDeque;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use api::v1::SemanticType;
22use async_trait::async_trait;
23use common_recordbatch::filter::SimpleFilterEvaluator;
24use common_telemetry::{debug, warn};
25use datafusion_expr::Expr;
26use datatypes::arrow::array::ArrayRef;
27use datatypes::arrow::error::ArrowError;
28use datatypes::arrow::record_batch::RecordBatch;
29use datatypes::data_type::ConcreteDataType;
30use mito_codec::row_converter::build_primary_key_codec;
31use object_store::ObjectStore;
32use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
33use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
34use parquet::file::metadata::ParquetMetaData;
35use parquet::format::KeyValue;
36use snafu::{OptionExt, ResultExt};
37use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
38use store_api::region_request::PathType;
39use store_api::storage::ColumnId;
40use table::predicate::Predicate;
41
42use crate::cache::index::result_cache::PredicateKey;
43use crate::cache::CacheStrategy;
44use crate::error::{
45    ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadDataPartSnafu,
46    ReadParquetSnafu, Result,
47};
48use crate::metrics::{
49    PRECISE_FILTER_ROWS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_TOTAL,
50    READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED,
51};
52use crate::read::prune::{PruneReader, Source};
53use crate::read::{Batch, BatchReader};
54use crate::sst::file::{FileHandle, FileId};
55use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef;
56use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
57use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
58use crate::sst::parquet::file_range::{FileRangeContext, FileRangeContextRef};
59use crate::sst::parquet::format::{need_override_sequence, ReadFormat};
60use crate::sst::parquet::metadata::MetadataLoader;
61use crate::sst::parquet::row_group::InMemoryRowGroup;
62use crate::sst::parquet::row_selection::RowGroupSelection;
63use crate::sst::parquet::stats::RowGroupPruningStats;
64use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};
65
66const INDEX_TYPE_FULLTEXT: &str = "fulltext";
67const INDEX_TYPE_INVERTED: &str = "inverted";
68const INDEX_TYPE_BLOOM: &str = "bloom filter";
69
70macro_rules! handle_index_error {
71    ($err:expr, $file_handle:expr, $index_type:expr) => {
72        if cfg!(any(test, feature = "test")) {
73            panic!(
74                "Failed to apply {} index, region_id: {}, file_id: {}, err: {:?}",
75                $index_type,
76                $file_handle.region_id(),
77                $file_handle.file_id(),
78                $err
79            );
80        } else {
81            warn!(
82                $err; "Failed to apply {} index, region_id: {}, file_id: {}",
83                $index_type,
84                $file_handle.region_id(),
85                $file_handle.file_id()
86            );
87        }
88    };
89}
90
91/// Parquet SST reader builder.
92pub struct ParquetReaderBuilder {
93    /// SST directory.
94    file_dir: String,
95    /// Path type for generating file paths.
96    path_type: PathType,
97    file_handle: FileHandle,
98    object_store: ObjectStore,
99    /// Predicate to push down.
100    predicate: Option<Predicate>,
101    /// Metadata of columns to read.
102    ///
103    /// `None` reads all columns. Due to schema change, the projection
104    /// can contain columns not in the parquet file.
105    projection: Option<Vec<ColumnId>>,
106    /// Strategy to cache SST data.
107    cache_strategy: CacheStrategy,
108    /// Index appliers.
109    inverted_index_applier: Option<InvertedIndexApplierRef>,
110    bloom_filter_index_applier: Option<BloomFilterIndexApplierRef>,
111    fulltext_index_applier: Option<FulltextIndexApplierRef>,
112    /// Expected metadata of the region while reading the SST.
113    /// This is usually the latest metadata of the region. The reader use
114    /// it get the correct column id of a column by name.
115    expected_metadata: Option<RegionMetadataRef>,
116}
117
118impl ParquetReaderBuilder {
119    /// Returns a new [ParquetReaderBuilder] to read specific SST.
120    pub fn new(
121        file_dir: String,
122        path_type: PathType,
123        file_handle: FileHandle,
124        object_store: ObjectStore,
125    ) -> ParquetReaderBuilder {
126        ParquetReaderBuilder {
127            file_dir,
128            path_type,
129            file_handle,
130            object_store,
131            predicate: None,
132            projection: None,
133            cache_strategy: CacheStrategy::Disabled,
134            inverted_index_applier: None,
135            bloom_filter_index_applier: None,
136            fulltext_index_applier: None,
137            expected_metadata: None,
138        }
139    }
140
141    /// Attaches the predicate to the builder.
142    #[must_use]
143    pub fn predicate(mut self, predicate: Option<Predicate>) -> ParquetReaderBuilder {
144        self.predicate = predicate;
145        self
146    }
147
148    /// Attaches the projection to the builder.
149    ///
150    /// The reader only applies the projection to fields.
151    #[must_use]
152    pub fn projection(mut self, projection: Option<Vec<ColumnId>>) -> ParquetReaderBuilder {
153        self.projection = projection;
154        self
155    }
156
157    /// Attaches the cache to the builder.
158    #[must_use]
159    pub fn cache(mut self, cache: CacheStrategy) -> ParquetReaderBuilder {
160        self.cache_strategy = cache;
161        self
162    }
163
164    /// Attaches the inverted index applier to the builder.
165    #[must_use]
166    pub(crate) fn inverted_index_applier(
167        mut self,
168        index_applier: Option<InvertedIndexApplierRef>,
169    ) -> Self {
170        self.inverted_index_applier = index_applier;
171        self
172    }
173
174    /// Attaches the bloom filter index applier to the builder.
175    #[must_use]
176    pub(crate) fn bloom_filter_index_applier(
177        mut self,
178        index_applier: Option<BloomFilterIndexApplierRef>,
179    ) -> Self {
180        self.bloom_filter_index_applier = index_applier;
181        self
182    }
183
184    /// Attaches the fulltext index applier to the builder.
185    #[must_use]
186    pub(crate) fn fulltext_index_applier(
187        mut self,
188        index_applier: Option<FulltextIndexApplierRef>,
189    ) -> Self {
190        self.fulltext_index_applier = index_applier;
191        self
192    }
193
194    /// Attaches the expected metadata to the builder.
195    #[must_use]
196    pub fn expected_metadata(mut self, expected_metadata: Option<RegionMetadataRef>) -> Self {
197        self.expected_metadata = expected_metadata;
198        self
199    }
200
201    /// Builds a [ParquetReader].
202    ///
203    /// This needs to perform IO operation.
204    pub async fn build(&self) -> Result<ParquetReader> {
205        let mut metrics = ReaderMetrics::default();
206
207        let (context, selection) = self.build_reader_input(&mut metrics).await?;
208        ParquetReader::new(Arc::new(context), selection).await
209    }
210
211    /// Builds a [FileRangeContext] and collects row groups to read.
212    ///
213    /// This needs to perform IO operation.
214    pub(crate) async fn build_reader_input(
215        &self,
216        metrics: &mut ReaderMetrics,
217    ) -> Result<(FileRangeContext, RowGroupSelection)> {
218        let start = Instant::now();
219
220        let file_path = self.file_handle.file_path(&self.file_dir, self.path_type);
221        let file_size = self.file_handle.meta_ref().file_size;
222
223        // Loads parquet metadata of the file.
224        let parquet_meta = self.read_parquet_metadata(&file_path, file_size).await?;
225        // Decodes region metadata.
226        let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
227        // Gets the metadata stored in the SST.
228        let region_meta = Arc::new(Self::get_region_metadata(&file_path, key_value_meta)?);
229        let mut read_format = if let Some(column_ids) = &self.projection {
230            ReadFormat::new(region_meta.clone(), column_ids.iter().copied())
231        } else {
232            // Lists all column ids to read, we always use the expected metadata if possible.
233            let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
234            ReadFormat::new(
235                region_meta.clone(),
236                expected_meta
237                    .column_metadatas
238                    .iter()
239                    .map(|col| col.column_id),
240            )
241        };
242        if need_override_sequence(&parquet_meta) {
243            read_format
244                .set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));
245        }
246
247        // Computes the projection mask.
248        let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
249        let indices = read_format.projection_indices();
250        // Now we assumes we don't have nested schemas.
251        // TODO(yingwen): Revisit this if we introduce nested types such as JSON type.
252        let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied());
253
254        // Computes the field levels.
255        let hint = Some(read_format.arrow_schema().fields());
256        let field_levels =
257            parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
258                .context(ReadDataPartSnafu)?;
259        let selection = self
260            .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
261            .await;
262
263        let reader_builder = RowGroupReaderBuilder {
264            file_handle: self.file_handle.clone(),
265            file_path,
266            parquet_meta,
267            object_store: self.object_store.clone(),
268            projection: projection_mask,
269            field_levels,
270            cache_strategy: self.cache_strategy.clone(),
271        };
272
273        let filters = if let Some(predicate) = &self.predicate {
274            predicate
275                .exprs()
276                .iter()
277                .filter_map(|expr| {
278                    SimpleFilterContext::new_opt(
279                        &region_meta,
280                        self.expected_metadata.as_deref(),
281                        expr,
282                    )
283                })
284                .collect::<Vec<_>>()
285        } else {
286            vec![]
287        };
288
289        let codec = build_primary_key_codec(read_format.metadata());
290
291        let context = FileRangeContext::new(reader_builder, filters, read_format, codec);
292
293        metrics.build_cost += start.elapsed();
294
295        Ok((context, selection))
296    }
297
298    /// Decodes region metadata from key value.
299    fn get_region_metadata(
300        file_path: &str,
301        key_value_meta: Option<&Vec<KeyValue>>,
302    ) -> Result<RegionMetadata> {
303        let key_values = key_value_meta.context(InvalidParquetSnafu {
304            file: file_path,
305            reason: "missing key value meta",
306        })?;
307        let meta_value = key_values
308            .iter()
309            .find(|kv| kv.key == PARQUET_METADATA_KEY)
310            .with_context(|| InvalidParquetSnafu {
311                file: file_path,
312                reason: format!("key {} not found", PARQUET_METADATA_KEY),
313            })?;
314        let json = meta_value
315            .value
316            .as_ref()
317            .with_context(|| InvalidParquetSnafu {
318                file: file_path,
319                reason: format!("No value for key {}", PARQUET_METADATA_KEY),
320            })?;
321
322        RegionMetadata::from_json(json).context(InvalidMetadataSnafu)
323    }
324
325    /// Reads parquet metadata of specific file.
326    async fn read_parquet_metadata(
327        &self,
328        file_path: &str,
329        file_size: u64,
330    ) -> Result<Arc<ParquetMetaData>> {
331        let _t = READ_STAGE_ELAPSED
332            .with_label_values(&["read_parquet_metadata"])
333            .start_timer();
334
335        let file_id = self.file_handle.file_id();
336        // Tries to get from global cache.
337        if let Some(metadata) = self.cache_strategy.get_parquet_meta_data(file_id).await {
338            return Ok(metadata);
339        }
340
341        // Cache miss, load metadata directly.
342        let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size);
343        let metadata = metadata_loader.load().await?;
344        let metadata = Arc::new(metadata);
345        // Cache the metadata.
346        self.cache_strategy
347            .put_parquet_meta_data(file_id, metadata.clone());
348
349        Ok(metadata)
350    }
351
352    /// Computes row groups to read, along with their respective row selections.
353    async fn row_groups_to_read(
354        &self,
355        read_format: &ReadFormat,
356        parquet_meta: &ParquetMetaData,
357        metrics: &mut ReaderFilterMetrics,
358    ) -> RowGroupSelection {
359        let num_row_groups = parquet_meta.num_row_groups();
360        let num_rows = parquet_meta.file_metadata().num_rows();
361        if num_row_groups == 0 || num_rows == 0 {
362            return RowGroupSelection::default();
363        }
364
365        // Let's assume that the number of rows in the first row group
366        // can represent the `row_group_size` of the Parquet file.
367        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
368        if row_group_size == 0 {
369            return RowGroupSelection::default();
370        }
371
372        metrics.rg_total += num_row_groups;
373        metrics.rows_total += num_rows as usize;
374
375        let mut output = RowGroupSelection::new(row_group_size, num_rows as _);
376
377        self.prune_row_groups_by_minmax(read_format, parquet_meta, &mut output, metrics);
378        if output.is_empty() {
379            return output;
380        }
381
382        let fulltext_filtered = self
383            .prune_row_groups_by_fulltext_index(
384                row_group_size,
385                num_row_groups,
386                &mut output,
387                metrics,
388            )
389            .await;
390        if output.is_empty() {
391            return output;
392        }
393
394        self.prune_row_groups_by_inverted_index(
395            row_group_size,
396            num_row_groups,
397            &mut output,
398            metrics,
399        )
400        .await;
401        if output.is_empty() {
402            return output;
403        }
404
405        self.prune_row_groups_by_bloom_filter(row_group_size, parquet_meta, &mut output, metrics)
406            .await;
407        if output.is_empty() {
408            return output;
409        }
410
411        if !fulltext_filtered {
412            self.prune_row_groups_by_fulltext_bloom(
413                row_group_size,
414                parquet_meta,
415                &mut output,
416                metrics,
417            )
418            .await;
419        }
420        output
421    }
422
423    /// Prunes row groups by fulltext index. Returns `true` if the row groups are pruned.
424    async fn prune_row_groups_by_fulltext_index(
425        &self,
426        row_group_size: usize,
427        num_row_groups: usize,
428        output: &mut RowGroupSelection,
429        metrics: &mut ReaderFilterMetrics,
430    ) -> bool {
431        let Some(index_applier) = &self.fulltext_index_applier else {
432            return false;
433        };
434        if !self.file_handle.meta_ref().fulltext_index_available() {
435            return false;
436        }
437
438        let predicate_key = index_applier.predicate_key();
439        // Fast path: return early if the result is in the cache.
440        let cached = self
441            .cache_strategy
442            .index_result_cache()
443            .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
444        if let Some(result) = cached.as_ref() {
445            if all_required_row_groups_searched(output, result) {
446                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
447                return true;
448            }
449        }
450
451        // Slow path: apply the index from the file.
452        let file_size_hint = self.file_handle.meta_ref().index_file_size();
453        let apply_res = index_applier
454            .apply_fine(self.file_handle.file_id(), Some(file_size_hint))
455            .await;
456        let selection = match apply_res {
457            Ok(Some(res)) => RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups),
458            Ok(None) => return false,
459            Err(err) => {
460                handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
461                return false;
462            }
463        };
464
465        self.apply_index_result_and_update_cache(
466            predicate_key,
467            self.file_handle.file_id().file_id(),
468            selection,
469            output,
470            metrics,
471            INDEX_TYPE_FULLTEXT,
472        );
473        true
474    }
475
476    /// Applies index to prune row groups.
477    ///
478    /// TODO(zhongzc): Devise a mechanism to enforce the non-use of indices
479    /// as an escape route in case of index issues, and it can be used to test
480    /// the correctness of the index.
481    async fn prune_row_groups_by_inverted_index(
482        &self,
483        row_group_size: usize,
484        num_row_groups: usize,
485        output: &mut RowGroupSelection,
486        metrics: &mut ReaderFilterMetrics,
487    ) -> bool {
488        let Some(index_applier) = &self.inverted_index_applier else {
489            return false;
490        };
491        if !self.file_handle.meta_ref().inverted_index_available() {
492            return false;
493        }
494
495        let predicate_key = index_applier.predicate_key();
496        // Fast path: return early if the result is in the cache.
497        let cached = self
498            .cache_strategy
499            .index_result_cache()
500            .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
501        if let Some(result) = cached.as_ref() {
502            if all_required_row_groups_searched(output, result) {
503                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED);
504                return true;
505            }
506        }
507
508        // Slow path: apply the index from the file.
509        let file_size_hint = self.file_handle.meta_ref().index_file_size();
510        let apply_res = index_applier
511            .apply(self.file_handle.file_id(), Some(file_size_hint))
512            .await;
513        let selection = match apply_res {
514            Ok(output) => RowGroupSelection::from_inverted_index_apply_output(
515                row_group_size,
516                num_row_groups,
517                output,
518            ),
519            Err(err) => {
520                handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
521                return false;
522            }
523        };
524
525        self.apply_index_result_and_update_cache(
526            predicate_key,
527            self.file_handle.file_id().file_id(),
528            selection,
529            output,
530            metrics,
531            INDEX_TYPE_INVERTED,
532        );
533        true
534    }
535
536    async fn prune_row_groups_by_bloom_filter(
537        &self,
538        row_group_size: usize,
539        parquet_meta: &ParquetMetaData,
540        output: &mut RowGroupSelection,
541        metrics: &mut ReaderFilterMetrics,
542    ) -> bool {
543        let Some(index_applier) = &self.bloom_filter_index_applier else {
544            return false;
545        };
546        if !self.file_handle.meta_ref().bloom_filter_index_available() {
547            return false;
548        }
549
550        let predicate_key = index_applier.predicate_key();
551        // Fast path: return early if the result is in the cache.
552        let cached = self
553            .cache_strategy
554            .index_result_cache()
555            .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
556        if let Some(result) = cached.as_ref() {
557            if all_required_row_groups_searched(output, result) {
558                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM);
559                return true;
560            }
561        }
562
563        // Slow path: apply the index from the file.
564        let file_size_hint = self.file_handle.meta_ref().index_file_size();
565        let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
566            (
567                rg.num_rows() as usize,
568                // Optimize: only search the row group that required by `output` and not stored in `cached`.
569                output.contains_non_empty_row_group(i)
570                    && cached
571                        .as_ref()
572                        .map(|c| !c.contains_row_group(i))
573                        .unwrap_or(true),
574            )
575        });
576        let apply_res = index_applier
577            .apply(self.file_handle.file_id(), Some(file_size_hint), rgs)
578            .await;
579        let mut selection = match apply_res {
580            Ok(apply_output) => RowGroupSelection::from_row_ranges(apply_output, row_group_size),
581            Err(err) => {
582                handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM);
583                return false;
584            }
585        };
586
587        // New searched row groups are added to `selection`, concat them with `cached`.
588        if let Some(cached) = cached.as_ref() {
589            selection.concat(cached);
590        }
591
592        self.apply_index_result_and_update_cache(
593            predicate_key,
594            self.file_handle.file_id().file_id(),
595            selection,
596            output,
597            metrics,
598            INDEX_TYPE_BLOOM,
599        );
600        true
601    }
602
603    async fn prune_row_groups_by_fulltext_bloom(
604        &self,
605        row_group_size: usize,
606        parquet_meta: &ParquetMetaData,
607        output: &mut RowGroupSelection,
608        metrics: &mut ReaderFilterMetrics,
609    ) -> bool {
610        let Some(index_applier) = &self.fulltext_index_applier else {
611            return false;
612        };
613        if !self.file_handle.meta_ref().fulltext_index_available() {
614            return false;
615        }
616
617        let predicate_key = index_applier.predicate_key();
618        // Fast path: return early if the result is in the cache.
619        let cached = self
620            .cache_strategy
621            .index_result_cache()
622            .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id().file_id()));
623        if let Some(result) = cached.as_ref() {
624            if all_required_row_groups_searched(output, result) {
625                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
626                return true;
627            }
628        }
629
630        // Slow path: apply the index from the file.
631        let file_size_hint = self.file_handle.meta_ref().index_file_size();
632        let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
633            (
634                rg.num_rows() as usize,
635                // Optimize: only search the row group that required by `output` and not stored in `cached`.
636                output.contains_non_empty_row_group(i)
637                    && cached
638                        .as_ref()
639                        .map(|c| !c.contains_row_group(i))
640                        .unwrap_or(true),
641            )
642        });
643        let apply_res = index_applier
644            .apply_coarse(self.file_handle.file_id(), Some(file_size_hint), rgs)
645            .await;
646        let mut selection = match apply_res {
647            Ok(Some(apply_output)) => {
648                RowGroupSelection::from_row_ranges(apply_output, row_group_size)
649            }
650            Ok(None) => return false,
651            Err(err) => {
652                handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
653                return false;
654            }
655        };
656
657        // New searched row groups are added to `selection`, concat them with `cached`.
658        if let Some(cached) = cached.as_ref() {
659            selection.concat(cached);
660        }
661
662        self.apply_index_result_and_update_cache(
663            predicate_key,
664            self.file_handle.file_id().file_id(),
665            selection,
666            output,
667            metrics,
668            INDEX_TYPE_FULLTEXT,
669        );
670        true
671    }
672
673    /// Prunes row groups by min-max index.
674    fn prune_row_groups_by_minmax(
675        &self,
676        read_format: &ReadFormat,
677        parquet_meta: &ParquetMetaData,
678        output: &mut RowGroupSelection,
679        metrics: &mut ReaderFilterMetrics,
680    ) -> bool {
681        let Some(predicate) = &self.predicate else {
682            return false;
683        };
684
685        let row_groups_before = output.row_group_count();
686
687        let region_meta = read_format.metadata();
688        let row_groups = parquet_meta.row_groups();
689        let stats =
690            RowGroupPruningStats::new(row_groups, read_format, self.expected_metadata.clone());
691        let prune_schema = self
692            .expected_metadata
693            .as_ref()
694            .map(|meta| meta.schema.arrow_schema())
695            .unwrap_or_else(|| region_meta.schema.arrow_schema());
696
697        // Here we use the schema of the SST to build the physical expression. If the column
698        // in the SST doesn't have the same column id as the column in the expected metadata,
699        // we will get a None statistics for that column.
700        predicate
701            .prune_with_stats(&stats, prune_schema)
702            .iter()
703            .zip(0..parquet_meta.num_row_groups())
704            .for_each(|(mask, row_group)| {
705                if !*mask {
706                    output.remove_row_group(row_group);
707                }
708            });
709
710        let row_groups_after = output.row_group_count();
711        metrics.rg_minmax_filtered += row_groups_before - row_groups_after;
712
713        true
714    }
715
716    fn apply_index_result_and_update_cache(
717        &self,
718        predicate_key: &PredicateKey,
719        file_id: FileId,
720        result: RowGroupSelection,
721        output: &mut RowGroupSelection,
722        metrics: &mut ReaderFilterMetrics,
723        index_type: &str,
724    ) {
725        apply_selection_and_update_metrics(output, &result, metrics, index_type);
726
727        if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() {
728            index_result_cache.put(predicate_key.clone(), file_id, Arc::new(result));
729        }
730    }
731}
732
733fn apply_selection_and_update_metrics(
734    output: &mut RowGroupSelection,
735    result: &RowGroupSelection,
736    metrics: &mut ReaderFilterMetrics,
737    index_type: &str,
738) {
739    let intersection = output.intersect(result);
740
741    let row_group_count = output.row_group_count() - intersection.row_group_count();
742    let row_count = output.row_count() - intersection.row_count();
743
744    metrics.update_index_metrics(index_type, row_group_count, row_count);
745
746    *output = intersection;
747}
748
749fn all_required_row_groups_searched(
750    required_row_groups: &RowGroupSelection,
751    cached_row_groups: &RowGroupSelection,
752) -> bool {
753    required_row_groups.iter().all(|(rg_id, _)| {
754        // Row group with no rows is not required to search.
755        !required_row_groups.contains_non_empty_row_group(*rg_id)
756            // The row group is already searched.
757            || cached_row_groups.contains_row_group(*rg_id)
758    })
759}
760
761/// Metrics of filtering rows groups and rows.
762#[derive(Debug, Default, Clone, Copy)]
763pub(crate) struct ReaderFilterMetrics {
764    /// Number of row groups before filtering.
765    pub(crate) rg_total: usize,
766    /// Number of row groups filtered by fulltext index.
767    pub(crate) rg_fulltext_filtered: usize,
768    /// Number of row groups filtered by inverted index.
769    pub(crate) rg_inverted_filtered: usize,
770    /// Number of row groups filtered by min-max index.
771    pub(crate) rg_minmax_filtered: usize,
772    /// Number of row groups filtered by bloom filter index.
773    pub(crate) rg_bloom_filtered: usize,
774
775    /// Number of rows in row group before filtering.
776    pub(crate) rows_total: usize,
777    /// Number of rows in row group filtered by fulltext index.
778    pub(crate) rows_fulltext_filtered: usize,
779    /// Number of rows in row group filtered by inverted index.
780    pub(crate) rows_inverted_filtered: usize,
781    /// Number of rows in row group filtered by bloom filter index.
782    pub(crate) rows_bloom_filtered: usize,
783    /// Number of rows filtered by precise filter.
784    pub(crate) rows_precise_filtered: usize,
785}
786
787impl ReaderFilterMetrics {
788    /// Adds `other` metrics to this metrics.
789    pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
790        self.rg_total += other.rg_total;
791        self.rg_fulltext_filtered += other.rg_fulltext_filtered;
792        self.rg_inverted_filtered += other.rg_inverted_filtered;
793        self.rg_minmax_filtered += other.rg_minmax_filtered;
794        self.rg_bloom_filtered += other.rg_bloom_filtered;
795
796        self.rows_total += other.rows_total;
797        self.rows_fulltext_filtered += other.rows_fulltext_filtered;
798        self.rows_inverted_filtered += other.rows_inverted_filtered;
799        self.rows_bloom_filtered += other.rows_bloom_filtered;
800        self.rows_precise_filtered += other.rows_precise_filtered;
801    }
802
803    /// Reports metrics.
804    pub(crate) fn observe(&self) {
805        READ_ROW_GROUPS_TOTAL
806            .with_label_values(&["before_filtering"])
807            .inc_by(self.rg_total as u64);
808        READ_ROW_GROUPS_TOTAL
809            .with_label_values(&["fulltext_index_filtered"])
810            .inc_by(self.rg_fulltext_filtered as u64);
811        READ_ROW_GROUPS_TOTAL
812            .with_label_values(&["inverted_index_filtered"])
813            .inc_by(self.rg_inverted_filtered as u64);
814        READ_ROW_GROUPS_TOTAL
815            .with_label_values(&["minmax_index_filtered"])
816            .inc_by(self.rg_minmax_filtered as u64);
817        READ_ROW_GROUPS_TOTAL
818            .with_label_values(&["bloom_filter_index_filtered"])
819            .inc_by(self.rg_bloom_filtered as u64);
820
821        PRECISE_FILTER_ROWS_TOTAL
822            .with_label_values(&["parquet"])
823            .inc_by(self.rows_precise_filtered as u64);
824        READ_ROWS_IN_ROW_GROUP_TOTAL
825            .with_label_values(&["before_filtering"])
826            .inc_by(self.rows_total as u64);
827        READ_ROWS_IN_ROW_GROUP_TOTAL
828            .with_label_values(&["fulltext_index_filtered"])
829            .inc_by(self.rows_fulltext_filtered as u64);
830        READ_ROWS_IN_ROW_GROUP_TOTAL
831            .with_label_values(&["inverted_index_filtered"])
832            .inc_by(self.rows_inverted_filtered as u64);
833        READ_ROWS_IN_ROW_GROUP_TOTAL
834            .with_label_values(&["bloom_filter_index_filtered"])
835            .inc_by(self.rows_bloom_filtered as u64);
836    }
837
838    fn update_index_metrics(&mut self, index_type: &str, row_group_count: usize, row_count: usize) {
839        match index_type {
840            INDEX_TYPE_FULLTEXT => {
841                self.rg_fulltext_filtered += row_group_count;
842                self.rows_fulltext_filtered += row_count;
843            }
844            INDEX_TYPE_INVERTED => {
845                self.rg_inverted_filtered += row_group_count;
846                self.rows_inverted_filtered += row_count;
847            }
848            INDEX_TYPE_BLOOM => {
849                self.rg_bloom_filtered += row_group_count;
850                self.rows_bloom_filtered += row_count;
851            }
852            _ => {}
853        }
854    }
855}
856
857/// Parquet reader metrics.
858#[derive(Debug, Default, Clone)]
859pub struct ReaderMetrics {
860    /// Filtered row groups and rows metrics.
861    pub(crate) filter_metrics: ReaderFilterMetrics,
862    /// Duration to build the parquet reader.
863    pub(crate) build_cost: Duration,
864    /// Duration to scan the reader.
865    pub(crate) scan_cost: Duration,
866    /// Number of record batches read.
867    pub(crate) num_record_batches: usize,
868    /// Number of batches decoded.
869    pub(crate) num_batches: usize,
870    /// Number of rows read.
871    pub(crate) num_rows: usize,
872}
873
874impl ReaderMetrics {
875    /// Adds `other` metrics to this metrics.
876    pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
877        self.filter_metrics.merge_from(&other.filter_metrics);
878        self.build_cost += other.build_cost;
879        self.scan_cost += other.scan_cost;
880        self.num_record_batches += other.num_record_batches;
881        self.num_batches += other.num_batches;
882        self.num_rows += other.num_rows;
883    }
884
885    /// Reports total rows.
886    pub(crate) fn observe_rows(&self, read_type: &str) {
887        READ_ROWS_TOTAL
888            .with_label_values(&[read_type])
889            .inc_by(self.num_rows as u64);
890    }
891}
892
893/// Builder to build a [ParquetRecordBatchReader] for a row group.
894pub(crate) struct RowGroupReaderBuilder {
895    /// SST file to read.
896    ///
897    /// Holds the file handle to avoid the file purge it.
898    file_handle: FileHandle,
899    /// Path of the file.
900    file_path: String,
901    /// Metadata of the parquet file.
902    parquet_meta: Arc<ParquetMetaData>,
903    /// Object store as an Operator.
904    object_store: ObjectStore,
905    /// Projection mask.
906    projection: ProjectionMask,
907    /// Field levels to read.
908    field_levels: FieldLevels,
909    /// Cache.
910    cache_strategy: CacheStrategy,
911}
912
913impl RowGroupReaderBuilder {
914    /// Path of the file to read.
915    pub(crate) fn file_path(&self) -> &str {
916        &self.file_path
917    }
918
919    /// Handle of the file to read.
920    pub(crate) fn file_handle(&self) -> &FileHandle {
921        &self.file_handle
922    }
923
924    pub(crate) fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
925        &self.parquet_meta
926    }
927
928    pub(crate) fn cache_strategy(&self) -> &CacheStrategy {
929        &self.cache_strategy
930    }
931
932    /// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`.
933    pub(crate) async fn build(
934        &self,
935        row_group_idx: usize,
936        row_selection: Option<RowSelection>,
937    ) -> Result<ParquetRecordBatchReader> {
938        let mut row_group = InMemoryRowGroup::create(
939            self.file_handle.region_id(),
940            self.file_handle.file_id().file_id(),
941            &self.parquet_meta,
942            row_group_idx,
943            self.cache_strategy.clone(),
944            &self.file_path,
945            self.object_store.clone(),
946        );
947        // Fetches data into memory.
948        row_group
949            .fetch(&self.projection, row_selection.as_ref())
950            .await
951            .context(ReadParquetSnafu {
952                path: &self.file_path,
953            })?;
954
955        // Builds the parquet reader.
956        // Now the row selection is None.
957        ParquetRecordBatchReader::try_new_with_row_groups(
958            &self.field_levels,
959            &row_group,
960            DEFAULT_READ_BATCH_SIZE,
961            row_selection,
962        )
963        .context(ReadParquetSnafu {
964            path: &self.file_path,
965        })
966    }
967}
968
969/// The state of a [ParquetReader].
970enum ReaderState {
971    /// The reader is reading a row group.
972    Readable(PruneReader),
973    /// The reader is exhausted.
974    Exhausted(ReaderMetrics),
975}
976
977impl ReaderState {
978    /// Returns the metrics of the reader.
979    fn metrics(&self) -> ReaderMetrics {
980        match self {
981            ReaderState::Readable(reader) => reader.metrics(),
982            ReaderState::Exhausted(m) => m.clone(),
983        }
984    }
985}
986
987/// The filter to evaluate or the prune result of the default value.
988pub(crate) enum MaybeFilter {
989    /// The filter to evaluate.
990    Filter(SimpleFilterEvaluator),
991    /// The filter matches the default value.
992    Matched,
993    /// The filter is pruned.
994    Pruned,
995}
996
997/// Context to evaluate the column filter for a parquet file.
998pub(crate) struct SimpleFilterContext {
999    /// Filter to evaluate.
1000    filter: MaybeFilter,
1001    /// Id of the column to evaluate.
1002    column_id: ColumnId,
1003    /// Semantic type of the column.
1004    semantic_type: SemanticType,
1005    /// The data type of the column.
1006    data_type: ConcreteDataType,
1007}
1008
1009impl SimpleFilterContext {
1010    /// Creates a context for the `expr`.
1011    ///
1012    /// Returns None if the column to filter doesn't exist in the SST metadata or the
1013    /// expected metadata.
1014    pub(crate) fn new_opt(
1015        sst_meta: &RegionMetadataRef,
1016        expected_meta: Option<&RegionMetadata>,
1017        expr: &Expr,
1018    ) -> Option<Self> {
1019        let filter = SimpleFilterEvaluator::try_new(expr)?;
1020        let (column_metadata, maybe_filter) = match expected_meta {
1021            Some(meta) => {
1022                // Gets the column metadata from the expected metadata.
1023                let column = meta.column_by_name(filter.column_name())?;
1024                // Checks if the column is present in the SST metadata. We still uses the
1025                // column from the expected metadata.
1026                match sst_meta.column_by_id(column.column_id) {
1027                    Some(sst_column) => {
1028                        debug_assert_eq!(column.semantic_type, sst_column.semantic_type);
1029
1030                        (column, MaybeFilter::Filter(filter))
1031                    }
1032                    None => {
1033                        // If the column is not present in the SST metadata, we evaluate the filter
1034                        // against the default value of the column.
1035                        // If we can't evaluate the filter, we return None.
1036                        if pruned_by_default(&filter, column)? {
1037                            (column, MaybeFilter::Pruned)
1038                        } else {
1039                            (column, MaybeFilter::Matched)
1040                        }
1041                    }
1042                }
1043            }
1044            None => {
1045                let column = sst_meta.column_by_name(filter.column_name())?;
1046                (column, MaybeFilter::Filter(filter))
1047            }
1048        };
1049
1050        Some(Self {
1051            filter: maybe_filter,
1052            column_id: column_metadata.column_id,
1053            semantic_type: column_metadata.semantic_type,
1054            data_type: column_metadata.column_schema.data_type.clone(),
1055        })
1056    }
1057
1058    /// Returns the filter to evaluate.
1059    pub(crate) fn filter(&self) -> &MaybeFilter {
1060        &self.filter
1061    }
1062
1063    /// Returns the column id.
1064    pub(crate) fn column_id(&self) -> ColumnId {
1065        self.column_id
1066    }
1067
1068    /// Returns the semantic type of the column.
1069    pub(crate) fn semantic_type(&self) -> SemanticType {
1070        self.semantic_type
1071    }
1072
1073    /// Returns the data type of the column.
1074    pub(crate) fn data_type(&self) -> &ConcreteDataType {
1075        &self.data_type
1076    }
1077}
1078
1079/// Prune a column by its default value.
1080/// Returns false if we can't create the default value or evaluate the filter.
1081fn pruned_by_default(filter: &SimpleFilterEvaluator, column: &ColumnMetadata) -> Option<bool> {
1082    let value = column.column_schema.create_default().ok().flatten()?;
1083    let scalar_value = value
1084        .try_to_scalar_value(&column.column_schema.data_type)
1085        .ok()?;
1086    let matches = filter.evaluate_scalar(&scalar_value).ok()?;
1087    Some(!matches)
1088}
1089
1090/// Parquet batch reader to read our SST format.
1091pub struct ParquetReader {
1092    /// File range context.
1093    context: FileRangeContextRef,
1094    /// Row group selection to read.
1095    selection: RowGroupSelection,
1096    /// Reader of current row group.
1097    reader_state: ReaderState,
1098}
1099
1100#[async_trait]
1101impl BatchReader for ParquetReader {
1102    async fn next_batch(&mut self) -> Result<Option<Batch>> {
1103        let ReaderState::Readable(reader) = &mut self.reader_state else {
1104            return Ok(None);
1105        };
1106
1107        // We don't collect the elapsed time if the reader returns an error.
1108        if let Some(batch) = reader.next_batch().await? {
1109            return Ok(Some(batch));
1110        }
1111
1112        // No more items in current row group, reads next row group.
1113        while let Some((row_group_idx, row_selection)) = self.selection.pop_first() {
1114            let parquet_reader = self
1115                .context
1116                .reader_builder()
1117                .build(row_group_idx, Some(row_selection))
1118                .await?;
1119
1120            // Resets the parquet reader.
1121            reader.reset_source(Source::RowGroup(RowGroupReader::new(
1122                self.context.clone(),
1123                parquet_reader,
1124            )));
1125            if let Some(batch) = reader.next_batch().await? {
1126                return Ok(Some(batch));
1127            }
1128        }
1129
1130        // The reader is exhausted.
1131        self.reader_state = ReaderState::Exhausted(reader.metrics().clone());
1132        Ok(None)
1133    }
1134}
1135
1136impl Drop for ParquetReader {
1137    fn drop(&mut self) {
1138        let metrics = self.reader_state.metrics();
1139        debug!(
1140            "Read parquet {} {}, range: {:?}, {}/{} row groups, metrics: {:?}",
1141            self.context.reader_builder().file_handle.region_id(),
1142            self.context.reader_builder().file_handle.file_id(),
1143            self.context.reader_builder().file_handle.time_range(),
1144            metrics.filter_metrics.rg_total
1145                - metrics.filter_metrics.rg_inverted_filtered
1146                - metrics.filter_metrics.rg_minmax_filtered
1147                - metrics.filter_metrics.rg_fulltext_filtered
1148                - metrics.filter_metrics.rg_bloom_filtered,
1149            metrics.filter_metrics.rg_total,
1150            metrics
1151        );
1152
1153        // Report metrics.
1154        READ_STAGE_ELAPSED
1155            .with_label_values(&["build_parquet_reader"])
1156            .observe(metrics.build_cost.as_secs_f64());
1157        READ_STAGE_ELAPSED
1158            .with_label_values(&["scan_row_groups"])
1159            .observe(metrics.scan_cost.as_secs_f64());
1160        metrics.observe_rows("parquet_reader");
1161        metrics.filter_metrics.observe();
1162    }
1163}
1164
1165impl ParquetReader {
1166    /// Creates a new reader.
1167    pub(crate) async fn new(
1168        context: FileRangeContextRef,
1169        mut selection: RowGroupSelection,
1170    ) -> Result<Self> {
1171        // No more items in current row group, reads next row group.
1172        let reader_state = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
1173            let parquet_reader = context
1174                .reader_builder()
1175                .build(row_group_idx, Some(row_selection))
1176                .await?;
1177            ReaderState::Readable(PruneReader::new_with_row_group_reader(
1178                context.clone(),
1179                RowGroupReader::new(context.clone(), parquet_reader),
1180            ))
1181        } else {
1182            ReaderState::Exhausted(ReaderMetrics::default())
1183        };
1184
1185        Ok(ParquetReader {
1186            context,
1187            selection,
1188            reader_state,
1189        })
1190    }
1191
1192    /// Returns the metadata of the SST.
1193    pub fn metadata(&self) -> &RegionMetadataRef {
1194        self.context.read_format().metadata()
1195    }
1196
1197    #[cfg(test)]
1198    pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
1199        self.context.reader_builder().parquet_meta.clone()
1200    }
1201}
1202
1203/// RowGroupReaderContext represents the fields that cannot be shared
1204/// between different `RowGroupReader`s.
1205pub(crate) trait RowGroupReaderContext: Send {
1206    fn map_result(
1207        &self,
1208        result: std::result::Result<Option<RecordBatch>, ArrowError>,
1209    ) -> Result<Option<RecordBatch>>;
1210
1211    fn read_format(&self) -> &ReadFormat;
1212}
1213
1214impl RowGroupReaderContext for FileRangeContextRef {
1215    fn map_result(
1216        &self,
1217        result: std::result::Result<Option<RecordBatch>, ArrowError>,
1218    ) -> Result<Option<RecordBatch>> {
1219        result.context(ArrowReaderSnafu {
1220            path: self.file_path(),
1221        })
1222    }
1223
1224    fn read_format(&self) -> &ReadFormat {
1225        self.as_ref().read_format()
1226    }
1227}
1228
1229/// [RowGroupReader] that reads from [FileRange].
1230pub(crate) type RowGroupReader = RowGroupReaderBase<FileRangeContextRef>;
1231
1232impl RowGroupReader {
1233    /// Creates a new reader from file range.
1234    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
1235        Self::create(context, reader)
1236    }
1237}
1238
1239/// Reader to read a row group of a parquet file.
1240pub(crate) struct RowGroupReaderBase<T> {
1241    /// Context of [RowGroupReader] so adapts to different underlying implementation.
1242    context: T,
1243    /// Inner parquet reader.
1244    reader: ParquetRecordBatchReader,
1245    /// Buffered batches to return.
1246    batches: VecDeque<Batch>,
1247    /// Local scan metrics.
1248    metrics: ReaderMetrics,
1249    /// Cached sequence array to override sequences.
1250    override_sequence: Option<ArrayRef>,
1251}
1252
1253impl<T> RowGroupReaderBase<T>
1254where
1255    T: RowGroupReaderContext,
1256{
1257    /// Creates a new reader.
1258    pub(crate) fn create(context: T, reader: ParquetRecordBatchReader) -> Self {
1259        // The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE.
1260        let override_sequence = context
1261            .read_format()
1262            .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);
1263        Self {
1264            context,
1265            reader,
1266            batches: VecDeque::new(),
1267            metrics: ReaderMetrics::default(),
1268            override_sequence,
1269        }
1270    }
1271
1272    /// Gets the metrics.
1273    pub(crate) fn metrics(&self) -> &ReaderMetrics {
1274        &self.metrics
1275    }
1276
1277    /// Gets [ReadFormat] of underlying reader.
1278    pub(crate) fn read_format(&self) -> &ReadFormat {
1279        self.context.read_format()
1280    }
1281
1282    /// Tries to fetch next [RecordBatch] from the reader.
1283    fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
1284        self.context.map_result(self.reader.next().transpose())
1285    }
1286
1287    /// Returns the next [Batch].
1288    pub(crate) fn next_inner(&mut self) -> Result<Option<Batch>> {
1289        let scan_start = Instant::now();
1290        if let Some(batch) = self.batches.pop_front() {
1291            self.metrics.num_rows += batch.num_rows();
1292            self.metrics.scan_cost += scan_start.elapsed();
1293            return Ok(Some(batch));
1294        }
1295
1296        // We need to fetch next record batch and convert it to batches.
1297        while self.batches.is_empty() {
1298            let Some(record_batch) = self.fetch_next_record_batch()? else {
1299                self.metrics.scan_cost += scan_start.elapsed();
1300                return Ok(None);
1301            };
1302            self.metrics.num_record_batches += 1;
1303
1304            self.context.read_format().convert_record_batch(
1305                &record_batch,
1306                self.override_sequence.as_ref(),
1307                &mut self.batches,
1308            )?;
1309            self.metrics.num_batches += self.batches.len();
1310        }
1311        let batch = self.batches.pop_front();
1312        self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
1313        self.metrics.scan_cost += scan_start.elapsed();
1314        Ok(batch)
1315    }
1316}
1317
1318#[async_trait::async_trait]
1319impl<T> BatchReader for RowGroupReaderBase<T>
1320where
1321    T: RowGroupReaderContext,
1322{
1323    async fn next_batch(&mut self) -> Result<Option<Batch>> {
1324        self.next_inner()
1325    }
1326}