1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap};
17use std::fmt::Debug;
18use std::num::NonZeroU64;
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::info;
22use common_time::timestamp::TimeUnit;
23use common_time::timestamp_millis::BucketAligned;
24use common_time::Timestamp;
25use store_api::storage::RegionId;
26
27use crate::compaction::buckets::infer_time_bucket;
28use crate::compaction::compactor::CompactionRegion;
29use crate::compaction::picker::{Picker, PickerOutput};
30use crate::compaction::run::{
31 find_sorted_runs, merge_seq_files, reduce_runs, FileGroup, Item, Ranged,
32};
33use crate::compaction::{get_expired_ssts, CompactionOutput};
34use crate::sst::file::{overlaps, FileHandle, Level};
35use crate::sst::version::LevelMeta;
36
/// Level that every compaction output built by [`TwcsPicker`] is written to.
const LEVEL_COMPACTED: Level = 1;

/// Compaction picker that assigns SST files to time windows by their max (end) timestamp and
/// picks, per window, the file groups to merge into [`LEVEL_COMPACTED`].
#[derive(Debug)]
pub struct TwcsPicker {
    /// Minimum number of items a window's *single* sorted run must contain before that window is
    /// compacted. Windows with more than one sorted run are compacted regardless of this value.
    pub trigger_file_num: usize,
    /// Compaction time window in seconds. Used only when the region's current version does not
    /// carry a compaction time window; if both are absent the window is inferred from level-0
    /// files.
    pub time_window_seconds: Option<i64>,
    /// Optional upper bound for output file size (logged via `ReadableSize`, so presumably
    /// bytes). Passed to `merge_seq_files` and reported as `max_file_size` in the picker output.
    pub max_output_file_size: Option<u64>,
    /// Whether the region is in append mode. When `true`, deletion markers are never filtered
    /// out during compaction.
    pub append_mode: bool,
}
52
53impl TwcsPicker {
54 fn build_output(
56 &self,
57 region_id: RegionId,
58 time_windows: &mut BTreeMap<i64, Window>,
59 active_window: Option<i64>,
60 ) -> Vec<CompactionOutput> {
61 let mut output = vec![];
62 for (window, files) in time_windows {
63 if files.files.is_empty() {
64 continue;
65 }
66 let mut files_to_merge: Vec<_> = files.files().cloned().collect();
67 let sorted_runs = find_sorted_runs(&mut files_to_merge);
68 let found_runs = sorted_runs.len();
69 let filter_deleted = !files.overlapping && found_runs <= 2 && !self.append_mode;
72
73 let inputs = if found_runs > 1 {
74 reduce_runs(sorted_runs)
75 } else {
76 let run = sorted_runs.last().unwrap();
77 if run.items().len() < self.trigger_file_num {
78 continue;
79 }
80 merge_seq_files(run.items(), self.max_output_file_size)
82 };
83
84 if !inputs.is_empty() {
85 log_pick_result(
86 region_id,
87 *window,
88 active_window,
89 found_runs,
90 files.files.len(),
91 self.max_output_file_size,
92 filter_deleted,
93 &inputs,
94 );
95 output.push(CompactionOutput {
96 output_level: LEVEL_COMPACTED, inputs: inputs.into_iter().flat_map(|fg| fg.into_files()).collect(),
98 filter_deleted,
99 output_time_range: None, });
101 }
102 }
103 output
104 }
105}
106
107#[allow(clippy::too_many_arguments)]
108fn log_pick_result(
109 region_id: RegionId,
110 window: i64,
111 active_window: Option<i64>,
112 found_runs: usize,
113 file_num: usize,
114 max_output_file_size: Option<u64>,
115 filter_deleted: bool,
116 inputs: &[FileGroup],
117) {
118 let input_file_str: Vec<String> = inputs
119 .iter()
120 .map(|f| {
121 let range = f.range();
122 let start = range.0.to_iso8601_string();
123 let end = range.1.to_iso8601_string();
124 let num_rows = f.num_rows();
125 format!(
126 "FileGroup{{id: {:?}, range: ({}, {}), size: {}, num rows: {} }}",
127 f.file_ids(),
128 start,
129 end,
130 ReadableSize(f.size() as u64),
131 num_rows
132 )
133 })
134 .collect();
135 let window_str = Timestamp::new_second(window).to_iso8601_string();
136 let active_window_str = active_window.map(|s| Timestamp::new_second(s).to_iso8601_string());
137 let max_output_file_size = max_output_file_size.map(|size| ReadableSize(size).to_string());
138 info!(
139 "Region ({:?}) compaction pick result: current window: {}, active window: {:?}, \
140 found runs: {}, file num: {}, max output file size: {:?}, filter deleted: {}, \
141 input files: {:?}",
142 region_id,
143 window_str,
144 active_window_str,
145 found_runs,
146 file_num,
147 max_output_file_size,
148 filter_deleted,
149 input_file_str
150 );
151}
152
153impl Picker for TwcsPicker {
154 fn pick(&self, compaction_region: &CompactionRegion) -> Option<PickerOutput> {
155 let region_id = compaction_region.region_id;
156 let levels = compaction_region.current_version.ssts.levels();
157
158 let expired_ssts =
159 get_expired_ssts(levels, compaction_region.ttl, Timestamp::current_millis());
160 if !expired_ssts.is_empty() {
161 info!("Expired SSTs in region {}: {:?}", region_id, expired_ssts);
162 expired_ssts.iter().for_each(|f| f.set_compacting(true));
164 }
165
166 let compaction_time_window = compaction_region
167 .current_version
168 .compaction_time_window
169 .map(|window| window.as_secs() as i64);
170 let time_window_size = compaction_time_window
171 .or(self.time_window_seconds)
172 .unwrap_or_else(|| {
173 let inferred = infer_time_bucket(levels[0].files());
174 info!(
175 "Compaction window for region {} is not present, inferring from files: {:?}",
176 region_id, inferred
177 );
178 inferred
179 });
180
181 let active_window = find_latest_window_in_seconds(levels[0].files(), time_window_size);
183 let mut windows =
185 assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size);
186 let outputs = self.build_output(region_id, &mut windows, active_window);
187
188 if outputs.is_empty() && expired_ssts.is_empty() {
189 return None;
190 }
191
192 let max_file_size = self.max_output_file_size.map(|v| v as usize);
193 Some(PickerOutput {
194 outputs,
195 expired_ssts,
196 time_window_size,
197 max_file_size,
198 })
199 }
200}
201
/// Files assigned to one time window while picking compaction candidates.
struct Window {
    /// Earliest start timestamp among the files in this window.
    start: Timestamp,
    /// Latest end timestamp among the files in this window.
    end: Timestamp,
    /// File groups keyed by file sequence; files that share a sequence share a group.
    files: HashMap<Option<NonZeroU64>, FileGroup>,
    /// Aligned window bound in seconds; also this window's key in the map returned by
    /// `assign_to_windows`. `new_with_file` initializes it to 0.
    time_window: i64,
    /// Whether this window's time range overlaps the range of another window.
    overlapping: bool,
}
211
212impl Window {
213 fn new_with_file(file: FileHandle) -> Self {
215 let (start, end) = file.time_range();
216 let files = HashMap::from([(file.meta_ref().sequence, FileGroup::new_with_file(file))]);
217 Self {
218 start,
219 end,
220 files,
221 time_window: 0,
222 overlapping: false,
223 }
224 }
225
226 fn range(&self) -> (Timestamp, Timestamp) {
228 (self.start, self.end)
229 }
230
231 fn add_file(&mut self, file: FileHandle) {
233 let (start, end) = file.time_range();
234 self.start = self.start.min(start);
235 self.end = self.end.max(end);
236
237 match self.files.entry(file.meta_ref().sequence) {
238 Entry::Occupied(mut o) => {
239 o.get_mut().add_file(file);
240 }
241 Entry::Vacant(v) => {
242 v.insert(FileGroup::new_with_file(file));
243 }
244 }
245 }
246
247 fn files(&self) -> impl Iterator<Item = &FileGroup> {
248 self.files.values()
249 }
250}
251
252fn assign_to_windows<'a>(
254 files: impl Iterator<Item = &'a FileHandle>,
255 time_window_size: i64,
256) -> BTreeMap<i64, Window> {
257 let mut windows: HashMap<i64, Window> = HashMap::new();
258 for f in files {
260 if f.compacting() {
261 continue;
262 }
263 let (_, end) = f.time_range();
264 let time_window = end
265 .convert_to(TimeUnit::Second)
266 .unwrap()
267 .value()
268 .align_to_ceil_by_bucket(time_window_size)
269 .unwrap_or(i64::MIN);
270
271 match windows.entry(time_window) {
272 Entry::Occupied(mut e) => {
273 e.get_mut().add_file(f.clone());
274 }
275 Entry::Vacant(e) => {
276 let mut window = Window::new_with_file(f.clone());
277 window.time_window = time_window;
278 e.insert(window);
279 }
280 }
281 }
282 if windows.is_empty() {
283 return BTreeMap::new();
284 }
285
286 let mut windows = windows.into_values().collect::<Vec<_>>();
287 windows.sort_unstable_by(|l, r| l.start.cmp(&r.start).then(l.end.cmp(&r.end).reverse()));
288
289 let mut current_range: (Timestamp, Timestamp) = windows[0].range(); for idx in 1..windows.len() {
292 let next_range = windows[idx].range();
293 if overlaps(¤t_range, &next_range) {
294 windows[idx - 1].overlapping = true;
295 windows[idx].overlapping = true;
296 }
297 current_range = (
298 current_range.0.min(next_range.0),
299 current_range.1.max(next_range.1),
300 );
301 }
302
303 windows.into_iter().map(|w| (w.time_window, w)).collect()
304}
305
306fn find_latest_window_in_seconds<'a>(
309 files: impl Iterator<Item = &'a FileHandle>,
310 time_window_size: i64,
311) -> Option<i64> {
312 let mut latest_timestamp = None;
313 for f in files {
314 let (_, end) = f.time_range();
315 if let Some(latest) = latest_timestamp {
316 if end > latest {
317 latest_timestamp = Some(end);
318 }
319 } else {
320 latest_timestamp = Some(end);
321 }
322 }
323 latest_timestamp
324 .and_then(|ts| ts.convert_to_ceil(TimeUnit::Second))
325 .and_then(|ts| ts.value().align_to_ceil_by_bucket(time_window_size))
326}
327
328#[cfg(test)]
329mod tests {
330 use std::collections::HashSet;
331
332 use super::*;
333 use crate::compaction::test_util::{new_file_handle, new_file_handle_with_sequence};
334 use crate::sst::file::{FileId, Level};
335
336 #[test]
337 fn test_get_latest_window_in_seconds() {
338 assert_eq!(
339 Some(1),
340 find_latest_window_in_seconds([new_file_handle(FileId::random(), 0, 999, 0)].iter(), 1)
341 );
342 assert_eq!(
343 Some(1),
344 find_latest_window_in_seconds(
345 [new_file_handle(FileId::random(), 0, 1000, 0)].iter(),
346 1
347 )
348 );
349
350 assert_eq!(
351 Some(-9223372036854000),
352 find_latest_window_in_seconds(
353 [new_file_handle(FileId::random(), i64::MIN, i64::MIN + 1, 0)].iter(),
354 3600,
355 )
356 );
357
358 assert_eq!(
359 (i64::MAX / 10000000 + 1) * 10000,
360 find_latest_window_in_seconds(
361 [new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0)].iter(),
362 10000,
363 )
364 .unwrap()
365 );
366
367 assert_eq!(
368 Some((i64::MAX / 3600000 + 1) * 3600),
369 find_latest_window_in_seconds(
370 [
371 new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0),
372 new_file_handle(FileId::random(), 0, 1000, 0)
373 ]
374 .iter(),
375 3600
376 )
377 );
378 }
379
380 #[test]
381 fn test_assign_to_windows() {
382 let windows = assign_to_windows(
383 [
384 new_file_handle(FileId::random(), 0, 999, 0),
385 new_file_handle(FileId::random(), 0, 999, 0),
386 new_file_handle(FileId::random(), 0, 999, 0),
387 new_file_handle(FileId::random(), 0, 999, 0),
388 new_file_handle(FileId::random(), 0, 999, 0),
389 ]
390 .iter(),
391 3,
392 );
393 let fgs = &windows.get(&0).unwrap().files;
394 assert_eq!(1, fgs.len());
395 assert_eq!(fgs.values().map(|f| f.files().len()).sum::<usize>(), 5);
396
397 let files = [FileId::random(); 3];
398 let windows = assign_to_windows(
399 [
400 new_file_handle(files[0], -2000, -3, 0),
401 new_file_handle(files[1], 0, 2999, 0),
402 new_file_handle(files[2], 50, 10001, 0),
403 ]
404 .iter(),
405 3,
406 );
407 assert_eq!(
408 files[0],
409 windows.get(&0).unwrap().files().next().unwrap().files()[0].file_id()
410 );
411 assert_eq!(
412 files[1],
413 windows.get(&3).unwrap().files().next().unwrap().files()[0].file_id()
414 );
415 assert_eq!(
416 files[2],
417 windows.get(&12).unwrap().files().next().unwrap().files()[0].file_id()
418 );
419 }
420
421 #[test]
422 fn test_assign_file_groups_to_windows() {
423 let files = [
424 FileId::random(),
425 FileId::random(),
426 FileId::random(),
427 FileId::random(),
428 ];
429 let windows = assign_to_windows(
430 [
431 new_file_handle_with_sequence(files[0], 0, 999, 0, 1),
432 new_file_handle_with_sequence(files[1], 0, 999, 0, 1),
433 new_file_handle_with_sequence(files[2], 0, 999, 0, 2),
434 new_file_handle_with_sequence(files[3], 0, 999, 0, 2),
435 ]
436 .iter(),
437 3,
438 );
439 assert_eq!(windows.len(), 1);
440 let fgs = &windows.get(&0).unwrap().files;
441 assert_eq!(2, fgs.len());
442 assert_eq!(
443 fgs.get(&NonZeroU64::new(1))
444 .unwrap()
445 .files()
446 .iter()
447 .map(|f| f.file_id())
448 .collect::<HashSet<_>>(),
449 [files[0], files[1]].into_iter().collect()
450 );
451 assert_eq!(
452 fgs.get(&NonZeroU64::new(2))
453 .unwrap()
454 .files()
455 .iter()
456 .map(|f| f.file_id())
457 .collect::<HashSet<_>>(),
458 [files[2], files[3]].into_iter().collect()
459 );
460 }
461
462 #[test]
463 fn test_assign_compacting_to_windows() {
464 let files = [
465 new_file_handle(FileId::random(), 0, 999, 0),
466 new_file_handle(FileId::random(), 0, 999, 0),
467 new_file_handle(FileId::random(), 0, 999, 0),
468 new_file_handle(FileId::random(), 0, 999, 0),
469 new_file_handle(FileId::random(), 0, 999, 0),
470 ];
471 files[0].set_compacting(true);
472 files[2].set_compacting(true);
473 let mut windows = assign_to_windows(files.iter(), 3);
474 let window0 = windows.remove(&0).unwrap();
475 assert_eq!(1, window0.files.len());
476 let candidates = window0
477 .files
478 .into_values()
479 .flat_map(|fg| fg.into_files())
480 .map(|f| f.file_id())
481 .collect::<HashSet<_>>();
482 assert_eq!(candidates.len(), 3);
483 assert_eq!(
484 candidates,
485 [files[1].file_id(), files[3].file_id(), files[4].file_id()]
486 .into_iter()
487 .collect::<HashSet<_>>()
488 );
489 }
490
491 type ExpectedWindowSpec = (i64, bool, Vec<(i64, i64)>);
493
494 fn check_assign_to_windows_with_overlapping(
495 file_time_ranges: &[(i64, i64)],
496 time_window: i64,
497 expected_files: &[ExpectedWindowSpec],
498 ) {
499 let files: Vec<_> = (0..file_time_ranges.len())
500 .map(|_| FileId::random())
501 .collect();
502
503 let file_handles = files
504 .iter()
505 .zip(file_time_ranges.iter())
506 .map(|(file_id, range)| new_file_handle(*file_id, range.0, range.1, 0))
507 .collect::<Vec<_>>();
508
509 let windows = assign_to_windows(file_handles.iter(), time_window);
510
511 for (expected_window, overlapping, window_files) in expected_files {
512 let actual_window = windows.get(expected_window).unwrap();
513 assert_eq!(*overlapping, actual_window.overlapping);
514 let mut file_ranges = actual_window
515 .files
516 .iter()
517 .flat_map(|(_, f)| {
518 f.files().iter().map(|f| {
519 let (s, e) = f.time_range();
520 (s.value(), e.value())
521 })
522 })
523 .collect::<Vec<_>>();
524 file_ranges.sort_unstable_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1)));
525 assert_eq!(window_files, &file_ranges);
526 }
527 }
528
529 #[test]
530 fn test_assign_to_windows_with_overlapping() {
531 check_assign_to_windows_with_overlapping(
532 &[(0, 999), (1000, 1999), (2000, 2999)],
533 2,
534 &[
535 (0, false, vec![(0, 999)]),
536 (2, false, vec![(1000, 1999), (2000, 2999)]),
537 ],
538 );
539
540 check_assign_to_windows_with_overlapping(
541 &[(0, 1), (0, 999), (100, 2999)],
542 2,
543 &[
544 (0, true, vec![(0, 1), (0, 999)]),
545 (2, true, vec![(100, 2999)]),
546 ],
547 );
548
549 check_assign_to_windows_with_overlapping(
550 &[(0, 999), (1000, 1999), (2000, 2999), (3000, 3999)],
551 2,
552 &[
553 (0, false, vec![(0, 999)]),
554 (2, false, vec![(1000, 1999), (2000, 2999)]),
555 (4, false, vec![(3000, 3999)]),
556 ],
557 );
558
559 check_assign_to_windows_with_overlapping(
560 &[
561 (0, 999),
562 (1000, 1999),
563 (2000, 2999),
564 (3000, 3999),
565 (0, 3999),
566 ],
567 2,
568 &[
569 (0, true, vec![(0, 999)]),
570 (2, true, vec![(1000, 1999), (2000, 2999)]),
571 (4, true, vec![(0, 3999), (3000, 3999)]),
572 ],
573 );
574
575 check_assign_to_windows_with_overlapping(
576 &[
577 (0, 999),
578 (1000, 1999),
579 (2000, 2999),
580 (3000, 3999),
581 (1999, 3999),
582 ],
583 2,
584 &[
585 (0, false, vec![(0, 999)]),
586 (2, true, vec![(1000, 1999), (2000, 2999)]),
587 (4, true, vec![(1999, 3999), (3000, 3999)]),
588 ],
589 );
590
591 check_assign_to_windows_with_overlapping(
592 &[
593 (0, 999), (1000, 1999), (2000, 2999), (3000, 3999), (2999, 3999), ],
599 2,
600 &[
601 (0, false, vec![(0, 999)]),
603 (2, true, vec![(1000, 1999), (2000, 2999)]),
604 (4, true, vec![(2999, 3999), (3000, 3999)]),
605 ],
606 );
607
608 check_assign_to_windows_with_overlapping(
609 &[
610 (0, 999), (1000, 1999), (2000, 2999), (3000, 3999), (0, 1000), ],
616 2,
617 &[
618 (0, true, vec![(0, 999)]),
620 (2, true, vec![(0, 1000), (1000, 1999), (2000, 2999)]),
621 (4, false, vec![(3000, 3999)]),
622 ],
623 );
624 }
625
626 struct CompactionPickerTestCase {
627 window_size: i64,
628 input_files: Vec<FileHandle>,
629 expected_outputs: Vec<ExpectedOutput>,
630 }
631
632 impl CompactionPickerTestCase {
633 fn check(&self) {
634 let file_id_to_idx = self
635 .input_files
636 .iter()
637 .enumerate()
638 .map(|(idx, file)| (file.file_id(), idx))
639 .collect::<HashMap<_, _>>();
640 let mut windows = assign_to_windows(self.input_files.iter(), self.window_size);
641 let active_window =
642 find_latest_window_in_seconds(self.input_files.iter(), self.window_size);
643 let output = TwcsPicker {
644 trigger_file_num: 4,
645 time_window_seconds: None,
646 max_output_file_size: None,
647 append_mode: false,
648 }
649 .build_output(RegionId::from_u64(0), &mut windows, active_window);
650
651 let output = output
652 .iter()
653 .map(|o| {
654 let input_file_ids = o
655 .inputs
656 .iter()
657 .map(|f| file_id_to_idx.get(&f.file_id()).copied().unwrap())
658 .collect::<HashSet<_>>();
659 (input_file_ids, o.output_level)
660 })
661 .collect::<Vec<_>>();
662
663 let expected = self
664 .expected_outputs
665 .iter()
666 .map(|o| {
667 let input_file_ids = o.input_files.iter().copied().collect::<HashSet<_>>();
668 (input_file_ids, o.output_level)
669 })
670 .collect::<Vec<_>>();
671 assert_eq!(expected, output);
672 }
673 }
674
675 struct ExpectedOutput {
676 input_files: Vec<usize>,
677 output_level: Level,
678 }
679
680 #[test]
681 fn test_build_twcs_output() {
682 let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
683
684 CompactionPickerTestCase {
686 window_size: 3,
687 input_files: [
688 new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
689 new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
690 new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3), new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4), ]
693 .to_vec(),
694 expected_outputs: vec![
695 ExpectedOutput {
696 input_files: vec![0, 1],
697 output_level: 1,
698 },
699 ExpectedOutput {
700 input_files: vec![2, 3],
701 output_level: 1,
702 },
703 ],
704 }
705 .check();
706
707 let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
714 CompactionPickerTestCase {
715 window_size: 3,
716 input_files: [
717 new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
718 new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
719 new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3),
720 new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4),
721 new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 5),
722 ]
723 .to_vec(),
724 expected_outputs: vec![
725 ExpectedOutput {
726 input_files: vec![0, 1],
727 output_level: 1,
728 },
729 ExpectedOutput {
730 input_files: vec![2, 4],
731 output_level: 1,
732 },
733 ],
734 }
735 .check();
736
737 let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
741 CompactionPickerTestCase {
742 window_size: 3,
743 input_files: [
744 new_file_handle_with_sequence(file_ids[0], 0, 2999, 1, 1),
745 new_file_handle_with_sequence(file_ids[1], 0, 2998, 1, 1),
746 new_file_handle_with_sequence(file_ids[2], 3000, 5999, 1, 2),
747 new_file_handle_with_sequence(file_ids[3], 3000, 5000, 1, 2),
748 new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 3),
749 ]
750 .to_vec(),
751 expected_outputs: vec![ExpectedOutput {
752 input_files: vec![0, 1, 4],
753 output_level: 1,
754 }],
755 }
756 .check();
757 }
758
759 }