mito2/memtable/bulk/
context.rs1use std::collections::VecDeque;
18use std::sync::Arc;
19
20use mito_codec::row_converter::{DensePrimaryKeyCodec, build_primary_key_codec};
21use parquet::file::metadata::ParquetMetaData;
22use store_api::metadata::RegionMetadataRef;
23use store_api::storage::ColumnId;
24use table::predicate::Predicate;
25
26use crate::error::Result;
27use crate::sst::parquet::file_range::{PreFilterMode, RangeBase};
28use crate::sst::parquet::flat_format::FlatReadFormat;
29use crate::sst::parquet::format::ReadFormat;
30use crate::sst::parquet::reader::SimpleFilterContext;
31use crate::sst::parquet::stats::RowGroupPruningStats;
32
33pub(crate) type BulkIterContextRef = Arc<BulkIterContext>;
34
35pub struct BulkIterContext {
36 pub(crate) base: RangeBase,
37 pub(crate) predicate: Option<Predicate>,
38}
39
40impl BulkIterContext {
41 pub fn new(
42 region_metadata: RegionMetadataRef,
43 projection: Option<&[ColumnId]>,
44 predicate: Option<Predicate>,
45 skip_auto_convert: bool,
46 ) -> Result<Self> {
47 Self::new_with_pre_filter_mode(
48 region_metadata,
49 projection,
50 predicate,
51 skip_auto_convert,
52 PreFilterMode::All,
53 )
54 }
55
56 pub fn new_with_pre_filter_mode(
57 region_metadata: RegionMetadataRef,
58 projection: Option<&[ColumnId]>,
59 predicate: Option<Predicate>,
60 skip_auto_convert: bool,
61 pre_filter_mode: PreFilterMode,
62 ) -> Result<Self> {
63 let codec = build_primary_key_codec(®ion_metadata);
64
65 let simple_filters = predicate
66 .as_ref()
67 .iter()
68 .flat_map(|predicate| {
69 predicate
70 .exprs()
71 .iter()
72 .filter_map(|expr| SimpleFilterContext::new_opt(®ion_metadata, None, expr))
73 })
74 .collect();
75
76 let read_format = ReadFormat::new(
77 region_metadata.clone(),
78 projection,
79 true,
80 None,
81 "memtable",
82 skip_auto_convert,
83 )?;
84
85 let dyn_filters = predicate
86 .as_ref()
87 .map(|pred| pred.dyn_filters().clone())
88 .unwrap_or_default();
89
90 Ok(Self {
91 base: RangeBase {
92 filters: simple_filters,
93 dyn_filters,
94 read_format,
95 prune_schema: region_metadata.schema.clone(),
96 expected_metadata: Some(region_metadata),
97 codec,
98 compat_batch: None,
100 pre_filter_mode,
101 },
102 predicate,
103 })
104 }
105
106 pub(crate) fn row_groups_to_read(
108 &self,
109 file_meta: &Arc<ParquetMetaData>,
110 skip_fields: bool,
111 ) -> VecDeque<usize> {
112 let region_meta = self.base.read_format.metadata();
113 let row_groups = file_meta.row_groups();
114 let stats =
116 RowGroupPruningStats::new(row_groups, &self.base.read_format, None, skip_fields);
117 if let Some(predicate) = self.predicate.as_ref() {
118 predicate
119 .prune_with_stats(&stats, region_meta.schema.arrow_schema())
120 .iter()
121 .zip(0..file_meta.num_row_groups())
122 .filter_map(|(selected, row_group)| {
123 if !*selected {
124 return None;
125 }
126 Some(row_group)
127 })
128 .collect::<VecDeque<_>>()
129 } else {
130 (0..file_meta.num_row_groups()).collect()
131 }
132 }
133
134 pub(crate) fn read_format(&self) -> &ReadFormat {
135 &self.base.read_format
136 }
137
138 pub(crate) fn pre_filter_mode(&self) -> PreFilterMode {
140 self.base.pre_filter_mode
141 }
142
143 pub(crate) fn region_id(&self) -> store_api::storage::RegionId {
145 self.base.read_format.metadata().region_id
146 }
147}