Skip to main content

mito2/sst/parquet/
stats.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Statistics of parquet SSTs.
16
17use std::borrow::Borrow;
18use std::collections::HashSet;
19use std::sync::Arc;
20
21use api::v1::SemanticType;
22use datafusion_common::pruning::PruningStatistics;
23use datafusion_common::{Column, ScalarValue};
24use datatypes::arrow::array::{ArrayRef, BooleanArray, UInt64Array};
25use parquet::file::metadata::RowGroupMetaData;
26use store_api::metadata::RegionMetadataRef;
27use store_api::storage::ColumnId;
28
29use crate::sst::parquet::flat_format::FlatReadFormat;
30use crate::sst::parquet::format::StatValues;
31
32/// Statistics for pruning row groups.
33pub(crate) struct RowGroupPruningStats<'a, T> {
34    /// Metadata of SST row groups.
35    row_groups: &'a [T],
36    /// Helper to read the SST.
37    read_format: &'a FlatReadFormat,
38    /// The metadata of the region.
39    /// It contains the schema a query expects to read. If it is not None, we use it instead
40    /// of the metadata in the SST to get the column id of a column as the SST may have
41    /// different columns.
42    expected_metadata: Option<RegionMetadataRef>,
43    /// If true, skip columns with Field semantic type during pruning.
44    skip_fields: bool,
45}
46
47impl<'a, T> RowGroupPruningStats<'a, T> {
48    /// Creates a new statistics to prune specific `row_groups`.
49    pub(crate) fn new(
50        row_groups: &'a [T],
51        read_format: &'a FlatReadFormat,
52        expected_metadata: Option<RegionMetadataRef>,
53        skip_fields: bool,
54    ) -> Self {
55        Self {
56            row_groups,
57            read_format,
58            expected_metadata,
59            skip_fields,
60        }
61    }
62
63    /// Returns the column id of specific column name if we need to read it.
64    /// Prefers the column id in the expected metadata if it exists.
65    /// Returns None if skip_fields is true and the column is a Field.
66    fn column_id_to_prune(&self, name: &str) -> Option<ColumnId> {
67        let metadata = self
68            .expected_metadata
69            .as_ref()
70            .unwrap_or_else(|| self.read_format.metadata());
71        let col = metadata.column_by_name(name)?;
72
73        // Skip field columns when skip_fields is enabled
74        if self.skip_fields && col.semantic_type == SemanticType::Field {
75            return None;
76        }
77
78        Some(col.column_id)
79    }
80
81    /// Returns the default value of all row groups for `column` according to the metadata.
82    fn compat_default_value(&self, column: &str) -> Option<ArrayRef> {
83        let metadata = self.expected_metadata.as_ref()?;
84        let col_metadata = metadata.column_by_name(column)?;
85        col_metadata
86            .column_schema
87            .create_default_vector(self.row_groups.len())
88            .unwrap_or(None)
89            .map(|vector| vector.to_arrow_array())
90    }
91}
92
93impl<T: Borrow<RowGroupMetaData>> RowGroupPruningStats<'_, T> {
94    /// Returns the null count of all row groups for `column` according to the metadata.
95    fn compat_null_count(&self, column: &str) -> Option<ArrayRef> {
96        let metadata = self.expected_metadata.as_ref()?;
97        let col_metadata = metadata.column_by_name(column)?;
98        let value = col_metadata
99            .column_schema
100            .create_default()
101            .unwrap_or(None)?;
102        let values = self.row_groups.iter().map(|meta| {
103            if value.is_null() {
104                u64::try_from(meta.borrow().num_rows()).ok()
105            } else {
106                Some(0)
107            }
108        });
109        Some(Arc::new(UInt64Array::from_iter(values)))
110    }
111}
112
113impl<T: Borrow<RowGroupMetaData>> PruningStatistics for RowGroupPruningStats<'_, T> {
114    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
115        let column_id = self.column_id_to_prune(&column.name)?;
116        match self.read_format.min_values(self.row_groups, column_id) {
117            StatValues::Values(values) => Some(values),
118            StatValues::NoColumn => self.compat_default_value(&column.name),
119            StatValues::NoStats => None,
120        }
121    }
122
123    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
124        let column_id = self.column_id_to_prune(&column.name)?;
125        match self.read_format.max_values(self.row_groups, column_id) {
126            StatValues::Values(values) => Some(values),
127            StatValues::NoColumn => self.compat_default_value(&column.name),
128            StatValues::NoStats => None,
129        }
130    }
131
132    fn num_containers(&self) -> usize {
133        self.row_groups.len()
134    }
135
136    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
137        let column_id = self.column_id_to_prune(&column.name)?;
138        match self.read_format.null_counts(self.row_groups, column_id) {
139            StatValues::Values(values) => Some(values),
140            StatValues::NoColumn => self.compat_null_count(&column.name),
141            StatValues::NoStats => None,
142        }
143    }
144
145    fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
146        // TODO(LFC): Impl it.
147        None
148    }
149
150    fn contained(&self, _column: &Column, _values: &HashSet<ScalarValue>) -> Option<BooleanArray> {
151        // TODO(LFC): Impl it.
152        None
153    }
154}