Skip to main content

mito2/compaction/
twcs.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap};
17use std::fmt::Debug;
18use std::num::NonZeroU64;
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::{debug, info};
22use common_time::Timestamp;
23use common_time::timestamp::TimeUnit;
24use common_time::timestamp_millis::BucketAligned;
25use store_api::storage::RegionId;
26
27use crate::compaction::buckets::infer_time_bucket;
28use crate::compaction::compactor::CompactionRegion;
29use crate::compaction::picker::{Picker, PickerOutput};
30use crate::compaction::run::{
31    FileGroup, Item, Ranged, find_sorted_runs, merge_primary_key_ranges, merge_seq_files,
32    primary_key_ranges_overlap, reduce_runs,
33};
34use crate::compaction::{CompactionOutput, get_expired_ssts};
35use crate::sst::file::{FileHandle, Level, overlaps};
36use crate::sst::version::LevelMeta;
37
38const LEVEL_COMPACTED: Level = 1;
39
40/// Default value for max compaction input file num.
41const DEFAULT_MAX_INPUT_FILE_NUM: usize = 32;
42
/// `TwcsPicker` picks files whose max timestamps fall in the same time window as compaction
/// candidates.
///
/// Files are grouped into time windows by their max timestamp and every window is examined
/// independently for files that are worth merging.
#[derive(Debug)]
pub struct TwcsPicker {
    /// Minimum file num to trigger a compaction.
    ///
    /// Only consulted when a window contains a single sorted run; windows with more than one
    /// run are considered regardless of this threshold.
    pub trigger_file_num: usize,
    /// Compaction time window in seconds.
    ///
    /// Used when the region version does not carry its own compaction time window. When both
    /// are absent the window size is inferred from the files in level 0.
    pub time_window_seconds: Option<i64>,
    /// Max allowed compaction output file size.
    ///
    /// In append mode, file groups larger than this size are excluded from compaction.
    pub max_output_file_size: Option<u64>,
    /// Whether the target region is in append mode.
    pub append_mode: bool,
    /// Max background compaction tasks.
    ///
    /// Picking stops once this many compaction outputs have been produced.
    pub max_background_tasks: Option<usize>,
}
58
59impl TwcsPicker {
60    /// Builds compaction output from files.
61    fn build_output(
62        &self,
63        region_id: RegionId,
64        time_windows: &mut BTreeMap<i64, Window>,
65        active_window: Option<i64>,
66    ) -> Vec<CompactionOutput> {
67        let mut output = vec![];
68        for (window, files) in time_windows {
69            if files.files.is_empty() {
70                continue;
71            }
72            let mut files_to_merge: Vec<_> = files.files().cloned().collect();
73
74            // Filter out large files in append mode - they won't benefit from compaction
75            if self.append_mode
76                && let Some(max_size) = self.max_output_file_size
77            {
78                let (kept_files, ignored_files) = files_to_merge
79                    .into_iter()
80                    .partition(|fg| fg.size() <= max_size as usize);
81                files_to_merge = kept_files;
82                info!(
83                    "Skipped {} large files in append mode for region {}, window {}, max_size: {}",
84                    ignored_files.len(),
85                    region_id,
86                    window,
87                    max_size
88                );
89            }
90
91            let sorted_runs = find_sorted_runs(&mut files_to_merge);
92            let found_runs = sorted_runs.len();
93            // We only remove deletion markers if we found less than 2 runs and not in append mode.
94            // because after compaction there will be no overlapping files.
95            let filter_deleted = !files.overlapping && found_runs <= 2 && !self.append_mode;
96            if found_runs == 0 {
97                continue;
98            }
99
100            let mut inputs = if found_runs > 1 {
101                reduce_runs(sorted_runs)
102            } else {
103                let run = sorted_runs.last().unwrap();
104                if run.items().len() < self.trigger_file_num {
105                    continue;
106                }
107                // no overlapping files, try merge small files
108                merge_seq_files(run.items(), self.max_output_file_size)
109            };
110
111            // Limits the number of input files.
112            let total_input_files: usize = inputs.iter().map(|fg| fg.num_files()).sum();
113            if total_input_files > DEFAULT_MAX_INPUT_FILE_NUM {
114                // Sorts file groups by size first.
115                inputs.sort_unstable_by_key(|fg| fg.size());
116                let mut num_picked_files = 0;
117                inputs = inputs
118                    .into_iter()
119                    .take_while(|fg| {
120                        let current_group_file_num = fg.num_files();
121                        if current_group_file_num + num_picked_files <= DEFAULT_MAX_INPUT_FILE_NUM {
122                            num_picked_files += current_group_file_num;
123                            true
124                        } else {
125                            false
126                        }
127                    })
128                    .collect::<Vec<_>>();
129                info!(
130                    "Compaction for region {} enforces max input file num limit: {}, current total: {}, input: {:?}",
131                    region_id, DEFAULT_MAX_INPUT_FILE_NUM, total_input_files, inputs
132                );
133            }
134
135            if inputs.len() > 1 {
136                // If we have more than one group to compact.
137                log_pick_result(
138                    region_id,
139                    *window,
140                    active_window,
141                    found_runs,
142                    files.files.len(),
143                    self.max_output_file_size,
144                    filter_deleted,
145                    &inputs,
146                );
147                output.push(CompactionOutput {
148                    output_level: LEVEL_COMPACTED, // always compact to l1
149                    inputs: inputs.into_iter().flat_map(|fg| fg.into_files()).collect(),
150                    filter_deleted,
151                    output_time_range: None, // we do not enforce output time range in twcs compactions.
152                });
153
154                if let Some(max_background_tasks) = self.max_background_tasks
155                    && output.len() >= max_background_tasks
156                {
157                    debug!(
158                        "Region ({:?}) compaction task size larger than max background tasks({}), remaining tasks discarded",
159                        region_id, max_background_tasks
160                    );
161                    break;
162                }
163            }
164        }
165        output
166    }
167}
168
169#[allow(clippy::too_many_arguments)]
170fn log_pick_result(
171    region_id: RegionId,
172    window: i64,
173    active_window: Option<i64>,
174    found_runs: usize,
175    file_num: usize,
176    max_output_file_size: Option<u64>,
177    filter_deleted: bool,
178    inputs: &[FileGroup],
179) {
180    let input_file_str: Vec<String> = inputs
181        .iter()
182        .map(|f| {
183            let range = f.range();
184            let start = range.0.to_iso8601_string();
185            let end = range.1.to_iso8601_string();
186            let num_rows = f.num_rows();
187            format!(
188                "FileGroup{{id: {:?}, range: ({}, {}), size: {}, num rows: {} }}",
189                f.file_ids(),
190                start,
191                end,
192                ReadableSize(f.size() as u64),
193                num_rows
194            )
195        })
196        .collect();
197    let window_str = Timestamp::new_second(window).to_iso8601_string();
198    let active_window_str = active_window.map(|s| Timestamp::new_second(s).to_iso8601_string());
199    let max_output_file_size = max_output_file_size.map(|size| ReadableSize(size).to_string());
200    info!(
201        "Region ({:?}) compaction pick result: current window: {}, active window: {:?}, \
202            found runs: {}, file num: {}, max output file size: {:?}, filter deleted: {}, \
203            input files: {:?}",
204        region_id,
205        window_str,
206        active_window_str,
207        found_runs,
208        file_num,
209        max_output_file_size,
210        filter_deleted,
211        input_file_str
212    );
213}
214
215impl Picker for TwcsPicker {
216    fn pick(&self, compaction_region: &CompactionRegion) -> Option<PickerOutput> {
217        let region_id = compaction_region.region_id;
218        let levels = compaction_region.current_version.ssts.levels();
219
220        let expired_ssts =
221            get_expired_ssts(levels, compaction_region.ttl, Timestamp::current_millis());
222        if !expired_ssts.is_empty() {
223            info!("Expired SSTs in region {}: {:?}", region_id, expired_ssts);
224            // here we mark expired SSTs as compacting to avoid them being picked.
225            expired_ssts.iter().for_each(|f| f.set_compacting(true));
226        }
227
228        let compaction_time_window = compaction_region
229            .current_version
230            .compaction_time_window
231            .map(|window| window.as_secs() as i64);
232        let time_window_size = compaction_time_window
233            .or(self.time_window_seconds)
234            .unwrap_or_else(|| {
235                let inferred = infer_time_bucket(levels[0].files());
236                info!(
237                    "Compaction window for region {} is not present, inferring from files: {:?}",
238                    region_id, inferred
239                );
240                inferred
241            });
242
243        // Find active window from files in level 0.
244        let active_window = find_latest_window_in_seconds(levels[0].files(), time_window_size);
245        // Assign files to windows
246        let mut windows =
247            assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size);
248        let outputs = self.build_output(region_id, &mut windows, active_window);
249
250        if outputs.is_empty() && expired_ssts.is_empty() {
251            return None;
252        }
253
254        let max_file_size = self.max_output_file_size.map(|v| v as usize);
255        Some(PickerOutput {
256            outputs,
257            expired_ssts,
258            time_window_size,
259            max_file_size,
260        })
261    }
262}
263
/// A time window together with the files assigned to it.
struct Window {
    /// Smallest start timestamp among the files in this window.
    start: Timestamp,
    /// Largest end timestamp among the files in this window.
    end: Timestamp,
    // Mapping from file sequence to file groups. Files with the same sequence are considered
    // to be created from the same compaction task.
    files: HashMap<Option<NonZeroU64>, FileGroup>,
    /// Key of this window. `Window::new_with_file` leaves it at 0 and the caller fills it in.
    time_window: i64,
    /// Whether this window was found to overlap another window; set by `assign_to_windows`.
    overlapping: bool,
    /// Primary key range merged over all files in this window, `None` when unknown.
    primary_key_range: Option<(bytes::Bytes, bytes::Bytes)>,
}
274
275impl Window {
276    /// Creates a new [Window] with given file.
277    fn new_with_file(file: FileHandle) -> Self {
278        let (start, end) = file.time_range();
279        let primary_key_range = file.primary_key_range();
280        let files = HashMap::from([(file.meta_ref().sequence, FileGroup::new_with_file(file))]);
281        Self {
282            start,
283            end,
284            files,
285            time_window: 0,
286            overlapping: false,
287            primary_key_range,
288        }
289    }
290
291    /// Returns the time range of all files in current window (inclusive).
292    fn range(&self) -> (Timestamp, Timestamp) {
293        (self.start, self.end)
294    }
295
296    /// Adds a new file to window and updates time range.
297    fn add_file(&mut self, file: FileHandle) {
298        let (start, end) = file.time_range();
299        self.start = self.start.min(start);
300        self.end = self.end.max(end);
301        self.primary_key_range =
302            merge_primary_key_ranges(self.primary_key_range.take(), file.primary_key_range());
303
304        match self.files.entry(file.meta_ref().sequence) {
305            Entry::Occupied(mut o) => {
306                o.get_mut().add_file(file);
307            }
308            Entry::Vacant(v) => {
309                v.insert(FileGroup::new_with_file(file));
310            }
311        }
312    }
313
314    fn files(&self) -> impl Iterator<Item = &FileGroup> {
315        self.files.values()
316    }
317}
318
319/// Assigns files to windows with predefined window size (in seconds) by their max timestamps.
320fn assign_to_windows<'a>(
321    files: impl Iterator<Item = &'a FileHandle>,
322    time_window_size: i64,
323) -> BTreeMap<i64, Window> {
324    let mut windows: HashMap<i64, Window> = HashMap::new();
325    // Iterates all files and assign to time windows according to max timestamp
326    for f in files {
327        if f.compacting() {
328            continue;
329        }
330        let (_, end) = f.time_range();
331        let time_window = end
332            .convert_to(TimeUnit::Second)
333            .unwrap()
334            .value()
335            .align_to_ceil_by_bucket(time_window_size)
336            .unwrap_or(i64::MIN);
337
338        match windows.entry(time_window) {
339            Entry::Occupied(mut e) => {
340                e.get_mut().add_file(f.clone());
341            }
342            Entry::Vacant(e) => {
343                let mut window = Window::new_with_file(f.clone());
344                window.time_window = time_window;
345                e.insert(window);
346            }
347        }
348    }
349    if windows.is_empty() {
350        return BTreeMap::new();
351    }
352
353    let mut windows = windows.into_values().collect::<Vec<_>>();
354    windows.sort_unstable_by(|l, r| l.start.cmp(&r.start).then(l.end.cmp(&r.end).reverse()));
355
356    for idx in 0..windows.len() {
357        let lhs_range = windows[idx].range();
358        for next_idx in idx + 1..windows.len() {
359            let rhs_range = windows[next_idx].range();
360            if rhs_range.0 > lhs_range.1 {
361                break;
362            }
363
364            let windows_overlap = overlaps(&lhs_range, &rhs_range)
365                && match (
366                    &windows[idx].primary_key_range,
367                    &windows[next_idx].primary_key_range,
368                ) {
369                    (Some(lhs), Some(rhs)) => primary_key_ranges_overlap(lhs, rhs),
370                    _ => true,
371                };
372            if windows_overlap {
373                windows[idx].overlapping = true;
374                windows[next_idx].overlapping = true;
375            }
376        }
377    }
378
379    windows.into_iter().map(|w| (w.time_window, w)).collect()
380}
381
382/// Finds the latest active writing window among all files.
383/// Returns `None` when there are no files or all files are corrupted.
384fn find_latest_window_in_seconds<'a>(
385    files: impl Iterator<Item = &'a FileHandle>,
386    time_window_size: i64,
387) -> Option<i64> {
388    let mut latest_timestamp = None;
389    for f in files {
390        let (_, end) = f.time_range();
391        if let Some(latest) = latest_timestamp {
392            if end > latest {
393                latest_timestamp = Some(end);
394            }
395        } else {
396            latest_timestamp = Some(end);
397        }
398    }
399    latest_timestamp
400        .and_then(|ts| ts.convert_to_ceil(TimeUnit::Second))
401        .and_then(|ts| ts.value().align_to_ceil_by_bucket(time_window_size))
402}
403
404#[cfg(test)]
405mod tests {
406    use std::collections::HashSet;
407
408    use bytes::Bytes;
409    use store_api::storage::FileId;
410
411    use super::*;
412    use crate::compaction::test_util::{
413        new_file_handle, new_file_handle_with_sequence, new_file_handle_with_size_and_sequence,
414        new_file_handle_with_size_sequence_and_primary_key_range,
415    };
416    use crate::sst::file::Level;
417
418    #[test]
419    fn test_get_latest_window_in_seconds() {
420        assert_eq!(
421            Some(1),
422            find_latest_window_in_seconds([new_file_handle(FileId::random(), 0, 999, 0)].iter(), 1)
423        );
424        assert_eq!(
425            Some(1),
426            find_latest_window_in_seconds(
427                [new_file_handle(FileId::random(), 0, 1000, 0)].iter(),
428                1
429            )
430        );
431
432        assert_eq!(
433            Some(-9223372036854000),
434            find_latest_window_in_seconds(
435                [new_file_handle(FileId::random(), i64::MIN, i64::MIN + 1, 0)].iter(),
436                3600,
437            )
438        );
439
440        assert_eq!(
441            (i64::MAX / 10000000 + 1) * 10000,
442            find_latest_window_in_seconds(
443                [new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0)].iter(),
444                10000,
445            )
446            .unwrap()
447        );
448
449        assert_eq!(
450            Some((i64::MAX / 3600000 + 1) * 3600),
451            find_latest_window_in_seconds(
452                [
453                    new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0),
454                    new_file_handle(FileId::random(), 0, 1000, 0)
455                ]
456                .iter(),
457                3600
458            )
459        );
460    }
461
462    #[test]
463    fn test_assign_to_windows() {
464        let windows = assign_to_windows(
465            [
466                new_file_handle(FileId::random(), 0, 999, 0),
467                new_file_handle(FileId::random(), 0, 999, 0),
468                new_file_handle(FileId::random(), 0, 999, 0),
469                new_file_handle(FileId::random(), 0, 999, 0),
470                new_file_handle(FileId::random(), 0, 999, 0),
471            ]
472            .iter(),
473            3,
474        );
475        let fgs = &windows.get(&0).unwrap().files;
476        assert_eq!(1, fgs.len());
477        assert_eq!(fgs.values().map(|f| f.files().len()).sum::<usize>(), 5);
478
479        let files = [FileId::random(); 3];
480        let windows = assign_to_windows(
481            [
482                new_file_handle(files[0], -2000, -3, 0),
483                new_file_handle(files[1], 0, 2999, 0),
484                new_file_handle(files[2], 50, 10001, 0),
485            ]
486            .iter(),
487            3,
488        );
489        assert_eq!(
490            files[0],
491            windows.get(&0).unwrap().files().next().unwrap().files()[0]
492                .file_id()
493                .file_id()
494        );
495        assert_eq!(
496            files[1],
497            windows.get(&3).unwrap().files().next().unwrap().files()[0]
498                .file_id()
499                .file_id()
500        );
501        assert_eq!(
502            files[2],
503            windows.get(&12).unwrap().files().next().unwrap().files()[0]
504                .file_id()
505                .file_id()
506        );
507    }
508
509    #[test]
510    fn test_assign_file_groups_to_windows() {
511        let files = [
512            FileId::random(),
513            FileId::random(),
514            FileId::random(),
515            FileId::random(),
516        ];
517        let windows = assign_to_windows(
518            [
519                new_file_handle_with_sequence(files[0], 0, 999, 0, 1),
520                new_file_handle_with_sequence(files[1], 0, 999, 0, 1),
521                new_file_handle_with_sequence(files[2], 0, 999, 0, 2),
522                new_file_handle_with_sequence(files[3], 0, 999, 0, 2),
523            ]
524            .iter(),
525            3,
526        );
527        assert_eq!(windows.len(), 1);
528        let fgs = &windows.get(&0).unwrap().files;
529        assert_eq!(2, fgs.len());
530        assert_eq!(
531            fgs.get(&NonZeroU64::new(1))
532                .unwrap()
533                .files()
534                .iter()
535                .map(|f| f.file_id().file_id())
536                .collect::<HashSet<_>>(),
537            [files[0], files[1]].into_iter().collect()
538        );
539        assert_eq!(
540            fgs.get(&NonZeroU64::new(2))
541                .unwrap()
542                .files()
543                .iter()
544                .map(|f| f.file_id().file_id())
545                .collect::<HashSet<_>>(),
546            [files[2], files[3]].into_iter().collect()
547        );
548    }
549
550    #[test]
551    fn test_assign_compacting_to_windows() {
552        let files = [
553            new_file_handle(FileId::random(), 0, 999, 0),
554            new_file_handle(FileId::random(), 0, 999, 0),
555            new_file_handle(FileId::random(), 0, 999, 0),
556            new_file_handle(FileId::random(), 0, 999, 0),
557            new_file_handle(FileId::random(), 0, 999, 0),
558        ];
559        files[0].set_compacting(true);
560        files[2].set_compacting(true);
561        let mut windows = assign_to_windows(files.iter(), 3);
562        let window0 = windows.remove(&0).unwrap();
563        assert_eq!(1, window0.files.len());
564        let candidates = window0
565            .files
566            .into_values()
567            .flat_map(|fg| fg.into_files())
568            .map(|f| f.file_id().file_id())
569            .collect::<HashSet<_>>();
570        assert_eq!(candidates.len(), 3);
571        assert_eq!(
572            candidates,
573            [
574                files[1].file_id().file_id(),
575                files[3].file_id().file_id(),
576                files[4].file_id().file_id()
577            ]
578            .into_iter()
579            .collect::<HashSet<_>>()
580        );
581    }
582
    /// Expectation for one window: (window key, expected `overlapping` flag, time ranges of the
    /// files in the window sorted by start and then by end).
    type ExpectedWindowSpec = (i64, bool, Vec<(i64, i64)>);
585
586    fn pk_range(min: &'static [u8], max: &'static [u8]) -> Option<(Bytes, Bytes)> {
587        Some((Bytes::from_static(min), Bytes::from_static(max)))
588    }
589
590    fn check_assign_to_windows_with_overlapping(
591        file_time_ranges: &[(i64, i64)],
592        time_window: i64,
593        expected_files: &[ExpectedWindowSpec],
594    ) {
595        let files: Vec<_> = (0..file_time_ranges.len())
596            .map(|_| FileId::random())
597            .collect();
598
599        let file_handles = files
600            .iter()
601            .zip(file_time_ranges.iter())
602            .map(|(file_id, range)| new_file_handle(*file_id, range.0, range.1, 0))
603            .collect::<Vec<_>>();
604
605        let windows = assign_to_windows(file_handles.iter(), time_window);
606
607        for (expected_window, overlapping, window_files) in expected_files {
608            let actual_window = windows.get(expected_window).unwrap();
609            assert_eq!(*overlapping, actual_window.overlapping);
610            let mut file_ranges = actual_window
611                .files
612                .values()
613                .flat_map(|f| {
614                    f.files().iter().map(|f| {
615                        let (s, e) = f.time_range();
616                        (s.value(), e.value())
617                    })
618                })
619                .collect::<Vec<_>>();
620            file_ranges.sort_unstable_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1)));
621            assert_eq!(window_files, &file_ranges);
622        }
623    }
624
625    #[test]
626    fn test_assign_to_windows_with_overlapping() {
627        check_assign_to_windows_with_overlapping(
628            &[(0, 999), (1000, 1999), (2000, 2999)],
629            2,
630            &[
631                (0, false, vec![(0, 999)]),
632                (2, false, vec![(1000, 1999), (2000, 2999)]),
633            ],
634        );
635
636        check_assign_to_windows_with_overlapping(
637            &[(0, 1), (0, 999), (100, 2999)],
638            2,
639            &[
640                (0, true, vec![(0, 1), (0, 999)]),
641                (2, true, vec![(100, 2999)]),
642            ],
643        );
644
645        check_assign_to_windows_with_overlapping(
646            &[(0, 999), (1000, 1999), (2000, 2999), (3000, 3999)],
647            2,
648            &[
649                (0, false, vec![(0, 999)]),
650                (2, false, vec![(1000, 1999), (2000, 2999)]),
651                (4, false, vec![(3000, 3999)]),
652            ],
653        );
654
655        check_assign_to_windows_with_overlapping(
656            &[
657                (0, 999),
658                (1000, 1999),
659                (2000, 2999),
660                (3000, 3999),
661                (0, 3999),
662            ],
663            2,
664            &[
665                (0, true, vec![(0, 999)]),
666                (2, true, vec![(1000, 1999), (2000, 2999)]),
667                (4, true, vec![(0, 3999), (3000, 3999)]),
668            ],
669        );
670
671        check_assign_to_windows_with_overlapping(
672            &[
673                (0, 999),
674                (1000, 1999),
675                (2000, 2999),
676                (3000, 3999),
677                (1999, 3999),
678            ],
679            2,
680            &[
681                (0, false, vec![(0, 999)]),
682                (2, true, vec![(1000, 1999), (2000, 2999)]),
683                (4, true, vec![(1999, 3999), (3000, 3999)]),
684            ],
685        );
686
687        check_assign_to_windows_with_overlapping(
688            &[
689                (0, 999),     // window 0
690                (1000, 1999), // window 2
691                (2000, 2999), // window 2
692                (3000, 3999), // window 4
693                (2999, 3999), // window 4
694            ],
695            2,
696            &[
697                // window 2 overlaps with window 4
698                (0, false, vec![(0, 999)]),
699                (2, true, vec![(1000, 1999), (2000, 2999)]),
700                (4, true, vec![(2999, 3999), (3000, 3999)]),
701            ],
702        );
703
704        check_assign_to_windows_with_overlapping(
705            &[
706                (0, 999),     // window 0
707                (1000, 1999), // window 2
708                (2000, 2999), // window 2
709                (3000, 3999), // window 4
710                (0, 1000),    // // window 2
711            ],
712            2,
713            &[
714                // only window 0 overlaps with window 2.
715                (0, true, vec![(0, 999)]),
716                (2, true, vec![(0, 1000), (1000, 1999), (2000, 2999)]),
717                (4, false, vec![(3000, 3999)]),
718            ],
719        );
720    }
721
722    #[test]
723    fn test_assign_to_windows_not_overlapping_when_pk_disjoint() {
724        let files = [
725            new_file_handle_with_size_sequence_and_primary_key_range(
726                FileId::random(),
727                0,
728                1000,
729                0,
730                1,
731                10,
732                pk_range(b"a", b"f"),
733            ),
734            new_file_handle_with_size_sequence_and_primary_key_range(
735                FileId::random(),
736                500,
737                1999,
738                0,
739                2,
740                10,
741                pk_range(b"x", b"z"),
742            ),
743        ];
744
745        let windows = assign_to_windows(files.iter(), 2);
746
747        assert!(!windows.get(&2).unwrap().overlapping);
748    }
749
750    #[test]
751    fn test_assign_to_windows_pk_unknown_in_earlier_window_does_not_poison_later_windows() {
752        let files = [
753            new_file_handle(FileId::random(), 0, 1999, 0),
754            new_file_handle_with_size_sequence_and_primary_key_range(
755                FileId::random(),
756                2000,
757                3999,
758                0,
759                1,
760                10,
761                pk_range(b"a", b"f"),
762            ),
763            new_file_handle_with_size_sequence_and_primary_key_range(
764                FileId::random(),
765                3000,
766                4999,
767                0,
768                2,
769                10,
770                pk_range(b"x", b"z"),
771            ),
772        ];
773
774        let windows = assign_to_windows(files.iter(), 2);
775
776        assert!(!windows.get(&4).unwrap().overlapping);
777    }
778
    /// One scenario for `TwcsPicker::build_output`.
    struct CompactionPickerTestCase {
        /// Time window size passed to `assign_to_windows` and `find_latest_window_in_seconds`.
        window_size: i64,
        /// Files offered to the picker; expected outputs refer to them by index.
        input_files: Vec<FileHandle>,
        /// Outputs the picker is expected to produce, in order.
        expected_outputs: Vec<ExpectedOutput>,
    }
784
785    impl CompactionPickerTestCase {
786        fn check(&self) {
787            let file_id_to_idx = self
788                .input_files
789                .iter()
790                .enumerate()
791                .map(|(idx, file)| (file.file_id(), idx))
792                .collect::<HashMap<_, _>>();
793            let mut windows = assign_to_windows(self.input_files.iter(), self.window_size);
794            let active_window =
795                find_latest_window_in_seconds(self.input_files.iter(), self.window_size);
796            let output = TwcsPicker {
797                trigger_file_num: 4,
798                time_window_seconds: None,
799                max_output_file_size: None,
800                append_mode: false,
801                max_background_tasks: None,
802            }
803            .build_output(RegionId::from_u64(0), &mut windows, active_window);
804
805            let output = output
806                .iter()
807                .map(|o| {
808                    let input_file_ids = o
809                        .inputs
810                        .iter()
811                        .map(|f| file_id_to_idx.get(&f.file_id()).copied().unwrap())
812                        .collect::<HashSet<_>>();
813                    (input_file_ids, o.output_level)
814                })
815                .collect::<Vec<_>>();
816
817            let expected = self
818                .expected_outputs
819                .iter()
820                .map(|o| {
821                    let input_file_ids = o.input_files.iter().copied().collect::<HashSet<_>>();
822                    (input_file_ids, o.output_level)
823                })
824                .collect::<Vec<_>>();
825            assert_eq!(expected, output);
826        }
827    }
828
    /// Expected shape of one compaction output.
    struct ExpectedOutput {
        /// Indices into `CompactionPickerTestCase::input_files` of the files to be compacted.
        input_files: Vec<usize>,
        /// Level the compaction output is expected to be written to.
        output_level: Level,
    }
833
834    #[test]
835    fn test_build_twcs_output() {
836        let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
837
838        // Case 1: 2 runs found in each time window.
839        CompactionPickerTestCase {
840            window_size: 3,
841            input_files: [
842                new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
843                new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
844                new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3), //active windows
845                new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4), //active windows
846            ]
847            .to_vec(),
848            expected_outputs: vec![
849                ExpectedOutput {
850                    input_files: vec![0, 1],
851                    output_level: 1,
852                },
853                ExpectedOutput {
854                    input_files: vec![2, 3],
855                    output_level: 1,
856                },
857            ],
858        }
859        .check();
860
861        // Case 2:
862        //    -2000........-3
863        // -3000.....-100
864        //                    0..............2999
865        //                      50..........2998
866        //                     11.........2990
867        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
868        CompactionPickerTestCase {
869            window_size: 3,
870            input_files: [
871                new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
872                new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
873                new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3),
874                new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4),
875                new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 5),
876            ]
877            .to_vec(),
878            expected_outputs: vec![
879                ExpectedOutput {
880                    input_files: vec![0, 1],
881                    output_level: 1,
882                },
883                ExpectedOutput {
884                    input_files: vec![2, 4],
885                    output_level: 1,
886                },
887            ],
888        }
889        .check();
890
891        // Case 3:
892        // A compaction may split output into several files that have overlapping time ranges and same sequence,
893        // we should treat these files as one FileGroup.
894        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
895        CompactionPickerTestCase {
896            window_size: 3,
897            input_files: [
898                new_file_handle_with_sequence(file_ids[0], 0, 2999, 1, 1),
899                new_file_handle_with_sequence(file_ids[1], 0, 2998, 1, 1),
900                new_file_handle_with_sequence(file_ids[2], 3000, 5999, 1, 2),
901                new_file_handle_with_sequence(file_ids[3], 3000, 5000, 1, 2),
902                new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 3),
903            ]
904            .to_vec(),
905            expected_outputs: vec![ExpectedOutput {
906                input_files: vec![0, 1, 4],
907                output_level: 1,
908            }],
909        }
910        .check();
911    }
912
913    #[test]
914    fn test_build_output_skips_pk_disjoint_files() {
915        let files = [
916            new_file_handle_with_size_sequence_and_primary_key_range(
917                FileId::random(),
918                0,
919                2999,
920                0,
921                1,
922                10,
923                pk_range(b"a", b"f"),
924            ),
925            new_file_handle_with_size_sequence_and_primary_key_range(
926                FileId::random(),
927                50,
928                2998,
929                0,
930                2,
931                10,
932                pk_range(b"x", b"z"),
933            ),
934        ];
935        let mut windows = assign_to_windows(files.iter(), 3);
936        let active_window = find_latest_window_in_seconds(files.iter(), 3);
937        let output = TwcsPicker {
938            trigger_file_num: 4,
939            time_window_seconds: None,
940            max_output_file_size: None,
941            append_mode: false,
942            max_background_tasks: None,
943        }
944        .build_output(RegionId::from_u64(0), &mut windows, active_window);
945
946        assert!(output.is_empty());
947    }
948
949    #[test]
950    fn test_append_mode_filter_large_files() {
951        let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
952        let max_output_file_size = 1000u64;
953
954        // Create files with different sizes
955        let small_file_1 = new_file_handle_with_size_and_sequence(file_ids[0], 0, 999, 0, 1, 500);
956        let large_file_1 = new_file_handle_with_size_and_sequence(file_ids[1], 0, 999, 0, 2, 1500);
957        let small_file_2 = new_file_handle_with_size_and_sequence(file_ids[2], 0, 999, 0, 3, 800);
958        let large_file_2 = new_file_handle_with_size_and_sequence(file_ids[3], 0, 999, 0, 4, 2000);
959
960        // Create file groups (each file is in its own group due to different sequences)
961        let mut files_to_merge = vec![
962            FileGroup::new_with_file(small_file_1),
963            FileGroup::new_with_file(large_file_1),
964            FileGroup::new_with_file(small_file_2),
965            FileGroup::new_with_file(large_file_2),
966        ];
967
968        // Test filtering logic directly
969        let original_count = files_to_merge.len();
970
971        // Apply append mode filtering
972        files_to_merge.retain(|fg| fg.size() <= max_output_file_size as usize);
973
974        // Should have filtered out 2 large files, leaving 2 small files
975        assert_eq!(files_to_merge.len(), 2);
976        assert_eq!(original_count, 4);
977
978        // Verify the remaining files are the small ones
979        for fg in &files_to_merge {
980            assert!(
981                fg.size() <= max_output_file_size as usize,
982                "File size {} should be <= {}",
983                fg.size(),
984                max_output_file_size
985            );
986        }
987    }
988
989    #[test]
990    fn test_build_output_multiple_windows_with_zero_runs() {
991        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
992
993        let files = [
994            // Window 0: Contains 3 files but not forming any runs (not enough files in sequence to reach trigger_file_num)
995            new_file_handle_with_sequence(file_ids[0], 0, 999, 0, 1),
996            new_file_handle_with_sequence(file_ids[1], 0, 999, 0, 2),
997            new_file_handle_with_sequence(file_ids[2], 0, 999, 0, 3),
998            // Window 3: Contains files that will form 2 runs
999            new_file_handle_with_sequence(file_ids[3], 3000, 3999, 0, 4),
1000            new_file_handle_with_sequence(file_ids[4], 3000, 3999, 0, 5),
1001            new_file_handle_with_sequence(file_ids[5], 3000, 3999, 0, 6),
1002        ];
1003
1004        let mut windows = assign_to_windows(files.iter(), 3);
1005
1006        // Create picker with trigger_file_num of 4 so single files won't form runs in first window
1007        let picker = TwcsPicker {
1008            trigger_file_num: 4, // High enough to prevent runs in first window
1009            time_window_seconds: Some(3),
1010            max_output_file_size: None,
1011            append_mode: false,
1012            max_background_tasks: None,
1013        };
1014
1015        let active_window = find_latest_window_in_seconds(files.iter(), 3);
1016        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
1017
1018        assert!(
1019            !output.is_empty(),
1020            "Should have output from windows with runs, even when one window has 0 runs"
1021        );
1022
1023        let all_output_files: Vec<_> = output
1024            .iter()
1025            .flat_map(|o| o.inputs.iter())
1026            .map(|f| f.file_id().file_id())
1027            .collect();
1028
1029        assert!(
1030            all_output_files.contains(&file_ids[3])
1031                || all_output_files.contains(&file_ids[4])
1032                || all_output_files.contains(&file_ids[5]),
1033            "Output should contain files from the window with runs"
1034        );
1035    }
1036
1037    #[test]
1038    fn test_build_output_single_window_zero_runs() {
1039        let file_ids = (0..2).map(|_| FileId::random()).collect::<Vec<_>>();
1040
1041        let large_file_1 = new_file_handle_with_size_and_sequence(file_ids[0], 0, 999, 0, 1, 2000); // 2000 bytes
1042        let large_file_2 = new_file_handle_with_size_and_sequence(file_ids[1], 0, 999, 0, 2, 2500); // 2500 bytes
1043
1044        let files = [large_file_1, large_file_2];
1045
1046        let mut windows = assign_to_windows(files.iter(), 3);
1047
1048        let picker = TwcsPicker {
1049            trigger_file_num: 2,
1050            time_window_seconds: Some(3),
1051            max_output_file_size: Some(1000),
1052            append_mode: true,
1053            max_background_tasks: None,
1054        };
1055
1056        let active_window = find_latest_window_in_seconds(files.iter(), 3);
1057        let output = picker.build_output(RegionId::from_u64(456), &mut windows, active_window);
1058
1059        // Should return empty output (no compaction needed)
1060        assert!(
1061            output.is_empty(),
1062            "Should return empty output when no runs are found after filtering"
1063        );
1064    }
1065
1066    #[test]
1067    fn test_max_background_tasks_truncation() {
1068        let file_ids = (0..10).map(|_| FileId::random()).collect::<Vec<_>>();
1069        let max_background_tasks = 3;
1070
1071        // Create files across multiple windows that will generate multiple compaction outputs
1072        let files = [
1073            // Window 0: 4 files that will form a run
1074            new_file_handle_with_sequence(file_ids[0], 0, 999, 0, 1),
1075            new_file_handle_with_sequence(file_ids[1], 0, 999, 0, 2),
1076            new_file_handle_with_sequence(file_ids[2], 0, 999, 0, 3),
1077            new_file_handle_with_sequence(file_ids[3], 0, 999, 0, 4),
1078            // Window 3: 4 files that will form another run
1079            new_file_handle_with_sequence(file_ids[4], 3000, 3999, 0, 5),
1080            new_file_handle_with_sequence(file_ids[5], 3000, 3999, 0, 6),
1081            new_file_handle_with_sequence(file_ids[6], 3000, 3999, 0, 7),
1082            new_file_handle_with_sequence(file_ids[7], 3000, 3999, 0, 8),
1083            // Window 6: 4 files that will form another run
1084            new_file_handle_with_sequence(file_ids[8], 6000, 6999, 0, 9),
1085            new_file_handle_with_sequence(file_ids[9], 6000, 6999, 0, 10),
1086        ];
1087
1088        let mut windows = assign_to_windows(files.iter(), 3);
1089
1090        let picker = TwcsPicker {
1091            trigger_file_num: 4,
1092            time_window_seconds: Some(3),
1093            max_output_file_size: None,
1094            append_mode: false,
1095            max_background_tasks: Some(max_background_tasks),
1096        };
1097
1098        let active_window = find_latest_window_in_seconds(files.iter(), 3);
1099        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
1100
1101        // Should have at most max_background_tasks outputs
1102        assert!(
1103            output.len() <= max_background_tasks,
1104            "Output should be truncated to max_background_tasks: expected <= {}, got {}",
1105            max_background_tasks,
1106            output.len()
1107        );
1108
1109        // Without max_background_tasks, should have more outputs
1110        let picker_no_limit = TwcsPicker {
1111            trigger_file_num: 4,
1112            time_window_seconds: Some(3),
1113            max_output_file_size: None,
1114            append_mode: false,
1115            max_background_tasks: None,
1116        };
1117
1118        let mut windows_no_limit = assign_to_windows(files.iter(), 3);
1119        let output_no_limit = picker_no_limit.build_output(
1120            RegionId::from_u64(123),
1121            &mut windows_no_limit,
1122            active_window,
1123        );
1124
1125        // Without limit, should have more outputs (if there are enough windows)
1126        if output_no_limit.len() > max_background_tasks {
1127            assert!(
1128                output_no_limit.len() > output.len(),
1129                "Without limit should have more outputs than with limit"
1130            );
1131        }
1132    }
1133
1134    #[test]
1135    fn test_max_background_tasks_no_truncation_when_under_limit() {
1136        let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
1137        let max_background_tasks = 10; // Larger than expected outputs
1138
1139        // Create files in one window that will generate one compaction output
1140        let files = [
1141            new_file_handle_with_sequence(file_ids[0], 0, 999, 0, 1),
1142            new_file_handle_with_sequence(file_ids[1], 0, 999, 0, 2),
1143            new_file_handle_with_sequence(file_ids[2], 0, 999, 0, 3),
1144            new_file_handle_with_sequence(file_ids[3], 0, 999, 0, 4),
1145        ];
1146
1147        let mut windows = assign_to_windows(files.iter(), 3);
1148
1149        let picker = TwcsPicker {
1150            trigger_file_num: 4,
1151            time_window_seconds: Some(3),
1152            max_output_file_size: None,
1153            append_mode: false,
1154            max_background_tasks: Some(max_background_tasks),
1155        };
1156
1157        let active_window = find_latest_window_in_seconds(files.iter(), 3);
1158        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
1159
1160        // Should have all outputs since we're under the limit
1161        assert!(
1162            output.len() <= max_background_tasks,
1163            "Output should be within limit"
1164        );
1165        // Should have at least one output
1166        assert!(!output.is_empty(), "Should have at least one output");
1167    }
1168
1169    #[test]
1170    fn test_pick_multiple_runs() {
1171        common_telemetry::init_default_ut_logging();
1172
1173        let num_files = 8;
1174        let file_ids = (0..num_files).map(|_| FileId::random()).collect::<Vec<_>>();
1175
1176        // Create files with different sequences so they form multiple runs
1177        let files: Vec<_> = file_ids
1178            .iter()
1179            .enumerate()
1180            .map(|(idx, file_id)| {
1181                new_file_handle_with_size_and_sequence(
1182                    *file_id,
1183                    0,
1184                    999,
1185                    0,
1186                    (idx + 1) as u64,
1187                    1024 * 1024,
1188                )
1189            })
1190            .collect();
1191
1192        let mut windows = assign_to_windows(files.iter(), 3);
1193
1194        let picker = TwcsPicker {
1195            trigger_file_num: 4,
1196            time_window_seconds: Some(3),
1197            max_output_file_size: None,
1198            append_mode: false,
1199            max_background_tasks: None,
1200        };
1201
1202        let active_window = find_latest_window_in_seconds(files.iter(), 3);
1203        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
1204
1205        assert_eq!(1, output.len());
1206        assert_eq!(output[0].inputs.len(), 2);
1207    }
1208
1209    #[test]
1210    fn test_limit_max_input_files() {
1211        common_telemetry::init_default_ut_logging();
1212
1213        let num_files = 50;
1214        let file_ids = (0..num_files).map(|_| FileId::random()).collect::<Vec<_>>();
1215
1216        // Create files with different sequences so they form 2 runs
1217        let files: Vec<_> = file_ids
1218            .iter()
1219            .enumerate()
1220            .map(|(idx, file_id)| {
1221                new_file_handle_with_size_and_sequence(
1222                    *file_id,
1223                    (idx / 2 * 10) as i64,
1224                    (idx / 2 * 10 + 5) as i64,
1225                    0,
1226                    (idx + 1) as u64,
1227                    1024 * 1024,
1228                )
1229            })
1230            .collect();
1231
1232        let mut windows = assign_to_windows(files.iter(), 3);
1233
1234        let picker = TwcsPicker {
1235            trigger_file_num: 4,
1236            time_window_seconds: Some(3),
1237            max_output_file_size: None,
1238            append_mode: false,
1239            max_background_tasks: None,
1240        };
1241
1242        let active_window = find_latest_window_in_seconds(files.iter(), 3);
1243        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
1244
1245        assert_eq!(1, output.len());
1246        assert_eq!(output[0].inputs.len(), 32);
1247    }
1248
    // TODO(hl): add a TTL test that checks whether `get_expired_ssts` works as expected.
1250}