1use std::collections::{BTreeMap, HashSet};
16use std::mem;
17
18use datafusion_common::HashMap;
19use datafusion_expr::utils::expr_to_columns;
20use snafu::OptionExt;
21use store_api::metadata::RegionMetadataRef;
22use store_api::storage::{ColumnId, NestedPath, ProjectionInput};
23
24use crate::error::{InvalidRequestSnafu, Result};
25use crate::read::scan_region::PredicateGroup;
26
/// The set of columns a read needs, each optionally narrowed to nested sub-paths.
///
/// Entries are keyed by column id. `read_columns_from_projection` skips repeated
/// projection indices, and `merge` combines two sets into one entry per column id,
/// ordered by ascending id.
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)]
pub struct ReadColumns {
    /// The columns to read.
    pub cols: Vec<ReadColumn>,
}
67
68impl ReadColumns {
69 pub fn from_deduped_column_ids<I>(column_ids: I) -> Self
70 where
71 I: IntoIterator<Item = ColumnId>,
72 {
73 let cols = column_ids
74 .into_iter()
75 .map(|col_id| ReadColumn::new(col_id, vec![]))
76 .collect();
77 ReadColumns { cols }
78 }
79
80 pub fn is_empty(&self) -> bool {
81 self.cols.is_empty()
82 }
83
84 pub fn column_ids_iter(&self) -> impl Iterator<Item = ColumnId> + '_ {
85 self.cols.iter().map(|column| column.column_id)
86 }
87
88 pub fn column_ids(&self) -> Vec<ColumnId> {
89 self.column_ids_iter().collect()
90 }
91
92 pub fn columns(&self) -> &[ReadColumn] {
93 &self.cols
94 }
95
96 pub fn estimated_size(&self) -> usize {
97 self.cols.capacity() * mem::size_of::<ReadColumn>()
98 + self
99 .cols
100 .iter()
101 .map(ReadColumn::estimated_size)
102 .sum::<usize>()
103 }
104}
105
/// A single column to read, optionally narrowed to nested sub-paths within it.
///
/// Each nested path starts with the root column's name; `read_columns_from_projection`
/// groups paths by their first element. An empty `nested_paths` requests the whole
/// column: `merge` collapses a column to an empty list whenever either side asks for
/// it without nested paths.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ReadColumn {
    /// Id of the root column.
    pub column_id: ColumnId,
    /// Sub-paths to read within the column; empty means the whole column.
    pub nested_paths: Vec<NestedPath>,
}
113
114impl ReadColumn {
115 pub fn new(column_id: ColumnId, nested_paths: Vec<NestedPath>) -> Self {
116 Self {
117 column_id,
118 nested_paths,
119 }
120 }
121
122 pub fn nested_paths(&self) -> &[NestedPath] {
123 &self.nested_paths
124 }
125
126 pub fn estimated_size(&self) -> usize {
127 mem::size_of::<ColumnId>()
128 + self.nested_paths.capacity() * mem::size_of::<NestedPath>()
129 + self
130 .nested_paths
131 .iter()
132 .map(|path| {
133 path.capacity() * mem::size_of::<String>()
134 + path.iter().map(|node| node.capacity()).sum::<usize>()
135 })
136 .sum::<usize>()
137 }
138}
139
140pub fn merge(a: ReadColumns, b: ReadColumns) -> ReadColumns {
141 let mut merged = BTreeMap::<ColumnId, Vec<NestedPath>>::new();
142
143 for col in a.cols.into_iter().chain(b.cols) {
144 if let Some(nested_paths) = merged.get_mut(&col.column_id) {
145 if nested_paths.is_empty() || col.nested_paths.is_empty() {
146 *nested_paths = vec![];
147 } else {
148 merge_nested_paths(nested_paths, col.nested_paths);
149 }
150 continue;
151 }
152
153 merged.insert(col.column_id, normalize_nested_paths(col.nested_paths));
154 }
155
156 ReadColumns {
157 cols: merged
158 .into_iter()
159 .map(|(column_id, nested_paths)| ReadColumn {
160 column_id,
161 nested_paths,
162 })
163 .collect(),
164 }
165}
166
167fn normalize_nested_paths(nested_paths: Vec<NestedPath>) -> Vec<NestedPath> {
168 let mut normalized = Vec::with_capacity(nested_paths.len());
169 merge_nested_paths(&mut normalized, nested_paths);
170 normalized
171}
172
173fn merge_nested_paths(merged: &mut Vec<NestedPath>, incoming: Vec<NestedPath>) {
174 for path in incoming {
175 if merged
176 .iter()
177 .any(|existing| path.starts_with(existing.as_slice()))
178 {
179 continue;
180 }
181
182 merged.retain(|existing| !existing.starts_with(path.as_slice()));
183 merged.push(path);
184 }
185}
186
187pub fn read_columns_from_projection(
199 projection: ProjectionInput,
200 metadata: &RegionMetadataRef,
201) -> Result<ReadColumns> {
202 let root_indices = if projection.projection.is_empty() {
203 vec![metadata.time_index_column_pos()]
204 } else {
205 projection.projection
206 };
207
208 let mut paths_by_col: HashMap<String, Vec<NestedPath>> =
209 HashMap::with_capacity(projection.nested_paths.len());
210 for path in projection.nested_paths {
211 let Some((root_name, _)) = path.split_first() else {
212 continue;
213 };
214 paths_by_col
215 .entry(root_name.clone())
216 .or_default()
217 .push(path);
218 }
219
220 let mut read_cols = Vec::with_capacity(root_indices.len());
221 let mut seen = HashSet::with_capacity(root_indices.len());
222 for root_idx in root_indices {
223 if !seen.insert(root_idx) {
224 continue;
225 }
226
227 let col = metadata
228 .column_metadatas
229 .get(root_idx)
230 .with_context(|| InvalidRequestSnafu {
231 region_id: metadata.region_id,
232 reason: format!("projection index {} is out of bounds", root_idx),
233 })?;
234 let col_id = col.column_id;
235
236 let nested_paths = paths_by_col
237 .remove(&col.column_schema.name)
238 .unwrap_or_default();
239
240 read_cols.push(ReadColumn {
241 column_id: col_id,
242 nested_paths,
243 });
244 }
245
246 Ok(ReadColumns { cols: read_cols })
247}
248
249pub fn read_columns_from_predicate(
257 predicate: &PredicateGroup,
258 metadata: &RegionMetadataRef,
259) -> ReadColumns {
260 let mut root_names = HashSet::new();
261 let mut columns = HashSet::new();
262
263 if let Some(p) = predicate.predicate_without_region() {
264 for expr in p.exprs() {
265 columns.clear();
266 if expr_to_columns(expr, &mut columns).is_err() {
267 continue;
268 }
269 root_names.extend(columns.drain().map(|column| column.name));
270 }
271 }
272
273 if let Some(expr) = predicate.region_partition_expr() {
274 expr.collect_column_names(&mut root_names);
275 }
276
277 let mut cols = Vec::with_capacity(root_names.len());
280 for column in &metadata.column_metadatas {
281 if root_names.contains(&column.column_schema.name) {
282 cols.push(ReadColumn::new(column.column_id, vec![]));
283 }
284 }
285
286 ReadColumns { cols }
287}
288
#[cfg(test)]
mod tests {
    //! Unit tests for building and merging `ReadColumns`.
    //!
    //! The `new_test_metadata` fixture has three columns, in this order:
    //! index 0 `tag_0` (id 0), index 1 `field_0` (id 3), index 2 `ts` (id 2, timestamp).

    use std::sync::Arc;

    use api::v1::SemanticType;
    use datafusion_expr::{col, lit};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;

    // An empty projection falls back to the time index column (`ts`, id 2). A nested
    // path whose root ("1") matches no projected column is dropped.
    #[test]
    fn test_read_columns_from_empty_projection() {
        let metadata = new_test_metadata();

        let read_columns =
            read_columns_from_projection(ProjectionInput::default(), &metadata).unwrap();

        let expected = ReadColumns {
            cols: vec![ReadColumn::new(2, vec![])],
        };
        assert_eq!(expected, read_columns);

        let projection_input =
            ProjectionInput::new(vec![]).with_nested_paths(vec![vec!["1".to_string()]]);
        let read_columns = read_columns_from_projection(projection_input, &metadata).unwrap();

        let expected = ReadColumns {
            cols: vec![ReadColumn::new(2, vec![])],
        };
        assert_eq!(expected, read_columns);
    }

    // Output follows projection order. `field_0` (index 1, id 3) gets its nested paths;
    // `tag_0` (index 0, id 0) has none, so it is read whole.
    #[test]
    fn test_read_columns_from_projection_with_nested_paths() {
        let metadata = new_test_metadata();
        let projection = ProjectionInput::new(vec![1, 0]).with_nested_paths(vec![
            nested_path(&["field_0", "a"]),
            nested_path(&["field_0", "b", "c"]),
        ]);

        let read_columns = read_columns_from_projection(projection, &metadata).unwrap();

        let expected = ReadColumns {
            cols: vec![
                ReadColumn::new(
                    3,
                    vec![
                        nested_path(&["field_0", "a"]),
                        nested_path(&["field_0", "b", "c"]),
                    ],
                ),
                ReadColumn::new(0, vec![]),
            ],
        };
        assert_eq!(expected, read_columns,);
    }

    // A repeated projection index produces a single entry, kept at its first position.
    #[test]
    fn test_read_columns_from_projection_dedups_duplicate_indices() {
        let metadata = new_test_metadata();
        let projection = ProjectionInput::new(vec![1, 1, 0]).with_nested_paths(vec![
            nested_path(&["field_0", "a"]),
            nested_path(&["field_0", "b", "c"]),
        ]);

        let read_columns = read_columns_from_projection(projection, &metadata).unwrap();

        let expected = ReadColumns {
            cols: vec![
                ReadColumn::new(
                    3,
                    vec![
                        nested_path(&["field_0", "a"]),
                        nested_path(&["field_0", "b", "c"]),
                    ],
                ),
                ReadColumn::new(0, vec![]),
            ],
        };
        assert_eq!(expected, read_columns);
    }

    // The fixture has three columns, so index 3 is rejected with an error.
    #[test]
    fn test_read_columns_from_projection_out_of_bound() {
        let metadata = new_test_metadata();
        let projection = ProjectionInput::new(vec![3]);

        let err = read_columns_from_projection(projection, &metadata).unwrap_err();

        assert!(
            err.to_string()
                .contains("projection index 3 is out of bound")
        );
    }

    // Predicate columns come out in metadata order (tag_0 id 0, then field_0 id 3),
    // each without nested paths.
    #[test]
    fn test_read_columns_from_predicate_reads_root_columns_only() {
        let metadata = new_test_metadata();
        let predicate = PredicateGroup::new(
            metadata.as_ref(),
            &[col("field_0").gt(lit(1)), col("tag_0").eq(lit("a"))],
        )
        .unwrap();

        let read_columns = read_columns_from_predicate(&predicate, &metadata);

        let expected = ReadColumns {
            cols: vec![ReadColumn::new(0, vec![]), ReadColumn::new(3, vec![])],
        };
        assert_eq!(expected, read_columns);
    }

    // A predicate with no expressions references no columns.
    #[test]
    fn test_read_columns_from_predicate_empty() {
        let metadata = new_test_metadata();
        let predicate = PredicateGroup::new(metadata.as_ref(), &[]).unwrap();

        let read_columns = read_columns_from_predicate(&predicate, &metadata);

        assert!(read_columns.is_empty());
    }

    // Merging whole-column requests yields one entry per id, sorted by id.
    #[test]
    fn test_merge_read_cols_with_only_root() {
        let a = ReadColumns {
            cols: vec![ReadColumn::new(3, vec![]), ReadColumn::new(1, vec![])],
        };
        let b = ReadColumns {
            cols: vec![ReadColumn::new(2, vec![])],
        };

        let merged = merge(a, b);

        assert_eq!(
            merged,
            ReadColumns {
                cols: vec![
                    ReadColumn::new(1, vec![]),
                    ReadColumn::new(2, vec![]),
                    ReadColumn::new(3, vec![]),
                ],
            }
        );
    }

    // Disjoint nested paths on the same column are unioned.
    #[test]
    fn test_merge_read_cols_with_nested_paths() {
        let a = ReadColumns {
            cols: vec![ReadColumn::new(1, vec![nested_path(&["j", "a"])])],
        };
        let b = ReadColumns {
            cols: vec![ReadColumn::new(
                1,
                vec![nested_path(&["j", "b"]), nested_path(&["j", "c"])],
            )],
        };

        let merged = merge(a, b);

        assert_eq!(
            merged,
            ReadColumns {
                cols: vec![ReadColumn::new(
                    1,
                    vec![
                        nested_path(&["j", "a"]),
                        nested_path(&["j", "b"]),
                        nested_path(&["j", "c"]),
                    ],
                )],
            }
        );
    }

    // A whole-column request (column 1) overrides nested paths. A path that extends an
    // existing one (column 2: k.b.c under k.b) is subsumed.
    #[test]
    fn test_merge_read_cols_with_column_override() {
        let a = ReadColumns {
            cols: vec![
                ReadColumn::new(1, vec![nested_path(&["j", "a"])]),
                ReadColumn::new(2, vec![nested_path(&["k", "b"])]),
            ],
        };
        let b = ReadColumns {
            cols: vec![
                ReadColumn::new(1, vec![]),
                ReadColumn::new(2, vec![nested_path(&["k", "b", "c"])]),
            ],
        };

        let merged = merge(a, b);

        assert_eq!(
            merged,
            ReadColumns {
                cols: vec![
                    ReadColumn::new(1, vec![]),
                    ReadColumn::new(2, vec![nested_path(&["k", "b"])])
                ],
            }
        );
    }

    // Redundant paths within one input and duplicates across inputs collapse to the
    // shortest covering path.
    #[test]
    fn test_merge_read_cols_dedups_redundant_nested_paths() {
        let a = ReadColumns {
            cols: vec![ReadColumn::new(
                1,
                vec![
                    nested_path(&["j", "a", "b"]),
                    nested_path(&["j", "a"]),
                    nested_path(&["j", "a", "b", "c"]),
                ],
            )],
        };
        let b = ReadColumns {
            cols: vec![ReadColumn::new(1, vec![nested_path(&["j", "a"])])],
        };

        let merged = merge(a, b);

        assert_eq!(
            merged,
            ReadColumns {
                cols: vec![ReadColumn::new(1, vec![nested_path(&["j", "a"])])],
            }
        );
    }

    // Builds the fixture region: tag_0 (id 0), field_0 (id 3), ts (id 2, timestamp),
    // pushed in that order.
    fn new_test_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_0".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_0".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            });
        builder.primary_key(vec![0]);
        Arc::new(builder.build().unwrap())
    }

    // Builds a nested path from string segments; the first segment is the root column name.
    fn nested_path(parts: &[&str]) -> NestedPath {
        parts.iter().map(|part| (*part).to_string()).collect()
    }
}