1use std::cmp::Ordering;
19
20use common_base::BitVec;
21use common_time::Timestamp;
22use itertools::Itertools;
23use smallvec::{smallvec, SmallVec};
24
25use crate::sst::file::FileHandle;
26
/// A value that covers an interval `[start, end]` over an ordered, copyable bound type.
pub(crate) trait Ranged {
    /// Type of the interval bounds.
    type BoundType: Ord + Copy;

    /// Returns the `(start, end)` bounds of this value.
    fn range(&self) -> (Self::BoundType, Self::BoundType);

    /// Returns `true` when the range of `self` intersects the range of `other`.
    ///
    /// Both ends are treated as inclusive, so ranges that merely touch
    /// (for example `(1, 2)` and `(2, 3)`) count as overlapping.
    fn overlap<T>(&self, other: &T) -> bool
    where
        T: Ranged<BoundType = Self::BoundType>,
    {
        let (this_start, this_end) = self.range();
        let (that_start, that_end) = other.range();
        match this_start.cmp(&that_start) {
            // Identical starts are always reported as overlapping.
            Ordering::Equal => true,
            // `self` begins first, so it overlaps only if it reaches `other`'s start.
            Ordering::Less => this_end >= that_start,
            // `other` begins first, so it must reach `self`'s start.
            Ordering::Greater => this_start <= that_end,
        }
    }
}
47
48fn sort_ranged_items<T: Ranged>(values: &mut [T]) {
50 values.sort_unstable_by(|l, r| {
51 let (l_start, l_end) = l.range();
52 let (r_start, r_end) = r.range();
53 l_start.cmp(&r_start).then(r_end.cmp(&l_end))
54 });
55}
56
57pub(crate) trait Item: Ranged + Clone {
59 fn size(&self) -> usize;
61}
62
63impl Ranged for FileHandle {
64 type BoundType = Timestamp;
65
66 fn range(&self) -> (Self::BoundType, Self::BoundType) {
67 self.time_range()
68 }
69}
70
71impl Item for FileHandle {
72 fn size(&self) -> usize {
73 self.size() as usize
74 }
75}
76
77#[derive(Debug, Clone)]
78struct MergeItems<T: Item> {
79 items: SmallVec<[T; 4]>,
80 start: T::BoundType,
81 end: T::BoundType,
82 size: usize,
83}
84
85impl<T: Item> Ranged for MergeItems<T> {
86 type BoundType = T::BoundType;
87
88 fn range(&self) -> (Self::BoundType, Self::BoundType) {
89 (self.start, self.end)
90 }
91}
92
93impl<T: Item> MergeItems<T> {
94 pub fn new_unmerged(val: T) -> Self {
96 let (start, end) = val.range();
97 let size = val.size();
98 Self {
99 items: smallvec![val],
100 start,
101 end,
102 size,
103 }
104 }
105
106 pub(crate) fn range(&self) -> (T::BoundType, T::BoundType) {
108 (self.start, self.end)
109 }
110
111 pub(crate) fn merge(self, other: Self) -> Self {
113 let start = self.start.min(other.start);
114 let end = self.end.max(other.end);
115 let size = self.size + other.size;
116
117 let mut items = SmallVec::with_capacity(self.items.len() + other.items.len());
118 items.extend(self.items);
119 items.extend(other.items);
120 Self {
121 start,
122 end,
123 size,
124 items,
125 }
126 }
127
128 pub fn merged(&self) -> bool {
130 self.items.len() > 1
131 }
132}
133
134#[derive(Debug, Clone)]
136pub(crate) struct SortedRun<T: Item> {
137 items: Vec<MergeItems<T>>,
139 penalty: usize,
141 start: Option<T::BoundType>,
143 end: Option<T::BoundType>,
145}
146
147impl<T> Default for SortedRun<T>
148where
149 T: Item,
150{
151 fn default() -> Self {
152 Self {
153 items: vec![],
154 penalty: 0,
155 start: None,
156 end: None,
157 }
158 }
159}
160
161impl<T> SortedRun<T>
162where
163 T: Item,
164{
165 fn push_item(&mut self, t: MergeItems<T>) {
166 let (file_start, file_end) = t.range();
167 if t.merged() {
168 self.penalty += t.size;
169 }
170 self.items.push(t);
171 self.start = Some(self.start.map_or(file_start, |v| v.min(file_start)));
172 self.end = Some(self.end.map_or(file_end, |v| v.max(file_end)));
173 }
174}
175
176pub(crate) fn find_sorted_runs<T>(items: &mut [T]) -> Vec<SortedRun<T>>
178where
179 T: Item,
180{
181 if items.is_empty() {
182 return vec![];
183 }
184 sort_ranged_items(items);
186
187 let mut current_run = SortedRun::default();
188 let mut runs = vec![];
189
190 let mut selection = BitVec::repeat(false, items.len());
191 while !selection.all() {
192 for (item, mut selected) in items.iter().zip(selection.iter_mut()) {
194 if *selected {
195 continue;
197 }
198 let current_item = MergeItems::new_unmerged(item.clone());
199 match current_run.items.last() {
200 None => {
201 selected.set(true);
203 current_run.push_item(current_item);
204 }
205 Some(last) => {
206 if !last.overlap(¤t_item) {
209 selected.set(true);
211 current_run.push_item(current_item);
212 }
213 }
214 }
215 }
216 runs.push(std::mem::take(&mut current_run));
218 }
219 runs
220}
221
222fn merge_all_runs<T: Item>(runs: Vec<SortedRun<T>>) -> SortedRun<T> {
223 assert!(!runs.is_empty());
224 let mut all_items = runs
225 .into_iter()
226 .flat_map(|r| r.items.into_iter())
227 .collect::<Vec<_>>();
228
229 sort_ranged_items(&mut all_items);
230
231 let mut res = SortedRun::default();
232 let mut iter = all_items.into_iter();
233 let mut current_item = iter.next().unwrap();
235
236 for item in iter {
237 if current_item.overlap(&item) {
238 current_item = current_item.merge(item);
239 } else {
240 res.push_item(current_item);
241 current_item = item;
242 }
243 }
244 res.push_item(current_item);
245 res
246}
247
248pub(crate) fn reduce_runs<T: Item>(runs: Vec<SortedRun<T>>, target: usize) -> Vec<Vec<T>> {
251 assert_ne!(target, 0);
252 if target >= runs.len() {
253 return vec![];
255 }
256
257 let k = runs.len() + 1 - target;
258 runs.into_iter()
259 .combinations(k) .map(|runs_to_merge| merge_all_runs(runs_to_merge)) .min_by(|p, r| p.penalty.cmp(&r.penalty)) .unwrap() .items
264 .into_iter()
265 .filter(|m| m.merged()) .map(|m| m.items.to_vec())
267 .collect()
268}
269
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::*;

    /// Minimal [`Item`] with integer bounds used to exercise the algorithms.
    #[derive(Clone, Debug)]
    struct MockFile {
        start: i64,
        end: i64,
        size: usize,
    }

    impl Ranged for MockFile {
        type BoundType = i64;

        fn range(&self) -> (i64, i64) {
            (self.start, self.end)
        }
    }

    impl Item for MockFile {
        fn size(&self) -> usize {
            self.size
        }
    }

    /// Builds one mock file per `(start, end)` pair; its size is `end - start`.
    fn build_items(ranges: &[(i64, i64)]) -> Vec<MockFile> {
        ranges
            .iter()
            .map(|&(start, end)| MockFile {
                start,
                end,
                size: (end - start) as usize,
            })
            .collect()
    }

    /// Runs `find_sorted_runs` on `ranges`, asserts that the ranges of the
    /// resulting runs equal `expected_runs`, and returns the runs.
    fn check_sorted_runs(
        ranges: &[(i64, i64)],
        expected_runs: &[Vec<(i64, i64)>],
    ) -> Vec<SortedRun<MockFile>> {
        let mut files = build_items(ranges);
        let runs = find_sorted_runs(&mut files);

        let mut result_file_ranges: Vec<Vec<(i64, i64)>> = Vec::with_capacity(runs.len());
        for run in &runs {
            result_file_ranges.push(run.items.iter().map(|group| group.range()).collect());
        }
        assert_eq!(&expected_runs, &result_file_ranges);
        runs
    }

    #[test]
    fn test_find_sorted_runs() {
        check_sorted_runs(&[], &[]);
        check_sorted_runs(&[(1, 1), (2, 2)], &[vec![(1, 1), (2, 2)]]);
        check_sorted_runs(&[(1, 2)], &[vec![(1, 2)]]);
        // Bounds are inclusive, so touching ranges end up in different runs.
        check_sorted_runs(&[(1, 2), (2, 3)], &[vec![(1, 2)], vec![(2, 3)]]);
        check_sorted_runs(&[(1, 2), (3, 4)], &[vec![(1, 2), (3, 4)]]);
        // Input order does not matter: items are sorted first.
        check_sorted_runs(&[(2, 4), (1, 3)], &[vec![(1, 3)], vec![(2, 4)]]);
        check_sorted_runs(
            &[(1, 3), (2, 4), (4, 5)],
            &[vec![(1, 3), (4, 5)], vec![(2, 4)]],
        );

        // With equal starts the item with the larger end is picked first.
        check_sorted_runs(
            &[(1, 2), (3, 4), (3, 5)],
            &[vec![(1, 2), (3, 5)], vec![(3, 4)]],
        );

        check_sorted_runs(
            &[(1, 3), (2, 4), (5, 6)],
            &[vec![(1, 3), (5, 6)], vec![(2, 4)]],
        );

        check_sorted_runs(
            &[(1, 2), (3, 5), (4, 6)],
            &[vec![(1, 2), (3, 5)], vec![(4, 6)]],
        );

        check_sorted_runs(
            &[(1, 2), (3, 4), (4, 6), (7, 8)],
            &[vec![(1, 2), (3, 4), (7, 8)], vec![(4, 6)]],
        );
        check_sorted_runs(
            &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
            &[vec![(1, 2), (3, 6), (7, 8)], vec![(3, 4), (5, 6), (8, 9)]],
        );

        check_sorted_runs(
            &[(10, 19), (20, 21), (20, 29), (30, 39)],
            &[vec![(10, 19), (20, 29), (30, 39)], vec![(20, 21)]],
        );

        // Three runs are needed here.
        check_sorted_runs(
            &[(10, 19), (20, 29), (21, 22), (30, 39), (31, 32), (32, 42)],
            &[
                vec![(10, 19), (20, 29), (30, 39)],
                vec![(21, 22), (31, 32)],
                vec![(32, 42)],
            ],
        );
    }

    /// Finds the sorted runs of `items` (exactly two are expected), merges them
    /// and checks the penalty and the members of every resulting group.
    fn check_merge_sorted_runs(
        items: &[(i64, i64)],
        expected_penalty: usize,
        expected: &[Vec<(i64, i64)>],
    ) {
        let mut items = build_items(items);
        let runs = find_sorted_runs(&mut items);
        assert_eq!(2, runs.len());
        let SortedRun {
            items: groups,
            penalty,
            ..
        } = merge_all_runs(runs);
        // Members of a group are compared in (start, end) order.
        let ranges: Vec<Vec<(i64, i64)>> = groups
            .into_iter()
            .map(|group| {
                group
                    .items
                    .into_iter()
                    .map(|f| (f.start, f.end))
                    .sorted()
                    .collect()
            })
            .collect();
        assert_eq!(expected, &ranges);
        assert_eq!(expected_penalty, penalty);
    }

    #[test]
    fn test_merge_sorted_runs() {
        check_merge_sorted_runs(&[(1, 2), (1, 3)], 3, &[vec![(1, 2), (1, 3)]]);

        // A chain of touching ranges collapses into one group.
        check_merge_sorted_runs(
            &[(1, 2), (2, 3), (3, 4)],
            3,
            &[vec![(1, 2), (2, 3), (3, 4)]],
        );

        // Only the group that actually merges items contributes to the penalty.
        check_merge_sorted_runs(
            &[(1, 10), (11, 20), (21, 30), (18, 18)],
            9,
            &[vec![(1, 10)], vec![(11, 20), (18, 18)], vec![(21, 30)]],
        );

        check_merge_sorted_runs(
            &[(1, 3), (2, 4), (4, 5)],
            5,
            &[vec![(1, 3), (2, 4), (4, 5)]],
        );

        check_merge_sorted_runs(
            &[(1, 2), (3, 4), (4, 6), (7, 8)],
            3,
            &[vec![(1, 2)], vec![(3, 4), (4, 6)], vec![(7, 8)]],
        );

        check_merge_sorted_runs(
            &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
            7,
            &[
                vec![(1, 2)],
                vec![(3, 4), (3, 6), (5, 6)],
                vec![(7, 8), (8, 9)],
            ],
        );

        check_merge_sorted_runs(
            &[(10, 19), (20, 29), (21, 22), (30, 39), (31, 32)],
            20,
            &[
                vec![(10, 19)],
                vec![(20, 29), (21, 22)],
                vec![(30, 39), (31, 32)],
            ],
        );

        // Identical ranges are merged with each other.
        check_merge_sorted_runs(
            &[(1, 10), (1, 10), (11, 20), (21, 30), (21, 30)],
            36,
            &[
                vec![(1, 10), (1, 10)],
                vec![(11, 20)],
                vec![(21, 30), (21, 30)],
            ],
        );

        check_merge_sorted_runs(
            &[(1, 10), (11, 20), (21, 30), (22, 30)],
            17,
            &[vec![(1, 10)], vec![(11, 20)], vec![(21, 30), (22, 30)]],
        );
    }

    /// Finds the sorted runs of `files` (any number of them), merges all of them
    /// and checks the penalty, the number of groups and every group's members.
    fn check_merge_all_sorted_runs(
        files: &[(i64, i64)],
        expected_penalty: usize,
        expected: &[Vec<(i64, i64)>],
    ) {
        let mut files = build_items(files);
        let runs = find_sorted_runs(&mut files);
        let result = merge_all_runs(runs);
        assert_eq!(expected_penalty, result.penalty);
        assert_eq!(expected.len(), result.items.len());

        let mut res = Vec::with_capacity(result.items.len());
        for group in &result.items {
            let mut ranges: Vec<_> = group.items.iter().map(|f| (f.start, f.end)).collect();
            // Members are compared in order of their start bound only.
            ranges.sort_unstable_by_key(|&(start, _)| start);
            res.push(ranges);
        }
        assert_eq!(expected, &res);
    }

    #[test]
    fn test_merge_all_sorted_runs() {
        check_merge_all_sorted_runs(
            &[(1, 2), (3, 4), (4, 10)],
            7,
            &[vec![(1, 2)], vec![(3, 4), (4, 10)]],
        );

        check_merge_all_sorted_runs(
            &[(1, 2), (3, 4), (5, 6), (4, 10)],
            8,
            &[vec![(1, 2)], vec![(3, 4), (4, 10), (5, 6)]],
        );

        check_merge_all_sorted_runs(
            &[(10, 20), (30, 40), (50, 60), (35, 55), (51, 61)],
            50,
            &[vec![(10, 20)], vec![(30, 40), (35, 55), (50, 60), (51, 61)]],
        );
    }

    #[test]
    fn test_sorted_runs_time_range() {
        let mut files = build_items(&[(1, 2), (3, 4), (4, 10)]);
        let runs = find_sorted_runs(&mut files);
        assert_eq!(2, runs.len());

        // The bounds of a run span all of the items pushed into it.
        let first = &runs[0];
        assert_eq!(Some(1), first.start);
        assert_eq!(Some(4), first.end);

        let second = &runs[1];
        assert_eq!(Some(4), second.start);
        assert_eq!(Some(10), second.end);
    }

    /// Checks the sorted runs of `files` against `expected_runs`, reduces them to
    /// `target` runs and compares the groups to merge with `expected`, ignoring
    /// the order of the groups.
    fn check_reduce_runs(
        files: &[(i64, i64)],
        expected_runs: &[Vec<(i64, i64)>],
        target: usize,
        expected: &[Vec<(i64, i64)>],
    ) {
        let runs = check_sorted_runs(files, expected_runs);
        let file_timestamps: HashSet<Vec<(i64, i64)>> = reduce_runs(runs, target)
            .into_iter()
            .map(|group| {
                let mut overlapping: Vec<_> =
                    group.into_iter().map(|f| (f.start, f.end)).collect();
                overlapping.sort_unstable();
                overlapping
            })
            .collect();

        let expected: HashSet<_> = expected.iter().cloned().collect();
        assert_eq!(&expected, &file_timestamps);
    }

    #[test]
    fn test_reduce_runs() {
        check_reduce_runs(
            &[(1, 3), (2, 4), (5, 6)],
            &[vec![(1, 3), (5, 6)], vec![(2, 4)]],
            1,
            &[vec![(1, 3), (2, 4)]],
        );

        check_reduce_runs(
            &[(1, 2), (3, 5), (4, 6)],
            &[vec![(1, 2), (3, 5)], vec![(4, 6)]],
            1,
            &[vec![(3, 5), (4, 6)]],
        );

        // Three mutually overlapping files, reduced to one and to two runs.
        check_reduce_runs(
            &[(1, 4), (2, 5), (3, 6)],
            &[vec![(1, 4)], vec![(2, 5)], vec![(3, 6)]],
            1,
            &[vec![(1, 4), (2, 5), (3, 6)]],
        );
        check_reduce_runs(
            &[(1, 4), (2, 5), (3, 6)],
            &[vec![(1, 4)], vec![(2, 5)], vec![(3, 6)]],
            2,
            &[vec![(1, 4), (2, 5)]],
        );

        check_reduce_runs(
            &[(1, 2), (3, 4), (4, 6), (7, 8)],
            &[vec![(1, 2), (3, 4), (7, 8)], vec![(4, 6)]],
            1,
            &[vec![(3, 4), (4, 6)]],
        );

        // The number of runs already equals the target: nothing to merge.
        check_reduce_runs(
            &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
            &[vec![(1, 2), (3, 6), (7, 8)], vec![(3, 4), (5, 6), (8, 9)]],
            2,
            &[],
        );

        check_reduce_runs(
            &[(1, 2), (3, 4), (5, 6), (3, 6), (7, 8), (8, 9)],
            &[vec![(1, 2), (3, 6), (7, 8)], vec![(3, 4), (5, 6), (8, 9)]],
            1,
            &[vec![(3, 4), (3, 6), (5, 6)], vec![(7, 8), (8, 9)]],
        );

        // Three runs reduced to two.
        check_reduce_runs(
            &[
                (10, 20),
                (30, 40),
                (50, 60),
                (50, 80),
                (80, 90),
                (80, 100),
                (100, 110),
            ],
            &[
                vec![(10, 20), (30, 40), (50, 80), (100, 110)],
                vec![(50, 60), (80, 100)],
                vec![(80, 90)],
            ],
            2,
            &[vec![(80, 90), (80, 100)]],
        );

        // The same three runs reduced to a single run.
        check_reduce_runs(
            &[
                (10, 20),
                (30, 40),
                (50, 60),
                (50, 80),
                (80, 90),
                (80, 100),
                (100, 110),
            ],
            &[
                vec![(10, 20), (30, 40), (50, 80), (100, 110)],
                vec![(50, 60), (80, 100)],
                vec![(80, 90)],
            ],
            1,
            &[vec![(50, 60), (50, 80), (80, 90), (80, 100), (100, 110)]],
        );

        // Four fully overlapping files, reduced to every possible target.
        check_reduce_runs(
            &[(0, 10), (0, 11), (0, 12), (0, 13)],
            &[vec![(0, 13)], vec![(0, 12)], vec![(0, 11)], vec![(0, 10)]],
            4,
            &[],
        );
        check_reduce_runs(
            &[(0, 10), (0, 11), (0, 12), (0, 13)],
            &[vec![(0, 13)], vec![(0, 12)], vec![(0, 11)], vec![(0, 10)]],
            3,
            &[vec![(0, 10), (0, 11)]],
        );
        check_reduce_runs(
            &[(0, 10), (0, 11), (0, 12), (0, 13)],
            &[vec![(0, 13)], vec![(0, 12)], vec![(0, 11)], vec![(0, 10)]],
            2,
            &[vec![(0, 10), (0, 11), (0, 12)]],
        );
        check_reduce_runs(
            &[(0, 10), (0, 11), (0, 12), (0, 13)],
            &[vec![(0, 13)], vec![(0, 12)], vec![(0, 11)], vec![(0, 10)]],
            1,
            &[vec![(0, 10), (0, 11), (0, 12), (0, 13)]],
        );
    }
}