Skip to main content

mito2/sst/parquet/
read_columns.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
use std::collections::{HashMap, HashSet};

use parquet::arrow::ProjectionMask;
use parquet::schema::types::SchemaDescriptor;
19
/// A nested field access path inside one parquet root column.
///
/// Each element is one path segment. In this module the first segment is the
/// root column's own name, e.g. `["j", "a", "b"]` refers to `j.a.b`.
pub type ParquetNestedPath = Vec<String>;
22
23/// The parquet columns to read.
24#[derive(Debug, Clone, PartialEq, Eq)]
25pub struct ParquetReadColumns {
26    /// Root parquet column indices in the same order as `cols`.
27    ///
28    /// Most readers need these indices as a borrowed slice for Arrow schema
29    /// projection or parquet root-column projection. Keeping them here avoids
30    /// repeatedly collecting `cols.iter().map(|col| col.root_index)`.
31    root_indices: Vec<usize>,
32    cols: Vec<ParquetReadColumn>,
33    has_nested: bool,
34}
35
36impl ParquetReadColumns {
37    /// Builds parquet read columns from deduplicated, normalized input.
38    ///
39    /// `cols` must not contain duplicate root indices, and nested paths must
40    /// already be merged. Empty `nested_paths` means reading the whole root column.
41    ///
42    /// This constructor does not validate or merge input.
43    pub fn from_deduped(cols: Vec<ParquetReadColumn>) -> Self {
44        let has_nested = cols.iter().any(|col| !col.nested_paths.is_empty());
45        let root_indices = cols.iter().map(|col| col.root_index).collect();
46        Self {
47            root_indices,
48            cols,
49            has_nested,
50        }
51    }
52
53    /// Builds root-column projections from root indices that are already
54    /// deduplicated.
55    ///
56    /// Note: this constructor does not check for duplicates.
57    pub fn from_deduped_root_indices(root_indices: impl IntoIterator<Item = usize>) -> Self {
58        let root_indices = root_indices.into_iter().collect::<Vec<_>>();
59        let cols = root_indices
60            .iter()
61            .copied()
62            .map(ParquetReadColumn::new)
63            .collect();
64        Self {
65            root_indices,
66            cols,
67            has_nested: false,
68        }
69    }
70
71    pub fn columns(&self) -> &[ParquetReadColumn] {
72        &self.cols
73    }
74
75    pub fn has_nested(&self) -> bool {
76        self.has_nested
77    }
78
79    pub fn root_indices_iter(&self) -> impl Iterator<Item = usize> + '_ {
80        self.root_indices.iter().copied()
81    }
82
83    /// Returns root parquet column indices.
84    pub fn root_indices(&self) -> &[usize] {
85        &self.root_indices
86    }
87}
88
89/// Read requirement for a single parquet root column.
90///
91/// `root_index` identifies the root column in the parquet schema.
92///
93/// If `nested_paths` is empty, the whole root column is read. Otherwise, only
94/// leaves under the specified nested paths are read.
95///
96/// To construct a [`ParquetReadColumn`]:
97/// - `ParquetReadColumn::new(0)` reads the whole root column at index `0`.
98/// - `ParquetReadColumn::new(0).with_nested_paths(vec![vec!["j".into(), "b".into()]])`
99///   reads only leaves under `j.b`.
100#[derive(Debug, Clone, PartialEq, Eq)]
101pub struct ParquetReadColumn {
102    /// Root field index in the parquet schema.
103    root_index: usize,
104    /// Nested paths to read under this root column.
105    ///
106    /// Each path includes the root column itself. For example, for a root
107    /// column `j`, path `["j", "a", "b"]` refers to `j.a.b`.
108    ///
109    /// If empty, the whole root column is read.
110    nested_paths: Vec<ParquetNestedPath>,
111}
112
113impl ParquetReadColumn {
114    pub fn new(root_index: usize) -> Self {
115        Self {
116            root_index,
117            nested_paths: vec![],
118        }
119    }
120
121    pub fn with_nested_paths(self, nested_paths: Vec<ParquetNestedPath>) -> Self {
122        Self {
123            nested_paths,
124            ..self
125        }
126    }
127
128    /// Merges additional nested paths into this root column.
129    pub fn merge_nested_paths(&mut self, nested_paths: Vec<ParquetNestedPath>) {
130        let reads_whole_root = self.nested_paths.is_empty() || nested_paths.is_empty();
131        if reads_whole_root {
132            // Empty nested paths means reading the whole root column.
133            self.nested_paths = vec![];
134        } else {
135            self.nested_paths.extend(nested_paths);
136        }
137    }
138
139    pub fn root_index(&self) -> usize {
140        self.root_index
141    }
142
143    pub fn nested_paths(&self) -> &[ParquetNestedPath] {
144        &self.nested_paths
145    }
146}
147
148/// Projection plan built for a parquet file.
149#[derive(Clone)]
150pub struct ProjectionMaskPlan {
151    /// `mask` is the projection mask applied to the parquet reader.
152    pub mask: ProjectionMask,
153    /// A boolean mask in output schema order indicating whether each
154    /// projected root column is physically present in the parquet
155    /// read result.
156    ///
157    /// - `true`: the column exists in the input `RecordBatch`.
158    /// - `false`: the column is missing (e.g., due to unmatched nested
159    ///   paths) and must be synthesized during post-processing (typically
160    ///   filled with null/default values).
161    ///
162    /// The length of `projected_root_presence` is always equal to the
163    /// number of fields in the output schema.
164    pub projected_root_presence: Vec<bool>,
165}
166
167/// Builds a projection mask plan for reading a parquet file.
168///
169/// `parquet_read_cols` defines the requested root columns and optional
170/// nested paths to read.
171///
172/// `parquet_schema_desc` is the schema descriptor of the current parquet
173/// file. It is used to resolve requested nested paths to actual leaf
174/// column indices.
175///
176/// See [`ProjectionMaskPlan`] for the returned value.
177///
178/// For example, if the query requests `j.a` and `k`, but the current
179/// parquet file only contains leaves under `j.b` and `k`, then the
180/// returned plan keeps `k` in the projection mask and marks `j` as
181/// not present in the output, so it can be synthesized during
182/// post-processing.
183pub fn build_projection_plan(
184    parquet_read_cols: &ParquetReadColumns,
185    parquet_schema_desc: &SchemaDescriptor,
186) -> ProjectionMaskPlan {
187    if !parquet_read_cols.has_nested() {
188        let mask =
189            ProjectionMask::roots(parquet_schema_desc, parquet_read_cols.root_indices_iter());
190        return ProjectionMaskPlan {
191            mask,
192            projected_root_presence: vec![true; parquet_read_cols.columns().len()],
193        };
194    }
195
196    let (leaf_indices, matched_roots) =
197        build_parquet_leaves_indices(parquet_schema_desc, parquet_read_cols);
198
199    let projected_root_presence = parquet_read_cols
200        .columns()
201        .iter()
202        .map(|col| matched_roots.contains(&col.root_index()))
203        .collect();
204
205    let mask = ProjectionMask::leaves(parquet_schema_desc, leaf_indices);
206    ProjectionMaskPlan {
207        mask,
208        projected_root_presence,
209    }
210}
211
212/// Builds parquet leaf-column indices for reading a parquet file.
213///
214/// Returns `(leaf_indices, matched_roots)`:
215/// - `leaf_indices`: matched parquet leaf column indices
216/// - `matched_roots`: root column indices that match at least one leaf in the
217///   current parquet schema.
218fn build_parquet_leaves_indices(
219    parquet_schema_desc: &SchemaDescriptor,
220    projection: &ParquetReadColumns,
221) -> (Vec<usize>, HashSet<usize>) {
222    let mut map = HashMap::with_capacity(projection.cols.len());
223    for col in &projection.cols {
224        map.insert(col.root_index, &col.nested_paths);
225    }
226
227    let mut leaf_indices = Vec::new();
228    let mut matched_roots = HashSet::with_capacity(projection.cols.len());
229    for (leaf_idx, leaf_col) in parquet_schema_desc.columns().iter().enumerate() {
230        let root_idx = parquet_schema_desc.get_column_root_idx(leaf_idx);
231        let Some(nested_paths) = map.get(&root_idx) else {
232            continue;
233        };
234        if nested_paths.is_empty() {
235            leaf_indices.push(leaf_idx);
236            matched_roots.insert(root_idx);
237            continue;
238        }
239
240        let leaf_path = leaf_col.path().parts();
241        if nested_paths
242            .iter()
243            .any(|nested_path| leaf_path.starts_with(nested_path))
244        {
245            leaf_indices.push(leaf_idx);
246            matched_roots.insert(root_idx);
247        }
248    }
249    (leaf_indices, matched_roots)
250}
251
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use parquet::basic::Repetition;
    use parquet::schema::types::Type;

    use super::*;

    /// Builds an owned nested path from string segments.
    fn path(parts: &[&str]) -> ParquetNestedPath {
        parts.iter().map(|part| part.to_string()).collect()
    }

    /// Builds a read requirement for `root_index` restricted to `paths`.
    fn nested_col(root_index: usize, paths: &[&[&str]]) -> ParquetReadColumn {
        ParquetReadColumn::new(root_index)
            .with_nested_paths(paths.iter().map(|parts| path(parts)).collect())
    }

    #[test]
    fn test_build_projection_mask_without_nested_paths() {
        let parquet_schema_desc = build_test_nested_parquet_schema();
        let projection = ParquetReadColumns::from_deduped_root_indices([0, 1]);

        let plan = build_projection_plan(&projection, &parquet_schema_desc);

        assert_eq!(vec![true, true], plan.projected_root_presence);
        assert_eq!(
            ProjectionMask::roots(&parquet_schema_desc, [0, 1]),
            plan.mask
        );
    }

    #[test]
    fn test_reads_whole_root() {
        let parquet_schema_desc = build_test_nested_parquet_schema();

        let projection = ParquetReadColumns::from_deduped(vec![ParquetReadColumn::new(0)]);

        let (leaf_indices, matched_roots) =
            build_parquet_leaves_indices(&parquet_schema_desc, &projection);
        assert_eq!(vec![0, 1, 2], leaf_indices);
        assert_eq!(HashSet::from([0]), matched_roots);
    }

    #[test]
    fn test_filters_nested_paths() {
        let parquet_schema_desc = build_test_nested_parquet_schema();

        let projection = ParquetReadColumns::from_deduped(vec![
            nested_col(0, &[&["j", "b"]]),
            ParquetReadColumn::new(1),
        ]);

        let (leaf_indices, matched_roots) =
            build_parquet_leaves_indices(&parquet_schema_desc, &projection);
        assert_eq!(vec![1, 2, 3], leaf_indices);
        assert_eq!(HashSet::from([0, 1]), matched_roots);
    }

    #[test]
    fn test_reads_middle_level_path() {
        let parquet_schema_desc = build_test_nested_parquet_schema();

        let projection = ParquetReadColumns::from_deduped(vec![nested_col(0, &[&["j", "b"]])]);

        let (leaf_indices, matched_roots) =
            build_parquet_leaves_indices(&parquet_schema_desc, &projection);
        assert_eq!(vec![1, 2], leaf_indices);
        assert_eq!(HashSet::from([0]), matched_roots);
    }

    #[test]
    fn test_reads_leaf_level_path() {
        let parquet_schema_desc = build_test_nested_parquet_schema();

        let projection =
            ParquetReadColumns::from_deduped(vec![nested_col(0, &[&["j", "b", "c"]])]);

        let (leaf_indices, matched_roots) =
            build_parquet_leaves_indices(&parquet_schema_desc, &projection);
        assert_eq!(vec![1], leaf_indices);
        assert_eq!(HashSet::from([0]), matched_roots);
    }

    #[test]
    fn test_build_projection_mask_with_unmatched_roots() {
        let parquet_schema_desc = build_test_nested_parquet_schema();

        let projection = ParquetReadColumns::from_deduped(vec![
            nested_col(0, &[&["j", "missing"]]),
            ParquetReadColumn::new(1),
        ]);

        let plan = build_projection_plan(&projection, &parquet_schema_desc);

        assert_eq!(vec![false, true], plan.projected_root_presence);
        assert_eq!(
            ProjectionMask::leaves(&parquet_schema_desc, vec![3]),
            plan.mask
        );
    }

    #[test]
    fn test_merges_mixed_paths() {
        let parquet_schema_desc = build_test_nested_parquet_schema();

        let projection = ParquetReadColumns::from_deduped(vec![nested_col(
            0,
            &[&["j", "a"], &["j", "b", "d"]],
        )]);

        let (leaf_indices, matched_roots) =
            build_parquet_leaves_indices(&parquet_schema_desc, &projection);
        assert_eq!(vec![0, 2], leaf_indices);
        assert_eq!(HashSet::from([0]), matched_roots);
    }

    #[test]
    fn test_merge_nested_paths_extends_paths() {
        let mut col = nested_col(0, &[&["j", "a"]]);

        col.merge_nested_paths(vec![path(&["j", "b"])]);

        assert_eq!(
            &[path(&["j", "a"]), path(&["j", "b"])],
            col.nested_paths()
        );
    }

    #[test]
    fn test_merge_nested_paths_with_whole_root() {
        let mut col = nested_col(0, &[&["j", "a"]]);

        col.merge_nested_paths(vec![]);

        assert!(col.nested_paths().is_empty());
    }

    #[test]
    fn test_merge_nested_paths_into_whole_root() {
        // A column that already reads the whole root must stay that way.
        let mut col = ParquetReadColumn::new(0);

        col.merge_nested_paths(vec![path(&["j", "a"])]);

        assert!(col.nested_paths().is_empty());
    }

    /// Builds a `REQUIRED` INT64 primitive schema node named `name`.
    fn required_int64(name: &str) -> Arc<Type> {
        Arc::new(
            Type::primitive_type_builder(name, parquet::basic::Type::INT64)
                .with_repetition(Repetition::REQUIRED)
                .build()
                .unwrap(),
        )
    }

    /// Builds a `REQUIRED` group schema node named `name` with `fields`.
    fn required_group(name: &str, fields: Vec<Arc<Type>>) -> Arc<Type> {
        Arc::new(
            Type::group_type_builder(name)
                .with_repetition(Repetition::REQUIRED)
                .with_fields(fields)
                .build()
                .unwrap(),
        )
    }

    // Test schema (leaf indices in parentheses):
    // schema
    // |- j
    // |  |- a: INT64      (0)
    // |  `- b
    // |     |- c: INT64   (1)
    // |     `- d: INT64   (2)
    // `- k: INT64         (3)
    fn build_test_nested_parquet_schema() -> SchemaDescriptor {
        let group_b = required_group("b", vec![required_int64("c"), required_int64("d")]);
        let root_j = required_group("j", vec![required_int64("a"), group_b]);
        let root_k = required_int64("k");

        // The schema root itself carries no repetition.
        let schema = Arc::new(
            Type::group_type_builder("schema")
                .with_fields(vec![root_j, root_k])
                .build()
                .unwrap(),
        );

        SchemaDescriptor::new(schema)
    }
}