mito2/compaction/
twcs.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap};
17use std::fmt::Debug;
18
19use common_telemetry::{info, trace};
20use common_time::timestamp::TimeUnit;
21use common_time::timestamp_millis::BucketAligned;
22use common_time::Timestamp;
23
24use crate::compaction::buckets::infer_time_bucket;
25use crate::compaction::compactor::CompactionRegion;
26use crate::compaction::picker::{Picker, PickerOutput};
27use crate::compaction::run::{find_sorted_runs, reduce_runs, Item};
28use crate::compaction::{get_expired_ssts, CompactionOutput};
29use crate::sst::file::{overlaps, FileHandle, Level};
30use crate::sst::version::LevelMeta;
31
32const LEVEL_COMPACTED: Level = 1;
33
34/// `TwcsPicker` picks files of which the max timestamp are in the same time window as compaction
35/// candidates.
36#[derive(Debug)]
37pub struct TwcsPicker {
38    /// Max allowed sorted runs in active window.
39    pub max_active_window_runs: usize,
40    /// Max allowed files in active window.
41    pub max_active_window_files: usize,
42    /// Max allowed sorted runs in inactive windows.
43    pub max_inactive_window_runs: usize,
44    /// Max allowed files in inactive windows.
45    pub max_inactive_window_files: usize,
46    /// Compaction time window in seconds.
47    pub time_window_seconds: Option<i64>,
48    /// Max allowed compaction output file size.
49    pub max_output_file_size: Option<u64>,
50    /// Whether the target region is in append mode.
51    pub append_mode: bool,
52}
53
54impl TwcsPicker {
55    /// Builds compaction output from files.
56    /// For active writing window, we allow for at most `max_active_window_runs` files to alleviate
57    /// fragmentation. For other windows, we allow at most 1 file at each window.
58    fn build_output(
59        &self,
60        time_windows: &mut BTreeMap<i64, Window>,
61        active_window: Option<i64>,
62    ) -> Vec<CompactionOutput> {
63        let mut output = vec![];
64        for (window, files) in time_windows {
65            let sorted_runs = find_sorted_runs(&mut files.files);
66
67            let (max_runs, max_files) = if let Some(active_window) = active_window
68                && *window == active_window
69            {
70                (self.max_active_window_runs, self.max_active_window_files)
71            } else {
72                (
73                    self.max_inactive_window_runs,
74                    self.max_inactive_window_files,
75                )
76            };
77
78            let found_runs = sorted_runs.len();
79            // We only remove deletion markers once no file in current window overlaps with any other window
80            // and region is not in append mode.
81            let filter_deleted =
82                !files.overlapping && (found_runs == 1 || max_runs == 1) && !self.append_mode;
83
84            let inputs = if found_runs > max_runs {
85                let files_to_compact = reduce_runs(sorted_runs, max_runs);
86                let files_to_compact_len = files_to_compact.len();
87                info!(
88                    "Building compaction output, active window: {:?}, \
89                        current window: {}, \
90                        max runs: {}, \
91                        found runs: {}, \
92                        output size: {}, \
93                        max output size: {:?}, \
94                        remove deletion markers: {}",
95                    active_window,
96                    *window,
97                    max_runs,
98                    found_runs,
99                    files_to_compact_len,
100                    self.max_output_file_size,
101                    filter_deleted
102                );
103                files_to_compact
104            } else if files.files.len() > max_files {
105                info!(
106                    "Enforcing max file num in window: {}, active: {:?}, max: {}, current: {}, max output size: {:?}, filter delete: {}",
107                    *window,
108                    active_window,
109                    max_files,
110                    files.files.len(),
111                    self.max_output_file_size,
112                    filter_deleted,
113                );
114                // Files in window exceeds file num limit
115                vec![enforce_file_num(&files.files, max_files)]
116            } else {
117                trace!("Skip building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, ", active_window, *window, max_runs, found_runs);
118                continue;
119            };
120
121            let split_inputs = if !filter_deleted
122                && let Some(max_output_file_size) = self.max_output_file_size
123            {
124                let len_before_split = inputs.len();
125                let maybe_split = enforce_max_output_size(inputs, max_output_file_size);
126                if maybe_split.len() != len_before_split {
127                    info!("Compaction output file size exceeds threshold {}, split compaction inputs to: {:?}", max_output_file_size, maybe_split);
128                }
129                maybe_split
130            } else {
131                inputs
132            };
133
134            for input in split_inputs {
135                debug_assert!(input.len() > 1);
136                output.push(CompactionOutput {
137                    output_level: LEVEL_COMPACTED, // always compact to l1
138                    inputs: input,
139                    filter_deleted,
140                    output_time_range: None, // we do not enforce output time range in twcs compactions.
141                });
142            }
143        }
144        output
145    }
146}
147
148/// Limits the size of compaction output in a naive manner.
149/// todo(hl): we can find the output file size more precisely by checking the time range
150/// of each row group and adding the sizes of those non-overlapping row groups. But now
151/// we'd better not to expose the SST details in this level.
152fn enforce_max_output_size(
153    inputs: Vec<Vec<FileHandle>>,
154    max_output_file_size: u64,
155) -> Vec<Vec<FileHandle>> {
156    inputs
157        .into_iter()
158        .flat_map(|input| {
159            debug_assert!(input.len() > 1);
160            let estimated_output_size = input.iter().map(|f| f.size()).sum::<u64>();
161            if estimated_output_size < max_output_file_size {
162                // total file size does not exceed the threshold, just return the original input.
163                return vec![input];
164            }
165            let mut splits = vec![];
166            let mut new_input = vec![];
167            let mut new_input_size = 0;
168            for f in input {
169                if new_input_size + f.size() > max_output_file_size {
170                    splits.push(std::mem::take(&mut new_input));
171                    new_input_size = 0;
172                }
173                new_input_size += f.size();
174                new_input.push(f);
175            }
176            if !new_input.is_empty() {
177                splits.push(new_input);
178            }
179            splits
180        })
181        .filter(|p| p.len() > 1)
182        .collect()
183}
184
185/// Merges consecutive files so that file num does not exceed `max_file_num`, and chooses
186/// the solution with minimum overhead according to files sizes to be merged.
187/// `enforce_file_num` only merges consecutive files so that it won't create overlapping outputs.
188/// `runs` must be sorted according to time ranges.
189fn enforce_file_num<T: Item>(files: &[T], max_file_num: usize) -> Vec<T> {
190    debug_assert!(files.len() > max_file_num);
191    let to_merge = files.len() - max_file_num + 1;
192    let mut min_penalty = usize::MAX;
193    let mut min_idx = 0;
194
195    for idx in 0..=(files.len() - to_merge) {
196        let current_penalty: usize = files
197            .iter()
198            .skip(idx)
199            .take(to_merge)
200            .map(|f| f.size())
201            .sum();
202        if current_penalty < min_penalty {
203            min_penalty = current_penalty;
204            min_idx = idx;
205        }
206    }
207    files.iter().skip(min_idx).take(to_merge).cloned().collect()
208}
209
210impl Picker for TwcsPicker {
211    fn pick(&self, compaction_region: &CompactionRegion) -> Option<PickerOutput> {
212        let region_id = compaction_region.region_id;
213        let levels = compaction_region.current_version.ssts.levels();
214
215        let expired_ssts =
216            get_expired_ssts(levels, compaction_region.ttl, Timestamp::current_millis());
217        if !expired_ssts.is_empty() {
218            info!("Expired SSTs in region {}: {:?}", region_id, expired_ssts);
219            // here we mark expired SSTs as compacting to avoid them being picked.
220            expired_ssts.iter().for_each(|f| f.set_compacting(true));
221        }
222
223        let compaction_time_window = compaction_region
224            .current_version
225            .compaction_time_window
226            .map(|window| window.as_secs() as i64);
227        let time_window_size = compaction_time_window
228            .or(self.time_window_seconds)
229            .unwrap_or_else(|| {
230                let inferred = infer_time_bucket(levels[0].files());
231                info!(
232                    "Compaction window for region {} is not present, inferring from files: {:?}",
233                    region_id, inferred
234                );
235                inferred
236            });
237
238        // Find active window from files in level 0.
239        let active_window = find_latest_window_in_seconds(levels[0].files(), time_window_size);
240        // Assign files to windows
241        let mut windows =
242            assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size);
243        let outputs = self.build_output(&mut windows, active_window);
244
245        if outputs.is_empty() && expired_ssts.is_empty() {
246            return None;
247        }
248
249        Some(PickerOutput {
250            outputs,
251            expired_ssts,
252            time_window_size,
253        })
254    }
255}
256
257struct Window {
258    start: Timestamp,
259    end: Timestamp,
260    files: Vec<FileHandle>,
261    time_window: i64,
262    overlapping: bool,
263}
264
265impl Window {
266    /// Creates a new [Window] with given file.
267    fn new_with_file(file: FileHandle) -> Self {
268        let (start, end) = file.time_range();
269        Self {
270            start,
271            end,
272            files: vec![file],
273            time_window: 0,
274            overlapping: false,
275        }
276    }
277
278    /// Returns the time range of all files in current window (inclusive).
279    fn range(&self) -> (Timestamp, Timestamp) {
280        (self.start, self.end)
281    }
282
283    /// Adds a new file to window and updates time range.
284    fn add_file(&mut self, file: FileHandle) {
285        let (start, end) = file.time_range();
286        self.start = self.start.min(start);
287        self.end = self.end.max(end);
288        self.files.push(file);
289    }
290}
291
292/// Assigns files to windows with predefined window size (in seconds) by their max timestamps.
293fn assign_to_windows<'a>(
294    files: impl Iterator<Item = &'a FileHandle>,
295    time_window_size: i64,
296) -> BTreeMap<i64, Window> {
297    let mut windows: HashMap<i64, Window> = HashMap::new();
298    // Iterates all files and assign to time windows according to max timestamp
299    for f in files {
300        if f.compacting() {
301            continue;
302        }
303        let (_, end) = f.time_range();
304        let time_window = end
305            .convert_to(TimeUnit::Second)
306            .unwrap()
307            .value()
308            .align_to_ceil_by_bucket(time_window_size)
309            .unwrap_or(i64::MIN);
310
311        match windows.entry(time_window) {
312            Entry::Occupied(mut e) => {
313                e.get_mut().add_file(f.clone());
314            }
315            Entry::Vacant(e) => {
316                let mut window = Window::new_with_file(f.clone());
317                window.time_window = time_window;
318                e.insert(window);
319            }
320        }
321    }
322    if windows.is_empty() {
323        return BTreeMap::new();
324    }
325
326    let mut windows = windows.into_values().collect::<Vec<_>>();
327    windows.sort_unstable_by(|l, r| l.start.cmp(&r.start).then(l.end.cmp(&r.end).reverse()));
328
329    let mut current_range: (Timestamp, Timestamp) = windows[0].range(); // windows cannot be empty.
330
331    for idx in 1..windows.len() {
332        let next_range = windows[idx].range();
333        if overlaps(&current_range, &next_range) {
334            windows[idx - 1].overlapping = true;
335            windows[idx].overlapping = true;
336        }
337        current_range = (
338            current_range.0.min(next_range.0),
339            current_range.1.max(next_range.1),
340        );
341    }
342
343    windows.into_iter().map(|w| (w.time_window, w)).collect()
344}
345
346/// Finds the latest active writing window among all files.
347/// Returns `None` when there are no files or all files are corrupted.
348fn find_latest_window_in_seconds<'a>(
349    files: impl Iterator<Item = &'a FileHandle>,
350    time_window_size: i64,
351) -> Option<i64> {
352    let mut latest_timestamp = None;
353    for f in files {
354        let (_, end) = f.time_range();
355        if let Some(latest) = latest_timestamp {
356            if end > latest {
357                latest_timestamp = Some(end);
358            }
359        } else {
360            latest_timestamp = Some(end);
361        }
362    }
363    latest_timestamp
364        .and_then(|ts| ts.convert_to_ceil(TimeUnit::Second))
365        .and_then(|ts| ts.value().align_to_ceil_by_bucket(time_window_size))
366}
367
368#[cfg(test)]
369mod tests {
370    use std::collections::HashSet;
371    use std::sync::Arc;
372
373    use super::*;
374    use crate::compaction::test_util::{new_file_handle, new_file_handles};
375    use crate::sst::file::{FileId, FileMeta, Level};
376    use crate::test_util::NoopFilePurger;
377
378    #[test]
379    fn test_get_latest_window_in_seconds() {
380        assert_eq!(
381            Some(1),
382            find_latest_window_in_seconds([new_file_handle(FileId::random(), 0, 999, 0)].iter(), 1)
383        );
384        assert_eq!(
385            Some(1),
386            find_latest_window_in_seconds(
387                [new_file_handle(FileId::random(), 0, 1000, 0)].iter(),
388                1
389            )
390        );
391
392        assert_eq!(
393            Some(-9223372036854000),
394            find_latest_window_in_seconds(
395                [new_file_handle(FileId::random(), i64::MIN, i64::MIN + 1, 0)].iter(),
396                3600,
397            )
398        );
399
400        assert_eq!(
401            (i64::MAX / 10000000 + 1) * 10000,
402            find_latest_window_in_seconds(
403                [new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0)].iter(),
404                10000,
405            )
406            .unwrap()
407        );
408
409        assert_eq!(
410            Some((i64::MAX / 3600000 + 1) * 3600),
411            find_latest_window_in_seconds(
412                [
413                    new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0),
414                    new_file_handle(FileId::random(), 0, 1000, 0)
415                ]
416                .iter(),
417                3600
418            )
419        );
420    }
421
422    #[test]
423    fn test_assign_to_windows() {
424        let windows = assign_to_windows(
425            [
426                new_file_handle(FileId::random(), 0, 999, 0),
427                new_file_handle(FileId::random(), 0, 999, 0),
428                new_file_handle(FileId::random(), 0, 999, 0),
429                new_file_handle(FileId::random(), 0, 999, 0),
430                new_file_handle(FileId::random(), 0, 999, 0),
431            ]
432            .iter(),
433            3,
434        );
435        assert_eq!(5, windows.get(&0).unwrap().files.len());
436
437        let files = [FileId::random(); 3];
438        let windows = assign_to_windows(
439            [
440                new_file_handle(files[0], -2000, -3, 0),
441                new_file_handle(files[1], 0, 2999, 0),
442                new_file_handle(files[2], 50, 10001, 0),
443            ]
444            .iter(),
445            3,
446        );
447        assert_eq!(
448            files[0],
449            windows.get(&0).unwrap().files.first().unwrap().file_id()
450        );
451        assert_eq!(
452            files[1],
453            windows.get(&3).unwrap().files.first().unwrap().file_id()
454        );
455        assert_eq!(
456            files[2],
457            windows.get(&12).unwrap().files.first().unwrap().file_id()
458        );
459    }
460
461    #[test]
462    fn test_assign_compacting_to_windows() {
463        let files = [
464            new_file_handle(FileId::random(), 0, 999, 0),
465            new_file_handle(FileId::random(), 0, 999, 0),
466            new_file_handle(FileId::random(), 0, 999, 0),
467            new_file_handle(FileId::random(), 0, 999, 0),
468            new_file_handle(FileId::random(), 0, 999, 0),
469        ];
470        files[0].set_compacting(true);
471        files[2].set_compacting(true);
472        let windows = assign_to_windows(files.iter(), 3);
473        assert_eq!(3, windows.get(&0).unwrap().files.len());
474    }
475
476    /// (Window value, overlapping, files' time ranges in window)
477    type ExpectedWindowSpec = (i64, bool, Vec<(i64, i64)>);
478
479    fn check_assign_to_windows_with_overlapping(
480        file_time_ranges: &[(i64, i64)],
481        time_window: i64,
482        expected_files: &[ExpectedWindowSpec],
483    ) {
484        let files: Vec<_> = (0..file_time_ranges.len())
485            .map(|_| FileId::random())
486            .collect();
487
488        let file_handles = files
489            .iter()
490            .zip(file_time_ranges.iter())
491            .map(|(file_id, range)| new_file_handle(*file_id, range.0, range.1, 0))
492            .collect::<Vec<_>>();
493
494        let windows = assign_to_windows(file_handles.iter(), time_window);
495
496        for (expected_window, overlapping, window_files) in expected_files {
497            let actual_window = windows.get(expected_window).unwrap();
498            assert_eq!(*overlapping, actual_window.overlapping);
499            let mut file_ranges = actual_window
500                .files
501                .iter()
502                .map(|f| {
503                    let (s, e) = f.time_range();
504                    (s.value(), e.value())
505                })
506                .collect::<Vec<_>>();
507            file_ranges.sort_unstable_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1)));
508            assert_eq!(window_files, &file_ranges);
509        }
510    }
511
512    #[test]
513    fn test_assign_to_windows_with_overlapping() {
514        check_assign_to_windows_with_overlapping(
515            &[(0, 999), (1000, 1999), (2000, 2999)],
516            2,
517            &[
518                (0, false, vec![(0, 999)]),
519                (2, false, vec![(1000, 1999), (2000, 2999)]),
520            ],
521        );
522
523        check_assign_to_windows_with_overlapping(
524            &[(0, 1), (0, 999), (100, 2999)],
525            2,
526            &[
527                (0, true, vec![(0, 1), (0, 999)]),
528                (2, true, vec![(100, 2999)]),
529            ],
530        );
531
532        check_assign_to_windows_with_overlapping(
533            &[(0, 999), (1000, 1999), (2000, 2999), (3000, 3999)],
534            2,
535            &[
536                (0, false, vec![(0, 999)]),
537                (2, false, vec![(1000, 1999), (2000, 2999)]),
538                (4, false, vec![(3000, 3999)]),
539            ],
540        );
541
542        check_assign_to_windows_with_overlapping(
543            &[
544                (0, 999),
545                (1000, 1999),
546                (2000, 2999),
547                (3000, 3999),
548                (0, 3999),
549            ],
550            2,
551            &[
552                (0, true, vec![(0, 999)]),
553                (2, true, vec![(1000, 1999), (2000, 2999)]),
554                (4, true, vec![(0, 3999), (3000, 3999)]),
555            ],
556        );
557
558        check_assign_to_windows_with_overlapping(
559            &[
560                (0, 999),
561                (1000, 1999),
562                (2000, 2999),
563                (3000, 3999),
564                (1999, 3999),
565            ],
566            2,
567            &[
568                (0, false, vec![(0, 999)]),
569                (2, true, vec![(1000, 1999), (2000, 2999)]),
570                (4, true, vec![(1999, 3999), (3000, 3999)]),
571            ],
572        );
573
574        check_assign_to_windows_with_overlapping(
575            &[
576                (0, 999),     // window 0
577                (1000, 1999), // window 2
578                (2000, 2999), // window 2
579                (3000, 3999), // window 4
580                (2999, 3999), // window 4
581            ],
582            2,
583            &[
584                // window 2 overlaps with window 4
585                (0, false, vec![(0, 999)]),
586                (2, true, vec![(1000, 1999), (2000, 2999)]),
587                (4, true, vec![(2999, 3999), (3000, 3999)]),
588            ],
589        );
590
591        check_assign_to_windows_with_overlapping(
592            &[
593                (0, 999),     // window 0
594                (1000, 1999), // window 2
595                (2000, 2999), // window 2
596                (3000, 3999), // window 4
597                (0, 1000),    // // window 2
598            ],
599            2,
600            &[
601                // only window 0 overlaps with window 2.
602                (0, true, vec![(0, 999)]),
603                (2, true, vec![(0, 1000), (1000, 1999), (2000, 2999)]),
604                (4, false, vec![(3000, 3999)]),
605            ],
606        );
607    }
608
609    struct CompactionPickerTestCase {
610        window_size: i64,
611        input_files: Vec<FileHandle>,
612        expected_outputs: Vec<ExpectedOutput>,
613    }
614
615    impl CompactionPickerTestCase {
616        fn check(&self) {
617            let mut windows = assign_to_windows(self.input_files.iter(), self.window_size);
618            let active_window =
619                find_latest_window_in_seconds(self.input_files.iter(), self.window_size);
620            let output = TwcsPicker {
621                max_active_window_runs: 4,
622                max_active_window_files: usize::MAX,
623                max_inactive_window_runs: 1,
624                max_inactive_window_files: usize::MAX,
625                time_window_seconds: None,
626                max_output_file_size: None,
627                append_mode: false,
628            }
629            .build_output(&mut windows, active_window);
630
631            let output = output
632                .iter()
633                .map(|o| {
634                    let input_file_ids =
635                        o.inputs.iter().map(|f| f.file_id()).collect::<HashSet<_>>();
636                    (input_file_ids, o.output_level)
637                })
638                .collect::<Vec<_>>();
639
640            let expected = self
641                .expected_outputs
642                .iter()
643                .map(|o| {
644                    let input_file_ids = o
645                        .input_files
646                        .iter()
647                        .map(|idx| self.input_files[*idx].file_id())
648                        .collect::<HashSet<_>>();
649                    (input_file_ids, o.output_level)
650                })
651                .collect::<Vec<_>>();
652            assert_eq!(expected, output);
653        }
654    }
655
656    struct ExpectedOutput {
657        input_files: Vec<usize>,
658        output_level: Level,
659    }
660
661    fn check_enforce_file_num(
662        input_files: &[(i64, i64, u64)],
663        max_file_num: usize,
664        files_to_merge: &[(i64, i64)],
665    ) {
666        let mut files = new_file_handles(input_files);
667        // ensure sorted
668        find_sorted_runs(&mut files);
669        let mut to_merge = enforce_file_num(&files, max_file_num);
670        to_merge.sort_unstable_by_key(|f| f.time_range().0);
671        assert_eq!(
672            files_to_merge.to_vec(),
673            to_merge
674                .iter()
675                .map(|f| {
676                    let (start, end) = f.time_range();
677                    (start.value(), end.value())
678                })
679                .collect::<Vec<_>>()
680        );
681    }
682
683    #[test]
684    fn test_enforce_file_num() {
685        check_enforce_file_num(
686            &[(0, 300, 2), (100, 200, 1), (200, 400, 1)],
687            2,
688            &[(100, 200), (200, 400)],
689        );
690
691        check_enforce_file_num(
692            &[(0, 300, 200), (100, 200, 100), (200, 400, 100)],
693            1,
694            &[(0, 300), (100, 200), (200, 400)],
695        );
696    }
697
698    #[test]
699    fn test_build_twcs_output() {
700        let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
701
702        CompactionPickerTestCase {
703            window_size: 3,
704            input_files: [
705                new_file_handle(file_ids[0], -2000, -3, 0),
706                new_file_handle(file_ids[1], -3000, -100, 0),
707                new_file_handle(file_ids[2], 0, 2999, 0), //active windows
708                new_file_handle(file_ids[3], 50, 2998, 0), //active windows
709            ]
710            .to_vec(),
711            expected_outputs: vec![ExpectedOutput {
712                input_files: vec![0, 1],
713                output_level: 1,
714            }],
715        }
716        .check();
717
718        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
719        CompactionPickerTestCase {
720            window_size: 3,
721            input_files: [
722                new_file_handle(file_ids[0], -2000, -3, 0),
723                new_file_handle(file_ids[1], -3000, -100, 0),
724                new_file_handle(file_ids[2], 0, 2999, 0),
725                new_file_handle(file_ids[3], 50, 2998, 0),
726                new_file_handle(file_ids[4], 11, 2990, 0),
727                new_file_handle(file_ids[5], 50, 4998, 0),
728            ]
729            .to_vec(),
730            expected_outputs: vec![
731                ExpectedOutput {
732                    input_files: vec![0, 1],
733                    output_level: 1,
734                },
735                ExpectedOutput {
736                    input_files: vec![2, 3, 4],
737                    output_level: 1,
738                },
739            ],
740        }
741        .check();
742    }
743
744    fn make_file_handles(inputs: &[(i64, i64, u64)]) -> Vec<FileHandle> {
745        inputs
746            .iter()
747            .map(|(start, end, size)| {
748                FileHandle::new(
749                    FileMeta {
750                        region_id: Default::default(),
751                        file_id: Default::default(),
752                        time_range: (
753                            Timestamp::new_millisecond(*start),
754                            Timestamp::new_millisecond(*end),
755                        ),
756                        level: 0,
757                        file_size: *size,
758                        available_indexes: Default::default(),
759                        index_file_size: 0,
760                        num_rows: 0,
761                        num_row_groups: 0,
762                        sequence: None,
763                    },
764                    Arc::new(NoopFilePurger),
765                )
766            })
767            .collect()
768    }
769
770    #[test]
771    fn test_limit_output_size() {
772        let mut files = make_file_handles(&[(1, 1, 1)].repeat(6));
773        let runs = find_sorted_runs(&mut files);
774        assert_eq!(6, runs.len());
775        let files_to_merge = reduce_runs(runs, 2);
776
777        let enforced = enforce_max_output_size(files_to_merge, 2);
778        assert_eq!(2, enforced.len());
779        assert_eq!(2, enforced[0].len());
780        assert_eq!(2, enforced[1].len());
781    }
782
783    // TODO(hl): TTL tester that checks if get_expired_ssts function works as expected.
784}