1use std::collections::BTreeMap;
18use std::sync::{Arc, Mutex};
19
20use common_time::Timestamp;
21use parquet::arrow::arrow_reader::RowSelection;
22use smallvec::{smallvec, SmallVec};
23use store_api::region_engine::PartitionRange;
24use store_api::storage::TimeSeriesDistribution;
25
26use crate::cache::CacheStrategy;
27use crate::error::Result;
28use crate::memtable::{MemtableRange, MemtableRanges, MemtableStats};
29use crate::read::scan_region::ScanInput;
30use crate::sst::file::{overlaps, FileHandle, FileTimeRange};
31use crate::sst::parquet::file_range::{FileRange, FileRangeContextRef};
32use crate::sst::parquet::format::parquet_row_group_time_range;
33use crate::sst::parquet::reader::ReaderMetrics;
34use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
35
/// Sentinel row group index that selects every row group of a source.
///
/// The range builders in this file treat any negative row group index as
/// "all row groups"; this constant is the value used to request that.
const ALL_ROW_GROUPS: i64 = -1;
37
/// Index of a source (a memtable or a file) that takes part in a scan.
///
/// All fields are plain integers, so the type also derives `Eq` and `Hash`
/// and can be used as a set/map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) struct SourceIndex {
    /// Position of the source in the scan input. Memtables are numbered
    /// first; files follow, offset by the number of memtables.
    pub(crate) index: usize,
    /// Number of row groups (or memtable ranges) in the source.
    /// NOTE(review): 0 appears to mean "unknown" for files — confirm.
    pub(crate) num_row_groups: u64,
}
47
/// Index of a row group inside a source (a memtable or a file).
///
/// All fields are plain integers, so the type also derives `Eq` and `Hash`
/// and can be used as a set/map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) struct RowGroupIndex {
    /// Position of the source in the scan input. Memtables are numbered
    /// first; files follow, offset by the number of memtables.
    pub(crate) index: usize,
    /// Row group to read inside the source. A negative value (`-1` is the
    /// sentinel used in this file) selects every row group of the source.
    pub(crate) row_group_index: i64,
}
57
58#[derive(Debug, PartialEq)]
62pub(crate) struct RangeMeta {
63 pub(crate) time_range: FileTimeRange,
65 pub(crate) indices: SmallVec<[SourceIndex; 2]>,
67 pub(crate) row_group_indices: SmallVec<[RowGroupIndex; 2]>,
69 pub(crate) num_rows: usize,
71}
72
73impl RangeMeta {
74 pub(crate) fn new_partition_range(&self, identifier: usize) -> PartitionRange {
77 PartitionRange {
78 start: self.time_range.0,
79 end: Timestamp::new(
80 self.time_range
83 .1
84 .value()
85 .checked_add(1)
86 .unwrap_or(self.time_range.1.value()),
87 self.time_range.1.unit(),
88 ),
89 num_rows: self.num_rows,
90 identifier,
91 }
92 }
93
94 pub(crate) fn seq_scan_ranges(input: &ScanInput, compaction: bool) -> Vec<RangeMeta> {
97 let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
98 Self::push_seq_mem_ranges(&input.memtables, &mut ranges);
99 Self::push_seq_file_ranges(input.memtables.len(), &input.files, &mut ranges);
100
101 let ranges = group_ranges_for_seq_scan(ranges);
102 if compaction || input.distribution == Some(TimeSeriesDistribution::PerSeries) {
103 return ranges;
105 }
106 maybe_split_ranges_for_seq_scan(ranges)
107 }
108
109 pub(crate) fn unordered_scan_ranges(input: &ScanInput) -> Vec<RangeMeta> {
111 let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
112 Self::push_unordered_mem_ranges(&input.memtables, &mut ranges);
113 Self::push_unordered_file_ranges(
114 input.memtables.len(),
115 &input.files,
116 &input.cache_strategy,
117 &mut ranges,
118 );
119
120 ranges
121 }
122
123 fn overlaps(&self, meta: &RangeMeta) -> bool {
125 overlaps(&self.time_range, &meta.time_range)
126 }
127
128 fn merge(&mut self, mut other: RangeMeta) {
131 debug_assert!(self.overlaps(&other));
132 debug_assert!(self.indices.iter().all(|idx| !other.indices.contains(idx)));
133 debug_assert!(self
134 .row_group_indices
135 .iter()
136 .all(|idx| !other.row_group_indices.contains(idx)));
137
138 self.time_range = (
139 self.time_range.0.min(other.time_range.0),
140 self.time_range.1.max(other.time_range.1),
141 );
142 self.indices.append(&mut other.indices);
143 self.row_group_indices.append(&mut other.row_group_indices);
144 self.num_rows += other.num_rows;
145 }
146
147 fn can_split_preserve_order(&self) -> bool {
150 self.indices.len() == 1 && self.indices[0].num_row_groups > 1
151 }
152
153 fn maybe_split(self, output: &mut Vec<RangeMeta>) {
155 if self.can_split_preserve_order() {
156 let num_row_groups = self.indices[0].num_row_groups;
157 debug_assert_eq!(1, self.row_group_indices.len());
158 debug_assert_eq!(ALL_ROW_GROUPS, self.row_group_indices[0].row_group_index);
159
160 output.reserve(self.row_group_indices.len());
161 let num_rows = self.num_rows / num_row_groups as usize;
162 for row_group_index in 0..num_row_groups {
164 output.push(RangeMeta {
165 time_range: self.time_range,
166 indices: self.indices.clone(),
167 row_group_indices: smallvec![RowGroupIndex {
168 index: self.indices[0].index,
169 row_group_index: row_group_index as i64,
170 }],
171 num_rows,
172 });
173 }
174 } else {
175 output.push(self);
176 }
177 }
178
179 fn push_unordered_mem_ranges(memtables: &[MemRangeBuilder], ranges: &mut Vec<RangeMeta>) {
180 for (memtable_index, memtable) in memtables.iter().enumerate() {
182 let stats = memtable.stats();
183 let Some(time_range) = stats.time_range() else {
184 continue;
185 };
186 for row_group_index in 0..stats.num_ranges() {
187 let num_rows = stats.num_rows() / stats.num_ranges();
188 ranges.push(RangeMeta {
189 time_range,
190 indices: smallvec![SourceIndex {
191 index: memtable_index,
192 num_row_groups: stats.num_ranges() as u64,
193 }],
194 row_group_indices: smallvec![RowGroupIndex {
195 index: memtable_index,
196 row_group_index: row_group_index as i64,
197 }],
198 num_rows,
199 });
200 }
201 }
202 }
203
204 fn push_unordered_file_ranges(
205 num_memtables: usize,
206 files: &[FileHandle],
207 cache: &CacheStrategy,
208 ranges: &mut Vec<RangeMeta>,
209 ) {
210 for (i, file) in files.iter().enumerate() {
212 let file_index = num_memtables + i;
213 let parquet_meta =
215 cache.get_parquet_meta_data_from_mem_cache(file.region_id(), file.file_id());
216 if let Some(parquet_meta) = parquet_meta {
217 for row_group_index in 0..file.meta_ref().num_row_groups {
219 let time_range = parquet_row_group_time_range(
220 file.meta_ref(),
221 &parquet_meta,
222 row_group_index as usize,
223 );
224 let num_rows = parquet_meta.row_group(row_group_index as usize).num_rows();
225 ranges.push(RangeMeta {
226 time_range: time_range.unwrap_or_else(|| file.time_range()),
227 indices: smallvec![SourceIndex {
228 index: file_index,
229 num_row_groups: file.meta_ref().num_row_groups,
230 }],
231 row_group_indices: smallvec![RowGroupIndex {
232 index: file_index,
233 row_group_index: row_group_index as i64,
234 }],
235 num_rows: num_rows as usize,
236 });
237 }
238 } else if file.meta_ref().num_row_groups > 0 {
239 for row_group_index in 0..file.meta_ref().num_row_groups {
241 ranges.push(RangeMeta {
242 time_range: file.time_range(),
243 indices: smallvec![SourceIndex {
244 index: file_index,
245 num_row_groups: file.meta_ref().num_row_groups,
246 }],
247 row_group_indices: smallvec![RowGroupIndex {
248 index: file_index,
249 row_group_index: row_group_index as i64,
250 }],
251 num_rows: DEFAULT_ROW_GROUP_SIZE,
252 });
253 }
254 } else {
255 ranges.push(RangeMeta {
257 time_range: file.time_range(),
258 indices: smallvec![SourceIndex {
259 index: file_index,
260 num_row_groups: 0,
261 }],
262 row_group_indices: smallvec![RowGroupIndex {
263 index: file_index,
264 row_group_index: ALL_ROW_GROUPS,
265 }],
266 num_rows: file.meta_ref().num_rows as usize,
268 });
269 }
270 }
271 }
272
273 fn push_seq_mem_ranges(memtables: &[MemRangeBuilder], ranges: &mut Vec<RangeMeta>) {
274 for (i, memtable) in memtables.iter().enumerate() {
276 let stats = memtable.stats();
277 let Some(time_range) = stats.time_range() else {
278 continue;
279 };
280 ranges.push(RangeMeta {
281 time_range,
282 indices: smallvec![SourceIndex {
283 index: i,
284 num_row_groups: stats.num_ranges() as u64,
285 }],
286 row_group_indices: smallvec![RowGroupIndex {
287 index: i,
288 row_group_index: ALL_ROW_GROUPS,
289 }],
290 num_rows: stats.num_rows(),
291 });
292 }
293 }
294
295 fn push_seq_file_ranges(
296 num_memtables: usize,
297 files: &[FileHandle],
298 ranges: &mut Vec<RangeMeta>,
299 ) {
300 for (i, file) in files.iter().enumerate() {
302 let file_index = num_memtables + i;
303 ranges.push(RangeMeta {
304 time_range: file.time_range(),
305 indices: smallvec![SourceIndex {
306 index: file_index,
307 num_row_groups: file.meta_ref().num_row_groups,
308 }],
309 row_group_indices: smallvec![RowGroupIndex {
310 index: file_index,
311 row_group_index: ALL_ROW_GROUPS,
312 }],
313 num_rows: file.meta_ref().num_rows as usize,
314 });
315 }
316 }
317}
318
319fn group_ranges_for_seq_scan(mut ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
322 if ranges.is_empty() {
323 return ranges;
324 }
325
326 ranges.sort_unstable_by(|a, b| {
328 let l = a.time_range;
329 let r = b.time_range;
330 l.0.cmp(&r.0).then_with(|| r.1.cmp(&l.1))
331 });
332 let mut range_in_progress = None;
333 let mut exclusive_ranges = Vec::with_capacity(ranges.len());
335 for range in ranges {
336 let Some(mut prev_range) = range_in_progress.take() else {
337 range_in_progress = Some(range);
339 continue;
340 };
341
342 if prev_range.overlaps(&range) {
343 prev_range.merge(range);
344 range_in_progress = Some(prev_range);
345 } else {
346 exclusive_ranges.push(prev_range);
347 range_in_progress = Some(range);
348 }
349 }
350 if let Some(range) = range_in_progress {
351 exclusive_ranges.push(range);
352 }
353
354 exclusive_ranges
355}
356
357fn maybe_split_ranges_for_seq_scan(ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
360 let mut new_ranges = Vec::with_capacity(ranges.len());
361 for range in ranges {
362 range.maybe_split(&mut new_ranges);
363 }
364
365 new_ranges
366}
367
368#[derive(Default)]
370pub(crate) struct FileRangeBuilder {
371 context: Option<FileRangeContextRef>,
374 row_groups: BTreeMap<usize, Option<RowSelection>>,
377}
378
379impl FileRangeBuilder {
380 pub(crate) fn new(
382 context: FileRangeContextRef,
383 row_groups: BTreeMap<usize, Option<RowSelection>>,
384 ) -> Self {
385 Self {
386 context: Some(context),
387 row_groups,
388 }
389 }
390
391 pub(crate) fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[FileRange; 2]>) {
394 let Some(context) = self.context.clone() else {
395 return;
396 };
397 if row_group_index >= 0 {
398 let row_group_index = row_group_index as usize;
399 let Some(row_selection) = self.row_groups.get(&row_group_index) else {
401 return;
402 };
403 ranges.push(FileRange::new(
404 context,
405 row_group_index,
406 row_selection.clone(),
407 ));
408 } else {
409 ranges.extend(
411 self.row_groups
412 .iter()
413 .map(|(row_group_index, row_selection)| {
414 FileRange::new(context.clone(), *row_group_index, row_selection.clone())
415 }),
416 );
417 }
418 }
419}
420
421pub(crate) struct MemRangeBuilder {
423 ranges: MemtableRanges,
425}
426
427impl MemRangeBuilder {
428 pub(crate) fn new(ranges: MemtableRanges) -> Self {
430 Self { ranges }
431 }
432
433 pub(crate) fn build_ranges(
436 &self,
437 row_group_index: i64,
438 ranges: &mut SmallVec<[MemtableRange; 2]>,
439 ) {
440 if row_group_index >= 0 {
441 let row_group_index = row_group_index as usize;
442 let Some(range) = self.ranges.ranges.get(&row_group_index) else {
444 return;
445 };
446 ranges.push(range.clone());
447 } else {
448 ranges.extend(self.ranges.ranges.values().cloned());
449 }
450 }
451
452 pub(crate) fn stats(&self) -> &MemtableStats {
454 &self.ranges.stats
455 }
456}
457
458pub(crate) struct RangeBuilderList {
462 num_memtables: usize,
463 file_builders: Mutex<Vec<Option<Arc<FileRangeBuilder>>>>,
464}
465
466impl RangeBuilderList {
467 pub(crate) fn new(num_memtables: usize, num_files: usize) -> Self {
469 let file_builders = (0..num_files).map(|_| None).collect();
470 Self {
471 num_memtables,
472 file_builders: Mutex::new(file_builders),
473 }
474 }
475
476 pub(crate) async fn build_file_ranges(
478 &self,
479 input: &ScanInput,
480 index: RowGroupIndex,
481 reader_metrics: &mut ReaderMetrics,
482 ) -> Result<SmallVec<[FileRange; 2]>> {
483 let mut ranges = SmallVec::new();
484 let file_index = index.index - self.num_memtables;
485 let builder_opt = self.get_file_builder(file_index);
486 match builder_opt {
487 Some(builder) => builder.build_ranges(index.row_group_index, &mut ranges),
488 None => {
489 let builder = input.prune_file(file_index, reader_metrics).await?;
490 builder.build_ranges(index.row_group_index, &mut ranges);
491 self.set_file_builder(file_index, Arc::new(builder));
492 }
493 }
494 Ok(ranges)
495 }
496
497 fn get_file_builder(&self, index: usize) -> Option<Arc<FileRangeBuilder>> {
498 let file_builders = self.file_builders.lock().unwrap();
499 file_builders[index].clone()
500 }
501
502 fn set_file_builder(&self, index: usize, builder: Arc<FileRangeBuilder>) {
503 let mut file_builders = self.file_builders.lock().unwrap();
504 file_builders[index] = Some(builder);
505 }
506}
507
#[cfg(test)]
mod tests {
    use common_time::timestamp::TimeUnit;
    use common_time::Timestamp;

    use super::*;

    /// Expected group: (source indices, start, end).
    type Output = (Vec<usize>, i64, i64);

    /// Builds a [`RangeMeta`] whose time range is `[start, end]` in seconds.
    ///
    /// `sources` lists `(index, num_row_groups)` pairs and `row_groups` lists
    /// `(index, row_group_index)` pairs.
    fn range_meta(
        (start, end): (i64, i64),
        sources: &[(usize, u64)],
        row_groups: &[(usize, i64)],
        num_rows: usize,
    ) -> RangeMeta {
        RangeMeta {
            time_range: (Timestamp::new_second(start), Timestamp::new_second(end)),
            indices: sources
                .iter()
                .map(|&(index, num_row_groups)| SourceIndex {
                    index,
                    num_row_groups,
                })
                .collect(),
            row_group_indices: row_groups
                .iter()
                .map(|&(index, row_group_index)| RowGroupIndex {
                    index,
                    row_group_index,
                })
                .collect(),
            num_rows,
        }
    }

    /// Groups the `(index, start, end)` inputs and compares the result with `expect`.
    fn run_group_ranges_test(input: &[(usize, i64, i64)], expect: &[Output]) {
        let ranges: Vec<RangeMeta> = input
            .iter()
            .map(|&(idx, start, end)| RangeMeta {
                time_range: (
                    Timestamp::new(start, TimeUnit::Second),
                    Timestamp::new(end, TimeUnit::Second),
                ),
                indices: smallvec![SourceIndex {
                    index: idx,
                    num_row_groups: 0,
                }],
                row_group_indices: smallvec![RowGroupIndex {
                    index: idx,
                    row_group_index: 0,
                }],
                num_rows: 1,
            })
            .collect();

        let grouped = group_ranges_for_seq_scan(ranges);

        let mut actual: Vec<Output> = Vec::with_capacity(grouped.len());
        for meta in &grouped {
            let sources: Vec<usize> = meta.indices.iter().map(|source| source.index).collect();
            let row_group_sources: Vec<usize> = meta
                .row_group_indices
                .iter()
                .map(|row_group| row_group.index)
                .collect();
            // Sources and row groups must be merged in the same order.
            assert_eq!(sources, row_group_sources);
            let (start, end) = meta.time_range;
            actual.push((sources, start.value(), end.value()));
        }
        assert_eq!(expect, actual);
    }

    #[test]
    fn test_group_ranges() {
        // A single range stays as is.
        run_group_ranges_test(&[(1, 0, 2000)], &[(vec![1], 0, 2000)]);

        // Overlapping ranges are merged, the isolated one is kept apart.
        run_group_ranges_test(
            &[
                (1, 1000, 2000),
                (2, 6000, 7000),
                (3, 0, 1500),
                (4, 1500, 3000),
            ],
            &[(vec![3, 1, 4], 0, 3000), (vec![2], 6000, 7000)],
        );

        // Ranges that do not touch are only sorted.
        run_group_ranges_test(
            &[(1, 3000, 4000), (2, 4001, 6000), (3, 0, 1000)],
            &[
                (vec![3], 0, 1000),
                (vec![1], 3000, 4000),
                (vec![2], 4001, 6000),
            ],
        );

        // Ranges sharing a boundary timestamp are merged.
        run_group_ranges_test(
            &[(1, 3000, 4000), (2, 4000, 6000), (3, 0, 1000)],
            &[(vec![3], 0, 1000), (vec![1, 2], 3000, 6000)],
        );
    }

    #[test]
    fn test_merge_range() {
        let mut left = range_meta((1000, 2000), &[(1, 2)], &[(1, 1), (1, 2)], 5);
        let right = range_meta((800, 1200), &[(2, 2)], &[(2, 1), (2, 2)], 4);
        left.merge(right);

        let expected = range_meta(
            (800, 2000),
            &[(1, 2), (2, 2)],
            &[(1, 1), (1, 2), (2, 1), (2, 2)],
            9,
        );
        assert_eq!(left, expected);
    }

    #[test]
    fn test_split_range() {
        let range = range_meta((1000, 2000), &[(1, 2)], &[(1, ALL_ROW_GROUPS)], 5);

        assert!(range.can_split_preserve_order());
        let mut output = Vec::new();
        range.maybe_split(&mut output);

        assert_eq!(
            output,
            vec![
                range_meta((1000, 2000), &[(1, 2)], &[(1, 0)], 2),
                range_meta((1000, 2000), &[(1, 2)], &[(1, 1)], 2),
            ]
        );
    }

    #[test]
    fn test_not_split_range() {
        let range = range_meta((1000, 2000), &[(1, 1), (2, 1)], &[(1, 1), (2, 1)], 5);

        assert!(!range.can_split_preserve_order());
        let mut output = Vec::new();
        range.maybe_split(&mut output);
        assert_eq!(1, output.len());
    }

    #[test]
    fn test_maybe_split_ranges() {
        let ranges = vec![
            // Single source with one row group: kept.
            range_meta((0, 500), &[(0, 1)], &[(0, 0)], 4),
            // Single source with two row groups: split.
            range_meta((1000, 2000), &[(1, 2)], &[(1, ALL_ROW_GROUPS)], 4),
            // Two sources: kept.
            range_meta(
                (3000, 4000),
                &[(2, 2), (3, 0)],
                &[(2, ALL_ROW_GROUPS), (3, ALL_ROW_GROUPS)],
                5,
            ),
        ];

        let output = maybe_split_ranges_for_seq_scan(ranges);

        assert_eq!(
            output,
            vec![
                range_meta((0, 500), &[(0, 1)], &[(0, 0)], 4),
                range_meta((1000, 2000), &[(1, 2)], &[(1, 0)], 2),
                range_meta((1000, 2000), &[(1, 2)], &[(1, 1)], 2),
                range_meta(
                    (3000, 4000),
                    &[(2, 2), (3, 0)],
                    &[(2, ALL_ROW_GROUPS), (3, ALL_ROW_GROUPS)],
                    5,
                ),
            ]
        )
    }
}