Skip to main content

mito2/compaction/
run.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This file contains code to find sorted runs in a set if ranged items and
16//! along with the best way to merge these items to satisfy the desired run count.
17
18use bytes::{Buf, Bytes};
19use common_base::BitVec;
20use common_base::readable_size::ReadableSize;
21use common_time::Timestamp;
22use smallvec::{SmallVec, smallvec};
23
24use crate::sst::file::{FileHandle, RegionFileId};
25
26/// Default max compaction output file size when not specified.
27const DEFAULT_MAX_OUTPUT_SIZE: u64 = ReadableSize::gb(2).as_bytes();
28
/// Trait for any items with specific range (both boundaries are inclusive).
pub trait Ranged {
    type BoundType: Ord + Copy;

    /// Returns the inclusive range of item.
    fn range(&self) -> (Self::BoundType, Self::BoundType);

    /// Reports whether the interiors of two ranges intersect. Ranges that
    /// merely touch at a boundary (e.g. `[1, 3]` and `[3, 5]`) do NOT count
    /// as overlapping here, which lets touching files share a sorted run.
    fn overlap(&self, other: &Self) -> bool {
        let (a_lo, a_hi) = self.range();
        let (b_lo, b_hi) = other.range();
        a_lo.max(b_lo) < a_hi.min(b_hi)
    }

    /// Like [`Ranged::overlap`], but treats touching boundaries as
    /// overlapping (inclusive). Used by `find_overlapping_items`, where a
    /// shared boundary still counts as overlap.
    fn overlap_inclusive(&self, other: &Self) -> bool {
        let (a_lo, a_hi) = self.range();
        let (b_lo, b_hi) = other.range();
        a_lo.max(b_lo) <= a_hi.min(b_hi)
    }
}
52
53pub(crate) fn primary_key_ranges_overlap(lhs: &(Bytes, Bytes), rhs: &(Bytes, Bytes)) -> bool {
54    lhs.0.chunk().max(rhs.0.chunk()) <= lhs.1.chunk().min(rhs.1.chunk())
55}
56
57pub(crate) fn merge_primary_key_ranges(
58    lhs: Option<(Bytes, Bytes)>,
59    rhs: Option<(Bytes, Bytes)>,
60) -> Option<(Bytes, Bytes)> {
61    match (lhs, rhs) {
62        (Some((lhs_min, lhs_max)), Some((rhs_min, rhs_max))) => {
63            Some((lhs_min.min(rhs_min), lhs_max.max(rhs_max)))
64        }
65        _ => None,
66    }
67}
68
69pub fn find_overlapping_items<T: Item + Clone>(
70    l: &mut SortedRun<T>,
71    r: &mut SortedRun<T>,
72    result: &mut Vec<T>,
73) {
74    if l.items.is_empty() || r.items.is_empty() {
75        return;
76    }
77
78    result.clear();
79    result.reserve(l.items.len() + r.items.len());
80
81    // Sort both arrays by start boundary for more efficient overlap detection
82    if !l.sorted {
83        sort_ranged_items(&mut l.items);
84        l.sorted = true;
85    }
86    if !r.sorted {
87        sort_ranged_items(&mut r.items);
88        r.sorted = true;
89    }
90
91    let mut r_idx = 0;
92
93    let mut selected = BitVec::repeat(false, r.items().len() + l.items.len());
94
95    for (lhs_idx, lhs) in l.items.iter().enumerate() {
96        let (lhs_start, lhs_end) = lhs.range();
97
98        // Skip right elements that end before current left element starts
99        while r_idx < r.items.len() {
100            let (_, rhs_end) = r.items[r_idx].range();
101            if rhs_end < lhs_start {
102                r_idx += 1;
103            } else {
104                break;
105            }
106        }
107
108        // Check for overlaps with remaining right elements
109        let mut j = r_idx;
110        while j < r.items.len() {
111            let (rhs_start, _rhs_end) = r.items[j].range();
112
113            // If right element starts after left element ends, no more overlaps possible
114            if rhs_start > lhs_end {
115                break;
116            }
117
118            // We have an overlap (inclusive: touching boundaries count)
119            if lhs.overlap_inclusive(&r.items[j]) {
120                if !selected[lhs_idx] {
121                    result.push(lhs.clone());
122                    selected.set(lhs_idx, true);
123                }
124
125                let rhs_selected_idx = l.items.len() + j;
126                if !selected[rhs_selected_idx] {
127                    result.push(r.items[j].clone());
128                    selected.set(rhs_selected_idx, true);
129                }
130            }
131
132            j += 1;
133        }
134    }
135}
136
137// Sorts ranges by start asc and end desc.
138fn sort_ranged_items<T: Ranged>(values: &mut [T]) {
139    values.sort_unstable_by(|l, r| {
140        let (l_start, l_end) = l.range();
141        let (r_start, r_end) = r.range();
142        l_start.cmp(&r_start).then(r_end.cmp(&l_end))
143    });
144}
145
146/// Trait for items to merge.
147pub trait Item: Ranged + Clone {
148    /// Size is used to calculate the cost of merging items.
149    fn size(&self) -> usize;
150}
151
152/// A group of files that are created by the same compaction task.
153#[derive(Debug, Clone)]
154pub struct FileGroup {
155    files: SmallVec<[FileHandle; 2]>,
156    size: usize,
157    num_rows: usize,
158    min_timestamp: Timestamp,
159    max_timestamp: Timestamp,
160    primary_key_range: Option<(Bytes, Bytes)>,
161}
162
163impl FileGroup {
164    pub(crate) fn new_with_file(file: FileHandle) -> Self {
165        let size = file.size() as usize;
166        let (min_timestamp, max_timestamp) = file.time_range();
167        let num_rows = file.num_rows();
168        let primary_key_range = file.primary_key_range();
169        Self {
170            files: smallvec![file],
171            size,
172            num_rows,
173            min_timestamp,
174            max_timestamp,
175            primary_key_range,
176        }
177    }
178
179    pub(crate) fn num_rows(&self) -> usize {
180        self.num_rows
181    }
182
183    pub(crate) fn add_file(&mut self, file: FileHandle) {
184        self.size += file.size() as usize;
185        self.num_rows += file.num_rows();
186        let (min_timestamp, max_timestamp) = file.time_range();
187        self.min_timestamp = self.min_timestamp.min(min_timestamp);
188        self.max_timestamp = self.max_timestamp.max(max_timestamp);
189        self.primary_key_range =
190            merge_primary_key_ranges(self.primary_key_range.take(), file.primary_key_range());
191        self.files.push(file);
192    }
193
194    pub(crate) fn num_files(&self) -> usize {
195        self.files.len()
196    }
197
198    #[cfg(test)]
199    pub(crate) fn files(&self) -> &[FileHandle] {
200        &self.files[..]
201    }
202
203    pub(crate) fn file_ids(&self) -> SmallVec<[RegionFileId; 2]> {
204        SmallVec::from_iter(self.files.iter().map(|f| f.file_id()))
205    }
206
207    pub(crate) fn into_files(self) -> impl Iterator<Item = FileHandle> {
208        self.files.into_iter()
209    }
210}
211
212impl Ranged for FileGroup {
213    type BoundType = Timestamp;
214
215    fn range(&self) -> (Self::BoundType, Self::BoundType) {
216        (self.min_timestamp, self.max_timestamp)
217    }
218
219    fn overlap(&self, other: &Self) -> bool {
220        let (lhs_start, lhs_end) = self.range();
221        let (rhs_start, rhs_end) = other.range();
222        if lhs_start.max(rhs_start) >= lhs_end.min(rhs_end) {
223            return false;
224        }
225
226        match (&self.primary_key_range, &other.primary_key_range) {
227            (Some(lhs), Some(rhs)) => primary_key_ranges_overlap(lhs, rhs),
228            _ => true,
229        }
230    }
231
232    fn overlap_inclusive(&self, other: &Self) -> bool {
233        let (lhs_start, lhs_end) = self.range();
234        let (rhs_start, rhs_end) = other.range();
235        if lhs_start.max(rhs_start) > lhs_end.min(rhs_end) {
236            return false;
237        }
238
239        match (&self.primary_key_range, &other.primary_key_range) {
240            (Some(lhs), Some(rhs)) => primary_key_ranges_overlap(lhs, rhs),
241            _ => true,
242        }
243    }
244}
245
246impl Item for FileGroup {
247    fn size(&self) -> usize {
248        self.size
249    }
250}
251
252/// A set of files with non-overlapping time ranges.
253#[derive(Debug, Clone)]
254pub struct SortedRun<T: Item> {
255    /// Items to merge
256    items: Vec<T>,
257    /// The total size of all items.
258    size: usize,
259    /// The lower bound of all items.
260    start: Option<T::BoundType>,
261    /// The upper bound of all items.
262    end: Option<T::BoundType>,
263    /// Whether items are sorted.
264    sorted: bool,
265}
266
267impl<T: Item> From<Vec<T>> for SortedRun<T> {
268    fn from(items: Vec<T>) -> Self {
269        let mut r = Self {
270            items: Vec::with_capacity(items.len()),
271            size: 0,
272            start: None,
273            end: None,
274            sorted: false,
275        };
276        for item in items {
277            r.push_item(item);
278        }
279
280        r
281    }
282}
283
284impl<T> Default for SortedRun<T>
285where
286    T: Item,
287{
288    fn default() -> Self {
289        Self {
290            items: vec![],
291            size: 0,
292            start: None,
293            end: None,
294            sorted: false,
295        }
296    }
297}
298
299impl<T> SortedRun<T>
300where
301    T: Item,
302{
303    pub fn items(&self) -> &[T] {
304        &self.items
305    }
306
307    fn push_item(&mut self, t: T) {
308        let (file_start, file_end) = t.range();
309        self.size += t.size();
310        self.items.push(t);
311        self.start = Some(self.start.map_or(file_start, |v| v.min(file_start)));
312        self.end = Some(self.end.map_or(file_end, |v| v.max(file_end)));
313    }
314}
315
316/// Finds sorted runs in given items.
317pub fn find_sorted_runs<T>(items: &mut [T]) -> Vec<SortedRun<T>>
318where
319    T: Item,
320{
321    if items.is_empty() {
322        return vec![];
323    }
324    // sort files
325    sort_ranged_items(items);
326
327    let mut current_run = SortedRun::default();
328    let mut runs = vec![];
329
330    let mut selection = BitVec::repeat(false, items.len());
331    while !selection.all() {
332        // until all items are assigned to some sorted run.
333        for (item, mut selected) in items.iter().zip(selection.iter_mut()) {
334            if *selected {
335                // item is already assigned.
336                continue;
337            }
338            if current_run.items.is_empty() {
339                // current run is empty, just add current_item
340                selected.set(true);
341                current_run.push_item(item.clone());
342            } else {
343                // the current item does not overlap with any item in current run,
344                // then it belongs to current run. Because now we introduced primary
345                // key range, we cannot simply use timestamps to check overlapping.
346                let overlaps_any = current_run.items.iter().any(|i| i.overlap(item));
347                if !overlaps_any {
348                    // does not overlap, push to current run
349                    selected.set(true);
350                    current_run.push_item(item.clone());
351                }
352            }
353        }
354        // finished an iteration, we've found a new run.
355        runs.push(std::mem::take(&mut current_run));
356    }
357    runs
358}
359
360/// Finds a set of files with minimum penalty to merge that can reduce the total num of runs.
361/// The penalty of merging is defined as the size of all overlapping files between two runs.
362pub fn reduce_runs<T: Item>(mut runs: Vec<SortedRun<T>>) -> Vec<T> {
363    assert!(runs.len() > 1);
364    // sort runs by size
365    runs.sort_unstable_by_key(|a| a.size);
366    // limit max probe runs to 100
367    let probe_end = runs.len().min(100);
368    let mut min_penalty = usize::MAX;
369    let mut files = vec![];
370    let mut temp_files = vec![];
371    for i in 0..probe_end {
372        for j in i + 1..probe_end {
373            let (a, b) = runs.split_at_mut(j);
374            find_overlapping_items(&mut a[i], &mut b[0], &mut temp_files);
375            let penalty = temp_files.iter().map(|e| e.size()).sum();
376            if penalty < min_penalty {
377                min_penalty = penalty;
378                files.clear();
379                files.extend_from_slice(&temp_files);
380            }
381        }
382    }
383    files
384}
385
386/// Finds the optimal set of adjacent files to merge based on a scoring system.
387///
388/// This function evaluates all possible contiguous subsets of files to find the best
389/// candidates for merging, considering:
390///
391/// 1. File reduction - prioritizes merging more files to reduce the total count
392/// 2. Write amplification - minimizes the ratio of largest file to total size
393/// 3. Size efficiency - prefers merges that utilize available space effectively
394///
395/// When multiple merge candidates have the same score, older files (those with lower indices)
396/// are preferred.
397///
398/// # Arguments
399/// * `input_files` - Slice of files to consider for merging
400/// * `max_file_size` - Optional maximum size constraint for the merged file.
401///   If None, uses 1.5 times the average file size.
402///
403/// # Returns
404/// A vector containing the best set of adjacent files to merge.
405/// Returns an empty vector if input is empty or contains only one file.
406pub fn merge_seq_files<T: Item>(input_files: &[T], max_file_size: Option<u64>) -> Vec<T> {
407    if input_files.is_empty() || input_files.len() == 1 {
408        return vec![];
409    }
410
411    // Limit the number of files to process to 100 to control time complexity
412    let files_to_process = if input_files.len() > 100 {
413        &input_files[0..100]
414    } else {
415        input_files
416    };
417
418    // Calculate target size based on max_file_size or average file size
419    let target_size = match max_file_size {
420        Some(size) => size as usize,
421        None => {
422            // Calculate 1.5*average_file_size if max_file_size is not provided and clamp to 2GB
423            let total_size: usize = files_to_process.iter().map(|f| f.size()).sum();
424            ((((total_size as f64) / (files_to_process.len() as f64)) * 1.5) as usize)
425                .min(DEFAULT_MAX_OUTPUT_SIZE as usize)
426        }
427    };
428
429    // Find the best group of adjacent files to merge
430    let mut best_group = Vec::new();
431    let mut best_score = f64::NEG_INFINITY;
432
433    // Try different starting positions - iterate from end to start to prefer older files
434    for start_idx in (0..files_to_process.len()).rev() {
435        // Try different ending positions - also iterate from end to start
436        for end_idx in (start_idx + 1..files_to_process.len()).rev() {
437            let group = &files_to_process[start_idx..=end_idx];
438            let total_size: usize = group.iter().map(|f| f.size()).sum();
439
440            // Skip if total size exceeds target size
441            if total_size > target_size {
442                continue; // Use continue instead of break to check smaller ranges
443            }
444
445            // Calculate amplification factor (largest file size / total size)
446            let largest_file_size = group.iter().map(|f| f.size()).max().unwrap_or(0);
447            let amplification_factor = largest_file_size as f64 / total_size as f64;
448
449            // Calculate file reduction (number of files that will be reduced)
450            let file_reduction = group.len() - 1;
451
452            // Calculate score based on multiple factors:
453            // 1. File reduction (higher is better)
454            // 2. Amplification factor (lower is better)
455            // 3. Size efficiency (how close to target size)
456            let file_reduction_score = file_reduction as f64 / files_to_process.len() as f64;
457            let amp_factor_score = (1.0 - amplification_factor) * 1.5; // Lower amplification is better
458            let size_efficiency = (total_size as f64 / target_size as f64).min(1.0); // Reward using available space
459
460            let score = file_reduction_score + amp_factor_score + size_efficiency;
461
462            // Check if this group is better than our current best
463            // Use >= instead of > to prefer older files (which we encounter first due to reverse iteration)
464            if score >= best_score {
465                best_score = score;
466                best_group = group.to_vec();
467            }
468        }
469    }
470
471    best_group
472}
473
474#[cfg(test)]
475mod tests {
476    use std::collections::HashSet;
477
478    use bytes::Bytes;
479    use store_api::storage::FileId;
480
481    use super::*;
482    use crate::compaction::test_util::new_file_handle_with_size_sequence_and_primary_key_range;
483
484    #[derive(Clone, Debug, PartialEq)]
485    struct MockFile {
486        start: i64,
487        end: i64,
488        size: usize,
489    }
490
491    impl Ranged for MockFile {
492        type BoundType = i64;
493
494        fn range(&self) -> (Self::BoundType, Self::BoundType) {
495            (self.start, self.end)
496        }
497    }
498
499    impl Item for MockFile {
500        fn size(&self) -> usize {
501            self.size
502        }
503    }
504
505    fn build_items(ranges: &[(i64, i64)]) -> Vec<MockFile> {
506        ranges
507            .iter()
508            .map(|(start, end)| MockFile {
509                start: *start,
510                end: *end,
511                size: (*end - *start) as usize,
512            })
513            .collect()
514    }
515
516    fn build_items_with_size(items: &[(i64, i64, usize)]) -> Vec<MockFile> {
517        items
518            .iter()
519            .map(|(start, end, size)| MockFile {
520                start: *start,
521                end: *end,
522                size: *size,
523            })
524            .collect()
525    }
526
527    fn pk_range(min: &'static [u8], max: &'static [u8]) -> Option<(Bytes, Bytes)> {
528        Some((Bytes::from_static(min), Bytes::from_static(max)))
529    }
530
531    fn check_sorted_runs(
532        ranges: &[(i64, i64)],
533        expected_runs: &[Vec<(i64, i64)>],
534    ) -> Vec<SortedRun<MockFile>> {
535        let mut files = build_items(ranges);
536        let runs = find_sorted_runs(&mut files);
537
538        let result_file_ranges: Vec<Vec<_>> = runs
539            .iter()
540            .map(|r| r.items.iter().map(|f| f.range()).collect())
541            .collect();
542        assert_eq!(&expected_runs, &result_file_ranges);
543        runs
544    }
545
546    #[test]
547    fn test_find_sorted_runs() {
548        check_sorted_runs(&[], &[]);
549        check_sorted_runs(&[(1, 1), (2, 2)], &[vec![(1, 1), (2, 2)]]);
550        check_sorted_runs(&[(1, 2)], &[vec![(1, 2)]]);
551        check_sorted_runs(&[(1, 2), (2, 3)], &[vec![(1, 2), (2, 3)]]);
552        check_sorted_runs(&[(1, 2), (3, 4)], &[vec![(1, 2), (3, 4)]]);
553        check_sorted_runs(&[(2, 4), (1, 3)], &[vec![(1, 3)], vec![(2, 4)]]);
554        check_sorted_runs(
555            &[(1, 3), (2, 4), (4, 5)],
556            &[vec![(1, 3), (4, 5)], vec![(2, 4)]],
557        );
558
559        check_sorted_runs(
560            &[(1, 2), (3, 4), (3, 5)],
561            &[vec![(1, 2), (3, 5)], vec![(3, 4)]],
562        );
563
564        check_sorted_runs(
565            &[(1, 3), (2, 4), (5, 6)],
566            &[vec![(1, 3), (5, 6)], vec![(2, 4)]],
567        );
568
569        check_sorted_runs(
570            &[(1, 2), (3, 5), (4, 6)],
571            &[vec![(1, 2), (3, 5)], vec![(4, 6)]],
572        );
573
574        check_sorted_runs(
575            &[(1, 2), (3, 4), (4, 6), (7, 8)],
576            &[vec![(1, 2), (3, 4), (4, 6), (7, 8)]],
577        );
578        check_sorted_runs(
579            &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
580            &[vec![(1, 2), (3, 6), (7, 8), (8, 9)], vec![(3, 4), (5, 6)]],
581        );
582
583        check_sorted_runs(
584            &[(10, 19), (20, 21), (20, 29), (30, 39)],
585            &[vec![(10, 19), (20, 29), (30, 39)], vec![(20, 21)]],
586        );
587
588        check_sorted_runs(
589            &[(10, 19), (20, 29), (21, 22), (30, 39), (31, 32), (32, 42)],
590            &[
591                vec![(10, 19), (20, 29), (30, 39)],
592                vec![(21, 22), (31, 32), (32, 42)],
593            ],
594        );
595    }
596
597    fn check_reduce_runs(
598        files: &[(i64, i64)],
599        expected_runs: &[Vec<(i64, i64)>],
600        expected: &[(i64, i64)],
601    ) {
602        let runs = check_sorted_runs(files, expected_runs);
603        if runs.len() <= 1 {
604            assert!(expected.is_empty());
605            return;
606        }
607        let files_to_merge = reduce_runs(runs);
608        let file_to_merge_timestamps = files_to_merge
609            .into_iter()
610            .map(|f| (f.start, f.end))
611            .collect::<HashSet<_>>();
612
613        let expected = expected.iter().cloned().collect::<HashSet<_>>();
614        assert_eq!(&expected, &file_to_merge_timestamps);
615    }
616
617    #[test]
618    fn test_reduce_runs() {
619        // [1..3]   [5..6]
620        //   [2..4]
621        check_reduce_runs(
622            &[(1, 3), (2, 4), (5, 6)],
623            &[vec![(1, 3), (5, 6)], vec![(2, 4)]],
624            &[(1, 3), (2, 4)],
625        );
626
627        // [1..2][3..5]
628        //         [4..6]
629        check_reduce_runs(
630            &[(1, 2), (3, 5), (4, 6)],
631            &[vec![(1, 2), (3, 5)], vec![(4, 6)]],
632            &[(3, 5), (4, 6)],
633        );
634
635        // [1..2][3..4]    [7..8]
636        //          [4..6]
637        check_reduce_runs(
638            &[(1, 2), (3, 4), (4, 6), (7, 8)],
639            &[vec![(1, 2), (3, 4), (4, 6), (7, 8)]],
640            &[],
641        );
642
643        // [1..2][3........6][7..8][8..9]
644        //       [3..4][5..6]
645        check_reduce_runs(
646            &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
647            &[vec![(1, 2), (3, 6), (7, 8), (8, 9)], vec![(3, 4), (5, 6)]],
648            &[(5, 6), (3, 4), (3, 6)], // already satisfied
649        );
650
651        // [1..2][3........6][7..8][8..9]
652        //       [3..4][5..6]
653        check_reduce_runs(
654            &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
655            &[vec![(1, 2), (3, 6), (7, 8), (8, 9)], vec![(3, 4), (5, 6)]],
656            &[(3, 4), (3, 6), (5, 6)],
657        );
658
659        // [10..20] [30..40] [50........80][80...100][100..110]
660        //                   [50..60]  [80..90]
661        //
662        check_reduce_runs(
663            &[
664                (10, 20),
665                (30, 40),
666                (50, 60),
667                (50, 80),
668                (80, 90),
669                (80, 100),
670                (100, 110),
671            ],
672            &[
673                vec![(10, 20), (30, 40), (50, 80), (80, 100), (100, 110)],
674                vec![(50, 60), (80, 90)],
675            ],
676            &[(50, 80), (80, 100), (50, 60), (80, 90)],
677        );
678
679        // [0..10]
680        // [0...11]
681        // [0....12]
682        // [0.....13]
683        check_reduce_runs(
684            &[(0, 10), (0, 11), (0, 12), (0, 13)],
685            &[vec![(0, 13)], vec![(0, 12)], vec![(0, 11)], vec![(0, 10)]],
686            &[(0, 10), (0, 11)],
687        );
688    }
689
690    #[test]
691    fn test_find_overlapping_items() {
692        let mut result = Vec::new();
693
694        // Test empty inputs
695        find_overlapping_items(
696            &mut SortedRun::from(Vec::<MockFile>::new()),
697            &mut SortedRun::from(Vec::<MockFile>::new()),
698            &mut result,
699        );
700        assert_eq!(result, Vec::<MockFile>::new());
701
702        let files1 = build_items(&[(1, 3)]);
703        find_overlapping_items(
704            &mut SortedRun::from(files1.clone()),
705            &mut SortedRun::from(Vec::<MockFile>::new()),
706            &mut result,
707        );
708        assert_eq!(result, Vec::<MockFile>::new());
709
710        find_overlapping_items(
711            &mut SortedRun::from(Vec::<MockFile>::new()),
712            &mut SortedRun::from(files1.clone()),
713            &mut result,
714        );
715        assert_eq!(result, Vec::<MockFile>::new());
716
717        // Test non-overlapping ranges
718        let files1 = build_items(&[(1, 3), (5, 7)]);
719        let files2 = build_items(&[(10, 12), (15, 20)]);
720        find_overlapping_items(
721            &mut SortedRun::from(files1),
722            &mut SortedRun::from(files2),
723            &mut result,
724        );
725        assert_eq!(result, Vec::<MockFile>::new());
726
727        // Test simple overlap
728        let files1 = build_items(&[(1, 5)]);
729        let files2 = build_items(&[(3, 7)]);
730        find_overlapping_items(
731            &mut SortedRun::from(files1),
732            &mut SortedRun::from(files2),
733            &mut result,
734        );
735        assert_eq!(result.len(), 2);
736        assert_eq!(result[0].range(), (1, 5));
737        assert_eq!(result[1].range(), (3, 7));
738
739        // Test multiple overlaps
740        let files1 = build_items(&[(1, 5), (8, 12), (15, 20)]);
741        let files2 = build_items(&[(3, 6), (7, 10), (18, 25)]);
742        find_overlapping_items(
743            &mut SortedRun::from(files1),
744            &mut SortedRun::from(files2),
745            &mut result,
746        );
747        assert_eq!(result.len(), 6);
748
749        // Test boundary cases (touching but not overlapping)
750        let files1 = build_items(&[(1, 5)]);
751        let files2 = build_items(&[(5, 10)]); // Touching at 5
752        find_overlapping_items(
753            &mut SortedRun::from(files1),
754            &mut SortedRun::from(files2),
755            &mut result,
756        );
757        assert_eq!(result.len(), 2); // Should overlap since ranges are inclusive
758
759        // Test completely contained ranges
760        let files1 = build_items(&[(1, 10)]);
761        let files2 = build_items(&[(3, 7)]);
762        find_overlapping_items(
763            &mut SortedRun::from(files1),
764            &mut SortedRun::from(files2),
765            &mut result,
766        );
767        assert_eq!(result.len(), 2);
768
769        // Test identical ranges
770        let files1 = build_items(&[(1, 5)]);
771        let files2 = build_items(&[(1, 5)]);
772        find_overlapping_items(
773            &mut SortedRun::from(files1),
774            &mut SortedRun::from(files2),
775            &mut result,
776        );
777        assert_eq!(result.len(), 2);
778
779        // Test unsorted input handling
780        let files1 = build_items(&[(5, 10), (1, 3)]); // Unsorted
781        let files2 = build_items(&[(2, 7), (8, 12)]); // Unsorted
782        find_overlapping_items(
783            &mut SortedRun::from(files1),
784            &mut SortedRun::from(files2),
785            &mut result,
786        );
787        assert_eq!(result.len(), 4); // Should find both overlaps
788    }
789
790    #[test]
791    fn test_file_group_overlap_time_overlap_pk_disjoint() {
792        let lhs =
793            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
794                FileId::random(),
795                0,
796                100,
797                0,
798                1,
799                10,
800                pk_range(b"a", b"f"),
801            ));
802        let rhs =
803            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
804                FileId::random(),
805                50,
806                150,
807                0,
808                2,
809                10,
810                pk_range(b"x", b"z"),
811            ));
812
813        assert!(!lhs.overlap(&rhs));
814    }
815
816    #[test]
817    fn test_find_sorted_runs_collapses_pk_disjoint_files_into_one_run() {
818        let mut files = vec![
819            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
820                FileId::random(),
821                0,
822                100,
823                0,
824                1,
825                10,
826                pk_range(b"a", b"f"),
827            )),
828            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
829                FileId::random(),
830                50,
831                150,
832                0,
833                2,
834                10,
835                pk_range(b"x", b"z"),
836            )),
837        ];
838
839        let runs = find_sorted_runs(&mut files);
840
841        assert_eq!(1, runs.len());
842        assert_eq!(2, runs[0].items().len());
843    }
844
845    #[test]
846    fn test_find_sorted_runs_handles_2d_transitivity_break() {
847        let mut files = vec![
848            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
849                FileId::random(),
850                0,
851                100,
852                0,
853                1,
854                10,
855                pk_range(b"a", b"f"),
856            )),
857            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
858                FileId::random(),
859                50,
860                150,
861                0,
862                2,
863                10,
864                pk_range(b"x", b"z"),
865            )),
866            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
867                FileId::random(),
868                50,
869                150,
870                0,
871                3,
872                10,
873                pk_range(b"a", b"f"),
874            )),
875        ];
876
877        let runs = find_sorted_runs(&mut files);
878
879        assert_eq!(2, runs.len());
880        assert_eq!(2, runs[0].items().len());
881        assert_eq!(1, runs[1].items().len());
882    }
883
884    #[test]
885    fn test_find_overlapping_items_skips_pk_disjoint_pairs() {
886        let mut left = SortedRun::from(vec![FileGroup::new_with_file(
887            new_file_handle_with_size_sequence_and_primary_key_range(
888                FileId::random(),
889                0,
890                100,
891                0,
892                1,
893                10,
894                pk_range(b"a", b"f"),
895            ),
896        )]);
897        let mut right = SortedRun::from(vec![FileGroup::new_with_file(
898            new_file_handle_with_size_sequence_and_primary_key_range(
899                FileId::random(),
900                50,
901                150,
902                0,
903                2,
904                10,
905                pk_range(b"x", b"z"),
906            ),
907        )]);
908        let mut result = Vec::new();
909
910        find_overlapping_items(&mut left, &mut right, &mut result);
911
912        assert!(result.is_empty());
913    }
914
915    #[test]
916    fn test_file_group_touching_time_boundary_with_same_pk_is_not_overlap() {
917        let lhs =
918            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
919                FileId::random(),
920                0,
921                100,
922                0,
923                1,
924                10,
925                pk_range(b"a", b"f"),
926            ));
927        let rhs =
928            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
929                FileId::random(),
930                100,
931                150,
932                0,
933                2,
934                10,
935                pk_range(b"a", b"f"),
936            ));
937
938        assert!(!lhs.overlap(&rhs));
939    }
940
    #[test]
    fn test_merge_seq_files() {
        // Exercises `merge_seq_files(files, max_file_size)`: it returns the
        // subset of sequential files chosen for merging (empty when no merge
        // is worthwhile). `None` means no explicit cap — presumably the
        // default max output size applies (see DEFAULT_MAX_OUTPUT_SIZE).

        // Test empty input
        let files = Vec::<MockFile>::new();
        assert_eq!(merge_seq_files(&files, None), Vec::<MockFile>::new());

        // Test single file input (should return empty vec as no merge needed)
        let files = build_items(&[(1, 5)]);
        assert_eq!(merge_seq_files(&files, None), Vec::<MockFile>::new());

        // Test the example case: [10, 1, 1, 1] - should merge the last three files
        let files = build_items_with_size(&[(1, 2, 10), (3, 4, 1), (5, 6, 1), (7, 8, 1)]);
        let result = merge_seq_files(&files, None);
        assert_eq!(result.len(), 3);
        assert_eq!(result[0].size, 1);
        assert_eq!(result[1].size, 1);
        assert_eq!(result[2].size, 1);

        // Test with files of equal size - should merge as many as possible
        let files = build_items_with_size(&[(1, 2, 5), (3, 4, 5), (5, 6, 5), (7, 8, 5)]);
        let result = merge_seq_files(&files, Some(20));
        assert_eq!(result.len(), 4); // Should merge all 4 files as total size is 20

        // Test with max_file_size constraint
        let files = build_items_with_size(&[(1, 2, 5), (3, 4, 5), (5, 6, 5), (7, 8, 5)]);
        let result = merge_seq_files(&files, Some(10));
        assert_eq!(result.len(), 2); // Should merge only 2 files as max size is 10

        // Test with uneven file sizes - should prioritize reducing file count
        let files = build_items_with_size(&[(1, 2, 2), (3, 4, 3), (5, 6, 4), (7, 8, 10)]);
        let result = merge_seq_files(&files, Some(10));
        assert_eq!(result.len(), 3); // Should merge the first 3 files (total size 9)

        // Test amplification factor prioritization
        // Two possible merges: [5, 5] (amp factor 0.5) vs [10, 1, 1] (amp factor 0.83)
        let files =
            build_items_with_size(&[(1, 2, 5), (3, 4, 5), (5, 6, 10), (7, 8, 1), (9, 10, 1)]);
        let result = merge_seq_files(&files, Some(12));
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].size, 5);
        assert_eq!(result[1].size, 5);

        // Test with large file preventing merges
        let files = build_items_with_size(&[(1, 2, 100), (3, 4, 1), (5, 6, 1), (7, 8, 1)]);
        let result = merge_seq_files(&files, Some(10));
        assert_eq!(result.len(), 3); // Should merge the last 3 small files
        assert_eq!(result[0].size, 1);
        assert_eq!(result[1].size, 1);
        assert_eq!(result[2].size, 1);

        // A generous cap (200 >= 160 total) lets the large file join the merge.
        let files = build_items_with_size(&[(1, 2, 100), (3, 4, 20), (5, 6, 20), (7, 8, 20)]);
        let result = merge_seq_files(&files, Some(200));
        assert_eq!(result.len(), 4);

        // Without an explicit cap the 160-sized file is excluded and only the
        // three 20-sized files merge — presumably the default-size /
        // amplification heuristic rejects it; confirm against merge_seq_files.
        let files = build_items_with_size(&[(1, 2, 160), (3, 4, 20), (5, 6, 20), (7, 8, 20)]);
        let result = merge_seq_files(&files, None);
        assert_eq!(result.len(), 3);
        assert_eq!(result[0].size, 20);
        assert_eq!(result[1].size, 20);
        assert_eq!(result[2].size, 20);

        // Two files with skewed sizes still merge when the cap allows (101 <= 200).
        let files = build_items_with_size(&[(1, 2, 100), (3, 4, 1)]);
        let result = merge_seq_files(&files, Some(200));
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].size, 100);
        assert_eq!(result[1].size, 1);

        // Equal sizes with a cap fitting exactly two files: the first two
        // (smallest start timestamps 1 and 3) are selected.
        let files = build_items_with_size(&[(1, 2, 20), (3, 4, 20), (5, 6, 20), (7, 8, 20)]);
        let result = merge_seq_files(&files, Some(40));
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].start, 1);
        assert_eq!(result[1].start, 3);
    }
1014}