1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap};
17use std::fmt::Debug;
18use std::num::NonZeroU64;
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::info;
22use common_time::timestamp::TimeUnit;
23use common_time::timestamp_millis::BucketAligned;
24use common_time::Timestamp;
25use store_api::storage::RegionId;
26
27use crate::compaction::buckets::infer_time_bucket;
28use crate::compaction::compactor::CompactionRegion;
29use crate::compaction::picker::{Picker, PickerOutput};
30use crate::compaction::run::{
31 find_sorted_runs, merge_seq_files, reduce_runs, FileGroup, Item, Ranged,
32};
33use crate::compaction::{get_expired_ssts, CompactionOutput};
34use crate::sst::file::{overlaps, FileHandle, Level};
35use crate::sst::version::LevelMeta;
36
/// Level that compaction outputs are written to.
const LEVEL_COMPACTED: Level = 1;
38
/// Picker implementing the time-windowed compaction strategy (TWCS):
/// SST files are bucketed into time windows and merged into sorted runs
/// within each window.
#[derive(Debug)]
pub struct TwcsPicker {
    /// Minimum number of files in a single sorted run required to trigger a
    /// merge when only one run exists in a window.
    pub trigger_file_num: usize,
    /// Explicit compaction time window in seconds; when `None`, the window is
    /// taken from the region version or inferred from level-0 files.
    pub time_window_seconds: Option<i64>,
    /// Soft cap on the size of compaction output files; in append mode it is
    /// also used to skip oversized inputs. `None` means unlimited.
    pub max_output_file_size: Option<u64>,
    /// Whether the region is in append mode; when set, delete-marker
    /// filtering is disabled and large file groups are skipped.
    pub append_mode: bool,
}
52
53impl TwcsPicker {
54 fn build_output(
56 &self,
57 region_id: RegionId,
58 time_windows: &mut BTreeMap<i64, Window>,
59 active_window: Option<i64>,
60 ) -> Vec<CompactionOutput> {
61 let mut output = vec![];
62 for (window, files) in time_windows {
63 if files.files.is_empty() {
64 continue;
65 }
66 let mut files_to_merge: Vec<_> = files.files().cloned().collect();
67
68 if self.append_mode {
70 if let Some(max_size) = self.max_output_file_size {
71 let (kept_files, ignored_files) = files_to_merge
72 .into_iter()
73 .partition(|fg| fg.size() <= max_size as usize && fg.is_all_level_0());
74 files_to_merge = kept_files;
75 info!(
76 "Skipped {} large files in append mode for region {}, window {}, max_size: {}",
77 ignored_files.len(),
78 region_id,
79 window,
80 max_size
81 );
82 }
83 }
84
85 let sorted_runs = find_sorted_runs(&mut files_to_merge);
86 let found_runs = sorted_runs.len();
87 let filter_deleted = !files.overlapping && found_runs <= 2 && !self.append_mode;
90 if found_runs == 0 {
91 return output;
92 }
93
94 let inputs = if found_runs > 1 {
95 reduce_runs(sorted_runs)
96 } else {
97 let run = sorted_runs.last().unwrap();
98 if run.items().len() < self.trigger_file_num {
99 continue;
100 }
101 merge_seq_files(run.items(), self.max_output_file_size)
103 };
104
105 if !inputs.is_empty() {
106 log_pick_result(
107 region_id,
108 *window,
109 active_window,
110 found_runs,
111 files.files.len(),
112 self.max_output_file_size,
113 filter_deleted,
114 &inputs,
115 );
116 output.push(CompactionOutput {
117 output_level: LEVEL_COMPACTED, inputs: inputs.into_iter().flat_map(|fg| fg.into_files()).collect(),
119 filter_deleted,
120 output_time_range: None, });
122 }
123 }
124 output
125 }
126}
127
128#[allow(clippy::too_many_arguments)]
129fn log_pick_result(
130 region_id: RegionId,
131 window: i64,
132 active_window: Option<i64>,
133 found_runs: usize,
134 file_num: usize,
135 max_output_file_size: Option<u64>,
136 filter_deleted: bool,
137 inputs: &[FileGroup],
138) {
139 let input_file_str: Vec<String> = inputs
140 .iter()
141 .map(|f| {
142 let range = f.range();
143 let start = range.0.to_iso8601_string();
144 let end = range.1.to_iso8601_string();
145 let num_rows = f.num_rows();
146 format!(
147 "FileGroup{{id: {:?}, range: ({}, {}), size: {}, num rows: {} }}",
148 f.file_ids(),
149 start,
150 end,
151 ReadableSize(f.size() as u64),
152 num_rows
153 )
154 })
155 .collect();
156 let window_str = Timestamp::new_second(window).to_iso8601_string();
157 let active_window_str = active_window.map(|s| Timestamp::new_second(s).to_iso8601_string());
158 let max_output_file_size = max_output_file_size.map(|size| ReadableSize(size).to_string());
159 info!(
160 "Region ({:?}) compaction pick result: current window: {}, active window: {:?}, \
161 found runs: {}, file num: {}, max output file size: {:?}, filter deleted: {}, \
162 input files: {:?}",
163 region_id,
164 window_str,
165 active_window_str,
166 found_runs,
167 file_num,
168 max_output_file_size,
169 filter_deleted,
170 input_file_str
171 );
172}
173
impl Picker for TwcsPicker {
    /// Picks expired SSTs and per-window compaction outputs for the region.
    ///
    /// Returns `None` when there is nothing to do (no expired SSTs and no
    /// window produced any output).
    fn pick(&self, compaction_region: &CompactionRegion) -> Option<PickerOutput> {
        let region_id = compaction_region.region_id;
        let levels = compaction_region.current_version.ssts.levels();

        // Collect SSTs whose data is entirely past the region TTL.
        let expired_ssts =
            get_expired_ssts(levels, compaction_region.ttl, Timestamp::current_millis());
        if !expired_ssts.is_empty() {
            info!("Expired SSTs in region {}: {:?}", region_id, expired_ssts);
            // Mark them as compacting so a concurrent pick does not pick them
            // again. NOTE(review): presumably unset elsewhere if the task
            // fails — confirm against the task lifecycle.
            expired_ssts.iter().for_each(|f| f.set_compacting(true));
        }

        // Window-size resolution precedence: region version setting, then the
        // picker's configured option, then inference from level-0 files.
        let compaction_time_window = compaction_region
            .current_version
            .compaction_time_window
            .map(|window| window.as_secs() as i64);
        let time_window_size = compaction_time_window
            .or(self.time_window_seconds)
            .unwrap_or_else(|| {
                let inferred = infer_time_bucket(levels[0].files());
                info!(
                    "Compaction window for region {} is not present, inferring from files: {:?}",
                    region_id, inferred
                );
                inferred
            });

        // The active window is the bucket holding the most recent level-0 data.
        let active_window = find_latest_window_in_seconds(levels[0].files(), time_window_size);
        // Bucket all files (every level) into time windows, then build outputs.
        let mut windows =
            assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size);
        let outputs = self.build_output(region_id, &mut windows, active_window);

        if outputs.is_empty() && expired_ssts.is_empty() {
            return None;
        }

        let max_file_size = self.max_output_file_size.map(|v| v as usize);
        Some(PickerOutput {
            outputs,
            expired_ssts,
            time_window_size,
            max_file_size,
        })
    }
}
222
/// A bucket of files that fall into the same compaction time window.
struct Window {
    /// Minimum start timestamp across all contained files.
    start: Timestamp,
    /// Maximum end timestamp across all contained files.
    end: Timestamp,
    /// File groups keyed by file sequence; files sharing a sequence are
    /// grouped together.
    files: HashMap<Option<NonZeroU64>, FileGroup>,
    /// The aligned window bucket (in seconds) this window represents.
    time_window: i64,
    /// Whether this window's overall time range overlaps another window's.
    overlapping: bool,
}
232
233impl Window {
234 fn new_with_file(file: FileHandle) -> Self {
236 let (start, end) = file.time_range();
237 let files = HashMap::from([(file.meta_ref().sequence, FileGroup::new_with_file(file))]);
238 Self {
239 start,
240 end,
241 files,
242 time_window: 0,
243 overlapping: false,
244 }
245 }
246
247 fn range(&self) -> (Timestamp, Timestamp) {
249 (self.start, self.end)
250 }
251
252 fn add_file(&mut self, file: FileHandle) {
254 let (start, end) = file.time_range();
255 self.start = self.start.min(start);
256 self.end = self.end.max(end);
257
258 match self.files.entry(file.meta_ref().sequence) {
259 Entry::Occupied(mut o) => {
260 o.get_mut().add_file(file);
261 }
262 Entry::Vacant(v) => {
263 v.insert(FileGroup::new_with_file(file));
264 }
265 }
266 }
267
268 fn files(&self) -> impl Iterator<Item = &FileGroup> {
269 self.files.values()
270 }
271}
272
273fn assign_to_windows<'a>(
275 files: impl Iterator<Item = &'a FileHandle>,
276 time_window_size: i64,
277) -> BTreeMap<i64, Window> {
278 let mut windows: HashMap<i64, Window> = HashMap::new();
279 for f in files {
281 if f.compacting() {
282 continue;
283 }
284 let (_, end) = f.time_range();
285 let time_window = end
286 .convert_to(TimeUnit::Second)
287 .unwrap()
288 .value()
289 .align_to_ceil_by_bucket(time_window_size)
290 .unwrap_or(i64::MIN);
291
292 match windows.entry(time_window) {
293 Entry::Occupied(mut e) => {
294 e.get_mut().add_file(f.clone());
295 }
296 Entry::Vacant(e) => {
297 let mut window = Window::new_with_file(f.clone());
298 window.time_window = time_window;
299 e.insert(window);
300 }
301 }
302 }
303 if windows.is_empty() {
304 return BTreeMap::new();
305 }
306
307 let mut windows = windows.into_values().collect::<Vec<_>>();
308 windows.sort_unstable_by(|l, r| l.start.cmp(&r.start).then(l.end.cmp(&r.end).reverse()));
309
310 let mut current_range: (Timestamp, Timestamp) = windows[0].range(); for idx in 1..windows.len() {
313 let next_range = windows[idx].range();
314 if overlaps(¤t_range, &next_range) {
315 windows[idx - 1].overlapping = true;
316 windows[idx].overlapping = true;
317 }
318 current_range = (
319 current_range.0.min(next_range.0),
320 current_range.1.max(next_range.1),
321 );
322 }
323
324 windows.into_iter().map(|w| (w.time_window, w)).collect()
325}
326
327fn find_latest_window_in_seconds<'a>(
330 files: impl Iterator<Item = &'a FileHandle>,
331 time_window_size: i64,
332) -> Option<i64> {
333 let mut latest_timestamp = None;
334 for f in files {
335 let (_, end) = f.time_range();
336 if let Some(latest) = latest_timestamp {
337 if end > latest {
338 latest_timestamp = Some(end);
339 }
340 } else {
341 latest_timestamp = Some(end);
342 }
343 }
344 latest_timestamp
345 .and_then(|ts| ts.convert_to_ceil(TimeUnit::Second))
346 .and_then(|ts| ts.value().align_to_ceil_by_bucket(time_window_size))
347}
348
349#[cfg(test)]
350mod tests {
351 use std::collections::HashSet;
352
353 use super::*;
354 use crate::compaction::test_util::{
355 new_file_handle, new_file_handle_with_sequence, new_file_handle_with_size_and_sequence,
356 };
357 use crate::sst::file::{FileId, Level};
358
    #[test]
    fn test_get_latest_window_in_seconds() {
        // An end of 999 ms ceils to 1 s, which aligns to window 1.
        assert_eq!(
            Some(1),
            find_latest_window_in_seconds([new_file_handle(FileId::random(), 0, 999, 0)].iter(), 1)
        );
        // Exactly 1000 ms also lands in window 1.
        assert_eq!(
            Some(1),
            find_latest_window_in_seconds(
                [new_file_handle(FileId::random(), 0, 1000, 0)].iter(),
                1
            )
        );

        // Extreme negative timestamps align without panicking.
        assert_eq!(
            Some(-9223372036854000),
            find_latest_window_in_seconds(
                [new_file_handle(FileId::random(), i64::MIN, i64::MIN + 1, 0)].iter(),
                3600,
            )
        );

        // Extreme positive end timestamp ceils and aligns upward.
        assert_eq!(
            (i64::MAX / 10000000 + 1) * 10000,
            find_latest_window_in_seconds(
                [new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0)].iter(),
                10000,
            )
            .unwrap()
        );

        // With multiple files, the maximum end timestamp decides the window.
        assert_eq!(
            Some((i64::MAX / 3600000 + 1) * 3600),
            find_latest_window_in_seconds(
                [
                    new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0),
                    new_file_handle(FileId::random(), 0, 1000, 0)
                ]
                .iter(),
                3600
            )
        );
    }
402
403 #[test]
404 fn test_assign_to_windows() {
405 let windows = assign_to_windows(
406 [
407 new_file_handle(FileId::random(), 0, 999, 0),
408 new_file_handle(FileId::random(), 0, 999, 0),
409 new_file_handle(FileId::random(), 0, 999, 0),
410 new_file_handle(FileId::random(), 0, 999, 0),
411 new_file_handle(FileId::random(), 0, 999, 0),
412 ]
413 .iter(),
414 3,
415 );
416 let fgs = &windows.get(&0).unwrap().files;
417 assert_eq!(1, fgs.len());
418 assert_eq!(fgs.values().map(|f| f.files().len()).sum::<usize>(), 5);
419
420 let files = [FileId::random(); 3];
421 let windows = assign_to_windows(
422 [
423 new_file_handle(files[0], -2000, -3, 0),
424 new_file_handle(files[1], 0, 2999, 0),
425 new_file_handle(files[2], 50, 10001, 0),
426 ]
427 .iter(),
428 3,
429 );
430 assert_eq!(
431 files[0],
432 windows.get(&0).unwrap().files().next().unwrap().files()[0]
433 .file_id()
434 .file_id()
435 );
436 assert_eq!(
437 files[1],
438 windows.get(&3).unwrap().files().next().unwrap().files()[0]
439 .file_id()
440 .file_id()
441 );
442 assert_eq!(
443 files[2],
444 windows.get(&12).unwrap().files().next().unwrap().files()[0]
445 .file_id()
446 .file_id()
447 );
448 }
449
    #[test]
    fn test_assign_file_groups_to_windows() {
        let files = [
            FileId::random(),
            FileId::random(),
            FileId::random(),
            FileId::random(),
        ];
        // Four files share one window but carry two distinct sequences, so
        // they should be split into two file groups keyed by sequence.
        let windows = assign_to_windows(
            [
                new_file_handle_with_sequence(files[0], 0, 999, 0, 1),
                new_file_handle_with_sequence(files[1], 0, 999, 0, 1),
                new_file_handle_with_sequence(files[2], 0, 999, 0, 2),
                new_file_handle_with_sequence(files[3], 0, 999, 0, 2),
            ]
            .iter(),
            3,
        );
        assert_eq!(windows.len(), 1);
        let fgs = &windows.get(&0).unwrap().files;
        assert_eq!(2, fgs.len());
        // The sequence-1 group holds the first two files.
        assert_eq!(
            fgs.get(&NonZeroU64::new(1))
                .unwrap()
                .files()
                .iter()
                .map(|f| f.file_id().file_id())
                .collect::<HashSet<_>>(),
            [files[0], files[1]].into_iter().collect()
        );
        // The sequence-2 group holds the last two files.
        assert_eq!(
            fgs.get(&NonZeroU64::new(2))
                .unwrap()
                .files()
                .iter()
                .map(|f| f.file_id().file_id())
                .collect::<HashSet<_>>(),
            [files[2], files[3]].into_iter().collect()
        );
    }
490
    #[test]
    fn test_assign_compacting_to_windows() {
        let files = [
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
        ];
        // Files already under compaction must be excluded from the windows.
        files[0].set_compacting(true);
        files[2].set_compacting(true);
        let mut windows = assign_to_windows(files.iter(), 3);
        let window0 = windows.remove(&0).unwrap();
        assert_eq!(1, window0.files.len());
        let candidates = window0
            .files
            .into_values()
            .flat_map(|fg| fg.into_files())
            .map(|f| f.file_id().file_id())
            .collect::<HashSet<_>>();
        // Only the three non-compacting files remain.
        assert_eq!(candidates.len(), 3);
        assert_eq!(
            candidates,
            [
                files[1].file_id().file_id(),
                files[3].file_id().file_id(),
                files[4].file_id().file_id()
            ]
            .into_iter()
            .collect::<HashSet<_>>()
        );
    }
523
    /// (window bucket, expected `overlapping` flag, expected file (start, end)
    /// ranges within that window, sorted ascending).
    type ExpectedWindowSpec = (i64, bool, Vec<(i64, i64)>);

    /// Asserts that `assign_to_windows` buckets files with the given time
    /// ranges into the expected windows with the expected overlap flags.
    fn check_assign_to_windows_with_overlapping(
        file_time_ranges: &[(i64, i64)],
        time_window: i64,
        expected_files: &[ExpectedWindowSpec],
    ) {
        let files: Vec<_> = (0..file_time_ranges.len())
            .map(|_| FileId::random())
            .collect();

        let file_handles = files
            .iter()
            .zip(file_time_ranges.iter())
            .map(|(file_id, range)| new_file_handle(*file_id, range.0, range.1, 0))
            .collect::<Vec<_>>();

        let windows = assign_to_windows(file_handles.iter(), time_window);

        for (expected_window, overlapping, window_files) in expected_files {
            let actual_window = windows.get(expected_window).unwrap();
            assert_eq!(*overlapping, actual_window.overlapping);
            // Collect actual (start, end) ranges and sort them so the
            // comparison is order-insensitive.
            let mut file_ranges = actual_window
                .files
                .iter()
                .flat_map(|(_, f)| {
                    f.files().iter().map(|f| {
                        let (s, e) = f.time_range();
                        (s.value(), e.value())
                    })
                })
                .collect::<Vec<_>>();
            file_ranges.sort_unstable_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1)));
            assert_eq!(window_files, &file_ranges);
        }
    }
561
    #[test]
    fn test_assign_to_windows_with_overlapping() {
        // Disjoint files in adjacent windows: nothing is flagged overlapping.
        check_assign_to_windows_with_overlapping(
            &[(0, 999), (1000, 1999), (2000, 2999)],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, false, vec![(1000, 1999), (2000, 2999)]),
            ],
        );

        // A file spanning both windows marks both as overlapping.
        check_assign_to_windows_with_overlapping(
            &[(0, 1), (0, 999), (100, 2999)],
            2,
            &[
                (0, true, vec![(0, 1), (0, 999)]),
                (2, true, vec![(100, 2999)]),
            ],
        );

        // Three disjoint windows: still no overlap.
        check_assign_to_windows_with_overlapping(
            &[(0, 999), (1000, 1999), (2000, 2999), (3000, 3999)],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, false, vec![(1000, 1999), (2000, 2999)]),
                (4, false, vec![(3000, 3999)]),
            ],
        );

        // A file covering the full span makes every window overlapping.
        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (0, 3999),
            ],
            2,
            &[
                (0, true, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(0, 3999), (3000, 3999)]),
            ],
        );

        // A file starting inside window 2 overlaps windows 2 and 4 only.
        check_assign_to_windows_with_overlapping(
            &[
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (1999, 3999),
            ],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(1999, 3999), (3000, 3999)]),
            ],
        );

        // Overlap on the boundary timestamp 2999 affects windows 2 and 4.
        check_assign_to_windows_with_overlapping(
            &[
                (0, 999), (1000, 1999), (2000, 2999), (3000, 3999), (2999, 3999), ],
            2,
            &[
                (0, false, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(2999, 3999), (3000, 3999)]),
            ],
        );

        // A file ending at 1000 joins window 2 and touches window 0's range,
        // so windows 0 and 2 overlap while window 4 stays clean.
        check_assign_to_windows_with_overlapping(
            &[
                (0, 999), (1000, 1999), (2000, 2999), (3000, 3999), (0, 1000), ],
            2,
            &[
                (0, true, vec![(0, 999)]),
                (2, true, vec![(0, 1000), (1000, 1999), (2000, 2999)]),
                (4, false, vec![(3000, 3999)]),
            ],
        );
    }
658
    /// A declarative test case for `TwcsPicker::build_output`.
    struct CompactionPickerTestCase {
        /// Compaction time window size in seconds.
        window_size: i64,
        /// Input files; expected outputs refer to files by index in this vec.
        input_files: Vec<FileHandle>,
        /// Expected compaction outputs.
        expected_outputs: Vec<ExpectedOutput>,
    }
664
    impl CompactionPickerTestCase {
        /// Runs `TwcsPicker::build_output` over `input_files` and asserts the
        /// produced outputs (as sets of input-file indices plus output level)
        /// match `expected_outputs`.
        fn check(&self) {
            // Map each file id back to its index in `input_files` so outputs
            // can be compared positionally.
            let file_id_to_idx = self
                .input_files
                .iter()
                .enumerate()
                .map(|(idx, file)| (file.file_id(), idx))
                .collect::<HashMap<_, _>>();
            let mut windows = assign_to_windows(self.input_files.iter(), self.window_size);
            let active_window =
                find_latest_window_in_seconds(self.input_files.iter(), self.window_size);
            // Picker configuration is fixed for these cases: trigger at 4
            // files, no size cap, not append mode.
            let output = TwcsPicker {
                trigger_file_num: 4,
                time_window_seconds: None,
                max_output_file_size: None,
                append_mode: false,
            }
            .build_output(RegionId::from_u64(0), &mut windows, active_window);

            // Translate actual outputs into (index set, level) pairs.
            let output = output
                .iter()
                .map(|o| {
                    let input_file_ids = o
                        .inputs
                        .iter()
                        .map(|f| file_id_to_idx.get(&f.file_id()).copied().unwrap())
                        .collect::<HashSet<_>>();
                    (input_file_ids, o.output_level)
                })
                .collect::<Vec<_>>();

            let expected = self
                .expected_outputs
                .iter()
                .map(|o| {
                    let input_file_ids = o.input_files.iter().copied().collect::<HashSet<_>>();
                    (input_file_ids, o.output_level)
                })
                .collect::<Vec<_>>();
            assert_eq!(expected, output);
        }
    }
707
    /// Expected result of one compaction output: which input files (by index
    /// into the test case's `input_files`) merge into which level.
    struct ExpectedOutput {
        input_files: Vec<usize>,
        output_level: Level,
    }
712
    #[test]
    fn test_build_twcs_output() {
        // Two windows, each holding two files with distinct sequences
        // (two sorted runs): both windows produce an output.
        let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();

        CompactionPickerTestCase {
            window_size: 3,
            input_files: [
                new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
                new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
                new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3),
                new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4),
            ]
            .to_vec(),
            expected_outputs: vec![
                ExpectedOutput {
                    input_files: vec![0, 1],
                    output_level: 1,
                },
                ExpectedOutput {
                    input_files: vec![2, 3],
                    output_level: 1,
                },
            ],
        }
        .check();

        // With three files in the second window, run reduction picks the
        // subset (files 2 and 4) needed to reduce the run count.
        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
        CompactionPickerTestCase {
            window_size: 3,
            input_files: [
                new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
                new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
                new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3),
                new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4),
                new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 5),
            ]
            .to_vec(),
            expected_outputs: vec![
                ExpectedOutput {
                    input_files: vec![0, 1],
                    output_level: 1,
                },
                ExpectedOutput {
                    input_files: vec![2, 4],
                    output_level: 1,
                },
            ],
        }
        .check();

        // Files already at level 1 combine with a new level-0 file in the
        // same window; the second window's lone run is below the trigger.
        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
        CompactionPickerTestCase {
            window_size: 3,
            input_files: [
                new_file_handle_with_sequence(file_ids[0], 0, 2999, 1, 1),
                new_file_handle_with_sequence(file_ids[1], 0, 2998, 1, 1),
                new_file_handle_with_sequence(file_ids[2], 3000, 5999, 1, 2),
                new_file_handle_with_sequence(file_ids[3], 3000, 5000, 1, 2),
                new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 3),
            ]
            .to_vec(),
            expected_outputs: vec![ExpectedOutput {
                input_files: vec![0, 1, 4],
                output_level: 1,
            }],
        }
        .check();
    }
791
    #[test]
    fn test_append_mode_filter_large_files() {
        // Exercises the append-mode size filter predicate in isolation (not
        // through the picker itself).
        // NOTE(review): `build_output` additionally requires
        // `fg.is_all_level_0()` when filtering — this test only covers the
        // size condition; consider extending it.
        let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
        let max_output_file_size = 1000u64;

        // Two groups below the cap, two above it.
        let small_file_1 = new_file_handle_with_size_and_sequence(file_ids[0], 0, 999, 0, 1, 500);
        let large_file_1 = new_file_handle_with_size_and_sequence(file_ids[1], 0, 999, 0, 2, 1500);
        let small_file_2 = new_file_handle_with_size_and_sequence(file_ids[2], 0, 999, 0, 3, 800);
        let large_file_2 = new_file_handle_with_size_and_sequence(file_ids[3], 0, 999, 0, 4, 2000);

        let mut files_to_merge = vec![
            FileGroup::new_with_file(small_file_1),
            FileGroup::new_with_file(large_file_1),
            FileGroup::new_with_file(small_file_2),
            FileGroup::new_with_file(large_file_2),
        ];

        let original_count = files_to_merge.len();

        // Apply the same size predicate build_output uses in append mode.
        files_to_merge.retain(|fg| fg.size() <= max_output_file_size as usize);

        // Only the two small groups survive.
        assert_eq!(files_to_merge.len(), 2);
        assert_eq!(original_count, 4);

        for fg in &files_to_merge {
            assert!(
                fg.size() <= max_output_file_size as usize,
                "File size {} should be <= {}",
                fg.size(),
                max_output_file_size
            );
        }
    }
831
832 }