1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap};
17use std::fmt::Debug;
18use std::num::NonZeroU64;
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::{debug, info};
22use common_time::Timestamp;
23use common_time::timestamp::TimeUnit;
24use common_time::timestamp_millis::BucketAligned;
25use store_api::storage::RegionId;
26
27use crate::compaction::buckets::infer_time_bucket;
28use crate::compaction::compactor::CompactionRegion;
29use crate::compaction::picker::{Picker, PickerOutput};
30use crate::compaction::run::{
31 FileGroup, Item, Ranged, find_sorted_runs, merge_seq_files, reduce_runs,
32};
33use crate::compaction::{CompactionOutput, get_expired_ssts};
34use crate::sst::file::{FileHandle, Level, overlaps};
35use crate::sst::version::LevelMeta;
36
/// Level assigned to every compaction output built by `TwcsPicker::build_output`.
const LEVEL_COMPACTED: Level = 1;
38
/// Picker that groups SST files into time windows and selects, per window, the files that
/// should be compacted together.
#[derive(Debug)]
pub struct TwcsPicker {
    /// Minimum number of items a window's single sorted run must hold before it is merged.
    pub trigger_file_num: usize,
    /// Window size in seconds, used when the region version does not carry its own window.
    pub time_window_seconds: Option<i64>,
    /// Upper bound on the output file size; also the size cut-off for candidates in append mode.
    pub max_output_file_size: Option<u64>,
    /// Whether the picker runs in append mode.
    pub append_mode: bool,
    /// Maximum number of compaction outputs to build in one pass (`None` means no limit).
    pub max_background_tasks: Option<usize>,
}
54
55impl TwcsPicker {
56 fn build_output(
58 &self,
59 region_id: RegionId,
60 time_windows: &mut BTreeMap<i64, Window>,
61 active_window: Option<i64>,
62 ) -> Vec<CompactionOutput> {
63 let mut output = vec![];
64 for (window, files) in time_windows {
65 if files.files.is_empty() {
66 continue;
67 }
68 let mut files_to_merge: Vec<_> = files.files().cloned().collect();
69
70 if self.append_mode
72 && let Some(max_size) = self.max_output_file_size
73 {
74 let (kept_files, ignored_files) = files_to_merge
75 .into_iter()
76 .partition(|fg| fg.size() <= max_size as usize && fg.is_all_level_0());
77 files_to_merge = kept_files;
78 info!(
79 "Skipped {} large files in append mode for region {}, window {}, max_size: {}",
80 ignored_files.len(),
81 region_id,
82 window,
83 max_size
84 );
85 }
86
87 let sorted_runs = find_sorted_runs(&mut files_to_merge);
88 let found_runs = sorted_runs.len();
89 let filter_deleted = !files.overlapping && found_runs <= 2 && !self.append_mode;
92 if found_runs == 0 {
93 continue;
94 }
95
96 let inputs = if found_runs > 1 {
97 reduce_runs(sorted_runs)
98 } else {
99 let run = sorted_runs.last().unwrap();
100 if run.items().len() < self.trigger_file_num {
101 continue;
102 }
103 merge_seq_files(run.items(), self.max_output_file_size)
105 };
106
107 if !inputs.is_empty() {
108 log_pick_result(
109 region_id,
110 *window,
111 active_window,
112 found_runs,
113 files.files.len(),
114 self.max_output_file_size,
115 filter_deleted,
116 &inputs,
117 );
118 output.push(CompactionOutput {
119 output_level: LEVEL_COMPACTED, inputs: inputs.into_iter().flat_map(|fg| fg.into_files()).collect(),
121 filter_deleted,
122 output_time_range: None, });
124
125 if let Some(max_background_tasks) = self.max_background_tasks
126 && output.len() >= max_background_tasks
127 {
128 debug!(
129 "Region ({:?}) compaction task size larger than max background tasks({}), remaining tasks discarded",
130 region_id, max_background_tasks
131 );
132 break;
133 }
134 }
135 }
136 output
137 }
138}
139
140#[allow(clippy::too_many_arguments)]
141fn log_pick_result(
142 region_id: RegionId,
143 window: i64,
144 active_window: Option<i64>,
145 found_runs: usize,
146 file_num: usize,
147 max_output_file_size: Option<u64>,
148 filter_deleted: bool,
149 inputs: &[FileGroup],
150) {
151 let input_file_str: Vec<String> = inputs
152 .iter()
153 .map(|f| {
154 let range = f.range();
155 let start = range.0.to_iso8601_string();
156 let end = range.1.to_iso8601_string();
157 let num_rows = f.num_rows();
158 format!(
159 "FileGroup{{id: {:?}, range: ({}, {}), size: {}, num rows: {} }}",
160 f.file_ids(),
161 start,
162 end,
163 ReadableSize(f.size() as u64),
164 num_rows
165 )
166 })
167 .collect();
168 let window_str = Timestamp::new_second(window).to_iso8601_string();
169 let active_window_str = active_window.map(|s| Timestamp::new_second(s).to_iso8601_string());
170 let max_output_file_size = max_output_file_size.map(|size| ReadableSize(size).to_string());
171 info!(
172 "Region ({:?}) compaction pick result: current window: {}, active window: {:?}, \
173 found runs: {}, file num: {}, max output file size: {:?}, filter deleted: {}, \
174 input files: {:?}",
175 region_id,
176 window_str,
177 active_window_str,
178 found_runs,
179 file_num,
180 max_output_file_size,
181 filter_deleted,
182 input_file_str
183 );
184}
185
186impl Picker for TwcsPicker {
187 fn pick(&self, compaction_region: &CompactionRegion) -> Option<PickerOutput> {
188 let region_id = compaction_region.region_id;
189 let levels = compaction_region.current_version.ssts.levels();
190
191 let expired_ssts =
192 get_expired_ssts(levels, compaction_region.ttl, Timestamp::current_millis());
193 if !expired_ssts.is_empty() {
194 info!("Expired SSTs in region {}: {:?}", region_id, expired_ssts);
195 expired_ssts.iter().for_each(|f| f.set_compacting(true));
197 }
198
199 let compaction_time_window = compaction_region
200 .current_version
201 .compaction_time_window
202 .map(|window| window.as_secs() as i64);
203 let time_window_size = compaction_time_window
204 .or(self.time_window_seconds)
205 .unwrap_or_else(|| {
206 let inferred = infer_time_bucket(levels[0].files());
207 info!(
208 "Compaction window for region {} is not present, inferring from files: {:?}",
209 region_id, inferred
210 );
211 inferred
212 });
213
214 let active_window = find_latest_window_in_seconds(levels[0].files(), time_window_size);
216 let mut windows =
218 assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size);
219 let outputs = self.build_output(region_id, &mut windows, active_window);
220
221 if outputs.is_empty() && expired_ssts.is_empty() {
222 return None;
223 }
224
225 let max_file_size = self.max_output_file_size.map(|v| v as usize);
226 Some(PickerOutput {
227 outputs,
228 expired_ssts,
229 time_window_size,
230 max_file_size,
231 })
232 }
233}
234
235struct Window {
236 start: Timestamp,
237 end: Timestamp,
238 files: HashMap<Option<NonZeroU64>, FileGroup>,
241 time_window: i64,
242 overlapping: bool,
243}
244
245impl Window {
246 fn new_with_file(file: FileHandle) -> Self {
248 let (start, end) = file.time_range();
249 let files = HashMap::from([(file.meta_ref().sequence, FileGroup::new_with_file(file))]);
250 Self {
251 start,
252 end,
253 files,
254 time_window: 0,
255 overlapping: false,
256 }
257 }
258
259 fn range(&self) -> (Timestamp, Timestamp) {
261 (self.start, self.end)
262 }
263
264 fn add_file(&mut self, file: FileHandle) {
266 let (start, end) = file.time_range();
267 self.start = self.start.min(start);
268 self.end = self.end.max(end);
269
270 match self.files.entry(file.meta_ref().sequence) {
271 Entry::Occupied(mut o) => {
272 o.get_mut().add_file(file);
273 }
274 Entry::Vacant(v) => {
275 v.insert(FileGroup::new_with_file(file));
276 }
277 }
278 }
279
280 fn files(&self) -> impl Iterator<Item = &FileGroup> {
281 self.files.values()
282 }
283}
284
285fn assign_to_windows<'a>(
287 files: impl Iterator<Item = &'a FileHandle>,
288 time_window_size: i64,
289) -> BTreeMap<i64, Window> {
290 let mut windows: HashMap<i64, Window> = HashMap::new();
291 for f in files {
293 if f.compacting() {
294 continue;
295 }
296 let (_, end) = f.time_range();
297 let time_window = end
298 .convert_to(TimeUnit::Second)
299 .unwrap()
300 .value()
301 .align_to_ceil_by_bucket(time_window_size)
302 .unwrap_or(i64::MIN);
303
304 match windows.entry(time_window) {
305 Entry::Occupied(mut e) => {
306 e.get_mut().add_file(f.clone());
307 }
308 Entry::Vacant(e) => {
309 let mut window = Window::new_with_file(f.clone());
310 window.time_window = time_window;
311 e.insert(window);
312 }
313 }
314 }
315 if windows.is_empty() {
316 return BTreeMap::new();
317 }
318
319 let mut windows = windows.into_values().collect::<Vec<_>>();
320 windows.sort_unstable_by(|l, r| l.start.cmp(&r.start).then(l.end.cmp(&r.end).reverse()));
321
322 let mut current_range: (Timestamp, Timestamp) = windows[0].range(); for idx in 1..windows.len() {
325 let next_range = windows[idx].range();
326 if overlaps(¤t_range, &next_range) {
327 windows[idx - 1].overlapping = true;
328 windows[idx].overlapping = true;
329 }
330 current_range = (
331 current_range.0.min(next_range.0),
332 current_range.1.max(next_range.1),
333 );
334 }
335
336 windows.into_iter().map(|w| (w.time_window, w)).collect()
337}
338
339fn find_latest_window_in_seconds<'a>(
342 files: impl Iterator<Item = &'a FileHandle>,
343 time_window_size: i64,
344) -> Option<i64> {
345 let mut latest_timestamp = None;
346 for f in files {
347 let (_, end) = f.time_range();
348 if let Some(latest) = latest_timestamp {
349 if end > latest {
350 latest_timestamp = Some(end);
351 }
352 } else {
353 latest_timestamp = Some(end);
354 }
355 }
356 latest_timestamp
357 .and_then(|ts| ts.convert_to_ceil(TimeUnit::Second))
358 .and_then(|ts| ts.value().align_to_ceil_by_bucket(time_window_size))
359}
360
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use store_api::storage::FileId;

    use super::*;
    use crate::compaction::test_util::{
        new_file_handle, new_file_handle_with_sequence, new_file_handle_with_size_and_sequence,
    };
    use crate::sst::file::Level;

    /// The latest window is the largest end timestamp rounded up to seconds and then aligned
    /// up to the window size.
    #[test]
    fn test_get_latest_window_in_seconds() {
        assert_eq!(
            Some(1),
            find_latest_window_in_seconds([new_file_handle(FileId::random(), 0, 999, 0)].iter(), 1)
        );
        assert_eq!(
            Some(1),
            find_latest_window_in_seconds(
                [new_file_handle(FileId::random(), 0, 1000, 0)].iter(),
                1
            )
        );

        assert_eq!(
            Some(-9223372036854000),
            find_latest_window_in_seconds(
                [new_file_handle(FileId::random(), i64::MIN, i64::MIN + 1, 0)].iter(),
                3600,
            )
        );

        assert_eq!(
            (i64::MAX / 10000000 + 1) * 10000,
            find_latest_window_in_seconds(
                [new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0)].iter(),
                10000,
            )
            .unwrap()
        );

        assert_eq!(
            Some((i64::MAX / 3600000 + 1) * 3600),
            find_latest_window_in_seconds(
                [
                    new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0),
                    new_file_handle(FileId::random(), 0, 1000, 0)
                ]
                .iter(),
                3600
            )
        );
    }

    /// Files are placed into the window derived from their end timestamp.
    #[test]
    fn test_assign_to_windows() {
        let windows = assign_to_windows(
            [
                new_file_handle(FileId::random(), 0, 999, 0),
                new_file_handle(FileId::random(), 0, 999, 0),
                new_file_handle(FileId::random(), 0, 999, 0),
                new_file_handle(FileId::random(), 0, 999, 0),
                new_file_handle(FileId::random(), 0, 999, 0),
            ]
            .iter(),
            3,
        );
        let fgs = &windows.get(&0).unwrap().files;
        assert_eq!(1, fgs.len());
        assert_eq!(fgs.values().map(|f| f.files().len()).sum::<usize>(), 5);

        // Each id must come from its own `FileId::random()` call: an array repeat expression
        // would copy a single id three times and make the per-window checks below vacuous.
        let files = [FileId::random(), FileId::random(), FileId::random()];
        let windows = assign_to_windows(
            [
                new_file_handle(files[0], -2000, -3, 0),
                new_file_handle(files[1], 0, 2999, 0),
                new_file_handle(files[2], 50, 10001, 0),
            ]
            .iter(),
            3,
        );
        assert_eq!(
            files[0],
            windows.get(&0).unwrap().files().next().unwrap().files()[0]
                .file_id()
                .file_id()
        );
        assert_eq!(
            files[1],
            windows.get(&3).unwrap().files().next().unwrap().files()[0]
                .file_id()
                .file_id()
        );
        assert_eq!(
            files[2],
            windows.get(&12).unwrap().files().next().unwrap().files()[0]
                .file_id()
                .file_id()
        );
    }

    /// Files of one window are grouped by their sequence.
    #[test]
    fn test_assign_file_groups_to_windows() {
        let files = [
            FileId::random(),
            FileId::random(),
            FileId::random(),
            FileId::random(),
        ];
        let windows = assign_to_windows(
            [
                new_file_handle_with_sequence(files[0], 0, 999, 0, 1),
                new_file_handle_with_sequence(files[1], 0, 999, 0, 1),
                new_file_handle_with_sequence(files[2], 0, 999, 0, 2),
                new_file_handle_with_sequence(files[3], 0, 999, 0, 2),
            ]
            .iter(),
            3,
        );
        assert_eq!(windows.len(), 1);
        let fgs = &windows.get(&0).unwrap().files;
        assert_eq!(2, fgs.len());
        assert_eq!(
            fgs.get(&NonZeroU64::new(1))
                .unwrap()
                .files()
                .iter()
                .map(|f| f.file_id().file_id())
                .collect::<HashSet<_>>(),
            [files[0], files[1]].into_iter().collect()
        );
        assert_eq!(
            fgs.get(&NonZeroU64::new(2))
                .unwrap()
                .files()
                .iter()
                .map(|f| f.file_id().file_id())
                .collect::<HashSet<_>>(),
            [files[2], files[3]].into_iter().collect()
        );
    }

    /// Files flagged as compacting are left out of the windows.
    #[test]
    fn test_assign_compacting_to_windows() {
        let files = [
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
        ];
        files[0].set_compacting(true);
        files[2].set_compacting(true);
        let mut windows = assign_to_windows(files.iter(), 3);
        let window0 = windows.remove(&0).unwrap();
        assert_eq!(1, window0.files.len());
        let candidates = window0
            .files
            .into_values()
            .flat_map(|fg| fg.into_files())
            .map(|f| f.file_id().file_id())
            .collect::<HashSet<_>>();
        assert_eq!(candidates.len(), 3);
        assert_eq!(
            candidates,
            [
                files[1].file_id().file_id(),
                files[3].file_id().file_id(),
                files[4].file_id().file_id()
            ]
            .into_iter()
            .collect::<HashSet<_>>()
        );
    }

    /// Expected state of one window: (window key, overlapping flag, sorted file time ranges).
    type ExpectedWindowSpec = (i64, bool, Vec<(i64, i64)>);

    /// Assigns files with the given time ranges to windows of size `time_window` and checks
    /// every expected window's overlapping flag and the time ranges of the files it holds.
    fn check_assign_to_windows_with_overlapping(
        file_time_ranges: &[(i64, i64)],
        time_window: i64,
        expected_files: &[ExpectedWindowSpec],
    ) {
        let files: Vec<_> = (0..file_time_ranges.len())
            .map(|_| FileId::random())
            .collect();

        let file_handles = files
            .iter()
            .zip(file_time_ranges.iter())
            .map(|(file_id, range)| new_file_handle(*file_id, range.0, range.1, 0))
            .collect::<Vec<_>>();

        let windows = assign_to_windows(file_handles.iter(), time_window);

        for (expected_window, overlapping, window_files) in expected_files {
            let actual_window = windows.get(expected_window).unwrap();
            assert_eq!(*overlapping, actual_window.overlapping);
            let mut file_ranges = actual_window
                .files
                .iter()
                .flat_map(|(_, f)| {
                    f.files().iter().map(|f| {
                        let (s, e) = f.time_range();
                        (s.value(), e.value())
                    })
                })
                .collect::<Vec<_>>();
            // Group iteration order is unspecified, so compare in sorted order.
            file_ranges.sort_unstable_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1)));
            assert_eq!(window_files, &file_ranges);
        }
    }

    /// Windows are flagged as overlapping when their file time ranges intersect.
    #[test]
    fn test_assign_to_windows_with_overlapping() {
        check_assign_to_windows_with_overlapping(
            &[(0, 999), (1000, 1999), (2000, 2999)],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, false, vec![(1000, 1999), (2000, 2999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[(0, 1), (0, 999), (100, 2999)],
            2,
            &[
                (0, true, vec![(0, 1), (0, 999)]),
                (2, true, vec![(100, 2999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[(0, 999), (1000, 1999), (2000, 2999), (3000, 3999)],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, false, vec![(1000, 1999), (2000, 2999)]),
                (4, false, vec![(3000, 3999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (0, 3999),
            ],
            2,
            &[
                (0, true, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(0, 3999), (3000, 3999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (1999, 3999),
            ],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(1999, 3999), (3000, 3999)]),
            ],
        );

        // The extra file starts inside window 2 but ends in window 4.
        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (2999, 3999),
            ],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(2999, 3999), (3000, 3999)]),
            ],
        );

        // The extra file starts inside window 0 but ends in window 2.
        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (0, 1000),
            ],
            2,
            &[
                (0, true, vec![(0, 999)]),
                (2, true, vec![(0, 1000), (1000, 1999), (2000, 2999)]),
                (4, false, vec![(3000, 3999)]),
            ],
        );
    }

    /// Scenario for `TwcsPicker::build_output`: input files, the window size and the outputs
    /// the picker is expected to build.
    struct CompactionPickerTestCase {
        window_size: i64,
        input_files: Vec<FileHandle>,
        expected_outputs: Vec<ExpectedOutput>,
    }

    impl CompactionPickerTestCase {
        /// Runs a picker with `trigger_file_num` 4 (no size limit, no append mode, no task
        /// limit) over the input files and compares the outputs with the expectation.
        /// Input files are compared by their index in `input_files`.
        fn check(&self) {
            let file_id_to_idx = self
                .input_files
                .iter()
                .enumerate()
                .map(|(idx, file)| (file.file_id(), idx))
                .collect::<HashMap<_, _>>();
            let mut windows = assign_to_windows(self.input_files.iter(), self.window_size);
            let active_window =
                find_latest_window_in_seconds(self.input_files.iter(), self.window_size);
            let output = TwcsPicker {
                trigger_file_num: 4,
                time_window_seconds: None,
                max_output_file_size: None,
                append_mode: false,
                max_background_tasks: None,
            }
            .build_output(RegionId::from_u64(0), &mut windows, active_window);

            let output = output
                .iter()
                .map(|o| {
                    let input_file_ids = o
                        .inputs
                        .iter()
                        .map(|f| file_id_to_idx.get(&f.file_id()).copied().unwrap())
                        .collect::<HashSet<_>>();
                    (input_file_ids, o.output_level)
                })
                .collect::<Vec<_>>();

            let expected = self
                .expected_outputs
                .iter()
                .map(|o| {
                    let input_file_ids = o.input_files.iter().copied().collect::<HashSet<_>>();
                    (input_file_ids, o.output_level)
                })
                .collect::<Vec<_>>();
            assert_eq!(expected, output);
        }
    }

    /// One expected compaction output: indices into the test case's input files plus the
    /// expected output level.
    struct ExpectedOutput {
        input_files: Vec<usize>,
        output_level: Level,
    }

    #[test]
    fn test_build_twcs_output() {
        let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();

        // Two windows with two overlapping files each.
        CompactionPickerTestCase {
            window_size: 3,
            input_files: [
                new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
                new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
                new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3),
                new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4),
            ]
            .to_vec(),
            expected_outputs: vec![
                ExpectedOutput {
                    input_files: vec![0, 1],
                    output_level: 1,
                },
                ExpectedOutput {
                    input_files: vec![2, 3],
                    output_level: 1,
                },
            ],
        }
        .check();

        // The second window now holds three overlapping files; only two of them are picked.
        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
        CompactionPickerTestCase {
            window_size: 3,
            input_files: [
                new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
                new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
                new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3),
                new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4),
                new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 5),
            ]
            .to_vec(),
            expected_outputs: vec![
                ExpectedOutput {
                    input_files: vec![0, 1],
                    output_level: 1,
                },
                ExpectedOutput {
                    input_files: vec![2, 4],
                    output_level: 1,
                },
            ],
        }
        .check();

        // Level 1 files sharing a sequence form one group per window; only the window that
        // also received a level 0 file yields an output.
        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
        CompactionPickerTestCase {
            window_size: 3,
            input_files: [
                new_file_handle_with_sequence(file_ids[0], 0, 2999, 1, 1),
                new_file_handle_with_sequence(file_ids[1], 0, 2998, 1, 1),
                new_file_handle_with_sequence(file_ids[2], 3000, 5999, 1, 2),
                new_file_handle_with_sequence(file_ids[3], 3000, 5000, 1, 2),
                new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 3),
            ]
            .to_vec(),
            expected_outputs: vec![ExpectedOutput {
                input_files: vec![0, 1, 4],
                output_level: 1,
            }],
        }
        .check();
    }

    /// Checks the size cut-off on `FileGroup`s directly; this test does not go through
    /// `TwcsPicker::build_output`.
    #[test]
    fn test_append_mode_filter_large_files() {
        let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
        let max_output_file_size = 1000u64;

        let small_file_1 = new_file_handle_with_size_and_sequence(file_ids[0], 0, 999, 0, 1, 500);
        let large_file_1 = new_file_handle_with_size_and_sequence(file_ids[1], 0, 999, 0, 2, 1500);
        let small_file_2 = new_file_handle_with_size_and_sequence(file_ids[2], 0, 999, 0, 3, 800);
        let large_file_2 = new_file_handle_with_size_and_sequence(file_ids[3], 0, 999, 0, 4, 2000);

        let mut files_to_merge = vec![
            FileGroup::new_with_file(small_file_1),
            FileGroup::new_with_file(large_file_1),
            FileGroup::new_with_file(small_file_2),
            FileGroup::new_with_file(large_file_2),
        ];

        let original_count = files_to_merge.len();

        files_to_merge.retain(|fg| fg.size() <= max_output_file_size as usize);

        assert_eq!(files_to_merge.len(), 2);
        assert_eq!(original_count, 4);

        for fg in &files_to_merge {
            assert!(
                fg.size() <= max_output_file_size as usize,
                "File size {} should be <= {}",
                fg.size(),
                max_output_file_size
            );
        }
    }

    #[test]
    fn test_build_output_multiple_windows_with_zero_runs() {
        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();

        let files = [
            // Three files ending at 999.
            new_file_handle_with_sequence(file_ids[0], 0, 999, 0, 1),
            new_file_handle_with_sequence(file_ids[1], 0, 999, 0, 2),
            new_file_handle_with_sequence(file_ids[2], 0, 999, 0, 3),
            // Three files ending at 3999.
            new_file_handle_with_sequence(file_ids[3], 3000, 3999, 0, 4),
            new_file_handle_with_sequence(file_ids[4], 3000, 3999, 0, 5),
            new_file_handle_with_sequence(file_ids[5], 3000, 3999, 0, 6),
        ];

        let mut windows = assign_to_windows(files.iter(), 3);

        let picker = TwcsPicker {
            trigger_file_num: 4,
            time_window_seconds: Some(3),
            max_output_file_size: None,
            append_mode: false,
            max_background_tasks: None,
        };

        let active_window = find_latest_window_in_seconds(files.iter(), 3);
        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);

        assert!(
            !output.is_empty(),
            "Should have output from windows with runs, even when one window has 0 runs"
        );

        let all_output_files: Vec<_> = output
            .iter()
            .flat_map(|o| o.inputs.iter())
            .map(|f| f.file_id().file_id())
            .collect();

        assert!(
            all_output_files.contains(&file_ids[3])
                || all_output_files.contains(&file_ids[4])
                || all_output_files.contains(&file_ids[5]),
            "Output should contain files from the window with runs"
        );
    }

    /// In append mode every file above `max_output_file_size` is dropped, which leaves the
    /// only window without runs and therefore without output.
    #[test]
    fn test_build_output_single_window_zero_runs() {
        let file_ids = (0..2).map(|_| FileId::random()).collect::<Vec<_>>();

        let large_file_1 = new_file_handle_with_size_and_sequence(file_ids[0], 0, 999, 0, 1, 2000);
        let large_file_2 = new_file_handle_with_size_and_sequence(file_ids[1], 0, 999, 0, 2, 2500);
        let files = [large_file_1, large_file_2];

        let mut windows = assign_to_windows(files.iter(), 3);

        let picker = TwcsPicker {
            trigger_file_num: 2,
            time_window_seconds: Some(3),
            max_output_file_size: Some(1000),
            append_mode: true,
            max_background_tasks: None,
        };

        let active_window = find_latest_window_in_seconds(files.iter(), 3);
        let output = picker.build_output(RegionId::from_u64(456), &mut windows, active_window);

        assert!(
            output.is_empty(),
            "Should return empty output when no runs are found after filtering"
        );
    }

    #[test]
    fn test_max_background_tasks_truncation() {
        let file_ids = (0..10).map(|_| FileId::random()).collect::<Vec<_>>();
        // The files below fall into three windows, so the limit has to be smaller than three
        // for anything to be discarded at all.
        let max_background_tasks = 2;

        let files = [
            // Four files ending at 999.
            new_file_handle_with_sequence(file_ids[0], 0, 999, 0, 1),
            new_file_handle_with_sequence(file_ids[1], 0, 999, 0, 2),
            new_file_handle_with_sequence(file_ids[2], 0, 999, 0, 3),
            new_file_handle_with_sequence(file_ids[3], 0, 999, 0, 4),
            // Four files ending at 3999.
            new_file_handle_with_sequence(file_ids[4], 3000, 3999, 0, 5),
            new_file_handle_with_sequence(file_ids[5], 3000, 3999, 0, 6),
            new_file_handle_with_sequence(file_ids[6], 3000, 3999, 0, 7),
            new_file_handle_with_sequence(file_ids[7], 3000, 3999, 0, 8),
            // Two files ending at 6999.
            new_file_handle_with_sequence(file_ids[8], 6000, 6999, 0, 9),
            new_file_handle_with_sequence(file_ids[9], 6000, 6999, 0, 10),
        ];

        let mut windows = assign_to_windows(files.iter(), 3);

        let picker = TwcsPicker {
            trigger_file_num: 4,
            time_window_seconds: Some(3),
            max_output_file_size: None,
            append_mode: false,
            max_background_tasks: Some(max_background_tasks),
        };

        let active_window = find_latest_window_in_seconds(files.iter(), 3);
        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);

        assert!(
            output.len() <= max_background_tasks,
            "Output should be truncated to max_background_tasks: expected <= {}, got {}",
            max_background_tasks,
            output.len()
        );

        let picker_no_limit = TwcsPicker {
            trigger_file_num: 4,
            time_window_seconds: Some(3),
            max_output_file_size: None,
            append_mode: false,
            max_background_tasks: None,
        };

        let mut windows_no_limit = assign_to_windows(files.iter(), 3);
        let output_no_limit = picker_no_limit.build_output(
            RegionId::from_u64(123),
            &mut windows_no_limit,
            active_window,
        );

        // Only meaningful when the unlimited picker really produces more than the limit.
        if output_no_limit.len() > max_background_tasks {
            assert!(
                output_no_limit.len() > output.len(),
                "Without limit should have more outputs than with limit"
            );
        }
    }

    #[test]
    fn test_max_background_tasks_no_truncation_when_under_limit() {
        let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
        // Larger than the number of outputs a single window can produce.
        let max_background_tasks = 10;

        let files = [
            new_file_handle_with_sequence(file_ids[0], 0, 999, 0, 1),
            new_file_handle_with_sequence(file_ids[1], 0, 999, 0, 2),
            new_file_handle_with_sequence(file_ids[2], 0, 999, 0, 3),
            new_file_handle_with_sequence(file_ids[3], 0, 999, 0, 4),
        ];

        let mut windows = assign_to_windows(files.iter(), 3);

        let picker = TwcsPicker {
            trigger_file_num: 4,
            time_window_seconds: Some(3),
            max_output_file_size: None,
            append_mode: false,
            max_background_tasks: Some(max_background_tasks),
        };

        let active_window = find_latest_window_in_seconds(files.iter(), 3);
        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);

        assert!(
            output.len() <= max_background_tasks,
            "Output should be within limit"
        );
        assert!(!output.is_empty(), "Should have at least one output");
    }
}