// mito2/compaction/twcs.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap};
17use std::fmt::Debug;
18use std::num::NonZeroU64;
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::{debug, info};
22use common_time::Timestamp;
23use common_time::timestamp::TimeUnit;
24use common_time::timestamp_millis::BucketAligned;
25use rayon::prelude::*;
26use store_api::storage::RegionId;
27
28use crate::compaction::buckets::infer_time_bucket;
29use crate::compaction::compactor::CompactionRegion;
30use crate::compaction::picker::{Picker, PickerOutput};
31use crate::compaction::run::{
32    FileGroup, Item, Ranged, find_sorted_runs, find_sorted_runs_by_time_range,
33    merge_primary_key_ranges, merge_seq_files, primary_key_ranges_overlap, reduce_runs,
34};
35use crate::compaction::{CompactionOutput, get_expired_ssts};
36use crate::sst::file::{FileHandle, Level, overlaps};
37use crate::sst::version::LevelMeta;
38
39const LEVEL_COMPACTED: Level = 1;
40
41/// Default value for max compaction input file num.
42const DEFAULT_MAX_INPUT_FILE_NUM: usize = 32;
43
/// Compaction picker that groups files into time windows by their max timestamp and picks the
/// files sharing a window as compaction candidates.
#[derive(Debug)]
pub struct TwcsPicker {
    /// Minimum number of file groups the single sorted run of a window must hold before the
    /// files of that run are merged. Windows with more than one sorted run do not check it.
    pub trigger_file_num: usize,
    /// Compaction time window in seconds. The region's own compaction time window takes
    /// precedence; when neither is set, the window is inferred from level-0 files.
    pub time_window_seconds: Option<i64>,
    /// Max allowed compaction output file size in bytes. In append mode, file groups larger
    /// than this are also left out of the compaction inputs.
    pub max_output_file_size: Option<u64>,
    /// Whether the target region is in append mode. Deletion markers are never filtered
    /// for append-mode regions.
    pub append_mode: bool,
    /// Max background compaction tasks, i.e. the cap on the number of compaction outputs
    /// produced by one pick. `None` means no cap.
    pub max_background_tasks: Option<usize>,
}
59
60impl TwcsPicker {
61    /// Builds compaction output from files.
62    fn build_output(
63        &self,
64        region_id: RegionId,
65        time_windows: &mut BTreeMap<i64, Window>,
66        active_window: Option<i64>,
67    ) -> Vec<CompactionOutput> {
68        let find_inputs = |files: &Window,
69                           windows: &BTreeMap<i64, Window>|
70         -> (Vec<FileGroup>, bool) {
71            let window = &files.time_window;
72            let mut files_to_merge: Vec<_> = files.files().cloned().collect();
73
74            // Filter out large files in append mode - they won't benefit from compaction
75            if self.append_mode
76                && let Some(max_size) = self.max_output_file_size
77            {
78                let (kept_files, ignored_files) = files_to_merge
79                    .into_iter()
80                    .partition(|fg| fg.size() <= max_size as usize);
81                files_to_merge = kept_files;
82                info!(
83                    "Skipped {} large files in append mode for region {}, window {}, max_size: {}",
84                    ignored_files.len(),
85                    region_id,
86                    window,
87                    max_size
88                );
89            }
90
91            let sorted_runs = if files_to_merge.len() < 1024 {
92                find_sorted_runs(&mut files_to_merge)
93            } else {
94                find_sorted_runs_by_time_range(&mut files_to_merge)
95            };
96            let found_runs = sorted_runs.len();
97            // We only remove deletion markers if we found less than 2 runs and not in append mode.
98            // because after compaction there will be no overlapping files.
99            let filter_deleted =
100                found_runs <= 2 && !self.append_mode && !window_has_overlap(files, windows);
101            if found_runs == 0 {
102                return (vec![], filter_deleted);
103            }
104
105            let mut inputs = if found_runs > 1 {
106                reduce_runs(sorted_runs)
107            } else {
108                let run = sorted_runs.last().unwrap();
109                if run.items().len() < self.trigger_file_num {
110                    return (vec![], filter_deleted);
111                }
112                // no overlapping files, try merge small files
113                merge_seq_files(run.items(), self.max_output_file_size)
114            };
115
116            // Limits the number of input files.
117            let total_input_files: usize = inputs.iter().map(|fg| fg.num_files()).sum();
118            if total_input_files > DEFAULT_MAX_INPUT_FILE_NUM {
119                // Sorts file groups by size first.
120                inputs.sort_unstable_by_key(|fg| fg.size());
121                let mut num_picked_files = 0;
122                inputs = inputs
123                    .into_iter()
124                    .take_while(|fg| {
125                        let current_group_file_num = fg.num_files();
126                        if current_group_file_num + num_picked_files <= DEFAULT_MAX_INPUT_FILE_NUM {
127                            num_picked_files += current_group_file_num;
128                            true
129                        } else {
130                            false
131                        }
132                    })
133                    .collect::<Vec<_>>();
134                info!(
135                    "Compaction for region {} enforces max input file num limit: {}, current total: {}, input: {:?}",
136                    region_id, DEFAULT_MAX_INPUT_FILE_NUM, total_input_files, inputs
137                );
138            }
139
140            if inputs.len() > 1 {
141                // If we have more than one group to compact.
142                log_pick_result(
143                    region_id,
144                    *window,
145                    active_window,
146                    found_runs,
147                    files.files.len(),
148                    self.max_output_file_size,
149                    filter_deleted,
150                    &inputs,
151                );
152            }
153            (inputs, filter_deleted)
154        };
155
156        let mut output = vec![];
157        let windows = time_windows
158            .values()
159            .filter(|w| !w.files.is_empty())
160            .collect::<Vec<_>>();
161        let chunk_size = self.max_background_tasks.unwrap_or(windows.len()).max(1);
162        'chunks: for chunk in windows.chunks(chunk_size) {
163            for (inputs, filter_deleted) in chunk
164                .par_iter() // parallelly calculate the inputs
165                .map(|window| find_inputs(window, time_windows))
166                .collect::<Vec<_>>()
167            {
168                if inputs.is_empty() {
169                    continue;
170                }
171
172                output.push(CompactionOutput {
173                    output_level: LEVEL_COMPACTED, // always compact to l1
174                    inputs: inputs.into_iter().flat_map(|fg| fg.into_files()).collect(),
175                    filter_deleted,
176                    output_time_range: None, // we do not enforce output time range in twcs compactions.
177                });
178
179                if let Some(max_background_tasks) = self.max_background_tasks
180                    && output.len() >= max_background_tasks
181                {
182                    debug!(
183                        "Region ({:?}) compaction task size larger than max background tasks({}), remaining tasks discarded",
184                        region_id, max_background_tasks
185                    );
186                    break 'chunks;
187                }
188            }
189        }
190        output
191    }
192}
193
194#[allow(clippy::too_many_arguments)]
195fn log_pick_result(
196    region_id: RegionId,
197    window: i64,
198    active_window: Option<i64>,
199    found_runs: usize,
200    file_num: usize,
201    max_output_file_size: Option<u64>,
202    filter_deleted: bool,
203    inputs: &[FileGroup],
204) {
205    let input_file_str: Vec<String> = inputs
206        .iter()
207        .map(|f| {
208            let range = f.range();
209            let start = range.0.to_iso8601_string();
210            let end = range.1.to_iso8601_string();
211            let num_rows = f.num_rows();
212            format!(
213                "FileGroup{{id: {:?}, range: ({}, {}), size: {}, num rows: {} }}",
214                f.file_ids(),
215                start,
216                end,
217                ReadableSize(f.size() as u64),
218                num_rows
219            )
220        })
221        .collect();
222    let window_str = Timestamp::new_second(window).to_iso8601_string();
223    let active_window_str = active_window.map(|s| Timestamp::new_second(s).to_iso8601_string());
224    let max_output_file_size = max_output_file_size.map(|size| ReadableSize(size).to_string());
225    info!(
226        "Region ({:?}) compaction pick result: current window: {}, active window: {:?}, \
227            found runs: {}, file num: {}, max output file size: {:?}, filter deleted: {}, \
228            input files: {:?}",
229        region_id,
230        window_str,
231        active_window_str,
232        found_runs,
233        file_num,
234        max_output_file_size,
235        filter_deleted,
236        input_file_str
237    );
238}
239
240impl Picker for TwcsPicker {
241    fn pick(&self, compaction_region: &CompactionRegion) -> Option<PickerOutput> {
242        let region_id = compaction_region.region_id;
243        let levels = compaction_region.current_version.ssts.levels();
244
245        let expired_ssts =
246            get_expired_ssts(levels, compaction_region.ttl, Timestamp::current_millis());
247        if !expired_ssts.is_empty() {
248            info!("Expired SSTs in region {}: {:?}", region_id, expired_ssts);
249            // here we mark expired SSTs as compacting to avoid them being picked.
250            expired_ssts.iter().for_each(|f| f.set_compacting(true));
251        }
252
253        let compaction_time_window = compaction_region
254            .current_version
255            .compaction_time_window
256            .map(|window| window.as_secs() as i64);
257        let time_window_size = compaction_time_window
258            .or(self.time_window_seconds)
259            .unwrap_or_else(|| {
260                let inferred = infer_time_bucket(levels[0].files());
261                info!(
262                    "Compaction window for region {} is not present, inferring from files: {:?}",
263                    region_id, inferred
264                );
265                inferred
266            });
267
268        // Find active window from files in level 0.
269        let active_window = find_latest_window_in_seconds(levels[0].files(), time_window_size);
270        // Assign files to windows
271        let mut windows =
272            assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size);
273        let outputs = self.build_output(region_id, &mut windows, active_window);
274
275        if outputs.is_empty() && expired_ssts.is_empty() {
276            return None;
277        }
278
279        let max_file_size = self.max_output_file_size.map(|v| v as usize);
280        Some(PickerOutput {
281            outputs,
282            expired_ssts,
283            time_window_size,
284            max_file_size,
285        })
286    }
287}
288
289struct Window {
290    start: Timestamp,
291    end: Timestamp,
292    // Mapping from file sequence to file groups. Files with the same sequence is considered
293    // created from the same compaction task.
294    files: HashMap<Option<NonZeroU64>, FileGroup>,
295    time_window: i64,
296    primary_key_range: Option<(bytes::Bytes, bytes::Bytes)>,
297}
298
299impl Window {
300    /// Creates a new [Window] with given file.
301    fn new_with_file(file: FileHandle) -> Self {
302        let (start, end) = file.time_range();
303        let primary_key_range = file.primary_key_range();
304        let files = HashMap::from([(file.meta_ref().sequence, FileGroup::new_with_file(file))]);
305        Self {
306            start,
307            end,
308            files,
309            time_window: 0,
310            primary_key_range,
311        }
312    }
313
314    /// Returns the time range of all files in current window (inclusive).
315    fn range(&self) -> (Timestamp, Timestamp) {
316        (self.start, self.end)
317    }
318
319    /// Adds a new file to window and updates time range.
320    fn add_file(&mut self, file: FileHandle) {
321        let (start, end) = file.time_range();
322        self.start = self.start.min(start);
323        self.end = self.end.max(end);
324        self.primary_key_range =
325            merge_primary_key_ranges(self.primary_key_range.take(), file.primary_key_range());
326
327        match self.files.entry(file.meta_ref().sequence) {
328            Entry::Occupied(mut o) => {
329                o.get_mut().add_file(file);
330            }
331            Entry::Vacant(v) => {
332                v.insert(FileGroup::new_with_file(file));
333            }
334        }
335    }
336
337    fn files(&self) -> impl Iterator<Item = &FileGroup> {
338        self.files.values()
339    }
340}
341
342/// Assigns files to windows with predefined window size (in seconds) by their max timestamps.
343fn assign_to_windows<'a>(
344    files: impl Iterator<Item = &'a FileHandle>,
345    time_window_size: i64,
346) -> BTreeMap<i64, Window> {
347    let mut windows: HashMap<i64, Window> = HashMap::new();
348    // Iterates all files and assign to time windows according to max timestamp
349    for f in files {
350        if f.compacting() {
351            continue;
352        }
353        let (_, end) = f.time_range();
354        let time_window = end
355            .convert_to(TimeUnit::Second)
356            .unwrap()
357            .value()
358            .align_to_ceil_by_bucket(time_window_size)
359            .unwrap_or(i64::MIN);
360
361        match windows.entry(time_window) {
362            Entry::Occupied(mut e) => {
363                e.get_mut().add_file(f.clone());
364            }
365            Entry::Vacant(e) => {
366                let mut window = Window::new_with_file(f.clone());
367                window.time_window = time_window;
368                e.insert(window);
369            }
370        }
371    }
372    windows.into_iter().collect()
373}
374
375fn window_has_overlap(this: &Window, windows: &BTreeMap<i64, Window>) -> bool {
376    windows
377        .values()
378        .filter(|that| this.time_window != that.time_window)
379        .any(|that| {
380            overlaps(&this.range(), &that.range()) && {
381                match (&this.primary_key_range, &that.primary_key_range) {
382                    (Some(l), Some(r)) => primary_key_ranges_overlap(l, r),
383                    _ => true,
384                }
385            }
386        })
387}
388
389/// Finds the latest active writing window among all files.
390/// Returns `None` when there are no files or all files are corrupted.
391fn find_latest_window_in_seconds<'a>(
392    files: impl Iterator<Item = &'a FileHandle>,
393    time_window_size: i64,
394) -> Option<i64> {
395    let mut latest_timestamp = None;
396    for f in files {
397        let (_, end) = f.time_range();
398        if let Some(latest) = latest_timestamp {
399            if end > latest {
400                latest_timestamp = Some(end);
401            }
402        } else {
403            latest_timestamp = Some(end);
404        }
405    }
406    latest_timestamp
407        .and_then(|ts| ts.convert_to_ceil(TimeUnit::Second))
408        .and_then(|ts| ts.value().align_to_ceil_by_bucket(time_window_size))
409}
410
411#[cfg(test)]
412mod tests {
413    use std::collections::HashSet;
414
415    use bytes::Bytes;
416    use store_api::storage::FileId;
417
418    use super::*;
419    use crate::compaction::test_util::{
420        new_file_handle, new_file_handle_with_sequence, new_file_handle_with_size_and_sequence,
421        new_file_handle_with_size_sequence_and_primary_key_range,
422    };
423    use crate::sst::file::Level;
424
425    #[test]
426    fn test_get_latest_window_in_seconds() {
427        assert_eq!(
428            Some(1),
429            find_latest_window_in_seconds([new_file_handle(FileId::random(), 0, 999, 0)].iter(), 1)
430        );
431        assert_eq!(
432            Some(1),
433            find_latest_window_in_seconds(
434                [new_file_handle(FileId::random(), 0, 1000, 0)].iter(),
435                1
436            )
437        );
438
439        assert_eq!(
440            Some(-9223372036854000),
441            find_latest_window_in_seconds(
442                [new_file_handle(FileId::random(), i64::MIN, i64::MIN + 1, 0)].iter(),
443                3600,
444            )
445        );
446
447        assert_eq!(
448            (i64::MAX / 10000000 + 1) * 10000,
449            find_latest_window_in_seconds(
450                [new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0)].iter(),
451                10000,
452            )
453            .unwrap()
454        );
455
456        assert_eq!(
457            Some((i64::MAX / 3600000 + 1) * 3600),
458            find_latest_window_in_seconds(
459                [
460                    new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0),
461                    new_file_handle(FileId::random(), 0, 1000, 0)
462                ]
463                .iter(),
464                3600
465            )
466        );
467    }
468
469    #[test]
470    fn test_assign_to_windows() {
471        let windows = assign_to_windows(
472            [
473                new_file_handle(FileId::random(), 0, 999, 0),
474                new_file_handle(FileId::random(), 0, 999, 0),
475                new_file_handle(FileId::random(), 0, 999, 0),
476                new_file_handle(FileId::random(), 0, 999, 0),
477                new_file_handle(FileId::random(), 0, 999, 0),
478            ]
479            .iter(),
480            3,
481        );
482        let fgs = &windows.get(&0).unwrap().files;
483        assert_eq!(1, fgs.len());
484        assert_eq!(fgs.values().map(|f| f.files().len()).sum::<usize>(), 5);
485
486        let files = [FileId::random(); 3];
487        let windows = assign_to_windows(
488            [
489                new_file_handle(files[0], -2000, -3, 0),
490                new_file_handle(files[1], 0, 2999, 0),
491                new_file_handle(files[2], 50, 10001, 0),
492            ]
493            .iter(),
494            3,
495        );
496        assert_eq!(
497            files[0],
498            windows.get(&0).unwrap().files().next().unwrap().files()[0]
499                .file_id()
500                .file_id()
501        );
502        assert_eq!(
503            files[1],
504            windows.get(&3).unwrap().files().next().unwrap().files()[0]
505                .file_id()
506                .file_id()
507        );
508        assert_eq!(
509            files[2],
510            windows.get(&12).unwrap().files().next().unwrap().files()[0]
511                .file_id()
512                .file_id()
513        );
514    }
515
516    #[test]
517    fn test_assign_file_groups_to_windows() {
518        let files = [
519            FileId::random(),
520            FileId::random(),
521            FileId::random(),
522            FileId::random(),
523        ];
524        let windows = assign_to_windows(
525            [
526                new_file_handle_with_sequence(files[0], 0, 999, 0, 1),
527                new_file_handle_with_sequence(files[1], 0, 999, 0, 1),
528                new_file_handle_with_sequence(files[2], 0, 999, 0, 2),
529                new_file_handle_with_sequence(files[3], 0, 999, 0, 2),
530            ]
531            .iter(),
532            3,
533        );
534        assert_eq!(windows.len(), 1);
535        let fgs = &windows.get(&0).unwrap().files;
536        assert_eq!(2, fgs.len());
537        assert_eq!(
538            fgs.get(&NonZeroU64::new(1))
539                .unwrap()
540                .files()
541                .iter()
542                .map(|f| f.file_id().file_id())
543                .collect::<HashSet<_>>(),
544            [files[0], files[1]].into_iter().collect()
545        );
546        assert_eq!(
547            fgs.get(&NonZeroU64::new(2))
548                .unwrap()
549                .files()
550                .iter()
551                .map(|f| f.file_id().file_id())
552                .collect::<HashSet<_>>(),
553            [files[2], files[3]].into_iter().collect()
554        );
555    }
556
557    #[test]
558    fn test_assign_compacting_to_windows() {
559        let files = [
560            new_file_handle(FileId::random(), 0, 999, 0),
561            new_file_handle(FileId::random(), 0, 999, 0),
562            new_file_handle(FileId::random(), 0, 999, 0),
563            new_file_handle(FileId::random(), 0, 999, 0),
564            new_file_handle(FileId::random(), 0, 999, 0),
565        ];
566        files[0].set_compacting(true);
567        files[2].set_compacting(true);
568        let mut windows = assign_to_windows(files.iter(), 3);
569        let window0 = windows.remove(&0).unwrap();
570        assert_eq!(1, window0.files.len());
571        let candidates = window0
572            .files
573            .into_values()
574            .flat_map(|fg| fg.into_files())
575            .map(|f| f.file_id().file_id())
576            .collect::<HashSet<_>>();
577        assert_eq!(candidates.len(), 3);
578        assert_eq!(
579            candidates,
580            [
581                files[1].file_id().file_id(),
582                files[3].file_id().file_id(),
583                files[4].file_id().file_id()
584            ]
585            .into_iter()
586            .collect::<HashSet<_>>()
587        );
588    }
589
590    /// (Window value, overlapping, files' time ranges in window)
591    type ExpectedWindowSpec = (i64, bool, Vec<(i64, i64)>);
592
593    fn pk_range(min: &'static [u8], max: &'static [u8]) -> Option<(Bytes, Bytes)> {
594        Some((Bytes::from_static(min), Bytes::from_static(max)))
595    }
596
597    fn check_assign_to_windows_with_overlapping(
598        file_time_ranges: &[(i64, i64)],
599        time_window: i64,
600        expected_files: &[ExpectedWindowSpec],
601    ) {
602        let files: Vec<_> = (0..file_time_ranges.len())
603            .map(|_| FileId::random())
604            .collect();
605
606        let file_handles = files
607            .iter()
608            .zip(file_time_ranges.iter())
609            .map(|(file_id, range)| new_file_handle(*file_id, range.0, range.1, 0))
610            .collect::<Vec<_>>();
611
612        let windows = assign_to_windows(file_handles.iter(), time_window);
613
614        for (expected_window, overlapping, window_files) in expected_files {
615            let actual_window = windows.get(expected_window).unwrap();
616            let actual_overlapping = window_has_overlap(actual_window, &windows);
617            assert_eq!(*overlapping, actual_overlapping);
618            let mut file_ranges = actual_window
619                .files
620                .values()
621                .flat_map(|f| {
622                    f.files().iter().map(|f| {
623                        let (s, e) = f.time_range();
624                        (s.value(), e.value())
625                    })
626                })
627                .collect::<Vec<_>>();
628            file_ranges.sort_unstable_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1)));
629            assert_eq!(window_files, &file_ranges);
630        }
631    }
632
633    #[test]
634    fn test_assign_to_windows_with_overlapping() {
635        check_assign_to_windows_with_overlapping(
636            &[(0, 999), (1000, 1999), (2000, 2999)],
637            2,
638            &[
639                (0, false, vec![(0, 999)]),
640                (2, false, vec![(1000, 1999), (2000, 2999)]),
641            ],
642        );
643
644        check_assign_to_windows_with_overlapping(
645            &[(0, 1), (0, 999), (100, 2999)],
646            2,
647            &[
648                (0, true, vec![(0, 1), (0, 999)]),
649                (2, true, vec![(100, 2999)]),
650            ],
651        );
652
653        check_assign_to_windows_with_overlapping(
654            &[(0, 999), (1000, 1999), (2000, 2999), (3000, 3999)],
655            2,
656            &[
657                (0, false, vec![(0, 999)]),
658                (2, false, vec![(1000, 1999), (2000, 2999)]),
659                (4, false, vec![(3000, 3999)]),
660            ],
661        );
662
663        check_assign_to_windows_with_overlapping(
664            &[
665                (0, 999),
666                (1000, 1999),
667                (2000, 2999),
668                (3000, 3999),
669                (0, 3999),
670            ],
671            2,
672            &[
673                (0, true, vec![(0, 999)]),
674                (2, true, vec![(1000, 1999), (2000, 2999)]),
675                (4, true, vec![(0, 3999), (3000, 3999)]),
676            ],
677        );
678
679        check_assign_to_windows_with_overlapping(
680            &[
681                (0, 999),
682                (1000, 1999),
683                (2000, 2999),
684                (3000, 3999),
685                (1999, 3999),
686            ],
687            2,
688            &[
689                (0, false, vec![(0, 999)]),
690                (2, true, vec![(1000, 1999), (2000, 2999)]),
691                (4, true, vec![(1999, 3999), (3000, 3999)]),
692            ],
693        );
694
695        check_assign_to_windows_with_overlapping(
696            &[
697                (0, 999),     // window 0
698                (1000, 1999), // window 2
699                (2000, 2999), // window 2
700                (3000, 3999), // window 4
701                (2999, 3999), // window 4
702            ],
703            2,
704            &[
705                // window 2 overlaps with window 4
706                (0, false, vec![(0, 999)]),
707                (2, true, vec![(1000, 1999), (2000, 2999)]),
708                (4, true, vec![(2999, 3999), (3000, 3999)]),
709            ],
710        );
711
712        check_assign_to_windows_with_overlapping(
713            &[
714                (0, 999),     // window 0
715                (1000, 1999), // window 2
716                (2000, 2999), // window 2
717                (3000, 3999), // window 4
718                (0, 1000),    // // window 2
719            ],
720            2,
721            &[
722                // only window 0 overlaps with window 2.
723                (0, true, vec![(0, 999)]),
724                (2, true, vec![(0, 1000), (1000, 1999), (2000, 2999)]),
725                (4, false, vec![(3000, 3999)]),
726            ],
727        );
728    }
729
730    #[test]
731    fn test_assign_to_windows_not_overlapping_when_pk_disjoint() {
732        let files = [
733            new_file_handle_with_size_sequence_and_primary_key_range(
734                FileId::random(),
735                0,
736                1000,
737                0,
738                1,
739                10,
740                pk_range(b"a", b"f"),
741            ),
742            new_file_handle_with_size_sequence_and_primary_key_range(
743                FileId::random(),
744                500,
745                1999,
746                0,
747                2,
748                10,
749                pk_range(b"x", b"z"),
750            ),
751        ];
752
753        let windows = assign_to_windows(files.iter(), 2);
754
755        let overlapping = window_has_overlap(windows.get(&2).unwrap(), &windows);
756        assert!(!overlapping);
757    }
758
759    #[test]
760    fn test_assign_to_windows_pk_unknown_in_earlier_window_does_not_poison_later_windows() {
761        let files = [
762            new_file_handle(FileId::random(), 0, 1999, 0),
763            new_file_handle_with_size_sequence_and_primary_key_range(
764                FileId::random(),
765                2000,
766                3999,
767                0,
768                1,
769                10,
770                pk_range(b"a", b"f"),
771            ),
772            new_file_handle_with_size_sequence_and_primary_key_range(
773                FileId::random(),
774                3000,
775                4999,
776                0,
777                2,
778                10,
779                pk_range(b"x", b"z"),
780            ),
781        ];
782
783        let windows = assign_to_windows(files.iter(), 2);
784
785        let overlapping = window_has_overlap(windows.get(&4).unwrap(), &windows);
786        assert!(!overlapping);
787    }
788
789    struct CompactionPickerTestCase {
790        window_size: i64,
791        input_files: Vec<FileHandle>,
792        expected_outputs: Vec<ExpectedOutput>,
793    }
794
795    impl CompactionPickerTestCase {
796        fn check(&self) {
797            let file_id_to_idx = self
798                .input_files
799                .iter()
800                .enumerate()
801                .map(|(idx, file)| (file.file_id(), idx))
802                .collect::<HashMap<_, _>>();
803            let mut windows = assign_to_windows(self.input_files.iter(), self.window_size);
804            let active_window =
805                find_latest_window_in_seconds(self.input_files.iter(), self.window_size);
806            let output = TwcsPicker {
807                trigger_file_num: 4,
808                time_window_seconds: None,
809                max_output_file_size: None,
810                append_mode: false,
811                max_background_tasks: None,
812            }
813            .build_output(RegionId::from_u64(0), &mut windows, active_window);
814
815            let output = output
816                .iter()
817                .map(|o| {
818                    let input_file_ids = o
819                        .inputs
820                        .iter()
821                        .map(|f| file_id_to_idx.get(&f.file_id()).copied().unwrap())
822                        .collect::<HashSet<_>>();
823                    (input_file_ids, o.output_level)
824                })
825                .collect::<Vec<_>>();
826
827            let expected = self
828                .expected_outputs
829                .iter()
830                .map(|o| {
831                    let input_file_ids = o.input_files.iter().copied().collect::<HashSet<_>>();
832                    (input_file_ids, o.output_level)
833                })
834                .collect::<Vec<_>>();
835            assert_eq!(expected, output);
836        }
837    }
838
839    struct ExpectedOutput {
840        input_files: Vec<usize>,
841        output_level: Level,
842    }
843
844    #[test]
845    fn test_build_twcs_output() {
846        let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
847
848        // Case 1: 2 runs found in each time window.
849        CompactionPickerTestCase {
850            window_size: 3,
851            input_files: [
852                new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
853                new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
854                new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3), //active windows
855                new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4), //active windows
856            ]
857            .to_vec(),
858            expected_outputs: vec![
859                ExpectedOutput {
860                    input_files: vec![0, 1],
861                    output_level: 1,
862                },
863                ExpectedOutput {
864                    input_files: vec![2, 3],
865                    output_level: 1,
866                },
867            ],
868        }
869        .check();
870
871        // Case 2:
872        //    -2000........-3
873        // -3000.....-100
874        //                    0..............2999
875        //                      50..........2998
876        //                     11.........2990
877        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
878        CompactionPickerTestCase {
879            window_size: 3,
880            input_files: [
881                new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
882                new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
883                new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3),
884                new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4),
885                new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 5),
886            ]
887            .to_vec(),
888            expected_outputs: vec![
889                ExpectedOutput {
890                    input_files: vec![0, 1],
891                    output_level: 1,
892                },
893                ExpectedOutput {
894                    input_files: vec![2, 4],
895                    output_level: 1,
896                },
897            ],
898        }
899        .check();
900
901        // Case 3:
902        // A compaction may split output into several files that have overlapping time ranges and same sequence,
903        // we should treat these files as one FileGroup.
904        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
905        CompactionPickerTestCase {
906            window_size: 3,
907            input_files: [
908                new_file_handle_with_sequence(file_ids[0], 0, 2999, 1, 1),
909                new_file_handle_with_sequence(file_ids[1], 0, 2998, 1, 1),
910                new_file_handle_with_sequence(file_ids[2], 3000, 5999, 1, 2),
911                new_file_handle_with_sequence(file_ids[3], 3000, 5000, 1, 2),
912                new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 3),
913            ]
914            .to_vec(),
915            expected_outputs: vec![ExpectedOutput {
916                input_files: vec![0, 1, 4],
917                output_level: 1,
918            }],
919        }
920        .check();
921    }
922
923    #[test]
924    fn test_build_output_skips_pk_disjoint_files() {
925        let files = [
926            new_file_handle_with_size_sequence_and_primary_key_range(
927                FileId::random(),
928                0,
929                2999,
930                0,
931                1,
932                10,
933                pk_range(b"a", b"f"),
934            ),
935            new_file_handle_with_size_sequence_and_primary_key_range(
936                FileId::random(),
937                50,
938                2998,
939                0,
940                2,
941                10,
942                pk_range(b"x", b"z"),
943            ),
944        ];
945        let mut windows = assign_to_windows(files.iter(), 3);
946        let active_window = find_latest_window_in_seconds(files.iter(), 3);
947        let output = TwcsPicker {
948            trigger_file_num: 4,
949            time_window_seconds: None,
950            max_output_file_size: None,
951            append_mode: false,
952            max_background_tasks: None,
953        }
954        .build_output(RegionId::from_u64(0), &mut windows, active_window);
955
956        assert!(output.is_empty());
957    }
958
959    #[test]
960    fn test_append_mode_filter_large_files() {
961        let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
962        let max_output_file_size = 1000u64;
963
964        // Create files with different sizes
965        let small_file_1 = new_file_handle_with_size_and_sequence(file_ids[0], 0, 999, 0, 1, 500);
966        let large_file_1 = new_file_handle_with_size_and_sequence(file_ids[1], 0, 999, 0, 2, 1500);
967        let small_file_2 = new_file_handle_with_size_and_sequence(file_ids[2], 0, 999, 0, 3, 800);
968        let large_file_2 = new_file_handle_with_size_and_sequence(file_ids[3], 0, 999, 0, 4, 2000);
969
970        // Create file groups (each file is in its own group due to different sequences)
971        let mut files_to_merge = vec![
972            FileGroup::new_with_file(small_file_1),
973            FileGroup::new_with_file(large_file_1),
974            FileGroup::new_with_file(small_file_2),
975            FileGroup::new_with_file(large_file_2),
976        ];
977
978        // Test filtering logic directly
979        let original_count = files_to_merge.len();
980
981        // Apply append mode filtering
982        files_to_merge.retain(|fg| fg.size() <= max_output_file_size as usize);
983
984        // Should have filtered out 2 large files, leaving 2 small files
985        assert_eq!(files_to_merge.len(), 2);
986        assert_eq!(original_count, 4);
987
988        // Verify the remaining files are the small ones
989        for fg in &files_to_merge {
990            assert!(
991                fg.size() <= max_output_file_size as usize,
992                "File size {} should be <= {}",
993                fg.size(),
994                max_output_file_size
995            );
996        }
997    }
998
999    #[test]
1000    fn test_build_output_multiple_windows_with_zero_runs() {
1001        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
1002
1003        let files = [
1004            // Window 0: Contains 3 files but not forming any runs (not enough files in sequence to reach trigger_file_num)
1005            new_file_handle_with_sequence(file_ids[0], 0, 999, 0, 1),
1006            new_file_handle_with_sequence(file_ids[1], 0, 999, 0, 2),
1007            new_file_handle_with_sequence(file_ids[2], 0, 999, 0, 3),
1008            // Window 3: Contains files that will form 2 runs
1009            new_file_handle_with_sequence(file_ids[3], 3000, 3999, 0, 4),
1010            new_file_handle_with_sequence(file_ids[4], 3000, 3999, 0, 5),
1011            new_file_handle_with_sequence(file_ids[5], 3000, 3999, 0, 6),
1012        ];
1013
1014        let mut windows = assign_to_windows(files.iter(), 3);
1015
1016        // Create picker with trigger_file_num of 4 so single files won't form runs in first window
1017        let picker = TwcsPicker {
1018            trigger_file_num: 4, // High enough to prevent runs in first window
1019            time_window_seconds: Some(3),
1020            max_output_file_size: None,
1021            append_mode: false,
1022            max_background_tasks: None,
1023        };
1024
1025        let active_window = find_latest_window_in_seconds(files.iter(), 3);
1026        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
1027
1028        assert!(
1029            !output.is_empty(),
1030            "Should have output from windows with runs, even when one window has 0 runs"
1031        );
1032
1033        let all_output_files: Vec<_> = output
1034            .iter()
1035            .flat_map(|o| o.inputs.iter())
1036            .map(|f| f.file_id().file_id())
1037            .collect();
1038
1039        assert!(
1040            all_output_files.contains(&file_ids[3])
1041                || all_output_files.contains(&file_ids[4])
1042                || all_output_files.contains(&file_ids[5]),
1043            "Output should contain files from the window with runs"
1044        );
1045    }
1046
1047    #[test]
1048    fn test_build_output_single_window_zero_runs() {
1049        let file_ids = (0..2).map(|_| FileId::random()).collect::<Vec<_>>();
1050
1051        let large_file_1 = new_file_handle_with_size_and_sequence(file_ids[0], 0, 999, 0, 1, 2000); // 2000 bytes
1052        let large_file_2 = new_file_handle_with_size_and_sequence(file_ids[1], 0, 999, 0, 2, 2500); // 2500 bytes
1053
1054        let files = [large_file_1, large_file_2];
1055
1056        let mut windows = assign_to_windows(files.iter(), 3);
1057
1058        let picker = TwcsPicker {
1059            trigger_file_num: 2,
1060            time_window_seconds: Some(3),
1061            max_output_file_size: Some(1000),
1062            append_mode: true,
1063            max_background_tasks: None,
1064        };
1065
1066        let active_window = find_latest_window_in_seconds(files.iter(), 3);
1067        let output = picker.build_output(RegionId::from_u64(456), &mut windows, active_window);
1068
1069        // Should return empty output (no compaction needed)
1070        assert!(
1071            output.is_empty(),
1072            "Should return empty output when no runs are found after filtering"
1073        );
1074    }
1075
1076    #[test]
1077    fn test_max_background_tasks_truncation() {
1078        let file_ids = (0..10).map(|_| FileId::random()).collect::<Vec<_>>();
1079        let max_background_tasks = 3;
1080
1081        // Create files across multiple windows that will generate multiple compaction outputs
1082        let files = [
1083            // Window 0: 4 files that will form a run
1084            new_file_handle_with_sequence(file_ids[0], 0, 999, 0, 1),
1085            new_file_handle_with_sequence(file_ids[1], 0, 999, 0, 2),
1086            new_file_handle_with_sequence(file_ids[2], 0, 999, 0, 3),
1087            new_file_handle_with_sequence(file_ids[3], 0, 999, 0, 4),
1088            // Window 3: 4 files that will form another run
1089            new_file_handle_with_sequence(file_ids[4], 3000, 3999, 0, 5),
1090            new_file_handle_with_sequence(file_ids[5], 3000, 3999, 0, 6),
1091            new_file_handle_with_sequence(file_ids[6], 3000, 3999, 0, 7),
1092            new_file_handle_with_sequence(file_ids[7], 3000, 3999, 0, 8),
1093            // Window 6: 4 files that will form another run
1094            new_file_handle_with_sequence(file_ids[8], 6000, 6999, 0, 9),
1095            new_file_handle_with_sequence(file_ids[9], 6000, 6999, 0, 10),
1096        ];
1097
1098        let mut windows = assign_to_windows(files.iter(), 3);
1099
1100        let picker = TwcsPicker {
1101            trigger_file_num: 4,
1102            time_window_seconds: Some(3),
1103            max_output_file_size: None,
1104            append_mode: false,
1105            max_background_tasks: Some(max_background_tasks),
1106        };
1107
1108        let active_window = find_latest_window_in_seconds(files.iter(), 3);
1109        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
1110
1111        // Should have at most max_background_tasks outputs
1112        assert!(
1113            output.len() <= max_background_tasks,
1114            "Output should be truncated to max_background_tasks: expected <= {}, got {}",
1115            max_background_tasks,
1116            output.len()
1117        );
1118
1119        // Without max_background_tasks, should have more outputs
1120        let picker_no_limit = TwcsPicker {
1121            trigger_file_num: 4,
1122            time_window_seconds: Some(3),
1123            max_output_file_size: None,
1124            append_mode: false,
1125            max_background_tasks: None,
1126        };
1127
1128        let mut windows_no_limit = assign_to_windows(files.iter(), 3);
1129        let output_no_limit = picker_no_limit.build_output(
1130            RegionId::from_u64(123),
1131            &mut windows_no_limit,
1132            active_window,
1133        );
1134
1135        // Without limit, should have more outputs (if there are enough windows)
1136        if output_no_limit.len() > max_background_tasks {
1137            assert!(
1138                output_no_limit.len() > output.len(),
1139                "Without limit should have more outputs than with limit"
1140            );
1141        }
1142    }
1143
1144    #[test]
1145    fn test_max_background_tasks_no_truncation_when_under_limit() {
1146        let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
1147        let max_background_tasks = 10; // Larger than expected outputs
1148
1149        // Create files in one window that will generate one compaction output
1150        let files = [
1151            new_file_handle_with_sequence(file_ids[0], 0, 999, 0, 1),
1152            new_file_handle_with_sequence(file_ids[1], 0, 999, 0, 2),
1153            new_file_handle_with_sequence(file_ids[2], 0, 999, 0, 3),
1154            new_file_handle_with_sequence(file_ids[3], 0, 999, 0, 4),
1155        ];
1156
1157        let mut windows = assign_to_windows(files.iter(), 3);
1158
1159        let picker = TwcsPicker {
1160            trigger_file_num: 4,
1161            time_window_seconds: Some(3),
1162            max_output_file_size: None,
1163            append_mode: false,
1164            max_background_tasks: Some(max_background_tasks),
1165        };
1166
1167        let active_window = find_latest_window_in_seconds(files.iter(), 3);
1168        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
1169
1170        // Should have all outputs since we're under the limit
1171        assert!(
1172            output.len() <= max_background_tasks,
1173            "Output should be within limit"
1174        );
1175        // Should have at least one output
1176        assert!(!output.is_empty(), "Should have at least one output");
1177    }
1178
1179    #[test]
1180    fn test_pick_multiple_runs() {
1181        common_telemetry::init_default_ut_logging();
1182
1183        let num_files = 8;
1184        let file_ids = (0..num_files).map(|_| FileId::random()).collect::<Vec<_>>();
1185
1186        // Create files with different sequences so they form multiple runs
1187        let files: Vec<_> = file_ids
1188            .iter()
1189            .enumerate()
1190            .map(|(idx, file_id)| {
1191                new_file_handle_with_size_and_sequence(
1192                    *file_id,
1193                    0,
1194                    999,
1195                    0,
1196                    (idx + 1) as u64,
1197                    1024 * 1024,
1198                )
1199            })
1200            .collect();
1201
1202        let mut windows = assign_to_windows(files.iter(), 3);
1203
1204        let picker = TwcsPicker {
1205            trigger_file_num: 4,
1206            time_window_seconds: Some(3),
1207            max_output_file_size: None,
1208            append_mode: false,
1209            max_background_tasks: None,
1210        };
1211
1212        let active_window = find_latest_window_in_seconds(files.iter(), 3);
1213        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
1214
1215        assert_eq!(1, output.len());
1216        assert_eq!(output[0].inputs.len(), 2);
1217    }
1218
1219    #[test]
1220    fn test_limit_max_input_files() {
1221        common_telemetry::init_default_ut_logging();
1222
1223        let num_files = 50;
1224        let file_ids = (0..num_files).map(|_| FileId::random()).collect::<Vec<_>>();
1225
1226        // Create files with different sequences so they form 2 runs
1227        let files: Vec<_> = file_ids
1228            .iter()
1229            .enumerate()
1230            .map(|(idx, file_id)| {
1231                new_file_handle_with_size_and_sequence(
1232                    *file_id,
1233                    (idx / 2 * 10) as i64,
1234                    (idx / 2 * 10 + 5) as i64,
1235                    0,
1236                    (idx + 1) as u64,
1237                    1024 * 1024,
1238                )
1239            })
1240            .collect();
1241
1242        let mut windows = assign_to_windows(files.iter(), 3);
1243
1244        let picker = TwcsPicker {
1245            trigger_file_num: 4,
1246            time_window_seconds: Some(3),
1247            max_output_file_size: None,
1248            append_mode: false,
1249            max_background_tasks: None,
1250        };
1251
1252        let active_window = find_latest_window_in_seconds(files.iter(), 3);
1253        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
1254
1255        assert_eq!(1, output.len());
1256        assert_eq!(output[0].inputs.len(), 32);
1257    }
1258
1259    // TODO(hl): TTL tester that checks if get_expired_ssts function works as expected.
1260}