1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap};
17use std::fmt::Debug;
18use std::num::NonZeroU64;
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::info;
22use common_time::Timestamp;
23use common_time::timestamp::TimeUnit;
24use common_time::timestamp_millis::BucketAligned;
25use store_api::storage::RegionId;
26
27use crate::compaction::buckets::infer_time_bucket;
28use crate::compaction::compactor::CompactionRegion;
29use crate::compaction::picker::{Picker, PickerOutput};
30use crate::compaction::run::{
31 FileGroup, Item, Ranged, find_sorted_runs, merge_seq_files, reduce_runs,
32};
33use crate::compaction::{CompactionOutput, get_expired_ssts};
34use crate::sst::file::{FileHandle, Level, overlaps};
35use crate::sst::version::LevelMeta;
36
/// Level assigned to every compaction output built by this picker.
const LEVEL_COMPACTED: Level = 1;
38
/// Picker that buckets SST files into time windows (by each file's end
/// timestamp) and selects compaction inputs window by window.
#[derive(Debug)]
pub struct TwcsPicker {
    /// When a window contains exactly one sorted run, the window is only
    /// compacted if that run holds at least this many items.
    pub trigger_file_num: usize,
    /// Window size in seconds used when the region version does not carry a
    /// compaction time window. If both are absent the size is inferred from
    /// the level 0 files.
    pub time_window_seconds: Option<i64>,
    /// Optional size limit passed on to `merge_seq_files` and to the picker
    /// output. In append mode it is also the threshold above which file groups
    /// are left out of the candidates.
    pub max_output_file_size: Option<u64>,
    /// When set, outputs never request deletion filtering and, if
    /// `max_output_file_size` is present, only level 0 file groups within that
    /// size are considered.
    pub append_mode: bool,
}
52
53impl TwcsPicker {
54 fn build_output(
56 &self,
57 region_id: RegionId,
58 time_windows: &mut BTreeMap<i64, Window>,
59 active_window: Option<i64>,
60 ) -> Vec<CompactionOutput> {
61 let mut output = vec![];
62 for (window, files) in time_windows {
63 if files.files.is_empty() {
64 continue;
65 }
66 let mut files_to_merge: Vec<_> = files.files().cloned().collect();
67
68 if self.append_mode
70 && let Some(max_size) = self.max_output_file_size
71 {
72 let (kept_files, ignored_files) = files_to_merge
73 .into_iter()
74 .partition(|fg| fg.size() <= max_size as usize && fg.is_all_level_0());
75 files_to_merge = kept_files;
76 info!(
77 "Skipped {} large files in append mode for region {}, window {}, max_size: {}",
78 ignored_files.len(),
79 region_id,
80 window,
81 max_size
82 );
83 }
84
85 let sorted_runs = find_sorted_runs(&mut files_to_merge);
86 let found_runs = sorted_runs.len();
87 let filter_deleted = !files.overlapping && found_runs <= 2 && !self.append_mode;
90 if found_runs == 0 {
91 return output;
92 }
93
94 let inputs = if found_runs > 1 {
95 reduce_runs(sorted_runs)
96 } else {
97 let run = sorted_runs.last().unwrap();
98 if run.items().len() < self.trigger_file_num {
99 continue;
100 }
101 merge_seq_files(run.items(), self.max_output_file_size)
103 };
104
105 if !inputs.is_empty() {
106 log_pick_result(
107 region_id,
108 *window,
109 active_window,
110 found_runs,
111 files.files.len(),
112 self.max_output_file_size,
113 filter_deleted,
114 &inputs,
115 );
116 output.push(CompactionOutput {
117 output_level: LEVEL_COMPACTED, inputs: inputs.into_iter().flat_map(|fg| fg.into_files()).collect(),
119 filter_deleted,
120 output_time_range: None, });
122 }
123 }
124 output
125 }
126}
127
128#[allow(clippy::too_many_arguments)]
129fn log_pick_result(
130 region_id: RegionId,
131 window: i64,
132 active_window: Option<i64>,
133 found_runs: usize,
134 file_num: usize,
135 max_output_file_size: Option<u64>,
136 filter_deleted: bool,
137 inputs: &[FileGroup],
138) {
139 let input_file_str: Vec<String> = inputs
140 .iter()
141 .map(|f| {
142 let range = f.range();
143 let start = range.0.to_iso8601_string();
144 let end = range.1.to_iso8601_string();
145 let num_rows = f.num_rows();
146 format!(
147 "FileGroup{{id: {:?}, range: ({}, {}), size: {}, num rows: {} }}",
148 f.file_ids(),
149 start,
150 end,
151 ReadableSize(f.size() as u64),
152 num_rows
153 )
154 })
155 .collect();
156 let window_str = Timestamp::new_second(window).to_iso8601_string();
157 let active_window_str = active_window.map(|s| Timestamp::new_second(s).to_iso8601_string());
158 let max_output_file_size = max_output_file_size.map(|size| ReadableSize(size).to_string());
159 info!(
160 "Region ({:?}) compaction pick result: current window: {}, active window: {:?}, \
161 found runs: {}, file num: {}, max output file size: {:?}, filter deleted: {}, \
162 input files: {:?}",
163 region_id,
164 window_str,
165 active_window_str,
166 found_runs,
167 file_num,
168 max_output_file_size,
169 filter_deleted,
170 input_file_str
171 );
172}
173
174impl Picker for TwcsPicker {
175 fn pick(&self, compaction_region: &CompactionRegion) -> Option<PickerOutput> {
176 let region_id = compaction_region.region_id;
177 let levels = compaction_region.current_version.ssts.levels();
178
179 let expired_ssts =
180 get_expired_ssts(levels, compaction_region.ttl, Timestamp::current_millis());
181 if !expired_ssts.is_empty() {
182 info!("Expired SSTs in region {}: {:?}", region_id, expired_ssts);
183 expired_ssts.iter().for_each(|f| f.set_compacting(true));
185 }
186
187 let compaction_time_window = compaction_region
188 .current_version
189 .compaction_time_window
190 .map(|window| window.as_secs() as i64);
191 let time_window_size = compaction_time_window
192 .or(self.time_window_seconds)
193 .unwrap_or_else(|| {
194 let inferred = infer_time_bucket(levels[0].files());
195 info!(
196 "Compaction window for region {} is not present, inferring from files: {:?}",
197 region_id, inferred
198 );
199 inferred
200 });
201
202 let active_window = find_latest_window_in_seconds(levels[0].files(), time_window_size);
204 let mut windows =
206 assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size);
207 let outputs = self.build_output(region_id, &mut windows, active_window);
208
209 if outputs.is_empty() && expired_ssts.is_empty() {
210 return None;
211 }
212
213 let max_file_size = self.max_output_file_size.map(|v| v as usize);
214 Some(PickerOutput {
215 outputs,
216 expired_ssts,
217 time_window_size,
218 max_file_size,
219 })
220 }
221}
222
/// Files that fall into the same time window, grouped by sequence.
struct Window {
    /// Smallest start timestamp among the files added to this window.
    start: Timestamp,
    /// Largest end timestamp among the files added to this window.
    end: Timestamp,
    /// File groups keyed by the `sequence` recorded in each file's metadata.
    files: HashMap<Option<NonZeroU64>, FileGroup>,
    /// Key of this window; initialised to 0 and set by `assign_to_windows`.
    time_window: i64,
    /// Set by `assign_to_windows` when this window's time range overlaps the
    /// range covered by other windows.
    overlapping: bool,
}
232
233impl Window {
234 fn new_with_file(file: FileHandle) -> Self {
236 let (start, end) = file.time_range();
237 let files = HashMap::from([(file.meta_ref().sequence, FileGroup::new_with_file(file))]);
238 Self {
239 start,
240 end,
241 files,
242 time_window: 0,
243 overlapping: false,
244 }
245 }
246
247 fn range(&self) -> (Timestamp, Timestamp) {
249 (self.start, self.end)
250 }
251
252 fn add_file(&mut self, file: FileHandle) {
254 let (start, end) = file.time_range();
255 self.start = self.start.min(start);
256 self.end = self.end.max(end);
257
258 match self.files.entry(file.meta_ref().sequence) {
259 Entry::Occupied(mut o) => {
260 o.get_mut().add_file(file);
261 }
262 Entry::Vacant(v) => {
263 v.insert(FileGroup::new_with_file(file));
264 }
265 }
266 }
267
268 fn files(&self) -> impl Iterator<Item = &FileGroup> {
269 self.files.values()
270 }
271}
272
273fn assign_to_windows<'a>(
275 files: impl Iterator<Item = &'a FileHandle>,
276 time_window_size: i64,
277) -> BTreeMap<i64, Window> {
278 let mut windows: HashMap<i64, Window> = HashMap::new();
279 for f in files {
281 if f.compacting() {
282 continue;
283 }
284 let (_, end) = f.time_range();
285 let time_window = end
286 .convert_to(TimeUnit::Second)
287 .unwrap()
288 .value()
289 .align_to_ceil_by_bucket(time_window_size)
290 .unwrap_or(i64::MIN);
291
292 match windows.entry(time_window) {
293 Entry::Occupied(mut e) => {
294 e.get_mut().add_file(f.clone());
295 }
296 Entry::Vacant(e) => {
297 let mut window = Window::new_with_file(f.clone());
298 window.time_window = time_window;
299 e.insert(window);
300 }
301 }
302 }
303 if windows.is_empty() {
304 return BTreeMap::new();
305 }
306
307 let mut windows = windows.into_values().collect::<Vec<_>>();
308 windows.sort_unstable_by(|l, r| l.start.cmp(&r.start).then(l.end.cmp(&r.end).reverse()));
309
310 let mut current_range: (Timestamp, Timestamp) = windows[0].range(); for idx in 1..windows.len() {
313 let next_range = windows[idx].range();
314 if overlaps(¤t_range, &next_range) {
315 windows[idx - 1].overlapping = true;
316 windows[idx].overlapping = true;
317 }
318 current_range = (
319 current_range.0.min(next_range.0),
320 current_range.1.max(next_range.1),
321 );
322 }
323
324 windows.into_iter().map(|w| (w.time_window, w)).collect()
325}
326
327fn find_latest_window_in_seconds<'a>(
330 files: impl Iterator<Item = &'a FileHandle>,
331 time_window_size: i64,
332) -> Option<i64> {
333 let mut latest_timestamp = None;
334 for f in files {
335 let (_, end) = f.time_range();
336 if let Some(latest) = latest_timestamp {
337 if end > latest {
338 latest_timestamp = Some(end);
339 }
340 } else {
341 latest_timestamp = Some(end);
342 }
343 }
344 latest_timestamp
345 .and_then(|ts| ts.convert_to_ceil(TimeUnit::Second))
346 .and_then(|ts| ts.value().align_to_ceil_by_bucket(time_window_size))
347}
348
349#[cfg(test)]
350mod tests {
351 use std::collections::HashSet;
352
353 use store_api::storage::FileId;
354
355 use super::*;
356 use crate::compaction::test_util::{
357 new_file_handle, new_file_handle_with_sequence, new_file_handle_with_size_and_sequence,
358 };
359 use crate::sst::file::Level;
360
#[test]
fn test_get_latest_window_in_seconds() {
    // Builds level 0 handles for the given (start, end) millisecond ranges
    // and returns the latest window for them.
    let latest_window = |ranges: &[(i64, i64)], window_size: i64| {
        let handles: Vec<_> = ranges
            .iter()
            .map(|&(start, end)| new_file_handle(FileId::random(), start, end, 0))
            .collect();
        find_latest_window_in_seconds(handles.iter(), window_size)
    };

    assert_eq!(Some(1), latest_window(&[(0, 999)], 1));
    assert_eq!(Some(1), latest_window(&[(0, 1000)], 1));

    assert_eq!(
        Some(-9223372036854000),
        latest_window(&[(i64::MIN, i64::MIN + 1)], 3600)
    );

    assert_eq!(
        (i64::MAX / 10000000 + 1) * 10000,
        latest_window(&[(i64::MIN, i64::MAX)], 10000).unwrap()
    );

    assert_eq!(
        Some((i64::MAX / 3600000 + 1) * 3600),
        latest_window(&[(i64::MIN, i64::MAX), (0, 1000)], 3600)
    );
}
404
#[test]
fn test_assign_to_windows() {
    // Five files with the same range and sequence end up in one window and
    // one file group.
    let windows = assign_to_windows(
        [
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
            new_file_handle(FileId::random(), 0, 999, 0),
        ]
        .iter(),
        3,
    );
    let fgs = &windows.get(&0).unwrap().files;
    assert_eq!(1, fgs.len());
    assert_eq!(fgs.values().map(|f| f.files().len()).sum::<usize>(), 5);

    // Each id must come from its own `random()` call. `[FileId::random(); 3]`
    // would copy a single id three times and make the checks below unable to
    // tell the files apart.
    let files = [FileId::random(), FileId::random(), FileId::random()];
    let windows = assign_to_windows(
        [
            new_file_handle(files[0], -2000, -3, 0),
            new_file_handle(files[1], 0, 2999, 0),
            new_file_handle(files[2], 50, 10001, 0),
        ]
        .iter(),
        3,
    );
    assert_eq!(
        files[0],
        windows.get(&0).unwrap().files().next().unwrap().files()[0]
            .file_id()
            .file_id()
    );
    assert_eq!(
        files[1],
        windows.get(&3).unwrap().files().next().unwrap().files()[0]
            .file_id()
            .file_id()
    );
    assert_eq!(
        files[2],
        windows.get(&12).unwrap().files().next().unwrap().files()[0]
            .file_id()
            .file_id()
    );
}
451
#[test]
fn test_assign_file_groups_to_windows() {
    let files: Vec<FileId> = (0..4).map(|_| FileId::random()).collect();
    // Files 0 and 1 share sequence 1, files 2 and 3 share sequence 2.
    let handles = vec![
        new_file_handle_with_sequence(files[0], 0, 999, 0, 1),
        new_file_handle_with_sequence(files[1], 0, 999, 0, 1),
        new_file_handle_with_sequence(files[2], 0, 999, 0, 2),
        new_file_handle_with_sequence(files[3], 0, 999, 0, 2),
    ];

    let windows = assign_to_windows(handles.iter(), 3);
    assert_eq!(windows.len(), 1);

    let fgs = &windows.get(&0).unwrap().files;
    assert_eq!(2, fgs.len());

    // Collects the file ids stored in the group of the given sequence.
    let ids_in_group = |sequence: u64| -> HashSet<FileId> {
        fgs.get(&NonZeroU64::new(sequence))
            .unwrap()
            .files()
            .iter()
            .map(|f| f.file_id().file_id())
            .collect()
    };

    assert_eq!(ids_in_group(1), HashSet::from([files[0], files[1]]));
    assert_eq!(ids_in_group(2), HashSet::from([files[2], files[3]]));
}
492
#[test]
fn test_assign_compacting_to_windows() {
    let files: Vec<_> = (0..5)
        .map(|_| new_file_handle(FileId::random(), 0, 999, 0))
        .collect();
    // Files 0 and 2 are already compacting and must not be assigned.
    for idx in [0, 2] {
        files[idx].set_compacting(true);
    }

    let mut windows = assign_to_windows(files.iter(), 3);
    let window0 = windows.remove(&0).unwrap();
    assert_eq!(1, window0.files.len());

    let candidates: HashSet<_> = window0
        .files
        .into_values()
        .flat_map(|fg| fg.into_files())
        .map(|f| f.file_id().file_id())
        .collect();
    assert_eq!(candidates.len(), 3);

    let expected: HashSet<_> = [1usize, 3, 4]
        .into_iter()
        .map(|idx| files[idx].file_id().file_id())
        .collect();
    assert_eq!(candidates, expected);
}
525
526 type ExpectedWindowSpec = (i64, bool, Vec<(i64, i64)>);
528
529 fn check_assign_to_windows_with_overlapping(
530 file_time_ranges: &[(i64, i64)],
531 time_window: i64,
532 expected_files: &[ExpectedWindowSpec],
533 ) {
534 let files: Vec<_> = (0..file_time_ranges.len())
535 .map(|_| FileId::random())
536 .collect();
537
538 let file_handles = files
539 .iter()
540 .zip(file_time_ranges.iter())
541 .map(|(file_id, range)| new_file_handle(*file_id, range.0, range.1, 0))
542 .collect::<Vec<_>>();
543
544 let windows = assign_to_windows(file_handles.iter(), time_window);
545
546 for (expected_window, overlapping, window_files) in expected_files {
547 let actual_window = windows.get(expected_window).unwrap();
548 assert_eq!(*overlapping, actual_window.overlapping);
549 let mut file_ranges = actual_window
550 .files
551 .iter()
552 .flat_map(|(_, f)| {
553 f.files().iter().map(|f| {
554 let (s, e) = f.time_range();
555 (s.value(), e.value())
556 })
557 })
558 .collect::<Vec<_>>();
559 file_ranges.sort_unstable_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1)));
560 assert_eq!(window_files, &file_ranges);
561 }
562 }
563
#[test]
fn test_assign_to_windows_with_overlapping() {
    // Each case: (input file ranges, expected windows). The window size is 2
    // for every case.
    let cases: Vec<(Vec<(i64, i64)>, Vec<ExpectedWindowSpec>)> = vec![
        // No overlap between windows.
        (
            vec![(0, 999), (1000, 1999), (2000, 2999)],
            vec![
                (0, false, vec![(0, 999)]),
                (2, false, vec![(1000, 1999), (2000, 2999)]),
            ],
        ),
        // A file of window 2 starts inside window 0.
        (
            vec![(0, 1), (0, 999), (100, 2999)],
            vec![
                (0, true, vec![(0, 1), (0, 999)]),
                (2, true, vec![(100, 2999)]),
            ],
        ),
        // Three disjoint windows.
        (
            vec![(0, 999), (1000, 1999), (2000, 2999), (3000, 3999)],
            vec![
                (0, false, vec![(0, 999)]),
                (2, false, vec![(1000, 1999), (2000, 2999)]),
                (4, false, vec![(3000, 3999)]),
            ],
        ),
        // One file spans all three windows.
        (
            vec![
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (0, 3999),
            ],
            vec![
                (0, true, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(0, 3999), (3000, 3999)]),
            ],
        ),
        // A file of window 4 starts at the last millisecond of (1000, 1999).
        (
            vec![
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (1999, 3999),
            ],
            vec![
                (0, false, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(1999, 3999), (3000, 3999)]),
            ],
        ),
        // A file of window 4 starts at the last millisecond of (2000, 2999).
        (
            vec![
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (2999, 3999),
            ],
            vec![
                (0, false, vec![(0, 999)]),
                (2, true, vec![(1000, 1999), (2000, 2999)]),
                (4, true, vec![(2999, 3999), (3000, 3999)]),
            ],
        ),
        // A file of window 2 reaches back over window 0.
        (
            vec![
                (0, 999),
                (1000, 1999),
                (2000, 2999),
                (3000, 3999),
                (0, 1000),
            ],
            vec![
                (0, true, vec![(0, 999)]),
                (2, true, vec![(0, 1000), (1000, 1999), (2000, 2999)]),
                (4, false, vec![(3000, 3999)]),
            ],
        ),
    ];

    for (file_time_ranges, expected_windows) in &cases {
        check_assign_to_windows_with_overlapping(file_time_ranges, 2, expected_windows);
    }
}
660
661 struct CompactionPickerTestCase {
662 window_size: i64,
663 input_files: Vec<FileHandle>,
664 expected_outputs: Vec<ExpectedOutput>,
665 }
666
667 impl CompactionPickerTestCase {
668 fn check(&self) {
669 let file_id_to_idx = self
670 .input_files
671 .iter()
672 .enumerate()
673 .map(|(idx, file)| (file.file_id(), idx))
674 .collect::<HashMap<_, _>>();
675 let mut windows = assign_to_windows(self.input_files.iter(), self.window_size);
676 let active_window =
677 find_latest_window_in_seconds(self.input_files.iter(), self.window_size);
678 let output = TwcsPicker {
679 trigger_file_num: 4,
680 time_window_seconds: None,
681 max_output_file_size: None,
682 append_mode: false,
683 }
684 .build_output(RegionId::from_u64(0), &mut windows, active_window);
685
686 let output = output
687 .iter()
688 .map(|o| {
689 let input_file_ids = o
690 .inputs
691 .iter()
692 .map(|f| file_id_to_idx.get(&f.file_id()).copied().unwrap())
693 .collect::<HashSet<_>>();
694 (input_file_ids, o.output_level)
695 })
696 .collect::<Vec<_>>();
697
698 let expected = self
699 .expected_outputs
700 .iter()
701 .map(|o| {
702 let input_file_ids = o.input_files.iter().copied().collect::<HashSet<_>>();
703 (input_file_ids, o.output_level)
704 })
705 .collect::<Vec<_>>();
706 assert_eq!(expected, output);
707 }
708 }
709
/// Expected compaction output of a `CompactionPickerTestCase`.
struct ExpectedOutput {
    /// Indices into the test case's `input_files` that should be picked.
    input_files: Vec<usize>,
    /// Level the output is expected to be written to.
    output_level: Level,
}
714
#[test]
fn test_build_twcs_output() {
    // Every expected output in this test targets level 1.
    let level1 = |input_files: Vec<usize>| ExpectedOutput {
        input_files,
        output_level: 1,
    };

    // Two windows, each holding two overlapping level 0 files.
    CompactionPickerTestCase {
        window_size: 3,
        input_files: vec![
            new_file_handle_with_sequence(FileId::random(), -2000, -3, 0, 1),
            new_file_handle_with_sequence(FileId::random(), -3000, -100, 0, 2),
            new_file_handle_with_sequence(FileId::random(), 0, 2999, 0, 3),
            new_file_handle_with_sequence(FileId::random(), 50, 2998, 0, 4),
        ],
        expected_outputs: vec![level1(vec![0, 1]), level1(vec![2, 3])],
    }
    .check();

    // Same as above plus a third overlapping file in the second window; only
    // files 2 and 4 are expected to be picked there.
    CompactionPickerTestCase {
        window_size: 3,
        input_files: vec![
            new_file_handle_with_sequence(FileId::random(), -2000, -3, 0, 1),
            new_file_handle_with_sequence(FileId::random(), -3000, -100, 0, 2),
            new_file_handle_with_sequence(FileId::random(), 0, 2999, 0, 3),
            new_file_handle_with_sequence(FileId::random(), 50, 2998, 0, 4),
            new_file_handle_with_sequence(FileId::random(), 11, 2990, 0, 5),
        ],
        expected_outputs: vec![level1(vec![0, 1]), level1(vec![2, 4])],
    }
    .check();

    // Level 1 files grouped by sequence plus one level 0 file that overlaps
    // the first group.
    CompactionPickerTestCase {
        window_size: 3,
        input_files: vec![
            new_file_handle_with_sequence(FileId::random(), 0, 2999, 1, 1),
            new_file_handle_with_sequence(FileId::random(), 0, 2998, 1, 1),
            new_file_handle_with_sequence(FileId::random(), 3000, 5999, 1, 2),
            new_file_handle_with_sequence(FileId::random(), 3000, 5000, 1, 2),
            new_file_handle_with_sequence(FileId::random(), 11, 2990, 0, 3),
        ],
        expected_outputs: vec![level1(vec![0, 1, 4])],
    }
    .check();
}
793
#[test]
fn test_append_mode_filter_large_files() {
    let max_output_file_size = 1000u64;

    // Two files below the limit (500, 800) and two above it (1500, 2000),
    // each with its own sequence so every file forms its own group.
    let mut files_to_merge = vec![
        FileGroup::new_with_file(new_file_handle_with_size_and_sequence(
            FileId::random(),
            0,
            999,
            0,
            1,
            500,
        )),
        FileGroup::new_with_file(new_file_handle_with_size_and_sequence(
            FileId::random(),
            0,
            999,
            0,
            2,
            1500,
        )),
        FileGroup::new_with_file(new_file_handle_with_size_and_sequence(
            FileId::random(),
            0,
            999,
            0,
            3,
            800,
        )),
        FileGroup::new_with_file(new_file_handle_with_size_and_sequence(
            FileId::random(),
            0,
            999,
            0,
            4,
            2000,
        )),
    ];
    let original_count = files_to_merge.len();

    // Size filter only; the level check done by the picker is not exercised
    // here.
    let limit = max_output_file_size as usize;
    files_to_merge.retain(|fg| fg.size() <= limit);

    assert_eq!(files_to_merge.len(), 2);
    assert_eq!(original_count, 4);

    for fg in files_to_merge.iter() {
        assert!(
            fg.size() <= limit,
            "File size {} should be <= {}",
            fg.size(),
            max_output_file_size
        );
    }
}
833
834 }