mito2/memtable/bulk/
context.rs1use std::collections::VecDeque;
18use std::sync::Arc;
19
20use mito_codec::row_converter::{DensePrimaryKeyCodec, build_primary_key_codec};
21use parquet::file::metadata::ParquetMetaData;
22use store_api::metadata::RegionMetadataRef;
23use store_api::storage::ColumnId;
24use table::predicate::Predicate;
25
26use crate::error::Result;
27use crate::sst::parquet::file_range::RangeBase;
28use crate::sst::parquet::flat_format::FlatReadFormat;
29use crate::sst::parquet::format::ReadFormat;
30use crate::sst::parquet::reader::SimpleFilterContext;
31use crate::sst::parquet::stats::RowGroupPruningStats;
32
/// Shared, reference-counted (`Arc`) handle to a [`BulkIterContext`].
pub(crate) type BulkIterContextRef = Arc<BulkIterContext>;

/// Context for iterating bulk memtable data: range-level read state plus the optional
/// predicate used to prune row groups.
pub struct BulkIterContext {
    /// Range-level read state built by [`BulkIterContext::new`]: the simple filters derived
    /// from the predicate, the read format, the primary key codec and (unset) batch compat.
    pub(crate) base: RangeBase,
    /// Optional predicate; when present, [`BulkIterContext::row_groups_to_read`] uses it to
    /// prune row groups by their statistics.
    pub(crate) predicate: Option<Predicate>,
}
39
40impl BulkIterContext {
41 pub fn new(
42 region_metadata: RegionMetadataRef,
43 projection: Option<&[ColumnId]>,
44 predicate: Option<Predicate>,
45 skip_auto_convert: bool,
46 ) -> Result<Self> {
47 let codec = build_primary_key_codec(®ion_metadata);
48
49 let simple_filters = predicate
50 .as_ref()
51 .iter()
52 .flat_map(|predicate| {
53 predicate
54 .exprs()
55 .iter()
56 .filter_map(|expr| SimpleFilterContext::new_opt(®ion_metadata, None, expr))
57 })
58 .collect();
59
60 let read_format = ReadFormat::new(
61 region_metadata,
62 projection,
63 true,
64 None,
65 "memtable",
66 skip_auto_convert,
67 )?;
68
69 Ok(Self {
70 base: RangeBase {
71 filters: simple_filters,
72 read_format,
73 codec,
74 compat_batch: None,
76 },
77 predicate,
78 })
79 }
80
81 pub(crate) fn row_groups_to_read(&self, file_meta: &Arc<ParquetMetaData>) -> VecDeque<usize> {
83 let region_meta = self.base.read_format.metadata();
84 let row_groups = file_meta.row_groups();
85 let stats = RowGroupPruningStats::new(row_groups, &self.base.read_format, None);
87 if let Some(predicate) = self.predicate.as_ref() {
88 predicate
89 .prune_with_stats(&stats, region_meta.schema.arrow_schema())
90 .iter()
91 .zip(0..file_meta.num_row_groups())
92 .filter_map(|(selected, row_group)| {
93 if !*selected {
94 return None;
95 }
96 Some(row_group)
97 })
98 .collect::<VecDeque<_>>()
99 } else {
100 (0..file_meta.num_row_groups()).collect()
101 }
102 }
103
104 pub(crate) fn read_format(&self) -> &ReadFormat {
105 &self.base.read_format
106 }
107}