mito2/memtable/bulk/context.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
//! Context for iterating over a bulk memtable.
16
17use std::collections::VecDeque;
18use std::sync::Arc;
19
20use mito_codec::row_converter::{DensePrimaryKeyCodec, build_primary_key_codec};
21use parquet::file::metadata::ParquetMetaData;
22use store_api::metadata::RegionMetadataRef;
23use store_api::storage::ColumnId;
24use table::predicate::Predicate;
25
26use crate::error::Result;
27use crate::sst::parquet::file_range::{PreFilterMode, RangeBase};
28use crate::sst::parquet::flat_format::FlatReadFormat;
29use crate::sst::parquet::format::ReadFormat;
30use crate::sst::parquet::reader::SimpleFilterContext;
31use crate::sst::parquet::stats::RowGroupPruningStats;
32
33pub(crate) type BulkIterContextRef = Arc<BulkIterContext>;
34
35pub struct BulkIterContext {
36    pub(crate) base: RangeBase,
37    pub(crate) predicate: Option<Predicate>,
38}
39
40impl BulkIterContext {
41    pub fn new(
42        region_metadata: RegionMetadataRef,
43        projection: Option<&[ColumnId]>,
44        predicate: Option<Predicate>,
45        skip_auto_convert: bool,
46    ) -> Result<Self> {
47        Self::new_with_pre_filter_mode(
48            region_metadata,
49            projection,
50            predicate,
51            skip_auto_convert,
52            PreFilterMode::All,
53        )
54    }
55
56    pub fn new_with_pre_filter_mode(
57        region_metadata: RegionMetadataRef,
58        projection: Option<&[ColumnId]>,
59        predicate: Option<Predicate>,
60        skip_auto_convert: bool,
61        pre_filter_mode: PreFilterMode,
62    ) -> Result<Self> {
63        let codec = build_primary_key_codec(&region_metadata);
64
65        let simple_filters = predicate
66            .as_ref()
67            .iter()
68            .flat_map(|predicate| {
69                predicate
70                    .exprs()
71                    .iter()
72                    .filter_map(|expr| SimpleFilterContext::new_opt(&region_metadata, None, expr))
73            })
74            .collect();
75
76        let read_format = ReadFormat::new(
77            region_metadata.clone(),
78            projection,
79            true,
80            None,
81            "memtable",
82            skip_auto_convert,
83        )?;
84
85        let dyn_filters = predicate
86            .as_ref()
87            .map(|pred| pred.dyn_filters().clone())
88            .unwrap_or_default();
89
90        Ok(Self {
91            base: RangeBase {
92                filters: simple_filters,
93                dyn_filters,
94                read_format,
95                prune_schema: region_metadata.schema.clone(),
96                expected_metadata: Some(region_metadata),
97                codec,
98                // we don't need to compat batch since all batch in memtable have the same schema.
99                compat_batch: None,
100                pre_filter_mode,
101            },
102            predicate,
103        })
104    }
105
106    /// Prunes row groups by stats.
107    pub(crate) fn row_groups_to_read(
108        &self,
109        file_meta: &Arc<ParquetMetaData>,
110        skip_fields: bool,
111    ) -> VecDeque<usize> {
112        let region_meta = self.base.read_format.metadata();
113        let row_groups = file_meta.row_groups();
114        // expected_metadata is set to None since we always expect region metadata of memtable is up-to-date.
115        let stats =
116            RowGroupPruningStats::new(row_groups, &self.base.read_format, None, skip_fields);
117        if let Some(predicate) = self.predicate.as_ref() {
118            predicate
119                .prune_with_stats(&stats, region_meta.schema.arrow_schema())
120                .iter()
121                .zip(0..file_meta.num_row_groups())
122                .filter_map(|(selected, row_group)| {
123                    if !*selected {
124                        return None;
125                    }
126                    Some(row_group)
127                })
128                .collect::<VecDeque<_>>()
129        } else {
130            (0..file_meta.num_row_groups()).collect()
131        }
132    }
133
134    pub(crate) fn read_format(&self) -> &ReadFormat {
135        &self.base.read_format
136    }
137
138    /// Returns the pre-filter mode.
139    pub(crate) fn pre_filter_mode(&self) -> PreFilterMode {
140        self.base.pre_filter_mode
141    }
142
143    /// Returns the region id.
144    pub(crate) fn region_id(&self) -> store_api::storage::RegionId {
145        self.base.read_format.metadata().region_id
146    }
147}