mito2/compaction/
twcs.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap};
17use std::fmt::Debug;
18use std::num::NonZeroU64;
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::{debug, info};
22use common_time::Timestamp;
23use common_time::timestamp::TimeUnit;
24use common_time::timestamp_millis::BucketAligned;
25use store_api::storage::RegionId;
26
27use crate::compaction::buckets::infer_time_bucket;
28use crate::compaction::compactor::CompactionRegion;
29use crate::compaction::picker::{Picker, PickerOutput};
30use crate::compaction::run::{
31    FileGroup, Item, Ranged, find_sorted_runs, merge_seq_files, reduce_runs,
32};
33use crate::compaction::{CompactionOutput, get_expired_ssts};
34use crate::sst::file::{FileHandle, Level, overlaps};
35use crate::sst::version::LevelMeta;
36
37const LEVEL_COMPACTED: Level = 1;
38
39/// Default value for max compaction input file num.
40const DEFAULT_MAX_INPUT_FILE_NUM: usize = 32;
41
/// Time-window compaction picker: files are bucketed into time windows by their max
/// timestamp and compaction candidates are chosen window by window.
#[derive(Debug)]
pub struct TwcsPicker {
    /// A window made of a single sorted run is only compacted when that run holds at
    /// least this many file groups.
    pub trigger_file_num: usize,
    /// Time window size in seconds, used when the region version carries no compaction
    /// time window of its own.
    pub time_window_seconds: Option<i64>,
    /// Upper bound for the size of a compaction output file. In append mode, file groups
    /// larger than this are not picked.
    pub max_output_file_size: Option<u64>,
    /// Whether the target region is in append mode.
    pub append_mode: bool,
    /// Picking stops once this many compaction outputs have been produced.
    pub max_background_tasks: Option<usize>,
}
57
58impl TwcsPicker {
59    /// Builds compaction output from files.
60    fn build_output(
61        &self,
62        region_id: RegionId,
63        time_windows: &mut BTreeMap<i64, Window>,
64        active_window: Option<i64>,
65    ) -> Vec<CompactionOutput> {
66        let mut output = vec![];
67        for (window, files) in time_windows {
68            if files.files.is_empty() {
69                continue;
70            }
71            let mut files_to_merge: Vec<_> = files.files().cloned().collect();
72
73            // Filter out large files in append mode - they won't benefit from compaction
74            if self.append_mode
75                && let Some(max_size) = self.max_output_file_size
76            {
77                let (kept_files, ignored_files) = files_to_merge
78                    .into_iter()
79                    .partition(|fg| fg.size() <= max_size as usize);
80                files_to_merge = kept_files;
81                info!(
82                    "Skipped {} large files in append mode for region {}, window {}, max_size: {}",
83                    ignored_files.len(),
84                    region_id,
85                    window,
86                    max_size
87                );
88            }
89
90            let sorted_runs = find_sorted_runs(&mut files_to_merge);
91            let found_runs = sorted_runs.len();
92            // We only remove deletion markers if we found less than 2 runs and not in append mode.
93            // because after compaction there will be no overlapping files.
94            let filter_deleted = !files.overlapping && found_runs <= 2 && !self.append_mode;
95            if found_runs == 0 {
96                continue;
97            }
98
99            let mut inputs = if found_runs > 1 {
100                reduce_runs(sorted_runs)
101            } else {
102                let run = sorted_runs.last().unwrap();
103                if run.items().len() < self.trigger_file_num {
104                    continue;
105                }
106                // no overlapping files, try merge small files
107                merge_seq_files(run.items(), self.max_output_file_size)
108            };
109
110            // Limits the number of input files.
111            let total_input_files: usize = inputs.iter().map(|fg| fg.num_files()).sum();
112            if total_input_files > DEFAULT_MAX_INPUT_FILE_NUM {
113                // Sorts file groups by size first.
114                inputs.sort_unstable_by_key(|fg| fg.size());
115                let mut num_picked_files = 0;
116                inputs = inputs
117                    .into_iter()
118                    .take_while(|fg| {
119                        let current_group_file_num = fg.num_files();
120                        if current_group_file_num + num_picked_files <= DEFAULT_MAX_INPUT_FILE_NUM {
121                            num_picked_files += current_group_file_num;
122                            true
123                        } else {
124                            false
125                        }
126                    })
127                    .collect::<Vec<_>>();
128                info!(
129                    "Compaction for region {} enforces max input file num limit: {}, current total: {}, input: {:?}",
130                    region_id, DEFAULT_MAX_INPUT_FILE_NUM, total_input_files, inputs
131                );
132            }
133
134            if inputs.len() > 1 {
135                // If we have more than one group to compact.
136                log_pick_result(
137                    region_id,
138                    *window,
139                    active_window,
140                    found_runs,
141                    files.files.len(),
142                    self.max_output_file_size,
143                    filter_deleted,
144                    &inputs,
145                );
146                output.push(CompactionOutput {
147                    output_level: LEVEL_COMPACTED, // always compact to l1
148                    inputs: inputs.into_iter().flat_map(|fg| fg.into_files()).collect(),
149                    filter_deleted,
150                    output_time_range: None, // we do not enforce output time range in twcs compactions.
151                });
152
153                if let Some(max_background_tasks) = self.max_background_tasks
154                    && output.len() >= max_background_tasks
155                {
156                    debug!(
157                        "Region ({:?}) compaction task size larger than max background tasks({}), remaining tasks discarded",
158                        region_id, max_background_tasks
159                    );
160                    break;
161                }
162            }
163        }
164        output
165    }
166}
167
168#[allow(clippy::too_many_arguments)]
169fn log_pick_result(
170    region_id: RegionId,
171    window: i64,
172    active_window: Option<i64>,
173    found_runs: usize,
174    file_num: usize,
175    max_output_file_size: Option<u64>,
176    filter_deleted: bool,
177    inputs: &[FileGroup],
178) {
179    let input_file_str: Vec<String> = inputs
180        .iter()
181        .map(|f| {
182            let range = f.range();
183            let start = range.0.to_iso8601_string();
184            let end = range.1.to_iso8601_string();
185            let num_rows = f.num_rows();
186            format!(
187                "FileGroup{{id: {:?}, range: ({}, {}), size: {}, num rows: {} }}",
188                f.file_ids(),
189                start,
190                end,
191                ReadableSize(f.size() as u64),
192                num_rows
193            )
194        })
195        .collect();
196    let window_str = Timestamp::new_second(window).to_iso8601_string();
197    let active_window_str = active_window.map(|s| Timestamp::new_second(s).to_iso8601_string());
198    let max_output_file_size = max_output_file_size.map(|size| ReadableSize(size).to_string());
199    info!(
200        "Region ({:?}) compaction pick result: current window: {}, active window: {:?}, \
201            found runs: {}, file num: {}, max output file size: {:?}, filter deleted: {}, \
202            input files: {:?}",
203        region_id,
204        window_str,
205        active_window_str,
206        found_runs,
207        file_num,
208        max_output_file_size,
209        filter_deleted,
210        input_file_str
211    );
212}
213
214impl Picker for TwcsPicker {
215    fn pick(&self, compaction_region: &CompactionRegion) -> Option<PickerOutput> {
216        let region_id = compaction_region.region_id;
217        let levels = compaction_region.current_version.ssts.levels();
218
219        let expired_ssts =
220            get_expired_ssts(levels, compaction_region.ttl, Timestamp::current_millis());
221        if !expired_ssts.is_empty() {
222            info!("Expired SSTs in region {}: {:?}", region_id, expired_ssts);
223            // here we mark expired SSTs as compacting to avoid them being picked.
224            expired_ssts.iter().for_each(|f| f.set_compacting(true));
225        }
226
227        let compaction_time_window = compaction_region
228            .current_version
229            .compaction_time_window
230            .map(|window| window.as_secs() as i64);
231        let time_window_size = compaction_time_window
232            .or(self.time_window_seconds)
233            .unwrap_or_else(|| {
234                let inferred = infer_time_bucket(levels[0].files());
235                info!(
236                    "Compaction window for region {} is not present, inferring from files: {:?}",
237                    region_id, inferred
238                );
239                inferred
240            });
241
242        // Find active window from files in level 0.
243        let active_window = find_latest_window_in_seconds(levels[0].files(), time_window_size);
244        // Assign files to windows
245        let mut windows =
246            assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size);
247        let outputs = self.build_output(region_id, &mut windows, active_window);
248
249        if outputs.is_empty() && expired_ssts.is_empty() {
250            return None;
251        }
252
253        let max_file_size = self.max_output_file_size.map(|v| v as usize);
254        Some(PickerOutput {
255            outputs,
256            expired_ssts,
257            time_window_size,
258            max_file_size,
259        })
260    }
261}
262
263struct Window {
264    start: Timestamp,
265    end: Timestamp,
266    // Mapping from file sequence to file groups. Files with the same sequence is considered
267    // created from the same compaction task.
268    files: HashMap<Option<NonZeroU64>, FileGroup>,
269    time_window: i64,
270    overlapping: bool,
271}
272
273impl Window {
274    /// Creates a new [Window] with given file.
275    fn new_with_file(file: FileHandle) -> Self {
276        let (start, end) = file.time_range();
277        let files = HashMap::from([(file.meta_ref().sequence, FileGroup::new_with_file(file))]);
278        Self {
279            start,
280            end,
281            files,
282            time_window: 0,
283            overlapping: false,
284        }
285    }
286
287    /// Returns the time range of all files in current window (inclusive).
288    fn range(&self) -> (Timestamp, Timestamp) {
289        (self.start, self.end)
290    }
291
292    /// Adds a new file to window and updates time range.
293    fn add_file(&mut self, file: FileHandle) {
294        let (start, end) = file.time_range();
295        self.start = self.start.min(start);
296        self.end = self.end.max(end);
297
298        match self.files.entry(file.meta_ref().sequence) {
299            Entry::Occupied(mut o) => {
300                o.get_mut().add_file(file);
301            }
302            Entry::Vacant(v) => {
303                v.insert(FileGroup::new_with_file(file));
304            }
305        }
306    }
307
308    fn files(&self) -> impl Iterator<Item = &FileGroup> {
309        self.files.values()
310    }
311}
312
313/// Assigns files to windows with predefined window size (in seconds) by their max timestamps.
314fn assign_to_windows<'a>(
315    files: impl Iterator<Item = &'a FileHandle>,
316    time_window_size: i64,
317) -> BTreeMap<i64, Window> {
318    let mut windows: HashMap<i64, Window> = HashMap::new();
319    // Iterates all files and assign to time windows according to max timestamp
320    for f in files {
321        if f.compacting() {
322            continue;
323        }
324        let (_, end) = f.time_range();
325        let time_window = end
326            .convert_to(TimeUnit::Second)
327            .unwrap()
328            .value()
329            .align_to_ceil_by_bucket(time_window_size)
330            .unwrap_or(i64::MIN);
331
332        match windows.entry(time_window) {
333            Entry::Occupied(mut e) => {
334                e.get_mut().add_file(f.clone());
335            }
336            Entry::Vacant(e) => {
337                let mut window = Window::new_with_file(f.clone());
338                window.time_window = time_window;
339                e.insert(window);
340            }
341        }
342    }
343    if windows.is_empty() {
344        return BTreeMap::new();
345    }
346
347    let mut windows = windows.into_values().collect::<Vec<_>>();
348    windows.sort_unstable_by(|l, r| l.start.cmp(&r.start).then(l.end.cmp(&r.end).reverse()));
349
350    let mut current_range: (Timestamp, Timestamp) = windows[0].range(); // windows cannot be empty.
351
352    for idx in 1..windows.len() {
353        let next_range = windows[idx].range();
354        if overlaps(&current_range, &next_range) {
355            windows[idx - 1].overlapping = true;
356            windows[idx].overlapping = true;
357        }
358        current_range = (
359            current_range.0.min(next_range.0),
360            current_range.1.max(next_range.1),
361        );
362    }
363
364    windows.into_iter().map(|w| (w.time_window, w)).collect()
365}
366
367/// Finds the latest active writing window among all files.
368/// Returns `None` when there are no files or all files are corrupted.
369fn find_latest_window_in_seconds<'a>(
370    files: impl Iterator<Item = &'a FileHandle>,
371    time_window_size: i64,
372) -> Option<i64> {
373    let mut latest_timestamp = None;
374    for f in files {
375        let (_, end) = f.time_range();
376        if let Some(latest) = latest_timestamp {
377            if end > latest {
378                latest_timestamp = Some(end);
379            }
380        } else {
381            latest_timestamp = Some(end);
382        }
383    }
384    latest_timestamp
385        .and_then(|ts| ts.convert_to_ceil(TimeUnit::Second))
386        .and_then(|ts| ts.value().align_to_ceil_by_bucket(time_window_size))
387}
388
389#[cfg(test)]
390mod tests {
391    use std::collections::HashSet;
392
393    use store_api::storage::FileId;
394
395    use super::*;
396    use crate::compaction::test_util::{
397        new_file_handle, new_file_handle_with_sequence, new_file_handle_with_size_and_sequence,
398    };
399    use crate::sst::file::Level;
400
401    #[test]
402    fn test_get_latest_window_in_seconds() {
403        assert_eq!(
404            Some(1),
405            find_latest_window_in_seconds([new_file_handle(FileId::random(), 0, 999, 0)].iter(), 1)
406        );
407        assert_eq!(
408            Some(1),
409            find_latest_window_in_seconds(
410                [new_file_handle(FileId::random(), 0, 1000, 0)].iter(),
411                1
412            )
413        );
414
415        assert_eq!(
416            Some(-9223372036854000),
417            find_latest_window_in_seconds(
418                [new_file_handle(FileId::random(), i64::MIN, i64::MIN + 1, 0)].iter(),
419                3600,
420            )
421        );
422
423        assert_eq!(
424            (i64::MAX / 10000000 + 1) * 10000,
425            find_latest_window_in_seconds(
426                [new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0)].iter(),
427                10000,
428            )
429            .unwrap()
430        );
431
432        assert_eq!(
433            Some((i64::MAX / 3600000 + 1) * 3600),
434            find_latest_window_in_seconds(
435                [
436                    new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0),
437                    new_file_handle(FileId::random(), 0, 1000, 0)
438                ]
439                .iter(),
440                3600
441            )
442        );
443    }
444
445    #[test]
446    fn test_assign_to_windows() {
447        let windows = assign_to_windows(
448            [
449                new_file_handle(FileId::random(), 0, 999, 0),
450                new_file_handle(FileId::random(), 0, 999, 0),
451                new_file_handle(FileId::random(), 0, 999, 0),
452                new_file_handle(FileId::random(), 0, 999, 0),
453                new_file_handle(FileId::random(), 0, 999, 0),
454            ]
455            .iter(),
456            3,
457        );
458        let fgs = &windows.get(&0).unwrap().files;
459        assert_eq!(1, fgs.len());
460        assert_eq!(fgs.values().map(|f| f.files().len()).sum::<usize>(), 5);
461
462        let files = [FileId::random(); 3];
463        let windows = assign_to_windows(
464            [
465                new_file_handle(files[0], -2000, -3, 0),
466                new_file_handle(files[1], 0, 2999, 0),
467                new_file_handle(files[2], 50, 10001, 0),
468            ]
469            .iter(),
470            3,
471        );
472        assert_eq!(
473            files[0],
474            windows.get(&0).unwrap().files().next().unwrap().files()[0]
475                .file_id()
476                .file_id()
477        );
478        assert_eq!(
479            files[1],
480            windows.get(&3).unwrap().files().next().unwrap().files()[0]
481                .file_id()
482                .file_id()
483        );
484        assert_eq!(
485            files[2],
486            windows.get(&12).unwrap().files().next().unwrap().files()[0]
487                .file_id()
488                .file_id()
489        );
490    }
491
492    #[test]
493    fn test_assign_file_groups_to_windows() {
494        let files = [
495            FileId::random(),
496            FileId::random(),
497            FileId::random(),
498            FileId::random(),
499        ];
500        let windows = assign_to_windows(
501            [
502                new_file_handle_with_sequence(files[0], 0, 999, 0, 1),
503                new_file_handle_with_sequence(files[1], 0, 999, 0, 1),
504                new_file_handle_with_sequence(files[2], 0, 999, 0, 2),
505                new_file_handle_with_sequence(files[3], 0, 999, 0, 2),
506            ]
507            .iter(),
508            3,
509        );
510        assert_eq!(windows.len(), 1);
511        let fgs = &windows.get(&0).unwrap().files;
512        assert_eq!(2, fgs.len());
513        assert_eq!(
514            fgs.get(&NonZeroU64::new(1))
515                .unwrap()
516                .files()
517                .iter()
518                .map(|f| f.file_id().file_id())
519                .collect::<HashSet<_>>(),
520            [files[0], files[1]].into_iter().collect()
521        );
522        assert_eq!(
523            fgs.get(&NonZeroU64::new(2))
524                .unwrap()
525                .files()
526                .iter()
527                .map(|f| f.file_id().file_id())
528                .collect::<HashSet<_>>(),
529            [files[2], files[3]].into_iter().collect()
530        );
531    }
532
533    #[test]
534    fn test_assign_compacting_to_windows() {
535        let files = [
536            new_file_handle(FileId::random(), 0, 999, 0),
537            new_file_handle(FileId::random(), 0, 999, 0),
538            new_file_handle(FileId::random(), 0, 999, 0),
539            new_file_handle(FileId::random(), 0, 999, 0),
540            new_file_handle(FileId::random(), 0, 999, 0),
541        ];
542        files[0].set_compacting(true);
543        files[2].set_compacting(true);
544        let mut windows = assign_to_windows(files.iter(), 3);
545        let window0 = windows.remove(&0).unwrap();
546        assert_eq!(1, window0.files.len());
547        let candidates = window0
548            .files
549            .into_values()
550            .flat_map(|fg| fg.into_files())
551            .map(|f| f.file_id().file_id())
552            .collect::<HashSet<_>>();
553        assert_eq!(candidates.len(), 3);
554        assert_eq!(
555            candidates,
556            [
557                files[1].file_id().file_id(),
558                files[3].file_id().file_id(),
559                files[4].file_id().file_id()
560            ]
561            .into_iter()
562            .collect::<HashSet<_>>()
563        );
564    }
565
566    /// (Window value, overlapping, files' time ranges in window)
567    type ExpectedWindowSpec = (i64, bool, Vec<(i64, i64)>);
568
569    fn check_assign_to_windows_with_overlapping(
570        file_time_ranges: &[(i64, i64)],
571        time_window: i64,
572        expected_files: &[ExpectedWindowSpec],
573    ) {
574        let files: Vec<_> = (0..file_time_ranges.len())
575            .map(|_| FileId::random())
576            .collect();
577
578        let file_handles = files
579            .iter()
580            .zip(file_time_ranges.iter())
581            .map(|(file_id, range)| new_file_handle(*file_id, range.0, range.1, 0))
582            .collect::<Vec<_>>();
583
584        let windows = assign_to_windows(file_handles.iter(), time_window);
585
586        for (expected_window, overlapping, window_files) in expected_files {
587            let actual_window = windows.get(expected_window).unwrap();
588            assert_eq!(*overlapping, actual_window.overlapping);
589            let mut file_ranges = actual_window
590                .files
591                .iter()
592                .flat_map(|(_, f)| {
593                    f.files().iter().map(|f| {
594                        let (s, e) = f.time_range();
595                        (s.value(), e.value())
596                    })
597                })
598                .collect::<Vec<_>>();
599            file_ranges.sort_unstable_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1)));
600            assert_eq!(window_files, &file_ranges);
601        }
602    }
603
604    #[test]
605    fn test_assign_to_windows_with_overlapping() {
606        check_assign_to_windows_with_overlapping(
607            &[(0, 999), (1000, 1999), (2000, 2999)],
608            2,
609            &[
610                (0, false, vec![(0, 999)]),
611                (2, false, vec![(1000, 1999), (2000, 2999)]),
612            ],
613        );
614
615        check_assign_to_windows_with_overlapping(
616            &[(0, 1), (0, 999), (100, 2999)],
617            2,
618            &[
619                (0, true, vec![(0, 1), (0, 999)]),
620                (2, true, vec![(100, 2999)]),
621            ],
622        );
623
624        check_assign_to_windows_with_overlapping(
625            &[(0, 999), (1000, 1999), (2000, 2999), (3000, 3999)],
626            2,
627            &[
628                (0, false, vec![(0, 999)]),
629                (2, false, vec![(1000, 1999), (2000, 2999)]),
630                (4, false, vec![(3000, 3999)]),
631            ],
632        );
633
634        check_assign_to_windows_with_overlapping(
635            &[
636                (0, 999),
637                (1000, 1999),
638                (2000, 2999),
639                (3000, 3999),
640                (0, 3999),
641            ],
642            2,
643            &[
644                (0, true, vec![(0, 999)]),
645                (2, true, vec![(1000, 1999), (2000, 2999)]),
646                (4, true, vec![(0, 3999), (3000, 3999)]),
647            ],
648        );
649
650        check_assign_to_windows_with_overlapping(
651            &[
652                (0, 999),
653                (1000, 1999),
654                (2000, 2999),
655                (3000, 3999),
656                (1999, 3999),
657            ],
658            2,
659            &[
660                (0, false, vec![(0, 999)]),
661                (2, true, vec![(1000, 1999), (2000, 2999)]),
662                (4, true, vec![(1999, 3999), (3000, 3999)]),
663            ],
664        );
665
666        check_assign_to_windows_with_overlapping(
667            &[
668                (0, 999),     // window 0
669                (1000, 1999), // window 2
670                (2000, 2999), // window 2
671                (3000, 3999), // window 4
672                (2999, 3999), // window 4
673            ],
674            2,
675            &[
676                // window 2 overlaps with window 4
677                (0, false, vec![(0, 999)]),
678                (2, true, vec![(1000, 1999), (2000, 2999)]),
679                (4, true, vec![(2999, 3999), (3000, 3999)]),
680            ],
681        );
682
683        check_assign_to_windows_with_overlapping(
684            &[
685                (0, 999),     // window 0
686                (1000, 1999), // window 2
687                (2000, 2999), // window 2
688                (3000, 3999), // window 4
689                (0, 1000),    // // window 2
690            ],
691            2,
692            &[
693                // only window 0 overlaps with window 2.
694                (0, true, vec![(0, 999)]),
695                (2, true, vec![(0, 1000), (1000, 1999), (2000, 2999)]),
696                (4, false, vec![(3000, 3999)]),
697            ],
698        );
699    }
700
701    struct CompactionPickerTestCase {
702        window_size: i64,
703        input_files: Vec<FileHandle>,
704        expected_outputs: Vec<ExpectedOutput>,
705    }
706
707    impl CompactionPickerTestCase {
708        fn check(&self) {
709            let file_id_to_idx = self
710                .input_files
711                .iter()
712                .enumerate()
713                .map(|(idx, file)| (file.file_id(), idx))
714                .collect::<HashMap<_, _>>();
715            let mut windows = assign_to_windows(self.input_files.iter(), self.window_size);
716            let active_window =
717                find_latest_window_in_seconds(self.input_files.iter(), self.window_size);
718            let output = TwcsPicker {
719                trigger_file_num: 4,
720                time_window_seconds: None,
721                max_output_file_size: None,
722                append_mode: false,
723                max_background_tasks: None,
724            }
725            .build_output(RegionId::from_u64(0), &mut windows, active_window);
726
727            let output = output
728                .iter()
729                .map(|o| {
730                    let input_file_ids = o
731                        .inputs
732                        .iter()
733                        .map(|f| file_id_to_idx.get(&f.file_id()).copied().unwrap())
734                        .collect::<HashSet<_>>();
735                    (input_file_ids, o.output_level)
736                })
737                .collect::<Vec<_>>();
738
739            let expected = self
740                .expected_outputs
741                .iter()
742                .map(|o| {
743                    let input_file_ids = o.input_files.iter().copied().collect::<HashSet<_>>();
744                    (input_file_ids, o.output_level)
745                })
746                .collect::<Vec<_>>();
747            assert_eq!(expected, output);
748        }
749    }
750
751    struct ExpectedOutput {
752        input_files: Vec<usize>,
753        output_level: Level,
754    }
755
756    #[test]
757    fn test_build_twcs_output() {
758        let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
759
760        // Case 1: 2 runs found in each time window.
761        CompactionPickerTestCase {
762            window_size: 3,
763            input_files: [
764                new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
765                new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
766                new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3), //active windows
767                new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4), //active windows
768            ]
769            .to_vec(),
770            expected_outputs: vec![
771                ExpectedOutput {
772                    input_files: vec![0, 1],
773                    output_level: 1,
774                },
775                ExpectedOutput {
776                    input_files: vec![2, 3],
777                    output_level: 1,
778                },
779            ],
780        }
781        .check();
782
783        // Case 2:
784        //    -2000........-3
785        // -3000.....-100
786        //                    0..............2999
787        //                      50..........2998
788        //                     11.........2990
789        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
790        CompactionPickerTestCase {
791            window_size: 3,
792            input_files: [
793                new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
794                new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
795                new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3),
796                new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4),
797                new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 5),
798            ]
799            .to_vec(),
800            expected_outputs: vec![
801                ExpectedOutput {
802                    input_files: vec![0, 1],
803                    output_level: 1,
804                },
805                ExpectedOutput {
806                    input_files: vec![2, 4],
807                    output_level: 1,
808                },
809            ],
810        }
811        .check();
812
813        // Case 3:
814        // A compaction may split output into several files that have overlapping time ranges and same sequence,
815        // we should treat these files as one FileGroup.
816        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
817        CompactionPickerTestCase {
818            window_size: 3,
819            input_files: [
820                new_file_handle_with_sequence(file_ids[0], 0, 2999, 1, 1),
821                new_file_handle_with_sequence(file_ids[1], 0, 2998, 1, 1),
822                new_file_handle_with_sequence(file_ids[2], 3000, 5999, 1, 2),
823                new_file_handle_with_sequence(file_ids[3], 3000, 5000, 1, 2),
824                new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 3),
825            ]
826            .to_vec(),
827            expected_outputs: vec![ExpectedOutput {
828                input_files: vec![0, 1, 4],
829                output_level: 1,
830            }],
831        }
832        .check();
833    }
834
835    #[test]
836    fn test_append_mode_filter_large_files() {
837        let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
838        let max_output_file_size = 1000u64;
839
840        // Create files with different sizes
841        let small_file_1 = new_file_handle_with_size_and_sequence(file_ids[0], 0, 999, 0, 1, 500);
842        let large_file_1 = new_file_handle_with_size_and_sequence(file_ids[1], 0, 999, 0, 2, 1500);
843        let small_file_2 = new_file_handle_with_size_and_sequence(file_ids[2], 0, 999, 0, 3, 800);
844        let large_file_2 = new_file_handle_with_size_and_sequence(file_ids[3], 0, 999, 0, 4, 2000);
845
846        // Create file groups (each file is in its own group due to different sequences)
847        let mut files_to_merge = vec![
848            FileGroup::new_with_file(small_file_1),
849            FileGroup::new_with_file(large_file_1),
850            FileGroup::new_with_file(small_file_2),
851            FileGroup::new_with_file(large_file_2),
852        ];
853
854        // Test filtering logic directly
855        let original_count = files_to_merge.len();
856
857        // Apply append mode filtering
858        files_to_merge.retain(|fg| fg.size() <= max_output_file_size as usize);
859
860        // Should have filtered out 2 large files, leaving 2 small files
861        assert_eq!(files_to_merge.len(), 2);
862        assert_eq!(original_count, 4);
863
864        // Verify the remaining files are the small ones
865        for fg in &files_to_merge {
866            assert!(
867                fg.size() <= max_output_file_size as usize,
868                "File size {} should be <= {}",
869                fg.size(),
870                max_output_file_size
871            );
872        }
873    }
874
875    #[test]
876    fn test_build_output_multiple_windows_with_zero_runs() {
877        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
878
879        let files = [
880            // Window 0: Contains 3 files but not forming any runs (not enough files in sequence to reach trigger_file_num)
881            new_file_handle_with_sequence(file_ids[0], 0, 999, 0, 1),
882            new_file_handle_with_sequence(file_ids[1], 0, 999, 0, 2),
883            new_file_handle_with_sequence(file_ids[2], 0, 999, 0, 3),
884            // Window 3: Contains files that will form 2 runs
885            new_file_handle_with_sequence(file_ids[3], 3000, 3999, 0, 4),
886            new_file_handle_with_sequence(file_ids[4], 3000, 3999, 0, 5),
887            new_file_handle_with_sequence(file_ids[5], 3000, 3999, 0, 6),
888        ];
889
890        let mut windows = assign_to_windows(files.iter(), 3);
891
892        // Create picker with trigger_file_num of 4 so single files won't form runs in first window
893        let picker = TwcsPicker {
894            trigger_file_num: 4, // High enough to prevent runs in first window
895            time_window_seconds: Some(3),
896            max_output_file_size: None,
897            append_mode: false,
898            max_background_tasks: None,
899        };
900
901        let active_window = find_latest_window_in_seconds(files.iter(), 3);
902        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
903
904        assert!(
905            !output.is_empty(),
906            "Should have output from windows with runs, even when one window has 0 runs"
907        );
908
909        let all_output_files: Vec<_> = output
910            .iter()
911            .flat_map(|o| o.inputs.iter())
912            .map(|f| f.file_id().file_id())
913            .collect();
914
915        assert!(
916            all_output_files.contains(&file_ids[3])
917                || all_output_files.contains(&file_ids[4])
918                || all_output_files.contains(&file_ids[5]),
919            "Output should contain files from the window with runs"
920        );
921    }
922
923    #[test]
924    fn test_build_output_single_window_zero_runs() {
925        let file_ids = (0..2).map(|_| FileId::random()).collect::<Vec<_>>();
926
927        let large_file_1 = new_file_handle_with_size_and_sequence(file_ids[0], 0, 999, 0, 1, 2000); // 2000 bytes
928        let large_file_2 = new_file_handle_with_size_and_sequence(file_ids[1], 0, 999, 0, 2, 2500); // 2500 bytes
929
930        let files = [large_file_1, large_file_2];
931
932        let mut windows = assign_to_windows(files.iter(), 3);
933
934        let picker = TwcsPicker {
935            trigger_file_num: 2,
936            time_window_seconds: Some(3),
937            max_output_file_size: Some(1000),
938            append_mode: true,
939            max_background_tasks: None,
940        };
941
942        let active_window = find_latest_window_in_seconds(files.iter(), 3);
943        let output = picker.build_output(RegionId::from_u64(456), &mut windows, active_window);
944
945        // Should return empty output (no compaction needed)
946        assert!(
947            output.is_empty(),
948            "Should return empty output when no runs are found after filtering"
949        );
950    }
951
952    #[test]
953    fn test_max_background_tasks_truncation() {
954        let file_ids = (0..10).map(|_| FileId::random()).collect::<Vec<_>>();
955        let max_background_tasks = 3;
956
957        // Create files across multiple windows that will generate multiple compaction outputs
958        let files = [
959            // Window 0: 4 files that will form a run
960            new_file_handle_with_sequence(file_ids[0], 0, 999, 0, 1),
961            new_file_handle_with_sequence(file_ids[1], 0, 999, 0, 2),
962            new_file_handle_with_sequence(file_ids[2], 0, 999, 0, 3),
963            new_file_handle_with_sequence(file_ids[3], 0, 999, 0, 4),
964            // Window 3: 4 files that will form another run
965            new_file_handle_with_sequence(file_ids[4], 3000, 3999, 0, 5),
966            new_file_handle_with_sequence(file_ids[5], 3000, 3999, 0, 6),
967            new_file_handle_with_sequence(file_ids[6], 3000, 3999, 0, 7),
968            new_file_handle_with_sequence(file_ids[7], 3000, 3999, 0, 8),
969            // Window 6: 4 files that will form another run
970            new_file_handle_with_sequence(file_ids[8], 6000, 6999, 0, 9),
971            new_file_handle_with_sequence(file_ids[9], 6000, 6999, 0, 10),
972        ];
973
974        let mut windows = assign_to_windows(files.iter(), 3);
975
976        let picker = TwcsPicker {
977            trigger_file_num: 4,
978            time_window_seconds: Some(3),
979            max_output_file_size: None,
980            append_mode: false,
981            max_background_tasks: Some(max_background_tasks),
982        };
983
984        let active_window = find_latest_window_in_seconds(files.iter(), 3);
985        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
986
987        // Should have at most max_background_tasks outputs
988        assert!(
989            output.len() <= max_background_tasks,
990            "Output should be truncated to max_background_tasks: expected <= {}, got {}",
991            max_background_tasks,
992            output.len()
993        );
994
995        // Without max_background_tasks, should have more outputs
996        let picker_no_limit = TwcsPicker {
997            trigger_file_num: 4,
998            time_window_seconds: Some(3),
999            max_output_file_size: None,
1000            append_mode: false,
1001            max_background_tasks: None,
1002        };
1003
1004        let mut windows_no_limit = assign_to_windows(files.iter(), 3);
1005        let output_no_limit = picker_no_limit.build_output(
1006            RegionId::from_u64(123),
1007            &mut windows_no_limit,
1008            active_window,
1009        );
1010
1011        // Without limit, should have more outputs (if there are enough windows)
1012        if output_no_limit.len() > max_background_tasks {
1013            assert!(
1014                output_no_limit.len() > output.len(),
1015                "Without limit should have more outputs than with limit"
1016            );
1017        }
1018    }
1019
1020    #[test]
1021    fn test_max_background_tasks_no_truncation_when_under_limit() {
1022        let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
1023        let max_background_tasks = 10; // Larger than expected outputs
1024
1025        // Create files in one window that will generate one compaction output
1026        let files = [
1027            new_file_handle_with_sequence(file_ids[0], 0, 999, 0, 1),
1028            new_file_handle_with_sequence(file_ids[1], 0, 999, 0, 2),
1029            new_file_handle_with_sequence(file_ids[2], 0, 999, 0, 3),
1030            new_file_handle_with_sequence(file_ids[3], 0, 999, 0, 4),
1031        ];
1032
1033        let mut windows = assign_to_windows(files.iter(), 3);
1034
1035        let picker = TwcsPicker {
1036            trigger_file_num: 4,
1037            time_window_seconds: Some(3),
1038            max_output_file_size: None,
1039            append_mode: false,
1040            max_background_tasks: Some(max_background_tasks),
1041        };
1042
1043        let active_window = find_latest_window_in_seconds(files.iter(), 3);
1044        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
1045
1046        // Should have all outputs since we're under the limit
1047        assert!(
1048            output.len() <= max_background_tasks,
1049            "Output should be within limit"
1050        );
1051        // Should have at least one output
1052        assert!(!output.is_empty(), "Should have at least one output");
1053    }
1054
1055    #[test]
1056    fn test_pick_multiple_runs() {
1057        common_telemetry::init_default_ut_logging();
1058
1059        let num_files = 8;
1060        let file_ids = (0..num_files).map(|_| FileId::random()).collect::<Vec<_>>();
1061
1062        // Create files with different sequences so they form multiple runs
1063        let files: Vec<_> = file_ids
1064            .iter()
1065            .enumerate()
1066            .map(|(idx, file_id)| {
1067                new_file_handle_with_size_and_sequence(
1068                    *file_id,
1069                    0,
1070                    999,
1071                    0,
1072                    (idx + 1) as u64,
1073                    1024 * 1024,
1074                )
1075            })
1076            .collect();
1077
1078        let mut windows = assign_to_windows(files.iter(), 3);
1079
1080        let picker = TwcsPicker {
1081            trigger_file_num: 4,
1082            time_window_seconds: Some(3),
1083            max_output_file_size: None,
1084            append_mode: false,
1085            max_background_tasks: None,
1086        };
1087
1088        let active_window = find_latest_window_in_seconds(files.iter(), 3);
1089        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
1090
1091        assert_eq!(1, output.len());
1092        assert_eq!(output[0].inputs.len(), 2);
1093    }
1094
1095    #[test]
1096    fn test_limit_max_input_files() {
1097        common_telemetry::init_default_ut_logging();
1098
1099        let num_files = 50;
1100        let file_ids = (0..num_files).map(|_| FileId::random()).collect::<Vec<_>>();
1101
1102        // Create files with different sequences so they form 2 runs
1103        let files: Vec<_> = file_ids
1104            .iter()
1105            .enumerate()
1106            .map(|(idx, file_id)| {
1107                new_file_handle_with_size_and_sequence(
1108                    *file_id,
1109                    (idx / 2 * 10) as i64,
1110                    (idx / 2 * 10 + 5) as i64,
1111                    0,
1112                    (idx + 1) as u64,
1113                    1024 * 1024,
1114                )
1115            })
1116            .collect();
1117
1118        let mut windows = assign_to_windows(files.iter(), 3);
1119
1120        let picker = TwcsPicker {
1121            trigger_file_num: 4,
1122            time_window_seconds: Some(3),
1123            max_output_file_size: None,
1124            append_mode: false,
1125            max_background_tasks: None,
1126        };
1127
1128        let active_window = find_latest_window_in_seconds(files.iter(), 3);
1129        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
1130
1131        assert_eq!(1, output.len());
1132        assert_eq!(output[0].inputs.len(), 32);
1133    }
1134
1135    // TODO(hl): TTL tester that checks if get_expired_ssts function works as expected.
1136}