mito2/compaction/
twcs.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap};
17use std::fmt::Debug;
18use std::num::NonZeroU64;
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::info;
22use common_time::timestamp::TimeUnit;
23use common_time::timestamp_millis::BucketAligned;
24use common_time::Timestamp;
25use store_api::storage::RegionId;
26
27use crate::compaction::buckets::infer_time_bucket;
28use crate::compaction::compactor::CompactionRegion;
29use crate::compaction::picker::{Picker, PickerOutput};
30use crate::compaction::run::{
31    find_sorted_runs, merge_seq_files, reduce_runs, FileGroup, Item, Ranged,
32};
33use crate::compaction::{get_expired_ssts, CompactionOutput};
34use crate::sst::file::{overlaps, FileHandle, Level};
35use crate::sst::version::LevelMeta;
36
37const LEVEL_COMPACTED: Level = 1;
38
/// `TwcsPicker` picks files of which the max timestamp are in the same time window as compaction
/// candidates.
///
/// Files are bucketed into time windows by their max timestamp (see `assign_to_windows`) and
/// every window is then examined on its own (see `TwcsPicker::build_output`).
#[derive(Debug)]
pub struct TwcsPicker {
    /// Minimum file num to trigger a compaction.
    ///
    /// Only consulted when a window contains a single sorted run: the window is skipped unless
    /// that run holds at least this many items.
    pub trigger_file_num: usize,
    /// Compaction time window in seconds.
    ///
    /// Used only when the region's current version has no compaction time window of its own.
    /// When both are absent the window size is inferred from the files in level 0.
    pub time_window_seconds: Option<i64>,
    /// Max allowed compaction output file size.
    ///
    /// Forwarded to `merge_seq_files` when merging the files of a single run, and reported as
    /// `max_file_size` in the resulting `PickerOutput`.
    pub max_output_file_size: Option<u64>,
    /// Whether the target region is in append mode.
    ///
    /// When `true`, the picked outputs never set `filter_deleted`.
    pub append_mode: bool,
}
52
53impl TwcsPicker {
54    /// Builds compaction output from files.
55    fn build_output(
56        &self,
57        region_id: RegionId,
58        time_windows: &mut BTreeMap<i64, Window>,
59        active_window: Option<i64>,
60    ) -> Vec<CompactionOutput> {
61        let mut output = vec![];
62        for (window, files) in time_windows {
63            if files.files.is_empty() {
64                continue;
65            }
66            let mut files_to_merge: Vec<_> = files.files().cloned().collect();
67            let sorted_runs = find_sorted_runs(&mut files_to_merge);
68            let found_runs = sorted_runs.len();
69            // We only remove deletion markers if we found less than 2 runs and not in append mode.
70            // because after compaction there will be no overlapping files.
71            let filter_deleted = !files.overlapping && found_runs <= 2 && !self.append_mode;
72
73            let inputs = if found_runs > 1 {
74                reduce_runs(sorted_runs)
75            } else {
76                let run = sorted_runs.last().unwrap();
77                if run.items().len() < self.trigger_file_num {
78                    continue;
79                }
80                // no overlapping files, try merge small files
81                merge_seq_files(run.items(), self.max_output_file_size)
82            };
83
84            if !inputs.is_empty() {
85                log_pick_result(
86                    region_id,
87                    *window,
88                    active_window,
89                    found_runs,
90                    files.files.len(),
91                    self.max_output_file_size,
92                    filter_deleted,
93                    &inputs,
94                );
95                output.push(CompactionOutput {
96                    output_level: LEVEL_COMPACTED, // always compact to l1
97                    inputs: inputs.into_iter().flat_map(|fg| fg.into_files()).collect(),
98                    filter_deleted,
99                    output_time_range: None, // we do not enforce output time range in twcs compactions.
100                });
101            }
102        }
103        output
104    }
105}
106
107#[allow(clippy::too_many_arguments)]
108fn log_pick_result(
109    region_id: RegionId,
110    window: i64,
111    active_window: Option<i64>,
112    found_runs: usize,
113    file_num: usize,
114    max_output_file_size: Option<u64>,
115    filter_deleted: bool,
116    inputs: &[FileGroup],
117) {
118    let input_file_str: Vec<String> = inputs
119        .iter()
120        .map(|f| {
121            let range = f.range();
122            let start = range.0.to_iso8601_string();
123            let end = range.1.to_iso8601_string();
124            let num_rows = f.num_rows();
125            format!(
126                "FileGroup{{id: {:?}, range: ({}, {}), size: {}, num rows: {} }}",
127                f.file_ids(),
128                start,
129                end,
130                ReadableSize(f.size() as u64),
131                num_rows
132            )
133        })
134        .collect();
135    let window_str = Timestamp::new_second(window).to_iso8601_string();
136    let active_window_str = active_window.map(|s| Timestamp::new_second(s).to_iso8601_string());
137    let max_output_file_size = max_output_file_size.map(|size| ReadableSize(size).to_string());
138    info!(
139        "Region ({:?}) compaction pick result: current window: {}, active window: {:?}, \
140            found runs: {}, file num: {}, max output file size: {:?}, filter deleted: {}, \
141            input files: {:?}",
142        region_id,
143        window_str,
144        active_window_str,
145        found_runs,
146        file_num,
147        max_output_file_size,
148        filter_deleted,
149        input_file_str
150    );
151}
152
153impl Picker for TwcsPicker {
154    fn pick(&self, compaction_region: &CompactionRegion) -> Option<PickerOutput> {
155        let region_id = compaction_region.region_id;
156        let levels = compaction_region.current_version.ssts.levels();
157
158        let expired_ssts =
159            get_expired_ssts(levels, compaction_region.ttl, Timestamp::current_millis());
160        if !expired_ssts.is_empty() {
161            info!("Expired SSTs in region {}: {:?}", region_id, expired_ssts);
162            // here we mark expired SSTs as compacting to avoid them being picked.
163            expired_ssts.iter().for_each(|f| f.set_compacting(true));
164        }
165
166        let compaction_time_window = compaction_region
167            .current_version
168            .compaction_time_window
169            .map(|window| window.as_secs() as i64);
170        let time_window_size = compaction_time_window
171            .or(self.time_window_seconds)
172            .unwrap_or_else(|| {
173                let inferred = infer_time_bucket(levels[0].files());
174                info!(
175                    "Compaction window for region {} is not present, inferring from files: {:?}",
176                    region_id, inferred
177                );
178                inferred
179            });
180
181        // Find active window from files in level 0.
182        let active_window = find_latest_window_in_seconds(levels[0].files(), time_window_size);
183        // Assign files to windows
184        let mut windows =
185            assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size);
186        let outputs = self.build_output(region_id, &mut windows, active_window);
187
188        if outputs.is_empty() && expired_ssts.is_empty() {
189            return None;
190        }
191
192        let max_file_size = self.max_output_file_size.map(|v| v as usize);
193        Some(PickerOutput {
194            outputs,
195            expired_ssts,
196            time_window_size,
197            max_file_size,
198        })
199    }
200}
201
/// The files assigned to one compaction time window together with the time range they span.
struct Window {
    /// Smallest start timestamp among the files added to this window.
    start: Timestamp,
    /// Largest end timestamp among the files added to this window.
    end: Timestamp,
    // Mapping from file sequence to file groups. Files with the same sequence are considered
    // created from the same compaction task.
    files: HashMap<Option<NonZeroU64>, FileGroup>,
    /// Key of this window. `Window::new_with_file` initializes it to 0 and
    /// `assign_to_windows` overwrites it with the aligned window value.
    time_window: i64,
    /// Set to `true` by `assign_to_windows` when the time range of this window is found to
    /// overlap with other windows; `false` otherwise.
    overlapping: bool,
}
211
212impl Window {
213    /// Creates a new [Window] with given file.
214    fn new_with_file(file: FileHandle) -> Self {
215        let (start, end) = file.time_range();
216        let files = HashMap::from([(file.meta_ref().sequence, FileGroup::new_with_file(file))]);
217        Self {
218            start,
219            end,
220            files,
221            time_window: 0,
222            overlapping: false,
223        }
224    }
225
226    /// Returns the time range of all files in current window (inclusive).
227    fn range(&self) -> (Timestamp, Timestamp) {
228        (self.start, self.end)
229    }
230
231    /// Adds a new file to window and updates time range.
232    fn add_file(&mut self, file: FileHandle) {
233        let (start, end) = file.time_range();
234        self.start = self.start.min(start);
235        self.end = self.end.max(end);
236
237        match self.files.entry(file.meta_ref().sequence) {
238            Entry::Occupied(mut o) => {
239                o.get_mut().add_file(file);
240            }
241            Entry::Vacant(v) => {
242                v.insert(FileGroup::new_with_file(file));
243            }
244        }
245    }
246
247    fn files(&self) -> impl Iterator<Item = &FileGroup> {
248        self.files.values()
249    }
250}
251
252/// Assigns files to windows with predefined window size (in seconds) by their max timestamps.
253fn assign_to_windows<'a>(
254    files: impl Iterator<Item = &'a FileHandle>,
255    time_window_size: i64,
256) -> BTreeMap<i64, Window> {
257    let mut windows: HashMap<i64, Window> = HashMap::new();
258    // Iterates all files and assign to time windows according to max timestamp
259    for f in files {
260        if f.compacting() {
261            continue;
262        }
263        let (_, end) = f.time_range();
264        let time_window = end
265            .convert_to(TimeUnit::Second)
266            .unwrap()
267            .value()
268            .align_to_ceil_by_bucket(time_window_size)
269            .unwrap_or(i64::MIN);
270
271        match windows.entry(time_window) {
272            Entry::Occupied(mut e) => {
273                e.get_mut().add_file(f.clone());
274            }
275            Entry::Vacant(e) => {
276                let mut window = Window::new_with_file(f.clone());
277                window.time_window = time_window;
278                e.insert(window);
279            }
280        }
281    }
282    if windows.is_empty() {
283        return BTreeMap::new();
284    }
285
286    let mut windows = windows.into_values().collect::<Vec<_>>();
287    windows.sort_unstable_by(|l, r| l.start.cmp(&r.start).then(l.end.cmp(&r.end).reverse()));
288
289    let mut current_range: (Timestamp, Timestamp) = windows[0].range(); // windows cannot be empty.
290
291    for idx in 1..windows.len() {
292        let next_range = windows[idx].range();
293        if overlaps(&current_range, &next_range) {
294            windows[idx - 1].overlapping = true;
295            windows[idx].overlapping = true;
296        }
297        current_range = (
298            current_range.0.min(next_range.0),
299            current_range.1.max(next_range.1),
300        );
301    }
302
303    windows.into_iter().map(|w| (w.time_window, w)).collect()
304}
305
306/// Finds the latest active writing window among all files.
307/// Returns `None` when there are no files or all files are corrupted.
308fn find_latest_window_in_seconds<'a>(
309    files: impl Iterator<Item = &'a FileHandle>,
310    time_window_size: i64,
311) -> Option<i64> {
312    let mut latest_timestamp = None;
313    for f in files {
314        let (_, end) = f.time_range();
315        if let Some(latest) = latest_timestamp {
316            if end > latest {
317                latest_timestamp = Some(end);
318            }
319        } else {
320            latest_timestamp = Some(end);
321        }
322    }
323    latest_timestamp
324        .and_then(|ts| ts.convert_to_ceil(TimeUnit::Second))
325        .and_then(|ts| ts.value().align_to_ceil_by_bucket(time_window_size))
326}
327
328#[cfg(test)]
329mod tests {
330    use std::collections::HashSet;
331
332    use super::*;
333    use crate::compaction::test_util::{new_file_handle, new_file_handle_with_sequence};
334    use crate::sst::file::{FileId, Level};
335
336    #[test]
337    fn test_get_latest_window_in_seconds() {
338        assert_eq!(
339            Some(1),
340            find_latest_window_in_seconds([new_file_handle(FileId::random(), 0, 999, 0)].iter(), 1)
341        );
342        assert_eq!(
343            Some(1),
344            find_latest_window_in_seconds(
345                [new_file_handle(FileId::random(), 0, 1000, 0)].iter(),
346                1
347            )
348        );
349
350        assert_eq!(
351            Some(-9223372036854000),
352            find_latest_window_in_seconds(
353                [new_file_handle(FileId::random(), i64::MIN, i64::MIN + 1, 0)].iter(),
354                3600,
355            )
356        );
357
358        assert_eq!(
359            (i64::MAX / 10000000 + 1) * 10000,
360            find_latest_window_in_seconds(
361                [new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0)].iter(),
362                10000,
363            )
364            .unwrap()
365        );
366
367        assert_eq!(
368            Some((i64::MAX / 3600000 + 1) * 3600),
369            find_latest_window_in_seconds(
370                [
371                    new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0),
372                    new_file_handle(FileId::random(), 0, 1000, 0)
373                ]
374                .iter(),
375                3600
376            )
377        );
378    }
379
380    #[test]
381    fn test_assign_to_windows() {
382        let windows = assign_to_windows(
383            [
384                new_file_handle(FileId::random(), 0, 999, 0),
385                new_file_handle(FileId::random(), 0, 999, 0),
386                new_file_handle(FileId::random(), 0, 999, 0),
387                new_file_handle(FileId::random(), 0, 999, 0),
388                new_file_handle(FileId::random(), 0, 999, 0),
389            ]
390            .iter(),
391            3,
392        );
393        let fgs = &windows.get(&0).unwrap().files;
394        assert_eq!(1, fgs.len());
395        assert_eq!(fgs.values().map(|f| f.files().len()).sum::<usize>(), 5);
396
397        let files = [FileId::random(); 3];
398        let windows = assign_to_windows(
399            [
400                new_file_handle(files[0], -2000, -3, 0),
401                new_file_handle(files[1], 0, 2999, 0),
402                new_file_handle(files[2], 50, 10001, 0),
403            ]
404            .iter(),
405            3,
406        );
407        assert_eq!(
408            files[0],
409            windows.get(&0).unwrap().files().next().unwrap().files()[0].file_id()
410        );
411        assert_eq!(
412            files[1],
413            windows.get(&3).unwrap().files().next().unwrap().files()[0].file_id()
414        );
415        assert_eq!(
416            files[2],
417            windows.get(&12).unwrap().files().next().unwrap().files()[0].file_id()
418        );
419    }
420
421    #[test]
422    fn test_assign_file_groups_to_windows() {
423        let files = [
424            FileId::random(),
425            FileId::random(),
426            FileId::random(),
427            FileId::random(),
428        ];
429        let windows = assign_to_windows(
430            [
431                new_file_handle_with_sequence(files[0], 0, 999, 0, 1),
432                new_file_handle_with_sequence(files[1], 0, 999, 0, 1),
433                new_file_handle_with_sequence(files[2], 0, 999, 0, 2),
434                new_file_handle_with_sequence(files[3], 0, 999, 0, 2),
435            ]
436            .iter(),
437            3,
438        );
439        assert_eq!(windows.len(), 1);
440        let fgs = &windows.get(&0).unwrap().files;
441        assert_eq!(2, fgs.len());
442        assert_eq!(
443            fgs.get(&NonZeroU64::new(1))
444                .unwrap()
445                .files()
446                .iter()
447                .map(|f| f.file_id())
448                .collect::<HashSet<_>>(),
449            [files[0], files[1]].into_iter().collect()
450        );
451        assert_eq!(
452            fgs.get(&NonZeroU64::new(2))
453                .unwrap()
454                .files()
455                .iter()
456                .map(|f| f.file_id())
457                .collect::<HashSet<_>>(),
458            [files[2], files[3]].into_iter().collect()
459        );
460    }
461
462    #[test]
463    fn test_assign_compacting_to_windows() {
464        let files = [
465            new_file_handle(FileId::random(), 0, 999, 0),
466            new_file_handle(FileId::random(), 0, 999, 0),
467            new_file_handle(FileId::random(), 0, 999, 0),
468            new_file_handle(FileId::random(), 0, 999, 0),
469            new_file_handle(FileId::random(), 0, 999, 0),
470        ];
471        files[0].set_compacting(true);
472        files[2].set_compacting(true);
473        let mut windows = assign_to_windows(files.iter(), 3);
474        let window0 = windows.remove(&0).unwrap();
475        assert_eq!(1, window0.files.len());
476        let candidates = window0
477            .files
478            .into_values()
479            .flat_map(|fg| fg.into_files())
480            .map(|f| f.file_id())
481            .collect::<HashSet<_>>();
482        assert_eq!(candidates.len(), 3);
483        assert_eq!(
484            candidates,
485            [files[1].file_id(), files[3].file_id(), files[4].file_id()]
486                .into_iter()
487                .collect::<HashSet<_>>()
488        );
489    }
490
491    /// (Window value, overlapping, files' time ranges in window)
492    type ExpectedWindowSpec = (i64, bool, Vec<(i64, i64)>);
493
494    fn check_assign_to_windows_with_overlapping(
495        file_time_ranges: &[(i64, i64)],
496        time_window: i64,
497        expected_files: &[ExpectedWindowSpec],
498    ) {
499        let files: Vec<_> = (0..file_time_ranges.len())
500            .map(|_| FileId::random())
501            .collect();
502
503        let file_handles = files
504            .iter()
505            .zip(file_time_ranges.iter())
506            .map(|(file_id, range)| new_file_handle(*file_id, range.0, range.1, 0))
507            .collect::<Vec<_>>();
508
509        let windows = assign_to_windows(file_handles.iter(), time_window);
510
511        for (expected_window, overlapping, window_files) in expected_files {
512            let actual_window = windows.get(expected_window).unwrap();
513            assert_eq!(*overlapping, actual_window.overlapping);
514            let mut file_ranges = actual_window
515                .files
516                .iter()
517                .flat_map(|(_, f)| {
518                    f.files().iter().map(|f| {
519                        let (s, e) = f.time_range();
520                        (s.value(), e.value())
521                    })
522                })
523                .collect::<Vec<_>>();
524            file_ranges.sort_unstable_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1)));
525            assert_eq!(window_files, &file_ranges);
526        }
527    }
528
    /// Checks the `overlapping` flag computed by `assign_to_windows`.
    ///
    /// Every case uses a window size of 2 seconds. File ranges are presumably in milliseconds
    /// (judging by the expected window values) and are treated as inclusive: case 6 expects
    /// two windows that merely touch at 2999 to be flagged as overlapping.
    #[test]
    fn test_assign_to_windows_with_overlapping() {
        // Case 1: the ranges of window 0 and window 2 are disjoint, nothing is flagged.
        check_assign_to_windows_with_overlapping(
            &[(0, 999), (1000, 1999), (2000, 2999)],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, false, vec![(1000, 1999), (2000, 2999)]),
            ],
        );

        // Case 2: (100, 2999) is assigned to window 2 but starts inside the range of
        // window 0, so both windows are flagged.
        check_assign_to_windows_with_overlapping(
            &[(0, 1), (0, 999), (100, 2999)],
            2,
            &[
                (0, true, vec![(0, 1), (0, 999)]),
                (2, true, vec![(100, 2999)]),
            ],
        );

        // Case 3: three windows with pairwise disjoint ranges.
        check_assign_to_windows_with_overlapping(
            &[(0, 999), (1000, 1999), (2000, 2999), (3000, 3999)],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, false, vec![(1000, 1999), (2000, 2999)]),
                (4, false, vec![(3000, 3999)]),
            ],
        );

        // Case 4: (0, 3999) is assigned to window 4 and stretches that window over the ranges
        // of all other windows, so every window is flagged.
        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (0, 3999),
            ],
            2,
            &[
                (0, true, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(0, 3999), (3000, 3999)]),
            ],
        );

        // Case 5: (1999, 3999) stretches window 4 back into window 2 only; window 0 stays
        // unflagged.
        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (1999, 3999),
            ],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(1999, 3999), (3000, 3999)]),
            ],
        );

        // Case 6: window 2 ends at 2999 and window 4 starts at 2999.
        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),     // window 0
                (1000, 1999), // window 2
                (2000, 2999), // window 2
                (3000, 3999), // window 4
                (2999, 3999), // window 4
            ],
            2,
            &[
                // window 2 overlaps with window 4
                (0, false, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(2999, 3999), (3000, 3999)]),
            ],
        );

        // Case 7: (0, 1000) is assigned to window 2 and stretches it back over window 0.
        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),     // window 0
                (1000, 1999), // window 2
                (2000, 2999), // window 2
                (3000, 3999), // window 4
                (0, 1000),    // window 2
            ],
            2,
            &[
                // only window 0 overlaps with window 2.
                (0, true, vec![(0, 999)]),
                (2, true, vec![(0, 1000), (1000, 1999), (2000, 2999)]),
                (4, false, vec![(3000, 3999)]),
            ],
        );
    }
625
    /// A table-driven case for `TwcsPicker::build_output`, run by `CompactionPickerTestCase::check`.
    struct CompactionPickerTestCase {
        /// Window size passed to `assign_to_windows` and `find_latest_window_in_seconds`.
        window_size: i64,
        /// Files handed to the picker; expected outputs refer to them by their index here.
        input_files: Vec<FileHandle>,
        /// Expected outputs, in the order `build_output` returns them.
        expected_outputs: Vec<ExpectedOutput>,
    }
631
632    impl CompactionPickerTestCase {
633        fn check(&self) {
634            let file_id_to_idx = self
635                .input_files
636                .iter()
637                .enumerate()
638                .map(|(idx, file)| (file.file_id(), idx))
639                .collect::<HashMap<_, _>>();
640            let mut windows = assign_to_windows(self.input_files.iter(), self.window_size);
641            let active_window =
642                find_latest_window_in_seconds(self.input_files.iter(), self.window_size);
643            let output = TwcsPicker {
644                trigger_file_num: 4,
645                time_window_seconds: None,
646                max_output_file_size: None,
647                append_mode: false,
648            }
649            .build_output(RegionId::from_u64(0), &mut windows, active_window);
650
651            let output = output
652                .iter()
653                .map(|o| {
654                    let input_file_ids = o
655                        .inputs
656                        .iter()
657                        .map(|f| file_id_to_idx.get(&f.file_id()).copied().unwrap())
658                        .collect::<HashSet<_>>();
659                    (input_file_ids, o.output_level)
660                })
661                .collect::<Vec<_>>();
662
663            let expected = self
664                .expected_outputs
665                .iter()
666                .map(|o| {
667                    let input_file_ids = o.input_files.iter().copied().collect::<HashSet<_>>();
668                    (input_file_ids, o.output_level)
669                })
670                .collect::<Vec<_>>();
671            assert_eq!(expected, output);
672        }
673    }
674
    /// One compaction output expected from `TwcsPicker::build_output`.
    struct ExpectedOutput {
        /// Indices into `CompactionPickerTestCase::input_files`; compared as a set, so the
        /// order does not matter.
        input_files: Vec<usize>,
        /// Expected `output_level` of the compaction output.
        output_level: Level,
    }
679
    /// Table-driven checks of `TwcsPicker::build_output` (see `CompactionPickerTestCase::check`
    /// for the picker configuration). All cases use a window size of 3.
    #[test]
    fn test_build_twcs_output() {
        let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();

        // Case 1: 2 runs found in each time window.
        // Files 0 and 1 end below zero, files 2 and 3 end just below 3000; each pair is
        // expected to be compacted together.
        CompactionPickerTestCase {
            window_size: 3,
            input_files: [
                new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
                new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
                new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3), //active windows
                new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4), //active windows
            ]
            .to_vec(),
            expected_outputs: vec![
                ExpectedOutput {
                    input_files: vec![0, 1],
                    output_level: 1,
                },
                ExpectedOutput {
                    input_files: vec![2, 3],
                    output_level: 1,
                },
            ],
        }
        .check();

        // Case 2:
        //    -2000........-3
        // -3000.....-100
        //                    0..............2999
        //                      50..........2998
        //                     11.........2990
        // The second window now holds three mutually overlapping files; only files 2 and 4 are
        // expected as inputs (presumably the selection made by `reduce_runs`).
        // NOTE: 6 ids are generated here but only the first 5 are used.
        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
        CompactionPickerTestCase {
            window_size: 3,
            input_files: [
                new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
                new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
                new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3),
                new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4),
                new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 5),
            ]
            .to_vec(),
            expected_outputs: vec![
                ExpectedOutput {
                    input_files: vec![0, 1],
                    output_level: 1,
                },
                ExpectedOutput {
                    input_files: vec![2, 4],
                    output_level: 1,
                },
            ],
        }
        .check();

        // Case 3:
        // A compaction may split output into several files that have overlapping time ranges and same sequence,
        // we should treat these files as one FileGroup.
        // Files 0 and 1 (sequence 1) are expected to be picked together with file 4. Files 2
        // and 3 (sequence 2) are expected to produce no output, presumably because they form
        // a single file group, which is below the trigger file num of 4.
        // NOTE: 6 ids are generated here but only the first 5 are used.
        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
        CompactionPickerTestCase {
            window_size: 3,
            input_files: [
                new_file_handle_with_sequence(file_ids[0], 0, 2999, 1, 1),
                new_file_handle_with_sequence(file_ids[1], 0, 2998, 1, 1),
                new_file_handle_with_sequence(file_ids[2], 3000, 5999, 1, 2),
                new_file_handle_with_sequence(file_ids[3], 3000, 5000, 1, 2),
                new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 3),
            ]
            .to_vec(),
            expected_outputs: vec![ExpectedOutput {
                input_files: vec![0, 1, 4],
                output_level: 1,
            }],
        }
        .check();
    }
758
759    // TODO(hl): TTL tester that checks if get_expired_ssts function works as expected.
760}