Skip to main content

mito2/compaction/
run.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! This file contains code to find sorted runs in a set of ranged items, along with
//! the best way to merge these items to satisfy the desired run count.
17
18use std::cmp::Ordering;
19use std::collections::BinaryHeap;
20
21use bytes::{Buf, Bytes};
22use common_base::BitVec;
23use common_base::readable_size::ReadableSize;
24use common_time::Timestamp;
25use smallvec::{SmallVec, smallvec};
26
27use crate::sst::file::{FileHandle, RegionFileId};
28
29/// Default max compaction output file size when not specified.
30const DEFAULT_MAX_OUTPUT_SIZE: u64 = ReadableSize::gb(2).as_bytes();
31
/// An item that spans a closed interval `[start, end]` (both boundaries are inclusive).
pub trait Ranged {
    /// Type of the interval boundaries.
    type BoundType: Ord + Copy;

    /// Returns the `(start, end)` boundaries of the item; both ends are inclusive.
    fn range(&self) -> (Self::BoundType, Self::BoundType);

    /// Returns true if the two ranges share more than a single boundary point.
    ///
    /// The comparison is strict, so ranges that merely touch (for example `[1, 2]` and
    /// `[2, 3]`) are NOT reported as overlapping.
    fn overlap(&self, other: &Self) -> bool {
        let (this_start, this_end) = self.range();
        let (that_start, that_end) = other.range();

        let latest_start = std::cmp::max(this_start, that_start);
        let earliest_end = std::cmp::min(this_end, that_end);
        latest_start < earliest_end
    }

    /// Like `overlap`, but treats touching boundaries as overlapping (inclusive).
    /// Used by `find_overlapping_items` where shared boundaries count as overlap.
    fn overlap_inclusive(&self, other: &Self) -> bool {
        let (this_start, this_end) = self.range();
        let (that_start, that_end) = other.range();

        let latest_start = std::cmp::max(this_start, that_start);
        let earliest_end = std::cmp::min(this_end, that_end);
        latest_start <= earliest_end
    }
}
55
56pub(crate) fn primary_key_ranges_overlap(lhs: &(Bytes, Bytes), rhs: &(Bytes, Bytes)) -> bool {
57    lhs.0.chunk().max(rhs.0.chunk()) <= lhs.1.chunk().min(rhs.1.chunk())
58}
59
60pub(crate) fn merge_primary_key_ranges(
61    lhs: Option<(Bytes, Bytes)>,
62    rhs: Option<(Bytes, Bytes)>,
63) -> Option<(Bytes, Bytes)> {
64    match (lhs, rhs) {
65        (Some((lhs_min, lhs_max)), Some((rhs_min, rhs_max))) => {
66            Some((lhs_min.min(rhs_min), lhs_max.max(rhs_max)))
67        }
68        _ => None,
69    }
70}
71
72pub fn find_overlapping_items<T: Item + Clone>(
73    l: &mut SortedRun<T>,
74    r: &mut SortedRun<T>,
75    result: &mut Vec<T>,
76) {
77    if l.items.is_empty() || r.items.is_empty() {
78        return;
79    }
80
81    result.clear();
82    result.reserve(l.items.len() + r.items.len());
83
84    // Sort both arrays by start boundary for more efficient overlap detection
85    if !l.sorted {
86        sort_ranged_items(&mut l.items);
87        l.sorted = true;
88    }
89    if !r.sorted {
90        sort_ranged_items(&mut r.items);
91        r.sorted = true;
92    }
93
94    let mut r_idx = 0;
95
96    let mut selected = BitVec::repeat(false, r.items().len() + l.items.len());
97
98    for (lhs_idx, lhs) in l.items.iter().enumerate() {
99        let (lhs_start, lhs_end) = lhs.range();
100
101        // Skip right elements that end before current left element starts
102        while r_idx < r.items.len() {
103            let (_, rhs_end) = r.items[r_idx].range();
104            if rhs_end < lhs_start {
105                r_idx += 1;
106            } else {
107                break;
108            }
109        }
110
111        // Check for overlaps with remaining right elements
112        let mut j = r_idx;
113        while j < r.items.len() {
114            let (rhs_start, _rhs_end) = r.items[j].range();
115
116            // If right element starts after left element ends, no more overlaps possible
117            if rhs_start > lhs_end {
118                break;
119            }
120
121            // We have an overlap (inclusive: touching boundaries count)
122            if lhs.overlap_inclusive(&r.items[j]) {
123                if !selected[lhs_idx] {
124                    result.push(lhs.clone());
125                    selected.set(lhs_idx, true);
126                }
127
128                let rhs_selected_idx = l.items.len() + j;
129                if !selected[rhs_selected_idx] {
130                    result.push(r.items[j].clone());
131                    selected.set(rhs_selected_idx, true);
132                }
133            }
134
135            j += 1;
136        }
137    }
138}
139
140// Sorts ranges by start asc and end desc.
141fn sort_ranged_items<T: Ranged>(values: &mut [T]) {
142    values.sort_unstable_by(|l, r| {
143        let (l_start, l_end) = l.range();
144        let (r_start, r_end) = r.range();
145        l_start.cmp(&r_start).then(r_end.cmp(&l_end))
146    });
147}
148
/// Trait for items to merge.
///
/// An item has a range (see `Ranged`), can be cloned, and reports a size.
pub trait Item: Ranged + Clone {
    /// Size is used to calculate the cost of merging items.
    fn size(&self) -> usize;
}
154
/// A group of files that are created by the same compaction task.
#[derive(Debug, Clone)]
pub struct FileGroup {
    /// Files that belong to this group.
    files: SmallVec<[FileHandle; 2]>,
    /// Sum of the sizes of all files in the group.
    size: usize,
    /// Sum of the row counts of all files in the group.
    num_rows: usize,
    /// Minimum of the lower time range bounds of all files in the group.
    min_timestamp: Timestamp,
    /// Maximum of the upper time range bounds of all files in the group.
    max_timestamp: Timestamp,
    /// Merged `(min, max)` primary key range of all files in the group; `None` if any file
    /// added so far reported no primary key range.
    primary_key_range: Option<(Bytes, Bytes)>,
}
165
166impl FileGroup {
167    pub(crate) fn new_with_file(file: FileHandle) -> Self {
168        let size = file.size() as usize;
169        let (min_timestamp, max_timestamp) = file.time_range();
170        let num_rows = file.num_rows();
171        let primary_key_range = file.primary_key_range();
172        Self {
173            files: smallvec![file],
174            size,
175            num_rows,
176            min_timestamp,
177            max_timestamp,
178            primary_key_range,
179        }
180    }
181
182    pub(crate) fn num_rows(&self) -> usize {
183        self.num_rows
184    }
185
186    pub(crate) fn add_file(&mut self, file: FileHandle) {
187        self.size += file.size() as usize;
188        self.num_rows += file.num_rows();
189        let (min_timestamp, max_timestamp) = file.time_range();
190        self.min_timestamp = self.min_timestamp.min(min_timestamp);
191        self.max_timestamp = self.max_timestamp.max(max_timestamp);
192        self.primary_key_range =
193            merge_primary_key_ranges(self.primary_key_range.take(), file.primary_key_range());
194        self.files.push(file);
195    }
196
197    pub(crate) fn num_files(&self) -> usize {
198        self.files.len()
199    }
200
201    #[cfg(test)]
202    pub(crate) fn files(&self) -> &[FileHandle] {
203        &self.files[..]
204    }
205
206    pub(crate) fn file_ids(&self) -> SmallVec<[RegionFileId; 2]> {
207        SmallVec::from_iter(self.files.iter().map(|f| f.file_id()))
208    }
209
210    pub(crate) fn into_files(self) -> impl Iterator<Item = FileHandle> {
211        self.files.into_iter()
212    }
213}
214
215impl Ranged for FileGroup {
216    type BoundType = Timestamp;
217
218    fn range(&self) -> (Self::BoundType, Self::BoundType) {
219        (self.min_timestamp, self.max_timestamp)
220    }
221
222    fn overlap(&self, other: &Self) -> bool {
223        let (lhs_start, lhs_end) = self.range();
224        let (rhs_start, rhs_end) = other.range();
225        if lhs_start.max(rhs_start) >= lhs_end.min(rhs_end) {
226            return false;
227        }
228
229        match (&self.primary_key_range, &other.primary_key_range) {
230            (Some(lhs), Some(rhs)) => primary_key_ranges_overlap(lhs, rhs),
231            _ => true,
232        }
233    }
234
235    fn overlap_inclusive(&self, other: &Self) -> bool {
236        let (lhs_start, lhs_end) = self.range();
237        let (rhs_start, rhs_end) = other.range();
238        if lhs_start.max(rhs_start) > lhs_end.min(rhs_end) {
239            return false;
240        }
241
242        match (&self.primary_key_range, &other.primary_key_range) {
243            (Some(lhs), Some(rhs)) => primary_key_ranges_overlap(lhs, rhs),
244            _ => true,
245        }
246    }
247}
248
impl Item for FileGroup {
    /// Returns the accumulated size of all files in the group.
    fn size(&self) -> usize {
        self.size
    }
}
254
/// A set of files with non-overlapping time ranges.
#[derive(Debug, Clone)]
pub struct SortedRun<T: Item> {
    /// Items to merge
    items: Vec<T>,
    /// The total size of all items.
    size: usize,
    /// The lower bound of all items. `None` while the run has no items.
    start: Option<T::BoundType>,
    /// The upper bound of all items. `None` while the run has no items.
    end: Option<T::BoundType>,
    /// Whether items are sorted. Starts as `false`; `find_overlapping_items` sets it after
    /// sorting the items with `sort_ranged_items`.
    sorted: bool,
}
269
270impl<T: Item> From<Vec<T>> for SortedRun<T> {
271    fn from(items: Vec<T>) -> Self {
272        let mut r = Self {
273            items: Vec::with_capacity(items.len()),
274            size: 0,
275            start: None,
276            end: None,
277            sorted: false,
278        };
279        for item in items {
280            r.push_item(item);
281        }
282
283        r
284    }
285}
286
287impl<T> Default for SortedRun<T>
288where
289    T: Item,
290{
291    fn default() -> Self {
292        Self {
293            items: vec![],
294            size: 0,
295            start: None,
296            end: None,
297            sorted: false,
298        }
299    }
300}
301
302impl<T> SortedRun<T>
303where
304    T: Item,
305{
306    pub fn items(&self) -> &[T] {
307        &self.items
308    }
309
310    fn push_item(&mut self, t: T) {
311        let (file_start, file_end) = t.range();
312        self.size += t.size();
313        self.items.push(t);
314        self.start = Some(self.start.map_or(file_start, |v| v.min(file_start)));
315        self.end = Some(self.end.map_or(file_end, |v| v.max(file_end)));
316    }
317}
318
319/// Finds sorted runs in given items.
320pub fn find_sorted_runs<T>(items: &mut [T]) -> Vec<SortedRun<T>>
321where
322    T: Item,
323{
324    if items.is_empty() {
325        return vec![];
326    }
327    // sort files
328    sort_ranged_items(items);
329
330    let mut current_run = SortedRun::default();
331    let mut runs = vec![];
332    let mut active_run_item_indices = Vec::new();
333
334    let mut selection = BitVec::repeat(false, items.len());
335    while !selection.all() {
336        // until all items are assigned to some sorted run.
337        let mut last_pruned_start = None;
338        for (item, mut selected) in items.iter().zip(selection.iter_mut()) {
339            if *selected {
340                // item is already assigned.
341                continue;
342            }
343            if current_run.items.is_empty() {
344                // current run is empty, just add current_item
345                selected.set(true);
346                current_run.push_item(item.clone());
347                active_run_item_indices.push(current_run.items.len() - 1);
348            } else {
349                // the current item does not overlap with any item in current run,
350                // then it belongs to current run. Because now we introduced primary
351                // key range, we cannot simply use timestamps to check overlapping.
352                let (item_start, _) = item.range();
353                if last_pruned_start != Some(item_start) {
354                    active_run_item_indices.retain(|idx| {
355                        let (_, run_item_end) = current_run.items[*idx].range();
356                        run_item_end > item_start
357                    });
358                    last_pruned_start = Some(item_start);
359                }
360
361                let mut overlaps_any = false;
362                for idx in &active_run_item_indices {
363                    let run_item = &current_run.items[*idx];
364                    if run_item.overlap(item) {
365                        overlaps_any = true;
366                        break;
367                    }
368                }
369                if !overlaps_any {
370                    // does not overlap, push to current run
371                    selected.set(true);
372                    let item_idx = current_run.items.len();
373                    current_run.push_item(item.clone());
374                    active_run_item_indices.push(item_idx);
375                }
376            }
377        }
378        // finished an iteration, we've found a new run.
379        runs.push(std::mem::take(&mut current_run));
380        active_run_item_indices.clear();
381    }
382    runs
383}
384
/// Reference implementation of `find_sorted_runs` that checks a candidate against every
/// item of the current run. Only compiled for tests and test features.
#[cfg(any(test, feature = "test", feature = "testing"))]
pub fn find_sorted_runs_original<T>(items: &mut [T]) -> Vec<SortedRun<T>>
where
    T: Item,
{
    if items.is_empty() {
        return Vec::new();
    }
    sort_ranged_items(items);

    let mut runs = Vec::new();
    let mut run = SortedRun::default();
    // One bit per input item: set once the item has been assigned to a run.
    let mut assigned = BitVec::repeat(false, items.len());

    while !assigned.all() {
        for (candidate, mut taken) in items.iter().zip(assigned.iter_mut()) {
            if *taken {
                // Already part of an earlier run.
                continue;
            }

            // Primary key ranges take part in `overlap`, so timestamps alone are not
            // enough. An empty run has no members and therefore never conflicts, which
            // covers the "first item of a run" case as well.
            let conflicts = run.items.iter().any(|member| member.overlap(candidate));
            if !conflicts {
                taken.set(true);
                run.push_item(candidate.clone());
            }
        }
        // The pass is over: the current run is complete.
        runs.push(std::mem::take(&mut run));
    }
    runs
}
428
/// Finds sorted runs in given items using only their ranges.
///
/// `items` is sorted in place by `sort_ranged_items`. Each item is then cloned into the
/// earliest created run whose `end` is not greater than the item's start; if no such run
/// exists, a new run is created. Only `range()` is consulted here, `overlap()` is never
/// called.
///
/// Returns the runs ordered by their creation sequence.
pub(crate) fn find_sorted_runs_by_time_range<T>(items: &mut [T]) -> Vec<SortedRun<T>>
where
    T: Item,
{
    if items.is_empty() {
        return vec![];
    }
    sort_ranged_items(items);

    use derive_more::{Eq, PartialEq};

    /// `SortedRun` with a creation sequence `i`.
    #[derive(PartialEq, Eq)]
    struct Run<T: Item> {
        i: usize,
        #[partial_eq(skip)]
        run: SortedRun<T>,
    }

    impl<T: Item> Run<T> {
        fn new(i: usize, item: &T) -> Run<T> {
            let mut run = SortedRun::default();
            run.push_item(item.clone());
            Run { i, run }
        }

        fn push_item(&mut self, item: &T) {
            self.run.push_item(item.clone());
        }
    }

    impl<T: Item> PartialOrd for Run<T> {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }

    /// Sort by run's `end` desc then `start` asc, with the creation sequence `i` as the
    /// final tie-breaker.
    impl<T: Item> Ord for Run<T> {
        fn cmp(&self, other: &Self) -> Ordering {
            let l_run = &self.run;
            let r_run = &other.run;

            // Safety: `start` and `end` must both exist because it's guaranteed that whenever a
            // `Run` is created, an item is pushed into it immediately (see its `new` method above).
            // And there are no other ways to create a `Run` beyond its `new` method in this
            // function's scope.
            let l_end = l_run.end.unwrap();
            let r_end = r_run.end.unwrap();
            r_end
                .cmp(&l_end)
                .then_with(|| {
                    let l_start = l_run.start.unwrap();
                    let r_start = r_run.start.unwrap();
                    l_start.cmp(&r_start)
                })
                .then_with(|| self.i.cmp(&other.i))
        }
    }

    /// Wrapper around the `Run` above, to support sorting them by their creation sequence `i`.
    #[derive(PartialEq, Eq)]
    struct Wrapper<T: Item>(Run<T>);

    impl<T: Item> PartialOrd for Wrapper<T> {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }

    impl<T: Item> Ord for Wrapper<T> {
        // Reversed comparison: the max-heap then yields the smallest `i` first.
        fn cmp(&self, other: &Self) -> Ordering {
            other.0.i.cmp(&self.0.i)
        }
    }

    // Two heaps for finding a run that is both:
    // 1. not overlapping with item's range,
    // 2. and is created earliest,
    // when iterating the items.
    //
    // Heap 1 (`runs_sort_by_end`) is for storing the runs of which top has the minimal "end"
    // just about to overlap with the current selected item.
    //
    // Heap 2 (`runs_sort_by_index`) is for storing the runs that all have "end"s non-overlap with
    // the current selected item, and of which top is the earliest created run.
    //
    // The finding of a suitable run basically works like this:
    // 1. moves the runs in heap 1 to heap 2, until the top is overlapping with the current item;
    // 2. now heap 2 has all the runs that can accept the current item, pop its top;
    // 3. the top is the earliest created run, push the current item;
    // 4. because the run has changed, push it back to heap 1;
    // 5. check the next item. Important: we don't need to push the runs in heap 2 to 1, because
    //    the items are sorted by "start". When checking the next item, heap 2's runs must all have
    //    "end"s not greater than next item's "start".
    //
    // Actually the heap 2 is only for aligning with the runs selection outcomes in the original
    // `find_sorted_runs` implementation. If we just need the invariant that each run has the
    // non-overlapping items, we can get rid of heap 2 and make the code simpler.

    let mut runs_sort_by_end = BinaryHeap::<Run<T>>::new();
    let mut runs_sort_by_index = BinaryHeap::<Wrapper<T>>::new();
    let mut i = 0;

    for item in items {
        let (start, _) = item.range();

        // Step 1: every run that ends at or before this item's start can accept it.
        while let Some(run) = runs_sort_by_end.pop_if(|x| x.run.end.unwrap() <= start) {
            runs_sort_by_index.push(Wrapper(run));
        }

        // No run can accept the item: open a new run with the next creation sequence.
        let Some(mut run) = runs_sort_by_index.pop() else {
            i += 1;
            runs_sort_by_end.push(Run::new(i, item));
            continue;
        };

        // Steps 2-4: extend the earliest created candidate and put it back into heap 1.
        run.0.push_item(item);
        runs_sort_by_end.push(run.0);
    }

    // Gather the runs of both heaps and restore their creation order.
    let mut runs = runs_sort_by_end.into_vec();
    runs.extend(runs_sort_by_index.into_vec().into_iter().map(|x| x.0));
    runs.sort_unstable_by_key(|run| run.i);
    runs.into_iter().map(|x| x.run).collect()
}
555
556/// Finds a set of files with minimum penalty to merge that can reduce the total num of runs.
557/// The penalty of merging is defined as the size of all overlapping files between two runs.
558pub fn reduce_runs<T: Item>(mut runs: Vec<SortedRun<T>>) -> Vec<T> {
559    assert!(runs.len() > 1);
560    // sort runs by size
561    runs.sort_unstable_by_key(|a| a.size);
562    // limit max probe runs to 100
563    let probe_end = runs.len().min(100);
564    let mut min_penalty = usize::MAX;
565    let mut files = vec![];
566    let mut temp_files = vec![];
567    for i in 0..probe_end {
568        for j in i + 1..probe_end {
569            let (a, b) = runs.split_at_mut(j);
570            find_overlapping_items(&mut a[i], &mut b[0], &mut temp_files);
571            let penalty = temp_files.iter().map(|e| e.size()).sum();
572            if penalty < min_penalty {
573                min_penalty = penalty;
574                files.clear();
575                files.extend_from_slice(&temp_files);
576            }
577        }
578    }
579    files
580}
581
582/// Finds the optimal set of adjacent files to merge based on a scoring system.
583///
584/// This function evaluates all possible contiguous subsets of files to find the best
585/// candidates for merging, considering:
586///
587/// 1. File reduction - prioritizes merging more files to reduce the total count
588/// 2. Write amplification - minimizes the ratio of largest file to total size
589/// 3. Size efficiency - prefers merges that utilize available space effectively
590///
591/// When multiple merge candidates have the same score, older files (those with lower indices)
592/// are preferred.
593///
594/// # Arguments
595/// * `input_files` - Slice of files to consider for merging
596/// * `max_file_size` - Optional maximum size constraint for the merged file.
597///   If None, uses 1.5 times the average file size.
598///
599/// # Returns
600/// A vector containing the best set of adjacent files to merge.
601/// Returns an empty vector if input is empty or contains only one file.
602pub fn merge_seq_files<T: Item>(input_files: &[T], max_file_size: Option<u64>) -> Vec<T> {
603    if input_files.is_empty() || input_files.len() == 1 {
604        return vec![];
605    }
606
607    // Limit the number of files to process to 100 to control time complexity
608    let files_to_process = if input_files.len() > 100 {
609        &input_files[0..100]
610    } else {
611        input_files
612    };
613
614    // Calculate target size based on max_file_size or average file size
615    let target_size = match max_file_size {
616        Some(size) => size as usize,
617        None => {
618            // Calculate 1.5*average_file_size if max_file_size is not provided and clamp to 2GB
619            let total_size: usize = files_to_process.iter().map(|f| f.size()).sum();
620            ((((total_size as f64) / (files_to_process.len() as f64)) * 1.5) as usize)
621                .min(DEFAULT_MAX_OUTPUT_SIZE as usize)
622        }
623    };
624
625    // Find the best group of adjacent files to merge
626    let mut best_group = Vec::new();
627    let mut best_score = f64::NEG_INFINITY;
628
629    // Try different starting positions - iterate from end to start to prefer older files
630    for start_idx in (0..files_to_process.len()).rev() {
631        // Try different ending positions - also iterate from end to start
632        for end_idx in (start_idx + 1..files_to_process.len()).rev() {
633            let group = &files_to_process[start_idx..=end_idx];
634            let total_size: usize = group.iter().map(|f| f.size()).sum();
635
636            // Skip if total size exceeds target size
637            if total_size > target_size {
638                continue; // Use continue instead of break to check smaller ranges
639            }
640
641            // Calculate amplification factor (largest file size / total size)
642            let largest_file_size = group.iter().map(|f| f.size()).max().unwrap_or(0);
643            let amplification_factor = largest_file_size as f64 / total_size as f64;
644
645            // Calculate file reduction (number of files that will be reduced)
646            let file_reduction = group.len() - 1;
647
648            // Calculate score based on multiple factors:
649            // 1. File reduction (higher is better)
650            // 2. Amplification factor (lower is better)
651            // 3. Size efficiency (how close to target size)
652            let file_reduction_score = file_reduction as f64 / files_to_process.len() as f64;
653            let amp_factor_score = (1.0 - amplification_factor) * 1.5; // Lower amplification is better
654            let size_efficiency = (total_size as f64 / target_size as f64).min(1.0); // Reward using available space
655
656            let score = file_reduction_score + amp_factor_score + size_efficiency;
657
658            // Check if this group is better than our current best
659            // Use >= instead of > to prefer older files (which we encounter first due to reverse iteration)
660            if score >= best_score {
661                best_score = score;
662                best_group = group.to_vec();
663            }
664        }
665    }
666
667    best_group
668}
669
670#[cfg(test)]
671mod tests {
672    use std::collections::HashSet;
673
674    use bytes::Bytes;
675    use store_api::storage::FileId;
676
677    use super::*;
678    use crate::compaction::test_util::new_file_handle_with_size_sequence_and_primary_key_range;
679
680    #[derive(Clone, Debug, PartialEq)]
681    struct MockFile {
682        start: i64,
683        end: i64,
684        size: usize,
685    }
686
687    impl Ranged for MockFile {
688        type BoundType = i64;
689
690        fn range(&self) -> (Self::BoundType, Self::BoundType) {
691            (self.start, self.end)
692        }
693    }
694
695    impl Item for MockFile {
696        fn size(&self) -> usize {
697            self.size
698        }
699    }
700
701    fn build_items(ranges: &[(i64, i64)]) -> Vec<MockFile> {
702        ranges
703            .iter()
704            .map(|(start, end)| MockFile {
705                start: *start,
706                end: *end,
707                size: (*end - *start) as usize,
708            })
709            .collect()
710    }
711
712    fn build_items_with_size(items: &[(i64, i64, usize)]) -> Vec<MockFile> {
713        items
714            .iter()
715            .map(|(start, end, size)| MockFile {
716                start: *start,
717                end: *end,
718                size: *size,
719            })
720            .collect()
721    }
722
723    fn pk_range(min: &'static [u8], max: &'static [u8]) -> Option<(Bytes, Bytes)> {
724        Some((Bytes::from_static(min), Bytes::from_static(max)))
725    }
726
727    fn check_sorted_runs(
728        ranges: &[(i64, i64)],
729        expected_runs: &[Vec<(i64, i64)>],
730    ) -> Vec<SortedRun<MockFile>> {
731        let mut files = build_items(ranges);
732        let mut files_clone = files.clone();
733
734        let runs = find_sorted_runs(&mut files);
735
736        let result_file_ranges: Vec<Vec<_>> = runs
737            .iter()
738            .map(|r| r.items.iter().map(|f| f.range()).collect())
739            .collect();
740        assert_eq!(&expected_runs, &result_file_ranges);
741
742        let runs_by_time_range = find_sorted_runs_by_time_range(&mut files_clone);
743        let results: Vec<Vec<_>> = runs_by_time_range
744            .iter()
745            .map(|r| r.items.iter().map(|f| f.range()).collect())
746            .collect();
747        assert_eq!(&expected_runs, &results);
748        runs
749    }
750
751    fn sorted_run_ranges<T: Item>(runs: &[SortedRun<T>]) -> Vec<Vec<T::BoundType>> {
752        runs.iter()
753            .map(|r| {
754                r.items
755                    .iter()
756                    .flat_map(|f| {
757                        let (start, end) = f.range();
758                        [start, end]
759                    })
760                    .collect()
761            })
762            .collect()
763    }
764
765    fn check_find_sorted_runs_consistency(ranges: &[(i64, i64)]) {
766        let mut files = build_items(ranges);
767        let mut files_for_original = files.clone();
768
769        let runs = find_sorted_runs(&mut files);
770        let original_runs = find_sorted_runs_original(&mut files_for_original);
771
772        assert_eq!(sorted_run_ranges(&original_runs), sorted_run_ranges(&runs));
773    }
774
775    #[test]
776    fn test_find_sorted_runs() {
777        check_sorted_runs(&[], &[]);
778        check_sorted_runs(&[(1, 1), (2, 2)], &[vec![(1, 1), (2, 2)]]);
779        check_sorted_runs(&[(1, 2)], &[vec![(1, 2)]]);
780        check_sorted_runs(&[(1, 2), (2, 3)], &[vec![(1, 2), (2, 3)]]);
781        check_sorted_runs(&[(1, 2), (3, 4)], &[vec![(1, 2), (3, 4)]]);
782        check_sorted_runs(&[(2, 4), (1, 3)], &[vec![(1, 3)], vec![(2, 4)]]);
783        check_sorted_runs(
784            &[(1, 3), (2, 4), (4, 5)],
785            &[vec![(1, 3), (4, 5)], vec![(2, 4)]],
786        );
787
788        check_sorted_runs(
789            &[(1, 2), (3, 4), (3, 5)],
790            &[vec![(1, 2), (3, 5)], vec![(3, 4)]],
791        );
792
793        check_sorted_runs(
794            &[(1, 3), (2, 4), (5, 6)],
795            &[vec![(1, 3), (5, 6)], vec![(2, 4)]],
796        );
797
798        check_sorted_runs(
799            &[(1, 2), (3, 5), (4, 6)],
800            &[vec![(1, 2), (3, 5)], vec![(4, 6)]],
801        );
802
803        check_sorted_runs(
804            &[(1, 2), (3, 4), (4, 6), (7, 8)],
805            &[vec![(1, 2), (3, 4), (4, 6), (7, 8)]],
806        );
807        check_sorted_runs(
808            &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
809            &[vec![(1, 2), (3, 6), (7, 8), (8, 9)], vec![(3, 4), (5, 6)]],
810        );
811
812        check_sorted_runs(
813            &[(10, 19), (20, 21), (20, 29), (30, 39)],
814            &[vec![(10, 19), (20, 29), (30, 39)], vec![(20, 21)]],
815        );
816
817        check_sorted_runs(
818            &[(10, 19), (20, 29), (21, 22), (30, 39), (31, 32), (32, 42)],
819            &[
820                vec![(10, 19), (20, 29), (30, 39)],
821                vec![(21, 22), (31, 32), (32, 42)],
822            ],
823        );
824    }
825
826    #[test]
827    fn test_find_sorted_runs_matches_original_impl() {
828        for ranges in [
829            &[][..],
830            &[(1, 1), (2, 2)],
831            &[(1, 2), (2, 3)],
832            &[(2, 4), (1, 3)],
833            &[(1, 3), (2, 4), (4, 5)],
834            &[(1, 2), (3, 4), (3, 5)],
835            &[(1, 3), (2, 4), (5, 6)],
836            &[(1, 2), (3, 5), (4, 6)],
837            &[(1, 2), (3, 4), (4, 6), (7, 8)],
838            &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
839            &[(10, 19), (20, 21), (20, 29), (30, 39)],
840            &[(10, 19), (20, 29), (21, 22), (30, 39), (31, 32), (32, 42)],
841            &[(32, 42), (10, 19), (31, 32), (20, 29), (21, 22), (30, 39)],
842        ] {
843            check_find_sorted_runs_consistency(ranges);
844        }
845    }
846
847    fn check_reduce_runs(
848        files: &[(i64, i64)],
849        expected_runs: &[Vec<(i64, i64)>],
850        expected: &[(i64, i64)],
851    ) {
852        let runs = check_sorted_runs(files, expected_runs);
853        if runs.len() <= 1 {
854            assert!(expected.is_empty());
855            return;
856        }
857        let files_to_merge = reduce_runs(runs);
858        let file_to_merge_timestamps = files_to_merge
859            .into_iter()
860            .map(|f| (f.start, f.end))
861            .collect::<HashSet<_>>();
862
863        let expected = expected.iter().cloned().collect::<HashSet<_>>();
864        assert_eq!(&expected, &file_to_merge_timestamps);
865    }
866
867    #[test]
868    fn test_reduce_runs() {
869        // [1..3]   [5..6]
870        //   [2..4]
871        check_reduce_runs(
872            &[(1, 3), (2, 4), (5, 6)],
873            &[vec![(1, 3), (5, 6)], vec![(2, 4)]],
874            &[(1, 3), (2, 4)],
875        );
876
877        // [1..2][3..5]
878        //         [4..6]
879        check_reduce_runs(
880            &[(1, 2), (3, 5), (4, 6)],
881            &[vec![(1, 2), (3, 5)], vec![(4, 6)]],
882            &[(3, 5), (4, 6)],
883        );
884
885        // [1..2][3..4]    [7..8]
886        //          [4..6]
887        check_reduce_runs(
888            &[(1, 2), (3, 4), (4, 6), (7, 8)],
889            &[vec![(1, 2), (3, 4), (4, 6), (7, 8)]],
890            &[],
891        );
892
893        // [1..2][3........6][7..8][8..9]
894        //       [3..4][5..6]
895        check_reduce_runs(
896            &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
897            &[vec![(1, 2), (3, 6), (7, 8), (8, 9)], vec![(3, 4), (5, 6)]],
898            &[(5, 6), (3, 4), (3, 6)], // already satisfied
899        );
900
901        // [1..2][3........6][7..8][8..9]
902        //       [3..4][5..6]
903        check_reduce_runs(
904            &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
905            &[vec![(1, 2), (3, 6), (7, 8), (8, 9)], vec![(3, 4), (5, 6)]],
906            &[(3, 4), (3, 6), (5, 6)],
907        );
908
909        // [10..20] [30..40] [50........80][80...100][100..110]
910        //                   [50..60]  [80..90]
911        //
912        check_reduce_runs(
913            &[
914                (10, 20),
915                (30, 40),
916                (50, 60),
917                (50, 80),
918                (80, 90),
919                (80, 100),
920                (100, 110),
921            ],
922            &[
923                vec![(10, 20), (30, 40), (50, 80), (80, 100), (100, 110)],
924                vec![(50, 60), (80, 90)],
925            ],
926            &[(50, 80), (80, 100), (50, 60), (80, 90)],
927        );
928
929        // [0..10]
930        // [0...11]
931        // [0....12]
932        // [0.....13]
933        check_reduce_runs(
934            &[(0, 10), (0, 11), (0, 12), (0, 13)],
935            &[vec![(0, 13)], vec![(0, 12)], vec![(0, 11)], vec![(0, 10)]],
936            &[(0, 10), (0, 11)],
937        );
938    }
939
940    #[test]
941    fn test_find_overlapping_items() {
942        let mut result = Vec::new();
943
944        // Test empty inputs
945        find_overlapping_items(
946            &mut SortedRun::from(Vec::<MockFile>::new()),
947            &mut SortedRun::from(Vec::<MockFile>::new()),
948            &mut result,
949        );
950        assert_eq!(result, Vec::<MockFile>::new());
951
952        let files1 = build_items(&[(1, 3)]);
953        find_overlapping_items(
954            &mut SortedRun::from(files1.clone()),
955            &mut SortedRun::from(Vec::<MockFile>::new()),
956            &mut result,
957        );
958        assert_eq!(result, Vec::<MockFile>::new());
959
960        find_overlapping_items(
961            &mut SortedRun::from(Vec::<MockFile>::new()),
962            &mut SortedRun::from(files1.clone()),
963            &mut result,
964        );
965        assert_eq!(result, Vec::<MockFile>::new());
966
967        // Test non-overlapping ranges
968        let files1 = build_items(&[(1, 3), (5, 7)]);
969        let files2 = build_items(&[(10, 12), (15, 20)]);
970        find_overlapping_items(
971            &mut SortedRun::from(files1),
972            &mut SortedRun::from(files2),
973            &mut result,
974        );
975        assert_eq!(result, Vec::<MockFile>::new());
976
977        // Test simple overlap
978        let files1 = build_items(&[(1, 5)]);
979        let files2 = build_items(&[(3, 7)]);
980        find_overlapping_items(
981            &mut SortedRun::from(files1),
982            &mut SortedRun::from(files2),
983            &mut result,
984        );
985        assert_eq!(result.len(), 2);
986        assert_eq!(result[0].range(), (1, 5));
987        assert_eq!(result[1].range(), (3, 7));
988
989        // Test multiple overlaps
990        let files1 = build_items(&[(1, 5), (8, 12), (15, 20)]);
991        let files2 = build_items(&[(3, 6), (7, 10), (18, 25)]);
992        find_overlapping_items(
993            &mut SortedRun::from(files1),
994            &mut SortedRun::from(files2),
995            &mut result,
996        );
997        assert_eq!(result.len(), 6);
998
999        // Test boundary cases (touching but not overlapping)
1000        let files1 = build_items(&[(1, 5)]);
1001        let files2 = build_items(&[(5, 10)]); // Touching at 5
1002        find_overlapping_items(
1003            &mut SortedRun::from(files1),
1004            &mut SortedRun::from(files2),
1005            &mut result,
1006        );
1007        assert_eq!(result.len(), 2); // Should overlap since ranges are inclusive
1008
1009        // Test completely contained ranges
1010        let files1 = build_items(&[(1, 10)]);
1011        let files2 = build_items(&[(3, 7)]);
1012        find_overlapping_items(
1013            &mut SortedRun::from(files1),
1014            &mut SortedRun::from(files2),
1015            &mut result,
1016        );
1017        assert_eq!(result.len(), 2);
1018
1019        // Test identical ranges
1020        let files1 = build_items(&[(1, 5)]);
1021        let files2 = build_items(&[(1, 5)]);
1022        find_overlapping_items(
1023            &mut SortedRun::from(files1),
1024            &mut SortedRun::from(files2),
1025            &mut result,
1026        );
1027        assert_eq!(result.len(), 2);
1028
1029        // Test unsorted input handling
1030        let files1 = build_items(&[(5, 10), (1, 3)]); // Unsorted
1031        let files2 = build_items(&[(2, 7), (8, 12)]); // Unsorted
1032        find_overlapping_items(
1033            &mut SortedRun::from(files1),
1034            &mut SortedRun::from(files2),
1035            &mut result,
1036        );
1037        assert_eq!(result.len(), 4); // Should find both overlaps
1038    }
1039
1040    #[test]
1041    fn test_file_group_overlap_time_overlap_pk_disjoint() {
1042        let lhs =
1043            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
1044                FileId::random(),
1045                0,
1046                100,
1047                0,
1048                1,
1049                10,
1050                pk_range(b"a", b"f"),
1051            ));
1052        let rhs =
1053            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
1054                FileId::random(),
1055                50,
1056                150,
1057                0,
1058                2,
1059                10,
1060                pk_range(b"x", b"z"),
1061            ));
1062
1063        assert!(!lhs.overlap(&rhs));
1064    }
1065
1066    #[test]
1067    fn test_find_sorted_runs_collapses_pk_disjoint_files_into_one_run() {
1068        let mut files = vec![
1069            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
1070                FileId::random(),
1071                0,
1072                100,
1073                0,
1074                1,
1075                10,
1076                pk_range(b"a", b"f"),
1077            )),
1078            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
1079                FileId::random(),
1080                50,
1081                150,
1082                0,
1083                2,
1084                10,
1085                pk_range(b"x", b"z"),
1086            )),
1087        ];
1088
1089        let runs = find_sorted_runs(&mut files);
1090
1091        assert_eq!(1, runs.len());
1092        assert_eq!(2, runs[0].items().len());
1093    }
1094
1095    #[test]
1096    fn test_find_sorted_runs_handles_2d_transitivity_break() {
1097        let mut files = vec![
1098            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
1099                FileId::random(),
1100                0,
1101                100,
1102                0,
1103                1,
1104                10,
1105                pk_range(b"a", b"f"),
1106            )),
1107            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
1108                FileId::random(),
1109                50,
1110                150,
1111                0,
1112                2,
1113                10,
1114                pk_range(b"x", b"z"),
1115            )),
1116            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
1117                FileId::random(),
1118                50,
1119                150,
1120                0,
1121                3,
1122                10,
1123                pk_range(b"a", b"f"),
1124            )),
1125        ];
1126
1127        let runs = find_sorted_runs(&mut files);
1128
1129        assert_eq!(2, runs.len());
1130        assert_eq!(2, runs[0].items().len());
1131        assert_eq!(1, runs[1].items().len());
1132    }
1133
1134    #[test]
1135    fn test_find_overlapping_items_skips_pk_disjoint_pairs() {
1136        let mut left = SortedRun::from(vec![FileGroup::new_with_file(
1137            new_file_handle_with_size_sequence_and_primary_key_range(
1138                FileId::random(),
1139                0,
1140                100,
1141                0,
1142                1,
1143                10,
1144                pk_range(b"a", b"f"),
1145            ),
1146        )]);
1147        let mut right = SortedRun::from(vec![FileGroup::new_with_file(
1148            new_file_handle_with_size_sequence_and_primary_key_range(
1149                FileId::random(),
1150                50,
1151                150,
1152                0,
1153                2,
1154                10,
1155                pk_range(b"x", b"z"),
1156            ),
1157        )]);
1158        let mut result = Vec::new();
1159
1160        find_overlapping_items(&mut left, &mut right, &mut result);
1161
1162        assert!(result.is_empty());
1163    }
1164
1165    #[test]
1166    fn test_file_group_touching_time_boundary_with_same_pk_is_not_overlap() {
1167        let lhs =
1168            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
1169                FileId::random(),
1170                0,
1171                100,
1172                0,
1173                1,
1174                10,
1175                pk_range(b"a", b"f"),
1176            ));
1177        let rhs =
1178            FileGroup::new_with_file(new_file_handle_with_size_sequence_and_primary_key_range(
1179                FileId::random(),
1180                100,
1181                150,
1182                0,
1183                2,
1184                10,
1185                pk_range(b"a", b"f"),
1186            ));
1187
1188        assert!(!lhs.overlap(&rhs));
1189    }
1190
1191    #[test]
1192    fn test_merge_seq_files() {
1193        // Test empty input
1194        let files = Vec::<MockFile>::new();
1195        assert_eq!(merge_seq_files(&files, None), Vec::<MockFile>::new());
1196
1197        // Test single file input (should return empty vec as no merge needed)
1198        let files = build_items(&[(1, 5)]);
1199        assert_eq!(merge_seq_files(&files, None), Vec::<MockFile>::new());
1200
1201        // Test the example case: [10, 1, 1, 1] - should merge the last three files
1202        let files = build_items_with_size(&[(1, 2, 10), (3, 4, 1), (5, 6, 1), (7, 8, 1)]);
1203        let result = merge_seq_files(&files, None);
1204        assert_eq!(result.len(), 3);
1205        assert_eq!(result[0].size, 1);
1206        assert_eq!(result[1].size, 1);
1207        assert_eq!(result[2].size, 1);
1208
1209        // Test with files of equal size - should merge as many as possible
1210        let files = build_items_with_size(&[(1, 2, 5), (3, 4, 5), (5, 6, 5), (7, 8, 5)]);
1211        let result = merge_seq_files(&files, Some(20));
1212        assert_eq!(result.len(), 4); // Should merge all 4 files as total size is 20
1213
1214        // Test with max_file_size constraint
1215        let files = build_items_with_size(&[(1, 2, 5), (3, 4, 5), (5, 6, 5), (7, 8, 5)]);
1216        let result = merge_seq_files(&files, Some(10));
1217        assert_eq!(result.len(), 2); // Should merge only 2 files as max size is 10
1218
1219        // Test with uneven file sizes - should prioritize reducing file count
1220        let files = build_items_with_size(&[(1, 2, 2), (3, 4, 3), (5, 6, 4), (7, 8, 10)]);
1221        let result = merge_seq_files(&files, Some(10));
1222        assert_eq!(result.len(), 3); // Should merge the first 3 files (total size 9)
1223
1224        // Test amplification factor prioritization
1225        // Two possible merges: [5, 5] (amp factor 0.5) vs [10, 1, 1] (amp factor 0.83)
1226        let files =
1227            build_items_with_size(&[(1, 2, 5), (3, 4, 5), (5, 6, 10), (7, 8, 1), (9, 10, 1)]);
1228        let result = merge_seq_files(&files, Some(12));
1229        assert_eq!(result.len(), 2);
1230        assert_eq!(result[0].size, 5);
1231        assert_eq!(result[1].size, 5);
1232
1233        // Test with large file preventing merges
1234        let files = build_items_with_size(&[(1, 2, 100), (3, 4, 1), (5, 6, 1), (7, 8, 1)]);
1235        let result = merge_seq_files(&files, Some(10));
1236        assert_eq!(result.len(), 3); // Should merge the last 3 small files
1237        assert_eq!(result[0].size, 1);
1238        assert_eq!(result[1].size, 1);
1239        assert_eq!(result[2].size, 1);
1240
1241        let files = build_items_with_size(&[(1, 2, 100), (3, 4, 20), (5, 6, 20), (7, 8, 20)]);
1242        let result = merge_seq_files(&files, Some(200));
1243        assert_eq!(result.len(), 4);
1244
1245        let files = build_items_with_size(&[(1, 2, 160), (3, 4, 20), (5, 6, 20), (7, 8, 20)]);
1246        let result = merge_seq_files(&files, None);
1247        assert_eq!(result.len(), 3);
1248        assert_eq!(result[0].size, 20);
1249        assert_eq!(result[1].size, 20);
1250        assert_eq!(result[2].size, 20);
1251
1252        let files = build_items_with_size(&[(1, 2, 100), (3, 4, 1)]);
1253        let result = merge_seq_files(&files, Some(200));
1254        assert_eq!(result.len(), 2);
1255        assert_eq!(result[0].size, 100);
1256        assert_eq!(result[1].size, 1);
1257
1258        let files = build_items_with_size(&[(1, 2, 20), (3, 4, 20), (5, 6, 20), (7, 8, 20)]);
1259        let result = merge_seq_files(&files, Some(40));
1260        assert_eq!(result.len(), 2);
1261        assert_eq!(result[0].start, 1);
1262        assert_eq!(result[1].start, 3);
1263    }
1264}