1use common_base::readable_size::ReadableSize;
19use common_base::BitVec;
20use common_time::Timestamp;
21use smallvec::{smallvec, SmallVec};
22
23use crate::sst::file::{FileHandle, FileId};
24
/// Upper bound, in bytes, for the target output size that `merge_seq_files` derives from
/// the average input size when the caller supplies no explicit maximum
/// (`ReadableSize::gb(2)`).
const DEFAULT_MAX_OUTPUT_SIZE: u64 = ReadableSize::gb(2).as_bytes();
27
/// An item that spans a `(start, end)` range over an ordered, copyable bound type.
pub trait Ranged {
    /// Type of both range boundaries.
    type BoundType: Ord + Copy;

    /// Returns the `(start, end)` boundaries of this item.
    fn range(&self) -> (Self::BoundType, Self::BoundType);

    /// Returns `true` when this item and `other` share more than a single boundary point.
    ///
    /// The intersection of the two ranges runs from the later start to the earlier end.
    /// The items overlap only when that intersection's start is strictly less than its
    /// end, so ranges that merely touch (`(1, 2)` and `(2, 3)`) and zero-length ranges
    /// never count as overlapping.
    fn overlap<T>(&self, other: &T) -> bool
    where
        T: Ranged<BoundType = Self::BoundType>,
    {
        let (own_start, own_end) = self.range();
        let (other_start, other_end) = other.range();

        let intersection_start = std::cmp::max(own_start, other_start);
        let intersection_end = std::cmp::min(own_end, other_end);
        intersection_start < intersection_end
    }
}
45
46pub fn find_overlapping_items<T: Item + Clone>(
47 l: &mut SortedRun<T>,
48 r: &mut SortedRun<T>,
49 result: &mut Vec<T>,
50) {
51 if l.items.is_empty() || r.items.is_empty() {
52 return;
53 }
54
55 result.clear();
56 result.reserve(l.items.len() + r.items.len());
57
58 if !l.sorted {
60 sort_ranged_items(&mut l.items);
61 l.sorted = true;
62 }
63 if !r.sorted {
64 sort_ranged_items(&mut r.items);
65 r.sorted = true;
66 }
67
68 let mut r_idx = 0;
69
70 let mut selected = BitVec::repeat(false, r.items().len() + l.items.len());
71
72 for (lhs_idx, lhs) in l.items.iter().enumerate() {
73 let (lhs_start, lhs_end) = lhs.range();
74
75 while r_idx < r.items.len() {
77 let (_, rhs_end) = r.items[r_idx].range();
78 if rhs_end < lhs_start {
79 r_idx += 1;
80 } else {
81 break;
82 }
83 }
84
85 let mut j = r_idx;
87 while j < r.items.len() {
88 let (rhs_start, rhs_end) = r.items[j].range();
89
90 if rhs_start > lhs_end {
92 break;
93 }
94
95 if lhs_start.max(rhs_start) <= lhs_end.min(rhs_end) {
97 if !selected[lhs_idx] {
98 result.push(lhs.clone());
99 selected.set(lhs_idx, true);
100 }
101
102 let rhs_selected_idx = l.items.len() + j;
103 if !selected[rhs_selected_idx] {
104 result.push(r.items[j].clone());
105 selected.set(rhs_selected_idx, true);
106 }
107 }
108
109 j += 1;
110 }
111 }
112}
113
114fn sort_ranged_items<T: Ranged>(values: &mut [T]) {
116 values.sort_unstable_by(|l, r| {
117 let (l_start, l_end) = l.range();
118 let (r_start, r_end) = r.range();
119 l_start.cmp(&r_start).then(r_end.cmp(&l_end))
120 });
121}
122
/// A [`Ranged`] item that can be cloned and reports a size.
pub trait Item: Ranged + Clone {
    /// Returns the size of this item. The unit is chosen by the implementor; the run
    /// helpers in this file only sum and compare these values.
    fn size(&self) -> usize;
}
128
129#[derive(Debug, Clone)]
131pub struct FileGroup {
132 files: SmallVec<[FileHandle; 2]>,
133 size: usize,
134 num_rows: usize,
135 min_timestamp: Timestamp,
136 max_timestamp: Timestamp,
137}
138
139impl FileGroup {
140 pub(crate) fn new_with_file(file: FileHandle) -> Self {
141 let size = file.size() as usize;
142 let (min_timestamp, max_timestamp) = file.time_range();
143 let num_rows = file.num_rows();
144 Self {
145 files: smallvec![file],
146 size,
147 num_rows,
148 min_timestamp,
149 max_timestamp,
150 }
151 }
152
153 pub(crate) fn num_rows(&self) -> usize {
154 self.num_rows
155 }
156
157 pub(crate) fn add_file(&mut self, file: FileHandle) {
158 self.size += file.size() as usize;
159 self.num_rows += file.num_rows();
160 let (min_timestamp, max_timestamp) = file.time_range();
161 self.min_timestamp = self.min_timestamp.min(min_timestamp);
162 self.max_timestamp = self.max_timestamp.max(max_timestamp);
163 self.files.push(file);
164 }
165
166 #[cfg(test)]
167 pub(crate) fn files(&self) -> &[FileHandle] {
168 &self.files[..]
169 }
170
171 pub(crate) fn file_ids(&self) -> SmallVec<[FileId; 2]> {
172 SmallVec::from_iter(self.files.iter().map(|f| f.file_id()))
173 }
174
175 pub(crate) fn into_files(self) -> impl Iterator<Item = FileHandle> {
176 self.files.into_iter()
177 }
178}
179
180impl Ranged for FileGroup {
181 type BoundType = Timestamp;
182
183 fn range(&self) -> (Self::BoundType, Self::BoundType) {
184 (self.min_timestamp, self.max_timestamp)
185 }
186}
187
188impl Item for FileGroup {
189 fn size(&self) -> usize {
190 self.size
191 }
192}
193
194#[derive(Debug, Clone)]
196pub struct SortedRun<T: Item> {
197 items: Vec<T>,
199 size: usize,
201 start: Option<T::BoundType>,
203 end: Option<T::BoundType>,
205 sorted: bool,
207}
208
209impl<T: Item> From<Vec<T>> for SortedRun<T> {
210 fn from(items: Vec<T>) -> Self {
211 let mut r = Self {
212 items: Vec::with_capacity(items.len()),
213 size: 0,
214 start: None,
215 end: None,
216 sorted: false,
217 };
218 for item in items {
219 r.push_item(item);
220 }
221
222 r
223 }
224}
225
226impl<T> Default for SortedRun<T>
227where
228 T: Item,
229{
230 fn default() -> Self {
231 Self {
232 items: vec![],
233 size: 0,
234 start: None,
235 end: None,
236 sorted: false,
237 }
238 }
239}
240
241impl<T> SortedRun<T>
242where
243 T: Item,
244{
245 pub fn items(&self) -> &[T] {
246 &self.items
247 }
248
249 fn push_item(&mut self, t: T) {
250 let (file_start, file_end) = t.range();
251 self.size += t.size();
252 self.items.push(t);
253 self.start = Some(self.start.map_or(file_start, |v| v.min(file_start)));
254 self.end = Some(self.end.map_or(file_end, |v| v.max(file_end)));
255 }
256}
257
258pub fn find_sorted_runs<T>(items: &mut [T]) -> Vec<SortedRun<T>>
260where
261 T: Item,
262{
263 if items.is_empty() {
264 return vec![];
265 }
266 sort_ranged_items(items);
268
269 let mut current_run = SortedRun::default();
270 let mut runs = vec![];
271
272 let mut selection = BitVec::repeat(false, items.len());
273 while !selection.all() {
274 for (item, mut selected) in items.iter().zip(selection.iter_mut()) {
276 if *selected {
277 continue;
279 }
280 match current_run.items.last() {
281 None => {
282 selected.set(true);
284 current_run.push_item(item.clone());
285 }
286 Some(last) => {
287 if !last.overlap(item) {
290 selected.set(true);
292 current_run.push_item(item.clone());
293 }
294 }
295 }
296 }
297 runs.push(std::mem::take(&mut current_run));
299 }
300 runs
301}
302
303pub fn reduce_runs<T: Item>(mut runs: Vec<SortedRun<T>>) -> Vec<T> {
306 assert!(runs.len() > 1);
307 runs.sort_unstable_by(|a, b| a.size.cmp(&b.size));
309 let probe_end = runs.len().min(100);
311 let mut min_penalty = usize::MAX;
312 let mut files = vec![];
313 let mut temp_files = vec![];
314 for i in 0..probe_end {
315 for j in i + 1..probe_end {
316 let (a, b) = runs.split_at_mut(j);
317 find_overlapping_items(&mut a[i], &mut b[0], &mut temp_files);
318 let penalty = temp_files.iter().map(|e| e.size()).sum();
319 if penalty < min_penalty {
320 min_penalty = penalty;
321 files.clear();
322 files.extend_from_slice(&temp_files);
323 }
324 }
325 }
326 files
327}
328
329pub fn merge_seq_files<T: Item>(input_files: &[T], max_file_size: Option<u64>) -> Vec<T> {
350 if input_files.is_empty() || input_files.len() == 1 {
351 return vec![];
352 }
353
354 let files_to_process = if input_files.len() > 100 {
356 &input_files[0..100]
357 } else {
358 input_files
359 };
360
361 let target_size = match max_file_size {
363 Some(size) => size as usize,
364 None => {
365 let total_size: usize = files_to_process.iter().map(|f| f.size()).sum();
367 ((((total_size as f64) / (files_to_process.len() as f64)) * 1.5) as usize)
368 .min(DEFAULT_MAX_OUTPUT_SIZE as usize)
369 }
370 };
371
372 let mut best_group = Vec::new();
374 let mut best_score = f64::NEG_INFINITY;
375
376 for start_idx in (0..files_to_process.len()).rev() {
378 for end_idx in (start_idx + 1..files_to_process.len()).rev() {
380 let group = &files_to_process[start_idx..=end_idx];
381 let total_size: usize = group.iter().map(|f| f.size()).sum();
382
383 if total_size > target_size {
385 continue; }
387
388 let largest_file_size = group.iter().map(|f| f.size()).max().unwrap_or(0);
390 let amplification_factor = largest_file_size as f64 / total_size as f64;
391
392 let file_reduction = group.len() - 1;
394
395 let file_reduction_score = file_reduction as f64 / files_to_process.len() as f64;
400 let amp_factor_score = (1.0 - amplification_factor) * 1.5; let size_efficiency = (total_size as f64 / target_size as f64).min(1.0); let score = file_reduction_score + amp_factor_score + size_efficiency;
404
405 if score >= best_score {
408 best_score = score;
409 best_group = group.to_vec();
410 }
411 }
412 }
413
414 best_group
415}
416
417#[cfg(test)]
418mod tests {
419 use std::collections::HashSet;
420
421 use super::*;
422
423 #[derive(Clone, Debug, PartialEq)]
424 struct MockFile {
425 start: i64,
426 end: i64,
427 size: usize,
428 }
429
430 impl Ranged for MockFile {
431 type BoundType = i64;
432
433 fn range(&self) -> (Self::BoundType, Self::BoundType) {
434 (self.start, self.end)
435 }
436 }
437
438 impl Item for MockFile {
439 fn size(&self) -> usize {
440 self.size
441 }
442 }
443
444 fn build_items(ranges: &[(i64, i64)]) -> Vec<MockFile> {
445 ranges
446 .iter()
447 .map(|(start, end)| MockFile {
448 start: *start,
449 end: *end,
450 size: (*end - *start) as usize,
451 })
452 .collect()
453 }
454
455 fn build_items_with_size(items: &[(i64, i64, usize)]) -> Vec<MockFile> {
456 items
457 .iter()
458 .map(|(start, end, size)| MockFile {
459 start: *start,
460 end: *end,
461 size: *size,
462 })
463 .collect()
464 }
465
466 fn check_sorted_runs(
467 ranges: &[(i64, i64)],
468 expected_runs: &[Vec<(i64, i64)>],
469 ) -> Vec<SortedRun<MockFile>> {
470 let mut files = build_items(ranges);
471 let runs = find_sorted_runs(&mut files);
472
473 let result_file_ranges: Vec<Vec<_>> = runs
474 .iter()
475 .map(|r| r.items.iter().map(|f| f.range()).collect())
476 .collect();
477 assert_eq!(&expected_runs, &result_file_ranges);
478 runs
479 }
480
481 #[test]
482 fn test_find_sorted_runs() {
483 check_sorted_runs(&[], &[]);
484 check_sorted_runs(&[(1, 1), (2, 2)], &[vec![(1, 1), (2, 2)]]);
485 check_sorted_runs(&[(1, 2)], &[vec![(1, 2)]]);
486 check_sorted_runs(&[(1, 2), (2, 3)], &[vec![(1, 2), (2, 3)]]);
487 check_sorted_runs(&[(1, 2), (3, 4)], &[vec![(1, 2), (3, 4)]]);
488 check_sorted_runs(&[(2, 4), (1, 3)], &[vec![(1, 3)], vec![(2, 4)]]);
489 check_sorted_runs(
490 &[(1, 3), (2, 4), (4, 5)],
491 &[vec![(1, 3), (4, 5)], vec![(2, 4)]],
492 );
493
494 check_sorted_runs(
495 &[(1, 2), (3, 4), (3, 5)],
496 &[vec![(1, 2), (3, 5)], vec![(3, 4)]],
497 );
498
499 check_sorted_runs(
500 &[(1, 3), (2, 4), (5, 6)],
501 &[vec![(1, 3), (5, 6)], vec![(2, 4)]],
502 );
503
504 check_sorted_runs(
505 &[(1, 2), (3, 5), (4, 6)],
506 &[vec![(1, 2), (3, 5)], vec![(4, 6)]],
507 );
508
509 check_sorted_runs(
510 &[(1, 2), (3, 4), (4, 6), (7, 8)],
511 &[vec![(1, 2), (3, 4), (4, 6), (7, 8)]],
512 );
513 check_sorted_runs(
514 &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
515 &[vec![(1, 2), (3, 6), (7, 8), (8, 9)], vec![(3, 4), (5, 6)]],
516 );
517
518 check_sorted_runs(
519 &[(10, 19), (20, 21), (20, 29), (30, 39)],
520 &[vec![(10, 19), (20, 29), (30, 39)], vec![(20, 21)]],
521 );
522
523 check_sorted_runs(
524 &[(10, 19), (20, 29), (21, 22), (30, 39), (31, 32), (32, 42)],
525 &[
526 vec![(10, 19), (20, 29), (30, 39)],
527 vec![(21, 22), (31, 32), (32, 42)],
528 ],
529 );
530 }
531
532 fn check_reduce_runs(
533 files: &[(i64, i64)],
534 expected_runs: &[Vec<(i64, i64)>],
535 expected: &[(i64, i64)],
536 ) {
537 let runs = check_sorted_runs(files, expected_runs);
538 if runs.len() <= 1 {
539 assert!(expected.is_empty());
540 return;
541 }
542 let files_to_merge = reduce_runs(runs);
543 let file_to_merge_timestamps = files_to_merge
544 .into_iter()
545 .map(|f| (f.start, f.end))
546 .collect::<HashSet<_>>();
547
548 let expected = expected.iter().cloned().collect::<HashSet<_>>();
549 assert_eq!(&expected, &file_to_merge_timestamps);
550 }
551
552 #[test]
553 fn test_reduce_runs() {
554 check_reduce_runs(
557 &[(1, 3), (2, 4), (5, 6)],
558 &[vec![(1, 3), (5, 6)], vec![(2, 4)]],
559 &[(1, 3), (2, 4)],
560 );
561
562 check_reduce_runs(
565 &[(1, 2), (3, 5), (4, 6)],
566 &[vec![(1, 2), (3, 5)], vec![(4, 6)]],
567 &[(3, 5), (4, 6)],
568 );
569
570 check_reduce_runs(
573 &[(1, 2), (3, 4), (4, 6), (7, 8)],
574 &[vec![(1, 2), (3, 4), (4, 6), (7, 8)]],
575 &[],
576 );
577
578 check_reduce_runs(
581 &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
582 &[vec![(1, 2), (3, 6), (7, 8), (8, 9)], vec![(3, 4), (5, 6)]],
583 &[(5, 6), (3, 4), (3, 6)], );
585
586 check_reduce_runs(
589 &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
590 &[vec![(1, 2), (3, 6), (7, 8), (8, 9)], vec![(3, 4), (5, 6)]],
591 &[(3, 4), (3, 6), (5, 6)],
592 );
593
594 check_reduce_runs(
598 &[
599 (10, 20),
600 (30, 40),
601 (50, 60),
602 (50, 80),
603 (80, 90),
604 (80, 100),
605 (100, 110),
606 ],
607 &[
608 vec![(10, 20), (30, 40), (50, 80), (80, 100), (100, 110)],
609 vec![(50, 60), (80, 90)],
610 ],
611 &[(50, 80), (80, 100), (50, 60), (80, 90)],
612 );
613
614 check_reduce_runs(
619 &[(0, 10), (0, 11), (0, 12), (0, 13)],
620 &[vec![(0, 13)], vec![(0, 12)], vec![(0, 11)], vec![(0, 10)]],
621 &[(0, 10), (0, 11)],
622 );
623 }
624
625 #[test]
626 fn test_find_overlapping_items() {
627 let mut result = Vec::new();
628
629 find_overlapping_items(
631 &mut SortedRun::from(Vec::<MockFile>::new()),
632 &mut SortedRun::from(Vec::<MockFile>::new()),
633 &mut result,
634 );
635 assert_eq!(result, Vec::<MockFile>::new());
636
637 let files1 = build_items(&[(1, 3)]);
638 find_overlapping_items(
639 &mut SortedRun::from(files1.clone()),
640 &mut SortedRun::from(Vec::<MockFile>::new()),
641 &mut result,
642 );
643 assert_eq!(result, Vec::<MockFile>::new());
644
645 find_overlapping_items(
646 &mut SortedRun::from(Vec::<MockFile>::new()),
647 &mut SortedRun::from(files1.clone()),
648 &mut result,
649 );
650 assert_eq!(result, Vec::<MockFile>::new());
651
652 let files1 = build_items(&[(1, 3), (5, 7)]);
654 let files2 = build_items(&[(10, 12), (15, 20)]);
655 find_overlapping_items(
656 &mut SortedRun::from(files1),
657 &mut SortedRun::from(files2),
658 &mut result,
659 );
660 assert_eq!(result, Vec::<MockFile>::new());
661
662 let files1 = build_items(&[(1, 5)]);
664 let files2 = build_items(&[(3, 7)]);
665 find_overlapping_items(
666 &mut SortedRun::from(files1),
667 &mut SortedRun::from(files2),
668 &mut result,
669 );
670 assert_eq!(result.len(), 2);
671 assert_eq!(result[0].range(), (1, 5));
672 assert_eq!(result[1].range(), (3, 7));
673
674 let files1 = build_items(&[(1, 5), (8, 12), (15, 20)]);
676 let files2 = build_items(&[(3, 6), (7, 10), (18, 25)]);
677 find_overlapping_items(
678 &mut SortedRun::from(files1),
679 &mut SortedRun::from(files2),
680 &mut result,
681 );
682 assert_eq!(result.len(), 6);
683
684 let files1 = build_items(&[(1, 5)]);
686 let files2 = build_items(&[(5, 10)]); find_overlapping_items(
688 &mut SortedRun::from(files1),
689 &mut SortedRun::from(files2),
690 &mut result,
691 );
692 assert_eq!(result.len(), 2); let files1 = build_items(&[(1, 10)]);
696 let files2 = build_items(&[(3, 7)]);
697 find_overlapping_items(
698 &mut SortedRun::from(files1),
699 &mut SortedRun::from(files2),
700 &mut result,
701 );
702 assert_eq!(result.len(), 2);
703
704 let files1 = build_items(&[(1, 5)]);
706 let files2 = build_items(&[(1, 5)]);
707 find_overlapping_items(
708 &mut SortedRun::from(files1),
709 &mut SortedRun::from(files2),
710 &mut result,
711 );
712 assert_eq!(result.len(), 2);
713
714 let files1 = build_items(&[(5, 10), (1, 3)]); let files2 = build_items(&[(2, 7), (8, 12)]); find_overlapping_items(
718 &mut SortedRun::from(files1),
719 &mut SortedRun::from(files2),
720 &mut result,
721 );
722 assert_eq!(result.len(), 4); }
724
725 #[test]
726 fn test_merge_seq_files() {
727 let files = Vec::<MockFile>::new();
729 assert_eq!(merge_seq_files(&files, None), Vec::<MockFile>::new());
730
731 let files = build_items(&[(1, 5)]);
733 assert_eq!(merge_seq_files(&files, None), Vec::<MockFile>::new());
734
735 let files = build_items_with_size(&[(1, 2, 10), (3, 4, 1), (5, 6, 1), (7, 8, 1)]);
737 let result = merge_seq_files(&files, None);
738 assert_eq!(result.len(), 3);
739 assert_eq!(result[0].size, 1);
740 assert_eq!(result[1].size, 1);
741 assert_eq!(result[2].size, 1);
742
743 let files = build_items_with_size(&[(1, 2, 5), (3, 4, 5), (5, 6, 5), (7, 8, 5)]);
745 let result = merge_seq_files(&files, Some(20));
746 assert_eq!(result.len(), 4); let files = build_items_with_size(&[(1, 2, 5), (3, 4, 5), (5, 6, 5), (7, 8, 5)]);
750 let result = merge_seq_files(&files, Some(10));
751 assert_eq!(result.len(), 2); let files = build_items_with_size(&[(1, 2, 2), (3, 4, 3), (5, 6, 4), (7, 8, 10)]);
755 let result = merge_seq_files(&files, Some(10));
756 assert_eq!(result.len(), 3); let files =
761 build_items_with_size(&[(1, 2, 5), (3, 4, 5), (5, 6, 10), (7, 8, 1), (9, 10, 1)]);
762 let result = merge_seq_files(&files, Some(12));
763 assert_eq!(result.len(), 2);
764 assert_eq!(result[0].size, 5);
765 assert_eq!(result[1].size, 5);
766
767 let files = build_items_with_size(&[(1, 2, 100), (3, 4, 1), (5, 6, 1), (7, 8, 1)]);
769 let result = merge_seq_files(&files, Some(10));
770 assert_eq!(result.len(), 3); assert_eq!(result[0].size, 1);
772 assert_eq!(result[1].size, 1);
773 assert_eq!(result[2].size, 1);
774
775 let files = build_items_with_size(&[(1, 2, 100), (3, 4, 20), (5, 6, 20), (7, 8, 20)]);
776 let result = merge_seq_files(&files, Some(200));
777 assert_eq!(result.len(), 4);
778
779 let files = build_items_with_size(&[(1, 2, 160), (3, 4, 20), (5, 6, 20), (7, 8, 20)]);
780 let result = merge_seq_files(&files, None);
781 assert_eq!(result.len(), 3);
782 assert_eq!(result[0].size, 20);
783 assert_eq!(result[1].size, 20);
784 assert_eq!(result[2].size, 20);
785
786 let files = build_items_with_size(&[(1, 2, 100), (3, 4, 1)]);
787 let result = merge_seq_files(&files, Some(200));
788 assert_eq!(result.len(), 2);
789 assert_eq!(result[0].size, 100);
790 assert_eq!(result[1].size, 1);
791
792 let files = build_items_with_size(&[(1, 2, 20), (3, 4, 20), (5, 6, 20), (7, 8, 20)]);
793 let result = merge_seq_files(&files, Some(40));
794 assert_eq!(result.len(), 2);
795 assert_eq!(result[0].start, 1);
796 assert_eq!(result[1].start, 3);
797 }
798}