1use bytes::{Buf, Bytes};
19use common_base::BitVec;
20use common_base::readable_size::ReadableSize;
21use common_time::Timestamp;
22use smallvec::{SmallVec, smallvec};
23
24use crate::sst::file::{FileHandle, RegionFileId};
25
/// Cap, in bytes, applied to the target output size that `merge_seq_files` derives on its own
/// when the caller passes no explicit maximum (`ReadableSize::gb(2)` expressed as bytes).
const DEFAULT_MAX_OUTPUT_SIZE: u64 = ReadableSize::gb(2).as_bytes();
28
/// A value that spans a `(start, end)` range over an ordered, copyable bound type.
pub trait Ranged {
    /// Type of the two range bounds.
    type BoundType: Ord + Copy;

    /// Returns the `(start, end)` bounds of this value.
    fn range(&self) -> (Self::BoundType, Self::BoundType);

    /// Returns `true` when the later of the two starts is strictly before the earlier of the
    /// two ends. Ranges that merely touch at a boundary do not overlap under this check.
    fn overlap(&self, other: &Self) -> bool {
        let (self_start, self_end) = self.range();
        let (other_start, other_end) = other.range();

        let latest_start = self_start.max(other_start);
        let earliest_end = self_end.min(other_end);
        latest_start < earliest_end
    }

    /// Same as [`Ranged::overlap`], except that ranges sharing only a boundary point also
    /// count as overlapping.
    fn overlap_inclusive(&self, other: &Self) -> bool {
        let (self_start, self_end) = self.range();
        let (other_start, other_end) = other.range();

        let latest_start = self_start.max(other_start);
        let earliest_end = self_end.min(other_end);
        latest_start <= earliest_end
    }
}
52
53pub(crate) fn primary_key_ranges_overlap(lhs: &(Bytes, Bytes), rhs: &(Bytes, Bytes)) -> bool {
54 lhs.0.chunk().max(rhs.0.chunk()) <= lhs.1.chunk().min(rhs.1.chunk())
55}
56
57pub(crate) fn merge_primary_key_ranges(
58 lhs: Option<(Bytes, Bytes)>,
59 rhs: Option<(Bytes, Bytes)>,
60) -> Option<(Bytes, Bytes)> {
61 match (lhs, rhs) {
62 (Some((lhs_min, lhs_max)), Some((rhs_min, rhs_max))) => {
63 Some((lhs_min.min(rhs_min), lhs_max.max(rhs_max)))
64 }
65 _ => None,
66 }
67}
68
69pub fn find_overlapping_items<T: Item + Clone>(
70 l: &mut SortedRun<T>,
71 r: &mut SortedRun<T>,
72 result: &mut Vec<T>,
73) {
74 if l.items.is_empty() || r.items.is_empty() {
75 return;
76 }
77
78 result.clear();
79 result.reserve(l.items.len() + r.items.len());
80
81 if !l.sorted {
83 sort_ranged_items(&mut l.items);
84 l.sorted = true;
85 }
86 if !r.sorted {
87 sort_ranged_items(&mut r.items);
88 r.sorted = true;
89 }
90
91 let mut r_idx = 0;
92
93 let mut selected = BitVec::repeat(false, r.items().len() + l.items.len());
94
95 for (lhs_idx, lhs) in l.items.iter().enumerate() {
96 let (lhs_start, lhs_end) = lhs.range();
97
98 while r_idx < r.items.len() {
100 let (_, rhs_end) = r.items[r_idx].range();
101 if rhs_end < lhs_start {
102 r_idx += 1;
103 } else {
104 break;
105 }
106 }
107
108 let mut j = r_idx;
110 while j < r.items.len() {
111 let (rhs_start, _rhs_end) = r.items[j].range();
112
113 if rhs_start > lhs_end {
115 break;
116 }
117
118 if lhs.overlap_inclusive(&r.items[j]) {
120 if !selected[lhs_idx] {
121 result.push(lhs.clone());
122 selected.set(lhs_idx, true);
123 }
124
125 let rhs_selected_idx = l.items.len() + j;
126 if !selected[rhs_selected_idx] {
127 result.push(r.items[j].clone());
128 selected.set(rhs_selected_idx, true);
129 }
130 }
131
132 j += 1;
133 }
134 }
135}
136
137fn sort_ranged_items<T: Ranged>(values: &mut [T]) {
139 values.sort_unstable_by(|l, r| {
140 let (l_start, l_end) = l.range();
141 let (r_start, r_end) = r.range();
142 l_start.cmp(&r_start).then(r_end.cmp(&l_end))
143 });
144}
145
/// A cloneable [`Ranged`] value that also reports a size. This is the element type of
/// [`SortedRun`].
pub trait Item: Ranged + Clone {
    /// Returns the size of this item. The unit is defined by the implementor.
    fn size(&self) -> usize;
}
151
/// A set of files handled as one [`Item`], together with aggregates computed over those files.
#[derive(Debug, Clone)]
pub struct FileGroup {
    /// Files that belong to this group.
    files: SmallVec<[FileHandle; 2]>,
    /// Sum of `FileHandle::size()` over all files in the group.
    size: usize,
    /// Sum of `FileHandle::num_rows()` over all files in the group.
    num_rows: usize,
    /// Smallest start of the files' time ranges.
    min_timestamp: Timestamp,
    /// Largest end of the files' time ranges.
    max_timestamp: Timestamp,
    /// Range covering the primary-key ranges of all files, or `None` once any file in the
    /// group reports no primary-key range.
    primary_key_range: Option<(Bytes, Bytes)>,
}
162
163impl FileGroup {
164 pub(crate) fn new_with_file(file: FileHandle) -> Self {
165 let size = file.size() as usize;
166 let (min_timestamp, max_timestamp) = file.time_range();
167 let num_rows = file.num_rows();
168 let primary_key_range = file.primary_key_range();
169 Self {
170 files: smallvec![file],
171 size,
172 num_rows,
173 min_timestamp,
174 max_timestamp,
175 primary_key_range,
176 }
177 }
178
179 pub(crate) fn num_rows(&self) -> usize {
180 self.num_rows
181 }
182
183 pub(crate) fn add_file(&mut self, file: FileHandle) {
184 self.size += file.size() as usize;
185 self.num_rows += file.num_rows();
186 let (min_timestamp, max_timestamp) = file.time_range();
187 self.min_timestamp = self.min_timestamp.min(min_timestamp);
188 self.max_timestamp = self.max_timestamp.max(max_timestamp);
189 self.primary_key_range =
190 merge_primary_key_ranges(self.primary_key_range.take(), file.primary_key_range());
191 self.files.push(file);
192 }
193
194 pub(crate) fn num_files(&self) -> usize {
195 self.files.len()
196 }
197
198 #[cfg(test)]
199 pub(crate) fn files(&self) -> &[FileHandle] {
200 &self.files[..]
201 }
202
203 pub(crate) fn file_ids(&self) -> SmallVec<[RegionFileId; 2]> {
204 SmallVec::from_iter(self.files.iter().map(|f| f.file_id()))
205 }
206
207 pub(crate) fn into_files(self) -> impl Iterator<Item = FileHandle> {
208 self.files.into_iter()
209 }
210}
211
212impl Ranged for FileGroup {
213 type BoundType = Timestamp;
214
215 fn range(&self) -> (Self::BoundType, Self::BoundType) {
216 (self.min_timestamp, self.max_timestamp)
217 }
218
219 fn overlap(&self, other: &Self) -> bool {
220 let (lhs_start, lhs_end) = self.range();
221 let (rhs_start, rhs_end) = other.range();
222 if lhs_start.max(rhs_start) >= lhs_end.min(rhs_end) {
223 return false;
224 }
225
226 match (&self.primary_key_range, &other.primary_key_range) {
227 (Some(lhs), Some(rhs)) => primary_key_ranges_overlap(lhs, rhs),
228 _ => true,
229 }
230 }
231
232 fn overlap_inclusive(&self, other: &Self) -> bool {
233 let (lhs_start, lhs_end) = self.range();
234 let (rhs_start, rhs_end) = other.range();
235 if lhs_start.max(rhs_start) > lhs_end.min(rhs_end) {
236 return false;
237 }
238
239 match (&self.primary_key_range, &other.primary_key_range) {
240 (Some(lhs), Some(rhs)) => primary_key_ranges_overlap(lhs, rhs),
241 _ => true,
242 }
243 }
244}
245
impl Item for FileGroup {
    /// Returns the accumulated size of all files in the group.
    fn size(&self) -> usize {
        self.size
    }
}
251
/// A collection of items together with their total size and overall range.
#[derive(Debug, Clone)]
pub struct SortedRun<T: Item> {
    /// Items of the run, in insertion order until the run gets sorted.
    items: Vec<T>,
    /// Sum of `Item::size()` over all items.
    size: usize,
    /// Smallest range start among the items; `None` while the run is empty.
    start: Option<T::BoundType>,
    /// Largest range end among the items; `None` while the run is empty.
    end: Option<T::BoundType>,
    /// Set once `items` has been ordered by `find_overlapping_items`, so the sort is not
    /// repeated on later calls.
    sorted: bool,
}
266
267impl<T: Item> From<Vec<T>> for SortedRun<T> {
268 fn from(items: Vec<T>) -> Self {
269 let mut r = Self {
270 items: Vec::with_capacity(items.len()),
271 size: 0,
272 start: None,
273 end: None,
274 sorted: false,
275 };
276 for item in items {
277 r.push_item(item);
278 }
279
280 r
281 }
282}
283
284impl<T> Default for SortedRun<T>
285where
286 T: Item,
287{
288 fn default() -> Self {
289 Self {
290 items: vec![],
291 size: 0,
292 start: None,
293 end: None,
294 sorted: false,
295 }
296 }
297}
298
299impl<T> SortedRun<T>
300where
301 T: Item,
302{
303 pub fn items(&self) -> &[T] {
304 &self.items
305 }
306
307 fn push_item(&mut self, t: T) {
308 let (file_start, file_end) = t.range();
309 self.size += t.size();
310 self.items.push(t);
311 self.start = Some(self.start.map_or(file_start, |v| v.min(file_start)));
312 self.end = Some(self.end.map_or(file_end, |v| v.max(file_end)));
313 }
314}
315
316pub fn find_sorted_runs<T>(items: &mut [T]) -> Vec<SortedRun<T>>
318where
319 T: Item,
320{
321 if items.is_empty() {
322 return vec![];
323 }
324 sort_ranged_items(items);
326
327 let mut current_run = SortedRun::default();
328 let mut runs = vec![];
329
330 let mut selection = BitVec::repeat(false, items.len());
331 while !selection.all() {
332 for (item, mut selected) in items.iter().zip(selection.iter_mut()) {
334 if *selected {
335 continue;
337 }
338 if current_run.items.is_empty() {
339 selected.set(true);
341 current_run.push_item(item.clone());
342 } else {
343 let overlaps_any = current_run.items.iter().any(|i| i.overlap(item));
347 if !overlaps_any {
348 selected.set(true);
350 current_run.push_item(item.clone());
351 }
352 }
353 }
354 runs.push(std::mem::take(&mut current_run));
356 }
357 runs
358}
359
360pub fn reduce_runs<T: Item>(mut runs: Vec<SortedRun<T>>) -> Vec<T> {
363 assert!(runs.len() > 1);
364 runs.sort_unstable_by_key(|a| a.size);
366 let probe_end = runs.len().min(100);
368 let mut min_penalty = usize::MAX;
369 let mut files = vec![];
370 let mut temp_files = vec![];
371 for i in 0..probe_end {
372 for j in i + 1..probe_end {
373 let (a, b) = runs.split_at_mut(j);
374 find_overlapping_items(&mut a[i], &mut b[0], &mut temp_files);
375 let penalty = temp_files.iter().map(|e| e.size()).sum();
376 if penalty < min_penalty {
377 min_penalty = penalty;
378 files.clear();
379 files.extend_from_slice(&temp_files);
380 }
381 }
382 }
383 files
384}
385
386pub fn merge_seq_files<T: Item>(input_files: &[T], max_file_size: Option<u64>) -> Vec<T> {
407 if input_files.is_empty() || input_files.len() == 1 {
408 return vec![];
409 }
410
411 let files_to_process = if input_files.len() > 100 {
413 &input_files[0..100]
414 } else {
415 input_files
416 };
417
418 let target_size = match max_file_size {
420 Some(size) => size as usize,
421 None => {
422 let total_size: usize = files_to_process.iter().map(|f| f.size()).sum();
424 ((((total_size as f64) / (files_to_process.len() as f64)) * 1.5) as usize)
425 .min(DEFAULT_MAX_OUTPUT_SIZE as usize)
426 }
427 };
428
429 let mut best_group = Vec::new();
431 let mut best_score = f64::NEG_INFINITY;
432
433 for start_idx in (0..files_to_process.len()).rev() {
435 for end_idx in (start_idx + 1..files_to_process.len()).rev() {
437 let group = &files_to_process[start_idx..=end_idx];
438 let total_size: usize = group.iter().map(|f| f.size()).sum();
439
440 if total_size > target_size {
442 continue; }
444
445 let largest_file_size = group.iter().map(|f| f.size()).max().unwrap_or(0);
447 let amplification_factor = largest_file_size as f64 / total_size as f64;
448
449 let file_reduction = group.len() - 1;
451
452 let file_reduction_score = file_reduction as f64 / files_to_process.len() as f64;
457 let amp_factor_score = (1.0 - amplification_factor) * 1.5; let size_efficiency = (total_size as f64 / target_size as f64).min(1.0); let score = file_reduction_score + amp_factor_score + size_efficiency;
461
462 if score >= best_score {
465 best_score = score;
466 best_group = group.to_vec();
467 }
468 }
469 }
470
471 best_group
472}
473
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use bytes::Bytes;
    use store_api::storage::FileId;

    use super::*;
    use crate::compaction::test_util::new_file_handle_with_size_sequence_and_primary_key_range;

    /// A list of `(start, end)` ranges.
    type Ranges = Vec<(i64, i64)>;

    /// Minimal [`Item`] with an `i64` range and an explicit size.
    #[derive(Clone, Debug, PartialEq)]
    struct MockFile {
        start: i64,
        end: i64,
        size: usize,
    }

    impl Ranged for MockFile {
        type BoundType = i64;

        fn range(&self) -> (Self::BoundType, Self::BoundType) {
            (self.start, self.end)
        }
    }

    impl Item for MockFile {
        fn size(&self) -> usize {
            self.size
        }
    }

    /// Builds mock files whose size equals the width of their range.
    fn build_items(ranges: &[(i64, i64)]) -> Vec<MockFile> {
        let mut files = Vec::with_capacity(ranges.len());
        for &(start, end) in ranges {
            files.push(MockFile {
                start,
                end,
                size: (end - start) as usize,
            });
        }
        files
    }

    /// Builds mock files from `(start, end, size)` triples.
    fn build_items_with_size(items: &[(i64, i64, usize)]) -> Vec<MockFile> {
        items
            .iter()
            .map(|&(start, end, size)| MockFile { start, end, size })
            .collect()
    }

    /// Builds mock files with the given sizes and the ranges `(1, 2), (3, 4), (5, 6), ...`.
    fn sequential_files(sizes: &[usize]) -> Vec<MockFile> {
        let triples: Vec<(i64, i64, usize)> = sizes
            .iter()
            .enumerate()
            .map(|(idx, &size)| (2 * idx as i64 + 1, 2 * idx as i64 + 2, size))
            .collect();
        build_items_with_size(&triples)
    }

    /// Returns the sizes of `files`, in order.
    fn sizes_of(files: &[MockFile]) -> Vec<usize> {
        files.iter().map(|file| file.size).collect()
    }

    /// Wraps two static byte strings into a `Some((min, max))` primary-key range.
    fn pk_range(min: &'static [u8], max: &'static [u8]) -> Option<(Bytes, Bytes)> {
        let lower = Bytes::from_static(min);
        let upper = Bytes::from_static(max);
        Some((lower, upper))
    }

    /// Builds a single-file [`FileGroup`] for the time range `$start..$end` and primary-key
    /// range `$min_pk..$max_pk`. `$fifth` is forwarded as the fifth constructor argument, the
    /// only other value that varies between the files of these tests (presumably the
    /// sequence; confirm against `test_util`). A macro is used so the integer literals keep
    /// whatever types the constructor expects.
    macro_rules! file_group {
        ($start:expr, $end:expr, $fifth:expr, $min_pk:expr, $max_pk:expr) => {
            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
                FileId::random(),
                $start,
                $end,
                0,
                $fifth,
                10,
                pk_range($min_pk, $max_pk),
            ))
        };
    }

    /// Runs [`find_sorted_runs`] on `ranges`, asserts the ranges of the resulting runs and
    /// returns the runs.
    fn check_sorted_runs(
        ranges: &[(i64, i64)],
        expected_runs: &[Vec<(i64, i64)>],
    ) -> Vec<SortedRun<MockFile>> {
        let mut files = build_items(ranges);
        let runs = find_sorted_runs(&mut files);

        let mut actual_runs: Vec<Ranges> = Vec::with_capacity(runs.len());
        for run in &runs {
            actual_runs.push(run.items.iter().map(|file| file.range()).collect());
        }
        assert_eq!(expected_runs, actual_runs.as_slice());
        runs
    }

    #[test]
    fn test_find_sorted_runs() {
        let cases: Vec<(Ranges, Vec<Ranges>)> = vec![
            (vec![], vec![]),
            (vec![(1, 1), (2, 2)], vec![vec![(1, 1), (2, 2)]]),
            (vec![(1, 2)], vec![vec![(1, 2)]]),
            (vec![(1, 2), (2, 3)], vec![vec![(1, 2), (2, 3)]]),
            (vec![(1, 2), (3, 4)], vec![vec![(1, 2), (3, 4)]]),
            (vec![(2, 4), (1, 3)], vec![vec![(1, 3)], vec![(2, 4)]]),
            (
                vec![(1, 3), (2, 4), (4, 5)],
                vec![vec![(1, 3), (4, 5)], vec![(2, 4)]],
            ),
            (
                vec![(1, 2), (3, 4), (3, 5)],
                vec![vec![(1, 2), (3, 5)], vec![(3, 4)]],
            ),
            (
                vec![(1, 3), (2, 4), (5, 6)],
                vec![vec![(1, 3), (5, 6)], vec![(2, 4)]],
            ),
            (
                vec![(1, 2), (3, 5), (4, 6)],
                vec![vec![(1, 2), (3, 5)], vec![(4, 6)]],
            ),
            (
                vec![(1, 2), (3, 4), (4, 6), (7, 8)],
                vec![vec![(1, 2), (3, 4), (4, 6), (7, 8)]],
            ),
            (
                vec![(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
                vec![vec![(1, 2), (3, 6), (7, 8), (8, 9)], vec![(3, 4), (5, 6)]],
            ),
            (
                vec![(10, 19), (20, 21), (20, 29), (30, 39)],
                vec![vec![(10, 19), (20, 29), (30, 39)], vec![(20, 21)]],
            ),
            (
                vec![(10, 19), (20, 29), (21, 22), (30, 39), (31, 32), (32, 42)],
                vec![
                    vec![(10, 19), (20, 29), (30, 39)],
                    vec![(21, 22), (31, 32), (32, 42)],
                ],
            ),
        ];

        for (ranges, expected_runs) in &cases {
            check_sorted_runs(ranges, expected_runs);
        }
    }

    /// Checks the sorted runs for `files`, then, if there is more than one run, checks that
    /// [`reduce_runs`] selects exactly the `expected` ranges (order ignored).
    fn check_reduce_runs(
        files: &[(i64, i64)],
        expected_runs: &[Vec<(i64, i64)>],
        expected: &[(i64, i64)],
    ) {
        let runs = check_sorted_runs(files, expected_runs);
        if runs.len() <= 1 {
            assert!(expected.is_empty());
            return;
        }

        let mut selected = HashSet::new();
        for file in reduce_runs(runs) {
            selected.insert((file.start, file.end));
        }

        let expected: HashSet<(i64, i64)> = expected.iter().copied().collect();
        assert_eq!(&expected, &selected);
    }

    #[test]
    fn test_reduce_runs() {
        let cases: Vec<(Ranges, Vec<Ranges>, Ranges)> = vec![
            (
                vec![(1, 3), (2, 4), (5, 6)],
                vec![vec![(1, 3), (5, 6)], vec![(2, 4)]],
                vec![(1, 3), (2, 4)],
            ),
            (
                vec![(1, 2), (3, 5), (4, 6)],
                vec![vec![(1, 2), (3, 5)], vec![(4, 6)]],
                vec![(3, 5), (4, 6)],
            ),
            (
                vec![(1, 2), (3, 4), (4, 6), (7, 8)],
                vec![vec![(1, 2), (3, 4), (4, 6), (7, 8)]],
                vec![],
            ),
            (
                vec![(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
                vec![vec![(1, 2), (3, 6), (7, 8), (8, 9)], vec![(3, 4), (5, 6)]],
                vec![(5, 6), (3, 4), (3, 6)],
            ),
            (
                vec![(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
                vec![vec![(1, 2), (3, 6), (7, 8), (8, 9)], vec![(3, 4), (5, 6)]],
                vec![(3, 4), (3, 6), (5, 6)],
            ),
            (
                vec![
                    (10, 20),
                    (30, 40),
                    (50, 60),
                    (50, 80),
                    (80, 90),
                    (80, 100),
                    (100, 110),
                ],
                vec![
                    vec![(10, 20), (30, 40), (50, 80), (80, 100), (100, 110)],
                    vec![(50, 60), (80, 90)],
                ],
                vec![(50, 80), (80, 100), (50, 60), (80, 90)],
            ),
            (
                vec![(0, 10), (0, 11), (0, 12), (0, 13)],
                vec![vec![(0, 13)], vec![(0, 12)], vec![(0, 11)], vec![(0, 10)]],
                vec![(0, 10), (0, 11)],
            ),
        ];

        for (files, expected_runs, expected) in &cases {
            check_reduce_runs(files, expected_runs, expected);
        }
    }

    /// Runs [`find_overlapping_items`] on two fresh runs built from the given ranges, writing
    /// into the caller's reusable `result` buffer.
    fn overlapping_items(
        left: &[(i64, i64)],
        right: &[(i64, i64)],
        result: &mut Vec<MockFile>,
    ) {
        find_overlapping_items(
            &mut SortedRun::from(build_items(left)),
            &mut SortedRun::from(build_items(right)),
            result,
        );
    }

    #[test]
    fn test_find_overlapping_items() {
        // One buffer is reused for every call, as a production caller would.
        let mut result = Vec::new();

        // Empty inputs on either or both sides.
        overlapping_items(&[], &[], &mut result);
        assert!(result.is_empty());

        overlapping_items(&[(1, 3)], &[], &mut result);
        assert!(result.is_empty());

        overlapping_items(&[], &[(1, 3)], &mut result);
        assert!(result.is_empty());

        // Disjoint runs.
        overlapping_items(&[(1, 3), (5, 7)], &[(10, 12), (15, 20)], &mut result);
        assert!(result.is_empty());

        // A single overlapping pair: left item first, then right item.
        overlapping_items(&[(1, 5)], &[(3, 7)], &mut result);
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].range(), (1, 5));
        assert_eq!(result[1].range(), (3, 7));

        // Several overlapping pairs.
        overlapping_items(
            &[(1, 5), (8, 12), (15, 20)],
            &[(3, 6), (7, 10), (18, 25)],
            &mut result,
        );
        assert_eq!(result.len(), 6);

        // Ranges that only touch at a boundary count as overlapping.
        overlapping_items(&[(1, 5)], &[(5, 10)], &mut result);
        assert_eq!(result.len(), 2);

        // One range fully contains the other.
        overlapping_items(&[(1, 10)], &[(3, 7)], &mut result);
        assert_eq!(result.len(), 2);

        // Identical ranges.
        overlapping_items(&[(1, 5)], &[(1, 5)], &mut result);
        assert_eq!(result.len(), 2);

        // Unsorted left input.
        overlapping_items(&[(5, 10), (1, 3)], &[(2, 7), (8, 12)], &mut result);
        assert_eq!(result.len(), 4);
    }

    #[test]
    fn test_file_group_overlap_time_overlap_pk_disjoint() {
        let lhs = file_group!(0, 100, 1, b"a", b"f");
        let rhs = file_group!(50, 150, 2, b"x", b"z");

        assert!(!lhs.overlap(&rhs));
    }

    #[test]
    fn test_find_sorted_runs_collapses_pk_disjoint_files_into_one_run() {
        let mut files = vec![
            file_group!(0, 100, 1, b"a", b"f"),
            file_group!(50, 150, 2, b"x", b"z"),
        ];

        let runs = find_sorted_runs(&mut files);

        assert_eq!(1, runs.len());
        assert_eq!(2, runs[0].items().len());
    }

    #[test]
    fn test_find_sorted_runs_handles_2d_transitivity_break() {
        let mut files = vec![
            file_group!(0, 100, 1, b"a", b"f"),
            file_group!(50, 150, 2, b"x", b"z"),
            file_group!(50, 150, 3, b"a", b"f"),
        ];

        let runs = find_sorted_runs(&mut files);

        assert_eq!(2, runs.len());
        assert_eq!(2, runs[0].items().len());
        assert_eq!(1, runs[1].items().len());
    }

    #[test]
    fn test_find_overlapping_items_skips_pk_disjoint_pairs() {
        let mut left = SortedRun::from(vec![file_group!(0, 100, 1, b"a", b"f")]);
        let mut right = SortedRun::from(vec![file_group!(50, 150, 2, b"x", b"z")]);
        let mut result = Vec::new();

        find_overlapping_items(&mut left, &mut right, &mut result);

        assert!(result.is_empty());
    }

    #[test]
    fn test_file_group_touching_time_boundary_with_same_pk_is_not_overlap() {
        let lhs = file_group!(0, 100, 1, b"a", b"f");
        let rhs = file_group!(100, 150, 2, b"a", b"f");

        assert!(!lhs.overlap(&rhs));
    }

    #[test]
    fn test_merge_seq_files() {
        // Fewer than two files: nothing to merge.
        let files = Vec::<MockFile>::new();
        assert!(merge_seq_files(&files, None).is_empty());

        let files = build_items(&[(1, 5)]);
        assert!(merge_seq_files(&files, None).is_empty());

        // Derived target size: the large leading file is left out.
        let result = merge_seq_files(&sequential_files(&[10, 1, 1, 1]), None);
        assert_eq!(sizes_of(&result), vec![1, 1, 1]);

        // Explicit target sizes.
        let result = merge_seq_files(&sequential_files(&[5, 5, 5, 5]), Some(20));
        assert_eq!(result.len(), 4);

        let result = merge_seq_files(&sequential_files(&[5, 5, 5, 5]), Some(10));
        assert_eq!(result.len(), 2);

        let result = merge_seq_files(&sequential_files(&[2, 3, 4, 10]), Some(10));
        assert_eq!(result.len(), 3);

        let result = merge_seq_files(&sequential_files(&[5, 5, 10, 1, 1]), Some(12));
        assert_eq!(sizes_of(&result), vec![5, 5]);

        let result = merge_seq_files(&sequential_files(&[100, 1, 1, 1]), Some(10));
        assert_eq!(sizes_of(&result), vec![1, 1, 1]);

        let result = merge_seq_files(&sequential_files(&[100, 20, 20, 20]), Some(200));
        assert_eq!(result.len(), 4);

        let result = merge_seq_files(&sequential_files(&[160, 20, 20, 20]), None);
        assert_eq!(sizes_of(&result), vec![20, 20, 20]);

        let result = merge_seq_files(&sequential_files(&[100, 1]), Some(200));
        assert_eq!(sizes_of(&result), vec![100, 1]);

        // Equal scores: the earliest pair is chosen.
        let result = merge_seq_files(&sequential_files(&[20, 20, 20, 20]), Some(40));
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].start, 1);
        assert_eq!(result[1].start, 3);
    }
}