// mito2/read/flat_merge.rs
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::cmp::Ordering;
16use std::collections::BinaryHeap;
17use std::fmt;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use async_stream::try_stream;
22use common_telemetry::debug;
23use datatypes::arrow::array::{Array, AsArray, Int64Array, UInt64Array};
24use datatypes::arrow::compute::interleave;
25use datatypes::arrow::datatypes::{ArrowNativeType, BinaryType, DataType, SchemaRef, Utf8Type};
26use datatypes::arrow::error::ArrowError;
27use datatypes::arrow::record_batch::RecordBatch;
28use datatypes::arrow_array::BinaryArray;
29use datatypes::timestamp::timestamp_array_to_primitive;
30use futures::{Stream, TryStreamExt};
31use snafu::ResultExt;
32use store_api::storage::SequenceNumber;
33
34use crate::error::{ComputeArrowSnafu, Result};
35use crate::memtable::BoxedRecordBatchIterator;
36use crate::metrics::READ_STAGE_ELAPSED;
37use crate::read::BoxedRecordBatchStream;
38use crate::sst::parquet::flat_format::{
39    primary_key_column_index, sequence_column_index, time_index_column_index,
40};
41use crate::sst::parquet::format::PrimaryKeyArray;
42
43/// Checks whether interleaving the selected rows from byte columns would overflow
44/// i32 offsets. Similar to arrow-rs `interleave_bytes()`, accumulates offsets and
45/// returns an error if the capacity exceeds `i32::MAX`.
46///
47/// TODO(yingwen): Remove this after upgrading to arrow >= 58.1.0, which handles
48/// offset overflow in `interleave_bytes()` natively.
49///
50/// See: <https://github.com/apache/arrow-rs/blob/65ad652f2410fc51ad77da1805e85c0a76d9a7ea/arrow-select/src/interleave.rs#L208-L225>
51fn check_interleave_bytes_overflow<T: datatypes::arrow::datatypes::ByteArrayType>(
52    batches: &[(usize, RecordBatch)],
53    col_idx: usize,
54    indices: &[(usize, usize)],
55) -> std::result::Result<(), ArrowError> {
56    // Quick check: if concatenating all value data won't overflow, interleaving
57    // a subset of rows definitely won't either.
58    let total: usize = batches
59        .iter()
60        .map(|(_, batch)| batch.column(col_idx).as_bytes::<T>().value_data().len())
61        .sum();
62    if T::Offset::from_usize(total).is_some() {
63        return Ok(());
64    }
65    // Total exceeds the offset limit, do the precise per-row check.
66    let mut capacity: usize = 0;
67    for &(a, b) in indices {
68        let array = batches[a].1.column(col_idx).as_bytes::<T>();
69        let o = array.value_offsets();
70        let element_len = o[b + 1].as_usize() - o[b].as_usize();
71        capacity += element_len;
72        T::Offset::from_usize(capacity).ok_or(ArrowError::OffsetOverflowError(capacity))?;
73    }
74    Ok(())
75}
76
77/// Checks whether `interleave()` would overflow i32 offsets for `Utf8` or `Binary` columns.
78fn check_interleave_overflow(
79    batches: &[(usize, RecordBatch)],
80    schema: &SchemaRef,
81    indices: &[(usize, usize)],
82) -> Result<()> {
83    for (col_idx, field) in schema.fields.iter().enumerate() {
84        match field.data_type() {
85            DataType::Utf8 => {
86                check_interleave_bytes_overflow::<Utf8Type>(batches, col_idx, indices)
87                    .context(ComputeArrowSnafu)?;
88            }
89            DataType::Binary => {
90                check_interleave_bytes_overflow::<BinaryType>(batches, col_idx, indices)
91                    .context(ComputeArrowSnafu)?;
92            }
93            _ => continue,
94        }
95    }
96    Ok(())
97}
98
/// Keeps track of the current position in a batch.
#[derive(Debug, Copy, Clone, Default)]
struct BatchCursor {
    /// The index into `BatchBuilder::batches`.
    batch_idx: usize,
    /// The next row index to consume within that batch.
    row_idx: usize,
}
107
/// Trait for reporting merge metrics.
pub trait MergeMetricsReport: Send + Sync {
    /// Reports and resets the metrics.
    fn report(&self, metrics: &mut MergeMetrics);
}

/// Metrics for the merge reader.
#[derive(Default)]
pub struct MergeMetrics {
    /// Cost to initialize the reader.
    pub(crate) init_cost: Duration,
    /// Total scan cost of the reader.
    pub(crate) scan_cost: Duration,
    /// Number of times to fetch batches.
    pub(crate) num_fetch_by_batches: usize,
    /// Number of times to fetch rows.
    pub(crate) num_fetch_by_rows: usize,
    /// Cost to fetch batches from sources.
    pub(crate) fetch_cost: Duration,
}

impl fmt::Debug for MergeMetrics {
    /// Renders the metrics as a compact JSON-like object; zero-valued fields
    /// are omitted and a reader that never scanned prints as `{}`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if self.scan_cost.is_zero() {
            return write!(f, "{{}}");
        }

        // `scan_cost` is always present once non-zero; the rest are optional.
        write!(f, r#"{{"scan_cost":"{:?}""#, self.scan_cost)?;
        if !self.init_cost.is_zero() {
            write!(f, r#", "init_cost":"{:?}""#, self.init_cost)?;
        }
        if self.num_fetch_by_batches > 0 {
            write!(f, r#", "num_fetch_by_batches":{}"#, self.num_fetch_by_batches)?;
        }
        if self.num_fetch_by_rows > 0 {
            write!(f, r#", "num_fetch_by_rows":{}"#, self.num_fetch_by_rows)?;
        }
        if !self.fetch_cost.is_zero() {
            write!(f, r#", "fetch_cost":"{:?}""#, self.fetch_cost)?;
        }
        write!(f, "}}")
    }
}

impl MergeMetrics {
    /// Accumulates metrics from another [MergeMetrics] into `self`.
    pub(crate) fn merge(&mut self, other: &MergeMetrics) {
        // Destructure so adding a field to the struct forces this method to be
        // updated (compile error on a missing binding).
        let MergeMetrics {
            init_cost,
            scan_cost,
            num_fetch_by_batches,
            num_fetch_by_rows,
            fetch_cost,
        } = other;
        self.init_cost += *init_cost;
        self.scan_cost += *scan_cost;
        self.num_fetch_by_batches += *num_fetch_by_batches;
        self.num_fetch_by_rows += *num_fetch_by_rows;
        self.fetch_cost += *fetch_cost;
    }

    /// Reports the metrics if scan_cost exceeds 10ms and resets them.
    pub(crate) fn maybe_report(&mut self, reporter: &Option<Arc<dyn MergeMetricsReport>>) {
        if let Some(reporter) = reporter {
            if self.scan_cost.as_millis() > 10 {
                reporter.report(self);
            }
        }
    }
}
185
/// Provides an API to incrementally build a [`RecordBatch`] from partitioned [`RecordBatch`].
// Ports from https://github.com/apache/datafusion/blob/49.0.0/datafusion/physical-plan/src/sorts/builder.rs
// Adds the `take_remaining_rows()` method.
#[derive(Debug)]
pub struct BatchBuilder {
    /// The schema of the RecordBatches yielded by this stream.
    schema: SchemaRef,

    /// Maintain a list of [`RecordBatch`] and their corresponding stream index.
    batches: Vec<(usize, RecordBatch)>,

    /// The current [`BatchCursor`] for each stream.
    cursors: Vec<BatchCursor>,

    /// The accumulated stream indexes from which to pull rows.
    /// Consists of a tuple of `(batch_idx, row_idx)`.
    indices: Vec<(usize, usize)>,
}
204
impl BatchBuilder {
    /// Create a new [`BatchBuilder`] with the provided `stream_count` and `batch_size`.
    ///
    /// `stream_count` sizes the cursor table; `batch_size` pre-allocates the
    /// row-index buffer for one output batch.
    pub fn new(schema: SchemaRef, stream_count: usize, batch_size: usize) -> Self {
        Self {
            schema,
            // Heuristic: at most the current and the next batch per stream.
            batches: Vec::with_capacity(stream_count * 2),
            cursors: vec![BatchCursor::default(); stream_count],
            indices: Vec::with_capacity(batch_size),
        }
    }

    /// Append a new batch in `stream_idx` and reset that stream's cursor to its
    /// first row.
    pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) {
        let batch_idx = self.batches.len();
        self.batches.push((stream_idx, batch));
        self.cursors[stream_idx] = BatchCursor {
            batch_idx,
            row_idx: 0,
        };
    }

    /// Append the next row from `stream_idx` to the pending output and advance
    /// the stream's cursor.
    pub fn push_row(&mut self, stream_idx: usize) {
        let cursor = &mut self.cursors[stream_idx];
        let row_idx = cursor.row_idx;
        cursor.row_idx += 1;
        self.indices.push((cursor.batch_idx, row_idx));
    }

    /// Returns the number of in-progress rows in this [`BatchBuilder`].
    pub fn len(&self) -> usize {
        self.indices.len()
    }

    /// Returns `true` if this [`BatchBuilder`] contains no in-progress rows.
    pub fn is_empty(&self) -> bool {
        self.indices.is_empty()
    }

    /// Returns the schema of this [`BatchBuilder`].
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Drains the in_progress row indexes, and builds a new RecordBatch from them.
    ///
    /// Will then drop any batches for which all rows have been yielded to the output.
    ///
    /// Returns `None` if no pending rows.
    pub fn build_record_batch(&mut self) -> Result<Option<RecordBatch>> {
        if self.is_empty() {
            return Ok(None);
        }

        // Guard against i32 offset overflow in byte columns before interleaving
        // (see check_interleave_overflow()).
        check_interleave_overflow(&self.batches, &self.schema, &self.indices)?;

        // Interleave each column across all buffered batches by (batch, row).
        let columns = (0..self.schema.fields.len())
            .map(|column_idx| {
                let arrays: Vec<_> = self
                    .batches
                    .iter()
                    .map(|(_, batch)| batch.column(column_idx).as_ref())
                    .collect();
                interleave(&arrays, &self.indices).context(ComputeArrowSnafu)
            })
            .collect::<Result<Vec<_>>>()?;

        self.indices.clear();

        // New cursors are only created once the previous cursor for the stream
        // is finished. This means all remaining rows from all but the last batch
        // for each stream have been yielded to the newly created record batch.
        //
        // We can therefore drop all but the last batch for each stream.
        self.retain_batches();

        RecordBatch::try_new(Arc::clone(&self.schema), columns)
            .context(ComputeArrowSnafu)
            .map(Some)
    }

    /// Slice and take remaining rows from the last batch of `stream_idx` and push
    /// the next batch if available.
    ///
    /// The slice is zero-copy; the cursor is moved to the end of the old batch so
    /// it counts as fully consumed.
    pub fn take_remaining_rows(
        &mut self,
        stream_idx: usize,
        next: Option<RecordBatch>,
    ) -> RecordBatch {
        let cursor = &mut self.cursors[stream_idx];
        let batch = &self.batches[cursor.batch_idx];
        let output = batch
            .1
            .slice(cursor.row_idx, batch.1.num_rows() - cursor.row_idx);
        cursor.row_idx = batch.1.num_rows();

        if let Some(b) = next {
            self.push_batch(stream_idx, b);
            self.retain_batches();
        }

        output
    }

    /// Drops every batch that is no longer any stream's current batch and remaps
    /// the surviving cursors' `batch_idx` to the compacted positions.
    fn retain_batches(&mut self) {
        // `batch_idx` tracks the original position during the scan; `retained`
        // is the new position assigned to each kept batch.
        let mut batch_idx = 0;
        let mut retained = 0;
        self.batches.retain(|(stream_idx, _)| {
            let stream_cursor = &mut self.cursors[*stream_idx];
            // A batch is current iff its stream's cursor still points at it.
            let retain = stream_cursor.batch_idx == batch_idx;
            batch_idx += 1;

            if retain {
                stream_cursor.batch_idx = retained;
                retained += 1;
            }
            retain
        });
    }
}
324
/// A comparable node of the heap.
trait NodeCmp: Eq + Ord {
    /// Returns true if the node has no more batches to read.
    fn is_eof(&self) -> bool;

    /// Returns true if the key range of current batch in `self` is behind (exclusive) current
    /// batch in `other`.
    ///
    /// # Panics
    /// Panics if either `self` or `other` is EOF.
    fn is_behind(&self, other: &Self) -> bool;
}
337
/// Common algorithm of merging sorted batches from multiple nodes.
struct MergeAlgo<T> {
    /// Holds nodes whose key range of current batch **is** overlapped with the merge window.
    /// Each node yields batches from a `source`.
    ///
    /// Node in this heap **MUST** not be empty. A `merge window` is the (primary key, timestamp)
    /// range of the **root node** in the `hot` heap.
    hot: BinaryHeap<T>,
    /// Holds nodes whose key range of current batch **isn't** overlapped with the merge window.
    ///
    /// Nodes in this heap **MUST** not be empty.
    cold: BinaryHeap<T>,
}
351
352impl<T: NodeCmp> MergeAlgo<T> {
353    /// Creates a new merge algorithm from `nodes`.
354    ///
355    /// All nodes must be initialized.
356    fn new(mut nodes: Vec<T>) -> Self {
357        // Skips EOF nodes.
358        nodes.retain(|node| !node.is_eof());
359        let hot = BinaryHeap::with_capacity(nodes.len());
360        let cold = BinaryHeap::from(nodes);
361
362        let mut algo = MergeAlgo { hot, cold };
363        // Initializes the algorithm.
364        algo.refill_hot();
365
366        algo
367    }
368
369    /// Moves nodes in `cold` heap, whose key range is overlapped with current merge
370    /// window to `hot` heap.
371    fn refill_hot(&mut self) {
372        while !self.cold.is_empty() {
373            if let Some(merge_window) = self.hot.peek() {
374                let warmest = self.cold.peek().unwrap();
375                if warmest.is_behind(merge_window) {
376                    // if the warmest node in the `cold` heap is totally after the
377                    // `merge_window`, then no need to add more nodes into the `hot`
378                    // heap for merge sorting.
379                    break;
380                }
381            }
382
383            let warmest = self.cold.pop().unwrap();
384            self.hot.push(warmest);
385        }
386    }
387
388    /// Push the node popped from `hot` back to a proper heap.
389    fn reheap(&mut self, node: T) {
390        if node.is_eof() {
391            // If the node is EOF, don't put it into the heap again.
392            // The merge window would be updated, need to refill the hot heap.
393            self.refill_hot();
394        } else {
395            // Find a proper heap for this node.
396            let node_is_cold = if let Some(hottest) = self.hot.peek() {
397                // If key range of this node is behind the hottest node's then we can
398                // push it to the cold heap. Otherwise we should push it to the hot heap.
399                node.is_behind(hottest)
400            } else {
401                // The hot heap is empty, but we don't known whether the current
402                // batch of this node is still the hottest.
403                true
404            };
405
406            if node_is_cold {
407                self.cold.push(node);
408            } else {
409                self.hot.push(node);
410            }
411            // Anyway, the merge window has been changed, we need to refill the hot heap.
412            self.refill_hot();
413        }
414    }
415
416    /// Pops the hottest node.
417    fn pop_hot(&mut self) -> Option<T> {
418        self.hot.pop()
419    }
420
421    /// Returns true if there are rows in the hot heap.
422    fn has_rows(&self) -> bool {
423        !self.hot.is_empty()
424    }
425
426    /// Returns true if we can fetch a batch directly instead of a row.
427    fn can_fetch_batch(&self) -> bool {
428        self.hot.len() == 1
429    }
430}
431
// TODO(yingwen): Further downcast and store arrays in this struct.
/// Columns to compare for a [RecordBatch].
struct SortColumns {
    /// Dictionary-encoded primary key column (binary values behind integer keys).
    primary_key: PrimaryKeyArray,
    /// Time index values as raw `i64` (the time unit is dropped on construction).
    timestamp: Int64Array,
    /// Sequence number column.
    sequence: UInt64Array,
}
439
440impl SortColumns {
441    /// Creates a new [SortColumns] from a [RecordBatch] and the position of the time index column.
442    ///
443    /// # Panics
444    /// Panics if the input batch doesn't have correct internal columns.
445    fn new(batch: &RecordBatch) -> Self {
446        let num_columns = batch.num_columns();
447        let primary_key = batch
448            .column(primary_key_column_index(num_columns))
449            .as_any()
450            .downcast_ref::<PrimaryKeyArray>()
451            .unwrap()
452            .clone();
453        let timestamp = batch.column(time_index_column_index(num_columns));
454        let (timestamp, _unit) = timestamp_array_to_primitive(timestamp).unwrap();
455        let sequence = batch
456            .column(sequence_column_index(num_columns))
457            .as_any()
458            .downcast_ref::<UInt64Array>()
459            .unwrap()
460            .clone();
461
462        Self {
463            primary_key,
464            timestamp,
465            sequence,
466        }
467    }
468
469    fn primary_key_at(&self, index: usize) -> &[u8] {
470        let key = self.primary_key.keys().value(index);
471        let binary_values = self
472            .primary_key
473            .values()
474            .as_any()
475            .downcast_ref::<BinaryArray>()
476            .unwrap();
477        binary_values.value(key as usize)
478    }
479
480    fn timestamp_at(&self, index: usize) -> i64 {
481        self.timestamp.value(index)
482    }
483
484    fn sequence_at(&self, index: usize) -> SequenceNumber {
485        self.sequence.value(index)
486    }
487
488    fn num_rows(&self) -> usize {
489        self.timestamp.len()
490    }
491}
492
/// Cursor to a row in the [RecordBatch].
///
/// It compares batches by rows. During comparison, it ignores op type as sequence is enough to
/// distinguish different rows.
struct RowCursor {
    /// Current row offset.
    offset: usize,
    /// Keys of the batch.
    columns: SortColumns,
}
503
impl RowCursor {
    /// Creates a cursor positioned at the first row of `columns`.
    ///
    /// The batch must be non-empty (debug-asserted).
    fn new(columns: SortColumns) -> Self {
        debug_assert!(columns.num_rows() > 0);

        Self { offset: 0, columns }
    }

    /// Returns true if every row has been consumed.
    fn is_finished(&self) -> bool {
        self.offset >= self.columns.num_rows()
    }

    /// Moves the cursor to the next row.
    fn advance(&mut self) {
        self.offset += 1;
    }

    /// Primary key of the row under the cursor.
    fn first_primary_key(&self) -> &[u8] {
        self.columns.primary_key_at(self.offset)
    }

    /// Timestamp of the row under the cursor.
    fn first_timestamp(&self) -> i64 {
        self.columns.timestamp_at(self.offset)
    }

    /// Sequence of the row under the cursor.
    fn first_sequence(&self) -> SequenceNumber {
        self.columns.sequence_at(self.offset)
    }

    /// Primary key of the last row of the batch (independent of the cursor position).
    fn last_primary_key(&self) -> &[u8] {
        self.columns.primary_key_at(self.columns.num_rows() - 1)
    }

    /// Timestamp of the last row of the batch (independent of the cursor position).
    fn last_timestamp(&self) -> i64 {
        self.columns.timestamp_at(self.columns.num_rows() - 1)
    }
}
539
impl PartialEq for RowCursor {
    // Two cursors are equal when their current (primary key, timestamp, sequence)
    // triples are equal.
    fn eq(&self, other: &Self) -> bool {
        self.first_primary_key() == other.first_primary_key()
            && self.first_timestamp() == other.first_timestamp()
            && self.first_sequence() == other.first_sequence()
    }
}

impl Eq for RowCursor {}

impl PartialOrd for RowCursor {
    fn partial_cmp(&self, other: &RowCursor) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for RowCursor {
    /// Compares by primary key, time index, sequence desc.
    fn cmp(&self, other: &RowCursor) -> Ordering {
        self.first_primary_key()
            .cmp(other.first_primary_key())
            .then_with(|| self.first_timestamp().cmp(&other.first_timestamp()))
            // Reversed: a larger sequence (newer write) sorts first.
            .then_with(|| other.first_sequence().cmp(&self.first_sequence()))
    }
}
565
/// Iterator to merge multiple sorted iterators into a single sorted iterator.
///
/// All iterators must be sorted by primary key, time index, sequence desc.
pub struct FlatMergeIterator {
    /// The merge algorithm to maintain heaps.
    algo: MergeAlgo<IterNode>,
    /// Current buffered rows to output.
    in_progress: BatchBuilder,
    /// Non-empty batch to output.
    output_batch: Option<RecordBatch>,
    /// Batch size to merge rows.
    /// This is not a hard limit, the iterator may return smaller batches to avoid concatenating
    /// rows.
    batch_size: usize,
}
581
impl FlatMergeIterator {
    /// Creates a new iterator to merge sorted `iters`.
    ///
    /// Pulls the first batch from every iterator to seed the heaps; iterators
    /// that are empty up front are dropped.
    pub fn new(
        schema: SchemaRef,
        iters: Vec<BoxedRecordBatchIterator>,
        batch_size: usize,
    ) -> Result<Self> {
        let mut in_progress = BatchBuilder::new(schema, iters.len(), batch_size);
        let mut nodes = Vec::with_capacity(iters.len());
        // Initialize nodes and the buffer.
        for (node_index, iter) in iters.into_iter().enumerate() {
            let mut node = IterNode {
                node_index,
                iter,
                cursor: None,
            };
            // Only keep nodes that yield at least one batch.
            if let Some(batch) = node.advance_batch()? {
                in_progress.push_batch(node_index, batch);
                nodes.push(node);
            }
        }

        let algo = MergeAlgo::new(nodes);

        let iter = Self {
            algo,
            in_progress,
            output_batch: None,
            batch_size,
        };

        Ok(iter)
    }

    /// Fetches next sorted batch.
    ///
    /// Returns `Ok(None)` once all sources are exhausted.
    pub fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        while self.algo.has_rows() && self.output_batch.is_none() {
            if self.algo.can_fetch_batch() && !self.in_progress.is_empty() {
                // Only one batch in the hot heap, but we have pending rows, output the pending rows first.
                self.output_batch = self.in_progress.build_record_batch()?;
                debug_assert!(self.output_batch.is_some());
            } else if self.algo.can_fetch_batch() {
                // Single hot node and no pending rows: take its whole batch.
                self.fetch_batch_from_hottest()?;
            } else {
                // Multiple overlapping nodes: merge row by row.
                self.fetch_row_from_hottest()?;
            }
        }

        Ok(self.output_batch.take())
    }

    /// Fetches a batch from the hottest node.
    fn fetch_batch_from_hottest(&mut self) -> Result<()> {
        debug_assert!(self.in_progress.is_empty());

        // Safety: next_batch() ensures the heap is not empty.
        let mut hottest = self.algo.pop_hot().unwrap();
        debug_assert!(!hottest.current_cursor().is_finished());
        let next = hottest.advance_batch()?;
        // The node in the heap is not empty, so it must have existing rows in the builder.
        let batch = self
            .in_progress
            .take_remaining_rows(hottest.node_index, next);
        Self::maybe_output_batch(batch, &mut self.output_batch);
        self.algo.reheap(hottest);

        Ok(())
    }

    /// Fetches a row from the hottest node.
    fn fetch_row_from_hottest(&mut self) -> Result<()> {
        // Safety: next_batch() ensures the heap has more than 1 element.
        let mut hottest = self.algo.pop_hot().unwrap();
        debug_assert!(!hottest.current_cursor().is_finished());
        self.in_progress.push_row(hottest.node_index);
        if self.in_progress.len() >= self.batch_size {
            // We buffered enough rows.
            if let Some(output) = self.in_progress.build_record_batch()? {
                Self::maybe_output_batch(output, &mut self.output_batch);
            }
        }

        // If the node's current batch is exhausted, pull its next batch into the builder.
        if let Some(next) = hottest.advance_row()? {
            self.in_progress.push_batch(hottest.node_index, next);
        }

        self.algo.reheap(hottest);
        Ok(())
    }

    /// Adds the batch to the output batch if it is not empty.
    fn maybe_output_batch(batch: RecordBatch, output_batch: &mut Option<RecordBatch>) {
        debug_assert!(output_batch.is_none());
        if batch.num_rows() > 0 {
            *output_batch = Some(batch);
        }
    }
}
680
impl Iterator for FlatMergeIterator {
    type Item = Result<RecordBatch>;

    // Adapts `next_batch()`'s `Result<Option<_>>` into the iterator's `Option<Result<_>>`.
    fn next(&mut self) -> Option<Self::Item> {
        self.next_batch().transpose()
    }
}
688
/// Async reader to merge multiple sorted streams into a single sorted stream.
///
/// All streams must be sorted by primary key, time index, sequence desc.
pub struct FlatMergeReader {
    /// The merge algorithm to maintain heaps.
    algo: MergeAlgo<StreamNode>,
    /// Current buffered rows to output.
    in_progress: BatchBuilder,
    /// Non-empty batch to output.
    output_batch: Option<RecordBatch>,
    /// Batch size to merge rows.
    /// This is not a hard limit, the reader may return smaller batches to avoid concatenating
    /// rows.
    batch_size: usize,
    /// Local metrics.
    metrics: MergeMetrics,
    /// Optional metrics reporter.
    metrics_reporter: Option<Arc<dyn MergeMetricsReport>>,
}
708
709impl FlatMergeReader {
710    /// Creates a new iterator to merge sorted `iters`.
711    pub async fn new(
712        schema: SchemaRef,
713        iters: Vec<BoxedRecordBatchStream>,
714        batch_size: usize,
715        metrics_reporter: Option<Arc<dyn MergeMetricsReport>>,
716    ) -> Result<Self> {
717        let start = Instant::now();
718        let metrics = MergeMetrics::default();
719        let mut in_progress = BatchBuilder::new(schema, iters.len(), batch_size);
720        let mut nodes = Vec::with_capacity(iters.len());
721        // Initialize nodes and the buffer.
722        for (node_index, iter) in iters.into_iter().enumerate() {
723            let mut node = StreamNode {
724                node_index,
725                iter,
726                cursor: None,
727            };
728            if let Some(batch) = node.advance_batch().await? {
729                in_progress.push_batch(node_index, batch);
730                nodes.push(node);
731            }
732        }
733
734        let algo = MergeAlgo::new(nodes);
735
736        let mut reader = Self {
737            algo,
738            in_progress,
739            output_batch: None,
740            batch_size,
741            metrics,
742            metrics_reporter,
743        };
744        let elapsed = start.elapsed();
745        reader.metrics.init_cost += elapsed;
746        reader.metrics.scan_cost += elapsed;
747
748        Ok(reader)
749    }
750
751    /// Fetches next sorted batch.
752    pub async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
753        let start = Instant::now();
754        while self.algo.has_rows() && self.output_batch.is_none() {
755            if self.algo.can_fetch_batch() && !self.in_progress.is_empty() {
756                // Only one batch in the hot heap, but we have pending rows, output the pending rows first.
757                self.output_batch = self.in_progress.build_record_batch()?;
758                debug_assert!(self.output_batch.is_some());
759            } else if self.algo.can_fetch_batch() {
760                self.fetch_batch_from_hottest().await?;
761                self.metrics.num_fetch_by_batches += 1;
762            } else {
763                self.fetch_row_from_hottest().await?;
764                self.metrics.num_fetch_by_rows += 1;
765            }
766        }
767
768        if let Some(batch) = self.output_batch.take() {
769            self.metrics.scan_cost += start.elapsed();
770            self.metrics.maybe_report(&self.metrics_reporter);
771            Ok(Some(batch))
772        } else {
773            // No more batches.
774            self.metrics.scan_cost += start.elapsed();
775            self.metrics.maybe_report(&self.metrics_reporter);
776            Ok(None)
777        }
778    }
779
780    /// Converts the reader into a stream.
781    pub fn into_stream(mut self) -> impl Stream<Item = Result<RecordBatch>> {
782        try_stream! {
783            while let Some(batch) = self.next_batch().await? {
784                yield batch;
785            }
786        }
787    }
788
789    /// Fetches a batch from the hottest node.
790    async fn fetch_batch_from_hottest(&mut self) -> Result<()> {
791        debug_assert!(self.in_progress.is_empty());
792
793        // Safety: next_batch() ensures the heap is not empty.
794        let mut hottest = self.algo.pop_hot().unwrap();
795        debug_assert!(!hottest.current_cursor().is_finished());
796        let start = Instant::now();
797        let next = hottest.advance_batch().await?;
798        self.metrics.fetch_cost += start.elapsed();
799        // The node is the heap is not empty, so it must have existing rows in the builder.
800        let batch = self
801            .in_progress
802            .take_remaining_rows(hottest.node_index, next);
803        Self::maybe_output_batch(batch, &mut self.output_batch);
804        self.algo.reheap(hottest);
805
806        Ok(())
807    }
808
809    /// Fetches a row from the hottest node.
810    async fn fetch_row_from_hottest(&mut self) -> Result<()> {
811        // Safety: next_batch() ensures the heap has more than 1 element.
812        let mut hottest = self.algo.pop_hot().unwrap();
813        debug_assert!(!hottest.current_cursor().is_finished());
814        self.in_progress.push_row(hottest.node_index);
815        if self.in_progress.len() >= self.batch_size {
816            // We buffered enough rows.
817            if let Some(output) = self.in_progress.build_record_batch()? {
818                Self::maybe_output_batch(output, &mut self.output_batch);
819            }
820        }
821
822        let start = Instant::now();
823        if let Some(next) = hottest.advance_row().await? {
824            self.metrics.fetch_cost += start.elapsed();
825            self.in_progress.push_batch(hottest.node_index, next);
826        } else {
827            self.metrics.fetch_cost += start.elapsed();
828        }
829
830        self.algo.reheap(hottest);
831        Ok(())
832    }
833
834    /// Adds the batch to the output batch if it is not empty.
835    fn maybe_output_batch(batch: RecordBatch, output_batch: &mut Option<RecordBatch>) {
836        debug_assert!(output_batch.is_none());
837        if batch.num_rows() > 0 {
838            *output_batch = Some(batch);
839        }
840    }
841}
842
impl Drop for FlatMergeReader {
    fn drop(&mut self) {
        debug!("Flat merge reader finished, metrics: {:?}", self.metrics);

        // Record the overall scan/fetch costs in the read stage histograms.
        READ_STAGE_ELAPSED
            .with_label_values(&["flat_merge"])
            .observe(self.metrics.scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["flat_merge_fetch"])
            .observe(self.metrics.fetch_cost.as_secs_f64());

        // Report any remaining metrics. Must stay after the histograms above:
        // per the MergeMetricsReport contract, report() also resets the metrics.
        if let Some(reporter) = &self.metrics_reporter {
            reporter.report(&mut self.metrics);
        }
    }
}
860
/// A node in the merge iterator, generic over its batch source `T`.
struct GenericNode<T> {
    /// Index of the node.
    node_index: usize,
    /// Batch source of this `Node`.
    iter: T,
    /// Current batch to be read. The node should ensure the batch is not empty (the
    /// cursor is not finished).
    ///
    /// `None` means the `iter` has reached EOF.
    cursor: Option<RowCursor>,
}
873
874impl<T> NodeCmp for GenericNode<T> {
875    fn is_eof(&self) -> bool {
876        self.cursor.is_none()
877    }
878
879    fn is_behind(&self, other: &Self) -> bool {
880        debug_assert!(!self.current_cursor().is_finished());
881        debug_assert!(!other.current_cursor().is_finished());
882
883        // We only compare pk and timestamp so nodes in the cold
884        // heap don't have overlapping timestamps with the hottest node
885        // in the hot heap.
886        self.current_cursor()
887            .first_primary_key()
888            .cmp(other.current_cursor().last_primary_key())
889            .then_with(|| {
890                self.current_cursor()
891                    .first_timestamp()
892                    .cmp(&other.current_cursor().last_timestamp())
893            })
894            == Ordering::Greater
895    }
896}
897
898impl<T> PartialEq for GenericNode<T> {
899    fn eq(&self, other: &GenericNode<T>) -> bool {
900        self.cursor == other.cursor
901    }
902}
903
904impl<T> Eq for GenericNode<T> {}
905
impl<T> PartialOrd for GenericNode<T> {
    // Canonical delegation to `Ord::cmp` keeps both orderings consistent
    // (and satisfies clippy::non_canonical_partial_ord_impl).
    fn partial_cmp(&self, other: &GenericNode<T>) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
911
912impl<T> Ord for GenericNode<T> {
913    fn cmp(&self, other: &GenericNode<T>) -> Ordering {
914        // The std binary heap is a max heap, but we want the nodes are ordered in
915        // ascend order, so we compare the nodes in reverse order.
916        other.cursor.cmp(&self.cursor)
917    }
918}
919
920impl<T> GenericNode<T> {
921    /// Returns current cursor.
922    ///
923    /// # Panics
924    /// Panics if the node has reached EOF.
925    fn current_cursor(&self) -> &RowCursor {
926        self.cursor.as_ref().unwrap()
927    }
928}
929
930impl GenericNode<BoxedRecordBatchIterator> {
931    /// Fetches a new batch from the iter and updates the cursor.
932    /// It advances the current batch.
933    /// Returns the fetched new batch.
934    fn advance_batch(&mut self) -> Result<Option<RecordBatch>> {
935        let batch = self.advance_inner_iter()?;
936        let columns = batch.as_ref().map(SortColumns::new);
937        self.cursor = columns.map(RowCursor::new);
938
939        Ok(batch)
940    }
941
942    /// Skips one row.
943    /// Returns the next batch if the current batch is finished.
944    fn advance_row(&mut self) -> Result<Option<RecordBatch>> {
945        let cursor = self.cursor.as_mut().unwrap();
946        cursor.advance();
947        if !cursor.is_finished() {
948            return Ok(None);
949        }
950
951        // Finished current batch, need to fetch a new batch.
952        self.advance_batch()
953    }
954
955    /// Fetches a non-empty batch from the iter.
956    fn advance_inner_iter(&mut self) -> Result<Option<RecordBatch>> {
957        while let Some(batch) = self.iter.next().transpose()? {
958            if batch.num_rows() > 0 {
959                return Ok(Some(batch));
960            }
961        }
962        Ok(None)
963    }
964}
965
/// Merge node backed by an async record batch stream.
type StreamNode = GenericNode<BoxedRecordBatchStream>;
/// Merge node backed by a blocking record batch iterator.
type IterNode = GenericNode<BoxedRecordBatchIterator>;
968
969impl GenericNode<BoxedRecordBatchStream> {
970    /// Fetches a new batch from the iter and updates the cursor.
971    /// It advances the current batch.
972    /// Returns the fetched new batch.
973    async fn advance_batch(&mut self) -> Result<Option<RecordBatch>> {
974        let batch = self.advance_inner_iter().await?;
975        let columns = batch.as_ref().map(SortColumns::new);
976        self.cursor = columns.map(RowCursor::new);
977
978        Ok(batch)
979    }
980
981    /// Skips one row.
982    /// Returns the next batch if the current batch is finished.
983    async fn advance_row(&mut self) -> Result<Option<RecordBatch>> {
984        let cursor = self.cursor.as_mut().unwrap();
985        cursor.advance();
986        if !cursor.is_finished() {
987            return Ok(None);
988        }
989
990        // Finished current batch, need to fetch a new batch.
991        self.advance_batch().await
992    }
993
994    /// Fetches a non-empty batch from the iter.
995    async fn advance_inner_iter(&mut self) -> Result<Option<RecordBatch>> {
996        while let Some(batch) = self.iter.try_next().await? {
997            if batch.num_rows() > 0 {
998                return Ok(Some(batch));
999            }
1000        }
1001        Ok(None)
1002    }
1003}
1004
1005#[cfg(test)]
1006mod tests {
1007    use std::sync::Arc;
1008
1009    use api::v1::OpType;
1010    use datatypes::arrow::array::builder::BinaryDictionaryBuilder;
1011    use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt8Array, UInt64Array};
1012    use datatypes::arrow::datatypes::{DataType, Field, Schema, TimeUnit, UInt32Type};
1013    use datatypes::arrow::record_batch::RecordBatch;
1014
1015    use super::*;
1016
1017    /// Creates a test RecordBatch with the specified data.
1018    fn create_test_record_batch(
1019        primary_keys: &[&[u8]],
1020        timestamps: &[i64],
1021        sequences: &[u64],
1022        op_types: &[OpType],
1023        field_values: &[i64],
1024    ) -> RecordBatch {
1025        let schema = Arc::new(Schema::new(vec![
1026            Field::new("field1", DataType::Int64, false),
1027            Field::new(
1028                "timestamp",
1029                DataType::Timestamp(TimeUnit::Millisecond, None),
1030                false,
1031            ),
1032            Field::new(
1033                "__primary_key",
1034                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
1035                false,
1036            ),
1037            Field::new("__sequence", DataType::UInt64, false),
1038            Field::new("__op_type", DataType::UInt8, false),
1039        ]));
1040
1041        let field1 = Arc::new(Int64Array::from_iter_values(field_values.iter().copied()));
1042        let timestamp = Arc::new(TimestampMillisecondArray::from_iter_values(
1043            timestamps.iter().copied(),
1044        ));
1045
1046        // Create primary key dictionary array using BinaryDictionaryBuilder
1047        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
1048        for &key in primary_keys {
1049            builder.append(key).unwrap();
1050        }
1051        let primary_key = Arc::new(builder.finish());
1052
1053        let sequence = Arc::new(UInt64Array::from_iter_values(sequences.iter().copied()));
1054        let op_type = Arc::new(UInt8Array::from_iter_values(
1055            op_types.iter().map(|&v| v as u8),
1056        ));
1057
1058        RecordBatch::try_new(
1059            schema,
1060            vec![field1, timestamp, primary_key, sequence, op_type],
1061        )
1062        .unwrap()
1063    }
1064
1065    fn new_test_iter(batches: Vec<RecordBatch>) -> BoxedRecordBatchIterator {
1066        Box::new(batches.into_iter().map(Ok))
1067    }
1068
1069    /// Helper function to check if two record batches are equivalent.
1070    fn assert_record_batches_eq(expected: &[RecordBatch], actual: &[RecordBatch]) {
1071        for (exp, act) in expected.iter().zip(actual.iter()) {
1072            assert_eq!(exp, act,);
1073        }
1074    }
1075
1076    /// Helper function to collect all batches from a FlatMergeIterator.
1077    fn collect_merge_iterator_batches(iter: FlatMergeIterator) -> Vec<RecordBatch> {
1078        iter.map(|result| result.unwrap()).collect()
1079    }
1080
1081    #[test]
1082    fn test_merge_iterator_empty() {
1083        let schema = Arc::new(Schema::new(vec![
1084            Field::new("field1", DataType::Int64, false),
1085            Field::new(
1086                "timestamp",
1087                DataType::Timestamp(TimeUnit::Millisecond, None),
1088                false,
1089            ),
1090            Field::new(
1091                "__primary_key",
1092                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
1093                false,
1094            ),
1095            Field::new("__sequence", DataType::UInt64, false),
1096            Field::new("__op_type", DataType::UInt8, false),
1097        ]));
1098
1099        let mut merge_iter = FlatMergeIterator::new(schema, vec![], 1024).unwrap();
1100        assert!(merge_iter.next_batch().unwrap().is_none());
1101    }
1102
1103    #[test]
1104    fn test_merge_iterator_single_batch() {
1105        let batch = create_test_record_batch(
1106            &[b"k1", b"k1"],
1107            &[1000, 2000],
1108            &[21, 22],
1109            &[OpType::Put, OpType::Put],
1110            &[11, 12],
1111        );
1112
1113        let schema = batch.schema();
1114        let iter = Box::new(new_test_iter(vec![batch.clone()]));
1115
1116        let merge_iter = FlatMergeIterator::new(schema, vec![iter], 1024).unwrap();
1117        let result = collect_merge_iterator_batches(merge_iter);
1118
1119        assert_eq!(result.len(), 1);
1120        assert_record_batches_eq(&[batch], &result);
1121    }
1122
1123    #[test]
1124    fn test_merge_iterator_non_overlapping() {
1125        let batch1 = create_test_record_batch(
1126            &[b"k1", b"k1"],
1127            &[1000, 2000],
1128            &[21, 22],
1129            &[OpType::Put, OpType::Put],
1130            &[11, 12],
1131        );
1132        let batch2 = create_test_record_batch(
1133            &[b"k1", b"k1"],
1134            &[4000, 5000],
1135            &[24, 25],
1136            &[OpType::Put, OpType::Put],
1137            &[14, 15],
1138        );
1139        let batch3 = create_test_record_batch(
1140            &[b"k2", b"k2"],
1141            &[2000, 3000],
1142            &[22, 23],
1143            &[OpType::Delete, OpType::Put],
1144            &[12, 13],
1145        );
1146
1147        let schema = batch1.schema();
1148        let iter1 = Box::new(new_test_iter(vec![batch1.clone(), batch3.clone()]));
1149        let iter2 = Box::new(new_test_iter(vec![batch2.clone()]));
1150
1151        let merge_iter = FlatMergeIterator::new(schema, vec![iter1, iter2], 1024).unwrap();
1152        let result = collect_merge_iterator_batches(merge_iter);
1153
1154        // Results should be sorted by primary key, timestamp, sequence desc
1155        let expected = vec![batch1, batch2, batch3];
1156        assert_record_batches_eq(&expected, &result);
1157    }
1158
1159    #[test]
1160    fn test_merge_iterator_overlapping_timestamps() {
1161        // Create batches with overlapping timestamps but different sequences
1162        let batch1 = create_test_record_batch(
1163            &[b"k1", b"k1"],
1164            &[1000, 2000],
1165            &[21, 22],
1166            &[OpType::Put, OpType::Put],
1167            &[11, 12],
1168        );
1169        let batch2 = create_test_record_batch(
1170            &[b"k1", b"k1"],
1171            &[1500, 2500],
1172            &[31, 32],
1173            &[OpType::Put, OpType::Put],
1174            &[15, 25],
1175        );
1176
1177        let schema = batch1.schema();
1178        let iter1 = Box::new(new_test_iter(vec![batch1]));
1179        let iter2 = Box::new(new_test_iter(vec![batch2]));
1180
1181        let merge_iter = FlatMergeIterator::new(schema, vec![iter1, iter2], 1024).unwrap();
1182        let result = collect_merge_iterator_batches(merge_iter);
1183
1184        let expected = vec![
1185            create_test_record_batch(
1186                &[b"k1", b"k1"],
1187                &[1000, 1500],
1188                &[21, 31],
1189                &[OpType::Put, OpType::Put],
1190                &[11, 15],
1191            ),
1192            create_test_record_batch(&[b"k1"], &[2000], &[22], &[OpType::Put], &[12]),
1193            create_test_record_batch(&[b"k1"], &[2500], &[32], &[OpType::Put], &[25]),
1194        ];
1195        assert_record_batches_eq(&expected, &result);
1196    }
1197
1198    #[test]
1199    fn test_merge_iterator_duplicate_keys_sequences() {
1200        // Test with same primary key and timestamp but different sequences
1201        let batch1 = create_test_record_batch(
1202            &[b"k1", b"k1"],
1203            &[1000, 1000],
1204            &[20, 10],
1205            &[OpType::Put, OpType::Put],
1206            &[1, 2],
1207        );
1208        let batch2 = create_test_record_batch(
1209            &[b"k1"],
1210            &[1000],
1211            &[15], // Middle sequence
1212            &[OpType::Put],
1213            &[3],
1214        );
1215
1216        let schema = batch1.schema();
1217        let iter1 = Box::new(new_test_iter(vec![batch1]));
1218        let iter2 = Box::new(new_test_iter(vec![batch2]));
1219
1220        let merge_iter = FlatMergeIterator::new(schema, vec![iter1, iter2], 1024).unwrap();
1221        let result = collect_merge_iterator_batches(merge_iter);
1222
1223        // Should be sorted by sequence descending for same key/timestamp
1224        let expected = vec![
1225            create_test_record_batch(
1226                &[b"k1", b"k1"],
1227                &[1000, 1000],
1228                &[20, 15],
1229                &[OpType::Put, OpType::Put],
1230                &[1, 3],
1231            ),
1232            create_test_record_batch(&[b"k1"], &[1000], &[10], &[OpType::Put], &[2]),
1233        ];
1234        assert_record_batches_eq(&expected, &result);
1235    }
1236
1237    #[test]
1238    fn test_batch_builder_basic() {
1239        let schema = Arc::new(Schema::new(vec![
1240            Field::new("field1", DataType::Int64, false),
1241            Field::new(
1242                "timestamp",
1243                DataType::Timestamp(TimeUnit::Millisecond, None),
1244                false,
1245            ),
1246        ]));
1247
1248        let mut builder = BatchBuilder::new(schema.clone(), 2, 1024);
1249        assert!(builder.is_empty());
1250
1251        let batch = RecordBatch::try_new(
1252            schema,
1253            vec![
1254                Arc::new(Int64Array::from(vec![1, 2])),
1255                Arc::new(TimestampMillisecondArray::from(vec![1000, 2000])),
1256            ],
1257        )
1258        .unwrap();
1259
1260        builder.push_batch(0, batch);
1261        builder.push_row(0);
1262        builder.push_row(0);
1263
1264        assert!(!builder.is_empty());
1265        assert_eq!(builder.len(), 2);
1266
1267        let result_batch = builder.build_record_batch().unwrap().unwrap();
1268        assert_eq!(result_batch.num_rows(), 2);
1269    }
1270
1271    #[test]
1272    fn test_row_cursor_comparison() {
1273        // Create test batches for cursor comparison
1274        let batch1 = create_test_record_batch(
1275            &[b"k1", b"k1"],
1276            &[1000, 2000],
1277            &[22, 21],
1278            &[OpType::Put, OpType::Put],
1279            &[11, 12],
1280        );
1281        let batch2 = create_test_record_batch(
1282            &[b"k1", b"k1"],
1283            &[1000, 2000],
1284            &[23, 20], // Different sequences
1285            &[OpType::Put, OpType::Put],
1286            &[11, 12],
1287        );
1288
1289        let columns1 = SortColumns::new(&batch1);
1290        let columns2 = SortColumns::new(&batch2);
1291
1292        let cursor1 = RowCursor::new(columns1);
1293        let cursor2 = RowCursor::new(columns2);
1294
1295        // cursors with same pk and timestamp should be ordered by sequence desc
1296        // cursor1 has sequence 22, cursor2 has sequence 23, so cursor2 < cursor1 (higher sequence comes first)
1297        assert!(cursor2 < cursor1);
1298    }
1299}