1use std::cmp::Ordering;
18use std::collections::BinaryHeap;
19use std::sync::Arc;
20use std::time::{Duration, Instant};
21use std::{fmt, mem};
22
23use async_trait::async_trait;
24use common_telemetry::debug;
25
26use crate::error::Result;
27use crate::memtable::BoxedBatchIterator;
28use crate::metrics::READ_STAGE_ELAPSED;
29use crate::read::{Batch, BatchReader, BoxedBatchReader, Source};
30
/// Sink that receives the [`MergeMetrics`] collected by a [`MergeReader`].
///
/// Implementations must be `Send + Sync` because the reporter is shared
/// behind an `Arc`.
pub trait MergeMetricsReport: Send + Sync {
    /// Hands `metrics` over to the reporter.
    ///
    /// The metrics are passed mutably, so an implementation is allowed to
    /// modify them. NOTE(review): presumably implementations reset the
    /// counters after consuming them — confirm against the implementors.
    fn report(&self, metrics: &mut MergeMetrics);
}
36
/// Reader that merges batches coming from multiple [`Source`]s.
///
/// Sources are wrapped in `Node`s that live in one of two heaps, both ordered
/// by the first row of each node's current batch.
pub struct MergeReader {
    /// Nodes that currently take part in the merge.
    ///
    /// The top of this heap defines the merge window: `refill_hot` keeps
    /// moving nodes from `cold` into this heap until the top of `cold` is
    /// behind the top of `hot`.
    hot: BinaryHeap<Node>,
    /// Nodes whose current batch is behind the merge window.
    ///
    /// Only nodes that are not EOF are pushed into this heap.
    cold: BinaryHeap<Node>,
    /// Batch produced by the latest fetch, waiting to be returned by `next_batch`.
    output_batch: Option<Batch>,
    /// Metrics collected by this reader.
    metrics: MergeMetrics,
    /// Optional reporter that receives `metrics`.
    metrics_reporter: Option<Arc<dyn MergeMetricsReport>>,
}
65
66#[async_trait]
67impl BatchReader for MergeReader {
68 async fn next_batch(&mut self) -> Result<Option<Batch>> {
69 let start = Instant::now();
70 while !self.hot.is_empty() && self.output_batch.is_none() {
71 if self.hot.len() == 1 {
72 self.fetch_batch_from_hottest().await?;
74 self.metrics.num_fetch_by_batches += 1;
75 } else {
76 self.fetch_rows_from_hottest().await?;
78 self.metrics.num_fetch_by_rows += 1;
79 }
80 }
81
82 if let Some(batch) = self.output_batch.take() {
83 self.metrics.scan_cost += start.elapsed();
84 self.metrics.maybe_report(&self.metrics_reporter);
85 Ok(Some(batch))
86 } else {
87 self.metrics.scan_cost += start.elapsed();
89 self.metrics.maybe_report(&self.metrics_reporter);
90 Ok(None)
91 }
92 }
93}
94
95impl Drop for MergeReader {
96 fn drop(&mut self) {
97 debug!("Merge reader finished, metrics: {:?}", self.metrics);
98
99 READ_STAGE_ELAPSED
100 .with_label_values(&["merge"])
101 .observe(self.metrics.scan_cost.as_secs_f64());
102 READ_STAGE_ELAPSED
103 .with_label_values(&["merge_fetch"])
104 .observe(self.metrics.fetch_cost.as_secs_f64());
105
106 if let Some(reporter) = &self.metrics_reporter {
108 reporter.report(&mut self.metrics);
109 }
110 }
111}
112
113impl MergeReader {
114 pub async fn new(
116 sources: Vec<Source>,
117 metrics_reporter: Option<Arc<dyn MergeMetricsReport>>,
118 ) -> Result<MergeReader> {
119 let start = Instant::now();
120 let mut metrics = MergeMetrics::default();
121
122 let mut cold = BinaryHeap::with_capacity(sources.len());
123 let hot = BinaryHeap::with_capacity(sources.len());
124 for source in sources {
125 let node = Node::new(source, &mut metrics).await?;
126 if !node.is_eof() {
127 cold.push(node);
129 }
130 }
131
132 let mut reader = MergeReader {
133 hot,
134 cold,
135 output_batch: None,
136 metrics,
137 metrics_reporter,
138 };
139 reader.refill_hot();
141
142 let elapsed = start.elapsed();
143 reader.metrics.init_cost += elapsed;
144 reader.metrics.scan_cost += elapsed;
145 Ok(reader)
146 }
147
148 fn refill_hot(&mut self) {
151 while !self.cold.is_empty() {
152 if let Some(merge_window) = self.hot.peek() {
153 let warmest = self.cold.peek().unwrap();
154 if warmest.is_behind(merge_window) {
155 break;
159 }
160 }
161
162 let warmest = self.cold.pop().unwrap();
163 self.hot.push(warmest);
164 }
165 }
166
167 async fn fetch_batch_from_hottest(&mut self) -> Result<()> {
169 assert_eq!(1, self.hot.len());
170
171 let mut hottest = self.hot.pop().unwrap();
172 let batch = hottest.fetch_batch(&mut self.metrics).await?;
173 Self::maybe_output_batch(batch, &mut self.output_batch)?;
174 self.reheap(hottest)
175 }
176
177 async fn fetch_rows_from_hottest(&mut self) -> Result<()> {
179 let mut top_node = self.hot.pop().unwrap();
182 let top = top_node.current_batch();
183 let next_min_ts = {
185 let next_node = self.hot.peek().unwrap();
186 let next = next_node.current_batch();
187 debug_assert_eq!(top.primary_key(), next.primary_key());
189 next.first_timestamp().unwrap()
191 };
192
193 let timestamps = top.timestamps_native().unwrap();
195 let duplicate_pos = match timestamps.binary_search(&next_min_ts.value()) {
199 Ok(pos) => pos,
200 Err(pos) => {
201 Self::maybe_output_batch(top.slice(0, pos), &mut self.output_batch)?;
203 top_node.skip_rows(pos, &mut self.metrics).await?;
204 return self.reheap(top_node);
205 }
206 };
207
208 let output_end = if duplicate_pos == 0 {
210 1
213 } else {
214 duplicate_pos
217 };
218 Self::maybe_output_batch(top.slice(0, output_end), &mut self.output_batch)?;
219 top_node.skip_rows(output_end, &mut self.metrics).await?;
220 self.reheap(top_node)
221 }
222
223 fn reheap(&mut self, node: Node) -> Result<()> {
225 if node.is_eof() {
226 self.refill_hot();
229 } else {
230 let node_is_cold = if let Some(hottest) = self.hot.peek() {
232 node.is_behind(hottest)
235 } else {
236 true
239 };
240
241 if node_is_cold {
242 self.cold.push(node);
243 } else {
244 self.hot.push(node);
245 }
246 self.refill_hot();
248 }
249
250 Ok(())
251 }
252
253 fn maybe_output_batch(batch: Batch, output_batch: &mut Option<Batch>) -> Result<()> {
257 debug_assert!(output_batch.is_none());
258 if batch.is_empty() {
259 return Ok(());
260 }
261 *output_batch = Some(batch);
262
263 Ok(())
264 }
265}
266
/// Builder that collects sources and creates a [`MergeReader`].
#[derive(Default)]
pub struct MergeReaderBuilder {
    /// Sources to merge, in the order they were added.
    sources: Vec<Source>,
    /// Optional reporter passed on to the reader.
    metrics_reporter: Option<Arc<dyn MergeMetricsReport>>,
}
277
278impl MergeReaderBuilder {
279 pub fn new() -> MergeReaderBuilder {
281 MergeReaderBuilder::default()
282 }
283
284 pub fn from_sources(sources: Vec<Source>) -> MergeReaderBuilder {
286 MergeReaderBuilder {
287 sources,
288 metrics_reporter: None,
289 }
290 }
291
292 pub fn push_batch_reader(&mut self, reader: BoxedBatchReader) -> &mut Self {
294 self.sources.push(Source::Reader(reader));
295 self
296 }
297
298 pub fn push_batch_iter(&mut self, iter: BoxedBatchIterator) -> &mut Self {
300 self.sources.push(Source::Iter(iter));
301 self
302 }
303
304 pub fn with_metrics_reporter(
306 &mut self,
307 reporter: Option<Arc<dyn MergeMetricsReport>>,
308 ) -> &mut Self {
309 self.metrics_reporter = reporter;
310 self
311 }
312
313 pub async fn build(&mut self) -> Result<MergeReader> {
315 let sources = mem::take(&mut self.sources);
316 let metrics_reporter = self.metrics_reporter.take();
317 MergeReader::new(sources, metrics_reporter).await
318 }
319}
320
/// Metrics collected by a merge reader.
#[derive(Default)]
pub struct MergeMetrics {
    /// Time spent initializing the reader.
    pub(crate) init_cost: Duration,
    /// Total time spent scanning; it also includes the initialization time.
    pub(crate) scan_cost: Duration,
    /// Number of fetches that emitted a whole batch.
    pub(crate) num_fetch_by_batches: usize,
    /// Number of fetches that emitted a range of rows.
    pub(crate) num_fetch_by_rows: usize,
    /// Time spent fetching batches from the sources.
    pub(crate) fetch_cost: Duration,
}

impl fmt::Debug for MergeMetrics {
    /// Formats the metrics as a compact JSON-like object.
    ///
    /// Prints `{}` when `scan_cost` is zero, regardless of the other fields.
    /// Otherwise `scan_cost` is always printed and every other field is
    /// printed only when it is non-zero.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let MergeMetrics {
            init_cost,
            scan_cost,
            num_fetch_by_batches,
            num_fetch_by_rows,
            fetch_cost,
        } = self;

        if scan_cost.is_zero() {
            return write!(f, "{{}}");
        }

        write!(f, r#"{{"scan_cost":"{:?}""#, scan_cost)?;

        if !init_cost.is_zero() {
            write!(f, r#", "init_cost":"{:?}""#, init_cost)?;
        }
        if *num_fetch_by_batches != 0 {
            write!(
                f,
                r#", "num_fetch_by_batches":{}"#,
                num_fetch_by_batches
            )?;
        }
        if *num_fetch_by_rows != 0 {
            write!(f, r#", "num_fetch_by_rows":{}"#, num_fetch_by_rows)?;
        }
        if !fetch_cost.is_zero() {
            write!(f, r#", "fetch_cost":"{:?}""#, fetch_cost)?;
        }

        write!(f, "}}")
    }
}
365
366impl MergeMetrics {
367 pub(crate) fn merge(&mut self, other: &MergeMetrics) {
369 let MergeMetrics {
370 init_cost,
371 scan_cost,
372 num_fetch_by_batches,
373 num_fetch_by_rows,
374 fetch_cost,
375 } = other;
376
377 self.init_cost += *init_cost;
378 self.scan_cost += *scan_cost;
379 self.num_fetch_by_batches += *num_fetch_by_batches;
380 self.num_fetch_by_rows += *num_fetch_by_rows;
381 self.fetch_cost += *fetch_cost;
382 }
383
384 pub(crate) fn maybe_report(&mut self, reporter: &Option<Arc<dyn MergeMetricsReport>>) {
386 if self.scan_cost.as_millis() > 10
387 && let Some(r) = reporter
388 {
389 r.report(self);
390 }
391 }
392}
393
/// One input source of the merge together with the batch currently being read from it.
struct Node {
    /// Source that yields the batches of this node.
    source: Source,
    /// Batch currently being read, wrapped so nodes can be compared by its first row.
    ///
    /// `None` means the source has reached EOF.
    current_batch: Option<CompareFirst>,
}
403
404impl Node {
405 async fn new(mut source: Source, metrics: &mut MergeMetrics) -> Result<Node> {
409 let start = Instant::now();
411 let current_batch = source.next_batch().await?.map(CompareFirst);
412 metrics.fetch_cost += start.elapsed();
413
414 Ok(Node {
415 source,
416 current_batch,
417 })
418 }
419
420 fn is_eof(&self) -> bool {
422 self.current_batch.is_none()
423 }
424
425 fn primary_key(&self) -> &[u8] {
430 self.current_batch().primary_key()
431 }
432
433 fn current_batch(&self) -> &Batch {
438 &self.current_batch.as_ref().unwrap().0
439 }
440
441 async fn fetch_batch(&mut self, metrics: &mut MergeMetrics) -> Result<Batch> {
447 let current = self.current_batch.take().unwrap();
448 let start = Instant::now();
449 self.current_batch = self.source.next_batch().await?.map(CompareFirst);
451 metrics.fetch_cost += start.elapsed();
452 Ok(current.0)
453 }
454
455 fn is_behind(&self, other: &Node) -> bool {
461 debug_assert!(!self.current_batch().is_empty());
462 debug_assert!(!other.current_batch().is_empty());
463
464 self.primary_key().cmp(other.primary_key()).then_with(|| {
468 self.current_batch()
469 .first_timestamp()
470 .cmp(&other.current_batch().last_timestamp())
471 }) == Ordering::Greater
472 }
473
474 async fn skip_rows(&mut self, num_to_skip: usize, metrics: &mut MergeMetrics) -> Result<()> {
480 let batch = self.current_batch();
481 debug_assert!(batch.num_rows() >= num_to_skip);
482
483 let remaining = batch.num_rows() - num_to_skip;
484 if remaining == 0 {
485 self.fetch_batch(metrics).await?;
487 } else {
488 debug_assert!(!batch.is_empty());
489 self.current_batch = Some(CompareFirst(batch.slice(num_to_skip, remaining)));
490 }
491
492 Ok(())
493 }
494}
495
496impl PartialEq for Node {
497 fn eq(&self, other: &Node) -> bool {
498 self.current_batch == other.current_batch
499 }
500}
501
502impl Eq for Node {}
503
504impl PartialOrd for Node {
505 fn partial_cmp(&self, other: &Node) -> Option<Ordering> {
506 Some(self.cmp(other))
507 }
508}
509
510impl Ord for Node {
511 fn cmp(&self, other: &Node) -> Ordering {
512 other.current_batch.cmp(&self.current_batch)
515 }
516}
517
518struct CompareFirst(Batch);
522
523impl PartialEq for CompareFirst {
524 fn eq(&self, other: &Self) -> bool {
525 self.0.primary_key() == other.0.primary_key()
526 && self.0.first_timestamp() == other.0.first_timestamp()
527 && self.0.first_sequence() == other.0.first_sequence()
528 }
529}
530
531impl Eq for CompareFirst {}
532
533impl PartialOrd for CompareFirst {
534 fn partial_cmp(&self, other: &CompareFirst) -> Option<Ordering> {
535 Some(self.cmp(other))
536 }
537}
538
539impl Ord for CompareFirst {
540 fn cmp(&self, other: &CompareFirst) -> Ordering {
542 self.0
543 .primary_key()
544 .cmp(other.0.primary_key())
545 .then_with(|| self.0.first_timestamp().cmp(&other.0.first_timestamp()))
546 .then_with(|| other.0.first_sequence().cmp(&self.0.first_sequence()))
547 }
548}
549
#[cfg(test)]
mod tests {
    use api::v1::OpType;

    use super::*;
    use crate::test_util::{VecBatchReader, check_reader_result, new_batch};

    /// Builds a merge reader over two sources: `first` is added as a batch
    /// reader and `second` as a batch iterator.
    async fn merge_two(first: VecBatchReader, second: VecBatchReader) -> MergeReader {
        let mut builder = MergeReaderBuilder::new();
        builder
            .push_batch_reader(Box::new(first))
            .push_batch_iter(Box::new(second));
        builder.build().await.unwrap()
    }

    /// A reader without sources yields nothing, even when polled repeatedly.
    #[tokio::test]
    async fn test_merge_reader_empty() {
        let mut builder = MergeReaderBuilder::new();
        let mut reader = builder.build().await.unwrap();
        for _ in 0..2 {
            assert!(reader.next_batch().await.unwrap().is_none());
        }
    }

    /// Batches whose ranges do not overlap are returned whole, in key order.
    #[tokio::test]
    async fn test_merge_non_overlapping() {
        let reader1 = VecBatchReader::new(&[
            new_batch(
                b"k1",
                &[1, 2],
                &[11, 12],
                &[OpType::Put, OpType::Put],
                &[21, 22],
            ),
            new_batch(
                b"k1",
                &[7, 8],
                &[17, 18],
                &[OpType::Put, OpType::Delete],
                &[27, 28],
            ),
            new_batch(
                b"k2",
                &[2, 3],
                &[12, 13],
                &[OpType::Delete, OpType::Put],
                &[22, 23],
            ),
        ]);
        let reader2 = VecBatchReader::new(&[new_batch(
            b"k1",
            &[4, 5],
            &[14, 15],
            &[OpType::Put, OpType::Put],
            &[24, 25],
        )]);
        let mut reader = merge_two(reader1, reader2).await;

        let expected = [
            new_batch(
                b"k1",
                &[1, 2],
                &[11, 12],
                &[OpType::Put, OpType::Put],
                &[21, 22],
            ),
            new_batch(
                b"k1",
                &[4, 5],
                &[14, 15],
                &[OpType::Put, OpType::Put],
                &[24, 25],
            ),
            new_batch(
                b"k1",
                &[7, 8],
                &[17, 18],
                &[OpType::Put, OpType::Delete],
                &[27, 28],
            ),
            new_batch(
                b"k2",
                &[2, 3],
                &[12, 13],
                &[OpType::Delete, OpType::Put],
                &[22, 23],
            ),
        ];
        check_reader_result(&mut reader, &expected).await;
    }

    /// Interleaved timestamps force the nodes to be re-heaped after every row.
    #[tokio::test]
    async fn test_merge_reheap_hot() {
        let reader1 = VecBatchReader::new(&[
            new_batch(
                b"k1",
                &[1, 3],
                &[10, 10],
                &[OpType::Put, OpType::Put],
                &[21, 23],
            ),
            new_batch(b"k2", &[3], &[10], &[OpType::Put], &[23]),
        ]);
        let reader2 = VecBatchReader::new(&[new_batch(
            b"k1",
            &[2, 4],
            &[11, 11],
            &[OpType::Put, OpType::Put],
            &[32, 34],
        )]);
        let mut reader = merge_two(reader1, reader2).await;

        let expected = [
            new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]),
            new_batch(b"k1", &[2], &[11], &[OpType::Put], &[32]),
            new_batch(b"k1", &[3], &[10], &[OpType::Put], &[23]),
            new_batch(b"k1", &[4], &[11], &[OpType::Put], &[34]),
            new_batch(b"k2", &[3], &[10], &[OpType::Put], &[23]),
        ];
        check_reader_result(&mut reader, &expected).await;
    }

    /// Overlapping batches are split so that rows come out ordered by
    /// timestamp and, for equal timestamps, by sequence descending.
    /// Duplicate timestamps are kept.
    #[tokio::test]
    async fn test_merge_overlapping() {
        let reader1 = VecBatchReader::new(&[
            new_batch(
                b"k1",
                &[1, 2],
                &[11, 12],
                &[OpType::Put, OpType::Put],
                &[21, 22],
            ),
            new_batch(
                b"k1",
                &[4, 5],
                &[14, 15],
                &[OpType::Put, OpType::Delete],
                &[24, 25],
            ),
            new_batch(
                b"k2",
                &[2, 3],
                &[12, 13],
                &[OpType::Delete, OpType::Put],
                &[22, 23],
            ),
        ]);
        // The `k1` batch of this reader carries lower sequences than the
        // overlapping rows of `reader1`.
        let reader2 = VecBatchReader::new(&[
            new_batch(
                b"k1",
                &[3, 4, 5],
                &[10, 10, 10],
                &[OpType::Put, OpType::Put, OpType::Put],
                &[33, 34, 35],
            ),
            new_batch(
                b"k2",
                &[1, 10],
                &[11, 20],
                &[OpType::Put, OpType::Put],
                &[21, 30],
            ),
        ]);
        let mut reader = merge_two(reader1, reader2).await;

        let expected = [
            new_batch(
                b"k1",
                &[1, 2],
                &[11, 12],
                &[OpType::Put, OpType::Put],
                &[21, 22],
            ),
            new_batch(b"k1", &[3], &[10], &[OpType::Put], &[33]),
            new_batch(b"k1", &[4], &[14], &[OpType::Put], &[24]),
            new_batch(b"k1", &[4], &[10], &[OpType::Put], &[34]),
            new_batch(b"k1", &[5], &[15], &[OpType::Delete], &[25]),
            new_batch(b"k1", &[5], &[10], &[OpType::Put], &[35]),
            new_batch(b"k2", &[1], &[11], &[OpType::Put], &[21]),
            new_batch(
                b"k2",
                &[2, 3],
                &[12, 13],
                &[OpType::Delete, OpType::Put],
                &[22, 23],
            ),
            new_batch(b"k2", &[10], &[20], &[OpType::Put], &[30]),
        ];
        check_reader_result(&mut reader, &expected).await;
    }

    /// Rows with `OpType::Delete` are passed through untouched.
    #[tokio::test]
    async fn test_merge_deleted() {
        let reader1 = VecBatchReader::new(&[
            new_batch(
                b"k1",
                &[1, 2],
                &[11, 12],
                &[OpType::Delete, OpType::Delete],
                &[21, 22],
            ),
            new_batch(
                b"k2",
                &[2, 3],
                &[12, 13],
                &[OpType::Delete, OpType::Put],
                &[22, 23],
            ),
        ]);
        let reader2 = VecBatchReader::new(&[new_batch(
            b"k1",
            &[4, 5],
            &[14, 15],
            &[OpType::Delete, OpType::Delete],
            &[24, 25],
        )]);
        let mut reader = merge_two(reader1, reader2).await;

        let expected = [
            new_batch(
                b"k1",
                &[1, 2],
                &[11, 12],
                &[OpType::Delete, OpType::Delete],
                &[21, 22],
            ),
            new_batch(
                b"k1",
                &[4, 5],
                &[14, 15],
                &[OpType::Delete, OpType::Delete],
                &[24, 25],
            ),
            new_batch(
                b"k2",
                &[2, 3],
                &[12, 13],
                &[OpType::Delete, OpType::Put],
                &[22, 23],
            ),
        ];
        check_reader_result(&mut reader, &expected).await;
    }

    /// The second node is exhausted after its only row has been emitted.
    #[tokio::test]
    async fn test_merge_next_node_empty() {
        let reader1 = VecBatchReader::new(&[new_batch(
            b"k1",
            &[1, 2],
            &[11, 12],
            &[OpType::Put, OpType::Put],
            &[21, 22],
        )]);
        let reader2 = VecBatchReader::new(&[new_batch(b"k1", &[1], &[10], &[OpType::Put], &[33])]);
        let mut reader = merge_two(reader1, reader2).await;

        let expected = [
            new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21]),
            new_batch(b"k1", &[1], &[10], &[OpType::Put], &[33]),
            new_batch(b"k1", &[2], &[12], &[OpType::Put], &[22]),
        ];
        check_reader_result(&mut reader, &expected).await;
    }

    /// The top node is exhausted while the other node still has rows.
    #[tokio::test]
    async fn test_merge_top_node_empty() {
        let reader1 = VecBatchReader::new(&[new_batch(
            b"k1",
            &[1, 2],
            &[10, 10],
            &[OpType::Put, OpType::Put],
            &[21, 22],
        )]);
        let reader2 = VecBatchReader::new(&[new_batch(
            b"k1",
            &[2, 3],
            &[11, 11],
            &[OpType::Put, OpType::Put],
            &[32, 33],
        )]);
        let mut reader = merge_two(reader1, reader2).await;

        let expected = [
            new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]),
            new_batch(b"k1", &[2], &[11], &[OpType::Put], &[32]),
            new_batch(b"k1", &[2], &[10], &[OpType::Put], &[22]),
            new_batch(b"k1", &[3], &[11], &[OpType::Put], &[33]),
        ];
        check_reader_result(&mut reader, &expected).await;
    }

    /// A batch with a small time range lies inside the ranges of two wider batches.
    #[tokio::test]
    async fn test_merge_large_range() {
        let reader1 = VecBatchReader::new(&[new_batch(
            b"k1",
            &[1, 10],
            &[10, 10],
            &[OpType::Put, OpType::Put],
            &[21, 30],
        )]);
        let reader2 = VecBatchReader::new(&[new_batch(
            b"k1",
            &[1, 20],
            &[11, 11],
            &[OpType::Put, OpType::Put],
            &[31, 40],
        )]);
        // Timestamps 6 and 8 fall between the rows of the two readers above.
        let reader3 = VecBatchReader::new(&[new_batch(
            b"k1",
            &[6, 8],
            &[11, 11],
            &[OpType::Put, OpType::Put],
            &[36, 38],
        )]);
        let mut builder = MergeReaderBuilder::new();
        builder
            .push_batch_reader(Box::new(reader1))
            .push_batch_iter(Box::new(reader2))
            .push_batch_reader(Box::new(reader3));
        let mut reader = builder.build().await.unwrap();

        let expected = [
            new_batch(b"k1", &[1], &[11], &[OpType::Put], &[31]),
            new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]),
            new_batch(
                b"k1",
                &[6, 8],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[36, 38],
            ),
            new_batch(b"k1", &[10], &[10], &[OpType::Put], &[30]),
            new_batch(b"k1", &[20], &[11], &[OpType::Put], &[40]),
        ];
        check_reader_result(&mut reader, &expected).await;
    }

    /// Ten readers yield the same timestamps with different sequences; every
    /// duplicate is kept and the larger sequence comes first.
    #[tokio::test]
    async fn test_merge_many_duplicates() {
        let mut builder = MergeReaderBuilder::new();
        for seq in 0..10 {
            let batches: Vec<_> = (0..8)
                .map(|ts| new_batch(b"k1", &[ts], &[seq], &[OpType::Put], &[100]))
                .collect();
            builder.push_batch_reader(Box::new(VecBatchReader::new(&batches)));
        }
        let mut reader = builder.build().await.unwrap();

        let mut expected = Vec::with_capacity(80);
        for ts in 0..8 {
            // Sequences are expected from 9 down to 0 for every timestamp.
            for seq in (0..10).rev() {
                expected.push(new_batch(b"k1", &[ts], &[seq], &[OpType::Put], &[100]));
            }
        }
        check_reader_result(&mut reader, &expected).await;
    }

    /// A reader built with `from_sources` keeps rows with duplicate timestamps.
    #[tokio::test]
    async fn test_merge_keep_duplicate() {
        let reader1 = VecBatchReader::new(&[new_batch(
            b"k1",
            &[1, 2],
            &[10, 10],
            &[OpType::Put, OpType::Put],
            &[21, 22],
        )]);
        let reader2 = VecBatchReader::new(&[new_batch(
            b"k1",
            &[2, 3],
            &[11, 11],
            &[OpType::Put, OpType::Put],
            &[32, 33],
        )]);
        let sources = vec![
            Source::Reader(Box::new(reader1)),
            Source::Iter(Box::new(reader2)),
        ];
        let mut builder = MergeReaderBuilder::from_sources(sources);
        let mut reader = builder.build().await.unwrap();

        let expected = [
            new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]),
            new_batch(b"k1", &[2], &[11], &[OpType::Put], &[32]),
            new_batch(b"k1", &[2], &[10], &[OpType::Put], &[22]),
            new_batch(b"k1", &[3], &[11], &[OpType::Put], &[33]),
        ];
        check_reader_result(&mut reader, &expected).await;
    }
}