// mito2/read.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Common structs and utilities for reading data.
16
17pub mod batch_adapter;
18pub mod compat;
19pub mod dedup;
20pub mod flat_dedup;
21pub mod flat_merge;
22pub mod flat_projection;
23pub mod last_row;
24pub mod projection;
25pub(crate) mod prune;
26pub(crate) mod pruner;
27pub mod range;
28#[cfg(feature = "test")]
29pub mod range_cache;
30#[cfg(not(feature = "test"))]
31pub(crate) mod range_cache;
32pub(crate) mod read_columns;
33pub mod scan_region;
34pub mod scan_util;
35pub(crate) mod seq_scan;
36pub mod series_scan;
37pub mod stream;
38pub(crate) mod unordered_scan;
39
40use std::collections::HashMap;
41use std::sync::Arc;
42use std::time::Duration;
43
44use api::v1::OpType;
45use async_trait::async_trait;
46use common_time::Timestamp;
47use datafusion_common::arrow::array::UInt8Array;
48use datatypes::arrow;
49use datatypes::arrow::array::{Array, ArrayRef};
50use datatypes::arrow::compute::SortOptions;
51use datatypes::arrow::record_batch::RecordBatch;
52use datatypes::arrow::row::{RowConverter, SortField};
53use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector};
54use datatypes::scalars::ScalarVectorBuilder;
55use datatypes::types::TimestampType;
56use datatypes::value::{Value, ValueRef};
57use datatypes::vectors::{
58    BooleanVector, Helper, TimestampMicrosecondVector, TimestampMillisecondVector,
59    TimestampMillisecondVectorBuilder, TimestampNanosecondVector, TimestampSecondVector,
60    UInt8Vector, UInt8VectorBuilder, UInt32Vector, UInt64Vector, UInt64VectorBuilder, Vector,
61    VectorRef,
62};
63use futures::TryStreamExt;
64use futures::stream::BoxStream;
65use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
66use snafu::{OptionExt, ResultExt, ensure};
67use store_api::storage::{ColumnId, SequenceNumber, SequenceRange};
68
69use crate::error::{
70    ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu,
71    Result,
72};
73use crate::memtable::{BoxedBatchIterator, BoxedRecordBatchIterator};
/// Storage internal representation of a batch of rows for a primary key (time series).
///
/// Rows are sorted by primary key, timestamp, sequence desc, op_type desc. Fields
/// always keep the same relative order as fields in [RegionMetadata](store_api::metadata::RegionMetadata).
#[derive(Debug, PartialEq, Clone)]
pub struct Batch {
    /// Primary key encoded in a comparable form.
    primary_key: Vec<u8>,
    /// Possibly decoded `primary_key` values. Some places would decode it in advance,
    /// others fill it lazily (see `pk_col_value`).
    pk_values: Option<CompositeValues>,
    /// Timestamps of rows, should be sorted and not null.
    timestamps: VectorRef,
    /// Sequences of rows.
    ///
    /// UInt64 type, not null.
    sequences: Arc<UInt64Vector>,
    /// Op types of rows.
    ///
    /// UInt8 type, not null.
    op_types: Arc<UInt8Vector>,
    /// Fields organized in columnar format.
    fields: Vec<BatchColumn>,
    /// Lazily built cache mapping column id to the index in `fields`
    /// (see `field_col_value`).
    fields_idx: Option<HashMap<ColumnId, usize>>,
}
99
100impl Batch {
101    /// Creates a new batch.
102    pub fn new(
103        primary_key: Vec<u8>,
104        timestamps: VectorRef,
105        sequences: Arc<UInt64Vector>,
106        op_types: Arc<UInt8Vector>,
107        fields: Vec<BatchColumn>,
108    ) -> Result<Batch> {
109        BatchBuilder::with_required_columns(primary_key, timestamps, sequences, op_types)
110            .with_fields(fields)
111            .build()
112    }
113
114    /// Tries to set fields for the batch.
115    pub fn with_fields(self, fields: Vec<BatchColumn>) -> Result<Batch> {
116        Batch::new(
117            self.primary_key,
118            self.timestamps,
119            self.sequences,
120            self.op_types,
121            fields,
122        )
123    }
124
    /// Returns the encoded primary key of the batch.
    pub fn primary_key(&self) -> &[u8] {
        &self.primary_key
    }
129
    /// Returns possibly decoded primary-key values, if some code path has
    /// decoded (or set) them already.
    pub fn pk_values(&self) -> Option<&CompositeValues> {
        self.pk_values.as_ref()
    }
134
    /// Sets possibly decoded primary-key values.
    ///
    /// Callers must ensure `pk_values` matches the encoded `primary_key`.
    pub fn set_pk_values(&mut self, pk_values: CompositeValues) {
        self.pk_values = Some(pk_values);
    }
139
    /// Removes possibly decoded primary-key values. For testing only.
    #[cfg(any(test, feature = "test"))]
    pub fn remove_pk_values(&mut self) {
        self.pk_values = None;
    }
145
    /// Returns field columns in the batch.
    pub fn fields(&self) -> &[BatchColumn] {
        &self.fields
    }
150
    /// Returns timestamps of the batch.
    pub fn timestamps(&self) -> &VectorRef {
        &self.timestamps
    }
155
    /// Returns sequences of the batch.
    pub fn sequences(&self) -> &Arc<UInt64Vector> {
        &self.sequences
    }
160
    /// Returns op types of the batch.
    pub fn op_types(&self) -> &Arc<UInt8Vector> {
        &self.op_types
    }
165
    /// Returns the number of rows in the batch.
    pub fn num_rows(&self) -> usize {
        // All vectors have the same length. We use the length of sequences vector
        // since it has static type.
        self.sequences.len()
    }
172
173    /// Create an empty [`Batch`].
174    #[allow(dead_code)]
175    pub(crate) fn empty() -> Self {
176        Self {
177            primary_key: vec![],
178            pk_values: None,
179            timestamps: Arc::new(TimestampMillisecondVectorBuilder::with_capacity(0).finish()),
180            sequences: Arc::new(UInt64VectorBuilder::with_capacity(0).finish()),
181            op_types: Arc::new(UInt8VectorBuilder::with_capacity(0).finish()),
182            fields: vec![],
183            fields_idx: None,
184        }
185    }
186
    /// Returns true if the number of rows in the batch is 0.
    pub fn is_empty(&self) -> bool {
        self.num_rows() == 0
    }
191
192    /// Returns the first timestamp in the batch or `None` if the batch is empty.
193    pub fn first_timestamp(&self) -> Option<Timestamp> {
194        if self.timestamps.is_empty() {
195            return None;
196        }
197
198        Some(self.get_timestamp(0))
199    }
200
201    /// Returns the last timestamp in the batch or `None` if the batch is empty.
202    pub fn last_timestamp(&self) -> Option<Timestamp> {
203        if self.timestamps.is_empty() {
204            return None;
205        }
206
207        Some(self.get_timestamp(self.timestamps.len() - 1))
208    }
209
210    /// Returns the first sequence in the batch or `None` if the batch is empty.
211    pub fn first_sequence(&self) -> Option<SequenceNumber> {
212        if self.sequences.is_empty() {
213            return None;
214        }
215
216        Some(self.get_sequence(0))
217    }
218
219    /// Returns the last sequence in the batch or `None` if the batch is empty.
220    pub fn last_sequence(&self) -> Option<SequenceNumber> {
221        if self.sequences.is_empty() {
222            return None;
223        }
224
225        Some(self.get_sequence(self.sequences.len() - 1))
226    }
227
    /// Replaces the primary key of the batch.
    ///
    /// Notice that this [Batch] also contains a maybe-exist `pk_values`.
    /// Be sure to update that field as well, since this method does not
    /// clear or refresh the cached decoded values.
    pub fn set_primary_key(&mut self, primary_key: Vec<u8>) {
        self.primary_key = primary_key;
    }
235
236    /// Slice the batch, returning a new batch.
237    ///
238    /// # Panics
239    /// Panics if `offset + length > self.num_rows()`.
240    pub fn slice(&self, offset: usize, length: usize) -> Batch {
241        let fields = self
242            .fields
243            .iter()
244            .map(|column| BatchColumn {
245                column_id: column.column_id,
246                data: column.data.slice(offset, length),
247            })
248            .collect();
249        // We skip using the builder to avoid validating the batch again.
250        Batch {
251            // Now we need to clone the primary key. We could try `Bytes` if
252            // this becomes a bottleneck.
253            primary_key: self.primary_key.clone(),
254            pk_values: self.pk_values.clone(),
255            timestamps: self.timestamps.slice(offset, length),
256            sequences: Arc::new(self.sequences.get_slice(offset, length)),
257            op_types: Arc::new(self.op_types.get_slice(offset, length)),
258            fields,
259            fields_idx: self.fields_idx.clone(),
260        }
261    }
262
    /// Takes `batches` and concat them into one batch.
    ///
    /// All `batches` must have the same primary key and the same field layout
    /// (same number of fields with the same column ids, in order).
    ///
    /// # Errors
    /// Returns an error if `batches` is empty, or if the batches disagree on
    /// primary key or field layout.
    pub fn concat(mut batches: Vec<Batch>) -> Result<Batch> {
        ensure!(
            !batches.is_empty(),
            InvalidBatchSnafu {
                reason: "empty batches",
            }
        );
        if batches.len() == 1 {
            // Now we own the `batches` so we could pop it directly.
            return Ok(batches.pop().unwrap());
        }

        // Move the key out of the first batch to avoid cloning it for the builder.
        let primary_key = std::mem::take(&mut batches[0].primary_key);
        let first = &batches[0];
        // We took the primary key from the first batch so we don't use `first.primary_key()`
        // (it is empty now).
        ensure!(
            batches
                .iter()
                .skip(1)
                .all(|b| b.primary_key() == primary_key),
            InvalidBatchSnafu {
                reason: "batches have different primary key",
            }
        );
        // Validate that every batch has the same field layout as the first one.
        for b in batches.iter().skip(1) {
            ensure!(
                b.fields.len() == first.fields.len(),
                InvalidBatchSnafu {
                    reason: "batches have different field num",
                }
            );
            for (l, r) in b.fields.iter().zip(&first.fields) {
                ensure!(
                    l.column_id == r.column_id,
                    InvalidBatchSnafu {
                        reason: "batches have different fields",
                    }
                );
            }
        }

        // We take the primary key from the first batch.
        let mut builder = BatchBuilder::new(primary_key);
        // Concat timestamps, sequences, op_types, fields.
        let array = concat_arrays(batches.iter().map(|b| b.timestamps().to_arrow_array()))?;
        builder.timestamps_array(array)?;
        let array = concat_arrays(batches.iter().map(|b| b.sequences().to_arrow_array()))?;
        builder.sequences_array(array)?;
        let array = concat_arrays(batches.iter().map(|b| b.op_types().to_arrow_array()))?;
        builder.op_types_array(array)?;
        for (i, batch_column) in first.fields.iter().enumerate() {
            let array = concat_arrays(batches.iter().map(|b| b.fields()[i].data.to_arrow_array()))?;
            builder.push_field_array(batch_column.column_id, array)?;
        }

        builder.build()
    }
323
324    /// Removes rows whose op type is delete.
325    pub fn filter_deleted(&mut self) -> Result<()> {
326        // Safety: op type column is not null.
327        let array = self.op_types.as_arrow();
328        // Find rows with non-delete op type.
329        let rhs = UInt8Array::new_scalar(OpType::Delete as u8);
330        let predicate =
331            arrow::compute::kernels::cmp::neq(array, &rhs).context(ComputeArrowSnafu)?;
332        self.filter(&BooleanVector::from(predicate))
333    }
334
    /// Applies the `predicate` to the batch, keeping rows where it is true.
    ///
    /// All columns (timestamps, sequences, op types and fields) are filtered with
    /// the same predicate so row alignment across columns is preserved.
    pub fn filter(&mut self, predicate: &BooleanVector) -> Result<()> {
        self.timestamps = self
            .timestamps
            .filter(predicate)
            .context(ComputeVectorSnafu)?;
        // Safety: we know the array type so we unwrap on casting.
        self.sequences = Arc::new(
            UInt64Vector::try_from_arrow_array(
                arrow::compute::filter(self.sequences.as_arrow(), predicate.as_boolean_array())
                    .context(ComputeArrowSnafu)?,
            )
            .unwrap(),
        );
        // Safety: we know the array type so we unwrap on casting.
        self.op_types = Arc::new(
            UInt8Vector::try_from_arrow_array(
                arrow::compute::filter(self.op_types.as_arrow(), predicate.as_boolean_array())
                    .context(ComputeArrowSnafu)?,
            )
            .unwrap(),
        );
        for batch_column in &mut self.fields {
            batch_column.data = batch_column
                .data
                .filter(predicate)
                .context(ComputeVectorSnafu)?;
        }

        Ok(())
    }
365
    /// Filters rows by the given sequence range.
    ///
    /// Only preserves rows whose sequence numbers fall within `sequence`.
    /// This is a no-op when `sequence` is `None`, the batch is empty, or every
    /// row already falls inside the range.
    pub fn filter_by_sequence(&mut self, sequence: Option<SequenceRange>) -> Result<()> {
        let seq_range = match sequence {
            None => return Ok(()),
            Some(seq_range) => {
                let (Some(first), Some(last)) = (self.first_sequence(), self.last_sequence())
                else {
                    // Empty batch, nothing to filter.
                    return Ok(());
                };
                // Fast path: skip filtering when the whole batch is inside the range.
                // NOTE(review): this uses only the first/last sequences as bounds —
                // presumably batches uphold an invariant that makes these bound all
                // sequences in the batch; confirm against the producers.
                let is_subset = match seq_range {
                    SequenceRange::Gt { min } => min < first,
                    SequenceRange::LtEq { max } => max >= last,
                    SequenceRange::GtLtEq { min, max } => min < first && max >= last,
                };
                if is_subset {
                    return Ok(());
                }
                seq_range
            }
        };

        // Build a boolean predicate from the range and delegate to `filter`.
        let seqs = self.sequences.as_arrow();
        let predicate = seq_range.filter(seqs).context(ComputeArrowSnafu)?;

        let predicate = BooleanVector::from(predicate);
        self.filter(&predicate)?;

        Ok(())
    }
395
    /// Sorts rows in the batch. If `dedup` is true, it also removes
    /// duplicated rows according to primary keys.
    ///
    /// It orders rows by timestamp, sequence desc and only keep the latest
    /// row for the same timestamp. It doesn't consider op type as sequence
    /// should already provide uniqueness for a row.
    pub fn sort(&mut self, dedup: bool) -> Result<()> {
        // If building a converter each time is costly, we may allow passing a
        // converter.
        let converter = RowConverter::new(vec![
            SortField::new(self.timestamps.data_type().as_arrow_type()),
            // Sequence is compared descending so the latest write sorts first
            // within a timestamp.
            SortField::new_with_options(
                self.sequences.data_type().as_arrow_type(),
                SortOptions {
                    descending: true,
                    ..Default::default()
                },
            ),
        ])
        .context(ComputeArrowSnafu)?;
        // Columns to sort.
        let columns = [
            self.timestamps.to_arrow_array(),
            self.sequences.to_arrow_array(),
        ];
        let rows = converter.convert_columns(&columns).unwrap();
        // Pair each encoded row with its original index so we can build a
        // take-index vector after sorting/deduping.
        let mut to_sort: Vec<_> = rows.iter().enumerate().collect();

        // Avoid the sort entirely when rows are already ordered.
        let was_sorted = to_sort.is_sorted_by_key(|x| x.1);
        if !was_sorted {
            to_sort.sort_unstable_by_key(|x| x.1);
        }

        let num_rows = to_sort.len();
        if dedup {
            // Dedup by timestamps. `dedup_by` keeps the first of each run,
            // which is the row with the largest sequence (sequence sorts desc).
            to_sort.dedup_by(|left, right| {
                // The encoded row is expected to be 18 bytes: the timestamp key
                // prefix (`TIMESTAMP_KEY_LEN` bytes) followed by the sequence key.
                debug_assert_eq!(18, left.1.as_ref().len());
                debug_assert_eq!(18, right.1.as_ref().len());
                let (left_key, right_key) = (left.1.as_ref(), right.1.as_ref());
                // We only compare the timestamp part and ignore sequence.
                left_key[..TIMESTAMP_KEY_LEN] == right_key[..TIMESTAMP_KEY_LEN]
            });
        }
        let no_dedup = to_sort.len() == num_rows;

        // Nothing changed: skip the take.
        if was_sorted && no_dedup {
            return Ok(());
        }
        let indices = UInt32Vector::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
        self.take_in_place(&indices)
    }
448
    /// Merges duplicated timestamps in the batch by keeping the latest non-null field values.
    ///
    /// Rows must already be sorted by timestamp (ascending) and sequence (descending).
    ///
    /// This method deduplicates rows with the same timestamp (keeping the first row in each
    /// timestamp range as the base row) and fills null fields from subsequent rows until all
    /// fields are filled or a delete operation is encountered.
    pub(crate) fn merge_last_non_null(&mut self) -> Result<()> {
        let num_rows = self.num_rows();
        // 0 or 1 rows can't contain duplicates.
        if num_rows < 2 {
            return Ok(());
        }

        let Some(timestamps) = self.timestamps_native() else {
            return Ok(());
        };

        // Fast path: check if there are any duplicate timestamps. Also counts the
        // number of distinct timestamp groups in the same pass.
        let mut has_dup = false;
        let mut group_count = 1;
        for i in 1..num_rows {
            has_dup |= timestamps[i] == timestamps[i - 1];
            group_count += (timestamps[i] != timestamps[i - 1]) as usize;
        }
        if !has_dup {
            return Ok(());
        }

        let num_fields = self.fields.len();
        let op_types = self.op_types.as_arrow().values();

        // For each group: the index of the base row (first, i.e. latest sequence)...
        let mut base_indices: Vec<u32> = Vec::with_capacity(group_count);
        // ...and, per field, the index of the row supplying that field's value.
        let mut field_indices: Vec<Vec<u32>> = (0..num_fields)
            .map(|_| Vec::with_capacity(group_count))
            .collect();

        // Walk over runs of equal timestamps.
        let mut start = 0;
        while start < num_rows {
            let ts = timestamps[start];
            let mut end = start + 1;
            while end < num_rows && timestamps[end] == ts {
                end += 1;
            }

            let group_pos = base_indices.len();
            base_indices.push(start as u32);

            if num_fields > 0 {
                // Default: take the base row for all fields.
                for idx in &mut field_indices {
                    idx.push(start as u32);
                }

                let base_deleted = op_types[start] == OpType::Delete as u8;
                if !base_deleted {
                    // Track fields that are null in the base row and try to fill them from older
                    // rows in the same timestamp range.
                    let mut missing_fields = Vec::new();
                    for (field_idx, col) in self.fields.iter().enumerate() {
                        if col.data.is_null(start) {
                            missing_fields.push(field_idx);
                        }
                    }

                    if !missing_fields.is_empty() {
                        for row_idx in (start + 1)..end {
                            // Stop filling at a delete: values older than the delete
                            // must not leak into the merged row.
                            if op_types[row_idx] == OpType::Delete as u8 {
                                break;
                            }

                            // Take the first (newest) non-null value for each still-missing field.
                            missing_fields.retain(|&field_idx| {
                                if self.fields[field_idx].data.is_null(row_idx) {
                                    true
                                } else {
                                    field_indices[field_idx][group_pos] = row_idx as u32;
                                    false
                                }
                            });

                            if missing_fields.is_empty() {
                                break;
                            }
                        }
                    }
                }
            }

            start = end;
        }

        // Rebuild key columns from the base row of each group.
        let base_indices = UInt32Vector::from_vec(base_indices);
        self.timestamps = self
            .timestamps
            .take(&base_indices)
            .context(ComputeVectorSnafu)?;
        let array = arrow::compute::take(self.sequences.as_arrow(), base_indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        // Safety: We know the array and vector type.
        self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
        let array = arrow::compute::take(self.op_types.as_arrow(), base_indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        // Safety: We know the array and vector type.
        self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());

        // Rebuild each field column from its per-group source rows.
        for (field_idx, batch_column) in self.fields.iter_mut().enumerate() {
            let idx = UInt32Vector::from_vec(std::mem::take(&mut field_indices[field_idx]));
            batch_column.data = batch_column.data.take(&idx).context(ComputeVectorSnafu)?;
        }

        Ok(())
    }
560
561    /// Returns the estimated memory size of the batch.
562    pub fn memory_size(&self) -> usize {
563        let mut size = std::mem::size_of::<Self>();
564        size += self.primary_key.len();
565        size += self.timestamps.memory_size();
566        size += self.sequences.memory_size();
567        size += self.op_types.memory_size();
568        for batch_column in &self.fields {
569            size += batch_column.data.memory_size();
570        }
571        size
572    }
573
    /// Returns timestamps as a native `i64` slice (in the vector's own time unit),
    /// or `None` if the batch is empty.
    ///
    /// # Panics
    /// Panics if the timestamp column is not one of the four timestamp types,
    /// which the [BatchBuilder] has already validated.
    pub(crate) fn timestamps_native(&self) -> Option<&[i64]> {
        if self.timestamps.is_empty() {
            return None;
        }

        // Downcast to the concrete timestamp vector to reach the raw values.
        let values = match self.timestamps.data_type() {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampSecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampMicrosecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampNanosecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            other => panic!("timestamps in a Batch has other type {:?}", other),
        };

        Some(values)
    }
614
    /// Takes rows by `indices` in place, reordering/subsetting all columns
    /// with the same index vector so rows stay aligned.
    fn take_in_place(&mut self, indices: &UInt32Vector) -> Result<()> {
        self.timestamps = self.timestamps.take(indices).context(ComputeVectorSnafu)?;
        let array = arrow::compute::take(self.sequences.as_arrow(), indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        // Safety: we know the array and vector type.
        self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
        let array = arrow::compute::take(self.op_types.as_arrow(), indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        // Safety: we know the array and vector type.
        self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
        for batch_column in &mut self.fields {
            batch_column.data = batch_column
                .data
                .take(indices)
                .context(ComputeVectorSnafu)?;
        }

        Ok(())
    }
634
    /// Gets a timestamp at given `index`.
    ///
    /// # Panics
    /// Panics if `index` is out-of-bound or the timestamp vector returns null.
    fn get_timestamp(&self, index: usize) -> Timestamp {
        match self.timestamps.get_ref(index) {
            ValueRef::Timestamp(timestamp) => timestamp,

            // We have checked the data type is timestamp compatible in the [BatchBuilder]
            // so it's safe to panic.
            value => panic!("{:?} is not a timestamp", value),
        }
    }
647
    /// Gets a sequence at given `index`.
    ///
    /// # Panics
    /// Panics if `index` is out-of-bound or the sequence vector returns null.
    pub(crate) fn get_sequence(&self, index: usize) -> SequenceNumber {
        // Safety: sequences is not null so it actually returns Some.
        self.sequences.get_data(index).unwrap()
    }
656
    /// Checks the batch is monotonic by timestamps: timestamps non-decreasing,
    /// and sequences non-increasing within equal timestamps. Debug builds only.
    #[cfg(debug_assertions)]
    #[allow(dead_code)]
    pub(crate) fn check_monotonic(&self) -> Result<(), String> {
        use std::cmp::Ordering;
        // Empty batches are trivially monotonic.
        if self.timestamps_native().is_none() {
            return Ok(());
        }

        let timestamps = self.timestamps_native().unwrap();
        let sequences = self.sequences.as_arrow().values();
        // Compare each adjacent pair of rows.
        for (i, window) in timestamps.windows(2).enumerate() {
            let current = window[0];
            let next = window[1];
            let current_sequence = sequences[i];
            let next_sequence = sequences[i + 1];
            match current.cmp(&next) {
                Ordering::Less => {
                    // The current timestamp is less than the next timestamp.
                    continue;
                }
                Ordering::Equal => {
                    // Equal timestamps must be ordered by sequence descending.
                    if current_sequence < next_sequence {
                        return Err(format!(
                            "sequence are not monotonic: ts {} == {} but current sequence {} < {}, index: {}",
                            current, next, current_sequence, next_sequence, i
                        ));
                    }
                }
                Ordering::Greater => {
                    // The current timestamp is greater than the next timestamp.
                    return Err(format!(
                        "timestamps are not monotonic: {} > {}, index: {}",
                        current, next, i
                    ));
                }
            }
        }

        Ok(())
    }
699
    /// Returns Ok if the given batch is behind the current batch in the
    /// (primary key asc, timestamp asc, sequence desc) order. Debug builds only.
    #[cfg(debug_assertions)]
    #[allow(dead_code)]
    pub(crate) fn check_next_batch(&self, other: &Batch) -> Result<(), String> {
        // Checks the primary key.
        if self.primary_key() < other.primary_key() {
            return Ok(());
        }
        if self.primary_key() > other.primary_key() {
            return Err(format!(
                "primary key is not monotonic: {:?} > {:?}",
                self.primary_key(),
                other.primary_key()
            ));
        }
        // Same primary key: checks the timestamp.
        if self.last_timestamp() < other.first_timestamp() {
            return Ok(());
        }
        if self.last_timestamp() > other.first_timestamp() {
            return Err(format!(
                "timestamps are not monotonic: {:?} > {:?}",
                self.last_timestamp(),
                other.first_timestamp()
            ));
        }
        // Same timestamp: checks the sequence (must be descending across batches).
        if self.last_sequence() >= other.first_sequence() {
            return Ok(());
        }
        Err(format!(
            "sequences are not monotonic: {:?} < {:?}",
            self.last_sequence(),
            other.first_sequence()
        ))
    }
736
    /// Returns the value of the column in the primary key.
    ///
    /// Lazily decodes the primary key with `codec` and caches the result in
    /// `pk_values`, so subsequent calls skip decoding.
    ///
    /// `col_idx_in_pk` is used for dense encodings and `column_id` for sparse
    /// encodings; callers must pass both consistently with the codec.
    pub fn pk_col_value(
        &mut self,
        codec: &dyn PrimaryKeyCodec,
        col_idx_in_pk: usize,
        column_id: ColumnId,
    ) -> Result<Option<&Value>> {
        if self.pk_values.is_none() {
            self.pk_values = Some(codec.decode(&self.primary_key).context(DecodeSnafu)?);
        }

        // Safety: filled above when it was `None`.
        let pk_values = self.pk_values.as_ref().unwrap();
        Ok(match pk_values {
            CompositeValues::Dense(values) => values.get(col_idx_in_pk).map(|(_, v)| v),
            CompositeValues::Sparse(values) => values.get(&column_id),
        })
    }
756
757    /// Returns values of the field in the batch.
758    ///
759    /// Lazily caches the field index.
760    pub fn field_col_value(&mut self, column_id: ColumnId) -> Option<&BatchColumn> {
761        if self.fields_idx.is_none() {
762            self.fields_idx = Some(
763                self.fields
764                    .iter()
765                    .enumerate()
766                    .map(|(i, c)| (c.column_id, i))
767                    .collect(),
768            );
769        }
770
771        self.fields_idx
772            .as_ref()
773            .unwrap()
774            .get(&column_id)
775            .map(|&idx| &self.fields[idx])
776    }
777}
778
/// A struct to check batches from a stream are monotonic. Debug builds only.
#[cfg(debug_assertions)]
#[derive(Default)]
#[allow(dead_code)]
pub(crate) struct BatchChecker {
    // The most recent batch seen, used to verify ordering across batches.
    last_batch: Option<Batch>,
    // Optional inclusive lower bound for timestamps.
    start: Option<Timestamp>,
    // Optional exclusive upper bound for timestamps.
    end: Option<Timestamp>,
}
788
789#[cfg(debug_assertions)]
790#[allow(dead_code)]
791impl BatchChecker {
    /// Attaches the given start timestamp (inclusive lower bound) to the checker.
    pub(crate) fn with_start(mut self, start: Option<Timestamp>) -> Self {
        self.start = start;
        self
    }
797
    /// Attaches the given end timestamp (exclusive upper bound) to the checker.
    pub(crate) fn with_end(mut self, end: Option<Timestamp>) -> Self {
        self.end = end;
        self
    }
803
    /// Returns Ok if the given batch is internally monotonic, within the
    /// `[start, end)` bounds (when set), and behind the last batch.
    ///
    /// Always records `batch` as the new last batch, even on error.
    pub(crate) fn check_monotonic(&mut self, batch: &Batch) -> Result<(), String> {
        batch.check_monotonic()?;

        // The first timestamp must not be before `start` (inclusive bound).
        if let (Some(start), Some(first)) = (self.start, batch.first_timestamp())
            && start > first
        {
            return Err(format!(
                "batch's first timestamp is before the start timestamp: {:?} > {:?}",
                start, first
            ));
        }
        // The last timestamp must be strictly before `end` (exclusive bound).
        if let (Some(end), Some(last)) = (self.end, batch.last_timestamp())
            && end <= last
        {
            return Err(format!(
                "batch's last timestamp is after the end timestamp: {:?} <= {:?}",
                end, last
            ));
        }

        // Checks the batch is behind the last batch.
        // Then updates the last batch.
        let res = self
            .last_batch
            .as_ref()
            .map(|last| last.check_next_batch(batch))
            .unwrap_or(Ok(()));
        self.last_batch = Some(batch.clone());
        res
    }
836
837    /// Formats current batch and last batch for debug.
838    pub(crate) fn format_batch(&self, batch: &Batch) -> String {
839        use std::fmt::Write;
840
841        let mut message = String::new();
842        if let Some(last) = &self.last_batch {
843            write!(
844                message,
845                "last_pk: {:?}, last_ts: {:?}, last_seq: {:?}, ",
846                last.primary_key(),
847                last.last_timestamp(),
848                last.last_sequence()
849            )
850            .unwrap();
851        }
852        write!(
853            message,
854            "batch_pk: {:?}, batch_ts: {:?}, batch_seq: {:?}",
855            batch.primary_key(),
856            batch.timestamps(),
857            batch.sequences()
858        )
859        .unwrap();
860
861        message
862    }
863
864    /// Checks batches from the part range are monotonic. Otherwise, panics.
865    pub(crate) fn ensure_part_range_batch(
866        &mut self,
867        scanner: &str,
868        region_id: store_api::storage::RegionId,
869        partition: usize,
870        part_range: store_api::region_engine::PartitionRange,
871        batch: &Batch,
872    ) {
873        if let Err(e) = self.check_monotonic(batch) {
874            let err_msg = format!(
875                "{}: batch is not sorted, {}, region_id: {}, partition: {}, part_range: {:?}",
876                scanner, e, region_id, partition, part_range,
877            );
878            common_telemetry::error!("{err_msg}, {}", self.format_batch(batch));
879            // Only print the number of row in the panic message.
880            panic!("{err_msg}, batch rows: {}", batch.num_rows());
881        }
882    }
883}
884
/// Len of timestamp in arrow row format, in bytes.
///
/// NOTE(review): presumably 1 validity byte + 8 bytes for the 64-bit timestamp
/// value — confirm against `arrow::row::RowConverter`'s fixed-width encoding.
const TIMESTAMP_KEY_LEN: usize = 9;
887
888/// Helper function to concat arrays from `iter`.
889fn concat_arrays(iter: impl Iterator<Item = ArrayRef>) -> Result<ArrayRef> {
890    let arrays: Vec<_> = iter.collect();
891    let dyn_arrays: Vec<_> = arrays.iter().map(|array| array.as_ref()).collect();
892    arrow::compute::concat(&dyn_arrays).context(ComputeArrowSnafu)
893}
894
/// A column in a [Batch].
///
/// Pairs a column id with the column's vector data.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct BatchColumn {
    /// Id of the column.
    pub column_id: ColumnId,
    /// Data of the column.
    pub data: VectorRef,
}
903
904/// Builder to build [Batch].
905pub struct BatchBuilder {
906    primary_key: Vec<u8>,
907    timestamps: Option<VectorRef>,
908    sequences: Option<Arc<UInt64Vector>>,
909    op_types: Option<Arc<UInt8Vector>>,
910    fields: Vec<BatchColumn>,
911}
912
913impl BatchBuilder {
914    /// Creates a new [BatchBuilder] with primary key.
915    pub fn new(primary_key: Vec<u8>) -> BatchBuilder {
916        BatchBuilder {
917            primary_key,
918            timestamps: None,
919            sequences: None,
920            op_types: None,
921            fields: Vec::new(),
922        }
923    }
924
925    /// Creates a new [BatchBuilder] with all required columns.
926    pub fn with_required_columns(
927        primary_key: Vec<u8>,
928        timestamps: VectorRef,
929        sequences: Arc<UInt64Vector>,
930        op_types: Arc<UInt8Vector>,
931    ) -> BatchBuilder {
932        BatchBuilder {
933            primary_key,
934            timestamps: Some(timestamps),
935            sequences: Some(sequences),
936            op_types: Some(op_types),
937            fields: Vec::new(),
938        }
939    }
940
941    /// Set all field columns.
942    pub fn with_fields(mut self, fields: Vec<BatchColumn>) -> Self {
943        self.fields = fields;
944        self
945    }
946
947    /// Push a field column.
948    pub fn push_field(&mut self, column: BatchColumn) -> &mut Self {
949        self.fields.push(column);
950        self
951    }
952
953    /// Push an array as a field.
954    pub fn push_field_array(&mut self, column_id: ColumnId, array: ArrayRef) -> Result<&mut Self> {
955        let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
956        self.fields.push(BatchColumn {
957            column_id,
958            data: vector,
959        });
960
961        Ok(self)
962    }
963
964    /// Try to set an array as timestamps.
965    pub fn timestamps_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
966        let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
967        ensure!(
968            vector.data_type().is_timestamp(),
969            InvalidBatchSnafu {
970                reason: format!("{:?} is not a timestamp type", vector.data_type()),
971            }
972        );
973
974        self.timestamps = Some(vector);
975        Ok(self)
976    }
977
978    /// Try to set an array as sequences.
979    pub fn sequences_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
980        ensure!(
981            *array.data_type() == arrow::datatypes::DataType::UInt64,
982            InvalidBatchSnafu {
983                reason: "sequence array is not UInt64 type",
984            }
985        );
986        // Safety: The cast must success as we have ensured it is uint64 type.
987        let vector = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
988        self.sequences = Some(vector);
989
990        Ok(self)
991    }
992
993    /// Try to set an array as op types.
994    pub fn op_types_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
995        ensure!(
996            *array.data_type() == arrow::datatypes::DataType::UInt8,
997            InvalidBatchSnafu {
998                reason: "sequence array is not UInt8 type",
999            }
1000        );
1001        // Safety: The cast must success as we have ensured it is uint64 type.
1002        let vector = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
1003        self.op_types = Some(vector);
1004
1005        Ok(self)
1006    }
1007
1008    /// Builds the [Batch].
1009    pub fn build(self) -> Result<Batch> {
1010        let timestamps = self.timestamps.context(InvalidBatchSnafu {
1011            reason: "missing timestamps",
1012        })?;
1013        let sequences = self.sequences.context(InvalidBatchSnafu {
1014            reason: "missing sequences",
1015        })?;
1016        let op_types = self.op_types.context(InvalidBatchSnafu {
1017            reason: "missing op_types",
1018        })?;
1019        // Our storage format ensure these columns are not nullable so
1020        // we use assert here.
1021        assert_eq!(0, timestamps.null_count());
1022        assert_eq!(0, sequences.null_count());
1023        assert_eq!(0, op_types.null_count());
1024
1025        let ts_len = timestamps.len();
1026        ensure!(
1027            sequences.len() == ts_len,
1028            InvalidBatchSnafu {
1029                reason: format!(
1030                    "sequence have different len {} != {}",
1031                    sequences.len(),
1032                    ts_len
1033                ),
1034            }
1035        );
1036        ensure!(
1037            op_types.len() == ts_len,
1038            InvalidBatchSnafu {
1039                reason: format!(
1040                    "op type have different len {} != {}",
1041                    op_types.len(),
1042                    ts_len
1043                ),
1044            }
1045        );
1046        for column in &self.fields {
1047            ensure!(
1048                column.data.len() == ts_len,
1049                InvalidBatchSnafu {
1050                    reason: format!(
1051                        "column {} has different len {} != {}",
1052                        column.column_id,
1053                        column.data.len(),
1054                        ts_len
1055                    ),
1056                }
1057            );
1058        }
1059
1060        Ok(Batch {
1061            primary_key: self.primary_key,
1062            pk_values: None,
1063            timestamps,
1064            sequences,
1065            op_types,
1066            fields: self.fields,
1067            fields_idx: None,
1068        })
1069    }
1070}
1071
1072impl From<Batch> for BatchBuilder {
1073    fn from(batch: Batch) -> Self {
1074        Self {
1075            primary_key: batch.primary_key,
1076            timestamps: Some(batch.timestamps),
1077            sequences: Some(batch.sequences),
1078            op_types: Some(batch.op_types),
1079            fields: batch.fields,
1080        }
1081    }
1082}
1083
1084/// Async [Batch] reader and iterator wrapper.
1085///
1086/// This is the data source for SST writers or internal readers.
1087pub enum Source {
1088    /// Source from a [BoxedBatchReader].
1089    Reader(BoxedBatchReader),
1090    /// Source from a [BoxedBatchIterator].
1091    Iter(BoxedBatchIterator),
1092    /// Source from a [BoxedBatchStream].
1093    Stream(BoxedBatchStream),
1094}
1095
1096impl Source {
1097    /// Returns next [Batch] from this data source.
1098    pub async fn next_batch(&mut self) -> Result<Option<Batch>> {
1099        match self {
1100            Source::Reader(reader) => reader.next_batch().await,
1101            Source::Iter(iter) => iter.next().transpose(),
1102            Source::Stream(stream) => stream.try_next().await,
1103        }
1104    }
1105}
1106
1107/// Async [RecordBatch] reader and iterator wrapper for flat format.
1108pub enum FlatSource {
1109    /// Source from a [BoxedRecordBatchIterator].
1110    Iter(BoxedRecordBatchIterator),
1111    /// Source from a [BoxedRecordBatchStream].
1112    Stream(BoxedRecordBatchStream),
1113}
1114
1115impl FlatSource {
1116    /// Returns next [RecordBatch] from this data source.
1117    pub async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
1118        match self {
1119            FlatSource::Iter(iter) => iter.next().transpose(),
1120            FlatSource::Stream(stream) => stream.try_next().await,
1121        }
1122    }
1123}
1124
/// Async batch reader.
///
/// The reader must guarantee [Batch]es returned by it have the same schema.
/// See [BoxedBatchReader] for the commonly used boxed trait object form.
#[async_trait]
pub trait BatchReader: Send {
    /// Fetch next [Batch].
    ///
    /// Returns `Ok(None)` when the reader has reached its end and calling `next_batch()`
    /// again won't return batch again.
    ///
    /// If `Err` is returned, caller should not call this method again, the implementor
    /// may or may not panic in such case.
    async fn next_batch(&mut self) -> Result<Option<Batch>>;
}
1139
/// Owned, boxed [BatchReader] trait object.
pub type BoxedBatchReader = Box<dyn BatchReader>;

/// Owned stream that yields [Batch] results.
pub type BoxedBatchStream = BoxStream<'static, Result<Batch>>;

/// Owned stream that yields [RecordBatch] results, used by the flat format
/// sources (see [FlatSource]).
pub type BoxedRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
1148
1149#[async_trait::async_trait]
1150impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
1151    async fn next_batch(&mut self) -> Result<Option<Batch>> {
1152        (**self).next_batch().await
1153    }
1154}
1155
/// Local metrics for scanners.
///
/// NOTE(review): these counters appear to be accumulated per scan stream by
/// the scanner implementations in this module — confirm against `seq_scan`
/// and `unordered_scan`.
#[derive(Debug, Default)]
pub(crate) struct ScannerMetrics {
    /// Duration to scan data.
    scan_cost: Duration,
    /// Duration while waiting for `yield`.
    yield_cost: Duration,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of rows returned.
    num_rows: usize,
}
1168
1169#[cfg(test)]
1170mod tests {
1171    use datatypes::arrow::array::{TimestampMillisecondArray, UInt8Array, UInt64Array};
1172    use mito_codec::row_converter::{self, build_primary_key_codec_with_fields};
1173    use store_api::codec::PrimaryKeyEncoding;
1174    use store_api::storage::consts::ReservedColumnId;
1175
1176    use super::*;
1177    use crate::error::Error;
1178    use crate::test_util::new_batch_builder;
1179
1180    fn new_batch(
1181        timestamps: &[i64],
1182        sequences: &[u64],
1183        op_types: &[OpType],
1184        field: &[u64],
1185    ) -> Batch {
1186        new_batch_builder(b"test", timestamps, sequences, op_types, 1, field)
1187            .build()
1188            .unwrap()
1189    }
1190
1191    fn new_batch_with_u64_fields(
1192        timestamps: &[i64],
1193        sequences: &[u64],
1194        op_types: &[OpType],
1195        fields: &[(ColumnId, &[Option<u64>])],
1196    ) -> Batch {
1197        assert_eq!(timestamps.len(), sequences.len());
1198        assert_eq!(timestamps.len(), op_types.len());
1199        for (_, values) in fields {
1200            assert_eq!(timestamps.len(), values.len());
1201        }
1202
1203        let mut builder = BatchBuilder::new(b"test".to_vec());
1204        builder
1205            .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
1206                timestamps.iter().copied(),
1207            )))
1208            .unwrap()
1209            .sequences_array(Arc::new(UInt64Array::from_iter_values(
1210                sequences.iter().copied(),
1211            )))
1212            .unwrap()
1213            .op_types_array(Arc::new(UInt8Array::from_iter_values(
1214                op_types.iter().map(|v| *v as u8),
1215            )))
1216            .unwrap();
1217
1218        for (col_id, values) in fields {
1219            builder
1220                .push_field_array(*col_id, Arc::new(UInt64Array::from(values.to_vec())))
1221                .unwrap();
1222        }
1223
1224        builder.build().unwrap()
1225    }
1226
1227    fn new_batch_without_fields(
1228        timestamps: &[i64],
1229        sequences: &[u64],
1230        op_types: &[OpType],
1231    ) -> Batch {
1232        assert_eq!(timestamps.len(), sequences.len());
1233        assert_eq!(timestamps.len(), op_types.len());
1234
1235        let mut builder = BatchBuilder::new(b"test".to_vec());
1236        builder
1237            .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
1238                timestamps.iter().copied(),
1239            )))
1240            .unwrap()
1241            .sequences_array(Arc::new(UInt64Array::from_iter_values(
1242                sequences.iter().copied(),
1243            )))
1244            .unwrap()
1245            .op_types_array(Arc::new(UInt8Array::from_iter_values(
1246                op_types.iter().map(|v| *v as u8),
1247            )))
1248            .unwrap();
1249
1250        builder.build().unwrap()
1251    }
1252
1253    #[test]
1254    fn test_empty_batch() {
1255        let batch = Batch::empty();
1256        assert!(batch.is_empty());
1257        assert_eq!(None, batch.first_timestamp());
1258        assert_eq!(None, batch.last_timestamp());
1259        assert_eq!(None, batch.first_sequence());
1260        assert_eq!(None, batch.last_sequence());
1261        assert!(batch.timestamps_native().is_none());
1262    }
1263
1264    #[test]
1265    fn test_first_last_one() {
1266        let batch = new_batch(&[1], &[2], &[OpType::Put], &[4]);
1267        assert_eq!(
1268            Timestamp::new_millisecond(1),
1269            batch.first_timestamp().unwrap()
1270        );
1271        assert_eq!(
1272            Timestamp::new_millisecond(1),
1273            batch.last_timestamp().unwrap()
1274        );
1275        assert_eq!(2, batch.first_sequence().unwrap());
1276        assert_eq!(2, batch.last_sequence().unwrap());
1277    }
1278
1279    #[test]
1280    fn test_first_last_multiple() {
1281        let batch = new_batch(
1282            &[1, 2, 3],
1283            &[11, 12, 13],
1284            &[OpType::Put, OpType::Put, OpType::Put],
1285            &[21, 22, 23],
1286        );
1287        assert_eq!(
1288            Timestamp::new_millisecond(1),
1289            batch.first_timestamp().unwrap()
1290        );
1291        assert_eq!(
1292            Timestamp::new_millisecond(3),
1293            batch.last_timestamp().unwrap()
1294        );
1295        assert_eq!(11, batch.first_sequence().unwrap());
1296        assert_eq!(13, batch.last_sequence().unwrap());
1297    }
1298
1299    #[test]
1300    fn test_slice() {
1301        let batch = new_batch(
1302            &[1, 2, 3, 4],
1303            &[11, 12, 13, 14],
1304            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1305            &[21, 22, 23, 24],
1306        );
1307        let batch = batch.slice(1, 2);
1308        let expect = new_batch(
1309            &[2, 3],
1310            &[12, 13],
1311            &[OpType::Delete, OpType::Put],
1312            &[22, 23],
1313        );
1314        assert_eq!(expect, batch);
1315    }
1316
1317    #[test]
1318    fn test_timestamps_native() {
1319        let batch = new_batch(
1320            &[1, 2, 3, 4],
1321            &[11, 12, 13, 14],
1322            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1323            &[21, 22, 23, 24],
1324        );
1325        assert_eq!(&[1, 2, 3, 4], batch.timestamps_native().unwrap());
1326    }
1327
1328    #[test]
1329    fn test_concat_empty() {
1330        let err = Batch::concat(vec![]).unwrap_err();
1331        assert!(
1332            matches!(err, Error::InvalidBatch { .. }),
1333            "unexpected err: {err}"
1334        );
1335    }
1336
1337    #[test]
1338    fn test_concat_one() {
1339        let batch = new_batch(&[], &[], &[], &[]);
1340        let actual = Batch::concat(vec![batch.clone()]).unwrap();
1341        assert_eq!(batch, actual);
1342
1343        let batch = new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]);
1344        let actual = Batch::concat(vec![batch.clone()]).unwrap();
1345        assert_eq!(batch, actual);
1346    }
1347
    #[test]
    fn test_concat_multiple() {
        // Concatenation appends batches in order; the empty batch in the middle
        // contributes no rows.
        let batches = vec![
            new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]),
            new_batch(
                &[3, 4, 5],
                &[13, 14, 15],
                &[OpType::Put, OpType::Delete, OpType::Put],
                &[23, 24, 25],
            ),
            new_batch(&[], &[], &[], &[]),
            new_batch(&[6], &[16], &[OpType::Put], &[26]),
        ];
        let batch = Batch::concat(batches).unwrap();
        let expect = new_batch(
            &[1, 2, 3, 4, 5, 6],
            &[11, 12, 13, 14, 15, 16],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Delete,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25, 26],
        );
        assert_eq!(expect, batch);
    }
1377
1378    #[test]
1379    fn test_concat_different() {
1380        let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
1381        let mut batch2 = new_batch(&[2], &[2], &[OpType::Put], &[2]);
1382        batch2.primary_key = b"hello".to_vec();
1383        let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
1384        assert!(
1385            matches!(err, Error::InvalidBatch { .. }),
1386            "unexpected err: {err}"
1387        );
1388    }
1389
    #[test]
    fn test_concat_different_fields() {
        // Concatenation requires all batches to carry identical field columns
        // (same ids and same count); any mismatch yields `InvalidBatch`.
        let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
        let fields = vec![
            batch1.fields()[0].clone(),
            BatchColumn {
                column_id: 2,
                data: Arc::new(UInt64Vector::from_slice([2])),
            },
        ];
        // Batch 2 has more fields.
        let batch2 = batch1.clone().with_fields(fields).unwrap();
        let err = Batch::concat(vec![batch1.clone(), batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );

        // Batch 2 has different field.
        let fields = vec![BatchColumn {
            column_id: 2,
            data: Arc::new(UInt64Vector::from_slice([2])),
        }];
        let batch2 = batch1.clone().with_fields(fields).unwrap();
        let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }
1420
1421    #[test]
1422    fn test_filter_deleted_empty() {
1423        let mut batch = new_batch(&[], &[], &[], &[]);
1424        batch.filter_deleted().unwrap();
1425        assert!(batch.is_empty());
1426    }
1427
    #[test]
    fn test_filter_deleted() {
        // Rows with `OpType::Delete` are removed; put rows are kept.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Delete, OpType::Put, OpType::Delete, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch.filter_deleted().unwrap();
        let expect = new_batch(&[2, 4], &[12, 14], &[OpType::Put, OpType::Put], &[22, 24]);
        assert_eq!(expect, batch);

        // A batch containing only puts is unchanged.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let expect = batch.clone();
        batch.filter_deleted().unwrap();
        assert_eq!(expect, batch);
    }
1450
    #[test]
    fn test_filter_by_sequence() {
        // Exercises every `SequenceRange` variant plus the `None` (no filter) case.
        // LtEq keeps rows whose sequence is <= max.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::LtEq { max: 13 }))
            .unwrap();
        let expect = new_batch(
            &[1, 2, 3],
            &[11, 12, 13],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );
        assert_eq!(expect, batch);

        // LtEq below every sequence filters the batch to empty.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );

        batch
            .filter_by_sequence(Some(SequenceRange::LtEq { max: 10 }))
            .unwrap();
        assert!(batch.is_empty());

        // A `None` filter keeps the batch unchanged.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let expect = batch.clone();
        batch.filter_by_sequence(None).unwrap();
        assert_eq!(expect, batch);

        // Filtering an empty batch keeps it empty.
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch
            .filter_by_sequence(Some(SequenceRange::LtEq { max: 10 }))
            .unwrap();
        assert!(batch.is_empty());

        // Filtering an empty batch with `None` also keeps it empty.
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_by_sequence(None).unwrap();
        assert!(batch.is_empty());

        // Gt variant: exclusive lower bound, keeps sequences > min.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::Gt { min: 12 }))
            .unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // Gt variant with no matching sequences filters to empty.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::Gt { min: 20 }))
            .unwrap();
        assert!(batch.is_empty());

        // GtLtEq variant: exclusive lower bound, inclusive upper bound.
        let mut batch = new_batch(
            &[1, 2, 3, 4, 5],
            &[11, 12, 13, 14, 15],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 12, max: 14 }))
            .unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // GtLtEq variant with mixed put/delete operations.
        let mut batch = new_batch(
            &[1, 2, 3, 4, 5],
            &[11, 12, 13, 14, 15],
            &[
                OpType::Put,
                OpType::Delete,
                OpType::Put,
                OpType::Delete,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 11, max: 13 }))
            .unwrap();
        let expect = new_batch(
            &[2, 3],
            &[12, 13],
            &[OpType::Delete, OpType::Put],
            &[22, 23],
        );
        assert_eq!(expect, batch);

        // GtLtEq variant with no matching sequences filters to empty.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 20, max: 25 }))
            .unwrap();
        assert!(batch.is_empty());
    }
1587
1588    #[test]
1589    fn test_merge_last_non_null_no_dup() {
1590        let mut batch = new_batch_with_u64_fields(
1591            &[1, 2],
1592            &[2, 1],
1593            &[OpType::Put, OpType::Put],
1594            &[(1, &[Some(10), None]), (2, &[Some(100), Some(200)])],
1595        );
1596        let expect = batch.clone();
1597        batch.merge_last_non_null().unwrap();
1598        assert_eq!(expect, batch);
1599    }
1600
    #[test]
    fn test_merge_last_non_null_fill_null_fields() {
        // Null fields in the newest row are filled from older duplicates of the
        // same timestamp, preferring the closest (highest-sequence) older row.
        // Rows are already sorted by timestamp asc and sequence desc.
        let mut batch = new_batch_with_u64_fields(
            &[1, 1, 1],
            &[3, 2, 1],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[
                (1, &[None, Some(10), Some(11)]),
                (2, &[Some(100), Some(200), Some(300)]),
            ],
        );
        batch.merge_last_non_null().unwrap();

        // Field 1 is filled from the first older row (seq=2). Field 2 keeps the base value.
        // Filled fields must not be overwritten by even older duplicates.
        let expect = new_batch_with_u64_fields(
            &[1],
            &[3],
            &[OpType::Put],
            &[(1, &[Some(10)]), (2, &[Some(100)])],
        );
        assert_eq!(expect, batch);
    }
1625
    #[test]
    fn test_merge_last_non_null_stop_at_delete_row() {
        // A delete row in older duplicates should stop filling to avoid resurrecting values before
        // deletion.
        let mut batch = new_batch_with_u64_fields(
            &[1, 1, 1],
            &[3, 2, 1],
            &[OpType::Put, OpType::Delete, OpType::Put],
            &[
                (1, &[None, Some(10), Some(11)]),
                (2, &[Some(100), Some(200), Some(300)]),
            ],
        );
        batch.merge_last_non_null().unwrap();

        // Field 1 remains null: the delete at seq=2 blocks values from seq<=2.
        let expect = new_batch_with_u64_fields(
            &[1],
            &[3],
            &[OpType::Put],
            &[(1, &[None]), (2, &[Some(100)])],
        );
        assert_eq!(expect, batch);
    }
1649
    #[test]
    fn test_merge_last_non_null_base_delete_no_merge() {
        // The newest (base) row for the timestamp is a delete.
        let mut batch = new_batch_with_u64_fields(
            &[1, 1],
            &[3, 2],
            &[OpType::Delete, OpType::Put],
            &[(1, &[None, Some(10)]), (2, &[None, Some(200)])],
        );
        batch.merge_last_non_null().unwrap();

        // Base row is delete, keep it as is and don't merge fields from older rows.
        let expect =
            new_batch_with_u64_fields(&[1], &[3], &[OpType::Delete], &[(1, &[None]), (2, &[None])]);
        assert_eq!(expect, batch);
    }
1665
    #[test]
    fn test_merge_last_non_null_multiple_timestamp_groups() {
        // Merging is applied independently per timestamp group: ts=1 has two
        // rows, ts=2 one row, ts=3 two rows.
        let mut batch = new_batch_with_u64_fields(
            &[1, 1, 2, 3, 3],
            &[5, 4, 3, 2, 1],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
            ],
            &[
                (1, &[None, Some(10), Some(20), None, Some(30)]),
                (2, &[Some(100), Some(110), Some(120), None, Some(130)]),
            ],
        );
        batch.merge_last_non_null().unwrap();

        // One output row per timestamp; nulls in each base row are filled from
        // the older duplicate of the same timestamp.
        let expect = new_batch_with_u64_fields(
            &[1, 2, 3],
            &[5, 3, 2],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[
                (1, &[Some(10), Some(20), Some(30)]),
                (2, &[Some(100), Some(120), Some(130)]),
            ],
        );
        assert_eq!(expect, batch);
    }
1696
1697    #[test]
1698    fn test_merge_last_non_null_no_fields() {
1699        let mut batch = new_batch_without_fields(
1700            &[1, 1, 2],
1701            &[3, 2, 1],
1702            &[OpType::Put, OpType::Put, OpType::Put],
1703        );
1704        batch.merge_last_non_null().unwrap();
1705
1706        let expect = new_batch_without_fields(&[1, 2], &[3, 1], &[OpType::Put, OpType::Put]);
1707        assert_eq!(expect, batch);
1708    }
1709
    #[test]
    fn test_filter() {
        // Rows where the predicate is false are removed; all columns shrink in lockstep.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
        batch.filter(&predicate).unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // Delete rows are filtered like any other row.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
        batch.filter(&predicate).unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // An all-false predicate filters the batch to empty.
        let predicate = BooleanVector::from_vec(vec![false, false]);
        batch.filter(&predicate).unwrap();
        assert!(batch.is_empty());
    }
1741
1742    #[test]
1743    fn test_sort_and_dedup() {
1744        let original = new_batch(
1745            &[2, 3, 1, 4, 5, 2],
1746            &[1, 2, 3, 4, 5, 6],
1747            &[
1748                OpType::Put,
1749                OpType::Put,
1750                OpType::Put,
1751                OpType::Put,
1752                OpType::Put,
1753                OpType::Put,
1754            ],
1755            &[21, 22, 23, 24, 25, 26],
1756        );
1757
1758        let mut batch = original.clone();
1759        batch.sort(true).unwrap();
1760        // It should only keep one timestamp 2.
1761        assert_eq!(
1762            new_batch(
1763                &[1, 2, 3, 4, 5],
1764                &[3, 6, 2, 4, 5],
1765                &[
1766                    OpType::Put,
1767                    OpType::Put,
1768                    OpType::Put,
1769                    OpType::Put,
1770                    OpType::Put,
1771                ],
1772                &[23, 26, 22, 24, 25],
1773            ),
1774            batch
1775        );
1776
1777        let mut batch = original.clone();
1778        batch.sort(false).unwrap();
1779
1780        // It should only keep one timestamp 2.
1781        assert_eq!(
1782            new_batch(
1783                &[1, 2, 2, 3, 4, 5],
1784                &[3, 6, 1, 2, 4, 5],
1785                &[
1786                    OpType::Put,
1787                    OpType::Put,
1788                    OpType::Put,
1789                    OpType::Put,
1790                    OpType::Put,
1791                    OpType::Put,
1792                ],
1793                &[23, 26, 21, 22, 24, 25],
1794            ),
1795            batch
1796        );
1797
1798        let original = new_batch(
1799            &[2, 2, 1],
1800            &[1, 6, 1],
1801            &[OpType::Delete, OpType::Put, OpType::Put],
1802            &[21, 22, 23],
1803        );
1804
1805        let mut batch = original.clone();
1806        batch.sort(true).unwrap();
1807        let expect = new_batch(&[1, 2], &[1, 6], &[OpType::Put, OpType::Put], &[23, 22]);
1808        assert_eq!(expect, batch);
1809
1810        let mut batch = original.clone();
1811        batch.sort(false).unwrap();
1812        let expect = new_batch(
1813            &[1, 2, 2],
1814            &[1, 6, 1],
1815            &[OpType::Put, OpType::Put, OpType::Delete],
1816            &[23, 22, 21],
1817        );
1818        assert_eq!(expect, batch);
1819    }
1820
1821    #[test]
1822    fn test_get_value() {
1823        let encodings = [PrimaryKeyEncoding::Dense, PrimaryKeyEncoding::Sparse];
1824
1825        for encoding in encodings {
1826            let codec = build_primary_key_codec_with_fields(
1827                encoding,
1828                [
1829                    (
1830                        ReservedColumnId::table_id(),
1831                        row_converter::SortField::new(ConcreteDataType::uint32_datatype()),
1832                    ),
1833                    (
1834                        ReservedColumnId::tsid(),
1835                        row_converter::SortField::new(ConcreteDataType::uint64_datatype()),
1836                    ),
1837                    (
1838                        100,
1839                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
1840                    ),
1841                    (
1842                        200,
1843                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
1844                    ),
1845                ]
1846                .into_iter(),
1847            );
1848
1849            let values = [
1850                Value::UInt32(1000),
1851                Value::UInt64(2000),
1852                Value::String("abcdefgh".into()),
1853                Value::String("zyxwvu".into()),
1854            ];
1855            let mut buf = vec![];
1856            codec
1857                .encode_values(
1858                    &[
1859                        (ReservedColumnId::table_id(), values[0].clone()),
1860                        (ReservedColumnId::tsid(), values[1].clone()),
1861                        (100, values[2].clone()),
1862                        (200, values[3].clone()),
1863                    ],
1864                    &mut buf,
1865                )
1866                .unwrap();
1867
1868            let field_col_id = 2;
1869            let mut batch = new_batch_builder(
1870                &buf,
1871                &[1, 2, 3],
1872                &[1, 1, 1],
1873                &[OpType::Put, OpType::Put, OpType::Put],
1874                field_col_id,
1875                &[42, 43, 44],
1876            )
1877            .build()
1878            .unwrap();
1879
1880            let v = batch
1881                .pk_col_value(&*codec, 0, ReservedColumnId::table_id())
1882                .unwrap()
1883                .unwrap();
1884            assert_eq!(values[0], *v);
1885
1886            let v = batch
1887                .pk_col_value(&*codec, 1, ReservedColumnId::tsid())
1888                .unwrap()
1889                .unwrap();
1890            assert_eq!(values[1], *v);
1891
1892            let v = batch.pk_col_value(&*codec, 2, 100).unwrap().unwrap();
1893            assert_eq!(values[2], *v);
1894
1895            let v = batch.pk_col_value(&*codec, 3, 200).unwrap().unwrap();
1896            assert_eq!(values[3], *v);
1897
1898            let v = batch.field_col_value(field_col_id).unwrap();
1899            assert_eq!(v.data.get(0), Value::UInt64(42));
1900            assert_eq!(v.data.get(1), Value::UInt64(43));
1901            assert_eq!(v.data.get(2), Value::UInt64(44));
1902        }
1903    }
1904}