1use std::cmp::Ordering;
19use std::collections::BinaryHeap;
20
21use bytes::{Buf, Bytes};
22use common_base::BitVec;
23use common_base::readable_size::ReadableSize;
24use common_time::Timestamp;
25use smallvec::{SmallVec, smallvec};
26
27use crate::sst::file::{FileHandle, RegionFileId};
28
/// Upper bound applied to the automatically derived target size in
/// [`merge_seq_files`] when the caller does not pass an explicit maximum.
const DEFAULT_MAX_OUTPUT_SIZE: u64 = ReadableSize::gb(2).as_bytes();
31
/// A value that covers a range delimited by two ordered bounds.
pub trait Ranged {
    /// Type of both range bounds.
    type BoundType: Ord + Copy;

    /// Returns the `(start, end)` bounds of this value.
    fn range(&self) -> (Self::BoundType, Self::BoundType);

    /// Returns true when the larger of the two starts is strictly below the
    /// smaller of the two ends, i.e. ranges that merely touch do not overlap.
    fn overlap(&self, other: &Self) -> bool {
        let (self_start, self_end) = self.range();
        let (other_start, other_end) = other.range();

        let latest_start = std::cmp::max(self_start, other_start);
        let earliest_end = std::cmp::min(self_end, other_end);
        latest_start < earliest_end
    }

    /// Same as [`Ranged::overlap`], except that ranges sharing only a boundary
    /// point are also reported as overlapping.
    fn overlap_inclusive(&self, other: &Self) -> bool {
        let (self_start, self_end) = self.range();
        let (other_start, other_end) = other.range();

        let latest_start = std::cmp::max(self_start, other_start);
        let earliest_end = std::cmp::min(self_end, other_end);
        latest_start <= earliest_end
    }
}
55
56pub(crate) fn primary_key_ranges_overlap(lhs: &(Bytes, Bytes), rhs: &(Bytes, Bytes)) -> bool {
57 lhs.0.chunk().max(rhs.0.chunk()) <= lhs.1.chunk().min(rhs.1.chunk())
58}
59
60pub(crate) fn merge_primary_key_ranges(
61 lhs: Option<(Bytes, Bytes)>,
62 rhs: Option<(Bytes, Bytes)>,
63) -> Option<(Bytes, Bytes)> {
64 match (lhs, rhs) {
65 (Some((lhs_min, lhs_max)), Some((rhs_min, rhs_max))) => {
66 Some((lhs_min.min(rhs_min), lhs_max.max(rhs_max)))
67 }
68 _ => None,
69 }
70}
71
72pub fn find_overlapping_items<T: Item + Clone>(
73 l: &mut SortedRun<T>,
74 r: &mut SortedRun<T>,
75 result: &mut Vec<T>,
76) {
77 if l.items.is_empty() || r.items.is_empty() {
78 return;
79 }
80
81 result.clear();
82 result.reserve(l.items.len() + r.items.len());
83
84 if !l.sorted {
86 sort_ranged_items(&mut l.items);
87 l.sorted = true;
88 }
89 if !r.sorted {
90 sort_ranged_items(&mut r.items);
91 r.sorted = true;
92 }
93
94 let mut r_idx = 0;
95
96 let mut selected = BitVec::repeat(false, r.items().len() + l.items.len());
97
98 for (lhs_idx, lhs) in l.items.iter().enumerate() {
99 let (lhs_start, lhs_end) = lhs.range();
100
101 while r_idx < r.items.len() {
103 let (_, rhs_end) = r.items[r_idx].range();
104 if rhs_end < lhs_start {
105 r_idx += 1;
106 } else {
107 break;
108 }
109 }
110
111 let mut j = r_idx;
113 while j < r.items.len() {
114 let (rhs_start, _rhs_end) = r.items[j].range();
115
116 if rhs_start > lhs_end {
118 break;
119 }
120
121 if lhs.overlap_inclusive(&r.items[j]) {
123 if !selected[lhs_idx] {
124 result.push(lhs.clone());
125 selected.set(lhs_idx, true);
126 }
127
128 let rhs_selected_idx = l.items.len() + j;
129 if !selected[rhs_selected_idx] {
130 result.push(r.items[j].clone());
131 selected.set(rhs_selected_idx, true);
132 }
133 }
134
135 j += 1;
136 }
137 }
138}
139
140fn sort_ranged_items<T: Ranged>(values: &mut [T]) {
142 values.sort_unstable_by(|l, r| {
143 let (l_start, l_end) = l.range();
144 let (r_start, r_end) = r.range();
145 l_start.cmp(&r_start).then(r_end.cmp(&l_end))
146 });
147}
148
/// A cloneable [`Ranged`] value that also reports a size, so it can be grouped
/// into [`SortedRun`]s and weighed when picking merge candidates.
pub trait Item: Ranged + Clone {
    /// Returns the size of this item.
    fn size(&self) -> usize;
}
154
/// A set of files treated as a single [`Item`], together with statistics
/// aggregated over all of its files.
#[derive(Debug, Clone)]
pub struct FileGroup {
    /// Files that belong to this group.
    files: SmallVec<[FileHandle; 2]>,
    /// Sum of `FileHandle::size()` over all files in the group.
    size: usize,
    /// Sum of `FileHandle::num_rows()` over all files in the group.
    num_rows: usize,
    /// Smallest start of any file's time range.
    min_timestamp: Timestamp,
    /// Largest end of any file's time range.
    max_timestamp: Timestamp,
    /// Merged `(min, max)` primary key range of the files; `None` as soon as
    /// any file in the group has no primary key range.
    primary_key_range: Option<(Bytes, Bytes)>,
}
165
166impl FileGroup {
167 pub(crate) fn new_with_file(file: FileHandle) -> Self {
168 let size = file.size() as usize;
169 let (min_timestamp, max_timestamp) = file.time_range();
170 let num_rows = file.num_rows();
171 let primary_key_range = file.primary_key_range();
172 Self {
173 files: smallvec![file],
174 size,
175 num_rows,
176 min_timestamp,
177 max_timestamp,
178 primary_key_range,
179 }
180 }
181
182 pub(crate) fn num_rows(&self) -> usize {
183 self.num_rows
184 }
185
186 pub(crate) fn add_file(&mut self, file: FileHandle) {
187 self.size += file.size() as usize;
188 self.num_rows += file.num_rows();
189 let (min_timestamp, max_timestamp) = file.time_range();
190 self.min_timestamp = self.min_timestamp.min(min_timestamp);
191 self.max_timestamp = self.max_timestamp.max(max_timestamp);
192 self.primary_key_range =
193 merge_primary_key_ranges(self.primary_key_range.take(), file.primary_key_range());
194 self.files.push(file);
195 }
196
197 pub(crate) fn num_files(&self) -> usize {
198 self.files.len()
199 }
200
201 #[cfg(test)]
202 pub(crate) fn files(&self) -> &[FileHandle] {
203 &self.files[..]
204 }
205
206 pub(crate) fn file_ids(&self) -> SmallVec<[RegionFileId; 2]> {
207 SmallVec::from_iter(self.files.iter().map(|f| f.file_id()))
208 }
209
210 pub(crate) fn into_files(self) -> impl Iterator<Item = FileHandle> {
211 self.files.into_iter()
212 }
213}
214
215impl Ranged for FileGroup {
216 type BoundType = Timestamp;
217
218 fn range(&self) -> (Self::BoundType, Self::BoundType) {
219 (self.min_timestamp, self.max_timestamp)
220 }
221
222 fn overlap(&self, other: &Self) -> bool {
223 let (lhs_start, lhs_end) = self.range();
224 let (rhs_start, rhs_end) = other.range();
225 if lhs_start.max(rhs_start) >= lhs_end.min(rhs_end) {
226 return false;
227 }
228
229 match (&self.primary_key_range, &other.primary_key_range) {
230 (Some(lhs), Some(rhs)) => primary_key_ranges_overlap(lhs, rhs),
231 _ => true,
232 }
233 }
234
235 fn overlap_inclusive(&self, other: &Self) -> bool {
236 let (lhs_start, lhs_end) = self.range();
237 let (rhs_start, rhs_end) = other.range();
238 if lhs_start.max(rhs_start) > lhs_end.min(rhs_end) {
239 return false;
240 }
241
242 match (&self.primary_key_range, &other.primary_key_range) {
243 (Some(lhs), Some(rhs)) => primary_key_ranges_overlap(lhs, rhs),
244 _ => true,
245 }
246 }
247}
248
impl Item for FileGroup {
    /// Returns the accumulated size of all files in the group.
    fn size(&self) -> usize {
        self.size
    }
}
254
/// A collection of items together with their accumulated size and the overall
/// range they span.
#[derive(Debug, Clone)]
pub struct SortedRun<T: Item> {
    /// Items that belong to this run, in insertion order until sorted.
    items: Vec<T>,
    /// Sum of the sizes of all items pushed into the run.
    size: usize,
    /// Smallest start bound of any item; `None` while the run is empty.
    start: Option<T::BoundType>,
    /// Largest end bound of any item; `None` while the run is empty.
    end: Option<T::BoundType>,
    /// Whether `items` has already been sorted by [`find_overlapping_items`];
    /// runs are created with this flag unset.
    sorted: bool,
}
269
270impl<T: Item> From<Vec<T>> for SortedRun<T> {
271 fn from(items: Vec<T>) -> Self {
272 let mut r = Self {
273 items: Vec::with_capacity(items.len()),
274 size: 0,
275 start: None,
276 end: None,
277 sorted: false,
278 };
279 for item in items {
280 r.push_item(item);
281 }
282
283 r
284 }
285}
286
287impl<T> Default for SortedRun<T>
288where
289 T: Item,
290{
291 fn default() -> Self {
292 Self {
293 items: vec![],
294 size: 0,
295 start: None,
296 end: None,
297 sorted: false,
298 }
299 }
300}
301
302impl<T> SortedRun<T>
303where
304 T: Item,
305{
306 pub fn items(&self) -> &[T] {
307 &self.items
308 }
309
310 fn push_item(&mut self, t: T) {
311 let (file_start, file_end) = t.range();
312 self.size += t.size();
313 self.items.push(t);
314 self.start = Some(self.start.map_or(file_start, |v| v.min(file_start)));
315 self.end = Some(self.end.map_or(file_end, |v| v.max(file_end)));
316 }
317}
318
319pub fn find_sorted_runs<T>(items: &mut [T]) -> Vec<SortedRun<T>>
321where
322 T: Item,
323{
324 if items.is_empty() {
325 return vec![];
326 }
327 sort_ranged_items(items);
329
330 let mut current_run = SortedRun::default();
331 let mut runs = vec![];
332 let mut active_run_item_indices = Vec::new();
333
334 let mut selection = BitVec::repeat(false, items.len());
335 while !selection.all() {
336 let mut last_pruned_start = None;
338 for (item, mut selected) in items.iter().zip(selection.iter_mut()) {
339 if *selected {
340 continue;
342 }
343 if current_run.items.is_empty() {
344 selected.set(true);
346 current_run.push_item(item.clone());
347 active_run_item_indices.push(current_run.items.len() - 1);
348 } else {
349 let (item_start, _) = item.range();
353 if last_pruned_start != Some(item_start) {
354 active_run_item_indices.retain(|idx| {
355 let (_, run_item_end) = current_run.items[*idx].range();
356 run_item_end > item_start
357 });
358 last_pruned_start = Some(item_start);
359 }
360
361 let mut overlaps_any = false;
362 for idx in &active_run_item_indices {
363 let run_item = ¤t_run.items[*idx];
364 if run_item.overlap(item) {
365 overlaps_any = true;
366 break;
367 }
368 }
369 if !overlaps_any {
370 selected.set(true);
372 let item_idx = current_run.items.len();
373 current_run.push_item(item.clone());
374 active_run_item_indices.push(item_idx);
375 }
376 }
377 }
378 runs.push(std::mem::take(&mut current_run));
380 active_run_item_indices.clear();
381 }
382 runs
383}
384
385#[cfg(any(test, feature = "test", feature = "testing"))]
386pub fn find_sorted_runs_original<T>(items: &mut [T]) -> Vec<SortedRun<T>>
387where
388 T: Item,
389{
390 if items.is_empty() {
391 return vec![];
392 }
393 sort_ranged_items(items);
395
396 let mut current_run = SortedRun::default();
397 let mut runs = vec![];
398
399 let mut selection = BitVec::repeat(false, items.len());
400 while !selection.all() {
401 for (item, mut selected) in items.iter().zip(selection.iter_mut()) {
403 if *selected {
404 continue;
406 }
407 if current_run.items.is_empty() {
408 selected.set(true);
410 current_run.push_item(item.clone());
411 } else {
412 let overlaps_any = current_run.items.iter().any(|i| i.overlap(item));
416 if !overlaps_any {
417 selected.set(true);
419 current_run.push_item(item.clone());
420 }
421 }
422 }
423 runs.push(std::mem::take(&mut current_run));
425 }
426 runs
427}
428
/// Splits `items` into runs using only the `(start, end)` range of each item.
///
/// `items` is sorted in place, then each item is appended to the
/// lowest-numbered run whose current end is not after the item's start; when
/// no such run exists a new run is opened. Runs are returned ordered by the
/// sequence number they were created with.
///
/// NOTE(review): unlike `find_sorted_runs`, this never calls `Ranged::overlap`,
/// so any extra overlap criteria of `T` are ignored here — confirm that is
/// what callers expect.
pub(crate) fn find_sorted_runs_by_time_range<T>(items: &mut [T]) -> Vec<SortedRun<T>>
where
    T: Item,
{
    if items.is_empty() {
        return vec![];
    }
    sort_ranged_items(items);

    use derive_more::{Eq, PartialEq};

    // A run tagged with its creation sequence number `i`. Equality is derived
    // with the `run` field skipped, so it depends on `i` alone.
    #[derive(PartialEq, Eq)]
    struct Run<T: Item> {
        i: usize,
        #[partial_eq(skip)]
        run: SortedRun<T>,
    }

    impl<T: Item> Run<T> {
        // Opens run number `i` with `item` as its first element.
        fn new(i: usize, item: &T) -> Run<T> {
            let mut run = SortedRun::default();
            run.push_item(item.clone());
            Run { i, run }
        }

        // Appends a clone of `item` to the wrapped run.
        fn push_item(&mut self, item: &T) {
            self.run.push_item(item.clone());
        }
    }

    impl<T: Item> PartialOrd for Run<T> {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }

    impl<T: Item> Ord for Run<T> {
        // The end comparison is reversed, so in the max-heap `BinaryHeap` the
        // run with the smallest end is on top. Ties fall back to start, then
        // to the sequence number. The `unwrap`s rely on every `Run` holding at
        // least one item, which `Run::new` guarantees.
        fn cmp(&self, other: &Self) -> Ordering {
            let l_run = &self.run;
            let r_run = &other.run;

            let l_end = l_run.end.unwrap();
            let r_end = r_run.end.unwrap();
            r_end
                .cmp(&l_end)
                .then_with(|| {
                    let l_start = l_run.start.unwrap();
                    let r_start = r_run.start.unwrap();
                    l_start.cmp(&r_start)
                })
                .then_with(|| self.i.cmp(&other.i))
        }
    }

    // Re-orders runs by sequence number only.
    #[derive(PartialEq, Eq)]
    struct Wrapper<T: Item>(Run<T>);

    impl<T: Item> PartialOrd for Wrapper<T> {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }

    impl<T: Item> Ord for Wrapper<T> {
        // Reversed, so the heap pops the run with the smallest `i` first.
        fn cmp(&self, other: &Self) -> Ordering {
            other.0.i.cmp(&self.0.i)
        }
    }

    // Runs that may still be "busy", keyed by end (smallest end on top).
    let mut runs_sort_by_end = BinaryHeap::<Run<T>>::new();
    // Runs whose end is not after the current item's start, i.e. runs that can
    // accept the item (smallest sequence number on top).
    let mut runs_sort_by_index = BinaryHeap::<Wrapper<T>>::new();
    let mut i = 0;

    for item in items {
        let (start, _) = item.range();

        // Move every run that has ended by `start` into the "available" heap.
        while let Some(run) = runs_sort_by_end.pop_if(|x| x.run.end.unwrap() <= start) {
            runs_sort_by_index.push(Wrapper(run));
        }

        // No available run: open a new one with the next sequence number.
        let Some(mut run) = runs_sort_by_index.pop() else {
            i += 1;
            runs_sort_by_end.push(Run::new(i, item));
            continue;
        };

        // Extend the earliest-created available run and track its new end.
        run.0.push_item(item);
        runs_sort_by_end.push(run.0);
    }

    // Collect the runs from both heaps and restore creation order.
    let mut runs = runs_sort_by_end.into_vec();
    runs.extend(runs_sort_by_index.into_vec().into_iter().map(|x| x.0));
    runs.sort_unstable_by_key(|run| run.i);
    runs.into_iter().map(|x| x.run).collect()
}
555
556pub fn reduce_runs<T: Item>(mut runs: Vec<SortedRun<T>>) -> Vec<T> {
559 assert!(runs.len() > 1);
560 runs.sort_unstable_by_key(|a| a.size);
562 let probe_end = runs.len().min(100);
564 let mut min_penalty = usize::MAX;
565 let mut files = vec![];
566 let mut temp_files = vec![];
567 for i in 0..probe_end {
568 for j in i + 1..probe_end {
569 let (a, b) = runs.split_at_mut(j);
570 find_overlapping_items(&mut a[i], &mut b[0], &mut temp_files);
571 let penalty = temp_files.iter().map(|e| e.size()).sum();
572 if penalty < min_penalty {
573 min_penalty = penalty;
574 files.clear();
575 files.extend_from_slice(&temp_files);
576 }
577 }
578 }
579 files
580}
581
582pub fn merge_seq_files<T: Item>(input_files: &[T], max_file_size: Option<u64>) -> Vec<T> {
603 if input_files.is_empty() || input_files.len() == 1 {
604 return vec![];
605 }
606
607 let files_to_process = if input_files.len() > 100 {
609 &input_files[0..100]
610 } else {
611 input_files
612 };
613
614 let target_size = match max_file_size {
616 Some(size) => size as usize,
617 None => {
618 let total_size: usize = files_to_process.iter().map(|f| f.size()).sum();
620 ((((total_size as f64) / (files_to_process.len() as f64)) * 1.5) as usize)
621 .min(DEFAULT_MAX_OUTPUT_SIZE as usize)
622 }
623 };
624
625 let mut best_group = Vec::new();
627 let mut best_score = f64::NEG_INFINITY;
628
629 for start_idx in (0..files_to_process.len()).rev() {
631 for end_idx in (start_idx + 1..files_to_process.len()).rev() {
633 let group = &files_to_process[start_idx..=end_idx];
634 let total_size: usize = group.iter().map(|f| f.size()).sum();
635
636 if total_size > target_size {
638 continue; }
640
641 let largest_file_size = group.iter().map(|f| f.size()).max().unwrap_or(0);
643 let amplification_factor = largest_file_size as f64 / total_size as f64;
644
645 let file_reduction = group.len() - 1;
647
648 let file_reduction_score = file_reduction as f64 / files_to_process.len() as f64;
653 let amp_factor_score = (1.0 - amplification_factor) * 1.5; let size_efficiency = (total_size as f64 / target_size as f64).min(1.0); let score = file_reduction_score + amp_factor_score + size_efficiency;
657
658 if score >= best_score {
661 best_score = score;
662 best_group = group.to_vec();
663 }
664 }
665 }
666
667 best_group
668}
669
670#[cfg(test)]
671mod tests {
672 use std::collections::HashSet;
673
674 use bytes::Bytes;
675 use store_api::storage::FileId;
676
677 use super::*;
678 use crate::compaction::test_util::new_file_handle_with_size_sequence_and_primary_key_range;
679
    /// Minimal [`Item`] used by the tests: an `i64` range plus an explicit size.
    #[derive(Clone, Debug, PartialEq)]
    struct MockFile {
        /// Start bound of the range.
        start: i64,
        /// End bound of the range.
        end: i64,
        /// Size reported through [`Item::size`].
        size: usize,
    }

    impl Ranged for MockFile {
        type BoundType = i64;

        /// Returns `(start, end)`; the default overlap checks of [`Ranged`] apply.
        fn range(&self) -> (Self::BoundType, Self::BoundType) {
            (self.start, self.end)
        }
    }

    impl Item for MockFile {
        /// Returns the size given at construction.
        fn size(&self) -> usize {
            self.size
        }
    }
700
701 fn build_items(ranges: &[(i64, i64)]) -> Vec<MockFile> {
702 ranges
703 .iter()
704 .map(|(start, end)| MockFile {
705 start: *start,
706 end: *end,
707 size: (*end - *start) as usize,
708 })
709 .collect()
710 }
711
712 fn build_items_with_size(items: &[(i64, i64, usize)]) -> Vec<MockFile> {
713 items
714 .iter()
715 .map(|(start, end, size)| MockFile {
716 start: *start,
717 end: *end,
718 size: *size,
719 })
720 .collect()
721 }
722
723 fn pk_range(min: &'static [u8], max: &'static [u8]) -> Option<(Bytes, Bytes)> {
724 Some((Bytes::from_static(min), Bytes::from_static(max)))
725 }
726
727 fn check_sorted_runs(
728 ranges: &[(i64, i64)],
729 expected_runs: &[Vec<(i64, i64)>],
730 ) -> Vec<SortedRun<MockFile>> {
731 let mut files = build_items(ranges);
732 let mut files_clone = files.clone();
733
734 let runs = find_sorted_runs(&mut files);
735
736 let result_file_ranges: Vec<Vec<_>> = runs
737 .iter()
738 .map(|r| r.items.iter().map(|f| f.range()).collect())
739 .collect();
740 assert_eq!(&expected_runs, &result_file_ranges);
741
742 let runs_by_time_range = find_sorted_runs_by_time_range(&mut files_clone);
743 let results: Vec<Vec<_>> = runs_by_time_range
744 .iter()
745 .map(|r| r.items.iter().map(|f| f.range()).collect())
746 .collect();
747 assert_eq!(&expected_runs, &results);
748 runs
749 }
750
751 fn sorted_run_ranges<T: Item>(runs: &[SortedRun<T>]) -> Vec<Vec<T::BoundType>> {
752 runs.iter()
753 .map(|r| {
754 r.items
755 .iter()
756 .flat_map(|f| {
757 let (start, end) = f.range();
758 [start, end]
759 })
760 .collect()
761 })
762 .collect()
763 }
764
765 fn check_find_sorted_runs_consistency(ranges: &[(i64, i64)]) {
766 let mut files = build_items(ranges);
767 let mut files_for_original = files.clone();
768
769 let runs = find_sorted_runs(&mut files);
770 let original_runs = find_sorted_runs_original(&mut files_for_original);
771
772 assert_eq!(sorted_run_ranges(&original_runs), sorted_run_ranges(&runs));
773 }
774
    /// Checks the run layout produced for a set of hand-written ranges; each
    /// case is verified against both run-finding implementations.
    #[test]
    fn test_find_sorted_runs() {
        // No input, zero-length ranges, a single range.
        check_sorted_runs(&[], &[]);
        check_sorted_runs(&[(1, 1), (2, 2)], &[vec![(1, 1), (2, 2)]]);
        check_sorted_runs(&[(1, 2)], &[vec![(1, 2)]]);
        // Ranges that only touch at a boundary stay in the same run.
        check_sorted_runs(&[(1, 2), (2, 3)], &[vec![(1, 2), (2, 3)]]);
        check_sorted_runs(&[(1, 2), (3, 4)], &[vec![(1, 2), (3, 4)]]);
        // Unsorted, overlapping input is sorted and split into two runs.
        check_sorted_runs(&[(2, 4), (1, 3)], &[vec![(1, 3)], vec![(2, 4)]]);
        check_sorted_runs(
            &[(1, 3), (2, 4), (4, 5)],
            &[vec![(1, 3), (4, 5)], vec![(2, 4)]],
        );

        // Same start: the range with the larger end is placed first.
        check_sorted_runs(
            &[(1, 2), (3, 4), (3, 5)],
            &[vec![(1, 2), (3, 5)], vec![(3, 4)]],
        );

        check_sorted_runs(
            &[(1, 3), (2, 4), (5, 6)],
            &[vec![(1, 3), (5, 6)], vec![(2, 4)]],
        );

        check_sorted_runs(
            &[(1, 2), (3, 5), (4, 6)],
            &[vec![(1, 2), (3, 5)], vec![(4, 6)]],
        );

        // A chain of touching ranges forms a single run.
        check_sorted_runs(
            &[(1, 2), (3, 4), (4, 6), (7, 8)],
            &[vec![(1, 2), (3, 4), (4, 6), (7, 8)]],
        );
        // A wide range pushes the narrow ranges it covers into a second run.
        check_sorted_runs(
            &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
            &[vec![(1, 2), (3, 6), (7, 8), (8, 9)], vec![(3, 4), (5, 6)]],
        );

        check_sorted_runs(
            &[(10, 19), (20, 21), (20, 29), (30, 39)],
            &[vec![(10, 19), (20, 29), (30, 39)], vec![(20, 21)]],
        );

        check_sorted_runs(
            &[(10, 19), (20, 29), (21, 22), (30, 39), (31, 32), (32, 42)],
            &[
                vec![(10, 19), (20, 29), (30, 39)],
                vec![(21, 22), (31, 32), (32, 42)],
            ],
        );
    }
825
    /// Runs `find_sorted_runs` and `find_sorted_runs_original` on the same
    /// inputs and requires identical runs.
    #[test]
    fn test_find_sorted_runs_matches_original_impl() {
        for ranges in [
            &[][..],
            &[(1, 1), (2, 2)],
            &[(1, 2), (2, 3)],
            &[(2, 4), (1, 3)],
            &[(1, 3), (2, 4), (4, 5)],
            &[(1, 2), (3, 4), (3, 5)],
            &[(1, 3), (2, 4), (5, 6)],
            &[(1, 2), (3, 5), (4, 6)],
            &[(1, 2), (3, 4), (4, 6), (7, 8)],
            &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
            &[(10, 19), (20, 21), (20, 29), (30, 39)],
            &[(10, 19), (20, 29), (21, 22), (30, 39), (31, 32), (32, 42)],
            // Same ranges as the previous case, shuffled.
            &[(32, 42), (10, 19), (31, 32), (20, 29), (21, 22), (30, 39)],
        ] {
            check_find_sorted_runs_consistency(ranges);
        }
    }
846
847 fn check_reduce_runs(
848 files: &[(i64, i64)],
849 expected_runs: &[Vec<(i64, i64)>],
850 expected: &[(i64, i64)],
851 ) {
852 let runs = check_sorted_runs(files, expected_runs);
853 if runs.len() <= 1 {
854 assert!(expected.is_empty());
855 return;
856 }
857 let files_to_merge = reduce_runs(runs);
858 let file_to_merge_timestamps = files_to_merge
859 .into_iter()
860 .map(|f| (f.start, f.end))
861 .collect::<HashSet<_>>();
862
863 let expected = expected.iter().cloned().collect::<HashSet<_>>();
864 assert_eq!(&expected, &file_to_merge_timestamps);
865 }
866
    /// Checks which files `reduce_runs` selects for merging. Each case lists
    /// the input ranges, the expected runs, and the expected selection.
    #[test]
    fn test_reduce_runs() {
        check_reduce_runs(
            &[(1, 3), (2, 4), (5, 6)],
            &[vec![(1, 3), (5, 6)], vec![(2, 4)]],
            &[(1, 3), (2, 4)],
        );

        check_reduce_runs(
            &[(1, 2), (3, 5), (4, 6)],
            &[vec![(1, 2), (3, 5)], vec![(4, 6)]],
            &[(3, 5), (4, 6)],
        );

        // A single run: nothing to reduce.
        check_reduce_runs(
            &[(1, 2), (3, 4), (4, 6), (7, 8)],
            &[vec![(1, 2), (3, 4), (4, 6), (7, 8)]],
            &[],
        );

        check_reduce_runs(
            &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
            &[vec![(1, 2), (3, 6), (7, 8), (8, 9)], vec![(3, 4), (5, 6)]],
            &[(5, 6), (3, 4), (3, 6)],
        );

        // Same input as above; the expected set is compared without order.
        check_reduce_runs(
            &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
            &[vec![(1, 2), (3, 6), (7, 8), (8, 9)], vec![(3, 4), (5, 6)]],
            &[(3, 4), (3, 6), (5, 6)],
        );

        check_reduce_runs(
            &[
                (10, 20),
                (30, 40),
                (50, 60),
                (50, 80),
                (80, 90),
                (80, 100),
                (100, 110),
            ],
            &[
                vec![(10, 20), (30, 40), (50, 80), (80, 100), (100, 110)],
                vec![(50, 60), (80, 90)],
            ],
            &[(50, 80), (80, 100), (50, 60), (80, 90)],
        );

        // Four single-file runs: the two smallest files form the cheapest pair.
        check_reduce_runs(
            &[(0, 10), (0, 11), (0, 12), (0, 13)],
            &[vec![(0, 13)], vec![(0, 12)], vec![(0, 11)], vec![(0, 10)]],
            &[(0, 10), (0, 11)],
        );
    }
939
    /// Exercises `find_overlapping_items` with empty, disjoint, overlapping,
    /// touching, nested, identical and unsorted inputs.
    #[test]
    fn test_find_overlapping_items() {
        let mut result = Vec::new();

        // Both runs empty.
        find_overlapping_items(
            &mut SortedRun::from(Vec::<MockFile>::new()),
            &mut SortedRun::from(Vec::<MockFile>::new()),
            &mut result,
        );
        assert_eq!(result, Vec::<MockFile>::new());

        // Only the right run is empty.
        let files1 = build_items(&[(1, 3)]);
        find_overlapping_items(
            &mut SortedRun::from(files1.clone()),
            &mut SortedRun::from(Vec::<MockFile>::new()),
            &mut result,
        );
        assert_eq!(result, Vec::<MockFile>::new());

        // Only the left run is empty.
        find_overlapping_items(
            &mut SortedRun::from(Vec::<MockFile>::new()),
            &mut SortedRun::from(files1.clone()),
            &mut result,
        );
        assert_eq!(result, Vec::<MockFile>::new());

        // Disjoint runs: nothing overlaps.
        let files1 = build_items(&[(1, 3), (5, 7)]);
        let files2 = build_items(&[(10, 12), (15, 20)]);
        find_overlapping_items(
            &mut SortedRun::from(files1),
            &mut SortedRun::from(files2),
            &mut result,
        );
        assert_eq!(result, Vec::<MockFile>::new());

        // One overlapping pair; the left item is reported first.
        let files1 = build_items(&[(1, 5)]);
        let files2 = build_items(&[(3, 7)]);
        find_overlapping_items(
            &mut SortedRun::from(files1),
            &mut SortedRun::from(files2),
            &mut result,
        );
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].range(), (1, 5));
        assert_eq!(result[1].range(), (3, 7));

        // Every item overlaps something in the other run.
        let files1 = build_items(&[(1, 5), (8, 12), (15, 20)]);
        let files2 = build_items(&[(3, 6), (7, 10), (18, 25)]);
        find_overlapping_items(
            &mut SortedRun::from(files1),
            &mut SortedRun::from(files2),
            &mut result,
        );
        assert_eq!(result.len(), 6);

        // Ranges touching at a boundary count, because the check is inclusive.
        let files1 = build_items(&[(1, 5)]);
        let files2 = build_items(&[(5, 10)]);
        find_overlapping_items(
            &mut SortedRun::from(files1),
            &mut SortedRun::from(files2),
            &mut result,
        );
        assert_eq!(result.len(), 2);

        // One range fully contains the other.
        let files1 = build_items(&[(1, 10)]);
        let files2 = build_items(&[(3, 7)]);
        find_overlapping_items(
            &mut SortedRun::from(files1),
            &mut SortedRun::from(files2),
            &mut result,
        );
        assert_eq!(result.len(), 2);

        // Identical ranges.
        let files1 = build_items(&[(1, 5)]);
        let files2 = build_items(&[(1, 5)]);
        find_overlapping_items(
            &mut SortedRun::from(files1),
            &mut SortedRun::from(files2),
            &mut result,
        );
        assert_eq!(result.len(), 2);

        // The left run is given unsorted and is sorted by the function.
        let files1 = build_items(&[(5, 10), (1, 3)]);
        let files2 = build_items(&[(2, 7), (8, 12)]);
        find_overlapping_items(
            &mut SortedRun::from(files1),
            &mut SortedRun::from(files2),
            &mut result,
        );
        assert_eq!(result.len(), 4);
    }
1039
    /// Two groups with disjoint primary key ranges (`a..f` vs `x..z`) must not
    /// overlap. NOTE(review): the second and third helper arguments are
    /// presumably the file's time range (0..100 vs 50..150) — confirm against
    /// the test helper.
    #[test]
    fn test_file_group_overlap_time_overlap_pk_disjoint() {
        let lhs =
            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
                FileId::random(),
                0,
                100,
                0,
                1,
                10,
                pk_range(b"a", b"f"),
            ));
        let rhs =
            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
                FileId::random(),
                50,
                150,
                0,
                2,
                10,
                pk_range(b"x", b"z"),
            ));

        assert!(!lhs.overlap(&rhs));
    }
1065
    /// Two file groups with disjoint primary key ranges end up in the same run
    /// because `FileGroup::overlap` reports them as non-overlapping.
    #[test]
    fn test_find_sorted_runs_collapses_pk_disjoint_files_into_one_run() {
        let mut files = vec![
            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
                FileId::random(),
                0,
                100,
                0,
                1,
                10,
                pk_range(b"a", b"f"),
            )),
            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
                FileId::random(),
                50,
                150,
                0,
                2,
                10,
                pk_range(b"x", b"z"),
            )),
        ];

        let runs = find_sorted_runs(&mut files);

        assert_eq!(1, runs.len());
        assert_eq!(2, runs[0].items().len());
    }
1094
    /// The third group shares the primary key range `a..f` with the first one,
    /// while the second group (`x..z`) is key-disjoint from both. The expected
    /// outcome is a first run with two groups and a second run with one.
    #[test]
    fn test_find_sorted_runs_handles_2d_transitivity_break() {
        let mut files = vec![
            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
                FileId::random(),
                0,
                100,
                0,
                1,
                10,
                pk_range(b"a", b"f"),
            )),
            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
                FileId::random(),
                50,
                150,
                0,
                2,
                10,
                pk_range(b"x", b"z"),
            )),
            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
                FileId::random(),
                50,
                150,
                0,
                3,
                10,
                pk_range(b"a", b"f"),
            )),
        ];

        let runs = find_sorted_runs(&mut files);

        assert_eq!(2, runs.len());
        assert_eq!(2, runs[0].items().len());
        assert_eq!(1, runs[1].items().len());
    }
1133
    /// `find_overlapping_items` must report nothing for two groups whose
    /// primary key ranges (`a..f` vs `x..z`) are disjoint.
    #[test]
    fn test_find_overlapping_items_skips_pk_disjoint_pairs() {
        let mut left = SortedRun::from(vec![FileGroup::new_with_file(
            new_file_handle_with_size_sequence_and_primary_key_range(
                FileId::random(),
                0,
                100,
                0,
                1,
                10,
                pk_range(b"a", b"f"),
            ),
        )]);
        let mut right = SortedRun::from(vec![FileGroup::new_with_file(
            new_file_handle_with_size_sequence_and_primary_key_range(
                FileId::random(),
                50,
                150,
                0,
                2,
                10,
                pk_range(b"x", b"z"),
            ),
        )]);
        let mut result = Vec::new();

        find_overlapping_items(&mut left, &mut right, &mut result);

        assert!(result.is_empty());
    }
1164
    /// Groups with the same primary key range must not be reported by the
    /// exclusive `overlap` check when they only touch. NOTE(review): this
    /// assumes the second and third helper arguments are the time range
    /// (0..100 and 100..150) — confirm against the test helper.
    #[test]
    fn test_file_group_touching_time_boundary_with_same_pk_is_not_overlap() {
        let lhs =
            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
                FileId::random(),
                0,
                100,
                0,
                1,
                10,
                pk_range(b"a", b"f"),
            ));
        let rhs =
            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
                FileId::random(),
                100,
                150,
                0,
                2,
                10,
                pk_range(b"a", b"f"),
            ));

        assert!(!lhs.overlap(&rhs));
    }
1190
    /// Checks the group chosen by `merge_seq_files` for various size layouts
    /// and size limits. Inputs are `(start, end, size)` triples.
    #[test]
    fn test_merge_seq_files() {
        // Empty input: nothing to merge.
        let files = Vec::<MockFile>::new();
        assert_eq!(merge_seq_files(&files, None), Vec::<MockFile>::new());

        // A single file: nothing to merge.
        let files = build_items(&[(1, 5)]);
        assert_eq!(merge_seq_files(&files, None), Vec::<MockFile>::new());

        // No explicit limit: the large file is left out and the three small
        // ones are merged.
        let files = build_items_with_size(&[(1, 2, 10), (3, 4, 1), (5, 6, 1), (7, 8, 1)]);
        let result = merge_seq_files(&files, None);
        assert_eq!(result.len(), 3);
        assert_eq!(result[0].size, 1);
        assert_eq!(result[1].size, 1);
        assert_eq!(result[2].size, 1);

        // Limit 20: all four files (total 20) fit.
        let files = build_items_with_size(&[(1, 2, 5), (3, 4, 5), (5, 6, 5), (7, 8, 5)]);
        let result = merge_seq_files(&files, Some(20));
        assert_eq!(result.len(), 4);

        // Limit 10: only two files of size 5 fit.
        let files = build_items_with_size(&[(1, 2, 5), (3, 4, 5), (5, 6, 5), (7, 8, 5)]);
        let result = merge_seq_files(&files, Some(10));
        assert_eq!(result.len(), 2);

        // Limit 10: the first three files (2 + 3 + 4) fit, the last does not.
        let files = build_items_with_size(&[(1, 2, 2), (3, 4, 3), (5, 6, 4), (7, 8, 10)]);
        let result = merge_seq_files(&files, Some(10));
        assert_eq!(result.len(), 3);

        // Limit 12: the two files of size 5 are preferred.
        let files =
            build_items_with_size(&[(1, 2, 5), (3, 4, 5), (5, 6, 10), (7, 8, 1), (9, 10, 1)]);
        let result = merge_seq_files(&files, Some(12));
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].size, 5);
        assert_eq!(result[1].size, 5);

        // Limit 10: the file of size 100 can never be part of a group.
        let files = build_items_with_size(&[(1, 2, 100), (3, 4, 1), (5, 6, 1), (7, 8, 1)]);
        let result = merge_seq_files(&files, Some(10));
        assert_eq!(result.len(), 3);
        assert_eq!(result[0].size, 1);
        assert_eq!(result[1].size, 1);
        assert_eq!(result[2].size, 1);

        // Limit 200: everything (total 160) is merged.
        let files = build_items_with_size(&[(1, 2, 100), (3, 4, 20), (5, 6, 20), (7, 8, 20)]);
        let result = merge_seq_files(&files, Some(200));
        assert_eq!(result.len(), 4);

        // No explicit limit: the derived target excludes the file of size 160.
        let files = build_items_with_size(&[(1, 2, 160), (3, 4, 20), (5, 6, 20), (7, 8, 20)]);
        let result = merge_seq_files(&files, None);
        assert_eq!(result.len(), 3);
        assert_eq!(result[0].size, 20);
        assert_eq!(result[1].size, 20);
        assert_eq!(result[2].size, 20);

        // Two files within the limit are merged even if their sizes differ a lot.
        let files = build_items_with_size(&[(1, 2, 100), (3, 4, 1)]);
        let result = merge_seq_files(&files, Some(200));
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].size, 100);
        assert_eq!(result[1].size, 1);

        // Equal-scoring pairs: the pair at the front of the input wins.
        let files = build_items_with_size(&[(1, 2, 20), (3, 4, 20), (5, 6, 20), (7, 8, 20)]);
        let result = merge_seq_files(&files, Some(40));
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].start, 1);
        assert_eq!(result[1].start, 3);
    }
1264}