mito2/sst/parquet/
stats.rs1use std::borrow::Borrow;
18use std::collections::HashSet;
19use std::sync::Arc;
20
21use datafusion::physical_optimizer::pruning::PruningStatistics;
22use datafusion_common::{Column, ScalarValue};
23use datatypes::arrow::array::{ArrayRef, BooleanArray, UInt64Array};
24use parquet::file::metadata::RowGroupMetaData;
25use store_api::metadata::RegionMetadataRef;
26use store_api::storage::ColumnId;
27
28use crate::sst::parquet::format::{ReadFormat, StatValues};
29
30pub(crate) struct RowGroupPruningStats<'a, T> {
32 row_groups: &'a [T],
34 read_format: &'a ReadFormat,
36 expected_metadata: Option<RegionMetadataRef>,
41}
42
43impl<'a, T> RowGroupPruningStats<'a, T> {
44 pub(crate) fn new(
46 row_groups: &'a [T],
47 read_format: &'a ReadFormat,
48 expected_metadata: Option<RegionMetadataRef>,
49 ) -> Self {
50 Self {
51 row_groups,
52 read_format,
53 expected_metadata,
54 }
55 }
56
57 fn column_id_to_prune(&self, name: &str) -> Option<ColumnId> {
60 let metadata = self
61 .expected_metadata
62 .as_ref()
63 .unwrap_or_else(|| self.read_format.metadata());
64 metadata.column_by_name(name).map(|col| col.column_id)
65 }
66
67 fn compat_default_value(&self, column: &str) -> Option<ArrayRef> {
69 let metadata = self.expected_metadata.as_ref()?;
70 let col_metadata = metadata.column_by_name(column)?;
71 col_metadata
72 .column_schema
73 .create_default_vector(self.row_groups.len())
74 .unwrap_or(None)
75 .map(|vector| vector.to_arrow_array())
76 }
77}
78
79impl<T: Borrow<RowGroupMetaData>> RowGroupPruningStats<'_, T> {
80 fn compat_null_count(&self, column: &str) -> Option<ArrayRef> {
82 let metadata = self.expected_metadata.as_ref()?;
83 let col_metadata = metadata.column_by_name(column)?;
84 let value = col_metadata
85 .column_schema
86 .create_default()
87 .unwrap_or(None)?;
88 let values = self.row_groups.iter().map(|meta| {
89 if value.is_null() {
90 u64::try_from(meta.borrow().num_rows()).ok()
91 } else {
92 Some(0)
93 }
94 });
95 Some(Arc::new(UInt64Array::from_iter(values)))
96 }
97}
98
99impl<T: Borrow<RowGroupMetaData>> PruningStatistics for RowGroupPruningStats<'_, T> {
100 fn min_values(&self, column: &Column) -> Option<ArrayRef> {
101 let column_id = self.column_id_to_prune(&column.name)?;
102 match self.read_format.min_values(self.row_groups, column_id) {
103 StatValues::Values(values) => Some(values),
104 StatValues::NoColumn => self.compat_default_value(&column.name),
105 StatValues::NoStats => None,
106 }
107 }
108
109 fn max_values(&self, column: &Column) -> Option<ArrayRef> {
110 let column_id = self.column_id_to_prune(&column.name)?;
111 match self.read_format.max_values(self.row_groups, column_id) {
112 StatValues::Values(values) => Some(values),
113 StatValues::NoColumn => self.compat_default_value(&column.name),
114 StatValues::NoStats => None,
115 }
116 }
117
118 fn num_containers(&self) -> usize {
119 self.row_groups.len()
120 }
121
122 fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
123 let column_id = self.column_id_to_prune(&column.name)?;
124 match self.read_format.null_counts(self.row_groups, column_id) {
125 StatValues::Values(values) => Some(values),
126 StatValues::NoColumn => self.compat_null_count(&column.name),
127 StatValues::NoStats => None,
128 }
129 }
130
131 fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
132 None
134 }
135
136 fn contained(&self, _column: &Column, _values: &HashSet<ScalarValue>) -> Option<BooleanArray> {
137 None
139 }
140}