mito2/sst/parquet/
stats.rs1use std::borrow::Borrow;
18use std::collections::HashSet;
19use std::sync::Arc;
20
21use api::v1::SemanticType;
22use datafusion_common::pruning::PruningStatistics;
23use datafusion_common::{Column, ScalarValue};
24use datatypes::arrow::array::{ArrayRef, BooleanArray, UInt64Array};
25use parquet::file::metadata::RowGroupMetaData;
26use store_api::metadata::RegionMetadataRef;
27use store_api::storage::ColumnId;
28
29use crate::sst::parquet::format::{ReadFormat, StatValues};
30
31pub(crate) struct RowGroupPruningStats<'a, T> {
33 row_groups: &'a [T],
35 read_format: &'a ReadFormat,
37 expected_metadata: Option<RegionMetadataRef>,
42 skip_fields: bool,
44}
45
46impl<'a, T> RowGroupPruningStats<'a, T> {
47 pub(crate) fn new(
49 row_groups: &'a [T],
50 read_format: &'a ReadFormat,
51 expected_metadata: Option<RegionMetadataRef>,
52 skip_fields: bool,
53 ) -> Self {
54 Self {
55 row_groups,
56 read_format,
57 expected_metadata,
58 skip_fields,
59 }
60 }
61
62 fn column_id_to_prune(&self, name: &str) -> Option<ColumnId> {
66 let metadata = self
67 .expected_metadata
68 .as_ref()
69 .unwrap_or_else(|| self.read_format.metadata());
70 let col = metadata.column_by_name(name)?;
71
72 if self.skip_fields && col.semantic_type == SemanticType::Field {
74 return None;
75 }
76
77 Some(col.column_id)
78 }
79
80 fn compat_default_value(&self, column: &str) -> Option<ArrayRef> {
82 let metadata = self.expected_metadata.as_ref()?;
83 let col_metadata = metadata.column_by_name(column)?;
84 col_metadata
85 .column_schema
86 .create_default_vector(self.row_groups.len())
87 .unwrap_or(None)
88 .map(|vector| vector.to_arrow_array())
89 }
90}
91
92impl<T: Borrow<RowGroupMetaData>> RowGroupPruningStats<'_, T> {
93 fn compat_null_count(&self, column: &str) -> Option<ArrayRef> {
95 let metadata = self.expected_metadata.as_ref()?;
96 let col_metadata = metadata.column_by_name(column)?;
97 let value = col_metadata
98 .column_schema
99 .create_default()
100 .unwrap_or(None)?;
101 let values = self.row_groups.iter().map(|meta| {
102 if value.is_null() {
103 u64::try_from(meta.borrow().num_rows()).ok()
104 } else {
105 Some(0)
106 }
107 });
108 Some(Arc::new(UInt64Array::from_iter(values)))
109 }
110}
111
112impl<T: Borrow<RowGroupMetaData>> PruningStatistics for RowGroupPruningStats<'_, T> {
113 fn min_values(&self, column: &Column) -> Option<ArrayRef> {
114 let column_id = self.column_id_to_prune(&column.name)?;
115 match self.read_format.min_values(self.row_groups, column_id) {
116 StatValues::Values(values) => Some(values),
117 StatValues::NoColumn => self.compat_default_value(&column.name),
118 StatValues::NoStats => None,
119 }
120 }
121
122 fn max_values(&self, column: &Column) -> Option<ArrayRef> {
123 let column_id = self.column_id_to_prune(&column.name)?;
124 match self.read_format.max_values(self.row_groups, column_id) {
125 StatValues::Values(values) => Some(values),
126 StatValues::NoColumn => self.compat_default_value(&column.name),
127 StatValues::NoStats => None,
128 }
129 }
130
131 fn num_containers(&self) -> usize {
132 self.row_groups.len()
133 }
134
135 fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
136 let column_id = self.column_id_to_prune(&column.name)?;
137 match self.read_format.null_counts(self.row_groups, column_id) {
138 StatValues::Values(values) => Some(values),
139 StatValues::NoColumn => self.compat_null_count(&column.name),
140 StatValues::NoStats => None,
141 }
142 }
143
144 fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
145 None
147 }
148
149 fn contained(&self, _column: &Column, _values: &HashSet<ScalarValue>) -> Option<BooleanArray> {
150 None
152 }
153}