mito2/compaction/run.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! This file contains code to find sorted runs in a set of ranged items, along
//! with the best way to merge these items to satisfy the desired run count.
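//!
//! For example, the ranges `[1, 3]`, `[2, 4]` and `[5, 6]` form two sorted runs:
//! `[1, 3], [5, 6]` and `[2, 4]`, because `[2, 4]` overlaps `[1, 3]`.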

use common_base::readable_size::ReadableSize;
use common_base::BitVec;
use common_time::Timestamp;
use smallvec::{smallvec, SmallVec};

use crate::sst::file::{FileHandle, RegionFileId};

/// Default max compaction output file size when not specified.
const DEFAULT_MAX_OUTPUT_SIZE: u64 = ReadableSize::gb(2).as_bytes();

/// Trait for any item with a specific range (both boundaries are inclusive).
pub trait Ranged {
    type BoundType: Ord + Copy;

    /// Returns the inclusive range of the item.
    fn range(&self) -> (Self::BoundType, Self::BoundType);

    fn overlap<T>(&self, other: &T) -> bool
    where
        T: Ranged<BoundType = Self::BoundType>,
    {
        let (lhs_start, lhs_end) = self.range();
        let (rhs_start, rhs_end) = other.range();

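        // Note: ranges that merely touch at a single point are treated as
        // non-overlapping here, so adjacent items sharing a boundary can stay in the
        // same sorted run; `find_overlapping_items` uses an inclusive check instead.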
        lhs_start.max(rhs_start) < lhs_end.min(rhs_end)
    }
}

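/// Collects into `result` every item of `l` and `r` that overlaps at least one item
/// from the other run (boundaries are inclusive for this check). Both runs are sorted
/// by start bound in place as a side effect, and `result` is cleared first.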
pub fn find_overlapping_items<T: Item + Clone>(
    l: &mut SortedRun<T>,
    r: &mut SortedRun<T>,
    result: &mut Vec<T>,
) {
    result.clear();
    if l.items.is_empty() || r.items.is_empty() {
        return;
    }

    result.reserve(l.items.len() + r.items.len());

    // Sort both runs by start bound for more efficient overlap detection.
    if !l.sorted {
        sort_ranged_items(&mut l.items);
        l.sorted = true;
    }
    if !r.sorted {
        sort_ranged_items(&mut r.items);
        r.sorted = true;
    }

    let mut r_idx = 0;

    let mut selected = BitVec::repeat(false, l.items.len() + r.items.len());

    for (lhs_idx, lhs) in l.items.iter().enumerate() {
        let (lhs_start, lhs_end) = lhs.range();

        // Skip right elements that end before the current left element starts.
        while r_idx < r.items.len() {
            let (_, rhs_end) = r.items[r_idx].range();
            if rhs_end < lhs_start {
                r_idx += 1;
            } else {
                break;
            }
        }

        // Check for overlaps with the remaining right elements.
        let mut j = r_idx;
        while j < r.items.len() {
            let (rhs_start, rhs_end) = r.items[j].range();

            // If the right element starts after the left element ends, no more overlaps are possible.
            if rhs_start > lhs_end {
                break;
            }

            // We have an overlap.
            if lhs_start.max(rhs_start) <= lhs_end.min(rhs_end) {
                if !selected[lhs_idx] {
                    result.push(lhs.clone());
                    selected.set(lhs_idx, true);
                }

                let rhs_selected_idx = l.items.len() + j;
                if !selected[rhs_selected_idx] {
                    result.push(r.items[j].clone());
                    selected.set(rhs_selected_idx, true);
                }
            }

            j += 1;
        }
    }
}

// Sorts ranges by start asc and end desc.
fn sort_ranged_items<T: Ranged>(values: &mut [T]) {
    values.sort_unstable_by(|l, r| {
        let (l_start, l_end) = l.range();
        let (r_start, r_end) = r.range();
        l_start.cmp(&r_start).then(r_end.cmp(&l_end))
    });
}

/// Trait for items to merge.
pub trait Item: Ranged + Clone {
    /// Size is used to calculate the cost of merging items.
    fn size(&self) -> usize;
}

/// A group of files that are created by the same compaction task.
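/// For run analysis a group acts as a single [`Item`]: its range is the union of the
/// files' time ranges and its size is the sum of the files' sizes.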
#[derive(Debug, Clone)]
pub struct FileGroup {
    files: SmallVec<[FileHandle; 2]>,
    size: usize,
    num_rows: usize,
    min_timestamp: Timestamp,
    max_timestamp: Timestamp,
}

impl FileGroup {
    pub(crate) fn new_with_file(file: FileHandle) -> Self {
        let size = file.size() as usize;
        let (min_timestamp, max_timestamp) = file.time_range();
        let num_rows = file.num_rows();
        Self {
            files: smallvec![file],
            size,
            num_rows,
            min_timestamp,
            max_timestamp,
        }
    }

    pub(crate) fn num_rows(&self) -> usize {
        self.num_rows
    }

    pub(crate) fn add_file(&mut self, file: FileHandle) {
        self.size += file.size() as usize;
        self.num_rows += file.num_rows();
        let (min_timestamp, max_timestamp) = file.time_range();
        self.min_timestamp = self.min_timestamp.min(min_timestamp);
        self.max_timestamp = self.max_timestamp.max(max_timestamp);
        self.files.push(file);
    }

    #[cfg(test)]
    pub(crate) fn files(&self) -> &[FileHandle] {
        &self.files[..]
    }

    pub(crate) fn file_ids(&self) -> SmallVec<[RegionFileId; 2]> {
        SmallVec::from_iter(self.files.iter().map(|f| f.file_id()))
    }

    pub(crate) fn into_files(self) -> impl Iterator<Item = FileHandle> {
        self.files.into_iter()
    }

    pub(crate) fn is_all_level_0(&self) -> bool {
        self.files.iter().all(|f| f.level() == 0)
    }
}

impl Ranged for FileGroup {
    type BoundType = Timestamp;

    fn range(&self) -> (Self::BoundType, Self::BoundType) {
        (self.min_timestamp, self.max_timestamp)
    }
}

impl Item for FileGroup {
    fn size(&self) -> usize {
        self.size
    }
}

/// A set of items with non-overlapping time ranges.
#[derive(Debug, Clone)]
pub struct SortedRun<T: Item> {
    /// Items to merge.
    items: Vec<T>,
    /// The total size of all items.
    size: usize,
    /// The lower bound of all items.
    start: Option<T::BoundType>,
    /// The upper bound of all items.
    end: Option<T::BoundType>,
    /// Whether items are sorted.
    sorted: bool,
}

impl<T: Item> From<Vec<T>> for SortedRun<T> {
    fn from(items: Vec<T>) -> Self {
        let mut r = Self {
            items: Vec::with_capacity(items.len()),
            size: 0,
            start: None,
            end: None,
            sorted: false,
        };
        for item in items {
            r.push_item(item);
        }

        r
    }
}

impl<T> Default for SortedRun<T>
where
    T: Item,
{
    fn default() -> Self {
        Self {
            items: vec![],
            size: 0,
            start: None,
            end: None,
            sorted: false,
        }
    }
}

impl<T> SortedRun<T>
where
    T: Item,
{
    pub fn items(&self) -> &[T] {
        &self.items
    }

    fn push_item(&mut self, t: T) {
        let (file_start, file_end) = t.range();
        self.size += t.size();
        self.items.push(t);
        self.start = Some(self.start.map_or(file_start, |v| v.min(file_start)));
        self.end = Some(self.end.map_or(file_end, |v| v.max(file_end)));
    }
}

/// Finds sorted runs in the given items.
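///
/// Items within each returned run are ordered by start bound and do not overlap each
/// other. For example, the ranges `[10, 19]`, `[20, 21]`, `[20, 29]` and `[30, 39]`
/// yield the runs `[(10, 19), (20, 29), (30, 39)]` and `[(20, 21)]`.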
pub fn find_sorted_runs<T>(items: &mut [T]) -> Vec<SortedRun<T>>
where
    T: Item,
{
    if items.is_empty() {
        return vec![];
    }
    // Sort items by start and end bounds.
    sort_ranged_items(items);

    let mut current_run = SortedRun::default();
    let mut runs = vec![];

    let mut selection = BitVec::repeat(false, items.len());
    while !selection.all() {
        // Iterate until all items are assigned to some sorted run.
        for (item, mut selected) in items.iter().zip(selection.iter_mut()) {
            if *selected {
                // The item is already assigned.
                continue;
            }
            match current_run.items.last() {
                None => {
                    // The current run is empty, just add the item.
                    selected.set(true);
                    current_run.push_item(item.clone());
                }
                Some(last) => {
                    // If the current item does not overlap with the last item in the
                    // current run, then it belongs to the current run.
                    if !last.overlap(item) {
                        // Does not overlap, push to the current run.
                        selected.set(true);
                        current_run.push_item(item.clone());
                    }
                }
            }
        }
        // Finished an iteration, we've found a new run.
        runs.push(std::mem::take(&mut current_run));
    }
    runs
}

/// Finds a set of files with the minimum penalty to merge that can reduce the total number of runs.
/// The penalty of merging is defined as the total size of all overlapping files between two runs.
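///
/// Runs are compared pairwise, cheapest first (at most 100 runs are probed), and the
/// pair whose mutually overlapping files have the smallest total size determines the
/// returned merge candidates.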
pub fn reduce_runs<T: Item>(mut runs: Vec<SortedRun<T>>) -> Vec<T> {
    assert!(runs.len() > 1);
    // Sort runs by size.
    runs.sort_unstable_by(|a, b| a.size.cmp(&b.size));
    // Limit the number of runs to probe to 100.
    let probe_end = runs.len().min(100);
    let mut min_penalty = usize::MAX;
    let mut files = vec![];
    let mut temp_files = vec![];
    for i in 0..probe_end {
        for j in i + 1..probe_end {
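            // `split_at_mut` lets us mutably borrow `runs[i]` and `runs[j]` at the same time.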
            let (a, b) = runs.split_at_mut(j);
            find_overlapping_items(&mut a[i], &mut b[0], &mut temp_files);
            let penalty = temp_files.iter().map(|e| e.size()).sum();
            if penalty < min_penalty {
                min_penalty = penalty;
                files.clear();
                files.extend_from_slice(&temp_files);
            }
        }
    }
    files
}

/// Finds the optimal set of adjacent files to merge based on a scoring system.
///
/// This function evaluates all possible contiguous subsets of files to find the best
/// candidates for merging, considering:
///
/// 1. File reduction - prioritizes merging more files to reduce the total count
/// 2. Write amplification - minimizes the ratio of the largest file to the total size
/// 3. Size efficiency - prefers merges that utilize the available space effectively
///
/// When multiple merge candidates have the same score, older files (those with lower indices)
/// are preferred.
///
/// # Arguments
/// * `input_files` - Slice of files to consider for merging
/// * `max_file_size` - Optional maximum size constraint for the merged file.
///   If None, uses 1.5 times the average file size, capped at 2GiB.
///
/// # Returns
/// A vector containing the best set of adjacent files to merge.
/// Returns an empty vector if the input is empty or contains only one file.
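///
/// For a candidate group of `len` files with total size `group_size` and largest file
/// size `largest`, the score is
/// `(len - 1) / n + 1.5 * (1 - largest / group_size) + min(group_size / target_size, 1.0)`,
/// where `n` is the number of files considered (at most 100).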
pub fn merge_seq_files<T: Item>(input_files: &[T], max_file_size: Option<u64>) -> Vec<T> {
    if input_files.is_empty() || input_files.len() == 1 {
        return vec![];
    }

    // Limit the number of files to process to 100 to control time complexity
    let files_to_process = if input_files.len() > 100 {
        &input_files[0..100]
    } else {
        input_files
    };

    // Calculate target size based on max_file_size or average file size
    let target_size = match max_file_size {
        Some(size) => size as usize,
        None => {
            // Calculate 1.5*average_file_size if max_file_size is not provided and clamp to 2GB
            let total_size: usize = files_to_process.iter().map(|f| f.size()).sum();
            ((((total_size as f64) / (files_to_process.len() as f64)) * 1.5) as usize)
                .min(DEFAULT_MAX_OUTPUT_SIZE as usize)
        }
    };

    // Find the best group of adjacent files to merge
    let mut best_group = Vec::new();
    let mut best_score = f64::NEG_INFINITY;

    // Try different starting positions - iterate from end to start to prefer older files
    for start_idx in (0..files_to_process.len()).rev() {
        // Try different ending positions - also iterate from end to start
        for end_idx in (start_idx + 1..files_to_process.len()).rev() {
            let group = &files_to_process[start_idx..=end_idx];
            let total_size: usize = group.iter().map(|f| f.size()).sum();

            // Skip if total size exceeds target size
            if total_size > target_size {
                continue; // Use continue instead of break to check smaller ranges
            }

            // Calculate amplification factor (largest file size / total size)
            let largest_file_size = group.iter().map(|f| f.size()).max().unwrap_or(0);
            let amplification_factor = largest_file_size as f64 / total_size as f64;

            // Calculate file reduction (number of files that will be reduced)
            let file_reduction = group.len() - 1;

            // Calculate score based on multiple factors:
            // 1. File reduction (higher is better)
            // 2. Amplification factor (lower is better)
            // 3. Size efficiency (how close to target size)
            let file_reduction_score = file_reduction as f64 / files_to_process.len() as f64;
            let amp_factor_score = (1.0 - amplification_factor) * 1.5; // Lower amplification is better
            let size_efficiency = (total_size as f64 / target_size as f64).min(1.0); // Reward using available space

            let score = file_reduction_score + amp_factor_score + size_efficiency;

            // Check if this group is better than our current best.
            // Use >= instead of > so that ties are won by the last candidate visited;
            // since we iterate from newer to older files, this prefers older files (lower indices).
            if score >= best_score {
                best_score = score;
                best_group = group.to_vec();
            }
        }
    }

    best_group
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::*;

    #[derive(Clone, Debug, PartialEq)]
    struct MockFile {
        start: i64,
        end: i64,
        size: usize,
    }

    impl Ranged for MockFile {
        type BoundType = i64;

        fn range(&self) -> (Self::BoundType, Self::BoundType) {
            (self.start, self.end)
        }
    }

    impl Item for MockFile {
        fn size(&self) -> usize {
            self.size
        }
    }

    fn build_items(ranges: &[(i64, i64)]) -> Vec<MockFile> {
        ranges
            .iter()
            .map(|(start, end)| MockFile {
                start: *start,
                end: *end,
                size: (*end - *start) as usize,
            })
            .collect()
    }

    fn build_items_with_size(items: &[(i64, i64, usize)]) -> Vec<MockFile> {
        items
            .iter()
            .map(|(start, end, size)| MockFile {
                start: *start,
                end: *end,
                size: *size,
            })
            .collect()
    }

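    // Sanity-checks the ordering `sort_ranged_items` produces: start ascending, then
    // end descending. A minimal illustrative test using the `build_items` helper above.
    #[test]
    fn test_sort_ranged_items() {
        let mut files = build_items(&[(2, 3), (1, 2), (1, 5)]);
        sort_ranged_items(&mut files);
        let ranges: Vec<_> = files.iter().map(|f| f.range()).collect();
        assert_eq!(ranges, vec![(1, 5), (1, 2), (2, 3)]);
    }
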
    fn check_sorted_runs(
        ranges: &[(i64, i64)],
        expected_runs: &[Vec<(i64, i64)>],
    ) -> Vec<SortedRun<MockFile>> {
        let mut files = build_items(ranges);
        let runs = find_sorted_runs(&mut files);

        let result_file_ranges: Vec<Vec<_>> = runs
            .iter()
            .map(|r| r.items.iter().map(|f| f.range()).collect())
            .collect();
        assert_eq!(&expected_runs, &result_file_ranges);
        runs
    }

    #[test]
    fn test_find_sorted_runs() {
        check_sorted_runs(&[], &[]);
        check_sorted_runs(&[(1, 1), (2, 2)], &[vec![(1, 1), (2, 2)]]);
        check_sorted_runs(&[(1, 2)], &[vec![(1, 2)]]);
        check_sorted_runs(&[(1, 2), (2, 3)], &[vec![(1, 2), (2, 3)]]);
        check_sorted_runs(&[(1, 2), (3, 4)], &[vec![(1, 2), (3, 4)]]);
        check_sorted_runs(&[(2, 4), (1, 3)], &[vec![(1, 3)], vec![(2, 4)]]);
        check_sorted_runs(
            &[(1, 3), (2, 4), (4, 5)],
            &[vec![(1, 3), (4, 5)], vec![(2, 4)]],
        );

        check_sorted_runs(
            &[(1, 2), (3, 4), (3, 5)],
            &[vec![(1, 2), (3, 5)], vec![(3, 4)]],
        );

        check_sorted_runs(
            &[(1, 3), (2, 4), (5, 6)],
            &[vec![(1, 3), (5, 6)], vec![(2, 4)]],
        );

        check_sorted_runs(
            &[(1, 2), (3, 5), (4, 6)],
            &[vec![(1, 2), (3, 5)], vec![(4, 6)]],
        );

        check_sorted_runs(
            &[(1, 2), (3, 4), (4, 6), (7, 8)],
            &[vec![(1, 2), (3, 4), (4, 6), (7, 8)]],
        );
        check_sorted_runs(
            &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
            &[vec![(1, 2), (3, 6), (7, 8), (8, 9)], vec![(3, 4), (5, 6)]],
        );

        check_sorted_runs(
            &[(10, 19), (20, 21), (20, 29), (30, 39)],
            &[vec![(10, 19), (20, 29), (30, 39)], vec![(20, 21)]],
        );

        check_sorted_runs(
            &[(10, 19), (20, 29), (21, 22), (30, 39), (31, 32), (32, 42)],
            &[
                vec![(10, 19), (20, 29), (30, 39)],
                vec![(21, 22), (31, 32), (32, 42)],
            ],
        );
    }

    fn check_reduce_runs(
        files: &[(i64, i64)],
        expected_runs: &[Vec<(i64, i64)>],
        expected: &[(i64, i64)],
    ) {
        let runs = check_sorted_runs(files, expected_runs);
        if runs.len() <= 1 {
            assert!(expected.is_empty());
            return;
        }
        let files_to_merge = reduce_runs(runs);
        let file_to_merge_timestamps = files_to_merge
            .into_iter()
            .map(|f| (f.start, f.end))
            .collect::<HashSet<_>>();

        let expected = expected.iter().cloned().collect::<HashSet<_>>();
        assert_eq!(&expected, &file_to_merge_timestamps);
    }

    #[test]
    fn test_reduce_runs() {
        // [1..3]   [5..6]
        //   [2..4]
        check_reduce_runs(
            &[(1, 3), (2, 4), (5, 6)],
            &[vec![(1, 3), (5, 6)], vec![(2, 4)]],
            &[(1, 3), (2, 4)],
        );

        // [1..2][3..5]
        //         [4..6]
        check_reduce_runs(
            &[(1, 2), (3, 5), (4, 6)],
            &[vec![(1, 2), (3, 5)], vec![(4, 6)]],
            &[(3, 5), (4, 6)],
        );

        // [1..2][3..4]    [7..8]
        //          [4..6]
        check_reduce_runs(
            &[(1, 2), (3, 4), (4, 6), (7, 8)],
            &[vec![(1, 2), (3, 4), (4, 6), (7, 8)]],
            &[],
        );

        // [1..2][3........6][7..8][8..9]
        //       [3..4][5..6]
        check_reduce_runs(
            &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
            &[vec![(1, 2), (3, 6), (7, 8), (8, 9)], vec![(3, 4), (5, 6)]],
            &[(5, 6), (3, 4), (3, 6)],
        );

        // [10..20] [30..40] [50........80][80...100][100..110]
        //                   [50..60]  [80..90]
        //
        check_reduce_runs(
            &[
                (10, 20),
                (30, 40),
                (50, 60),
                (50, 80),
                (80, 90),
                (80, 100),
                (100, 110),
            ],
            &[
                vec![(10, 20), (30, 40), (50, 80), (80, 100), (100, 110)],
                vec![(50, 60), (80, 90)],
            ],
            &[(50, 80), (80, 100), (50, 60), (80, 90)],
        );

        // [0..10]
        // [0...11]
        // [0....12]
        // [0.....13]
        check_reduce_runs(
            &[(0, 10), (0, 11), (0, 12), (0, 13)],
            &[vec![(0, 13)], vec![(0, 12)], vec![(0, 11)], vec![(0, 10)]],
            &[(0, 10), (0, 11)],
        );
    }

    #[test]
    fn test_find_overlapping_items() {
        let mut result = Vec::new();

        // Test empty inputs
        find_overlapping_items(
            &mut SortedRun::from(Vec::<MockFile>::new()),
            &mut SortedRun::from(Vec::<MockFile>::new()),
            &mut result,
        );
        assert_eq!(result, Vec::<MockFile>::new());

        let files1 = build_items(&[(1, 3)]);
        find_overlapping_items(
            &mut SortedRun::from(files1.clone()),
            &mut SortedRun::from(Vec::<MockFile>::new()),
            &mut result,
        );
        assert_eq!(result, Vec::<MockFile>::new());

        find_overlapping_items(
            &mut SortedRun::from(Vec::<MockFile>::new()),
            &mut SortedRun::from(files1.clone()),
            &mut result,
        );
        assert_eq!(result, Vec::<MockFile>::new());

        // Test non-overlapping ranges
        let files1 = build_items(&[(1, 3), (5, 7)]);
        let files2 = build_items(&[(10, 12), (15, 20)]);
        find_overlapping_items(
            &mut SortedRun::from(files1),
            &mut SortedRun::from(files2),
            &mut result,
        );
        assert_eq!(result, Vec::<MockFile>::new());

        // Test simple overlap
        let files1 = build_items(&[(1, 5)]);
        let files2 = build_items(&[(3, 7)]);
        find_overlapping_items(
            &mut SortedRun::from(files1),
            &mut SortedRun::from(files2),
            &mut result,
        );
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].range(), (1, 5));
        assert_eq!(result[1].range(), (3, 7));

        // Test multiple overlaps
        let files1 = build_items(&[(1, 5), (8, 12), (15, 20)]);
        let files2 = build_items(&[(3, 6), (7, 10), (18, 25)]);
        find_overlapping_items(
            &mut SortedRun::from(files1),
            &mut SortedRun::from(files2),
            &mut result,
        );
        assert_eq!(result.len(), 6);

        // Test boundary cases (ranges touching at a single point)
        let files1 = build_items(&[(1, 5)]);
        let files2 = build_items(&[(5, 10)]); // Touching at 5
        find_overlapping_items(
            &mut SortedRun::from(files1),
            &mut SortedRun::from(files2),
            &mut result,
        );
        assert_eq!(result.len(), 2); // Should overlap since ranges are inclusive

        // Test completely contained ranges
        let files1 = build_items(&[(1, 10)]);
        let files2 = build_items(&[(3, 7)]);
        find_overlapping_items(
            &mut SortedRun::from(files1),
            &mut SortedRun::from(files2),
            &mut result,
        );
        assert_eq!(result.len(), 2);

        // Test identical ranges
        let files1 = build_items(&[(1, 5)]);
        let files2 = build_items(&[(1, 5)]);
        find_overlapping_items(
            &mut SortedRun::from(files1),
            &mut SortedRun::from(files2),
            &mut result,
        );
        assert_eq!(result.len(), 2);

        // Test unsorted input handling
        let files1 = build_items(&[(5, 10), (1, 3)]); // Unsorted
        let files2 = build_items(&[(2, 7), (8, 12)]); // Unsorted
        find_overlapping_items(
            &mut SortedRun::from(files1),
            &mut SortedRun::from(files2),
            &mut result,
        );
        assert_eq!(result.len(), 4); // Should find all overlapping files from both runs
    }

    #[test]
    fn test_merge_seq_files() {
        // Test empty input
        let files = Vec::<MockFile>::new();
        assert_eq!(merge_seq_files(&files, None), Vec::<MockFile>::new());

        // Test single file input (should return empty vec as no merge needed)
        let files = build_items(&[(1, 5)]);
        assert_eq!(merge_seq_files(&files, None), Vec::<MockFile>::new());

        // Test the example case: [10, 1, 1, 1] - should merge the last three files
        let files = build_items_with_size(&[(1, 2, 10), (3, 4, 1), (5, 6, 1), (7, 8, 1)]);
        let result = merge_seq_files(&files, None);
        assert_eq!(result.len(), 3);
        assert_eq!(result[0].size, 1);
        assert_eq!(result[1].size, 1);
        assert_eq!(result[2].size, 1);

        // Test with files of equal size - should merge as many as possible
        let files = build_items_with_size(&[(1, 2, 5), (3, 4, 5), (5, 6, 5), (7, 8, 5)]);
        let result = merge_seq_files(&files, Some(20));
        assert_eq!(result.len(), 4); // Should merge all 4 files as total size is 20

        // Test with max_file_size constraint
        let files = build_items_with_size(&[(1, 2, 5), (3, 4, 5), (5, 6, 5), (7, 8, 5)]);
        let result = merge_seq_files(&files, Some(10));
        assert_eq!(result.len(), 2); // Should merge only 2 files as max size is 10

        // Test with uneven file sizes - should prioritize reducing file count
        let files = build_items_with_size(&[(1, 2, 2), (3, 4, 3), (5, 6, 4), (7, 8, 10)]);
        let result = merge_seq_files(&files, Some(10));
        assert_eq!(result.len(), 3); // Should merge the first 3 files (total size 9)

        // Test amplification factor prioritization
        // Two possible merges: [5, 5] (amp factor 0.5) vs [10, 1, 1] (amp factor 0.83)
        let files =
            build_items_with_size(&[(1, 2, 5), (3, 4, 5), (5, 6, 10), (7, 8, 1), (9, 10, 1)]);
        let result = merge_seq_files(&files, Some(12));
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].size, 5);
        assert_eq!(result[1].size, 5);

        // Test with large file preventing merges
        let files = build_items_with_size(&[(1, 2, 100), (3, 4, 1), (5, 6, 1), (7, 8, 1)]);
        let result = merge_seq_files(&files, Some(10));
        assert_eq!(result.len(), 3); // Should merge the last 3 small files
        assert_eq!(result[0].size, 1);
        assert_eq!(result[1].size, 1);
        assert_eq!(result[2].size, 1);

        let files = build_items_with_size(&[(1, 2, 100), (3, 4, 20), (5, 6, 20), (7, 8, 20)]);
        let result = merge_seq_files(&files, Some(200));
        assert_eq!(result.len(), 4);

        let files = build_items_with_size(&[(1, 2, 160), (3, 4, 20), (5, 6, 20), (7, 8, 20)]);
        let result = merge_seq_files(&files, None);
        assert_eq!(result.len(), 3);
        assert_eq!(result[0].size, 20);
        assert_eq!(result[1].size, 20);
        assert_eq!(result[2].size, 20);

        let files = build_items_with_size(&[(1, 2, 100), (3, 4, 1)]);
        let result = merge_seq_files(&files, Some(200));
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].size, 100);
        assert_eq!(result[1].size, 1);

        let files = build_items_with_size(&[(1, 2, 20), (3, 4, 20), (5, 6, 20), (7, 8, 20)]);
        let result = merge_seq_files(&files, Some(40));
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].start, 1);
        assert_eq!(result[1].start, 3);
    }
}