mito2/compaction/twcs.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::num::NonZeroU64;

use common_base::readable_size::ReadableSize;
use common_telemetry::info;
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
use common_time::Timestamp;
use store_api::storage::RegionId;

use crate::compaction::buckets::infer_time_bucket;
use crate::compaction::compactor::CompactionRegion;
use crate::compaction::picker::{Picker, PickerOutput};
use crate::compaction::run::{
    find_sorted_runs, merge_seq_files, reduce_runs, FileGroup, Item, Ranged,
};
use crate::compaction::{get_expired_ssts, CompactionOutput};
use crate::sst::file::{overlaps, FileHandle, Level};
use crate::sst::version::LevelMeta;

const LEVEL_COMPACTED: Level = 1;

/// `TwcsPicker` picks files whose max timestamps fall into the same time window as compaction
/// candidates.
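///
/// Files are bucketed into fixed-size time windows by their max timestamps. Within each window
/// the picker looks for sorted runs: multiple runs are reduced towards a single run, while a
/// single run only has its small files merged. Outputs are always written to level 1.
///
/// A minimal configuration, mirroring the one used in the unit tests below:
///
/// ```ignore
/// let picker = TwcsPicker {
///     trigger_file_num: 4,
///     time_window_seconds: None,
///     max_output_file_size: None,
///     append_mode: false,
/// };
/// ```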
#[derive(Debug)]
pub struct TwcsPicker {
    /// Minimum number of files to trigger a compaction.
    pub trigger_file_num: usize,
    /// Compaction time window in seconds.
    pub time_window_seconds: Option<i64>,
    /// Max allowed compaction output file size.
    pub max_output_file_size: Option<u64>,
    /// Whether the target region is in append mode.
    pub append_mode: bool,
}

impl TwcsPicker {
    /// Builds compaction output from files.
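    ///
    /// For each time window: in append mode with a max output file size configured, file groups
    /// that exceed that size or are no longer entirely in level 0 are skipped first. Sorted runs
    /// are then detected; multiple runs are reduced via `reduce_runs`, while a single run is only
    /// merged when it contains at least `trigger_file_num` file groups.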
    fn build_output(
        &self,
        region_id: RegionId,
        time_windows: &mut BTreeMap<i64, Window>,
        active_window: Option<i64>,
    ) -> Vec<CompactionOutput> {
        let mut output = vec![];
        for (window, files) in time_windows {
            if files.files.is_empty() {
                continue;
            }
            let mut files_to_merge: Vec<_> = files.files().cloned().collect();

            // In append mode, skip file groups that are too large or no longer entirely in
            // level 0; they gain little from further compaction.
            if self.append_mode {
                if let Some(max_size) = self.max_output_file_size {
                    let (kept_files, ignored_files) = files_to_merge
                        .into_iter()
                        .partition(|fg| fg.size() <= max_size as usize && fg.is_all_level_0());
                    files_to_merge = kept_files;
                    info!(
                        "Skipped {} large files in append mode for region {}, window {}, max_size: {}",
                        ignored_files.len(),
                        region_id,
                        window,
                        max_size
                    );
                }
            }

            let sorted_runs = find_sorted_runs(&mut files_to_merge);
            let found_runs = sorted_runs.len();
            // Only remove deletion markers when this window has no overlapping files, at most
            // two sorted runs were found, and the region is not in append mode, because after
            // compaction there will be no overlapping files.
            let filter_deleted = !files.overlapping && found_runs <= 2 && !self.append_mode;
            if found_runs == 0 {
                return output;
            }

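            // With more than one sorted run, merge runs to reduce their number; with a single
            // run, only small sequential files are merged, and only if the run is long enough.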
            let inputs = if found_runs > 1 {
                reduce_runs(sorted_runs)
            } else {
                let run = sorted_runs.last().unwrap();
                if run.items().len() < self.trigger_file_num {
                    continue;
                }
                // No overlapping files; try to merge small files.
                merge_seq_files(run.items(), self.max_output_file_size)
            };

            if !inputs.is_empty() {
                log_pick_result(
                    region_id,
                    *window,
                    active_window,
                    found_runs,
                    files.files.len(),
                    self.max_output_file_size,
                    filter_deleted,
                    &inputs,
                );
                output.push(CompactionOutput {
                    output_level: LEVEL_COMPACTED, // always compact to level 1
                    inputs: inputs.into_iter().flat_map(|fg| fg.into_files()).collect(),
                    filter_deleted,
                    output_time_range: None, // We do not enforce an output time range in TWCS compactions.
                });
            }
        }
        output
    }
}

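/// Logs the pick result of a single time window, including the chosen input file groups.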
#[allow(clippy::too_many_arguments)]
fn log_pick_result(
    region_id: RegionId,
    window: i64,
    active_window: Option<i64>,
    found_runs: usize,
    file_num: usize,
    max_output_file_size: Option<u64>,
    filter_deleted: bool,
    inputs: &[FileGroup],
) {
    let input_file_str: Vec<String> = inputs
        .iter()
        .map(|f| {
            let range = f.range();
            let start = range.0.to_iso8601_string();
            let end = range.1.to_iso8601_string();
            let num_rows = f.num_rows();
            format!(
                "FileGroup{{id: {:?}, range: ({}, {}), size: {}, num rows: {} }}",
                f.file_ids(),
                start,
                end,
                ReadableSize(f.size() as u64),
                num_rows
            )
        })
        .collect();
    let window_str = Timestamp::new_second(window).to_iso8601_string();
    let active_window_str = active_window.map(|s| Timestamp::new_second(s).to_iso8601_string());
    let max_output_file_size = max_output_file_size.map(|size| ReadableSize(size).to_string());
    info!(
        "Region ({:?}) compaction pick result: current window: {}, active window: {:?}, \
            found runs: {}, file num: {}, max output file size: {:?}, filter deleted: {}, \
            input files: {:?}",
        region_id,
        window_str,
        active_window_str,
        found_runs,
        file_num,
        max_output_file_size,
        filter_deleted,
        input_file_str
    );
}

impl Picker for TwcsPicker {
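    /// Picks expired SSTs and per-window compaction outputs for the region.
    ///
    /// Expired SSTs are marked as compacting so they are not assigned to time windows; the
    /// remaining files are grouped into windows and handed to `TwcsPicker::build_output`.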
    fn pick(&self, compaction_region: &CompactionRegion) -> Option<PickerOutput> {
        let region_id = compaction_region.region_id;
        let levels = compaction_region.current_version.ssts.levels();

        let expired_ssts =
            get_expired_ssts(levels, compaction_region.ttl, Timestamp::current_millis());
        if !expired_ssts.is_empty() {
            info!("Expired SSTs in region {}: {:?}", region_id, expired_ssts);
            // Mark expired SSTs as compacting so they will not be picked again.
            expired_ssts.iter().for_each(|f| f.set_compacting(true));
        }

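        // Determine the window size: prefer the region's persisted compaction window, then the
        // picker's configured window, and finally infer one from the files in level 0.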
        let compaction_time_window = compaction_region
            .current_version
            .compaction_time_window
            .map(|window| window.as_secs() as i64);
        let time_window_size = compaction_time_window
            .or(self.time_window_seconds)
            .unwrap_or_else(|| {
                let inferred = infer_time_bucket(levels[0].files());
                info!(
                    "Compaction window for region {} is not present, inferring from files: {:?}",
                    region_id, inferred
                );
                inferred
            });

        // Find active window from files in level 0.
        let active_window = find_latest_window_in_seconds(levels[0].files(), time_window_size);
        // Assign files to windows
        let mut windows =
            assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size);
        let outputs = self.build_output(region_id, &mut windows, active_window);

        if outputs.is_empty() && expired_ssts.is_empty() {
            return None;
        }

        let max_file_size = self.max_output_file_size.map(|v| v as usize);
        Some(PickerOutput {
            outputs,
            expired_ssts,
            time_window_size,
            max_file_size,
        })
    }
}

struct Window {
    start: Timestamp,
    end: Timestamp,
    // Mapping from file sequence to file groups. Files with the same sequence are considered
    // to be created by the same compaction task.
    files: HashMap<Option<NonZeroU64>, FileGroup>,
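    // The aligned time window bucket (upper bound, in seconds) this window belongs to.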
    time_window: i64,
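    // Whether this window's time range overlaps with another window's.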
    overlapping: bool,
}

impl Window {
    /// Creates a new [Window] with the given file.
    fn new_with_file(file: FileHandle) -> Self {
        let (start, end) = file.time_range();
        let files = HashMap::from([(file.meta_ref().sequence, FileGroup::new_with_file(file))]);
        Self {
            start,
            end,
            files,
            time_window: 0,
            overlapping: false,
        }
    }

    /// Returns the time range of all files in the current window (inclusive).
    fn range(&self) -> (Timestamp, Timestamp) {
        (self.start, self.end)
    }

    /// Adds a new file to the window and updates the time range.
    fn add_file(&mut self, file: FileHandle) {
        let (start, end) = file.time_range();
        self.start = self.start.min(start);
        self.end = self.end.max(end);

        match self.files.entry(file.meta_ref().sequence) {
            Entry::Occupied(mut o) => {
                o.get_mut().add_file(file);
            }
            Entry::Vacant(v) => {
                v.insert(FileGroup::new_with_file(file));
            }
        }
    }

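    /// Returns all file groups in the current window.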
    fn files(&self) -> impl Iterator<Item = &FileGroup> {
        self.files.values()
    }
}

/// Assigns files to windows of the given size (in seconds) according to their max timestamps.
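///
/// Files that are already being compacted are skipped. After bucketing, windows are sorted by
/// time range and windows with overlapping ranges are marked as `overlapping`.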
fn assign_to_windows<'a>(
    files: impl Iterator<Item = &'a FileHandle>,
    time_window_size: i64,
) -> BTreeMap<i64, Window> {
    let mut windows: HashMap<i64, Window> = HashMap::new();
    // Iterate over all files and assign them to time windows according to their max timestamps.
    for f in files {
        if f.compacting() {
            continue;
        }
        let (_, end) = f.time_range();
        let time_window = end
            .convert_to(TimeUnit::Second)
            .unwrap()
            .value()
            .align_to_ceil_by_bucket(time_window_size)
            .unwrap_or(i64::MIN);

        match windows.entry(time_window) {
            Entry::Occupied(mut e) => {
                e.get_mut().add_file(f.clone());
            }
            Entry::Vacant(e) => {
                let mut window = Window::new_with_file(f.clone());
                window.time_window = time_window;
                e.insert(window);
            }
        }
    }
    if windows.is_empty() {
        return BTreeMap::new();
    }

    let mut windows = windows.into_values().collect::<Vec<_>>();
    windows.sort_unstable_by(|l, r| l.start.cmp(&r.start).then(l.end.cmp(&r.end).reverse()));

    let mut current_range: (Timestamp, Timestamp) = windows[0].range(); // windows cannot be empty.

    for idx in 1..windows.len() {
        let next_range = windows[idx].range();
        if overlaps(&current_range, &next_range) {
            windows[idx - 1].overlapping = true;
            windows[idx].overlapping = true;
        }
        current_range = (
            current_range.0.min(next_range.0),
            current_range.1.max(next_range.1),
        );
    }

    windows.into_iter().map(|w| (w.time_window, w)).collect()
}

/// Finds the latest active writing window among all files.
/// Returns `None` when there are no files or the latest timestamp cannot be converted and
/// aligned to `time_window_size` without overflow.
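///
/// The returned value is the window's upper bound in seconds, aligned up to `time_window_size`.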
fn find_latest_window_in_seconds<'a>(
    files: impl Iterator<Item = &'a FileHandle>,
    time_window_size: i64,
) -> Option<i64> {
    let mut latest_timestamp = None;
    for f in files {
        let (_, end) = f.time_range();
        if let Some(latest) = latest_timestamp {
            if end > latest {
                latest_timestamp = Some(end);
            }
        } else {
            latest_timestamp = Some(end);
        }
    }
    latest_timestamp
        .and_then(|ts| ts.convert_to_ceil(TimeUnit::Second))
        .and_then(|ts| ts.value().align_to_ceil_by_bucket(time_window_size))
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::*;
    use crate::compaction::test_util::{
        new_file_handle, new_file_handle_with_sequence, new_file_handle_with_size_and_sequence,
    };
    use crate::sst::file::{FileId, Level};

    #[test]
    fn test_get_latest_window_in_seconds() {
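        // File time ranges below are in milliseconds while windows are in seconds, e.g. a file
        // ending at 999 ms with a 1-second window falls into window 1.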
        assert_eq!(
            Some(1),
            find_latest_window_in_seconds([new_file_handle(FileId::random(), 0, 999, 0)].iter(), 1)
        );
        assert_eq!(
            Some(1),
            find_latest_window_in_seconds(
                [new_file_handle(FileId::random(), 0, 1000, 0)].iter(),
                1
            )
        );

        assert_eq!(
            Some(-9223372036854000),
            find_latest_window_in_seconds(
                [new_file_handle(FileId::random(), i64::MIN, i64::MIN + 1, 0)].iter(),
                3600,
            )
        );

        assert_eq!(
            (i64::MAX / 10000000 + 1) * 10000,
            find_latest_window_in_seconds(
                [new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0)].iter(),
                10000,
            )
            .unwrap()
        );

        assert_eq!(
            Some((i64::MAX / 3600000 + 1) * 3600),
            find_latest_window_in_seconds(
                [
                    new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0),
                    new_file_handle(FileId::random(), 0, 1000, 0)
                ]
                .iter(),
                3600
            )
        );
    }

    #[test]
    fn test_assign_to_windows() {
        let windows = assign_to_windows(
            [
                new_file_handle(FileId::random(), 0, 999, 0),
                new_file_handle(FileId::random(), 0, 999, 0),
                new_file_handle(FileId::random(), 0, 999, 0),
                new_file_handle(FileId::random(), 0, 999, 0),
                new_file_handle(FileId::random(), 0, 999, 0),
            ]
            .iter(),
            3,
        );
        let fgs = &windows.get(&0).unwrap().files;
        assert_eq!(1, fgs.len());
        assert_eq!(fgs.values().map(|f| f.files().len()).sum::<usize>(), 5);

        let files = (0..3).map(|_| FileId::random()).collect::<Vec<_>>();
        let windows = assign_to_windows(
            [
                new_file_handle(files[0], -2000, -3, 0),
                new_file_handle(files[1], 0, 2999, 0),
                new_file_handle(files[2], 50, 10001, 0),
            ]
            .iter(),
            3,
        );
        assert_eq!(
            files[0],
            windows.get(&0).unwrap().files().next().unwrap().files()[0]
                .file_id()
                .file_id()
        );
        assert_eq!(
            files[1],
            windows.get(&3).unwrap().files().next().unwrap().files()[0]
                .file_id()
                .file_id()
        );
        assert_eq!(
            files[2],
            windows.get(&12).unwrap().files().next().unwrap().files()[0]
                .file_id()
                .file_id()
        );
    }

    #[test]
    fn test_assign_file_groups_to_windows() {
        let files = [
            FileId::random(),
            FileId::random(),
            FileId::random(),
            FileId::random(),
        ];
        let windows = assign_to_windows(
            [
                new_file_handle_with_sequence(files[0], 0, 999, 0, 1),
                new_file_handle_with_sequence(files[1], 0, 999, 0, 1),
                new_file_handle_with_sequence(files[2], 0, 999, 0, 2),
                new_file_handle_with_sequence(files[3], 0, 999, 0, 2),
            ]
            .iter(),
            3,
        );
        assert_eq!(windows.len(), 1);
        let fgs = &windows.get(&0).unwrap().files;
        assert_eq!(2, fgs.len());
        assert_eq!(
            fgs.get(&NonZeroU64::new(1))
                .unwrap()
                .files()
                .iter()
                .map(|f| f.file_id().file_id())
                .collect::<HashSet<_>>(),
            [files[0], files[1]].into_iter().collect()
        );
        assert_eq!(
            fgs.get(&NonZeroU64::new(2))
                .unwrap()
                .files()
                .iter()
                .map(|f| f.file_id().file_id())
                .collect::<HashSet<_>>(),
            [files[2], files[3]].into_iter().collect()
        );
    }

    #[test]
    fn test_assign_compacting_to_windows() {
        let files = [
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
        ];
        files[0].set_compacting(true);
        files[2].set_compacting(true);
        let mut windows = assign_to_windows(files.iter(), 3);
        let window0 = windows.remove(&0).unwrap();
        assert_eq!(1, window0.files.len());
        let candidates = window0
            .files
            .into_values()
            .flat_map(|fg| fg.into_files())
            .map(|f| f.file_id().file_id())
            .collect::<HashSet<_>>();
        assert_eq!(candidates.len(), 3);
        assert_eq!(
            candidates,
            [
                files[1].file_id().file_id(),
                files[3].file_id().file_id(),
                files[4].file_id().file_id()
            ]
            .into_iter()
            .collect::<HashSet<_>>()
        );
    }

    /// (Window value, overlapping, files' time ranges in window)
    type ExpectedWindowSpec = (i64, bool, Vec<(i64, i64)>);

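    /// Creates file handles for `file_time_ranges`, assigns them to windows of `time_window`
    /// seconds, and asserts each expected window's overlapping flag and sorted file ranges.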
    fn check_assign_to_windows_with_overlapping(
        file_time_ranges: &[(i64, i64)],
        time_window: i64,
        expected_files: &[ExpectedWindowSpec],
    ) {
        let files: Vec<_> = (0..file_time_ranges.len())
            .map(|_| FileId::random())
            .collect();

        let file_handles = files
            .iter()
            .zip(file_time_ranges.iter())
            .map(|(file_id, range)| new_file_handle(*file_id, range.0, range.1, 0))
            .collect::<Vec<_>>();

        let windows = assign_to_windows(file_handles.iter(), time_window);

        for (expected_window, overlapping, window_files) in expected_files {
            let actual_window = windows.get(expected_window).unwrap();
            assert_eq!(*overlapping, actual_window.overlapping);
            let mut file_ranges = actual_window
                .files
                .iter()
                .flat_map(|(_, f)| {
                    f.files().iter().map(|f| {
                        let (s, e) = f.time_range();
                        (s.value(), e.value())
                    })
                })
                .collect::<Vec<_>>();
            file_ranges.sort_unstable_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1)));
            assert_eq!(window_files, &file_ranges);
        }
    }

    #[test]
    fn test_assign_to_windows_with_overlapping() {
        check_assign_to_windows_with_overlapping(
            &[(0, 999), (1000, 1999), (2000, 2999)],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, false, vec![(1000, 1999), (2000, 2999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[(0, 1), (0, 999), (100, 2999)],
            2,
            &[
                (0, true, vec![(0, 1), (0, 999)]),
                (2, true, vec![(100, 2999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[(0, 999), (1000, 1999), (2000, 2999), (3000, 3999)],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, false, vec![(1000, 1999), (2000, 2999)]),
                (4, false, vec![(3000, 3999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (0, 3999),
            ],
            2,
            &[
                (0, true, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(0, 3999), (3000, 3999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (1999, 3999),
            ],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(1999, 3999), (3000, 3999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),     // window 0
                (1000, 1999), // window 2
                (2000, 2999), // window 2
                (3000, 3999), // window 4
                (2999, 3999), // window 4
            ],
            2,
            &[
                // window 2 overlaps with window 4
                (0, false, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(2999, 3999), (3000, 3999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),     // window 0
                (1000, 1999), // window 2
                (2000, 2999), // window 2
                (3000, 3999), // window 4
                (0, 1000),    // window 2
            ],
            2,
            &[
                // only window 0 overlaps with window 2.
                (0, true, vec![(0, 999)]),
                (2, true, vec![(0, 1000), (1000, 1999), (2000, 2999)]),
                (4, false, vec![(3000, 3999)]),
            ],
        );
    }

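    /// Runs `TwcsPicker::build_output` over `input_files` and compares the picked inputs
    /// (as indexes into `input_files`) and output levels against `expected_outputs`.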
    struct CompactionPickerTestCase {
        window_size: i64,
        input_files: Vec<FileHandle>,
        expected_outputs: Vec<ExpectedOutput>,
    }

    impl CompactionPickerTestCase {
        fn check(&self) {
            let file_id_to_idx = self
                .input_files
                .iter()
                .enumerate()
                .map(|(idx, file)| (file.file_id(), idx))
                .collect::<HashMap<_, _>>();
            let mut windows = assign_to_windows(self.input_files.iter(), self.window_size);
            let active_window =
                find_latest_window_in_seconds(self.input_files.iter(), self.window_size);
            let output = TwcsPicker {
                trigger_file_num: 4,
                time_window_seconds: None,
                max_output_file_size: None,
                append_mode: false,
            }
            .build_output(RegionId::from_u64(0), &mut windows, active_window);

            let output = output
                .iter()
                .map(|o| {
                    let input_file_ids = o
                        .inputs
                        .iter()
                        .map(|f| file_id_to_idx.get(&f.file_id()).copied().unwrap())
                        .collect::<HashSet<_>>();
                    (input_file_ids, o.output_level)
                })
                .collect::<Vec<_>>();

            let expected = self
                .expected_outputs
                .iter()
                .map(|o| {
                    let input_file_ids = o.input_files.iter().copied().collect::<HashSet<_>>();
                    (input_file_ids, o.output_level)
                })
                .collect::<Vec<_>>();
            assert_eq!(expected, output);
        }
    }

    struct ExpectedOutput {
        input_files: Vec<usize>,
        output_level: Level,
    }

    #[test]
    fn test_build_twcs_output() {
        let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();

        // Case 1: 2 runs found in each time window.
        CompactionPickerTestCase {
            window_size: 3,
            input_files: [
                new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
                new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
                new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3), // active window
                new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4), // active window
            ]
            .to_vec(),
            expected_outputs: vec![
                ExpectedOutput {
                    input_files: vec![0, 1],
                    output_level: 1,
                },
                ExpectedOutput {
                    input_files: vec![2, 3],
                    output_level: 1,
                },
            ],
        }
        .check();

        // Case 2:
        //    -2000........-3
        // -3000.....-100
        //                    0..............2999
        //                      50..........2998
        //                     11.........2990
        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
        CompactionPickerTestCase {
            window_size: 3,
            input_files: [
                new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
                new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
                new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3),
                new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4),
                new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 5),
            ]
            .to_vec(),
            expected_outputs: vec![
                ExpectedOutput {
                    input_files: vec![0, 1],
                    output_level: 1,
                },
                ExpectedOutput {
                    input_files: vec![2, 4],
                    output_level: 1,
                },
            ],
        }
        .check();

        // Case 3:
        // A compaction may split its output into several files that have overlapping time ranges
        // and the same sequence; such files should be treated as one FileGroup.
        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
        CompactionPickerTestCase {
            window_size: 3,
            input_files: [
                new_file_handle_with_sequence(file_ids[0], 0, 2999, 1, 1),
                new_file_handle_with_sequence(file_ids[1], 0, 2998, 1, 1),
                new_file_handle_with_sequence(file_ids[2], 3000, 5999, 1, 2),
                new_file_handle_with_sequence(file_ids[3], 3000, 5000, 1, 2),
                new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 3),
            ]
            .to_vec(),
            expected_outputs: vec![ExpectedOutput {
                input_files: vec![0, 1, 4],
                output_level: 1,
            }],
        }
        .check();
    }

    #[test]
    fn test_append_mode_filter_large_files() {
        let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
        let max_output_file_size = 1000u64;

        // Create files with different sizes
        let small_file_1 = new_file_handle_with_size_and_sequence(file_ids[0], 0, 999, 0, 1, 500);
        let large_file_1 = new_file_handle_with_size_and_sequence(file_ids[1], 0, 999, 0, 2, 1500);
        let small_file_2 = new_file_handle_with_size_and_sequence(file_ids[2], 0, 999, 0, 3, 800);
        let large_file_2 = new_file_handle_with_size_and_sequence(file_ids[3], 0, 999, 0, 4, 2000);

        // Create file groups (each file is in its own group due to different sequences)
        let mut files_to_merge = vec![
            FileGroup::new_with_file(small_file_1),
            FileGroup::new_with_file(large_file_1),
            FileGroup::new_with_file(small_file_2),
            FileGroup::new_with_file(large_file_2),
        ];

        // Test filtering logic directly
        let original_count = files_to_merge.len();

        // Apply append mode filtering
        files_to_merge.retain(|fg| fg.size() <= max_output_file_size as usize);

        // Should have filtered out 2 large files, leaving 2 small files
        assert_eq!(files_to_merge.len(), 2);
        assert_eq!(original_count, 4);

        // Verify the remaining files are the small ones
        for fg in &files_to_merge {
            assert!(
                fg.size() <= max_output_file_size as usize,
                "File size {} should be <= {}",
                fg.size(),
                max_output_file_size
            );
        }
    }

    // TODO(hl): TTL tester that checks if get_expired_ssts function works as expected.
}