1use std::collections::{HashMap, HashSet};
16
17use parquet::arrow::ProjectionMask;
18use parquet::schema::types::SchemaDescriptor;
19
/// Path to a nested field inside a parquet column, given as a list of field names.
///
/// The first segment is the root column name (e.g. `["j", "b"]` selects field `b` of root `j`).
/// This is because paths are compared as prefixes of a leaf's full column path.
pub type ParquetNestedPath = Vec<String>;
22
23#[derive(Debug, Clone, PartialEq, Eq)]
25pub struct ParquetReadColumns {
26 root_indices: Vec<usize>,
32 cols: Vec<ParquetReadColumn>,
33 has_nested: bool,
34}
35
36impl ParquetReadColumns {
37 pub fn from_deduped(cols: Vec<ParquetReadColumn>) -> Self {
44 let has_nested = cols.iter().any(|col| !col.nested_paths.is_empty());
45 let root_indices = cols.iter().map(|col| col.root_index).collect();
46 Self {
47 root_indices,
48 cols,
49 has_nested,
50 }
51 }
52
53 pub fn from_deduped_root_indices(root_indices: impl IntoIterator<Item = usize>) -> Self {
58 let root_indices = root_indices.into_iter().collect::<Vec<_>>();
59 let cols = root_indices
60 .iter()
61 .copied()
62 .map(ParquetReadColumn::new)
63 .collect();
64 Self {
65 root_indices,
66 cols,
67 has_nested: false,
68 }
69 }
70
71 pub fn columns(&self) -> &[ParquetReadColumn] {
72 &self.cols
73 }
74
75 pub fn has_nested(&self) -> bool {
76 self.has_nested
77 }
78
79 pub fn root_indices_iter(&self) -> impl Iterator<Item = usize> + '_ {
80 self.root_indices.iter().copied()
81 }
82
83 pub fn root_indices(&self) -> &[usize] {
85 &self.root_indices
86 }
87}
88
89#[derive(Debug, Clone, PartialEq, Eq)]
101pub struct ParquetReadColumn {
102 root_index: usize,
104 nested_paths: Vec<ParquetNestedPath>,
111}
112
113impl ParquetReadColumn {
114 pub fn new(root_index: usize) -> Self {
115 Self {
116 root_index,
117 nested_paths: vec![],
118 }
119 }
120
121 pub fn with_nested_paths(self, nested_paths: Vec<ParquetNestedPath>) -> Self {
122 Self {
123 nested_paths,
124 ..self
125 }
126 }
127
128 pub fn merge_nested_paths(&mut self, nested_paths: Vec<ParquetNestedPath>) {
130 let reads_whole_root = self.nested_paths.is_empty() || nested_paths.is_empty();
131 if reads_whole_root {
132 self.nested_paths = vec![];
134 } else {
135 self.nested_paths.extend(nested_paths);
136 }
137 }
138
139 pub fn root_index(&self) -> usize {
140 self.root_index
141 }
142
143 pub fn nested_paths(&self) -> &[ParquetNestedPath] {
144 &self.nested_paths
145 }
146}
147
148#[derive(Clone)]
150pub struct ProjectionMaskPlan {
151 pub mask: ProjectionMask,
153 pub projected_root_presence: Vec<bool>,
165}
166
167pub fn build_projection_plan(
184 parquet_read_cols: &ParquetReadColumns,
185 parquet_schema_desc: &SchemaDescriptor,
186) -> ProjectionMaskPlan {
187 if !parquet_read_cols.has_nested() {
188 let mask =
189 ProjectionMask::roots(parquet_schema_desc, parquet_read_cols.root_indices_iter());
190 return ProjectionMaskPlan {
191 mask,
192 projected_root_presence: vec![true; parquet_read_cols.columns().len()],
193 };
194 }
195
196 let (leaf_indices, matched_roots) =
197 build_parquet_leaves_indices(parquet_schema_desc, parquet_read_cols);
198
199 let projected_root_presence = parquet_read_cols
200 .columns()
201 .iter()
202 .map(|col| matched_roots.contains(&col.root_index()))
203 .collect();
204
205 let mask = ProjectionMask::leaves(parquet_schema_desc, leaf_indices);
206 ProjectionMaskPlan {
207 mask,
208 projected_root_presence,
209 }
210}
211
212fn build_parquet_leaves_indices(
219 parquet_schema_desc: &SchemaDescriptor,
220 projection: &ParquetReadColumns,
221) -> (Vec<usize>, HashSet<usize>) {
222 let mut map = HashMap::with_capacity(projection.cols.len());
223 for col in &projection.cols {
224 map.insert(col.root_index, &col.nested_paths);
225 }
226
227 let mut leaf_indices = Vec::new();
228 let mut matched_roots = HashSet::with_capacity(projection.cols.len());
229 for (leaf_idx, leaf_col) in parquet_schema_desc.columns().iter().enumerate() {
230 let root_idx = parquet_schema_desc.get_column_root_idx(leaf_idx);
231 let Some(nested_paths) = map.get(&root_idx) else {
232 continue;
233 };
234 if nested_paths.is_empty() {
235 leaf_indices.push(leaf_idx);
236 matched_roots.insert(root_idx);
237 continue;
238 }
239
240 let leaf_path = leaf_col.path().parts();
241 if nested_paths
242 .iter()
243 .any(|nested_path| leaf_path.starts_with(nested_path))
244 {
245 leaf_indices.push(leaf_idx);
246 matched_roots.insert(root_idx);
247 }
248 }
249 (leaf_indices, matched_roots)
250}
251
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use parquet::basic::Repetition;
    use parquet::schema::types::Type;

    use super::*;

    /// Builds a nested path from its string segments.
    fn path(segments: &[&str]) -> ParquetNestedPath {
        segments.iter().map(|segment| segment.to_string()).collect()
    }

    /// Shorthand for a read column with explicit nested paths.
    fn read_col(root_index: usize, nested_paths: Vec<ParquetNestedPath>) -> ParquetReadColumn {
        ParquetReadColumn {
            root_index,
            nested_paths,
        }
    }

    /// Runs `build_parquet_leaves_indices` for `cols` against the shared test schema.
    fn leaves_for(cols: Vec<ParquetReadColumn>) -> (Vec<usize>, HashSet<usize>) {
        let schema = build_test_nested_parquet_schema();
        build_parquet_leaves_indices(&schema, &ParquetReadColumns::from_deduped(cols))
    }

    #[test]
    fn test_build_projection_mask_without_nested_paths() {
        let schema = build_test_nested_parquet_schema();
        let plan = build_projection_plan(
            &ParquetReadColumns::from_deduped_root_indices([0, 1]),
            &schema,
        );

        assert_eq!(vec![true, true], plan.projected_root_presence);
        assert_eq!(ProjectionMask::roots(&schema, [0, 1]), plan.mask);
    }

    #[test]
    fn test_reads_whole_root() {
        let (leaf_indices, matched_roots) = leaves_for(vec![read_col(0, vec![])]);
        assert_eq!(vec![0, 1, 2], leaf_indices);
        assert_eq!(HashSet::from([0]), matched_roots);
    }

    #[test]
    fn test_filters_nested_paths() {
        let (leaf_indices, matched_roots) = leaves_for(vec![
            read_col(0, vec![path(&["j", "b"])]),
            read_col(1, vec![]),
        ]);
        assert_eq!(vec![1, 2, 3], leaf_indices);
        assert_eq!(HashSet::from([0, 1]), matched_roots);
    }

    #[test]
    fn test_reads_middle_level_path() {
        let (leaf_indices, matched_roots) = leaves_for(vec![read_col(0, vec![path(&["j", "b"])])]);
        assert_eq!(vec![1, 2], leaf_indices);
        assert_eq!(HashSet::from([0]), matched_roots);
    }

    #[test]
    fn test_reads_leaf_level_path() {
        let (leaf_indices, matched_roots) =
            leaves_for(vec![read_col(0, vec![path(&["j", "b", "c"])])]);
        assert_eq!(vec![1], leaf_indices);
        assert_eq!(HashSet::from([0]), matched_roots);
    }

    #[test]
    fn test_build_projection_mask_with_unmatched_roots() {
        let schema = build_test_nested_parquet_schema();
        let projection = ParquetReadColumns::from_deduped(vec![
            read_col(0, vec![path(&["j", "missing"])]),
            read_col(1, vec![]),
        ]);

        let plan = build_projection_plan(&projection, &schema);

        assert_eq!(vec![false, true], plan.projected_root_presence);
        assert_eq!(ProjectionMask::leaves(&schema, vec![3]), plan.mask);
    }

    #[test]
    fn test_merges_mixed_paths() {
        let (leaf_indices, matched_roots) = leaves_for(vec![read_col(
            0,
            vec![path(&["j", "a"]), path(&["j", "b", "d"])],
        )]);
        assert_eq!(vec![0, 2], leaf_indices);
        assert_eq!(HashSet::from([0]), matched_roots);
    }

    #[test]
    fn test_merge_nested_paths_extends_paths() {
        let mut col = ParquetReadColumn::new(0).with_nested_paths(vec![path(&["j", "a"])]);

        col.merge_nested_paths(vec![path(&["j", "b"])]);

        assert_eq!(&[path(&["j", "a"]), path(&["j", "b"])], col.nested_paths());
    }

    #[test]
    fn test_merge_nested_paths_with_whole_root() {
        let mut col = ParquetReadColumn::new(0).with_nested_paths(vec![path(&["j", "a"])]);

        col.merge_nested_paths(vec![]);

        assert!(col.nested_paths().is_empty());
    }

    /// A required INT64 primitive field.
    fn required_int64(name: &str) -> Arc<Type> {
        Arc::new(
            Type::primitive_type_builder(name, parquet::basic::Type::INT64)
                .with_repetition(Repetition::REQUIRED)
                .build()
                .unwrap(),
        )
    }

    /// A required group field containing `fields`.
    fn required_group(name: &str, fields: Vec<Arc<Type>>) -> Arc<Type> {
        Arc::new(
            Type::group_type_builder(name)
                .with_repetition(Repetition::REQUIRED)
                .with_fields(fields)
                .build()
                .unwrap(),
        )
    }

    /// Schema shared by the tests:
    ///
    /// ```text
    /// schema
    /// +-- j (group)          root 0
    /// |   +-- a: INT64       leaf 0
    /// |   +-- b (group)
    /// |       +-- c: INT64   leaf 1
    /// |       +-- d: INT64   leaf 2
    /// +-- k: INT64           root 1, leaf 3
    /// ```
    fn build_test_nested_parquet_schema() -> SchemaDescriptor {
        let root_j = required_group(
            "j",
            vec![
                required_int64("a"),
                required_group("b", vec![required_int64("c"), required_int64("d")]),
            ],
        );
        let root_k = required_int64("k");
        let schema = Type::group_type_builder("schema")
            .with_fields(vec![root_j, root_k])
            .build()
            .unwrap();

        SchemaDescriptor::new(Arc::new(schema))
    }
}