mito2/read/
merge.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Merge reader implementation.
16
17use std::cmp::Ordering;
18use std::collections::BinaryHeap;
19use std::sync::Arc;
20use std::time::{Duration, Instant};
21use std::{fmt, mem};
22
23use async_trait::async_trait;
24use common_telemetry::debug;
25
26use crate::error::Result;
27use crate::memtable::BoxedBatchIterator;
28use crate::metrics::READ_STAGE_ELAPSED;
29use crate::read::{Batch, BatchReader, BoxedBatchReader, Source};
30
31/// Trait for reporting merge metrics.
32pub trait MergeMetricsReport: Send + Sync {
33    /// Reports and resets the metrics.
34    fn report(&self, metrics: &mut MergeMetrics);
35}
36
37/// Reader to merge sorted batches.
38///
39/// The merge reader merges [Batch]es from multiple sources that yield sorted batches.
40/// 1. Batch is ordered by primary key, time index, sequence desc, op type desc (we can
41///    ignore op type as sequence is already unique).
42/// 2. Batches from sources **must** not be empty.
43///
44/// The reader won't concatenate batches. Each batch returned by the reader also doesn't
45/// contain duplicate rows. But the last (primary key, timestamp) of a batch may be the same
46/// as the first one in the next batch.
47pub struct MergeReader {
48    /// Holds [Node]s whose key range of current batch **is** overlapped with the merge window.
49    /// Each node yields batches from a `source`.
50    ///
51    /// [Node] in this heap **must** not be empty. A `merge window` is the (primary key, timestamp)
52    /// range of the **root node** in the `hot` heap.
53    hot: BinaryHeap<Node>,
54    /// Holds `Node` whose key range of current batch **isn't** overlapped with the merge window.
55    ///
56    /// `Node` in this heap **must** not be empty.
57    cold: BinaryHeap<Node>,
58    /// Batch to output.
59    output_batch: Option<Batch>,
60    /// Local metrics.
61    metrics: MergeMetrics,
62    /// Optional metrics reporter.
63    metrics_reporter: Option<Arc<dyn MergeMetricsReport>>,
64}
65
66#[async_trait]
67impl BatchReader for MergeReader {
68    async fn next_batch(&mut self) -> Result<Option<Batch>> {
69        let start = Instant::now();
70        while !self.hot.is_empty() && self.output_batch.is_none() {
71            if self.hot.len() == 1 {
72                // No need to do merge sort if only one batch in the hot heap.
73                self.fetch_batch_from_hottest().await?;
74                self.metrics.num_fetch_by_batches += 1;
75            } else {
76                // We could only fetch rows that less than the next node from the hottest node.
77                self.fetch_rows_from_hottest().await?;
78                self.metrics.num_fetch_by_rows += 1;
79            }
80        }
81
82        if let Some(batch) = self.output_batch.take() {
83            self.metrics.scan_cost += start.elapsed();
84            self.metrics.maybe_report(&self.metrics_reporter);
85            Ok(Some(batch))
86        } else {
87            // Nothing fetched.
88            self.metrics.scan_cost += start.elapsed();
89            self.metrics.maybe_report(&self.metrics_reporter);
90            Ok(None)
91        }
92    }
93}
94
95impl Drop for MergeReader {
96    fn drop(&mut self) {
97        debug!("Merge reader finished, metrics: {:?}", self.metrics);
98
99        READ_STAGE_ELAPSED
100            .with_label_values(&["merge"])
101            .observe(self.metrics.scan_cost.as_secs_f64());
102        READ_STAGE_ELAPSED
103            .with_label_values(&["merge_fetch"])
104            .observe(self.metrics.fetch_cost.as_secs_f64());
105
106        // Report any remaining metrics.
107        if let Some(reporter) = &self.metrics_reporter {
108            reporter.report(&mut self.metrics);
109        }
110    }
111}
112
113impl MergeReader {
114    /// Creates and initializes a new [MergeReader].
115    pub async fn new(
116        sources: Vec<Source>,
117        metrics_reporter: Option<Arc<dyn MergeMetricsReport>>,
118    ) -> Result<MergeReader> {
119        let start = Instant::now();
120        let mut metrics = MergeMetrics::default();
121
122        let mut cold = BinaryHeap::with_capacity(sources.len());
123        let hot = BinaryHeap::with_capacity(sources.len());
124        for source in sources {
125            let node = Node::new(source, &mut metrics).await?;
126            if !node.is_eof() {
127                // Ensure `cold` don't have eof nodes.
128                cold.push(node);
129            }
130        }
131
132        let mut reader = MergeReader {
133            hot,
134            cold,
135            output_batch: None,
136            metrics,
137            metrics_reporter,
138        };
139        // Initializes the reader.
140        reader.refill_hot();
141
142        let elapsed = start.elapsed();
143        reader.metrics.init_cost += elapsed;
144        reader.metrics.scan_cost += elapsed;
145        Ok(reader)
146    }
147
148    /// Moves nodes in `cold` heap, whose key range is overlapped with current merge
149    /// window to `hot` heap.
150    fn refill_hot(&mut self) {
151        while !self.cold.is_empty() {
152            if let Some(merge_window) = self.hot.peek() {
153                let warmest = self.cold.peek().unwrap();
154                if warmest.is_behind(merge_window) {
155                    // if the warmest node in the `cold` heap is totally after the
156                    // `merge_window`, then no need to add more nodes into the `hot`
157                    // heap for merge sorting.
158                    break;
159                }
160            }
161
162            let warmest = self.cold.pop().unwrap();
163            self.hot.push(warmest);
164        }
165    }
166
167    /// Fetches one batch from the hottest node.
168    async fn fetch_batch_from_hottest(&mut self) -> Result<()> {
169        assert_eq!(1, self.hot.len());
170
171        let mut hottest = self.hot.pop().unwrap();
172        let batch = hottest.fetch_batch(&mut self.metrics).await?;
173        Self::maybe_output_batch(batch, &mut self.output_batch)?;
174        self.reheap(hottest)
175    }
176
177    /// Fetches non-duplicated rows from the hottest node.
178    async fn fetch_rows_from_hottest(&mut self) -> Result<()> {
179        // Safety: `fetch_batches_to_output()` ensures the hot heap has more than 1 element.
180        // Pop hottest node.
181        let mut top_node = self.hot.pop().unwrap();
182        let top = top_node.current_batch();
183        // Min timestamp and its sequence in the next batch.
184        let next_min_ts = {
185            let next_node = self.hot.peek().unwrap();
186            let next = next_node.current_batch();
187            // top and next have overlapping rows so they must have same primary keys.
188            debug_assert_eq!(top.primary_key(), next.primary_key());
189            // Safety: Batches in the heap is not empty, so we can use unwrap here.
190            next.first_timestamp().unwrap()
191        };
192
193        // Safety: Batches in the heap is not empty, so we can use unwrap here.
194        let timestamps = top.timestamps_native().unwrap();
195        // Binary searches the timestamp in the top batch.
196        // Safety: Batches should have the same timestamp resolution so we can compare the native
197        // value directly.
198        let duplicate_pos = match timestamps.binary_search(&next_min_ts.value()) {
199            Ok(pos) => pos,
200            Err(pos) => {
201                // No duplicate timestamp. Outputs timestamp before `pos`.
202                Self::maybe_output_batch(top.slice(0, pos), &mut self.output_batch)?;
203                top_node.skip_rows(pos, &mut self.metrics).await?;
204                return self.reheap(top_node);
205            }
206        };
207
208        // No need to remove duplicate timestamps.
209        let output_end = if duplicate_pos == 0 {
210            // If the first timestamp of the top node is duplicate. We can simply return the first row
211            // as the heap ensure it is the one with largest sequence.
212            1
213        } else {
214            // We don't know which one has the larger sequence so we use the range before
215            // the duplicate pos.
216            duplicate_pos
217        };
218        Self::maybe_output_batch(top.slice(0, output_end), &mut self.output_batch)?;
219        top_node.skip_rows(output_end, &mut self.metrics).await?;
220        self.reheap(top_node)
221    }
222
223    /// Push the node popped from `hot` back to a proper heap.
224    fn reheap(&mut self, node: Node) -> Result<()> {
225        if node.is_eof() {
226            // If the node is EOF, don't put it into the heap again.
227            // The merge window would be updated, need to refill the hot heap.
228            self.refill_hot();
229        } else {
230            // Find a proper heap for this node.
231            let node_is_cold = if let Some(hottest) = self.hot.peek() {
232                // If key range of this node is behind the hottest node's then we can
233                // push it to the cold heap. Otherwise we should push it to the hot heap.
234                node.is_behind(hottest)
235            } else {
236                // The hot heap is empty, but we don't known whether the current
237                // batch of this node is still the hottest.
238                true
239            };
240
241            if node_is_cold {
242                self.cold.push(node);
243            } else {
244                self.hot.push(node);
245            }
246            // Anyway, the merge window has been changed, we need to refill the hot heap.
247            self.refill_hot();
248        }
249
250        Ok(())
251    }
252
253    /// If `filter_deleted` is set to true, removes deleted entries and sets the `batch` to the `output_batch`.
254    ///
255    /// Ignores the `batch` if it is empty.
256    fn maybe_output_batch(batch: Batch, output_batch: &mut Option<Batch>) -> Result<()> {
257        debug_assert!(output_batch.is_none());
258        if batch.is_empty() {
259            return Ok(());
260        }
261        *output_batch = Some(batch);
262
263        Ok(())
264    }
265}
266
267/// Builder to build and initialize a [MergeReader].
268#[derive(Default)]
269pub struct MergeReaderBuilder {
270    /// Input sources.
271    ///
272    /// All source must yield batches with the same schema.
273    sources: Vec<Source>,
274    /// Optional metrics reporter.
275    metrics_reporter: Option<Arc<dyn MergeMetricsReport>>,
276}
277
278impl MergeReaderBuilder {
279    /// Returns an empty builder.
280    pub fn new() -> MergeReaderBuilder {
281        MergeReaderBuilder::default()
282    }
283
284    /// Creates a builder from sources.
285    pub fn from_sources(sources: Vec<Source>) -> MergeReaderBuilder {
286        MergeReaderBuilder {
287            sources,
288            metrics_reporter: None,
289        }
290    }
291
292    /// Pushes a batch reader to sources.
293    pub fn push_batch_reader(&mut self, reader: BoxedBatchReader) -> &mut Self {
294        self.sources.push(Source::Reader(reader));
295        self
296    }
297
298    /// Pushes a batch iterator to sources.
299    pub fn push_batch_iter(&mut self, iter: BoxedBatchIterator) -> &mut Self {
300        self.sources.push(Source::Iter(iter));
301        self
302    }
303
304    /// Sets the metrics reporter.
305    pub fn with_metrics_reporter(
306        &mut self,
307        reporter: Option<Arc<dyn MergeMetricsReport>>,
308    ) -> &mut Self {
309        self.metrics_reporter = reporter;
310        self
311    }
312
313    /// Builds and initializes the reader, then resets the builder.
314    pub async fn build(&mut self) -> Result<MergeReader> {
315        let sources = mem::take(&mut self.sources);
316        let metrics_reporter = self.metrics_reporter.take();
317        MergeReader::new(sources, metrics_reporter).await
318    }
319}
320
/// Counters and timings collected by a merge reader.
#[derive(Default)]
pub struct MergeMetrics {
    /// Time spent building the reader.
    pub(crate) init_cost: Duration,
    /// Total time spent scanning, including initialization.
    pub(crate) scan_cost: Duration,
    /// How many fetch steps emitted a whole batch.
    pub(crate) num_fetch_by_batches: usize,
    /// How many fetch steps emitted a row range.
    pub(crate) num_fetch_by_rows: usize,
    /// Time spent pulling batches from the sources.
    pub(crate) fetch_cost: Duration,
}

impl fmt::Debug for MergeMetrics {
    /// Writes the metrics as a compact JSON-like object and omits zero-valued fields.
    ///
    /// A zero `scan_cost` is treated as "nothing to show" and renders as `{}` regardless
    /// of the other fields.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let MergeMetrics {
            init_cost,
            scan_cost,
            num_fetch_by_batches,
            num_fetch_by_rows,
            fetch_cost,
        } = self;

        if scan_cost.is_zero() {
            return write!(f, "{{}}");
        }

        write!(f, r#"{{"scan_cost":"{:?}""#, scan_cost)?;
        if !init_cost.is_zero() {
            write!(f, r#", "init_cost":"{:?}""#, init_cost)?;
        }
        if *num_fetch_by_batches > 0 {
            write!(f, r#", "num_fetch_by_batches":{}"#, num_fetch_by_batches)?;
        }
        if *num_fetch_by_rows > 0 {
            write!(f, r#", "num_fetch_by_rows":{}"#, num_fetch_by_rows)?;
        }
        if !fetch_cost.is_zero() {
            write!(f, r#", "fetch_cost":"{:?}""#, fetch_cost)?;
        }
        write!(f, "}}")
    }
}
365
366impl MergeMetrics {
367    /// Merges metrics from another MergeMetrics instance.
368    pub(crate) fn merge(&mut self, other: &MergeMetrics) {
369        let MergeMetrics {
370            init_cost,
371            scan_cost,
372            num_fetch_by_batches,
373            num_fetch_by_rows,
374            fetch_cost,
375        } = other;
376
377        self.init_cost += *init_cost;
378        self.scan_cost += *scan_cost;
379        self.num_fetch_by_batches += *num_fetch_by_batches;
380        self.num_fetch_by_rows += *num_fetch_by_rows;
381        self.fetch_cost += *fetch_cost;
382    }
383
384    /// Reports the metrics if scan_cost exceeds 10ms and resets them.
385    pub(crate) fn maybe_report(&mut self, reporter: &Option<Arc<dyn MergeMetricsReport>>) {
386        if self.scan_cost.as_millis() > 10
387            && let Some(r) = reporter
388        {
389            r.report(self);
390        }
391    }
392}
393
394/// A `Node` represent an individual input data source to be merged.
395struct Node {
396    /// Data source of this `Node`.
397    source: Source,
398    /// Current batch to be read. The node ensures the batch is not empty.
399    ///
400    /// `None` means the `source` has reached EOF.
401    current_batch: Option<CompareFirst>,
402}
403
404impl Node {
405    /// Initialize a node.
406    ///
407    /// It tries to fetch one batch from the `source`.
408    async fn new(mut source: Source, metrics: &mut MergeMetrics) -> Result<Node> {
409        // Ensures batch is not empty.
410        let start = Instant::now();
411        let current_batch = source.next_batch().await?.map(CompareFirst);
412        metrics.fetch_cost += start.elapsed();
413
414        Ok(Node {
415            source,
416            current_batch,
417        })
418    }
419
420    /// Returns whether the node still has batch to read.
421    fn is_eof(&self) -> bool {
422        self.current_batch.is_none()
423    }
424
425    /// Returns the primary key of current batch.
426    ///
427    /// # Panics
428    /// Panics if the node has reached EOF.
429    fn primary_key(&self) -> &[u8] {
430        self.current_batch().primary_key()
431    }
432
433    /// Returns current batch.
434    ///
435    /// # Panics
436    /// Panics if the node has reached EOF.
437    fn current_batch(&self) -> &Batch {
438        &self.current_batch.as_ref().unwrap().0
439    }
440
441    /// Returns current batch and fetches next batch
442    /// from the source.
443    ///
444    /// # Panics
445    /// Panics if the node has reached EOF.
446    async fn fetch_batch(&mut self, metrics: &mut MergeMetrics) -> Result<Batch> {
447        let current = self.current_batch.take().unwrap();
448        let start = Instant::now();
449        // Ensures batch is not empty.
450        self.current_batch = self.source.next_batch().await?.map(CompareFirst);
451        metrics.fetch_cost += start.elapsed();
452        Ok(current.0)
453    }
454
455    /// Returns true if the key range of current batch in `self` is behind (exclusive) current
456    /// batch in `other`.
457    ///
458    /// # Panics
459    /// Panics if either `self` or `other` is EOF.
460    fn is_behind(&self, other: &Node) -> bool {
461        debug_assert!(!self.current_batch().is_empty());
462        debug_assert!(!other.current_batch().is_empty());
463
464        // We only compare pk and timestamp so nodes in the cold
465        // heap don't have overlapping timestamps with the hottest node
466        // in the hot heap.
467        self.primary_key().cmp(other.primary_key()).then_with(|| {
468            self.current_batch()
469                .first_timestamp()
470                .cmp(&other.current_batch().last_timestamp())
471        }) == Ordering::Greater
472    }
473
474    /// Skips first `num_to_skip` rows from node's current batch. If current batch is empty it fetches
475    /// next batch from the node.
476    ///
477    /// # Panics
478    /// Panics if the node is EOF.
479    async fn skip_rows(&mut self, num_to_skip: usize, metrics: &mut MergeMetrics) -> Result<()> {
480        let batch = self.current_batch();
481        debug_assert!(batch.num_rows() >= num_to_skip);
482
483        let remaining = batch.num_rows() - num_to_skip;
484        if remaining == 0 {
485            // Nothing remains, we need to fetch next batch to ensure the batch is not empty.
486            self.fetch_batch(metrics).await?;
487        } else {
488            debug_assert!(!batch.is_empty());
489            self.current_batch = Some(CompareFirst(batch.slice(num_to_skip, remaining)));
490        }
491
492        Ok(())
493    }
494}
495
496impl PartialEq for Node {
497    fn eq(&self, other: &Node) -> bool {
498        self.current_batch == other.current_batch
499    }
500}
501
502impl Eq for Node {}
503
504impl PartialOrd for Node {
505    fn partial_cmp(&self, other: &Node) -> Option<Ordering> {
506        Some(self.cmp(other))
507    }
508}
509
510impl Ord for Node {
511    fn cmp(&self, other: &Node) -> Ordering {
512        // The std binary heap is a max heap, but we want the nodes are ordered in
513        // ascend order, so we compare the nodes in reverse order.
514        other.current_batch.cmp(&self.current_batch)
515    }
516}
517
518/// Type to compare [Batch] by first row.
519///
520/// It ignores op type as sequence is enough to distinguish different rows.
521struct CompareFirst(Batch);
522
523impl PartialEq for CompareFirst {
524    fn eq(&self, other: &Self) -> bool {
525        self.0.primary_key() == other.0.primary_key()
526            && self.0.first_timestamp() == other.0.first_timestamp()
527            && self.0.first_sequence() == other.0.first_sequence()
528    }
529}
530
531impl Eq for CompareFirst {}
532
533impl PartialOrd for CompareFirst {
534    fn partial_cmp(&self, other: &CompareFirst) -> Option<Ordering> {
535        Some(self.cmp(other))
536    }
537}
538
539impl Ord for CompareFirst {
540    /// Compares by primary key, time index, sequence desc.
541    fn cmp(&self, other: &CompareFirst) -> Ordering {
542        self.0
543            .primary_key()
544            .cmp(other.0.primary_key())
545            .then_with(|| self.0.first_timestamp().cmp(&other.0.first_timestamp()))
546            .then_with(|| other.0.first_sequence().cmp(&self.0.first_sequence()))
547    }
548}
549
#[cfg(test)]
mod tests {
    use api::v1::OpType;

    use super::*;
    use crate::test_util::{VecBatchReader, check_reader_result, new_batch};

    /// A reader built without any source yields `None`, and keeps yielding `None`.
    #[tokio::test]
    async fn test_merge_reader_empty() {
        let mut reader = MergeReaderBuilder::new().build().await.unwrap();
        assert!(reader.next_batch().await.unwrap().is_none());
        assert!(reader.next_batch().await.unwrap().is_none());
    }

    /// Sources whose key ranges do not overlap are output as whole batches in key order.
    #[tokio::test]
    async fn test_merge_non_overlapping() {
        let reader1 = VecBatchReader::new(&[
            new_batch(
                b"k1",
                &[1, 2],
                &[11, 12],
                &[OpType::Put, OpType::Put],
                &[21, 22],
            ),
            new_batch(
                b"k1",
                &[7, 8],
                &[17, 18],
                &[OpType::Put, OpType::Delete],
                &[27, 28],
            ),
            new_batch(
                b"k2",
                &[2, 3],
                &[12, 13],
                &[OpType::Delete, OpType::Put],
                &[22, 23],
            ),
        ]);
        let reader2 = VecBatchReader::new(&[new_batch(
            b"k1",
            &[4, 5],
            &[14, 15],
            &[OpType::Put, OpType::Put],
            &[24, 25],
        )]);
        let mut reader = MergeReaderBuilder::new()
            .push_batch_reader(Box::new(reader1))
            .push_batch_iter(Box::new(reader2))
            .build()
            .await
            .unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch(
                    b"k1",
                    &[1, 2],
                    &[11, 12],
                    &[OpType::Put, OpType::Put],
                    &[21, 22],
                ),
                new_batch(
                    b"k1",
                    &[4, 5],
                    &[14, 15],
                    &[OpType::Put, OpType::Put],
                    &[24, 25],
                ),
                new_batch(
                    b"k1",
                    &[7, 8],
                    &[17, 18],
                    &[OpType::Put, OpType::Delete],
                    &[27, 28],
                ),
                new_batch(
                    b"k2",
                    &[2, 3],
                    &[12, 13],
                    &[OpType::Delete, OpType::Put],
                    &[22, 23],
                ),
            ],
        )
        .await;
    }

    /// Two sources with interleaved timestamps for `k1` are output one row at a time, and
    /// the partially consumed node is put back into the hot heap between steps.
    #[tokio::test]
    async fn test_merge_reheap_hot() {
        let reader1 = VecBatchReader::new(&[
            new_batch(
                b"k1",
                &[1, 3],
                &[10, 10],
                &[OpType::Put, OpType::Put],
                &[21, 23],
            ),
            new_batch(b"k2", &[3], &[10], &[OpType::Put], &[23]),
        ]);
        let reader2 = VecBatchReader::new(&[new_batch(
            b"k1",
            &[2, 4],
            &[11, 11],
            &[OpType::Put, OpType::Put],
            &[32, 34],
        )]);
        let mut reader = MergeReaderBuilder::new()
            .push_batch_reader(Box::new(reader1))
            .push_batch_iter(Box::new(reader2))
            .build()
            .await
            .unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]),
                new_batch(b"k1", &[2], &[11], &[OpType::Put], &[32]),
                new_batch(b"k1", &[3], &[10], &[OpType::Put], &[23]),
                new_batch(b"k1", &[4], &[11], &[OpType::Put], &[34]),
                new_batch(b"k2", &[3], &[10], &[OpType::Put], &[23]),
            ],
        )
        .await;
    }

    /// Overlapping sources across two primary keys: rows sharing a timestamp are all kept,
    /// ordered by sequence descending, and op types (including deletes) are passed through.
    #[tokio::test]
    async fn test_merge_overlapping() {
        let reader1 = VecBatchReader::new(&[
            new_batch(
                b"k1",
                &[1, 2],
                &[11, 12],
                &[OpType::Put, OpType::Put],
                &[21, 22],
            ),
            new_batch(
                b"k1",
                &[4, 5],
                &[14, 15],
                // This override 4 and deletes 5.
                &[OpType::Put, OpType::Delete],
                &[24, 25],
            ),
            new_batch(
                b"k2",
                &[2, 3],
                &[12, 13],
                // This delete 2.
                &[OpType::Delete, OpType::Put],
                &[22, 23],
            ),
        ]);
        let reader2 = VecBatchReader::new(&[
            new_batch(
                b"k1",
                &[3, 4, 5],
                &[10, 10, 10],
                &[OpType::Put, OpType::Put, OpType::Put],
                &[33, 34, 35],
            ),
            new_batch(
                b"k2",
                &[1, 10],
                &[11, 20],
                &[OpType::Put, OpType::Put],
                &[21, 30],
            ),
        ]);
        let mut reader = MergeReaderBuilder::new()
            .push_batch_reader(Box::new(reader1))
            .push_batch_iter(Box::new(reader2))
            .build()
            .await
            .unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch(
                    b"k1",
                    &[1, 2],
                    &[11, 12],
                    &[OpType::Put, OpType::Put],
                    &[21, 22],
                ),
                new_batch(b"k1", &[3], &[10], &[OpType::Put], &[33]),
                new_batch(b"k1", &[4], &[14], &[OpType::Put], &[24]),
                new_batch(b"k1", &[4], &[10], &[OpType::Put], &[34]),
                new_batch(b"k1", &[5], &[15], &[OpType::Delete], &[25]),
                new_batch(b"k1", &[5], &[10], &[OpType::Put], &[35]),
                new_batch(b"k2", &[1], &[11], &[OpType::Put], &[21]),
                new_batch(
                    b"k2",
                    &[2, 3],
                    &[12, 13],
                    &[OpType::Delete, OpType::Put],
                    &[22, 23],
                ),
                new_batch(b"k2", &[10], &[20], &[OpType::Put], &[30]),
            ],
        )
        .await;
    }

    /// Batches made of delete entries are output unchanged; the merge reader does not
    /// filter deletions.
    #[tokio::test]
    async fn test_merge_deleted() {
        let reader1 = VecBatchReader::new(&[
            new_batch(
                b"k1",
                &[1, 2],
                &[11, 12],
                &[OpType::Delete, OpType::Delete],
                &[21, 22],
            ),
            new_batch(
                b"k2",
                &[2, 3],
                &[12, 13],
                &[OpType::Delete, OpType::Put],
                &[22, 23],
            ),
        ]);
        let reader2 = VecBatchReader::new(&[new_batch(
            b"k1",
            &[4, 5],
            &[14, 15],
            &[OpType::Delete, OpType::Delete],
            &[24, 25],
        )]);
        let mut reader = MergeReaderBuilder::new()
            .push_batch_reader(Box::new(reader1))
            .push_batch_iter(Box::new(reader2))
            .build()
            .await
            .unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch(
                    b"k1",
                    &[1, 2],
                    &[11, 12],
                    &[OpType::Delete, OpType::Delete],
                    &[21, 22],
                ),
                new_batch(
                    b"k1",
                    &[4, 5],
                    &[14, 15],
                    &[OpType::Delete, OpType::Delete],
                    &[24, 25],
                ),
                new_batch(
                    b"k2",
                    &[2, 3],
                    &[12, 13],
                    &[OpType::Delete, OpType::Put],
                    &[22, 23],
                ),
            ],
        )
        .await;
    }

    /// The second source holds a single row whose timestamp equals the first timestamp of
    /// the other source; that source runs out after emitting its only row.
    #[tokio::test]
    async fn test_merge_next_node_empty() {
        let reader1 = VecBatchReader::new(&[new_batch(
            b"k1",
            &[1, 2],
            &[11, 12],
            &[OpType::Put, OpType::Put],
            &[21, 22],
        )]);
        let reader2 = VecBatchReader::new(&[new_batch(b"k1", &[1], &[10], &[OpType::Put], &[33])]);
        let mut reader = MergeReaderBuilder::new()
            .push_batch_reader(Box::new(reader1))
            .push_batch_iter(Box::new(reader2))
            .build()
            .await
            .unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21]),
                new_batch(b"k1", &[1], &[10], &[OpType::Put], &[33]),
                new_batch(b"k1", &[2], &[12], &[OpType::Put], &[22]),
            ],
        )
        .await;
    }

    /// The first source runs out while the other source still has rows; the duplicated
    /// timestamp 2 is output once per source, higher sequence first.
    #[tokio::test]
    async fn test_merge_top_node_empty() {
        let reader1 = VecBatchReader::new(&[new_batch(
            b"k1",
            &[1, 2],
            &[10, 10],
            &[OpType::Put, OpType::Put],
            &[21, 22],
        )]);
        let reader2 = VecBatchReader::new(&[new_batch(
            b"k1",
            &[2, 3],
            &[11, 11],
            &[OpType::Put, OpType::Put],
            &[32, 33],
        )]);
        let mut reader = MergeReaderBuilder::new()
            .push_batch_reader(Box::new(reader1))
            .push_batch_iter(Box::new(reader2))
            .build()
            .await
            .unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]),
                new_batch(b"k1", &[2], &[11], &[OpType::Put], &[32]),
                new_batch(b"k1", &[2], &[10], &[OpType::Put], &[22]),
                new_batch(b"k1", &[3], &[11], &[OpType::Put], &[33]),
            ],
        )
        .await;
    }

    /// Two wide-range sources plus a third whose timestamps match none of theirs; the
    /// third source's batch is output as a whole between the wide-range rows.
    #[tokio::test]
    async fn test_merge_large_range() {
        let reader1 = VecBatchReader::new(&[new_batch(
            b"k1",
            &[1, 10],
            &[10, 10],
            &[OpType::Put, OpType::Put],
            &[21, 30],
        )]);
        let reader2 = VecBatchReader::new(&[new_batch(
            b"k1",
            &[1, 20],
            &[11, 11],
            &[OpType::Put, OpType::Put],
            &[31, 40],
        )]);
        // The hot heap have a node that doesn't have duplicate
        // timestamps.
        let reader3 = VecBatchReader::new(&[new_batch(
            b"k1",
            &[6, 8],
            &[11, 11],
            &[OpType::Put, OpType::Put],
            &[36, 38],
        )]);
        let mut reader = MergeReaderBuilder::new()
            .push_batch_reader(Box::new(reader1))
            .push_batch_iter(Box::new(reader2))
            .push_batch_reader(Box::new(reader3))
            .build()
            .await
            .unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch(b"k1", &[1], &[11], &[OpType::Put], &[31]),
                new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]),
                new_batch(
                    b"k1",
                    &[6, 8],
                    &[11, 11],
                    &[OpType::Put, OpType::Put],
                    &[36, 38],
                ),
                new_batch(b"k1", &[10], &[10], &[OpType::Put], &[30]),
                new_batch(b"k1", &[20], &[11], &[OpType::Put], &[40]),
            ],
        )
        .await;
    }

    /// Ten sources (sequences 0..10) all containing timestamps 0..8: every duplicate is
    /// kept, and for each timestamp the rows come out by sequence descending.
    #[tokio::test]
    async fn test_merge_many_duplicates() {
        let mut builder = MergeReaderBuilder::new();
        for i in 0..10 {
            let batches: Vec<_> = (0..8)
                .map(|ts| new_batch(b"k1", &[ts], &[i], &[OpType::Put], &[100]))
                .collect();
            let reader = VecBatchReader::new(&batches);
            builder.push_batch_reader(Box::new(reader));
        }
        let mut reader = builder.build().await.unwrap();
        let mut expect = Vec::with_capacity(80);
        for ts in 0..8 {
            for i in 0..10 {
                let batch = new_batch(b"k1", &[ts], &[9 - i], &[OpType::Put], &[100]);
                expect.push(batch);
            }
        }
        check_reader_result(&mut reader, &expect).await;
    }

    /// A reader built via `MergeReaderBuilder::from_sources` keeps rows with duplicate
    /// timestamps from different sources.
    #[tokio::test]
    async fn test_merge_keep_duplicate() {
        let reader1 = VecBatchReader::new(&[new_batch(
            b"k1",
            &[1, 2],
            &[10, 10],
            &[OpType::Put, OpType::Put],
            &[21, 22],
        )]);
        let reader2 = VecBatchReader::new(&[new_batch(
            b"k1",
            &[2, 3],
            &[11, 11],
            &[OpType::Put, OpType::Put],
            &[32, 33],
        )]);
        let sources = vec![
            Source::Reader(Box::new(reader1)),
            Source::Iter(Box::new(reader2)),
        ];
        let mut reader = MergeReaderBuilder::from_sources(sources)
            .build()
            .await
            .unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]),
                new_batch(b"k1", &[2], &[11], &[OpType::Put], &[32]),
                new_batch(b"k1", &[2], &[10], &[OpType::Put], &[22]),
                new_batch(b"k1", &[3], &[11], &[OpType::Put], &[33]),
            ],
        )
        .await;
    }
}