1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap};
17use std::fmt::Debug;
18use std::num::NonZeroU64;
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::{debug, info};
22use common_time::Timestamp;
23use common_time::timestamp::TimeUnit;
24use common_time::timestamp_millis::BucketAligned;
25use rayon::prelude::*;
26use store_api::storage::RegionId;
27
28use crate::compaction::buckets::infer_time_bucket;
29use crate::compaction::compactor::CompactionRegion;
30use crate::compaction::picker::{Picker, PickerOutput};
31use crate::compaction::run::{
32 FileGroup, Item, Ranged, find_sorted_runs, find_sorted_runs_by_time_range,
33 merge_primary_key_ranges, merge_seq_files, primary_key_ranges_overlap, reduce_runs,
34};
35use crate::compaction::{CompactionOutput, get_expired_ssts};
36use crate::sst::file::{FileHandle, Level, overlaps};
37use crate::sst::version::LevelMeta;
38
/// Level assigned to every compaction output built by the TWCS picker.
const LEVEL_COMPACTED: Level = 1;

/// Maximum number of input files picked for a single window. When a pick exceeds this limit,
/// the smallest file groups are kept until adding the next group would cross it.
const DEFAULT_MAX_INPUT_FILE_NUM: usize = 32;
43
/// Picker that groups files into time windows by their max timestamp and picks compaction
/// inputs window by window.
#[derive(Debug)]
pub struct TwcsPicker {
    /// Minimum number of items a window's single sorted run must contain before it is compacted.
    pub trigger_file_num: usize,
    /// Fallback time window size in seconds, used when the region version carries no compaction
    /// time window. When both are absent the window is inferred from level 0 files.
    pub time_window_seconds: Option<i64>,
    /// Size limit forwarded to sequential file merging and to the picker output. In append mode
    /// it is also the threshold above which file groups are left out of compaction.
    pub max_output_file_size: Option<u64>,
    /// Whether the region is in append mode. In append mode oversized file groups are skipped
    /// and `filter_deleted` is never set on the outputs.
    pub append_mode: bool,
    /// Maximum number of compaction outputs to build; also used as the number of windows that
    /// are evaluated together.
    pub max_background_tasks: Option<usize>,
}
59
60impl TwcsPicker {
61 fn build_output(
63 &self,
64 region_id: RegionId,
65 time_windows: &mut BTreeMap<i64, Window>,
66 active_window: Option<i64>,
67 ) -> Vec<CompactionOutput> {
68 let find_inputs = |files: &Window,
69 windows: &BTreeMap<i64, Window>|
70 -> (Vec<FileGroup>, bool) {
71 let window = &files.time_window;
72 let mut files_to_merge: Vec<_> = files.files().cloned().collect();
73
74 if self.append_mode
76 && let Some(max_size) = self.max_output_file_size
77 {
78 let (kept_files, ignored_files) = files_to_merge
79 .into_iter()
80 .partition(|fg| fg.size() <= max_size as usize);
81 files_to_merge = kept_files;
82 info!(
83 "Skipped {} large files in append mode for region {}, window {}, max_size: {}",
84 ignored_files.len(),
85 region_id,
86 window,
87 max_size
88 );
89 }
90
91 let sorted_runs = if files_to_merge.len() < 1024 {
92 find_sorted_runs(&mut files_to_merge)
93 } else {
94 find_sorted_runs_by_time_range(&mut files_to_merge)
95 };
96 let found_runs = sorted_runs.len();
97 let filter_deleted =
100 found_runs <= 2 && !self.append_mode && !window_has_overlap(files, windows);
101 if found_runs == 0 {
102 return (vec![], filter_deleted);
103 }
104
105 let mut inputs = if found_runs > 1 {
106 reduce_runs(sorted_runs)
107 } else {
108 let run = sorted_runs.last().unwrap();
109 if run.items().len() < self.trigger_file_num {
110 return (vec![], filter_deleted);
111 }
112 merge_seq_files(run.items(), self.max_output_file_size)
114 };
115
116 let total_input_files: usize = inputs.iter().map(|fg| fg.num_files()).sum();
118 if total_input_files > DEFAULT_MAX_INPUT_FILE_NUM {
119 inputs.sort_unstable_by_key(|fg| fg.size());
121 let mut num_picked_files = 0;
122 inputs = inputs
123 .into_iter()
124 .take_while(|fg| {
125 let current_group_file_num = fg.num_files();
126 if current_group_file_num + num_picked_files <= DEFAULT_MAX_INPUT_FILE_NUM {
127 num_picked_files += current_group_file_num;
128 true
129 } else {
130 false
131 }
132 })
133 .collect::<Vec<_>>();
134 info!(
135 "Compaction for region {} enforces max input file num limit: {}, current total: {}, input: {:?}",
136 region_id, DEFAULT_MAX_INPUT_FILE_NUM, total_input_files, inputs
137 );
138 }
139
140 if inputs.len() > 1 {
141 log_pick_result(
143 region_id,
144 *window,
145 active_window,
146 found_runs,
147 files.files.len(),
148 self.max_output_file_size,
149 filter_deleted,
150 &inputs,
151 );
152 }
153 (inputs, filter_deleted)
154 };
155
156 let mut output = vec![];
157 let windows = time_windows
158 .values()
159 .filter(|w| !w.files.is_empty())
160 .collect::<Vec<_>>();
161 let chunk_size = self.max_background_tasks.unwrap_or(windows.len()).max(1);
162 'chunks: for chunk in windows.chunks(chunk_size) {
163 for (inputs, filter_deleted) in chunk
164 .par_iter() .map(|window| find_inputs(window, time_windows))
166 .collect::<Vec<_>>()
167 {
168 if inputs.is_empty() {
169 continue;
170 }
171
172 output.push(CompactionOutput {
173 output_level: LEVEL_COMPACTED, inputs: inputs.into_iter().flat_map(|fg| fg.into_files()).collect(),
175 filter_deleted,
176 output_time_range: None, });
178
179 if let Some(max_background_tasks) = self.max_background_tasks
180 && output.len() >= max_background_tasks
181 {
182 debug!(
183 "Region ({:?}) compaction task size larger than max background tasks({}), remaining tasks discarded",
184 region_id, max_background_tasks
185 );
186 break 'chunks;
187 }
188 }
189 }
190 output
191 }
192}
193
194#[allow(clippy::too_many_arguments)]
195fn log_pick_result(
196 region_id: RegionId,
197 window: i64,
198 active_window: Option<i64>,
199 found_runs: usize,
200 file_num: usize,
201 max_output_file_size: Option<u64>,
202 filter_deleted: bool,
203 inputs: &[FileGroup],
204) {
205 let input_file_str: Vec<String> = inputs
206 .iter()
207 .map(|f| {
208 let range = f.range();
209 let start = range.0.to_iso8601_string();
210 let end = range.1.to_iso8601_string();
211 let num_rows = f.num_rows();
212 format!(
213 "FileGroup{{id: {:?}, range: ({}, {}), size: {}, num rows: {} }}",
214 f.file_ids(),
215 start,
216 end,
217 ReadableSize(f.size() as u64),
218 num_rows
219 )
220 })
221 .collect();
222 let window_str = Timestamp::new_second(window).to_iso8601_string();
223 let active_window_str = active_window.map(|s| Timestamp::new_second(s).to_iso8601_string());
224 let max_output_file_size = max_output_file_size.map(|size| ReadableSize(size).to_string());
225 info!(
226 "Region ({:?}) compaction pick result: current window: {}, active window: {:?}, \
227 found runs: {}, file num: {}, max output file size: {:?}, filter deleted: {}, \
228 input files: {:?}",
229 region_id,
230 window_str,
231 active_window_str,
232 found_runs,
233 file_num,
234 max_output_file_size,
235 filter_deleted,
236 input_file_str
237 );
238}
239
240impl Picker for TwcsPicker {
241 fn pick(&self, compaction_region: &CompactionRegion) -> Option<PickerOutput> {
242 let region_id = compaction_region.region_id;
243 let levels = compaction_region.current_version.ssts.levels();
244
245 let expired_ssts =
246 get_expired_ssts(levels, compaction_region.ttl, Timestamp::current_millis());
247 if !expired_ssts.is_empty() {
248 info!("Expired SSTs in region {}: {:?}", region_id, expired_ssts);
249 expired_ssts.iter().for_each(|f| f.set_compacting(true));
251 }
252
253 let compaction_time_window = compaction_region
254 .current_version
255 .compaction_time_window
256 .map(|window| window.as_secs() as i64);
257 let time_window_size = compaction_time_window
258 .or(self.time_window_seconds)
259 .unwrap_or_else(|| {
260 let inferred = infer_time_bucket(levels[0].files());
261 info!(
262 "Compaction window for region {} is not present, inferring from files: {:?}",
263 region_id, inferred
264 );
265 inferred
266 });
267
268 let active_window = find_latest_window_in_seconds(levels[0].files(), time_window_size);
270 let mut windows =
272 assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size);
273 let outputs = self.build_output(region_id, &mut windows, active_window);
274
275 if outputs.is_empty() && expired_ssts.is_empty() {
276 return None;
277 }
278
279 let max_file_size = self.max_output_file_size.map(|v| v as usize);
280 Some(PickerOutput {
281 outputs,
282 expired_ssts,
283 time_window_size,
284 max_file_size,
285 })
286 }
287}
288
/// Files that were assigned to the same time window.
struct Window {
    /// Minimum start timestamp of all files added to the window.
    start: Timestamp,
    /// Maximum end timestamp of all files added to the window.
    end: Timestamp,
    /// File groups keyed by the sequence stored in the file meta; files sharing a sequence end
    /// up in the same group.
    files: HashMap<Option<NonZeroU64>, FileGroup>,
    /// Key of this window as computed by `assign_to_windows` (0 until the caller sets it).
    time_window: i64,
    /// Primary key range merged over all files of the window. `None` is treated as possibly
    /// overlapping by `window_has_overlap`.
    primary_key_range: Option<(bytes::Bytes, bytes::Bytes)>,
}
298
299impl Window {
300 fn new_with_file(file: FileHandle) -> Self {
302 let (start, end) = file.time_range();
303 let primary_key_range = file.primary_key_range();
304 let files = HashMap::from([(file.meta_ref().sequence, FileGroup::new_with_file(file))]);
305 Self {
306 start,
307 end,
308 files,
309 time_window: 0,
310 primary_key_range,
311 }
312 }
313
314 fn range(&self) -> (Timestamp, Timestamp) {
316 (self.start, self.end)
317 }
318
319 fn add_file(&mut self, file: FileHandle) {
321 let (start, end) = file.time_range();
322 self.start = self.start.min(start);
323 self.end = self.end.max(end);
324 self.primary_key_range =
325 merge_primary_key_ranges(self.primary_key_range.take(), file.primary_key_range());
326
327 match self.files.entry(file.meta_ref().sequence) {
328 Entry::Occupied(mut o) => {
329 o.get_mut().add_file(file);
330 }
331 Entry::Vacant(v) => {
332 v.insert(FileGroup::new_with_file(file));
333 }
334 }
335 }
336
337 fn files(&self) -> impl Iterator<Item = &FileGroup> {
338 self.files.values()
339 }
340}
341
342fn assign_to_windows<'a>(
344 files: impl Iterator<Item = &'a FileHandle>,
345 time_window_size: i64,
346) -> BTreeMap<i64, Window> {
347 let mut windows: HashMap<i64, Window> = HashMap::new();
348 for f in files {
350 if f.compacting() {
351 continue;
352 }
353 let (_, end) = f.time_range();
354 let time_window = end
355 .convert_to(TimeUnit::Second)
356 .unwrap()
357 .value()
358 .align_to_ceil_by_bucket(time_window_size)
359 .unwrap_or(i64::MIN);
360
361 match windows.entry(time_window) {
362 Entry::Occupied(mut e) => {
363 e.get_mut().add_file(f.clone());
364 }
365 Entry::Vacant(e) => {
366 let mut window = Window::new_with_file(f.clone());
367 window.time_window = time_window;
368 e.insert(window);
369 }
370 }
371 }
372 windows.into_iter().collect()
373}
374
375fn window_has_overlap(this: &Window, windows: &BTreeMap<i64, Window>) -> bool {
376 windows
377 .values()
378 .filter(|that| this.time_window != that.time_window)
379 .any(|that| {
380 overlaps(&this.range(), &that.range()) && {
381 match (&this.primary_key_range, &that.primary_key_range) {
382 (Some(l), Some(r)) => primary_key_ranges_overlap(l, r),
383 _ => true,
384 }
385 }
386 })
387}
388
389fn find_latest_window_in_seconds<'a>(
392 files: impl Iterator<Item = &'a FileHandle>,
393 time_window_size: i64,
394) -> Option<i64> {
395 let mut latest_timestamp = None;
396 for f in files {
397 let (_, end) = f.time_range();
398 if let Some(latest) = latest_timestamp {
399 if end > latest {
400 latest_timestamp = Some(end);
401 }
402 } else {
403 latest_timestamp = Some(end);
404 }
405 }
406 latest_timestamp
407 .and_then(|ts| ts.convert_to_ceil(TimeUnit::Second))
408 .and_then(|ts| ts.value().align_to_ceil_by_bucket(time_window_size))
409}
410
411#[cfg(test)]
412mod tests {
413 use std::collections::HashSet;
414
415 use bytes::Bytes;
416 use store_api::storage::FileId;
417
418 use super::*;
419 use crate::compaction::test_util::{
420 new_file_handle, new_file_handle_with_sequence, new_file_handle_with_size_and_sequence,
421 new_file_handle_with_size_sequence_and_primary_key_range,
422 };
423 use crate::sst::file::Level;
424
425 #[test]
426 fn test_get_latest_window_in_seconds() {
427 assert_eq!(
428 Some(1),
429 find_latest_window_in_seconds([new_file_handle(FileId::random(), 0, 999, 0)].iter(), 1)
430 );
431 assert_eq!(
432 Some(1),
433 find_latest_window_in_seconds(
434 [new_file_handle(FileId::random(), 0, 1000, 0)].iter(),
435 1
436 )
437 );
438
439 assert_eq!(
440 Some(-9223372036854000),
441 find_latest_window_in_seconds(
442 [new_file_handle(FileId::random(), i64::MIN, i64::MIN + 1, 0)].iter(),
443 3600,
444 )
445 );
446
447 assert_eq!(
448 (i64::MAX / 10000000 + 1) * 10000,
449 find_latest_window_in_seconds(
450 [new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0)].iter(),
451 10000,
452 )
453 .unwrap()
454 );
455
456 assert_eq!(
457 Some((i64::MAX / 3600000 + 1) * 3600),
458 find_latest_window_in_seconds(
459 [
460 new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0),
461 new_file_handle(FileId::random(), 0, 1000, 0)
462 ]
463 .iter(),
464 3600
465 )
466 );
467 }
468
469 #[test]
470 fn test_assign_to_windows() {
471 let windows = assign_to_windows(
472 [
473 new_file_handle(FileId::random(), 0, 999, 0),
474 new_file_handle(FileId::random(), 0, 999, 0),
475 new_file_handle(FileId::random(), 0, 999, 0),
476 new_file_handle(FileId::random(), 0, 999, 0),
477 new_file_handle(FileId::random(), 0, 999, 0),
478 ]
479 .iter(),
480 3,
481 );
482 let fgs = &windows.get(&0).unwrap().files;
483 assert_eq!(1, fgs.len());
484 assert_eq!(fgs.values().map(|f| f.files().len()).sum::<usize>(), 5);
485
486 let files = [FileId::random(); 3];
487 let windows = assign_to_windows(
488 [
489 new_file_handle(files[0], -2000, -3, 0),
490 new_file_handle(files[1], 0, 2999, 0),
491 new_file_handle(files[2], 50, 10001, 0),
492 ]
493 .iter(),
494 3,
495 );
496 assert_eq!(
497 files[0],
498 windows.get(&0).unwrap().files().next().unwrap().files()[0]
499 .file_id()
500 .file_id()
501 );
502 assert_eq!(
503 files[1],
504 windows.get(&3).unwrap().files().next().unwrap().files()[0]
505 .file_id()
506 .file_id()
507 );
508 assert_eq!(
509 files[2],
510 windows.get(&12).unwrap().files().next().unwrap().files()[0]
511 .file_id()
512 .file_id()
513 );
514 }
515
516 #[test]
517 fn test_assign_file_groups_to_windows() {
518 let files = [
519 FileId::random(),
520 FileId::random(),
521 FileId::random(),
522 FileId::random(),
523 ];
524 let windows = assign_to_windows(
525 [
526 new_file_handle_with_sequence(files[0], 0, 999, 0, 1),
527 new_file_handle_with_sequence(files[1], 0, 999, 0, 1),
528 new_file_handle_with_sequence(files[2], 0, 999, 0, 2),
529 new_file_handle_with_sequence(files[3], 0, 999, 0, 2),
530 ]
531 .iter(),
532 3,
533 );
534 assert_eq!(windows.len(), 1);
535 let fgs = &windows.get(&0).unwrap().files;
536 assert_eq!(2, fgs.len());
537 assert_eq!(
538 fgs.get(&NonZeroU64::new(1))
539 .unwrap()
540 .files()
541 .iter()
542 .map(|f| f.file_id().file_id())
543 .collect::<HashSet<_>>(),
544 [files[0], files[1]].into_iter().collect()
545 );
546 assert_eq!(
547 fgs.get(&NonZeroU64::new(2))
548 .unwrap()
549 .files()
550 .iter()
551 .map(|f| f.file_id().file_id())
552 .collect::<HashSet<_>>(),
553 [files[2], files[3]].into_iter().collect()
554 );
555 }
556
557 #[test]
558 fn test_assign_compacting_to_windows() {
559 let files = [
560 new_file_handle(FileId::random(), 0, 999, 0),
561 new_file_handle(FileId::random(), 0, 999, 0),
562 new_file_handle(FileId::random(), 0, 999, 0),
563 new_file_handle(FileId::random(), 0, 999, 0),
564 new_file_handle(FileId::random(), 0, 999, 0),
565 ];
566 files[0].set_compacting(true);
567 files[2].set_compacting(true);
568 let mut windows = assign_to_windows(files.iter(), 3);
569 let window0 = windows.remove(&0).unwrap();
570 assert_eq!(1, window0.files.len());
571 let candidates = window0
572 .files
573 .into_values()
574 .flat_map(|fg| fg.into_files())
575 .map(|f| f.file_id().file_id())
576 .collect::<HashSet<_>>();
577 assert_eq!(candidates.len(), 3);
578 assert_eq!(
579 candidates,
580 [
581 files[1].file_id().file_id(),
582 files[3].file_id().file_id(),
583 files[4].file_id().file_id()
584 ]
585 .into_iter()
586 .collect::<HashSet<_>>()
587 );
588 }
589
    /// Expectation for one window: (window key, whether the window overlaps another window,
    /// sorted `(start, end)` time ranges of the files expected in the window).
    type ExpectedWindowSpec = (i64, bool, Vec<(i64, i64)>);
592
593 fn pk_range(min: &'static [u8], max: &'static [u8]) -> Option<(Bytes, Bytes)> {
594 Some((Bytes::from_static(min), Bytes::from_static(max)))
595 }
596
597 fn check_assign_to_windows_with_overlapping(
598 file_time_ranges: &[(i64, i64)],
599 time_window: i64,
600 expected_files: &[ExpectedWindowSpec],
601 ) {
602 let files: Vec<_> = (0..file_time_ranges.len())
603 .map(|_| FileId::random())
604 .collect();
605
606 let file_handles = files
607 .iter()
608 .zip(file_time_ranges.iter())
609 .map(|(file_id, range)| new_file_handle(*file_id, range.0, range.1, 0))
610 .collect::<Vec<_>>();
611
612 let windows = assign_to_windows(file_handles.iter(), time_window);
613
614 for (expected_window, overlapping, window_files) in expected_files {
615 let actual_window = windows.get(expected_window).unwrap();
616 let actual_overlapping = window_has_overlap(actual_window, &windows);
617 assert_eq!(*overlapping, actual_overlapping);
618 let mut file_ranges = actual_window
619 .files
620 .values()
621 .flat_map(|f| {
622 f.files().iter().map(|f| {
623 let (s, e) = f.time_range();
624 (s.value(), e.value())
625 })
626 })
627 .collect::<Vec<_>>();
628 file_ranges.sort_unstable_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1)));
629 assert_eq!(window_files, &file_ranges);
630 }
631 }
632
    #[test]
    fn test_assign_to_windows_with_overlapping() {
        // Disjoint files: no window overlaps another.
        check_assign_to_windows_with_overlapping(
            &[(0, 999), (1000, 1999), (2000, 2999)],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, false, vec![(1000, 1999), (2000, 2999)]),
            ],
        );

        // A file spanning 100..2999 is assigned to window 2 and overlaps window 0.
        check_assign_to_windows_with_overlapping(
            &[(0, 1), (0, 999), (100, 2999)],
            2,
            &[
                (0, true, vec![(0, 1), (0, 999)]),
                (2, true, vec![(100, 2999)]),
            ],
        );

        // Three windows, all disjoint.
        check_assign_to_windows_with_overlapping(
            &[(0, 999), (1000, 1999), (2000, 2999), (3000, 3999)],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, false, vec![(1000, 1999), (2000, 2999)]),
                (4, false, vec![(3000, 3999)]),
            ],
        );

        // A file covering 0..3999 is assigned to window 4 and makes every window overlap.
        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (0, 3999),
            ],
            2,
            &[
                (0, true, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(0, 3999), (3000, 3999)]),
            ],
        );

        // The extra file starts inside window 2's range: windows 2 and 4 overlap, window 0
        // does not.
        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (1999, 3999),
            ],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(1999, 3999), (3000, 3999)]),
            ],
        );

        // The extra file starts exactly at the last timestamp of window 2 (2999); the windows
        // are still expected to overlap.
        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (2999, 3999),
            ],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(2999, 3999), (3000, 3999)]),
            ],
        );

        // The extra file 0..1000 is assigned to window 2 and reaches back into window 0;
        // window 4 stays disjoint.
        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (0, 1000),
            ],
            2,
            &[
                (0, true, vec![(0, 999)]),
                (2, true, vec![(0, 1000), (1000, 1999), (2000, 2999)]),
                (4, false, vec![(3000, 3999)]),
            ],
        );
    }
729
730 #[test]
731 fn test_assign_to_windows_not_overlapping_when_pk_disjoint() {
732 let files = [
733 new_file_handle_with_size_sequence_and_primary_key_range(
734 FileId::random(),
735 0,
736 1000,
737 0,
738 1,
739 10,
740 pk_range(b"a", b"f"),
741 ),
742 new_file_handle_with_size_sequence_and_primary_key_range(
743 FileId::random(),
744 500,
745 1999,
746 0,
747 2,
748 10,
749 pk_range(b"x", b"z"),
750 ),
751 ];
752
753 let windows = assign_to_windows(files.iter(), 2);
754
755 let overlapping = window_has_overlap(windows.get(&2).unwrap(), &windows);
756 assert!(!overlapping);
757 }
758
759 #[test]
760 fn test_assign_to_windows_pk_unknown_in_earlier_window_does_not_poison_later_windows() {
761 let files = [
762 new_file_handle(FileId::random(), 0, 1999, 0),
763 new_file_handle_with_size_sequence_and_primary_key_range(
764 FileId::random(),
765 2000,
766 3999,
767 0,
768 1,
769 10,
770 pk_range(b"a", b"f"),
771 ),
772 new_file_handle_with_size_sequence_and_primary_key_range(
773 FileId::random(),
774 3000,
775 4999,
776 0,
777 2,
778 10,
779 pk_range(b"x", b"z"),
780 ),
781 ];
782
783 let windows = assign_to_windows(files.iter(), 2);
784
785 let overlapping = window_has_overlap(windows.get(&4).unwrap(), &windows);
786 assert!(!overlapping);
787 }
788
    /// A picker scenario: input files, the window size used to bucket them and the outputs
    /// `build_output` is expected to produce.
    struct CompactionPickerTestCase {
        /// Time window size in seconds.
        window_size: i64,
        /// Files handed to the picker; expected outputs refer to them by index.
        input_files: Vec<FileHandle>,
        /// Expected compaction outputs, in order.
        expected_outputs: Vec<ExpectedOutput>,
    }
794
795 impl CompactionPickerTestCase {
796 fn check(&self) {
797 let file_id_to_idx = self
798 .input_files
799 .iter()
800 .enumerate()
801 .map(|(idx, file)| (file.file_id(), idx))
802 .collect::<HashMap<_, _>>();
803 let mut windows = assign_to_windows(self.input_files.iter(), self.window_size);
804 let active_window =
805 find_latest_window_in_seconds(self.input_files.iter(), self.window_size);
806 let output = TwcsPicker {
807 trigger_file_num: 4,
808 time_window_seconds: None,
809 max_output_file_size: None,
810 append_mode: false,
811 max_background_tasks: None,
812 }
813 .build_output(RegionId::from_u64(0), &mut windows, active_window);
814
815 let output = output
816 .iter()
817 .map(|o| {
818 let input_file_ids = o
819 .inputs
820 .iter()
821 .map(|f| file_id_to_idx.get(&f.file_id()).copied().unwrap())
822 .collect::<HashSet<_>>();
823 (input_file_ids, o.output_level)
824 })
825 .collect::<Vec<_>>();
826
827 let expected = self
828 .expected_outputs
829 .iter()
830 .map(|o| {
831 let input_file_ids = o.input_files.iter().copied().collect::<HashSet<_>>();
832 (input_file_ids, o.output_level)
833 })
834 .collect::<Vec<_>>();
835 assert_eq!(expected, output);
836 }
837 }
838
    /// One expected compaction output of a `CompactionPickerTestCase`.
    struct ExpectedOutput {
        /// Indices into `CompactionPickerTestCase::input_files` of the files to be compacted.
        input_files: Vec<usize>,
        /// Level the output is expected to be written to.
        output_level: Level,
    }
843
    #[test]
    fn test_build_twcs_output() {
        let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();

        // Two windows, each holding two files with different sequences; each window is expected
        // to produce one output.
        CompactionPickerTestCase {
            window_size: 3,
            input_files: [
                new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
                new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
                new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3),
                new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4),
            ]
            .to_vec(),
            expected_outputs: vec![
                ExpectedOutput {
                    input_files: vec![0, 1],
                    output_level: 1,
                },
                ExpectedOutput {
                    input_files: vec![2, 3],
                    output_level: 1,
                },
            ],
        }
        .check();

        // Same as above plus a third file in the later window; only files 2 and 4 of that
        // window are expected to be picked.
        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
        CompactionPickerTestCase {
            window_size: 3,
            input_files: [
                new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
                new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
                new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3),
                new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4),
                new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 5),
            ]
            .to_vec(),
            expected_outputs: vec![
                ExpectedOutput {
                    input_files: vec![0, 1],
                    output_level: 1,
                },
                ExpectedOutput {
                    input_files: vec![2, 4],
                    output_level: 1,
                },
            ],
        }
        .check();

        // Files 0/1 and 2/3 share a sequence pairwise. Only the earlier window, which also
        // holds file 4 with a different sequence, is expected to produce an output.
        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
        CompactionPickerTestCase {
            window_size: 3,
            input_files: [
                new_file_handle_with_sequence(file_ids[0], 0, 2999, 1, 1),
                new_file_handle_with_sequence(file_ids[1], 0, 2998, 1, 1),
                new_file_handle_with_sequence(file_ids[2], 3000, 5999, 1, 2),
                new_file_handle_with_sequence(file_ids[3], 3000, 5000, 1, 2),
                new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 3),
            ]
            .to_vec(),
            expected_outputs: vec![ExpectedOutput {
                input_files: vec![0, 1, 4],
                output_level: 1,
            }],
        }
        .check();
    }
922
923 #[test]
924 fn test_build_output_skips_pk_disjoint_files() {
925 let files = [
926 new_file_handle_with_size_sequence_and_primary_key_range(
927 FileId::random(),
928 0,
929 2999,
930 0,
931 1,
932 10,
933 pk_range(b"a", b"f"),
934 ),
935 new_file_handle_with_size_sequence_and_primary_key_range(
936 FileId::random(),
937 50,
938 2998,
939 0,
940 2,
941 10,
942 pk_range(b"x", b"z"),
943 ),
944 ];
945 let mut windows = assign_to_windows(files.iter(), 3);
946 let active_window = find_latest_window_in_seconds(files.iter(), 3);
947 let output = TwcsPicker {
948 trigger_file_num: 4,
949 time_window_seconds: None,
950 max_output_file_size: None,
951 append_mode: false,
952 max_background_tasks: None,
953 }
954 .build_output(RegionId::from_u64(0), &mut windows, active_window);
955
956 assert!(output.is_empty());
957 }
958
959 #[test]
960 fn test_append_mode_filter_large_files() {
961 let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
962 let max_output_file_size = 1000u64;
963
964 let small_file_1 = new_file_handle_with_size_and_sequence(file_ids[0], 0, 999, 0, 1, 500);
966 let large_file_1 = new_file_handle_with_size_and_sequence(file_ids[1], 0, 999, 0, 2, 1500);
967 let small_file_2 = new_file_handle_with_size_and_sequence(file_ids[2], 0, 999, 0, 3, 800);
968 let large_file_2 = new_file_handle_with_size_and_sequence(file_ids[3], 0, 999, 0, 4, 2000);
969
970 let mut files_to_merge = vec![
972 FileGroup::new_with_file(small_file_1),
973 FileGroup::new_with_file(large_file_1),
974 FileGroup::new_with_file(small_file_2),
975 FileGroup::new_with_file(large_file_2),
976 ];
977
978 let original_count = files_to_merge.len();
980
981 files_to_merge.retain(|fg| fg.size() <= max_output_file_size as usize);
983
984 assert_eq!(files_to_merge.len(), 2);
986 assert_eq!(original_count, 4);
987
988 for fg in &files_to_merge {
990 assert!(
991 fg.size() <= max_output_file_size as usize,
992 "File size {} should be <= {}",
993 fg.size(),
994 max_output_file_size
995 );
996 }
997 }
998
999 #[test]
1000 fn test_build_output_multiple_windows_with_zero_runs() {
1001 let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
1002
1003 let files = [
1004 new_file_handle_with_sequence(file_ids[0], 0, 999, 0, 1),
1006 new_file_handle_with_sequence(file_ids[1], 0, 999, 0, 2),
1007 new_file_handle_with_sequence(file_ids[2], 0, 999, 0, 3),
1008 new_file_handle_with_sequence(file_ids[3], 3000, 3999, 0, 4),
1010 new_file_handle_with_sequence(file_ids[4], 3000, 3999, 0, 5),
1011 new_file_handle_with_sequence(file_ids[5], 3000, 3999, 0, 6),
1012 ];
1013
1014 let mut windows = assign_to_windows(files.iter(), 3);
1015
1016 let picker = TwcsPicker {
1018 trigger_file_num: 4, time_window_seconds: Some(3),
1020 max_output_file_size: None,
1021 append_mode: false,
1022 max_background_tasks: None,
1023 };
1024
1025 let active_window = find_latest_window_in_seconds(files.iter(), 3);
1026 let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
1027
1028 assert!(
1029 !output.is_empty(),
1030 "Should have output from windows with runs, even when one window has 0 runs"
1031 );
1032
1033 let all_output_files: Vec<_> = output
1034 .iter()
1035 .flat_map(|o| o.inputs.iter())
1036 .map(|f| f.file_id().file_id())
1037 .collect();
1038
1039 assert!(
1040 all_output_files.contains(&file_ids[3])
1041 || all_output_files.contains(&file_ids[4])
1042 || all_output_files.contains(&file_ids[5]),
1043 "Output should contain files from the window with runs"
1044 );
1045 }
1046
1047 #[test]
1048 fn test_build_output_single_window_zero_runs() {
1049 let file_ids = (0..2).map(|_| FileId::random()).collect::<Vec<_>>();
1050
1051 let large_file_1 = new_file_handle_with_size_and_sequence(file_ids[0], 0, 999, 0, 1, 2000); let large_file_2 = new_file_handle_with_size_and_sequence(file_ids[1], 0, 999, 0, 2, 2500); let files = [large_file_1, large_file_2];
1055
1056 let mut windows = assign_to_windows(files.iter(), 3);
1057
1058 let picker = TwcsPicker {
1059 trigger_file_num: 2,
1060 time_window_seconds: Some(3),
1061 max_output_file_size: Some(1000),
1062 append_mode: true,
1063 max_background_tasks: None,
1064 };
1065
1066 let active_window = find_latest_window_in_seconds(files.iter(), 3);
1067 let output = picker.build_output(RegionId::from_u64(456), &mut windows, active_window);
1068
1069 assert!(
1071 output.is_empty(),
1072 "Should return empty output when no runs are found after filtering"
1073 );
1074 }
1075
1076 #[test]
1077 fn test_max_background_tasks_truncation() {
1078 let file_ids = (0..10).map(|_| FileId::random()).collect::<Vec<_>>();
1079 let max_background_tasks = 3;
1080
1081 let files = [
1083 new_file_handle_with_sequence(file_ids[0], 0, 999, 0, 1),
1085 new_file_handle_with_sequence(file_ids[1], 0, 999, 0, 2),
1086 new_file_handle_with_sequence(file_ids[2], 0, 999, 0, 3),
1087 new_file_handle_with_sequence(file_ids[3], 0, 999, 0, 4),
1088 new_file_handle_with_sequence(file_ids[4], 3000, 3999, 0, 5),
1090 new_file_handle_with_sequence(file_ids[5], 3000, 3999, 0, 6),
1091 new_file_handle_with_sequence(file_ids[6], 3000, 3999, 0, 7),
1092 new_file_handle_with_sequence(file_ids[7], 3000, 3999, 0, 8),
1093 new_file_handle_with_sequence(file_ids[8], 6000, 6999, 0, 9),
1095 new_file_handle_with_sequence(file_ids[9], 6000, 6999, 0, 10),
1096 ];
1097
1098 let mut windows = assign_to_windows(files.iter(), 3);
1099
1100 let picker = TwcsPicker {
1101 trigger_file_num: 4,
1102 time_window_seconds: Some(3),
1103 max_output_file_size: None,
1104 append_mode: false,
1105 max_background_tasks: Some(max_background_tasks),
1106 };
1107
1108 let active_window = find_latest_window_in_seconds(files.iter(), 3);
1109 let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
1110
1111 assert!(
1113 output.len() <= max_background_tasks,
1114 "Output should be truncated to max_background_tasks: expected <= {}, got {}",
1115 max_background_tasks,
1116 output.len()
1117 );
1118
1119 let picker_no_limit = TwcsPicker {
1121 trigger_file_num: 4,
1122 time_window_seconds: Some(3),
1123 max_output_file_size: None,
1124 append_mode: false,
1125 max_background_tasks: None,
1126 };
1127
1128 let mut windows_no_limit = assign_to_windows(files.iter(), 3);
1129 let output_no_limit = picker_no_limit.build_output(
1130 RegionId::from_u64(123),
1131 &mut windows_no_limit,
1132 active_window,
1133 );
1134
1135 if output_no_limit.len() > max_background_tasks {
1137 assert!(
1138 output_no_limit.len() > output.len(),
1139 "Without limit should have more outputs than with limit"
1140 );
1141 }
1142 }
1143
1144 #[test]
1145 fn test_max_background_tasks_no_truncation_when_under_limit() {
1146 let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
1147 let max_background_tasks = 10; let files = [
1151 new_file_handle_with_sequence(file_ids[0], 0, 999, 0, 1),
1152 new_file_handle_with_sequence(file_ids[1], 0, 999, 0, 2),
1153 new_file_handle_with_sequence(file_ids[2], 0, 999, 0, 3),
1154 new_file_handle_with_sequence(file_ids[3], 0, 999, 0, 4),
1155 ];
1156
1157 let mut windows = assign_to_windows(files.iter(), 3);
1158
1159 let picker = TwcsPicker {
1160 trigger_file_num: 4,
1161 time_window_seconds: Some(3),
1162 max_output_file_size: None,
1163 append_mode: false,
1164 max_background_tasks: Some(max_background_tasks),
1165 };
1166
1167 let active_window = find_latest_window_in_seconds(files.iter(), 3);
1168 let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
1169
1170 assert!(
1172 output.len() <= max_background_tasks,
1173 "Output should be within limit"
1174 );
1175 assert!(!output.is_empty(), "Should have at least one output");
1177 }
1178
1179 #[test]
1180 fn test_pick_multiple_runs() {
1181 common_telemetry::init_default_ut_logging();
1182
1183 let num_files = 8;
1184 let file_ids = (0..num_files).map(|_| FileId::random()).collect::<Vec<_>>();
1185
1186 let files: Vec<_> = file_ids
1188 .iter()
1189 .enumerate()
1190 .map(|(idx, file_id)| {
1191 new_file_handle_with_size_and_sequence(
1192 *file_id,
1193 0,
1194 999,
1195 0,
1196 (idx + 1) as u64,
1197 1024 * 1024,
1198 )
1199 })
1200 .collect();
1201
1202 let mut windows = assign_to_windows(files.iter(), 3);
1203
1204 let picker = TwcsPicker {
1205 trigger_file_num: 4,
1206 time_window_seconds: Some(3),
1207 max_output_file_size: None,
1208 append_mode: false,
1209 max_background_tasks: None,
1210 };
1211
1212 let active_window = find_latest_window_in_seconds(files.iter(), 3);
1213 let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
1214
1215 assert_eq!(1, output.len());
1216 assert_eq!(output[0].inputs.len(), 2);
1217 }
1218
1219 #[test]
1220 fn test_limit_max_input_files() {
1221 common_telemetry::init_default_ut_logging();
1222
1223 let num_files = 50;
1224 let file_ids = (0..num_files).map(|_| FileId::random()).collect::<Vec<_>>();
1225
1226 let files: Vec<_> = file_ids
1228 .iter()
1229 .enumerate()
1230 .map(|(idx, file_id)| {
1231 new_file_handle_with_size_and_sequence(
1232 *file_id,
1233 (idx / 2 * 10) as i64,
1234 (idx / 2 * 10 + 5) as i64,
1235 0,
1236 (idx + 1) as u64,
1237 1024 * 1024,
1238 )
1239 })
1240 .collect();
1241
1242 let mut windows = assign_to_windows(files.iter(), 3);
1243
1244 let picker = TwcsPicker {
1245 trigger_file_num: 4,
1246 time_window_seconds: Some(3),
1247 max_output_file_size: None,
1248 append_mode: false,
1249 max_background_tasks: None,
1250 };
1251
1252 let active_window = find_latest_window_in_seconds(files.iter(), 3);
1253 let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
1254
1255 assert_eq!(1, output.len());
1256 assert_eq!(output[0].inputs.len(), 32);
1257 }
1258
1259 }