mito2/memtable/bulk/
context.rs1use std::collections::VecDeque;
18use std::sync::Arc;
19
20use mito_codec::row_converter::{build_primary_key_codec, DensePrimaryKeyCodec};
21use parquet::file::metadata::ParquetMetaData;
22use store_api::metadata::RegionMetadataRef;
23use store_api::storage::ColumnId;
24use table::predicate::Predicate;
25
26use crate::sst::parquet::file_range::RangeBase;
27use crate::sst::parquet::flat_format::FlatReadFormat;
28use crate::sst::parquet::format::ReadFormat;
29use crate::sst::parquet::reader::SimpleFilterContext;
30use crate::sst::parquet::stats::RowGroupPruningStats;
31
32pub(crate) type BulkIterContextRef = Arc<BulkIterContext>;
33
34pub struct BulkIterContext {
35 pub(crate) base: RangeBase,
36 pub(crate) predicate: Option<Predicate>,
37}
38
39impl BulkIterContext {
40 pub fn new(
41 region_metadata: RegionMetadataRef,
42 projection: &Option<&[ColumnId]>,
43 predicate: Option<Predicate>,
44 ) -> Self {
45 let codec = build_primary_key_codec(®ion_metadata);
46
47 let simple_filters = predicate
48 .as_ref()
49 .iter()
50 .flat_map(|predicate| {
51 predicate
52 .exprs()
53 .iter()
54 .filter_map(|expr| SimpleFilterContext::new_opt(®ion_metadata, None, expr))
55 })
56 .collect();
57
58 let read_format = build_read_format(region_metadata, projection, true);
59
60 Self {
61 base: RangeBase {
62 filters: simple_filters,
63 read_format,
64 codec,
65 compat_batch: None,
67 },
68 predicate,
69 }
70 }
71
72 pub(crate) fn row_groups_to_read(&self, file_meta: &Arc<ParquetMetaData>) -> VecDeque<usize> {
74 let region_meta = self.base.read_format.metadata();
75 let row_groups = file_meta.row_groups();
76 let stats = RowGroupPruningStats::new(row_groups, &self.base.read_format, None);
78 if let Some(predicate) = self.predicate.as_ref() {
79 predicate
80 .prune_with_stats(&stats, region_meta.schema.arrow_schema())
81 .iter()
82 .zip(0..file_meta.num_row_groups())
83 .filter_map(|(selected, row_group)| {
84 if !*selected {
85 return None;
86 }
87 Some(row_group)
88 })
89 .collect::<VecDeque<_>>()
90 } else {
91 (0..file_meta.num_row_groups()).collect()
92 }
93 }
94
95 pub(crate) fn read_format(&self) -> &ReadFormat {
96 &self.base.read_format
97 }
98}
99
100fn build_read_format(
101 region_metadata: RegionMetadataRef,
102 projection: &Option<&[ColumnId]>,
103 flat_format: bool,
104) -> ReadFormat {
105 let read_format = if let Some(column_ids) = &projection {
106 ReadFormat::new(region_metadata, column_ids.iter().copied(), flat_format)
107 } else {
108 ReadFormat::new(
110 region_metadata.clone(),
111 region_metadata
112 .column_metadatas
113 .iter()
114 .map(|col| col.column_id),
115 flat_format,
116 )
117 };
118
119 read_format
120}