mito2/
read.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Common structs and utilities for reading data.
16
17pub mod batch_adapter;
18pub mod compat;
19pub mod dedup;
20pub mod flat_dedup;
21pub mod flat_merge;
22pub mod flat_projection;
23pub mod last_row;
24pub mod merge;
25pub mod plain_batch;
26pub mod projection;
27pub(crate) mod prune;
28pub(crate) mod pruner;
29pub mod range;
30pub(crate) mod range_cache;
31pub mod scan_region;
32pub mod scan_util;
33pub(crate) mod seq_scan;
34pub mod series_scan;
35pub mod stream;
36pub(crate) mod unordered_scan;
37
38use std::collections::{HashMap, HashSet};
39use std::sync::Arc;
40use std::time::Duration;
41
42use api::v1::OpType;
43use async_trait::async_trait;
44use common_time::Timestamp;
45use datafusion_common::arrow::array::UInt8Array;
46use datatypes::arrow;
47use datatypes::arrow::array::{Array, ArrayRef};
48use datatypes::arrow::compute::SortOptions;
49use datatypes::arrow::record_batch::RecordBatch;
50use datatypes::arrow::row::{RowConverter, SortField};
51use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector};
52use datatypes::scalars::ScalarVectorBuilder;
53use datatypes::types::TimestampType;
54use datatypes::value::{Value, ValueRef};
55use datatypes::vectors::{
56    BooleanVector, Helper, TimestampMicrosecondVector, TimestampMillisecondVector,
57    TimestampMillisecondVectorBuilder, TimestampNanosecondVector, TimestampSecondVector,
58    UInt8Vector, UInt8VectorBuilder, UInt32Vector, UInt64Vector, UInt64VectorBuilder, Vector,
59    VectorRef,
60};
61use futures::TryStreamExt;
62use futures::stream::BoxStream;
63use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
64use snafu::{OptionExt, ResultExt, ensure};
65use store_api::metadata::RegionMetadata;
66use store_api::storage::{ColumnId, SequenceNumber, SequenceRange};
67
68use crate::error::{
69    ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu,
70    Result,
71};
72use crate::memtable::{BoxedBatchIterator, BoxedRecordBatchIterator};
73use crate::read::prune::PruneReader;
74
/// Storage internal representation of a batch of rows for a primary key (time series).
///
/// Rows are sorted by primary key, timestamp, sequence desc, op_type desc. Fields
/// always keep the same relative order as fields in [RegionMetadata](store_api::metadata::RegionMetadata).
#[derive(Debug, PartialEq, Clone)]
pub struct Batch {
    /// Primary key encoded in a comparable form.
    primary_key: Vec<u8>,
    /// Possibly decoded `primary_key` values. Some places would decode it in advance.
    pk_values: Option<CompositeValues>,
    /// Timestamps of rows, should be sorted and not null.
    timestamps: VectorRef,
    /// Sequences of rows
    ///
    /// UInt64 type, not null.
    sequences: Arc<UInt64Vector>,
    /// Op types of rows
    ///
    /// UInt8 type, not null.
    op_types: Arc<UInt8Vector>,
    /// Fields organized in columnar format.
    fields: Vec<BatchColumn>,
    /// Cache for field index lookup, lazily built by [Batch::field_col_value].
    fields_idx: Option<HashMap<ColumnId, usize>>,
}
100
101impl Batch {
102    /// Creates a new batch.
    /// Creates a new batch.
    ///
    /// Delegates to [BatchBuilder] so the input columns are validated
    /// (e.g. matching lengths and expected vector types).
    pub fn new(
        primary_key: Vec<u8>,
        timestamps: VectorRef,
        sequences: Arc<UInt64Vector>,
        op_types: Arc<UInt8Vector>,
        fields: Vec<BatchColumn>,
    ) -> Result<Batch> {
        BatchBuilder::with_required_columns(primary_key, timestamps, sequences, op_types)
            .with_fields(fields)
            .build()
    }
114
    /// Tries to set fields for the batch.
    ///
    /// Consumes `self` and rebuilds the batch through [Batch::new] so the
    /// new fields are validated against the existing key columns.
    /// Note: the cached `pk_values` and `fields_idx` are not carried over.
    pub fn with_fields(self, fields: Vec<BatchColumn>) -> Result<Batch> {
        Batch::new(
            self.primary_key,
            self.timestamps,
            self.sequences,
            self.op_types,
            fields,
        )
    }
125
    /// Returns primary key of the batch.
    pub fn primary_key(&self) -> &[u8] {
        &self.primary_key
    }

    /// Returns possibly decoded primary-key values, if a caller decoded them in advance.
    pub fn pk_values(&self) -> Option<&CompositeValues> {
        self.pk_values.as_ref()
    }

    /// Sets possibly decoded primary-key values.
    pub fn set_pk_values(&mut self, pk_values: CompositeValues) {
        self.pk_values = Some(pk_values);
    }

    /// Removes possibly decoded primary-key values. For testing only.
    #[cfg(any(test, feature = "test"))]
    pub fn remove_pk_values(&mut self) {
        self.pk_values = None;
    }

    /// Returns fields in the batch.
    pub fn fields(&self) -> &[BatchColumn] {
        &self.fields
    }

    /// Returns timestamps of the batch.
    pub fn timestamps(&self) -> &VectorRef {
        &self.timestamps
    }

    /// Returns sequences of the batch.
    pub fn sequences(&self) -> &Arc<UInt64Vector> {
        &self.sequences
    }

    /// Returns op types of the batch.
    pub fn op_types(&self) -> &Arc<UInt8Vector> {
        &self.op_types
    }

    /// Returns the number of rows in the batch.
    pub fn num_rows(&self) -> usize {
        // All vectors have the same length. We use the length of sequences vector
        // since it has static type.
        self.sequences.len()
    }
173
    /// Create an empty [`Batch`].
    ///
    /// The timestamp column defaults to millisecond precision; since the batch
    /// has zero rows, this placeholder type is never compared against real data.
    pub(crate) fn empty() -> Self {
        Self {
            primary_key: vec![],
            pk_values: None,
            timestamps: Arc::new(TimestampMillisecondVectorBuilder::with_capacity(0).finish()),
            sequences: Arc::new(UInt64VectorBuilder::with_capacity(0).finish()),
            op_types: Arc::new(UInt8VectorBuilder::with_capacity(0).finish()),
            fields: vec![],
            fields_idx: None,
        }
    }

    /// Returns true if the number of rows in the batch is 0.
    pub fn is_empty(&self) -> bool {
        self.num_rows() == 0
    }
191
192    /// Returns the first timestamp in the batch or `None` if the batch is empty.
193    pub fn first_timestamp(&self) -> Option<Timestamp> {
194        if self.timestamps.is_empty() {
195            return None;
196        }
197
198        Some(self.get_timestamp(0))
199    }
200
201    /// Returns the last timestamp in the batch or `None` if the batch is empty.
202    pub fn last_timestamp(&self) -> Option<Timestamp> {
203        if self.timestamps.is_empty() {
204            return None;
205        }
206
207        Some(self.get_timestamp(self.timestamps.len() - 1))
208    }
209
210    /// Returns the first sequence in the batch or `None` if the batch is empty.
211    pub fn first_sequence(&self) -> Option<SequenceNumber> {
212        if self.sequences.is_empty() {
213            return None;
214        }
215
216        Some(self.get_sequence(0))
217    }
218
219    /// Returns the last sequence in the batch or `None` if the batch is empty.
220    pub fn last_sequence(&self) -> Option<SequenceNumber> {
221        if self.sequences.is_empty() {
222            return None;
223        }
224
225        Some(self.get_sequence(self.sequences.len() - 1))
226    }
227
    /// Replaces the primary key of the batch.
    ///
    /// Notice that this [Batch] also contains a maybe-exist `pk_values`.
    /// Be sure to update that field as well, otherwise the cached decoded
    /// values would no longer match the encoded key.
    pub fn set_primary_key(&mut self, primary_key: Vec<u8>) {
        self.primary_key = primary_key;
    }
235
236    /// Slice the batch, returning a new batch.
237    ///
238    /// # Panics
239    /// Panics if `offset + length > self.num_rows()`.
240    pub fn slice(&self, offset: usize, length: usize) -> Batch {
241        let fields = self
242            .fields
243            .iter()
244            .map(|column| BatchColumn {
245                column_id: column.column_id,
246                data: column.data.slice(offset, length),
247            })
248            .collect();
249        // We skip using the builder to avoid validating the batch again.
250        Batch {
251            // Now we need to clone the primary key. We could try `Bytes` if
252            // this becomes a bottleneck.
253            primary_key: self.primary_key.clone(),
254            pk_values: self.pk_values.clone(),
255            timestamps: self.timestamps.slice(offset, length),
256            sequences: Arc::new(self.sequences.get_slice(offset, length)),
257            op_types: Arc::new(self.op_types.get_slice(offset, length)),
258            fields,
259            fields_idx: self.fields_idx.clone(),
260        }
261    }
262
    /// Takes `batches` and concat them into one batch.
    ///
    /// All `batches` must have the same primary key.
    ///
    /// # Errors
    /// Returns an error if `batches` is empty, or if the batches have
    /// different primary keys or different field layouts.
    pub fn concat(mut batches: Vec<Batch>) -> Result<Batch> {
        ensure!(
            !batches.is_empty(),
            InvalidBatchSnafu {
                reason: "empty batches",
            }
        );
        if batches.len() == 1 {
            // Now we own the `batches` so we could pop it directly.
            return Ok(batches.pop().unwrap());
        }

        // Move the key out of the first batch to avoid cloning it for the builder.
        let primary_key = std::mem::take(&mut batches[0].primary_key);
        let first = &batches[0];
        // We took the primary key from the first batch so we don't use `first.primary_key()`.
        ensure!(
            batches
                .iter()
                .skip(1)
                .all(|b| b.primary_key() == primary_key),
            InvalidBatchSnafu {
                reason: "batches have different primary key",
            }
        );
        // Every batch must have the same field columns (count and ids) as the first.
        for b in batches.iter().skip(1) {
            ensure!(
                b.fields.len() == first.fields.len(),
                InvalidBatchSnafu {
                    reason: "batches have different field num",
                }
            );
            for (l, r) in b.fields.iter().zip(&first.fields) {
                ensure!(
                    l.column_id == r.column_id,
                    InvalidBatchSnafu {
                        reason: "batches have different fields",
                    }
                );
            }
        }

        // We take the primary key from the first batch.
        let mut builder = BatchBuilder::new(primary_key);
        // Concat timestamps, sequences, op_types, fields.
        let array = concat_arrays(batches.iter().map(|b| b.timestamps().to_arrow_array()))?;
        builder.timestamps_array(array)?;
        let array = concat_arrays(batches.iter().map(|b| b.sequences().to_arrow_array()))?;
        builder.sequences_array(array)?;
        let array = concat_arrays(batches.iter().map(|b| b.op_types().to_arrow_array()))?;
        builder.op_types_array(array)?;
        for (i, batch_column) in first.fields.iter().enumerate() {
            let array = concat_arrays(batches.iter().map(|b| b.fields()[i].data.to_arrow_array()))?;
            builder.push_field_array(batch_column.column_id, array)?;
        }

        builder.build()
    }
323
    /// Removes rows whose op type is delete.
    ///
    /// Builds a boolean predicate (`op_type != Delete`) with an arrow compare
    /// kernel and applies it through [Batch::filter].
    pub fn filter_deleted(&mut self) -> Result<()> {
        // Safety: op type column is not null.
        let array = self.op_types.as_arrow();
        // Find rows with non-delete op type.
        let rhs = UInt8Array::new_scalar(OpType::Delete as u8);
        let predicate =
            arrow::compute::kernels::cmp::neq(array, &rhs).context(ComputeArrowSnafu)?;
        self.filter(&BooleanVector::from(predicate))
    }
334
    /// Applies the `predicate` to the batch, keeping only the rows where the
    /// predicate is true.
    ///
    /// The predicate must have the same length as the batch.
    // Safety: We know the array type so we unwrap on casting.
    pub fn filter(&mut self, predicate: &BooleanVector) -> Result<()> {
        self.timestamps = self
            .timestamps
            .filter(predicate)
            .context(ComputeVectorSnafu)?;
        self.sequences = Arc::new(
            UInt64Vector::try_from_arrow_array(
                arrow::compute::filter(self.sequences.as_arrow(), predicate.as_boolean_array())
                    .context(ComputeArrowSnafu)?,
            )
            .unwrap(),
        );
        self.op_types = Arc::new(
            UInt8Vector::try_from_arrow_array(
                arrow::compute::filter(self.op_types.as_arrow(), predicate.as_boolean_array())
                    .context(ComputeArrowSnafu)?,
            )
            .unwrap(),
        );
        // Filter every field column with the same predicate to keep rows aligned.
        for batch_column in &mut self.fields {
            batch_column.data = batch_column
                .data
                .filter(predicate)
                .context(ComputeVectorSnafu)?;
        }

        Ok(())
    }
365
    /// Filters rows by the given `sequence` range. Only preserves rows whose
    /// sequence is inside the range; `None` keeps all rows.
    pub fn filter_by_sequence(&mut self, sequence: Option<SequenceRange>) -> Result<()> {
        let seq_range = match sequence {
            None => return Ok(()),
            Some(seq_range) => {
                // An empty batch has nothing to filter.
                let (Some(first), Some(last)) = (self.first_sequence(), self.last_sequence())
                else {
                    return Ok(());
                };
                // Fast path: if every sequence in [first, last] is already
                // inside the range, the filter would keep all rows.
                let is_subset = match seq_range {
                    SequenceRange::Gt { min } => min < first,
                    SequenceRange::LtEq { max } => max >= last,
                    SequenceRange::GtLtEq { min, max } => min < first && max >= last,
                };
                if is_subset {
                    return Ok(());
                }
                seq_range
            }
        };

        let seqs = self.sequences.as_arrow();
        let predicate = seq_range.filter(seqs).context(ComputeArrowSnafu)?;

        let predicate = BooleanVector::from(predicate);
        self.filter(&predicate)?;

        Ok(())
    }
395
    /// Sorts rows in the batch. If `dedup` is true, it also removes
    /// duplicated rows according to primary keys.
    ///
    /// It orders rows by timestamp, sequence desc and only keep the latest
    /// row for the same timestamp. It doesn't consider op type as sequence
    /// should already provide uniqueness for a row.
    pub fn sort(&mut self, dedup: bool) -> Result<()> {
        // If building a converter each time is costly, we may allow passing a
        // converter.
        let converter = RowConverter::new(vec![
            SortField::new(self.timestamps.data_type().as_arrow_type()),
            // Sequence is compared in descending order so the latest write sorts first.
            SortField::new_with_options(
                self.sequences.data_type().as_arrow_type(),
                SortOptions {
                    descending: true,
                    ..Default::default()
                },
            ),
        ])
        .context(ComputeArrowSnafu)?;
        // Columns to sort.
        let columns = [
            self.timestamps.to_arrow_array(),
            self.sequences.to_arrow_array(),
        ];
        let rows = converter.convert_columns(&columns).unwrap();
        let mut to_sort: Vec<_> = rows.iter().enumerate().collect();

        let was_sorted = to_sort.is_sorted_by_key(|x| x.1);
        if !was_sorted {
            to_sort.sort_unstable_by_key(|x| x.1);
        }

        let num_rows = to_sort.len();
        if dedup {
            // Dedup by timestamps.
            // NOTE(review): 18 appears to be the fixed row-format width of the
            // two encoded columns (timestamp + sequence) — confirm against the
            // arrow row format if the sort fields ever change.
            to_sort.dedup_by(|left, right| {
                debug_assert_eq!(18, left.1.as_ref().len());
                debug_assert_eq!(18, right.1.as_ref().len());
                let (left_key, right_key) = (left.1.as_ref(), right.1.as_ref());
                // We only compare the timestamp part and ignore sequence.
                left_key[..TIMESTAMP_KEY_LEN] == right_key[..TIMESTAMP_KEY_LEN]
            });
        }
        let no_dedup = to_sort.len() == num_rows;

        // If the rows were already sorted and nothing was removed, the batch is unchanged.
        if was_sorted && no_dedup {
            return Ok(());
        }
        let indices = UInt32Vector::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
        self.take_in_place(&indices)
    }
448
    /// Merges duplicated timestamps in the batch by keeping the latest non-null field values.
    ///
    /// Rows must already be sorted by timestamp (ascending) and sequence (descending).
    ///
    /// This method deduplicates rows with the same timestamp (keeping the first row in each
    /// timestamp range as the base row) and fills null fields from subsequent rows until all
    /// fields are filled or a delete operation is encountered.
    pub(crate) fn merge_last_non_null(&mut self) -> Result<()> {
        let num_rows = self.num_rows();
        // A batch with fewer than two rows can't contain duplicates.
        if num_rows < 2 {
            return Ok(());
        }

        let Some(timestamps) = self.timestamps_native() else {
            return Ok(());
        };

        // Fast path: check if there are any duplicate timestamps.
        // `group_count` counts the distinct timestamp runs so we can
        // preallocate the index vectors below.
        let mut has_dup = false;
        let mut group_count = 1;
        for i in 1..num_rows {
            has_dup |= timestamps[i] == timestamps[i - 1];
            group_count += (timestamps[i] != timestamps[i - 1]) as usize;
        }
        if !has_dup {
            return Ok(());
        }

        let num_fields = self.fields.len();
        let op_types = self.op_types.as_arrow().values();

        // `base_indices[g]` is the row used for the key columns of group `g`;
        // `field_indices[f][g]` is the row supplying field `f` for group `g`.
        let mut base_indices: Vec<u32> = Vec::with_capacity(group_count);
        let mut field_indices: Vec<Vec<u32>> = (0..num_fields)
            .map(|_| Vec::with_capacity(group_count))
            .collect();

        let mut start = 0;
        while start < num_rows {
            // [start, end) is a run of rows sharing the same timestamp.
            let ts = timestamps[start];
            let mut end = start + 1;
            while end < num_rows && timestamps[end] == ts {
                end += 1;
            }

            let group_pos = base_indices.len();
            base_indices.push(start as u32);

            if num_fields > 0 {
                // Default: take the base row for all fields.
                for idx in &mut field_indices {
                    idx.push(start as u32);
                }

                let base_deleted = op_types[start] == OpType::Delete as u8;
                if !base_deleted {
                    // Track fields that are null in the base row and try to fill them from older
                    // rows in the same timestamp range.
                    let mut missing_fields = Vec::new();
                    for (field_idx, col) in self.fields.iter().enumerate() {
                        if col.data.is_null(start) {
                            missing_fields.push(field_idx);
                        }
                    }

                    if !missing_fields.is_empty() {
                        for row_idx in (start + 1)..end {
                            // A delete tombstone terminates the fill: values
                            // older than the delete must not leak through.
                            if op_types[row_idx] == OpType::Delete as u8 {
                                break;
                            }

                            missing_fields.retain(|&field_idx| {
                                if self.fields[field_idx].data.is_null(row_idx) {
                                    true
                                } else {
                                    field_indices[field_idx][group_pos] = row_idx as u32;
                                    false
                                }
                            });

                            if missing_fields.is_empty() {
                                break;
                            }
                        }
                    }
                }
            }

            start = end;
        }

        // Rebuild key columns from the base row of each group.
        let base_indices = UInt32Vector::from_vec(base_indices);
        self.timestamps = self
            .timestamps
            .take(&base_indices)
            .context(ComputeVectorSnafu)?;
        let array = arrow::compute::take(self.sequences.as_arrow(), base_indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        // Safety: We know the array and vector type.
        self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
        let array = arrow::compute::take(self.op_types.as_arrow(), base_indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        // Safety: We know the array and vector type.
        self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());

        // Rebuild each field column from its per-group source rows.
        for (field_idx, batch_column) in self.fields.iter_mut().enumerate() {
            let idx = UInt32Vector::from_vec(std::mem::take(&mut field_indices[field_idx]));
            batch_column.data = batch_column.data.take(&idx).context(ComputeVectorSnafu)?;
        }

        Ok(())
    }
560
561    /// Returns the estimated memory size of the batch.
562    pub fn memory_size(&self) -> usize {
563        let mut size = std::mem::size_of::<Self>();
564        size += self.primary_key.len();
565        size += self.timestamps.memory_size();
566        size += self.sequences.memory_size();
567        size += self.op_types.memory_size();
568        for batch_column in &self.fields {
569            size += batch_column.data.memory_size();
570        }
571        size
572    }
573
574    /// Returns ids and datatypes of fields in the [Batch] after applying the `projection`.
575    pub(crate) fn projected_fields(
576        metadata: &RegionMetadata,
577        projection: &[ColumnId],
578    ) -> Vec<(ColumnId, ConcreteDataType)> {
579        let projected_ids: HashSet<_> = projection.iter().copied().collect();
580        metadata
581            .field_columns()
582            .filter_map(|column| {
583                if projected_ids.contains(&column.column_id) {
584                    Some((column.column_id, column.column_schema.data_type.clone()))
585                } else {
586                    None
587                }
588            })
589            .collect()
590    }
591
    /// Returns timestamps in a native slice or `None` if the batch is empty.
    ///
    /// The returned `i64` values are raw ticks in the column's own time unit
    /// (second/millisecond/microsecond/nanosecond).
    ///
    /// # Panics
    /// Panics if the timestamp column has a non-timestamp type, which the
    /// [BatchBuilder] rules out for batches built through it.
    pub(crate) fn timestamps_native(&self) -> Option<&[i64]> {
        if self.timestamps.is_empty() {
            return None;
        }

        // Downcast to the concrete vector for the unit, then expose the
        // underlying arrow buffer as a native slice.
        let values = match self.timestamps.data_type() {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampSecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampMicrosecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampNanosecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            other => panic!("timestamps in a Batch has other type {:?}", other),
        };

        Some(values)
    }
632
    /// Takes the batch in place, reordering (and possibly dropping) rows
    /// according to `indices`.
    fn take_in_place(&mut self, indices: &UInt32Vector) -> Result<()> {
        self.timestamps = self.timestamps.take(indices).context(ComputeVectorSnafu)?;
        let array = arrow::compute::take(self.sequences.as_arrow(), indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        // Safety: we know the array and vector type.
        self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
        let array = arrow::compute::take(self.op_types.as_arrow(), indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        // Safety: we know the array and vector type.
        self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
        // Apply the same indices to every field column to keep rows aligned.
        for batch_column in &mut self.fields {
            batch_column.data = batch_column
                .data
                .take(indices)
                .context(ComputeVectorSnafu)?;
        }

        Ok(())
    }
652
    /// Gets a timestamp at given `index`.
    ///
    /// # Panics
    /// Panics if `index` is out-of-bound or the timestamp vector returns null.
    fn get_timestamp(&self, index: usize) -> Timestamp {
        match self.timestamps.get_ref(index) {
            ValueRef::Timestamp(timestamp) => timestamp,

            // We have checked the data type is timestamp compatible in the [BatchBuilder] so it's safe to panic.
            value => panic!("{:?} is not a timestamp", value),
        }
    }

    /// Gets a sequence at given `index`.
    ///
    /// # Panics
    /// Panics if `index` is out-of-bound or the sequence vector returns null.
    pub(crate) fn get_sequence(&self, index: usize) -> SequenceNumber {
        // Safety: sequences is not null so it actually returns Some.
        self.sequences.get_data(index).unwrap()
    }
674
    /// Checks the batch is monotonic by timestamps: timestamps must be
    /// non-decreasing, and rows sharing a timestamp must have non-increasing
    /// sequences. Debug builds only.
    #[cfg(debug_assertions)]
    pub(crate) fn check_monotonic(&self) -> Result<(), String> {
        use std::cmp::Ordering;
        // Empty batches are trivially monotonic.
        if self.timestamps_native().is_none() {
            return Ok(());
        }

        let timestamps = self.timestamps_native().unwrap();
        let sequences = self.sequences.as_arrow().values();
        // Compare each adjacent pair of rows.
        for (i, window) in timestamps.windows(2).enumerate() {
            let current = window[0];
            let next = window[1];
            let current_sequence = sequences[i];
            let next_sequence = sequences[i + 1];
            match current.cmp(&next) {
                Ordering::Less => {
                    // The current timestamp is less than the next timestamp.
                    continue;
                }
                Ordering::Equal => {
                    // The current timestamp is equal to the next timestamp.
                    // Sequences must then be descending so the latest write comes first.
                    if current_sequence < next_sequence {
                        return Err(format!(
                            "sequence are not monotonic: ts {} == {} but current sequence {} < {}, index: {}",
                            current, next, current_sequence, next_sequence, i
                        ));
                    }
                }
                Ordering::Greater => {
                    // The current timestamp is greater than the next timestamp.
                    return Err(format!(
                        "timestamps are not monotonic: {} > {}, index: {}",
                        current, next, i
                    ));
                }
            }
        }

        Ok(())
    }
716
    /// Returns Ok if the given batch is behind the current batch, comparing
    /// primary key first, then timestamp, then sequence (descending).
    /// Debug builds only.
    #[cfg(debug_assertions)]
    pub(crate) fn check_next_batch(&self, other: &Batch) -> Result<(), String> {
        // Checks the primary key
        if self.primary_key() < other.primary_key() {
            return Ok(());
        }
        if self.primary_key() > other.primary_key() {
            return Err(format!(
                "primary key is not monotonic: {:?} > {:?}",
                self.primary_key(),
                other.primary_key()
            ));
        }
        // Checks the timestamp.
        if self.last_timestamp() < other.first_timestamp() {
            return Ok(());
        }
        if self.last_timestamp() > other.first_timestamp() {
            return Err(format!(
                "timestamps are not monotonic: {:?} > {:?}",
                self.last_timestamp(),
                other.first_timestamp()
            ));
        }
        // Checks the sequence. Same key and same timestamp: sequences must descend.
        if self.last_sequence() >= other.first_sequence() {
            return Ok(());
        }
        Err(format!(
            "sequences are not monotonic: {:?} < {:?}",
            self.last_sequence(),
            other.first_sequence()
        ))
    }
752
    /// Returns the value of the column in the primary key.
    ///
    /// Lazily decodes the primary key and caches the result in `pk_values`.
    /// `col_idx_in_pk` addresses dense encodings; `column_id` addresses sparse
    /// encodings — which one is used depends on the decoded representation.
    pub fn pk_col_value(
        &mut self,
        codec: &dyn PrimaryKeyCodec,
        col_idx_in_pk: usize,
        column_id: ColumnId,
    ) -> Result<Option<&Value>> {
        if self.pk_values.is_none() {
            self.pk_values = Some(codec.decode(&self.primary_key).context(DecodeSnafu)?);
        }

        let pk_values = self.pk_values.as_ref().unwrap();
        Ok(match pk_values {
            CompositeValues::Dense(values) => values.get(col_idx_in_pk).map(|(_, v)| v),
            CompositeValues::Sparse(values) => values.get(&column_id),
        })
    }
772
773    /// Returns values of the field in the batch.
774    ///
775    /// Lazily caches the field index.
776    pub fn field_col_value(&mut self, column_id: ColumnId) -> Option<&BatchColumn> {
777        if self.fields_idx.is_none() {
778            self.fields_idx = Some(
779                self.fields
780                    .iter()
781                    .enumerate()
782                    .map(|(i, c)| (c.column_id, i))
783                    .collect(),
784            );
785        }
786
787        self.fields_idx
788            .as_ref()
789            .unwrap()
790            .get(&column_id)
791            .map(|&idx| &self.fields[idx])
792    }
793}
794
/// A struct to check the batch is monotonic. Debug builds only.
#[cfg(debug_assertions)]
#[derive(Default)]
pub(crate) struct BatchChecker {
    /// The previous batch seen, used to verify cross-batch ordering.
    last_batch: Option<Batch>,
    /// Optional inclusive lower bound for batch timestamps.
    start: Option<Timestamp>,
    /// Optional exclusive upper bound for batch timestamps.
    end: Option<Timestamp>,
}
803
804#[cfg(debug_assertions)]
805impl BatchChecker {
    /// Attaches the given start timestamp to the checker.
    pub(crate) fn with_start(mut self, start: Option<Timestamp>) -> Self {
        self.start = start;
        self
    }

    /// Attaches the given end timestamp to the checker.
    pub(crate) fn with_end(mut self, end: Option<Timestamp>) -> Self {
        self.end = end;
        self
    }
817
    /// Returns Ok if the given batch is internally monotonic, within the
    /// optional `[start, end)` bounds, and behind the last batch seen.
    /// Remembers `batch` as the new last batch either way.
    pub(crate) fn check_monotonic(&mut self, batch: &Batch) -> Result<(), String> {
        batch.check_monotonic()?;

        if let (Some(start), Some(first)) = (self.start, batch.first_timestamp())
            && start > first
        {
            return Err(format!(
                "batch's first timestamp is before the start timestamp: {:?} > {:?}",
                start, first
            ));
        }
        if let (Some(end), Some(last)) = (self.end, batch.last_timestamp())
            && end <= last
        {
            return Err(format!(
                "batch's last timestamp is after the end timestamp: {:?} <= {:?}",
                end, last
            ));
        }

        // Checks the batch is behind the last batch.
        // Then Updates the last batch.
        let res = self
            .last_batch
            .as_ref()
            .map(|last| last.check_next_batch(batch))
            .unwrap_or(Ok(()));
        self.last_batch = Some(batch.clone());
        res
    }
850
851    /// Formats current batch and last batch for debug.
852    pub(crate) fn format_batch(&self, batch: &Batch) -> String {
853        use std::fmt::Write;
854
855        let mut message = String::new();
856        if let Some(last) = &self.last_batch {
857            write!(
858                message,
859                "last_pk: {:?}, last_ts: {:?}, last_seq: {:?}, ",
860                last.primary_key(),
861                last.last_timestamp(),
862                last.last_sequence()
863            )
864            .unwrap();
865        }
866        write!(
867            message,
868            "batch_pk: {:?}, batch_ts: {:?}, batch_seq: {:?}",
869            batch.primary_key(),
870            batch.timestamps(),
871            batch.sequences()
872        )
873        .unwrap();
874
875        message
876    }
877
878    /// Checks batches from the part range are monotonic. Otherwise, panics.
879    pub(crate) fn ensure_part_range_batch(
880        &mut self,
881        scanner: &str,
882        region_id: store_api::storage::RegionId,
883        partition: usize,
884        part_range: store_api::region_engine::PartitionRange,
885        batch: &Batch,
886    ) {
887        if let Err(e) = self.check_monotonic(batch) {
888            let err_msg = format!(
889                "{}: batch is not sorted, {}, region_id: {}, partition: {}, part_range: {:?}",
890                scanner, e, region_id, partition, part_range,
891            );
892            common_telemetry::error!("{err_msg}, {}", self.format_batch(batch));
893            // Only print the number of row in the panic message.
894            panic!("{err_msg}, batch rows: {}", batch.num_rows());
895        }
896    }
897}
898
/// Len of timestamp in arrow row format.
///
/// NOTE(review): 9 bytes — presumably one validity byte plus the 8-byte
/// timestamp value in the arrow-rs row encoding; confirm against the
/// `arrow::row` documentation if the encoding ever changes.
const TIMESTAMP_KEY_LEN: usize = 9;
901
902/// Helper function to concat arrays from `iter`.
903fn concat_arrays(iter: impl Iterator<Item = ArrayRef>) -> Result<ArrayRef> {
904    let arrays: Vec<_> = iter.collect();
905    let dyn_arrays: Vec<_> = arrays.iter().map(|array| array.as_ref()).collect();
906    arrow::compute::concat(&dyn_arrays).context(ComputeArrowSnafu)
907}
908
/// A column in a [Batch].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct BatchColumn {
    /// Id of the column.
    pub column_id: ColumnId,
    /// Data of the column.
    ///
    /// Has the same number of rows as the batch's timestamp column
    /// (validated by [BatchBuilder::build]).
    pub data: VectorRef,
}
917
/// Builder to build [Batch].
pub struct BatchBuilder {
    /// Encoded primary key shared by all rows of the batch.
    primary_key: Vec<u8>,
    /// Timestamp column; required before `build()`.
    timestamps: Option<VectorRef>,
    /// Sequence column; required before `build()`.
    sequences: Option<Arc<UInt64Vector>>,
    /// Op type column; required before `build()`.
    op_types: Option<Arc<UInt8Vector>>,
    /// Field (non-key) columns.
    fields: Vec<BatchColumn>,
}
926
927impl BatchBuilder {
928    /// Creates a new [BatchBuilder] with primary key.
929    pub fn new(primary_key: Vec<u8>) -> BatchBuilder {
930        BatchBuilder {
931            primary_key,
932            timestamps: None,
933            sequences: None,
934            op_types: None,
935            fields: Vec::new(),
936        }
937    }
938
939    /// Creates a new [BatchBuilder] with all required columns.
940    pub fn with_required_columns(
941        primary_key: Vec<u8>,
942        timestamps: VectorRef,
943        sequences: Arc<UInt64Vector>,
944        op_types: Arc<UInt8Vector>,
945    ) -> BatchBuilder {
946        BatchBuilder {
947            primary_key,
948            timestamps: Some(timestamps),
949            sequences: Some(sequences),
950            op_types: Some(op_types),
951            fields: Vec::new(),
952        }
953    }
954
955    /// Set all field columns.
956    pub fn with_fields(mut self, fields: Vec<BatchColumn>) -> Self {
957        self.fields = fields;
958        self
959    }
960
961    /// Push a field column.
962    pub fn push_field(&mut self, column: BatchColumn) -> &mut Self {
963        self.fields.push(column);
964        self
965    }
966
967    /// Push an array as a field.
968    pub fn push_field_array(&mut self, column_id: ColumnId, array: ArrayRef) -> Result<&mut Self> {
969        let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
970        self.fields.push(BatchColumn {
971            column_id,
972            data: vector,
973        });
974
975        Ok(self)
976    }
977
978    /// Try to set an array as timestamps.
979    pub fn timestamps_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
980        let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
981        ensure!(
982            vector.data_type().is_timestamp(),
983            InvalidBatchSnafu {
984                reason: format!("{:?} is not a timestamp type", vector.data_type()),
985            }
986        );
987
988        self.timestamps = Some(vector);
989        Ok(self)
990    }
991
992    /// Try to set an array as sequences.
993    pub fn sequences_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
994        ensure!(
995            *array.data_type() == arrow::datatypes::DataType::UInt64,
996            InvalidBatchSnafu {
997                reason: "sequence array is not UInt64 type",
998            }
999        );
1000        // Safety: The cast must success as we have ensured it is uint64 type.
1001        let vector = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
1002        self.sequences = Some(vector);
1003
1004        Ok(self)
1005    }
1006
1007    /// Try to set an array as op types.
1008    pub fn op_types_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
1009        ensure!(
1010            *array.data_type() == arrow::datatypes::DataType::UInt8,
1011            InvalidBatchSnafu {
1012                reason: "sequence array is not UInt8 type",
1013            }
1014        );
1015        // Safety: The cast must success as we have ensured it is uint64 type.
1016        let vector = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
1017        self.op_types = Some(vector);
1018
1019        Ok(self)
1020    }
1021
1022    /// Builds the [Batch].
1023    pub fn build(self) -> Result<Batch> {
1024        let timestamps = self.timestamps.context(InvalidBatchSnafu {
1025            reason: "missing timestamps",
1026        })?;
1027        let sequences = self.sequences.context(InvalidBatchSnafu {
1028            reason: "missing sequences",
1029        })?;
1030        let op_types = self.op_types.context(InvalidBatchSnafu {
1031            reason: "missing op_types",
1032        })?;
1033        // Our storage format ensure these columns are not nullable so
1034        // we use assert here.
1035        assert_eq!(0, timestamps.null_count());
1036        assert_eq!(0, sequences.null_count());
1037        assert_eq!(0, op_types.null_count());
1038
1039        let ts_len = timestamps.len();
1040        ensure!(
1041            sequences.len() == ts_len,
1042            InvalidBatchSnafu {
1043                reason: format!(
1044                    "sequence have different len {} != {}",
1045                    sequences.len(),
1046                    ts_len
1047                ),
1048            }
1049        );
1050        ensure!(
1051            op_types.len() == ts_len,
1052            InvalidBatchSnafu {
1053                reason: format!(
1054                    "op type have different len {} != {}",
1055                    op_types.len(),
1056                    ts_len
1057                ),
1058            }
1059        );
1060        for column in &self.fields {
1061            ensure!(
1062                column.data.len() == ts_len,
1063                InvalidBatchSnafu {
1064                    reason: format!(
1065                        "column {} has different len {} != {}",
1066                        column.column_id,
1067                        column.data.len(),
1068                        ts_len
1069                    ),
1070                }
1071            );
1072        }
1073
1074        Ok(Batch {
1075            primary_key: self.primary_key,
1076            pk_values: None,
1077            timestamps,
1078            sequences,
1079            op_types,
1080            fields: self.fields,
1081            fields_idx: None,
1082        })
1083    }
1084}
1085
impl From<Batch> for BatchBuilder {
    /// Converts a [Batch] back into a builder.
    ///
    /// Moves the core columns into the builder. The batch's lazily computed
    /// caches (`pk_values`, `fields_idx`) are dropped; a rebuilt [Batch]
    /// recomputes them on demand.
    fn from(batch: Batch) -> Self {
        Self {
            primary_key: batch.primary_key,
            timestamps: Some(batch.timestamps),
            sequences: Some(batch.sequences),
            op_types: Some(batch.op_types),
            fields: batch.fields,
        }
    }
}
1097
/// Async [Batch] reader and iterator wrapper.
///
/// This is the data source for SST writers or internal readers. It unifies
/// the different producer shapes behind a single `next_batch` interface.
pub enum Source {
    /// Source from a [BoxedBatchReader].
    Reader(BoxedBatchReader),
    /// Source from a [BoxedBatchIterator].
    Iter(BoxedBatchIterator),
    /// Source from a [BoxedBatchStream].
    Stream(BoxedBatchStream),
    /// Source from a [PruneReader].
    PruneReader(PruneReader),
}
1111
impl Source {
    /// Returns next [Batch] from this data source.
    ///
    /// Returns `Ok(None)` when the underlying source is exhausted.
    pub async fn next_batch(&mut self) -> Result<Option<Batch>> {
        match self {
            Source::Reader(reader) => reader.next_batch().await,
            // `transpose` converts the iterator's `Option<Result<_>>` into `Result<Option<_>>`.
            Source::Iter(iter) => iter.next().transpose(),
            Source::Stream(stream) => stream.try_next().await,
            Source::PruneReader(reader) => reader.next_batch().await,
        }
    }
}
1123
/// Async [RecordBatch] reader and iterator wrapper for flat format.
///
/// Flat-format counterpart of [Source].
pub enum FlatSource {
    /// Source from a [BoxedRecordBatchIterator].
    Iter(BoxedRecordBatchIterator),
    /// Source from a [BoxedRecordBatchStream].
    Stream(BoxedRecordBatchStream),
}
1131
impl FlatSource {
    /// Returns next [RecordBatch] from this data source.
    ///
    /// Returns `Ok(None)` when the underlying source is exhausted.
    pub async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        match self {
            // `transpose` converts the iterator's `Option<Result<_>>` into `Result<Option<_>>`.
            FlatSource::Iter(iter) => iter.next().transpose(),
            FlatSource::Stream(stream) => stream.try_next().await,
        }
    }
}
1141
/// Async batch reader.
///
/// The reader must guarantee [Batch]es returned by it have the same schema.
#[async_trait]
pub trait BatchReader: Send {
    /// Fetch next [Batch].
    ///
    /// Returns `Ok(None)` when the reader has reached its end and calling `next_batch()`
    /// again won't return any batch.
    ///
    /// If `Err` is returned, caller should not call this method again, the implementor
    /// may or may not panic in such case.
    async fn next_batch(&mut self) -> Result<Option<Batch>>;
}
1156
/// Owned pointer to a [BatchReader] trait object.
pub type BoxedBatchReader = Box<dyn BatchReader>;

/// Pointer to a boxed stream that yields [Batch] results.
pub type BoxedBatchStream = BoxStream<'static, Result<Batch>>;

/// Pointer to a boxed stream that yields [RecordBatch] results.
pub type BoxedRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
1165
#[async_trait::async_trait]
impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
    // Delegates to the boxed reader so `Box<dyn BatchReader>` is itself a reader.
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        (**self).next_batch().await
    }
}
1172
/// Local metrics for scanners.
///
/// Accumulated while a single scanner runs.
#[derive(Debug, Default)]
pub(crate) struct ScannerMetrics {
    /// Duration to scan data.
    scan_cost: Duration,
    /// Duration while waiting for `yield`.
    yield_cost: Duration,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of rows returned.
    num_rows: usize,
}
1185
1186#[cfg(test)]
1187mod tests {
1188    use datatypes::arrow::array::{TimestampMillisecondArray, UInt8Array, UInt64Array};
1189    use mito_codec::row_converter::{self, build_primary_key_codec_with_fields};
1190    use store_api::codec::PrimaryKeyEncoding;
1191    use store_api::storage::consts::ReservedColumnId;
1192
1193    use super::*;
1194    use crate::error::Error;
1195    use crate::test_util::new_batch_builder;
1196
    /// Creates a batch with primary key `b"test"` and a single u64 field
    /// column (the `1` argument is presumably the field's column id — see
    /// `new_batch_builder` in `test_util`).
    fn new_batch(
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        field: &[u64],
    ) -> Batch {
        new_batch_builder(b"test", timestamps, sequences, op_types, 1, field)
            .build()
            .unwrap()
    }
1207
    /// Creates a batch with primary key `b"test"` and one nullable u64 field
    /// column per `(column_id, values)` pair in `fields`.
    fn new_batch_with_u64_fields(
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        fields: &[(ColumnId, &[Option<u64>])],
    ) -> Batch {
        // All columns must have the same number of rows.
        assert_eq!(timestamps.len(), sequences.len());
        assert_eq!(timestamps.len(), op_types.len());
        for (_, values) in fields {
            assert_eq!(timestamps.len(), values.len());
        }

        let mut builder = BatchBuilder::new(b"test".to_vec());
        builder
            .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
                timestamps.iter().copied(),
            )))
            .unwrap()
            .sequences_array(Arc::new(UInt64Array::from_iter_values(
                sequences.iter().copied(),
            )))
            .unwrap()
            .op_types_array(Arc::new(UInt8Array::from_iter_values(
                op_types.iter().map(|v| *v as u8),
            )))
            .unwrap();

        for (col_id, values) in fields {
            builder
                .push_field_array(*col_id, Arc::new(UInt64Array::from(values.to_vec())))
                .unwrap();
        }

        builder.build().unwrap()
    }
1243
1244    fn new_batch_without_fields(
1245        timestamps: &[i64],
1246        sequences: &[u64],
1247        op_types: &[OpType],
1248    ) -> Batch {
1249        assert_eq!(timestamps.len(), sequences.len());
1250        assert_eq!(timestamps.len(), op_types.len());
1251
1252        let mut builder = BatchBuilder::new(b"test".to_vec());
1253        builder
1254            .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
1255                timestamps.iter().copied(),
1256            )))
1257            .unwrap()
1258            .sequences_array(Arc::new(UInt64Array::from_iter_values(
1259                sequences.iter().copied(),
1260            )))
1261            .unwrap()
1262            .op_types_array(Arc::new(UInt8Array::from_iter_values(
1263                op_types.iter().map(|v| *v as u8),
1264            )))
1265            .unwrap();
1266
1267        builder.build().unwrap()
1268    }
1269
1270    #[test]
1271    fn test_empty_batch() {
1272        let batch = Batch::empty();
1273        assert!(batch.is_empty());
1274        assert_eq!(None, batch.first_timestamp());
1275        assert_eq!(None, batch.last_timestamp());
1276        assert_eq!(None, batch.first_sequence());
1277        assert_eq!(None, batch.last_sequence());
1278        assert!(batch.timestamps_native().is_none());
1279    }
1280
    #[test]
    fn test_first_last_one() {
        // With a single row, first and last accessors return the same values.
        let batch = new_batch(&[1], &[2], &[OpType::Put], &[4]);
        assert_eq!(
            Timestamp::new_millisecond(1),
            batch.first_timestamp().unwrap()
        );
        assert_eq!(
            Timestamp::new_millisecond(1),
            batch.last_timestamp().unwrap()
        );
        assert_eq!(2, batch.first_sequence().unwrap());
        assert_eq!(2, batch.last_sequence().unwrap());
    }
1295
    #[test]
    fn test_first_last_multiple() {
        // First/last accessors return the boundary rows of a multi-row batch.
        let batch = new_batch(
            &[1, 2, 3],
            &[11, 12, 13],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );
        assert_eq!(
            Timestamp::new_millisecond(1),
            batch.first_timestamp().unwrap()
        );
        assert_eq!(
            Timestamp::new_millisecond(3),
            batch.last_timestamp().unwrap()
        );
        assert_eq!(11, batch.first_sequence().unwrap());
        assert_eq!(13, batch.last_sequence().unwrap());
    }
1315
    #[test]
    fn test_slice() {
        // Slicing keeps rows [offset, offset + length) of every column.
        let batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let batch = batch.slice(1, 2);
        let expect = new_batch(
            &[2, 3],
            &[12, 13],
            &[OpType::Delete, OpType::Put],
            &[22, 23],
        );
        assert_eq!(expect, batch);
    }
1333
    #[test]
    fn test_timestamps_native() {
        // `timestamps_native` exposes the raw i64 timestamp values.
        let batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        assert_eq!(&[1, 2, 3, 4], batch.timestamps_native().unwrap());
    }
1344
1345    #[test]
1346    fn test_concat_empty() {
1347        let err = Batch::concat(vec![]).unwrap_err();
1348        assert!(
1349            matches!(err, Error::InvalidBatch { .. }),
1350            "unexpected err: {err}"
1351        );
1352    }
1353
    #[test]
    fn test_concat_one() {
        // Concatenating a single batch (empty or not) yields an equal batch.
        let batch = new_batch(&[], &[], &[], &[]);
        let actual = Batch::concat(vec![batch.clone()]).unwrap();
        assert_eq!(batch, actual);

        let batch = new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]);
        let actual = Batch::concat(vec![batch.clone()]).unwrap();
        assert_eq!(batch, actual);
    }
1364
    #[test]
    fn test_concat_multiple() {
        // Concatenation appends rows in order; empty batches contribute nothing.
        let batches = vec![
            new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]),
            new_batch(
                &[3, 4, 5],
                &[13, 14, 15],
                &[OpType::Put, OpType::Delete, OpType::Put],
                &[23, 24, 25],
            ),
            new_batch(&[], &[], &[], &[]),
            new_batch(&[6], &[16], &[OpType::Put], &[26]),
        ];
        let batch = Batch::concat(batches).unwrap();
        let expect = new_batch(
            &[1, 2, 3, 4, 5, 6],
            &[11, 12, 13, 14, 15, 16],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Delete,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25, 26],
        );
        assert_eq!(expect, batch);
    }
1394
    #[test]
    fn test_concat_different() {
        // Batches with different primary keys cannot be concatenated.
        let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
        let mut batch2 = new_batch(&[2], &[2], &[OpType::Put], &[2]);
        batch2.primary_key = b"hello".to_vec();
        let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }
1406
    #[test]
    fn test_concat_different_fields() {
        // Batches with mismatched field columns cannot be concatenated.
        let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
        let fields = vec![
            batch1.fields()[0].clone(),
            BatchColumn {
                column_id: 2,
                data: Arc::new(UInt64Vector::from_slice([2])),
            },
        ];
        // Batch 2 has more fields.
        let batch2 = batch1.clone().with_fields(fields).unwrap();
        let err = Batch::concat(vec![batch1.clone(), batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );

        // Batch 2 has different field.
        let fields = vec![BatchColumn {
            column_id: 2,
            data: Arc::new(UInt64Vector::from_slice([2])),
        }];
        let batch2 = batch1.clone().with_fields(fields).unwrap();
        let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }
1437
1438    #[test]
1439    fn test_filter_deleted_empty() {
1440        let mut batch = new_batch(&[], &[], &[], &[]);
1441        batch.filter_deleted().unwrap();
1442        assert!(batch.is_empty());
1443    }
1444
    #[test]
    fn test_filter_deleted() {
        // Rows with `OpType::Delete` are removed; a batch of only puts is unchanged.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Delete, OpType::Put, OpType::Delete, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch.filter_deleted().unwrap();
        let expect = new_batch(&[2, 4], &[12, 14], &[OpType::Put, OpType::Put], &[22, 24]);
        assert_eq!(expect, batch);

        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let expect = batch.clone();
        batch.filter_deleted().unwrap();
        assert_eq!(expect, batch);
    }
1467
    #[test]
    fn test_filter_by_sequence() {
        // Covers all `SequenceRange` variants plus `None` and empty-batch cases.
        // Filters put only.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::LtEq { max: 13 }))
            .unwrap();
        let expect = new_batch(
            &[1, 2, 3],
            &[11, 12, 13],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );
        assert_eq!(expect, batch);

        // Filters to empty.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );

        batch
            .filter_by_sequence(Some(SequenceRange::LtEq { max: 10 }))
            .unwrap();
        assert!(batch.is_empty());

        // None filter keeps the batch unchanged.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let expect = batch.clone();
        batch.filter_by_sequence(None).unwrap();
        assert_eq!(expect, batch);

        // Filter an empty batch.
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch
            .filter_by_sequence(Some(SequenceRange::LtEq { max: 10 }))
            .unwrap();
        assert!(batch.is_empty());

        // Filter an empty batch with None.
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_by_sequence(None).unwrap();
        assert!(batch.is_empty());

        // Test Gt variant - exclusive lower bound
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::Gt { min: 12 }))
            .unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // Test Gt variant with no matches
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::Gt { min: 20 }))
            .unwrap();
        assert!(batch.is_empty());

        // Test GtLtEq variant - exclusive lower bound, inclusive upper bound
        let mut batch = new_batch(
            &[1, 2, 3, 4, 5],
            &[11, 12, 13, 14, 15],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 12, max: 14 }))
            .unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // Test GtLtEq variant with mixed operations
        let mut batch = new_batch(
            &[1, 2, 3, 4, 5],
            &[11, 12, 13, 14, 15],
            &[
                OpType::Put,
                OpType::Delete,
                OpType::Put,
                OpType::Delete,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 11, max: 13 }))
            .unwrap();
        let expect = new_batch(
            &[2, 3],
            &[12, 13],
            &[OpType::Delete, OpType::Put],
            &[22, 23],
        );
        assert_eq!(expect, batch);

        // Test GtLtEq variant with no matches
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 20, max: 25 }))
            .unwrap();
        assert!(batch.is_empty());
    }
1604
    #[test]
    fn test_merge_last_non_null_no_dup() {
        // Without duplicate timestamps there is nothing to merge.
        let mut batch = new_batch_with_u64_fields(
            &[1, 2],
            &[2, 1],
            &[OpType::Put, OpType::Put],
            &[(1, &[Some(10), None]), (2, &[Some(100), Some(200)])],
        );
        let expect = batch.clone();
        batch.merge_last_non_null().unwrap();
        assert_eq!(expect, batch);
    }
1617
    #[test]
    fn test_merge_last_non_null_fill_null_fields() {
        // Rows are already sorted by timestamp asc and sequence desc.
        let mut batch = new_batch_with_u64_fields(
            &[1, 1, 1],
            &[3, 2, 1],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[
                (1, &[None, Some(10), Some(11)]),
                (2, &[Some(100), Some(200), Some(300)]),
            ],
        );
        batch.merge_last_non_null().unwrap();

        // Field 1 is filled from the first older row (seq=2). Field 2 keeps the base value.
        // Filled fields must not be overwritten by even older duplicates.
        let expect = new_batch_with_u64_fields(
            &[1],
            &[3],
            &[OpType::Put],
            &[(1, &[Some(10)]), (2, &[Some(100)])],
        );
        assert_eq!(expect, batch);
    }
1642
    #[test]
    fn test_merge_last_non_null_stop_at_delete_row() {
        // A delete row in older duplicates should stop filling to avoid resurrecting values before
        // deletion.
        let mut batch = new_batch_with_u64_fields(
            &[1, 1, 1],
            &[3, 2, 1],
            &[OpType::Put, OpType::Delete, OpType::Put],
            &[
                (1, &[None, Some(10), Some(11)]),
                (2, &[Some(100), Some(200), Some(300)]),
            ],
        );
        batch.merge_last_non_null().unwrap();

        // Field 1 stays null because the delete row blocks older values.
        let expect = new_batch_with_u64_fields(
            &[1],
            &[3],
            &[OpType::Put],
            &[(1, &[None]), (2, &[Some(100)])],
        );
        assert_eq!(expect, batch);
    }
1666
    #[test]
    fn test_merge_last_non_null_base_delete_no_merge() {
        // The newest (base) row is a delete; older values must not be merged into it.
        let mut batch = new_batch_with_u64_fields(
            &[1, 1],
            &[3, 2],
            &[OpType::Delete, OpType::Put],
            &[(1, &[None, Some(10)]), (2, &[None, Some(200)])],
        );
        batch.merge_last_non_null().unwrap();

        // Base row is delete, keep it as is and don't merge fields from older rows.
        let expect =
            new_batch_with_u64_fields(&[1], &[3], &[OpType::Delete], &[(1, &[None]), (2, &[None])]);
        assert_eq!(expect, batch);
    }
1682
    #[test]
    fn test_merge_last_non_null_multiple_timestamp_groups() {
        // Merging runs per timestamp group: ts=1 has two rows, ts=2 one, ts=3 two.
        let mut batch = new_batch_with_u64_fields(
            &[1, 1, 2, 3, 3],
            &[5, 4, 3, 2, 1],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
            ],
            &[
                (1, &[None, Some(10), Some(20), None, Some(30)]),
                (2, &[Some(100), Some(110), Some(120), None, Some(130)]),
            ],
        );
        batch.merge_last_non_null().unwrap();

        // Each group keeps its newest row with nulls filled from older rows.
        let expect = new_batch_with_u64_fields(
            &[1, 2, 3],
            &[5, 3, 2],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[
                (1, &[Some(10), Some(20), Some(30)]),
                (2, &[Some(100), Some(120), Some(130)]),
            ],
        );
        assert_eq!(expect, batch);
    }
1713
    #[test]
    fn test_merge_last_non_null_no_fields() {
        // Without field columns, merging degenerates to dedup by timestamp.
        let mut batch = new_batch_without_fields(
            &[1, 1, 2],
            &[3, 2, 1],
            &[OpType::Put, OpType::Put, OpType::Put],
        );
        batch.merge_last_non_null().unwrap();

        let expect = new_batch_without_fields(&[1, 2], &[3, 1], &[OpType::Put, OpType::Put]);
        assert_eq!(expect, batch);
    }
1726
    #[test]
    fn test_filter() {
        // `filter` keeps rows where the predicate is true, regardless of op type.
        // Filters put only.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
        batch.filter(&predicate).unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // Filters deletion.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
        batch.filter(&predicate).unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // Filters to empty.
        let predicate = BooleanVector::from_vec(vec![false, false]);
        batch.filter(&predicate).unwrap();
        assert!(batch.is_empty());
    }
1758
    #[test]
    fn test_sort_and_dedup() {
        // Input is unsorted by timestamp and contains timestamp 2 twice,
        // with sequences 1 (older) and 6 (newer).
        let original = new_batch(
            &[2, 3, 1, 4, 5, 2],
            &[1, 2, 3, 4, 5, 6],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25, 26],
        );

        let mut batch = original.clone();
        batch.sort(true).unwrap();
        // With dedup it should only keep one row for timestamp 2: the one
        // with the larger sequence (6, field 26).
        assert_eq!(
            new_batch(
                &[1, 2, 3, 4, 5],
                &[3, 6, 2, 4, 5],
                &[
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                ],
                &[23, 26, 22, 24, 25],
            ),
            batch
        );

        let mut batch = original.clone();
        batch.sort(false).unwrap();

        // Without dedup both rows for timestamp 2 are kept, ordered by
        // descending sequence (6 before 1).
        assert_eq!(
            new_batch(
                &[1, 2, 2, 3, 4, 5],
                &[3, 6, 1, 2, 4, 5],
                &[
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                ],
                &[23, 26, 21, 22, 24, 25],
            ),
            batch
        );

        // Timestamp 2 now has a delete (seq 1) and a newer put (seq 6).
        let original = new_batch(
            &[2, 2, 1],
            &[1, 6, 1],
            &[OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );

        let mut batch = original.clone();
        batch.sort(true).unwrap();
        // Dedup keeps only the newer put for timestamp 2; the older delete
        // row is dropped.
        let expect = new_batch(&[1, 2], &[1, 6], &[OpType::Put, OpType::Put], &[23, 22]);
        assert_eq!(expect, batch);

        let mut batch = original.clone();
        batch.sort(false).unwrap();
        // Without dedup the delete row stays, after the newer put for the
        // same timestamp.
        let expect = new_batch(
            &[1, 2, 2],
            &[1, 6, 1],
            &[OpType::Put, OpType::Put, OpType::Delete],
            &[23, 22, 21],
        );
        assert_eq!(expect, batch);
    }
1837
    #[test]
    fn test_get_value() {
        // Round-trip primary-key values through both pk encodings and verify
        // that `pk_col_value` decodes each column back and `field_col_value`
        // returns the field vector by column id.
        let encodings = [PrimaryKeyEncoding::Dense, PrimaryKeyEncoding::Sparse];

        for encoding in encodings {
            // Codec over two reserved columns (table_id, tsid) and two
            // user-defined string columns (ids 100 and 200).
            let codec = build_primary_key_codec_with_fields(
                encoding,
                [
                    (
                        ReservedColumnId::table_id(),
                        row_converter::SortField::new(ConcreteDataType::uint32_datatype()),
                    ),
                    (
                        ReservedColumnId::tsid(),
                        row_converter::SortField::new(ConcreteDataType::uint64_datatype()),
                    ),
                    (
                        100,
                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
                    ),
                    (
                        200,
                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
                    ),
                ]
                .into_iter(),
            );

            // Values to encode, in the same column order as the codec fields.
            let values = [
                Value::UInt32(1000),
                Value::UInt64(2000),
                Value::String("abcdefgh".into()),
                Value::String("zyxwvu".into()),
            ];
            let mut buf = vec![];
            codec
                .encode_values(
                    &[
                        (ReservedColumnId::table_id(), values[0].clone()),
                        (ReservedColumnId::tsid(), values[1].clone()),
                        (100, values[2].clone()),
                        (200, values[3].clone()),
                    ],
                    &mut buf,
                )
                .unwrap();

            // Build a 3-row batch whose primary key is the encoded buffer and
            // whose single field column has id `field_col_id`.
            let field_col_id = 2;
            let mut batch = new_batch_builder(
                &buf,
                &[1, 2, 3],
                &[1, 1, 1],
                &[OpType::Put, OpType::Put, OpType::Put],
                field_col_id,
                &[42, 43, 44],
            )
            .build()
            .unwrap();

            // Decode each pk column; the second argument appears to be a
            // position hint within the decoded key — TODO confirm semantics.
            let v = batch
                .pk_col_value(&*codec, 0, ReservedColumnId::table_id())
                .unwrap()
                .unwrap();
            assert_eq!(values[0], *v);

            let v = batch
                .pk_col_value(&*codec, 1, ReservedColumnId::tsid())
                .unwrap()
                .unwrap();
            assert_eq!(values[1], *v);

            let v = batch.pk_col_value(&*codec, 2, 100).unwrap().unwrap();
            assert_eq!(values[2], *v);

            let v = batch.pk_col_value(&*codec, 3, 200).unwrap().unwrap();
            assert_eq!(values[3], *v);

            // Field lookup by column id returns the whole field vector.
            let v = batch.field_col_value(field_col_id).unwrap();
            assert_eq!(v.data.get(0), Value::UInt64(42));
            assert_eq!(v.data.get(1), Value::UInt64(43));
            assert_eq!(v.data.get(2), Value::UInt64(44));
        }
    }
1921}