mito2/
read.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Common structs and utilities for reading data.
16
17pub mod compat;
18pub mod dedup;
19pub mod flat_dedup;
20pub mod flat_merge;
21pub mod flat_projection;
22pub mod last_row;
23pub mod merge;
24pub mod plain_batch;
25pub mod projection;
26pub(crate) mod prune;
27pub mod range;
28pub mod scan_region;
29pub mod scan_util;
30pub(crate) mod seq_scan;
31pub mod series_scan;
32pub mod stream;
33pub(crate) mod unordered_scan;
34
35use std::collections::{HashMap, HashSet};
36use std::sync::Arc;
37use std::time::Duration;
38
39use api::v1::OpType;
40use async_trait::async_trait;
41use common_time::Timestamp;
42use datafusion_common::arrow::array::UInt8Array;
43use datatypes::arrow;
44use datatypes::arrow::array::{Array, ArrayRef};
45use datatypes::arrow::compute::SortOptions;
46use datatypes::arrow::record_batch::RecordBatch;
47use datatypes::arrow::row::{RowConverter, SortField};
48use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector};
49use datatypes::scalars::ScalarVectorBuilder;
50use datatypes::types::TimestampType;
51use datatypes::value::{Value, ValueRef};
52use datatypes::vectors::{
53    BooleanVector, Helper, TimestampMicrosecondVector, TimestampMillisecondVector,
54    TimestampMillisecondVectorBuilder, TimestampNanosecondVector, TimestampSecondVector,
55    UInt8Vector, UInt8VectorBuilder, UInt32Vector, UInt64Vector, UInt64VectorBuilder, Vector,
56    VectorRef,
57};
58use futures::TryStreamExt;
59use futures::stream::BoxStream;
60use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
61use snafu::{OptionExt, ResultExt, ensure};
62use store_api::metadata::RegionMetadata;
63use store_api::storage::{ColumnId, SequenceNumber, SequenceRange};
64
65use crate::error::{
66    ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu,
67    Result,
68};
69use crate::memtable::{BoxedBatchIterator, BoxedRecordBatchIterator};
70use crate::read::prune::PruneReader;
71
/// Storage internal representation of a batch of rows for a primary key (time series).
///
/// Rows are sorted by primary key, timestamp, sequence desc, op_type desc. Fields
/// always keep the same relative order as fields in [RegionMetadata](store_api::metadata::RegionMetadata).
#[derive(Debug, PartialEq, Clone)]
pub struct Batch {
    /// Primary key encoded in a comparable form.
    primary_key: Vec<u8>,
    /// Possibly decoded `primary_key` values. Some places would decode it in advance.
    /// Lazily populated by [Batch::pk_col_value] if not set.
    pk_values: Option<CompositeValues>,
    /// Timestamps of rows, should be sorted and not null.
    timestamps: VectorRef,
    /// Sequences of rows
    ///
    /// UInt64 type, not null.
    sequences: Arc<UInt64Vector>,
    /// Op types of rows
    ///
    /// UInt8 type, not null.
    op_types: Arc<UInt8Vector>,
    /// Fields organized in columnar format.
    fields: Vec<BatchColumn>,
    /// Cache for field index lookup, keyed by column id.
    /// Lazily built by [Batch::field_col_value].
    fields_idx: Option<HashMap<ColumnId, usize>>,
}
97
98impl Batch {
99    /// Creates a new batch.
100    pub fn new(
101        primary_key: Vec<u8>,
102        timestamps: VectorRef,
103        sequences: Arc<UInt64Vector>,
104        op_types: Arc<UInt8Vector>,
105        fields: Vec<BatchColumn>,
106    ) -> Result<Batch> {
107        BatchBuilder::with_required_columns(primary_key, timestamps, sequences, op_types)
108            .with_fields(fields)
109            .build()
110    }
111
112    /// Tries to set fields for the batch.
113    pub fn with_fields(self, fields: Vec<BatchColumn>) -> Result<Batch> {
114        Batch::new(
115            self.primary_key,
116            self.timestamps,
117            self.sequences,
118            self.op_types,
119            fields,
120        )
121    }
122
    /// Returns primary key of the batch, encoded in a comparable form.
    pub fn primary_key(&self) -> &[u8] {
        &self.primary_key
    }

    /// Returns possibly decoded primary-key values.
    ///
    /// Returns `None` if no one has decoded (or set) the values yet.
    pub fn pk_values(&self) -> Option<&CompositeValues> {
        self.pk_values.as_ref()
    }

    /// Sets possibly decoded primary-key values.
    pub fn set_pk_values(&mut self, pk_values: CompositeValues) {
        self.pk_values = Some(pk_values);
    }

    /// Removes possibly decoded primary-key values. For testing only.
    #[cfg(any(test, feature = "test"))]
    pub fn remove_pk_values(&mut self) {
        self.pk_values = None;
    }

    /// Returns fields in the batch.
    pub fn fields(&self) -> &[BatchColumn] {
        &self.fields
    }

    /// Returns timestamps of the batch.
    pub fn timestamps(&self) -> &VectorRef {
        &self.timestamps
    }

    /// Returns sequences of the batch.
    pub fn sequences(&self) -> &Arc<UInt64Vector> {
        &self.sequences
    }

    /// Returns op types of the batch.
    pub fn op_types(&self) -> &Arc<UInt8Vector> {
        &self.op_types
    }

    /// Returns the number of rows in the batch.
    pub fn num_rows(&self) -> usize {
        // All vectors have the same length. We use the length of sequences vector
        // since it has static type.
        self.sequences.len()
    }
170
171    /// Create an empty [`Batch`].
172    pub(crate) fn empty() -> Self {
173        Self {
174            primary_key: vec![],
175            pk_values: None,
176            timestamps: Arc::new(TimestampMillisecondVectorBuilder::with_capacity(0).finish()),
177            sequences: Arc::new(UInt64VectorBuilder::with_capacity(0).finish()),
178            op_types: Arc::new(UInt8VectorBuilder::with_capacity(0).finish()),
179            fields: vec![],
180            fields_idx: None,
181        }
182    }
183
    /// Returns true if the number of rows in the batch is 0.
    pub fn is_empty(&self) -> bool {
        self.num_rows() == 0
    }
188
189    /// Returns the first timestamp in the batch or `None` if the batch is empty.
190    pub fn first_timestamp(&self) -> Option<Timestamp> {
191        if self.timestamps.is_empty() {
192            return None;
193        }
194
195        Some(self.get_timestamp(0))
196    }
197
198    /// Returns the last timestamp in the batch or `None` if the batch is empty.
199    pub fn last_timestamp(&self) -> Option<Timestamp> {
200        if self.timestamps.is_empty() {
201            return None;
202        }
203
204        Some(self.get_timestamp(self.timestamps.len() - 1))
205    }
206
207    /// Returns the first sequence in the batch or `None` if the batch is empty.
208    pub fn first_sequence(&self) -> Option<SequenceNumber> {
209        if self.sequences.is_empty() {
210            return None;
211        }
212
213        Some(self.get_sequence(0))
214    }
215
216    /// Returns the last sequence in the batch or `None` if the batch is empty.
217    pub fn last_sequence(&self) -> Option<SequenceNumber> {
218        if self.sequences.is_empty() {
219            return None;
220        }
221
222        Some(self.get_sequence(self.sequences.len() - 1))
223    }
224
    /// Replaces the primary key of the batch.
    ///
    /// Notice that this [Batch] also contains a maybe-exist `pk_values`.
    /// Be sure to update that field as well, since this method does not clear
    /// or refresh the cached decoded values.
    pub fn set_primary_key(&mut self, primary_key: Vec<u8>) {
        self.primary_key = primary_key;
    }
232
233    /// Slice the batch, returning a new batch.
234    ///
235    /// # Panics
236    /// Panics if `offset + length > self.num_rows()`.
237    pub fn slice(&self, offset: usize, length: usize) -> Batch {
238        let fields = self
239            .fields
240            .iter()
241            .map(|column| BatchColumn {
242                column_id: column.column_id,
243                data: column.data.slice(offset, length),
244            })
245            .collect();
246        // We skip using the builder to avoid validating the batch again.
247        Batch {
248            // Now we need to clone the primary key. We could try `Bytes` if
249            // this becomes a bottleneck.
250            primary_key: self.primary_key.clone(),
251            pk_values: self.pk_values.clone(),
252            timestamps: self.timestamps.slice(offset, length),
253            sequences: Arc::new(self.sequences.get_slice(offset, length)),
254            op_types: Arc::new(self.op_types.get_slice(offset, length)),
255            fields,
256            fields_idx: self.fields_idx.clone(),
257        }
258    }
259
    /// Takes `batches` and concat them into one batch.
    ///
    /// All `batches` must have the same primary key and the same set of fields
    /// (same number of field columns with matching column ids, in order).
    ///
    /// # Errors
    /// Returns an [InvalidBatchSnafu] error if `batches` is empty, the primary
    /// keys differ, or the field layouts differ.
    pub fn concat(mut batches: Vec<Batch>) -> Result<Batch> {
        ensure!(
            !batches.is_empty(),
            InvalidBatchSnafu {
                reason: "empty batches",
            }
        );
        if batches.len() == 1 {
            // Now we own the `batches` so we could pop it directly.
            return Ok(batches.pop().unwrap());
        }

        let primary_key = std::mem::take(&mut batches[0].primary_key);
        let first = &batches[0];
        // We took the primary key from the first batch so we don't use `first.primary_key()`.
        ensure!(
            batches
                .iter()
                .skip(1)
                .all(|b| b.primary_key() == primary_key),
            InvalidBatchSnafu {
                reason: "batches have different primary key",
            }
        );
        // Validate that every batch has the same field layout as the first one.
        for b in batches.iter().skip(1) {
            ensure!(
                b.fields.len() == first.fields.len(),
                InvalidBatchSnafu {
                    reason: "batches have different field num",
                }
            );
            for (l, r) in b.fields.iter().zip(&first.fields) {
                ensure!(
                    l.column_id == r.column_id,
                    InvalidBatchSnafu {
                        reason: "batches have different fields",
                    }
                );
            }
        }

        // We take the primary key from the first batch.
        let mut builder = BatchBuilder::new(primary_key);
        // Concat timestamps, sequences, op_types, fields.
        let array = concat_arrays(batches.iter().map(|b| b.timestamps().to_arrow_array()))?;
        builder.timestamps_array(array)?;
        let array = concat_arrays(batches.iter().map(|b| b.sequences().to_arrow_array()))?;
        builder.sequences_array(array)?;
        let array = concat_arrays(batches.iter().map(|b| b.op_types().to_arrow_array()))?;
        builder.op_types_array(array)?;
        for (i, batch_column) in first.fields.iter().enumerate() {
            let array = concat_arrays(batches.iter().map(|b| b.fields()[i].data.to_arrow_array()))?;
            builder.push_field_array(batch_column.column_id, array)?;
        }

        // The builder validates the concatenated batch (lengths, types).
        builder.build()
    }
320
321    /// Removes rows whose op type is delete.
322    pub fn filter_deleted(&mut self) -> Result<()> {
323        // Safety: op type column is not null.
324        let array = self.op_types.as_arrow();
325        // Find rows with non-delete op type.
326        let rhs = UInt8Array::new_scalar(OpType::Delete as u8);
327        let predicate =
328            arrow::compute::kernels::cmp::neq(array, &rhs).context(ComputeArrowSnafu)?;
329        self.filter(&BooleanVector::from(predicate))
330    }
331
332    // Applies the `predicate` to the batch.
333    // Safety: We know the array type so we unwrap on casting.
334    pub fn filter(&mut self, predicate: &BooleanVector) -> Result<()> {
335        self.timestamps = self
336            .timestamps
337            .filter(predicate)
338            .context(ComputeVectorSnafu)?;
339        self.sequences = Arc::new(
340            UInt64Vector::try_from_arrow_array(
341                arrow::compute::filter(self.sequences.as_arrow(), predicate.as_boolean_array())
342                    .context(ComputeArrowSnafu)?,
343            )
344            .unwrap(),
345        );
346        self.op_types = Arc::new(
347            UInt8Vector::try_from_arrow_array(
348                arrow::compute::filter(self.op_types.as_arrow(), predicate.as_boolean_array())
349                    .context(ComputeArrowSnafu)?,
350            )
351            .unwrap(),
352        );
353        for batch_column in &mut self.fields {
354            batch_column.data = batch_column
355                .data
356                .filter(predicate)
357                .context(ComputeVectorSnafu)?;
358        }
359
360        Ok(())
361    }
362
    /// Filters rows by the given sequence range. Only preserves rows whose
    /// sequence matches `sequence`; `None` keeps the batch unchanged.
    ///
    /// As a fast path, the filter is skipped entirely when the batch's first
    /// and last sequences already show every row falls inside the range.
    pub fn filter_by_sequence(&mut self, sequence: Option<SequenceRange>) -> Result<()> {
        let seq_range = match sequence {
            None => return Ok(()),
            Some(seq_range) => {
                // An empty batch has nothing to filter.
                let (Some(first), Some(last)) = (self.first_sequence(), self.last_sequence())
                else {
                    return Ok(());
                };
                // NOTE(review): this subset check compares only the first and
                // last sequences of the batch — presumably they bound all
                // sequences in the batch; verify against the sort invariant.
                let is_subset = match seq_range {
                    SequenceRange::Gt { min } => min < first,
                    SequenceRange::LtEq { max } => max >= last,
                    SequenceRange::GtLtEq { min, max } => min < first && max >= last,
                };
                if is_subset {
                    // Every row is already inside the range; nothing to do.
                    return Ok(());
                }
                seq_range
            }
        };

        // Build a boolean mask from the range and filter all columns with it.
        let seqs = self.sequences.as_arrow();
        let predicate = seq_range.filter(seqs).context(ComputeArrowSnafu)?;

        let predicate = BooleanVector::from(predicate);
        self.filter(&predicate)?;

        Ok(())
    }
392
    /// Sorts rows in the batch. If `dedup` is true, it also removes
    /// duplicated rows according to primary keys.
    ///
    /// It orders rows by timestamp, sequence desc and only keep the latest
    /// row for the same timestamp. It doesn't consider op type as sequence
    /// should already provide uniqueness for a row.
    pub fn sort(&mut self, dedup: bool) -> Result<()> {
        // If building a converter each time is costly, we may allow passing a
        // converter.
        // The converter encodes (timestamp asc, sequence desc) into comparable
        // byte rows, so plain byte comparison gives the desired order.
        let converter = RowConverter::new(vec![
            SortField::new(self.timestamps.data_type().as_arrow_type()),
            SortField::new_with_options(
                self.sequences.data_type().as_arrow_type(),
                SortOptions {
                    descending: true,
                    ..Default::default()
                },
            ),
        ])
        .context(ComputeArrowSnafu)?;
        // Columns to sort.
        let columns = [
            self.timestamps.to_arrow_array(),
            self.sequences.to_arrow_array(),
        ];
        let rows = converter.convert_columns(&columns).unwrap();
        let mut to_sort: Vec<_> = rows.iter().enumerate().collect();

        // Avoid the sort entirely when rows are already in order.
        let was_sorted = to_sort.is_sorted_by_key(|x| x.1);
        if !was_sorted {
            to_sort.sort_unstable_by_key(|x| x.1);
        }

        let num_rows = to_sort.len();
        if dedup {
            // Dedup by timestamps.
            to_sort.dedup_by(|left, right| {
                // 18 bytes: presumably 9 bytes (validity + 8-byte value) per
                // encoded column for the two sort columns — TODO confirm
                // against the arrow row format.
                debug_assert_eq!(18, left.1.as_ref().len());
                debug_assert_eq!(18, right.1.as_ref().len());
                let (left_key, right_key) = (left.1.as_ref(), right.1.as_ref());
                // We only compare the timestamp part and ignore sequence.
                // TIMESTAMP_KEY_LEN is defined elsewhere in this file.
                left_key[..TIMESTAMP_KEY_LEN] == right_key[..TIMESTAMP_KEY_LEN]
            });
        }
        let no_dedup = to_sort.len() == num_rows;

        // Nothing changed: skip the take.
        if was_sorted && no_dedup {
            return Ok(());
        }
        let indices = UInt32Vector::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
        self.take_in_place(&indices)
    }
445
    /// Merges duplicated timestamps in the batch by keeping the latest non-null field values.
    ///
    /// Rows must already be sorted by timestamp (ascending) and sequence (descending).
    ///
    /// This method deduplicates rows with the same timestamp (keeping the first row in each
    /// timestamp range as the base row) and fills null fields from subsequent rows until all
    /// fields are filled or a delete operation is encountered.
    pub(crate) fn merge_last_non_null(&mut self) -> Result<()> {
        let num_rows = self.num_rows();
        if num_rows < 2 {
            // Zero or one row: nothing to merge.
            return Ok(());
        }

        let Some(timestamps) = self.timestamps_native() else {
            return Ok(());
        };

        // Fast path: check if there are any duplicate timestamps.
        // `group_count` counts distinct consecutive timestamps so we can
        // preallocate the index vectors below.
        let mut has_dup = false;
        let mut group_count = 1;
        for i in 1..num_rows {
            has_dup |= timestamps[i] == timestamps[i - 1];
            group_count += (timestamps[i] != timestamps[i - 1]) as usize;
        }
        if !has_dup {
            return Ok(());
        }

        let num_fields = self.fields.len();
        let op_types = self.op_types.as_arrow().values();

        // `base_indices[g]` is the row taken for timestamps/sequences/op_types
        // of group `g`; `field_indices[f][g]` is the row taken for field `f`.
        let mut base_indices: Vec<u32> = Vec::with_capacity(group_count);
        let mut field_indices: Vec<Vec<u32>> = (0..num_fields)
            .map(|_| Vec::with_capacity(group_count))
            .collect();

        let mut start = 0;
        while start < num_rows {
            // Find the half-open range [start, end) of rows sharing one timestamp.
            let ts = timestamps[start];
            let mut end = start + 1;
            while end < num_rows && timestamps[end] == ts {
                end += 1;
            }

            let group_pos = base_indices.len();
            base_indices.push(start as u32);

            if num_fields > 0 {
                // Default: take the base row for all fields.
                for idx in &mut field_indices {
                    idx.push(start as u32);
                }

                let base_deleted = op_types[start] == OpType::Delete as u8;
                if !base_deleted {
                    // Track fields that are null in the base row and try to fill them from older
                    // rows in the same timestamp range.
                    let mut missing_fields = Vec::new();
                    for (field_idx, col) in self.fields.iter().enumerate() {
                        if col.data.is_null(start) {
                            missing_fields.push(field_idx);
                        }
                    }

                    if !missing_fields.is_empty() {
                        for row_idx in (start + 1)..end {
                            // Stop filling once a delete is hit: values older
                            // than the delete must not leak into the result.
                            if op_types[row_idx] == OpType::Delete as u8 {
                                break;
                            }

                            missing_fields.retain(|&field_idx| {
                                if self.fields[field_idx].data.is_null(row_idx) {
                                    true
                                } else {
                                    field_indices[field_idx][group_pos] = row_idx as u32;
                                    false
                                }
                            });

                            if missing_fields.is_empty() {
                                break;
                            }
                        }
                    }
                }
            }

            start = end;
        }

        // Materialize the merged batch by taking the selected rows per column.
        let base_indices = UInt32Vector::from_vec(base_indices);
        self.timestamps = self
            .timestamps
            .take(&base_indices)
            .context(ComputeVectorSnafu)?;
        let array = arrow::compute::take(self.sequences.as_arrow(), base_indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        // Safety: We know the array and vector type.
        self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
        let array = arrow::compute::take(self.op_types.as_arrow(), base_indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        // Safety: We know the array and vector type.
        self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());

        for (field_idx, batch_column) in self.fields.iter_mut().enumerate() {
            let idx = UInt32Vector::from_vec(std::mem::take(&mut field_indices[field_idx]));
            batch_column.data = batch_column.data.take(&idx).context(ComputeVectorSnafu)?;
        }

        Ok(())
    }
557
558    /// Returns the estimated memory size of the batch.
559    pub fn memory_size(&self) -> usize {
560        let mut size = std::mem::size_of::<Self>();
561        size += self.primary_key.len();
562        size += self.timestamps.memory_size();
563        size += self.sequences.memory_size();
564        size += self.op_types.memory_size();
565        for batch_column in &self.fields {
566            size += batch_column.data.memory_size();
567        }
568        size
569    }
570
571    /// Returns ids and datatypes of fields in the [Batch] after applying the `projection`.
572    pub(crate) fn projected_fields(
573        metadata: &RegionMetadata,
574        projection: &[ColumnId],
575    ) -> Vec<(ColumnId, ConcreteDataType)> {
576        let projected_ids: HashSet<_> = projection.iter().copied().collect();
577        metadata
578            .field_columns()
579            .filter_map(|column| {
580                if projected_ids.contains(&column.column_id) {
581                    Some((column.column_id, column.column_schema.data_type.clone()))
582                } else {
583                    None
584                }
585            })
586            .collect()
587    }
588
    /// Returns timestamps in a native slice or `None` if the batch is empty.
    ///
    /// All four timestamp units share the same `i64` physical representation,
    /// so the values are returned as raw `i64`s in the vector's own unit.
    ///
    /// # Panics
    /// Panics if the timestamp vector is not of a timestamp type (the
    /// [BatchBuilder] guarantees it is).
    pub(crate) fn timestamps_native(&self) -> Option<&[i64]> {
        if self.timestamps.is_empty() {
            return None;
        }

        // Downcast to the concrete vector type to reach the underlying buffer.
        let values = match self.timestamps.data_type() {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampSecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampMicrosecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampNanosecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            other => panic!("timestamps in a Batch has other type {:?}", other),
        };

        Some(values)
    }
629
630    /// Takes the batch in place.
631    fn take_in_place(&mut self, indices: &UInt32Vector) -> Result<()> {
632        self.timestamps = self.timestamps.take(indices).context(ComputeVectorSnafu)?;
633        let array = arrow::compute::take(self.sequences.as_arrow(), indices.as_arrow(), None)
634            .context(ComputeArrowSnafu)?;
635        // Safety: we know the array and vector type.
636        self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
637        let array = arrow::compute::take(self.op_types.as_arrow(), indices.as_arrow(), None)
638            .context(ComputeArrowSnafu)?;
639        self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
640        for batch_column in &mut self.fields {
641            batch_column.data = batch_column
642                .data
643                .take(indices)
644                .context(ComputeVectorSnafu)?;
645        }
646
647        Ok(())
648    }
649
650    /// Gets a timestamp at given `index`.
651    ///
652    /// # Panics
653    /// Panics if `index` is out-of-bound or the timestamp vector returns null.
654    fn get_timestamp(&self, index: usize) -> Timestamp {
655        match self.timestamps.get_ref(index) {
656            ValueRef::Timestamp(timestamp) => timestamp,
657
658            // We have check the data type is timestamp compatible in the [BatchBuilder] so it's safe to panic.
659            value => panic!("{:?} is not a timestamp", value),
660        }
661    }
662
    /// Gets a sequence at given `index`.
    ///
    /// # Panics
    /// Panics if `index` is out-of-bound or the sequence vector returns null.
    pub(crate) fn get_sequence(&self, index: usize) -> SequenceNumber {
        // Safety: sequences is not null so it actually returns Some.
        self.sequences.get_data(index).unwrap()
    }
671
672    /// Checks the batch is monotonic by timestamps.
673    #[cfg(debug_assertions)]
674    pub(crate) fn check_monotonic(&self) -> Result<(), String> {
675        use std::cmp::Ordering;
676        if self.timestamps_native().is_none() {
677            return Ok(());
678        }
679
680        let timestamps = self.timestamps_native().unwrap();
681        let sequences = self.sequences.as_arrow().values();
682        for (i, window) in timestamps.windows(2).enumerate() {
683            let current = window[0];
684            let next = window[1];
685            let current_sequence = sequences[i];
686            let next_sequence = sequences[i + 1];
687            match current.cmp(&next) {
688                Ordering::Less => {
689                    // The current timestamp is less than the next timestamp.
690                    continue;
691                }
692                Ordering::Equal => {
693                    // The current timestamp is equal to the next timestamp.
694                    if current_sequence < next_sequence {
695                        return Err(format!(
696                            "sequence are not monotonic: ts {} == {} but current sequence {} < {}, index: {}",
697                            current, next, current_sequence, next_sequence, i
698                        ));
699                    }
700                }
701                Ordering::Greater => {
702                    // The current timestamp is greater than the next timestamp.
703                    return Err(format!(
704                        "timestamps are not monotonic: {} > {}, index: {}",
705                        current, next, i
706                    ));
707                }
708            }
709        }
710
711        Ok(())
712    }
713
714    /// Returns Ok if the given batch is behind the current batch.
715    #[cfg(debug_assertions)]
716    pub(crate) fn check_next_batch(&self, other: &Batch) -> Result<(), String> {
717        // Checks the primary key
718        if self.primary_key() < other.primary_key() {
719            return Ok(());
720        }
721        if self.primary_key() > other.primary_key() {
722            return Err(format!(
723                "primary key is not monotonic: {:?} > {:?}",
724                self.primary_key(),
725                other.primary_key()
726            ));
727        }
728        // Checks the timestamp.
729        if self.last_timestamp() < other.first_timestamp() {
730            return Ok(());
731        }
732        if self.last_timestamp() > other.first_timestamp() {
733            return Err(format!(
734                "timestamps are not monotonic: {:?} > {:?}",
735                self.last_timestamp(),
736                other.first_timestamp()
737            ));
738        }
739        // Checks the sequence.
740        if self.last_sequence() >= other.first_sequence() {
741            return Ok(());
742        }
743        Err(format!(
744            "sequences are not monotonic: {:?} < {:?}",
745            self.last_sequence(),
746            other.first_sequence()
747        ))
748    }
749
750    /// Returns the value of the column in the primary key.
751    ///
752    /// Lazily decodes the primary key and caches the result.
753    pub fn pk_col_value(
754        &mut self,
755        codec: &dyn PrimaryKeyCodec,
756        col_idx_in_pk: usize,
757        column_id: ColumnId,
758    ) -> Result<Option<&Value>> {
759        if self.pk_values.is_none() {
760            self.pk_values = Some(codec.decode(&self.primary_key).context(DecodeSnafu)?);
761        }
762
763        let pk_values = self.pk_values.as_ref().unwrap();
764        Ok(match pk_values {
765            CompositeValues::Dense(values) => values.get(col_idx_in_pk).map(|(_, v)| v),
766            CompositeValues::Sparse(values) => values.get(&column_id),
767        })
768    }
769
770    /// Returns values of the field in the batch.
771    ///
772    /// Lazily caches the field index.
773    pub fn field_col_value(&mut self, column_id: ColumnId) -> Option<&BatchColumn> {
774        if self.fields_idx.is_none() {
775            self.fields_idx = Some(
776                self.fields
777                    .iter()
778                    .enumerate()
779                    .map(|(i, c)| (c.column_id, i))
780                    .collect(),
781            );
782        }
783
784        self.fields_idx
785            .as_ref()
786            .unwrap()
787            .get(&column_id)
788            .map(|&idx| &self.fields[idx])
789    }
790}
791
/// A struct to check the batch is monotonic.
#[cfg(debug_assertions)]
#[derive(Default)]
pub(crate) struct BatchChecker {
    // The most recent batch seen, used to validate cross-batch ordering.
    last_batch: Option<Batch>,
    // Optional inclusive lower bound for timestamps (first timestamp must be >= start).
    start: Option<Timestamp>,
    // Optional exclusive upper bound for timestamps (last timestamp must be < end).
    end: Option<Timestamp>,
}
800
#[cfg(debug_assertions)]
impl BatchChecker {
    /// Attaches the given start timestamp to the checker.
    ///
    /// Batches whose first timestamp is earlier than `start` fail the check.
    pub(crate) fn with_start(mut self, start: Option<Timestamp>) -> Self {
        self.start = start;
        self
    }

    /// Attaches the given end timestamp to the checker.
    ///
    /// Batches whose last timestamp reaches `end` (exclusive bound) fail the check.
    pub(crate) fn with_end(mut self, end: Option<Timestamp>) -> Self {
        self.end = end;
        self
    }

    /// Returns `Ok(())` if the given batch is itself monotonic, lies within the
    /// `[start, end)` range, and comes after the last checked batch.
    ///
    /// On a range violation it returns early without remembering the batch; the
    /// cross-batch ordering check always records `batch` as the new last batch.
    pub(crate) fn check_monotonic(&mut self, batch: &Batch) -> Result<(), String> {
        batch.check_monotonic()?;

        if let (Some(start), Some(first)) = (self.start, batch.first_timestamp())
            && start > first
        {
            return Err(format!(
                "batch's first timestamp is before the start timestamp: {:?} > {:?}",
                start, first
            ));
        }
        if let (Some(end), Some(last)) = (self.end, batch.last_timestamp())
            && end <= last
        {
            return Err(format!(
                "batch's last timestamp is after the end timestamp: {:?} <= {:?}",
                end, last
            ));
        }

        // Checks the batch is behind the last batch.
        // Then updates the last batch.
        let res = self
            .last_batch
            .as_ref()
            .map(|last| last.check_next_batch(batch))
            .unwrap_or(Ok(()));
        self.last_batch = Some(batch.clone());
        res
    }

    /// Formats current batch and last batch for debug.
    ///
    /// The last batch section is only present if a batch was checked before.
    pub(crate) fn format_batch(&self, batch: &Batch) -> String {
        use std::fmt::Write;

        let mut message = String::new();
        if let Some(last) = &self.last_batch {
            write!(
                message,
                "last_pk: {:?}, last_ts: {:?}, last_seq: {:?}, ",
                last.primary_key(),
                last.last_timestamp(),
                last.last_sequence()
            )
            .unwrap();
        }
        write!(
            message,
            "batch_pk: {:?}, batch_ts: {:?}, batch_seq: {:?}",
            batch.primary_key(),
            batch.timestamps(),
            batch.sequences()
        )
        .unwrap();

        message
    }

    /// Checks batches from the part range are monotonic. Otherwise, panics.
    ///
    /// The full batch content is logged via `error!` while the panic message
    /// stays small on purpose.
    pub(crate) fn ensure_part_range_batch(
        &mut self,
        scanner: &str,
        region_id: store_api::storage::RegionId,
        partition: usize,
        part_range: store_api::region_engine::PartitionRange,
        batch: &Batch,
    ) {
        if let Err(e) = self.check_monotonic(batch) {
            let err_msg = format!(
                "{}: batch is not sorted, {}, region_id: {}, partition: {}, part_range: {:?}",
                scanner, e, region_id, partition, part_range,
            );
            common_telemetry::error!("{err_msg}, {}", self.format_batch(batch));
            // Only print the number of row in the panic message.
            panic!("{err_msg}, batch rows: {}", batch.num_rows());
        }
    }
}
895
/// Len of timestamp in arrow row format.
///
/// NOTE(review): 9 presumably covers one validity byte plus the 8-byte integer
/// timestamp value in the arrow row encoding — confirm against `arrow::row` docs.
const TIMESTAMP_KEY_LEN: usize = 9;
898
899/// Helper function to concat arrays from `iter`.
900fn concat_arrays(iter: impl Iterator<Item = ArrayRef>) -> Result<ArrayRef> {
901    let arrays: Vec<_> = iter.collect();
902    let dyn_arrays: Vec<_> = arrays.iter().map(|array| array.as_ref()).collect();
903    arrow::compute::concat(&dyn_arrays).context(ComputeArrowSnafu)
904}
905
/// A column in a [Batch].
///
/// Pairs a column id with its data so field columns can be looked up by id.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct BatchColumn {
    /// Id of the column.
    pub column_id: ColumnId,
    /// Data of the column.
    pub data: VectorRef,
}
914
/// Builder to build [Batch].
pub struct BatchBuilder {
    // Encoded primary key shared by all rows of the batch.
    primary_key: Vec<u8>,
    // Required columns; `build()` fails if any of them is still `None`.
    timestamps: Option<VectorRef>,
    sequences: Option<Arc<UInt64Vector>>,
    op_types: Option<Arc<UInt8Vector>>,
    // Optional field columns.
    fields: Vec<BatchColumn>,
}
923
924impl BatchBuilder {
925    /// Creates a new [BatchBuilder] with primary key.
926    pub fn new(primary_key: Vec<u8>) -> BatchBuilder {
927        BatchBuilder {
928            primary_key,
929            timestamps: None,
930            sequences: None,
931            op_types: None,
932            fields: Vec::new(),
933        }
934    }
935
936    /// Creates a new [BatchBuilder] with all required columns.
937    pub fn with_required_columns(
938        primary_key: Vec<u8>,
939        timestamps: VectorRef,
940        sequences: Arc<UInt64Vector>,
941        op_types: Arc<UInt8Vector>,
942    ) -> BatchBuilder {
943        BatchBuilder {
944            primary_key,
945            timestamps: Some(timestamps),
946            sequences: Some(sequences),
947            op_types: Some(op_types),
948            fields: Vec::new(),
949        }
950    }
951
952    /// Set all field columns.
953    pub fn with_fields(mut self, fields: Vec<BatchColumn>) -> Self {
954        self.fields = fields;
955        self
956    }
957
958    /// Push a field column.
959    pub fn push_field(&mut self, column: BatchColumn) -> &mut Self {
960        self.fields.push(column);
961        self
962    }
963
964    /// Push an array as a field.
965    pub fn push_field_array(&mut self, column_id: ColumnId, array: ArrayRef) -> Result<&mut Self> {
966        let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
967        self.fields.push(BatchColumn {
968            column_id,
969            data: vector,
970        });
971
972        Ok(self)
973    }
974
975    /// Try to set an array as timestamps.
976    pub fn timestamps_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
977        let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
978        ensure!(
979            vector.data_type().is_timestamp(),
980            InvalidBatchSnafu {
981                reason: format!("{:?} is not a timestamp type", vector.data_type()),
982            }
983        );
984
985        self.timestamps = Some(vector);
986        Ok(self)
987    }
988
989    /// Try to set an array as sequences.
990    pub fn sequences_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
991        ensure!(
992            *array.data_type() == arrow::datatypes::DataType::UInt64,
993            InvalidBatchSnafu {
994                reason: "sequence array is not UInt64 type",
995            }
996        );
997        // Safety: The cast must success as we have ensured it is uint64 type.
998        let vector = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
999        self.sequences = Some(vector);
1000
1001        Ok(self)
1002    }
1003
1004    /// Try to set an array as op types.
1005    pub fn op_types_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
1006        ensure!(
1007            *array.data_type() == arrow::datatypes::DataType::UInt8,
1008            InvalidBatchSnafu {
1009                reason: "sequence array is not UInt8 type",
1010            }
1011        );
1012        // Safety: The cast must success as we have ensured it is uint64 type.
1013        let vector = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
1014        self.op_types = Some(vector);
1015
1016        Ok(self)
1017    }
1018
1019    /// Builds the [Batch].
1020    pub fn build(self) -> Result<Batch> {
1021        let timestamps = self.timestamps.context(InvalidBatchSnafu {
1022            reason: "missing timestamps",
1023        })?;
1024        let sequences = self.sequences.context(InvalidBatchSnafu {
1025            reason: "missing sequences",
1026        })?;
1027        let op_types = self.op_types.context(InvalidBatchSnafu {
1028            reason: "missing op_types",
1029        })?;
1030        // Our storage format ensure these columns are not nullable so
1031        // we use assert here.
1032        assert_eq!(0, timestamps.null_count());
1033        assert_eq!(0, sequences.null_count());
1034        assert_eq!(0, op_types.null_count());
1035
1036        let ts_len = timestamps.len();
1037        ensure!(
1038            sequences.len() == ts_len,
1039            InvalidBatchSnafu {
1040                reason: format!(
1041                    "sequence have different len {} != {}",
1042                    sequences.len(),
1043                    ts_len
1044                ),
1045            }
1046        );
1047        ensure!(
1048            op_types.len() == ts_len,
1049            InvalidBatchSnafu {
1050                reason: format!(
1051                    "op type have different len {} != {}",
1052                    op_types.len(),
1053                    ts_len
1054                ),
1055            }
1056        );
1057        for column in &self.fields {
1058            ensure!(
1059                column.data.len() == ts_len,
1060                InvalidBatchSnafu {
1061                    reason: format!(
1062                        "column {} has different len {} != {}",
1063                        column.column_id,
1064                        column.data.len(),
1065                        ts_len
1066                    ),
1067                }
1068            );
1069        }
1070
1071        Ok(Batch {
1072            primary_key: self.primary_key,
1073            pk_values: None,
1074            timestamps,
1075            sequences,
1076            op_types,
1077            fields: self.fields,
1078            fields_idx: None,
1079        })
1080    }
1081}
1082
1083impl From<Batch> for BatchBuilder {
1084    fn from(batch: Batch) -> Self {
1085        Self {
1086            primary_key: batch.primary_key,
1087            timestamps: Some(batch.timestamps),
1088            sequences: Some(batch.sequences),
1089            op_types: Some(batch.op_types),
1090            fields: batch.fields,
1091        }
1092    }
1093}
1094
/// Async [Batch] reader and iterator wrapper.
///
/// This is the data source for SST writers or internal readers.
pub enum Source {
    /// Source from a [BoxedBatchReader].
    Reader(BoxedBatchReader),
    /// Source from a [BoxedBatchIterator].
    Iter(BoxedBatchIterator),
    /// Source from a [BoxedBatchStream].
    Stream(BoxedBatchStream),
    /// Source from a [PruneReader].
    PruneReader(PruneReader),
}
1108
1109impl Source {
1110    /// Returns next [Batch] from this data source.
1111    pub async fn next_batch(&mut self) -> Result<Option<Batch>> {
1112        match self {
1113            Source::Reader(reader) => reader.next_batch().await,
1114            Source::Iter(iter) => iter.next().transpose(),
1115            Source::Stream(stream) => stream.try_next().await,
1116            Source::PruneReader(reader) => reader.next_batch().await,
1117        }
1118    }
1119}
1120
/// Async [RecordBatch] reader and iterator wrapper for flat format.
pub enum FlatSource {
    /// Source from a [BoxedRecordBatchIterator].
    Iter(BoxedRecordBatchIterator),
    /// Source from a [BoxedRecordBatchStream].
    Stream(BoxedRecordBatchStream),
}
1128
1129impl FlatSource {
1130    /// Returns next [RecordBatch] from this data source.
1131    pub async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
1132        match self {
1133            FlatSource::Iter(iter) => iter.next().transpose(),
1134            FlatSource::Stream(stream) => stream.try_next().await,
1135        }
1136    }
1137}
1138
/// Async batch reader.
///
/// The reader must guarantee [Batch]es returned by it have the same schema.
#[async_trait]
pub trait BatchReader: Send {
    /// Fetch next [Batch].
    ///
    /// Returns `Ok(None)` when the reader has reached its end; calling
    /// `next_batch()` again after that won't return any batch.
    ///
    /// If `Err` is returned, caller should not call this method again, the implementor
    /// may or may not panic in such case.
    async fn next_batch(&mut self) -> Result<Option<Batch>>;
}
1153
/// Pointer to [BatchReader].
pub type BoxedBatchReader = Box<dyn BatchReader>;

/// Pointer to a stream that yields [Batch].
pub type BoxedBatchStream = BoxStream<'static, Result<Batch>>;

/// Pointer to a stream that yields [RecordBatch] (flat format).
pub type BoxedRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
1162
// Blanket impl so a boxed reader (including `BoxedBatchReader`) is itself a
// `BatchReader`; simply forwards to the inner reader.
#[async_trait::async_trait]
impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        (**self).next_batch().await
    }
}
1169
/// Local metrics for scanners.
#[derive(Debug, Default)]
pub(crate) struct ScannerMetrics {
    /// Duration to scan data.
    scan_cost: Duration,
    /// Duration while waiting for `yield`.
    yield_cost: Duration,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of rows returned.
    num_rows: usize,
}
1182
1183#[cfg(test)]
1184mod tests {
1185    use datatypes::arrow::array::{TimestampMillisecondArray, UInt8Array, UInt64Array};
1186    use mito_codec::row_converter::{self, build_primary_key_codec_with_fields};
1187    use store_api::codec::PrimaryKeyEncoding;
1188    use store_api::storage::consts::ReservedColumnId;
1189
1190    use super::*;
1191    use crate::error::Error;
1192    use crate::test_util::new_batch_builder;
1193
    /// Builds a test batch with primary key `b"test"` and a single u64 field
    /// column (the `1` is presumably the field column id — see `new_batch_builder`).
    fn new_batch(
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        field: &[u64],
    ) -> Batch {
        new_batch_builder(b"test", timestamps, sequences, op_types, 1, field)
            .build()
            .unwrap()
    }
1204
    /// Builds a test batch with primary key `b"test"`, millisecond timestamps,
    /// and the given nullable u64 field columns keyed by column id.
    fn new_batch_with_u64_fields(
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        fields: &[(ColumnId, &[Option<u64>])],
    ) -> Batch {
        // All columns must have the same number of rows.
        assert_eq!(timestamps.len(), sequences.len());
        assert_eq!(timestamps.len(), op_types.len());
        for (_, values) in fields {
            assert_eq!(timestamps.len(), values.len());
        }

        let mut builder = BatchBuilder::new(b"test".to_vec());
        builder
            .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
                timestamps.iter().copied(),
            )))
            .unwrap()
            .sequences_array(Arc::new(UInt64Array::from_iter_values(
                sequences.iter().copied(),
            )))
            .unwrap()
            .op_types_array(Arc::new(UInt8Array::from_iter_values(
                op_types.iter().map(|v| *v as u8),
            )))
            .unwrap();

        for (col_id, values) in fields {
            builder
                .push_field_array(*col_id, Arc::new(UInt64Array::from(values.to_vec())))
                .unwrap();
        }

        builder.build().unwrap()
    }
1240
    /// Builds a test batch with primary key `b"test"` and no field columns,
    /// only the required timestamp/sequence/op-type columns.
    fn new_batch_without_fields(
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
    ) -> Batch {
        // All required columns must have the same number of rows.
        assert_eq!(timestamps.len(), sequences.len());
        assert_eq!(timestamps.len(), op_types.len());

        let mut builder = BatchBuilder::new(b"test".to_vec());
        builder
            .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
                timestamps.iter().copied(),
            )))
            .unwrap()
            .sequences_array(Arc::new(UInt64Array::from_iter_values(
                sequences.iter().copied(),
            )))
            .unwrap()
            .op_types_array(Arc::new(UInt8Array::from_iter_values(
                op_types.iter().map(|v| *v as u8),
            )))
            .unwrap();

        builder.build().unwrap()
    }
1266
1267    #[test]
1268    fn test_empty_batch() {
1269        let batch = Batch::empty();
1270        assert!(batch.is_empty());
1271        assert_eq!(None, batch.first_timestamp());
1272        assert_eq!(None, batch.last_timestamp());
1273        assert_eq!(None, batch.first_sequence());
1274        assert_eq!(None, batch.last_sequence());
1275        assert!(batch.timestamps_native().is_none());
1276    }
1277
1278    #[test]
1279    fn test_first_last_one() {
1280        let batch = new_batch(&[1], &[2], &[OpType::Put], &[4]);
1281        assert_eq!(
1282            Timestamp::new_millisecond(1),
1283            batch.first_timestamp().unwrap()
1284        );
1285        assert_eq!(
1286            Timestamp::new_millisecond(1),
1287            batch.last_timestamp().unwrap()
1288        );
1289        assert_eq!(2, batch.first_sequence().unwrap());
1290        assert_eq!(2, batch.last_sequence().unwrap());
1291    }
1292
1293    #[test]
1294    fn test_first_last_multiple() {
1295        let batch = new_batch(
1296            &[1, 2, 3],
1297            &[11, 12, 13],
1298            &[OpType::Put, OpType::Put, OpType::Put],
1299            &[21, 22, 23],
1300        );
1301        assert_eq!(
1302            Timestamp::new_millisecond(1),
1303            batch.first_timestamp().unwrap()
1304        );
1305        assert_eq!(
1306            Timestamp::new_millisecond(3),
1307            batch.last_timestamp().unwrap()
1308        );
1309        assert_eq!(11, batch.first_sequence().unwrap());
1310        assert_eq!(13, batch.last_sequence().unwrap());
1311    }
1312
1313    #[test]
1314    fn test_slice() {
1315        let batch = new_batch(
1316            &[1, 2, 3, 4],
1317            &[11, 12, 13, 14],
1318            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1319            &[21, 22, 23, 24],
1320        );
1321        let batch = batch.slice(1, 2);
1322        let expect = new_batch(
1323            &[2, 3],
1324            &[12, 13],
1325            &[OpType::Delete, OpType::Put],
1326            &[22, 23],
1327        );
1328        assert_eq!(expect, batch);
1329    }
1330
1331    #[test]
1332    fn test_timestamps_native() {
1333        let batch = new_batch(
1334            &[1, 2, 3, 4],
1335            &[11, 12, 13, 14],
1336            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1337            &[21, 22, 23, 24],
1338        );
1339        assert_eq!(&[1, 2, 3, 4], batch.timestamps_native().unwrap());
1340    }
1341
1342    #[test]
1343    fn test_concat_empty() {
1344        let err = Batch::concat(vec![]).unwrap_err();
1345        assert!(
1346            matches!(err, Error::InvalidBatch { .. }),
1347            "unexpected err: {err}"
1348        );
1349    }
1350
1351    #[test]
1352    fn test_concat_one() {
1353        let batch = new_batch(&[], &[], &[], &[]);
1354        let actual = Batch::concat(vec![batch.clone()]).unwrap();
1355        assert_eq!(batch, actual);
1356
1357        let batch = new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]);
1358        let actual = Batch::concat(vec![batch.clone()]).unwrap();
1359        assert_eq!(batch, actual);
1360    }
1361
    #[test]
    fn test_concat_multiple() {
        // Concatenation appends rows in order; empty batches contribute nothing.
        let batches = vec![
            new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]),
            new_batch(
                &[3, 4, 5],
                &[13, 14, 15],
                &[OpType::Put, OpType::Delete, OpType::Put],
                &[23, 24, 25],
            ),
            new_batch(&[], &[], &[], &[]),
            new_batch(&[6], &[16], &[OpType::Put], &[26]),
        ];
        let batch = Batch::concat(batches).unwrap();
        let expect = new_batch(
            &[1, 2, 3, 4, 5, 6],
            &[11, 12, 13, 14, 15, 16],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Delete,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25, 26],
        );
        assert_eq!(expect, batch);
    }
1391
    #[test]
    fn test_concat_different() {
        // Batches with different primary keys can't be concatenated.
        let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
        let mut batch2 = new_batch(&[2], &[2], &[OpType::Put], &[2]);
        batch2.primary_key = b"hello".to_vec();
        let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }
1403
    #[test]
    fn test_concat_different_fields() {
        // Batches with mismatched field columns can't be concatenated.
        let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
        let fields = vec![
            batch1.fields()[0].clone(),
            BatchColumn {
                column_id: 2,
                data: Arc::new(UInt64Vector::from_slice([2])),
            },
        ];
        // Batch 2 has more fields.
        let batch2 = batch1.clone().with_fields(fields).unwrap();
        let err = Batch::concat(vec![batch1.clone(), batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );

        // Batch 2 has different field.
        let fields = vec![BatchColumn {
            column_id: 2,
            data: Arc::new(UInt64Vector::from_slice([2])),
        }];
        let batch2 = batch1.clone().with_fields(fields).unwrap();
        let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }
1434
1435    #[test]
1436    fn test_filter_deleted_empty() {
1437        let mut batch = new_batch(&[], &[], &[], &[]);
1438        batch.filter_deleted().unwrap();
1439        assert!(batch.is_empty());
1440    }
1441
    #[test]
    fn test_filter_deleted() {
        // Rows with OpType::Delete are removed; Put rows are kept.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Delete, OpType::Put, OpType::Delete, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch.filter_deleted().unwrap();
        let expect = new_batch(&[2, 4], &[12, 14], &[OpType::Put, OpType::Put], &[22, 24]);
        assert_eq!(expect, batch);

        // A batch without deletions stays unchanged.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let expect = batch.clone();
        batch.filter_deleted().unwrap();
        assert_eq!(expect, batch);
    }
1464
    #[test]
    fn test_filter_by_sequence() {
        // `LtEq` keeps rows with sequence <= max.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::LtEq { max: 13 }))
            .unwrap();
        let expect = new_batch(
            &[1, 2, 3],
            &[11, 12, 13],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );
        assert_eq!(expect, batch);

        // Filters to empty.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );

        batch
            .filter_by_sequence(Some(SequenceRange::LtEq { max: 10 }))
            .unwrap();
        assert!(batch.is_empty());

        // None filter keeps everything.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let expect = batch.clone();
        batch.filter_by_sequence(None).unwrap();
        assert_eq!(expect, batch);

        // Filter an empty batch.
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch
            .filter_by_sequence(Some(SequenceRange::LtEq { max: 10 }))
            .unwrap();
        assert!(batch.is_empty());

        // Filter an empty batch with None.
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_by_sequence(None).unwrap();
        assert!(batch.is_empty());

        // Test `Gt` variant - exclusive lower bound.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::Gt { min: 12 }))
            .unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // Test `Gt` variant with no matches.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::Gt { min: 20 }))
            .unwrap();
        assert!(batch.is_empty());

        // Test `GtLtEq` variant - exclusive lower bound, inclusive upper bound.
        let mut batch = new_batch(
            &[1, 2, 3, 4, 5],
            &[11, 12, 13, 14, 15],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 12, max: 14 }))
            .unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // Test `GtLtEq` variant with mixed operations.
        let mut batch = new_batch(
            &[1, 2, 3, 4, 5],
            &[11, 12, 13, 14, 15],
            &[
                OpType::Put,
                OpType::Delete,
                OpType::Put,
                OpType::Delete,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 11, max: 13 }))
            .unwrap();
        let expect = new_batch(
            &[2, 3],
            &[12, 13],
            &[OpType::Delete, OpType::Put],
            &[22, 23],
        );
        assert_eq!(expect, batch);

        // Test `GtLtEq` variant with no matches.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 20, max: 25 }))
            .unwrap();
        assert!(batch.is_empty());
    }
1601
    #[test]
    fn test_merge_last_non_null_no_dup() {
        // Without duplicate timestamps there is nothing to merge.
        let mut batch = new_batch_with_u64_fields(
            &[1, 2],
            &[2, 1],
            &[OpType::Put, OpType::Put],
            &[(1, &[Some(10), None]), (2, &[Some(100), Some(200)])],
        );
        let expect = batch.clone();
        batch.merge_last_non_null().unwrap();
        assert_eq!(expect, batch);
    }
1614
    #[test]
    fn test_merge_last_non_null_fill_null_fields() {
        // Rows are already sorted by timestamp asc and sequence desc.
        let mut batch = new_batch_with_u64_fields(
            &[1, 1, 1],
            &[3, 2, 1],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[
                (1, &[None, Some(10), Some(11)]),
                (2, &[Some(100), Some(200), Some(300)]),
            ],
        );
        batch.merge_last_non_null().unwrap();

        // Field 1 is filled from the first older row (seq=2). Field 2 keeps the base value.
        // Filled fields must not be overwritten by even older duplicates.
        let expect = new_batch_with_u64_fields(
            &[1],
            &[3],
            &[OpType::Put],
            &[(1, &[Some(10)]), (2, &[Some(100)])],
        );
        assert_eq!(expect, batch);
    }
1639
    #[test]
    fn test_merge_last_non_null_stop_at_delete_row() {
        // A delete row in older duplicates should stop filling to avoid resurrecting values before
        // deletion.
        let mut batch = new_batch_with_u64_fields(
            &[1, 1, 1],
            &[3, 2, 1],
            &[OpType::Put, OpType::Delete, OpType::Put],
            &[
                (1, &[None, Some(10), Some(11)]),
                (2, &[Some(100), Some(200), Some(300)]),
            ],
        );
        batch.merge_last_non_null().unwrap();

        // Field 1 stays null: the delete at seq=2 blocks values from seq<=2.
        let expect = new_batch_with_u64_fields(
            &[1],
            &[3],
            &[OpType::Put],
            &[(1, &[None]), (2, &[Some(100)])],
        );
        assert_eq!(expect, batch);
    }
1663
    #[test]
    fn test_merge_last_non_null_base_delete_no_merge() {
        // The newest row for the timestamp is a delete.
        let mut batch = new_batch_with_u64_fields(
            &[1, 1],
            &[3, 2],
            &[OpType::Delete, OpType::Put],
            &[(1, &[None, Some(10)]), (2, &[None, Some(200)])],
        );
        batch.merge_last_non_null().unwrap();

        // Base row is delete, keep it as is and don't merge fields from older rows.
        let expect =
            new_batch_with_u64_fields(&[1], &[3], &[OpType::Delete], &[(1, &[None]), (2, &[None])]);
        assert_eq!(expect, batch);
    }
1679
    #[test]
    fn test_merge_last_non_null_multiple_timestamp_groups() {
        // Each timestamp group is merged independently; one output row remains
        // per distinct timestamp.
        let mut batch = new_batch_with_u64_fields(
            &[1, 1, 2, 3, 3],
            &[5, 4, 3, 2, 1],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
            ],
            &[
                (1, &[None, Some(10), Some(20), None, Some(30)]),
                (2, &[Some(100), Some(110), Some(120), None, Some(130)]),
            ],
        );
        batch.merge_last_non_null().unwrap();

        let expect = new_batch_with_u64_fields(
            &[1, 2, 3],
            &[5, 3, 2],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[
                (1, &[Some(10), Some(20), Some(30)]),
                (2, &[Some(100), Some(120), Some(130)]),
            ],
        );
        assert_eq!(expect, batch);
    }
1710
    #[test]
    fn test_merge_last_non_null_no_fields() {
        // With no field columns, merging reduces to deduplication by timestamp,
        // keeping the row with the highest sequence.
        let mut batch = new_batch_without_fields(
            &[1, 1, 2],
            &[3, 2, 1],
            &[OpType::Put, OpType::Put, OpType::Put],
        );
        batch.merge_last_non_null().unwrap();

        let expect = new_batch_without_fields(&[1, 2], &[3, 1], &[OpType::Put, OpType::Put]);
        assert_eq!(expect, batch);
    }
1723
    #[test]
    fn test_filter() {
        // Filters a batch with puts only.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
        batch.filter(&predicate).unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // Filters a batch containing a deletion; the predicate applies to all
        // rows regardless of op type.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
        batch.filter(&predicate).unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // Filters to empty.
        let predicate = BooleanVector::from_vec(vec![false, false]);
        batch.filter(&predicate).unwrap();
        assert!(batch.is_empty());
    }
1755
1756    #[test]
1757    fn test_sort_and_dedup() {
1758        let original = new_batch(
1759            &[2, 3, 1, 4, 5, 2],
1760            &[1, 2, 3, 4, 5, 6],
1761            &[
1762                OpType::Put,
1763                OpType::Put,
1764                OpType::Put,
1765                OpType::Put,
1766                OpType::Put,
1767                OpType::Put,
1768            ],
1769            &[21, 22, 23, 24, 25, 26],
1770        );
1771
1772        let mut batch = original.clone();
1773        batch.sort(true).unwrap();
1774        // It should only keep one timestamp 2.
1775        assert_eq!(
1776            new_batch(
1777                &[1, 2, 3, 4, 5],
1778                &[3, 6, 2, 4, 5],
1779                &[
1780                    OpType::Put,
1781                    OpType::Put,
1782                    OpType::Put,
1783                    OpType::Put,
1784                    OpType::Put,
1785                ],
1786                &[23, 26, 22, 24, 25],
1787            ),
1788            batch
1789        );
1790
1791        let mut batch = original.clone();
1792        batch.sort(false).unwrap();
1793
1794        // It should only keep one timestamp 2.
1795        assert_eq!(
1796            new_batch(
1797                &[1, 2, 2, 3, 4, 5],
1798                &[3, 6, 1, 2, 4, 5],
1799                &[
1800                    OpType::Put,
1801                    OpType::Put,
1802                    OpType::Put,
1803                    OpType::Put,
1804                    OpType::Put,
1805                    OpType::Put,
1806                ],
1807                &[23, 26, 21, 22, 24, 25],
1808            ),
1809            batch
1810        );
1811
1812        let original = new_batch(
1813            &[2, 2, 1],
1814            &[1, 6, 1],
1815            &[OpType::Delete, OpType::Put, OpType::Put],
1816            &[21, 22, 23],
1817        );
1818
1819        let mut batch = original.clone();
1820        batch.sort(true).unwrap();
1821        let expect = new_batch(&[1, 2], &[1, 6], &[OpType::Put, OpType::Put], &[23, 22]);
1822        assert_eq!(expect, batch);
1823
1824        let mut batch = original.clone();
1825        batch.sort(false).unwrap();
1826        let expect = new_batch(
1827            &[1, 2, 2],
1828            &[1, 6, 1],
1829            &[OpType::Put, OpType::Put, OpType::Delete],
1830            &[23, 22, 21],
1831        );
1832        assert_eq!(expect, batch);
1833    }
1834
1835    #[test]
1836    fn test_get_value() {
1837        let encodings = [PrimaryKeyEncoding::Dense, PrimaryKeyEncoding::Sparse];
1838
1839        for encoding in encodings {
1840            let codec = build_primary_key_codec_with_fields(
1841                encoding,
1842                [
1843                    (
1844                        ReservedColumnId::table_id(),
1845                        row_converter::SortField::new(ConcreteDataType::uint32_datatype()),
1846                    ),
1847                    (
1848                        ReservedColumnId::tsid(),
1849                        row_converter::SortField::new(ConcreteDataType::uint64_datatype()),
1850                    ),
1851                    (
1852                        100,
1853                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
1854                    ),
1855                    (
1856                        200,
1857                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
1858                    ),
1859                ]
1860                .into_iter(),
1861            );
1862
1863            let values = [
1864                Value::UInt32(1000),
1865                Value::UInt64(2000),
1866                Value::String("abcdefgh".into()),
1867                Value::String("zyxwvu".into()),
1868            ];
1869            let mut buf = vec![];
1870            codec
1871                .encode_values(
1872                    &[
1873                        (ReservedColumnId::table_id(), values[0].clone()),
1874                        (ReservedColumnId::tsid(), values[1].clone()),
1875                        (100, values[2].clone()),
1876                        (200, values[3].clone()),
1877                    ],
1878                    &mut buf,
1879                )
1880                .unwrap();
1881
1882            let field_col_id = 2;
1883            let mut batch = new_batch_builder(
1884                &buf,
1885                &[1, 2, 3],
1886                &[1, 1, 1],
1887                &[OpType::Put, OpType::Put, OpType::Put],
1888                field_col_id,
1889                &[42, 43, 44],
1890            )
1891            .build()
1892            .unwrap();
1893
1894            let v = batch
1895                .pk_col_value(&*codec, 0, ReservedColumnId::table_id())
1896                .unwrap()
1897                .unwrap();
1898            assert_eq!(values[0], *v);
1899
1900            let v = batch
1901                .pk_col_value(&*codec, 1, ReservedColumnId::tsid())
1902                .unwrap()
1903                .unwrap();
1904            assert_eq!(values[1], *v);
1905
1906            let v = batch.pk_col_value(&*codec, 2, 100).unwrap().unwrap();
1907            assert_eq!(values[2], *v);
1908
1909            let v = batch.pk_col_value(&*codec, 3, 200).unwrap().unwrap();
1910            assert_eq!(values[3], *v);
1911
1912            let v = batch.field_col_value(field_col_id).unwrap();
1913            assert_eq!(v.data.get(0), Value::UInt64(42));
1914            assert_eq!(v.data.get(1), Value::UInt64(43));
1915            assert_eq!(v.data.get(2), Value::UInt64(44));
1916        }
1917    }
1918}