mito2/read/merge.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Merge reader implementation.

use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::mem;
use std::time::{Duration, Instant};

use async_trait::async_trait;
use common_telemetry::debug;

use crate::error::Result;
use crate::memtable::BoxedBatchIterator;
use crate::metrics::READ_STAGE_ELAPSED;
use crate::read::{Batch, BatchReader, BoxedBatchReader, Source};
/// Reader to merge sorted batches.
///
/// The merge reader merges [Batch]es from multiple sources that yield sorted batches.
/// 1. Batches are ordered by primary key, time index, sequence desc, op type desc (we can
///    ignore op type as the sequence is already unique).
/// 2. Batches from sources **must** not be empty.
///
/// The reader won't concatenate batches. Each batch returned by the reader also doesn't
/// contain duplicate rows. But the last (primary key, timestamp) of a batch may be the same
/// as the first one in the next batch.
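///
/// A minimal usage sketch (not compiled as a doc test; `sources` is assumed to be a
/// `Vec<Source>` whose batches are sorted and non-empty, and `process` is a placeholder):
/// ```ignore
/// let mut reader = MergeReader::new(sources).await?;
/// while let Some(batch) = reader.next_batch().await? {
///     // Batches come out ordered by (primary key, time index, sequence desc).
///     process(batch);
/// }
/// ```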
pub struct MergeReader {
    /// Holds [Node]s whose current batches have key ranges that **overlap** the merge window.
    /// Each node yields batches from a `source`.
    ///
    /// [Node]s in this heap **must** not be empty. A `merge window` is the (primary key, timestamp)
    /// range of the **root node** in the `hot` heap.
    hot: BinaryHeap<Node>,
    /// Holds [Node]s whose current batches have key ranges that **don't** overlap the merge window.
    ///
    /// [Node]s in this heap **must** not be empty.
    cold: BinaryHeap<Node>,
    /// Batch to output.
    output_batch: Option<Batch>,
    /// Local metrics.
    metrics: Metrics,
}

#[async_trait]
impl BatchReader for MergeReader {
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        let start = Instant::now();
        while !self.hot.is_empty() && self.output_batch.is_none() {
            if self.hot.len() == 1 {
                // No need to do merge sort if there is only one node in the hot heap.
                self.fetch_batch_from_hottest().await?;
                self.metrics.num_fetch_by_batches += 1;
            } else {
                // We can only fetch rows from the hottest node that are less than the first
                // row of the next node.
                self.fetch_rows_from_hottest().await?;
                self.metrics.num_fetch_by_rows += 1;
            }
        }

        if let Some(batch) = self.output_batch.take() {
            self.metrics.scan_cost += start.elapsed();
            self.metrics.num_output_rows += batch.num_rows();
            Ok(Some(batch))
        } else {
            // Nothing fetched.
            self.metrics.scan_cost += start.elapsed();
            Ok(None)
        }
    }
}

impl Drop for MergeReader {
    fn drop(&mut self) {
        debug!("Merge reader finished, metrics: {:?}", self.metrics);

        READ_STAGE_ELAPSED
            .with_label_values(&["merge"])
            .observe(self.metrics.scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["merge_fetch"])
            .observe(self.metrics.fetch_cost.as_secs_f64());
    }
}

impl MergeReader {
    /// Creates and initializes a new [MergeReader].
    pub async fn new(sources: Vec<Source>) -> Result<MergeReader> {
        let start = Instant::now();
        let mut metrics = Metrics::default();

        let mut cold = BinaryHeap::with_capacity(sources.len());
        let hot = BinaryHeap::with_capacity(sources.len());
        for source in sources {
            let node = Node::new(source, &mut metrics).await?;
            if !node.is_eof() {
                // Ensure `cold` doesn't have EOF nodes.
                cold.push(node);
            }
        }

        let mut reader = MergeReader {
            hot,
            cold,
            output_batch: None,
            metrics,
        };
        // Initializes the reader.
        reader.refill_hot();

        reader.metrics.scan_cost += start.elapsed();
        Ok(reader)
    }

    /// Moves nodes in the `cold` heap whose key ranges overlap the current merge
    /// window into the `hot` heap.
    fn refill_hot(&mut self) {
        while !self.cold.is_empty() {
            if let Some(merge_window) = self.hot.peek() {
                let warmest = self.cold.peek().unwrap();
                if warmest.is_behind(merge_window) {
                    // If the warmest node in the `cold` heap is totally after the
                    // `merge_window`, there is no need to add more nodes into the `hot`
                    // heap for merge sorting.
                    break;
                }
            }

            let warmest = self.cold.pop().unwrap();
            self.hot.push(warmest);
        }
    }

    /// Fetches one batch from the hottest node.
    async fn fetch_batch_from_hottest(&mut self) -> Result<()> {
        assert_eq!(1, self.hot.len());

        let mut hottest = self.hot.pop().unwrap();
        let batch = hottest.fetch_batch(&mut self.metrics).await?;
        Self::maybe_output_batch(batch, &mut self.output_batch)?;
        self.reheap(hottest)
    }

    /// Fetches non-duplicated rows from the hottest node.
    async fn fetch_rows_from_hottest(&mut self) -> Result<()> {
        // Safety: `next_batch()` ensures the hot heap has more than 1 element.
        // Pop hottest node.
        let mut top_node = self.hot.pop().unwrap();
        let top = top_node.current_batch();
        // Min timestamp in the next batch.
        let next_min_ts = {
            let next_node = self.hot.peek().unwrap();
            let next = next_node.current_batch();
            // top and next have overlapping rows so they must have the same primary key.
            debug_assert_eq!(top.primary_key(), next.primary_key());
            // Safety: Batches in the heap are not empty, so we can use unwrap here.
            next.first_timestamp().unwrap()
        };

        // Safety: Batches in the heap are not empty, so we can use unwrap here.
        let timestamps = top.timestamps_native().unwrap();
        // Binary searches the timestamp in the top batch.
        // Safety: Batches should have the same timestamp resolution so we can compare the native
        // value directly.
        let duplicate_pos = match timestamps.binary_search(&next_min_ts.value()) {
            Ok(pos) => pos,
            Err(pos) => {
                // No duplicate timestamp. Outputs rows before `pos`.
                Self::maybe_output_batch(top.slice(0, pos), &mut self.output_batch)?;
                top_node.skip_rows(pos, &mut self.metrics).await?;
                return self.reheap(top_node);
            }
        };
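
        // At this point `timestamps[duplicate_pos]` equals `next_min_ts`. For example, with
        // top timestamps [1, 3, 5] and next_min_ts 3, duplicate_pos is 1 and only the row at
        // ts 1 is output below; with next_min_ts 1, duplicate_pos is 0 and only the first row
        // is output, since the heap guarantees it carries the largest sequence for that ts.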
        // The duplicate timestamp is not removed. We only decide how many rows from the
        // top batch to output.
        let output_end = if duplicate_pos == 0 {
            // If the first timestamp of the top node is the duplicate, we can simply return
            // the first row as the heap ensures it is the one with the largest sequence.
            1
        } else {
            // We don't know which one has the larger sequence so we use the range before
            // the duplicate pos.
            duplicate_pos
        };
        Self::maybe_output_batch(top.slice(0, output_end), &mut self.output_batch)?;
        top_node.skip_rows(output_end, &mut self.metrics).await?;
        self.reheap(top_node)
    }

    /// Pushes the node popped from `hot` back to a proper heap.
    fn reheap(&mut self, node: Node) -> Result<()> {
        if node.is_eof() {
            // If the node is EOF, don't put it into the heap again.
            // The merge window would be updated, so we need to refill the hot heap.
            self.refill_hot();
        } else {
            // Find a proper heap for this node.
            let node_is_cold = if let Some(hottest) = self.hot.peek() {
                // If the key range of this node is behind the hottest node's, we can
                // push it to the cold heap. Otherwise we should push it to the hot heap.
                node.is_behind(hottest)
            } else {
                // The hot heap is empty and we don't know whether the current
                // batch of this node is still the hottest.
                true
            };

            if node_is_cold {
                self.cold.push(node);
            } else {
                self.hot.push(node);
            }
            // Either way, the merge window has changed, so we need to refill the hot heap.
            self.refill_hot();
        }

        Ok(())
    }

    /// Sets the `batch` to the `output_batch`.
    ///
    /// Ignores the `batch` if it is empty.
    fn maybe_output_batch(batch: Batch, output_batch: &mut Option<Batch>) -> Result<()> {
        debug_assert!(output_batch.is_none());
        if batch.is_empty() {
            return Ok(());
        }
        *output_batch = Some(batch);

        Ok(())
    }
}

/// Builder to build and initialize a [MergeReader].
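///
/// A minimal sketch of how the builder is used (not compiled as a doc test; `file_reader`
/// and `mem_iter` are placeholder `BoxedBatchReader` / `BoxedBatchIterator` values):
/// ```ignore
/// let mut builder = MergeReaderBuilder::new();
/// builder
///     .push_batch_reader(file_reader)
///     .push_batch_iter(mem_iter);
/// let mut reader = builder.build().await?;
/// ```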
#[derive(Default)]
pub struct MergeReaderBuilder {
    /// Input sources.
    ///
    /// All sources must yield batches with the same schema.
    sources: Vec<Source>,
}

impl MergeReaderBuilder {
    /// Returns an empty builder.
    pub fn new() -> MergeReaderBuilder {
        MergeReaderBuilder::default()
    }

    /// Creates a builder from sources.
    pub fn from_sources(sources: Vec<Source>) -> MergeReaderBuilder {
        MergeReaderBuilder { sources }
    }

    /// Pushes a batch reader to sources.
    pub fn push_batch_reader(&mut self, reader: BoxedBatchReader) -> &mut Self {
        self.sources.push(Source::Reader(reader));
        self
    }

    /// Pushes a batch iterator to sources.
    pub fn push_batch_iter(&mut self, iter: BoxedBatchIterator) -> &mut Self {
        self.sources.push(Source::Iter(iter));
        self
    }

    /// Builds and initializes the reader, then resets the builder.
    pub async fn build(&mut self) -> Result<MergeReader> {
        let sources = mem::take(&mut self.sources);
        MergeReader::new(sources).await
    }
}

/// Metrics for the merge reader.
#[derive(Debug, Default)]
struct Metrics {
    /// Total scan cost of the reader.
    scan_cost: Duration,
    /// Number of times to fetch batches.
    num_fetch_by_batches: usize,
    /// Number of times to fetch rows.
    num_fetch_by_rows: usize,
    /// Number of input rows.
    num_input_rows: usize,
    /// Number of output rows.
    num_output_rows: usize,
    /// Cost to fetch batches from sources.
    fetch_cost: Duration,
}

/// A `Node` represents an individual input data source to be merged.
struct Node {
    /// Data source of this `Node`.
    source: Source,
    /// Current batch to be read. The node ensures the batch is not empty.
    ///
    /// `None` means the `source` has reached EOF.
    current_batch: Option<CompareFirst>,
}

impl Node {
    /// Initializes a node.
    ///
    /// It tries to fetch one batch from the `source`.
    async fn new(mut source: Source, metrics: &mut Metrics) -> Result<Node> {
        // Ensures batch is not empty.
        let start = Instant::now();
        let current_batch = source.next_batch().await?.map(CompareFirst);
        metrics.fetch_cost += start.elapsed();
        metrics.num_input_rows += current_batch.as_ref().map(|b| b.0.num_rows()).unwrap_or(0);

        Ok(Node {
            source,
            current_batch,
        })
    }

    /// Returns true if the node has reached EOF, i.e. it has no more batches to read.
    fn is_eof(&self) -> bool {
        self.current_batch.is_none()
    }

    /// Returns the primary key of the current batch.
    ///
    /// # Panics
    /// Panics if the node has reached EOF.
    fn primary_key(&self) -> &[u8] {
        self.current_batch().primary_key()
    }

    /// Returns the current batch.
    ///
    /// # Panics
    /// Panics if the node has reached EOF.
    fn current_batch(&self) -> &Batch {
        &self.current_batch.as_ref().unwrap().0
    }

    /// Returns the current batch and fetches the next batch
    /// from the source.
    ///
    /// # Panics
    /// Panics if the node has reached EOF.
    async fn fetch_batch(&mut self, metrics: &mut Metrics) -> Result<Batch> {
        let current = self.current_batch.take().unwrap();
        let start = Instant::now();
        // Ensures batch is not empty.
        self.current_batch = self.source.next_batch().await?.map(CompareFirst);
        metrics.fetch_cost += start.elapsed();
        metrics.num_input_rows += self
            .current_batch
            .as_ref()
            .map(|b| b.0.num_rows())
            .unwrap_or(0);
        Ok(current.0)
    }

    /// Returns true if the key range of the current batch in `self` is behind (exclusive)
    /// that of the current batch in `other`.
    ///
    /// # Panics
    /// Panics if either `self` or `other` is EOF.
    fn is_behind(&self, other: &Node) -> bool {
        debug_assert!(!self.current_batch().is_empty());
        debug_assert!(!other.current_batch().is_empty());

        // We only compare the primary key and timestamps, so nodes in the cold
        // heap don't have overlapping timestamps with the hottest node
        // in the hot heap.
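        // For example, assuming both batches are for primary key "k1": if `self` starts at
        // ts 5 and `other` ends at ts 3, `self` is behind; if `other` ends at ts 7 instead,
        // the ranges may overlap and `self` is not behind.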
        self.primary_key().cmp(other.primary_key()).then_with(|| {
            self.current_batch()
                .first_timestamp()
                .cmp(&other.current_batch().last_timestamp())
        }) == Ordering::Greater
    }

    /// Skips the first `num_to_skip` rows of the node's current batch. If the current batch
    /// becomes empty, fetches the next batch from the source.
    ///
    /// # Panics
    /// Panics if the node is EOF.
    async fn skip_rows(&mut self, num_to_skip: usize, metrics: &mut Metrics) -> Result<()> {
        let batch = self.current_batch();
        debug_assert!(batch.num_rows() >= num_to_skip);

        let remaining = batch.num_rows() - num_to_skip;
        if remaining == 0 {
            // Nothing remains, we need to fetch the next batch to ensure the batch is not empty.
            self.fetch_batch(metrics).await?;
        } else {
            debug_assert!(!batch.is_empty());
            self.current_batch = Some(CompareFirst(batch.slice(num_to_skip, remaining)));
        }

        Ok(())
    }
}

impl PartialEq for Node {
    fn eq(&self, other: &Node) -> bool {
        self.current_batch == other.current_batch
    }
}

impl Eq for Node {}

impl PartialOrd for Node {
    fn partial_cmp(&self, other: &Node) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Node {
    fn cmp(&self, other: &Node) -> Ordering {
        // The std binary heap is a max-heap, but we want the nodes ordered in
        // ascending order, so we compare the nodes in reverse order.
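        // For example, if `self`'s current batch starts at ts 5 and `other`'s starts at
        // ts 3 (same primary key), `other` must be merged first, so it compares as the
        // greater node and is popped from the heap before `self`.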
        other.current_batch.cmp(&self.current_batch)
    }
}

/// Type to compare [Batch] by first row.
///
/// It ignores op type as sequence is enough to distinguish different rows.
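///
/// Ordering is by primary key, then first timestamp, then first sequence in descending
/// order, so with an equal key and timestamp the row with the larger sequence sorts first.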
struct CompareFirst(Batch);

impl PartialEq for CompareFirst {
    fn eq(&self, other: &Self) -> bool {
        self.0.primary_key() == other.0.primary_key()
            && self.0.first_timestamp() == other.0.first_timestamp()
            && self.0.first_sequence() == other.0.first_sequence()
    }
}

impl Eq for CompareFirst {}

impl PartialOrd for CompareFirst {
    fn partial_cmp(&self, other: &CompareFirst) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for CompareFirst {
    /// Compares by primary key, time index, sequence desc.
    fn cmp(&self, other: &CompareFirst) -> Ordering {
        self.0
            .primary_key()
            .cmp(other.0.primary_key())
            .then_with(|| self.0.first_timestamp().cmp(&other.0.first_timestamp()))
            .then_with(|| other.0.first_sequence().cmp(&self.0.first_sequence()))
    }
}

#[cfg(test)]
mod tests {
    use api::v1::OpType;

    use super::*;
    use crate::test_util::{check_reader_result, new_batch, VecBatchReader};

    #[tokio::test]
    async fn test_merge_reader_empty() {
        let mut reader = MergeReaderBuilder::new().build().await.unwrap();
        assert!(reader.next_batch().await.unwrap().is_none());
        assert!(reader.next_batch().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_merge_non_overlapping() {
        let reader1 = VecBatchReader::new(&[
            new_batch(
                b"k1",
                &[1, 2],
                &[11, 12],
                &[OpType::Put, OpType::Put],
                &[21, 22],
            ),
            new_batch(
                b"k1",
                &[7, 8],
                &[17, 18],
                &[OpType::Put, OpType::Delete],
                &[27, 28],
            ),
            new_batch(
                b"k2",
                &[2, 3],
                &[12, 13],
                &[OpType::Delete, OpType::Put],
                &[22, 23],
            ),
        ]);
        let reader2 = VecBatchReader::new(&[new_batch(
            b"k1",
            &[4, 5],
            &[14, 15],
            &[OpType::Put, OpType::Put],
            &[24, 25],
        )]);
        let mut reader = MergeReaderBuilder::new()
            .push_batch_reader(Box::new(reader1))
            .push_batch_iter(Box::new(reader2))
            .build()
            .await
            .unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch(
                    b"k1",
                    &[1, 2],
                    &[11, 12],
                    &[OpType::Put, OpType::Put],
                    &[21, 22],
                ),
                new_batch(
                    b"k1",
                    &[4, 5],
                    &[14, 15],
                    &[OpType::Put, OpType::Put],
                    &[24, 25],
                ),
                new_batch(
                    b"k1",
                    &[7, 8],
                    &[17, 18],
                    &[OpType::Put, OpType::Delete],
                    &[27, 28],
                ),
                new_batch(
                    b"k2",
                    &[2, 3],
                    &[12, 13],
                    &[OpType::Delete, OpType::Put],
                    &[22, 23],
                ),
            ],
        )
        .await;

        assert_eq!(8, reader.metrics.num_input_rows);
        assert_eq!(8, reader.metrics.num_output_rows);
    }

    #[tokio::test]
    async fn test_merge_reheap_hot() {
        let reader1 = VecBatchReader::new(&[
            new_batch(
                b"k1",
                &[1, 3],
                &[10, 10],
                &[OpType::Put, OpType::Put],
                &[21, 23],
            ),
            new_batch(b"k2", &[3], &[10], &[OpType::Put], &[23]),
        ]);
        let reader2 = VecBatchReader::new(&[new_batch(
            b"k1",
            &[2, 4],
            &[11, 11],
            &[OpType::Put, OpType::Put],
            &[32, 34],
        )]);
        let mut reader = MergeReaderBuilder::new()
            .push_batch_reader(Box::new(reader1))
            .push_batch_iter(Box::new(reader2))
            .build()
            .await
            .unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]),
                new_batch(b"k1", &[2], &[11], &[OpType::Put], &[32]),
                new_batch(b"k1", &[3], &[10], &[OpType::Put], &[23]),
                new_batch(b"k1", &[4], &[11], &[OpType::Put], &[34]),
                new_batch(b"k2", &[3], &[10], &[OpType::Put], &[23]),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_merge_overlapping() {
        let reader1 = VecBatchReader::new(&[
            new_batch(
                b"k1",
                &[1, 2],
                &[11, 12],
                &[OpType::Put, OpType::Put],
                &[21, 22],
            ),
            new_batch(
                b"k1",
                &[4, 5],
                &[14, 15],
                // This overrides 4 and deletes 5.
                &[OpType::Put, OpType::Delete],
                &[24, 25],
            ),
            new_batch(
                b"k2",
                &[2, 3],
                &[12, 13],
                // This deletes 2.
                &[OpType::Delete, OpType::Put],
                &[22, 23],
            ),
        ]);
        let reader2 = VecBatchReader::new(&[
            new_batch(
                b"k1",
                &[3, 4, 5],
                &[10, 10, 10],
                &[OpType::Put, OpType::Put, OpType::Put],
                &[33, 34, 35],
            ),
            new_batch(
                b"k2",
                &[1, 10],
                &[11, 20],
                &[OpType::Put, OpType::Put],
                &[21, 30],
            ),
        ]);
        let mut reader = MergeReaderBuilder::new()
            .push_batch_reader(Box::new(reader1))
            .push_batch_iter(Box::new(reader2))
            .build()
            .await
            .unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch(
                    b"k1",
                    &[1, 2],
                    &[11, 12],
                    &[OpType::Put, OpType::Put],
                    &[21, 22],
                ),
                new_batch(b"k1", &[3], &[10], &[OpType::Put], &[33]),
                new_batch(b"k1", &[4], &[14], &[OpType::Put], &[24]),
                new_batch(b"k1", &[4], &[10], &[OpType::Put], &[34]),
                new_batch(b"k1", &[5], &[15], &[OpType::Delete], &[25]),
                new_batch(b"k1", &[5], &[10], &[OpType::Put], &[35]),
                new_batch(b"k2", &[1], &[11], &[OpType::Put], &[21]),
                new_batch(
                    b"k2",
                    &[2, 3],
                    &[12, 13],
                    &[OpType::Delete, OpType::Put],
                    &[22, 23],
                ),
                new_batch(b"k2", &[10], &[20], &[OpType::Put], &[30]),
            ],
        )
        .await;

        assert_eq!(11, reader.metrics.num_input_rows);
        assert_eq!(11, reader.metrics.num_output_rows);
    }

    #[tokio::test]
    async fn test_merge_deleted() {
        let reader1 = VecBatchReader::new(&[
            new_batch(
                b"k1",
                &[1, 2],
                &[11, 12],
                &[OpType::Delete, OpType::Delete],
                &[21, 22],
            ),
            new_batch(
                b"k2",
                &[2, 3],
                &[12, 13],
                &[OpType::Delete, OpType::Put],
                &[22, 23],
            ),
        ]);
        let reader2 = VecBatchReader::new(&[new_batch(
            b"k1",
            &[4, 5],
            &[14, 15],
            &[OpType::Delete, OpType::Delete],
            &[24, 25],
        )]);
        let mut reader = MergeReaderBuilder::new()
            .push_batch_reader(Box::new(reader1))
            .push_batch_iter(Box::new(reader2))
            .build()
            .await
            .unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch(
                    b"k1",
                    &[1, 2],
                    &[11, 12],
                    &[OpType::Delete, OpType::Delete],
                    &[21, 22],
                ),
                new_batch(
                    b"k1",
                    &[4, 5],
                    &[14, 15],
                    &[OpType::Delete, OpType::Delete],
                    &[24, 25],
                ),
                new_batch(
                    b"k2",
                    &[2, 3],
                    &[12, 13],
                    &[OpType::Delete, OpType::Put],
                    &[22, 23],
                ),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_merge_next_node_empty() {
        let reader1 = VecBatchReader::new(&[new_batch(
            b"k1",
            &[1, 2],
            &[11, 12],
            &[OpType::Put, OpType::Put],
            &[21, 22],
        )]);
        let reader2 = VecBatchReader::new(&[new_batch(b"k1", &[1], &[10], &[OpType::Put], &[33])]);
        let mut reader = MergeReaderBuilder::new()
            .push_batch_reader(Box::new(reader1))
            .push_batch_iter(Box::new(reader2))
            .build()
            .await
            .unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21]),
                new_batch(b"k1", &[1], &[10], &[OpType::Put], &[33]),
                new_batch(b"k1", &[2], &[12], &[OpType::Put], &[22]),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_merge_top_node_empty() {
        let reader1 = VecBatchReader::new(&[new_batch(
            b"k1",
            &[1, 2],
            &[10, 10],
            &[OpType::Put, OpType::Put],
            &[21, 22],
        )]);
        let reader2 = VecBatchReader::new(&[new_batch(
            b"k1",
            &[2, 3],
            &[11, 11],
            &[OpType::Put, OpType::Put],
            &[32, 33],
        )]);
        let mut reader = MergeReaderBuilder::new()
            .push_batch_reader(Box::new(reader1))
            .push_batch_iter(Box::new(reader2))
            .build()
            .await
            .unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]),
                new_batch(b"k1", &[2], &[11], &[OpType::Put], &[32]),
                new_batch(b"k1", &[2], &[10], &[OpType::Put], &[22]),
                new_batch(b"k1", &[3], &[11], &[OpType::Put], &[33]),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_merge_large_range() {
        let reader1 = VecBatchReader::new(&[new_batch(
            b"k1",
            &[1, 10],
            &[10, 10],
            &[OpType::Put, OpType::Put],
            &[21, 30],
        )]);
        let reader2 = VecBatchReader::new(&[new_batch(
            b"k1",
            &[1, 20],
            &[11, 11],
            &[OpType::Put, OpType::Put],
            &[31, 40],
        )]);
        // The hot heap has a node that doesn't have duplicate
        // timestamps.
        let reader3 = VecBatchReader::new(&[new_batch(
            b"k1",
            &[6, 8],
            &[11, 11],
            &[OpType::Put, OpType::Put],
            &[36, 38],
        )]);
        let mut reader = MergeReaderBuilder::new()
            .push_batch_reader(Box::new(reader1))
            .push_batch_iter(Box::new(reader2))
            .push_batch_reader(Box::new(reader3))
            .build()
            .await
            .unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch(b"k1", &[1], &[11], &[OpType::Put], &[31]),
                new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]),
                new_batch(
                    b"k1",
                    &[6, 8],
                    &[11, 11],
                    &[OpType::Put, OpType::Put],
                    &[36, 38],
                ),
                new_batch(b"k1", &[10], &[10], &[OpType::Put], &[30]),
                new_batch(b"k1", &[20], &[11], &[OpType::Put], &[40]),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_merge_many_duplicates() {
        let mut builder = MergeReaderBuilder::new();
        for i in 0..10 {
            let batches: Vec<_> = (0..8)
                .map(|ts| new_batch(b"k1", &[ts], &[i], &[OpType::Put], &[100]))
                .collect();
            let reader = VecBatchReader::new(&batches);
            builder.push_batch_reader(Box::new(reader));
        }
        let mut reader = builder.build().await.unwrap();
        let mut expect = Vec::with_capacity(80);
        for ts in 0..8 {
            for i in 0..10 {
                let batch = new_batch(b"k1", &[ts], &[9 - i], &[OpType::Put], &[100]);
                expect.push(batch);
            }
        }
        check_reader_result(&mut reader, &expect).await;
    }

    #[tokio::test]
    async fn test_merge_keep_duplicate() {
        let reader1 = VecBatchReader::new(&[new_batch(
            b"k1",
            &[1, 2],
            &[10, 10],
            &[OpType::Put, OpType::Put],
            &[21, 22],
        )]);
        let reader2 = VecBatchReader::new(&[new_batch(
            b"k1",
            &[2, 3],
            &[11, 11],
            &[OpType::Put, OpType::Put],
            &[32, 33],
        )]);
        let sources = vec![
            Source::Reader(Box::new(reader1)),
            Source::Iter(Box::new(reader2)),
        ];
        let mut reader = MergeReaderBuilder::from_sources(sources)
            .build()
            .await
            .unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]),
                new_batch(b"k1", &[2], &[11], &[OpType::Put], &[32]),
                new_batch(b"k1", &[2], &[10], &[OpType::Put], &[22]),
                new_batch(b"k1", &[3], &[11], &[OpType::Put], &[33]),
            ],
        )
        .await;
    }
}