1use common_time::Timestamp;
18use smallvec::{SmallVec, smallvec};
19use store_api::region_engine::PartitionRange;
20use store_api::storage::TimeSeriesDistribution;
21
22use crate::cache::CacheStrategy;
23use crate::memtable::{MemtableRange, MemtableStats};
24use crate::read::scan_region::ScanInput;
25use crate::sst::file::{FileHandle, FileTimeRange, overlaps};
26use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
27use crate::sst::parquet::file_range::{FileRange, FileRangeContextRef};
28use crate::sst::parquet::format::parquet_row_group_time_range;
29use crate::sst::parquet::row_selection::RowGroupSelection;
30
/// Sentinel row group index meaning "every row group of the source".
///
/// `FileRangeBuilder::build_ranges` treats any negative index this way and builds one
/// range per selected row group instead of a single one.
const ALL_ROW_GROUPS: i64 = -1;
32
/// Identifies one source (memtable, SST file or extension range) of a scan.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct SourceIndex {
    /// Position of the source: memtables come first, then files, then extension ranges.
    pub(crate) index: usize,
    /// Number of row groups in the source (for memtables, the number of memtable ranges).
    pub(crate) num_row_groups: u64,
}
42
/// Identifies a row group to scan inside one source.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RowGroupIndex {
    /// Index of the source; same numbering as `SourceIndex::index`.
    pub(crate) index: usize,
    /// Row group inside the source, or `ALL_ROW_GROUPS` (`-1`) to scan every row group.
    pub row_group_index: i64,
}
52
/// Metadata of one range to scan.
///
/// A range may cover several sources (memtables, files or extension ranges) together with
/// the row groups to read from each of them.
#[derive(Debug, PartialEq, Clone)]
pub(crate) struct RangeMeta {
    /// Time range of the data. The end is treated as inclusive; `new_partition_range` adds one
    /// to it to produce an exclusive end.
    pub(crate) time_range: FileTimeRange,
    /// Sources in this range. Memtables are numbered first, then files, then extension ranges.
    pub(crate) indices: SmallVec<[SourceIndex; 2]>,
    /// Row groups to scan. A `row_group_index` of `ALL_ROW_GROUPS` selects every row group of
    /// that source.
    pub(crate) row_group_indices: SmallVec<[RowGroupIndex; 2]>,
    /// Estimated number of rows. It may be a placeholder such as `DEFAULT_ROW_GROUP_SIZE`
    /// or an even split of a source's row count.
    pub(crate) num_rows: usize,
}
67
68impl RangeMeta {
69 pub(crate) fn new_partition_range(&self, identifier: usize) -> PartitionRange {
72 PartitionRange {
73 start: self.time_range.0,
74 end: Timestamp::new(
75 self.time_range
78 .1
79 .value()
80 .checked_add(1)
81 .unwrap_or(self.time_range.1.value()),
82 self.time_range.1.unit(),
83 ),
84 num_rows: self.num_rows,
85 identifier,
86 }
87 }
88
89 pub(crate) fn seq_scan_ranges(input: &ScanInput) -> Vec<RangeMeta> {
92 let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
93 Self::push_seq_mem_ranges(&input.memtables, &mut ranges);
94 Self::push_seq_file_ranges(input.memtables.len(), &input.files, &mut ranges);
95
96 #[cfg(feature = "enterprise")]
97 Self::push_extension_ranges(input, &mut ranges);
98
99 let ranges = group_ranges_for_seq_scan(ranges);
100 if input.compaction || input.distribution == Some(TimeSeriesDistribution::PerSeries) {
101 return ranges;
103 }
104 maybe_split_ranges_for_seq_scan(ranges)
105 }
106
107 pub(crate) fn unordered_scan_ranges(input: &ScanInput) -> Vec<RangeMeta> {
109 let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
110 Self::push_unordered_mem_ranges(&input.memtables, &mut ranges);
111 Self::push_unordered_file_ranges(
112 input.memtables.len(),
113 &input.files,
114 &input.cache_strategy,
115 &mut ranges,
116 );
117
118 #[cfg(feature = "enterprise")]
119 Self::push_extension_ranges(input, &mut ranges);
120
121 ranges
122 }
123
124 fn overlaps(&self, meta: &RangeMeta) -> bool {
126 overlaps(&self.time_range, &meta.time_range)
127 }
128
129 fn merge(&mut self, mut other: RangeMeta) {
132 debug_assert!(self.overlaps(&other));
133 debug_assert!(self.indices.iter().all(|idx| !other.indices.contains(idx)));
134 debug_assert!(
135 self.row_group_indices
136 .iter()
137 .all(|idx| !other.row_group_indices.contains(idx))
138 );
139
140 self.time_range = (
141 self.time_range.0.min(other.time_range.0),
142 self.time_range.1.max(other.time_range.1),
143 );
144 self.indices.append(&mut other.indices);
145 self.row_group_indices.append(&mut other.row_group_indices);
146 self.num_rows += other.num_rows;
147 }
148
149 fn can_split_preserve_order(&self) -> bool {
152 self.indices.len() == 1 && self.indices[0].num_row_groups > 1
153 }
154
155 fn maybe_split(self, output: &mut Vec<RangeMeta>) {
157 if self.can_split_preserve_order() {
158 let num_row_groups = self.indices[0].num_row_groups;
159 debug_assert_eq!(1, self.row_group_indices.len());
160 debug_assert_eq!(ALL_ROW_GROUPS, self.row_group_indices[0].row_group_index);
161
162 output.reserve(self.row_group_indices.len());
163 let num_rows = self.num_rows / num_row_groups as usize;
164 for row_group_index in 0..num_row_groups {
166 output.push(RangeMeta {
167 time_range: self.time_range,
168 indices: self.indices.clone(),
169 row_group_indices: smallvec![RowGroupIndex {
170 index: self.indices[0].index,
171 row_group_index: row_group_index as i64,
172 }],
173 num_rows,
174 });
175 }
176 } else {
177 output.push(self);
178 }
179 }
180
181 fn push_unordered_mem_ranges(memtables: &[MemRangeBuilder], ranges: &mut Vec<RangeMeta>) {
182 for (memtable_index, memtable) in memtables.iter().enumerate() {
184 let stats = memtable.stats();
185 let Some(time_range) = stats.time_range() else {
186 continue;
187 };
188 for row_group_index in 0..stats.num_ranges() {
189 let num_rows = stats.num_rows() / stats.num_ranges();
190 ranges.push(RangeMeta {
191 time_range,
192 indices: smallvec![SourceIndex {
193 index: memtable_index,
194 num_row_groups: stats.num_ranges() as u64,
195 }],
196 row_group_indices: smallvec![RowGroupIndex {
197 index: memtable_index,
198 row_group_index: row_group_index as i64,
199 }],
200 num_rows,
201 });
202 }
203 }
204 }
205
206 fn push_unordered_file_ranges(
207 num_memtables: usize,
208 files: &[FileHandle],
209 cache: &CacheStrategy,
210 ranges: &mut Vec<RangeMeta>,
211 ) {
212 for (i, file) in files.iter().enumerate() {
214 let file_index = num_memtables + i;
215 let parquet_meta = cache.get_parquet_meta_data_from_mem_cache(file.file_id());
217 if let Some(parquet_meta) = parquet_meta {
218 for row_group_index in 0..file.meta_ref().num_row_groups {
220 let time_range = parquet_row_group_time_range(
221 file.meta_ref(),
222 &parquet_meta,
223 row_group_index as usize,
224 );
225 let num_rows = parquet_meta.row_group(row_group_index as usize).num_rows();
226 ranges.push(RangeMeta {
227 time_range: time_range.unwrap_or_else(|| file.time_range()),
228 indices: smallvec![SourceIndex {
229 index: file_index,
230 num_row_groups: file.meta_ref().num_row_groups,
231 }],
232 row_group_indices: smallvec![RowGroupIndex {
233 index: file_index,
234 row_group_index: row_group_index as i64,
235 }],
236 num_rows: num_rows as usize,
237 });
238 }
239 } else if file.meta_ref().num_row_groups > 0 {
240 for row_group_index in 0..file.meta_ref().num_row_groups {
242 ranges.push(RangeMeta {
243 time_range: file.time_range(),
244 indices: smallvec![SourceIndex {
245 index: file_index,
246 num_row_groups: file.meta_ref().num_row_groups,
247 }],
248 row_group_indices: smallvec![RowGroupIndex {
249 index: file_index,
250 row_group_index: row_group_index as i64,
251 }],
252 num_rows: DEFAULT_ROW_GROUP_SIZE,
253 });
254 }
255 } else {
256 ranges.push(RangeMeta {
258 time_range: file.time_range(),
259 indices: smallvec![SourceIndex {
260 index: file_index,
261 num_row_groups: 0,
262 }],
263 row_group_indices: smallvec![RowGroupIndex {
264 index: file_index,
265 row_group_index: ALL_ROW_GROUPS,
266 }],
267 num_rows: file.meta_ref().num_rows as usize,
269 });
270 }
271 }
272 }
273
274 fn push_seq_mem_ranges(memtables: &[MemRangeBuilder], ranges: &mut Vec<RangeMeta>) {
275 for (i, memtable) in memtables.iter().enumerate() {
277 let stats = memtable.stats();
278 let Some(time_range) = stats.time_range() else {
279 continue;
280 };
281 ranges.push(RangeMeta {
282 time_range,
283 indices: smallvec![SourceIndex {
284 index: i,
285 num_row_groups: stats.num_ranges() as u64,
286 }],
287 row_group_indices: smallvec![RowGroupIndex {
288 index: i,
289 row_group_index: ALL_ROW_GROUPS,
290 }],
291 num_rows: stats.num_rows(),
292 });
293 }
294 }
295
296 fn push_seq_file_ranges(
297 num_memtables: usize,
298 files: &[FileHandle],
299 ranges: &mut Vec<RangeMeta>,
300 ) {
301 for (i, file) in files.iter().enumerate() {
303 let file_index = num_memtables + i;
304 ranges.push(RangeMeta {
305 time_range: file.time_range(),
306 indices: smallvec![SourceIndex {
307 index: file_index,
308 num_row_groups: file.meta_ref().num_row_groups,
309 }],
310 row_group_indices: smallvec![RowGroupIndex {
311 index: file_index,
312 row_group_index: ALL_ROW_GROUPS,
313 }],
314 num_rows: file.meta_ref().num_rows as usize,
315 });
316 }
317 }
318
319 #[cfg(feature = "enterprise")]
320 fn push_extension_ranges(input: &ScanInput, metas: &mut Vec<RangeMeta>) {
321 for (i, range) in input.extension_ranges().iter().enumerate() {
322 let index = input.num_memtables() + input.num_files() + i;
323 metas.push(RangeMeta {
324 time_range: range.time_range(),
325 indices: smallvec![SourceIndex {
326 index,
327 num_row_groups: range.num_row_groups(),
328 }],
329 row_group_indices: smallvec![RowGroupIndex {
330 index,
331 row_group_index: ALL_ROW_GROUPS,
332 }],
333 num_rows: range.num_rows() as usize,
334 });
335 }
336 }
337}
338
339fn group_ranges_for_seq_scan(mut ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
342 if ranges.is_empty() {
343 return ranges;
344 }
345
346 ranges.sort_unstable_by(|a, b| {
348 let l = a.time_range;
349 let r = b.time_range;
350 l.0.cmp(&r.0).then_with(|| r.1.cmp(&l.1))
351 });
352 let mut range_in_progress = None;
353 let mut exclusive_ranges = Vec::with_capacity(ranges.len());
355 for range in ranges {
356 let Some(mut prev_range) = range_in_progress.take() else {
357 range_in_progress = Some(range);
359 continue;
360 };
361
362 if prev_range.overlaps(&range) {
363 prev_range.merge(range);
364 range_in_progress = Some(prev_range);
365 } else {
366 exclusive_ranges.push(prev_range);
367 range_in_progress = Some(range);
368 }
369 }
370 if let Some(range) = range_in_progress {
371 exclusive_ranges.push(range);
372 }
373
374 exclusive_ranges
375}
376
377fn maybe_split_ranges_for_seq_scan(ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
380 let mut new_ranges = Vec::with_capacity(ranges.len());
381 for range in ranges {
382 range.maybe_split(&mut new_ranges);
383 }
384
385 new_ranges
386}
387
/// Builds [`FileRange`]s for one SST file from a shared context and a row group selection.
#[derive(Default)]
pub struct FileRangeBuilder {
    /// Context cloned into every range built by this builder.
    /// `None` (the `Default`) makes `build_ranges` produce nothing.
    context: Option<FileRangeContextRef>,
    /// Row groups to read, each paired with its row selection.
    selection: RowGroupSelection,
}
397
398impl FileRangeBuilder {
399 pub(crate) fn new(context: FileRangeContextRef, selection: RowGroupSelection) -> Self {
401 Self {
402 context: Some(context),
403 selection,
404 }
405 }
406
407 pub fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[FileRange; 2]>) {
410 let Some(context) = self.context.clone() else {
411 return;
412 };
413 if row_group_index >= 0 {
414 let row_group_index = row_group_index as usize;
415 let Some(row_selection) = self.selection.get(row_group_index) else {
417 return;
418 };
419 ranges.push(FileRange::new(
420 context,
421 row_group_index,
422 Some(row_selection.clone()),
423 ));
424 } else {
425 ranges.extend(
427 self.selection
428 .iter()
429 .map(|(row_group_index, row_selection)| {
430 FileRange::new(
431 context.clone(),
432 *row_group_index,
433 Some(row_selection.clone()),
434 )
435 }),
436 );
437 }
438 }
439
440 pub(crate) fn memory_size(&self) -> usize {
442 let context_size = self
443 .context
444 .as_ref()
445 .map(|ctx| ctx.memory_size())
446 .unwrap_or(0);
447 let selection_size = self.selection.mem_usage();
448 context_size + selection_size
449 }
450}
451
/// Wraps a [`MemtableRange`] together with its [`MemtableStats`].
#[derive(Clone)]
pub(crate) struct MemRangeBuilder {
    /// Range handed out (cloned) by `build_ranges`.
    range: MemtableRange,
    /// Statistics (time range, row count, number of ranges) read when planning scan ranges.
    stats: MemtableStats,
}
460
461impl MemRangeBuilder {
462 pub(crate) fn new(range: MemtableRange, stats: MemtableStats) -> Self {
464 Self { range, stats }
465 }
466
467 pub(crate) fn build_ranges(
470 &self,
471 _row_group_index: i64,
472 ranges: &mut SmallVec<[MemtableRange; 2]>,
473 ) {
474 ranges.push(self.range.clone())
475 }
476
477 pub(crate) fn stats(&self) -> &MemtableStats {
479 &self.stats
480 }
481}
482
#[cfg(test)]
mod tests {
    use common_time::Timestamp;
    use common_time::timestamp::TimeUnit;

    use super::*;

    /// Expected group: (source indices, start second, end second).
    type Output = (Vec<usize>, i64, i64);

    /// Builds one range per `(source index, start, end)` tuple (in seconds), groups them with
    /// `group_ranges_for_seq_scan` and compares the result with `expect`.
    fn run_group_ranges_test(input: &[(usize, i64, i64)], expect: &[Output]) {
        let ranges = input
            .iter()
            .map(|(idx, start, end)| {
                let time_range = (
                    Timestamp::new(*start, TimeUnit::Second),
                    Timestamp::new(*end, TimeUnit::Second),
                );
                RangeMeta {
                    time_range,
                    indices: smallvec![SourceIndex {
                        index: *idx,
                        num_row_groups: 0,
                    }],
                    row_group_indices: smallvec![RowGroupIndex {
                        index: *idx,
                        row_group_index: 0
                    }],
                    num_rows: 1,
                }
            })
            .collect();
        let output = group_ranges_for_seq_scan(ranges);
        let actual: Vec<_> = output
            .iter()
            .map(|range| {
                let indices = range.indices.iter().map(|index| index.index).collect();
                let group_indices: Vec<_> = range
                    .row_group_indices
                    .iter()
                    .map(|idx| idx.index)
                    .collect();
                assert_eq!(indices, group_indices);
                let range = range.time_range;
                (indices, range.0.value(), range.1.value())
            })
            .collect();
        assert_eq!(expect, actual);
    }

    #[test]
    fn test_group_ranges() {
        // No ranges.
        run_group_ranges_test(&[], &[]);

        // A single range.
        run_group_ranges_test(&[(1, 0, 2000)], &[(vec![1], 0, 2000)]);

        // Overlapping ranges are merged.
        run_group_ranges_test(
            &[
                (1, 1000, 2000),
                (2, 6000, 7000),
                (3, 0, 1500),
                (4, 1500, 3000),
            ],
            &[(vec![3, 1, 4], 0, 3000), (vec![2], 6000, 7000)],
        );

        // Disjoint ranges are kept apart.
        run_group_ranges_test(
            &[(1, 3000, 4000), (2, 4001, 6000), (3, 0, 1000)],
            &[
                (vec![3], 0, 1000),
                (vec![1], 3000, 4000),
                (vec![2], 4001, 6000),
            ],
        );

        // Ranges touching at a boundary overlap (the end is inclusive).
        run_group_ranges_test(
            &[(1, 3000, 4000), (2, 4000, 6000), (3, 0, 1000)],
            &[(vec![3], 0, 1000), (vec![1, 2], 3000, 6000)],
        );
    }

    #[test]
    fn test_new_partition_range() {
        let range = RangeMeta {
            time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
            indices: smallvec![SourceIndex {
                index: 0,
                num_row_groups: 1,
            }],
            row_group_indices: smallvec![RowGroupIndex {
                index: 0,
                row_group_index: ALL_ROW_GROUPS,
            }],
            num_rows: 10,
        };
        let partition = range.new_partition_range(7);
        assert_eq!(1000, partition.start.value());
        // The inclusive end becomes an exclusive end.
        assert_eq!(2001, partition.end.value());
        assert_eq!(TimeUnit::Second, partition.end.unit());
        assert_eq!(10, partition.num_rows);
        assert_eq!(7, partition.identifier);

        // The end is kept as-is instead of overflowing.
        let range = RangeMeta {
            time_range: (
                Timestamp::new_second(0),
                Timestamp::new(i64::MAX, TimeUnit::Second),
            ),
            ..range
        };
        let partition = range.new_partition_range(0);
        assert_eq!(i64::MAX, partition.end.value());
    }

    #[test]
    fn test_merge_range() {
        let mut left = RangeMeta {
            time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
            indices: smallvec![SourceIndex {
                index: 1,
                num_row_groups: 2,
            }],
            row_group_indices: smallvec![
                RowGroupIndex {
                    index: 1,
                    row_group_index: 1
                },
                RowGroupIndex {
                    index: 1,
                    row_group_index: 2
                }
            ],
            num_rows: 5,
        };
        let right = RangeMeta {
            time_range: (Timestamp::new_second(800), Timestamp::new_second(1200)),
            indices: smallvec![SourceIndex {
                index: 2,
                num_row_groups: 2,
            }],
            row_group_indices: smallvec![
                RowGroupIndex {
                    index: 2,
                    row_group_index: 1
                },
                RowGroupIndex {
                    index: 2,
                    row_group_index: 2
                }
            ],
            num_rows: 4,
        };
        left.merge(right);

        assert_eq!(
            left,
            RangeMeta {
                time_range: (Timestamp::new_second(800), Timestamp::new_second(2000)),
                indices: smallvec![
                    SourceIndex {
                        index: 1,
                        num_row_groups: 2
                    },
                    SourceIndex {
                        index: 2,
                        num_row_groups: 2
                    }
                ],
                row_group_indices: smallvec![
                    RowGroupIndex {
                        index: 1,
                        row_group_index: 1
                    },
                    RowGroupIndex {
                        index: 1,
                        row_group_index: 2
                    },
                    RowGroupIndex {
                        index: 2,
                        row_group_index: 1
                    },
                    RowGroupIndex {
                        index: 2,
                        row_group_index: 2
                    },
                ],
                num_rows: 9,
            }
        );
    }

    #[test]
    fn test_split_range() {
        let range = RangeMeta {
            time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
            indices: smallvec![SourceIndex {
                index: 1,
                num_row_groups: 2,
            }],
            row_group_indices: smallvec![RowGroupIndex {
                index: 1,
                row_group_index: ALL_ROW_GROUPS,
            }],
            num_rows: 5,
        };

        assert!(range.can_split_preserve_order());
        let mut output = Vec::new();
        range.maybe_split(&mut output);

        assert_eq!(
            output,
            &[
                RangeMeta {
                    time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
                    indices: smallvec![SourceIndex {
                        index: 1,
                        num_row_groups: 2,
                    }],
                    row_group_indices: smallvec![RowGroupIndex {
                        index: 1,
                        row_group_index: 0
                    },],
                    num_rows: 2,
                },
                RangeMeta {
                    time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
                    indices: smallvec![SourceIndex {
                        index: 1,
                        num_row_groups: 2,
                    }],
                    row_group_indices: smallvec![RowGroupIndex {
                        index: 1,
                        row_group_index: 1
                    }],
                    num_rows: 2,
                }
            ]
        );
    }

    #[test]
    fn test_not_split_range() {
        let range = RangeMeta {
            time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
            indices: smallvec![
                SourceIndex {
                    index: 1,
                    num_row_groups: 1,
                },
                SourceIndex {
                    index: 2,
                    num_row_groups: 1,
                }
            ],
            row_group_indices: smallvec![
                RowGroupIndex {
                    index: 1,
                    row_group_index: 1
                },
                RowGroupIndex {
                    index: 2,
                    row_group_index: 1
                }
            ],
            num_rows: 5,
        };

        assert!(!range.can_split_preserve_order());
        let mut output = Vec::new();
        range.maybe_split(&mut output);
        assert_eq!(1, output.len());
    }

    #[test]
    fn test_maybe_split_ranges() {
        let ranges = vec![
            RangeMeta {
                time_range: (Timestamp::new_second(0), Timestamp::new_second(500)),
                indices: smallvec![SourceIndex {
                    index: 0,
                    num_row_groups: 1,
                }],
                row_group_indices: smallvec![RowGroupIndex {
                    index: 0,
                    row_group_index: 0,
                },],
                num_rows: 4,
            },
            RangeMeta {
                time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
                indices: smallvec![SourceIndex {
                    index: 1,
                    num_row_groups: 2,
                }],
                row_group_indices: smallvec![RowGroupIndex {
                    index: 1,
                    row_group_index: ALL_ROW_GROUPS,
                },],
                num_rows: 4,
            },
            RangeMeta {
                time_range: (Timestamp::new_second(3000), Timestamp::new_second(4000)),
                indices: smallvec![
                    SourceIndex {
                        index: 2,
                        num_row_groups: 2,
                    },
                    SourceIndex {
                        index: 3,
                        num_row_groups: 0,
                    }
                ],
                row_group_indices: smallvec![
                    RowGroupIndex {
                        index: 2,
                        row_group_index: ALL_ROW_GROUPS,
                    },
                    RowGroupIndex {
                        index: 3,
                        row_group_index: ALL_ROW_GROUPS,
                    }
                ],
                num_rows: 5,
            },
        ];
        let output = maybe_split_ranges_for_seq_scan(ranges);
        assert_eq!(
            output,
            vec![
                RangeMeta {
                    time_range: (Timestamp::new_second(0), Timestamp::new_second(500)),
                    indices: smallvec![SourceIndex {
                        index: 0,
                        num_row_groups: 1,
                    }],
                    row_group_indices: smallvec![RowGroupIndex {
                        index: 0,
                        row_group_index: 0
                    },],
                    num_rows: 4,
                },
                RangeMeta {
                    time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
                    indices: smallvec![SourceIndex {
                        index: 1,
                        num_row_groups: 2,
                    }],
                    row_group_indices: smallvec![RowGroupIndex {
                        index: 1,
                        row_group_index: 0
                    },],
                    num_rows: 2,
                },
                RangeMeta {
                    time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
                    indices: smallvec![SourceIndex {
                        index: 1,
                        num_row_groups: 2,
                    }],
                    row_group_indices: smallvec![RowGroupIndex {
                        index: 1,
                        row_group_index: 1
                    }],
                    num_rows: 2,
                },
                RangeMeta {
                    time_range: (Timestamp::new_second(3000), Timestamp::new_second(4000)),
                    indices: smallvec![
                        SourceIndex {
                            index: 2,
                            num_row_groups: 2
                        },
                        SourceIndex {
                            index: 3,
                            num_row_groups: 0,
                        }
                    ],
                    row_group_indices: smallvec![
                        RowGroupIndex {
                            index: 2,
                            row_group_index: ALL_ROW_GROUPS,
                        },
                        RowGroupIndex {
                            index: 3,
                            row_group_index: ALL_ROW_GROUPS,
                        }
                    ],
                    num_rows: 5,
                },
            ]
        )
    }
}