1use common_time::Timestamp;
18use smallvec::{SmallVec, smallvec};
19use store_api::region_engine::PartitionRange;
20use store_api::storage::TimeSeriesDistribution;
21
22use crate::cache::CacheStrategy;
23use crate::memtable::{MemtableRange, MemtableStats};
24use crate::read::scan_region::ScanInput;
25use crate::sst::file::{FileHandle, FileTimeRange, overlaps};
26use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
27use crate::sst::parquet::file_range::{FileRange, FileRangeContextRef};
28use crate::sst::parquet::format::parquet_row_group_time_range;
29use crate::sst::parquet::row_selection::RowGroupSelection;
30
/// Sentinel `row_group_index` that selects every row group of a source.
///
/// Used when a range covers a whole source (sequential scans before
/// splitting) or when a file's row-group count is not known in advance.
const ALL_ROW_GROUPS: i64 = -1;
32
/// Identifies a data source (memtable, SST file or extension range) of a scan.
#[derive(Debug, Clone, Copy, PartialEq)]
pub(crate) struct SourceIndex {
    /// Position of the source: memtables come first, then files (offset by
    /// the number of memtables), then extension ranges when the `enterprise`
    /// feature is enabled.
    pub(crate) index: usize,
    /// Number of row groups (memtable ranges for memtables) in the source.
    /// For files, `0` is treated by the unordered scan as "unknown" and the
    /// whole file is scanned.
    pub(crate) num_row_groups: u64,
}
42
/// Identifies one row group of a source, or all of its row groups.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct RowGroupIndex {
    /// Index of the source this row group belongs to (same numbering as
    /// `SourceIndex::index`).
    pub(crate) index: usize,
    /// Row group index within the source; `ALL_ROW_GROUPS` (`-1`) selects
    /// every row group.
    pub row_group_index: i64,
}
52
/// Metadata of one scan range: a time range that covers one or more sources
/// and the row groups to read from them.
#[derive(Debug, PartialEq)]
pub(crate) struct RangeMeta {
    /// Time range covered by this range; both bounds are treated as inclusive
    /// when checking overlap and when building a `PartitionRange`.
    pub(crate) time_range: FileTimeRange,
    /// Sources read by this range.
    pub(crate) indices: SmallVec<[SourceIndex; 2]>,
    /// Row groups to read, each tagged with the source it belongs to.
    pub(crate) row_group_indices: SmallVec<[RowGroupIndex; 2]>,
    /// Estimated number of rows in this range (may be approximate, e.g. after
    /// splitting or when only a default row-group size is known).
    pub(crate) num_rows: usize,
}
67
68impl RangeMeta {
69 pub(crate) fn new_partition_range(&self, identifier: usize) -> PartitionRange {
72 PartitionRange {
73 start: self.time_range.0,
74 end: Timestamp::new(
75 self.time_range
78 .1
79 .value()
80 .checked_add(1)
81 .unwrap_or(self.time_range.1.value()),
82 self.time_range.1.unit(),
83 ),
84 num_rows: self.num_rows,
85 identifier,
86 }
87 }
88
89 pub(crate) fn seq_scan_ranges(input: &ScanInput) -> Vec<RangeMeta> {
92 let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
93 Self::push_seq_mem_ranges(&input.memtables, &mut ranges);
94 Self::push_seq_file_ranges(input.memtables.len(), &input.files, &mut ranges);
95
96 #[cfg(feature = "enterprise")]
97 Self::push_extension_ranges(input, &mut ranges);
98
99 let ranges = group_ranges_for_seq_scan(ranges);
100 if input.compaction || input.distribution == Some(TimeSeriesDistribution::PerSeries) {
101 return ranges;
103 }
104 maybe_split_ranges_for_seq_scan(ranges)
105 }
106
107 pub(crate) fn unordered_scan_ranges(input: &ScanInput) -> Vec<RangeMeta> {
109 let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
110 Self::push_unordered_mem_ranges(&input.memtables, &mut ranges);
111 Self::push_unordered_file_ranges(
112 input.memtables.len(),
113 &input.files,
114 &input.cache_strategy,
115 &mut ranges,
116 );
117
118 #[cfg(feature = "enterprise")]
119 Self::push_extension_ranges(input, &mut ranges);
120
121 ranges
122 }
123
124 fn overlaps(&self, meta: &RangeMeta) -> bool {
126 overlaps(&self.time_range, &meta.time_range)
127 }
128
129 fn merge(&mut self, mut other: RangeMeta) {
132 debug_assert!(self.overlaps(&other));
133 debug_assert!(self.indices.iter().all(|idx| !other.indices.contains(idx)));
134 debug_assert!(
135 self.row_group_indices
136 .iter()
137 .all(|idx| !other.row_group_indices.contains(idx))
138 );
139
140 self.time_range = (
141 self.time_range.0.min(other.time_range.0),
142 self.time_range.1.max(other.time_range.1),
143 );
144 self.indices.append(&mut other.indices);
145 self.row_group_indices.append(&mut other.row_group_indices);
146 self.num_rows += other.num_rows;
147 }
148
149 fn can_split_preserve_order(&self) -> bool {
152 self.indices.len() == 1 && self.indices[0].num_row_groups > 1
153 }
154
155 fn maybe_split(self, output: &mut Vec<RangeMeta>) {
157 if self.can_split_preserve_order() {
158 let num_row_groups = self.indices[0].num_row_groups;
159 debug_assert_eq!(1, self.row_group_indices.len());
160 debug_assert_eq!(ALL_ROW_GROUPS, self.row_group_indices[0].row_group_index);
161
162 output.reserve(self.row_group_indices.len());
163 let num_rows = self.num_rows / num_row_groups as usize;
164 for row_group_index in 0..num_row_groups {
166 output.push(RangeMeta {
167 time_range: self.time_range,
168 indices: self.indices.clone(),
169 row_group_indices: smallvec![RowGroupIndex {
170 index: self.indices[0].index,
171 row_group_index: row_group_index as i64,
172 }],
173 num_rows,
174 });
175 }
176 } else {
177 output.push(self);
178 }
179 }
180
181 fn push_unordered_mem_ranges(memtables: &[MemRangeBuilder], ranges: &mut Vec<RangeMeta>) {
182 for (memtable_index, memtable) in memtables.iter().enumerate() {
184 let stats = memtable.stats();
185 let Some(time_range) = stats.time_range() else {
186 continue;
187 };
188 for row_group_index in 0..stats.num_ranges() {
189 let num_rows = stats.num_rows() / stats.num_ranges();
190 ranges.push(RangeMeta {
191 time_range,
192 indices: smallvec![SourceIndex {
193 index: memtable_index,
194 num_row_groups: stats.num_ranges() as u64,
195 }],
196 row_group_indices: smallvec![RowGroupIndex {
197 index: memtable_index,
198 row_group_index: row_group_index as i64,
199 }],
200 num_rows,
201 });
202 }
203 }
204 }
205
206 fn push_unordered_file_ranges(
207 num_memtables: usize,
208 files: &[FileHandle],
209 cache: &CacheStrategy,
210 ranges: &mut Vec<RangeMeta>,
211 ) {
212 for (i, file) in files.iter().enumerate() {
214 let file_index = num_memtables + i;
215 let parquet_meta = cache.get_parquet_meta_data_from_mem_cache(file.file_id());
217 if let Some(parquet_meta) = parquet_meta {
218 for row_group_index in 0..file.meta_ref().num_row_groups {
220 let time_range = parquet_row_group_time_range(
221 file.meta_ref(),
222 &parquet_meta,
223 row_group_index as usize,
224 );
225 let num_rows = parquet_meta.row_group(row_group_index as usize).num_rows();
226 ranges.push(RangeMeta {
227 time_range: time_range.unwrap_or_else(|| file.time_range()),
228 indices: smallvec![SourceIndex {
229 index: file_index,
230 num_row_groups: file.meta_ref().num_row_groups,
231 }],
232 row_group_indices: smallvec![RowGroupIndex {
233 index: file_index,
234 row_group_index: row_group_index as i64,
235 }],
236 num_rows: num_rows as usize,
237 });
238 }
239 } else if file.meta_ref().num_row_groups > 0 {
240 for row_group_index in 0..file.meta_ref().num_row_groups {
242 ranges.push(RangeMeta {
243 time_range: file.time_range(),
244 indices: smallvec![SourceIndex {
245 index: file_index,
246 num_row_groups: file.meta_ref().num_row_groups,
247 }],
248 row_group_indices: smallvec![RowGroupIndex {
249 index: file_index,
250 row_group_index: row_group_index as i64,
251 }],
252 num_rows: DEFAULT_ROW_GROUP_SIZE,
253 });
254 }
255 } else {
256 ranges.push(RangeMeta {
258 time_range: file.time_range(),
259 indices: smallvec![SourceIndex {
260 index: file_index,
261 num_row_groups: 0,
262 }],
263 row_group_indices: smallvec![RowGroupIndex {
264 index: file_index,
265 row_group_index: ALL_ROW_GROUPS,
266 }],
267 num_rows: file.meta_ref().num_rows as usize,
269 });
270 }
271 }
272 }
273
274 fn push_seq_mem_ranges(memtables: &[MemRangeBuilder], ranges: &mut Vec<RangeMeta>) {
275 for (i, memtable) in memtables.iter().enumerate() {
277 let stats = memtable.stats();
278 let Some(time_range) = stats.time_range() else {
279 continue;
280 };
281 ranges.push(RangeMeta {
282 time_range,
283 indices: smallvec![SourceIndex {
284 index: i,
285 num_row_groups: stats.num_ranges() as u64,
286 }],
287 row_group_indices: smallvec![RowGroupIndex {
288 index: i,
289 row_group_index: ALL_ROW_GROUPS,
290 }],
291 num_rows: stats.num_rows(),
292 });
293 }
294 }
295
296 fn push_seq_file_ranges(
297 num_memtables: usize,
298 files: &[FileHandle],
299 ranges: &mut Vec<RangeMeta>,
300 ) {
301 for (i, file) in files.iter().enumerate() {
303 let file_index = num_memtables + i;
304 ranges.push(RangeMeta {
305 time_range: file.time_range(),
306 indices: smallvec![SourceIndex {
307 index: file_index,
308 num_row_groups: file.meta_ref().num_row_groups,
309 }],
310 row_group_indices: smallvec![RowGroupIndex {
311 index: file_index,
312 row_group_index: ALL_ROW_GROUPS,
313 }],
314 num_rows: file.meta_ref().num_rows as usize,
315 });
316 }
317 }
318
319 #[cfg(feature = "enterprise")]
320 fn push_extension_ranges(input: &ScanInput, metas: &mut Vec<RangeMeta>) {
321 for (i, range) in input.extension_ranges().iter().enumerate() {
322 let index = input.num_memtables() + input.num_files() + i;
323 metas.push(RangeMeta {
324 time_range: range.time_range(),
325 indices: smallvec![SourceIndex {
326 index,
327 num_row_groups: range.num_row_groups(),
328 }],
329 row_group_indices: smallvec![RowGroupIndex {
330 index,
331 row_group_index: ALL_ROW_GROUPS,
332 }],
333 num_rows: range.num_rows() as usize,
334 });
335 }
336 }
337}
338
339fn group_ranges_for_seq_scan(mut ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
342 if ranges.is_empty() {
343 return ranges;
344 }
345
346 ranges.sort_unstable_by(|a, b| {
348 let l = a.time_range;
349 let r = b.time_range;
350 l.0.cmp(&r.0).then_with(|| r.1.cmp(&l.1))
351 });
352 let mut range_in_progress = None;
353 let mut exclusive_ranges = Vec::with_capacity(ranges.len());
355 for range in ranges {
356 let Some(mut prev_range) = range_in_progress.take() else {
357 range_in_progress = Some(range);
359 continue;
360 };
361
362 if prev_range.overlaps(&range) {
363 prev_range.merge(range);
364 range_in_progress = Some(prev_range);
365 } else {
366 exclusive_ranges.push(prev_range);
367 range_in_progress = Some(range);
368 }
369 }
370 if let Some(range) = range_in_progress {
371 exclusive_ranges.push(range);
372 }
373
374 exclusive_ranges
375}
376
377fn maybe_split_ranges_for_seq_scan(ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
380 let mut new_ranges = Vec::with_capacity(ranges.len());
381 for range in ranges {
382 range.maybe_split(&mut new_ranges);
383 }
384
385 new_ranges
386}
387
/// Builds [`FileRange`]s from a shared file range context and a row-group
/// selection.
#[derive(Default)]
pub struct FileRangeBuilder {
    /// Context shared by every range built. `None` (the `Default`) makes the
    /// builder produce no ranges.
    context: Option<FileRangeContextRef>,
    /// Selected row groups and the row selection within each of them.
    selection: RowGroupSelection,
}
397
398impl FileRangeBuilder {
399 pub(crate) fn new(context: FileRangeContextRef, selection: RowGroupSelection) -> Self {
401 Self {
402 context: Some(context),
403 selection,
404 }
405 }
406
407 pub fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[FileRange; 2]>) {
410 let Some(context) = self.context.clone() else {
411 return;
412 };
413 if row_group_index >= 0 {
414 let row_group_index = row_group_index as usize;
415 let Some(row_selection) = self.selection.get(row_group_index) else {
417 return;
418 };
419 ranges.push(FileRange::new(
420 context,
421 row_group_index,
422 Some(row_selection.clone()),
423 ));
424 } else {
425 ranges.extend(
427 self.selection
428 .iter()
429 .map(|(row_group_index, row_selection)| {
430 FileRange::new(
431 context.clone(),
432 *row_group_index,
433 Some(row_selection.clone()),
434 )
435 }),
436 );
437 }
438 }
439
440 pub(crate) fn memory_size(&self) -> usize {
442 let context_size = self
443 .context
444 .as_ref()
445 .map(|ctx| ctx.memory_size())
446 .unwrap_or(0);
447 let selection_size = self.selection.mem_usage();
448 context_size + selection_size
449 }
450}
451
/// Holds a memtable range together with its statistics.
pub(crate) struct MemRangeBuilder {
    /// The memtable range handed out by `build_ranges`.
    range: MemtableRange,
    /// Statistics of the range, read when planning scan ranges.
    stats: MemtableStats,
}
459
460impl MemRangeBuilder {
461 pub(crate) fn new(range: MemtableRange, stats: MemtableStats) -> Self {
463 Self { range, stats }
464 }
465
466 pub(crate) fn build_ranges(
469 &self,
470 _row_group_index: i64,
471 ranges: &mut SmallVec<[MemtableRange; 2]>,
472 ) {
473 ranges.push(self.range.clone())
474 }
475
476 pub(crate) fn stats(&self) -> &MemtableStats {
478 &self.stats
479 }
480}
481
#[cfg(test)]
mod tests {
    use common_time::Timestamp;

    use super::*;

    /// Expected grouping result: source indices, then start and end values.
    type Output = (Vec<usize>, i64, i64);

    /// Builds a `RangeMeta` over `[start, end]` seconds from compact
    /// `(index, num_row_groups)` source tuples and
    /// `(index, row_group_index)` row-group tuples.
    fn range_meta(
        (start, end): (i64, i64),
        sources: &[(usize, u64)],
        row_groups: &[(usize, i64)],
        num_rows: usize,
    ) -> RangeMeta {
        RangeMeta {
            time_range: (Timestamp::new_second(start), Timestamp::new_second(end)),
            indices: sources
                .iter()
                .map(|&(index, num_row_groups)| SourceIndex {
                    index,
                    num_row_groups,
                })
                .collect(),
            row_group_indices: row_groups
                .iter()
                .map(|&(index, row_group_index)| RowGroupIndex {
                    index,
                    row_group_index,
                })
                .collect(),
            num_rows,
        }
    }

    /// Groups one single-source range per `(index, start, end)` input and
    /// compares the resulting sources and time bounds with `expect`.
    fn run_group_ranges_test(input: &[(usize, i64, i64)], expect: &[Output]) {
        let ranges: Vec<RangeMeta> = input
            .iter()
            .map(|&(idx, start, end)| range_meta((start, end), &[(idx, 0)], &[(idx, 0)], 1))
            .collect();

        let mut actual: Vec<Output> = Vec::new();
        for range in group_ranges_for_seq_scan(ranges) {
            let sources: Vec<usize> = range.indices.iter().map(|s| s.index).collect();
            let row_group_sources: Vec<usize> =
                range.row_group_indices.iter().map(|r| r.index).collect();
            // Sources and row groups must stay aligned after merging.
            assert_eq!(sources, row_group_sources);
            let (start, end) = range.time_range;
            actual.push((sources, start.value(), end.value()));
        }
        assert_eq!(expect, actual);
    }

    #[test]
    fn test_group_ranges() {
        // A single range is returned unchanged.
        run_group_ranges_test(&[(1, 0, 2000)], &[(vec![1], 0, 2000)]);

        // Chained overlaps collapse into one group; a disjoint range stays alone.
        run_group_ranges_test(
            &[
                (1, 1000, 2000),
                (2, 6000, 7000),
                (3, 0, 1500),
                (4, 1500, 3000),
            ],
            &[(vec![3, 1, 4], 0, 3000), (vec![2], 6000, 7000)],
        );

        // Disjoint ranges come back sorted by start.
        run_group_ranges_test(
            &[(1, 3000, 4000), (2, 4001, 6000), (3, 0, 1000)],
            &[
                (vec![3], 0, 1000),
                (vec![1], 3000, 4000),
                (vec![2], 4001, 6000),
            ],
        );

        // Ranges sharing an endpoint are merged.
        run_group_ranges_test(
            &[(1, 3000, 4000), (2, 4000, 6000), (3, 0, 1000)],
            &[(vec![3], 0, 1000), (vec![1, 2], 3000, 6000)],
        );
    }

    #[test]
    fn test_merge_range() {
        let mut left = range_meta((1000, 2000), &[(1, 2)], &[(1, 1), (1, 2)], 5);
        let right = range_meta((800, 1200), &[(2, 2)], &[(2, 1), (2, 2)], 4);
        left.merge(right);

        let expected = range_meta(
            (800, 2000),
            &[(1, 2), (2, 2)],
            &[(1, 1), (1, 2), (2, 1), (2, 2)],
            9,
        );
        assert_eq!(left, expected);
    }

    #[test]
    fn test_split_range() {
        let range = range_meta((1000, 2000), &[(1, 2)], &[(1, ALL_ROW_GROUPS)], 5);

        assert!(range.can_split_preserve_order());
        let mut output = Vec::new();
        range.maybe_split(&mut output);

        let expected = vec![
            range_meta((1000, 2000), &[(1, 2)], &[(1, 0)], 2),
            range_meta((1000, 2000), &[(1, 2)], &[(1, 1)], 2),
        ];
        assert_eq!(output, expected);
    }

    #[test]
    fn test_not_split_range() {
        let range = range_meta((1000, 2000), &[(1, 1), (2, 1)], &[(1, 1), (2, 1)], 5);

        assert!(!range.can_split_preserve_order());
        let mut output = Vec::new();
        range.maybe_split(&mut output);
        assert_eq!(1, output.len());
    }

    #[test]
    fn test_maybe_split_ranges() {
        let ranges = vec![
            range_meta((0, 500), &[(0, 1)], &[(0, 0)], 4),
            range_meta((1000, 2000), &[(1, 2)], &[(1, ALL_ROW_GROUPS)], 4),
            range_meta(
                (3000, 4000),
                &[(2, 2), (3, 0)],
                &[(2, ALL_ROW_GROUPS), (3, ALL_ROW_GROUPS)],
                5,
            ),
        ];
        let output = maybe_split_ranges_for_seq_scan(ranges);

        let expected = vec![
            range_meta((0, 500), &[(0, 1)], &[(0, 0)], 4),
            range_meta((1000, 2000), &[(1, 2)], &[(1, 0)], 2),
            range_meta((1000, 2000), &[(1, 2)], &[(1, 1)], 2),
            range_meta(
                (3000, 4000),
                &[(2, 2), (3, 0)],
                &[(2, ALL_ROW_GROUPS), (3, ALL_ROW_GROUPS)],
                5,
            ),
        ];
        assert_eq!(output, expected);
    }
}