Skip to main content

mito2/sst/parquet/
read_columns.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use parquet::arrow::ProjectionMask;
18use parquet::schema::types::SchemaDescriptor;
19
/// A nested field access path inside one parquet root column.
///
/// The path is a sequence of field names that starts with the root column
/// name itself, e.g. `["j", "a", "b"]` refers to `j.a.b` under root `j`.
pub type ParquetNestedPath = Vec<String>;
22
23/// The parquet columns to read.
24#[derive(Debug, Clone, PartialEq, Eq)]
25pub struct ParquetReadColumns {
26    cols: Vec<ParquetReadColumn>,
27    has_nested: bool,
28}
29
30impl ParquetReadColumns {
31    /// Builds root-column projections from root indices that are already
32    /// deduplicated.
33    ///
34    /// Note: this constructor does not check for duplicates.
35    pub fn from_deduped_root_indices(root_indices: impl IntoIterator<Item = usize>) -> Self {
36        let cols = root_indices
37            .into_iter()
38            .map(ParquetReadColumn::new)
39            .collect();
40        Self {
41            cols,
42            has_nested: false,
43        }
44    }
45
46    pub fn columns(&self) -> &[ParquetReadColumn] {
47        &self.cols
48    }
49
50    pub fn has_nested(&self) -> bool {
51        self.has_nested
52    }
53
54    pub fn root_indices_iter(&self) -> impl Iterator<Item = usize> + '_ {
55        self.cols.iter().map(|col| col.root_index)
56    }
57}
58
59/// Read requirement for a single parquet root column.
60///
61/// `root_index` identifies the root column in the parquet schema.
62///
63/// If `nested_paths` is empty, the whole root column is read. Otherwise, only
64/// leaves under the specified nested paths are read.
65///
66/// To construct a [`ParquetReadColumn`]:
67/// - `ParquetReadColumn::new(0)` reads the whole root column at index `0`.
68/// - `ParquetReadColumn::new(0).with_nested_paths(vec![vec!["j".into(), "b".into()]])`
69///   reads only leaves under `j.b`.
70#[derive(Debug, Clone, PartialEq, Eq)]
71pub struct ParquetReadColumn {
72    /// Root field index in the parquet schema.
73    root_index: usize,
74    /// Nested paths to read under this root column.
75    ///
76    /// Each path includes the root column itself. For example, for a root
77    /// column `j`, path `["j", "a", "b"]` refers to `j.a.b`.
78    ///
79    /// If empty, the whole root column is read.
80    nested_paths: Vec<ParquetNestedPath>,
81}
82
83impl ParquetReadColumn {
84    pub fn new(root_index: usize) -> Self {
85        Self {
86            root_index,
87            nested_paths: vec![],
88        }
89    }
90
91    pub fn with_nested_paths(self, nested_paths: Vec<ParquetNestedPath>) -> Self {
92        Self {
93            nested_paths,
94            ..self
95        }
96    }
97
98    pub fn root_index(&self) -> usize {
99        self.root_index
100    }
101
102    pub fn nested_paths(&self) -> &[ParquetNestedPath] {
103        &self.nested_paths
104    }
105}
106
107/// Builds a projection mask from parquet read columns.
108pub fn build_projection_mask(
109    parquet_read_cols: &ParquetReadColumns,
110    parquet_schema_desc: &SchemaDescriptor,
111) -> ProjectionMask {
112    if parquet_read_cols.has_nested() {
113        let leaf_indices = build_parquet_leaves_indices(parquet_schema_desc, parquet_read_cols);
114        ProjectionMask::leaves(parquet_schema_desc, leaf_indices)
115    } else {
116        ProjectionMask::roots(parquet_schema_desc, parquet_read_cols.root_indices_iter())
117    }
118}
119
120/// Builds parquet leaf-column indices from parquet read columns.
121fn build_parquet_leaves_indices(
122    parquet_schema_desc: &SchemaDescriptor,
123    projection: &ParquetReadColumns,
124) -> Vec<usize> {
125    let mut map = HashMap::with_capacity(projection.cols.len());
126    for col in &projection.cols {
127        map.insert(col.root_index, &col.nested_paths);
128    }
129
130    let mut leaf_indices = Vec::new();
131    for (leaf_idx, leaf_col) in parquet_schema_desc.columns().iter().enumerate() {
132        let root_idx = parquet_schema_desc.get_column_root_idx(leaf_idx);
133        let Some(nested_paths) = map.get(&root_idx) else {
134            continue;
135        };
136        if nested_paths.is_empty() {
137            leaf_indices.push(leaf_idx);
138            continue;
139        }
140
141        let leaf_path = leaf_col.path().parts();
142        if nested_paths
143            .iter()
144            .any(|nested_path| leaf_path.starts_with(nested_path))
145        {
146            leaf_indices.push(leaf_idx);
147        }
148    }
149    leaf_indices
150}
151
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use parquet::basic::Repetition;
    use parquet::schema::types::Type;

    use super::*;

    /// Builds a nested path from its segments.
    fn path(segments: &[&str]) -> ParquetNestedPath {
        segments.iter().map(|segment| segment.to_string()).collect()
    }

    /// Wraps the given columns into a projection flagged as nested.
    fn nested_projection(cols: Vec<ParquetReadColumn>) -> ParquetReadColumns {
        ParquetReadColumns {
            cols,
            has_nested: true,
        }
    }

    #[test]
    fn test_reads_whole_root() {
        let schema = build_test_nested_parquet_schema();
        let projection = ParquetReadColumns::from_deduped_root_indices([0]);

        let leaves = build_parquet_leaves_indices(&schema, &projection);

        assert_eq!(vec![0, 1, 2], leaves);
    }

    #[test]
    fn test_filters_nested_paths() {
        let schema = build_test_nested_parquet_schema();
        let projection = nested_projection(vec![
            ParquetReadColumn::new(0).with_nested_paths(vec![path(&["j", "b"])]),
            ParquetReadColumn::new(1),
        ]);

        let leaves = build_parquet_leaves_indices(&schema, &projection);

        assert_eq!(vec![1, 2, 3], leaves);
    }

    #[test]
    fn test_reads_middle_level_path() {
        let schema = build_test_nested_parquet_schema();
        let projection = nested_projection(vec![
            ParquetReadColumn::new(0).with_nested_paths(vec![path(&["j", "b"])]),
        ]);

        let leaves = build_parquet_leaves_indices(&schema, &projection);

        assert_eq!(vec![1, 2], leaves);
    }

    #[test]
    fn test_reads_leaf_level_path() {
        let schema = build_test_nested_parquet_schema();
        let projection = nested_projection(vec![
            ParquetReadColumn::new(0).with_nested_paths(vec![path(&["j", "b", "c"])]),
        ]);

        let leaves = build_parquet_leaves_indices(&schema, &projection);

        assert_eq!(vec![1], leaves);
    }

    #[test]
    fn test_merges_mixed_paths() {
        let schema = build_test_nested_parquet_schema();
        let projection = nested_projection(vec![ParquetReadColumn::new(0)
            .with_nested_paths(vec![path(&["j", "a"]), path(&["j", "b", "d"])])]);

        let leaves = build_parquet_leaves_indices(&schema, &projection);

        assert_eq!(vec![0, 2], leaves);
    }

    /// Builds a required INT64 primitive field named `name`.
    fn required_int64(name: &str) -> Arc<Type> {
        let field = Type::primitive_type_builder(name, parquet::basic::Type::INT64)
            .with_repetition(Repetition::REQUIRED)
            .build()
            .unwrap();
        Arc::new(field)
    }

    /// Builds a required group field named `name` with the given children.
    fn required_group(name: &str, fields: Vec<Arc<Type>>) -> Arc<Type> {
        let group = Type::group_type_builder(name)
            .with_repetition(Repetition::REQUIRED)
            .with_fields(fields)
            .build()
            .unwrap();
        Arc::new(group)
    }

    // Test schema (leaf indices in parentheses):
    // schema
    // |- j
    // |  |- a: INT64      (0)
    // |  `- b
    // |     |- c: INT64   (1)
    // |     `- d: INT64   (2)
    // `- k: INT64         (3)
    fn build_test_nested_parquet_schema() -> SchemaDescriptor {
        let group_b = required_group("b", vec![required_int64("c"), required_int64("d")]);
        let root_j = required_group("j", vec![required_int64("a"), group_b]);
        let root_k = required_int64("k");

        // The schema root itself carries no explicit repetition.
        let schema = Type::group_type_builder("schema")
            .with_fields(vec![root_j, root_k])
            .build()
            .unwrap();

        SchemaDescriptor::new(Arc::new(schema))
    }
}