mito2/sst/parquet/
stats.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Statistics of parquet SSTs.
16
17use std::borrow::Borrow;
18use std::collections::HashSet;
19use std::sync::Arc;
20
21use datafusion::physical_optimizer::pruning::PruningStatistics;
22use datafusion_common::{Column, ScalarValue};
23use datatypes::arrow::array::{ArrayRef, BooleanArray, UInt64Array};
24use parquet::file::metadata::RowGroupMetaData;
25use store_api::metadata::RegionMetadataRef;
26use store_api::storage::ColumnId;
27
28use crate::sst::parquet::format::{ReadFormat, StatValues};
29
30/// Statistics for pruning row groups.
31pub(crate) struct RowGroupPruningStats<'a, T> {
32    /// Metadata of SST row groups.
33    row_groups: &'a [T],
34    /// Helper to read the SST.
35    read_format: &'a ReadFormat,
36    /// The metadata of the region.
37    /// It contains the schema a query expects to read. If it is not None, we use it instead
38    /// of the metadata in the SST to get the column id of a column as the SST may have
39    /// different columns.
40    expected_metadata: Option<RegionMetadataRef>,
41}
42
43impl<'a, T> RowGroupPruningStats<'a, T> {
44    /// Creates a new statistics to prune specific `row_groups`.
45    pub(crate) fn new(
46        row_groups: &'a [T],
47        read_format: &'a ReadFormat,
48        expected_metadata: Option<RegionMetadataRef>,
49    ) -> Self {
50        Self {
51            row_groups,
52            read_format,
53            expected_metadata,
54        }
55    }
56
57    /// Returns the column id of specific column name if we need to read it.
58    /// Prefers the column id in the expected metadata if it exists.
59    fn column_id_to_prune(&self, name: &str) -> Option<ColumnId> {
60        let metadata = self
61            .expected_metadata
62            .as_ref()
63            .unwrap_or_else(|| self.read_format.metadata());
64        metadata.column_by_name(name).map(|col| col.column_id)
65    }
66
67    /// Returns the default value of all row groups for `column` according to the metadata.
68    fn compat_default_value(&self, column: &str) -> Option<ArrayRef> {
69        let metadata = self.expected_metadata.as_ref()?;
70        let col_metadata = metadata.column_by_name(column)?;
71        col_metadata
72            .column_schema
73            .create_default_vector(self.row_groups.len())
74            .unwrap_or(None)
75            .map(|vector| vector.to_arrow_array())
76    }
77}
78
79impl<T: Borrow<RowGroupMetaData>> RowGroupPruningStats<'_, T> {
80    /// Returns the null count of all row groups for `column` according to the metadata.
81    fn compat_null_count(&self, column: &str) -> Option<ArrayRef> {
82        let metadata = self.expected_metadata.as_ref()?;
83        let col_metadata = metadata.column_by_name(column)?;
84        let value = col_metadata
85            .column_schema
86            .create_default()
87            .unwrap_or(None)?;
88        let values = self.row_groups.iter().map(|meta| {
89            if value.is_null() {
90                u64::try_from(meta.borrow().num_rows()).ok()
91            } else {
92                Some(0)
93            }
94        });
95        Some(Arc::new(UInt64Array::from_iter(values)))
96    }
97}
98
99impl<T: Borrow<RowGroupMetaData>> PruningStatistics for RowGroupPruningStats<'_, T> {
100    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
101        let column_id = self.column_id_to_prune(&column.name)?;
102        match self.read_format.min_values(self.row_groups, column_id) {
103            StatValues::Values(values) => Some(values),
104            StatValues::NoColumn => self.compat_default_value(&column.name),
105            StatValues::NoStats => None,
106        }
107    }
108
109    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
110        let column_id = self.column_id_to_prune(&column.name)?;
111        match self.read_format.max_values(self.row_groups, column_id) {
112            StatValues::Values(values) => Some(values),
113            StatValues::NoColumn => self.compat_default_value(&column.name),
114            StatValues::NoStats => None,
115        }
116    }
117
118    fn num_containers(&self) -> usize {
119        self.row_groups.len()
120    }
121
122    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
123        let column_id = self.column_id_to_prune(&column.name)?;
124        match self.read_format.null_counts(self.row_groups, column_id) {
125            StatValues::Values(values) => Some(values),
126            StatValues::NoColumn => self.compat_null_count(&column.name),
127            StatValues::NoStats => None,
128        }
129    }
130
131    fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
132        // TODO(LFC): Impl it.
133        None
134    }
135
136    fn contained(&self, _column: &Column, _values: &HashSet<ScalarValue>) -> Option<BooleanArray> {
137        // TODO(LFC): Impl it.
138        None
139    }
140}