1use std::cmp::Ordering;
18use std::collections::BinaryHeap;
19use std::mem;
20use std::time::{Duration, Instant};
21
22use async_trait::async_trait;
23use common_telemetry::debug;
24
25use crate::error::Result;
26use crate::memtable::BoxedBatchIterator;
27use crate::metrics::READ_STAGE_ELAPSED;
28use crate::read::{Batch, BatchReader, BoxedBatchReader, Source};
29
30pub struct MergeReader {
41 hot: BinaryHeap<Node>,
47 cold: BinaryHeap<Node>,
51 output_batch: Option<Batch>,
53 metrics: Metrics,
55}
56
57#[async_trait]
58impl BatchReader for MergeReader {
59 async fn next_batch(&mut self) -> Result<Option<Batch>> {
60 let start = Instant::now();
61 while !self.hot.is_empty() && self.output_batch.is_none() {
62 if self.hot.len() == 1 {
63 self.fetch_batch_from_hottest().await?;
65 self.metrics.num_fetch_by_batches += 1;
66 } else {
67 self.fetch_rows_from_hottest().await?;
69 self.metrics.num_fetch_by_rows += 1;
70 }
71 }
72
73 if let Some(batch) = self.output_batch.take() {
74 self.metrics.scan_cost += start.elapsed();
75 self.metrics.num_output_rows += batch.num_rows();
76 Ok(Some(batch))
77 } else {
78 self.metrics.scan_cost += start.elapsed();
80 Ok(None)
81 }
82 }
83}
84
85impl Drop for MergeReader {
86 fn drop(&mut self) {
87 debug!("Merge reader finished, metrics: {:?}", self.metrics);
88
89 READ_STAGE_ELAPSED
90 .with_label_values(&["merge"])
91 .observe(self.metrics.scan_cost.as_secs_f64());
92 READ_STAGE_ELAPSED
93 .with_label_values(&["merge_fetch"])
94 .observe(self.metrics.fetch_cost.as_secs_f64());
95 }
96}
97
98impl MergeReader {
99 pub async fn new(sources: Vec<Source>) -> Result<MergeReader> {
101 let start = Instant::now();
102 let mut metrics = Metrics::default();
103
104 let mut cold = BinaryHeap::with_capacity(sources.len());
105 let hot = BinaryHeap::with_capacity(sources.len());
106 for source in sources {
107 let node = Node::new(source, &mut metrics).await?;
108 if !node.is_eof() {
109 cold.push(node);
111 }
112 }
113
114 let mut reader = MergeReader {
115 hot,
116 cold,
117 output_batch: None,
118 metrics,
119 };
120 reader.refill_hot();
122
123 reader.metrics.scan_cost += start.elapsed();
124 Ok(reader)
125 }
126
127 fn refill_hot(&mut self) {
130 while !self.cold.is_empty() {
131 if let Some(merge_window) = self.hot.peek() {
132 let warmest = self.cold.peek().unwrap();
133 if warmest.is_behind(merge_window) {
134 break;
138 }
139 }
140
141 let warmest = self.cold.pop().unwrap();
142 self.hot.push(warmest);
143 }
144 }
145
146 async fn fetch_batch_from_hottest(&mut self) -> Result<()> {
148 assert_eq!(1, self.hot.len());
149
150 let mut hottest = self.hot.pop().unwrap();
151 let batch = hottest.fetch_batch(&mut self.metrics).await?;
152 Self::maybe_output_batch(batch, &mut self.output_batch)?;
153 self.reheap(hottest)
154 }
155
156 async fn fetch_rows_from_hottest(&mut self) -> Result<()> {
158 let mut top_node = self.hot.pop().unwrap();
161 let top = top_node.current_batch();
162 let next_min_ts = {
164 let next_node = self.hot.peek().unwrap();
165 let next = next_node.current_batch();
166 debug_assert_eq!(top.primary_key(), next.primary_key());
168 next.first_timestamp().unwrap()
170 };
171
172 let timestamps = top.timestamps_native().unwrap();
174 let duplicate_pos = match timestamps.binary_search(&next_min_ts.value()) {
178 Ok(pos) => pos,
179 Err(pos) => {
180 Self::maybe_output_batch(top.slice(0, pos), &mut self.output_batch)?;
182 top_node.skip_rows(pos, &mut self.metrics).await?;
183 return self.reheap(top_node);
184 }
185 };
186
187 let output_end = if duplicate_pos == 0 {
189 1
192 } else {
193 duplicate_pos
196 };
197 Self::maybe_output_batch(top.slice(0, output_end), &mut self.output_batch)?;
198 top_node.skip_rows(output_end, &mut self.metrics).await?;
199 self.reheap(top_node)
200 }
201
202 fn reheap(&mut self, node: Node) -> Result<()> {
204 if node.is_eof() {
205 self.refill_hot();
208 } else {
209 let node_is_cold = if let Some(hottest) = self.hot.peek() {
211 node.is_behind(hottest)
214 } else {
215 true
218 };
219
220 if node_is_cold {
221 self.cold.push(node);
222 } else {
223 self.hot.push(node);
224 }
225 self.refill_hot();
227 }
228
229 Ok(())
230 }
231
232 fn maybe_output_batch(batch: Batch, output_batch: &mut Option<Batch>) -> Result<()> {
236 debug_assert!(output_batch.is_none());
237 if batch.is_empty() {
238 return Ok(());
239 }
240 *output_batch = Some(batch);
241
242 Ok(())
243 }
244}
245
246#[derive(Default)]
248pub struct MergeReaderBuilder {
249 sources: Vec<Source>,
253}
254
255impl MergeReaderBuilder {
256 pub fn new() -> MergeReaderBuilder {
258 MergeReaderBuilder::default()
259 }
260
261 pub fn from_sources(sources: Vec<Source>) -> MergeReaderBuilder {
263 MergeReaderBuilder { sources }
264 }
265
266 pub fn push_batch_reader(&mut self, reader: BoxedBatchReader) -> &mut Self {
268 self.sources.push(Source::Reader(reader));
269 self
270 }
271
272 pub fn push_batch_iter(&mut self, iter: BoxedBatchIterator) -> &mut Self {
274 self.sources.push(Source::Iter(iter));
275 self
276 }
277
278 pub async fn build(&mut self) -> Result<MergeReader> {
280 let sources = mem::take(&mut self.sources);
281 MergeReader::new(sources).await
282 }
283}
284
/// Metrics collected by a merge reader.
#[derive(Debug, Default)]
struct Metrics {
    /// Time spent creating the reader and inside `next_batch`.
    scan_cost: Duration,
    /// Number of times a whole batch was taken from the only hot node.
    num_fetch_by_batches: usize,
    /// Number of times rows were sliced off the top hot node.
    num_fetch_by_rows: usize,
    /// Number of rows read from the sources.
    num_input_rows: usize,
    /// Number of rows returned by the reader.
    num_output_rows: usize,
    /// Time spent waiting for the sources to yield batches.
    fetch_cost: Duration,
}
301
302struct Node {
304 source: Source,
306 current_batch: Option<CompareFirst>,
310}
311
312impl Node {
313 async fn new(mut source: Source, metrics: &mut Metrics) -> Result<Node> {
317 let start = Instant::now();
319 let current_batch = source.next_batch().await?.map(CompareFirst);
320 metrics.fetch_cost += start.elapsed();
321 metrics.num_input_rows += current_batch.as_ref().map(|b| b.0.num_rows()).unwrap_or(0);
322
323 Ok(Node {
324 source,
325 current_batch,
326 })
327 }
328
329 fn is_eof(&self) -> bool {
331 self.current_batch.is_none()
332 }
333
334 fn primary_key(&self) -> &[u8] {
339 self.current_batch().primary_key()
340 }
341
342 fn current_batch(&self) -> &Batch {
347 &self.current_batch.as_ref().unwrap().0
348 }
349
350 async fn fetch_batch(&mut self, metrics: &mut Metrics) -> Result<Batch> {
356 let current = self.current_batch.take().unwrap();
357 let start = Instant::now();
358 self.current_batch = self.source.next_batch().await?.map(CompareFirst);
360 metrics.fetch_cost += start.elapsed();
361 metrics.num_input_rows += self
362 .current_batch
363 .as_ref()
364 .map(|b| b.0.num_rows())
365 .unwrap_or(0);
366 Ok(current.0)
367 }
368
369 fn is_behind(&self, other: &Node) -> bool {
375 debug_assert!(!self.current_batch().is_empty());
376 debug_assert!(!other.current_batch().is_empty());
377
378 self.primary_key().cmp(other.primary_key()).then_with(|| {
382 self.current_batch()
383 .first_timestamp()
384 .cmp(&other.current_batch().last_timestamp())
385 }) == Ordering::Greater
386 }
387
388 async fn skip_rows(&mut self, num_to_skip: usize, metrics: &mut Metrics) -> Result<()> {
394 let batch = self.current_batch();
395 debug_assert!(batch.num_rows() >= num_to_skip);
396
397 let remaining = batch.num_rows() - num_to_skip;
398 if remaining == 0 {
399 self.fetch_batch(metrics).await?;
401 } else {
402 debug_assert!(!batch.is_empty());
403 self.current_batch = Some(CompareFirst(batch.slice(num_to_skip, remaining)));
404 }
405
406 Ok(())
407 }
408}
409
410impl PartialEq for Node {
411 fn eq(&self, other: &Node) -> bool {
412 self.current_batch == other.current_batch
413 }
414}
415
416impl Eq for Node {}
417
418impl PartialOrd for Node {
419 fn partial_cmp(&self, other: &Node) -> Option<Ordering> {
420 Some(self.cmp(other))
421 }
422}
423
424impl Ord for Node {
425 fn cmp(&self, other: &Node) -> Ordering {
426 other.current_batch.cmp(&self.current_batch)
429 }
430}
431
432struct CompareFirst(Batch);
436
437impl PartialEq for CompareFirst {
438 fn eq(&self, other: &Self) -> bool {
439 self.0.primary_key() == other.0.primary_key()
440 && self.0.first_timestamp() == other.0.first_timestamp()
441 && self.0.first_sequence() == other.0.first_sequence()
442 }
443}
444
445impl Eq for CompareFirst {}
446
447impl PartialOrd for CompareFirst {
448 fn partial_cmp(&self, other: &CompareFirst) -> Option<Ordering> {
449 Some(self.cmp(other))
450 }
451}
452
453impl Ord for CompareFirst {
454 fn cmp(&self, other: &CompareFirst) -> Ordering {
456 self.0
457 .primary_key()
458 .cmp(other.0.primary_key())
459 .then_with(|| self.0.first_timestamp().cmp(&other.0.first_timestamp()))
460 .then_with(|| other.0.first_sequence().cmp(&self.0.first_sequence()))
461 }
462}
463
#[cfg(test)]
mod tests {
    use api::v1::OpType;

    use super::*;
    use crate::test_util::{check_reader_result, new_batch, VecBatchReader};

    /// Builds a merge reader over two sources: `first` is registered as a
    /// batch reader and `second` as a batch iterator.
    async fn merge_pair(first: VecBatchReader, second: VecBatchReader) -> MergeReader {
        let mut builder = MergeReaderBuilder::new();
        builder
            .push_batch_reader(Box::new(first))
            .push_batch_iter(Box::new(second));
        builder.build().await.unwrap()
    }

    /// A reader without sources yields nothing, even when polled repeatedly.
    #[tokio::test]
    async fn test_merge_reader_empty() {
        let mut builder = MergeReaderBuilder::new();
        let mut reader = builder.build().await.unwrap();
        for _ in 0..2 {
            assert!(reader.next_batch().await.unwrap().is_none());
        }
    }

    /// Batches whose ranges do not overlap are returned whole, in key order.
    #[tokio::test]
    async fn test_merge_non_overlapping() {
        let first = VecBatchReader::new(&[
            new_batch(
                b"k1",
                &[1, 2],
                &[11, 12],
                &[OpType::Put, OpType::Put],
                &[21, 22],
            ),
            new_batch(
                b"k1",
                &[7, 8],
                &[17, 18],
                &[OpType::Put, OpType::Delete],
                &[27, 28],
            ),
            new_batch(
                b"k2",
                &[2, 3],
                &[12, 13],
                &[OpType::Delete, OpType::Put],
                &[22, 23],
            ),
        ]);
        let second = VecBatchReader::new(&[new_batch(
            b"k1",
            &[4, 5],
            &[14, 15],
            &[OpType::Put, OpType::Put],
            &[24, 25],
        )]);
        let mut reader = merge_pair(first, second).await;

        let expected = [
            new_batch(
                b"k1",
                &[1, 2],
                &[11, 12],
                &[OpType::Put, OpType::Put],
                &[21, 22],
            ),
            new_batch(
                b"k1",
                &[4, 5],
                &[14, 15],
                &[OpType::Put, OpType::Put],
                &[24, 25],
            ),
            new_batch(
                b"k1",
                &[7, 8],
                &[17, 18],
                &[OpType::Put, OpType::Delete],
                &[27, 28],
            ),
            new_batch(
                b"k2",
                &[2, 3],
                &[12, 13],
                &[OpType::Delete, OpType::Put],
                &[22, 23],
            ),
        ];
        check_reader_result(&mut reader, &expected).await;

        assert_eq!(8, reader.metrics.num_input_rows);
        assert_eq!(8, reader.metrics.num_output_rows);
    }

    /// Interleaved timestamps force the top node back into the hot heap.
    #[tokio::test]
    async fn test_merge_reheap_hot() {
        let first = VecBatchReader::new(&[
            new_batch(
                b"k1",
                &[1, 3],
                &[10, 10],
                &[OpType::Put, OpType::Put],
                &[21, 23],
            ),
            new_batch(b"k2", &[3], &[10], &[OpType::Put], &[23]),
        ]);
        let second = VecBatchReader::new(&[new_batch(
            b"k1",
            &[2, 4],
            &[11, 11],
            &[OpType::Put, OpType::Put],
            &[32, 34],
        )]);
        let mut reader = merge_pair(first, second).await;

        let expected = [
            new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]),
            new_batch(b"k1", &[2], &[11], &[OpType::Put], &[32]),
            new_batch(b"k1", &[3], &[10], &[OpType::Put], &[23]),
            new_batch(b"k1", &[4], &[11], &[OpType::Put], &[34]),
            new_batch(b"k2", &[3], &[10], &[OpType::Put], &[23]),
        ];
        check_reader_result(&mut reader, &expected).await;
    }

    /// Overlapping batches are split; rows with equal timestamps are all kept,
    /// the one with the larger sequence first.
    #[tokio::test]
    async fn test_merge_overlapping() {
        let first = VecBatchReader::new(&[
            new_batch(
                b"k1",
                &[1, 2],
                &[11, 12],
                &[OpType::Put, OpType::Put],
                &[21, 22],
            ),
            new_batch(
                b"k1",
                &[4, 5],
                &[14, 15],
                &[OpType::Put, OpType::Delete],
                &[24, 25],
            ),
            new_batch(
                b"k2",
                &[2, 3],
                &[12, 13],
                &[OpType::Delete, OpType::Put],
                &[22, 23],
            ),
        ]);
        let second = VecBatchReader::new(&[
            new_batch(
                b"k1",
                &[3, 4, 5],
                &[10, 10, 10],
                &[OpType::Put, OpType::Put, OpType::Put],
                &[33, 34, 35],
            ),
            new_batch(
                b"k2",
                &[1, 10],
                &[11, 20],
                &[OpType::Put, OpType::Put],
                &[21, 30],
            ),
        ]);
        let mut reader = merge_pair(first, second).await;

        let expected = [
            new_batch(
                b"k1",
                &[1, 2],
                &[11, 12],
                &[OpType::Put, OpType::Put],
                &[21, 22],
            ),
            new_batch(b"k1", &[3], &[10], &[OpType::Put], &[33]),
            new_batch(b"k1", &[4], &[14], &[OpType::Put], &[24]),
            new_batch(b"k1", &[4], &[10], &[OpType::Put], &[34]),
            new_batch(b"k1", &[5], &[15], &[OpType::Delete], &[25]),
            new_batch(b"k1", &[5], &[10], &[OpType::Put], &[35]),
            new_batch(b"k2", &[1], &[11], &[OpType::Put], &[21]),
            new_batch(
                b"k2",
                &[2, 3],
                &[12, 13],
                &[OpType::Delete, OpType::Put],
                &[22, 23],
            ),
            new_batch(b"k2", &[10], &[20], &[OpType::Put], &[30]),
        ];
        check_reader_result(&mut reader, &expected).await;

        assert_eq!(11, reader.metrics.num_input_rows);
        assert_eq!(11, reader.metrics.num_output_rows);
    }

    /// Delete rows pass through the merge reader untouched.
    #[tokio::test]
    async fn test_merge_deleted() {
        let first = VecBatchReader::new(&[
            new_batch(
                b"k1",
                &[1, 2],
                &[11, 12],
                &[OpType::Delete, OpType::Delete],
                &[21, 22],
            ),
            new_batch(
                b"k2",
                &[2, 3],
                &[12, 13],
                &[OpType::Delete, OpType::Put],
                &[22, 23],
            ),
        ]);
        let second = VecBatchReader::new(&[new_batch(
            b"k1",
            &[4, 5],
            &[14, 15],
            &[OpType::Delete, OpType::Delete],
            &[24, 25],
        )]);
        let mut reader = merge_pair(first, second).await;

        let expected = [
            new_batch(
                b"k1",
                &[1, 2],
                &[11, 12],
                &[OpType::Delete, OpType::Delete],
                &[21, 22],
            ),
            new_batch(
                b"k1",
                &[4, 5],
                &[14, 15],
                &[OpType::Delete, OpType::Delete],
                &[24, 25],
            ),
            new_batch(
                b"k2",
                &[2, 3],
                &[12, 13],
                &[OpType::Delete, OpType::Put],
                &[22, 23],
            ),
        ];
        check_reader_result(&mut reader, &expected).await;
    }

    /// The node next to the top one runs out of rows first.
    #[tokio::test]
    async fn test_merge_next_node_empty() {
        let first = VecBatchReader::new(&[new_batch(
            b"k1",
            &[1, 2],
            &[11, 12],
            &[OpType::Put, OpType::Put],
            &[21, 22],
        )]);
        let second = VecBatchReader::new(&[new_batch(b"k1", &[1], &[10], &[OpType::Put], &[33])]);
        let mut reader = merge_pair(first, second).await;

        let expected = [
            new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21]),
            new_batch(b"k1", &[1], &[10], &[OpType::Put], &[33]),
            new_batch(b"k1", &[2], &[12], &[OpType::Put], &[22]),
        ];
        check_reader_result(&mut reader, &expected).await;
    }

    /// The top node runs out of rows while another node is still hot.
    #[tokio::test]
    async fn test_merge_top_node_empty() {
        let first = VecBatchReader::new(&[new_batch(
            b"k1",
            &[1, 2],
            &[10, 10],
            &[OpType::Put, OpType::Put],
            &[21, 22],
        )]);
        let second = VecBatchReader::new(&[new_batch(
            b"k1",
            &[2, 3],
            &[11, 11],
            &[OpType::Put, OpType::Put],
            &[32, 33],
        )]);
        let mut reader = merge_pair(first, second).await;

        let expected = [
            new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]),
            new_batch(b"k1", &[2], &[11], &[OpType::Put], &[32]),
            new_batch(b"k1", &[2], &[10], &[OpType::Put], &[22]),
            new_batch(b"k1", &[3], &[11], &[OpType::Put], &[33]),
        ];
        check_reader_result(&mut reader, &expected).await;
    }

    /// A batch with a narrow range sits inside two batches with wide ranges.
    #[tokio::test]
    async fn test_merge_large_range() {
        let wide_low_seq = VecBatchReader::new(&[new_batch(
            b"k1",
            &[1, 10],
            &[10, 10],
            &[OpType::Put, OpType::Put],
            &[21, 30],
        )]);
        let wide_high_seq = VecBatchReader::new(&[new_batch(
            b"k1",
            &[1, 20],
            &[11, 11],
            &[OpType::Put, OpType::Put],
            &[31, 40],
        )]);
        let narrow = VecBatchReader::new(&[new_batch(
            b"k1",
            &[6, 8],
            &[11, 11],
            &[OpType::Put, OpType::Put],
            &[36, 38],
        )]);

        let mut builder = MergeReaderBuilder::new();
        builder
            .push_batch_reader(Box::new(wide_low_seq))
            .push_batch_iter(Box::new(wide_high_seq))
            .push_batch_reader(Box::new(narrow));
        let mut reader = builder.build().await.unwrap();

        let expected = [
            new_batch(b"k1", &[1], &[11], &[OpType::Put], &[31]),
            new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]),
            new_batch(
                b"k1",
                &[6, 8],
                &[11, 11],
                &[OpType::Put, OpType::Put],
                &[36, 38],
            ),
            new_batch(b"k1", &[10], &[10], &[OpType::Put], &[30]),
            new_batch(b"k1", &[20], &[11], &[OpType::Put], &[40]),
        ];
        check_reader_result(&mut reader, &expected).await;
    }

    /// Ten sources share the same timestamps; every row is returned, ordered
    /// by timestamp and then by sequence descending.
    #[tokio::test]
    async fn test_merge_many_duplicates() {
        let mut builder = MergeReaderBuilder::new();
        for seq in 0..10 {
            let batches: Vec<_> = (0..8)
                .map(|ts| new_batch(b"k1", &[ts], &[seq], &[OpType::Put], &[100]))
                .collect();
            builder.push_batch_reader(Box::new(VecBatchReader::new(&batches)));
        }
        let mut reader = builder.build().await.unwrap();

        let expected: Vec<_> = (0..8)
            .flat_map(|ts| {
                (0..10)
                    .rev()
                    .map(move |seq| new_batch(b"k1", &[ts], &[seq], &[OpType::Put], &[100]))
            })
            .collect();
        check_reader_result(&mut reader, &expected).await;
    }

    /// Rows with the same key and timestamp from different sources are kept.
    #[tokio::test]
    async fn test_merge_keep_duplicate() {
        let first = VecBatchReader::new(&[new_batch(
            b"k1",
            &[1, 2],
            &[10, 10],
            &[OpType::Put, OpType::Put],
            &[21, 22],
        )]);
        let second = VecBatchReader::new(&[new_batch(
            b"k1",
            &[2, 3],
            &[11, 11],
            &[OpType::Put, OpType::Put],
            &[32, 33],
        )]);

        let mut builder = MergeReaderBuilder::from_sources(vec![
            Source::Reader(Box::new(first)),
            Source::Iter(Box::new(second)),
        ]);
        let mut reader = builder.build().await.unwrap();

        let expected = [
            new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]),
            new_batch(b"k1", &[2], &[11], &[OpType::Put], &[32]),
            new_batch(b"k1", &[2], &[10], &[OpType::Put], &[22]),
            new_batch(b"k1", &[3], &[11], &[OpType::Put], &[33]),
        ];
        check_reader_result(&mut reader, &expected).await;
    }
}