1use common_base::readable_size::ReadableSize;
19use common_base::BitVec;
20use common_time::Timestamp;
21use smallvec::{smallvec, SmallVec};
22
23use crate::sst::file::{FileHandle, RegionFileId};
24
/// Cap applied to the merge target size that `merge_seq_files` derives on its own
/// when the caller does not pass an explicit maximum: two gigabytes as defined by
/// `ReadableSize::gb`, expressed in bytes.
const DEFAULT_MAX_OUTPUT_SIZE: u64 = ReadableSize::gb(2).as_bytes();
27
/// Something that occupies a range delimited by two ordered bounds.
pub trait Ranged {
    /// Type of both bounds of the range.
    type BoundType: Ord + Copy;

    /// Returns the bounds of this item as `(start, end)`.
    fn range(&self) -> (Self::BoundType, Self::BoundType);

    /// Reports whether this item's range intersects the range of `other`.
    ///
    /// The comparison is strict: the later of the two starts must be strictly
    /// smaller than the earlier of the two ends. Ranges that merely touch at a
    /// bound (such as `(1, 2)` and `(2, 3)`) are therefore not overlapping, and
    /// neither is a zero-width range with anything.
    fn overlap<T>(&self, other: &T) -> bool
    where
        T: Ranged<BoundType = Self::BoundType>,
    {
        let (own_start, own_end) = self.range();
        let (other_start, other_end) = other.range();

        let latest_start = std::cmp::max(own_start, other_start);
        let earliest_end = std::cmp::min(own_end, other_end);
        latest_start < earliest_end
    }
}
45
46pub fn find_overlapping_items<T: Item + Clone>(
47 l: &mut SortedRun<T>,
48 r: &mut SortedRun<T>,
49 result: &mut Vec<T>,
50) {
51 if l.items.is_empty() || r.items.is_empty() {
52 return;
53 }
54
55 result.clear();
56 result.reserve(l.items.len() + r.items.len());
57
58 if !l.sorted {
60 sort_ranged_items(&mut l.items);
61 l.sorted = true;
62 }
63 if !r.sorted {
64 sort_ranged_items(&mut r.items);
65 r.sorted = true;
66 }
67
68 let mut r_idx = 0;
69
70 let mut selected = BitVec::repeat(false, r.items().len() + l.items.len());
71
72 for (lhs_idx, lhs) in l.items.iter().enumerate() {
73 let (lhs_start, lhs_end) = lhs.range();
74
75 while r_idx < r.items.len() {
77 let (_, rhs_end) = r.items[r_idx].range();
78 if rhs_end < lhs_start {
79 r_idx += 1;
80 } else {
81 break;
82 }
83 }
84
85 let mut j = r_idx;
87 while j < r.items.len() {
88 let (rhs_start, rhs_end) = r.items[j].range();
89
90 if rhs_start > lhs_end {
92 break;
93 }
94
95 if lhs_start.max(rhs_start) <= lhs_end.min(rhs_end) {
97 if !selected[lhs_idx] {
98 result.push(lhs.clone());
99 selected.set(lhs_idx, true);
100 }
101
102 let rhs_selected_idx = l.items.len() + j;
103 if !selected[rhs_selected_idx] {
104 result.push(r.items[j].clone());
105 selected.set(rhs_selected_idx, true);
106 }
107 }
108
109 j += 1;
110 }
111 }
112}
113
114fn sort_ranged_items<T: Ranged>(values: &mut [T]) {
116 values.sort_unstable_by(|l, r| {
117 let (l_start, l_end) = l.range();
118 let (r_start, r_end) = r.range();
119 l_start.cmp(&r_start).then(r_end.cmp(&l_end))
120 });
121}
122
123pub trait Item: Ranged + Clone {
125 fn size(&self) -> usize;
127}
128
129#[derive(Debug, Clone)]
131pub struct FileGroup {
132 files: SmallVec<[FileHandle; 2]>,
133 size: usize,
134 num_rows: usize,
135 min_timestamp: Timestamp,
136 max_timestamp: Timestamp,
137}
138
139impl FileGroup {
140 pub(crate) fn new_with_file(file: FileHandle) -> Self {
141 let size = file.size() as usize;
142 let (min_timestamp, max_timestamp) = file.time_range();
143 let num_rows = file.num_rows();
144 Self {
145 files: smallvec![file],
146 size,
147 num_rows,
148 min_timestamp,
149 max_timestamp,
150 }
151 }
152
153 pub(crate) fn num_rows(&self) -> usize {
154 self.num_rows
155 }
156
157 pub(crate) fn add_file(&mut self, file: FileHandle) {
158 self.size += file.size() as usize;
159 self.num_rows += file.num_rows();
160 let (min_timestamp, max_timestamp) = file.time_range();
161 self.min_timestamp = self.min_timestamp.min(min_timestamp);
162 self.max_timestamp = self.max_timestamp.max(max_timestamp);
163 self.files.push(file);
164 }
165
166 #[cfg(test)]
167 pub(crate) fn files(&self) -> &[FileHandle] {
168 &self.files[..]
169 }
170
171 pub(crate) fn file_ids(&self) -> SmallVec<[RegionFileId; 2]> {
172 SmallVec::from_iter(self.files.iter().map(|f| f.file_id()))
173 }
174
175 pub(crate) fn into_files(self) -> impl Iterator<Item = FileHandle> {
176 self.files.into_iter()
177 }
178
179 pub(crate) fn is_all_level_0(&self) -> bool {
180 self.files.iter().all(|f| f.level() == 0)
181 }
182}
183
184impl Ranged for FileGroup {
185 type BoundType = Timestamp;
186
187 fn range(&self) -> (Self::BoundType, Self::BoundType) {
188 (self.min_timestamp, self.max_timestamp)
189 }
190}
191
192impl Item for FileGroup {
193 fn size(&self) -> usize {
194 self.size
195 }
196}
197
198#[derive(Debug, Clone)]
200pub struct SortedRun<T: Item> {
201 items: Vec<T>,
203 size: usize,
205 start: Option<T::BoundType>,
207 end: Option<T::BoundType>,
209 sorted: bool,
211}
212
213impl<T: Item> From<Vec<T>> for SortedRun<T> {
214 fn from(items: Vec<T>) -> Self {
215 let mut r = Self {
216 items: Vec::with_capacity(items.len()),
217 size: 0,
218 start: None,
219 end: None,
220 sorted: false,
221 };
222 for item in items {
223 r.push_item(item);
224 }
225
226 r
227 }
228}
229
230impl<T> Default for SortedRun<T>
231where
232 T: Item,
233{
234 fn default() -> Self {
235 Self {
236 items: vec![],
237 size: 0,
238 start: None,
239 end: None,
240 sorted: false,
241 }
242 }
243}
244
245impl<T> SortedRun<T>
246where
247 T: Item,
248{
249 pub fn items(&self) -> &[T] {
250 &self.items
251 }
252
253 fn push_item(&mut self, t: T) {
254 let (file_start, file_end) = t.range();
255 self.size += t.size();
256 self.items.push(t);
257 self.start = Some(self.start.map_or(file_start, |v| v.min(file_start)));
258 self.end = Some(self.end.map_or(file_end, |v| v.max(file_end)));
259 }
260}
261
262pub fn find_sorted_runs<T>(items: &mut [T]) -> Vec<SortedRun<T>>
264where
265 T: Item,
266{
267 if items.is_empty() {
268 return vec![];
269 }
270 sort_ranged_items(items);
272
273 let mut current_run = SortedRun::default();
274 let mut runs = vec![];
275
276 let mut selection = BitVec::repeat(false, items.len());
277 while !selection.all() {
278 for (item, mut selected) in items.iter().zip(selection.iter_mut()) {
280 if *selected {
281 continue;
283 }
284 match current_run.items.last() {
285 None => {
286 selected.set(true);
288 current_run.push_item(item.clone());
289 }
290 Some(last) => {
291 if !last.overlap(item) {
294 selected.set(true);
296 current_run.push_item(item.clone());
297 }
298 }
299 }
300 }
301 runs.push(std::mem::take(&mut current_run));
303 }
304 runs
305}
306
307pub fn reduce_runs<T: Item>(mut runs: Vec<SortedRun<T>>) -> Vec<T> {
310 assert!(runs.len() > 1);
311 runs.sort_unstable_by(|a, b| a.size.cmp(&b.size));
313 let probe_end = runs.len().min(100);
315 let mut min_penalty = usize::MAX;
316 let mut files = vec![];
317 let mut temp_files = vec![];
318 for i in 0..probe_end {
319 for j in i + 1..probe_end {
320 let (a, b) = runs.split_at_mut(j);
321 find_overlapping_items(&mut a[i], &mut b[0], &mut temp_files);
322 let penalty = temp_files.iter().map(|e| e.size()).sum();
323 if penalty < min_penalty {
324 min_penalty = penalty;
325 files.clear();
326 files.extend_from_slice(&temp_files);
327 }
328 }
329 }
330 files
331}
332
333pub fn merge_seq_files<T: Item>(input_files: &[T], max_file_size: Option<u64>) -> Vec<T> {
354 if input_files.is_empty() || input_files.len() == 1 {
355 return vec![];
356 }
357
358 let files_to_process = if input_files.len() > 100 {
360 &input_files[0..100]
361 } else {
362 input_files
363 };
364
365 let target_size = match max_file_size {
367 Some(size) => size as usize,
368 None => {
369 let total_size: usize = files_to_process.iter().map(|f| f.size()).sum();
371 ((((total_size as f64) / (files_to_process.len() as f64)) * 1.5) as usize)
372 .min(DEFAULT_MAX_OUTPUT_SIZE as usize)
373 }
374 };
375
376 let mut best_group = Vec::new();
378 let mut best_score = f64::NEG_INFINITY;
379
380 for start_idx in (0..files_to_process.len()).rev() {
382 for end_idx in (start_idx + 1..files_to_process.len()).rev() {
384 let group = &files_to_process[start_idx..=end_idx];
385 let total_size: usize = group.iter().map(|f| f.size()).sum();
386
387 if total_size > target_size {
389 continue; }
391
392 let largest_file_size = group.iter().map(|f| f.size()).max().unwrap_or(0);
394 let amplification_factor = largest_file_size as f64 / total_size as f64;
395
396 let file_reduction = group.len() - 1;
398
399 let file_reduction_score = file_reduction as f64 / files_to_process.len() as f64;
404 let amp_factor_score = (1.0 - amplification_factor) * 1.5; let size_efficiency = (total_size as f64 / target_size as f64).min(1.0); let score = file_reduction_score + amp_factor_score + size_efficiency;
408
409 if score >= best_score {
412 best_score = score;
413 best_group = group.to_vec();
414 }
415 }
416 }
417
418 best_group
419}
420
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::*;

    /// List of `(start, end)` ranges.
    type Ranges = Vec<(i64, i64)>;

    /// Minimal [`Item`]: an `i64` range with an explicit size.
    #[derive(Clone, Debug, PartialEq)]
    struct MockFile {
        start: i64,
        end: i64,
        size: usize,
    }

    impl Ranged for MockFile {
        type BoundType = i64;

        fn range(&self) -> (Self::BoundType, Self::BoundType) {
            (self.start, self.end)
        }
    }

    impl Item for MockFile {
        fn size(&self) -> usize {
            self.size
        }
    }

    /// Builds mock files whose size equals the width of their range.
    fn build_items(ranges: &[(i64, i64)]) -> Vec<MockFile> {
        ranges
            .iter()
            .map(|&(start, end)| MockFile {
                start,
                end,
                size: (end - start) as usize,
            })
            .collect()
    }

    /// Builds mock files from `(start, end, size)` triples.
    fn build_items_with_size(items: &[(i64, i64, usize)]) -> Vec<MockFile> {
        items
            .iter()
            .map(|&(start, end, size)| MockFile { start, end, size })
            .collect()
    }

    /// Runs `find_sorted_runs` on `ranges`, asserts the ranges of the produced
    /// runs and hands the runs back for further checks.
    fn check_sorted_runs(
        ranges: &[(i64, i64)],
        expected_runs: &[Vec<(i64, i64)>],
    ) -> Vec<SortedRun<MockFile>> {
        let mut files = build_items(ranges);
        let runs = find_sorted_runs(&mut files);

        let actual: Vec<Ranges> = runs
            .iter()
            .map(|run| run.items.iter().map(|file| file.range()).collect())
            .collect();
        assert_eq!(expected_runs, &actual[..]);
        runs
    }

    #[test]
    fn test_find_sorted_runs() {
        let cases: Vec<(Ranges, Vec<Ranges>)> = vec![
            (vec![], vec![]),
            (vec![(1, 1), (2, 2)], vec![vec![(1, 1), (2, 2)]]),
            (vec![(1, 2)], vec![vec![(1, 2)]]),
            (vec![(1, 2), (2, 3)], vec![vec![(1, 2), (2, 3)]]),
            (vec![(1, 2), (3, 4)], vec![vec![(1, 2), (3, 4)]]),
            (vec![(2, 4), (1, 3)], vec![vec![(1, 3)], vec![(2, 4)]]),
            (
                vec![(1, 3), (2, 4), (4, 5)],
                vec![vec![(1, 3), (4, 5)], vec![(2, 4)]],
            ),
            (
                vec![(1, 2), (3, 4), (3, 5)],
                vec![vec![(1, 2), (3, 5)], vec![(3, 4)]],
            ),
            (
                vec![(1, 3), (2, 4), (5, 6)],
                vec![vec![(1, 3), (5, 6)], vec![(2, 4)]],
            ),
            (
                vec![(1, 2), (3, 5), (4, 6)],
                vec![vec![(1, 2), (3, 5)], vec![(4, 6)]],
            ),
            (
                vec![(1, 2), (3, 4), (4, 6), (7, 8)],
                vec![vec![(1, 2), (3, 4), (4, 6), (7, 8)]],
            ),
            (
                vec![(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
                vec![vec![(1, 2), (3, 6), (7, 8), (8, 9)], vec![(3, 4), (5, 6)]],
            ),
            (
                vec![(10, 19), (20, 21), (20, 29), (30, 39)],
                vec![vec![(10, 19), (20, 29), (30, 39)], vec![(20, 21)]],
            ),
            (
                vec![(10, 19), (20, 29), (21, 22), (30, 39), (31, 32), (32, 42)],
                vec![
                    vec![(10, 19), (20, 29), (30, 39)],
                    vec![(21, 22), (31, 32), (32, 42)],
                ],
            ),
        ];

        for (ranges, expected_runs) in &cases {
            check_sorted_runs(ranges, expected_runs);
        }
    }

    /// Checks the runs found for `files` and, when there is more than one run,
    /// that `reduce_runs` selects exactly the `expected` ranges (as a set).
    fn check_reduce_runs(
        files: &[(i64, i64)],
        expected_runs: &[Vec<(i64, i64)>],
        expected: &[(i64, i64)],
    ) {
        let runs = check_sorted_runs(files, expected_runs);
        if runs.len() > 1 {
            let selected: HashSet<(i64, i64)> = reduce_runs(runs)
                .into_iter()
                .map(|file| (file.start, file.end))
                .collect();
            let wanted: HashSet<(i64, i64)> = expected.iter().cloned().collect();
            assert_eq!(&wanted, &selected);
        } else {
            // A single run cannot be reduced any further.
            assert!(expected.is_empty());
        }
    }

    #[test]
    fn test_reduce_runs() {
        let cases: Vec<(Ranges, Vec<Ranges>, Ranges)> = vec![
            (
                vec![(1, 3), (2, 4), (5, 6)],
                vec![vec![(1, 3), (5, 6)], vec![(2, 4)]],
                vec![(1, 3), (2, 4)],
            ),
            (
                vec![(1, 2), (3, 5), (4, 6)],
                vec![vec![(1, 2), (3, 5)], vec![(4, 6)]],
                vec![(3, 5), (4, 6)],
            ),
            (
                vec![(1, 2), (3, 4), (4, 6), (7, 8)],
                vec![vec![(1, 2), (3, 4), (4, 6), (7, 8)]],
                vec![],
            ),
            (
                vec![(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
                vec![vec![(1, 2), (3, 6), (7, 8), (8, 9)], vec![(3, 4), (5, 6)]],
                vec![(5, 6), (3, 4), (3, 6)],
            ),
            (
                vec![(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
                vec![vec![(1, 2), (3, 6), (7, 8), (8, 9)], vec![(3, 4), (5, 6)]],
                vec![(3, 4), (3, 6), (5, 6)],
            ),
            (
                vec![
                    (10, 20),
                    (30, 40),
                    (50, 60),
                    (50, 80),
                    (80, 90),
                    (80, 100),
                    (100, 110),
                ],
                vec![
                    vec![(10, 20), (30, 40), (50, 80), (80, 100), (100, 110)],
                    vec![(50, 60), (80, 90)],
                ],
                vec![(50, 80), (80, 100), (50, 60), (80, 90)],
            ),
            (
                vec![(0, 10), (0, 11), (0, 12), (0, 13)],
                vec![vec![(0, 13)], vec![(0, 12)], vec![(0, 11)], vec![(0, 10)]],
                vec![(0, 10), (0, 11)],
            ),
        ];

        for (files, expected_runs, expected) in &cases {
            check_reduce_runs(files, expected_runs, expected);
        }
    }

    #[test]
    fn test_find_overlapping_items() {
        /// Wraps both range lists into runs and collects their overlaps in `out`.
        fn overlaps_into(lhs: &[(i64, i64)], rhs: &[(i64, i64)], out: &mut Vec<MockFile>) {
            find_overlapping_items(
                &mut SortedRun::from(build_items(lhs)),
                &mut SortedRun::from(build_items(rhs)),
                out,
            );
        }

        // One buffer is shared by all calls, as a real caller would do.
        let mut result = Vec::new();

        // Both runs empty.
        overlaps_into(&[], &[], &mut result);
        assert!(result.is_empty());

        // Exactly one of the runs empty.
        overlaps_into(&[(1, 3)], &[], &mut result);
        assert!(result.is_empty());
        overlaps_into(&[], &[(1, 3)], &mut result);
        assert!(result.is_empty());

        // Disjoint runs.
        overlaps_into(&[(1, 3), (5, 7)], &[(10, 12), (15, 20)], &mut result);
        assert!(result.is_empty());

        // A single overlapping pair; the left item is reported first.
        overlaps_into(&[(1, 5)], &[(3, 7)], &mut result);
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].range(), (1, 5));
        assert_eq!(result[1].range(), (3, 7));

        // Several overlapping pairs.
        overlaps_into(
            &[(1, 5), (8, 12), (15, 20)],
            &[(3, 6), (7, 10), (18, 25)],
            &mut result,
        );
        assert_eq!(result.len(), 6);

        // Ranges that only touch at a bound count as overlapping here.
        overlaps_into(&[(1, 5)], &[(5, 10)], &mut result);
        assert_eq!(result.len(), 2);

        // One range fully contains the other.
        overlaps_into(&[(1, 10)], &[(3, 7)], &mut result);
        assert_eq!(result.len(), 2);

        // Identical ranges.
        overlaps_into(&[(1, 5)], &[(1, 5)], &mut result);
        assert_eq!(result.len(), 2);

        // Unsorted left input.
        overlaps_into(&[(5, 10), (1, 3)], &[(2, 7), (8, 12)], &mut result);
        assert_eq!(result.len(), 4);
    }

    #[test]
    fn test_merge_seq_files() {
        /// Sizes of the selected files, in order.
        fn sizes(files: &[MockFile]) -> Vec<usize> {
            files.iter().map(|file| file.size).collect()
        }

        // Nothing to merge with zero or one file.
        let no_files = Vec::<MockFile>::new();
        assert_eq!(merge_seq_files(&no_files, None), Vec::<MockFile>::new());
        let single = build_items(&[(1, 5)]);
        assert_eq!(merge_seq_files(&single, None), Vec::<MockFile>::new());

        // Derived target size: only the three small files are merged.
        let files = build_items_with_size(&[(1, 2, 10), (3, 4, 1), (5, 6, 1), (7, 8, 1)]);
        assert_eq!(sizes(&merge_seq_files(&files, None)), [1, 1, 1]);

        // Explicit limit large enough for all four files.
        let files = build_items_with_size(&[(1, 2, 5), (3, 4, 5), (5, 6, 5), (7, 8, 5)]);
        assert_eq!(merge_seq_files(&files, Some(20)).len(), 4);

        // Explicit limit that only fits two of them.
        let files = build_items_with_size(&[(1, 2, 5), (3, 4, 5), (5, 6, 5), (7, 8, 5)]);
        assert_eq!(merge_seq_files(&files, Some(10)).len(), 2);

        // Mixed sizes: the first three files fit the limit together.
        let files = build_items_with_size(&[(1, 2, 2), (3, 4, 3), (5, 6, 4), (7, 8, 10)]);
        assert_eq!(merge_seq_files(&files, Some(10)).len(), 3);

        // The two leading files of size 5 beat every other candidate group.
        let files =
            build_items_with_size(&[(1, 2, 5), (3, 4, 5), (5, 6, 10), (7, 8, 1), (9, 10, 1)]);
        assert_eq!(sizes(&merge_seq_files(&files, Some(12))), [5, 5]);

        // The oversized first file is left out.
        let files = build_items_with_size(&[(1, 2, 100), (3, 4, 1), (5, 6, 1), (7, 8, 1)]);
        assert_eq!(sizes(&merge_seq_files(&files, Some(10))), [1, 1, 1]);

        let files = build_items_with_size(&[(1, 2, 100), (3, 4, 20), (5, 6, 20), (7, 8, 20)]);
        assert_eq!(merge_seq_files(&files, Some(200)).len(), 4);

        let files = build_items_with_size(&[(1, 2, 160), (3, 4, 20), (5, 6, 20), (7, 8, 20)]);
        assert_eq!(sizes(&merge_seq_files(&files, None)), [20, 20, 20]);

        let files = build_items_with_size(&[(1, 2, 100), (3, 4, 1)]);
        assert_eq!(sizes(&merge_seq_files(&files, Some(200))), [100, 1]);

        // Equal scores: the group at the front of the list is chosen.
        let files = build_items_with_size(&[(1, 2, 20), (3, 4, 20), (5, 6, 20), (7, 8, 20)]);
        let result = merge_seq_files(&files, Some(40));
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].start, 1);
        assert_eq!(result[1].start, 3);
    }
}