mito2/memtable/bulk/
context.rs1use std::collections::VecDeque;
18use std::sync::Arc;
19
20use parquet::file::metadata::ParquetMetaData;
21use store_api::metadata::RegionMetadataRef;
22use store_api::storage::ColumnId;
23use table::predicate::Predicate;
24
25use crate::row_converter::{build_primary_key_codec, DensePrimaryKeyCodec};
26use crate::sst::parquet::file_range::RangeBase;
27use crate::sst::parquet::format::ReadFormat;
28use crate::sst::parquet::reader::SimpleFilterContext;
29use crate::sst::parquet::stats::RowGroupPruningStats;
30
31pub(crate) type BulkIterContextRef = Arc<BulkIterContext>;
32
33pub(crate) struct BulkIterContext {
34 pub(crate) base: RangeBase,
35 pub(crate) predicate: Option<Predicate>,
36}
37
38impl BulkIterContext {
39 pub(crate) fn new(
40 region_metadata: RegionMetadataRef,
41 projection: &Option<&[ColumnId]>,
42 predicate: Option<Predicate>,
43 ) -> Self {
44 let codec = build_primary_key_codec(®ion_metadata);
45
46 let simple_filters = predicate
47 .as_ref()
48 .iter()
49 .flat_map(|predicate| {
50 predicate
51 .exprs()
52 .iter()
53 .filter_map(|expr| SimpleFilterContext::new_opt(®ion_metadata, None, expr))
54 })
55 .collect();
56
57 let read_format = build_read_format(region_metadata, projection);
58
59 Self {
60 base: RangeBase {
61 filters: simple_filters,
62 read_format,
63 codec,
64 compat_batch: None,
66 },
67 predicate,
68 }
69 }
70
71 pub(crate) fn row_groups_to_read(&self, file_meta: &Arc<ParquetMetaData>) -> VecDeque<usize> {
73 let region_meta = self.base.read_format.metadata();
74 let row_groups = file_meta.row_groups();
75 let stats = RowGroupPruningStats::new(row_groups, &self.base.read_format, None);
77 if let Some(predicate) = self.predicate.as_ref() {
78 predicate
79 .prune_with_stats(&stats, region_meta.schema.arrow_schema())
80 .iter()
81 .zip(0..file_meta.num_row_groups())
82 .filter_map(|(selected, row_group)| {
83 if !*selected {
84 return None;
85 }
86 Some(row_group)
87 })
88 .collect::<VecDeque<_>>()
89 } else {
90 (0..file_meta.num_row_groups()).collect()
91 }
92 }
93
94 pub(crate) fn read_format(&self) -> &ReadFormat {
95 &self.base.read_format
96 }
97}
98
99fn build_read_format(
100 region_metadata: RegionMetadataRef,
101 projection: &Option<&[ColumnId]>,
102) -> ReadFormat {
103 let read_format = if let Some(column_ids) = &projection {
104 ReadFormat::new(region_metadata, column_ids.iter().copied())
105 } else {
106 ReadFormat::new(
108 region_metadata.clone(),
109 region_metadata
110 .column_metadatas
111 .iter()
112 .map(|col| col.column_id),
113 )
114 };
115
116 read_format
117}