1use std::collections::HashMap;
16
17use parquet::arrow::ProjectionMask;
18use parquet::schema::types::SchemaDescriptor;
19
/// A path inside a parquet column, expressed as an ordered list of path segments.
///
/// Paths are compared as prefixes against the path parts of a leaf column
/// (see `build_parquet_leaves_indices`).
pub type ParquetNestedPath = Vec<String>;
22
23#[derive(Debug, Clone, PartialEq, Eq)]
25pub struct ParquetReadColumns {
26 cols: Vec<ParquetReadColumn>,
27 has_nested: bool,
28}
29
30impl ParquetReadColumns {
31 pub fn from_deduped_root_indices(root_indices: impl IntoIterator<Item = usize>) -> Self {
36 let cols = root_indices
37 .into_iter()
38 .map(ParquetReadColumn::new)
39 .collect();
40 Self {
41 cols,
42 has_nested: false,
43 }
44 }
45
46 pub fn columns(&self) -> &[ParquetReadColumn] {
47 &self.cols
48 }
49
50 pub fn has_nested(&self) -> bool {
51 self.has_nested
52 }
53
54 pub fn root_indices_iter(&self) -> impl Iterator<Item = usize> + '_ {
55 self.cols.iter().map(|col| col.root_index)
56 }
57}
58
59#[derive(Debug, Clone, PartialEq, Eq)]
71pub struct ParquetReadColumn {
72 root_index: usize,
74 nested_paths: Vec<ParquetNestedPath>,
81}
82
83impl ParquetReadColumn {
84 pub fn new(root_index: usize) -> Self {
85 Self {
86 root_index,
87 nested_paths: vec![],
88 }
89 }
90
91 pub fn with_nested_paths(self, nested_paths: Vec<ParquetNestedPath>) -> Self {
92 Self {
93 nested_paths,
94 ..self
95 }
96 }
97
98 pub fn root_index(&self) -> usize {
99 self.root_index
100 }
101
102 pub fn nested_paths(&self) -> &[ParquetNestedPath] {
103 &self.nested_paths
104 }
105}
106
107pub fn build_projection_mask(
109 parquet_read_cols: &ParquetReadColumns,
110 parquet_schema_desc: &SchemaDescriptor,
111) -> ProjectionMask {
112 if parquet_read_cols.has_nested() {
113 let leaf_indices = build_parquet_leaves_indices(parquet_schema_desc, parquet_read_cols);
114 ProjectionMask::leaves(parquet_schema_desc, leaf_indices)
115 } else {
116 ProjectionMask::roots(parquet_schema_desc, parquet_read_cols.root_indices_iter())
117 }
118}
119
120fn build_parquet_leaves_indices(
122 parquet_schema_desc: &SchemaDescriptor,
123 projection: &ParquetReadColumns,
124) -> Vec<usize> {
125 let mut map = HashMap::with_capacity(projection.cols.len());
126 for col in &projection.cols {
127 map.insert(col.root_index, &col.nested_paths);
128 }
129
130 let mut leaf_indices = Vec::new();
131 for (leaf_idx, leaf_col) in parquet_schema_desc.columns().iter().enumerate() {
132 let root_idx = parquet_schema_desc.get_column_root_idx(leaf_idx);
133 let Some(nested_paths) = map.get(&root_idx) else {
134 continue;
135 };
136 if nested_paths.is_empty() {
137 leaf_indices.push(leaf_idx);
138 continue;
139 }
140
141 let leaf_path = leaf_col.path().parts();
142 if nested_paths
143 .iter()
144 .any(|nested_path| leaf_path.starts_with(nested_path))
145 {
146 leaf_indices.push(leaf_idx);
147 }
148 }
149 leaf_indices
150}
151
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use parquet::basic::Repetition;
    use parquet::schema::types::Type;

    use super::*;

    #[test]
    fn test_reads_whole_root() {
        // A root without nested paths selects every leaf beneath it.
        let cols = vec![ParquetReadColumn::new(0)];

        assert_eq!(vec![0, 1, 2], selected_leaves(cols, false));
    }

    #[test]
    fn test_filters_nested_paths() {
        let cols = vec![
            ParquetReadColumn::new(0).with_nested_paths(vec![nested_path(&["j", "b"])]),
            ParquetReadColumn::new(1),
        ];

        assert_eq!(vec![1, 2, 3], selected_leaves(cols, true));
    }

    #[test]
    fn test_reads_middle_level_path() {
        let cols =
            vec![ParquetReadColumn::new(0).with_nested_paths(vec![nested_path(&["j", "b"])])];

        assert_eq!(vec![1, 2], selected_leaves(cols, true));
    }

    #[test]
    fn test_reads_leaf_level_path() {
        let cols = vec![
            ParquetReadColumn::new(0).with_nested_paths(vec![nested_path(&["j", "b", "c"])]),
        ];

        assert_eq!(vec![1], selected_leaves(cols, true));
    }

    #[test]
    fn test_merges_mixed_paths() {
        let cols = vec![ParquetReadColumn::new(0).with_nested_paths(vec![
            nested_path(&["j", "a"]),
            nested_path(&["j", "b", "d"]),
        ])];

        assert_eq!(vec![0, 2], selected_leaves(cols, true));
    }

    /// Turns string segments into an owned nested path.
    fn nested_path(segments: &[&str]) -> ParquetNestedPath {
        segments.iter().map(|segment| segment.to_string()).collect()
    }

    /// Resolves `cols` against the shared test schema and returns the selected
    /// leaf indices.
    fn selected_leaves(cols: Vec<ParquetReadColumn>, has_nested: bool) -> Vec<usize> {
        let parquet_schema_desc = build_test_nested_parquet_schema();
        let projection = ParquetReadColumns { cols, has_nested };
        build_parquet_leaves_indices(&parquet_schema_desc, &projection)
    }

    /// Builds a required INT64 primitive field named `name`.
    fn required_int64(name: &str) -> Arc<Type> {
        Arc::new(
            Type::primitive_type_builder(name, parquet::basic::Type::INT64)
                .with_repetition(Repetition::REQUIRED)
                .build()
                .unwrap(),
        )
    }

    /// Builds a required group field named `name` containing `fields`.
    fn required_group(name: &str, fields: Vec<Arc<Type>>) -> Arc<Type> {
        Arc::new(
            Type::group_type_builder(name)
                .with_repetition(Repetition::REQUIRED)
                .with_fields(fields)
                .build()
                .unwrap(),
        )
    }

    /// Builds the schema shared by the tests above:
    ///
    /// ```text
    /// schema
    /// ├── j (group)        root 0
    /// │   ├── a            leaf 0
    /// │   └── b (group)
    /// │       ├── c        leaf 1
    /// │       └── d        leaf 2
    /// └── k                root 1, leaf 3
    /// ```
    fn build_test_nested_parquet_schema() -> SchemaDescriptor {
        let group_b = required_group("b", vec![required_int64("c"), required_int64("d")]);
        let root_j = required_group("j", vec![required_int64("a"), group_b]);
        let root_k = required_int64("k");

        // The top-level schema group is built without an explicit repetition.
        let schema = Arc::new(
            Type::group_type_builder("schema")
                .with_fields(vec![root_j, root_k])
                .build()
                .unwrap(),
        );

        SchemaDescriptor::new(schema)
    }
}