mito2/memtable/bulk/
context.rs1use std::collections::VecDeque;
18use std::sync::Arc;
19
20use common_recordbatch::filter::SimpleFilterEvaluator;
21use mito_codec::row_converter::build_primary_key_codec;
22use parquet::file::metadata::ParquetMetaData;
23use store_api::metadata::RegionMetadataRef;
24use store_api::storage::ColumnId;
25use table::predicate::Predicate;
26
27use crate::error::Result;
28use crate::read::read_columns::ReadColumns;
29use crate::sst::parquet::file_range::{PreFilterMode, RangeBase};
30use crate::sst::parquet::flat_format::FlatReadFormat;
31use crate::sst::parquet::prefilter::{CachedPrimaryKeyFilter, build_bulk_filter_plan};
32use crate::sst::parquet::stats::RowGroupPruningStats;
33
34pub(crate) type BulkIterContextRef = Arc<BulkIterContext>;
35
36pub struct BulkIterContext {
37 pub(crate) base: RangeBase,
38 pub(crate) predicate: Option<Predicate>,
39 pk_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
42}
43
44impl BulkIterContext {
45 pub fn new(
46 region_metadata: RegionMetadataRef,
47 projection: Option<&[ColumnId]>,
48 predicate: Option<Predicate>,
49 skip_auto_convert: bool,
50 ) -> Result<Self> {
51 Self::new_with_pre_filter_mode(
52 region_metadata,
53 projection,
54 predicate,
55 skip_auto_convert,
56 PreFilterMode::All,
57 )
58 }
59
60 pub fn new_with_pre_filter_mode(
61 region_metadata: RegionMetadataRef,
62 projection: Option<&[ColumnId]>,
63 predicate: Option<Predicate>,
64 skip_auto_convert: bool,
65 pre_filter_mode: PreFilterMode,
66 ) -> Result<Self> {
67 let codec = build_primary_key_codec(®ion_metadata);
68
69 let read_cols = if let Some(col_ids) = projection {
70 ReadColumns::from_deduped_column_ids(col_ids.iter().copied())
71 } else {
72 ReadColumns::from_deduped_column_ids(
73 region_metadata
74 .column_metadatas
75 .iter()
76 .map(|col| col.column_id),
77 )
78 };
79 let read_format = FlatReadFormat::new(
80 region_metadata.clone(),
81 read_cols,
82 None,
83 "memtable",
84 skip_auto_convert,
85 )?;
86
87 let dyn_filters = predicate
88 .as_ref()
89 .map(|pred| pred.dyn_filters().as_ref().clone())
90 .unwrap_or_default();
91
92 let filter_plan = build_bulk_filter_plan(&read_format, predicate.as_ref());
93
94 Ok(Self {
95 base: RangeBase {
96 filters: filter_plan.remaining_simple_filters,
97 dyn_filters,
98 read_format,
99 prune_schema: region_metadata.schema.clone(),
100 expected_metadata: Some(region_metadata),
101 codec,
102 compat_batch: None,
104 compaction_projection_mapper: None,
105 pre_filter_mode,
106 partition_filter: None,
107 },
108 predicate,
109 pk_filters: filter_plan.pk_filters,
110 })
111 }
112
113 pub(crate) fn row_groups_to_read(
115 &self,
116 file_meta: &Arc<ParquetMetaData>,
117 skip_fields: bool,
118 ) -> VecDeque<usize> {
119 let region_meta = self.base.read_format.metadata();
120 let row_groups = file_meta.row_groups();
121 let stats =
123 RowGroupPruningStats::new(row_groups, &self.base.read_format, None, skip_fields);
124 if let Some(predicate) = self.predicate.as_ref() {
125 predicate
126 .prune_with_stats(&stats, region_meta.schema.arrow_schema())
127 .iter()
128 .zip(0..file_meta.num_row_groups())
129 .filter_map(|(selected, row_group)| {
130 if !*selected {
131 return None;
132 }
133 Some(row_group)
134 })
135 .collect::<VecDeque<_>>()
136 } else {
137 (0..file_meta.num_row_groups()).collect()
138 }
139 }
140
141 pub(crate) fn build_pk_filter(&self) -> Option<CachedPrimaryKeyFilter> {
144 let pk_filters = self.pk_filters.as_ref()?;
145 let metadata = self.base.read_format.metadata();
146 let inner = self
148 .base
149 .codec
150 .primary_key_filter(metadata, Arc::clone(pk_filters), false);
151 Some(CachedPrimaryKeyFilter::new(inner))
152 }
153
154 pub(crate) fn read_format(&self) -> &FlatReadFormat {
155 &self.base.read_format
156 }
157
158 pub(crate) fn pre_filter_mode(&self) -> PreFilterMode {
160 self.base.pre_filter_mode
161 }
162
163 pub(crate) fn region_id(&self) -> store_api::storage::RegionId {
165 self.base.read_format.metadata().region_id
166 }
167}