mito2/read/flat_merge.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::sync::Arc;
use std::time::Instant;

use async_stream::try_stream;
use common_telemetry::debug;
use datatypes::arrow::array::{Int64Array, UInt64Array};
use datatypes::arrow::compute::interleave;
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::arrow_array::BinaryArray;
use datatypes::timestamp::timestamp_array_to_primitive;
use futures::{Stream, TryStreamExt};
use snafu::ResultExt;
use store_api::storage::SequenceNumber;

use crate::error::{ComputeArrowSnafu, Result};
use crate::memtable::BoxedRecordBatchIterator;
use crate::metrics::READ_STAGE_ELAPSED;
use crate::read::BoxedRecordBatchStream;
use crate::read::merge::{MergeMetrics, MergeMetricsReport};
use crate::sst::parquet::flat_format::{
    primary_key_column_index, sequence_column_index, time_index_column_index,
};
use crate::sst::parquet::format::PrimaryKeyArray;

/// Keeps track of the current position in a batch
#[derive(Debug, Copy, Clone, Default)]
struct BatchCursor {
    /// The index into BatchBuilder::batches
    batch_idx: usize,
    /// The row index within the given batch
    row_idx: usize,
}

/// Provides an API to incrementally build a [`RecordBatch`] from partitioned [`RecordBatch`]
// Ports from https://github.com/apache/datafusion/blob/49.0.0/datafusion/physical-plan/src/sorts/builder.rs
// Adds the `take_remaining_rows()` method.
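///
/// # Example
///
/// A minimal usage sketch (not compiled here); streams first register their current
/// batch, then contribute rows which are interleaved into one output batch:
///
/// ```ignore
/// let mut builder = BatchBuilder::new(schema, /* stream_count */ 2, /* batch_size */ 1024);
/// builder.push_batch(0, batch_from_stream_0);
/// builder.push_row(0);
/// // Interleaves all buffered rows into a single RecordBatch.
/// if let Some(output) = builder.build_record_batch()? {
///     // use `output`
/// }
/// ```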
#[derive(Debug)]
pub struct BatchBuilder {
    /// The schema of the RecordBatches yielded by this stream
    schema: SchemaRef,

    /// Maintain a list of [`RecordBatch`] and their corresponding stream
    batches: Vec<(usize, RecordBatch)>,

    /// The current [`BatchCursor`] for each stream
    cursors: Vec<BatchCursor>,

    /// The accumulated stream indexes from which to pull rows
    /// Consists of a tuple of `(batch_idx, row_idx)`
    indices: Vec<(usize, usize)>,
}

impl BatchBuilder {
    /// Create a new [`BatchBuilder`] with the provided `stream_count` and `batch_size`
    pub fn new(schema: SchemaRef, stream_count: usize, batch_size: usize) -> Self {
        Self {
            schema,
            batches: Vec::with_capacity(stream_count * 2),
            cursors: vec![BatchCursor::default(); stream_count],
            indices: Vec::with_capacity(batch_size),
        }
    }

    /// Append a new batch in `stream_idx`
    pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) {
        let batch_idx = self.batches.len();
        self.batches.push((stream_idx, batch));
        self.cursors[stream_idx] = BatchCursor {
            batch_idx,
            row_idx: 0,
        };
    }

    /// Append the next row from `stream_idx`
    pub fn push_row(&mut self, stream_idx: usize) {
        let cursor = &mut self.cursors[stream_idx];
        let row_idx = cursor.row_idx;
        cursor.row_idx += 1;
        self.indices.push((cursor.batch_idx, row_idx));
    }

    /// Returns the number of in-progress rows in this [`BatchBuilder`]
    pub fn len(&self) -> usize {
        self.indices.len()
    }

    /// Returns `true` if this [`BatchBuilder`] contains no in-progress rows
    pub fn is_empty(&self) -> bool {
        self.indices.is_empty()
    }

    /// Returns the schema of this [`BatchBuilder`]
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Drains the in_progress row indexes, and builds a new RecordBatch from them
    ///
    /// Will then drop any batches for which all rows have been yielded to the output
    ///
    /// Returns `None` if no pending rows
    pub fn build_record_batch(&mut self) -> Result<Option<RecordBatch>> {
        if self.is_empty() {
            return Ok(None);
        }

        let columns = (0..self.schema.fields.len())
            .map(|column_idx| {
                let arrays: Vec<_> = self
                    .batches
                    .iter()
                    .map(|(_, batch)| batch.column(column_idx).as_ref())
                    .collect();
                interleave(&arrays, &self.indices).context(ComputeArrowSnafu)
            })
            .collect::<Result<Vec<_>>>()?;

        self.indices.clear();

        // New cursors are only created once the previous cursor for the stream
        // is finished. This means all remaining rows from all but the last batch
        // for each stream have been yielded to the newly created record batch
        //
        // We can therefore drop all but the last batch for each stream
        self.retain_batches();

        RecordBatch::try_new(Arc::clone(&self.schema), columns)
            .context(ComputeArrowSnafu)
            .map(Some)
    }

    /// Slice and take remaining rows from the last batch of `stream_idx` and push
    /// the next batch if available.
    pub fn take_remaining_rows(
        &mut self,
        stream_idx: usize,
        next: Option<RecordBatch>,
    ) -> RecordBatch {
        let cursor = &mut self.cursors[stream_idx];
        let batch = &self.batches[cursor.batch_idx];
        let output = batch
            .1
            .slice(cursor.row_idx, batch.1.num_rows() - cursor.row_idx);
        cursor.row_idx = batch.1.num_rows();

        if let Some(b) = next {
            self.push_batch(stream_idx, b);
            self.retain_batches();
        }

        output
    }

    fn retain_batches(&mut self) {
        let mut batch_idx = 0;
        let mut retained = 0;
        self.batches.retain(|(stream_idx, _)| {
            let stream_cursor = &mut self.cursors[*stream_idx];
            let retain = stream_cursor.batch_idx == batch_idx;
            batch_idx += 1;

            if retain {
                stream_cursor.batch_idx = retained;
                retained += 1;
            }
            retain
        });
    }
}

/// A comparable node of the heap.
trait NodeCmp: Eq + Ord {
    /// Returns true if the node has no more batches to read.
    fn is_eof(&self) -> bool;

    /// Returns true if the key range of current batch in `self` is behind (exclusive) current
    /// batch in `other`.
    ///
    /// # Panics
    /// Panics if either `self` or `other` is EOF.
    fn is_behind(&self, other: &Self) -> bool;
}

/// Common algorithm of merging sorted batches from multiple nodes.
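///
/// A sketch of how callers (e.g. [FlatMergeIterator]) typically drive the algorithm;
/// the consume step is whatever the caller does with the popped node:
///
/// ```ignore
/// while algo.has_rows() {
///     if algo.can_fetch_batch() {
///         // Only one node overlaps the merge window: its whole batch can be taken.
///     } else {
///         // Multiple nodes overlap the merge window: merge row by row.
///     }
///     let node = algo.pop_hot().unwrap();
///     // ... consume one batch or one row from `node` ...
///     algo.reheap(node);
/// }
/// ```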
struct MergeAlgo<T> {
    /// Holds nodes whose key range of current batch **is** overlapped with the merge window.
    /// Each node yields batches from a `source`.
    ///
    /// Node in this heap **MUST** not be empty. A `merge window` is the (primary key, timestamp)
    /// range of the **root node** in the `hot` heap.
    hot: BinaryHeap<T>,
    /// Holds nodes whose key range of current batch **isn't** overlapped with the merge window.
    ///
    /// Nodes in this heap **MUST** not be empty.
    cold: BinaryHeap<T>,
}

impl<T: NodeCmp> MergeAlgo<T> {
    /// Creates a new merge algorithm from `nodes`.
    ///
    /// All nodes must be initialized.
    fn new(mut nodes: Vec<T>) -> Self {
        // Skips EOF nodes.
        nodes.retain(|node| !node.is_eof());
        let hot = BinaryHeap::with_capacity(nodes.len());
        let cold = BinaryHeap::from(nodes);

        let mut algo = MergeAlgo { hot, cold };
        // Initializes the algorithm.
        algo.refill_hot();

        algo
    }

    /// Moves nodes in the `cold` heap whose key range overlaps the current merge
    /// window into the `hot` heap.
    fn refill_hot(&mut self) {
        while !self.cold.is_empty() {
            if let Some(merge_window) = self.hot.peek() {
                let warmest = self.cold.peek().unwrap();
                if warmest.is_behind(merge_window) {
                    // if the warmest node in the `cold` heap is totally after the
                    // `merge_window`, then no need to add more nodes into the `hot`
                    // heap for merge sorting.
                    break;
                }
            }

            let warmest = self.cold.pop().unwrap();
            self.hot.push(warmest);
        }
    }

    /// Push the node popped from `hot` back to a proper heap.
    fn reheap(&mut self, node: T) {
        if node.is_eof() {
            // If the node is EOF, don't put it into the heap again.
            // The merge window would be updated, need to refill the hot heap.
            self.refill_hot();
        } else {
            // Find a proper heap for this node.
            let node_is_cold = if let Some(hottest) = self.hot.peek() {
                // If key range of this node is behind the hottest node's then we can
                // push it to the cold heap. Otherwise we should push it to the hot heap.
                node.is_behind(hottest)
            } else {
                // The hot heap is empty, but we don't know whether the current
                // batch of this node is still the hottest.
                true
            };

            if node_is_cold {
                self.cold.push(node);
            } else {
                self.hot.push(node);
            }
            // Anyway, the merge window has been changed, we need to refill the hot heap.
            self.refill_hot();
        }
    }

    /// Pops the hottest node.
    fn pop_hot(&mut self) -> Option<T> {
        self.hot.pop()
    }

    /// Returns true if there are rows in the hot heap.
    fn has_rows(&self) -> bool {
        !self.hot.is_empty()
    }

    /// Returns true if we can fetch a batch directly instead of a row.
    fn can_fetch_batch(&self) -> bool {
        self.hot.len() == 1
    }
}

// TODO(yingwen): Further downcast and store arrays in this struct.
/// Columns to compare for a [RecordBatch].
struct SortColumns {
    primary_key: PrimaryKeyArray,
    timestamp: Int64Array,
    sequence: UInt64Array,
}

impl SortColumns {
    /// Creates a new [SortColumns] from a [RecordBatch]. The positions of the internal
    /// columns are derived from the number of columns in the batch.
    ///
    /// # Panics
    /// Panics if the input batch doesn't have correct internal columns.
    fn new(batch: &RecordBatch) -> Self {
        let num_columns = batch.num_columns();
        let primary_key = batch
            .column(primary_key_column_index(num_columns))
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap()
            .clone();
        let timestamp = batch.column(time_index_column_index(num_columns));
        let (timestamp, _unit) = timestamp_array_to_primitive(timestamp).unwrap();
        let sequence = batch
            .column(sequence_column_index(num_columns))
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap()
            .clone();

        Self {
            primary_key,
            timestamp,
            sequence,
        }
    }

    fn primary_key_at(&self, index: usize) -> &[u8] {
        let key = self.primary_key.keys().value(index);
        let binary_values = self
            .primary_key
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        binary_values.value(key as usize)
    }

    fn timestamp_at(&self, index: usize) -> i64 {
        self.timestamp.value(index)
    }

    fn sequence_at(&self, index: usize) -> SequenceNumber {
        self.sequence.value(index)
    }

    fn num_rows(&self) -> usize {
        self.timestamp.len()
    }
}

/// Cursor to a row in the [RecordBatch].
///
/// It compares batches by rows. During comparison, it ignores op type as sequence is enough to
/// distinguish different rows.
struct RowCursor {
    /// Current row offset.
    offset: usize,
    /// Keys of the batch.
    columns: SortColumns,
}

impl RowCursor {
    fn new(columns: SortColumns) -> Self {
        debug_assert!(columns.num_rows() > 0);

        Self { offset: 0, columns }
    }

    fn is_finished(&self) -> bool {
        self.offset >= self.columns.num_rows()
    }

    fn advance(&mut self) {
        self.offset += 1;
    }

    fn first_primary_key(&self) -> &[u8] {
        self.columns.primary_key_at(self.offset)
    }

    fn first_timestamp(&self) -> i64 {
        self.columns.timestamp_at(self.offset)
    }

    fn first_sequence(&self) -> SequenceNumber {
        self.columns.sequence_at(self.offset)
    }

    fn last_primary_key(&self) -> &[u8] {
        self.columns.primary_key_at(self.columns.num_rows() - 1)
    }

    fn last_timestamp(&self) -> i64 {
        self.columns.timestamp_at(self.columns.num_rows() - 1)
    }
}

impl PartialEq for RowCursor {
    fn eq(&self, other: &Self) -> bool {
        self.first_primary_key() == other.first_primary_key()
            && self.first_timestamp() == other.first_timestamp()
            && self.first_sequence() == other.first_sequence()
    }
}

impl Eq for RowCursor {}

impl PartialOrd for RowCursor {
    fn partial_cmp(&self, other: &RowCursor) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for RowCursor {
    /// Compares by primary key, time index, sequence desc.
    fn cmp(&self, other: &RowCursor) -> Ordering {
        self.first_primary_key()
            .cmp(other.first_primary_key())
            .then_with(|| self.first_timestamp().cmp(&other.first_timestamp()))
            .then_with(|| other.first_sequence().cmp(&self.first_sequence()))
    }
}

/// Iterator to merge multiple sorted iterators into a single sorted iterator.
///
/// All iterators must be sorted by primary key, time index, sequence desc.
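///
/// # Example
///
/// A usage sketch (not compiled here), assuming `iters` is a `Vec<BoxedRecordBatchIterator>`
/// over batches in the flat format:
///
/// ```ignore
/// let merge_iter = FlatMergeIterator::new(schema, iters, 1024)?;
/// for batch in merge_iter {
///     let batch = batch?;
///     // `batch` is sorted by (primary key, time index, sequence desc).
/// }
/// ```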
pub struct FlatMergeIterator {
    /// The merge algorithm to maintain heaps.
    algo: MergeAlgo<IterNode>,
    /// Current buffered rows to output.
    in_progress: BatchBuilder,
    /// Non-empty batch to output.
    output_batch: Option<RecordBatch>,
    /// Batch size to merge rows.
    /// This is not a hard limit, the iterator may return smaller batches to avoid concatenating
    /// rows.
    batch_size: usize,
}

impl FlatMergeIterator {
    /// Creates a new iterator to merge sorted `iters`.
    pub fn new(
        schema: SchemaRef,
        iters: Vec<BoxedRecordBatchIterator>,
        batch_size: usize,
    ) -> Result<Self> {
        let mut in_progress = BatchBuilder::new(schema, iters.len(), batch_size);
        let mut nodes = Vec::with_capacity(iters.len());
        // Initialize nodes and the buffer.
        for (node_index, iter) in iters.into_iter().enumerate() {
            let mut node = IterNode {
                node_index,
                iter,
                cursor: None,
            };
            if let Some(batch) = node.advance_batch()? {
                in_progress.push_batch(node_index, batch);
                nodes.push(node);
            }
        }

        let algo = MergeAlgo::new(nodes);

        let iter = Self {
            algo,
            in_progress,
            output_batch: None,
            batch_size,
        };

        Ok(iter)
    }

    /// Fetches next sorted batch.
    pub fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        while self.algo.has_rows() && self.output_batch.is_none() {
            if self.algo.can_fetch_batch() && !self.in_progress.is_empty() {
                // Only one batch in the hot heap, but we have pending rows, output the pending rows first.
                self.output_batch = self.in_progress.build_record_batch()?;
                debug_assert!(self.output_batch.is_some());
            } else if self.algo.can_fetch_batch() {
                self.fetch_batch_from_hottest()?;
            } else {
                self.fetch_row_from_hottest()?;
            }
        }

        Ok(self.output_batch.take())
    }

    /// Fetches a batch from the hottest node.
    fn fetch_batch_from_hottest(&mut self) -> Result<()> {
        debug_assert!(self.in_progress.is_empty());

        // Safety: next_batch() ensures the heap is not empty.
        let mut hottest = self.algo.pop_hot().unwrap();
        debug_assert!(!hottest.current_cursor().is_finished());
        let next = hottest.advance_batch()?;
        // The node in the heap is not empty, so it must have existing rows in the builder.
        let batch = self
            .in_progress
            .take_remaining_rows(hottest.node_index, next);
        Self::maybe_output_batch(batch, &mut self.output_batch);
        self.algo.reheap(hottest);

        Ok(())
    }

    /// Fetches a row from the hottest node.
    fn fetch_row_from_hottest(&mut self) -> Result<()> {
        // Safety: next_batch() ensures the heap has more than 1 element.
        let mut hottest = self.algo.pop_hot().unwrap();
        debug_assert!(!hottest.current_cursor().is_finished());
        self.in_progress.push_row(hottest.node_index);
        if self.in_progress.len() >= self.batch_size {
            // We buffered enough rows.
            if let Some(output) = self.in_progress.build_record_batch()? {
                Self::maybe_output_batch(output, &mut self.output_batch);
            }
        }

        if let Some(next) = hottest.advance_row()? {
            self.in_progress.push_batch(hottest.node_index, next);
        }

        self.algo.reheap(hottest);
        Ok(())
    }

    /// Adds the batch to the output batch if it is not empty.
    fn maybe_output_batch(batch: RecordBatch, output_batch: &mut Option<RecordBatch>) {
        debug_assert!(output_batch.is_none());
        if batch.num_rows() > 0 {
            *output_batch = Some(batch);
        }
    }
}

impl Iterator for FlatMergeIterator {
    type Item = Result<RecordBatch>;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_batch().transpose()
    }
}

/// Reader to merge multiple sorted streams into a single sorted stream.
///
/// All input streams must be sorted by primary key, time index, sequence desc.
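///
/// # Example
///
/// A usage sketch (not compiled here), assuming `streams` is a `Vec<BoxedRecordBatchStream>`:
///
/// ```ignore
/// let reader = FlatMergeReader::new(schema, streams, 1024, None).await?;
/// let mut stream = Box::pin(reader.into_stream());
/// while let Some(batch) = stream.try_next().await? {
///     // `batch` is sorted by (primary key, time index, sequence desc).
/// }
/// ```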
pub struct FlatMergeReader {
    /// The merge algorithm to maintain heaps.
    algo: MergeAlgo<StreamNode>,
    /// Current buffered rows to output.
    in_progress: BatchBuilder,
    /// Non-empty batch to output.
    output_batch: Option<RecordBatch>,
    /// Batch size to merge rows.
    /// This is not a hard limit, the reader may return smaller batches to avoid concatenating
    /// rows.
    batch_size: usize,
    /// Local metrics.
    metrics: MergeMetrics,
    /// Optional metrics reporter.
    metrics_reporter: Option<Arc<dyn MergeMetricsReport>>,
}

impl FlatMergeReader {
    /// Creates a new reader to merge the sorted streams in `iters`.
    pub async fn new(
        schema: SchemaRef,
        iters: Vec<BoxedRecordBatchStream>,
        batch_size: usize,
        metrics_reporter: Option<Arc<dyn MergeMetricsReport>>,
    ) -> Result<Self> {
        let start = Instant::now();
        let metrics = MergeMetrics::default();
        let mut in_progress = BatchBuilder::new(schema, iters.len(), batch_size);
        let mut nodes = Vec::with_capacity(iters.len());
        // Initialize nodes and the buffer.
        for (node_index, iter) in iters.into_iter().enumerate() {
            let mut node = StreamNode {
                node_index,
                iter,
                cursor: None,
            };
            if let Some(batch) = node.advance_batch().await? {
                in_progress.push_batch(node_index, batch);
                nodes.push(node);
            }
        }

        let algo = MergeAlgo::new(nodes);

        let mut reader = Self {
            algo,
            in_progress,
            output_batch: None,
            batch_size,
            metrics,
            metrics_reporter,
        };
        let elapsed = start.elapsed();
        reader.metrics.init_cost += elapsed;
        reader.metrics.scan_cost += elapsed;

        Ok(reader)
    }

    /// Fetches next sorted batch.
    pub async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        let start = Instant::now();
        while self.algo.has_rows() && self.output_batch.is_none() {
            if self.algo.can_fetch_batch() && !self.in_progress.is_empty() {
                // Only one batch in the hot heap, but we have pending rows, output the pending rows first.
                self.output_batch = self.in_progress.build_record_batch()?;
                debug_assert!(self.output_batch.is_some());
            } else if self.algo.can_fetch_batch() {
                self.fetch_batch_from_hottest().await?;
                self.metrics.num_fetch_by_batches += 1;
            } else {
                self.fetch_row_from_hottest().await?;
                self.metrics.num_fetch_by_rows += 1;
            }
        }

        if let Some(batch) = self.output_batch.take() {
            self.metrics.scan_cost += start.elapsed();
            self.metrics.maybe_report(&self.metrics_reporter);
            Ok(Some(batch))
        } else {
            // No more batches.
            self.metrics.scan_cost += start.elapsed();
            self.metrics.maybe_report(&self.metrics_reporter);
            Ok(None)
        }
    }

    /// Converts the reader into a stream.
    pub fn into_stream(mut self) -> impl Stream<Item = Result<RecordBatch>> {
        try_stream! {
            while let Some(batch) = self.next_batch().await? {
                yield batch;
            }
        }
    }

    /// Fetches a batch from the hottest node.
    async fn fetch_batch_from_hottest(&mut self) -> Result<()> {
        debug_assert!(self.in_progress.is_empty());

        // Safety: next_batch() ensures the heap is not empty.
        let mut hottest = self.algo.pop_hot().unwrap();
        debug_assert!(!hottest.current_cursor().is_finished());
        let start = Instant::now();
        let next = hottest.advance_batch().await?;
        self.metrics.fetch_cost += start.elapsed();
        // The node in the heap is not empty, so it must have existing rows in the builder.
        let batch = self
            .in_progress
            .take_remaining_rows(hottest.node_index, next);
        Self::maybe_output_batch(batch, &mut self.output_batch);
        self.algo.reheap(hottest);

        Ok(())
    }

    /// Fetches a row from the hottest node.
    async fn fetch_row_from_hottest(&mut self) -> Result<()> {
        // Safety: next_batch() ensures the heap has more than 1 element.
        let mut hottest = self.algo.pop_hot().unwrap();
        debug_assert!(!hottest.current_cursor().is_finished());
        self.in_progress.push_row(hottest.node_index);
        if self.in_progress.len() >= self.batch_size {
            // We buffered enough rows.
            if let Some(output) = self.in_progress.build_record_batch()? {
                Self::maybe_output_batch(output, &mut self.output_batch);
            }
        }

        let start = Instant::now();
        if let Some(next) = hottest.advance_row().await? {
            self.metrics.fetch_cost += start.elapsed();
            self.in_progress.push_batch(hottest.node_index, next);
        } else {
            self.metrics.fetch_cost += start.elapsed();
        }

        self.algo.reheap(hottest);
        Ok(())
    }

    /// Adds the batch to the output batch if it is not empty.
    fn maybe_output_batch(batch: RecordBatch, output_batch: &mut Option<RecordBatch>) {
        debug_assert!(output_batch.is_none());
        if batch.num_rows() > 0 {
            *output_batch = Some(batch);
        }
    }
}

impl Drop for FlatMergeReader {
    fn drop(&mut self) {
        debug!("Flat merge reader finished, metrics: {:?}", self.metrics);

        READ_STAGE_ELAPSED
            .with_label_values(&["flat_merge"])
            .observe(self.metrics.scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["flat_merge_fetch"])
            .observe(self.metrics.fetch_cost.as_secs_f64());

        // Report any remaining metrics.
        if let Some(reporter) = &self.metrics_reporter {
            reporter.report(&mut self.metrics);
        }
    }
}

/// A generic node in the merge iterator or reader.
struct GenericNode<T> {
    /// Index of the node.
    node_index: usize,
    /// Iterator of this `Node`.
    iter: T,
    /// Current batch to be read. The node should ensure the batch is not empty (The
    /// cursor is not finished).
    ///
    /// `None` means the `iter` has reached EOF.
    cursor: Option<RowCursor>,
}

impl<T> NodeCmp for GenericNode<T> {
    fn is_eof(&self) -> bool {
        self.cursor.is_none()
    }

    fn is_behind(&self, other: &Self) -> bool {
        debug_assert!(!self.current_cursor().is_finished());
        debug_assert!(!other.current_cursor().is_finished());

        // We only compare pk and timestamp so nodes in the cold
        // heap don't have overlapping timestamps with the hottest node
        // in the hot heap.
        self.current_cursor()
            .first_primary_key()
            .cmp(other.current_cursor().last_primary_key())
            .then_with(|| {
                self.current_cursor()
                    .first_timestamp()
                    .cmp(&other.current_cursor().last_timestamp())
            })
            == Ordering::Greater
    }
}

impl<T> PartialEq for GenericNode<T> {
    fn eq(&self, other: &GenericNode<T>) -> bool {
        self.cursor == other.cursor
    }
}

impl<T> Eq for GenericNode<T> {}

impl<T> PartialOrd for GenericNode<T> {
    fn partial_cmp(&self, other: &GenericNode<T>) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T> Ord for GenericNode<T> {
    fn cmp(&self, other: &GenericNode<T>) -> Ordering {
        // The std binary heap is a max heap, but we want the nodes to be ordered in
        // ascending order, so we compare the nodes in reverse order.
        other.cursor.cmp(&self.cursor)
    }
}

impl<T> GenericNode<T> {
    /// Returns current cursor.
    ///
    /// # Panics
    /// Panics if the node has reached EOF.
    fn current_cursor(&self) -> &RowCursor {
        self.cursor.as_ref().unwrap()
    }
}

impl GenericNode<BoxedRecordBatchIterator> {
    /// Fetches a new batch from the iter and updates the cursor.
    /// It advances the current batch.
    /// Returns the fetched new batch.
    fn advance_batch(&mut self) -> Result<Option<RecordBatch>> {
        let batch = self.advance_inner_iter()?;
        let columns = batch.as_ref().map(SortColumns::new);
        self.cursor = columns.map(RowCursor::new);

        Ok(batch)
    }

    /// Skips one row.
    /// Returns the next batch if the current batch is finished.
    fn advance_row(&mut self) -> Result<Option<RecordBatch>> {
        let cursor = self.cursor.as_mut().unwrap();
        cursor.advance();
        if !cursor.is_finished() {
            return Ok(None);
        }

        // Finished current batch, need to fetch a new batch.
        self.advance_batch()
    }

    /// Fetches a non-empty batch from the iter.
    fn advance_inner_iter(&mut self) -> Result<Option<RecordBatch>> {
        while let Some(batch) = self.iter.next().transpose()? {
            if batch.num_rows() > 0 {
                return Ok(Some(batch));
            }
        }
        Ok(None)
    }
}

type StreamNode = GenericNode<BoxedRecordBatchStream>;
type IterNode = GenericNode<BoxedRecordBatchIterator>;

impl GenericNode<BoxedRecordBatchStream> {
    /// Fetches a new batch from the iter and updates the cursor.
    /// It advances the current batch.
    /// Returns the fetched new batch.
    async fn advance_batch(&mut self) -> Result<Option<RecordBatch>> {
        let batch = self.advance_inner_iter().await?;
        let columns = batch.as_ref().map(SortColumns::new);
        self.cursor = columns.map(RowCursor::new);

        Ok(batch)
    }

    /// Skips one row.
    /// Returns the next batch if the current batch is finished.
    async fn advance_row(&mut self) -> Result<Option<RecordBatch>> {
        let cursor = self.cursor.as_mut().unwrap();
        cursor.advance();
        if !cursor.is_finished() {
            return Ok(None);
        }

        // Finished current batch, need to fetch a new batch.
        self.advance_batch().await
    }

    /// Fetches a non-empty batch from the iter.
    async fn advance_inner_iter(&mut self) -> Result<Option<RecordBatch>> {
        while let Some(batch) = self.iter.try_next().await? {
            if batch.num_rows() > 0 {
                return Ok(Some(batch));
            }
        }
        Ok(None)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::builder::BinaryDictionaryBuilder;
    use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt8Array, UInt64Array};
    use datatypes::arrow::datatypes::{DataType, Field, Schema, TimeUnit, UInt32Type};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;

    /// Creates a test RecordBatch with the specified data.
    fn create_test_record_batch(
        primary_keys: &[&[u8]],
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        field_values: &[i64],
    ) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("field1", DataType::Int64, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ]));

        let field1 = Arc::new(Int64Array::from_iter_values(field_values.iter().copied()));
        let timestamp = Arc::new(TimestampMillisecondArray::from_iter_values(
            timestamps.iter().copied(),
        ));

        // Create primary key dictionary array using BinaryDictionaryBuilder
        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for &key in primary_keys {
            builder.append(key).unwrap();
        }
        let primary_key = Arc::new(builder.finish());

        let sequence = Arc::new(UInt64Array::from_iter_values(sequences.iter().copied()));
        let op_type = Arc::new(UInt8Array::from_iter_values(
            op_types.iter().map(|&v| v as u8),
        ));

        RecordBatch::try_new(
            schema,
            vec![field1, timestamp, primary_key, sequence, op_type],
        )
        .unwrap()
    }

    fn new_test_iter(batches: Vec<RecordBatch>) -> BoxedRecordBatchIterator {
        Box::new(batches.into_iter().map(Ok))
    }

    /// Helper function to check that two slices of record batches are equivalent.
    fn assert_record_batches_eq(expected: &[RecordBatch], actual: &[RecordBatch]) {
        assert_eq!(expected.len(), actual.len());
        for (exp, act) in expected.iter().zip(actual.iter()) {
            assert_eq!(exp, act);
        }
    }

    /// Helper function to collect all batches from a FlatMergeIterator.
    fn collect_merge_iterator_batches(iter: FlatMergeIterator) -> Vec<RecordBatch> {
        iter.map(|result| result.unwrap()).collect()
    }

    #[test]
    fn test_merge_iterator_empty() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("field1", DataType::Int64, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ]));

        let mut merge_iter = FlatMergeIterator::new(schema, vec![], 1024).unwrap();
        assert!(merge_iter.next_batch().unwrap().is_none());
    }

    #[test]
    fn test_merge_iterator_single_batch() {
        let batch = create_test_record_batch(
            &[b"k1", b"k1"],
            &[1000, 2000],
            &[21, 22],
            &[OpType::Put, OpType::Put],
            &[11, 12],
        );

        let schema = batch.schema();
        let iter = Box::new(new_test_iter(vec![batch.clone()]));

        let merge_iter = FlatMergeIterator::new(schema, vec![iter], 1024).unwrap();
        let result = collect_merge_iterator_batches(merge_iter);

        assert_eq!(result.len(), 1);
        assert_record_batches_eq(&[batch], &result);
    }

    #[test]
    fn test_merge_iterator_non_overlapping() {
        let batch1 = create_test_record_batch(
            &[b"k1", b"k1"],
            &[1000, 2000],
            &[21, 22],
            &[OpType::Put, OpType::Put],
            &[11, 12],
        );
        let batch2 = create_test_record_batch(
            &[b"k1", b"k1"],
            &[4000, 5000],
            &[24, 25],
            &[OpType::Put, OpType::Put],
            &[14, 15],
        );
        let batch3 = create_test_record_batch(
            &[b"k2", b"k2"],
            &[2000, 3000],
            &[22, 23],
            &[OpType::Delete, OpType::Put],
            &[12, 13],
        );

        let schema = batch1.schema();
        let iter1 = Box::new(new_test_iter(vec![batch1.clone(), batch3.clone()]));
        let iter2 = Box::new(new_test_iter(vec![batch2.clone()]));

        let merge_iter = FlatMergeIterator::new(schema, vec![iter1, iter2], 1024).unwrap();
        let result = collect_merge_iterator_batches(merge_iter);

        // Results should be sorted by primary key, timestamp, sequence desc
        let expected = vec![batch1, batch2, batch3];
        assert_record_batches_eq(&expected, &result);
    }

    #[test]
    fn test_merge_iterator_overlapping_timestamps() {
        // Create batches with overlapping timestamps but different sequences
        let batch1 = create_test_record_batch(
            &[b"k1", b"k1"],
            &[1000, 2000],
            &[21, 22],
            &[OpType::Put, OpType::Put],
            &[11, 12],
        );
        let batch2 = create_test_record_batch(
            &[b"k1", b"k1"],
            &[1500, 2500],
            &[31, 32],
            &[OpType::Put, OpType::Put],
            &[15, 25],
        );

        let schema = batch1.schema();
        let iter1 = Box::new(new_test_iter(vec![batch1]));
        let iter2 = Box::new(new_test_iter(vec![batch2]));

        let merge_iter = FlatMergeIterator::new(schema, vec![iter1, iter2], 1024).unwrap();
        let result = collect_merge_iterator_batches(merge_iter);

        let expected = vec![
            create_test_record_batch(
                &[b"k1", b"k1"],
                &[1000, 1500],
                &[21, 31],
                &[OpType::Put, OpType::Put],
                &[11, 15],
            ),
            create_test_record_batch(&[b"k1"], &[2000], &[22], &[OpType::Put], &[12]),
            create_test_record_batch(&[b"k1"], &[2500], &[32], &[OpType::Put], &[25]),
        ];
        assert_record_batches_eq(&expected, &result);
    }

    #[test]
    fn test_merge_iterator_duplicate_keys_sequences() {
        // Test with same primary key and timestamp but different sequences
        let batch1 = create_test_record_batch(
            &[b"k1", b"k1"],
            &[1000, 1000],
            &[20, 10],
            &[OpType::Put, OpType::Put],
            &[1, 2],
        );
        let batch2 = create_test_record_batch(
            &[b"k1"],
            &[1000],
            &[15], // Middle sequence
            &[OpType::Put],
            &[3],
        );

        let schema = batch1.schema();
        let iter1 = Box::new(new_test_iter(vec![batch1]));
        let iter2 = Box::new(new_test_iter(vec![batch2]));

        let merge_iter = FlatMergeIterator::new(schema, vec![iter1, iter2], 1024).unwrap();
        let result = collect_merge_iterator_batches(merge_iter);

        // Should be sorted by sequence descending for same key/timestamp
        let expected = vec![
            create_test_record_batch(
                &[b"k1", b"k1"],
                &[1000, 1000],
                &[20, 15],
                &[OpType::Put, OpType::Put],
                &[1, 3],
            ),
            create_test_record_batch(&[b"k1"], &[1000], &[10], &[OpType::Put], &[2]),
        ];
        assert_record_batches_eq(&expected, &result);
    }

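    #[test]
    fn test_merge_iterator_batch_size_not_hard_limit() {
        // Illustrative sketch: `batch_size` is a soft limit. When only one input
        // overlaps the merge window, its batches are forwarded as-is even if they
        // are larger than `batch_size`.
        let batch = create_test_record_batch(
            &[b"k1", b"k1", b"k1"],
            &[1000, 2000, 3000],
            &[21, 22, 23],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[11, 12, 13],
        );

        let schema = batch.schema();
        let iter = Box::new(new_test_iter(vec![batch]));

        // A batch size of 1 still yields the whole 3-row batch in one output.
        let merge_iter = FlatMergeIterator::new(schema, vec![iter], 1).unwrap();
        let result = collect_merge_iterator_batches(merge_iter);

        assert_eq!(result.len(), 1);
        assert_eq!(result[0].num_rows(), 3);
    }
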
    #[test]
    fn test_batch_builder_basic() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("field1", DataType::Int64, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        let mut builder = BatchBuilder::new(schema.clone(), 2, 1024);
        assert!(builder.is_empty());

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(TimestampMillisecondArray::from(vec![1000, 2000])),
            ],
        )
        .unwrap();

        builder.push_batch(0, batch);
        builder.push_row(0);
        builder.push_row(0);

        assert!(!builder.is_empty());
        assert_eq!(builder.len(), 2);

        let result_batch = builder.build_record_batch().unwrap().unwrap();
        assert_eq!(result_batch.num_rows(), 2);
    }
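
    #[test]
    fn test_batch_builder_take_remaining_rows() {
        // Illustrative sketch: `take_remaining_rows()` is the extension over the
        // upstream DataFusion builder. After draining some rows through
        // `build_record_batch()`, the rest of the current batch comes back as a
        // slice of the original batch.
        let schema = Arc::new(Schema::new(vec![
            Field::new("field1", DataType::Int64, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        let mut builder = BatchBuilder::new(schema.clone(), 1, 1024);
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(TimestampMillisecondArray::from(vec![1000, 2000, 3000])),
            ],
        )
        .unwrap();
        builder.push_batch(0, batch);

        // Drain the first row through the interleave path.
        builder.push_row(0);
        let first = builder.build_record_batch().unwrap().unwrap();
        assert_eq!(first.num_rows(), 1);

        // The remaining two rows are taken from the buffered batch directly.
        let rest = builder.take_remaining_rows(0, None);
        assert_eq!(rest.num_rows(), 2);
        assert!(builder.is_empty());
    }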

    #[test]
    fn test_row_cursor_comparison() {
        // Create test batches for cursor comparison
        let batch1 = create_test_record_batch(
            &[b"k1", b"k1"],
            &[1000, 2000],
            &[22, 21],
            &[OpType::Put, OpType::Put],
            &[11, 12],
        );
        let batch2 = create_test_record_batch(
            &[b"k1", b"k1"],
            &[1000, 2000],
            &[23, 20], // Different sequences
            &[OpType::Put, OpType::Put],
            &[11, 12],
        );

        let columns1 = SortColumns::new(&batch1);
        let columns2 = SortColumns::new(&batch2);

        let cursor1 = RowCursor::new(columns1);
        let cursor2 = RowCursor::new(columns2);

        // cursors with same pk and timestamp should be ordered by sequence desc
        // cursor1 has sequence 22, cursor2 has sequence 23, so cursor2 < cursor1 (higher sequence comes first)
        assert!(cursor2 < cursor1);
    }
1162}