1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap};
17use std::fmt::Debug;
18use std::num::NonZeroU64;
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::{debug, info};
22use common_time::Timestamp;
23use common_time::timestamp::TimeUnit;
24use common_time::timestamp_millis::BucketAligned;
25use store_api::storage::RegionId;
26
27use crate::compaction::buckets::infer_time_bucket;
28use crate::compaction::compactor::CompactionRegion;
29use crate::compaction::picker::{Picker, PickerOutput};
30use crate::compaction::run::{
31 FileGroup, Item, Ranged, find_sorted_runs, merge_seq_files, reduce_runs,
32};
33use crate::compaction::{CompactionOutput, get_expired_ssts};
34use crate::sst::file::{FileHandle, Level, overlaps};
35use crate::sst::version::LevelMeta;
36
/// Output level of every compaction output produced by `TwcsPicker::build_output`.
const LEVEL_COMPACTED: Level = 1;

/// Upper bound on the total number of input files in one compaction output.
///
/// When a window's picked file groups exceed this, `build_output` keeps file groups in
/// ascending size order until the next group would push the file count over the limit.
const DEFAULT_MAX_INPUT_FILE_NUM: usize = 32;
41
/// Compaction picker that groups SST files into time windows and picks file groups to
/// merge within each window.
#[derive(Debug)]
pub struct TwcsPicker {
    /// Minimum number of items a window's single sorted run must have before its files
    /// are merged (windows with several sorted runs are always considered).
    pub trigger_file_num: usize,
    /// Time window size in seconds, used when the region version does not carry one.
    /// When both are absent the window size is inferred from level-0 files.
    pub time_window_seconds: Option<i64>,
    /// Maximum output file size in bytes. It is passed to `merge_seq_files`, used in
    /// append mode to exclude larger file groups, and reported as
    /// `PickerOutput::max_file_size`.
    pub max_output_file_size: Option<u64>,
    /// Whether the region is in append mode. This excludes oversized file groups (see
    /// `max_output_file_size`) and disables `filter_deleted` on outputs.
    pub append_mode: bool,
    /// Maximum number of compaction outputs produced by one pick; remaining windows are
    /// discarded once the limit is reached.
    pub max_background_tasks: Option<usize>,
}
57
58impl TwcsPicker {
59 fn build_output(
61 &self,
62 region_id: RegionId,
63 time_windows: &mut BTreeMap<i64, Window>,
64 active_window: Option<i64>,
65 ) -> Vec<CompactionOutput> {
66 let mut output = vec![];
67 for (window, files) in time_windows {
68 if files.files.is_empty() {
69 continue;
70 }
71 let mut files_to_merge: Vec<_> = files.files().cloned().collect();
72
73 if self.append_mode
75 && let Some(max_size) = self.max_output_file_size
76 {
77 let (kept_files, ignored_files) = files_to_merge
78 .into_iter()
79 .partition(|fg| fg.size() <= max_size as usize);
80 files_to_merge = kept_files;
81 info!(
82 "Skipped {} large files in append mode for region {}, window {}, max_size: {}",
83 ignored_files.len(),
84 region_id,
85 window,
86 max_size
87 );
88 }
89
90 let sorted_runs = find_sorted_runs(&mut files_to_merge);
91 let found_runs = sorted_runs.len();
92 let filter_deleted = !files.overlapping && found_runs <= 2 && !self.append_mode;
95 if found_runs == 0 {
96 continue;
97 }
98
99 let mut inputs = if found_runs > 1 {
100 reduce_runs(sorted_runs)
101 } else {
102 let run = sorted_runs.last().unwrap();
103 if run.items().len() < self.trigger_file_num {
104 continue;
105 }
106 merge_seq_files(run.items(), self.max_output_file_size)
108 };
109
110 let total_input_files: usize = inputs.iter().map(|fg| fg.num_files()).sum();
112 if total_input_files > DEFAULT_MAX_INPUT_FILE_NUM {
113 inputs.sort_unstable_by_key(|fg| fg.size());
115 let mut num_picked_files = 0;
116 inputs = inputs
117 .into_iter()
118 .take_while(|fg| {
119 let current_group_file_num = fg.num_files();
120 if current_group_file_num + num_picked_files <= DEFAULT_MAX_INPUT_FILE_NUM {
121 num_picked_files += current_group_file_num;
122 true
123 } else {
124 false
125 }
126 })
127 .collect::<Vec<_>>();
128 info!(
129 "Compaction for region {} enforces max input file num limit: {}, current total: {}, input: {:?}",
130 region_id, DEFAULT_MAX_INPUT_FILE_NUM, total_input_files, inputs
131 );
132 }
133
134 if inputs.len() > 1 {
135 log_pick_result(
137 region_id,
138 *window,
139 active_window,
140 found_runs,
141 files.files.len(),
142 self.max_output_file_size,
143 filter_deleted,
144 &inputs,
145 );
146 output.push(CompactionOutput {
147 output_level: LEVEL_COMPACTED, inputs: inputs.into_iter().flat_map(|fg| fg.into_files()).collect(),
149 filter_deleted,
150 output_time_range: None, });
152
153 if let Some(max_background_tasks) = self.max_background_tasks
154 && output.len() >= max_background_tasks
155 {
156 debug!(
157 "Region ({:?}) compaction task size larger than max background tasks({}), remaining tasks discarded",
158 region_id, max_background_tasks
159 );
160 break;
161 }
162 }
163 }
164 output
165 }
166}
167
168#[allow(clippy::too_many_arguments)]
169fn log_pick_result(
170 region_id: RegionId,
171 window: i64,
172 active_window: Option<i64>,
173 found_runs: usize,
174 file_num: usize,
175 max_output_file_size: Option<u64>,
176 filter_deleted: bool,
177 inputs: &[FileGroup],
178) {
179 let input_file_str: Vec<String> = inputs
180 .iter()
181 .map(|f| {
182 let range = f.range();
183 let start = range.0.to_iso8601_string();
184 let end = range.1.to_iso8601_string();
185 let num_rows = f.num_rows();
186 format!(
187 "FileGroup{{id: {:?}, range: ({}, {}), size: {}, num rows: {} }}",
188 f.file_ids(),
189 start,
190 end,
191 ReadableSize(f.size() as u64),
192 num_rows
193 )
194 })
195 .collect();
196 let window_str = Timestamp::new_second(window).to_iso8601_string();
197 let active_window_str = active_window.map(|s| Timestamp::new_second(s).to_iso8601_string());
198 let max_output_file_size = max_output_file_size.map(|size| ReadableSize(size).to_string());
199 info!(
200 "Region ({:?}) compaction pick result: current window: {}, active window: {:?}, \
201 found runs: {}, file num: {}, max output file size: {:?}, filter deleted: {}, \
202 input files: {:?}",
203 region_id,
204 window_str,
205 active_window_str,
206 found_runs,
207 file_num,
208 max_output_file_size,
209 filter_deleted,
210 input_file_str
211 );
212}
213
214impl Picker for TwcsPicker {
215 fn pick(&self, compaction_region: &CompactionRegion) -> Option<PickerOutput> {
216 let region_id = compaction_region.region_id;
217 let levels = compaction_region.current_version.ssts.levels();
218
219 let expired_ssts =
220 get_expired_ssts(levels, compaction_region.ttl, Timestamp::current_millis());
221 if !expired_ssts.is_empty() {
222 info!("Expired SSTs in region {}: {:?}", region_id, expired_ssts);
223 expired_ssts.iter().for_each(|f| f.set_compacting(true));
225 }
226
227 let compaction_time_window = compaction_region
228 .current_version
229 .compaction_time_window
230 .map(|window| window.as_secs() as i64);
231 let time_window_size = compaction_time_window
232 .or(self.time_window_seconds)
233 .unwrap_or_else(|| {
234 let inferred = infer_time_bucket(levels[0].files());
235 info!(
236 "Compaction window for region {} is not present, inferring from files: {:?}",
237 region_id, inferred
238 );
239 inferred
240 });
241
242 let active_window = find_latest_window_in_seconds(levels[0].files(), time_window_size);
244 let mut windows =
246 assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size);
247 let outputs = self.build_output(region_id, &mut windows, active_window);
248
249 if outputs.is_empty() && expired_ssts.is_empty() {
250 return None;
251 }
252
253 let max_file_size = self.max_output_file_size.map(|v| v as usize);
254 Some(PickerOutput {
255 outputs,
256 expired_ssts,
257 time_window_size,
258 max_file_size,
259 })
260 }
261}
262
/// Files assigned to one compaction time window.
struct Window {
    /// Earliest start timestamp among the files added to this window.
    start: Timestamp,
    /// Latest end timestamp among the files added to this window.
    end: Timestamp,
    /// File groups keyed by file sequence; files with the same sequence share a group.
    files: HashMap<Option<NonZeroU64>, FileGroup>,
    /// Window value in seconds, assigned by `assign_to_windows` from the files' end times.
    time_window: i64,
    /// Set by `assign_to_windows` when this window's time range overlaps another window's.
    overlapping: bool,
}
272
273impl Window {
274 fn new_with_file(file: FileHandle) -> Self {
276 let (start, end) = file.time_range();
277 let files = HashMap::from([(file.meta_ref().sequence, FileGroup::new_with_file(file))]);
278 Self {
279 start,
280 end,
281 files,
282 time_window: 0,
283 overlapping: false,
284 }
285 }
286
287 fn range(&self) -> (Timestamp, Timestamp) {
289 (self.start, self.end)
290 }
291
292 fn add_file(&mut self, file: FileHandle) {
294 let (start, end) = file.time_range();
295 self.start = self.start.min(start);
296 self.end = self.end.max(end);
297
298 match self.files.entry(file.meta_ref().sequence) {
299 Entry::Occupied(mut o) => {
300 o.get_mut().add_file(file);
301 }
302 Entry::Vacant(v) => {
303 v.insert(FileGroup::new_with_file(file));
304 }
305 }
306 }
307
308 fn files(&self) -> impl Iterator<Item = &FileGroup> {
309 self.files.values()
310 }
311}
312
313fn assign_to_windows<'a>(
315 files: impl Iterator<Item = &'a FileHandle>,
316 time_window_size: i64,
317) -> BTreeMap<i64, Window> {
318 let mut windows: HashMap<i64, Window> = HashMap::new();
319 for f in files {
321 if f.compacting() {
322 continue;
323 }
324 let (_, end) = f.time_range();
325 let time_window = end
326 .convert_to(TimeUnit::Second)
327 .unwrap()
328 .value()
329 .align_to_ceil_by_bucket(time_window_size)
330 .unwrap_or(i64::MIN);
331
332 match windows.entry(time_window) {
333 Entry::Occupied(mut e) => {
334 e.get_mut().add_file(f.clone());
335 }
336 Entry::Vacant(e) => {
337 let mut window = Window::new_with_file(f.clone());
338 window.time_window = time_window;
339 e.insert(window);
340 }
341 }
342 }
343 if windows.is_empty() {
344 return BTreeMap::new();
345 }
346
347 let mut windows = windows.into_values().collect::<Vec<_>>();
348 windows.sort_unstable_by(|l, r| l.start.cmp(&r.start).then(l.end.cmp(&r.end).reverse()));
349
350 let mut current_range: (Timestamp, Timestamp) = windows[0].range(); for idx in 1..windows.len() {
353 let next_range = windows[idx].range();
354 if overlaps(¤t_range, &next_range) {
355 windows[idx - 1].overlapping = true;
356 windows[idx].overlapping = true;
357 }
358 current_range = (
359 current_range.0.min(next_range.0),
360 current_range.1.max(next_range.1),
361 );
362 }
363
364 windows.into_iter().map(|w| (w.time_window, w)).collect()
365}
366
367fn find_latest_window_in_seconds<'a>(
370 files: impl Iterator<Item = &'a FileHandle>,
371 time_window_size: i64,
372) -> Option<i64> {
373 let mut latest_timestamp = None;
374 for f in files {
375 let (_, end) = f.time_range();
376 if let Some(latest) = latest_timestamp {
377 if end > latest {
378 latest_timestamp = Some(end);
379 }
380 } else {
381 latest_timestamp = Some(end);
382 }
383 }
384 latest_timestamp
385 .and_then(|ts| ts.convert_to_ceil(TimeUnit::Second))
386 .and_then(|ts| ts.value().align_to_ceil_by_bucket(time_window_size))
387}
388
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use store_api::storage::FileId;

    use super::*;
    use crate::compaction::test_util::{
        new_file_handle, new_file_handle_with_sequence, new_file_handle_with_size_and_sequence,
    };
    use crate::sst::file::Level;

    #[test]
    fn test_get_latest_window_in_seconds() {
        assert_eq!(
            Some(1),
            find_latest_window_in_seconds([new_file_handle(FileId::random(), 0, 999, 0)].iter(), 1)
        );
        assert_eq!(
            Some(1),
            find_latest_window_in_seconds(
                [new_file_handle(FileId::random(), 0, 1000, 0)].iter(),
                1
            )
        );

        assert_eq!(
            Some(-9223372036854000),
            find_latest_window_in_seconds(
                [new_file_handle(FileId::random(), i64::MIN, i64::MIN + 1, 0)].iter(),
                3600,
            )
        );

        assert_eq!(
            (i64::MAX / 10000000 + 1) * 10000,
            find_latest_window_in_seconds(
                [new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0)].iter(),
                10000,
            )
            .unwrap()
        );

        assert_eq!(
            Some((i64::MAX / 3600000 + 1) * 3600),
            find_latest_window_in_seconds(
                [
                    new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0),
                    new_file_handle(FileId::random(), 0, 1000, 0)
                ]
                .iter(),
                3600
            )
        );
    }

    #[test]
    fn test_assign_to_windows() {
        let windows = assign_to_windows(
            [
                new_file_handle(FileId::random(), 0, 999, 0),
                new_file_handle(FileId::random(), 0, 999, 0),
                new_file_handle(FileId::random(), 0, 999, 0),
                new_file_handle(FileId::random(), 0, 999, 0),
                new_file_handle(FileId::random(), 0, 999, 0),
            ]
            .iter(),
            3,
        );
        let fgs = &windows.get(&0).unwrap().files;
        assert_eq!(1, fgs.len());
        assert_eq!(fgs.values().map(|f| f.files().len()).sum::<usize>(), 5);

        // Use distinct ids: `[FileId::random(); 3]` would copy ONE id three times and make
        // the per-window assertions below vacuous.
        let files = [FileId::random(), FileId::random(), FileId::random()];
        let windows = assign_to_windows(
            [
                new_file_handle(files[0], -2000, -3, 0),
                new_file_handle(files[1], 0, 2999, 0),
                new_file_handle(files[2], 50, 10001, 0),
            ]
            .iter(),
            3,
        );
        assert_eq!(
            files[0],
            windows.get(&0).unwrap().files().next().unwrap().files()[0]
                .file_id()
                .file_id()
        );
        assert_eq!(
            files[1],
            windows.get(&3).unwrap().files().next().unwrap().files()[0]
                .file_id()
                .file_id()
        );
        assert_eq!(
            files[2],
            windows.get(&12).unwrap().files().next().unwrap().files()[0]
                .file_id()
                .file_id()
        );
    }

    #[test]
    fn test_assign_file_groups_to_windows() {
        let files = [
            FileId::random(),
            FileId::random(),
            FileId::random(),
            FileId::random(),
        ];
        let windows = assign_to_windows(
            [
                new_file_handle_with_sequence(files[0], 0, 999, 0, 1),
                new_file_handle_with_sequence(files[1], 0, 999, 0, 1),
                new_file_handle_with_sequence(files[2], 0, 999, 0, 2),
                new_file_handle_with_sequence(files[3], 0, 999, 0, 2),
            ]
            .iter(),
            3,
        );
        assert_eq!(windows.len(), 1);
        let fgs = &windows.get(&0).unwrap().files;
        assert_eq!(2, fgs.len());
        assert_eq!(
            fgs.get(&NonZeroU64::new(1))
                .unwrap()
                .files()
                .iter()
                .map(|f| f.file_id().file_id())
                .collect::<HashSet<_>>(),
            [files[0], files[1]].into_iter().collect()
        );
        assert_eq!(
            fgs.get(&NonZeroU64::new(2))
                .unwrap()
                .files()
                .iter()
                .map(|f| f.file_id().file_id())
                .collect::<HashSet<_>>(),
            [files[2], files[3]].into_iter().collect()
        );
    }

    #[test]
    fn test_assign_compacting_to_windows() {
        let files = [
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
        ];
        files[0].set_compacting(true);
        files[2].set_compacting(true);
        let mut windows = assign_to_windows(files.iter(), 3);
        let window0 = windows.remove(&0).unwrap();
        assert_eq!(1, window0.files.len());
        let candidates = window0
            .files
            .into_values()
            .flat_map(|fg| fg.into_files())
            .map(|f| f.file_id().file_id())
            .collect::<HashSet<_>>();
        assert_eq!(candidates.len(), 3);
        assert_eq!(
            candidates,
            [
                files[1].file_id().file_id(),
                files[3].file_id().file_id(),
                files[4].file_id().file_id()
            ]
            .into_iter()
            .collect::<HashSet<_>>()
        );
    }

    /// (window value, expected overlapping flag, expected sorted file time ranges)
    type ExpectedWindowSpec = (i64, bool, Vec<(i64, i64)>);

    /// Assigns files with the given time ranges to windows and checks each expected
    /// window's overlapping flag and file ranges.
    fn check_assign_to_windows_with_overlapping(
        file_time_ranges: &[(i64, i64)],
        time_window: i64,
        expected_files: &[ExpectedWindowSpec],
    ) {
        let files: Vec<_> = (0..file_time_ranges.len())
            .map(|_| FileId::random())
            .collect();

        let file_handles = files
            .iter()
            .zip(file_time_ranges.iter())
            .map(|(file_id, range)| new_file_handle(*file_id, range.0, range.1, 0))
            .collect::<Vec<_>>();

        let windows = assign_to_windows(file_handles.iter(), time_window);

        for (expected_window, overlapping, window_files) in expected_files {
            let actual_window = windows.get(expected_window).unwrap();
            assert_eq!(*overlapping, actual_window.overlapping);
            let mut file_ranges = actual_window
                .files
                .iter()
                .flat_map(|(_, f)| {
                    f.files().iter().map(|f| {
                        let (s, e) = f.time_range();
                        (s.value(), e.value())
                    })
                })
                .collect::<Vec<_>>();
            file_ranges.sort_unstable_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1)));
            assert_eq!(window_files, &file_ranges);
        }
    }

    #[test]
    fn test_assign_to_windows_with_overlapping() {
        check_assign_to_windows_with_overlapping(
            &[(0, 999), (1000, 1999), (2000, 2999)],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, false, vec![(1000, 1999), (2000, 2999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[(0, 1), (0, 999), (100, 2999)],
            2,
            &[
                (0, true, vec![(0, 1), (0, 999)]),
                (2, true, vec![(100, 2999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[(0, 999), (1000, 1999), (2000, 2999), (3000, 3999)],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, false, vec![(1000, 1999), (2000, 2999)]),
                (4, false, vec![(3000, 3999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (0, 3999),
            ],
            2,
            &[
                (0, true, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(0, 3999), (3000, 3999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (1999, 3999),
            ],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(1999, 3999), (3000, 3999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (2999, 3999),
            ],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(2999, 3999), (3000, 3999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (0, 1000),
            ],
            2,
            &[
                (0, true, vec![(0, 999)]),
                (2, true, vec![(0, 1000), (1000, 1999), (2000, 2999)]),
                (4, false, vec![(3000, 3999)]),
            ],
        );
    }

    /// Input files plus the outputs `build_output` is expected to produce for them.
    struct CompactionPickerTestCase {
        window_size: i64,
        input_files: Vec<FileHandle>,
        expected_outputs: Vec<ExpectedOutput>,
    }

    impl CompactionPickerTestCase {
        /// Runs window assignment and `build_output`, comparing outputs by input-file index.
        fn check(&self) {
            let file_id_to_idx = self
                .input_files
                .iter()
                .enumerate()
                .map(|(idx, file)| (file.file_id(), idx))
                .collect::<HashMap<_, _>>();
            let mut windows = assign_to_windows(self.input_files.iter(), self.window_size);
            let active_window =
                find_latest_window_in_seconds(self.input_files.iter(), self.window_size);
            let output = TwcsPicker {
                trigger_file_num: 4,
                time_window_seconds: None,
                max_output_file_size: None,
                append_mode: false,
                max_background_tasks: None,
            }
            .build_output(RegionId::from_u64(0), &mut windows, active_window);

            let output = output
                .iter()
                .map(|o| {
                    let input_file_ids = o
                        .inputs
                        .iter()
                        .map(|f| file_id_to_idx.get(&f.file_id()).copied().unwrap())
                        .collect::<HashSet<_>>();
                    (input_file_ids, o.output_level)
                })
                .collect::<Vec<_>>();

            let expected = self
                .expected_outputs
                .iter()
                .map(|o| {
                    let input_file_ids = o.input_files.iter().copied().collect::<HashSet<_>>();
                    (input_file_ids, o.output_level)
                })
                .collect::<Vec<_>>();
            assert_eq!(expected, output);
        }
    }

    /// Expected output: indices into `CompactionPickerTestCase::input_files` and level.
    struct ExpectedOutput {
        input_files: Vec<usize>,
        output_level: Level,
    }

    #[test]
    fn test_build_twcs_output() {
        let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();

        // Two windows, each with two overlapping files of different sequences.
        CompactionPickerTestCase {
            window_size: 3,
            input_files: [
                new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
                new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
                new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3),
                new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4),
            ]
            .to_vec(),
            expected_outputs: vec![
                ExpectedOutput {
                    input_files: vec![0, 1],
                    output_level: 1,
                },
                ExpectedOutput {
                    input_files: vec![2, 3],
                    output_level: 1,
                },
            ],
        }
        .check();

        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
        // The second window holds three overlapping files.
        CompactionPickerTestCase {
            window_size: 3,
            input_files: [
                new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
                new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
                new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3),
                new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4),
                new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 5),
            ]
            .to_vec(),
            expected_outputs: vec![
                ExpectedOutput {
                    input_files: vec![0, 1],
                    output_level: 1,
                },
                ExpectedOutput {
                    input_files: vec![2, 4],
                    output_level: 1,
                },
            ],
        }
        .check();

        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
        // Level-1 files sharing sequences plus one level-0 file.
        CompactionPickerTestCase {
            window_size: 3,
            input_files: [
                new_file_handle_with_sequence(file_ids[0], 0, 2999, 1, 1),
                new_file_handle_with_sequence(file_ids[1], 0, 2998, 1, 1),
                new_file_handle_with_sequence(file_ids[2], 3000, 5999, 1, 2),
                new_file_handle_with_sequence(file_ids[3], 3000, 5000, 1, 2),
                new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 3),
            ]
            .to_vec(),
            expected_outputs: vec![ExpectedOutput {
                input_files: vec![0, 1, 4],
                output_level: 1,
            }],
        }
        .check();
    }

    #[test]
    fn test_append_mode_filter_large_files() {
        let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
        let max_output_file_size = 1000u64;

        // Two small and two large files in the same window, each with its own sequence.
        let files = [
            new_file_handle_with_size_and_sequence(file_ids[0], 0, 999, 0, 1, 500),
            new_file_handle_with_size_and_sequence(file_ids[1], 0, 999, 0, 2, 1500),
            new_file_handle_with_size_and_sequence(file_ids[2], 0, 999, 0, 3, 800),
            new_file_handle_with_size_and_sequence(file_ids[3], 0, 999, 0, 4, 2000),
        ];

        // The size predicate used by `build_output` in append mode keeps only small groups.
        let mut files_to_merge = files
            .iter()
            .cloned()
            .map(FileGroup::new_with_file)
            .collect::<Vec<_>>();
        let original_count = files_to_merge.len();
        files_to_merge.retain(|fg| fg.size() <= max_output_file_size as usize);
        assert_eq!(original_count, 4);
        assert_eq!(files_to_merge.len(), 2);
        for fg in &files_to_merge {
            assert!(
                fg.size() <= max_output_file_size as usize,
                "File size {} should be <= {}",
                fg.size(),
                max_output_file_size
            );
        }

        // `build_output` itself must never pick the large files in append mode.
        let mut windows = assign_to_windows(files.iter(), 3);
        let picker = TwcsPicker {
            trigger_file_num: 2,
            time_window_seconds: Some(3),
            max_output_file_size: Some(max_output_file_size),
            append_mode: true,
            max_background_tasks: None,
        };
        let active_window = find_latest_window_in_seconds(files.iter(), 3);
        let output = picker.build_output(RegionId::from_u64(789), &mut windows, active_window);
        let picked = output
            .iter()
            .flat_map(|o| o.inputs.iter())
            .map(|f| f.file_id().file_id())
            .collect::<HashSet<_>>();
        assert!(!picked.contains(&file_ids[1]), "Large file should be skipped");
        assert!(!picked.contains(&file_ids[3]), "Large file should be skipped");
    }

    #[test]
    fn test_build_output_multiple_windows_with_zero_runs() {
        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();

        let files = [
            // Window 0
            new_file_handle_with_sequence(file_ids[0], 0, 999, 0, 1),
            new_file_handle_with_sequence(file_ids[1], 0, 999, 0, 2),
            new_file_handle_with_sequence(file_ids[2], 0, 999, 0, 3),
            // Window 3
            new_file_handle_with_sequence(file_ids[3], 3000, 3999, 0, 4),
            new_file_handle_with_sequence(file_ids[4], 3000, 3999, 0, 5),
            new_file_handle_with_sequence(file_ids[5], 3000, 3999, 0, 6),
        ];

        let mut windows = assign_to_windows(files.iter(), 3);

        let picker = TwcsPicker {
            trigger_file_num: 4,
            time_window_seconds: Some(3),
            max_output_file_size: None,
            append_mode: false,
            max_background_tasks: None,
        };

        let active_window = find_latest_window_in_seconds(files.iter(), 3);
        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);

        assert!(
            !output.is_empty(),
            "Should have output from windows with runs, even when one window has 0 runs"
        );

        let all_output_files: Vec<_> = output
            .iter()
            .flat_map(|o| o.inputs.iter())
            .map(|f| f.file_id().file_id())
            .collect();

        assert!(
            all_output_files.contains(&file_ids[3])
                || all_output_files.contains(&file_ids[4])
                || all_output_files.contains(&file_ids[5]),
            "Output should contain files from the window with runs"
        );
    }

    #[test]
    fn test_build_output_single_window_zero_runs() {
        let file_ids = (0..2).map(|_| FileId::random()).collect::<Vec<_>>();

        // Both files exceed `max_output_file_size`, so append mode filters them all out.
        let large_file_1 = new_file_handle_with_size_and_sequence(file_ids[0], 0, 999, 0, 1, 2000);
        let large_file_2 = new_file_handle_with_size_and_sequence(file_ids[1], 0, 999, 0, 2, 2500);
        let files = [large_file_1, large_file_2];

        let mut windows = assign_to_windows(files.iter(), 3);

        let picker = TwcsPicker {
            trigger_file_num: 2,
            time_window_seconds: Some(3),
            max_output_file_size: Some(1000),
            append_mode: true,
            max_background_tasks: None,
        };

        let active_window = find_latest_window_in_seconds(files.iter(), 3);
        let output = picker.build_output(RegionId::from_u64(456), &mut windows, active_window);

        assert!(
            output.is_empty(),
            "Should return empty output when no runs are found after filtering"
        );
    }

    #[test]
    fn test_max_background_tasks_truncation() {
        let file_ids = (0..10).map(|_| FileId::random()).collect::<Vec<_>>();
        // Fewer than the number of windows below, so the limit can actually truncate.
        let max_background_tasks = 2;

        let files = [
            // Window 0
            new_file_handle_with_sequence(file_ids[0], 0, 999, 0, 1),
            new_file_handle_with_sequence(file_ids[1], 0, 999, 0, 2),
            new_file_handle_with_sequence(file_ids[2], 0, 999, 0, 3),
            new_file_handle_with_sequence(file_ids[3], 0, 999, 0, 4),
            // Window 3
            new_file_handle_with_sequence(file_ids[4], 3000, 3999, 0, 5),
            new_file_handle_with_sequence(file_ids[5], 3000, 3999, 0, 6),
            new_file_handle_with_sequence(file_ids[6], 3000, 3999, 0, 7),
            new_file_handle_with_sequence(file_ids[7], 3000, 3999, 0, 8),
            // Window 6
            new_file_handle_with_sequence(file_ids[8], 6000, 6999, 0, 9),
            new_file_handle_with_sequence(file_ids[9], 6000, 6999, 0, 10),
        ];

        let mut windows = assign_to_windows(files.iter(), 3);

        let picker = TwcsPicker {
            trigger_file_num: 4,
            time_window_seconds: Some(3),
            max_output_file_size: None,
            append_mode: false,
            max_background_tasks: Some(max_background_tasks),
        };

        let active_window = find_latest_window_in_seconds(files.iter(), 3);
        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);

        assert!(
            output.len() <= max_background_tasks,
            "Output should be truncated to max_background_tasks: expected <= {}, got {}",
            max_background_tasks,
            output.len()
        );

        let picker_no_limit = TwcsPicker {
            trigger_file_num: 4,
            time_window_seconds: Some(3),
            max_output_file_size: None,
            append_mode: false,
            max_background_tasks: None,
        };

        let mut windows_no_limit = assign_to_windows(files.iter(), 3);
        let output_no_limit = picker_no_limit.build_output(
            RegionId::from_u64(123),
            &mut windows_no_limit,
            active_window,
        );

        // The limit only drops trailing outputs.
        assert_eq!(
            output.len(),
            output_no_limit.len().min(max_background_tasks)
        );
        if output_no_limit.len() > max_background_tasks {
            assert!(
                output_no_limit.len() > output.len(),
                "Without limit should have more outputs than with limit"
            );
        }
    }

    #[test]
    fn test_max_background_tasks_no_truncation_when_under_limit() {
        let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
        let max_background_tasks = 10;

        let files = [
            new_file_handle_with_sequence(file_ids[0], 0, 999, 0, 1),
            new_file_handle_with_sequence(file_ids[1], 0, 999, 0, 2),
            new_file_handle_with_sequence(file_ids[2], 0, 999, 0, 3),
            new_file_handle_with_sequence(file_ids[3], 0, 999, 0, 4),
        ];

        let mut windows = assign_to_windows(files.iter(), 3);

        let picker = TwcsPicker {
            trigger_file_num: 4,
            time_window_seconds: Some(3),
            max_output_file_size: None,
            append_mode: false,
            max_background_tasks: Some(max_background_tasks),
        };

        let active_window = find_latest_window_in_seconds(files.iter(), 3);
        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);

        assert!(
            output.len() <= max_background_tasks,
            "Output should be within limit"
        );
        assert!(!output.is_empty(), "Should have at least one output");
    }

    #[test]
    fn test_pick_multiple_runs() {
        common_telemetry::init_default_ut_logging();

        let num_files = 8;
        let file_ids = (0..num_files).map(|_| FileId::random()).collect::<Vec<_>>();

        // All files share one time range but have distinct sequences.
        let files: Vec<_> = file_ids
            .iter()
            .enumerate()
            .map(|(idx, file_id)| {
                new_file_handle_with_size_and_sequence(
                    *file_id,
                    0,
                    999,
                    0,
                    (idx + 1) as u64,
                    1024 * 1024,
                )
            })
            .collect();

        let mut windows = assign_to_windows(files.iter(), 3);

        let picker = TwcsPicker {
            trigger_file_num: 4,
            time_window_seconds: Some(3),
            max_output_file_size: None,
            append_mode: false,
            max_background_tasks: None,
        };

        let active_window = find_latest_window_in_seconds(files.iter(), 3);
        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);

        assert_eq!(1, output.len());
        assert_eq!(output[0].inputs.len(), 2);
    }

    #[test]
    fn test_limit_max_input_files() {
        common_telemetry::init_default_ut_logging();

        let num_files = 50;
        let file_ids = (0..num_files).map(|_| FileId::random()).collect::<Vec<_>>();

        // Pairs of files share a time range; all land in the same window.
        let files: Vec<_> = file_ids
            .iter()
            .enumerate()
            .map(|(idx, file_id)| {
                new_file_handle_with_size_and_sequence(
                    *file_id,
                    (idx / 2 * 10) as i64,
                    (idx / 2 * 10 + 5) as i64,
                    0,
                    (idx + 1) as u64,
                    1024 * 1024,
                )
            })
            .collect();

        let mut windows = assign_to_windows(files.iter(), 3);

        let picker = TwcsPicker {
            trigger_file_num: 4,
            time_window_seconds: Some(3),
            max_output_file_size: None,
            append_mode: false,
            max_background_tasks: None,
        };

        let active_window = find_latest_window_in_seconds(files.iter(), 3);
        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);

        assert_eq!(1, output.len());
        assert_eq!(output[0].inputs.len(), 32);
    }
}