mito2/compaction/
twcs.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap};
17use std::fmt::Debug;
18use std::num::NonZeroU64;
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::info;
22use common_time::timestamp::TimeUnit;
23use common_time::timestamp_millis::BucketAligned;
24use common_time::Timestamp;
25use store_api::storage::RegionId;
26
27use crate::compaction::buckets::infer_time_bucket;
28use crate::compaction::compactor::CompactionRegion;
29use crate::compaction::picker::{Picker, PickerOutput};
30use crate::compaction::run::{
31    find_sorted_runs, merge_seq_files, reduce_runs, FileGroup, Item, Ranged,
32};
33use crate::compaction::{get_expired_ssts, CompactionOutput};
34use crate::sst::file::{overlaps, FileHandle, Level};
35use crate::sst::version::LevelMeta;
36
37const LEVEL_COMPACTED: Level = 1;
38
39/// `TwcsPicker` picks files of which the max timestamp are in the same time window as compaction
40/// candidates.
41#[derive(Debug)]
42pub struct TwcsPicker {
43    /// Minimum file num to trigger a compaction.
44    pub trigger_file_num: usize,
45    /// Compaction time window in seconds.
46    pub time_window_seconds: Option<i64>,
47    /// Max allowed compaction output file size.
48    pub max_output_file_size: Option<u64>,
49    /// Whether the target region is in append mode.
50    pub append_mode: bool,
51}
52
53impl TwcsPicker {
54    /// Builds compaction output from files.
55    fn build_output(
56        &self,
57        region_id: RegionId,
58        time_windows: &mut BTreeMap<i64, Window>,
59        active_window: Option<i64>,
60    ) -> Vec<CompactionOutput> {
61        let mut output = vec![];
62        for (window, files) in time_windows {
63            if files.files.is_empty() {
64                continue;
65            }
66            let mut files_to_merge: Vec<_> = files.files().cloned().collect();
67            let sorted_runs = find_sorted_runs(&mut files_to_merge);
68            let found_runs = sorted_runs.len();
69            // We only remove deletion markers if we found less than 2 runs and not in append mode.
70            // because after compaction there will be no overlapping files.
71            let filter_deleted = !files.overlapping && found_runs <= 2 && !self.append_mode;
72
73            let inputs = if found_runs > 1 {
74                reduce_runs(sorted_runs)
75            } else {
76                let run = sorted_runs.last().unwrap();
77                if run.items().len() < self.trigger_file_num {
78                    continue;
79                }
80                // no overlapping files, try merge small files
81                merge_seq_files(run.items(), self.max_output_file_size)
82            };
83
84            if !inputs.is_empty() {
85                log_pick_result(
86                    region_id,
87                    *window,
88                    active_window,
89                    found_runs,
90                    files.files.len(),
91                    self.max_output_file_size,
92                    filter_deleted,
93                    &inputs,
94                );
95                output.push(CompactionOutput {
96                    output_level: LEVEL_COMPACTED, // always compact to l1
97                    inputs: inputs.into_iter().flat_map(|fg| fg.into_files()).collect(),
98                    filter_deleted,
99                    output_time_range: None, // we do not enforce output time range in twcs compactions.
100                });
101            }
102        }
103        output
104    }
105}
106
107#[allow(clippy::too_many_arguments)]
108fn log_pick_result(
109    region_id: RegionId,
110    window: i64,
111    active_window: Option<i64>,
112    found_runs: usize,
113    file_num: usize,
114    max_output_file_size: Option<u64>,
115    filter_deleted: bool,
116    inputs: &[FileGroup],
117) {
118    let input_file_str: Vec<String> = inputs
119        .iter()
120        .map(|f| {
121            let range = f.range();
122            let start = range.0.to_iso8601_string();
123            let end = range.1.to_iso8601_string();
124            let num_rows = f.num_rows();
125            format!(
126                "FileGroup{{id: {:?}, range: ({}, {}), size: {}, num rows: {} }}",
127                f.file_ids(),
128                start,
129                end,
130                ReadableSize(f.size() as u64),
131                num_rows
132            )
133        })
134        .collect();
135    let window_str = Timestamp::new_second(window).to_iso8601_string();
136    let active_window_str = active_window.map(|s| Timestamp::new_second(s).to_iso8601_string());
137    let max_output_file_size = max_output_file_size.map(|size| ReadableSize(size).to_string());
138    info!(
139        "Region ({:?}) compaction pick result: current window: {}, active window: {:?}, \
140            found runs: {}, file num: {}, max output file size: {:?}, filter deleted: {}, \
141            input files: {:?}",
142        region_id,
143        window_str,
144        active_window_str,
145        found_runs,
146        file_num,
147        max_output_file_size,
148        filter_deleted,
149        input_file_str
150    );
151}
152
153impl Picker for TwcsPicker {
154    fn pick(&self, compaction_region: &CompactionRegion) -> Option<PickerOutput> {
155        let region_id = compaction_region.region_id;
156        let levels = compaction_region.current_version.ssts.levels();
157
158        let expired_ssts =
159            get_expired_ssts(levels, compaction_region.ttl, Timestamp::current_millis());
160        if !expired_ssts.is_empty() {
161            info!("Expired SSTs in region {}: {:?}", region_id, expired_ssts);
162            // here we mark expired SSTs as compacting to avoid them being picked.
163            expired_ssts.iter().for_each(|f| f.set_compacting(true));
164        }
165
166        let compaction_time_window = compaction_region
167            .current_version
168            .compaction_time_window
169            .map(|window| window.as_secs() as i64);
170        let time_window_size = compaction_time_window
171            .or(self.time_window_seconds)
172            .unwrap_or_else(|| {
173                let inferred = infer_time_bucket(levels[0].files());
174                info!(
175                    "Compaction window for region {} is not present, inferring from files: {:?}",
176                    region_id, inferred
177                );
178                inferred
179            });
180
181        // Find active window from files in level 0.
182        let active_window = find_latest_window_in_seconds(levels[0].files(), time_window_size);
183        // Assign files to windows
184        let mut windows =
185            assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size);
186        let outputs = self.build_output(region_id, &mut windows, active_window);
187
188        if outputs.is_empty() && expired_ssts.is_empty() {
189            return None;
190        }
191
192        let max_file_size = self.max_output_file_size.map(|v| v as usize);
193        Some(PickerOutput {
194            outputs,
195            expired_ssts,
196            time_window_size,
197            max_file_size,
198        })
199    }
200}
201
202struct Window {
203    start: Timestamp,
204    end: Timestamp,
205    // Mapping from file sequence to file groups. Files with the same sequence is considered
206    // created from the same compaction task.
207    files: HashMap<Option<NonZeroU64>, FileGroup>,
208    time_window: i64,
209    overlapping: bool,
210}
211
212impl Window {
213    /// Creates a new [Window] with given file.
214    fn new_with_file(file: FileHandle) -> Self {
215        let (start, end) = file.time_range();
216        let files = HashMap::from([(file.meta_ref().sequence, FileGroup::new_with_file(file))]);
217        Self {
218            start,
219            end,
220            files,
221            time_window: 0,
222            overlapping: false,
223        }
224    }
225
226    /// Returns the time range of all files in current window (inclusive).
227    fn range(&self) -> (Timestamp, Timestamp) {
228        (self.start, self.end)
229    }
230
231    /// Adds a new file to window and updates time range.
232    fn add_file(&mut self, file: FileHandle) {
233        let (start, end) = file.time_range();
234        self.start = self.start.min(start);
235        self.end = self.end.max(end);
236
237        match self.files.entry(file.meta_ref().sequence) {
238            Entry::Occupied(mut o) => {
239                o.get_mut().add_file(file);
240            }
241            Entry::Vacant(v) => {
242                v.insert(FileGroup::new_with_file(file));
243            }
244        }
245    }
246
247    fn files(&self) -> impl Iterator<Item = &FileGroup> {
248        self.files.values()
249    }
250}
251
252/// Assigns files to windows with predefined window size (in seconds) by their max timestamps.
253fn assign_to_windows<'a>(
254    files: impl Iterator<Item = &'a FileHandle>,
255    time_window_size: i64,
256) -> BTreeMap<i64, Window> {
257    let mut windows: HashMap<i64, Window> = HashMap::new();
258    // Iterates all files and assign to time windows according to max timestamp
259    for f in files {
260        if f.compacting() {
261            continue;
262        }
263        let (_, end) = f.time_range();
264        let time_window = end
265            .convert_to(TimeUnit::Second)
266            .unwrap()
267            .value()
268            .align_to_ceil_by_bucket(time_window_size)
269            .unwrap_or(i64::MIN);
270
271        match windows.entry(time_window) {
272            Entry::Occupied(mut e) => {
273                e.get_mut().add_file(f.clone());
274            }
275            Entry::Vacant(e) => {
276                let mut window = Window::new_with_file(f.clone());
277                window.time_window = time_window;
278                e.insert(window);
279            }
280        }
281    }
282    if windows.is_empty() {
283        return BTreeMap::new();
284    }
285
286    let mut windows = windows.into_values().collect::<Vec<_>>();
287    windows.sort_unstable_by(|l, r| l.start.cmp(&r.start).then(l.end.cmp(&r.end).reverse()));
288
289    let mut current_range: (Timestamp, Timestamp) = windows[0].range(); // windows cannot be empty.
290
291    for idx in 1..windows.len() {
292        let next_range = windows[idx].range();
293        if overlaps(&current_range, &next_range) {
294            windows[idx - 1].overlapping = true;
295            windows[idx].overlapping = true;
296        }
297        current_range = (
298            current_range.0.min(next_range.0),
299            current_range.1.max(next_range.1),
300        );
301    }
302
303    windows.into_iter().map(|w| (w.time_window, w)).collect()
304}
305
306/// Finds the latest active writing window among all files.
307/// Returns `None` when there are no files or all files are corrupted.
308fn find_latest_window_in_seconds<'a>(
309    files: impl Iterator<Item = &'a FileHandle>,
310    time_window_size: i64,
311) -> Option<i64> {
312    let mut latest_timestamp = None;
313    for f in files {
314        let (_, end) = f.time_range();
315        if let Some(latest) = latest_timestamp {
316            if end > latest {
317                latest_timestamp = Some(end);
318            }
319        } else {
320            latest_timestamp = Some(end);
321        }
322    }
323    latest_timestamp
324        .and_then(|ts| ts.convert_to_ceil(TimeUnit::Second))
325        .and_then(|ts| ts.value().align_to_ceil_by_bucket(time_window_size))
326}
327
328#[cfg(test)]
329mod tests {
330    use std::collections::HashSet;
331
332    use super::*;
333    use crate::compaction::test_util::{new_file_handle, new_file_handle_with_sequence};
334    use crate::sst::file::{FileId, Level};
335
336    #[test]
337    fn test_get_latest_window_in_seconds() {
338        assert_eq!(
339            Some(1),
340            find_latest_window_in_seconds([new_file_handle(FileId::random(), 0, 999, 0)].iter(), 1)
341        );
342        assert_eq!(
343            Some(1),
344            find_latest_window_in_seconds(
345                [new_file_handle(FileId::random(), 0, 1000, 0)].iter(),
346                1
347            )
348        );
349
350        assert_eq!(
351            Some(-9223372036854000),
352            find_latest_window_in_seconds(
353                [new_file_handle(FileId::random(), i64::MIN, i64::MIN + 1, 0)].iter(),
354                3600,
355            )
356        );
357
358        assert_eq!(
359            (i64::MAX / 10000000 + 1) * 10000,
360            find_latest_window_in_seconds(
361                [new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0)].iter(),
362                10000,
363            )
364            .unwrap()
365        );
366
367        assert_eq!(
368            Some((i64::MAX / 3600000 + 1) * 3600),
369            find_latest_window_in_seconds(
370                [
371                    new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0),
372                    new_file_handle(FileId::random(), 0, 1000, 0)
373                ]
374                .iter(),
375                3600
376            )
377        );
378    }
379
380    #[test]
381    fn test_assign_to_windows() {
382        let windows = assign_to_windows(
383            [
384                new_file_handle(FileId::random(), 0, 999, 0),
385                new_file_handle(FileId::random(), 0, 999, 0),
386                new_file_handle(FileId::random(), 0, 999, 0),
387                new_file_handle(FileId::random(), 0, 999, 0),
388                new_file_handle(FileId::random(), 0, 999, 0),
389            ]
390            .iter(),
391            3,
392        );
393        let fgs = &windows.get(&0).unwrap().files;
394        assert_eq!(1, fgs.len());
395        assert_eq!(fgs.values().map(|f| f.files().len()).sum::<usize>(), 5);
396
397        let files = [FileId::random(); 3];
398        let windows = assign_to_windows(
399            [
400                new_file_handle(files[0], -2000, -3, 0),
401                new_file_handle(files[1], 0, 2999, 0),
402                new_file_handle(files[2], 50, 10001, 0),
403            ]
404            .iter(),
405            3,
406        );
407        assert_eq!(
408            files[0],
409            windows.get(&0).unwrap().files().next().unwrap().files()[0]
410                .file_id()
411                .file_id()
412        );
413        assert_eq!(
414            files[1],
415            windows.get(&3).unwrap().files().next().unwrap().files()[0]
416                .file_id()
417                .file_id()
418        );
419        assert_eq!(
420            files[2],
421            windows.get(&12).unwrap().files().next().unwrap().files()[0]
422                .file_id()
423                .file_id()
424        );
425    }
426
427    #[test]
428    fn test_assign_file_groups_to_windows() {
429        let files = [
430            FileId::random(),
431            FileId::random(),
432            FileId::random(),
433            FileId::random(),
434        ];
435        let windows = assign_to_windows(
436            [
437                new_file_handle_with_sequence(files[0], 0, 999, 0, 1),
438                new_file_handle_with_sequence(files[1], 0, 999, 0, 1),
439                new_file_handle_with_sequence(files[2], 0, 999, 0, 2),
440                new_file_handle_with_sequence(files[3], 0, 999, 0, 2),
441            ]
442            .iter(),
443            3,
444        );
445        assert_eq!(windows.len(), 1);
446        let fgs = &windows.get(&0).unwrap().files;
447        assert_eq!(2, fgs.len());
448        assert_eq!(
449            fgs.get(&NonZeroU64::new(1))
450                .unwrap()
451                .files()
452                .iter()
453                .map(|f| f.file_id().file_id())
454                .collect::<HashSet<_>>(),
455            [files[0], files[1]].into_iter().collect()
456        );
457        assert_eq!(
458            fgs.get(&NonZeroU64::new(2))
459                .unwrap()
460                .files()
461                .iter()
462                .map(|f| f.file_id().file_id())
463                .collect::<HashSet<_>>(),
464            [files[2], files[3]].into_iter().collect()
465        );
466    }
467
468    #[test]
469    fn test_assign_compacting_to_windows() {
470        let files = [
471            new_file_handle(FileId::random(), 0, 999, 0),
472            new_file_handle(FileId::random(), 0, 999, 0),
473            new_file_handle(FileId::random(), 0, 999, 0),
474            new_file_handle(FileId::random(), 0, 999, 0),
475            new_file_handle(FileId::random(), 0, 999, 0),
476        ];
477        files[0].set_compacting(true);
478        files[2].set_compacting(true);
479        let mut windows = assign_to_windows(files.iter(), 3);
480        let window0 = windows.remove(&0).unwrap();
481        assert_eq!(1, window0.files.len());
482        let candidates = window0
483            .files
484            .into_values()
485            .flat_map(|fg| fg.into_files())
486            .map(|f| f.file_id().file_id())
487            .collect::<HashSet<_>>();
488        assert_eq!(candidates.len(), 3);
489        assert_eq!(
490            candidates,
491            [
492                files[1].file_id().file_id(),
493                files[3].file_id().file_id(),
494                files[4].file_id().file_id()
495            ]
496            .into_iter()
497            .collect::<HashSet<_>>()
498        );
499    }
500
501    /// (Window value, overlapping, files' time ranges in window)
502    type ExpectedWindowSpec = (i64, bool, Vec<(i64, i64)>);
503
504    fn check_assign_to_windows_with_overlapping(
505        file_time_ranges: &[(i64, i64)],
506        time_window: i64,
507        expected_files: &[ExpectedWindowSpec],
508    ) {
509        let files: Vec<_> = (0..file_time_ranges.len())
510            .map(|_| FileId::random())
511            .collect();
512
513        let file_handles = files
514            .iter()
515            .zip(file_time_ranges.iter())
516            .map(|(file_id, range)| new_file_handle(*file_id, range.0, range.1, 0))
517            .collect::<Vec<_>>();
518
519        let windows = assign_to_windows(file_handles.iter(), time_window);
520
521        for (expected_window, overlapping, window_files) in expected_files {
522            let actual_window = windows.get(expected_window).unwrap();
523            assert_eq!(*overlapping, actual_window.overlapping);
524            let mut file_ranges = actual_window
525                .files
526                .iter()
527                .flat_map(|(_, f)| {
528                    f.files().iter().map(|f| {
529                        let (s, e) = f.time_range();
530                        (s.value(), e.value())
531                    })
532                })
533                .collect::<Vec<_>>();
534            file_ranges.sort_unstable_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1)));
535            assert_eq!(window_files, &file_ranges);
536        }
537    }
538
539    #[test]
540    fn test_assign_to_windows_with_overlapping() {
541        check_assign_to_windows_with_overlapping(
542            &[(0, 999), (1000, 1999), (2000, 2999)],
543            2,
544            &[
545                (0, false, vec![(0, 999)]),
546                (2, false, vec![(1000, 1999), (2000, 2999)]),
547            ],
548        );
549
550        check_assign_to_windows_with_overlapping(
551            &[(0, 1), (0, 999), (100, 2999)],
552            2,
553            &[
554                (0, true, vec![(0, 1), (0, 999)]),
555                (2, true, vec![(100, 2999)]),
556            ],
557        );
558
559        check_assign_to_windows_with_overlapping(
560            &[(0, 999), (1000, 1999), (2000, 2999), (3000, 3999)],
561            2,
562            &[
563                (0, false, vec![(0, 999)]),
564                (2, false, vec![(1000, 1999), (2000, 2999)]),
565                (4, false, vec![(3000, 3999)]),
566            ],
567        );
568
569        check_assign_to_windows_with_overlapping(
570            &[
571                (0, 999),
572                (1000, 1999),
573                (2000, 2999),
574                (3000, 3999),
575                (0, 3999),
576            ],
577            2,
578            &[
579                (0, true, vec![(0, 999)]),
580                (2, true, vec![(1000, 1999), (2000, 2999)]),
581                (4, true, vec![(0, 3999), (3000, 3999)]),
582            ],
583        );
584
585        check_assign_to_windows_with_overlapping(
586            &[
587                (0, 999),
588                (1000, 1999),
589                (2000, 2999),
590                (3000, 3999),
591                (1999, 3999),
592            ],
593            2,
594            &[
595                (0, false, vec![(0, 999)]),
596                (2, true, vec![(1000, 1999), (2000, 2999)]),
597                (4, true, vec![(1999, 3999), (3000, 3999)]),
598            ],
599        );
600
601        check_assign_to_windows_with_overlapping(
602            &[
603                (0, 999),     // window 0
604                (1000, 1999), // window 2
605                (2000, 2999), // window 2
606                (3000, 3999), // window 4
607                (2999, 3999), // window 4
608            ],
609            2,
610            &[
611                // window 2 overlaps with window 4
612                (0, false, vec![(0, 999)]),
613                (2, true, vec![(1000, 1999), (2000, 2999)]),
614                (4, true, vec![(2999, 3999), (3000, 3999)]),
615            ],
616        );
617
618        check_assign_to_windows_with_overlapping(
619            &[
620                (0, 999),     // window 0
621                (1000, 1999), // window 2
622                (2000, 2999), // window 2
623                (3000, 3999), // window 4
624                (0, 1000),    // // window 2
625            ],
626            2,
627            &[
628                // only window 0 overlaps with window 2.
629                (0, true, vec![(0, 999)]),
630                (2, true, vec![(0, 1000), (1000, 1999), (2000, 2999)]),
631                (4, false, vec![(3000, 3999)]),
632            ],
633        );
634    }
635
636    struct CompactionPickerTestCase {
637        window_size: i64,
638        input_files: Vec<FileHandle>,
639        expected_outputs: Vec<ExpectedOutput>,
640    }
641
642    impl CompactionPickerTestCase {
643        fn check(&self) {
644            let file_id_to_idx = self
645                .input_files
646                .iter()
647                .enumerate()
648                .map(|(idx, file)| (file.file_id(), idx))
649                .collect::<HashMap<_, _>>();
650            let mut windows = assign_to_windows(self.input_files.iter(), self.window_size);
651            let active_window =
652                find_latest_window_in_seconds(self.input_files.iter(), self.window_size);
653            let output = TwcsPicker {
654                trigger_file_num: 4,
655                time_window_seconds: None,
656                max_output_file_size: None,
657                append_mode: false,
658            }
659            .build_output(RegionId::from_u64(0), &mut windows, active_window);
660
661            let output = output
662                .iter()
663                .map(|o| {
664                    let input_file_ids = o
665                        .inputs
666                        .iter()
667                        .map(|f| file_id_to_idx.get(&f.file_id()).copied().unwrap())
668                        .collect::<HashSet<_>>();
669                    (input_file_ids, o.output_level)
670                })
671                .collect::<Vec<_>>();
672
673            let expected = self
674                .expected_outputs
675                .iter()
676                .map(|o| {
677                    let input_file_ids = o.input_files.iter().copied().collect::<HashSet<_>>();
678                    (input_file_ids, o.output_level)
679                })
680                .collect::<Vec<_>>();
681            assert_eq!(expected, output);
682        }
683    }
684
685    struct ExpectedOutput {
686        input_files: Vec<usize>,
687        output_level: Level,
688    }
689
690    #[test]
691    fn test_build_twcs_output() {
692        let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
693
694        // Case 1: 2 runs found in each time window.
695        CompactionPickerTestCase {
696            window_size: 3,
697            input_files: [
698                new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
699                new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
700                new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3), //active windows
701                new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4), //active windows
702            ]
703            .to_vec(),
704            expected_outputs: vec![
705                ExpectedOutput {
706                    input_files: vec![0, 1],
707                    output_level: 1,
708                },
709                ExpectedOutput {
710                    input_files: vec![2, 3],
711                    output_level: 1,
712                },
713            ],
714        }
715        .check();
716
717        // Case 2:
718        //    -2000........-3
719        // -3000.....-100
720        //                    0..............2999
721        //                      50..........2998
722        //                     11.........2990
723        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
724        CompactionPickerTestCase {
725            window_size: 3,
726            input_files: [
727                new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
728                new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
729                new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3),
730                new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4),
731                new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 5),
732            ]
733            .to_vec(),
734            expected_outputs: vec![
735                ExpectedOutput {
736                    input_files: vec![0, 1],
737                    output_level: 1,
738                },
739                ExpectedOutput {
740                    input_files: vec![2, 4],
741                    output_level: 1,
742                },
743            ],
744        }
745        .check();
746
747        // Case 3:
748        // A compaction may split output into several files that have overlapping time ranges and same sequence,
749        // we should treat these files as one FileGroup.
750        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
751        CompactionPickerTestCase {
752            window_size: 3,
753            input_files: [
754                new_file_handle_with_sequence(file_ids[0], 0, 2999, 1, 1),
755                new_file_handle_with_sequence(file_ids[1], 0, 2998, 1, 1),
756                new_file_handle_with_sequence(file_ids[2], 3000, 5999, 1, 2),
757                new_file_handle_with_sequence(file_ids[3], 3000, 5000, 1, 2),
758                new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 3),
759            ]
760            .to_vec(),
761            expected_outputs: vec![ExpectedOutput {
762                input_files: vec![0, 1, 4],
763                output_level: 1,
764            }],
765        }
766        .check();
767    }
768
769    // TODO(hl): TTL tester that checks if get_expired_ssts function works as expected.
770}