// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Common structs and utilities for reading data.

pub mod batch_adapter;
pub mod compat;
pub mod dedup;
pub mod flat_dedup;
pub mod flat_merge;
pub mod flat_projection;
pub mod last_row;
pub mod projection;
pub(crate) mod prune;
pub(crate) mod pruner;
pub mod range;
#[cfg(feature = "test")]
pub mod range_cache;
#[cfg(not(feature = "test"))]
pub(crate) mod range_cache;
pub mod scan_region;
pub mod scan_util;
pub(crate) mod seq_scan;
pub mod series_scan;
pub mod stream;
pub(crate) mod unordered_scan;

use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use api::v1::OpType;
use async_trait::async_trait;
use common_time::Timestamp;
use datafusion_common::arrow::array::UInt8Array;
use datatypes::arrow;
use datatypes::arrow::array::{Array, ArrayRef};
use datatypes::arrow::compute::SortOptions;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::arrow::row::{RowConverter, SortField};
use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector};
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::types::TimestampType;
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::{
    BooleanVector, Helper, TimestampMicrosecondVector, TimestampMillisecondVector,
    TimestampMillisecondVectorBuilder, TimestampNanosecondVector, TimestampSecondVector,
    UInt8Vector, UInt8VectorBuilder, UInt32Vector, UInt64Vector, UInt64VectorBuilder, Vector,
    VectorRef,
};
use futures::TryStreamExt;
use futures::stream::BoxStream;
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metadata::RegionMetadata;
use store_api::storage::{ColumnId, SequenceNumber, SequenceRange};

use crate::error::{
    ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu,
    Result,
};
use crate::memtable::{BoxedBatchIterator, BoxedRecordBatchIterator};
use crate::read::prune::PruneReader;

/// Storage internal representation of a batch of rows for a primary key (time series).
///
/// Rows are sorted by primary key, timestamp, sequence desc, op_type desc. Fields
/// always keep the same relative order as fields in [RegionMetadata](store_api::metadata::RegionMetadata).
#[derive(Debug, PartialEq, Clone)]
pub struct Batch {
    /// Primary key encoded in a comparable form.
    primary_key: Vec<u8>,
    /// Possibly decoded `primary_key` values. Some places would decode it in advance.
    /// Lazily populated and cached by [Batch::pk_col_value].
    pk_values: Option<CompositeValues>,
    /// Timestamps of rows, should be sorted and not null.
    timestamps: VectorRef,
    /// Sequences of rows
    ///
    /// UInt64 type, not null.
    sequences: Arc<UInt64Vector>,
    /// Op types of rows
    ///
    /// UInt8 type, not null.
    op_types: Arc<UInt8Vector>,
    /// Fields organized in columnar format.
    fields: Vec<BatchColumn>,
    /// Cache for field index lookup. Lazily built by [Batch::field_col_value].
    fields_idx: Option<HashMap<ColumnId, usize>>,
}

impl Batch {
    /// Creates a new batch from the required key columns and the field columns.
    ///
    /// Validation is delegated to [BatchBuilder].
    pub fn new(
        primary_key: Vec<u8>,
        timestamps: VectorRef,
        sequences: Arc<UInt64Vector>,
        op_types: Arc<UInt8Vector>,
        fields: Vec<BatchColumn>,
    ) -> Result<Batch> {
        let builder =
            BatchBuilder::with_required_columns(primary_key, timestamps, sequences, op_types);
        builder.with_fields(fields).build()
    }

116    /// Tries to set fields for the batch.
117    pub fn with_fields(self, fields: Vec<BatchColumn>) -> Result<Batch> {
118        Batch::new(
119            self.primary_key,
120            self.timestamps,
121            self.sequences,
122            self.op_types,
123            fields,
124        )
125    }
126
127    /// Returns primary key of the batch.
128    pub fn primary_key(&self) -> &[u8] {
129        &self.primary_key
130    }
131
    /// Returns possibly decoded primary-key values.
    ///
    /// Returns `None` if the primary key has not been decoded yet.
    pub fn pk_values(&self) -> Option<&CompositeValues> {
        self.pk_values.as_ref()
    }

    /// Sets possibly decoded primary-key values.
    ///
    /// The caller should ensure `pk_values` matches [Batch::primary_key].
    pub fn set_pk_values(&mut self, pk_values: CompositeValues) {
        self.pk_values = Some(pk_values);
    }

    /// Removes possibly decoded primary-key values. For testing only.
    ///
    /// Clears the cache so later accesses decode from `primary_key` again.
    #[cfg(any(test, feature = "test"))]
    pub fn remove_pk_values(&mut self) {
        self.pk_values = None;
    }

148    /// Returns fields in the batch.
149    pub fn fields(&self) -> &[BatchColumn] {
150        &self.fields
151    }
152
    /// Returns timestamps of the batch.
    ///
    /// The vector is expected to be sorted and contain no nulls.
    pub fn timestamps(&self) -> &VectorRef {
        &self.timestamps
    }

    /// Returns sequences of the batch.
    ///
    /// The vector is UInt64 type and contains no nulls.
    pub fn sequences(&self) -> &Arc<UInt64Vector> {
        &self.sequences
    }

    /// Returns op types of the batch.
    ///
    /// The vector is UInt8 type and contains no nulls.
    pub fn op_types(&self) -> &Arc<UInt8Vector> {
        &self.op_types
    }

    /// Returns the number of rows in the batch.
    pub fn num_rows(&self) -> usize {
        // All vectors have the same length. We use the length of sequences vector
        // since it has static type.
        self.sequences.len()
    }

175    /// Create an empty [`Batch`].
176    #[allow(dead_code)]
177    pub(crate) fn empty() -> Self {
178        Self {
179            primary_key: vec![],
180            pk_values: None,
181            timestamps: Arc::new(TimestampMillisecondVectorBuilder::with_capacity(0).finish()),
182            sequences: Arc::new(UInt64VectorBuilder::with_capacity(0).finish()),
183            op_types: Arc::new(UInt8VectorBuilder::with_capacity(0).finish()),
184            fields: vec![],
185            fields_idx: None,
186        }
187    }
188
    /// Returns true if the number of rows in the batch is 0.
    pub fn is_empty(&self) -> bool {
        self.num_rows() == 0
    }

194    /// Returns the first timestamp in the batch or `None` if the batch is empty.
195    pub fn first_timestamp(&self) -> Option<Timestamp> {
196        if self.timestamps.is_empty() {
197            return None;
198        }
199
200        Some(self.get_timestamp(0))
201    }
202
203    /// Returns the last timestamp in the batch or `None` if the batch is empty.
204    pub fn last_timestamp(&self) -> Option<Timestamp> {
205        if self.timestamps.is_empty() {
206            return None;
207        }
208
209        Some(self.get_timestamp(self.timestamps.len() - 1))
210    }
211
212    /// Returns the first sequence in the batch or `None` if the batch is empty.
213    pub fn first_sequence(&self) -> Option<SequenceNumber> {
214        if self.sequences.is_empty() {
215            return None;
216        }
217
218        Some(self.get_sequence(0))
219    }
220
221    /// Returns the last sequence in the batch or `None` if the batch is empty.
222    pub fn last_sequence(&self) -> Option<SequenceNumber> {
223        if self.sequences.is_empty() {
224            return None;
225        }
226
227        Some(self.get_sequence(self.sequences.len() - 1))
228    }
229
    /// Replaces the primary key of the batch.
    ///
    /// Notice that this [Batch] also contains a maybe-exist `pk_values`.
    /// Be sure to update that field as well, otherwise the cached values
    /// no longer match the new key.
    pub fn set_primary_key(&mut self, primary_key: Vec<u8>) {
        self.primary_key = primary_key;
    }

    /// Slice the batch, returning a new batch.
    ///
    /// # Panics
    /// Panics if `offset + length > self.num_rows()`.
    pub fn slice(&self, offset: usize, length: usize) -> Batch {
        let fields = self
            .fields
            .iter()
            .map(|column| BatchColumn {
                column_id: column.column_id,
                data: column.data.slice(offset, length),
            })
            .collect();
        // We skip using the builder to avoid validating the batch again.
        Batch {
            // Now we need to clone the primary key. We could try `Bytes` if
            // this becomes a bottleneck.
            primary_key: self.primary_key.clone(),
            // The cached `pk_values` and `fields_idx` stay valid for the slice,
            // so we keep them.
            pk_values: self.pk_values.clone(),
            timestamps: self.timestamps.slice(offset, length),
            sequences: Arc::new(self.sequences.get_slice(offset, length)),
            op_types: Arc::new(self.op_types.get_slice(offset, length)),
            fields,
            fields_idx: self.fields_idx.clone(),
        }
    }

    /// Takes `batches` and concat them into one batch.
    ///
    /// All `batches` must have the same primary key.
    ///
    /// # Errors
    /// Returns an error if `batches` is empty, or the batches have different
    /// primary keys or field layouts.
    pub fn concat(mut batches: Vec<Batch>) -> Result<Batch> {
        ensure!(
            !batches.is_empty(),
            InvalidBatchSnafu {
                reason: "empty batches",
            }
        );
        if batches.len() == 1 {
            // Now we own the `batches` so we could pop it directly.
            return Ok(batches.pop().unwrap());
        }

        let primary_key = std::mem::take(&mut batches[0].primary_key);
        let first = &batches[0];
        // We took the primary key from the first batch so we don't use `first.primary_key()`.
        ensure!(
            batches
                .iter()
                .skip(1)
                .all(|b| b.primary_key() == primary_key),
            InvalidBatchSnafu {
                reason: "batches have different primary key",
            }
        );
        // All batches must agree with the first batch on field count, ids and order.
        for b in batches.iter().skip(1) {
            ensure!(
                b.fields.len() == first.fields.len(),
                InvalidBatchSnafu {
                    reason: "batches have different field num",
                }
            );
            for (l, r) in b.fields.iter().zip(&first.fields) {
                ensure!(
                    l.column_id == r.column_id,
                    InvalidBatchSnafu {
                        reason: "batches have different fields",
                    }
                );
            }
        }

        // We take the primary key from the first batch.
        let mut builder = BatchBuilder::new(primary_key);
        // Concat timestamps, sequences, op_types, fields.
        let array = concat_arrays(batches.iter().map(|b| b.timestamps().to_arrow_array()))?;
        builder.timestamps_array(array)?;
        let array = concat_arrays(batches.iter().map(|b| b.sequences().to_arrow_array()))?;
        builder.sequences_array(array)?;
        let array = concat_arrays(batches.iter().map(|b| b.op_types().to_arrow_array()))?;
        builder.op_types_array(array)?;
        for (i, batch_column) in first.fields.iter().enumerate() {
            let array = concat_arrays(batches.iter().map(|b| b.fields()[i].data.to_arrow_array()))?;
            builder.push_field_array(batch_column.column_id, array)?;
        }

        builder.build()
    }

326    /// Removes rows whose op type is delete.
327    pub fn filter_deleted(&mut self) -> Result<()> {
328        // Safety: op type column is not null.
329        let array = self.op_types.as_arrow();
330        // Find rows with non-delete op type.
331        let rhs = UInt8Array::new_scalar(OpType::Delete as u8);
332        let predicate =
333            arrow::compute::kernels::cmp::neq(array, &rhs).context(ComputeArrowSnafu)?;
334        self.filter(&BooleanVector::from(predicate))
335    }
336
    /// Applies the `predicate` to the batch, keeping rows where it is true.
    ///
    /// All columns (timestamps, sequences, op types and fields) are filtered
    /// with the same predicate so they stay the same length.
    pub fn filter(&mut self, predicate: &BooleanVector) -> Result<()> {
        self.timestamps = self
            .timestamps
            .filter(predicate)
            .context(ComputeVectorSnafu)?;
        // Safety: We know the array type so we unwrap on casting.
        self.sequences = Arc::new(
            UInt64Vector::try_from_arrow_array(
                arrow::compute::filter(self.sequences.as_arrow(), predicate.as_boolean_array())
                    .context(ComputeArrowSnafu)?,
            )
            .unwrap(),
        );
        self.op_types = Arc::new(
            UInt8Vector::try_from_arrow_array(
                arrow::compute::filter(self.op_types.as_arrow(), predicate.as_boolean_array())
                    .context(ComputeArrowSnafu)?,
            )
            .unwrap(),
        );
        for batch_column in &mut self.fields {
            batch_column.data = batch_column
                .data
                .filter(predicate)
                .context(ComputeVectorSnafu)?;
        }

        Ok(())
    }

    /// Filters rows by the given sequence range. `None` keeps all rows.
    ///
    /// NOTE(review): the fast path uses only the first/last sequences to decide
    /// whether the whole batch falls inside the range; this relies on them
    /// bounding all sequences in the batch — confirm against callers' sorting
    /// invariants.
    pub fn filter_by_sequence(&mut self, sequence: Option<SequenceRange>) -> Result<()> {
        let seq_range = match sequence {
            // No range: nothing to filter.
            None => return Ok(()),
            Some(seq_range) => {
                // An empty batch has nothing to filter.
                let (Some(first), Some(last)) = (self.first_sequence(), self.last_sequence())
                else {
                    return Ok(());
                };
                // Fast path: skip building a predicate when every row is
                // already inside the range.
                let is_subset = match seq_range {
                    SequenceRange::Gt { min } => min < first,
                    SequenceRange::LtEq { max } => max >= last,
                    SequenceRange::GtLtEq { min, max } => min < first && max >= last,
                };
                if is_subset {
                    return Ok(());
                }
                seq_range
            }
        };

        let seqs = self.sequences.as_arrow();
        let predicate = seq_range.filter(seqs).context(ComputeArrowSnafu)?;

        let predicate = BooleanVector::from(predicate);
        self.filter(&predicate)?;

        Ok(())
    }

    /// Sorts rows in the batch. If `dedup` is true, it also removes
    /// duplicated rows according to primary keys.
    ///
    /// It orders rows by timestamp, sequence desc and only keep the latest
    /// row for the same timestamp. It doesn't consider op type as sequence
    /// should already provide uniqueness for a row.
    pub fn sort(&mut self, dedup: bool) -> Result<()> {
        // If building a converter each time is costly, we may allow passing a
        // converter.
        let converter = RowConverter::new(vec![
            SortField::new(self.timestamps.data_type().as_arrow_type()),
            // Sequences are sorted in descending order so the latest row of a
            // timestamp comes first.
            SortField::new_with_options(
                self.sequences.data_type().as_arrow_type(),
                SortOptions {
                    descending: true,
                    ..Default::default()
                },
            ),
        ])
        .context(ComputeArrowSnafu)?;
        // Columns to sort.
        let columns = [
            self.timestamps.to_arrow_array(),
            self.sequences.to_arrow_array(),
        ];
        let rows = converter.convert_columns(&columns).unwrap();
        let mut to_sort: Vec<_> = rows.iter().enumerate().collect();

        // Avoid re-sorting when rows are already ordered.
        let was_sorted = to_sort.is_sorted_by_key(|x| x.1);
        if !was_sorted {
            to_sort.sort_unstable_by_key(|x| x.1);
        }

        let num_rows = to_sort.len();
        if dedup {
            // Dedup by timestamps. `dedup_by` keeps the first of consecutive
            // duplicates, which is the row with the largest sequence.
            to_sort.dedup_by(|left, right| {
                // Each encoded row is expected to be 18 bytes: the timestamp
                // part followed by the sequence part. NOTE(review): assumes
                // `TIMESTAMP_KEY_LEN` matches the encoded timestamp width —
                // confirm against its definition.
                debug_assert_eq!(18, left.1.as_ref().len());
                debug_assert_eq!(18, right.1.as_ref().len());
                let (left_key, right_key) = (left.1.as_ref(), right.1.as_ref());
                // We only compare the timestamp part and ignore sequence.
                left_key[..TIMESTAMP_KEY_LEN] == right_key[..TIMESTAMP_KEY_LEN]
            });
        }
        let no_dedup = to_sort.len() == num_rows;

        // Nothing changed; skip rebuilding the vectors.
        if was_sorted && no_dedup {
            return Ok(());
        }
        let indices = UInt32Vector::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
        self.take_in_place(&indices)
    }

    /// Merges duplicated timestamps in the batch by keeping the latest non-null field values.
    ///
    /// Rows must already be sorted by timestamp (ascending) and sequence (descending).
    ///
    /// This method deduplicates rows with the same timestamp (keeping the first row in each
    /// timestamp range as the base row) and fills null fields from subsequent rows until all
    /// fields are filled or a delete operation is encountered.
    pub(crate) fn merge_last_non_null(&mut self) -> Result<()> {
        let num_rows = self.num_rows();
        // 0 or 1 row cannot contain duplicates.
        if num_rows < 2 {
            return Ok(());
        }

        let Some(timestamps) = self.timestamps_native() else {
            return Ok(());
        };

        // Fast path: check if there are any duplicate timestamps.
        // `group_count` counts the distinct timestamp runs so we can
        // preallocate the index vectors below.
        let mut has_dup = false;
        let mut group_count = 1;
        for i in 1..num_rows {
            has_dup |= timestamps[i] == timestamps[i - 1];
            group_count += (timestamps[i] != timestamps[i - 1]) as usize;
        }
        if !has_dup {
            return Ok(());
        }

        let num_fields = self.fields.len();
        let op_types = self.op_types.as_arrow().values();

        // For every output row: the index of its base row, and per field the
        // index of the row supplying that field's value.
        let mut base_indices: Vec<u32> = Vec::with_capacity(group_count);
        let mut field_indices: Vec<Vec<u32>> = (0..num_fields)
            .map(|_| Vec::with_capacity(group_count))
            .collect();

        let mut start = 0;
        while start < num_rows {
            // [start, end) is a run of rows sharing the same timestamp.
            let ts = timestamps[start];
            let mut end = start + 1;
            while end < num_rows && timestamps[end] == ts {
                end += 1;
            }

            let group_pos = base_indices.len();
            base_indices.push(start as u32);

            if num_fields > 0 {
                // Default: take the base row for all fields.
                for idx in &mut field_indices {
                    idx.push(start as u32);
                }

                let base_deleted = op_types[start] == OpType::Delete as u8;
                if !base_deleted {
                    // Track fields that are null in the base row and try to fill them from older
                    // rows in the same timestamp range.
                    let mut missing_fields = Vec::new();
                    for (field_idx, col) in self.fields.iter().enumerate() {
                        if col.data.is_null(start) {
                            missing_fields.push(field_idx);
                        }
                    }

                    if !missing_fields.is_empty() {
                        for row_idx in (start + 1)..end {
                            // Stop filling once an older delete is reached.
                            if op_types[row_idx] == OpType::Delete as u8 {
                                break;
                            }

                            // Keep a field "missing" while it stays null; record
                            // the first non-null source row otherwise.
                            missing_fields.retain(|&field_idx| {
                                if self.fields[field_idx].data.is_null(row_idx) {
                                    true
                                } else {
                                    field_indices[field_idx][group_pos] = row_idx as u32;
                                    false
                                }
                            });

                            if missing_fields.is_empty() {
                                break;
                            }
                        }
                    }
                }
            }

            start = end;
        }

        // Rebuild the key columns from the base row of each group.
        let base_indices = UInt32Vector::from_vec(base_indices);
        self.timestamps = self
            .timestamps
            .take(&base_indices)
            .context(ComputeVectorSnafu)?;
        let array = arrow::compute::take(self.sequences.as_arrow(), base_indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        // Safety: We know the array and vector type.
        self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
        let array = arrow::compute::take(self.op_types.as_arrow(), base_indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        // Safety: We know the array and vector type.
        self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());

        // Rebuild each field column from its chosen source rows.
        for (field_idx, batch_column) in self.fields.iter_mut().enumerate() {
            let idx = UInt32Vector::from_vec(std::mem::take(&mut field_indices[field_idx]));
            batch_column.data = batch_column.data.take(&idx).context(ComputeVectorSnafu)?;
        }

        Ok(())
    }

563    /// Returns the estimated memory size of the batch.
564    pub fn memory_size(&self) -> usize {
565        let mut size = std::mem::size_of::<Self>();
566        size += self.primary_key.len();
567        size += self.timestamps.memory_size();
568        size += self.sequences.memory_size();
569        size += self.op_types.memory_size();
570        for batch_column in &self.fields {
571            size += batch_column.data.memory_size();
572        }
573        size
574    }
575
576    /// Returns ids and datatypes of fields in the [Batch] after applying the `projection`.
577    pub(crate) fn projected_fields(
578        metadata: &RegionMetadata,
579        projection: &[ColumnId],
580    ) -> Vec<(ColumnId, ConcreteDataType)> {
581        let projected_ids: HashSet<_> = projection.iter().copied().collect();
582        metadata
583            .field_columns()
584            .filter_map(|column| {
585                if projected_ids.contains(&column.column_id) {
586                    Some((column.column_id, column.column_schema.data_type.clone()))
587                } else {
588                    None
589                }
590            })
591            .collect()
592    }
593
    /// Returns timestamps in a native slice or `None` if the batch is empty.
    ///
    /// Every supported timestamp vector stores `i64` values internally, so the
    /// raw values slice can be exposed regardless of the time unit.
    ///
    /// # Panics
    /// Panics if the timestamp vector has a non-timestamp data type.
    pub(crate) fn timestamps_native(&self) -> Option<&[i64]> {
        if self.timestamps.is_empty() {
            return None;
        }

        let values = match self.timestamps.data_type() {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampSecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampMicrosecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampNanosecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            other => panic!("timestamps in a Batch has other type {:?}", other),
        };

        Some(values)
    }

    /// Takes the batch in place.
    ///
    /// Reorders (and possibly shrinks) every column by `indices`.
    fn take_in_place(&mut self, indices: &UInt32Vector) -> Result<()> {
        self.timestamps = self.timestamps.take(indices).context(ComputeVectorSnafu)?;
        let array = arrow::compute::take(self.sequences.as_arrow(), indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        // Safety: we know the array and vector type.
        self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
        let array = arrow::compute::take(self.op_types.as_arrow(), indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        // Safety: we know the array and vector type.
        self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
        for batch_column in &mut self.fields {
            batch_column.data = batch_column
                .data
                .take(indices)
                .context(ComputeVectorSnafu)?;
        }

        Ok(())
    }

655    /// Gets a timestamp at given `index`.
656    ///
657    /// # Panics
658    /// Panics if `index` is out-of-bound or the timestamp vector returns null.
659    fn get_timestamp(&self, index: usize) -> Timestamp {
660        match self.timestamps.get_ref(index) {
661            ValueRef::Timestamp(timestamp) => timestamp,
662
663            // We have check the data type is timestamp compatible in the [BatchBuilder] so it's safe to panic.
664            value => panic!("{:?} is not a timestamp", value),
665        }
666    }
667
    /// Gets a sequence at given `index`.
    ///
    /// # Panics
    /// Panics if `index` is out-of-bound or the sequence vector returns null.
    pub(crate) fn get_sequence(&self, index: usize) -> SequenceNumber {
        // Safety: sequences is not null so it actually returns Some.
        self.sequences.get_data(index).unwrap()
    }

    /// Checks the batch is monotonic by timestamps.
    ///
    /// Timestamps must be non-decreasing, and rows with equal timestamps must
    /// have non-increasing sequences. Returns a description of the first
    /// violation found.
    #[cfg(debug_assertions)]
    #[allow(dead_code)]
    pub(crate) fn check_monotonic(&self) -> Result<(), String> {
        use std::cmp::Ordering;
        // Empty batches are trivially monotonic.
        if self.timestamps_native().is_none() {
            return Ok(());
        }

        let timestamps = self.timestamps_native().unwrap();
        let sequences = self.sequences.as_arrow().values();
        for (i, window) in timestamps.windows(2).enumerate() {
            let current = window[0];
            let next = window[1];
            let current_sequence = sequences[i];
            let next_sequence = sequences[i + 1];
            match current.cmp(&next) {
                Ordering::Less => {
                    // The current timestamp is less than the next timestamp.
                    continue;
                }
                Ordering::Equal => {
                    // The current timestamp is equal to the next timestamp.
                    // Sequences must be descending within the same timestamp.
                    if current_sequence < next_sequence {
                        return Err(format!(
                            "sequence are not monotonic: ts {} == {} but current sequence {} < {}, index: {}",
                            current, next, current_sequence, next_sequence, i
                        ));
                    }
                }
                Ordering::Greater => {
                    // The current timestamp is greater than the next timestamp.
                    return Err(format!(
                        "timestamps are not monotonic: {} > {}, index: {}",
                        current, next, i
                    ));
                }
            }
        }

        Ok(())
    }

    /// Returns Ok if the given batch is behind the current batch.
    ///
    /// Compares primary key first, then timestamps, then sequences. Note that
    /// the timestamp/sequence comparisons operate on `Option` values, where
    /// `None` (an empty batch) orders before any `Some`.
    #[cfg(debug_assertions)]
    #[allow(dead_code)]
    pub(crate) fn check_next_batch(&self, other: &Batch) -> Result<(), String> {
        // Checks the primary key
        if self.primary_key() < other.primary_key() {
            return Ok(());
        }
        if self.primary_key() > other.primary_key() {
            return Err(format!(
                "primary key is not monotonic: {:?} > {:?}",
                self.primary_key(),
                other.primary_key()
            ));
        }
        // Checks the timestamp.
        if self.last_timestamp() < other.first_timestamp() {
            return Ok(());
        }
        if self.last_timestamp() > other.first_timestamp() {
            return Err(format!(
                "timestamps are not monotonic: {:?} > {:?}",
                self.last_timestamp(),
                other.first_timestamp()
            ));
        }
        // Checks the sequence. Equal timestamps require descending sequences.
        if self.last_sequence() >= other.first_sequence() {
            return Ok(());
        }
        Err(format!(
            "sequences are not monotonic: {:?} < {:?}",
            self.last_sequence(),
            other.first_sequence()
        ))
    }

757    /// Returns the value of the column in the primary key.
758    ///
759    /// Lazily decodes the primary key and caches the result.
760    pub fn pk_col_value(
761        &mut self,
762        codec: &dyn PrimaryKeyCodec,
763        col_idx_in_pk: usize,
764        column_id: ColumnId,
765    ) -> Result<Option<&Value>> {
766        if self.pk_values.is_none() {
767            self.pk_values = Some(codec.decode(&self.primary_key).context(DecodeSnafu)?);
768        }
769
770        let pk_values = self.pk_values.as_ref().unwrap();
771        Ok(match pk_values {
772            CompositeValues::Dense(values) => values.get(col_idx_in_pk).map(|(_, v)| v),
773            CompositeValues::Sparse(values) => values.get(&column_id),
774        })
775    }
776
777    /// Returns values of the field in the batch.
778    ///
779    /// Lazily caches the field index.
780    pub fn field_col_value(&mut self, column_id: ColumnId) -> Option<&BatchColumn> {
781        if self.fields_idx.is_none() {
782            self.fields_idx = Some(
783                self.fields
784                    .iter()
785                    .enumerate()
786                    .map(|(i, c)| (c.column_id, i))
787                    .collect(),
788            );
789        }
790
791        self.fields_idx
792            .as_ref()
793            .unwrap()
794            .get(&column_id)
795            .map(|&idx| &self.fields[idx])
796    }
797}
798
/// A struct to check the batch is monotonic.
///
/// Debug-build helper that remembers the last checked batch and an optional
/// time range to validate incoming batches against.
#[cfg(debug_assertions)]
#[derive(Default)]
#[allow(dead_code)]
pub(crate) struct BatchChecker {
    /// The most recent batch passed to [BatchChecker::check_monotonic].
    last_batch: Option<Batch>,
    /// Inclusive lower bound for batch timestamps, if any.
    start: Option<Timestamp>,
    /// Exclusive upper bound for batch timestamps, if any.
    end: Option<Timestamp>,
}

#[cfg(debug_assertions)]
#[allow(dead_code)]
impl BatchChecker {
    /// Attaches the given start timestamp to the checker.
    ///
    /// Batches whose first timestamp is before `start` fail the check.
    pub(crate) fn with_start(mut self, start: Option<Timestamp>) -> Self {
        self.start = start;
        self
    }

    /// Attaches the given end timestamp to the checker.
    ///
    /// Batches whose last timestamp is at or after `end` fail the check.
    pub(crate) fn with_end(mut self, end: Option<Timestamp>) -> Self {
        self.end = end;
        self
    }

    /// Returns `Ok(())` if the given batch is monotonic and behind
    /// the last batch.
    ///
    /// Also validates the batch against the optional `start`/`end` bounds.
    /// The batch is recorded as the new last batch even when the check fails.
    pub(crate) fn check_monotonic(&mut self, batch: &Batch) -> Result<(), String> {
        batch.check_monotonic()?;

        if let (Some(start), Some(first)) = (self.start, batch.first_timestamp())
            && start > first
        {
            return Err(format!(
                "batch's first timestamp is before the start timestamp: {:?} > {:?}",
                start, first
            ));
        }
        if let (Some(end), Some(last)) = (self.end, batch.last_timestamp())
            && end <= last
        {
            return Err(format!(
                "batch's last timestamp is after the end timestamp: {:?} <= {:?}",
                end, last
            ));
        }

        // Checks the batch is behind the last batch.
        // Then Updates the last batch.
        let res = self
            .last_batch
            .as_ref()
            .map(|last| last.check_next_batch(batch))
            .unwrap_or(Ok(()));
        self.last_batch = Some(batch.clone());
        res
    }

857    /// Formats current batch and last batch for debug.
858    pub(crate) fn format_batch(&self, batch: &Batch) -> String {
859        use std::fmt::Write;
860
861        let mut message = String::new();
862        if let Some(last) = &self.last_batch {
863            write!(
864                message,
865                "last_pk: {:?}, last_ts: {:?}, last_seq: {:?}, ",
866                last.primary_key(),
867                last.last_timestamp(),
868                last.last_sequence()
869            )
870            .unwrap();
871        }
872        write!(
873            message,
874            "batch_pk: {:?}, batch_ts: {:?}, batch_seq: {:?}",
875            batch.primary_key(),
876            batch.timestamps(),
877            batch.sequences()
878        )
879        .unwrap();
880
881        message
882    }
883
884    /// Checks batches from the part range are monotonic. Otherwise, panics.
885    pub(crate) fn ensure_part_range_batch(
886        &mut self,
887        scanner: &str,
888        region_id: store_api::storage::RegionId,
889        partition: usize,
890        part_range: store_api::region_engine::PartitionRange,
891        batch: &Batch,
892    ) {
893        if let Err(e) = self.check_monotonic(batch) {
894            let err_msg = format!(
895                "{}: batch is not sorted, {}, region_id: {}, partition: {}, part_range: {:?}",
896                scanner, e, region_id, partition, part_range,
897            );
898            common_telemetry::error!("{err_msg}, {}", self.format_batch(batch));
899            // Only print the number of row in the panic message.
900            panic!("{err_msg}, batch rows: {}", batch.num_rows());
901        }
902    }
903}
904
/// Len of timestamp in arrow row format.
///
/// NOTE(review): presumably 1 validity byte followed by the 8-byte timestamp
/// value — confirm against the arrow row format documentation.
const TIMESTAMP_KEY_LEN: usize = 9;
907
908/// Helper function to concat arrays from `iter`.
909fn concat_arrays(iter: impl Iterator<Item = ArrayRef>) -> Result<ArrayRef> {
910    let arrays: Vec<_> = iter.collect();
911    let dyn_arrays: Vec<_> = arrays.iter().map(|array| array.as_ref()).collect();
912    arrow::compute::concat(&dyn_arrays).context(ComputeArrowSnafu)
913}
914
/// A column in a [Batch].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct BatchColumn {
    /// Id of the column.
    pub column_id: ColumnId,
    /// Data of the column.
    ///
    /// [BatchBuilder::build] ensures it has the same number of rows as the
    /// other columns of the batch.
    pub data: VectorRef,
}
923
/// Builder to build [Batch].
pub struct BatchBuilder {
    /// Encoded primary key shared by all rows of the batch.
    primary_key: Vec<u8>,
    /// Timestamp column, required by [BatchBuilder::build].
    timestamps: Option<VectorRef>,
    /// Sequence column, required by [BatchBuilder::build].
    sequences: Option<Arc<UInt64Vector>>,
    /// Op type column, required by [BatchBuilder::build].
    op_types: Option<Arc<UInt8Vector>>,
    /// Field columns, may be empty.
    fields: Vec<BatchColumn>,
}
932
933impl BatchBuilder {
934    /// Creates a new [BatchBuilder] with primary key.
935    pub fn new(primary_key: Vec<u8>) -> BatchBuilder {
936        BatchBuilder {
937            primary_key,
938            timestamps: None,
939            sequences: None,
940            op_types: None,
941            fields: Vec::new(),
942        }
943    }
944
945    /// Creates a new [BatchBuilder] with all required columns.
946    pub fn with_required_columns(
947        primary_key: Vec<u8>,
948        timestamps: VectorRef,
949        sequences: Arc<UInt64Vector>,
950        op_types: Arc<UInt8Vector>,
951    ) -> BatchBuilder {
952        BatchBuilder {
953            primary_key,
954            timestamps: Some(timestamps),
955            sequences: Some(sequences),
956            op_types: Some(op_types),
957            fields: Vec::new(),
958        }
959    }
960
961    /// Set all field columns.
962    pub fn with_fields(mut self, fields: Vec<BatchColumn>) -> Self {
963        self.fields = fields;
964        self
965    }
966
967    /// Push a field column.
968    pub fn push_field(&mut self, column: BatchColumn) -> &mut Self {
969        self.fields.push(column);
970        self
971    }
972
973    /// Push an array as a field.
974    pub fn push_field_array(&mut self, column_id: ColumnId, array: ArrayRef) -> Result<&mut Self> {
975        let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
976        self.fields.push(BatchColumn {
977            column_id,
978            data: vector,
979        });
980
981        Ok(self)
982    }
983
984    /// Try to set an array as timestamps.
985    pub fn timestamps_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
986        let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
987        ensure!(
988            vector.data_type().is_timestamp(),
989            InvalidBatchSnafu {
990                reason: format!("{:?} is not a timestamp type", vector.data_type()),
991            }
992        );
993
994        self.timestamps = Some(vector);
995        Ok(self)
996    }
997
998    /// Try to set an array as sequences.
999    pub fn sequences_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
1000        ensure!(
1001            *array.data_type() == arrow::datatypes::DataType::UInt64,
1002            InvalidBatchSnafu {
1003                reason: "sequence array is not UInt64 type",
1004            }
1005        );
1006        // Safety: The cast must success as we have ensured it is uint64 type.
1007        let vector = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
1008        self.sequences = Some(vector);
1009
1010        Ok(self)
1011    }
1012
1013    /// Try to set an array as op types.
1014    pub fn op_types_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
1015        ensure!(
1016            *array.data_type() == arrow::datatypes::DataType::UInt8,
1017            InvalidBatchSnafu {
1018                reason: "sequence array is not UInt8 type",
1019            }
1020        );
1021        // Safety: The cast must success as we have ensured it is uint64 type.
1022        let vector = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
1023        self.op_types = Some(vector);
1024
1025        Ok(self)
1026    }
1027
1028    /// Builds the [Batch].
1029    pub fn build(self) -> Result<Batch> {
1030        let timestamps = self.timestamps.context(InvalidBatchSnafu {
1031            reason: "missing timestamps",
1032        })?;
1033        let sequences = self.sequences.context(InvalidBatchSnafu {
1034            reason: "missing sequences",
1035        })?;
1036        let op_types = self.op_types.context(InvalidBatchSnafu {
1037            reason: "missing op_types",
1038        })?;
1039        // Our storage format ensure these columns are not nullable so
1040        // we use assert here.
1041        assert_eq!(0, timestamps.null_count());
1042        assert_eq!(0, sequences.null_count());
1043        assert_eq!(0, op_types.null_count());
1044
1045        let ts_len = timestamps.len();
1046        ensure!(
1047            sequences.len() == ts_len,
1048            InvalidBatchSnafu {
1049                reason: format!(
1050                    "sequence have different len {} != {}",
1051                    sequences.len(),
1052                    ts_len
1053                ),
1054            }
1055        );
1056        ensure!(
1057            op_types.len() == ts_len,
1058            InvalidBatchSnafu {
1059                reason: format!(
1060                    "op type have different len {} != {}",
1061                    op_types.len(),
1062                    ts_len
1063                ),
1064            }
1065        );
1066        for column in &self.fields {
1067            ensure!(
1068                column.data.len() == ts_len,
1069                InvalidBatchSnafu {
1070                    reason: format!(
1071                        "column {} has different len {} != {}",
1072                        column.column_id,
1073                        column.data.len(),
1074                        ts_len
1075                    ),
1076                }
1077            );
1078        }
1079
1080        Ok(Batch {
1081            primary_key: self.primary_key,
1082            pk_values: None,
1083            timestamps,
1084            sequences,
1085            op_types,
1086            fields: self.fields,
1087            fields_idx: None,
1088        })
1089    }
1090}
1091
impl From<Batch> for BatchBuilder {
    /// Converts a batch back into a builder so its columns can be modified.
    ///
    /// The lazily computed `pk_values` and `fields_idx` caches of the batch
    /// are discarded; a batch built from the returned builder recomputes them
    /// on demand.
    fn from(batch: Batch) -> Self {
        Self {
            primary_key: batch.primary_key,
            timestamps: Some(batch.timestamps),
            sequences: Some(batch.sequences),
            op_types: Some(batch.op_types),
            fields: batch.fields,
        }
    }
}
1103
/// Async [Batch] reader and iterator wrapper.
///
/// This is the data source for SST writers or internal readers.
/// Unifies the different batch producers behind a single `next_batch` call.
pub enum Source {
    /// Source from a [BoxedBatchReader].
    Reader(BoxedBatchReader),
    /// Source from a [BoxedBatchIterator].
    Iter(BoxedBatchIterator),
    /// Source from a [BoxedBatchStream].
    Stream(BoxedBatchStream),
    /// Source from a [PruneReader].
    PruneReader(PruneReader),
}
1117
1118impl Source {
1119    /// Returns next [Batch] from this data source.
1120    pub async fn next_batch(&mut self) -> Result<Option<Batch>> {
1121        match self {
1122            Source::Reader(reader) => reader.next_batch().await,
1123            Source::Iter(iter) => iter.next().transpose(),
1124            Source::Stream(stream) => stream.try_next().await,
1125            Source::PruneReader(reader) => reader.next_batch().await,
1126        }
1127    }
1128}
1129
/// Async [RecordBatch] reader and iterator wrapper for flat format.
///
/// Counterpart of [Source] that yields arrow [RecordBatch]es instead of
/// [Batch]es.
pub enum FlatSource {
    /// Source from a [BoxedRecordBatchIterator].
    Iter(BoxedRecordBatchIterator),
    /// Source from a [BoxedRecordBatchStream].
    Stream(BoxedRecordBatchStream),
}
1137
1138impl FlatSource {
1139    /// Returns next [RecordBatch] from this data source.
1140    pub async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
1141        match self {
1142            FlatSource::Iter(iter) => iter.next().transpose(),
1143            FlatSource::Stream(stream) => stream.try_next().await,
1144        }
1145    }
1146}
1147
/// Async batch reader.
///
/// The reader must guarantee [Batch]es returned by it have the same schema.
#[async_trait]
pub trait BatchReader: Send {
    /// Fetch next [Batch].
    ///
    /// Returns `Ok(None)` when the reader has reached its end and calling `next_batch()`
    /// again won't return batch again.
    ///
    /// If `Err` is returned, caller should not call this method again, the implementor
    /// may or may not panic in such case.
    async fn next_batch(&mut self) -> Result<Option<Batch>>;
}
1162
/// Pointer to [BatchReader].
pub type BoxedBatchReader = Box<dyn BatchReader>;

/// Pointer to a stream that yields [Batch].
pub type BoxedBatchStream = BoxStream<'static, Result<Batch>>;

/// Pointer to a stream that yields [RecordBatch] (flat format).
pub type BoxedRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
1171
#[async_trait::async_trait]
impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
    // Delegates to the inner reader so boxed readers are readers themselves.
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        (**self).next_batch().await
    }
}
1178
/// Local metrics for scanners.
///
/// Accumulates per-scan timing and row counters.
#[derive(Debug, Default)]
pub(crate) struct ScannerMetrics {
    /// Duration to scan data.
    scan_cost: Duration,
    /// Duration while waiting for `yield`.
    yield_cost: Duration,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of rows returned.
    num_rows: usize,
}
1191
1192#[cfg(test)]
1193mod tests {
1194    use datatypes::arrow::array::{TimestampMillisecondArray, UInt8Array, UInt64Array};
1195    use mito_codec::row_converter::{self, build_primary_key_codec_with_fields};
1196    use store_api::codec::PrimaryKeyEncoding;
1197    use store_api::storage::consts::ReservedColumnId;
1198
1199    use super::*;
1200    use crate::error::Error;
1201    use crate::test_util::new_batch_builder;
1202
    /// Builds a test batch with primary key `b"test"` and a single u64 field
    /// (presumably column id 1 — see `new_batch_builder`).
    fn new_batch(
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        field: &[u64],
    ) -> Batch {
        new_batch_builder(b"test", timestamps, sequences, op_types, 1, field)
            .build()
            .unwrap()
    }
1213
    /// Builds a test batch with primary key `b"test"` and the given nullable
    /// u64 field columns, keyed by column id.
    ///
    /// All input slices must have the same length (one entry per row).
    fn new_batch_with_u64_fields(
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        fields: &[(ColumnId, &[Option<u64>])],
    ) -> Batch {
        assert_eq!(timestamps.len(), sequences.len());
        assert_eq!(timestamps.len(), op_types.len());
        for (_, values) in fields {
            assert_eq!(timestamps.len(), values.len());
        }

        let mut builder = BatchBuilder::new(b"test".to_vec());
        builder
            .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
                timestamps.iter().copied(),
            )))
            .unwrap()
            .sequences_array(Arc::new(UInt64Array::from_iter_values(
                sequences.iter().copied(),
            )))
            .unwrap()
            .op_types_array(Arc::new(UInt8Array::from_iter_values(
                op_types.iter().map(|v| *v as u8),
            )))
            .unwrap();

        for (col_id, values) in fields {
            builder
                .push_field_array(*col_id, Arc::new(UInt64Array::from(values.to_vec())))
                .unwrap();
        }

        builder.build().unwrap()
    }
1249
1250    fn new_batch_without_fields(
1251        timestamps: &[i64],
1252        sequences: &[u64],
1253        op_types: &[OpType],
1254    ) -> Batch {
1255        assert_eq!(timestamps.len(), sequences.len());
1256        assert_eq!(timestamps.len(), op_types.len());
1257
1258        let mut builder = BatchBuilder::new(b"test".to_vec());
1259        builder
1260            .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
1261                timestamps.iter().copied(),
1262            )))
1263            .unwrap()
1264            .sequences_array(Arc::new(UInt64Array::from_iter_values(
1265                sequences.iter().copied(),
1266            )))
1267            .unwrap()
1268            .op_types_array(Arc::new(UInt8Array::from_iter_values(
1269                op_types.iter().map(|v| *v as u8),
1270            )))
1271            .unwrap();
1272
1273        builder.build().unwrap()
1274    }
1275
    // An empty batch reports no first/last timestamp or sequence.
    #[test]
    fn test_empty_batch() {
        let batch = Batch::empty();
        assert!(batch.is_empty());
        assert_eq!(None, batch.first_timestamp());
        assert_eq!(None, batch.last_timestamp());
        assert_eq!(None, batch.first_sequence());
        assert_eq!(None, batch.last_sequence());
        assert!(batch.timestamps_native().is_none());
    }

    // With a single row, first and last accessors return the same values.
    #[test]
    fn test_first_last_one() {
        let batch = new_batch(&[1], &[2], &[OpType::Put], &[4]);
        assert_eq!(
            Timestamp::new_millisecond(1),
            batch.first_timestamp().unwrap()
        );
        assert_eq!(
            Timestamp::new_millisecond(1),
            batch.last_timestamp().unwrap()
        );
        assert_eq!(2, batch.first_sequence().unwrap());
        assert_eq!(2, batch.last_sequence().unwrap());
    }

    // With multiple rows, first/last come from the first and last row.
    #[test]
    fn test_first_last_multiple() {
        let batch = new_batch(
            &[1, 2, 3],
            &[11, 12, 13],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );
        assert_eq!(
            Timestamp::new_millisecond(1),
            batch.first_timestamp().unwrap()
        );
        assert_eq!(
            Timestamp::new_millisecond(3),
            batch.last_timestamp().unwrap()
        );
        assert_eq!(11, batch.first_sequence().unwrap());
        assert_eq!(13, batch.last_sequence().unwrap());
    }

    // Slicing keeps all columns aligned over the selected row range.
    #[test]
    fn test_slice() {
        let batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let batch = batch.slice(1, 2);
        let expect = new_batch(
            &[2, 3],
            &[12, 13],
            &[OpType::Delete, OpType::Put],
            &[22, 23],
        );
        assert_eq!(expect, batch);
    }

    // timestamps_native exposes the raw timestamp values.
    #[test]
    fn test_timestamps_native() {
        let batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        assert_eq!(&[1, 2, 3, 4], batch.timestamps_native().unwrap());
    }
1350
    // Concatenating zero batches is an error.
    #[test]
    fn test_concat_empty() {
        let err = Batch::concat(vec![]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }

    // Concatenating a single batch returns an equal batch.
    #[test]
    fn test_concat_one() {
        let batch = new_batch(&[], &[], &[], &[]);
        let actual = Batch::concat(vec![batch.clone()]).unwrap();
        assert_eq!(batch, actual);

        let batch = new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]);
        let actual = Batch::concat(vec![batch.clone()]).unwrap();
        assert_eq!(batch, actual);
    }

    // Concatenation appends rows in order; empty batches contribute nothing.
    #[test]
    fn test_concat_multiple() {
        let batches = vec![
            new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]),
            new_batch(
                &[3, 4, 5],
                &[13, 14, 15],
                &[OpType::Put, OpType::Delete, OpType::Put],
                &[23, 24, 25],
            ),
            new_batch(&[], &[], &[], &[]),
            new_batch(&[6], &[16], &[OpType::Put], &[26]),
        ];
        let batch = Batch::concat(batches).unwrap();
        let expect = new_batch(
            &[1, 2, 3, 4, 5, 6],
            &[11, 12, 13, 14, 15, 16],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Delete,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25, 26],
        );
        assert_eq!(expect, batch);
    }

    // Batches with different primary keys cannot be concatenated.
    #[test]
    fn test_concat_different() {
        let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
        let mut batch2 = new_batch(&[2], &[2], &[OpType::Put], &[2]);
        batch2.primary_key = b"hello".to_vec();
        let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }

    // Batches with mismatched field columns cannot be concatenated.
    #[test]
    fn test_concat_different_fields() {
        let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
        let fields = vec![
            batch1.fields()[0].clone(),
            BatchColumn {
                column_id: 2,
                data: Arc::new(UInt64Vector::from_slice([2])),
            },
        ];
        // Batch 2 has more fields.
        let batch2 = batch1.clone().with_fields(fields).unwrap();
        let err = Batch::concat(vec![batch1.clone(), batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );

        // Batch 2 has different field.
        let fields = vec![BatchColumn {
            column_id: 2,
            data: Arc::new(UInt64Vector::from_slice([2])),
        }];
        let batch2 = batch1.clone().with_fields(fields).unwrap();
        let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }
1443
    // Filtering deletions from an empty batch keeps it empty.
    #[test]
    fn test_filter_deleted_empty() {
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_deleted().unwrap();
        assert!(batch.is_empty());
    }

    // filter_deleted removes Delete rows and keeps Put rows unchanged.
    #[test]
    fn test_filter_deleted() {
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Delete, OpType::Put, OpType::Delete, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch.filter_deleted().unwrap();
        let expect = new_batch(&[2, 4], &[12, 14], &[OpType::Put, OpType::Put], &[22, 24]);
        assert_eq!(expect, batch);

        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let expect = batch.clone();
        batch.filter_deleted().unwrap();
        assert_eq!(expect, batch);
    }
1473
    // Covers all SequenceRange variants: LtEq (inclusive upper bound),
    // Gt (exclusive lower bound), and GtLtEq (exclusive lower, inclusive
    // upper), plus the None (no-op) filter and empty batches.
    #[test]
    fn test_filter_by_sequence() {
        // Filters put only.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::LtEq { max: 13 }))
            .unwrap();
        let expect = new_batch(
            &[1, 2, 3],
            &[11, 12, 13],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );
        assert_eq!(expect, batch);

        // Filters to empty.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );

        batch
            .filter_by_sequence(Some(SequenceRange::LtEq { max: 10 }))
            .unwrap();
        assert!(batch.is_empty());

        // None filter.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let expect = batch.clone();
        batch.filter_by_sequence(None).unwrap();
        assert_eq!(expect, batch);

        // Filter a empty batch
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch
            .filter_by_sequence(Some(SequenceRange::LtEq { max: 10 }))
            .unwrap();
        assert!(batch.is_empty());

        // Filter a empty batch with None
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_by_sequence(None).unwrap();
        assert!(batch.is_empty());

        // Test From variant - exclusive lower bound
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::Gt { min: 12 }))
            .unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // Test From variant with no matches
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::Gt { min: 20 }))
            .unwrap();
        assert!(batch.is_empty());

        // Test Range variant - exclusive lower bound, inclusive upper bound
        let mut batch = new_batch(
            &[1, 2, 3, 4, 5],
            &[11, 12, 13, 14, 15],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 12, max: 14 }))
            .unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // Test Range variant with mixed operations
        let mut batch = new_batch(
            &[1, 2, 3, 4, 5],
            &[11, 12, 13, 14, 15],
            &[
                OpType::Put,
                OpType::Delete,
                OpType::Put,
                OpType::Delete,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 11, max: 13 }))
            .unwrap();
        let expect = new_batch(
            &[2, 3],
            &[12, 13],
            &[OpType::Delete, OpType::Put],
            &[22, 23],
        );
        assert_eq!(expect, batch);

        // Test Range variant with no matches
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 20, max: 25 }))
            .unwrap();
        assert!(batch.is_empty());
    }
1610
    // With no duplicated timestamps there is nothing to merge.
    #[test]
    fn test_merge_last_non_null_no_dup() {
        let mut batch = new_batch_with_u64_fields(
            &[1, 2],
            &[2, 1],
            &[OpType::Put, OpType::Put],
            &[(1, &[Some(10), None]), (2, &[Some(100), Some(200)])],
        );
        let expect = batch.clone();
        batch.merge_last_non_null().unwrap();
        assert_eq!(expect, batch);
    }

    // Null fields of the newest row are filled from older duplicates.
    #[test]
    fn test_merge_last_non_null_fill_null_fields() {
        // Rows are already sorted by timestamp asc and sequence desc.
        let mut batch = new_batch_with_u64_fields(
            &[1, 1, 1],
            &[3, 2, 1],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[
                (1, &[None, Some(10), Some(11)]),
                (2, &[Some(100), Some(200), Some(300)]),
            ],
        );
        batch.merge_last_non_null().unwrap();

        // Field 1 is filled from the first older row (seq=2). Field 2 keeps the base value.
        // Filled fields must not be overwritten by even older duplicates.
        let expect = new_batch_with_u64_fields(
            &[1],
            &[3],
            &[OpType::Put],
            &[(1, &[Some(10)]), (2, &[Some(100)])],
        );
        assert_eq!(expect, batch);
    }

    #[test]
    fn test_merge_last_non_null_stop_at_delete_row() {
        // A delete row in older duplicates should stop filling to avoid resurrecting values before
        // deletion.
        let mut batch = new_batch_with_u64_fields(
            &[1, 1, 1],
            &[3, 2, 1],
            &[OpType::Put, OpType::Delete, OpType::Put],
            &[
                (1, &[None, Some(10), Some(11)]),
                (2, &[Some(100), Some(200), Some(300)]),
            ],
        );
        batch.merge_last_non_null().unwrap();

        let expect = new_batch_with_u64_fields(
            &[1],
            &[3],
            &[OpType::Put],
            &[(1, &[None]), (2, &[Some(100)])],
        );
        assert_eq!(expect, batch);
    }

    // A delete base row is kept untouched; no fields are merged into it.
    #[test]
    fn test_merge_last_non_null_base_delete_no_merge() {
        let mut batch = new_batch_with_u64_fields(
            &[1, 1],
            &[3, 2],
            &[OpType::Delete, OpType::Put],
            &[(1, &[None, Some(10)]), (2, &[None, Some(200)])],
        );
        batch.merge_last_non_null().unwrap();

        // Base row is delete, keep it as is and don't merge fields from older rows.
        let expect =
            new_batch_with_u64_fields(&[1], &[3], &[OpType::Delete], &[(1, &[None]), (2, &[None])]);
        assert_eq!(expect, batch);
    }

    // Each timestamp group merges independently.
    #[test]
    fn test_merge_last_non_null_multiple_timestamp_groups() {
        let mut batch = new_batch_with_u64_fields(
            &[1, 1, 2, 3, 3],
            &[5, 4, 3, 2, 1],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
            ],
            &[
                (1, &[None, Some(10), Some(20), None, Some(30)]),
                (2, &[Some(100), Some(110), Some(120), None, Some(130)]),
            ],
        );
        batch.merge_last_non_null().unwrap();

        let expect = new_batch_with_u64_fields(
            &[1, 2, 3],
            &[5, 3, 2],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[
                (1, &[Some(10), Some(20), Some(30)]),
                (2, &[Some(100), Some(120), Some(130)]),
            ],
        );
        assert_eq!(expect, batch);
    }

    // Without field columns, merging degenerates to deduplication by timestamp.
    #[test]
    fn test_merge_last_non_null_no_fields() {
        let mut batch = new_batch_without_fields(
            &[1, 1, 2],
            &[3, 2, 1],
            &[OpType::Put, OpType::Put, OpType::Put],
        );
        batch.merge_last_non_null().unwrap();

        let expect = new_batch_without_fields(&[1, 2], &[3, 1], &[OpType::Put, OpType::Put]);
        assert_eq!(expect, batch);
    }
1732
1733    #[test]
1734    fn test_filter() {
1735        // Filters put only.
1736        let mut batch = new_batch(
1737            &[1, 2, 3, 4],
1738            &[11, 12, 13, 14],
1739            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
1740            &[21, 22, 23, 24],
1741        );
1742        let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
1743        batch.filter(&predicate).unwrap();
1744        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
1745        assert_eq!(expect, batch);
1746
1747        // Filters deletion.
1748        let mut batch = new_batch(
1749            &[1, 2, 3, 4],
1750            &[11, 12, 13, 14],
1751            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1752            &[21, 22, 23, 24],
1753        );
1754        let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
1755        batch.filter(&predicate).unwrap();
1756        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
1757        assert_eq!(expect, batch);
1758
1759        // Filters to empty.
1760        let predicate = BooleanVector::from_vec(vec![false, false]);
1761        batch.filter(&predicate).unwrap();
1762        assert!(batch.is_empty());
1763    }
1764
    #[test]
    fn test_sort_and_dedup() {
        // Unsorted input; timestamp 2 appears twice (sequences 1 and 6).
        let original = new_batch(
            &[2, 3, 1, 4, 5, 2],
            &[1, 2, 3, 4, 5, 6],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25, 26],
        );

        let mut batch = original.clone();
        batch.sort(true).unwrap();
        // sort(true) dedups: it should only keep one timestamp 2 (the row with
        // the larger sequence, 6).
        assert_eq!(
            new_batch(
                &[1, 2, 3, 4, 5],
                &[3, 6, 2, 4, 5],
                &[
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                ],
                &[23, 26, 22, 24, 25],
            ),
            batch
        );

        let mut batch = original.clone();
        batch.sort(false).unwrap();

        // sort(false) keeps duplicates: both rows with timestamp 2 remain,
        // ordered by descending sequence (6 before 1).
        assert_eq!(
            new_batch(
                &[1, 2, 2, 3, 4, 5],
                &[3, 6, 1, 2, 4, 5],
                &[
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                ],
                &[23, 26, 21, 22, 24, 25],
            ),
            batch
        );

        // Duplicate timestamp 2 where the older row (sequence 1) is a delete.
        let original = new_batch(
            &[2, 2, 1],
            &[1, 6, 1],
            &[OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );

        let mut batch = original.clone();
        batch.sort(true).unwrap();
        // Dedup keeps only the newer put for timestamp 2.
        let expect = new_batch(&[1, 2], &[1, 6], &[OpType::Put, OpType::Put], &[23, 22]);
        assert_eq!(expect, batch);

        let mut batch = original.clone();
        batch.sort(false).unwrap();
        // Without dedup the delete row survives, after the newer put.
        let expect = new_batch(
            &[1, 2, 2],
            &[1, 6, 1],
            &[OpType::Put, OpType::Put, OpType::Delete],
            &[23, 22, 21],
        );
        assert_eq!(expect, batch);
    }
1843
1844    #[test]
1845    fn test_get_value() {
1846        let encodings = [PrimaryKeyEncoding::Dense, PrimaryKeyEncoding::Sparse];
1847
1848        for encoding in encodings {
1849            let codec = build_primary_key_codec_with_fields(
1850                encoding,
1851                [
1852                    (
1853                        ReservedColumnId::table_id(),
1854                        row_converter::SortField::new(ConcreteDataType::uint32_datatype()),
1855                    ),
1856                    (
1857                        ReservedColumnId::tsid(),
1858                        row_converter::SortField::new(ConcreteDataType::uint64_datatype()),
1859                    ),
1860                    (
1861                        100,
1862                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
1863                    ),
1864                    (
1865                        200,
1866                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
1867                    ),
1868                ]
1869                .into_iter(),
1870            );
1871
1872            let values = [
1873                Value::UInt32(1000),
1874                Value::UInt64(2000),
1875                Value::String("abcdefgh".into()),
1876                Value::String("zyxwvu".into()),
1877            ];
1878            let mut buf = vec![];
1879            codec
1880                .encode_values(
1881                    &[
1882                        (ReservedColumnId::table_id(), values[0].clone()),
1883                        (ReservedColumnId::tsid(), values[1].clone()),
1884                        (100, values[2].clone()),
1885                        (200, values[3].clone()),
1886                    ],
1887                    &mut buf,
1888                )
1889                .unwrap();
1890
1891            let field_col_id = 2;
1892            let mut batch = new_batch_builder(
1893                &buf,
1894                &[1, 2, 3],
1895                &[1, 1, 1],
1896                &[OpType::Put, OpType::Put, OpType::Put],
1897                field_col_id,
1898                &[42, 43, 44],
1899            )
1900            .build()
1901            .unwrap();
1902
1903            let v = batch
1904                .pk_col_value(&*codec, 0, ReservedColumnId::table_id())
1905                .unwrap()
1906                .unwrap();
1907            assert_eq!(values[0], *v);
1908
1909            let v = batch
1910                .pk_col_value(&*codec, 1, ReservedColumnId::tsid())
1911                .unwrap()
1912                .unwrap();
1913            assert_eq!(values[1], *v);
1914
1915            let v = batch.pk_col_value(&*codec, 2, 100).unwrap().unwrap();
1916            assert_eq!(values[2], *v);
1917
1918            let v = batch.pk_col_value(&*codec, 3, 200).unwrap().unwrap();
1919            assert_eq!(values[3], *v);
1920
1921            let v = batch.field_col_value(field_col_id).unwrap();
1922            assert_eq!(v.data.get(0), Value::UInt64(42));
1923            assert_eq!(v.data.get(1), Value::UInt64(43));
1924            assert_eq!(v.data.get(2), Value::UInt64(44));
1925        }
1926    }
1927}