1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap};
17use std::fmt::Debug;
18use std::num::NonZeroU64;
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::{debug, info};
22use common_time::Timestamp;
23use common_time::timestamp::TimeUnit;
24use common_time::timestamp_millis::BucketAligned;
25use store_api::storage::RegionId;
26
27use crate::compaction::buckets::infer_time_bucket;
28use crate::compaction::compactor::CompactionRegion;
29use crate::compaction::picker::{Picker, PickerOutput};
30use crate::compaction::run::{
31 FileGroup, Item, Ranged, find_sorted_runs, merge_primary_key_ranges, merge_seq_files,
32 primary_key_ranges_overlap, reduce_runs,
33};
34use crate::compaction::{CompactionOutput, get_expired_ssts};
35use crate::sst::file::{FileHandle, Level, overlaps};
36use crate::sst::version::LevelMeta;
37
/// Level assigned to every output produced by the TWCS picker.
const LEVEL_COMPACTED: Level = 1;

/// Upper bound on the number of input files of a single compaction output.
///
/// When a window selects more files than this, `TwcsPicker::build_output` sorts the
/// selected file groups by size and keeps the smallest ones while the running file count
/// stays within this limit.
const DEFAULT_MAX_INPUT_FILE_NUM: usize = 32;

/// Time-window compaction picker: assigns SST files to time windows and decides, per
/// window, which files to merge.
#[derive(Debug)]
pub struct TwcsPicker {
    /// Minimum number of items a window's single sorted run must hold before it is merged.
    /// Not applied when a window contains more than one sorted run.
    pub trigger_file_num: usize,
    /// Compaction time window in seconds. Used when the region version does not specify
    /// one; if both are absent the window size is inferred from level-0 files.
    pub time_window_seconds: Option<i64>,
    /// Maximum output file size in bytes. Passed to `merge_seq_files`, used to skip large
    /// file groups in append mode, and reported as `PickerOutput::max_file_size`.
    pub max_output_file_size: Option<u64>,
    /// Whether the region is append-only. In append mode deletions are never filtered and
    /// file groups larger than `max_output_file_size` are left out.
    pub append_mode: bool,
    /// Maximum number of outputs produced by one pick; `None` means unlimited.
    pub max_background_tasks: Option<usize>,
}
58
59impl TwcsPicker {
60 fn build_output(
62 &self,
63 region_id: RegionId,
64 time_windows: &mut BTreeMap<i64, Window>,
65 active_window: Option<i64>,
66 ) -> Vec<CompactionOutput> {
67 let mut output = vec![];
68 for (window, files) in time_windows {
69 if files.files.is_empty() {
70 continue;
71 }
72 let mut files_to_merge: Vec<_> = files.files().cloned().collect();
73
74 if self.append_mode
76 && let Some(max_size) = self.max_output_file_size
77 {
78 let (kept_files, ignored_files) = files_to_merge
79 .into_iter()
80 .partition(|fg| fg.size() <= max_size as usize);
81 files_to_merge = kept_files;
82 info!(
83 "Skipped {} large files in append mode for region {}, window {}, max_size: {}",
84 ignored_files.len(),
85 region_id,
86 window,
87 max_size
88 );
89 }
90
91 let sorted_runs = find_sorted_runs(&mut files_to_merge);
92 let found_runs = sorted_runs.len();
93 let filter_deleted = !files.overlapping && found_runs <= 2 && !self.append_mode;
96 if found_runs == 0 {
97 continue;
98 }
99
100 let mut inputs = if found_runs > 1 {
101 reduce_runs(sorted_runs)
102 } else {
103 let run = sorted_runs.last().unwrap();
104 if run.items().len() < self.trigger_file_num {
105 continue;
106 }
107 merge_seq_files(run.items(), self.max_output_file_size)
109 };
110
111 let total_input_files: usize = inputs.iter().map(|fg| fg.num_files()).sum();
113 if total_input_files > DEFAULT_MAX_INPUT_FILE_NUM {
114 inputs.sort_unstable_by_key(|fg| fg.size());
116 let mut num_picked_files = 0;
117 inputs = inputs
118 .into_iter()
119 .take_while(|fg| {
120 let current_group_file_num = fg.num_files();
121 if current_group_file_num + num_picked_files <= DEFAULT_MAX_INPUT_FILE_NUM {
122 num_picked_files += current_group_file_num;
123 true
124 } else {
125 false
126 }
127 })
128 .collect::<Vec<_>>();
129 info!(
130 "Compaction for region {} enforces max input file num limit: {}, current total: {}, input: {:?}",
131 region_id, DEFAULT_MAX_INPUT_FILE_NUM, total_input_files, inputs
132 );
133 }
134
135 if inputs.len() > 1 {
136 log_pick_result(
138 region_id,
139 *window,
140 active_window,
141 found_runs,
142 files.files.len(),
143 self.max_output_file_size,
144 filter_deleted,
145 &inputs,
146 );
147 output.push(CompactionOutput {
148 output_level: LEVEL_COMPACTED, inputs: inputs.into_iter().flat_map(|fg| fg.into_files()).collect(),
150 filter_deleted,
151 output_time_range: None, });
153
154 if let Some(max_background_tasks) = self.max_background_tasks
155 && output.len() >= max_background_tasks
156 {
157 debug!(
158 "Region ({:?}) compaction task size larger than max background tasks({}), remaining tasks discarded",
159 region_id, max_background_tasks
160 );
161 break;
162 }
163 }
164 }
165 output
166 }
167}
168
169#[allow(clippy::too_many_arguments)]
170fn log_pick_result(
171 region_id: RegionId,
172 window: i64,
173 active_window: Option<i64>,
174 found_runs: usize,
175 file_num: usize,
176 max_output_file_size: Option<u64>,
177 filter_deleted: bool,
178 inputs: &[FileGroup],
179) {
180 let input_file_str: Vec<String> = inputs
181 .iter()
182 .map(|f| {
183 let range = f.range();
184 let start = range.0.to_iso8601_string();
185 let end = range.1.to_iso8601_string();
186 let num_rows = f.num_rows();
187 format!(
188 "FileGroup{{id: {:?}, range: ({}, {}), size: {}, num rows: {} }}",
189 f.file_ids(),
190 start,
191 end,
192 ReadableSize(f.size() as u64),
193 num_rows
194 )
195 })
196 .collect();
197 let window_str = Timestamp::new_second(window).to_iso8601_string();
198 let active_window_str = active_window.map(|s| Timestamp::new_second(s).to_iso8601_string());
199 let max_output_file_size = max_output_file_size.map(|size| ReadableSize(size).to_string());
200 info!(
201 "Region ({:?}) compaction pick result: current window: {}, active window: {:?}, \
202 found runs: {}, file num: {}, max output file size: {:?}, filter deleted: {}, \
203 input files: {:?}",
204 region_id,
205 window_str,
206 active_window_str,
207 found_runs,
208 file_num,
209 max_output_file_size,
210 filter_deleted,
211 input_file_str
212 );
213}
214
215impl Picker for TwcsPicker {
216 fn pick(&self, compaction_region: &CompactionRegion) -> Option<PickerOutput> {
217 let region_id = compaction_region.region_id;
218 let levels = compaction_region.current_version.ssts.levels();
219
220 let expired_ssts =
221 get_expired_ssts(levels, compaction_region.ttl, Timestamp::current_millis());
222 if !expired_ssts.is_empty() {
223 info!("Expired SSTs in region {}: {:?}", region_id, expired_ssts);
224 expired_ssts.iter().for_each(|f| f.set_compacting(true));
226 }
227
228 let compaction_time_window = compaction_region
229 .current_version
230 .compaction_time_window
231 .map(|window| window.as_secs() as i64);
232 let time_window_size = compaction_time_window
233 .or(self.time_window_seconds)
234 .unwrap_or_else(|| {
235 let inferred = infer_time_bucket(levels[0].files());
236 info!(
237 "Compaction window for region {} is not present, inferring from files: {:?}",
238 region_id, inferred
239 );
240 inferred
241 });
242
243 let active_window = find_latest_window_in_seconds(levels[0].files(), time_window_size);
245 let mut windows =
247 assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size);
248 let outputs = self.build_output(region_id, &mut windows, active_window);
249
250 if outputs.is_empty() && expired_ssts.is_empty() {
251 return None;
252 }
253
254 let max_file_size = self.max_output_file_size.map(|v| v as usize);
255 Some(PickerOutput {
256 outputs,
257 expired_ssts,
258 time_window_size,
259 max_file_size,
260 })
261 }
262}
263
/// A set of files assigned to the same time window by `assign_to_windows`.
struct Window {
    /// Minimum start timestamp over all files in the window.
    start: Timestamp,
    /// Maximum end timestamp over all files in the window.
    end: Timestamp,
    /// File groups keyed by each file's `meta_ref().sequence`; files with the same
    /// sequence share one group.
    files: HashMap<Option<NonZeroU64>, FileGroup>,
    /// Window key in seconds. `new_with_file` sets it to 0; `assign_to_windows` then sets
    /// it to the aligned key the window is stored under.
    time_window: i64,
    /// Set when another window overlaps this one in time and in primary-key range (an
    /// unknown key range counts as overlapping).
    overlapping: bool,
    /// Merged primary-key range of the window's files, as produced by
    /// `merge_primary_key_ranges`.
    primary_key_range: Option<(bytes::Bytes, bytes::Bytes)>,
}
274
275impl Window {
276 fn new_with_file(file: FileHandle) -> Self {
278 let (start, end) = file.time_range();
279 let primary_key_range = file.primary_key_range();
280 let files = HashMap::from([(file.meta_ref().sequence, FileGroup::new_with_file(file))]);
281 Self {
282 start,
283 end,
284 files,
285 time_window: 0,
286 overlapping: false,
287 primary_key_range,
288 }
289 }
290
291 fn range(&self) -> (Timestamp, Timestamp) {
293 (self.start, self.end)
294 }
295
296 fn add_file(&mut self, file: FileHandle) {
298 let (start, end) = file.time_range();
299 self.start = self.start.min(start);
300 self.end = self.end.max(end);
301 self.primary_key_range =
302 merge_primary_key_ranges(self.primary_key_range.take(), file.primary_key_range());
303
304 match self.files.entry(file.meta_ref().sequence) {
305 Entry::Occupied(mut o) => {
306 o.get_mut().add_file(file);
307 }
308 Entry::Vacant(v) => {
309 v.insert(FileGroup::new_with_file(file));
310 }
311 }
312 }
313
314 fn files(&self) -> impl Iterator<Item = &FileGroup> {
315 self.files.values()
316 }
317}
318
319fn assign_to_windows<'a>(
321 files: impl Iterator<Item = &'a FileHandle>,
322 time_window_size: i64,
323) -> BTreeMap<i64, Window> {
324 let mut windows: HashMap<i64, Window> = HashMap::new();
325 for f in files {
327 if f.compacting() {
328 continue;
329 }
330 let (_, end) = f.time_range();
331 let time_window = end
332 .convert_to(TimeUnit::Second)
333 .unwrap()
334 .value()
335 .align_to_ceil_by_bucket(time_window_size)
336 .unwrap_or(i64::MIN);
337
338 match windows.entry(time_window) {
339 Entry::Occupied(mut e) => {
340 e.get_mut().add_file(f.clone());
341 }
342 Entry::Vacant(e) => {
343 let mut window = Window::new_with_file(f.clone());
344 window.time_window = time_window;
345 e.insert(window);
346 }
347 }
348 }
349 if windows.is_empty() {
350 return BTreeMap::new();
351 }
352
353 let mut windows = windows.into_values().collect::<Vec<_>>();
354 windows.sort_unstable_by(|l, r| l.start.cmp(&r.start).then(l.end.cmp(&r.end).reverse()));
355
356 for idx in 0..windows.len() {
357 let lhs_range = windows[idx].range();
358 for next_idx in idx + 1..windows.len() {
359 let rhs_range = windows[next_idx].range();
360 if rhs_range.0 > lhs_range.1 {
361 break;
362 }
363
364 let windows_overlap = overlaps(&lhs_range, &rhs_range)
365 && match (
366 &windows[idx].primary_key_range,
367 &windows[next_idx].primary_key_range,
368 ) {
369 (Some(lhs), Some(rhs)) => primary_key_ranges_overlap(lhs, rhs),
370 _ => true,
371 };
372 if windows_overlap {
373 windows[idx].overlapping = true;
374 windows[next_idx].overlapping = true;
375 }
376 }
377 }
378
379 windows.into_iter().map(|w| (w.time_window, w)).collect()
380}
381
382fn find_latest_window_in_seconds<'a>(
385 files: impl Iterator<Item = &'a FileHandle>,
386 time_window_size: i64,
387) -> Option<i64> {
388 let mut latest_timestamp = None;
389 for f in files {
390 let (_, end) = f.time_range();
391 if let Some(latest) = latest_timestamp {
392 if end > latest {
393 latest_timestamp = Some(end);
394 }
395 } else {
396 latest_timestamp = Some(end);
397 }
398 }
399 latest_timestamp
400 .and_then(|ts| ts.convert_to_ceil(TimeUnit::Second))
401 .and_then(|ts| ts.value().align_to_ceil_by_bucket(time_window_size))
402}
403
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use bytes::Bytes;
    use store_api::storage::FileId;

    use super::*;
    use crate::compaction::test_util::{
        new_file_handle, new_file_handle_with_sequence, new_file_handle_with_size_and_sequence,
        new_file_handle_with_size_sequence_and_primary_key_range,
    };
    use crate::sst::file::Level;

    #[test]
    fn test_get_latest_window_in_seconds() {
        assert_eq!(
            Some(1),
            find_latest_window_in_seconds([new_file_handle(FileId::random(), 0, 999, 0)].iter(), 1)
        );
        assert_eq!(
            Some(1),
            find_latest_window_in_seconds(
                [new_file_handle(FileId::random(), 0, 1000, 0)].iter(),
                1
            )
        );

        assert_eq!(
            Some(-9223372036854000),
            find_latest_window_in_seconds(
                [new_file_handle(FileId::random(), i64::MIN, i64::MIN + 1, 0)].iter(),
                3600,
            )
        );

        assert_eq!(
            (i64::MAX / 10000000 + 1) * 10000,
            find_latest_window_in_seconds(
                [new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0)].iter(),
                10000,
            )
            .unwrap()
        );

        assert_eq!(
            Some((i64::MAX / 3600000 + 1) * 3600),
            find_latest_window_in_seconds(
                [
                    new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0),
                    new_file_handle(FileId::random(), 0, 1000, 0)
                ]
                .iter(),
                3600
            )
        );
    }

    #[test]
    fn test_assign_to_windows() {
        let windows = assign_to_windows(
            [
                new_file_handle(FileId::random(), 0, 999, 0),
                new_file_handle(FileId::random(), 0, 999, 0),
                new_file_handle(FileId::random(), 0, 999, 0),
                new_file_handle(FileId::random(), 0, 999, 0),
                new_file_handle(FileId::random(), 0, 999, 0),
            ]
            .iter(),
            3,
        );
        let fgs = &windows.get(&0).unwrap().files;
        assert_eq!(1, fgs.len());
        assert_eq!(fgs.values().map(|f| f.files().len()).sum::<usize>(), 5);

        // Distinct ids are required: `[FileId::random(); 3]` evaluates `FileId::random()`
        // once and copies it, which would make the per-window assertions below vacuous.
        let files = [FileId::random(), FileId::random(), FileId::random()];
        let windows = assign_to_windows(
            [
                new_file_handle(files[0], -2000, -3, 0),
                new_file_handle(files[1], 0, 2999, 0),
                new_file_handle(files[2], 50, 10001, 0),
            ]
            .iter(),
            3,
        );
        assert_eq!(
            files[0],
            windows.get(&0).unwrap().files().next().unwrap().files()[0]
                .file_id()
                .file_id()
        );
        assert_eq!(
            files[1],
            windows.get(&3).unwrap().files().next().unwrap().files()[0]
                .file_id()
                .file_id()
        );
        assert_eq!(
            files[2],
            windows.get(&12).unwrap().files().next().unwrap().files()[0]
                .file_id()
                .file_id()
        );
    }

    #[test]
    fn test_assign_file_groups_to_windows() {
        let files = [
            FileId::random(),
            FileId::random(),
            FileId::random(),
            FileId::random(),
        ];
        let windows = assign_to_windows(
            [
                new_file_handle_with_sequence(files[0], 0, 999, 0, 1),
                new_file_handle_with_sequence(files[1], 0, 999, 0, 1),
                new_file_handle_with_sequence(files[2], 0, 999, 0, 2),
                new_file_handle_with_sequence(files[3], 0, 999, 0, 2),
            ]
            .iter(),
            3,
        );
        assert_eq!(windows.len(), 1);
        let fgs = &windows.get(&0).unwrap().files;
        assert_eq!(2, fgs.len());
        assert_eq!(
            fgs.get(&NonZeroU64::new(1))
                .unwrap()
                .files()
                .iter()
                .map(|f| f.file_id().file_id())
                .collect::<HashSet<_>>(),
            [files[0], files[1]].into_iter().collect()
        );
        assert_eq!(
            fgs.get(&NonZeroU64::new(2))
                .unwrap()
                .files()
                .iter()
                .map(|f| f.file_id().file_id())
                .collect::<HashSet<_>>(),
            [files[2], files[3]].into_iter().collect()
        );
    }

    #[test]
    fn test_assign_compacting_to_windows() {
        let files = [
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
        ];
        files[0].set_compacting(true);
        files[2].set_compacting(true);
        let mut windows = assign_to_windows(files.iter(), 3);
        let window0 = windows.remove(&0).unwrap();
        assert_eq!(1, window0.files.len());
        let candidates = window0
            .files
            .into_values()
            .flat_map(|fg| fg.into_files())
            .map(|f| f.file_id().file_id())
            .collect::<HashSet<_>>();
        assert_eq!(candidates.len(), 3);
        assert_eq!(
            candidates,
            [
                files[1].file_id().file_id(),
                files[3].file_id().file_id(),
                files[4].file_id().file_id()
            ]
            .into_iter()
            .collect::<HashSet<_>>()
        );
    }

    /// `(window key, expected overlapping flag, sorted (start, end) ranges of its files)`.
    type ExpectedWindowSpec = (i64, bool, Vec<(i64, i64)>);

    fn pk_range(min: &'static [u8], max: &'static [u8]) -> Option<(Bytes, Bytes)> {
        Some((Bytes::from_static(min), Bytes::from_static(max)))
    }

    fn check_assign_to_windows_with_overlapping(
        file_time_ranges: &[(i64, i64)],
        time_window: i64,
        expected_files: &[ExpectedWindowSpec],
    ) {
        let files: Vec<_> = (0..file_time_ranges.len())
            .map(|_| FileId::random())
            .collect();

        let file_handles = files
            .iter()
            .zip(file_time_ranges.iter())
            .map(|(file_id, range)| new_file_handle(*file_id, range.0, range.1, 0))
            .collect::<Vec<_>>();

        let windows = assign_to_windows(file_handles.iter(), time_window);

        for (expected_window, overlapping, window_files) in expected_files {
            let actual_window = windows.get(expected_window).unwrap();
            assert_eq!(*overlapping, actual_window.overlapping);
            let mut file_ranges = actual_window
                .files
                .values()
                .flat_map(|f| {
                    f.files().iter().map(|f| {
                        let (s, e) = f.time_range();
                        (s.value(), e.value())
                    })
                })
                .collect::<Vec<_>>();
            file_ranges.sort_unstable_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1)));
            assert_eq!(window_files, &file_ranges);
        }
    }

    #[test]
    fn test_assign_to_windows_with_overlapping() {
        check_assign_to_windows_with_overlapping(
            &[(0, 999), (1000, 1999), (2000, 2999)],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, false, vec![(1000, 1999), (2000, 2999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[(0, 1), (0, 999), (100, 2999)],
            2,
            &[
                (0, true, vec![(0, 1), (0, 999)]),
                (2, true, vec![(100, 2999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[(0, 999), (1000, 1999), (2000, 2999), (3000, 3999)],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, false, vec![(1000, 1999), (2000, 2999)]),
                (4, false, vec![(3000, 3999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (0, 3999),
            ],
            2,
            &[
                (0, true, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(0, 3999), (3000, 3999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (1999, 3999),
            ],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(1999, 3999), (3000, 3999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (2999, 3999),
            ],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(2999, 3999), (3000, 3999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (0, 1000),
            ],
            2,
            &[
                (0, true, vec![(0, 999)]),
                (2, true, vec![(0, 1000), (1000, 1999), (2000, 2999)]),
                (4, false, vec![(3000, 3999)]),
            ],
        );
    }

    #[test]
    fn test_assign_to_windows_not_overlapping_when_pk_disjoint() {
        let files = [
            new_file_handle_with_size_sequence_and_primary_key_range(
                FileId::random(),
                0,
                1000,
                0,
                1,
                10,
                pk_range(b"a", b"f"),
            ),
            new_file_handle_with_size_sequence_and_primary_key_range(
                FileId::random(),
                500,
                1999,
                0,
                2,
                10,
                pk_range(b"x", b"z"),
            ),
        ];

        let windows = assign_to_windows(files.iter(), 2);

        assert!(!windows.get(&2).unwrap().overlapping);
    }

    #[test]
    fn test_assign_to_windows_pk_unknown_in_earlier_window_does_not_poison_later_windows() {
        let files = [
            new_file_handle(FileId::random(), 0, 1999, 0),
            new_file_handle_with_size_sequence_and_primary_key_range(
                FileId::random(),
                2000,
                3999,
                0,
                1,
                10,
                pk_range(b"a", b"f"),
            ),
            new_file_handle_with_size_sequence_and_primary_key_range(
                FileId::random(),
                3000,
                4999,
                0,
                2,
                10,
                pk_range(b"x", b"z"),
            ),
        ];

        let windows = assign_to_windows(files.iter(), 2);

        assert!(!windows.get(&4).unwrap().overlapping);
    }

    /// Runs `build_output` over `input_files` and compares the picked inputs (as indexes
    /// into `input_files`) and output levels with `expected_outputs`.
    struct CompactionPickerTestCase {
        window_size: i64,
        input_files: Vec<FileHandle>,
        expected_outputs: Vec<ExpectedOutput>,
    }

    impl CompactionPickerTestCase {
        fn check(&self) {
            let file_id_to_idx = self
                .input_files
                .iter()
                .enumerate()
                .map(|(idx, file)| (file.file_id(), idx))
                .collect::<HashMap<_, _>>();
            let mut windows = assign_to_windows(self.input_files.iter(), self.window_size);
            let active_window =
                find_latest_window_in_seconds(self.input_files.iter(), self.window_size);
            let output = TwcsPicker {
                trigger_file_num: 4,
                time_window_seconds: None,
                max_output_file_size: None,
                append_mode: false,
                max_background_tasks: None,
            }
            .build_output(RegionId::from_u64(0), &mut windows, active_window);

            let output = output
                .iter()
                .map(|o| {
                    let input_file_ids = o
                        .inputs
                        .iter()
                        .map(|f| file_id_to_idx.get(&f.file_id()).copied().unwrap())
                        .collect::<HashSet<_>>();
                    (input_file_ids, o.output_level)
                })
                .collect::<Vec<_>>();

            let expected = self
                .expected_outputs
                .iter()
                .map(|o| {
                    let input_file_ids = o.input_files.iter().copied().collect::<HashSet<_>>();
                    (input_file_ids, o.output_level)
                })
                .collect::<Vec<_>>();
            assert_eq!(expected, output);
        }
    }

    struct ExpectedOutput {
        input_files: Vec<usize>,
        output_level: Level,
    }

    #[test]
    fn test_build_twcs_output() {
        let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();

        CompactionPickerTestCase {
            window_size: 3,
            input_files: [
                new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
                new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
                new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3),
                new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4),
            ]
            .to_vec(),
            expected_outputs: vec![
                ExpectedOutput {
                    input_files: vec![0, 1],
                    output_level: 1,
                },
                ExpectedOutput {
                    input_files: vec![2, 3],
                    output_level: 1,
                },
            ],
        }
        .check();

        let file_ids = (0..5).map(|_| FileId::random()).collect::<Vec<_>>();
        CompactionPickerTestCase {
            window_size: 3,
            input_files: [
                new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
                new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
                new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3),
                new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4),
                new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 5),
            ]
            .to_vec(),
            expected_outputs: vec![
                ExpectedOutput {
                    input_files: vec![0, 1],
                    output_level: 1,
                },
                ExpectedOutput {
                    input_files: vec![2, 4],
                    output_level: 1,
                },
            ],
        }
        .check();

        let file_ids = (0..5).map(|_| FileId::random()).collect::<Vec<_>>();
        CompactionPickerTestCase {
            window_size: 3,
            input_files: [
                new_file_handle_with_sequence(file_ids[0], 0, 2999, 1, 1),
                new_file_handle_with_sequence(file_ids[1], 0, 2998, 1, 1),
                new_file_handle_with_sequence(file_ids[2], 3000, 5999, 1, 2),
                new_file_handle_with_sequence(file_ids[3], 3000, 5000, 1, 2),
                new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 3),
            ]
            .to_vec(),
            expected_outputs: vec![ExpectedOutput {
                input_files: vec![0, 1, 4],
                output_level: 1,
            }],
        }
        .check();
    }

    #[test]
    fn test_build_output_skips_pk_disjoint_files() {
        let files = [
            new_file_handle_with_size_sequence_and_primary_key_range(
                FileId::random(),
                0,
                2999,
                0,
                1,
                10,
                pk_range(b"a", b"f"),
            ),
            new_file_handle_with_size_sequence_and_primary_key_range(
                FileId::random(),
                50,
                2998,
                0,
                2,
                10,
                pk_range(b"x", b"z"),
            ),
        ];
        let mut windows = assign_to_windows(files.iter(), 3);
        let active_window = find_latest_window_in_seconds(files.iter(), 3);
        let output = TwcsPicker {
            trigger_file_num: 4,
            time_window_seconds: None,
            max_output_file_size: None,
            append_mode: false,
            max_background_tasks: None,
        }
        .build_output(RegionId::from_u64(0), &mut windows, active_window);

        assert!(output.is_empty());
    }

    #[test]
    fn test_append_mode_filter_large_files() {
        let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
        let max_output_file_size = 1000u64;

        let small_file_1 = new_file_handle_with_size_and_sequence(file_ids[0], 0, 999, 0, 1, 500);
        let large_file_1 = new_file_handle_with_size_and_sequence(file_ids[1], 0, 999, 0, 2, 1500);
        let small_file_2 = new_file_handle_with_size_and_sequence(file_ids[2], 0, 999, 0, 3, 800);
        let large_file_2 = new_file_handle_with_size_and_sequence(file_ids[3], 0, 999, 0, 4, 2000);
        let files = [small_file_1, large_file_1, small_file_2, large_file_2];

        // The size filter on its own.
        let mut files_to_merge = files
            .iter()
            .cloned()
            .map(FileGroup::new_with_file)
            .collect::<Vec<_>>();
        let original_count = files_to_merge.len();
        files_to_merge.retain(|fg| fg.size() <= max_output_file_size as usize);

        assert_eq!(files_to_merge.len(), 2);
        assert_eq!(original_count, 4);
        for fg in &files_to_merge {
            assert!(
                fg.size() <= max_output_file_size as usize,
                "File size {} should be <= {}",
                fg.size(),
                max_output_file_size
            );
        }

        // The picker itself must never select the large files in append mode, and must
        // never filter deletions there.
        let mut windows = assign_to_windows(files.iter(), 3);
        let active_window = find_latest_window_in_seconds(files.iter(), 3);
        let output = TwcsPicker {
            trigger_file_num: 2,
            time_window_seconds: Some(3),
            max_output_file_size: Some(max_output_file_size),
            append_mode: true,
            max_background_tasks: None,
        }
        .build_output(RegionId::from_u64(0), &mut windows, active_window);

        let picked = output
            .iter()
            .flat_map(|o| o.inputs.iter())
            .map(|f| f.file_id().file_id())
            .collect::<HashSet<_>>();
        assert!(!picked.contains(&file_ids[1]), "large file 1 must not be picked");
        assert!(!picked.contains(&file_ids[3]), "large file 2 must not be picked");
        assert!(output.iter().all(|o| !o.filter_deleted));
    }

    #[test]
    fn test_build_output_multiple_windows_with_zero_runs() {
        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();

        let files = [
            new_file_handle_with_sequence(file_ids[0], 0, 999, 0, 1),
            new_file_handle_with_sequence(file_ids[1], 0, 999, 0, 2),
            new_file_handle_with_sequence(file_ids[2], 0, 999, 0, 3),
            new_file_handle_with_sequence(file_ids[3], 3000, 3999, 0, 4),
            new_file_handle_with_sequence(file_ids[4], 3000, 3999, 0, 5),
            new_file_handle_with_sequence(file_ids[5], 3000, 3999, 0, 6),
        ];

        let mut windows = assign_to_windows(files.iter(), 3);

        let picker = TwcsPicker {
            trigger_file_num: 4,
            time_window_seconds: Some(3),
            max_output_file_size: None,
            append_mode: false,
            max_background_tasks: None,
        };

        let active_window = find_latest_window_in_seconds(files.iter(), 3);
        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);

        assert!(
            !output.is_empty(),
            "Should have output from windows with runs, even when one window has 0 runs"
        );

        let all_output_files: Vec<_> = output
            .iter()
            .flat_map(|o| o.inputs.iter())
            .map(|f| f.file_id().file_id())
            .collect();

        assert!(
            all_output_files.contains(&file_ids[3])
                || all_output_files.contains(&file_ids[4])
                || all_output_files.contains(&file_ids[5]),
            "Output should contain files from the window with runs"
        );
    }

    #[test]
    fn test_build_output_single_window_zero_runs() {
        let file_ids = (0..2).map(|_| FileId::random()).collect::<Vec<_>>();

        let large_file_1 = new_file_handle_with_size_and_sequence(file_ids[0], 0, 999, 0, 1, 2000);
        let large_file_2 = new_file_handle_with_size_and_sequence(file_ids[1], 0, 999, 0, 2, 2500);
        let files = [large_file_1, large_file_2];

        let mut windows = assign_to_windows(files.iter(), 3);

        let picker = TwcsPicker {
            trigger_file_num: 2,
            time_window_seconds: Some(3),
            max_output_file_size: Some(1000),
            append_mode: true,
            max_background_tasks: None,
        };

        let active_window = find_latest_window_in_seconds(files.iter(), 3);
        let output = picker.build_output(RegionId::from_u64(456), &mut windows, active_window);

        assert!(
            output.is_empty(),
            "Should return empty output when no runs are found after filtering"
        );
    }

    #[test]
    fn test_max_background_tasks_truncation() {
        let file_ids = (0..10).map(|_| FileId::random()).collect::<Vec<_>>();
        let max_background_tasks = 3;

        let files = [
            new_file_handle_with_sequence(file_ids[0], 0, 999, 0, 1),
            new_file_handle_with_sequence(file_ids[1], 0, 999, 0, 2),
            new_file_handle_with_sequence(file_ids[2], 0, 999, 0, 3),
            new_file_handle_with_sequence(file_ids[3], 0, 999, 0, 4),
            new_file_handle_with_sequence(file_ids[4], 3000, 3999, 0, 5),
            new_file_handle_with_sequence(file_ids[5], 3000, 3999, 0, 6),
            new_file_handle_with_sequence(file_ids[6], 3000, 3999, 0, 7),
            new_file_handle_with_sequence(file_ids[7], 3000, 3999, 0, 8),
            new_file_handle_with_sequence(file_ids[8], 6000, 6999, 0, 9),
            new_file_handle_with_sequence(file_ids[9], 6000, 6999, 0, 10),
        ];

        let mut windows = assign_to_windows(files.iter(), 3);

        let picker = TwcsPicker {
            trigger_file_num: 4,
            time_window_seconds: Some(3),
            max_output_file_size: None,
            append_mode: false,
            max_background_tasks: Some(max_background_tasks),
        };

        let active_window = find_latest_window_in_seconds(files.iter(), 3);
        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);

        assert!(
            output.len() <= max_background_tasks,
            "Output should be truncated to max_background_tasks: expected <= {}, got {}",
            max_background_tasks,
            output.len()
        );

        let picker_no_limit = TwcsPicker {
            trigger_file_num: 4,
            time_window_seconds: Some(3),
            max_output_file_size: None,
            append_mode: false,
            max_background_tasks: None,
        };

        let mut windows_no_limit = assign_to_windows(files.iter(), 3);
        let output_no_limit = picker_no_limit.build_output(
            RegionId::from_u64(123),
            &mut windows_no_limit,
            active_window,
        );

        if output_no_limit.len() > max_background_tasks {
            assert!(
                output_no_limit.len() > output.len(),
                "Without limit should have more outputs than with limit"
            );
        }
    }

    #[test]
    fn test_max_background_tasks_no_truncation_when_under_limit() {
        let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
        let max_background_tasks = 10;
        let files = [
            new_file_handle_with_sequence(file_ids[0], 0, 999, 0, 1),
            new_file_handle_with_sequence(file_ids[1], 0, 999, 0, 2),
            new_file_handle_with_sequence(file_ids[2], 0, 999, 0, 3),
            new_file_handle_with_sequence(file_ids[3], 0, 999, 0, 4),
        ];

        let mut windows = assign_to_windows(files.iter(), 3);

        let picker = TwcsPicker {
            trigger_file_num: 4,
            time_window_seconds: Some(3),
            max_output_file_size: None,
            append_mode: false,
            max_background_tasks: Some(max_background_tasks),
        };

        let active_window = find_latest_window_in_seconds(files.iter(), 3);
        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);

        assert!(
            output.len() <= max_background_tasks,
            "Output should be within limit"
        );
        assert!(!output.is_empty(), "Should have at least one output");
    }

    #[test]
    fn test_pick_multiple_runs() {
        common_telemetry::init_default_ut_logging();

        let num_files = 8;
        let file_ids = (0..num_files).map(|_| FileId::random()).collect::<Vec<_>>();

        let files: Vec<_> = file_ids
            .iter()
            .enumerate()
            .map(|(idx, file_id)| {
                new_file_handle_with_size_and_sequence(
                    *file_id,
                    0,
                    999,
                    0,
                    (idx + 1) as u64,
                    1024 * 1024,
                )
            })
            .collect();

        let mut windows = assign_to_windows(files.iter(), 3);

        let picker = TwcsPicker {
            trigger_file_num: 4,
            time_window_seconds: Some(3),
            max_output_file_size: None,
            append_mode: false,
            max_background_tasks: None,
        };

        let active_window = find_latest_window_in_seconds(files.iter(), 3);
        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);

        assert_eq!(1, output.len());
        assert_eq!(output[0].inputs.len(), 2);
    }

    #[test]
    fn test_limit_max_input_files() {
        common_telemetry::init_default_ut_logging();

        let num_files = 50;
        let file_ids = (0..num_files).map(|_| FileId::random()).collect::<Vec<_>>();

        let files: Vec<_> = file_ids
            .iter()
            .enumerate()
            .map(|(idx, file_id)| {
                new_file_handle_with_size_and_sequence(
                    *file_id,
                    (idx / 2 * 10) as i64,
                    (idx / 2 * 10 + 5) as i64,
                    0,
                    (idx + 1) as u64,
                    1024 * 1024,
                )
            })
            .collect();

        let mut windows = assign_to_windows(files.iter(), 3);

        let picker = TwcsPicker {
            trigger_file_num: 4,
            time_window_seconds: Some(3),
            max_output_file_size: None,
            append_mode: false,
            max_background_tasks: None,
        };

        let active_window = find_latest_window_in_seconds(files.iter(), 3);
        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);

        assert_eq!(1, output.len());
        assert_eq!(output[0].inputs.len(), 32);
        // Overlapping files were left out by the input cap, so deletion markers must be kept.
        assert!(
            !output[0].filter_deleted,
            "deletion markers must be kept when overlapping inputs are truncated"
        );
    }
}