1use std::sync::{Arc, Mutex};
18
19use common_time::Timestamp;
20use smallvec::{smallvec, SmallVec};
21use store_api::region_engine::PartitionRange;
22use store_api::storage::TimeSeriesDistribution;
23
24use crate::cache::CacheStrategy;
25use crate::error::Result;
26use crate::memtable::{MemtableRange, MemtableRanges, MemtableStats};
27use crate::read::scan_region::ScanInput;
28use crate::sst::file::{overlaps, FileHandle, FileTimeRange};
29use crate::sst::parquet::file_range::{FileRange, FileRangeContextRef};
30use crate::sst::parquet::format::parquet_row_group_time_range;
31use crate::sst::parquet::reader::ReaderMetrics;
32use crate::sst::parquet::row_selection::RowGroupSelection;
33use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
34
/// Sentinel row group index that selects every row group of a source.
///
/// The range builders in this file treat any negative row group index as
/// "all row groups"; this constant is the canonical negative value.
const ALL_ROW_GROUPS: i64 = -1;
36
/// Identifies a scan source (a memtable or a file) and how many row groups it has.
///
/// Both fields are plain integers, so the type derives `Eq` and `Hash` in
/// addition to `PartialEq`; this lets it be used as a set/map key and in
/// contexts that require full equivalence.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) struct SourceIndex {
    /// Index of the source. In this file memtables use their position in the
    /// memtable list and files use `num_memtables + position in the file list`.
    pub(crate) index: usize,
    /// Number of row groups in the source. `0` is used when the count is
    /// not known.
    pub(crate) num_row_groups: u64,
}
46
/// Identifies one row group (or all row groups) inside a scan source.
///
/// Both fields are plain integers, so the type derives `Eq` and `Hash` in
/// addition to `PartialEq`; this lets it be used as a set/map key and in
/// contexts that require full equivalence.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) struct RowGroupIndex {
    /// Index of the source (memtable or file) the row group belongs to.
    pub(crate) index: usize,
    /// Index of the row group inside the source. A negative value
    /// (`ALL_ROW_GROUPS`, i.e. `-1`) selects every row group of the source.
    pub(crate) row_group_index: i64,
}
56
57#[derive(Debug, PartialEq)]
61pub(crate) struct RangeMeta {
62 pub(crate) time_range: FileTimeRange,
64 pub(crate) indices: SmallVec<[SourceIndex; 2]>,
66 pub(crate) row_group_indices: SmallVec<[RowGroupIndex; 2]>,
68 pub(crate) num_rows: usize,
70}
71
72impl RangeMeta {
73 pub(crate) fn new_partition_range(&self, identifier: usize) -> PartitionRange {
76 PartitionRange {
77 start: self.time_range.0,
78 end: Timestamp::new(
79 self.time_range
82 .1
83 .value()
84 .checked_add(1)
85 .unwrap_or(self.time_range.1.value()),
86 self.time_range.1.unit(),
87 ),
88 num_rows: self.num_rows,
89 identifier,
90 }
91 }
92
93 pub(crate) fn seq_scan_ranges(input: &ScanInput, compaction: bool) -> Vec<RangeMeta> {
96 let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
97 Self::push_seq_mem_ranges(&input.memtables, &mut ranges);
98 Self::push_seq_file_ranges(input.memtables.len(), &input.files, &mut ranges);
99
100 let ranges = group_ranges_for_seq_scan(ranges);
101 if compaction || input.distribution == Some(TimeSeriesDistribution::PerSeries) {
102 return ranges;
104 }
105 maybe_split_ranges_for_seq_scan(ranges)
106 }
107
108 pub(crate) fn unordered_scan_ranges(input: &ScanInput) -> Vec<RangeMeta> {
110 let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
111 Self::push_unordered_mem_ranges(&input.memtables, &mut ranges);
112 Self::push_unordered_file_ranges(
113 input.memtables.len(),
114 &input.files,
115 &input.cache_strategy,
116 &mut ranges,
117 );
118
119 ranges
120 }
121
122 fn overlaps(&self, meta: &RangeMeta) -> bool {
124 overlaps(&self.time_range, &meta.time_range)
125 }
126
127 fn merge(&mut self, mut other: RangeMeta) {
130 debug_assert!(self.overlaps(&other));
131 debug_assert!(self.indices.iter().all(|idx| !other.indices.contains(idx)));
132 debug_assert!(self
133 .row_group_indices
134 .iter()
135 .all(|idx| !other.row_group_indices.contains(idx)));
136
137 self.time_range = (
138 self.time_range.0.min(other.time_range.0),
139 self.time_range.1.max(other.time_range.1),
140 );
141 self.indices.append(&mut other.indices);
142 self.row_group_indices.append(&mut other.row_group_indices);
143 self.num_rows += other.num_rows;
144 }
145
146 fn can_split_preserve_order(&self) -> bool {
149 self.indices.len() == 1 && self.indices[0].num_row_groups > 1
150 }
151
152 fn maybe_split(self, output: &mut Vec<RangeMeta>) {
154 if self.can_split_preserve_order() {
155 let num_row_groups = self.indices[0].num_row_groups;
156 debug_assert_eq!(1, self.row_group_indices.len());
157 debug_assert_eq!(ALL_ROW_GROUPS, self.row_group_indices[0].row_group_index);
158
159 output.reserve(self.row_group_indices.len());
160 let num_rows = self.num_rows / num_row_groups as usize;
161 for row_group_index in 0..num_row_groups {
163 output.push(RangeMeta {
164 time_range: self.time_range,
165 indices: self.indices.clone(),
166 row_group_indices: smallvec![RowGroupIndex {
167 index: self.indices[0].index,
168 row_group_index: row_group_index as i64,
169 }],
170 num_rows,
171 });
172 }
173 } else {
174 output.push(self);
175 }
176 }
177
178 fn push_unordered_mem_ranges(memtables: &[MemRangeBuilder], ranges: &mut Vec<RangeMeta>) {
179 for (memtable_index, memtable) in memtables.iter().enumerate() {
181 let stats = memtable.stats();
182 let Some(time_range) = stats.time_range() else {
183 continue;
184 };
185 for row_group_index in 0..stats.num_ranges() {
186 let num_rows = stats.num_rows() / stats.num_ranges();
187 ranges.push(RangeMeta {
188 time_range,
189 indices: smallvec![SourceIndex {
190 index: memtable_index,
191 num_row_groups: stats.num_ranges() as u64,
192 }],
193 row_group_indices: smallvec![RowGroupIndex {
194 index: memtable_index,
195 row_group_index: row_group_index as i64,
196 }],
197 num_rows,
198 });
199 }
200 }
201 }
202
203 fn push_unordered_file_ranges(
204 num_memtables: usize,
205 files: &[FileHandle],
206 cache: &CacheStrategy,
207 ranges: &mut Vec<RangeMeta>,
208 ) {
209 for (i, file) in files.iter().enumerate() {
211 let file_index = num_memtables + i;
212 let parquet_meta =
214 cache.get_parquet_meta_data_from_mem_cache(file.region_id(), file.file_id());
215 if let Some(parquet_meta) = parquet_meta {
216 for row_group_index in 0..file.meta_ref().num_row_groups {
218 let time_range = parquet_row_group_time_range(
219 file.meta_ref(),
220 &parquet_meta,
221 row_group_index as usize,
222 );
223 let num_rows = parquet_meta.row_group(row_group_index as usize).num_rows();
224 ranges.push(RangeMeta {
225 time_range: time_range.unwrap_or_else(|| file.time_range()),
226 indices: smallvec![SourceIndex {
227 index: file_index,
228 num_row_groups: file.meta_ref().num_row_groups,
229 }],
230 row_group_indices: smallvec![RowGroupIndex {
231 index: file_index,
232 row_group_index: row_group_index as i64,
233 }],
234 num_rows: num_rows as usize,
235 });
236 }
237 } else if file.meta_ref().num_row_groups > 0 {
238 for row_group_index in 0..file.meta_ref().num_row_groups {
240 ranges.push(RangeMeta {
241 time_range: file.time_range(),
242 indices: smallvec![SourceIndex {
243 index: file_index,
244 num_row_groups: file.meta_ref().num_row_groups,
245 }],
246 row_group_indices: smallvec![RowGroupIndex {
247 index: file_index,
248 row_group_index: row_group_index as i64,
249 }],
250 num_rows: DEFAULT_ROW_GROUP_SIZE,
251 });
252 }
253 } else {
254 ranges.push(RangeMeta {
256 time_range: file.time_range(),
257 indices: smallvec![SourceIndex {
258 index: file_index,
259 num_row_groups: 0,
260 }],
261 row_group_indices: smallvec![RowGroupIndex {
262 index: file_index,
263 row_group_index: ALL_ROW_GROUPS,
264 }],
265 num_rows: file.meta_ref().num_rows as usize,
267 });
268 }
269 }
270 }
271
272 fn push_seq_mem_ranges(memtables: &[MemRangeBuilder], ranges: &mut Vec<RangeMeta>) {
273 for (i, memtable) in memtables.iter().enumerate() {
275 let stats = memtable.stats();
276 let Some(time_range) = stats.time_range() else {
277 continue;
278 };
279 ranges.push(RangeMeta {
280 time_range,
281 indices: smallvec![SourceIndex {
282 index: i,
283 num_row_groups: stats.num_ranges() as u64,
284 }],
285 row_group_indices: smallvec![RowGroupIndex {
286 index: i,
287 row_group_index: ALL_ROW_GROUPS,
288 }],
289 num_rows: stats.num_rows(),
290 });
291 }
292 }
293
294 fn push_seq_file_ranges(
295 num_memtables: usize,
296 files: &[FileHandle],
297 ranges: &mut Vec<RangeMeta>,
298 ) {
299 for (i, file) in files.iter().enumerate() {
301 let file_index = num_memtables + i;
302 ranges.push(RangeMeta {
303 time_range: file.time_range(),
304 indices: smallvec![SourceIndex {
305 index: file_index,
306 num_row_groups: file.meta_ref().num_row_groups,
307 }],
308 row_group_indices: smallvec![RowGroupIndex {
309 index: file_index,
310 row_group_index: ALL_ROW_GROUPS,
311 }],
312 num_rows: file.meta_ref().num_rows as usize,
313 });
314 }
315 }
316}
317
318fn group_ranges_for_seq_scan(mut ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
321 if ranges.is_empty() {
322 return ranges;
323 }
324
325 ranges.sort_unstable_by(|a, b| {
327 let l = a.time_range;
328 let r = b.time_range;
329 l.0.cmp(&r.0).then_with(|| r.1.cmp(&l.1))
330 });
331 let mut range_in_progress = None;
332 let mut exclusive_ranges = Vec::with_capacity(ranges.len());
334 for range in ranges {
335 let Some(mut prev_range) = range_in_progress.take() else {
336 range_in_progress = Some(range);
338 continue;
339 };
340
341 if prev_range.overlaps(&range) {
342 prev_range.merge(range);
343 range_in_progress = Some(prev_range);
344 } else {
345 exclusive_ranges.push(prev_range);
346 range_in_progress = Some(range);
347 }
348 }
349 if let Some(range) = range_in_progress {
350 exclusive_ranges.push(range);
351 }
352
353 exclusive_ranges
354}
355
356fn maybe_split_ranges_for_seq_scan(ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
359 let mut new_ranges = Vec::with_capacity(ranges.len());
360 for range in ranges {
361 range.maybe_split(&mut new_ranges);
362 }
363
364 new_ranges
365}
366
367#[derive(Default)]
369pub(crate) struct FileRangeBuilder {
370 context: Option<FileRangeContextRef>,
373 selection: RowGroupSelection,
375}
376
377impl FileRangeBuilder {
378 pub(crate) fn new(context: FileRangeContextRef, selection: RowGroupSelection) -> Self {
380 Self {
381 context: Some(context),
382 selection,
383 }
384 }
385
386 pub(crate) fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[FileRange; 2]>) {
389 let Some(context) = self.context.clone() else {
390 return;
391 };
392 if row_group_index >= 0 {
393 let row_group_index = row_group_index as usize;
394 let Some(row_selection) = self.selection.get(row_group_index) else {
396 return;
397 };
398 ranges.push(FileRange::new(
399 context,
400 row_group_index,
401 Some(row_selection.clone()),
402 ));
403 } else {
404 ranges.extend(
406 self.selection
407 .iter()
408 .map(|(row_group_index, row_selection)| {
409 FileRange::new(
410 context.clone(),
411 *row_group_index,
412 Some(row_selection.clone()),
413 )
414 }),
415 );
416 }
417 }
418}
419
420pub(crate) struct MemRangeBuilder {
422 ranges: MemtableRanges,
424}
425
426impl MemRangeBuilder {
427 pub(crate) fn new(ranges: MemtableRanges) -> Self {
429 Self { ranges }
430 }
431
432 pub(crate) fn build_ranges(
435 &self,
436 row_group_index: i64,
437 ranges: &mut SmallVec<[MemtableRange; 2]>,
438 ) {
439 if row_group_index >= 0 {
440 let row_group_index = row_group_index as usize;
441 let Some(range) = self.ranges.ranges.get(&row_group_index) else {
443 return;
444 };
445 ranges.push(range.clone());
446 } else {
447 ranges.extend(self.ranges.ranges.values().cloned());
448 }
449 }
450
451 pub(crate) fn stats(&self) -> &MemtableStats {
453 &self.ranges.stats
454 }
455}
456
457pub(crate) struct RangeBuilderList {
461 num_memtables: usize,
462 file_builders: Mutex<Vec<Option<Arc<FileRangeBuilder>>>>,
463}
464
465impl RangeBuilderList {
466 pub(crate) fn new(num_memtables: usize, num_files: usize) -> Self {
468 let file_builders = (0..num_files).map(|_| None).collect();
469 Self {
470 num_memtables,
471 file_builders: Mutex::new(file_builders),
472 }
473 }
474
475 pub(crate) async fn build_file_ranges(
477 &self,
478 input: &ScanInput,
479 index: RowGroupIndex,
480 reader_metrics: &mut ReaderMetrics,
481 ) -> Result<SmallVec<[FileRange; 2]>> {
482 let mut ranges = SmallVec::new();
483 let file_index = index.index - self.num_memtables;
484 let builder_opt = self.get_file_builder(file_index);
485 match builder_opt {
486 Some(builder) => builder.build_ranges(index.row_group_index, &mut ranges),
487 None => {
488 let builder = input.prune_file(file_index, reader_metrics).await?;
489 builder.build_ranges(index.row_group_index, &mut ranges);
490 self.set_file_builder(file_index, Arc::new(builder));
491 }
492 }
493 Ok(ranges)
494 }
495
496 fn get_file_builder(&self, index: usize) -> Option<Arc<FileRangeBuilder>> {
497 let file_builders = self.file_builders.lock().unwrap();
498 file_builders[index].clone()
499 }
500
501 fn set_file_builder(&self, index: usize, builder: Arc<FileRangeBuilder>) {
502 let mut file_builders = self.file_builders.lock().unwrap();
503 file_builders[index] = Some(builder);
504 }
505}
506
#[cfg(test)]
mod tests {
    use common_time::timestamp::TimeUnit;
    use common_time::Timestamp;

    use super::*;

    /// Expected group: (source indices, start in seconds, end in seconds).
    type Output = (Vec<usize>, i64, i64);

    /// Builds a [`RangeMeta`] over `[start, end]` seconds from
    /// `(index, num_row_groups)` sources and `(index, row_group_index)` row groups.
    fn new_meta(
        start: i64,
        end: i64,
        sources: &[(usize, u64)],
        row_groups: &[(usize, i64)],
        num_rows: usize,
    ) -> RangeMeta {
        RangeMeta {
            time_range: (Timestamp::new_second(start), Timestamp::new_second(end)),
            indices: sources
                .iter()
                .map(|&(index, num_row_groups)| SourceIndex {
                    index,
                    num_row_groups,
                })
                .collect(),
            row_group_indices: row_groups
                .iter()
                .map(|&(index, row_group_index)| RowGroupIndex {
                    index,
                    row_group_index,
                })
                .collect(),
            num_rows,
        }
    }

    /// Groups the `(index, start, end)` inputs and compares the groups with `expect`.
    fn run_group_ranges_test(input: &[(usize, i64, i64)], expect: &[Output]) {
        let mut ranges = Vec::with_capacity(input.len());
        for &(idx, start, end) in input {
            ranges.push(RangeMeta {
                time_range: (
                    Timestamp::new(start, TimeUnit::Second),
                    Timestamp::new(end, TimeUnit::Second),
                ),
                indices: smallvec![SourceIndex {
                    index: idx,
                    num_row_groups: 0,
                }],
                row_group_indices: smallvec![RowGroupIndex {
                    index: idx,
                    row_group_index: 0,
                }],
                num_rows: 1,
            });
        }

        let mut actual: Vec<Output> = Vec::new();
        for group in group_ranges_for_seq_scan(ranges) {
            let indices: Vec<usize> = group.indices.iter().map(|src| src.index).collect();
            let group_indices: Vec<usize> = group
                .row_group_indices
                .iter()
                .map(|row_group| row_group.index)
                .collect();
            // Sources and row groups must be merged in the same order.
            assert_eq!(indices, group_indices);
            let (start, end) = group.time_range;
            actual.push((indices, start.value(), end.value()));
        }
        assert_eq!(expect, actual);
    }

    #[test]
    fn test_group_ranges() {
        // A single range stays as it is.
        run_group_ranges_test(&[(1, 0, 2000)], &[(vec![1], 0, 2000)]);

        // Overlapping ranges are merged, the disjoint one is kept apart.
        run_group_ranges_test(
            &[
                (1, 1000, 2000),
                (2, 6000, 7000),
                (3, 0, 1500),
                (4, 1500, 3000),
            ],
            &[(vec![3, 1, 4], 0, 3000), (vec![2], 6000, 7000)],
        );

        // Disjoint ranges are only sorted.
        run_group_ranges_test(
            &[(1, 3000, 4000), (2, 4001, 6000), (3, 0, 1000)],
            &[
                (vec![3], 0, 1000),
                (vec![1], 3000, 4000),
                (vec![2], 4001, 6000),
            ],
        );

        // Ranges that share a boundary timestamp are merged.
        run_group_ranges_test(
            &[(1, 3000, 4000), (2, 4000, 6000), (3, 0, 1000)],
            &[(vec![3], 0, 1000), (vec![1, 2], 3000, 6000)],
        );
    }

    #[test]
    fn test_merge_range() {
        let mut left = new_meta(1000, 2000, &[(1, 2)], &[(1, 1), (1, 2)], 5);
        let right = new_meta(800, 1200, &[(2, 2)], &[(2, 1), (2, 2)], 4);
        left.merge(right);

        let expected = new_meta(
            800,
            2000,
            &[(1, 2), (2, 2)],
            &[(1, 1), (1, 2), (2, 1), (2, 2)],
            9,
        );
        assert_eq!(left, expected);
    }

    #[test]
    fn test_split_range() {
        let range = new_meta(1000, 2000, &[(1, 2)], &[(1, ALL_ROW_GROUPS)], 5);

        assert!(range.can_split_preserve_order());
        let mut output = Vec::new();
        range.maybe_split(&mut output);

        assert_eq!(
            output,
            vec![
                new_meta(1000, 2000, &[(1, 2)], &[(1, 0)], 2),
                new_meta(1000, 2000, &[(1, 2)], &[(1, 1)], 2),
            ]
        );
    }

    #[test]
    fn test_not_split_range() {
        let range = new_meta(1000, 2000, &[(1, 1), (2, 1)], &[(1, 1), (2, 1)], 5);

        assert!(!range.can_split_preserve_order());
        let mut output = Vec::new();
        range.maybe_split(&mut output);
        assert_eq!(1, output.len());
    }

    #[test]
    fn test_maybe_split_ranges() {
        let ranges = vec![
            new_meta(0, 500, &[(0, 1)], &[(0, 0)], 4),
            new_meta(1000, 2000, &[(1, 2)], &[(1, ALL_ROW_GROUPS)], 4),
            new_meta(
                3000,
                4000,
                &[(2, 2), (3, 0)],
                &[(2, ALL_ROW_GROUPS), (3, ALL_ROW_GROUPS)],
                5,
            ),
        ];

        let output = maybe_split_ranges_for_seq_scan(ranges);

        let expected = vec![
            // Single row group: kept as is.
            new_meta(0, 500, &[(0, 1)], &[(0, 0)], 4),
            // Single source with two row groups: split in two.
            new_meta(1000, 2000, &[(1, 2)], &[(1, 0)], 2),
            new_meta(1000, 2000, &[(1, 2)], &[(1, 1)], 2),
            // Two sources: kept as is.
            new_meta(
                3000,
                4000,
                &[(2, 2), (3, 0)],
                &[(2, ALL_ROW_GROUPS), (3, ALL_ROW_GROUPS)],
                5,
            ),
        ];
        assert_eq!(output, expected);
    }
}