mito2/
read.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Common structs and utilities for reading data.
16
17pub mod compat;
18pub mod dedup;
19pub mod flat_dedup;
20pub mod flat_merge;
21pub mod flat_projection;
22pub mod last_row;
23pub mod merge;
24pub mod plain_batch;
25pub mod projection;
26pub(crate) mod prune;
27pub(crate) mod pruner;
28pub mod range;
29pub mod scan_region;
30pub mod scan_util;
31pub(crate) mod seq_scan;
32pub mod series_scan;
33pub mod stream;
34pub(crate) mod unordered_scan;
35
36use std::collections::{HashMap, HashSet};
37use std::sync::Arc;
38use std::time::Duration;
39
40use api::v1::OpType;
41use async_trait::async_trait;
42use common_time::Timestamp;
43use datafusion_common::arrow::array::UInt8Array;
44use datatypes::arrow;
45use datatypes::arrow::array::{Array, ArrayRef};
46use datatypes::arrow::compute::SortOptions;
47use datatypes::arrow::record_batch::RecordBatch;
48use datatypes::arrow::row::{RowConverter, SortField};
49use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector};
50use datatypes::scalars::ScalarVectorBuilder;
51use datatypes::types::TimestampType;
52use datatypes::value::{Value, ValueRef};
53use datatypes::vectors::{
54    BooleanVector, Helper, TimestampMicrosecondVector, TimestampMillisecondVector,
55    TimestampMillisecondVectorBuilder, TimestampNanosecondVector, TimestampSecondVector,
56    UInt8Vector, UInt8VectorBuilder, UInt32Vector, UInt64Vector, UInt64VectorBuilder, Vector,
57    VectorRef,
58};
59use futures::TryStreamExt;
60use futures::stream::BoxStream;
61use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
62use snafu::{OptionExt, ResultExt, ensure};
63use store_api::metadata::RegionMetadata;
64use store_api::storage::{ColumnId, SequenceNumber, SequenceRange};
65
66use crate::error::{
67    ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu,
68    Result,
69};
70use crate::memtable::{BoxedBatchIterator, BoxedRecordBatchIterator};
71use crate::read::prune::PruneReader;
72
/// Storage internal representation of a batch of rows for a primary key (time series).
///
/// Rows are sorted by primary key, timestamp, sequence desc, op_type desc. Fields
/// always keep the same relative order as fields in [RegionMetadata](store_api::metadata::RegionMetadata).
#[derive(Debug, PartialEq, Clone)]
pub struct Batch {
    /// Primary key encoded in a comparable form.
    primary_key: Vec<u8>,
    /// Possibly decoded `primary_key` values. Some places would decode it in advance.
    pk_values: Option<CompositeValues>,
    /// Timestamps of rows, should be sorted and not null.
    timestamps: VectorRef,
    /// Sequences of rows.
    ///
    /// UInt64 type, not null.
    sequences: Arc<UInt64Vector>,
    /// Op types of rows.
    ///
    /// UInt8 type, not null.
    op_types: Arc<UInt8Vector>,
    /// Fields organized in columnar format.
    fields: Vec<BatchColumn>,
    /// Cache for field index lookup, lazily built by [Batch::field_col_value].
    fields_idx: Option<HashMap<ColumnId, usize>>,
}
98
99impl Batch {
100    /// Creates a new batch.
101    pub fn new(
102        primary_key: Vec<u8>,
103        timestamps: VectorRef,
104        sequences: Arc<UInt64Vector>,
105        op_types: Arc<UInt8Vector>,
106        fields: Vec<BatchColumn>,
107    ) -> Result<Batch> {
108        BatchBuilder::with_required_columns(primary_key, timestamps, sequences, op_types)
109            .with_fields(fields)
110            .build()
111    }
112
113    /// Tries to set fields for the batch.
114    pub fn with_fields(self, fields: Vec<BatchColumn>) -> Result<Batch> {
115        Batch::new(
116            self.primary_key,
117            self.timestamps,
118            self.sequences,
119            self.op_types,
120            fields,
121        )
122    }
123
124    /// Returns primary key of the batch.
125    pub fn primary_key(&self) -> &[u8] {
126        &self.primary_key
127    }
128
129    /// Returns possibly decoded primary-key values.
130    pub fn pk_values(&self) -> Option<&CompositeValues> {
131        self.pk_values.as_ref()
132    }
133
134    /// Sets possibly decoded primary-key values.
135    pub fn set_pk_values(&mut self, pk_values: CompositeValues) {
136        self.pk_values = Some(pk_values);
137    }
138
139    /// Removes possibly decoded primary-key values. For testing only.
140    #[cfg(any(test, feature = "test"))]
141    pub fn remove_pk_values(&mut self) {
142        self.pk_values = None;
143    }
144
145    /// Returns fields in the batch.
146    pub fn fields(&self) -> &[BatchColumn] {
147        &self.fields
148    }
149
150    /// Returns timestamps of the batch.
151    pub fn timestamps(&self) -> &VectorRef {
152        &self.timestamps
153    }
154
155    /// Returns sequences of the batch.
156    pub fn sequences(&self) -> &Arc<UInt64Vector> {
157        &self.sequences
158    }
159
160    /// Returns op types of the batch.
161    pub fn op_types(&self) -> &Arc<UInt8Vector> {
162        &self.op_types
163    }
164
165    /// Returns the number of rows in the batch.
166    pub fn num_rows(&self) -> usize {
167        // All vectors have the same length. We use the length of sequences vector
168        // since it has static type.
169        self.sequences.len()
170    }
171
172    /// Create an empty [`Batch`].
173    pub(crate) fn empty() -> Self {
174        Self {
175            primary_key: vec![],
176            pk_values: None,
177            timestamps: Arc::new(TimestampMillisecondVectorBuilder::with_capacity(0).finish()),
178            sequences: Arc::new(UInt64VectorBuilder::with_capacity(0).finish()),
179            op_types: Arc::new(UInt8VectorBuilder::with_capacity(0).finish()),
180            fields: vec![],
181            fields_idx: None,
182        }
183    }
184
185    /// Returns true if the number of rows in the batch is 0.
186    pub fn is_empty(&self) -> bool {
187        self.num_rows() == 0
188    }
189
190    /// Returns the first timestamp in the batch or `None` if the batch is empty.
191    pub fn first_timestamp(&self) -> Option<Timestamp> {
192        if self.timestamps.is_empty() {
193            return None;
194        }
195
196        Some(self.get_timestamp(0))
197    }
198
199    /// Returns the last timestamp in the batch or `None` if the batch is empty.
200    pub fn last_timestamp(&self) -> Option<Timestamp> {
201        if self.timestamps.is_empty() {
202            return None;
203        }
204
205        Some(self.get_timestamp(self.timestamps.len() - 1))
206    }
207
208    /// Returns the first sequence in the batch or `None` if the batch is empty.
209    pub fn first_sequence(&self) -> Option<SequenceNumber> {
210        if self.sequences.is_empty() {
211            return None;
212        }
213
214        Some(self.get_sequence(0))
215    }
216
217    /// Returns the last sequence in the batch or `None` if the batch is empty.
218    pub fn last_sequence(&self) -> Option<SequenceNumber> {
219        if self.sequences.is_empty() {
220            return None;
221        }
222
223        Some(self.get_sequence(self.sequences.len() - 1))
224    }
225
226    /// Replaces the primary key of the batch.
227    ///
228    /// Notice that this [Batch] also contains a maybe-exist `pk_values`.
229    /// Be sure to update that field as well.
230    pub fn set_primary_key(&mut self, primary_key: Vec<u8>) {
231        self.primary_key = primary_key;
232    }
233
234    /// Slice the batch, returning a new batch.
235    ///
236    /// # Panics
237    /// Panics if `offset + length > self.num_rows()`.
238    pub fn slice(&self, offset: usize, length: usize) -> Batch {
239        let fields = self
240            .fields
241            .iter()
242            .map(|column| BatchColumn {
243                column_id: column.column_id,
244                data: column.data.slice(offset, length),
245            })
246            .collect();
247        // We skip using the builder to avoid validating the batch again.
248        Batch {
249            // Now we need to clone the primary key. We could try `Bytes` if
250            // this becomes a bottleneck.
251            primary_key: self.primary_key.clone(),
252            pk_values: self.pk_values.clone(),
253            timestamps: self.timestamps.slice(offset, length),
254            sequences: Arc::new(self.sequences.get_slice(offset, length)),
255            op_types: Arc::new(self.op_types.get_slice(offset, length)),
256            fields,
257            fields_idx: self.fields_idx.clone(),
258        }
259    }
260
    /// Takes `batches` and concat them into one batch.
    ///
    /// All `batches` must have the same primary key.
    ///
    /// # Errors
    /// Returns an error if `batches` is empty, or the batches have different
    /// primary keys or field layouts.
    pub fn concat(mut batches: Vec<Batch>) -> Result<Batch> {
        ensure!(
            !batches.is_empty(),
            InvalidBatchSnafu {
                reason: "empty batches",
            }
        );
        if batches.len() == 1 {
            // Now we own the `batches` so we could pop it directly.
            return Ok(batches.pop().unwrap());
        }

        // Move the key out of the first batch so the result doesn't need to clone it.
        let primary_key = std::mem::take(&mut batches[0].primary_key);
        let first = &batches[0];
        // We took the primary key from the first batch so we don't use `first.primary_key()`.
        ensure!(
            batches
                .iter()
                .skip(1)
                .all(|b| b.primary_key() == primary_key),
            InvalidBatchSnafu {
                reason: "batches have different primary key",
            }
        );
        // Every batch must have the same field count and column ids so that the
        // columns can be concatenated position by position below.
        for b in batches.iter().skip(1) {
            ensure!(
                b.fields.len() == first.fields.len(),
                InvalidBatchSnafu {
                    reason: "batches have different field num",
                }
            );
            for (l, r) in b.fields.iter().zip(&first.fields) {
                ensure!(
                    l.column_id == r.column_id,
                    InvalidBatchSnafu {
                        reason: "batches have different fields",
                    }
                );
            }
        }

        // We take the primary key from the first batch.
        let mut builder = BatchBuilder::new(primary_key);
        // Concat timestamps, sequences, op_types, fields.
        let array = concat_arrays(batches.iter().map(|b| b.timestamps().to_arrow_array()))?;
        builder.timestamps_array(array)?;
        let array = concat_arrays(batches.iter().map(|b| b.sequences().to_arrow_array()))?;
        builder.sequences_array(array)?;
        let array = concat_arrays(batches.iter().map(|b| b.op_types().to_arrow_array()))?;
        builder.op_types_array(array)?;
        for (i, batch_column) in first.fields.iter().enumerate() {
            let array = concat_arrays(batches.iter().map(|b| b.fields()[i].data.to_arrow_array()))?;
            builder.push_field_array(batch_column.column_id, array)?;
        }

        builder.build()
    }
321
322    /// Removes rows whose op type is delete.
323    pub fn filter_deleted(&mut self) -> Result<()> {
324        // Safety: op type column is not null.
325        let array = self.op_types.as_arrow();
326        // Find rows with non-delete op type.
327        let rhs = UInt8Array::new_scalar(OpType::Delete as u8);
328        let predicate =
329            arrow::compute::kernels::cmp::neq(array, &rhs).context(ComputeArrowSnafu)?;
330        self.filter(&BooleanVector::from(predicate))
331    }
332
333    // Applies the `predicate` to the batch.
334    // Safety: We know the array type so we unwrap on casting.
335    pub fn filter(&mut self, predicate: &BooleanVector) -> Result<()> {
336        self.timestamps = self
337            .timestamps
338            .filter(predicate)
339            .context(ComputeVectorSnafu)?;
340        self.sequences = Arc::new(
341            UInt64Vector::try_from_arrow_array(
342                arrow::compute::filter(self.sequences.as_arrow(), predicate.as_boolean_array())
343                    .context(ComputeArrowSnafu)?,
344            )
345            .unwrap(),
346        );
347        self.op_types = Arc::new(
348            UInt8Vector::try_from_arrow_array(
349                arrow::compute::filter(self.op_types.as_arrow(), predicate.as_boolean_array())
350                    .context(ComputeArrowSnafu)?,
351            )
352            .unwrap(),
353        );
354        for batch_column in &mut self.fields {
355            batch_column.data = batch_column
356                .data
357                .filter(predicate)
358                .context(ComputeVectorSnafu)?;
359        }
360
361        Ok(())
362    }
363
    /// Filters rows by the given sequence range. Only preserves rows whose
    /// sequence falls into `sequence`; `None` keeps every row.
    pub fn filter_by_sequence(&mut self, sequence: Option<SequenceRange>) -> Result<()> {
        let seq_range = match sequence {
            None => return Ok(()),
            Some(seq_range) => {
                // An empty batch has no first/last sequence and nothing to filter.
                let (Some(first), Some(last)) = (self.first_sequence(), self.last_sequence())
                else {
                    return Ok(());
                };
                // Fast path: if the whole batch already lies inside the range the
                // filter would keep every row, so skip it.
                // NOTE(review): this check compares only the first and last
                // sequences, which assumes they bound all sequences in the batch
                // — confirm against how batches are produced.
                let is_subset = match seq_range {
                    SequenceRange::Gt { min } => min < first,
                    SequenceRange::LtEq { max } => max >= last,
                    SequenceRange::GtLtEq { min, max } => min < first && max >= last,
                };
                if is_subset {
                    return Ok(());
                }
                seq_range
            }
        };

        let seqs = self.sequences.as_arrow();
        let predicate = seq_range.filter(seqs).context(ComputeArrowSnafu)?;

        let predicate = BooleanVector::from(predicate);
        self.filter(&predicate)?;

        Ok(())
    }
393
    /// Sorts rows in the batch. If `dedup` is true, it also removes
    /// duplicated rows according to primary keys.
    ///
    /// It orders rows by timestamp, sequence desc and only keep the latest
    /// row for the same timestamp. It doesn't consider op type as sequence
    /// should already provide uniqueness for a row.
    pub fn sort(&mut self, dedup: bool) -> Result<()> {
        // If building a converter each time is costly, we may allow passing a
        // converter.
        let converter = RowConverter::new(vec![
            SortField::new(self.timestamps.data_type().as_arrow_type()),
            // Sequence is compared in descending order so the newest row for a
            // timestamp sorts first.
            SortField::new_with_options(
                self.sequences.data_type().as_arrow_type(),
                SortOptions {
                    descending: true,
                    ..Default::default()
                },
            ),
        ])
        .context(ComputeArrowSnafu)?;
        // Columns to sort.
        let columns = [
            self.timestamps.to_arrow_array(),
            self.sequences.to_arrow_array(),
        ];
        let rows = converter.convert_columns(&columns).unwrap();
        let mut to_sort: Vec<_> = rows.iter().enumerate().collect();

        // Skip the sort entirely when rows are already in order.
        let was_sorted = to_sort.is_sorted_by_key(|x| x.1);
        if !was_sorted {
            to_sort.sort_unstable_by_key(|x| x.1);
        }

        let num_rows = to_sort.len();
        if dedup {
            // Dedup by timestamps.
            to_sort.dedup_by(|left, right| {
                debug_assert_eq!(18, left.1.as_ref().len());
                debug_assert_eq!(18, right.1.as_ref().len());
                let (left_key, right_key) = (left.1.as_ref(), right.1.as_ref());
                // We only compare the timestamp part and ignore sequence.
                left_key[..TIMESTAMP_KEY_LEN] == right_key[..TIMESTAMP_KEY_LEN]
            });
        }
        let no_dedup = to_sort.len() == num_rows;

        // Neither the order nor the row count changed; avoid the take.
        if was_sorted && no_dedup {
            return Ok(());
        }
        let indices = UInt32Vector::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
        self.take_in_place(&indices)
    }
446
    /// Merges duplicated timestamps in the batch by keeping the latest non-null field values.
    ///
    /// Rows must already be sorted by timestamp (ascending) and sequence (descending).
    ///
    /// This method deduplicates rows with the same timestamp (keeping the first row in each
    /// timestamp range as the base row) and fills null fields from subsequent rows until all
    /// fields are filled or a delete operation is encountered.
    pub(crate) fn merge_last_non_null(&mut self) -> Result<()> {
        let num_rows = self.num_rows();
        // Fewer than two rows cannot contain duplicates.
        if num_rows < 2 {
            return Ok(());
        }

        let Some(timestamps) = self.timestamps_native() else {
            return Ok(());
        };

        // Fast path: check if there are any duplicate timestamps.
        let mut has_dup = false;
        let mut group_count = 1;
        for i in 1..num_rows {
            has_dup |= timestamps[i] == timestamps[i - 1];
            group_count += (timestamps[i] != timestamps[i - 1]) as usize;
        }
        if !has_dup {
            return Ok(());
        }

        let num_fields = self.fields.len();
        let op_types = self.op_types.as_arrow().values();

        // `base_indices[g]` is the row taken for timestamps/sequences/op_types of
        // group `g`; `field_indices[f][g]` is the row supplying field `f` of group `g`.
        let mut base_indices: Vec<u32> = Vec::with_capacity(group_count);
        let mut field_indices: Vec<Vec<u32>> = (0..num_fields)
            .map(|_| Vec::with_capacity(group_count))
            .collect();

        // Walk timestamp groups: `start..end` covers the rows sharing one timestamp.
        let mut start = 0;
        while start < num_rows {
            let ts = timestamps[start];
            let mut end = start + 1;
            while end < num_rows && timestamps[end] == ts {
                end += 1;
            }

            let group_pos = base_indices.len();
            base_indices.push(start as u32);

            if num_fields > 0 {
                // Default: take the base row for all fields.
                for idx in &mut field_indices {
                    idx.push(start as u32);
                }

                let base_deleted = op_types[start] == OpType::Delete as u8;
                if !base_deleted {
                    // Track fields that are null in the base row and try to fill them from older
                    // rows in the same timestamp range.
                    let mut missing_fields = Vec::new();
                    for (field_idx, col) in self.fields.iter().enumerate() {
                        if col.data.is_null(start) {
                            missing_fields.push(field_idx);
                        }
                    }

                    if !missing_fields.is_empty() {
                        for row_idx in (start + 1)..end {
                            // Rows behind a delete must not leak values into the result.
                            if op_types[row_idx] == OpType::Delete as u8 {
                                break;
                            }

                            missing_fields.retain(|&field_idx| {
                                if self.fields[field_idx].data.is_null(row_idx) {
                                    true
                                } else {
                                    field_indices[field_idx][group_pos] = row_idx as u32;
                                    false
                                }
                            });

                            if missing_fields.is_empty() {
                                break;
                            }
                        }
                    }
                }
            }

            start = end;
        }

        // Materialize the merged batch by taking the chosen rows per column.
        let base_indices = UInt32Vector::from_vec(base_indices);
        self.timestamps = self
            .timestamps
            .take(&base_indices)
            .context(ComputeVectorSnafu)?;
        let array = arrow::compute::take(self.sequences.as_arrow(), base_indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        // Safety: We know the array and vector type.
        self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
        let array = arrow::compute::take(self.op_types.as_arrow(), base_indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        // Safety: We know the array and vector type.
        self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());

        for (field_idx, batch_column) in self.fields.iter_mut().enumerate() {
            let idx = UInt32Vector::from_vec(std::mem::take(&mut field_indices[field_idx]));
            batch_column.data = batch_column.data.take(&idx).context(ComputeVectorSnafu)?;
        }

        Ok(())
    }
558
559    /// Returns the estimated memory size of the batch.
560    pub fn memory_size(&self) -> usize {
561        let mut size = std::mem::size_of::<Self>();
562        size += self.primary_key.len();
563        size += self.timestamps.memory_size();
564        size += self.sequences.memory_size();
565        size += self.op_types.memory_size();
566        for batch_column in &self.fields {
567            size += batch_column.data.memory_size();
568        }
569        size
570    }
571
572    /// Returns ids and datatypes of fields in the [Batch] after applying the `projection`.
573    pub(crate) fn projected_fields(
574        metadata: &RegionMetadata,
575        projection: &[ColumnId],
576    ) -> Vec<(ColumnId, ConcreteDataType)> {
577        let projected_ids: HashSet<_> = projection.iter().copied().collect();
578        metadata
579            .field_columns()
580            .filter_map(|column| {
581                if projected_ids.contains(&column.column_id) {
582                    Some((column.column_id, column.column_schema.data_type.clone()))
583                } else {
584                    None
585                }
586            })
587            .collect()
588    }
589
    /// Returns timestamps in a native slice or `None` if the batch is empty.
    ///
    /// # Panics
    /// Panics if the timestamp column is not one of the four timestamp types.
    pub(crate) fn timestamps_native(&self) -> Option<&[i64]> {
        if self.timestamps.is_empty() {
            return None;
        }

        // Each timestamp unit is backed by an i64 arrow buffer, so every arm
        // yields the same `&[i64]` view over the raw values.
        let values = match self.timestamps.data_type() {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampSecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampMicrosecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampNanosecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            other => panic!("timestamps in a Batch has other type {:?}", other),
        };

        Some(values)
    }
630
631    /// Takes the batch in place.
632    fn take_in_place(&mut self, indices: &UInt32Vector) -> Result<()> {
633        self.timestamps = self.timestamps.take(indices).context(ComputeVectorSnafu)?;
634        let array = arrow::compute::take(self.sequences.as_arrow(), indices.as_arrow(), None)
635            .context(ComputeArrowSnafu)?;
636        // Safety: we know the array and vector type.
637        self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
638        let array = arrow::compute::take(self.op_types.as_arrow(), indices.as_arrow(), None)
639            .context(ComputeArrowSnafu)?;
640        self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
641        for batch_column in &mut self.fields {
642            batch_column.data = batch_column
643                .data
644                .take(indices)
645                .context(ComputeVectorSnafu)?;
646        }
647
648        Ok(())
649    }
650
651    /// Gets a timestamp at given `index`.
652    ///
653    /// # Panics
654    /// Panics if `index` is out-of-bound or the timestamp vector returns null.
655    fn get_timestamp(&self, index: usize) -> Timestamp {
656        match self.timestamps.get_ref(index) {
657            ValueRef::Timestamp(timestamp) => timestamp,
658
659            // We have check the data type is timestamp compatible in the [BatchBuilder] so it's safe to panic.
660            value => panic!("{:?} is not a timestamp", value),
661        }
662    }
663
664    /// Gets a sequence at given `index`.
665    ///
666    /// # Panics
667    /// Panics if `index` is out-of-bound or the sequence vector returns null.
668    pub(crate) fn get_sequence(&self, index: usize) -> SequenceNumber {
669        // Safety: sequences is not null so it actually returns Some.
670        self.sequences.get_data(index).unwrap()
671    }
672
    /// Checks the batch is monotonic by timestamps.
    ///
    /// Adjacent rows must be ordered by (timestamp asc, sequence desc);
    /// returns a descriptive error message for the first violation found.
    #[cfg(debug_assertions)]
    pub(crate) fn check_monotonic(&self) -> Result<(), String> {
        use std::cmp::Ordering;
        // An empty batch is trivially monotonic.
        if self.timestamps_native().is_none() {
            return Ok(());
        }

        let timestamps = self.timestamps_native().unwrap();
        let sequences = self.sequences.as_arrow().values();
        for (i, window) in timestamps.windows(2).enumerate() {
            let current = window[0];
            let next = window[1];
            let current_sequence = sequences[i];
            let next_sequence = sequences[i + 1];
            match current.cmp(&next) {
                Ordering::Less => {
                    // The current timestamp is less than the next timestamp.
                    continue;
                }
                Ordering::Equal => {
                    // The current timestamp is equal to the next timestamp.
                    // Sequences must be descending within the same timestamp.
                    if current_sequence < next_sequence {
                        return Err(format!(
                            "sequence are not monotonic: ts {} == {} but current sequence {} < {}, index: {}",
                            current, next, current_sequence, next_sequence, i
                        ));
                    }
                }
                Ordering::Greater => {
                    // The current timestamp is greater than the next timestamp.
                    return Err(format!(
                        "timestamps are not monotonic: {} > {}, index: {}",
                        current, next, i
                    ));
                }
            }
        }

        Ok(())
    }
714
    /// Returns Ok if the given batch is behind the current batch.
    ///
    /// Batches must be ordered by (primary key asc, timestamp asc, sequence desc);
    /// each comparison below falls through to the next key only on equality.
    #[cfg(debug_assertions)]
    pub(crate) fn check_next_batch(&self, other: &Batch) -> Result<(), String> {
        // Checks the primary key
        if self.primary_key() < other.primary_key() {
            return Ok(());
        }
        if self.primary_key() > other.primary_key() {
            return Err(format!(
                "primary key is not monotonic: {:?} > {:?}",
                self.primary_key(),
                other.primary_key()
            ));
        }
        // Checks the timestamp.
        if self.last_timestamp() < other.first_timestamp() {
            return Ok(());
        }
        if self.last_timestamp() > other.first_timestamp() {
            return Err(format!(
                "timestamps are not monotonic: {:?} > {:?}",
                self.last_timestamp(),
                other.first_timestamp()
            ));
        }
        // Checks the sequence. Same timestamp: sequences must be descending.
        if self.last_sequence() >= other.first_sequence() {
            return Ok(());
        }
        Err(format!(
            "sequences are not monotonic: {:?} < {:?}",
            self.last_sequence(),
            other.first_sequence()
        ))
    }
750
751    /// Returns the value of the column in the primary key.
752    ///
753    /// Lazily decodes the primary key and caches the result.
754    pub fn pk_col_value(
755        &mut self,
756        codec: &dyn PrimaryKeyCodec,
757        col_idx_in_pk: usize,
758        column_id: ColumnId,
759    ) -> Result<Option<&Value>> {
760        if self.pk_values.is_none() {
761            self.pk_values = Some(codec.decode(&self.primary_key).context(DecodeSnafu)?);
762        }
763
764        let pk_values = self.pk_values.as_ref().unwrap();
765        Ok(match pk_values {
766            CompositeValues::Dense(values) => values.get(col_idx_in_pk).map(|(_, v)| v),
767            CompositeValues::Sparse(values) => values.get(&column_id),
768        })
769    }
770
771    /// Returns values of the field in the batch.
772    ///
773    /// Lazily caches the field index.
774    pub fn field_col_value(&mut self, column_id: ColumnId) -> Option<&BatchColumn> {
775        if self.fields_idx.is_none() {
776            self.fields_idx = Some(
777                self.fields
778                    .iter()
779                    .enumerate()
780                    .map(|(i, c)| (c.column_id, i))
781                    .collect(),
782            );
783        }
784
785        self.fields_idx
786            .as_ref()
787            .unwrap()
788            .get(&column_id)
789            .map(|&idx| &self.fields[idx])
790    }
791}
792
/// A struct to check the batch is monotonic.
#[cfg(debug_assertions)]
#[derive(Default)]
pub(crate) struct BatchChecker {
    /// The most recently checked batch; used to verify ordering across batches.
    last_batch: Option<Batch>,
    /// Optional inclusive lower bound for batch timestamps.
    start: Option<Timestamp>,
    /// Optional exclusive upper bound for batch timestamps.
    end: Option<Timestamp>,
}
801
802#[cfg(debug_assertions)]
803impl BatchChecker {
804    /// Attaches the given start timestamp to the checker.
805    pub(crate) fn with_start(mut self, start: Option<Timestamp>) -> Self {
806        self.start = start;
807        self
808    }
809
810    /// Attaches the given end timestamp to the checker.
811    pub(crate) fn with_end(mut self, end: Option<Timestamp>) -> Self {
812        self.end = end;
813        self
814    }
815
    /// Returns true if the given batch is monotonic and behind
    /// the last batch.
    ///
    /// Also verifies the batch stays within the optional `start`/`end` bounds,
    /// and records the batch as the new `last_batch` regardless of the outcome.
    pub(crate) fn check_monotonic(&mut self, batch: &Batch) -> Result<(), String> {
        batch.check_monotonic()?;

        if let (Some(start), Some(first)) = (self.start, batch.first_timestamp())
            && start > first
        {
            return Err(format!(
                "batch's first timestamp is before the start timestamp: {:?} > {:?}",
                start, first
            ));
        }
        if let (Some(end), Some(last)) = (self.end, batch.last_timestamp())
            && end <= last
        {
            return Err(format!(
                "batch's last timestamp is after the end timestamp: {:?} <= {:?}",
                end, last
            ));
        }

        // Checks the batch is behind the last batch.
        // Then Updates the last batch.
        let res = self
            .last_batch
            .as_ref()
            .map(|last| last.check_next_batch(batch))
            .unwrap_or(Ok(()));
        self.last_batch = Some(batch.clone());
        res
    }
848
849    /// Formats current batch and last batch for debug.
850    pub(crate) fn format_batch(&self, batch: &Batch) -> String {
851        use std::fmt::Write;
852
853        let mut message = String::new();
854        if let Some(last) = &self.last_batch {
855            write!(
856                message,
857                "last_pk: {:?}, last_ts: {:?}, last_seq: {:?}, ",
858                last.primary_key(),
859                last.last_timestamp(),
860                last.last_sequence()
861            )
862            .unwrap();
863        }
864        write!(
865            message,
866            "batch_pk: {:?}, batch_ts: {:?}, batch_seq: {:?}",
867            batch.primary_key(),
868            batch.timestamps(),
869            batch.sequences()
870        )
871        .unwrap();
872
873        message
874    }
875
876    /// Checks batches from the part range are monotonic. Otherwise, panics.
877    pub(crate) fn ensure_part_range_batch(
878        &mut self,
879        scanner: &str,
880        region_id: store_api::storage::RegionId,
881        partition: usize,
882        part_range: store_api::region_engine::PartitionRange,
883        batch: &Batch,
884    ) {
885        if let Err(e) = self.check_monotonic(batch) {
886            let err_msg = format!(
887                "{}: batch is not sorted, {}, region_id: {}, partition: {}, part_range: {:?}",
888                scanner, e, region_id, partition, part_range,
889            );
890            common_telemetry::error!("{err_msg}, {}", self.format_batch(batch));
891            // Only print the number of row in the panic message.
892            panic!("{err_msg}, batch rows: {}", batch.num_rows());
893        }
894    }
895}
896
/// Len of timestamp in arrow row format.
///
/// NOTE(review): 9 bytes — presumably 1 validity/status byte plus the 8-byte
/// timestamp value of the arrow row encoding; confirm against `arrow::row` docs.
const TIMESTAMP_KEY_LEN: usize = 9;
899
900/// Helper function to concat arrays from `iter`.
901fn concat_arrays(iter: impl Iterator<Item = ArrayRef>) -> Result<ArrayRef> {
902    let arrays: Vec<_> = iter.collect();
903    let dyn_arrays: Vec<_> = arrays.iter().map(|array| array.as_ref()).collect();
904    arrow::compute::concat(&dyn_arrays).context(ComputeArrowSnafu)
905}
906
/// A column in a [Batch].
///
/// Pairs a field column's id with its data vector; the vector's length always
/// matches the batch's row count (enforced by [BatchBuilder::build]).
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct BatchColumn {
    /// Id of the column.
    pub column_id: ColumnId,
    /// Data of the column.
    pub data: VectorRef,
}
915
/// Builder to build [Batch].
///
/// `timestamps`, `sequences` and `op_types` are required; [BatchBuilder::build]
/// fails if any of them is unset or if column lengths disagree.
pub struct BatchBuilder {
    /// Encoded primary key shared by all rows of the batch.
    primary_key: Vec<u8>,
    /// Timestamp column; must be a timestamp-typed vector.
    timestamps: Option<VectorRef>,
    /// Sequence numbers, one per row.
    sequences: Option<Arc<UInt64Vector>>,
    /// Operation types (put/delete), one per row.
    op_types: Option<Arc<UInt8Vector>>,
    /// Field (value) columns.
    fields: Vec<BatchColumn>,
}
924
925impl BatchBuilder {
926    /// Creates a new [BatchBuilder] with primary key.
927    pub fn new(primary_key: Vec<u8>) -> BatchBuilder {
928        BatchBuilder {
929            primary_key,
930            timestamps: None,
931            sequences: None,
932            op_types: None,
933            fields: Vec::new(),
934        }
935    }
936
937    /// Creates a new [BatchBuilder] with all required columns.
938    pub fn with_required_columns(
939        primary_key: Vec<u8>,
940        timestamps: VectorRef,
941        sequences: Arc<UInt64Vector>,
942        op_types: Arc<UInt8Vector>,
943    ) -> BatchBuilder {
944        BatchBuilder {
945            primary_key,
946            timestamps: Some(timestamps),
947            sequences: Some(sequences),
948            op_types: Some(op_types),
949            fields: Vec::new(),
950        }
951    }
952
953    /// Set all field columns.
954    pub fn with_fields(mut self, fields: Vec<BatchColumn>) -> Self {
955        self.fields = fields;
956        self
957    }
958
959    /// Push a field column.
960    pub fn push_field(&mut self, column: BatchColumn) -> &mut Self {
961        self.fields.push(column);
962        self
963    }
964
965    /// Push an array as a field.
966    pub fn push_field_array(&mut self, column_id: ColumnId, array: ArrayRef) -> Result<&mut Self> {
967        let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
968        self.fields.push(BatchColumn {
969            column_id,
970            data: vector,
971        });
972
973        Ok(self)
974    }
975
976    /// Try to set an array as timestamps.
977    pub fn timestamps_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
978        let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
979        ensure!(
980            vector.data_type().is_timestamp(),
981            InvalidBatchSnafu {
982                reason: format!("{:?} is not a timestamp type", vector.data_type()),
983            }
984        );
985
986        self.timestamps = Some(vector);
987        Ok(self)
988    }
989
990    /// Try to set an array as sequences.
991    pub fn sequences_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
992        ensure!(
993            *array.data_type() == arrow::datatypes::DataType::UInt64,
994            InvalidBatchSnafu {
995                reason: "sequence array is not UInt64 type",
996            }
997        );
998        // Safety: The cast must success as we have ensured it is uint64 type.
999        let vector = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
1000        self.sequences = Some(vector);
1001
1002        Ok(self)
1003    }
1004
1005    /// Try to set an array as op types.
1006    pub fn op_types_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
1007        ensure!(
1008            *array.data_type() == arrow::datatypes::DataType::UInt8,
1009            InvalidBatchSnafu {
1010                reason: "sequence array is not UInt8 type",
1011            }
1012        );
1013        // Safety: The cast must success as we have ensured it is uint64 type.
1014        let vector = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
1015        self.op_types = Some(vector);
1016
1017        Ok(self)
1018    }
1019
1020    /// Builds the [Batch].
1021    pub fn build(self) -> Result<Batch> {
1022        let timestamps = self.timestamps.context(InvalidBatchSnafu {
1023            reason: "missing timestamps",
1024        })?;
1025        let sequences = self.sequences.context(InvalidBatchSnafu {
1026            reason: "missing sequences",
1027        })?;
1028        let op_types = self.op_types.context(InvalidBatchSnafu {
1029            reason: "missing op_types",
1030        })?;
1031        // Our storage format ensure these columns are not nullable so
1032        // we use assert here.
1033        assert_eq!(0, timestamps.null_count());
1034        assert_eq!(0, sequences.null_count());
1035        assert_eq!(0, op_types.null_count());
1036
1037        let ts_len = timestamps.len();
1038        ensure!(
1039            sequences.len() == ts_len,
1040            InvalidBatchSnafu {
1041                reason: format!(
1042                    "sequence have different len {} != {}",
1043                    sequences.len(),
1044                    ts_len
1045                ),
1046            }
1047        );
1048        ensure!(
1049            op_types.len() == ts_len,
1050            InvalidBatchSnafu {
1051                reason: format!(
1052                    "op type have different len {} != {}",
1053                    op_types.len(),
1054                    ts_len
1055                ),
1056            }
1057        );
1058        for column in &self.fields {
1059            ensure!(
1060                column.data.len() == ts_len,
1061                InvalidBatchSnafu {
1062                    reason: format!(
1063                        "column {} has different len {} != {}",
1064                        column.column_id,
1065                        column.data.len(),
1066                        ts_len
1067                    ),
1068                }
1069            );
1070        }
1071
1072        Ok(Batch {
1073            primary_key: self.primary_key,
1074            pk_values: None,
1075            timestamps,
1076            sequences,
1077            op_types,
1078            fields: self.fields,
1079            fields_idx: None,
1080        })
1081    }
1082}
1083
1084impl From<Batch> for BatchBuilder {
1085    fn from(batch: Batch) -> Self {
1086        Self {
1087            primary_key: batch.primary_key,
1088            timestamps: Some(batch.timestamps),
1089            sequences: Some(batch.sequences),
1090            op_types: Some(batch.op_types),
1091            fields: batch.fields,
1092        }
1093    }
1094}
1095
/// Async [Batch] reader and iterator wrapper.
///
/// This is the data source for SST writers or internal readers.
///
/// All variants yield batches through [Source::next_batch], so callers don't
/// need to know which underlying kind they hold.
pub enum Source {
    /// Source from a [BoxedBatchReader].
    Reader(BoxedBatchReader),
    /// Source from a [BoxedBatchIterator].
    Iter(BoxedBatchIterator),
    /// Source from a [BoxedBatchStream].
    Stream(BoxedBatchStream),
    /// Source from a [PruneReader].
    PruneReader(PruneReader),
}
1109
1110impl Source {
1111    /// Returns next [Batch] from this data source.
1112    pub async fn next_batch(&mut self) -> Result<Option<Batch>> {
1113        match self {
1114            Source::Reader(reader) => reader.next_batch().await,
1115            Source::Iter(iter) => iter.next().transpose(),
1116            Source::Stream(stream) => stream.try_next().await,
1117            Source::PruneReader(reader) => reader.next_batch().await,
1118        }
1119    }
1120}
1121
/// Async [RecordBatch] reader and iterator wrapper for flat format.
///
/// Counterpart of [Source] for sources that yield arrow [RecordBatch]es
/// instead of [Batch]es.
pub enum FlatSource {
    /// Source from a [BoxedRecordBatchIterator].
    Iter(BoxedRecordBatchIterator),
    /// Source from a [BoxedRecordBatchStream].
    Stream(BoxedRecordBatchStream),
}
1129
1130impl FlatSource {
1131    /// Returns next [RecordBatch] from this data source.
1132    pub async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
1133        match self {
1134            FlatSource::Iter(iter) => iter.next().transpose(),
1135            FlatSource::Stream(stream) => stream.try_next().await,
1136        }
1137    }
1138}
1139
/// Async batch reader.
///
/// The reader must guarantee [Batch]es returned by it have the same schema.
#[async_trait]
pub trait BatchReader: Send {
    /// Fetch next [Batch].
    ///
    /// Returns `Ok(None)` when the reader has reached its end; subsequent
    /// calls to `next_batch()` will not return a batch again.
    ///
    /// If `Err` is returned, caller should not call this method again, the implementor
    /// may or may not panic in such case.
    async fn next_batch(&mut self) -> Result<Option<Batch>>;
}
1154
/// Pointer to [BatchReader].
pub type BoxedBatchReader = Box<dyn BatchReader>;

/// Pointer to a stream that yields [Batch].
///
/// The stream is `'static` so it can be moved across task boundaries.
pub type BoxedBatchStream = BoxStream<'static, Result<Batch>>;

/// Pointer to a stream that yields [RecordBatch].
pub type BoxedRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
1163
// Blanket forwarding impl so a boxed reader (e.g. `BoxedBatchReader`) can be
// used wherever a `BatchReader` is expected.
#[async_trait::async_trait]
impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        // Delegate to the inner reader.
        (**self).next_batch().await
    }
}
1170
/// Local metrics for scanners.
///
/// NOTE(review): presumably accumulated per-scan and reported elsewhere —
/// confirm against the scanner implementations.
#[derive(Debug, Default)]
pub(crate) struct ScannerMetrics {
    /// Duration to scan data.
    scan_cost: Duration,
    /// Duration while waiting for `yield`.
    yield_cost: Duration,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of rows returned.
    num_rows: usize,
}
1183
1184#[cfg(test)]
1185mod tests {
1186    use datatypes::arrow::array::{TimestampMillisecondArray, UInt8Array, UInt64Array};
1187    use mito_codec::row_converter::{self, build_primary_key_codec_with_fields};
1188    use store_api::codec::PrimaryKeyEncoding;
1189    use store_api::storage::consts::ReservedColumnId;
1190
1191    use super::*;
1192    use crate::error::Error;
1193    use crate::test_util::new_batch_builder;
1194
    /// Builds a batch with primary key `"test"` and a single `u64` field
    /// (column id 1) from parallel slices.
    fn new_batch(
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        field: &[u64],
    ) -> Batch {
        new_batch_builder(b"test", timestamps, sequences, op_types, 1, field)
            .build()
            .unwrap()
    }
1205
    /// Builds a batch with primary key `"test"` and arbitrary nullable `u64`
    /// field columns, given as `(column_id, values)` pairs.
    fn new_batch_with_u64_fields(
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        fields: &[(ColumnId, &[Option<u64>])],
    ) -> Batch {
        // All columns must be the same length as the timestamp column.
        assert_eq!(timestamps.len(), sequences.len());
        assert_eq!(timestamps.len(), op_types.len());
        for (_, values) in fields {
            assert_eq!(timestamps.len(), values.len());
        }

        let mut builder = BatchBuilder::new(b"test".to_vec());
        builder
            .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
                timestamps.iter().copied(),
            )))
            .unwrap()
            .sequences_array(Arc::new(UInt64Array::from_iter_values(
                sequences.iter().copied(),
            )))
            .unwrap()
            .op_types_array(Arc::new(UInt8Array::from_iter_values(
                op_types.iter().map(|v| *v as u8),
            )))
            .unwrap();

        for (col_id, values) in fields {
            builder
                .push_field_array(*col_id, Arc::new(UInt64Array::from(values.to_vec())))
                .unwrap();
        }

        builder.build().unwrap()
    }
1241
1242    fn new_batch_without_fields(
1243        timestamps: &[i64],
1244        sequences: &[u64],
1245        op_types: &[OpType],
1246    ) -> Batch {
1247        assert_eq!(timestamps.len(), sequences.len());
1248        assert_eq!(timestamps.len(), op_types.len());
1249
1250        let mut builder = BatchBuilder::new(b"test".to_vec());
1251        builder
1252            .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
1253                timestamps.iter().copied(),
1254            )))
1255            .unwrap()
1256            .sequences_array(Arc::new(UInt64Array::from_iter_values(
1257                sequences.iter().copied(),
1258            )))
1259            .unwrap()
1260            .op_types_array(Arc::new(UInt8Array::from_iter_values(
1261                op_types.iter().map(|v| *v as u8),
1262            )))
1263            .unwrap();
1264
1265        builder.build().unwrap()
1266    }
1267
1268    #[test]
1269    fn test_empty_batch() {
1270        let batch = Batch::empty();
1271        assert!(batch.is_empty());
1272        assert_eq!(None, batch.first_timestamp());
1273        assert_eq!(None, batch.last_timestamp());
1274        assert_eq!(None, batch.first_sequence());
1275        assert_eq!(None, batch.last_sequence());
1276        assert!(batch.timestamps_native().is_none());
1277    }
1278
    #[test]
    fn test_first_last_one() {
        // With a single row, first and last accessors return the same values.
        let batch = new_batch(&[1], &[2], &[OpType::Put], &[4]);
        assert_eq!(
            Timestamp::new_millisecond(1),
            batch.first_timestamp().unwrap()
        );
        assert_eq!(
            Timestamp::new_millisecond(1),
            batch.last_timestamp().unwrap()
        );
        assert_eq!(2, batch.first_sequence().unwrap());
        assert_eq!(2, batch.last_sequence().unwrap());
    }
1293
    #[test]
    fn test_first_last_multiple() {
        // With multiple rows, first/last come from the first and last row.
        let batch = new_batch(
            &[1, 2, 3],
            &[11, 12, 13],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );
        assert_eq!(
            Timestamp::new_millisecond(1),
            batch.first_timestamp().unwrap()
        );
        assert_eq!(
            Timestamp::new_millisecond(3),
            batch.last_timestamp().unwrap()
        );
        assert_eq!(11, batch.first_sequence().unwrap());
        assert_eq!(13, batch.last_sequence().unwrap());
    }
1313
    #[test]
    fn test_slice() {
        // slice(1, 2) keeps rows [1, 3) of every column.
        let batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let batch = batch.slice(1, 2);
        let expect = new_batch(
            &[2, 3],
            &[12, 13],
            &[OpType::Delete, OpType::Put],
            &[22, 23],
        );
        assert_eq!(expect, batch);
    }
1331
    #[test]
    fn test_timestamps_native() {
        // timestamps_native() exposes the raw i64 timestamp values.
        let batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        assert_eq!(&[1, 2, 3, 4], batch.timestamps_native().unwrap());
    }
1342
    #[test]
    fn test_concat_empty() {
        // Concatenating an empty list of batches is an InvalidBatch error.
        let err = Batch::concat(vec![]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }
1351
    #[test]
    fn test_concat_one() {
        // Concatenating a single batch (empty or not) returns an equal batch.
        let batch = new_batch(&[], &[], &[], &[]);
        let actual = Batch::concat(vec![batch.clone()]).unwrap();
        assert_eq!(batch, actual);

        let batch = new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]);
        let actual = Batch::concat(vec![batch.clone()]).unwrap();
        assert_eq!(batch, actual);
    }
1362
    #[test]
    fn test_concat_multiple() {
        // Rows are concatenated in input order; an empty batch in the middle
        // contributes nothing.
        let batches = vec![
            new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]),
            new_batch(
                &[3, 4, 5],
                &[13, 14, 15],
                &[OpType::Put, OpType::Delete, OpType::Put],
                &[23, 24, 25],
            ),
            new_batch(&[], &[], &[], &[]),
            new_batch(&[6], &[16], &[OpType::Put], &[26]),
        ];
        let batch = Batch::concat(batches).unwrap();
        let expect = new_batch(
            &[1, 2, 3, 4, 5, 6],
            &[11, 12, 13, 14, 15, 16],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Delete,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25, 26],
        );
        assert_eq!(expect, batch);
    }
1392
    #[test]
    fn test_concat_different() {
        // Batches with different primary keys cannot be concatenated.
        let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
        let mut batch2 = new_batch(&[2], &[2], &[OpType::Put], &[2]);
        batch2.primary_key = b"hello".to_vec();
        let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }
1404
    #[test]
    fn test_concat_different_fields() {
        // Batches whose field columns disagree cannot be concatenated.
        let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
        let fields = vec![
            batch1.fields()[0].clone(),
            BatchColumn {
                column_id: 2,
                data: Arc::new(UInt64Vector::from_slice([2])),
            },
        ];
        // Batch 2 has more fields.
        let batch2 = batch1.clone().with_fields(fields).unwrap();
        let err = Batch::concat(vec![batch1.clone(), batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );

        // Batch 2 has different field.
        let fields = vec![BatchColumn {
            column_id: 2,
            data: Arc::new(UInt64Vector::from_slice([2])),
        }];
        let batch2 = batch1.clone().with_fields(fields).unwrap();
        let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }
1435
    #[test]
    fn test_filter_deleted_empty() {
        // Filtering deletions from an empty batch keeps it empty.
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_deleted().unwrap();
        assert!(batch.is_empty());
    }
1442
    #[test]
    fn test_filter_deleted() {
        // Delete rows are removed; Put rows are kept.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Delete, OpType::Put, OpType::Delete, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch.filter_deleted().unwrap();
        let expect = new_batch(&[2, 4], &[12, 14], &[OpType::Put, OpType::Put], &[22, 24]);
        assert_eq!(expect, batch);

        // A batch with only Put rows is unchanged.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let expect = batch.clone();
        batch.filter_deleted().unwrap();
        assert_eq!(expect, batch);
    }
1465
    #[test]
    fn test_filter_by_sequence() {
        // Exercises the LtEq, Gt and GtLtEq variants of SequenceRange plus
        // the `None` (no-op) filter.

        // Filters put only.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::LtEq { max: 13 }))
            .unwrap();
        let expect = new_batch(
            &[1, 2, 3],
            &[11, 12, 13],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );
        assert_eq!(expect, batch);

        // Filters to empty.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );

        batch
            .filter_by_sequence(Some(SequenceRange::LtEq { max: 10 }))
            .unwrap();
        assert!(batch.is_empty());

        // None filter keeps the batch unchanged.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let expect = batch.clone();
        batch.filter_by_sequence(None).unwrap();
        assert_eq!(expect, batch);

        // Filters an empty batch.
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch
            .filter_by_sequence(Some(SequenceRange::LtEq { max: 10 }))
            .unwrap();
        assert!(batch.is_empty());

        // Filters an empty batch with None.
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_by_sequence(None).unwrap();
        assert!(batch.is_empty());

        // Test Gt variant - exclusive lower bound.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::Gt { min: 12 }))
            .unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // Test Gt variant with no matches.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::Gt { min: 20 }))
            .unwrap();
        assert!(batch.is_empty());

        // Test GtLtEq variant - exclusive lower bound, inclusive upper bound.
        let mut batch = new_batch(
            &[1, 2, 3, 4, 5],
            &[11, 12, 13, 14, 15],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 12, max: 14 }))
            .unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // Test GtLtEq variant with mixed operations.
        let mut batch = new_batch(
            &[1, 2, 3, 4, 5],
            &[11, 12, 13, 14, 15],
            &[
                OpType::Put,
                OpType::Delete,
                OpType::Put,
                OpType::Delete,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 11, max: 13 }))
            .unwrap();
        let expect = new_batch(
            &[2, 3],
            &[12, 13],
            &[OpType::Delete, OpType::Put],
            &[22, 23],
        );
        assert_eq!(expect, batch);

        // Test GtLtEq variant with no matches.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 20, max: 25 }))
            .unwrap();
        assert!(batch.is_empty());
    }
1602
    #[test]
    fn test_merge_last_non_null_no_dup() {
        // Without duplicate timestamps there is nothing to merge.
        let mut batch = new_batch_with_u64_fields(
            &[1, 2],
            &[2, 1],
            &[OpType::Put, OpType::Put],
            &[(1, &[Some(10), None]), (2, &[Some(100), Some(200)])],
        );
        let expect = batch.clone();
        batch.merge_last_non_null().unwrap();
        assert_eq!(expect, batch);
    }
1615
    #[test]
    fn test_merge_last_non_null_fill_null_fields() {
        // Rows are already sorted by timestamp asc and sequence desc.
        let mut batch = new_batch_with_u64_fields(
            &[1, 1, 1],
            &[3, 2, 1],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[
                (1, &[None, Some(10), Some(11)]),
                (2, &[Some(100), Some(200), Some(300)]),
            ],
        );
        batch.merge_last_non_null().unwrap();

        // Field 1 is filled from the first older row (seq=2). Field 2 keeps the base value.
        // Filled fields must not be overwritten by even older duplicates.
        let expect = new_batch_with_u64_fields(
            &[1],
            &[3],
            &[OpType::Put],
            &[(1, &[Some(10)]), (2, &[Some(100)])],
        );
        assert_eq!(expect, batch);
    }
1640
    #[test]
    fn test_merge_last_non_null_stop_at_delete_row() {
        // A delete row in older duplicates should stop filling to avoid resurrecting values before
        // deletion.
        let mut batch = new_batch_with_u64_fields(
            &[1, 1, 1],
            &[3, 2, 1],
            &[OpType::Put, OpType::Delete, OpType::Put],
            &[
                (1, &[None, Some(10), Some(11)]),
                (2, &[Some(100), Some(200), Some(300)]),
            ],
        );
        batch.merge_last_non_null().unwrap();

        // Field 1 stays `None` because the seq=2 delete row blocks older values.
        let expect = new_batch_with_u64_fields(
            &[1],
            &[3],
            &[OpType::Put],
            &[(1, &[None]), (2, &[Some(100)])],
        );
        assert_eq!(expect, batch);
    }
1664
    #[test]
    fn test_merge_last_non_null_base_delete_no_merge() {
        // The newest row for a timestamp is a delete; nothing should be merged into it.
        let mut batch = new_batch_with_u64_fields(
            &[1, 1],
            &[3, 2],
            &[OpType::Delete, OpType::Put],
            &[(1, &[None, Some(10)]), (2, &[None, Some(200)])],
        );
        batch.merge_last_non_null().unwrap();

        // Base row is delete, keep it as is and don't merge fields from older rows.
        let expect =
            new_batch_with_u64_fields(&[1], &[3], &[OpType::Delete], &[(1, &[None]), (2, &[None])]);
        assert_eq!(expect, batch);
    }
1680
    #[test]
    fn test_merge_last_non_null_multiple_timestamp_groups() {
        // Merging is performed independently per duplicated timestamp group.
        let mut batch = new_batch_with_u64_fields(
            &[1, 1, 2, 3, 3],
            &[5, 4, 3, 2, 1],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
            ],
            &[
                (1, &[None, Some(10), Some(20), None, Some(30)]),
                (2, &[Some(100), Some(110), Some(120), None, Some(130)]),
            ],
        );
        batch.merge_last_non_null().unwrap();

        // ts=1 fills field 1 from seq=4; ts=2 is unique; ts=3 fills both from seq=1.
        let expect = new_batch_with_u64_fields(
            &[1, 2, 3],
            &[5, 3, 2],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[
                (1, &[Some(10), Some(20), Some(30)]),
                (2, &[Some(100), Some(120), Some(130)]),
            ],
        );
        assert_eq!(expect, batch);
    }
1711
    #[test]
    fn test_merge_last_non_null_no_fields() {
        // With no field columns, merging only deduplicates timestamps,
        // keeping the newest (highest-sequence) row.
        let mut batch = new_batch_without_fields(
            &[1, 1, 2],
            &[3, 2, 1],
            &[OpType::Put, OpType::Put, OpType::Put],
        );
        batch.merge_last_non_null().unwrap();

        let expect = new_batch_without_fields(&[1, 2], &[3, 1], &[OpType::Put, OpType::Put]);
        assert_eq!(expect, batch);
    }
1724
    #[test]
    fn test_filter() {
        // filter() keeps exactly the rows whose predicate entry is true.

        // Filters put only.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
        batch.filter(&predicate).unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // Filters deletion.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
        batch.filter(&predicate).unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // Filters to empty.
        let predicate = BooleanVector::from_vec(vec![false, false]);
        batch.filter(&predicate).unwrap();
        assert!(batch.is_empty());
    }
1756
1757    #[test]
1758    fn test_sort_and_dedup() {
1759        let original = new_batch(
1760            &[2, 3, 1, 4, 5, 2],
1761            &[1, 2, 3, 4, 5, 6],
1762            &[
1763                OpType::Put,
1764                OpType::Put,
1765                OpType::Put,
1766                OpType::Put,
1767                OpType::Put,
1768                OpType::Put,
1769            ],
1770            &[21, 22, 23, 24, 25, 26],
1771        );
1772
1773        let mut batch = original.clone();
1774        batch.sort(true).unwrap();
1775        // It should only keep one timestamp 2.
1776        assert_eq!(
1777            new_batch(
1778                &[1, 2, 3, 4, 5],
1779                &[3, 6, 2, 4, 5],
1780                &[
1781                    OpType::Put,
1782                    OpType::Put,
1783                    OpType::Put,
1784                    OpType::Put,
1785                    OpType::Put,
1786                ],
1787                &[23, 26, 22, 24, 25],
1788            ),
1789            batch
1790        );
1791
1792        let mut batch = original.clone();
1793        batch.sort(false).unwrap();
1794
1795        // It should only keep one timestamp 2.
1796        assert_eq!(
1797            new_batch(
1798                &[1, 2, 2, 3, 4, 5],
1799                &[3, 6, 1, 2, 4, 5],
1800                &[
1801                    OpType::Put,
1802                    OpType::Put,
1803                    OpType::Put,
1804                    OpType::Put,
1805                    OpType::Put,
1806                    OpType::Put,
1807                ],
1808                &[23, 26, 21, 22, 24, 25],
1809            ),
1810            batch
1811        );
1812
1813        let original = new_batch(
1814            &[2, 2, 1],
1815            &[1, 6, 1],
1816            &[OpType::Delete, OpType::Put, OpType::Put],
1817            &[21, 22, 23],
1818        );
1819
1820        let mut batch = original.clone();
1821        batch.sort(true).unwrap();
1822        let expect = new_batch(&[1, 2], &[1, 6], &[OpType::Put, OpType::Put], &[23, 22]);
1823        assert_eq!(expect, batch);
1824
1825        let mut batch = original.clone();
1826        batch.sort(false).unwrap();
1827        let expect = new_batch(
1828            &[1, 2, 2],
1829            &[1, 6, 1],
1830            &[OpType::Put, OpType::Put, OpType::Delete],
1831            &[23, 22, 21],
1832        );
1833        assert_eq!(expect, batch);
1834    }
1835
1836    #[test]
1837    fn test_get_value() {
1838        let encodings = [PrimaryKeyEncoding::Dense, PrimaryKeyEncoding::Sparse];
1839
1840        for encoding in encodings {
1841            let codec = build_primary_key_codec_with_fields(
1842                encoding,
1843                [
1844                    (
1845                        ReservedColumnId::table_id(),
1846                        row_converter::SortField::new(ConcreteDataType::uint32_datatype()),
1847                    ),
1848                    (
1849                        ReservedColumnId::tsid(),
1850                        row_converter::SortField::new(ConcreteDataType::uint64_datatype()),
1851                    ),
1852                    (
1853                        100,
1854                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
1855                    ),
1856                    (
1857                        200,
1858                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
1859                    ),
1860                ]
1861                .into_iter(),
1862            );
1863
1864            let values = [
1865                Value::UInt32(1000),
1866                Value::UInt64(2000),
1867                Value::String("abcdefgh".into()),
1868                Value::String("zyxwvu".into()),
1869            ];
1870            let mut buf = vec![];
1871            codec
1872                .encode_values(
1873                    &[
1874                        (ReservedColumnId::table_id(), values[0].clone()),
1875                        (ReservedColumnId::tsid(), values[1].clone()),
1876                        (100, values[2].clone()),
1877                        (200, values[3].clone()),
1878                    ],
1879                    &mut buf,
1880                )
1881                .unwrap();
1882
1883            let field_col_id = 2;
1884            let mut batch = new_batch_builder(
1885                &buf,
1886                &[1, 2, 3],
1887                &[1, 1, 1],
1888                &[OpType::Put, OpType::Put, OpType::Put],
1889                field_col_id,
1890                &[42, 43, 44],
1891            )
1892            .build()
1893            .unwrap();
1894
1895            let v = batch
1896                .pk_col_value(&*codec, 0, ReservedColumnId::table_id())
1897                .unwrap()
1898                .unwrap();
1899            assert_eq!(values[0], *v);
1900
1901            let v = batch
1902                .pk_col_value(&*codec, 1, ReservedColumnId::tsid())
1903                .unwrap()
1904                .unwrap();
1905            assert_eq!(values[1], *v);
1906
1907            let v = batch.pk_col_value(&*codec, 2, 100).unwrap().unwrap();
1908            assert_eq!(values[2], *v);
1909
1910            let v = batch.pk_col_value(&*codec, 3, 200).unwrap().unwrap();
1911            assert_eq!(values[3], *v);
1912
1913            let v = batch.field_col_value(field_col_id).unwrap();
1914            assert_eq!(v.data.get(0), Value::UInt64(42));
1915            assert_eq!(v.data.get(1), Value::UInt64(43));
1916            assert_eq!(v.data.get(2), Value::UInt64(44));
1917        }
1918    }
1919}