mito2/compaction/
twcs.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap};
17use std::fmt::Debug;
18use std::num::NonZeroU64;
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::{debug, info};
22use common_time::Timestamp;
23use common_time::timestamp::TimeUnit;
24use common_time::timestamp_millis::BucketAligned;
25use store_api::storage::RegionId;
26
27use crate::compaction::buckets::infer_time_bucket;
28use crate::compaction::compactor::CompactionRegion;
29use crate::compaction::picker::{Picker, PickerOutput};
30use crate::compaction::run::{
31    FileGroup, Item, Ranged, find_sorted_runs, merge_seq_files, reduce_runs,
32};
33use crate::compaction::{CompactionOutput, get_expired_ssts};
34use crate::sst::file::{FileHandle, Level, overlaps};
35use crate::sst::version::LevelMeta;
36
37const LEVEL_COMPACTED: Level = 1;
38
/// Time-window compaction strategy (TWCS) picker.
///
/// `TwcsPicker` picks files of which the max timestamp are in the same time window as compaction
/// candidates.
#[derive(Debug)]
pub struct TwcsPicker {
    /// Minimum file num to trigger a compaction. It is checked against the number of items
    /// in a window's sorted run when the window contains exactly one run.
    pub trigger_file_num: usize,
    /// Compaction time window in seconds. Only consulted when the region version does not
    /// carry its own compaction time window; when both are absent the window size is
    /// inferred from the files in level 0.
    pub time_window_seconds: Option<i64>,
    /// Max allowed compaction output file size. In append mode, file groups larger than this
    /// size are excluded from compaction candidates.
    pub max_output_file_size: Option<u64>,
    /// Whether the target region is in append mode. Deletion markers are never filtered in
    /// append mode.
    pub append_mode: bool,
    /// Max background compaction tasks. Picking stops as soon as this many compaction
    /// outputs have been produced.
    pub max_background_tasks: Option<usize>,
}
54
55impl TwcsPicker {
56    /// Builds compaction output from files.
57    fn build_output(
58        &self,
59        region_id: RegionId,
60        time_windows: &mut BTreeMap<i64, Window>,
61        active_window: Option<i64>,
62    ) -> Vec<CompactionOutput> {
63        let mut output = vec![];
64        for (window, files) in time_windows {
65            if files.files.is_empty() {
66                continue;
67            }
68            let mut files_to_merge: Vec<_> = files.files().cloned().collect();
69
70            // Filter out large files in append mode - they won't benefit from compaction
71            if self.append_mode
72                && let Some(max_size) = self.max_output_file_size
73            {
74                let (kept_files, ignored_files) = files_to_merge
75                    .into_iter()
76                    .partition(|fg| fg.size() <= max_size as usize && fg.is_all_level_0());
77                files_to_merge = kept_files;
78                info!(
79                    "Skipped {} large files in append mode for region {}, window {}, max_size: {}",
80                    ignored_files.len(),
81                    region_id,
82                    window,
83                    max_size
84                );
85            }
86
87            let sorted_runs = find_sorted_runs(&mut files_to_merge);
88            let found_runs = sorted_runs.len();
89            // We only remove deletion markers if we found less than 2 runs and not in append mode.
90            // because after compaction there will be no overlapping files.
91            let filter_deleted = !files.overlapping && found_runs <= 2 && !self.append_mode;
92            if found_runs == 0 {
93                continue;
94            }
95
96            let inputs = if found_runs > 1 {
97                reduce_runs(sorted_runs)
98            } else {
99                let run = sorted_runs.last().unwrap();
100                if run.items().len() < self.trigger_file_num {
101                    continue;
102                }
103                // no overlapping files, try merge small files
104                merge_seq_files(run.items(), self.max_output_file_size)
105            };
106
107            if !inputs.is_empty() {
108                log_pick_result(
109                    region_id,
110                    *window,
111                    active_window,
112                    found_runs,
113                    files.files.len(),
114                    self.max_output_file_size,
115                    filter_deleted,
116                    &inputs,
117                );
118                output.push(CompactionOutput {
119                    output_level: LEVEL_COMPACTED, // always compact to l1
120                    inputs: inputs.into_iter().flat_map(|fg| fg.into_files()).collect(),
121                    filter_deleted,
122                    output_time_range: None, // we do not enforce output time range in twcs compactions.
123                });
124
125                if let Some(max_background_tasks) = self.max_background_tasks
126                    && output.len() >= max_background_tasks
127                {
128                    debug!(
129                        "Region ({:?}) compaction task size larger than max background tasks({}), remaining tasks discarded",
130                        region_id, max_background_tasks
131                    );
132                    break;
133                }
134            }
135        }
136        output
137    }
138}
139
140#[allow(clippy::too_many_arguments)]
141fn log_pick_result(
142    region_id: RegionId,
143    window: i64,
144    active_window: Option<i64>,
145    found_runs: usize,
146    file_num: usize,
147    max_output_file_size: Option<u64>,
148    filter_deleted: bool,
149    inputs: &[FileGroup],
150) {
151    let input_file_str: Vec<String> = inputs
152        .iter()
153        .map(|f| {
154            let range = f.range();
155            let start = range.0.to_iso8601_string();
156            let end = range.1.to_iso8601_string();
157            let num_rows = f.num_rows();
158            format!(
159                "FileGroup{{id: {:?}, range: ({}, {}), size: {}, num rows: {} }}",
160                f.file_ids(),
161                start,
162                end,
163                ReadableSize(f.size() as u64),
164                num_rows
165            )
166        })
167        .collect();
168    let window_str = Timestamp::new_second(window).to_iso8601_string();
169    let active_window_str = active_window.map(|s| Timestamp::new_second(s).to_iso8601_string());
170    let max_output_file_size = max_output_file_size.map(|size| ReadableSize(size).to_string());
171    info!(
172        "Region ({:?}) compaction pick result: current window: {}, active window: {:?}, \
173            found runs: {}, file num: {}, max output file size: {:?}, filter deleted: {}, \
174            input files: {:?}",
175        region_id,
176        window_str,
177        active_window_str,
178        found_runs,
179        file_num,
180        max_output_file_size,
181        filter_deleted,
182        input_file_str
183    );
184}
185
186impl Picker for TwcsPicker {
187    fn pick(&self, compaction_region: &CompactionRegion) -> Option<PickerOutput> {
188        let region_id = compaction_region.region_id;
189        let levels = compaction_region.current_version.ssts.levels();
190
191        let expired_ssts =
192            get_expired_ssts(levels, compaction_region.ttl, Timestamp::current_millis());
193        if !expired_ssts.is_empty() {
194            info!("Expired SSTs in region {}: {:?}", region_id, expired_ssts);
195            // here we mark expired SSTs as compacting to avoid them being picked.
196            expired_ssts.iter().for_each(|f| f.set_compacting(true));
197        }
198
199        let compaction_time_window = compaction_region
200            .current_version
201            .compaction_time_window
202            .map(|window| window.as_secs() as i64);
203        let time_window_size = compaction_time_window
204            .or(self.time_window_seconds)
205            .unwrap_or_else(|| {
206                let inferred = infer_time_bucket(levels[0].files());
207                info!(
208                    "Compaction window for region {} is not present, inferring from files: {:?}",
209                    region_id, inferred
210                );
211                inferred
212            });
213
214        // Find active window from files in level 0.
215        let active_window = find_latest_window_in_seconds(levels[0].files(), time_window_size);
216        // Assign files to windows
217        let mut windows =
218            assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size);
219        let outputs = self.build_output(region_id, &mut windows, active_window);
220
221        if outputs.is_empty() && expired_ssts.is_empty() {
222            return None;
223        }
224
225        let max_file_size = self.max_output_file_size.map(|v| v as usize);
226        Some(PickerOutput {
227            outputs,
228            expired_ssts,
229            time_window_size,
230            max_file_size,
231        })
232    }
233}
234
/// A time window and the files assigned to it.
struct Window {
    /// Minimum start timestamp of all files added to this window.
    start: Timestamp,
    /// Maximum end timestamp of all files added to this window.
    end: Timestamp,
    // Mapping from file sequence to file groups. Files with the same sequence are considered
    // created from the same compaction task.
    files: HashMap<Option<NonZeroU64>, FileGroup>,
    /// Key of this window. It is 0 until the code that creates the window assigns it.
    time_window: i64,
    /// Whether the time range of this window overlaps with the range of other windows.
    overlapping: bool,
}
244
245impl Window {
246    /// Creates a new [Window] with given file.
247    fn new_with_file(file: FileHandle) -> Self {
248        let (start, end) = file.time_range();
249        let files = HashMap::from([(file.meta_ref().sequence, FileGroup::new_with_file(file))]);
250        Self {
251            start,
252            end,
253            files,
254            time_window: 0,
255            overlapping: false,
256        }
257    }
258
259    /// Returns the time range of all files in current window (inclusive).
260    fn range(&self) -> (Timestamp, Timestamp) {
261        (self.start, self.end)
262    }
263
264    /// Adds a new file to window and updates time range.
265    fn add_file(&mut self, file: FileHandle) {
266        let (start, end) = file.time_range();
267        self.start = self.start.min(start);
268        self.end = self.end.max(end);
269
270        match self.files.entry(file.meta_ref().sequence) {
271            Entry::Occupied(mut o) => {
272                o.get_mut().add_file(file);
273            }
274            Entry::Vacant(v) => {
275                v.insert(FileGroup::new_with_file(file));
276            }
277        }
278    }
279
280    fn files(&self) -> impl Iterator<Item = &FileGroup> {
281        self.files.values()
282    }
283}
284
285/// Assigns files to windows with predefined window size (in seconds) by their max timestamps.
286fn assign_to_windows<'a>(
287    files: impl Iterator<Item = &'a FileHandle>,
288    time_window_size: i64,
289) -> BTreeMap<i64, Window> {
290    let mut windows: HashMap<i64, Window> = HashMap::new();
291    // Iterates all files and assign to time windows according to max timestamp
292    for f in files {
293        if f.compacting() {
294            continue;
295        }
296        let (_, end) = f.time_range();
297        let time_window = end
298            .convert_to(TimeUnit::Second)
299            .unwrap()
300            .value()
301            .align_to_ceil_by_bucket(time_window_size)
302            .unwrap_or(i64::MIN);
303
304        match windows.entry(time_window) {
305            Entry::Occupied(mut e) => {
306                e.get_mut().add_file(f.clone());
307            }
308            Entry::Vacant(e) => {
309                let mut window = Window::new_with_file(f.clone());
310                window.time_window = time_window;
311                e.insert(window);
312            }
313        }
314    }
315    if windows.is_empty() {
316        return BTreeMap::new();
317    }
318
319    let mut windows = windows.into_values().collect::<Vec<_>>();
320    windows.sort_unstable_by(|l, r| l.start.cmp(&r.start).then(l.end.cmp(&r.end).reverse()));
321
322    let mut current_range: (Timestamp, Timestamp) = windows[0].range(); // windows cannot be empty.
323
324    for idx in 1..windows.len() {
325        let next_range = windows[idx].range();
326        if overlaps(&current_range, &next_range) {
327            windows[idx - 1].overlapping = true;
328            windows[idx].overlapping = true;
329        }
330        current_range = (
331            current_range.0.min(next_range.0),
332            current_range.1.max(next_range.1),
333        );
334    }
335
336    windows.into_iter().map(|w| (w.time_window, w)).collect()
337}
338
339/// Finds the latest active writing window among all files.
340/// Returns `None` when there are no files or all files are corrupted.
341fn find_latest_window_in_seconds<'a>(
342    files: impl Iterator<Item = &'a FileHandle>,
343    time_window_size: i64,
344) -> Option<i64> {
345    let mut latest_timestamp = None;
346    for f in files {
347        let (_, end) = f.time_range();
348        if let Some(latest) = latest_timestamp {
349            if end > latest {
350                latest_timestamp = Some(end);
351            }
352        } else {
353            latest_timestamp = Some(end);
354        }
355    }
356    latest_timestamp
357        .and_then(|ts| ts.convert_to_ceil(TimeUnit::Second))
358        .and_then(|ts| ts.value().align_to_ceil_by_bucket(time_window_size))
359}
360
361#[cfg(test)]
362mod tests {
363    use std::collections::HashSet;
364
365    use store_api::storage::FileId;
366
367    use super::*;
368    use crate::compaction::test_util::{
369        new_file_handle, new_file_handle_with_sequence, new_file_handle_with_size_and_sequence,
370    };
371    use crate::sst::file::Level;
372
373    #[test]
374    fn test_get_latest_window_in_seconds() {
375        assert_eq!(
376            Some(1),
377            find_latest_window_in_seconds([new_file_handle(FileId::random(), 0, 999, 0)].iter(), 1)
378        );
379        assert_eq!(
380            Some(1),
381            find_latest_window_in_seconds(
382                [new_file_handle(FileId::random(), 0, 1000, 0)].iter(),
383                1
384            )
385        );
386
387        assert_eq!(
388            Some(-9223372036854000),
389            find_latest_window_in_seconds(
390                [new_file_handle(FileId::random(), i64::MIN, i64::MIN + 1, 0)].iter(),
391                3600,
392            )
393        );
394
395        assert_eq!(
396            (i64::MAX / 10000000 + 1) * 10000,
397            find_latest_window_in_seconds(
398                [new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0)].iter(),
399                10000,
400            )
401            .unwrap()
402        );
403
404        assert_eq!(
405            Some((i64::MAX / 3600000 + 1) * 3600),
406            find_latest_window_in_seconds(
407                [
408                    new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0),
409                    new_file_handle(FileId::random(), 0, 1000, 0)
410                ]
411                .iter(),
412                3600
413            )
414        );
415    }
416
417    #[test]
418    fn test_assign_to_windows() {
419        let windows = assign_to_windows(
420            [
421                new_file_handle(FileId::random(), 0, 999, 0),
422                new_file_handle(FileId::random(), 0, 999, 0),
423                new_file_handle(FileId::random(), 0, 999, 0),
424                new_file_handle(FileId::random(), 0, 999, 0),
425                new_file_handle(FileId::random(), 0, 999, 0),
426            ]
427            .iter(),
428            3,
429        );
430        let fgs = &windows.get(&0).unwrap().files;
431        assert_eq!(1, fgs.len());
432        assert_eq!(fgs.values().map(|f| f.files().len()).sum::<usize>(), 5);
433
434        let files = [FileId::random(); 3];
435        let windows = assign_to_windows(
436            [
437                new_file_handle(files[0], -2000, -3, 0),
438                new_file_handle(files[1], 0, 2999, 0),
439                new_file_handle(files[2], 50, 10001, 0),
440            ]
441            .iter(),
442            3,
443        );
444        assert_eq!(
445            files[0],
446            windows.get(&0).unwrap().files().next().unwrap().files()[0]
447                .file_id()
448                .file_id()
449        );
450        assert_eq!(
451            files[1],
452            windows.get(&3).unwrap().files().next().unwrap().files()[0]
453                .file_id()
454                .file_id()
455        );
456        assert_eq!(
457            files[2],
458            windows.get(&12).unwrap().files().next().unwrap().files()[0]
459                .file_id()
460                .file_id()
461        );
462    }
463
464    #[test]
465    fn test_assign_file_groups_to_windows() {
466        let files = [
467            FileId::random(),
468            FileId::random(),
469            FileId::random(),
470            FileId::random(),
471        ];
472        let windows = assign_to_windows(
473            [
474                new_file_handle_with_sequence(files[0], 0, 999, 0, 1),
475                new_file_handle_with_sequence(files[1], 0, 999, 0, 1),
476                new_file_handle_with_sequence(files[2], 0, 999, 0, 2),
477                new_file_handle_with_sequence(files[3], 0, 999, 0, 2),
478            ]
479            .iter(),
480            3,
481        );
482        assert_eq!(windows.len(), 1);
483        let fgs = &windows.get(&0).unwrap().files;
484        assert_eq!(2, fgs.len());
485        assert_eq!(
486            fgs.get(&NonZeroU64::new(1))
487                .unwrap()
488                .files()
489                .iter()
490                .map(|f| f.file_id().file_id())
491                .collect::<HashSet<_>>(),
492            [files[0], files[1]].into_iter().collect()
493        );
494        assert_eq!(
495            fgs.get(&NonZeroU64::new(2))
496                .unwrap()
497                .files()
498                .iter()
499                .map(|f| f.file_id().file_id())
500                .collect::<HashSet<_>>(),
501            [files[2], files[3]].into_iter().collect()
502        );
503    }
504
505    #[test]
506    fn test_assign_compacting_to_windows() {
507        let files = [
508            new_file_handle(FileId::random(), 0, 999, 0),
509            new_file_handle(FileId::random(), 0, 999, 0),
510            new_file_handle(FileId::random(), 0, 999, 0),
511            new_file_handle(FileId::random(), 0, 999, 0),
512            new_file_handle(FileId::random(), 0, 999, 0),
513        ];
514        files[0].set_compacting(true);
515        files[2].set_compacting(true);
516        let mut windows = assign_to_windows(files.iter(), 3);
517        let window0 = windows.remove(&0).unwrap();
518        assert_eq!(1, window0.files.len());
519        let candidates = window0
520            .files
521            .into_values()
522            .flat_map(|fg| fg.into_files())
523            .map(|f| f.file_id().file_id())
524            .collect::<HashSet<_>>();
525        assert_eq!(candidates.len(), 3);
526        assert_eq!(
527            candidates,
528            [
529                files[1].file_id().file_id(),
530                files[3].file_id().file_id(),
531                files[4].file_id().file_id()
532            ]
533            .into_iter()
534            .collect::<HashSet<_>>()
535        );
536    }
537
538    /// (Window value, overlapping, files' time ranges in window)
539    type ExpectedWindowSpec = (i64, bool, Vec<(i64, i64)>);
540
541    fn check_assign_to_windows_with_overlapping(
542        file_time_ranges: &[(i64, i64)],
543        time_window: i64,
544        expected_files: &[ExpectedWindowSpec],
545    ) {
546        let files: Vec<_> = (0..file_time_ranges.len())
547            .map(|_| FileId::random())
548            .collect();
549
550        let file_handles = files
551            .iter()
552            .zip(file_time_ranges.iter())
553            .map(|(file_id, range)| new_file_handle(*file_id, range.0, range.1, 0))
554            .collect::<Vec<_>>();
555
556        let windows = assign_to_windows(file_handles.iter(), time_window);
557
558        for (expected_window, overlapping, window_files) in expected_files {
559            let actual_window = windows.get(expected_window).unwrap();
560            assert_eq!(*overlapping, actual_window.overlapping);
561            let mut file_ranges = actual_window
562                .files
563                .iter()
564                .flat_map(|(_, f)| {
565                    f.files().iter().map(|f| {
566                        let (s, e) = f.time_range();
567                        (s.value(), e.value())
568                    })
569                })
570                .collect::<Vec<_>>();
571            file_ranges.sort_unstable_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1)));
572            assert_eq!(window_files, &file_ranges);
573        }
574    }
575
    /// Checks window assignment and the `overlapping` flag with a window size of 2.
    /// Each expectation is (window key, overlapping, sorted time ranges of files in the window).
    #[test]
    fn test_assign_to_windows_with_overlapping() {
        // No window overlaps with another one.
        check_assign_to_windows_with_overlapping(
            &[(0, 999), (1000, 1999), (2000, 2999)],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, false, vec![(1000, 1999), (2000, 2999)]),
            ],
        );

        // The file (100, 2999) in window 2 spans the range of window 0.
        check_assign_to_windows_with_overlapping(
            &[(0, 1), (0, 999), (100, 2999)],
            2,
            &[
                (0, true, vec![(0, 1), (0, 999)]),
                (2, true, vec![(100, 2999)]),
            ],
        );

        // Three adjacent windows without overlapping.
        check_assign_to_windows_with_overlapping(
            &[(0, 999), (1000, 1999), (2000, 2999), (3000, 3999)],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, false, vec![(1000, 1999), (2000, 2999)]),
                (4, false, vec![(3000, 3999)]),
            ],
        );

        // The file (0, 3999) in window 4 spans all windows.
        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (0, 3999),
            ],
            2,
            &[
                (0, true, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(0, 3999), (3000, 3999)]),
            ],
        );

        // The file (1999, 3999) in window 4 reaches back into window 2 only.
        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (1999, 3999),
            ],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(1999, 3999), (3000, 3999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),     // window 0
                (1000, 1999), // window 2
                (2000, 2999), // window 2
                (3000, 3999), // window 4
                (2999, 3999), // window 4
            ],
            2,
            &[
                // window 2 overlaps with window 4
                (0, false, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(2999, 3999), (3000, 3999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),     // window 0
                (1000, 1999), // window 2
                (2000, 2999), // window 2
                (3000, 3999), // window 4
                (0, 1000),    // window 2
            ],
            2,
            &[
                // only window 0 overlaps with window 2.
                (0, true, vec![(0, 999)]),
                (2, true, vec![(0, 1000), (1000, 1999), (2000, 2999)]),
                (4, false, vec![(3000, 3999)]),
            ],
        );
    }
672
    /// A test case for `TwcsPicker::build_output`.
    struct CompactionPickerTestCase {
        /// Time window size passed to the window assignment.
        window_size: i64,
        /// Files to assign to windows and pick from.
        input_files: Vec<FileHandle>,
        /// Expected compaction outputs, in the order the picker emits them.
        expected_outputs: Vec<ExpectedOutput>,
    }
678
679    impl CompactionPickerTestCase {
680        fn check(&self) {
681            let file_id_to_idx = self
682                .input_files
683                .iter()
684                .enumerate()
685                .map(|(idx, file)| (file.file_id(), idx))
686                .collect::<HashMap<_, _>>();
687            let mut windows = assign_to_windows(self.input_files.iter(), self.window_size);
688            let active_window =
689                find_latest_window_in_seconds(self.input_files.iter(), self.window_size);
690            let output = TwcsPicker {
691                trigger_file_num: 4,
692                time_window_seconds: None,
693                max_output_file_size: None,
694                append_mode: false,
695                max_background_tasks: None,
696            }
697            .build_output(RegionId::from_u64(0), &mut windows, active_window);
698
699            let output = output
700                .iter()
701                .map(|o| {
702                    let input_file_ids = o
703                        .inputs
704                        .iter()
705                        .map(|f| file_id_to_idx.get(&f.file_id()).copied().unwrap())
706                        .collect::<HashSet<_>>();
707                    (input_file_ids, o.output_level)
708                })
709                .collect::<Vec<_>>();
710
711            let expected = self
712                .expected_outputs
713                .iter()
714                .map(|o| {
715                    let input_file_ids = o.input_files.iter().copied().collect::<HashSet<_>>();
716                    (input_file_ids, o.output_level)
717                })
718                .collect::<Vec<_>>();
719            assert_eq!(expected, output);
720        }
721    }
722
    /// An expected compaction output of a picker test case.
    struct ExpectedOutput {
        /// Indices of the expected input files in the test case's input file list.
        input_files: Vec<usize>,
        /// Expected output level of the compaction.
        output_level: Level,
    }
727
    /// Checks the outputs picked by `build_output` for several file layouts.
    /// Every case uses a window size of 3 and expects outputs at level 1.
    #[test]
    fn test_build_twcs_output() {
        let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();

        // Case 1: 2 runs found in each time window.
        CompactionPickerTestCase {
            window_size: 3,
            input_files: [
                new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
                new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
                new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3), // active window
                new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4), // active window
            ]
            .to_vec(),
            expected_outputs: vec![
                ExpectedOutput {
                    input_files: vec![0, 1],
                    output_level: 1,
                },
                ExpectedOutput {
                    input_files: vec![2, 3],
                    output_level: 1,
                },
            ],
        }
        .check();

        // Case 2: the second window holds three overlapping files, and only files 2 and 4
        // are expected to be picked from it.
        //    -2000........-3
        // -3000.....-100
        //                    0..............2999
        //                      50..........2998
        //                     11.........2990
        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
        CompactionPickerTestCase {
            window_size: 3,
            input_files: [
                new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
                new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
                new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3),
                new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4),
                new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 5),
            ]
            .to_vec(),
            expected_outputs: vec![
                ExpectedOutput {
                    input_files: vec![0, 1],
                    output_level: 1,
                },
                ExpectedOutput {
                    input_files: vec![2, 4],
                    output_level: 1,
                },
            ],
        }
        .check();

        // Case 3:
        // A compaction may split output into several files that have overlapping time ranges and same sequence,
        // we should treat these files as one FileGroup.
        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
        CompactionPickerTestCase {
            window_size: 3,
            input_files: [
                new_file_handle_with_sequence(file_ids[0], 0, 2999, 1, 1),
                new_file_handle_with_sequence(file_ids[1], 0, 2998, 1, 1),
                new_file_handle_with_sequence(file_ids[2], 3000, 5999, 1, 2),
                new_file_handle_with_sequence(file_ids[3], 3000, 5000, 1, 2),
                new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 3),
            ]
            .to_vec(),
            expected_outputs: vec![ExpectedOutput {
                input_files: vec![0, 1, 4],
                output_level: 1,
            }],
        }
        .check();
    }
806
807    #[test]
808    fn test_append_mode_filter_large_files() {
809        let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
810        let max_output_file_size = 1000u64;
811
812        // Create files with different sizes
813        let small_file_1 = new_file_handle_with_size_and_sequence(file_ids[0], 0, 999, 0, 1, 500);
814        let large_file_1 = new_file_handle_with_size_and_sequence(file_ids[1], 0, 999, 0, 2, 1500);
815        let small_file_2 = new_file_handle_with_size_and_sequence(file_ids[2], 0, 999, 0, 3, 800);
816        let large_file_2 = new_file_handle_with_size_and_sequence(file_ids[3], 0, 999, 0, 4, 2000);
817
818        // Create file groups (each file is in its own group due to different sequences)
819        let mut files_to_merge = vec![
820            FileGroup::new_with_file(small_file_1),
821            FileGroup::new_with_file(large_file_1),
822            FileGroup::new_with_file(small_file_2),
823            FileGroup::new_with_file(large_file_2),
824        ];
825
826        // Test filtering logic directly
827        let original_count = files_to_merge.len();
828
829        // Apply append mode filtering
830        files_to_merge.retain(|fg| fg.size() <= max_output_file_size as usize);
831
832        // Should have filtered out 2 large files, leaving 2 small files
833        assert_eq!(files_to_merge.len(), 2);
834        assert_eq!(original_count, 4);
835
836        // Verify the remaining files are the small ones
837        for fg in &files_to_merge {
838            assert!(
839                fg.size() <= max_output_file_size as usize,
840                "File size {} should be <= {}",
841                fg.size(),
842                max_output_file_size
843            );
844        }
845    }
846
847    #[test]
848    fn test_build_output_multiple_windows_with_zero_runs() {
849        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
850
851        let files = [
852            // Window 0: Contains 3 files but not forming any runs (not enough files in sequence to reach trigger_file_num)
853            new_file_handle_with_sequence(file_ids[0], 0, 999, 0, 1),
854            new_file_handle_with_sequence(file_ids[1], 0, 999, 0, 2),
855            new_file_handle_with_sequence(file_ids[2], 0, 999, 0, 3),
856            // Window 3: Contains files that will form 2 runs
857            new_file_handle_with_sequence(file_ids[3], 3000, 3999, 0, 4),
858            new_file_handle_with_sequence(file_ids[4], 3000, 3999, 0, 5),
859            new_file_handle_with_sequence(file_ids[5], 3000, 3999, 0, 6),
860        ];
861
862        let mut windows = assign_to_windows(files.iter(), 3);
863
864        // Create picker with trigger_file_num of 4 so single files won't form runs in first window
865        let picker = TwcsPicker {
866            trigger_file_num: 4, // High enough to prevent runs in first window
867            time_window_seconds: Some(3),
868            max_output_file_size: None,
869            append_mode: false,
870            max_background_tasks: None,
871        };
872
873        let active_window = find_latest_window_in_seconds(files.iter(), 3);
874        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
875
876        assert!(
877            !output.is_empty(),
878            "Should have output from windows with runs, even when one window has 0 runs"
879        );
880
881        let all_output_files: Vec<_> = output
882            .iter()
883            .flat_map(|o| o.inputs.iter())
884            .map(|f| f.file_id().file_id())
885            .collect();
886
887        assert!(
888            all_output_files.contains(&file_ids[3])
889                || all_output_files.contains(&file_ids[4])
890                || all_output_files.contains(&file_ids[5]),
891            "Output should contain files from the window with runs"
892        );
893    }
894
895    #[test]
896    fn test_build_output_single_window_zero_runs() {
897        let file_ids = (0..2).map(|_| FileId::random()).collect::<Vec<_>>();
898
899        let large_file_1 = new_file_handle_with_size_and_sequence(file_ids[0], 0, 999, 0, 1, 2000); // 2000 bytes
900        let large_file_2 = new_file_handle_with_size_and_sequence(file_ids[1], 0, 999, 0, 2, 2500); // 2500 bytes
901
902        let files = [large_file_1, large_file_2];
903
904        let mut windows = assign_to_windows(files.iter(), 3);
905
906        let picker = TwcsPicker {
907            trigger_file_num: 2,
908            time_window_seconds: Some(3),
909            max_output_file_size: Some(1000),
910            append_mode: true,
911            max_background_tasks: None,
912        };
913
914        let active_window = find_latest_window_in_seconds(files.iter(), 3);
915        let output = picker.build_output(RegionId::from_u64(456), &mut windows, active_window);
916
917        // Should return empty output (no compaction needed)
918        assert!(
919            output.is_empty(),
920            "Should return empty output when no runs are found after filtering"
921        );
922    }
923
924    #[test]
925    fn test_max_background_tasks_truncation() {
926        let file_ids = (0..10).map(|_| FileId::random()).collect::<Vec<_>>();
927        let max_background_tasks = 3;
928
929        // Create files across multiple windows that will generate multiple compaction outputs
930        let files = [
931            // Window 0: 4 files that will form a run
932            new_file_handle_with_sequence(file_ids[0], 0, 999, 0, 1),
933            new_file_handle_with_sequence(file_ids[1], 0, 999, 0, 2),
934            new_file_handle_with_sequence(file_ids[2], 0, 999, 0, 3),
935            new_file_handle_with_sequence(file_ids[3], 0, 999, 0, 4),
936            // Window 3: 4 files that will form another run
937            new_file_handle_with_sequence(file_ids[4], 3000, 3999, 0, 5),
938            new_file_handle_with_sequence(file_ids[5], 3000, 3999, 0, 6),
939            new_file_handle_with_sequence(file_ids[6], 3000, 3999, 0, 7),
940            new_file_handle_with_sequence(file_ids[7], 3000, 3999, 0, 8),
941            // Window 6: 4 files that will form another run
942            new_file_handle_with_sequence(file_ids[8], 6000, 6999, 0, 9),
943            new_file_handle_with_sequence(file_ids[9], 6000, 6999, 0, 10),
944        ];
945
946        let mut windows = assign_to_windows(files.iter(), 3);
947
948        let picker = TwcsPicker {
949            trigger_file_num: 4,
950            time_window_seconds: Some(3),
951            max_output_file_size: None,
952            append_mode: false,
953            max_background_tasks: Some(max_background_tasks),
954        };
955
956        let active_window = find_latest_window_in_seconds(files.iter(), 3);
957        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
958
959        // Should have at most max_background_tasks outputs
960        assert!(
961            output.len() <= max_background_tasks,
962            "Output should be truncated to max_background_tasks: expected <= {}, got {}",
963            max_background_tasks,
964            output.len()
965        );
966
967        // Without max_background_tasks, should have more outputs
968        let picker_no_limit = TwcsPicker {
969            trigger_file_num: 4,
970            time_window_seconds: Some(3),
971            max_output_file_size: None,
972            append_mode: false,
973            max_background_tasks: None,
974        };
975
976        let mut windows_no_limit = assign_to_windows(files.iter(), 3);
977        let output_no_limit = picker_no_limit.build_output(
978            RegionId::from_u64(123),
979            &mut windows_no_limit,
980            active_window,
981        );
982
983        // Without limit, should have more outputs (if there are enough windows)
984        if output_no_limit.len() > max_background_tasks {
985            assert!(
986                output_no_limit.len() > output.len(),
987                "Without limit should have more outputs than with limit"
988            );
989        }
990    }
991
992    #[test]
993    fn test_max_background_tasks_no_truncation_when_under_limit() {
994        let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
995        let max_background_tasks = 10; // Larger than expected outputs
996
997        // Create files in one window that will generate one compaction output
998        let files = [
999            new_file_handle_with_sequence(file_ids[0], 0, 999, 0, 1),
1000            new_file_handle_with_sequence(file_ids[1], 0, 999, 0, 2),
1001            new_file_handle_with_sequence(file_ids[2], 0, 999, 0, 3),
1002            new_file_handle_with_sequence(file_ids[3], 0, 999, 0, 4),
1003        ];
1004
1005        let mut windows = assign_to_windows(files.iter(), 3);
1006
1007        let picker = TwcsPicker {
1008            trigger_file_num: 4,
1009            time_window_seconds: Some(3),
1010            max_output_file_size: None,
1011            append_mode: false,
1012            max_background_tasks: Some(max_background_tasks),
1013        };
1014
1015        let active_window = find_latest_window_in_seconds(files.iter(), 3);
1016        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
1017
1018        // Should have all outputs since we're under the limit
1019        assert!(
1020            output.len() <= max_background_tasks,
1021            "Output should be within limit"
1022        );
1023        // Should have at least one output
1024        assert!(!output.is_empty(), "Should have at least one output");
1025    }
1026
1027    // TODO(hl): TTL tester that checks if get_expired_ssts function works as expected.
1028}