Skip to main content

mito2/read/
read_columns.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashSet};
16use std::mem;
17
18use datafusion_common::HashMap;
19use datafusion_expr::utils::expr_to_columns;
20use snafu::OptionExt;
21use store_api::metadata::RegionMetadataRef;
22use store_api::storage::{ColumnId, NestedPath, ProjectionInput};
23
24use crate::error::{InvalidRequestSnafu, Result};
25use crate::read::scan_region::PredicateGroup;
26
27/// Logical columns to read from a region.
28///
29/// Read columns describe which logical columns and nested fields should be read
30/// from storage. Each read column is identified by its [`ColumnId`],
31/// which represents the root column in the storage schema.
32///
33/// Nested fields under the column are specified by [`NestedPath`] entries.
34/// Each path includes the root column name as its first element.
35///
36/// For example, assume column id `9` corresponds to a root column named `j`
37/// with nested fields:
38///
39/// ```text
40/// j
41/// ├── a
42/// └── b
43///     └── c
44/// ```
45///
46/// The following SQL:
47///
48/// SELECT j.a, j.b.c FROM t
49///
50/// may produce read columns like:
51///
52/// ```text
53/// ReadColumn {
54///     column_id: 9,
55///     nested_paths: [
56///         ["j", "a"],
57///         ["j", "b", "c"],
58///     ]
59/// }
60/// ```
61///
62/// If `nested_paths` is empty, the whole column will be read.
63#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)]
64pub struct ReadColumns {
65    pub cols: Vec<ReadColumn>,
66}
67
68impl ReadColumns {
69    pub fn from_deduped_column_ids<I>(column_ids: I) -> Self
70    where
71        I: IntoIterator<Item = ColumnId>,
72    {
73        let cols = column_ids
74            .into_iter()
75            .map(|col_id| ReadColumn::new(col_id, vec![]))
76            .collect();
77        ReadColumns { cols }
78    }
79
80    pub fn is_empty(&self) -> bool {
81        self.cols.is_empty()
82    }
83
84    pub fn column_ids_iter(&self) -> impl Iterator<Item = ColumnId> + '_ {
85        self.cols.iter().map(|column| column.column_id)
86    }
87
88    pub fn column_ids(&self) -> Vec<ColumnId> {
89        self.column_ids_iter().collect()
90    }
91
92    pub fn columns(&self) -> &[ReadColumn] {
93        &self.cols
94    }
95
96    pub fn estimated_size(&self) -> usize {
97        self.cols.capacity() * mem::size_of::<ReadColumn>()
98            + self
99                .cols
100                .iter()
101                .map(ReadColumn::estimated_size)
102                .sum::<usize>()
103    }
104}
105
106#[derive(Debug, Clone, PartialEq, Eq, Hash)]
107pub struct ReadColumn {
108    pub column_id: ColumnId,
109    /// Nested field paths under this column.
110    /// Empty means reading the whole column.
111    pub nested_paths: Vec<NestedPath>,
112}
113
114impl ReadColumn {
115    pub fn new(column_id: ColumnId, nested_paths: Vec<NestedPath>) -> Self {
116        Self {
117            column_id,
118            nested_paths,
119        }
120    }
121
122    pub fn nested_paths(&self) -> &[NestedPath] {
123        &self.nested_paths
124    }
125
126    pub fn estimated_size(&self) -> usize {
127        mem::size_of::<ColumnId>()
128            + self.nested_paths.capacity() * mem::size_of::<NestedPath>()
129            + self
130                .nested_paths
131                .iter()
132                .map(|path| {
133                    path.capacity() * mem::size_of::<String>()
134                        + path.iter().map(|node| node.capacity()).sum::<usize>()
135                })
136                .sum::<usize>()
137    }
138}
139
140pub fn merge(a: ReadColumns, b: ReadColumns) -> ReadColumns {
141    let mut merged = BTreeMap::<ColumnId, Vec<NestedPath>>::new();
142
143    for col in a.cols.into_iter().chain(b.cols) {
144        if let Some(nested_paths) = merged.get_mut(&col.column_id) {
145            if nested_paths.is_empty() || col.nested_paths.is_empty() {
146                *nested_paths = vec![];
147            } else {
148                merge_nested_paths(nested_paths, col.nested_paths);
149            }
150            continue;
151        }
152
153        merged.insert(col.column_id, normalize_nested_paths(col.nested_paths));
154    }
155
156    ReadColumns {
157        cols: merged
158            .into_iter()
159            .map(|(column_id, nested_paths)| ReadColumn {
160                column_id,
161                nested_paths,
162            })
163            .collect(),
164    }
165}
166
167fn normalize_nested_paths(nested_paths: Vec<NestedPath>) -> Vec<NestedPath> {
168    let mut normalized = Vec::with_capacity(nested_paths.len());
169    merge_nested_paths(&mut normalized, nested_paths);
170    normalized
171}
172
173fn merge_nested_paths(merged: &mut Vec<NestedPath>, incoming: Vec<NestedPath>) {
174    for path in incoming {
175        if merged
176            .iter()
177            .any(|existing| path.starts_with(existing.as_slice()))
178        {
179            continue;
180        }
181
182        merged.retain(|existing| !existing.starts_with(path.as_slice()));
183        merged.push(path);
184    }
185}
186
187/// Build [`ReadColumns`] from [`ProjectionInput`].
188///
189/// Note: If `projection.projection` is empty, this function still reads the
190/// time index column so the scan can preserve row counts for empty-output
191/// queries such as `SELECT COUNT(*)`.
192///
193/// Order:
194/// - This function keeps the first-seen order from `projection.projection`
195///   (duplicate indices are skipped).
196/// - Keeping a stable order makes [`ReadColumns`] comparisons deterministic
197///   (`Eq`/`Hash`) and avoids cache-key instability in upper layers.
198pub fn read_columns_from_projection(
199    projection: ProjectionInput,
200    metadata: &RegionMetadataRef,
201) -> Result<ReadColumns> {
202    let root_indices = if projection.projection.is_empty() {
203        vec![metadata.time_index_column_pos()]
204    } else {
205        projection.projection
206    };
207
208    let mut paths_by_col: HashMap<String, Vec<NestedPath>> =
209        HashMap::with_capacity(projection.nested_paths.len());
210    for path in projection.nested_paths {
211        let Some((root_name, _)) = path.split_first() else {
212            continue;
213        };
214        paths_by_col
215            .entry(root_name.clone())
216            .or_default()
217            .push(path);
218    }
219
220    let mut read_cols = Vec::with_capacity(root_indices.len());
221    let mut seen = HashSet::with_capacity(root_indices.len());
222    for root_idx in root_indices {
223        if !seen.insert(root_idx) {
224            continue;
225        }
226
227        let col = metadata
228            .column_metadatas
229            .get(root_idx)
230            .with_context(|| InvalidRequestSnafu {
231                region_id: metadata.region_id,
232                reason: format!("projection index {} is out of bounds", root_idx),
233            })?;
234        let col_id = col.column_id;
235
236        let nested_paths = paths_by_col
237            .remove(&col.column_schema.name)
238            .unwrap_or_default();
239
240        read_cols.push(ReadColumn {
241            column_id: col_id,
242            nested_paths,
243        });
244    }
245
246    Ok(ReadColumns { cols: read_cols })
247}
248
249/// Build [`ReadColumns`] from [`PredicateGroup`].
250///
251/// Order:
252/// - This function follows `metadata.column_metadatas` order when materializing
253///   columns from predicate-referenced names.
254/// - Using metadata order keeps the output deterministic for [`ReadColumns`]
255///   equality/hash checks and for cache keys derived from read columns.
256pub fn read_columns_from_predicate(
257    predicate: &PredicateGroup,
258    metadata: &RegionMetadataRef,
259) -> ReadColumns {
260    let mut root_names = HashSet::new();
261    let mut columns = HashSet::new();
262
263    if let Some(p) = predicate.predicate_without_region() {
264        for expr in p.exprs() {
265            columns.clear();
266            if expr_to_columns(expr, &mut columns).is_err() {
267                continue;
268            }
269            root_names.extend(columns.drain().map(|column| column.name));
270        }
271    }
272
273    if let Some(expr) = predicate.region_partition_expr() {
274        expr.collect_column_names(&mut root_names);
275    }
276
277    // TODO(fys): Parse nested paths from predicate expressions and attach them
278    // to read columns instead of always reading the whole root column.
279    let mut cols = Vec::with_capacity(root_names.len());
280    for column in &metadata.column_metadatas {
281        if root_names.contains(&column.column_schema.name) {
282            cols.push(ReadColumn::new(column.column_id, vec![]));
283        }
284    }
285
286    ReadColumns { cols }
287}
288
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use datafusion_expr::{col, lit};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;

    /// Read column that requests the whole root column `id`.
    fn whole(id: ColumnId) -> ReadColumn {
        ReadColumn::new(id, vec![])
    }

    #[test]
    fn test_read_columns_from_empty_projection() {
        let metadata = new_test_metadata();

        // No projected columns: only the time index column `ts` (id 2) is read.
        let actual =
            read_columns_from_projection(ProjectionInput::default(), &metadata).unwrap();
        assert_eq!(ReadColumns { cols: vec![whole(2)] }, actual);

        // A nested path whose root is not a projected column is ignored.
        let input =
            ProjectionInput::new(vec![]).with_nested_paths(vec![vec!["1".to_string()]]);
        let actual = read_columns_from_projection(input, &metadata).unwrap();
        assert_eq!(ReadColumns { cols: vec![whole(2)] }, actual);
    }

    #[test]
    fn test_read_columns_from_projection_with_nested_paths() {
        let metadata = new_test_metadata();
        let input = ProjectionInput::new(vec![1, 0]).with_nested_paths(vec![
            nested_path(&["field_0", "a"]),
            nested_path(&["field_0", "b", "c"]),
        ]);

        let actual = read_columns_from_projection(input, &metadata).unwrap();

        // Index 1 is `field_0` (id 3) and keeps its nested paths; index 0 is
        // `tag_0` (id 0) and is read whole.
        let expected = ReadColumns {
            cols: vec![
                ReadColumn::new(
                    3,
                    vec![
                        nested_path(&["field_0", "a"]),
                        nested_path(&["field_0", "b", "c"]),
                    ],
                ),
                whole(0),
            ],
        };
        assert_eq!(expected, actual);
    }

    #[test]
    fn test_read_columns_from_projection_dedups_duplicate_indices() {
        let metadata = new_test_metadata();
        // Index 1 appears twice; only its first occurrence is kept.
        let input = ProjectionInput::new(vec![1, 1, 0]).with_nested_paths(vec![
            nested_path(&["field_0", "a"]),
            nested_path(&["field_0", "b", "c"]),
        ]);

        let actual = read_columns_from_projection(input, &metadata).unwrap();

        let expected = ReadColumns {
            cols: vec![
                ReadColumn::new(
                    3,
                    vec![
                        nested_path(&["field_0", "a"]),
                        nested_path(&["field_0", "b", "c"]),
                    ],
                ),
                whole(0),
            ],
        };
        assert_eq!(expected, actual);
    }

    #[test]
    fn test_read_columns_from_projection_out_of_bound() {
        let metadata = new_test_metadata();
        // The test region has exactly three columns, so index 3 is invalid.
        let input = ProjectionInput::new(vec![3]);

        let err = read_columns_from_projection(input, &metadata).unwrap_err();

        let message = err.to_string();
        assert!(message.contains("projection index 3 is out of bound"));
    }

    #[test]
    fn test_read_columns_from_predicate_reads_root_columns_only() {
        let metadata = new_test_metadata();
        let filters = [col("field_0").gt(lit(1)), col("tag_0").eq(lit("a"))];
        let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();

        let actual = read_columns_from_predicate(&predicate, &metadata);

        // Output follows metadata order: `tag_0` (id 0) before `field_0` (id 3).
        assert_eq!(
            ReadColumns {
                cols: vec![whole(0), whole(3)],
            },
            actual
        );
    }

    #[test]
    fn test_read_columns_from_predicate_empty() {
        let metadata = new_test_metadata();
        let predicate = PredicateGroup::new(metadata.as_ref(), &[]).unwrap();

        let actual = read_columns_from_predicate(&predicate, &metadata);

        assert!(actual.is_empty());
    }

    #[test]
    fn test_merge_read_cols_with_only_root() {
        let left = ReadColumns {
            cols: vec![whole(3), whole(1)],
        };
        let right = ReadColumns {
            cols: vec![whole(2)],
        };

        // The merged output is sorted by column id.
        let expected = ReadColumns {
            cols: vec![whole(1), whole(2), whole(3)],
        };
        assert_eq!(merge(left, right), expected);
    }

    #[test]
    fn test_merge_read_cols_with_nested_paths() {
        let left = ReadColumns {
            cols: vec![ReadColumn::new(1, vec![nested_path(&["j", "a"])])],
        };
        let right = ReadColumns {
            cols: vec![ReadColumn::new(
                1,
                vec![nested_path(&["j", "b"]), nested_path(&["j", "c"])],
            )],
        };

        let expected = ReadColumns {
            cols: vec![ReadColumn::new(
                1,
                vec![
                    nested_path(&["j", "a"]),
                    nested_path(&["j", "b"]),
                    nested_path(&["j", "c"]),
                ],
            )],
        };
        assert_eq!(merge(left, right), expected);
    }

    #[test]
    fn test_merge_read_cols_with_column_override() {
        let left = ReadColumns {
            cols: vec![
                ReadColumn::new(1, vec![nested_path(&["j", "a"])]),
                ReadColumn::new(2, vec![nested_path(&["k", "b"])]),
            ],
        };
        let right = ReadColumns {
            cols: vec![
                whole(1),
                ReadColumn::new(2, vec![nested_path(&["k", "b", "c"])]),
            ],
        };

        // Column 1: a whole-column read wins. Column 2: `k.b` covers `k.b.c`.
        let expected = ReadColumns {
            cols: vec![
                whole(1),
                ReadColumn::new(2, vec![nested_path(&["k", "b"])]),
            ],
        };
        assert_eq!(merge(left, right), expected);
    }

    #[test]
    fn test_merge_read_cols_dedups_redundant_nested_paths() {
        let left = ReadColumns {
            cols: vec![ReadColumn::new(
                1,
                vec![
                    nested_path(&["j", "a", "b"]),
                    nested_path(&["j", "a"]),
                    nested_path(&["j", "a", "b", "c"]),
                ],
            )],
        };
        let right = ReadColumns {
            cols: vec![ReadColumn::new(1, vec![nested_path(&["j", "a"])])],
        };

        let expected = ReadColumns {
            cols: vec![ReadColumn::new(1, vec![nested_path(&["j", "a"])])],
        };
        assert_eq!(merge(left, right), expected);
    }

    /// Region with columns `tag_0` (id 0), `field_0` (id 3) and `ts` (id 2), in
    /// that positional order.
    fn new_test_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_0".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_0".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            });
        builder.primary_key(vec![0]);
        Arc::new(builder.build().unwrap())
    }

    /// Builds a [`NestedPath`] from string segments.
    fn nested_path(parts: &[&str]) -> NestedPath {
        parts.iter().map(|part| part.to_string()).collect()
    }
}