mito2/compaction/
run.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! This file contains code to find sorted runs in a set of ranged items,
//! along with the best way to merge these items to satisfy the desired run count.
17
18use std::cmp::Ordering;
19
20use common_base::BitVec;
21use common_time::Timestamp;
22use itertools::Itertools;
23use smallvec::{smallvec, SmallVec};
24
25use crate::sst::file::FileHandle;
26
/// An item that covers a closed interval over some ordered bound type.
pub(crate) trait Ranged {
    /// Type of the interval bounds.
    type BoundType: Ord + Copy;

    /// Returns the `(start, end)` bounds of this item; both ends are inclusive.
    fn range(&self) -> (Self::BoundType, Self::BoundType);

    /// Returns `true` if the range of `self` and the range of `other` intersect.
    ///
    /// Both ends are inclusive, so ranges that merely touch
    /// (e.g. `[1, 2]` and `[2, 3]`) are reported as overlapping.
    fn overlap<T>(&self, other: &T) -> bool
    where
        T: Ranged<BoundType = Self::BoundType>,
    {
        let (own_lo, own_hi) = self.range();
        let (their_lo, their_hi) = other.range();
        match own_lo.cmp(&their_lo) {
            // Both ranges begin at the same point, which they therefore share.
            Ordering::Equal => true,
            // `self` begins first: it overlaps iff it reaches the start of `other`.
            Ordering::Less => their_lo <= own_hi,
            // `other` begins first: it overlaps iff it reaches the start of `self`.
            Ordering::Greater => own_lo <= their_hi,
        }
    }
}
47
48// Sorts ranges by start asc and end desc.
49fn sort_ranged_items<T: Ranged>(values: &mut [T]) {
50    values.sort_unstable_by(|l, r| {
51        let (l_start, l_end) = l.range();
52        let (r_start, r_end) = r.range();
53        l_start.cmp(&r_start).then(r_end.cmp(&l_end))
54    });
55}
56
57/// Trait for items to merge.
58pub(crate) trait Item: Ranged + Clone {
59    /// Size is used to calculate the cost of merging items.
60    fn size(&self) -> usize;
61}
62
63impl Ranged for FileHandle {
64    type BoundType = Timestamp;
65
66    fn range(&self) -> (Self::BoundType, Self::BoundType) {
67        self.time_range()
68    }
69}
70
71impl Item for FileHandle {
72    fn size(&self) -> usize {
73        self.size() as usize
74    }
75}
76
77#[derive(Debug, Clone)]
78struct MergeItems<T: Item> {
79    items: SmallVec<[T; 4]>,
80    start: T::BoundType,
81    end: T::BoundType,
82    size: usize,
83}
84
85impl<T: Item> Ranged for MergeItems<T> {
86    type BoundType = T::BoundType;
87
88    fn range(&self) -> (Self::BoundType, Self::BoundType) {
89        (self.start, self.end)
90    }
91}
92
93impl<T: Item> MergeItems<T> {
94    /// Creates unmerged item from given value.
95    pub fn new_unmerged(val: T) -> Self {
96        let (start, end) = val.range();
97        let size = val.size();
98        Self {
99            items: smallvec![val],
100            start,
101            end,
102            size,
103        }
104    }
105
106    /// The range of current merge item
107    pub(crate) fn range(&self) -> (T::BoundType, T::BoundType) {
108        (self.start, self.end)
109    }
110
111    /// Merges current item with other item.
112    pub(crate) fn merge(self, other: Self) -> Self {
113        let start = self.start.min(other.start);
114        let end = self.end.max(other.end);
115        let size = self.size + other.size;
116
117        let mut items = SmallVec::with_capacity(self.items.len() + other.items.len());
118        items.extend(self.items);
119        items.extend(other.items);
120        Self {
121            start,
122            end,
123            size,
124            items,
125        }
126    }
127
128    /// Returns true if current item is merged from two items.
129    pub fn merged(&self) -> bool {
130        self.items.len() > 1
131    }
132}
133
134/// A set of files with non-overlapping time ranges.
135#[derive(Debug, Clone)]
136pub(crate) struct SortedRun<T: Item> {
137    /// Items to merge
138    items: Vec<MergeItems<T>>,
139    /// penalty is defined as the total size of merged items.
140    penalty: usize,
141    /// The lower bound of all items.
142    start: Option<T::BoundType>,
143    // The upper bound of all items.
144    end: Option<T::BoundType>,
145}
146
147impl<T> Default for SortedRun<T>
148where
149    T: Item,
150{
151    fn default() -> Self {
152        Self {
153            items: vec![],
154            penalty: 0,
155            start: None,
156            end: None,
157        }
158    }
159}
160
161impl<T> SortedRun<T>
162where
163    T: Item,
164{
165    fn push_item(&mut self, t: MergeItems<T>) {
166        let (file_start, file_end) = t.range();
167        if t.merged() {
168            self.penalty += t.size;
169        }
170        self.items.push(t);
171        self.start = Some(self.start.map_or(file_start, |v| v.min(file_start)));
172        self.end = Some(self.end.map_or(file_end, |v| v.max(file_end)));
173    }
174}
175
176/// Finds sorted runs in given items.
177pub(crate) fn find_sorted_runs<T>(items: &mut [T]) -> Vec<SortedRun<T>>
178where
179    T: Item,
180{
181    if items.is_empty() {
182        return vec![];
183    }
184    // sort files
185    sort_ranged_items(items);
186
187    let mut current_run = SortedRun::default();
188    let mut runs = vec![];
189
190    let mut selection = BitVec::repeat(false, items.len());
191    while !selection.all() {
192        // until all items are assigned to some sorted run.
193        for (item, mut selected) in items.iter().zip(selection.iter_mut()) {
194            if *selected {
195                // item is already assigned.
196                continue;
197            }
198            let current_item = MergeItems::new_unmerged(item.clone());
199            match current_run.items.last() {
200                None => {
201                    // current run is empty, just add current_item
202                    selected.set(true);
203                    current_run.push_item(current_item);
204                }
205                Some(last) => {
206                    // the current item does not overlap with the last item in current run,
207                    // then it belongs to current run.
208                    if !last.overlap(&current_item) {
209                        // does not overlap, push to current run
210                        selected.set(true);
211                        current_run.push_item(current_item);
212                    }
213                }
214            }
215        }
216        // finished an iteration, we've found a new run.
217        runs.push(std::mem::take(&mut current_run));
218    }
219    runs
220}
221
222fn merge_all_runs<T: Item>(runs: Vec<SortedRun<T>>) -> SortedRun<T> {
223    assert!(!runs.is_empty());
224    let mut all_items = runs
225        .into_iter()
226        .flat_map(|r| r.items.into_iter())
227        .collect::<Vec<_>>();
228
229    sort_ranged_items(&mut all_items);
230
231    let mut res = SortedRun::default();
232    let mut iter = all_items.into_iter();
233    // safety: all_items is not empty
234    let mut current_item = iter.next().unwrap();
235
236    for item in iter {
237        if current_item.overlap(&item) {
238            current_item = current_item.merge(item);
239        } else {
240            res.push_item(current_item);
241            current_item = item;
242        }
243    }
244    res.push_item(current_item);
245    res
246}
247
248/// Reduces the num of runs to given target and returns items to merge.
249/// The time complexity of this function is `C_{k}_{runs.len()}` where k=`runs.len()`-target+1.
250pub(crate) fn reduce_runs<T: Item>(runs: Vec<SortedRun<T>>, target: usize) -> Vec<Vec<T>> {
251    assert_ne!(target, 0);
252    if target >= runs.len() {
253        // already satisfied.
254        return vec![];
255    }
256
257    let k = runs.len() + 1 - target;
258    runs.into_iter()
259        .combinations(k) // find all possible solutions
260        .map(|runs_to_merge| merge_all_runs(runs_to_merge)) // calculate merge penalty
261        .min_by(|p, r| p.penalty.cmp(&r.penalty)) // find solution with the min penalty
262        .unwrap() // safety: their must be at least one solution.
263        .items
264        .into_iter()
265        .filter(|m| m.merged()) // find all files to merge in that solution
266        .map(|m| m.items.to_vec())
267        .collect()
268}
269
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::*;

    /// Minimal [`Item`] used by the tests: an inclusive `[start, end]` range and a size.
    #[derive(Clone, Debug)]
    struct MockFile {
        start: i64,
        end: i64,
        size: usize,
    }

    impl Ranged for MockFile {
        type BoundType = i64;

        fn range(&self) -> (Self::BoundType, Self::BoundType) {
            (self.start, self.end)
        }
    }

    impl Item for MockFile {
        fn size(&self) -> usize {
            self.size
        }
    }

    /// Builds one mock file per `(start, end)` pair; the size of each file is `end - start`.
    fn build_items(ranges: &[(i64, i64)]) -> Vec<MockFile> {
        ranges
            .iter()
            .map(|(start, end)| MockFile {
                start: *start,
                end: *end,
                size: (*end - *start) as usize,
            })
            .collect()
    }

    /// Runs `find_sorted_runs` on `ranges`, asserts that the ranges of the
    /// resulting runs equal `expected_runs`, and returns the runs.
    fn check_sorted_runs(
        ranges: &[(i64, i64)],
        expected_runs: &[Vec<(i64, i64)>],
    ) -> Vec<SortedRun<MockFile>> {
        let mut files = build_items(ranges);
        let runs = find_sorted_runs(&mut files);

        let result_file_ranges: Vec<Vec<_>> = runs
            .iter()
            .map(|r| r.items.iter().map(|f| f.range()).collect())
            .collect();
        assert_eq!(&expected_runs, &result_file_ranges);
        runs
    }

    #[test]
    fn test_find_sorted_runs() {
        check_sorted_runs(&[], &[]);
        check_sorted_runs(&[(1, 1), (2, 2)], &[vec![(1, 1), (2, 2)]]);
        check_sorted_runs(&[(1, 2)], &[vec![(1, 2)]]);
        check_sorted_runs(&[(1, 2), (2, 3)], &[vec![(1, 2)], vec![(2, 3)]]);
        check_sorted_runs(&[(1, 2), (3, 4)], &[vec![(1, 2), (3, 4)]]);
        check_sorted_runs(&[(2, 4), (1, 3)], &[vec![(1, 3)], vec![(2, 4)]]);
        check_sorted_runs(
            &[(1, 3), (2, 4), (4, 5)],
            &[vec![(1, 3), (4, 5)], vec![(2, 4)]],
        );

        check_sorted_runs(
            &[(1, 2), (3, 4), (3, 5)],
            &[vec![(1, 2), (3, 5)], vec![(3, 4)]],
        );

        check_sorted_runs(
            &[(1, 3), (2, 4), (5, 6)],
            &[vec![(1, 3), (5, 6)], vec![(2, 4)]],
        );

        check_sorted_runs(
            &[(1, 2), (3, 5), (4, 6)],
            &[vec![(1, 2), (3, 5)], vec![(4, 6)]],
        );

        check_sorted_runs(
            &[(1, 2), (3, 4), (4, 6), (7, 8)],
            &[vec![(1, 2), (3, 4), (7, 8)], vec![(4, 6)]],
        );
        check_sorted_runs(
            &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
            &[vec![(1, 2), (3, 6), (7, 8)], vec![(3, 4), (5, 6), (8, 9)]],
        );

        check_sorted_runs(
            &[(10, 19), (20, 21), (20, 29), (30, 39)],
            &[vec![(10, 19), (20, 29), (30, 39)], vec![(20, 21)]],
        );

        check_sorted_runs(
            &[(10, 19), (20, 29), (21, 22), (30, 39), (31, 32), (32, 42)],
            &[
                vec![(10, 19), (20, 29), (30, 39)],
                vec![(21, 22), (31, 32)],
                vec![(32, 42)],
            ],
        );
    }

    /// Builds files from `items`, asserts that they form exactly two sorted runs,
    /// merges those runs and checks the resulting penalty and groups.
    ///
    /// Each group is sorted by `(start, end)` before it is compared with `expected`.
    fn check_merge_sorted_runs(
        items: &[(i64, i64)],
        expected_penalty: usize,
        expected: &[Vec<(i64, i64)>],
    ) {
        let mut items = build_items(items);
        let runs = find_sorted_runs(&mut items);
        assert_eq!(2, runs.len());
        let res = merge_all_runs(runs);
        let penalty = res.penalty;
        let ranges = res
            .items
            .into_iter()
            .map(|i| {
                i.items
                    .into_iter()
                    .map(|f| (f.start, f.end))
                    .sorted_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1)))
                    .collect::<Vec<_>>()
            })
            .collect::<Vec<_>>();
        assert_eq!(expected, &ranges);
        assert_eq!(expected_penalty, penalty);
    }

    #[test]
    fn test_merge_sorted_runs() {
        // [1..2]
        // [1...3]
        check_merge_sorted_runs(&[(1, 2), (1, 3)], 3, &[vec![(1, 2), (1, 3)]]);

        // [1..2][3..4]
        //    [2..3]
        check_merge_sorted_runs(
            &[(1, 2), (2, 3), (3, 4)],
            3,
            &[vec![(1, 2), (2, 3), (3, 4)]],
        );

        // [1..10][11..20][21...30]
        //          [18]
        check_merge_sorted_runs(
            &[(1, 10), (11, 20), (21, 30), (18, 18)],
            9,
            &[vec![(1, 10)], vec![(11, 20), (18, 18)], vec![(21, 30)]],
        );

        // [1..3][4..5]
        //   [2...4]
        check_merge_sorted_runs(
            &[(1, 3), (2, 4), (4, 5)],
            5,
            &[vec![(1, 3), (2, 4), (4, 5)]],
        );

        // [1..2][3..4]    [7..8]
        //          [4..6]
        check_merge_sorted_runs(
            &[(1, 2), (3, 4), (4, 6), (7, 8)],
            3,
            &[vec![(1, 2)], vec![(3, 4), (4, 6)], vec![(7, 8)]],
        );

        // [1..2][3..4][5..6][7..8]
        //       [3........6]   [8..9]
        //
        check_merge_sorted_runs(
            &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
            7,
            &[
                vec![(1, 2)],
                vec![(3, 4), (3, 6), (5, 6)],
                vec![(7, 8), (8, 9)],
            ],
        );

        // [10.....19][20........29][30........39]
        //              [21..22]     [31..32]
        check_merge_sorted_runs(
            &[(10, 19), (20, 29), (21, 22), (30, 39), (31, 32)],
            20,
            &[
                vec![(10, 19)],
                vec![(20, 29), (21, 22)],
                vec![(30, 39), (31, 32)],
            ],
        );

        // [1..10][11..20][21..30]
        // [1..10]        [21..30]
        check_merge_sorted_runs(
            &[(1, 10), (1, 10), (11, 20), (21, 30), (21, 30)],
            36,
            &[
                vec![(1, 10), (1, 10)],
                vec![(11, 20)],
                vec![(21, 30), (21, 30)],
            ],
        );

        // [1..10][11..20][21...30]
        //                 [22..30]
        check_merge_sorted_runs(
            &[(1, 10), (11, 20), (21, 30), (22, 30)],
            17,
            &[vec![(1, 10)], vec![(11, 20)], vec![(21, 30), (22, 30)]],
        );
    }

    /// Builds files from `files`, merges all sorted runs found in them (however
    /// many there are) and checks the resulting penalty and groups.
    ///
    /// Each group is sorted by `(start, end)` before it is compared with
    /// `expected`, so the comparison does not depend on how the unstable sort
    /// orders entries that share a start.
    fn check_merge_all_sorted_runs(
        files: &[(i64, i64)],
        expected_penalty: usize,
        expected: &[Vec<(i64, i64)>],
    ) {
        let mut files = build_items(files);
        let runs = find_sorted_runs(&mut files);
        let result = merge_all_runs(runs);
        assert_eq!(expected_penalty, result.penalty);
        assert_eq!(expected.len(), result.items.len());
        let res = result
            .items
            .iter()
            .map(|i| {
                let mut res = i.items.iter().map(|f| (f.start, f.end)).collect::<Vec<_>>();
                res.sort_unstable_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1)));
                res
            })
            .collect::<Vec<_>>();
        assert_eq!(expected, &res);
    }

    #[test]
    fn test_merge_all_sorted_runs() {
        // [1..2][3..4]
        //          [4..10]
        check_merge_all_sorted_runs(
            &[(1, 2), (3, 4), (4, 10)],
            7, // 1+6
            &[vec![(1, 2)], vec![(3, 4), (4, 10)]],
        );

        // [1..2] [3..4] [5..6]
        //           [4..........10]
        check_merge_all_sorted_runs(
            &[(1, 2), (3, 4), (5, 6), (4, 10)],
            8, // 1+1+6
            &[vec![(1, 2)], vec![(3, 4), (4, 10), (5, 6)]],
        );

        // [10..20] [30..40] [50....60]
        //             [35........55]
        //                     [51..61]
        check_merge_all_sorted_runs(
            &[(10, 20), (30, 40), (50, 60), (35, 55), (51, 61)],
            50,
            &[vec![(10, 20)], vec![(30, 40), (35, 55), (50, 60), (51, 61)]],
        );
    }

    #[test]
    fn test_sorted_runs_time_range() {
        let mut files = build_items(&[(1, 2), (3, 4), (4, 10)]);
        let runs = find_sorted_runs(&mut files);
        assert_eq!(2, runs.len());
        let SortedRun { start, end, .. } = &runs[0];
        assert_eq!(Some(1), *start);
        assert_eq!(Some(4), *end);

        let SortedRun { start, end, .. } = &runs[1];
        assert_eq!(Some(4), *start);
        assert_eq!(Some(10), *end);
    }

    /// Checks that `files` form `expected_runs`, then reduces the runs to
    /// `target` and asserts that the groups of files to merge equal `expected`.
    ///
    /// Groups are compared as a set, so their order does not matter; the files
    /// inside each group are sorted by `(start, end)` first.
    fn check_reduce_runs(
        files: &[(i64, i64)],
        expected_runs: &[Vec<(i64, i64)>],
        target: usize,
        expected: &[Vec<(i64, i64)>],
    ) {
        let runs = check_sorted_runs(files, expected_runs);
        let files_to_merge = reduce_runs(runs, target);
        let file_timestamps = files_to_merge
            .into_iter()
            .map(|f| {
                let mut overlapping = f.into_iter().map(|f| (f.start, f.end)).collect::<Vec<_>>();
                overlapping.sort_unstable_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1)));
                overlapping
            })
            .collect::<HashSet<_>>();

        let expected = expected.iter().cloned().collect::<HashSet<_>>();
        assert_eq!(&expected, &file_timestamps);
    }

    #[test]
    fn test_reduce_runs() {
        // [1..3]   [5..6]
        //   [2..4]
        check_reduce_runs(
            &[(1, 3), (2, 4), (5, 6)],
            &[vec![(1, 3), (5, 6)], vec![(2, 4)]],
            1,
            &[vec![(1, 3), (2, 4)]],
        );

        // [1..2][3..5]
        //         [4..6]
        check_reduce_runs(
            &[(1, 2), (3, 5), (4, 6)],
            &[vec![(1, 2), (3, 5)], vec![(4, 6)]],
            1,
            &[vec![(3, 5), (4, 6)]],
        );

        // [1..4]
        //  [2..5]
        //   [3..6]
        check_reduce_runs(
            &[(1, 4), (2, 5), (3, 6)],
            &[vec![(1, 4)], vec![(2, 5)], vec![(3, 6)]],
            1,
            &[vec![(1, 4), (2, 5), (3, 6)]],
        );
        check_reduce_runs(
            &[(1, 4), (2, 5), (3, 6)],
            &[vec![(1, 4)], vec![(2, 5)], vec![(3, 6)]],
            2,
            &[vec![(1, 4), (2, 5)]],
        );

        // [1..2][3..4]    [7..8]
        //          [4..6]
        check_reduce_runs(
            &[(1, 2), (3, 4), (4, 6), (7, 8)],
            &[vec![(1, 2), (3, 4), (7, 8)], vec![(4, 6)]],
            1,
            &[vec![(3, 4), (4, 6)]],
        );

        // [1..2][3........6][7..8]
        //       [3..4][5..6]   [8..9]
        check_reduce_runs(
            &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
            &[vec![(1, 2), (3, 6), (7, 8)], vec![(3, 4), (5, 6), (8, 9)]],
            2,
            &[], // already satisfied
        );

        // [1..2][3........6][7..8]
        //       [3..4][5..6]   [8..9]
        check_reduce_runs(
            &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
            &[vec![(1, 2), (3, 6), (7, 8)], vec![(3, 4), (5, 6), (8, 9)]],
            1,
            &[vec![(3, 4), (3, 6), (5, 6)], vec![(7, 8), (8, 9)]],
        );

        // [10..20] [30..40] [50........80]  [100..110]
        //                   [50..60]  [80...100]
        //                             [80..90]
        check_reduce_runs(
            &[
                (10, 20),
                (30, 40),
                (50, 60),
                (50, 80),
                (80, 90),
                (80, 100),
                (100, 110),
            ],
            &[
                vec![(10, 20), (30, 40), (50, 80), (100, 110)],
                vec![(50, 60), (80, 100)],
                vec![(80, 90)],
            ],
            2,
            &[vec![(80, 90), (80, 100)]],
        );

        // [10..20] [30..40] [50........80]     [100..110]
        //                   [50..60]  [80.......100]
        //                             [80..90]
        check_reduce_runs(
            &[
                (10, 20),
                (30, 40),
                (50, 60),
                (50, 80),
                (80, 90),
                (80, 100),
                (100, 110),
            ],
            &[
                vec![(10, 20), (30, 40), (50, 80), (100, 110)],
                vec![(50, 60), (80, 100)],
                vec![(80, 90)],
            ],
            1,
            &[vec![(50, 60), (50, 80), (80, 90), (80, 100), (100, 110)]],
        );

        // [0..10]
        // [0...11]
        // [0....12]
        // [0.....13]
        check_reduce_runs(
            &[(0, 10), (0, 11), (0, 12), (0, 13)],
            &[vec![(0, 13)], vec![(0, 12)], vec![(0, 11)], vec![(0, 10)]],
            4,
            &[],
        );
        // enforce 3 runs
        check_reduce_runs(
            &[(0, 10), (0, 11), (0, 12), (0, 13)],
            &[vec![(0, 13)], vec![(0, 12)], vec![(0, 11)], vec![(0, 10)]],
            3,
            &[vec![(0, 10), (0, 11)]],
        );
        // enforce 2 runs
        check_reduce_runs(
            &[(0, 10), (0, 11), (0, 12), (0, 13)],
            &[vec![(0, 13)], vec![(0, 12)], vec![(0, 11)], vec![(0, 10)]],
            2,
            &[vec![(0, 10), (0, 11), (0, 12)]],
        );
        // enforce 1 run
        check_reduce_runs(
            &[(0, 10), (0, 11), (0, 12), (0, 13)],
            &[vec![(0, 13)], vec![(0, 12)], vec![(0, 11)], vec![(0, 10)]],
            1,
            &[vec![(0, 10), (0, 11), (0, 12), (0, 13)]],
        );
    }
}
709}