mito2/sst/parquet/
stats.rs1use std::borrow::Borrow;
18use std::collections::HashSet;
19use std::sync::Arc;
20
21use api::v1::SemanticType;
22use datafusion_common::pruning::PruningStatistics;
23use datafusion_common::{Column, ScalarValue};
24use datatypes::arrow::array::{ArrayRef, BooleanArray, UInt64Array};
25use parquet::file::metadata::RowGroupMetaData;
26use store_api::metadata::RegionMetadataRef;
27use store_api::storage::ColumnId;
28
29use crate::sst::parquet::flat_format::FlatReadFormat;
30use crate::sst::parquet::format::StatValues;
31
/// Column statistics of a slice of parquet row groups, exposed to DataFusion's
/// pruning machinery (see the `PruningStatistics` impl for this type).
///
/// Each element of `row_groups` is one pruning container.
pub(crate) struct RowGroupPruningStats<'a, T> {
    /// Row groups whose statistics are reported; one container per element.
    row_groups: &'a [T],
    /// Format used to read per-column min/max/null-count statistics from the row
    /// groups. Its metadata is also the fallback for resolving column names when
    /// `expected_metadata` is `None`.
    read_format: &'a FlatReadFormat,
    /// Metadata the reader expects. When present, it is used to resolve column names
    /// and to supply default values/null counts for columns the read format reports
    /// as `StatValues::NoColumn`.
    /// NOTE(review): presumably this may differ from the SST's own metadata after
    /// schema changes — confirm with callers.
    expected_metadata: Option<RegionMetadataRef>,
    /// When true, columns whose semantic type is `Field` report no statistics, so
    /// they never contribute to pruning.
    skip_fields: bool,
}
46
47impl<'a, T> RowGroupPruningStats<'a, T> {
48 pub(crate) fn new(
50 row_groups: &'a [T],
51 read_format: &'a FlatReadFormat,
52 expected_metadata: Option<RegionMetadataRef>,
53 skip_fields: bool,
54 ) -> Self {
55 Self {
56 row_groups,
57 read_format,
58 expected_metadata,
59 skip_fields,
60 }
61 }
62
63 fn column_id_to_prune(&self, name: &str) -> Option<ColumnId> {
67 let metadata = self
68 .expected_metadata
69 .as_ref()
70 .unwrap_or_else(|| self.read_format.metadata());
71 let col = metadata.column_by_name(name)?;
72
73 if self.skip_fields && col.semantic_type == SemanticType::Field {
75 return None;
76 }
77
78 Some(col.column_id)
79 }
80
81 fn compat_default_value(&self, column: &str) -> Option<ArrayRef> {
83 let metadata = self.expected_metadata.as_ref()?;
84 let col_metadata = metadata.column_by_name(column)?;
85 col_metadata
86 .column_schema
87 .create_default_vector(self.row_groups.len())
88 .unwrap_or(None)
89 .map(|vector| vector.to_arrow_array())
90 }
91}
92
93impl<T: Borrow<RowGroupMetaData>> RowGroupPruningStats<'_, T> {
94 fn compat_null_count(&self, column: &str) -> Option<ArrayRef> {
96 let metadata = self.expected_metadata.as_ref()?;
97 let col_metadata = metadata.column_by_name(column)?;
98 let value = col_metadata
99 .column_schema
100 .create_default()
101 .unwrap_or(None)?;
102 let values = self.row_groups.iter().map(|meta| {
103 if value.is_null() {
104 u64::try_from(meta.borrow().num_rows()).ok()
105 } else {
106 Some(0)
107 }
108 });
109 Some(Arc::new(UInt64Array::from_iter(values)))
110 }
111}
112
113impl<T: Borrow<RowGroupMetaData>> PruningStatistics for RowGroupPruningStats<'_, T> {
114 fn min_values(&self, column: &Column) -> Option<ArrayRef> {
115 let column_id = self.column_id_to_prune(&column.name)?;
116 match self.read_format.min_values(self.row_groups, column_id) {
117 StatValues::Values(values) => Some(values),
118 StatValues::NoColumn => self.compat_default_value(&column.name),
119 StatValues::NoStats => None,
120 }
121 }
122
123 fn max_values(&self, column: &Column) -> Option<ArrayRef> {
124 let column_id = self.column_id_to_prune(&column.name)?;
125 match self.read_format.max_values(self.row_groups, column_id) {
126 StatValues::Values(values) => Some(values),
127 StatValues::NoColumn => self.compat_default_value(&column.name),
128 StatValues::NoStats => None,
129 }
130 }
131
132 fn num_containers(&self) -> usize {
133 self.row_groups.len()
134 }
135
136 fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
137 let column_id = self.column_id_to_prune(&column.name)?;
138 match self.read_format.null_counts(self.row_groups, column_id) {
139 StatValues::Values(values) => Some(values),
140 StatValues::NoColumn => self.compat_null_count(&column.name),
141 StatValues::NoStats => None,
142 }
143 }
144
145 fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
146 None
148 }
149
150 fn contained(&self, _column: &Column, _values: &HashSet<ScalarValue>) -> Option<BooleanArray> {
151 None
153 }
154}