mito2/compaction/run.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! This file contains code to find sorted runs in a set of ranged items, along with the
//! best way to merge these items to satisfy the desired run count.

use common_base::readable_size::ReadableSize;
use common_base::BitVec;
use common_time::Timestamp;
use smallvec::{smallvec, SmallVec};

use crate::sst::file::{FileHandle, FileId};

/// Default max compaction output file size when not specified.
const DEFAULT_MAX_OUTPUT_SIZE: u64 = ReadableSize::gb(2).as_bytes();

/// Trait for any items with specific range (both boundaries are inclusive).
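///
/// Note that `overlap` uses a strict comparison, so ranges that only touch at a single
/// endpoint are not treated as overlapping. A minimal sketch of the contract (the `R` type
/// below is a hypothetical implementor; the block is marked `ignore` so it is not compiled
/// as a doctest):
///
/// ```ignore
/// // `R` is an illustrative implementor, not part of this module.
/// struct R(i64, i64);
///
/// impl Ranged for R {
///     type BoundType = i64;
///
///     fn range(&self) -> (i64, i64) {
///         (self.0, self.1)
///     }
/// }
///
/// assert!(R(1, 4).overlap(&R(3, 6))); // the ranges share 3..=4
/// assert!(!R(1, 2).overlap(&R(2, 3))); // only the endpoint 2 is shared
/// ```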
pub trait Ranged {
    type BoundType: Ord + Copy;

    /// Returns the inclusive range of the item.
    fn range(&self) -> (Self::BoundType, Self::BoundType);

    fn overlap<T>(&self, other: &T) -> bool
    where
        T: Ranged<BoundType = Self::BoundType>,
    {
        let (lhs_start, lhs_end) = self.range();
        let (rhs_start, rhs_end) = other.range();

        lhs_start.max(rhs_start) < lhs_end.min(rhs_end)
    }
}

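/// Collects into `result` every item of `l` and `r` whose range overlaps (boundaries
/// inclusive) at least one item of the other run. If either run is empty, `result` is left
/// unchanged; otherwise it is cleared and refilled, and both runs are sorted in place on
/// first use.
///
/// A usage sketch (marked `ignore` so it is not compiled as a doctest; `F` stands in for a
/// hypothetical `Item` implementor with inclusive ranges):
///
/// ```ignore
/// // `F::new(start, end)` is assumed for illustration only.
/// let mut left = SortedRun::from(vec![F::new(1, 5)]);
/// let mut right = SortedRun::from(vec![F::new(5, 10)]);
/// let mut overlapping = Vec::new();
/// find_overlapping_items(&mut left, &mut right, &mut overlapping);
/// // The ranges touch at 5, which counts as an overlap here, so both items are collected.
/// assert_eq!(overlapping.len(), 2);
/// ```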
pub fn find_overlapping_items<T: Item + Clone>(
    l: &mut SortedRun<T>,
    r: &mut SortedRun<T>,
    result: &mut Vec<T>,
) {
    if l.items.is_empty() || r.items.is_empty() {
        return;
    }

    result.clear();
    result.reserve(l.items.len() + r.items.len());

    // Sort both arrays by start boundary for more efficient overlap detection
    if !l.sorted {
        sort_ranged_items(&mut l.items);
        l.sorted = true;
    }
    if !r.sorted {
        sort_ranged_items(&mut r.items);
        r.sorted = true;
    }

    let mut r_idx = 0;

    let mut selected = BitVec::repeat(false, r.items().len() + l.items.len());

    for (lhs_idx, lhs) in l.items.iter().enumerate() {
        let (lhs_start, lhs_end) = lhs.range();

        // Skip right elements that end before current left element starts
        while r_idx < r.items.len() {
            let (_, rhs_end) = r.items[r_idx].range();
            if rhs_end < lhs_start {
                r_idx += 1;
            } else {
                break;
            }
        }

        // Check for overlaps with remaining right elements
        let mut j = r_idx;
        while j < r.items.len() {
            let (rhs_start, rhs_end) = r.items[j].range();

            // If right element starts after left element ends, no more overlaps possible
            if rhs_start > lhs_end {
                break;
            }

            // We have an overlap
            if lhs_start.max(rhs_start) <= lhs_end.min(rhs_end) {
                if !selected[lhs_idx] {
                    result.push(lhs.clone());
                    selected.set(lhs_idx, true);
                }

                let rhs_selected_idx = l.items.len() + j;
                if !selected[rhs_selected_idx] {
                    result.push(r.items[j].clone());
                    selected.set(rhs_selected_idx, true);
                }
            }

            j += 1;
        }
    }
}

// Sorts ranges by start asc and end desc.
fn sort_ranged_items<T: Ranged>(values: &mut [T]) {
    values.sort_unstable_by(|l, r| {
        let (l_start, l_end) = l.range();
        let (r_start, r_end) = r.range();
        l_start.cmp(&r_start).then(r_end.cmp(&l_end))
    });
}

/// Trait for items to merge.
pub trait Item: Ranged + Clone {
    /// Size is used to calculate the cost of merging items.
    fn size(&self) -> usize;
}

/// A group of files that are created by the same compaction task.
#[derive(Debug, Clone)]
pub struct FileGroup {
    files: SmallVec<[FileHandle; 2]>,
    size: usize,
    num_rows: usize,
    min_timestamp: Timestamp,
    max_timestamp: Timestamp,
}

impl FileGroup {
    pub(crate) fn new_with_file(file: FileHandle) -> Self {
        let size = file.size() as usize;
        let (min_timestamp, max_timestamp) = file.time_range();
        let num_rows = file.num_rows();
        Self {
            files: smallvec![file],
            size,
            num_rows,
            min_timestamp,
            max_timestamp,
        }
    }

    pub(crate) fn num_rows(&self) -> usize {
        self.num_rows
    }

    pub(crate) fn add_file(&mut self, file: FileHandle) {
        self.size += file.size() as usize;
        self.num_rows += file.num_rows();
        let (min_timestamp, max_timestamp) = file.time_range();
        self.min_timestamp = self.min_timestamp.min(min_timestamp);
        self.max_timestamp = self.max_timestamp.max(max_timestamp);
        self.files.push(file);
    }

    #[cfg(test)]
    pub(crate) fn files(&self) -> &[FileHandle] {
        &self.files[..]
    }

    pub(crate) fn file_ids(&self) -> SmallVec<[FileId; 2]> {
        SmallVec::from_iter(self.files.iter().map(|f| f.file_id()))
    }

    pub(crate) fn into_files(self) -> impl Iterator<Item = FileHandle> {
        self.files.into_iter()
    }
}

impl Ranged for FileGroup {
    type BoundType = Timestamp;

    fn range(&self) -> (Self::BoundType, Self::BoundType) {
        (self.min_timestamp, self.max_timestamp)
    }
}

impl Item for FileGroup {
    fn size(&self) -> usize {
        self.size
    }
}

/// A set of files with non-overlapping time ranges.
#[derive(Debug, Clone)]
pub struct SortedRun<T: Item> {
    /// Items to merge
    items: Vec<T>,
    /// The total size of all items.
    size: usize,
    /// The lower bound of all items.
    start: Option<T::BoundType>,
    /// The upper bound of all items.
    end: Option<T::BoundType>,
    /// Whether items are sorted.
    sorted: bool,
}

impl<T: Item> From<Vec<T>> for SortedRun<T> {
    fn from(items: Vec<T>) -> Self {
        let mut r = Self {
            items: Vec::with_capacity(items.len()),
            size: 0,
            start: None,
            end: None,
            sorted: false,
        };
        for item in items {
            r.push_item(item);
        }

        r
    }
}

impl<T> Default for SortedRun<T>
where
    T: Item,
{
    fn default() -> Self {
        Self {
            items: vec![],
            size: 0,
            start: None,
            end: None,
            sorted: false,
        }
    }
}

impl<T> SortedRun<T>
where
    T: Item,
{
    pub fn items(&self) -> &[T] {
        &self.items
    }

    fn push_item(&mut self, t: T) {
        let (file_start, file_end) = t.range();
        self.size += t.size();
        self.items.push(t);
        self.start = Some(self.start.map_or(file_start, |v| v.min(file_start)));
        self.end = Some(self.end.map_or(file_end, |v| v.max(file_end)));
    }
}

/// Finds sorted runs in given items.
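///
/// An item is appended to the current run only if it does not overlap the run's last item;
/// otherwise it is left for a later run. For example (mirroring `test_find_sorted_runs`),
/// the ranges `(1, 3)`, `(2, 4)` and `(4, 5)` produce two runs, `[(1, 3), (4, 5)]` and
/// `[(2, 4)]`, because `(2, 4)` overlaps `(1, 3)`.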
pub fn find_sorted_runs<T>(items: &mut [T]) -> Vec<SortedRun<T>>
where
    T: Item,
{
    if items.is_empty() {
        return vec![];
    }
    // sort files
    sort_ranged_items(items);

    let mut current_run = SortedRun::default();
    let mut runs = vec![];

    let mut selection = BitVec::repeat(false, items.len());
    while !selection.all() {
        // until all items are assigned to some sorted run.
        for (item, mut selected) in items.iter().zip(selection.iter_mut()) {
            if *selected {
                // item is already assigned.
                continue;
            }
            match current_run.items.last() {
                None => {
                    // current run is empty, just add the current item
                    selected.set(true);
                    current_run.push_item(item.clone());
                }
                Some(last) => {
                    // if the current item does not overlap with the last item in the current
                    // run, then it belongs to the current run.
                    if !last.overlap(item) {
                        // does not overlap, push to current run
                        selected.set(true);
                        current_run.push_item(item.clone());
                    }
                }
            }
        }
        // finished an iteration, we've found a new run.
        runs.push(std::mem::take(&mut current_run));
    }
    runs
}

/// Finds the set of files with the minimum merge penalty that can reduce the total number of runs.
/// The penalty of merging is defined as the total size of the overlapping files between two runs.
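///
/// For example (mirroring `test_reduce_runs`), given the runs `[(1, 3), (5, 6)]` and
/// `[(2, 4)]`, the overlapping items `(1, 3)` and `(2, 4)` are returned; merging them
/// removes the overlap between the two runs.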
pub fn reduce_runs<T: Item>(mut runs: Vec<SortedRun<T>>) -> Vec<T> {
    assert!(runs.len() > 1);
    // sort runs by size
    runs.sort_unstable_by(|a, b| a.size.cmp(&b.size));
    // limit max probe runs to 100
    let probe_end = runs.len().min(100);
    let mut min_penalty = usize::MAX;
    let mut files = vec![];
    let mut temp_files = vec![];
    for i in 0..probe_end {
        for j in i + 1..probe_end {
            let (a, b) = runs.split_at_mut(j);
            find_overlapping_items(&mut a[i], &mut b[0], &mut temp_files);
            let penalty = temp_files.iter().map(|e| e.size()).sum();
            if penalty < min_penalty {
                min_penalty = penalty;
                files.clear();
                files.extend_from_slice(&temp_files);
            }
        }
    }
    files
}

/// Finds the optimal set of adjacent files to merge based on a scoring system.
///
/// This function evaluates all possible contiguous subsets of files to find the best
/// candidates for merging, considering:
///
/// 1. File reduction - prioritizes merging more files to reduce the total count
/// 2. Write amplification - minimizes the ratio of largest file to total size
/// 3. Size efficiency - prefers merges that utilize available space effectively
///
/// When multiple merge candidates have the same score, older files (those with lower indices)
/// are preferred. Only the first 100 input files are considered, to bound the search cost.
///
/// # Arguments
/// * `input_files` - Slice of files to consider for merging
/// * `max_file_size` - Optional maximum size constraint for the merged file.
///   If None, uses 1.5 times the average file size, capped at `DEFAULT_MAX_OUTPUT_SIZE` (2GiB).
///
/// # Returns
/// A vector containing the best set of adjacent files to merge.
/// Returns an empty vector if input is empty or contains only one file.
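///
/// # Example
///
/// A sketch of the `[10, 1, 1, 1]` case exercised in `test_merge_seq_files` (marked
/// `ignore` so it is not compiled as a doctest; `files` is assumed to hold four items with
/// those sizes):
///
/// ```ignore
/// // With no explicit limit the target is 1.5 times the average size (4 here), so the
/// // 10-sized file cannot join any group and the three trailing 1-sized files are picked.
/// let picked = merge_seq_files(&files, None);
/// assert_eq!(picked.len(), 3);
/// ```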
pub fn merge_seq_files<T: Item>(input_files: &[T], max_file_size: Option<u64>) -> Vec<T> {
    if input_files.is_empty() || input_files.len() == 1 {
        return vec![];
    }

    // Limit the number of files to process to 100 to control time complexity
    let files_to_process = if input_files.len() > 100 {
        &input_files[0..100]
    } else {
        input_files
    };

    // Calculate target size based on max_file_size or average file size
    let target_size = match max_file_size {
        Some(size) => size as usize,
        None => {
            // Calculate 1.5*average_file_size if max_file_size is not provided and clamp to 2GB
            let total_size: usize = files_to_process.iter().map(|f| f.size()).sum();
            ((((total_size as f64) / (files_to_process.len() as f64)) * 1.5) as usize)
                .min(DEFAULT_MAX_OUTPUT_SIZE as usize)
        }
    };

    // Find the best group of adjacent files to merge
    let mut best_group = Vec::new();
    let mut best_score = f64::NEG_INFINITY;

    // Try different starting positions - iterate from end to start to prefer older files
    for start_idx in (0..files_to_process.len()).rev() {
        // Try different ending positions - also iterate from end to start
        for end_idx in (start_idx + 1..files_to_process.len()).rev() {
            let group = &files_to_process[start_idx..=end_idx];
            let total_size: usize = group.iter().map(|f| f.size()).sum();

            // Skip if total size exceeds target size
            if total_size > target_size {
                continue; // Use continue instead of break to check smaller ranges
            }

            // Calculate amplification factor (largest file size / total size)
            let largest_file_size = group.iter().map(|f| f.size()).max().unwrap_or(0);
            let amplification_factor = largest_file_size as f64 / total_size as f64;

            // Calculate file reduction (number of files that will be reduced)
            let file_reduction = group.len() - 1;

            // Calculate score based on multiple factors:
            // 1. File reduction (higher is better)
            // 2. Amplification factor (lower is better)
            // 3. Size efficiency (how close to target size)
            let file_reduction_score = file_reduction as f64 / files_to_process.len() as f64;
            let amp_factor_score = (1.0 - amplification_factor) * 1.5; // Lower amplification is better
            let size_efficiency = (total_size as f64 / target_size as f64).min(1.0); // Reward using available space

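            // Worked example (values rounded; mirrors a case in `test_merge_seq_files`):
            // for a group with sizes [2, 3, 4] out of 4 candidate files and target_size 10,
            // file_reduction_score = 2/4 = 0.5, amp_factor_score = (1 - 4/9) * 1.5 (about 0.83)
            // and size_efficiency = 9/10 = 0.9, so the score below is roughly 2.23.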
            let score = file_reduction_score + amp_factor_score + size_efficiency;

            // Check if this group is better than our current best.
            // Use >= instead of > so that, on ties, the group visited later wins; with the
            // reverse iteration above that is the group of older files (lower indices).
            if score >= best_score {
                best_score = score;
                best_group = group.to_vec();
            }
        }
    }

    best_group
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::*;

    #[derive(Clone, Debug, PartialEq)]
    struct MockFile {
        start: i64,
        end: i64,
        size: usize,
    }

    impl Ranged for MockFile {
        type BoundType = i64;

        fn range(&self) -> (Self::BoundType, Self::BoundType) {
            (self.start, self.end)
        }
    }

    impl Item for MockFile {
        fn size(&self) -> usize {
            self.size
        }
    }

    fn build_items(ranges: &[(i64, i64)]) -> Vec<MockFile> {
        ranges
            .iter()
            .map(|(start, end)| MockFile {
                start: *start,
                end: *end,
                size: (*end - *start) as usize,
            })
            .collect()
    }

    fn build_items_with_size(items: &[(i64, i64, usize)]) -> Vec<MockFile> {
        items
            .iter()
            .map(|(start, end, size)| MockFile {
                start: *start,
                end: *end,
                size: *size,
            })
            .collect()
    }

    fn check_sorted_runs(
        ranges: &[(i64, i64)],
        expected_runs: &[Vec<(i64, i64)>],
    ) -> Vec<SortedRun<MockFile>> {
        let mut files = build_items(ranges);
        let runs = find_sorted_runs(&mut files);

        let result_file_ranges: Vec<Vec<_>> = runs
            .iter()
            .map(|r| r.items.iter().map(|f| f.range()).collect())
            .collect();
        assert_eq!(&expected_runs, &result_file_ranges);
        runs
    }

    #[test]
    fn test_find_sorted_runs() {
        check_sorted_runs(&[], &[]);
        check_sorted_runs(&[(1, 1), (2, 2)], &[vec![(1, 1), (2, 2)]]);
        check_sorted_runs(&[(1, 2)], &[vec![(1, 2)]]);
        check_sorted_runs(&[(1, 2), (2, 3)], &[vec![(1, 2), (2, 3)]]);
        check_sorted_runs(&[(1, 2), (3, 4)], &[vec![(1, 2), (3, 4)]]);
        check_sorted_runs(&[(2, 4), (1, 3)], &[vec![(1, 3)], vec![(2, 4)]]);
        check_sorted_runs(
            &[(1, 3), (2, 4), (4, 5)],
            &[vec![(1, 3), (4, 5)], vec![(2, 4)]],
        );

        check_sorted_runs(
            &[(1, 2), (3, 4), (3, 5)],
            &[vec![(1, 2), (3, 5)], vec![(3, 4)]],
        );

        check_sorted_runs(
            &[(1, 3), (2, 4), (5, 6)],
            &[vec![(1, 3), (5, 6)], vec![(2, 4)]],
        );

        check_sorted_runs(
            &[(1, 2), (3, 5), (4, 6)],
            &[vec![(1, 2), (3, 5)], vec![(4, 6)]],
        );

        check_sorted_runs(
            &[(1, 2), (3, 4), (4, 6), (7, 8)],
            &[vec![(1, 2), (3, 4), (4, 6), (7, 8)]],
        );
        check_sorted_runs(
            &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
            &[vec![(1, 2), (3, 6), (7, 8), (8, 9)], vec![(3, 4), (5, 6)]],
        );

        check_sorted_runs(
            &[(10, 19), (20, 21), (20, 29), (30, 39)],
            &[vec![(10, 19), (20, 29), (30, 39)], vec![(20, 21)]],
        );

        check_sorted_runs(
            &[(10, 19), (20, 29), (21, 22), (30, 39), (31, 32), (32, 42)],
            &[
                vec![(10, 19), (20, 29), (30, 39)],
                vec![(21, 22), (31, 32), (32, 42)],
            ],
        );
    }

    fn check_reduce_runs(
        files: &[(i64, i64)],
        expected_runs: &[Vec<(i64, i64)>],
        expected: &[(i64, i64)],
    ) {
        let runs = check_sorted_runs(files, expected_runs);
        if runs.len() <= 1 {
            assert!(expected.is_empty());
            return;
        }
        let files_to_merge = reduce_runs(runs);
        let file_to_merge_timestamps = files_to_merge
            .into_iter()
            .map(|f| (f.start, f.end))
            .collect::<HashSet<_>>();

        let expected = expected.iter().cloned().collect::<HashSet<_>>();
        assert_eq!(&expected, &file_to_merge_timestamps);
    }

    #[test]
    fn test_reduce_runs() {
        // [1..3]   [5..6]
        //   [2..4]
        check_reduce_runs(
            &[(1, 3), (2, 4), (5, 6)],
            &[vec![(1, 3), (5, 6)], vec![(2, 4)]],
            &[(1, 3), (2, 4)],
        );

        // [1..2][3..5]
        //         [4..6]
        check_reduce_runs(
            &[(1, 2), (3, 5), (4, 6)],
            &[vec![(1, 2), (3, 5)], vec![(4, 6)]],
            &[(3, 5), (4, 6)],
        );

        // [1..2][3..4]    [7..8]
        //          [4..6]
        check_reduce_runs(
            &[(1, 2), (3, 4), (4, 6), (7, 8)],
            &[vec![(1, 2), (3, 4), (4, 6), (7, 8)]],
            &[],
        );

        // [1..2][3........6][7..8][8..9]
        //       [3..4][5..6]
        check_reduce_runs(
            &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
            &[vec![(1, 2), (3, 6), (7, 8), (8, 9)], vec![(3, 4), (5, 6)]],
            &[(5, 6), (3, 4), (3, 6)], // already satisfied
        );

        // [1..2][3........6][7..8][8..9]
        //       [3..4][5..6]
        check_reduce_runs(
            &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
            &[vec![(1, 2), (3, 6), (7, 8), (8, 9)], vec![(3, 4), (5, 6)]],
            &[(3, 4), (3, 6), (5, 6)],
        );

        // [10..20] [30..40] [50........80][80...100][100..110]
        //                   [50..60]  [80..90]
        //
        check_reduce_runs(
            &[
                (10, 20),
                (30, 40),
                (50, 60),
                (50, 80),
                (80, 90),
                (80, 100),
                (100, 110),
            ],
            &[
                vec![(10, 20), (30, 40), (50, 80), (80, 100), (100, 110)],
                vec![(50, 60), (80, 90)],
            ],
            &[(50, 80), (80, 100), (50, 60), (80, 90)],
        );

        // [0..10]
        // [0...11]
        // [0....12]
        // [0.....13]
        check_reduce_runs(
            &[(0, 10), (0, 11), (0, 12), (0, 13)],
            &[vec![(0, 13)], vec![(0, 12)], vec![(0, 11)], vec![(0, 10)]],
            &[(0, 10), (0, 11)],
        );
    }

    #[test]
    fn test_find_overlapping_items() {
        let mut result = Vec::new();

        // Test empty inputs
        find_overlapping_items(
            &mut SortedRun::from(Vec::<MockFile>::new()),
            &mut SortedRun::from(Vec::<MockFile>::new()),
            &mut result,
        );
        assert_eq!(result, Vec::<MockFile>::new());

        let files1 = build_items(&[(1, 3)]);
        find_overlapping_items(
            &mut SortedRun::from(files1.clone()),
            &mut SortedRun::from(Vec::<MockFile>::new()),
            &mut result,
        );
        assert_eq!(result, Vec::<MockFile>::new());

        find_overlapping_items(
            &mut SortedRun::from(Vec::<MockFile>::new()),
            &mut SortedRun::from(files1.clone()),
            &mut result,
        );
        assert_eq!(result, Vec::<MockFile>::new());

        // Test non-overlapping ranges
        let files1 = build_items(&[(1, 3), (5, 7)]);
        let files2 = build_items(&[(10, 12), (15, 20)]);
        find_overlapping_items(
            &mut SortedRun::from(files1),
            &mut SortedRun::from(files2),
            &mut result,
        );
        assert_eq!(result, Vec::<MockFile>::new());

        // Test simple overlap
        let files1 = build_items(&[(1, 5)]);
        let files2 = build_items(&[(3, 7)]);
        find_overlapping_items(
            &mut SortedRun::from(files1),
            &mut SortedRun::from(files2),
            &mut result,
        );
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].range(), (1, 5));
        assert_eq!(result[1].range(), (3, 7));

        // Test multiple overlaps
        let files1 = build_items(&[(1, 5), (8, 12), (15, 20)]);
        let files2 = build_items(&[(3, 6), (7, 10), (18, 25)]);
        find_overlapping_items(
            &mut SortedRun::from(files1),
            &mut SortedRun::from(files2),
            &mut result,
        );
        assert_eq!(result.len(), 6);

        // Test boundary case: ranges touching at a single point
        let files1 = build_items(&[(1, 5)]);
        let files2 = build_items(&[(5, 10)]); // Touching at 5
        find_overlapping_items(
            &mut SortedRun::from(files1),
            &mut SortedRun::from(files2),
            &mut result,
        );
        assert_eq!(result.len(), 2); // Should overlap since ranges are inclusive

        // Test completely contained ranges
        let files1 = build_items(&[(1, 10)]);
        let files2 = build_items(&[(3, 7)]);
        find_overlapping_items(
            &mut SortedRun::from(files1),
            &mut SortedRun::from(files2),
            &mut result,
        );
        assert_eq!(result.len(), 2);

        // Test identical ranges
        let files1 = build_items(&[(1, 5)]);
        let files2 = build_items(&[(1, 5)]);
        find_overlapping_items(
            &mut SortedRun::from(files1),
            &mut SortedRun::from(files2),
            &mut result,
        );
        assert_eq!(result.len(), 2);

        // Test unsorted input handling
        let files1 = build_items(&[(5, 10), (1, 3)]); // Unsorted
        let files2 = build_items(&[(2, 7), (8, 12)]); // Unsorted
        find_overlapping_items(
            &mut SortedRun::from(files1),
            &mut SortedRun::from(files2),
            &mut result,
        );
        assert_eq!(result.len(), 4); // Should find both overlaps
    }

    #[test]
    fn test_merge_seq_files() {
        // Test empty input
        let files = Vec::<MockFile>::new();
        assert_eq!(merge_seq_files(&files, None), Vec::<MockFile>::new());

        // Test single file input (should return empty vec as no merge needed)
        let files = build_items(&[(1, 5)]);
        assert_eq!(merge_seq_files(&files, None), Vec::<MockFile>::new());

        // Test the example case: [10, 1, 1, 1] - should merge the last three files
        let files = build_items_with_size(&[(1, 2, 10), (3, 4, 1), (5, 6, 1), (7, 8, 1)]);
        let result = merge_seq_files(&files, None);
        assert_eq!(result.len(), 3);
        assert_eq!(result[0].size, 1);
        assert_eq!(result[1].size, 1);
        assert_eq!(result[2].size, 1);

        // Test with files of equal size - should merge as many as possible
        let files = build_items_with_size(&[(1, 2, 5), (3, 4, 5), (5, 6, 5), (7, 8, 5)]);
        let result = merge_seq_files(&files, Some(20));
        assert_eq!(result.len(), 4); // Should merge all 4 files as total size is 20

        // Test with max_file_size constraint
        let files = build_items_with_size(&[(1, 2, 5), (3, 4, 5), (5, 6, 5), (7, 8, 5)]);
        let result = merge_seq_files(&files, Some(10));
        assert_eq!(result.len(), 2); // Should merge only 2 files as max size is 10

        // Test with uneven file sizes - should prioritize reducing file count
        let files = build_items_with_size(&[(1, 2, 2), (3, 4, 3), (5, 6, 4), (7, 8, 10)]);
        let result = merge_seq_files(&files, Some(10));
        assert_eq!(result.len(), 3); // Should merge the first 3 files (total size 9)

        // Test amplification factor prioritization
        // Two possible merges: [5, 5] (amp factor 0.5) vs [10, 1, 1] (amp factor 0.83)
        let files =
            build_items_with_size(&[(1, 2, 5), (3, 4, 5), (5, 6, 10), (7, 8, 1), (9, 10, 1)]);
        let result = merge_seq_files(&files, Some(12));
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].size, 5);
        assert_eq!(result[1].size, 5);

        // Test with large file preventing merges
        let files = build_items_with_size(&[(1, 2, 100), (3, 4, 1), (5, 6, 1), (7, 8, 1)]);
        let result = merge_seq_files(&files, Some(10));
        assert_eq!(result.len(), 3); // Should merge the last 3 small files
        assert_eq!(result[0].size, 1);
        assert_eq!(result[1].size, 1);
        assert_eq!(result[2].size, 1);

        let files = build_items_with_size(&[(1, 2, 100), (3, 4, 20), (5, 6, 20), (7, 8, 20)]);
        let result = merge_seq_files(&files, Some(200));
        assert_eq!(result.len(), 4);

        let files = build_items_with_size(&[(1, 2, 160), (3, 4, 20), (5, 6, 20), (7, 8, 20)]);
        let result = merge_seq_files(&files, None);
        assert_eq!(result.len(), 3);
        assert_eq!(result[0].size, 20);
        assert_eq!(result[1].size, 20);
        assert_eq!(result[2].size, 20);

        let files = build_items_with_size(&[(1, 2, 100), (3, 4, 1)]);
        let result = merge_seq_files(&files, Some(200));
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].size, 100);
        assert_eq!(result[1].size, 1);

        let files = build_items_with_size(&[(1, 2, 20), (3, 4, 20), (5, 6, 20), (7, 8, 20)]);
        let result = merge_seq_files(&files, Some(40));
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].start, 1);
        assert_eq!(result[1].start, 3);
    }
}