mito2/compaction/twcs.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::num::NonZeroU64;

use common_base::readable_size::ReadableSize;
use common_telemetry::info;
use common_time::Timestamp;
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
use store_api::storage::RegionId;

use crate::compaction::buckets::infer_time_bucket;
use crate::compaction::compactor::CompactionRegion;
use crate::compaction::picker::{Picker, PickerOutput};
use crate::compaction::run::{
    FileGroup, Item, Ranged, find_sorted_runs, merge_seq_files, reduce_runs,
};
use crate::compaction::{CompactionOutput, get_expired_ssts};
use crate::sst::file::{FileHandle, Level, overlaps};
use crate::sst::version::LevelMeta;

const LEVEL_COMPACTED: Level = 1;

/// `TwcsPicker` picks files whose max timestamps fall in the same time window as compaction
/// candidates.
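///
/// Files are first assigned to time windows by their max timestamps; within each window the
/// picker either merges sorted runs (when more than one run is found) or merges small files of a
/// single run, always writing the output to level 1.
///
/// A minimal sketch of constructing the picker (the field values below are illustrative only,
/// not recommended defaults):
/// ```ignore
/// let picker = TwcsPicker {
///     trigger_file_num: 4,
///     time_window_seconds: Some(3600),
///     max_output_file_size: Some(512 * 1024 * 1024),
///     append_mode: false,
/// };
/// ```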
#[derive(Debug)]
pub struct TwcsPicker {
    /// Minimum file num to trigger a compaction.
    pub trigger_file_num: usize,
    /// Compaction time window in seconds.
    pub time_window_seconds: Option<i64>,
    /// Max allowed compaction output file size.
    pub max_output_file_size: Option<u64>,
    /// Whether the target region is in append mode.
    pub append_mode: bool,
}

impl TwcsPicker {
    /// Builds compaction output from files.
    fn build_output(
        &self,
        region_id: RegionId,
        time_windows: &mut BTreeMap<i64, Window>,
        active_window: Option<i64>,
    ) -> Vec<CompactionOutput> {
        let mut output = vec![];
        for (window, files) in time_windows {
            if files.files.is_empty() {
                continue;
            }
            let mut files_to_merge: Vec<_> = files.files().cloned().collect();

            // Filter out large files in append mode - they won't benefit from compaction
            if self.append_mode
                && let Some(max_size) = self.max_output_file_size
            {
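                // Keep only file groups that are entirely in level 0 and no larger than
                // `max_output_file_size`; everything else is skipped in append mode.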
                let (kept_files, ignored_files) = files_to_merge
                    .into_iter()
                    .partition(|fg| fg.size() <= max_size as usize && fg.is_all_level_0());
                files_to_merge = kept_files;
                info!(
                    "Skipped {} large files in append mode for region {}, window {}, max_size: {}",
                    ignored_files.len(),
                    region_id,
                    window,
                    max_size
                );
            }

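            // Group the candidate files into sorted runs; files within a single run have
            // non-overlapping time ranges.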
            let sorted_runs = find_sorted_runs(&mut files_to_merge);
            let found_runs = sorted_runs.len();
            // We only remove deletion markers when this window does not overlap with other
            // windows, at most 2 sorted runs were found, and the region is not in append mode,
            // because after compaction there will be no overlapping files.
            let filter_deleted = !files.overlapping && found_runs <= 2 && !self.append_mode;
            if found_runs == 0 {
                return output;
            }

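            // More than one run means some files overlap in time, so merge runs to reduce their
            // number. A single run is only compacted once it holds at least `trigger_file_num`
            // files, in which case small sequential files are merged.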
            let inputs = if found_runs > 1 {
                reduce_runs(sorted_runs)
            } else {
                let run = sorted_runs.last().unwrap();
                if run.items().len() < self.trigger_file_num {
                    continue;
                }
                // No overlapping files; try to merge small files.
                merge_seq_files(run.items(), self.max_output_file_size)
            };

            if !inputs.is_empty() {
                log_pick_result(
                    region_id,
                    *window,
                    active_window,
                    found_runs,
                    files.files.len(),
                    self.max_output_file_size,
                    filter_deleted,
                    &inputs,
                );
                output.push(CompactionOutput {
                    output_level: LEVEL_COMPACTED, // always compact to l1
                    inputs: inputs.into_iter().flat_map(|fg| fg.into_files()).collect(),
                    filter_deleted,
                    output_time_range: None, // we do not enforce output time range in twcs compactions.
                });
            }
        }
        output
    }
}

#[allow(clippy::too_many_arguments)]
fn log_pick_result(
    region_id: RegionId,
    window: i64,
    active_window: Option<i64>,
    found_runs: usize,
    file_num: usize,
    max_output_file_size: Option<u64>,
    filter_deleted: bool,
    inputs: &[FileGroup],
) {
    let input_file_str: Vec<String> = inputs
        .iter()
        .map(|f| {
            let range = f.range();
            let start = range.0.to_iso8601_string();
            let end = range.1.to_iso8601_string();
            let num_rows = f.num_rows();
            format!(
                "FileGroup{{id: {:?}, range: ({}, {}), size: {}, num rows: {} }}",
                f.file_ids(),
                start,
                end,
                ReadableSize(f.size() as u64),
                num_rows
            )
        })
        .collect();
    let window_str = Timestamp::new_second(window).to_iso8601_string();
    let active_window_str = active_window.map(|s| Timestamp::new_second(s).to_iso8601_string());
    let max_output_file_size = max_output_file_size.map(|size| ReadableSize(size).to_string());
    info!(
        "Region ({:?}) compaction pick result: current window: {}, active window: {:?}, \
            found runs: {}, file num: {}, max output file size: {:?}, filter deleted: {}, \
            input files: {:?}",
        region_id,
        window_str,
        active_window_str,
        found_runs,
        file_num,
        max_output_file_size,
        filter_deleted,
        input_file_str
    );
}

impl Picker for TwcsPicker {
    fn pick(&self, compaction_region: &CompactionRegion) -> Option<PickerOutput> {
        let region_id = compaction_region.region_id;
        let levels = compaction_region.current_version.ssts.levels();

        let expired_ssts =
            get_expired_ssts(levels, compaction_region.ttl, Timestamp::current_millis());
        if !expired_ssts.is_empty() {
            info!("Expired SSTs in region {}: {:?}", region_id, expired_ssts);
            // Mark expired SSTs as compacting so that they will not be picked again.
            expired_ssts.iter().for_each(|f| f.set_compacting(true));
        }

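        // Resolve the window size: prefer the window persisted in the region version, then the
        // configured `time_window_seconds`, and finally infer one from the files in level 0.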
        let compaction_time_window = compaction_region
            .current_version
            .compaction_time_window
            .map(|window| window.as_secs() as i64);
        let time_window_size = compaction_time_window
            .or(self.time_window_seconds)
            .unwrap_or_else(|| {
                let inferred = infer_time_bucket(levels[0].files());
                info!(
                    "Compaction window for region {} is not present, inferring from files: {:?}",
                    region_id, inferred
                );
                inferred
            });

        // Find active window from files in level 0.
        let active_window = find_latest_window_in_seconds(levels[0].files(), time_window_size);
        // Assign files to windows
        let mut windows =
            assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size);
        let outputs = self.build_output(region_id, &mut windows, active_window);

        if outputs.is_empty() && expired_ssts.is_empty() {
            return None;
        }

        let max_file_size = self.max_output_file_size.map(|v| v as usize);
        Some(PickerOutput {
            outputs,
            expired_ssts,
            time_window_size,
            max_file_size,
        })
    }
}

struct Window {
    start: Timestamp,
    end: Timestamp,
    // Mapping from file sequence to file groups. Files with the same sequence are considered
    // created by the same compaction task.
    files: HashMap<Option<NonZeroU64>, FileGroup>,
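    // The time window bucket (in seconds) this window is assigned to.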
    time_window: i64,
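    // Whether this window's time range overlaps with another window's range.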
    overlapping: bool,
}

impl Window {
    /// Creates a new [Window] with given file.
    fn new_with_file(file: FileHandle) -> Self {
        let (start, end) = file.time_range();
        let files = HashMap::from([(file.meta_ref().sequence, FileGroup::new_with_file(file))]);
        Self {
            start,
            end,
            files,
            time_window: 0,
            overlapping: false,
        }
    }

    /// Returns the time range of all files in current window (inclusive).
    fn range(&self) -> (Timestamp, Timestamp) {
        (self.start, self.end)
    }

    /// Adds a new file to window and updates time range.
    fn add_file(&mut self, file: FileHandle) {
        let (start, end) = file.time_range();
        self.start = self.start.min(start);
        self.end = self.end.max(end);

        match self.files.entry(file.meta_ref().sequence) {
            Entry::Occupied(mut o) => {
                o.get_mut().add_file(file);
            }
            Entry::Vacant(v) => {
                v.insert(FileGroup::new_with_file(file));
            }
        }
    }

    fn files(&self) -> impl Iterator<Item = &FileGroup> {
        self.files.values()
    }
}

/// Assigns files to windows with predefined window size (in seconds) by their max timestamps.
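/// Returns a map keyed by the aligned window bucket (in seconds), each entry holding the files
/// assigned to that window.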
fn assign_to_windows<'a>(
    files: impl Iterator<Item = &'a FileHandle>,
    time_window_size: i64,
) -> BTreeMap<i64, Window> {
    let mut windows: HashMap<i64, Window> = HashMap::new();
    // Iterate all files and assign them to time windows according to their max timestamps.
    for f in files {
        if f.compacting() {
            continue;
        }
        let (_, end) = f.time_range();
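        // Convert the max timestamp to seconds and align it up to the window boundary, e.g. with
        // a 3-second window a file whose max timestamp is 2999 ms falls into bucket 3.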
        let time_window = end
            .convert_to(TimeUnit::Second)
            .unwrap()
            .value()
            .align_to_ceil_by_bucket(time_window_size)
            .unwrap_or(i64::MIN);

        match windows.entry(time_window) {
            Entry::Occupied(mut e) => {
                e.get_mut().add_file(f.clone());
            }
            Entry::Vacant(e) => {
                let mut window = Window::new_with_file(f.clone());
                window.time_window = time_window;
                e.insert(window);
            }
        }
    }
    if windows.is_empty() {
        return BTreeMap::new();
    }

    let mut windows = windows.into_values().collect::<Vec<_>>();
    windows.sort_unstable_by(|l, r| l.start.cmp(&r.start).then(l.end.cmp(&r.end).reverse()));

    let mut current_range: (Timestamp, Timestamp) = windows[0].range(); // windows cannot be empty.

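    // Sweep windows in start-time order and mark adjacent windows whose accumulated time ranges
    // overlap; overlapping windows keep deletion markers during compaction (see `build_output`).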
    for idx in 1..windows.len() {
        let next_range = windows[idx].range();
        if overlaps(&current_range, &next_range) {
            windows[idx - 1].overlapping = true;
            windows[idx].overlapping = true;
        }
        current_range = (
            current_range.0.min(next_range.0),
            current_range.1.max(next_range.1),
        );
    }

    windows.into_iter().map(|w| (w.time_window, w)).collect()
}

/// Finds the latest active writing window among all files.
/// Returns `None` when there are no files or all files are corrupted.
fn find_latest_window_in_seconds<'a>(
    files: impl Iterator<Item = &'a FileHandle>,
    time_window_size: i64,
) -> Option<i64> {
    let mut latest_timestamp = None;
    for f in files {
        let (_, end) = f.time_range();
        if let Some(latest) = latest_timestamp {
            if end > latest {
                latest_timestamp = Some(end);
            }
        } else {
            latest_timestamp = Some(end);
        }
    }
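    // Convert the latest max timestamp to seconds (ceiling) and align it to the window bucket;
    // `None` if either step fails.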
    latest_timestamp
        .and_then(|ts| ts.convert_to_ceil(TimeUnit::Second))
        .and_then(|ts| ts.value().align_to_ceil_by_bucket(time_window_size))
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use store_api::storage::FileId;

    use super::*;
    use crate::compaction::test_util::{
        new_file_handle, new_file_handle_with_sequence, new_file_handle_with_size_and_sequence,
    };
    use crate::sst::file::Level;

    #[test]
    fn test_get_latest_window_in_seconds() {
        assert_eq!(
            Some(1),
            find_latest_window_in_seconds([new_file_handle(FileId::random(), 0, 999, 0)].iter(), 1)
        );
        assert_eq!(
            Some(1),
            find_latest_window_in_seconds(
                [new_file_handle(FileId::random(), 0, 1000, 0)].iter(),
                1
            )
        );

        assert_eq!(
            Some(-9223372036854000),
            find_latest_window_in_seconds(
                [new_file_handle(FileId::random(), i64::MIN, i64::MIN + 1, 0)].iter(),
                3600,
            )
        );

        assert_eq!(
            (i64::MAX / 10000000 + 1) * 10000,
            find_latest_window_in_seconds(
                [new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0)].iter(),
                10000,
            )
            .unwrap()
        );

        assert_eq!(
            Some((i64::MAX / 3600000 + 1) * 3600),
            find_latest_window_in_seconds(
                [
                    new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0),
                    new_file_handle(FileId::random(), 0, 1000, 0)
                ]
                .iter(),
                3600
            )
        );
    }

    #[test]
    fn test_assign_to_windows() {
        let windows = assign_to_windows(
            [
                new_file_handle(FileId::random(), 0, 999, 0),
                new_file_handle(FileId::random(), 0, 999, 0),
                new_file_handle(FileId::random(), 0, 999, 0),
                new_file_handle(FileId::random(), 0, 999, 0),
                new_file_handle(FileId::random(), 0, 999, 0),
            ]
            .iter(),
            3,
        );
        let fgs = &windows.get(&0).unwrap().files;
        assert_eq!(1, fgs.len());
        assert_eq!(fgs.values().map(|f| f.files().len()).sum::<usize>(), 5);

        let files = [FileId::random(); 3];
        let windows = assign_to_windows(
            [
                new_file_handle(files[0], -2000, -3, 0),
                new_file_handle(files[1], 0, 2999, 0),
                new_file_handle(files[2], 50, 10001, 0),
            ]
            .iter(),
            3,
        );
        assert_eq!(
            files[0],
            windows.get(&0).unwrap().files().next().unwrap().files()[0]
                .file_id()
                .file_id()
        );
        assert_eq!(
            files[1],
            windows.get(&3).unwrap().files().next().unwrap().files()[0]
                .file_id()
                .file_id()
        );
        assert_eq!(
            files[2],
            windows.get(&12).unwrap().files().next().unwrap().files()[0]
                .file_id()
                .file_id()
        );
    }

    #[test]
    fn test_assign_file_groups_to_windows() {
        let files = [
            FileId::random(),
            FileId::random(),
            FileId::random(),
            FileId::random(),
        ];
        let windows = assign_to_windows(
            [
                new_file_handle_with_sequence(files[0], 0, 999, 0, 1),
                new_file_handle_with_sequence(files[1], 0, 999, 0, 1),
                new_file_handle_with_sequence(files[2], 0, 999, 0, 2),
                new_file_handle_with_sequence(files[3], 0, 999, 0, 2),
            ]
            .iter(),
            3,
        );
        assert_eq!(windows.len(), 1);
        let fgs = &windows.get(&0).unwrap().files;
        assert_eq!(2, fgs.len());
        assert_eq!(
            fgs.get(&NonZeroU64::new(1))
                .unwrap()
                .files()
                .iter()
                .map(|f| f.file_id().file_id())
                .collect::<HashSet<_>>(),
            [files[0], files[1]].into_iter().collect()
        );
        assert_eq!(
            fgs.get(&NonZeroU64::new(2))
                .unwrap()
                .files()
                .iter()
                .map(|f| f.file_id().file_id())
                .collect::<HashSet<_>>(),
            [files[2], files[3]].into_iter().collect()
        );
    }

    #[test]
    fn test_assign_compacting_to_windows() {
        let files = [
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
        ];
        files[0].set_compacting(true);
        files[2].set_compacting(true);
        let mut windows = assign_to_windows(files.iter(), 3);
        let window0 = windows.remove(&0).unwrap();
        assert_eq!(1, window0.files.len());
        let candidates = window0
            .files
            .into_values()
            .flat_map(|fg| fg.into_files())
            .map(|f| f.file_id().file_id())
            .collect::<HashSet<_>>();
        assert_eq!(candidates.len(), 3);
        assert_eq!(
            candidates,
            [
                files[1].file_id().file_id(),
                files[3].file_id().file_id(),
                files[4].file_id().file_id()
            ]
            .into_iter()
            .collect::<HashSet<_>>()
        );
    }

    /// (Window value, overlapping, files' time ranges in window)
    type ExpectedWindowSpec = (i64, bool, Vec<(i64, i64)>);

    fn check_assign_to_windows_with_overlapping(
        file_time_ranges: &[(i64, i64)],
        time_window: i64,
        expected_files: &[ExpectedWindowSpec],
    ) {
        let files: Vec<_> = (0..file_time_ranges.len())
            .map(|_| FileId::random())
            .collect();

        let file_handles = files
            .iter()
            .zip(file_time_ranges.iter())
            .map(|(file_id, range)| new_file_handle(*file_id, range.0, range.1, 0))
            .collect::<Vec<_>>();

        let windows = assign_to_windows(file_handles.iter(), time_window);

        for (expected_window, overlapping, window_files) in expected_files {
            let actual_window = windows.get(expected_window).unwrap();
            assert_eq!(*overlapping, actual_window.overlapping);
            let mut file_ranges = actual_window
                .files
                .iter()
                .flat_map(|(_, f)| {
                    f.files().iter().map(|f| {
                        let (s, e) = f.time_range();
                        (s.value(), e.value())
                    })
                })
                .collect::<Vec<_>>();
            file_ranges.sort_unstable_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1)));
            assert_eq!(window_files, &file_ranges);
        }
    }

    #[test]
    fn test_assign_to_windows_with_overlapping() {
        check_assign_to_windows_with_overlapping(
            &[(0, 999), (1000, 1999), (2000, 2999)],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, false, vec![(1000, 1999), (2000, 2999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[(0, 1), (0, 999), (100, 2999)],
            2,
            &[
                (0, true, vec![(0, 1), (0, 999)]),
                (2, true, vec![(100, 2999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[(0, 999), (1000, 1999), (2000, 2999), (3000, 3999)],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, false, vec![(1000, 1999), (2000, 2999)]),
                (4, false, vec![(3000, 3999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (0, 3999),
            ],
            2,
            &[
                (0, true, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(0, 3999), (3000, 3999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (1999, 3999),
            ],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(1999, 3999), (3000, 3999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),     // window 0
                (1000, 1999), // window 2
                (2000, 2999), // window 2
                (3000, 3999), // window 4
                (2999, 3999), // window 4
            ],
            2,
            &[
                // window 2 overlaps with window 4
                (0, false, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(2999, 3999), (3000, 3999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),     // window 0
                (1000, 1999), // window 2
                (2000, 2999), // window 2
                (3000, 3999), // window 4
                (0, 1000),    // window 2
            ],
            2,
            &[
                // only window 0 overlaps with window 2.
                (0, true, vec![(0, 999)]),
                (2, true, vec![(0, 1000), (1000, 1999), (2000, 2999)]),
                (4, false, vec![(3000, 3999)]),
            ],
        );
    }

    struct CompactionPickerTestCase {
        window_size: i64,
        input_files: Vec<FileHandle>,
        expected_outputs: Vec<ExpectedOutput>,
    }

    impl CompactionPickerTestCase {
        fn check(&self) {
            let file_id_to_idx = self
                .input_files
                .iter()
                .enumerate()
                .map(|(idx, file)| (file.file_id(), idx))
                .collect::<HashMap<_, _>>();
            let mut windows = assign_to_windows(self.input_files.iter(), self.window_size);
            let active_window =
                find_latest_window_in_seconds(self.input_files.iter(), self.window_size);
            let output = TwcsPicker {
                trigger_file_num: 4,
                time_window_seconds: None,
                max_output_file_size: None,
                append_mode: false,
            }
            .build_output(RegionId::from_u64(0), &mut windows, active_window);

            let output = output
                .iter()
                .map(|o| {
                    let input_file_ids = o
                        .inputs
                        .iter()
                        .map(|f| file_id_to_idx.get(&f.file_id()).copied().unwrap())
                        .collect::<HashSet<_>>();
                    (input_file_ids, o.output_level)
                })
                .collect::<Vec<_>>();

            let expected = self
                .expected_outputs
                .iter()
                .map(|o| {
                    let input_file_ids = o.input_files.iter().copied().collect::<HashSet<_>>();
                    (input_file_ids, o.output_level)
                })
                .collect::<Vec<_>>();
            assert_eq!(expected, output);
        }
    }

    struct ExpectedOutput {
        input_files: Vec<usize>,
        output_level: Level,
    }

    #[test]
    fn test_build_twcs_output() {
        let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();

        // Case 1: 2 runs found in each time window.
        CompactionPickerTestCase {
            window_size: 3,
            input_files: [
                new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
                new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
                new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3), // active window
                new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4), // active window
            ]
            .to_vec(),
            expected_outputs: vec![
                ExpectedOutput {
                    input_files: vec![0, 1],
                    output_level: 1,
                },
                ExpectedOutput {
                    input_files: vec![2, 3],
                    output_level: 1,
                },
            ],
        }
        .check();

        // Case 2:
        //    -2000........-3
        // -3000.....-100
        //                    0..............2999
        //                      50..........2998
        //                     11.........2990
        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
        CompactionPickerTestCase {
            window_size: 3,
            input_files: [
                new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
                new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
                new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3),
                new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4),
                new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 5),
            ]
            .to_vec(),
            expected_outputs: vec![
                ExpectedOutput {
                    input_files: vec![0, 1],
                    output_level: 1,
                },
                ExpectedOutput {
                    input_files: vec![2, 4],
                    output_level: 1,
                },
            ],
        }
        .check();

        // Case 3:
        // A compaction may split its output into several files that have overlapping time ranges
        // and the same sequence; these files should be treated as one FileGroup.
        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
        CompactionPickerTestCase {
            window_size: 3,
            input_files: [
                new_file_handle_with_sequence(file_ids[0], 0, 2999, 1, 1),
                new_file_handle_with_sequence(file_ids[1], 0, 2998, 1, 1),
                new_file_handle_with_sequence(file_ids[2], 3000, 5999, 1, 2),
                new_file_handle_with_sequence(file_ids[3], 3000, 5000, 1, 2),
                new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 3),
            ]
            .to_vec(),
            expected_outputs: vec![ExpectedOutput {
                input_files: vec![0, 1, 4],
                output_level: 1,
            }],
        }
        .check();
    }

    #[test]
    fn test_append_mode_filter_large_files() {
        let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
        let max_output_file_size = 1000u64;

        // Create files with different sizes
        let small_file_1 = new_file_handle_with_size_and_sequence(file_ids[0], 0, 999, 0, 1, 500);
        let large_file_1 = new_file_handle_with_size_and_sequence(file_ids[1], 0, 999, 0, 2, 1500);
        let small_file_2 = new_file_handle_with_size_and_sequence(file_ids[2], 0, 999, 0, 3, 800);
        let large_file_2 = new_file_handle_with_size_and_sequence(file_ids[3], 0, 999, 0, 4, 2000);

        // Create file groups (each file is in its own group due to different sequences)
        let mut files_to_merge = vec![
            FileGroup::new_with_file(small_file_1),
            FileGroup::new_with_file(large_file_1),
            FileGroup::new_with_file(small_file_2),
            FileGroup::new_with_file(large_file_2),
        ];

        // Test filtering logic directly
        let original_count = files_to_merge.len();

        // Apply append mode filtering
        files_to_merge.retain(|fg| fg.size() <= max_output_file_size as usize);

        // Should have filtered out 2 large files, leaving 2 small files
        assert_eq!(files_to_merge.len(), 2);
        assert_eq!(original_count, 4);

        // Verify the remaining files are the small ones
        for fg in &files_to_merge {
            assert!(
                fg.size() <= max_output_file_size as usize,
                "File size {} should be <= {}",
                fg.size(),
                max_output_file_size
            );
        }
    }

    // TODO(hl): TTL tester that checks if get_expired_ssts function works as expected.
}