mito2/sst/parquet/reader.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Parquet reader.
16
// Imports are grouped per rustfmt convention: std, external crates, crate-local.
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::v1::SemanticType;
use async_trait::async_trait;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, warn};
use datafusion_expr::Expr;
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use mito_codec::row_converter::build_primary_key_codec;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
use parquet::file::metadata::ParquetMetaData;
use parquet::format::KeyValue;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use table::predicate::Predicate;

use crate::cache::index::result_cache::PredicateKey;
use crate::cache::CacheStrategy;
use crate::error::{
    ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadDataPartSnafu,
    ReadParquetSnafu, Result,
};
use crate::metrics::{
    PRECISE_FILTER_ROWS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_TOTAL,
    READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED,
};
use crate::read::prune::{PruneReader, Source};
use crate::read::{Batch, BatchReader};
use crate::sst::file::{FileHandle, FileId};
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::parquet::file_range::{FileRangeContext, FileRangeContextRef};
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::InMemoryRowGroup;
use crate::sst::parquet::row_selection::RowGroupSelection;
use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};
63
/// Label for the fulltext index, used in error reports and filter metrics.
const INDEX_TYPE_FULLTEXT: &str = "fulltext";
/// Label for the inverted index, used in error reports and filter metrics.
const INDEX_TYPE_INVERTED: &str = "inverted";
/// Label for the bloom filter index, used in error reports and filter metrics.
const INDEX_TYPE_BLOOM: &str = "bloom filter";
67
/// Handles a failure while applying an index.
///
/// In test builds (`cfg(test)` or the `test` feature) the macro panics so that
/// index bugs surface loudly; in production builds it only logs a warning,
/// because an index is an optimization and the read can proceed without it.
macro_rules! handle_index_error {
    ($err:expr, $file_handle:expr, $index_type:expr) => {
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to apply {} index, region_id: {}, file_id: {}, err: {:?}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id(),
                $err
            );
        } else {
            warn!(
                $err; "Failed to apply {} index, region_id: {}, file_id: {}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id()
            );
        }
    };
}
88
/// Parquet SST reader builder.
pub struct ParquetReaderBuilder {
    /// SST directory.
    file_dir: String,
    /// Handle of the SST file to read.
    file_handle: FileHandle,
    /// Object store to load the file and its metadata from.
    object_store: ObjectStore,
    /// Predicate to push down.
    predicate: Option<Predicate>,
    /// Metadata of columns to read.
    ///
    /// `None` reads all columns. Due to schema change, the projection
    /// can contain columns not in the parquet file.
    projection: Option<Vec<ColumnId>>,
    /// Strategy to cache SST data.
    cache_strategy: CacheStrategy,
    /// Index appliers.
    /// Each is optional; a `None` applier simply skips that index when pruning.
    inverted_index_applier: Option<InvertedIndexApplierRef>,
    bloom_filter_index_applier: Option<BloomFilterIndexApplierRef>,
    fulltext_index_applier: Option<FulltextIndexApplierRef>,
    /// Expected metadata of the region while reading the SST.
    /// This is usually the latest metadata of the region. The reader use
    /// it get the correct column id of a column by name.
    expected_metadata: Option<RegionMetadataRef>,
}
113
impl ParquetReaderBuilder {
    /// Returns a new [ParquetReaderBuilder] to read specific SST.
    pub fn new(
        file_dir: String,
        file_handle: FileHandle,
        object_store: ObjectStore,
    ) -> ParquetReaderBuilder {
        ParquetReaderBuilder {
            file_dir,
            file_handle,
            object_store,
            predicate: None,
            projection: None,
            // Caching is opt-in; see [ParquetReaderBuilder::cache].
            cache_strategy: CacheStrategy::Disabled,
            inverted_index_applier: None,
            bloom_filter_index_applier: None,
            fulltext_index_applier: None,
            expected_metadata: None,
        }
    }

    /// Attaches the predicate to the builder.
    #[must_use]
    pub fn predicate(mut self, predicate: Option<Predicate>) -> ParquetReaderBuilder {
        self.predicate = predicate;
        self
    }

    /// Attaches the projection to the builder.
    ///
    /// The reader only applies the projection to fields.
    #[must_use]
    pub fn projection(mut self, projection: Option<Vec<ColumnId>>) -> ParquetReaderBuilder {
        self.projection = projection;
        self
    }

    /// Attaches the cache to the builder.
    #[must_use]
    pub fn cache(mut self, cache: CacheStrategy) -> ParquetReaderBuilder {
        self.cache_strategy = cache;
        self
    }

    /// Attaches the inverted index applier to the builder.
    #[must_use]
    pub(crate) fn inverted_index_applier(
        mut self,
        index_applier: Option<InvertedIndexApplierRef>,
    ) -> Self {
        self.inverted_index_applier = index_applier;
        self
    }

    /// Attaches the bloom filter index applier to the builder.
    #[must_use]
    pub(crate) fn bloom_filter_index_applier(
        mut self,
        index_applier: Option<BloomFilterIndexApplierRef>,
    ) -> Self {
        self.bloom_filter_index_applier = index_applier;
        self
    }

    /// Attaches the fulltext index applier to the builder.
    #[must_use]
    pub(crate) fn fulltext_index_applier(
        mut self,
        index_applier: Option<FulltextIndexApplierRef>,
    ) -> Self {
        self.fulltext_index_applier = index_applier;
        self
    }

    /// Attaches the expected metadata to the builder.
    #[must_use]
    pub fn expected_metadata(mut self, expected_metadata: Option<RegionMetadataRef>) -> Self {
        self.expected_metadata = expected_metadata;
        self
    }

    /// Builds a [ParquetReader].
    ///
    /// This needs to perform IO operation.
    pub async fn build(&self) -> Result<ParquetReader> {
        let mut metrics = ReaderMetrics::default();

        let (context, selection) = self.build_reader_input(&mut metrics).await?;
        ParquetReader::new(Arc::new(context), selection).await
    }

    /// Builds a [FileRangeContext] and collects row groups to read.
    ///
    /// This needs to perform IO operation.
    pub(crate) async fn build_reader_input(
        &self,
        metrics: &mut ReaderMetrics,
    ) -> Result<(FileRangeContext, RowGroupSelection)> {
        let start = Instant::now();

        let file_path = self.file_handle.file_path(&self.file_dir);
        let file_size = self.file_handle.meta_ref().file_size;

        // Loads parquet metadata of the file.
        let parquet_meta = self.read_parquet_metadata(&file_path, file_size).await?;
        // Decodes region metadata.
        let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
        // Gets the metadata stored in the SST.
        let region_meta = Arc::new(Self::get_region_metadata(&file_path, key_value_meta)?);
        let read_format = if let Some(column_ids) = &self.projection {
            ReadFormat::new(region_meta.clone(), column_ids.iter().copied())
        } else {
            // Lists all column ids to read, we always use the expected metadata if possible.
            let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
            ReadFormat::new(
                region_meta.clone(),
                expected_meta
                    .column_metadatas
                    .iter()
                    .map(|col| col.column_id),
            )
        };

        // Computes the projection mask.
        let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
        let indices = read_format.projection_indices();
        // Now we assumes we don't have nested schemas.
        // TODO(yingwen): Revisit this if we introduce nested types such as JSON type.
        let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied());

        // Computes the field levels.
        let hint = Some(read_format.arrow_schema().fields());
        let field_levels =
            parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
                .context(ReadDataPartSnafu)?;
        // Prunes row groups with min-max stats and indexes; pruning failures only
        // widen the selection, they never fail the build.
        let selection = self
            .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
            .await;

        let reader_builder = RowGroupReaderBuilder {
            file_handle: self.file_handle.clone(),
            file_path,
            parquet_meta,
            object_store: self.object_store.clone(),
            projection: projection_mask,
            field_levels,
            cache_strategy: self.cache_strategy.clone(),
        };

        // Converts pushed-down exprs into simple filters for precise row filtering;
        // exprs that cannot be converted are silently skipped (not applied).
        let filters = if let Some(predicate) = &self.predicate {
            predicate
                .exprs()
                .iter()
                .filter_map(|expr| {
                    SimpleFilterContext::new_opt(
                        &region_meta,
                        self.expected_metadata.as_deref(),
                        expr,
                    )
                })
                .collect::<Vec<_>>()
        } else {
            vec![]
        };

        let codec = build_primary_key_codec(read_format.metadata());

        let context = FileRangeContext::new(reader_builder, filters, read_format, codec);

        metrics.build_cost += start.elapsed();

        Ok((context, selection))
    }

    /// Decodes region metadata from key value.
    ///
    /// Returns an `InvalidParquet` error if the key value metadata or the
    /// [PARQUET_METADATA_KEY] entry is missing.
    fn get_region_metadata(
        file_path: &str,
        key_value_meta: Option<&Vec<KeyValue>>,
    ) -> Result<RegionMetadata> {
        let key_values = key_value_meta.context(InvalidParquetSnafu {
            file: file_path,
            reason: "missing key value meta",
        })?;
        let meta_value = key_values
            .iter()
            .find(|kv| kv.key == PARQUET_METADATA_KEY)
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("key {} not found", PARQUET_METADATA_KEY),
            })?;
        let json = meta_value
            .value
            .as_ref()
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("No value for key {}", PARQUET_METADATA_KEY),
            })?;

        RegionMetadata::from_json(json).context(InvalidMetadataSnafu)
    }

    /// Reads parquet metadata of specific file.
    ///
    /// Consults the cache first and populates it on a miss.
    async fn read_parquet_metadata(
        &self,
        file_path: &str,
        file_size: u64,
    ) -> Result<Arc<ParquetMetaData>> {
        // Timer observes the elapsed time on drop.
        let _t = READ_STAGE_ELAPSED
            .with_label_values(&["read_parquet_metadata"])
            .start_timer();

        let region_id = self.file_handle.region_id();
        let file_id = self.file_handle.file_id();
        // Tries to get from global cache.
        if let Some(metadata) = self
            .cache_strategy
            .get_parquet_meta_data(region_id, file_id)
            .await
        {
            return Ok(metadata);
        }

        // Cache miss, load metadata directly.
        let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size);
        let metadata = metadata_loader.load().await?;
        let metadata = Arc::new(metadata);
        // Cache the metadata.
        self.cache_strategy.put_parquet_meta_data(
            self.file_handle.region_id(),
            self.file_handle.file_id(),
            metadata.clone(),
        );

        Ok(metadata)
    }

    /// Computes row groups to read, along with their respective row selections.
    ///
    /// Pruning runs in stages, cheapest first: min-max stats, fulltext (fine),
    /// inverted index, bloom filter, then coarse fulltext only if the fine
    /// fulltext pass did not filter. Each stage short-circuits when the
    /// selection becomes empty.
    async fn row_groups_to_read(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        metrics: &mut ReaderFilterMetrics,
    ) -> RowGroupSelection {
        let num_row_groups = parquet_meta.num_row_groups();
        let num_rows = parquet_meta.file_metadata().num_rows();
        if num_row_groups == 0 || num_rows == 0 {
            return RowGroupSelection::default();
        }

        // Let's assume that the number of rows in the first row group
        // can represent the `row_group_size` of the Parquet file.
        let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
        if row_group_size == 0 {
            return RowGroupSelection::default();
        }

        metrics.rg_total += num_row_groups;
        metrics.rows_total += num_rows as usize;

        let mut output = RowGroupSelection::new(row_group_size, num_rows as _);

        self.prune_row_groups_by_minmax(read_format, parquet_meta, &mut output, metrics);
        if output.is_empty() {
            return output;
        }

        let fulltext_filtered = self
            .prune_row_groups_by_fulltext_index(
                row_group_size,
                num_row_groups,
                &mut output,
                metrics,
            )
            .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_inverted_index(
            row_group_size,
            num_row_groups,
            &mut output,
            metrics,
        )
        .await;
        if output.is_empty() {
            return output;
        }

        self.prune_row_groups_by_bloom_filter(row_group_size, parquet_meta, &mut output, metrics)
            .await;
        if output.is_empty() {
            return output;
        }

        // The coarse fulltext pass only runs if the fine pass did not filter.
        if !fulltext_filtered {
            self.prune_row_groups_by_fulltext_bloom(
                row_group_size,
                parquet_meta,
                &mut output,
                metrics,
            )
            .await;
        }
        output
    }

    /// Prunes row groups by fulltext index. Returns `true` if the row groups are pruned.
    async fn prune_row_groups_by_fulltext_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.fulltext_index_applier else {
            return false;
        };
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let predicate_key = index_applier.predicate_key();
        // Fast path: return early if the result is in the cache.
        let cached = self
            .cache_strategy
            .index_result_cache()
            .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id()));
        if let Some(result) = cached.as_ref() {
            // Only usable if the cached result covers every row group we still need.
            if all_required_row_groups_searched(output, result) {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                return true;
            }
        }

        // Slow path: apply the index from the file.
        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_res = index_applier
            .apply_fine(self.file_handle.file_id(), Some(file_size_hint))
            .await;
        let selection = match apply_res {
            Ok(Some(res)) => RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups),
            // `None` means the index was not applicable; nothing is pruned.
            Ok(None) => return false,
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                return false;
            }
        };

        self.apply_index_result_and_update_cache(
            predicate_key,
            self.file_handle.file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_FULLTEXT,
        );
        true
    }

    /// Applies index to prune row groups.
    ///
    /// TODO(zhongzc): Devise a mechanism to enforce the non-use of indices
    /// as an escape route in case of index issues, and it can be used to test
    /// the correctness of the index.
    async fn prune_row_groups_by_inverted_index(
        &self,
        row_group_size: usize,
        num_row_groups: usize,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.inverted_index_applier else {
            return false;
        };
        if !self.file_handle.meta_ref().inverted_index_available() {
            return false;
        }

        let predicate_key = index_applier.predicate_key();
        // Fast path: return early if the result is in the cache.
        let cached = self
            .cache_strategy
            .index_result_cache()
            .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id()));
        if let Some(result) = cached.as_ref() {
            // Only usable if the cached result covers every row group we still need.
            if all_required_row_groups_searched(output, result) {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED);
                return true;
            }
        }

        // Slow path: apply the index from the file.
        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        let apply_res = index_applier
            .apply(self.file_handle.file_id(), Some(file_size_hint))
            .await;
        let selection = match apply_res {
            Ok(output) => RowGroupSelection::from_inverted_index_apply_output(
                row_group_size,
                num_row_groups,
                output,
            ),
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
                return false;
            }
        };

        self.apply_index_result_and_update_cache(
            predicate_key,
            self.file_handle.file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_INVERTED,
        );
        true
    }

    /// Prunes row groups by the bloom filter index. Returns `true` if pruning was applied.
    async fn prune_row_groups_by_bloom_filter(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.bloom_filter_index_applier else {
            return false;
        };
        if !self.file_handle.meta_ref().bloom_filter_index_available() {
            return false;
        }

        let predicate_key = index_applier.predicate_key();
        // Fast path: return early if the result is in the cache.
        let cached = self
            .cache_strategy
            .index_result_cache()
            .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id()));
        if let Some(result) = cached.as_ref() {
            if all_required_row_groups_searched(output, result) {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM);
                return true;
            }
        }

        // Slow path: apply the index from the file.
        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        // For each row group: (num_rows, whether the index should search it).
        let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
            (
                rg.num_rows() as usize,
                // Optimize: only search the row group that required by `output` and not stored in `cached`.
                output.contains_non_empty_row_group(i)
                    && cached
                        .as_ref()
                        .map(|c| !c.contains_row_group(i))
                        .unwrap_or(true),
            )
        });
        let apply_res = index_applier
            .apply(self.file_handle.file_id(), Some(file_size_hint), rgs)
            .await;
        let mut selection = match apply_res {
            Ok(apply_output) => RowGroupSelection::from_row_ranges(apply_output, row_group_size),
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM);
                return false;
            }
        };

        // New searched row groups are added to `selection`, concat them with `cached`.
        if let Some(cached) = cached.as_ref() {
            selection.concat(cached);
        }

        self.apply_index_result_and_update_cache(
            predicate_key,
            self.file_handle.file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_BLOOM,
        );
        true
    }

    /// Prunes row groups by the coarse (bloom-based) fulltext index.
    /// Returns `true` if pruning was applied.
    async fn prune_row_groups_by_fulltext_bloom(
        &self,
        row_group_size: usize,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(index_applier) = &self.fulltext_index_applier else {
            return false;
        };
        if !self.file_handle.meta_ref().fulltext_index_available() {
            return false;
        }

        let predicate_key = index_applier.predicate_key();
        // Fast path: return early if the result is in the cache.
        let cached = self
            .cache_strategy
            .index_result_cache()
            .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id()));
        if let Some(result) = cached.as_ref() {
            if all_required_row_groups_searched(output, result) {
                apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
                return true;
            }
        }

        // Slow path: apply the index from the file.
        let file_size_hint = self.file_handle.meta_ref().index_file_size();
        // For each row group: (num_rows, whether the index should search it).
        let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
            (
                rg.num_rows() as usize,
                // Optimize: only search the row group that required by `output` and not stored in `cached`.
                output.contains_non_empty_row_group(i)
                    && cached
                        .as_ref()
                        .map(|c| !c.contains_row_group(i))
                        .unwrap_or(true),
            )
        });
        let apply_res = index_applier
            .apply_coarse(self.file_handle.file_id(), Some(file_size_hint), rgs)
            .await;
        let mut selection = match apply_res {
            Ok(Some(apply_output)) => {
                RowGroupSelection::from_row_ranges(apply_output, row_group_size)
            }
            // `None` means the index was not applicable; nothing is pruned.
            Ok(None) => return false,
            Err(err) => {
                handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
                return false;
            }
        };

        // New searched row groups are added to `selection`, concat them with `cached`.
        if let Some(cached) = cached.as_ref() {
            selection.concat(cached);
        }

        self.apply_index_result_and_update_cache(
            predicate_key,
            self.file_handle.file_id(),
            selection,
            output,
            metrics,
            INDEX_TYPE_FULLTEXT,
        );
        true
    }

    /// Prunes row groups by min-max index.
    /// Returns `true` if a predicate exists and pruning was attempted.
    fn prune_row_groups_by_minmax(
        &self,
        read_format: &ReadFormat,
        parquet_meta: &ParquetMetaData,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
    ) -> bool {
        let Some(predicate) = &self.predicate else {
            return false;
        };

        let row_groups_before = output.row_group_count();

        let region_meta = read_format.metadata();
        let row_groups = parquet_meta.row_groups();
        let stats =
            RowGroupPruningStats::new(row_groups, read_format, self.expected_metadata.clone());
        let prune_schema = self
            .expected_metadata
            .as_ref()
            .map(|meta| meta.schema.arrow_schema())
            .unwrap_or_else(|| region_meta.schema.arrow_schema());

        // Here we use the schema of the SST to build the physical expression. If the column
        // in the SST doesn't have the same column id as the column in the expected metadata,
        // we will get a None statistics for that column.
        predicate
            .prune_with_stats(&stats, prune_schema)
            .iter()
            .zip(0..parquet_meta.num_row_groups())
            .for_each(|(mask, row_group)| {
                // A `false` mask means the row group can't contain matching rows.
                if !*mask {
                    output.remove_row_group(row_group);
                }
            });

        let row_groups_after = output.row_group_count();
        metrics.rg_minmax_filtered += row_groups_before - row_groups_after;

        true
    }

    /// Applies an index `result` to `output`, updates filter metrics, and
    /// stores `result` in the index result cache (if enabled) for later reads.
    fn apply_index_result_and_update_cache(
        &self,
        predicate_key: &PredicateKey,
        file_id: FileId,
        result: RowGroupSelection,
        output: &mut RowGroupSelection,
        metrics: &mut ReaderFilterMetrics,
        index_type: &str,
    ) {
        apply_selection_and_update_metrics(output, &result, metrics, index_type);

        if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() {
            index_result_cache.put(predicate_key.clone(), file_id, Arc::new(result));
        }
    }
}
730
731fn apply_selection_and_update_metrics(
732    output: &mut RowGroupSelection,
733    result: &RowGroupSelection,
734    metrics: &mut ReaderFilterMetrics,
735    index_type: &str,
736) {
737    let intersection = output.intersect(result);
738
739    let row_group_count = output.row_group_count() - intersection.row_group_count();
740    let row_count = output.row_count() - intersection.row_count();
741
742    metrics.update_index_metrics(index_type, row_group_count, row_count);
743
744    *output = intersection;
745}
746
747fn all_required_row_groups_searched(
748    required_row_groups: &RowGroupSelection,
749    cached_row_groups: &RowGroupSelection,
750) -> bool {
751    required_row_groups.iter().all(|(rg_id, _)| {
752        // Row group with no rows is not required to search.
753        !required_row_groups.contains_non_empty_row_group(*rg_id)
754            // The row group is already searched.
755            || cached_row_groups.contains_row_group(*rg_id)
756    })
757}
758
/// Metrics of filtering row groups and rows.
#[derive(Debug, Default, Clone, Copy)]
pub(crate) struct ReaderFilterMetrics {
    /// Number of row groups before filtering.
    pub(crate) rg_total: usize,
    /// Number of row groups filtered by fulltext index.
    pub(crate) rg_fulltext_filtered: usize,
    /// Number of row groups filtered by inverted index.
    pub(crate) rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max index.
    pub(crate) rg_minmax_filtered: usize,
    /// Number of row groups filtered by bloom filter index.
    pub(crate) rg_bloom_filtered: usize,

    /// Number of rows in row group before filtering.
    pub(crate) rows_total: usize,
    /// Number of rows in row group filtered by fulltext index.
    pub(crate) rows_fulltext_filtered: usize,
    /// Number of rows in row group filtered by inverted index.
    pub(crate) rows_inverted_filtered: usize,
    /// Number of rows in row group filtered by bloom filter index.
    pub(crate) rows_bloom_filtered: usize,
    /// Number of rows filtered by precise filter.
    pub(crate) rows_precise_filtered: usize,
}
784
785impl ReaderFilterMetrics {
786    /// Adds `other` metrics to this metrics.
787    pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
788        self.rg_total += other.rg_total;
789        self.rg_fulltext_filtered += other.rg_fulltext_filtered;
790        self.rg_inverted_filtered += other.rg_inverted_filtered;
791        self.rg_minmax_filtered += other.rg_minmax_filtered;
792        self.rg_bloom_filtered += other.rg_bloom_filtered;
793
794        self.rows_total += other.rows_total;
795        self.rows_fulltext_filtered += other.rows_fulltext_filtered;
796        self.rows_inverted_filtered += other.rows_inverted_filtered;
797        self.rows_bloom_filtered += other.rows_bloom_filtered;
798        self.rows_precise_filtered += other.rows_precise_filtered;
799    }
800
801    /// Reports metrics.
802    pub(crate) fn observe(&self) {
803        READ_ROW_GROUPS_TOTAL
804            .with_label_values(&["before_filtering"])
805            .inc_by(self.rg_total as u64);
806        READ_ROW_GROUPS_TOTAL
807            .with_label_values(&["fulltext_index_filtered"])
808            .inc_by(self.rg_fulltext_filtered as u64);
809        READ_ROW_GROUPS_TOTAL
810            .with_label_values(&["inverted_index_filtered"])
811            .inc_by(self.rg_inverted_filtered as u64);
812        READ_ROW_GROUPS_TOTAL
813            .with_label_values(&["minmax_index_filtered"])
814            .inc_by(self.rg_minmax_filtered as u64);
815        READ_ROW_GROUPS_TOTAL
816            .with_label_values(&["bloom_filter_index_filtered"])
817            .inc_by(self.rg_bloom_filtered as u64);
818
819        PRECISE_FILTER_ROWS_TOTAL
820            .with_label_values(&["parquet"])
821            .inc_by(self.rows_precise_filtered as u64);
822        READ_ROWS_IN_ROW_GROUP_TOTAL
823            .with_label_values(&["before_filtering"])
824            .inc_by(self.rows_total as u64);
825        READ_ROWS_IN_ROW_GROUP_TOTAL
826            .with_label_values(&["fulltext_index_filtered"])
827            .inc_by(self.rows_fulltext_filtered as u64);
828        READ_ROWS_IN_ROW_GROUP_TOTAL
829            .with_label_values(&["inverted_index_filtered"])
830            .inc_by(self.rows_inverted_filtered as u64);
831        READ_ROWS_IN_ROW_GROUP_TOTAL
832            .with_label_values(&["bloom_filter_index_filtered"])
833            .inc_by(self.rows_bloom_filtered as u64);
834    }
835
836    fn update_index_metrics(&mut self, index_type: &str, row_group_count: usize, row_count: usize) {
837        match index_type {
838            INDEX_TYPE_FULLTEXT => {
839                self.rg_fulltext_filtered += row_group_count;
840                self.rows_fulltext_filtered += row_count;
841            }
842            INDEX_TYPE_INVERTED => {
843                self.rg_inverted_filtered += row_group_count;
844                self.rows_inverted_filtered += row_count;
845            }
846            INDEX_TYPE_BLOOM => {
847                self.rg_bloom_filtered += row_group_count;
848                self.rows_bloom_filtered += row_count;
849            }
850            _ => {}
851        }
852    }
853}
854
/// Parquet reader metrics.
///
/// Tracks filtering statistics plus the time spent building and scanning
/// the reader.
#[derive(Debug, Default, Clone)]
pub struct ReaderMetrics {
    /// Filtered row groups and rows metrics.
    pub(crate) filter_metrics: ReaderFilterMetrics,
    /// Duration to build the parquet reader.
    pub(crate) build_cost: Duration,
    /// Duration to scan the reader.
    pub(crate) scan_cost: Duration,
    /// Number of arrow record batches read from the parquet reader.
    pub(crate) num_record_batches: usize,
    /// Number of batches decoded from the record batches.
    pub(crate) num_batches: usize,
    /// Number of rows read.
    pub(crate) num_rows: usize,
}
871
872impl ReaderMetrics {
873    /// Adds `other` metrics to this metrics.
874    pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
875        self.filter_metrics.merge_from(&other.filter_metrics);
876        self.build_cost += other.build_cost;
877        self.scan_cost += other.scan_cost;
878        self.num_record_batches += other.num_record_batches;
879        self.num_batches += other.num_batches;
880        self.num_rows += other.num_rows;
881    }
882
883    /// Reports total rows.
884    pub(crate) fn observe_rows(&self, read_type: &str) {
885        READ_ROWS_TOTAL
886            .with_label_values(&[read_type])
887            .inc_by(self.num_rows as u64);
888    }
889}
890
/// Builder to build a [ParquetRecordBatchReader] for a row group.
pub(crate) struct RowGroupReaderBuilder {
    /// SST file to read.
    ///
    /// Holds the file handle so the file won't be purged while it is being read.
    file_handle: FileHandle,
    /// Path of the file.
    file_path: String,
    /// Metadata of the parquet file.
    parquet_meta: Arc<ParquetMetaData>,
    /// Object store as an Operator.
    object_store: ObjectStore,
    /// Projection mask that selects the columns to fetch.
    projection: ProjectionMask,
    /// Field levels to read.
    field_levels: FieldLevels,
    /// Cache for fetched data.
    cache_strategy: CacheStrategy,
}
910
impl RowGroupReaderBuilder {
    /// Path of the file to read.
    pub(crate) fn file_path(&self) -> &str {
        &self.file_path
    }

    /// Handle of the file to read.
    pub(crate) fn file_handle(&self) -> &FileHandle {
        &self.file_handle
    }

    /// Metadata of the parquet file to read.
    pub(crate) fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
        &self.parquet_meta
    }

    /// Cache strategy used while reading.
    pub(crate) fn cache_strategy(&self) -> &CacheStrategy {
        &self.cache_strategy
    }

    /// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`.
    ///
    /// When `row_selection` is provided, only the selected rows are fetched
    /// and read; otherwise the whole row group is read.
    pub(crate) async fn build(
        &self,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
    ) -> Result<ParquetRecordBatchReader> {
        let mut row_group = InMemoryRowGroup::create(
            self.file_handle.region_id(),
            self.file_handle.file_id(),
            &self.parquet_meta,
            row_group_idx,
            self.cache_strategy.clone(),
            &self.file_path,
            self.object_store.clone(),
        );
        // Fetches data into memory.
        row_group
            .fetch(&self.projection, row_selection.as_ref())
            .await
            .context(ReadParquetSnafu {
                path: &self.file_path,
            })?;

        // Builds the parquet reader over the in-memory row group, applying
        // the same row selection that was used for fetching.
        ParquetRecordBatchReader::try_new_with_row_groups(
            &self.field_levels,
            &row_group,
            DEFAULT_READ_BATCH_SIZE,
            row_selection,
        )
        .context(ReadParquetSnafu {
            path: &self.file_path,
        })
    }
}
966
/// The state of a [ParquetReader].
enum ReaderState {
    /// The reader is reading a row group.
    Readable(PruneReader),
    /// The reader is exhausted; keeps the final metrics so they can still
    /// be reported (e.g. on drop).
    Exhausted(ReaderMetrics),
}
974
975impl ReaderState {
976    /// Returns the metrics of the reader.
977    fn metrics(&self) -> ReaderMetrics {
978        match self {
979            ReaderState::Readable(reader) => reader.metrics(),
980            ReaderState::Exhausted(m) => m.clone(),
981        }
982    }
983}
984
/// The filter to evaluate or the prune result of the default value.
pub(crate) enum MaybeFilter {
    /// The filter to evaluate against column data.
    Filter(SimpleFilterEvaluator),
    /// The column is absent from the SST and the filter matches the
    /// column's default value.
    Matched,
    /// The column is absent from the SST and the filter does not match the
    /// column's default value.
    Pruned,
}
994
/// Context to evaluate the column filter for a parquet file.
pub(crate) struct SimpleFilterContext {
    /// Filter to evaluate, or the decision made from the default value.
    filter: MaybeFilter,
    /// Id of the column to evaluate.
    column_id: ColumnId,
    /// Semantic type of the column.
    semantic_type: SemanticType,
    /// The data type of the column.
    data_type: ConcreteDataType,
}
1006
1007impl SimpleFilterContext {
1008    /// Creates a context for the `expr`.
1009    ///
1010    /// Returns None if the column to filter doesn't exist in the SST metadata or the
1011    /// expected metadata.
1012    pub(crate) fn new_opt(
1013        sst_meta: &RegionMetadataRef,
1014        expected_meta: Option<&RegionMetadata>,
1015        expr: &Expr,
1016    ) -> Option<Self> {
1017        let filter = SimpleFilterEvaluator::try_new(expr)?;
1018        let (column_metadata, maybe_filter) = match expected_meta {
1019            Some(meta) => {
1020                // Gets the column metadata from the expected metadata.
1021                let column = meta.column_by_name(filter.column_name())?;
1022                // Checks if the column is present in the SST metadata. We still uses the
1023                // column from the expected metadata.
1024                match sst_meta.column_by_id(column.column_id) {
1025                    Some(sst_column) => {
1026                        debug_assert_eq!(column.semantic_type, sst_column.semantic_type);
1027
1028                        (column, MaybeFilter::Filter(filter))
1029                    }
1030                    None => {
1031                        // If the column is not present in the SST metadata, we evaluate the filter
1032                        // against the default value of the column.
1033                        // If we can't evaluate the filter, we return None.
1034                        if pruned_by_default(&filter, column)? {
1035                            (column, MaybeFilter::Pruned)
1036                        } else {
1037                            (column, MaybeFilter::Matched)
1038                        }
1039                    }
1040                }
1041            }
1042            None => {
1043                let column = sst_meta.column_by_name(filter.column_name())?;
1044                (column, MaybeFilter::Filter(filter))
1045            }
1046        };
1047
1048        Some(Self {
1049            filter: maybe_filter,
1050            column_id: column_metadata.column_id,
1051            semantic_type: column_metadata.semantic_type,
1052            data_type: column_metadata.column_schema.data_type.clone(),
1053        })
1054    }
1055
1056    /// Returns the filter to evaluate.
1057    pub(crate) fn filter(&self) -> &MaybeFilter {
1058        &self.filter
1059    }
1060
1061    /// Returns the column id.
1062    pub(crate) fn column_id(&self) -> ColumnId {
1063        self.column_id
1064    }
1065
1066    /// Returns the semantic type of the column.
1067    pub(crate) fn semantic_type(&self) -> SemanticType {
1068        self.semantic_type
1069    }
1070
1071    /// Returns the data type of the column.
1072    pub(crate) fn data_type(&self) -> &ConcreteDataType {
1073        &self.data_type
1074    }
1075}
1076
1077/// Prune a column by its default value.
1078/// Returns false if we can't create the default value or evaluate the filter.
1079fn pruned_by_default(filter: &SimpleFilterEvaluator, column: &ColumnMetadata) -> Option<bool> {
1080    let value = column.column_schema.create_default().ok().flatten()?;
1081    let scalar_value = value
1082        .try_to_scalar_value(&column.column_schema.data_type)
1083        .ok()?;
1084    let matches = filter.evaluate_scalar(&scalar_value).ok()?;
1085    Some(!matches)
1086}
1087
/// Parquet batch reader to read our SST format.
pub struct ParquetReader {
    /// File range context.
    context: FileRangeContextRef,
    /// Remaining row group selection to read.
    selection: RowGroupSelection,
    /// Reader state of the current row group.
    reader_state: ReaderState,
}
1097
1098#[async_trait]
1099impl BatchReader for ParquetReader {
1100    async fn next_batch(&mut self) -> Result<Option<Batch>> {
1101        let ReaderState::Readable(reader) = &mut self.reader_state else {
1102            return Ok(None);
1103        };
1104
1105        // We don't collect the elapsed time if the reader returns an error.
1106        if let Some(batch) = reader.next_batch().await? {
1107            return Ok(Some(batch));
1108        }
1109
1110        // No more items in current row group, reads next row group.
1111        while let Some((row_group_idx, row_selection)) = self.selection.pop_first() {
1112            let parquet_reader = self
1113                .context
1114                .reader_builder()
1115                .build(row_group_idx, Some(row_selection))
1116                .await?;
1117
1118            // Resets the parquet reader.
1119            reader.reset_source(Source::RowGroup(RowGroupReader::new(
1120                self.context.clone(),
1121                parquet_reader,
1122            )));
1123            if let Some(batch) = reader.next_batch().await? {
1124                return Ok(Some(batch));
1125            }
1126        }
1127
1128        // The reader is exhausted.
1129        self.reader_state = ReaderState::Exhausted(reader.metrics().clone());
1130        Ok(None)
1131    }
1132}
1133
1134impl Drop for ParquetReader {
1135    fn drop(&mut self) {
1136        let metrics = self.reader_state.metrics();
1137        debug!(
1138            "Read parquet {} {}, range: {:?}, {}/{} row groups, metrics: {:?}",
1139            self.context.reader_builder().file_handle.region_id(),
1140            self.context.reader_builder().file_handle.file_id(),
1141            self.context.reader_builder().file_handle.time_range(),
1142            metrics.filter_metrics.rg_total
1143                - metrics.filter_metrics.rg_inverted_filtered
1144                - metrics.filter_metrics.rg_minmax_filtered
1145                - metrics.filter_metrics.rg_fulltext_filtered
1146                - metrics.filter_metrics.rg_bloom_filtered,
1147            metrics.filter_metrics.rg_total,
1148            metrics
1149        );
1150
1151        // Report metrics.
1152        READ_STAGE_ELAPSED
1153            .with_label_values(&["build_parquet_reader"])
1154            .observe(metrics.build_cost.as_secs_f64());
1155        READ_STAGE_ELAPSED
1156            .with_label_values(&["scan_row_groups"])
1157            .observe(metrics.scan_cost.as_secs_f64());
1158        metrics.observe_rows("parquet_reader");
1159        metrics.filter_metrics.observe();
1160    }
1161}
1162
impl ParquetReader {
    /// Creates a new reader.
    ///
    /// Eagerly builds the reader for the first selected row group; if the
    /// selection is empty the reader starts in the exhausted state.
    pub(crate) async fn new(
        context: FileRangeContextRef,
        mut selection: RowGroupSelection,
    ) -> Result<Self> {
        // Builds the reader for the first selected row group, if any.
        let reader_state = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
            let parquet_reader = context
                .reader_builder()
                .build(row_group_idx, Some(row_selection))
                .await?;
            ReaderState::Readable(PruneReader::new_with_row_group_reader(
                context.clone(),
                RowGroupReader::new(context.clone(), parquet_reader),
            ))
        } else {
            ReaderState::Exhausted(ReaderMetrics::default())
        };

        Ok(ParquetReader {
            context,
            selection,
            reader_state,
        })
    }

    /// Returns the metadata of the SST.
    pub fn metadata(&self) -> &RegionMetadataRef {
        self.context.read_format().metadata()
    }

    #[cfg(test)]
    pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
        self.context.reader_builder().parquet_meta.clone()
    }
}
1200
/// RowGroupReaderContext represents the fields that cannot be shared
/// between different `RowGroupReader`s.
pub(crate) trait RowGroupReaderContext: Send {
    /// Converts an arrow-level read result into this crate's [Result],
    /// attaching error context (e.g. the file path).
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>>;

    /// Returns the [ReadFormat] used to convert record batches.
    fn read_format(&self) -> &ReadFormat;
}
1211
impl RowGroupReaderContext for FileRangeContextRef {
    fn map_result(
        &self,
        result: std::result::Result<Option<RecordBatch>, ArrowError>,
    ) -> Result<Option<RecordBatch>> {
        // Attaches the file path so read errors identify the SST file.
        result.context(ArrowReaderSnafu {
            path: self.file_path(),
        })
    }

    fn read_format(&self) -> &ReadFormat {
        self.as_ref().read_format()
    }
}
1226
/// [RowGroupReader] that reads from [FileRange], i.e. [RowGroupReaderBase]
/// specialized to [FileRangeContextRef].
pub(crate) type RowGroupReader = RowGroupReaderBase<FileRangeContextRef>;
1229
1230impl RowGroupReader {
1231    /// Creates a new reader from file range.
1232    pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
1233        Self {
1234            context,
1235            reader,
1236            batches: VecDeque::new(),
1237            metrics: ReaderMetrics::default(),
1238        }
1239    }
1240}
1241
/// Reader to read a row group of a parquet file.
pub(crate) struct RowGroupReaderBase<T> {
    /// Context of [RowGroupReader] so adapts to different underlying implementation.
    context: T,
    /// Inner parquet reader.
    reader: ParquetRecordBatchReader,
    /// Batches converted from read record batches, buffered to return
    /// one at a time.
    batches: VecDeque<Batch>,
    /// Local scan metrics.
    metrics: ReaderMetrics,
}
1253
1254impl<T> RowGroupReaderBase<T>
1255where
1256    T: RowGroupReaderContext,
1257{
1258    /// Creates a new reader.
1259    pub(crate) fn create(context: T, reader: ParquetRecordBatchReader) -> Self {
1260        Self {
1261            context,
1262            reader,
1263            batches: VecDeque::new(),
1264            metrics: ReaderMetrics::default(),
1265        }
1266    }
1267
1268    /// Gets the metrics.
1269    pub(crate) fn metrics(&self) -> &ReaderMetrics {
1270        &self.metrics
1271    }
1272
1273    /// Gets [ReadFormat] of underlying reader.
1274    pub(crate) fn read_format(&self) -> &ReadFormat {
1275        self.context.read_format()
1276    }
1277
1278    /// Tries to fetch next [RecordBatch] from the reader.
1279    fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
1280        self.context.map_result(self.reader.next().transpose())
1281    }
1282
1283    /// Returns the next [Batch].
1284    pub(crate) fn next_inner(&mut self) -> Result<Option<Batch>> {
1285        let scan_start = Instant::now();
1286        if let Some(batch) = self.batches.pop_front() {
1287            self.metrics.num_rows += batch.num_rows();
1288            self.metrics.scan_cost += scan_start.elapsed();
1289            return Ok(Some(batch));
1290        }
1291
1292        // We need to fetch next record batch and convert it to batches.
1293        while self.batches.is_empty() {
1294            let Some(record_batch) = self.fetch_next_record_batch()? else {
1295                self.metrics.scan_cost += scan_start.elapsed();
1296                return Ok(None);
1297            };
1298            self.metrics.num_record_batches += 1;
1299
1300            self.context
1301                .read_format()
1302                .convert_record_batch(&record_batch, &mut self.batches)?;
1303            self.metrics.num_batches += self.batches.len();
1304        }
1305        let batch = self.batches.pop_front();
1306        self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
1307        self.metrics.scan_cost += scan_start.elapsed();
1308        Ok(batch)
1309    }
1310}
1311
#[async_trait::async_trait]
impl<T> BatchReader for RowGroupReaderBase<T>
where
    T: RowGroupReaderContext,
{
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        // The underlying read is synchronous; simply delegate.
        self.next_inner()
    }
}