1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap};
17use std::fmt::Debug;
18
19use common_telemetry::{info, trace};
20use common_time::timestamp::TimeUnit;
21use common_time::timestamp_millis::BucketAligned;
22use common_time::Timestamp;
23
24use crate::compaction::buckets::infer_time_bucket;
25use crate::compaction::compactor::CompactionRegion;
26use crate::compaction::picker::{Picker, PickerOutput};
27use crate::compaction::run::{find_sorted_runs, reduce_runs, Item};
28use crate::compaction::{get_expired_ssts, CompactionOutput};
29use crate::sst::file::{overlaps, FileHandle, Level};
30use crate::sst::version::LevelMeta;
31
/// Level that every TWCS compaction output is written to (see `build_output`).
const LEVEL_COMPACTED: Level = 1;
33
/// Compaction picker that groups SST files into time windows (TWCS) and decides, per window,
/// which files to merge.
///
/// The window holding the latest data (the "active" window) has its own run/file limits; all
/// other windows use the "inactive" limits.
#[derive(Debug)]
pub struct TwcsPicker {
    /// Maximum number of sorted runs allowed in the active window; exceeding it triggers
    /// compaction that reduces the window to this many runs.
    pub max_active_window_runs: usize,
    /// Maximum number of files allowed in the active window; exceeding it (without exceeding
    /// the run limit) merges a subset of files.
    pub max_active_window_files: usize,
    /// Maximum number of sorted runs allowed in an inactive window; exceeding it triggers
    /// compaction that reduces the window to this many runs.
    pub max_inactive_window_runs: usize,
    /// Maximum number of files allowed in an inactive window; exceeding it (without exceeding
    /// the run limit) merges a subset of files.
    pub max_inactive_window_files: usize,
    /// Compaction time window size in seconds. Used when the region version has no
    /// `compaction_time_window`; if both are absent the size is inferred from level-0 files.
    pub time_window_seconds: Option<i64>,
    /// Threshold used to split compaction inputs by estimated output size. Only applied when
    /// deletion markers are kept; `None` disables splitting.
    pub max_output_file_size: Option<u64>,
    /// Whether the region is in append mode; deletion markers are never filtered when set.
    pub append_mode: bool,
}
53
54impl TwcsPicker {
55 fn build_output(
59 &self,
60 time_windows: &mut BTreeMap<i64, Window>,
61 active_window: Option<i64>,
62 ) -> Vec<CompactionOutput> {
63 let mut output = vec![];
64 for (window, files) in time_windows {
65 let sorted_runs = find_sorted_runs(&mut files.files);
66
67 let (max_runs, max_files) = if let Some(active_window) = active_window
68 && *window == active_window
69 {
70 (self.max_active_window_runs, self.max_active_window_files)
71 } else {
72 (
73 self.max_inactive_window_runs,
74 self.max_inactive_window_files,
75 )
76 };
77
78 let found_runs = sorted_runs.len();
79 let filter_deleted =
82 !files.overlapping && (found_runs == 1 || max_runs == 1) && !self.append_mode;
83
84 let inputs = if found_runs > max_runs {
85 let files_to_compact = reduce_runs(sorted_runs, max_runs);
86 let files_to_compact_len = files_to_compact.len();
87 info!(
88 "Building compaction output, active window: {:?}, \
89 current window: {}, \
90 max runs: {}, \
91 found runs: {}, \
92 output size: {}, \
93 max output size: {:?}, \
94 remove deletion markers: {}",
95 active_window,
96 *window,
97 max_runs,
98 found_runs,
99 files_to_compact_len,
100 self.max_output_file_size,
101 filter_deleted
102 );
103 files_to_compact
104 } else if files.files.len() > max_files {
105 info!(
106 "Enforcing max file num in window: {}, active: {:?}, max: {}, current: {}, max output size: {:?}, filter delete: {}",
107 *window,
108 active_window,
109 max_files,
110 files.files.len(),
111 self.max_output_file_size,
112 filter_deleted,
113 );
114 vec![enforce_file_num(&files.files, max_files)]
116 } else {
117 trace!("Skip building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, ", active_window, *window, max_runs, found_runs);
118 continue;
119 };
120
121 let split_inputs = if !filter_deleted
122 && let Some(max_output_file_size) = self.max_output_file_size
123 {
124 let len_before_split = inputs.len();
125 let maybe_split = enforce_max_output_size(inputs, max_output_file_size);
126 if maybe_split.len() != len_before_split {
127 info!("Compaction output file size exceeds threshold {}, split compaction inputs to: {:?}", max_output_file_size, maybe_split);
128 }
129 maybe_split
130 } else {
131 inputs
132 };
133
134 for input in split_inputs {
135 debug_assert!(input.len() > 1);
136 output.push(CompactionOutput {
137 output_level: LEVEL_COMPACTED, inputs: input,
139 filter_deleted,
140 output_time_range: None, });
142 }
143 }
144 output
145 }
146}
147
148fn enforce_max_output_size(
153 inputs: Vec<Vec<FileHandle>>,
154 max_output_file_size: u64,
155) -> Vec<Vec<FileHandle>> {
156 inputs
157 .into_iter()
158 .flat_map(|input| {
159 debug_assert!(input.len() > 1);
160 let estimated_output_size = input.iter().map(|f| f.size()).sum::<u64>();
161 if estimated_output_size < max_output_file_size {
162 return vec![input];
164 }
165 let mut splits = vec![];
166 let mut new_input = vec![];
167 let mut new_input_size = 0;
168 for f in input {
169 if new_input_size + f.size() > max_output_file_size {
170 splits.push(std::mem::take(&mut new_input));
171 new_input_size = 0;
172 }
173 new_input_size += f.size();
174 new_input.push(f);
175 }
176 if !new_input.is_empty() {
177 splits.push(new_input);
178 }
179 splits
180 })
181 .filter(|p| p.len() > 1)
182 .collect()
183}
184
185fn enforce_file_num<T: Item>(files: &[T], max_file_num: usize) -> Vec<T> {
190 debug_assert!(files.len() > max_file_num);
191 let to_merge = files.len() - max_file_num + 1;
192 let mut min_penalty = usize::MAX;
193 let mut min_idx = 0;
194
195 for idx in 0..=(files.len() - to_merge) {
196 let current_penalty: usize = files
197 .iter()
198 .skip(idx)
199 .take(to_merge)
200 .map(|f| f.size())
201 .sum();
202 if current_penalty < min_penalty {
203 min_penalty = current_penalty;
204 min_idx = idx;
205 }
206 }
207 files.iter().skip(min_idx).take(to_merge).cloned().collect()
208}
209
210impl Picker for TwcsPicker {
211 fn pick(&self, compaction_region: &CompactionRegion) -> Option<PickerOutput> {
212 let region_id = compaction_region.region_id;
213 let levels = compaction_region.current_version.ssts.levels();
214
215 let expired_ssts =
216 get_expired_ssts(levels, compaction_region.ttl, Timestamp::current_millis());
217 if !expired_ssts.is_empty() {
218 info!("Expired SSTs in region {}: {:?}", region_id, expired_ssts);
219 expired_ssts.iter().for_each(|f| f.set_compacting(true));
221 }
222
223 let compaction_time_window = compaction_region
224 .current_version
225 .compaction_time_window
226 .map(|window| window.as_secs() as i64);
227 let time_window_size = compaction_time_window
228 .or(self.time_window_seconds)
229 .unwrap_or_else(|| {
230 let inferred = infer_time_bucket(levels[0].files());
231 info!(
232 "Compaction window for region {} is not present, inferring from files: {:?}",
233 region_id, inferred
234 );
235 inferred
236 });
237
238 let active_window = find_latest_window_in_seconds(levels[0].files(), time_window_size);
240 let mut windows =
242 assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size);
243 let outputs = self.build_output(&mut windows, active_window);
244
245 if outputs.is_empty() && expired_ssts.is_empty() {
246 return None;
247 }
248
249 Some(PickerOutput {
250 outputs,
251 expired_ssts,
252 time_window_size,
253 })
254 }
255}
256
257struct Window {
258 start: Timestamp,
259 end: Timestamp,
260 files: Vec<FileHandle>,
261 time_window: i64,
262 overlapping: bool,
263}
264
265impl Window {
266 fn new_with_file(file: FileHandle) -> Self {
268 let (start, end) = file.time_range();
269 Self {
270 start,
271 end,
272 files: vec![file],
273 time_window: 0,
274 overlapping: false,
275 }
276 }
277
278 fn range(&self) -> (Timestamp, Timestamp) {
280 (self.start, self.end)
281 }
282
283 fn add_file(&mut self, file: FileHandle) {
285 let (start, end) = file.time_range();
286 self.start = self.start.min(start);
287 self.end = self.end.max(end);
288 self.files.push(file);
289 }
290}
291
292fn assign_to_windows<'a>(
294 files: impl Iterator<Item = &'a FileHandle>,
295 time_window_size: i64,
296) -> BTreeMap<i64, Window> {
297 let mut windows: HashMap<i64, Window> = HashMap::new();
298 for f in files {
300 if f.compacting() {
301 continue;
302 }
303 let (_, end) = f.time_range();
304 let time_window = end
305 .convert_to(TimeUnit::Second)
306 .unwrap()
307 .value()
308 .align_to_ceil_by_bucket(time_window_size)
309 .unwrap_or(i64::MIN);
310
311 match windows.entry(time_window) {
312 Entry::Occupied(mut e) => {
313 e.get_mut().add_file(f.clone());
314 }
315 Entry::Vacant(e) => {
316 let mut window = Window::new_with_file(f.clone());
317 window.time_window = time_window;
318 e.insert(window);
319 }
320 }
321 }
322 if windows.is_empty() {
323 return BTreeMap::new();
324 }
325
326 let mut windows = windows.into_values().collect::<Vec<_>>();
327 windows.sort_unstable_by(|l, r| l.start.cmp(&r.start).then(l.end.cmp(&r.end).reverse()));
328
329 let mut current_range: (Timestamp, Timestamp) = windows[0].range(); for idx in 1..windows.len() {
332 let next_range = windows[idx].range();
333 if overlaps(¤t_range, &next_range) {
334 windows[idx - 1].overlapping = true;
335 windows[idx].overlapping = true;
336 }
337 current_range = (
338 current_range.0.min(next_range.0),
339 current_range.1.max(next_range.1),
340 );
341 }
342
343 windows.into_iter().map(|w| (w.time_window, w)).collect()
344}
345
346fn find_latest_window_in_seconds<'a>(
349 files: impl Iterator<Item = &'a FileHandle>,
350 time_window_size: i64,
351) -> Option<i64> {
352 let mut latest_timestamp = None;
353 for f in files {
354 let (_, end) = f.time_range();
355 if let Some(latest) = latest_timestamp {
356 if end > latest {
357 latest_timestamp = Some(end);
358 }
359 } else {
360 latest_timestamp = Some(end);
361 }
362 }
363 latest_timestamp
364 .and_then(|ts| ts.convert_to_ceil(TimeUnit::Second))
365 .and_then(|ts| ts.value().align_to_ceil_by_bucket(time_window_size))
366}
367
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use super::*;
    use crate::compaction::test_util::{new_file_handle, new_file_handles};
    use crate::sst::file::{FileId, FileMeta, Level};
    use crate::test_util::NoopFilePurger;

    #[test]
    fn test_get_latest_window_in_seconds() {
        assert_eq!(
            Some(1),
            find_latest_window_in_seconds([new_file_handle(FileId::random(), 0, 999, 0)].iter(), 1)
        );
        assert_eq!(
            Some(1),
            find_latest_window_in_seconds(
                [new_file_handle(FileId::random(), 0, 1000, 0)].iter(),
                1
            )
        );

        assert_eq!(
            Some(-9223372036854000),
            find_latest_window_in_seconds(
                [new_file_handle(FileId::random(), i64::MIN, i64::MIN + 1, 0)].iter(),
                3600,
            )
        );

        assert_eq!(
            (i64::MAX / 10000000 + 1) * 10000,
            find_latest_window_in_seconds(
                [new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0)].iter(),
                10000,
            )
            .unwrap()
        );

        assert_eq!(
            Some((i64::MAX / 3600000 + 1) * 3600),
            find_latest_window_in_seconds(
                [
                    new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0),
                    new_file_handle(FileId::random(), 0, 1000, 0)
                ]
                .iter(),
                3600
            )
        );
    }

    #[test]
    fn test_assign_to_windows() {
        let windows = assign_to_windows(
            [
                new_file_handle(FileId::random(), 0, 999, 0),
                new_file_handle(FileId::random(), 0, 999, 0),
                new_file_handle(FileId::random(), 0, 999, 0),
                new_file_handle(FileId::random(), 0, 999, 0),
                new_file_handle(FileId::random(), 0, 999, 0),
            ]
            .iter(),
            3,
        );
        assert_eq!(5, windows.get(&0).unwrap().files.len());

        // Use distinct ids so the assertions below actually tell the files apart.
        // `[FileId::random(); 3]` would copy one id into every slot.
        let files = [FileId::random(), FileId::random(), FileId::random()];
        let windows = assign_to_windows(
            [
                new_file_handle(files[0], -2000, -3, 0),
                new_file_handle(files[1], 0, 2999, 0),
                new_file_handle(files[2], 50, 10001, 0),
            ]
            .iter(),
            3,
        );
        assert_eq!(
            files[0],
            windows.get(&0).unwrap().files.first().unwrap().file_id()
        );
        assert_eq!(
            files[1],
            windows.get(&3).unwrap().files.first().unwrap().file_id()
        );
        assert_eq!(
            files[2],
            windows.get(&12).unwrap().files.first().unwrap().file_id()
        );
    }

    #[test]
    fn test_assign_compacting_to_windows() {
        let files = [
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
        ];
        files[0].set_compacting(true);
        files[2].set_compacting(true);
        let windows = assign_to_windows(files.iter(), 3);
        assert_eq!(3, windows.get(&0).unwrap().files.len());
    }

    /// (window key, expected `overlapping` flag, expected sorted file time ranges).
    type ExpectedWindowSpec = (i64, bool, Vec<(i64, i64)>);

    /// Assigns files with `file_time_ranges` to windows and checks each expected window's
    /// overlapping flag and file ranges.
    fn check_assign_to_windows_with_overlapping(
        file_time_ranges: &[(i64, i64)],
        time_window: i64,
        expected_files: &[ExpectedWindowSpec],
    ) {
        let files: Vec<_> = (0..file_time_ranges.len())
            .map(|_| FileId::random())
            .collect();

        let file_handles = files
            .iter()
            .zip(file_time_ranges.iter())
            .map(|(file_id, range)| new_file_handle(*file_id, range.0, range.1, 0))
            .collect::<Vec<_>>();

        let windows = assign_to_windows(file_handles.iter(), time_window);

        for (expected_window, overlapping, window_files) in expected_files {
            let actual_window = windows.get(expected_window).unwrap();
            assert_eq!(*overlapping, actual_window.overlapping);
            let mut file_ranges = actual_window
                .files
                .iter()
                .map(|f| {
                    let (s, e) = f.time_range();
                    (s.value(), e.value())
                })
                .collect::<Vec<_>>();
            file_ranges.sort_unstable_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1)));
            assert_eq!(window_files, &file_ranges);
        }
    }

    #[test]
    fn test_assign_to_windows_with_overlapping() {
        check_assign_to_windows_with_overlapping(
            &[(0, 999), (1000, 1999), (2000, 2999)],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, false, vec![(1000, 1999), (2000, 2999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[(0, 1), (0, 999), (100, 2999)],
            2,
            &[
                (0, true, vec![(0, 1), (0, 999)]),
                (2, true, vec![(100, 2999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[(0, 999), (1000, 1999), (2000, 2999), (3000, 3999)],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, false, vec![(1000, 1999), (2000, 2999)]),
                (4, false, vec![(3000, 3999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (0, 3999),
            ],
            2,
            &[
                (0, true, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(0, 3999), (3000, 3999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (1999, 3999),
            ],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(1999, 3999), (3000, 3999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (2999, 3999),
            ],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(2999, 3999), (3000, 3999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (0, 1000),
            ],
            2,
            &[
                (0, true, vec![(0, 999)]),
                (2, true, vec![(0, 1000), (1000, 1999), (2000, 2999)]),
                (4, false, vec![(3000, 3999)]),
            ],
        );
    }

    /// Runs `build_output` with fixed limits on `input_files` and compares the outputs.
    struct CompactionPickerTestCase {
        window_size: i64,
        input_files: Vec<FileHandle>,
        expected_outputs: Vec<ExpectedOutput>,
    }

    impl CompactionPickerTestCase {
        fn check(&self) {
            let mut windows = assign_to_windows(self.input_files.iter(), self.window_size);
            let active_window =
                find_latest_window_in_seconds(self.input_files.iter(), self.window_size);
            let output = TwcsPicker {
                max_active_window_runs: 4,
                max_active_window_files: usize::MAX,
                max_inactive_window_runs: 1,
                max_inactive_window_files: usize::MAX,
                time_window_seconds: None,
                max_output_file_size: None,
                append_mode: false,
            }
            .build_output(&mut windows, active_window);

            let output = output
                .iter()
                .map(|o| {
                    let input_file_ids =
                        o.inputs.iter().map(|f| f.file_id()).collect::<HashSet<_>>();
                    (input_file_ids, o.output_level)
                })
                .collect::<Vec<_>>();

            let expected = self
                .expected_outputs
                .iter()
                .map(|o| {
                    let input_file_ids = o
                        .input_files
                        .iter()
                        .map(|idx| self.input_files[*idx].file_id())
                        .collect::<HashSet<_>>();
                    (input_file_ids, o.output_level)
                })
                .collect::<Vec<_>>();
            assert_eq!(expected, output);
        }
    }

    /// Expected compaction output: indices into `input_files` plus the output level.
    struct ExpectedOutput {
        input_files: Vec<usize>,
        output_level: Level,
    }

    /// Checks that `enforce_file_num` selects exactly `files_to_merge` (by time range).
    fn check_enforce_file_num(
        input_files: &[(i64, i64, u64)],
        max_file_num: usize,
        files_to_merge: &[(i64, i64)],
    ) {
        let mut files = new_file_handles(input_files);
        find_sorted_runs(&mut files);
        let mut to_merge = enforce_file_num(&files, max_file_num);
        to_merge.sort_unstable_by_key(|f| f.time_range().0);
        assert_eq!(
            files_to_merge.to_vec(),
            to_merge
                .iter()
                .map(|f| {
                    let (start, end) = f.time_range();
                    (start.value(), end.value())
                })
                .collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_enforce_file_num() {
        check_enforce_file_num(
            &[(0, 300, 2), (100, 200, 1), (200, 400, 1)],
            2,
            &[(100, 200), (200, 400)],
        );

        check_enforce_file_num(
            &[(0, 300, 200), (100, 200, 100), (200, 400, 100)],
            1,
            &[(0, 300), (100, 200), (200, 400)],
        );
    }

    #[test]
    fn test_build_twcs_output() {
        let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();

        CompactionPickerTestCase {
            window_size: 3,
            input_files: [
                new_file_handle(file_ids[0], -2000, -3, 0),
                new_file_handle(file_ids[1], -3000, -100, 0),
                new_file_handle(file_ids[2], 0, 2999, 0),
                new_file_handle(file_ids[3], 50, 2998, 0),
            ]
            .to_vec(),
            expected_outputs: vec![ExpectedOutput {
                input_files: vec![0, 1],
                output_level: 1,
            }],
        }
        .check();

        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
        CompactionPickerTestCase {
            window_size: 3,
            input_files: [
                new_file_handle(file_ids[0], -2000, -3, 0),
                new_file_handle(file_ids[1], -3000, -100, 0),
                new_file_handle(file_ids[2], 0, 2999, 0),
                new_file_handle(file_ids[3], 50, 2998, 0),
                new_file_handle(file_ids[4], 11, 2990, 0),
                new_file_handle(file_ids[5], 50, 4998, 0),
            ]
            .to_vec(),
            expected_outputs: vec![
                ExpectedOutput {
                    input_files: vec![0, 1],
                    output_level: 1,
                },
                ExpectedOutput {
                    input_files: vec![2, 3, 4],
                    output_level: 1,
                },
            ],
        }
        .check();
    }

    /// Builds level-0 file handles with default ids from `(start_ms, end_ms, size)` tuples.
    fn make_file_handles(inputs: &[(i64, i64, u64)]) -> Vec<FileHandle> {
        inputs
            .iter()
            .map(|(start, end, size)| {
                FileHandle::new(
                    FileMeta {
                        region_id: Default::default(),
                        file_id: Default::default(),
                        time_range: (
                            Timestamp::new_millisecond(*start),
                            Timestamp::new_millisecond(*end),
                        ),
                        level: 0,
                        file_size: *size,
                        available_indexes: Default::default(),
                        index_file_size: 0,
                        num_rows: 0,
                        num_row_groups: 0,
                        sequence: None,
                    },
                    Arc::new(NoopFilePurger),
                )
            })
            .collect()
    }

    #[test]
    fn test_limit_output_size() {
        let mut files = make_file_handles(&[(1, 1, 1)].repeat(6));
        let runs = find_sorted_runs(&mut files);
        assert_eq!(6, runs.len());
        let files_to_merge = reduce_runs(runs, 2);

        let enforced = enforce_max_output_size(files_to_merge, 2);
        assert_eq!(2, enforced.len());
        assert_eq!(2, enforced[0].len());
        assert_eq!(2, enforced[1].len());
    }
}