1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap};
17use std::fmt::Debug;
18use std::num::NonZeroU64;
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::info;
22use common_time::timestamp::TimeUnit;
23use common_time::timestamp_millis::BucketAligned;
24use common_time::Timestamp;
25use store_api::storage::RegionId;
26
27use crate::compaction::buckets::infer_time_bucket;
28use crate::compaction::compactor::CompactionRegion;
29use crate::compaction::picker::{Picker, PickerOutput};
30use crate::compaction::run::{
31 find_sorted_runs, merge_seq_files, reduce_runs, FileGroup, Item, Ranged,
32};
33use crate::compaction::{get_expired_ssts, CompactionOutput};
34use crate::sst::file::{overlaps, FileHandle, Level};
35use crate::sst::version::LevelMeta;
36
/// Output level assigned to every `CompactionOutput` built by [`TwcsPicker`].
const LEVEL_COMPACTED: Level = 1;
38
/// Compaction picker that assigns SST files to time windows and picks, per window,
/// the files to compact together.
#[derive(Debug)]
pub struct TwcsPicker {
    /// Minimum number of items a window's single sorted run must hold before that
    /// run is compacted (windows with several sorted runs are always reduced).
    pub trigger_file_num: usize,
    /// Configured time window size in seconds. Only used when the region version
    /// carries no compaction time window; when both are absent the size is inferred
    /// from the level-0 files.
    pub time_window_seconds: Option<i64>,
    /// Upper bound for output file size (formatted with `ReadableSize`, i.e. bytes),
    /// passed to `merge_seq_files` and reported as `max_file_size` in the picker output.
    pub max_output_file_size: Option<u64>,
    /// Whether the region is in append mode; when set, deletion markers are never
    /// filtered out during compaction.
    pub append_mode: bool,
}
52
53impl TwcsPicker {
54 fn build_output(
56 &self,
57 region_id: RegionId,
58 time_windows: &mut BTreeMap<i64, Window>,
59 active_window: Option<i64>,
60 ) -> Vec<CompactionOutput> {
61 let mut output = vec![];
62 for (window, files) in time_windows {
63 if files.files.is_empty() {
64 continue;
65 }
66 let mut files_to_merge: Vec<_> = files.files().cloned().collect();
67 let sorted_runs = find_sorted_runs(&mut files_to_merge);
68 let found_runs = sorted_runs.len();
69 let filter_deleted = !files.overlapping && found_runs <= 2 && !self.append_mode;
72
73 let inputs = if found_runs > 1 {
74 reduce_runs(sorted_runs)
75 } else {
76 let run = sorted_runs.last().unwrap();
77 if run.items().len() < self.trigger_file_num {
78 continue;
79 }
80 merge_seq_files(run.items(), self.max_output_file_size)
82 };
83
84 if !inputs.is_empty() {
85 log_pick_result(
86 region_id,
87 *window,
88 active_window,
89 found_runs,
90 files.files.len(),
91 self.max_output_file_size,
92 filter_deleted,
93 &inputs,
94 );
95 output.push(CompactionOutput {
96 output_level: LEVEL_COMPACTED, inputs: inputs.into_iter().flat_map(|fg| fg.into_files()).collect(),
98 filter_deleted,
99 output_time_range: None, });
101 }
102 }
103 output
104 }
105}
106
107#[allow(clippy::too_many_arguments)]
108fn log_pick_result(
109 region_id: RegionId,
110 window: i64,
111 active_window: Option<i64>,
112 found_runs: usize,
113 file_num: usize,
114 max_output_file_size: Option<u64>,
115 filter_deleted: bool,
116 inputs: &[FileGroup],
117) {
118 let input_file_str: Vec<String> = inputs
119 .iter()
120 .map(|f| {
121 let range = f.range();
122 let start = range.0.to_iso8601_string();
123 let end = range.1.to_iso8601_string();
124 let num_rows = f.num_rows();
125 format!(
126 "FileGroup{{id: {:?}, range: ({}, {}), size: {}, num rows: {} }}",
127 f.file_ids(),
128 start,
129 end,
130 ReadableSize(f.size() as u64),
131 num_rows
132 )
133 })
134 .collect();
135 let window_str = Timestamp::new_second(window).to_iso8601_string();
136 let active_window_str = active_window.map(|s| Timestamp::new_second(s).to_iso8601_string());
137 let max_output_file_size = max_output_file_size.map(|size| ReadableSize(size).to_string());
138 info!(
139 "Region ({:?}) compaction pick result: current window: {}, active window: {:?}, \
140 found runs: {}, file num: {}, max output file size: {:?}, filter deleted: {}, \
141 input files: {:?}",
142 region_id,
143 window_str,
144 active_window_str,
145 found_runs,
146 file_num,
147 max_output_file_size,
148 filter_deleted,
149 input_file_str
150 );
151}
152
153impl Picker for TwcsPicker {
154 fn pick(&self, compaction_region: &CompactionRegion) -> Option<PickerOutput> {
155 let region_id = compaction_region.region_id;
156 let levels = compaction_region.current_version.ssts.levels();
157
158 let expired_ssts =
159 get_expired_ssts(levels, compaction_region.ttl, Timestamp::current_millis());
160 if !expired_ssts.is_empty() {
161 info!("Expired SSTs in region {}: {:?}", region_id, expired_ssts);
162 expired_ssts.iter().for_each(|f| f.set_compacting(true));
164 }
165
166 let compaction_time_window = compaction_region
167 .current_version
168 .compaction_time_window
169 .map(|window| window.as_secs() as i64);
170 let time_window_size = compaction_time_window
171 .or(self.time_window_seconds)
172 .unwrap_or_else(|| {
173 let inferred = infer_time_bucket(levels[0].files());
174 info!(
175 "Compaction window for region {} is not present, inferring from files: {:?}",
176 region_id, inferred
177 );
178 inferred
179 });
180
181 let active_window = find_latest_window_in_seconds(levels[0].files(), time_window_size);
183 let mut windows =
185 assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size);
186 let outputs = self.build_output(region_id, &mut windows, active_window);
187
188 if outputs.is_empty() && expired_ssts.is_empty() {
189 return None;
190 }
191
192 let max_file_size = self.max_output_file_size.map(|v| v as usize);
193 Some(PickerOutput {
194 outputs,
195 expired_ssts,
196 time_window_size,
197 max_file_size,
198 })
199 }
200}
201
/// A time window together with the files assigned to it.
struct Window {
    /// Earliest start timestamp among the window's files.
    start: Timestamp,
    /// Latest end timestamp among the window's files.
    end: Timestamp,
    /// Files of this window grouped by `file.meta_ref().sequence`.
    files: HashMap<Option<NonZeroU64>, FileGroup>,
    /// Key of this window as computed by `assign_to_windows` (the file end timestamp
    /// in seconds aligned with `align_to_ceil_by_bucket`); `0` until assigned.
    time_window: i64,
    /// Set by `assign_to_windows` when this window's time range overlaps another window's.
    overlapping: bool,
}
211
212impl Window {
213 fn new_with_file(file: FileHandle) -> Self {
215 let (start, end) = file.time_range();
216 let files = HashMap::from([(file.meta_ref().sequence, FileGroup::new_with_file(file))]);
217 Self {
218 start,
219 end,
220 files,
221 time_window: 0,
222 overlapping: false,
223 }
224 }
225
226 fn range(&self) -> (Timestamp, Timestamp) {
228 (self.start, self.end)
229 }
230
231 fn add_file(&mut self, file: FileHandle) {
233 let (start, end) = file.time_range();
234 self.start = self.start.min(start);
235 self.end = self.end.max(end);
236
237 match self.files.entry(file.meta_ref().sequence) {
238 Entry::Occupied(mut o) => {
239 o.get_mut().add_file(file);
240 }
241 Entry::Vacant(v) => {
242 v.insert(FileGroup::new_with_file(file));
243 }
244 }
245 }
246
247 fn files(&self) -> impl Iterator<Item = &FileGroup> {
248 self.files.values()
249 }
250}
251
252fn assign_to_windows<'a>(
254 files: impl Iterator<Item = &'a FileHandle>,
255 time_window_size: i64,
256) -> BTreeMap<i64, Window> {
257 let mut windows: HashMap<i64, Window> = HashMap::new();
258 for f in files {
260 if f.compacting() {
261 continue;
262 }
263 let (_, end) = f.time_range();
264 let time_window = end
265 .convert_to(TimeUnit::Second)
266 .unwrap()
267 .value()
268 .align_to_ceil_by_bucket(time_window_size)
269 .unwrap_or(i64::MIN);
270
271 match windows.entry(time_window) {
272 Entry::Occupied(mut e) => {
273 e.get_mut().add_file(f.clone());
274 }
275 Entry::Vacant(e) => {
276 let mut window = Window::new_with_file(f.clone());
277 window.time_window = time_window;
278 e.insert(window);
279 }
280 }
281 }
282 if windows.is_empty() {
283 return BTreeMap::new();
284 }
285
286 let mut windows = windows.into_values().collect::<Vec<_>>();
287 windows.sort_unstable_by(|l, r| l.start.cmp(&r.start).then(l.end.cmp(&r.end).reverse()));
288
289 let mut current_range: (Timestamp, Timestamp) = windows[0].range(); for idx in 1..windows.len() {
292 let next_range = windows[idx].range();
293 if overlaps(¤t_range, &next_range) {
294 windows[idx - 1].overlapping = true;
295 windows[idx].overlapping = true;
296 }
297 current_range = (
298 current_range.0.min(next_range.0),
299 current_range.1.max(next_range.1),
300 );
301 }
302
303 windows.into_iter().map(|w| (w.time_window, w)).collect()
304}
305
306fn find_latest_window_in_seconds<'a>(
309 files: impl Iterator<Item = &'a FileHandle>,
310 time_window_size: i64,
311) -> Option<i64> {
312 let mut latest_timestamp = None;
313 for f in files {
314 let (_, end) = f.time_range();
315 if let Some(latest) = latest_timestamp {
316 if end > latest {
317 latest_timestamp = Some(end);
318 }
319 } else {
320 latest_timestamp = Some(end);
321 }
322 }
323 latest_timestamp
324 .and_then(|ts| ts.convert_to_ceil(TimeUnit::Second))
325 .and_then(|ts| ts.value().align_to_ceil_by_bucket(time_window_size))
326}
327
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::*;
    use crate::compaction::test_util::{new_file_handle, new_file_handle_with_sequence};
    use crate::sst::file::{FileId, Level};

    #[test]
    fn test_get_latest_window_in_seconds() {
        assert_eq!(
            Some(1),
            find_latest_window_in_seconds([new_file_handle(FileId::random(), 0, 999, 0)].iter(), 1)
        );
        assert_eq!(
            Some(1),
            find_latest_window_in_seconds(
                [new_file_handle(FileId::random(), 0, 1000, 0)].iter(),
                1
            )
        );

        assert_eq!(
            Some(-9223372036854000),
            find_latest_window_in_seconds(
                [new_file_handle(FileId::random(), i64::MIN, i64::MIN + 1, 0)].iter(),
                3600,
            )
        );

        assert_eq!(
            (i64::MAX / 10000000 + 1) * 10000,
            find_latest_window_in_seconds(
                [new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0)].iter(),
                10000,
            )
            .unwrap()
        );

        assert_eq!(
            Some((i64::MAX / 3600000 + 1) * 3600),
            find_latest_window_in_seconds(
                [
                    new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0),
                    new_file_handle(FileId::random(), 0, 1000, 0)
                ]
                .iter(),
                3600
            )
        );
    }

    #[test]
    fn test_assign_to_windows() {
        let windows = assign_to_windows(
            [
                new_file_handle(FileId::random(), 0, 999, 0),
                new_file_handle(FileId::random(), 0, 999, 0),
                new_file_handle(FileId::random(), 0, 999, 0),
                new_file_handle(FileId::random(), 0, 999, 0),
                new_file_handle(FileId::random(), 0, 999, 0),
            ]
            .iter(),
            3,
        );
        let fgs = &windows.get(&0).unwrap().files;
        assert_eq!(1, fgs.len());
        assert_eq!(fgs.values().map(|f| f.files().len()).sum::<usize>(), 5);

        // Distinct ids are required for the assertions below to check which file
        // landed in which window: `[FileId::random(); 3]` would repeat a single id.
        let files = [FileId::random(), FileId::random(), FileId::random()];
        let windows = assign_to_windows(
            [
                new_file_handle(files[0], -2000, -3, 0),
                new_file_handle(files[1], 0, 2999, 0),
                new_file_handle(files[2], 50, 10001, 0),
            ]
            .iter(),
            3,
        );
        assert_eq!(
            files[0],
            windows.get(&0).unwrap().files().next().unwrap().files()[0]
                .file_id()
                .file_id()
        );
        assert_eq!(
            files[1],
            windows.get(&3).unwrap().files().next().unwrap().files()[0]
                .file_id()
                .file_id()
        );
        assert_eq!(
            files[2],
            windows.get(&12).unwrap().files().next().unwrap().files()[0]
                .file_id()
                .file_id()
        );
    }

    #[test]
    fn test_assign_file_groups_to_windows() {
        let files = [
            FileId::random(),
            FileId::random(),
            FileId::random(),
            FileId::random(),
        ];
        let windows = assign_to_windows(
            [
                new_file_handle_with_sequence(files[0], 0, 999, 0, 1),
                new_file_handle_with_sequence(files[1], 0, 999, 0, 1),
                new_file_handle_with_sequence(files[2], 0, 999, 0, 2),
                new_file_handle_with_sequence(files[3], 0, 999, 0, 2),
            ]
            .iter(),
            3,
        );
        assert_eq!(windows.len(), 1);
        let fgs = &windows.get(&0).unwrap().files;
        assert_eq!(2, fgs.len());
        assert_eq!(
            fgs.get(&NonZeroU64::new(1))
                .unwrap()
                .files()
                .iter()
                .map(|f| f.file_id().file_id())
                .collect::<HashSet<_>>(),
            [files[0], files[1]].into_iter().collect()
        );
        assert_eq!(
            fgs.get(&NonZeroU64::new(2))
                .unwrap()
                .files()
                .iter()
                .map(|f| f.file_id().file_id())
                .collect::<HashSet<_>>(),
            [files[2], files[3]].into_iter().collect()
        );
    }

    #[test]
    fn test_assign_compacting_to_windows() {
        let files = [
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
        ];
        files[0].set_compacting(true);
        files[2].set_compacting(true);
        let mut windows = assign_to_windows(files.iter(), 3);
        let window0 = windows.remove(&0).unwrap();
        assert_eq!(1, window0.files.len());
        let candidates = window0
            .files
            .into_values()
            .flat_map(|fg| fg.into_files())
            .map(|f| f.file_id().file_id())
            .collect::<HashSet<_>>();
        assert_eq!(candidates.len(), 3);
        assert_eq!(
            candidates,
            [
                files[1].file_id().file_id(),
                files[3].file_id().file_id(),
                files[4].file_id().file_id()
            ]
            .into_iter()
            .collect::<HashSet<_>>()
        );
    }

    /// `(window key, expected overlapping flag, expected (start, end) file ranges)`.
    type ExpectedWindowSpec = (i64, bool, Vec<(i64, i64)>);

    /// Assigns files with the given time ranges to windows of size `time_window` and
    /// checks each expected window's overlapping flag and sorted file ranges.
    fn check_assign_to_windows_with_overlapping(
        file_time_ranges: &[(i64, i64)],
        time_window: i64,
        expected_files: &[ExpectedWindowSpec],
    ) {
        let files: Vec<_> = (0..file_time_ranges.len())
            .map(|_| FileId::random())
            .collect();

        let file_handles = files
            .iter()
            .zip(file_time_ranges.iter())
            .map(|(file_id, range)| new_file_handle(*file_id, range.0, range.1, 0))
            .collect::<Vec<_>>();

        let windows = assign_to_windows(file_handles.iter(), time_window);

        for (expected_window, overlapping, window_files) in expected_files {
            let actual_window = windows.get(expected_window).unwrap();
            assert_eq!(*overlapping, actual_window.overlapping);
            let mut file_ranges = actual_window
                .files
                .iter()
                .flat_map(|(_, f)| {
                    f.files().iter().map(|f| {
                        let (s, e) = f.time_range();
                        (s.value(), e.value())
                    })
                })
                .collect::<Vec<_>>();
            file_ranges.sort_unstable_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1)));
            assert_eq!(window_files, &file_ranges);
        }
    }

    #[test]
    fn test_assign_to_windows_with_overlapping() {
        check_assign_to_windows_with_overlapping(
            &[(0, 999), (1000, 1999), (2000, 2999)],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, false, vec![(1000, 1999), (2000, 2999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[(0, 1), (0, 999), (100, 2999)],
            2,
            &[
                (0, true, vec![(0, 1), (0, 999)]),
                (2, true, vec![(100, 2999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[(0, 999), (1000, 1999), (2000, 2999), (3000, 3999)],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, false, vec![(1000, 1999), (2000, 2999)]),
                (4, false, vec![(3000, 3999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (0, 3999),
            ],
            2,
            &[
                (0, true, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(0, 3999), (3000, 3999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (1999, 3999),
            ],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(1999, 3999), (3000, 3999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (2999, 3999),
            ],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(2999, 3999), (3000, 3999)]),
            ],
        );

        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (0, 1000),
            ],
            2,
            &[
                (0, true, vec![(0, 999)]),
                (2, true, vec![(0, 1000), (1000, 1999), (2000, 2999)]),
                (4, false, vec![(3000, 3999)]),
            ],
        );
    }

    /// A `build_output` scenario: input files, window size and expected outputs.
    struct CompactionPickerTestCase {
        window_size: i64,
        input_files: Vec<FileHandle>,
        expected_outputs: Vec<ExpectedOutput>,
    }

    impl CompactionPickerTestCase {
        /// Runs `build_output` on the input files and compares each output's input-file
        /// indices and output level with the expectation (in window order).
        fn check(&self) {
            let file_id_to_idx = self
                .input_files
                .iter()
                .enumerate()
                .map(|(idx, file)| (file.file_id(), idx))
                .collect::<HashMap<_, _>>();
            let mut windows = assign_to_windows(self.input_files.iter(), self.window_size);
            let active_window =
                find_latest_window_in_seconds(self.input_files.iter(), self.window_size);
            let output = TwcsPicker {
                trigger_file_num: 4,
                time_window_seconds: None,
                max_output_file_size: None,
                append_mode: false,
            }
            .build_output(RegionId::from_u64(0), &mut windows, active_window);

            let output = output
                .iter()
                .map(|o| {
                    let input_file_ids = o
                        .inputs
                        .iter()
                        .map(|f| file_id_to_idx.get(&f.file_id()).copied().unwrap())
                        .collect::<HashSet<_>>();
                    (input_file_ids, o.output_level)
                })
                .collect::<Vec<_>>();

            let expected = self
                .expected_outputs
                .iter()
                .map(|o| {
                    let input_file_ids = o.input_files.iter().copied().collect::<HashSet<_>>();
                    (input_file_ids, o.output_level)
                })
                .collect::<Vec<_>>();
            assert_eq!(expected, output);
        }
    }

    /// Expected compaction output: indices into `input_files` and the output level.
    struct ExpectedOutput {
        input_files: Vec<usize>,
        output_level: Level,
    }

    #[test]
    fn test_build_twcs_output() {
        let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();

        CompactionPickerTestCase {
            window_size: 3,
            input_files: [
                new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
                new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
                new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3),
                new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4),
            ]
            .to_vec(),
            expected_outputs: vec![
                ExpectedOutput {
                    input_files: vec![0, 1],
                    output_level: 1,
                },
                ExpectedOutput {
                    input_files: vec![2, 3],
                    output_level: 1,
                },
            ],
        }
        .check();

        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
        CompactionPickerTestCase {
            window_size: 3,
            input_files: [
                new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
                new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
                new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3),
                new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4),
                new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 5),
            ]
            .to_vec(),
            expected_outputs: vec![
                ExpectedOutput {
                    input_files: vec![0, 1],
                    output_level: 1,
                },
                ExpectedOutput {
                    input_files: vec![2, 4],
                    output_level: 1,
                },
            ],
        }
        .check();

        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
        CompactionPickerTestCase {
            window_size: 3,
            input_files: [
                new_file_handle_with_sequence(file_ids[0], 0, 2999, 1, 1),
                new_file_handle_with_sequence(file_ids[1], 0, 2998, 1, 1),
                new_file_handle_with_sequence(file_ids[2], 3000, 5999, 1, 2),
                new_file_handle_with_sequence(file_ids[3], 3000, 5000, 1, 2),
                new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 3),
            ]
            .to_vec(),
            expected_outputs: vec![ExpectedOutput {
                input_files: vec![0, 1, 4],
                output_level: 1,
            }],
        }
        .check();
    }
}