1use std::sync::{Arc, Mutex};
18
19use common_time::Timestamp;
20use smallvec::{SmallVec, smallvec};
21use store_api::region_engine::PartitionRange;
22use store_api::storage::TimeSeriesDistribution;
23
24use crate::cache::CacheStrategy;
25use crate::error::Result;
26use crate::memtable::{MemtableRange, MemtableStats};
27use crate::read::scan_region::ScanInput;
28use crate::sst::file::{FileHandle, FileTimeRange, overlaps};
29use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
30use crate::sst::parquet::file_range::{FileRange, FileRangeContextRef};
31use crate::sst::parquet::format::parquet_row_group_time_range;
32use crate::sst::parquet::reader::ReaderMetrics;
33use crate::sst::parquet::row_selection::RowGroupSelection;
34
35const ALL_ROW_GROUPS: i64 = -1;
36
37#[derive(Debug, Clone, Copy, PartialEq)]
39pub(crate) struct SourceIndex {
40 pub(crate) index: usize,
42 pub(crate) num_row_groups: u64,
45}
46
47#[derive(Debug, Clone, Copy, PartialEq)]
49pub struct RowGroupIndex {
50 pub(crate) index: usize,
52 pub row_group_index: i64,
55}
56
57#[derive(Debug, PartialEq)]
61pub(crate) struct RangeMeta {
62 pub(crate) time_range: FileTimeRange,
64 pub(crate) indices: SmallVec<[SourceIndex; 2]>,
66 pub(crate) row_group_indices: SmallVec<[RowGroupIndex; 2]>,
68 pub(crate) num_rows: usize,
70}
71
72impl RangeMeta {
73 pub(crate) fn new_partition_range(&self, identifier: usize) -> PartitionRange {
76 PartitionRange {
77 start: self.time_range.0,
78 end: Timestamp::new(
79 self.time_range
82 .1
83 .value()
84 .checked_add(1)
85 .unwrap_or(self.time_range.1.value()),
86 self.time_range.1.unit(),
87 ),
88 num_rows: self.num_rows,
89 identifier,
90 }
91 }
92
93 pub(crate) fn seq_scan_ranges(input: &ScanInput) -> Vec<RangeMeta> {
96 let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
97 Self::push_seq_mem_ranges(&input.memtables, &mut ranges);
98 Self::push_seq_file_ranges(input.memtables.len(), &input.files, &mut ranges);
99
100 #[cfg(feature = "enterprise")]
101 Self::push_extension_ranges(input, &mut ranges);
102
103 let ranges = group_ranges_for_seq_scan(ranges);
104 if input.compaction || input.distribution == Some(TimeSeriesDistribution::PerSeries) {
105 return ranges;
107 }
108 maybe_split_ranges_for_seq_scan(ranges)
109 }
110
111 pub(crate) fn unordered_scan_ranges(input: &ScanInput) -> Vec<RangeMeta> {
113 let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
114 Self::push_unordered_mem_ranges(&input.memtables, &mut ranges);
115 Self::push_unordered_file_ranges(
116 input.memtables.len(),
117 &input.files,
118 &input.cache_strategy,
119 &mut ranges,
120 );
121
122 #[cfg(feature = "enterprise")]
123 Self::push_extension_ranges(input, &mut ranges);
124
125 ranges
126 }
127
128 fn overlaps(&self, meta: &RangeMeta) -> bool {
130 overlaps(&self.time_range, &meta.time_range)
131 }
132
133 fn merge(&mut self, mut other: RangeMeta) {
136 debug_assert!(self.overlaps(&other));
137 debug_assert!(self.indices.iter().all(|idx| !other.indices.contains(idx)));
138 debug_assert!(
139 self.row_group_indices
140 .iter()
141 .all(|idx| !other.row_group_indices.contains(idx))
142 );
143
144 self.time_range = (
145 self.time_range.0.min(other.time_range.0),
146 self.time_range.1.max(other.time_range.1),
147 );
148 self.indices.append(&mut other.indices);
149 self.row_group_indices.append(&mut other.row_group_indices);
150 self.num_rows += other.num_rows;
151 }
152
153 fn can_split_preserve_order(&self) -> bool {
156 self.indices.len() == 1 && self.indices[0].num_row_groups > 1
157 }
158
159 fn maybe_split(self, output: &mut Vec<RangeMeta>) {
161 if self.can_split_preserve_order() {
162 let num_row_groups = self.indices[0].num_row_groups;
163 debug_assert_eq!(1, self.row_group_indices.len());
164 debug_assert_eq!(ALL_ROW_GROUPS, self.row_group_indices[0].row_group_index);
165
166 output.reserve(self.row_group_indices.len());
167 let num_rows = self.num_rows / num_row_groups as usize;
168 for row_group_index in 0..num_row_groups {
170 output.push(RangeMeta {
171 time_range: self.time_range,
172 indices: self.indices.clone(),
173 row_group_indices: smallvec![RowGroupIndex {
174 index: self.indices[0].index,
175 row_group_index: row_group_index as i64,
176 }],
177 num_rows,
178 });
179 }
180 } else {
181 output.push(self);
182 }
183 }
184
185 fn push_unordered_mem_ranges(memtables: &[MemRangeBuilder], ranges: &mut Vec<RangeMeta>) {
186 for (memtable_index, memtable) in memtables.iter().enumerate() {
188 let stats = memtable.stats();
189 let Some(time_range) = stats.time_range() else {
190 continue;
191 };
192 for row_group_index in 0..stats.num_ranges() {
193 let num_rows = stats.num_rows() / stats.num_ranges();
194 ranges.push(RangeMeta {
195 time_range,
196 indices: smallvec![SourceIndex {
197 index: memtable_index,
198 num_row_groups: stats.num_ranges() as u64,
199 }],
200 row_group_indices: smallvec![RowGroupIndex {
201 index: memtable_index,
202 row_group_index: row_group_index as i64,
203 }],
204 num_rows,
205 });
206 }
207 }
208 }
209
210 fn push_unordered_file_ranges(
211 num_memtables: usize,
212 files: &[FileHandle],
213 cache: &CacheStrategy,
214 ranges: &mut Vec<RangeMeta>,
215 ) {
216 for (i, file) in files.iter().enumerate() {
218 let file_index = num_memtables + i;
219 let parquet_meta = cache.get_parquet_meta_data_from_mem_cache(file.file_id());
221 if let Some(parquet_meta) = parquet_meta {
222 for row_group_index in 0..file.meta_ref().num_row_groups {
224 let time_range = parquet_row_group_time_range(
225 file.meta_ref(),
226 &parquet_meta,
227 row_group_index as usize,
228 );
229 let num_rows = parquet_meta.row_group(row_group_index as usize).num_rows();
230 ranges.push(RangeMeta {
231 time_range: time_range.unwrap_or_else(|| file.time_range()),
232 indices: smallvec![SourceIndex {
233 index: file_index,
234 num_row_groups: file.meta_ref().num_row_groups,
235 }],
236 row_group_indices: smallvec![RowGroupIndex {
237 index: file_index,
238 row_group_index: row_group_index as i64,
239 }],
240 num_rows: num_rows as usize,
241 });
242 }
243 } else if file.meta_ref().num_row_groups > 0 {
244 for row_group_index in 0..file.meta_ref().num_row_groups {
246 ranges.push(RangeMeta {
247 time_range: file.time_range(),
248 indices: smallvec![SourceIndex {
249 index: file_index,
250 num_row_groups: file.meta_ref().num_row_groups,
251 }],
252 row_group_indices: smallvec![RowGroupIndex {
253 index: file_index,
254 row_group_index: row_group_index as i64,
255 }],
256 num_rows: DEFAULT_ROW_GROUP_SIZE,
257 });
258 }
259 } else {
260 ranges.push(RangeMeta {
262 time_range: file.time_range(),
263 indices: smallvec![SourceIndex {
264 index: file_index,
265 num_row_groups: 0,
266 }],
267 row_group_indices: smallvec![RowGroupIndex {
268 index: file_index,
269 row_group_index: ALL_ROW_GROUPS,
270 }],
271 num_rows: file.meta_ref().num_rows as usize,
273 });
274 }
275 }
276 }
277
278 fn push_seq_mem_ranges(memtables: &[MemRangeBuilder], ranges: &mut Vec<RangeMeta>) {
279 for (i, memtable) in memtables.iter().enumerate() {
281 let stats = memtable.stats();
282 let Some(time_range) = stats.time_range() else {
283 continue;
284 };
285 ranges.push(RangeMeta {
286 time_range,
287 indices: smallvec![SourceIndex {
288 index: i,
289 num_row_groups: stats.num_ranges() as u64,
290 }],
291 row_group_indices: smallvec![RowGroupIndex {
292 index: i,
293 row_group_index: ALL_ROW_GROUPS,
294 }],
295 num_rows: stats.num_rows(),
296 });
297 }
298 }
299
300 fn push_seq_file_ranges(
301 num_memtables: usize,
302 files: &[FileHandle],
303 ranges: &mut Vec<RangeMeta>,
304 ) {
305 for (i, file) in files.iter().enumerate() {
307 let file_index = num_memtables + i;
308 ranges.push(RangeMeta {
309 time_range: file.time_range(),
310 indices: smallvec![SourceIndex {
311 index: file_index,
312 num_row_groups: file.meta_ref().num_row_groups,
313 }],
314 row_group_indices: smallvec![RowGroupIndex {
315 index: file_index,
316 row_group_index: ALL_ROW_GROUPS,
317 }],
318 num_rows: file.meta_ref().num_rows as usize,
319 });
320 }
321 }
322
323 #[cfg(feature = "enterprise")]
324 fn push_extension_ranges(input: &ScanInput, metas: &mut Vec<RangeMeta>) {
325 for (i, range) in input.extension_ranges().iter().enumerate() {
326 let index = input.num_memtables() + input.num_files() + i;
327 metas.push(RangeMeta {
328 time_range: range.time_range(),
329 indices: smallvec![SourceIndex {
330 index,
331 num_row_groups: range.num_row_groups(),
332 }],
333 row_group_indices: smallvec![RowGroupIndex {
334 index,
335 row_group_index: ALL_ROW_GROUPS,
336 }],
337 num_rows: range.num_rows() as usize,
338 });
339 }
340 }
341}
342
343fn group_ranges_for_seq_scan(mut ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
346 if ranges.is_empty() {
347 return ranges;
348 }
349
350 ranges.sort_unstable_by(|a, b| {
352 let l = a.time_range;
353 let r = b.time_range;
354 l.0.cmp(&r.0).then_with(|| r.1.cmp(&l.1))
355 });
356 let mut range_in_progress = None;
357 let mut exclusive_ranges = Vec::with_capacity(ranges.len());
359 for range in ranges {
360 let Some(mut prev_range) = range_in_progress.take() else {
361 range_in_progress = Some(range);
363 continue;
364 };
365
366 if prev_range.overlaps(&range) {
367 prev_range.merge(range);
368 range_in_progress = Some(prev_range);
369 } else {
370 exclusive_ranges.push(prev_range);
371 range_in_progress = Some(range);
372 }
373 }
374 if let Some(range) = range_in_progress {
375 exclusive_ranges.push(range);
376 }
377
378 exclusive_ranges
379}
380
381fn maybe_split_ranges_for_seq_scan(ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
384 let mut new_ranges = Vec::with_capacity(ranges.len());
385 for range in ranges {
386 range.maybe_split(&mut new_ranges);
387 }
388
389 new_ranges
390}
391
392#[derive(Default)]
394pub struct FileRangeBuilder {
395 context: Option<FileRangeContextRef>,
398 selection: RowGroupSelection,
400}
401
402impl FileRangeBuilder {
403 pub(crate) fn new(context: FileRangeContextRef, selection: RowGroupSelection) -> Self {
405 Self {
406 context: Some(context),
407 selection,
408 }
409 }
410
411 pub fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[FileRange; 2]>) {
414 let Some(context) = self.context.clone() else {
415 return;
416 };
417 if row_group_index >= 0 {
418 let row_group_index = row_group_index as usize;
419 let Some(row_selection) = self.selection.get(row_group_index) else {
421 return;
422 };
423 ranges.push(FileRange::new(
424 context,
425 row_group_index,
426 Some(row_selection.clone()),
427 ));
428 } else {
429 ranges.extend(
431 self.selection
432 .iter()
433 .map(|(row_group_index, row_selection)| {
434 FileRange::new(
435 context.clone(),
436 *row_group_index,
437 Some(row_selection.clone()),
438 )
439 }),
440 );
441 }
442 }
443
444 pub(crate) fn memory_size(&self) -> usize {
446 let context_size = self
447 .context
448 .as_ref()
449 .map(|ctx| ctx.memory_size())
450 .unwrap_or(0);
451 let selection_size = self.selection.mem_usage();
452 context_size + selection_size
453 }
454}
455
456pub(crate) struct MemRangeBuilder {
458 range: MemtableRange,
460 stats: MemtableStats,
462}
463
464impl MemRangeBuilder {
465 pub(crate) fn new(range: MemtableRange, stats: MemtableStats) -> Self {
467 Self { range, stats }
468 }
469
470 pub(crate) fn build_ranges(
473 &self,
474 _row_group_index: i64,
475 ranges: &mut SmallVec<[MemtableRange; 2]>,
476 ) {
477 ranges.push(self.range.clone())
478 }
479
480 pub(crate) fn stats(&self) -> &MemtableStats {
482 &self.stats
483 }
484}
485
486pub(crate) fn file_range_counts<'a>(
494 num_memtables: usize,
495 num_files: usize,
496 ranges: &[RangeMeta],
497 part_ranges: impl Iterator<Item = &'a PartitionRange>,
498) -> Vec<usize> {
499 let mut counts = vec![0usize; num_files];
500 for part_range in part_ranges {
501 let range_meta = &ranges[part_range.identifier];
502 for row_group_index in &range_meta.row_group_indices {
503 if row_group_index.index >= num_memtables {
504 let file_index = row_group_index.index - num_memtables;
505 if file_index < num_files {
506 counts[file_index] += 1;
507 }
508 }
509 }
510 }
511 counts
512}
513
514struct FileBuilderEntry {
516 builder: Option<Arc<FileRangeBuilder>>,
518 remaining_ranges: usize,
520}
521
522pub(crate) struct RangeBuilderList {
526 num_memtables: usize,
527 file_entries: Mutex<Vec<FileBuilderEntry>>,
528}
529
530impl RangeBuilderList {
531 pub(crate) fn new(num_memtables: usize, file_range_counts: Vec<usize>) -> Self {
537 let file_entries = file_range_counts
538 .into_iter()
539 .map(|count| FileBuilderEntry {
540 builder: None,
541 remaining_ranges: count,
542 })
543 .collect();
544
545 Self {
546 num_memtables,
547 file_entries: Mutex::new(file_entries),
548 }
549 }
550
551 pub(crate) async fn build_file_ranges(
553 &self,
554 input: &ScanInput,
555 index: RowGroupIndex,
556 reader_metrics: &mut ReaderMetrics,
557 ) -> Result<SmallVec<[FileRange; 2]>> {
558 let mut ranges = SmallVec::new();
559 let file_index = index.index - self.num_memtables;
560 let builder_opt = self.get_file_builder(file_index);
561 match builder_opt {
562 Some(builder) => builder.build_ranges(index.row_group_index, &mut ranges),
563 None => {
564 let file = &input.files[file_index];
565 let builder = input.prune_file(file, reader_metrics).await?;
566 builder.build_ranges(index.row_group_index, &mut ranges);
567
568 reader_metrics.metadata_mem_size += builder.memory_size() as isize;
570 reader_metrics.num_range_builders += 1;
571
572 self.set_file_builder(file_index, Arc::new(builder));
573 }
574 }
575
576 self.decrement_and_maybe_clear(file_index, reader_metrics);
578
579 Ok(ranges)
580 }
581
582 fn get_file_builder(&self, file_index: usize) -> Option<Arc<FileRangeBuilder>> {
583 let entries = self.file_entries.lock().unwrap();
584 entries
585 .get(file_index)
586 .and_then(|entry| entry.builder.clone())
587 }
588
589 fn set_file_builder(&self, file_index: usize, builder: Arc<FileRangeBuilder>) {
590 let mut entries = self.file_entries.lock().unwrap();
591 if let Some(entry) = entries.get_mut(file_index) {
592 entry.builder = Some(builder);
593 }
594 }
595
596 fn decrement_and_maybe_clear(&self, file_index: usize, reader_metrics: &mut ReaderMetrics) {
598 let mut entries = self.file_entries.lock().unwrap();
599 if let Some(entry) = entries.get_mut(file_index)
600 && entry.remaining_ranges > 0
601 {
602 entry.remaining_ranges -= 1;
603 if entry.remaining_ranges == 0
604 && let Some(builder) = entry.builder.take()
605 {
606 reader_metrics.metadata_mem_size -= builder.memory_size() as isize;
607 reader_metrics.num_range_builders -= 1;
608 }
609 }
610 }
611}
612
613#[cfg(test)]
614mod tests {
615 use common_time::Timestamp;
616 use common_time::timestamp::TimeUnit;
617
618 use super::*;
619
620 type Output = (Vec<usize>, i64, i64);
621
622 fn run_group_ranges_test(input: &[(usize, i64, i64)], expect: &[Output]) {
623 let ranges = input
624 .iter()
625 .map(|(idx, start, end)| {
626 let time_range = (
627 Timestamp::new(*start, TimeUnit::Second),
628 Timestamp::new(*end, TimeUnit::Second),
629 );
630 RangeMeta {
631 time_range,
632 indices: smallvec![SourceIndex {
633 index: *idx,
634 num_row_groups: 0,
635 }],
636 row_group_indices: smallvec![RowGroupIndex {
637 index: *idx,
638 row_group_index: 0
639 }],
640 num_rows: 1,
641 }
642 })
643 .collect();
644 let output = group_ranges_for_seq_scan(ranges);
645 let actual: Vec<_> = output
646 .iter()
647 .map(|range| {
648 let indices = range.indices.iter().map(|index| index.index).collect();
649 let group_indices: Vec<_> = range
650 .row_group_indices
651 .iter()
652 .map(|idx| idx.index)
653 .collect();
654 assert_eq!(indices, group_indices);
655 let range = range.time_range;
656 (indices, range.0.value(), range.1.value())
657 })
658 .collect();
659 assert_eq!(expect, actual);
660 }
661
662 #[test]
663 fn test_group_ranges() {
664 run_group_ranges_test(&[(1, 0, 2000)], &[(vec![1], 0, 2000)]);
666
667 run_group_ranges_test(
669 &[
670 (1, 1000, 2000),
671 (2, 6000, 7000),
672 (3, 0, 1500),
673 (4, 1500, 3000),
674 ],
675 &[(vec![3, 1, 4], 0, 3000), (vec![2], 6000, 7000)],
676 );
677
678 run_group_ranges_test(
680 &[(1, 3000, 4000), (2, 4001, 6000), (3, 0, 1000)],
681 &[
682 (vec![3], 0, 1000),
683 (vec![1], 3000, 4000),
684 (vec![2], 4001, 6000),
685 ],
686 );
687
688 run_group_ranges_test(
690 &[(1, 3000, 4000), (2, 4000, 6000), (3, 0, 1000)],
691 &[(vec![3], 0, 1000), (vec![1, 2], 3000, 6000)],
692 );
693 }
694
695 #[test]
696 fn test_merge_range() {
697 let mut left = RangeMeta {
698 time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
699 indices: smallvec![SourceIndex {
700 index: 1,
701 num_row_groups: 2,
702 }],
703 row_group_indices: smallvec![
704 RowGroupIndex {
705 index: 1,
706 row_group_index: 1
707 },
708 RowGroupIndex {
709 index: 1,
710 row_group_index: 2
711 }
712 ],
713 num_rows: 5,
714 };
715 let right = RangeMeta {
716 time_range: (Timestamp::new_second(800), Timestamp::new_second(1200)),
717 indices: smallvec![SourceIndex {
718 index: 2,
719 num_row_groups: 2,
720 }],
721 row_group_indices: smallvec![
722 RowGroupIndex {
723 index: 2,
724 row_group_index: 1
725 },
726 RowGroupIndex {
727 index: 2,
728 row_group_index: 2
729 }
730 ],
731 num_rows: 4,
732 };
733 left.merge(right);
734
735 assert_eq!(
736 left,
737 RangeMeta {
738 time_range: (Timestamp::new_second(800), Timestamp::new_second(2000)),
739 indices: smallvec![
740 SourceIndex {
741 index: 1,
742 num_row_groups: 2
743 },
744 SourceIndex {
745 index: 2,
746 num_row_groups: 2
747 }
748 ],
749 row_group_indices: smallvec![
750 RowGroupIndex {
751 index: 1,
752 row_group_index: 1
753 },
754 RowGroupIndex {
755 index: 1,
756 row_group_index: 2
757 },
758 RowGroupIndex {
759 index: 2,
760 row_group_index: 1
761 },
762 RowGroupIndex {
763 index: 2,
764 row_group_index: 2
765 },
766 ],
767 num_rows: 9,
768 }
769 );
770 }
771
772 #[test]
773 fn test_split_range() {
774 let range = RangeMeta {
775 time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
776 indices: smallvec![SourceIndex {
777 index: 1,
778 num_row_groups: 2,
779 }],
780 row_group_indices: smallvec![RowGroupIndex {
781 index: 1,
782 row_group_index: ALL_ROW_GROUPS,
783 }],
784 num_rows: 5,
785 };
786
787 assert!(range.can_split_preserve_order());
788 let mut output = Vec::new();
789 range.maybe_split(&mut output);
790
791 assert_eq!(
792 output,
793 &[
794 RangeMeta {
795 time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
796 indices: smallvec![SourceIndex {
797 index: 1,
798 num_row_groups: 2,
799 }],
800 row_group_indices: smallvec![RowGroupIndex {
801 index: 1,
802 row_group_index: 0
803 },],
804 num_rows: 2,
805 },
806 RangeMeta {
807 time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
808 indices: smallvec![SourceIndex {
809 index: 1,
810 num_row_groups: 2,
811 }],
812 row_group_indices: smallvec![RowGroupIndex {
813 index: 1,
814 row_group_index: 1
815 }],
816 num_rows: 2,
817 }
818 ]
819 );
820 }
821
822 #[test]
823 fn test_not_split_range() {
824 let range = RangeMeta {
825 time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
826 indices: smallvec![
827 SourceIndex {
828 index: 1,
829 num_row_groups: 1,
830 },
831 SourceIndex {
832 index: 2,
833 num_row_groups: 1,
834 }
835 ],
836 row_group_indices: smallvec![
837 RowGroupIndex {
838 index: 1,
839 row_group_index: 1
840 },
841 RowGroupIndex {
842 index: 2,
843 row_group_index: 1
844 }
845 ],
846 num_rows: 5,
847 };
848
849 assert!(!range.can_split_preserve_order());
850 let mut output = Vec::new();
851 range.maybe_split(&mut output);
852 assert_eq!(1, output.len());
853 }
854
855 #[test]
856 fn test_maybe_split_ranges() {
857 let ranges = vec![
858 RangeMeta {
859 time_range: (Timestamp::new_second(0), Timestamp::new_second(500)),
860 indices: smallvec![SourceIndex {
861 index: 0,
862 num_row_groups: 1,
863 }],
864 row_group_indices: smallvec![RowGroupIndex {
865 index: 0,
866 row_group_index: 0,
867 },],
868 num_rows: 4,
869 },
870 RangeMeta {
871 time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
872 indices: smallvec![SourceIndex {
873 index: 1,
874 num_row_groups: 2,
875 }],
876 row_group_indices: smallvec![RowGroupIndex {
877 index: 1,
878 row_group_index: ALL_ROW_GROUPS,
879 },],
880 num_rows: 4,
881 },
882 RangeMeta {
883 time_range: (Timestamp::new_second(3000), Timestamp::new_second(4000)),
884 indices: smallvec![
885 SourceIndex {
886 index: 2,
887 num_row_groups: 2,
888 },
889 SourceIndex {
890 index: 3,
891 num_row_groups: 0,
892 }
893 ],
894 row_group_indices: smallvec![
895 RowGroupIndex {
896 index: 2,
897 row_group_index: ALL_ROW_GROUPS,
898 },
899 RowGroupIndex {
900 index: 3,
901 row_group_index: ALL_ROW_GROUPS,
902 }
903 ],
904 num_rows: 5,
905 },
906 ];
907 let output = maybe_split_ranges_for_seq_scan(ranges);
908 assert_eq!(
909 output,
910 vec![
911 RangeMeta {
912 time_range: (Timestamp::new_second(0), Timestamp::new_second(500)),
913 indices: smallvec![SourceIndex {
914 index: 0,
915 num_row_groups: 1,
916 }],
917 row_group_indices: smallvec![RowGroupIndex {
918 index: 0,
919 row_group_index: 0
920 },],
921 num_rows: 4,
922 },
923 RangeMeta {
924 time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
925 indices: smallvec![SourceIndex {
926 index: 1,
927 num_row_groups: 2,
928 }],
929 row_group_indices: smallvec![RowGroupIndex {
930 index: 1,
931 row_group_index: 0
932 },],
933 num_rows: 2,
934 },
935 RangeMeta {
936 time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
937 indices: smallvec![SourceIndex {
938 index: 1,
939 num_row_groups: 2,
940 }],
941 row_group_indices: smallvec![RowGroupIndex {
942 index: 1,
943 row_group_index: 1
944 }],
945 num_rows: 2,
946 },
947 RangeMeta {
948 time_range: (Timestamp::new_second(3000), Timestamp::new_second(4000)),
949 indices: smallvec![
950 SourceIndex {
951 index: 2,
952 num_row_groups: 2
953 },
954 SourceIndex {
955 index: 3,
956 num_row_groups: 0,
957 }
958 ],
959 row_group_indices: smallvec![
960 RowGroupIndex {
961 index: 2,
962 row_group_index: ALL_ROW_GROUPS,
963 },
964 RowGroupIndex {
965 index: 3,
966 row_group_index: ALL_ROW_GROUPS,
967 }
968 ],
969 num_rows: 5,
970 },
971 ]
972 )
973 }
974}