mito2/
read.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Common structs and utilities for reading data.
16
17pub mod compat;
18pub mod dedup;
19pub mod flat_dedup;
20pub mod flat_merge;
21pub mod flat_projection;
22pub mod last_row;
23pub mod merge;
24pub mod plain_batch;
25pub mod projection;
26pub(crate) mod prune;
27pub mod range;
28pub mod scan_region;
29pub mod scan_util;
30pub(crate) mod seq_scan;
31pub mod series_scan;
32pub mod stream;
33pub(crate) mod unordered_scan;
34
35use std::collections::{HashMap, HashSet};
36use std::sync::Arc;
37use std::time::Duration;
38
39use api::v1::OpType;
40use async_trait::async_trait;
41use common_time::Timestamp;
42use datafusion_common::arrow::array::UInt8Array;
43use datatypes::arrow;
44use datatypes::arrow::array::{Array, ArrayRef};
45use datatypes::arrow::compute::SortOptions;
46use datatypes::arrow::record_batch::RecordBatch;
47use datatypes::arrow::row::{RowConverter, SortField};
48use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector};
49use datatypes::scalars::ScalarVectorBuilder;
50use datatypes::types::TimestampType;
51use datatypes::value::{Value, ValueRef};
52use datatypes::vectors::{
53    BooleanVector, Helper, TimestampMicrosecondVector, TimestampMillisecondVector,
54    TimestampMillisecondVectorBuilder, TimestampNanosecondVector, TimestampSecondVector,
55    UInt8Vector, UInt8VectorBuilder, UInt32Vector, UInt64Vector, UInt64VectorBuilder, Vector,
56    VectorRef,
57};
58use futures::TryStreamExt;
59use futures::stream::BoxStream;
60use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
61use snafu::{OptionExt, ResultExt, ensure};
62use store_api::metadata::RegionMetadata;
63use store_api::storage::{ColumnId, SequenceNumber, SequenceRange};
64
65use crate::error::{
66    ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu,
67    Result,
68};
69use crate::memtable::{BoxedBatchIterator, BoxedRecordBatchIterator};
70use crate::read::prune::PruneReader;
71
/// Storage internal representation of a batch of rows for a primary key (time series).
///
/// Rows are sorted by primary key, timestamp, sequence desc, op_type desc. Fields
/// always keep the same relative order as fields in [RegionMetadata](store_api::metadata::RegionMetadata).
///
/// All column vectors (`timestamps`, `sequences`, `op_types` and every field) have
/// the same length, which is the number of rows in the batch.
#[derive(Debug, PartialEq, Clone)]
pub struct Batch {
    /// Primary key encoded in a comparable form.
    primary_key: Vec<u8>,
    /// Possibly decoded `primary_key` values. Some places would decode it in advance.
    ///
    /// [Batch::pk_col_value] decodes and stores it when it is still `None`.
    pk_values: Option<CompositeValues>,
    /// Timestamps of rows, should be sorted and not null.
    timestamps: VectorRef,
    /// Sequences of rows
    ///
    /// UInt64 type, not null.
    sequences: Arc<UInt64Vector>,
    /// Op types of rows
    ///
    /// UInt8 type, not null.
    op_types: Arc<UInt8Vector>,
    /// Fields organized in columnar format.
    fields: Vec<BatchColumn>,
    /// Cache for field index lookup.
    ///
    /// Maps a column id to its position in `fields`. It is built on the first call
    /// to [Batch::field_col_value].
    fields_idx: Option<HashMap<ColumnId, usize>>,
}
97
98impl Batch {
99    /// Creates a new batch.
100    pub fn new(
101        primary_key: Vec<u8>,
102        timestamps: VectorRef,
103        sequences: Arc<UInt64Vector>,
104        op_types: Arc<UInt8Vector>,
105        fields: Vec<BatchColumn>,
106    ) -> Result<Batch> {
107        BatchBuilder::with_required_columns(primary_key, timestamps, sequences, op_types)
108            .with_fields(fields)
109            .build()
110    }
111
112    /// Tries to set fields for the batch.
113    pub fn with_fields(self, fields: Vec<BatchColumn>) -> Result<Batch> {
114        Batch::new(
115            self.primary_key,
116            self.timestamps,
117            self.sequences,
118            self.op_types,
119            fields,
120        )
121    }
122
123    /// Returns primary key of the batch.
124    pub fn primary_key(&self) -> &[u8] {
125        &self.primary_key
126    }
127
    /// Returns possibly decoded primary-key values.
    ///
    /// Returns `None` if the primary key has not been decoded (or set) yet.
    pub fn pk_values(&self) -> Option<&CompositeValues> {
        self.pk_values.as_ref()
    }
132
    /// Sets possibly decoded primary-key values.
    ///
    /// Overwrites any previously stored values. The values are stored as is and are
    /// not checked against the encoded primary key.
    pub fn set_pk_values(&mut self, pk_values: CompositeValues) {
        self.pk_values = Some(pk_values);
    }
137
    /// Removes possibly decoded primary-key values. For testing only.
    ///
    /// Only compiled for tests or when the `test` feature is enabled.
    #[cfg(any(test, feature = "test"))]
    pub fn remove_pk_values(&mut self) {
        self.pk_values = None;
    }
143
144    /// Returns fields in the batch.
145    pub fn fields(&self) -> &[BatchColumn] {
146        &self.fields
147    }
148
    /// Returns timestamps of the batch.
    ///
    /// The vector has one entry per row.
    pub fn timestamps(&self) -> &VectorRef {
        &self.timestamps
    }
153
    /// Returns sequences of the batch.
    ///
    /// The vector has one entry per row.
    pub fn sequences(&self) -> &Arc<UInt64Vector> {
        &self.sequences
    }
158
    /// Returns op types of the batch.
    ///
    /// The vector has one entry per row.
    pub fn op_types(&self) -> &Arc<UInt8Vector> {
        &self.op_types
    }
163
    /// Returns the number of rows in the batch.
    pub fn num_rows(&self) -> usize {
        // All vectors have the same length. We use the length of sequences vector
        // since it has static type (no dynamic dispatch through `VectorRef`).
        self.sequences.len()
    }
170
171    /// Create an empty [`Batch`].
172    pub(crate) fn empty() -> Self {
173        Self {
174            primary_key: vec![],
175            pk_values: None,
176            timestamps: Arc::new(TimestampMillisecondVectorBuilder::with_capacity(0).finish()),
177            sequences: Arc::new(UInt64VectorBuilder::with_capacity(0).finish()),
178            op_types: Arc::new(UInt8VectorBuilder::with_capacity(0).finish()),
179            fields: vec![],
180            fields_idx: None,
181        }
182    }
183
    /// Returns true if the number of rows in the batch is 0.
    ///
    /// The primary key and the number of field columns are not considered.
    pub fn is_empty(&self) -> bool {
        self.num_rows() == 0
    }
188
189    /// Returns the first timestamp in the batch or `None` if the batch is empty.
190    pub fn first_timestamp(&self) -> Option<Timestamp> {
191        if self.timestamps.is_empty() {
192            return None;
193        }
194
195        Some(self.get_timestamp(0))
196    }
197
198    /// Returns the last timestamp in the batch or `None` if the batch is empty.
199    pub fn last_timestamp(&self) -> Option<Timestamp> {
200        if self.timestamps.is_empty() {
201            return None;
202        }
203
204        Some(self.get_timestamp(self.timestamps.len() - 1))
205    }
206
207    /// Returns the first sequence in the batch or `None` if the batch is empty.
208    pub fn first_sequence(&self) -> Option<SequenceNumber> {
209        if self.sequences.is_empty() {
210            return None;
211        }
212
213        Some(self.get_sequence(0))
214    }
215
216    /// Returns the last sequence in the batch or `None` if the batch is empty.
217    pub fn last_sequence(&self) -> Option<SequenceNumber> {
218        if self.sequences.is_empty() {
219            return None;
220        }
221
222        Some(self.get_sequence(self.sequences.len() - 1))
223    }
224
    /// Replaces the primary key of the batch.
    ///
    /// Notice that this [Batch] may also hold decoded primary-key values in `pk_values`.
    /// This method does not touch them, so be sure to update that field as well
    /// (e.g. via [Batch::set_pk_values]).
    pub fn set_primary_key(&mut self, primary_key: Vec<u8>) {
        self.primary_key = primary_key;
    }
232
233    /// Slice the batch, returning a new batch.
234    ///
235    /// # Panics
236    /// Panics if `offset + length > self.num_rows()`.
237    pub fn slice(&self, offset: usize, length: usize) -> Batch {
238        let fields = self
239            .fields
240            .iter()
241            .map(|column| BatchColumn {
242                column_id: column.column_id,
243                data: column.data.slice(offset, length),
244            })
245            .collect();
246        // We skip using the builder to avoid validating the batch again.
247        Batch {
248            // Now we need to clone the primary key. We could try `Bytes` if
249            // this becomes a bottleneck.
250            primary_key: self.primary_key.clone(),
251            pk_values: self.pk_values.clone(),
252            timestamps: self.timestamps.slice(offset, length),
253            sequences: Arc::new(self.sequences.get_slice(offset, length)),
254            op_types: Arc::new(self.op_types.get_slice(offset, length)),
255            fields,
256            fields_idx: self.fields_idx.clone(),
257        }
258    }
259
260    /// Takes `batches` and concat them into one batch.
261    ///
262    /// All `batches` must have the same primary key.
263    pub fn concat(mut batches: Vec<Batch>) -> Result<Batch> {
264        ensure!(
265            !batches.is_empty(),
266            InvalidBatchSnafu {
267                reason: "empty batches",
268            }
269        );
270        if batches.len() == 1 {
271            // Now we own the `batches` so we could pop it directly.
272            return Ok(batches.pop().unwrap());
273        }
274
275        let primary_key = std::mem::take(&mut batches[0].primary_key);
276        let first = &batches[0];
277        // We took the primary key from the first batch so we don't use `first.primary_key()`.
278        ensure!(
279            batches
280                .iter()
281                .skip(1)
282                .all(|b| b.primary_key() == primary_key),
283            InvalidBatchSnafu {
284                reason: "batches have different primary key",
285            }
286        );
287        for b in batches.iter().skip(1) {
288            ensure!(
289                b.fields.len() == first.fields.len(),
290                InvalidBatchSnafu {
291                    reason: "batches have different field num",
292                }
293            );
294            for (l, r) in b.fields.iter().zip(&first.fields) {
295                ensure!(
296                    l.column_id == r.column_id,
297                    InvalidBatchSnafu {
298                        reason: "batches have different fields",
299                    }
300                );
301            }
302        }
303
304        // We take the primary key from the first batch.
305        let mut builder = BatchBuilder::new(primary_key);
306        // Concat timestamps, sequences, op_types, fields.
307        let array = concat_arrays(batches.iter().map(|b| b.timestamps().to_arrow_array()))?;
308        builder.timestamps_array(array)?;
309        let array = concat_arrays(batches.iter().map(|b| b.sequences().to_arrow_array()))?;
310        builder.sequences_array(array)?;
311        let array = concat_arrays(batches.iter().map(|b| b.op_types().to_arrow_array()))?;
312        builder.op_types_array(array)?;
313        for (i, batch_column) in first.fields.iter().enumerate() {
314            let array = concat_arrays(batches.iter().map(|b| b.fields()[i].data.to_arrow_array()))?;
315            builder.push_field_array(batch_column.column_id, array)?;
316        }
317
318        builder.build()
319    }
320
321    /// Removes rows whose op type is delete.
322    pub fn filter_deleted(&mut self) -> Result<()> {
323        // Safety: op type column is not null.
324        let array = self.op_types.as_arrow();
325        // Find rows with non-delete op type.
326        let rhs = UInt8Array::new_scalar(OpType::Delete as u8);
327        let predicate =
328            arrow::compute::kernels::cmp::neq(array, &rhs).context(ComputeArrowSnafu)?;
329        self.filter(&BooleanVector::from(predicate))
330    }
331
332    // Applies the `predicate` to the batch.
333    // Safety: We know the array type so we unwrap on casting.
334    pub fn filter(&mut self, predicate: &BooleanVector) -> Result<()> {
335        self.timestamps = self
336            .timestamps
337            .filter(predicate)
338            .context(ComputeVectorSnafu)?;
339        self.sequences = Arc::new(
340            UInt64Vector::try_from_arrow_array(
341                arrow::compute::filter(self.sequences.as_arrow(), predicate.as_boolean_array())
342                    .context(ComputeArrowSnafu)?,
343            )
344            .unwrap(),
345        );
346        self.op_types = Arc::new(
347            UInt8Vector::try_from_arrow_array(
348                arrow::compute::filter(self.op_types.as_arrow(), predicate.as_boolean_array())
349                    .context(ComputeArrowSnafu)?,
350            )
351            .unwrap(),
352        );
353        for batch_column in &mut self.fields {
354            batch_column.data = batch_column
355                .data
356                .filter(predicate)
357                .context(ComputeVectorSnafu)?;
358        }
359
360        Ok(())
361    }
362
363    /// Filters rows by the given `sequence`. Only preserves rows with sequence less than or equal to `sequence`.
364    pub fn filter_by_sequence(&mut self, sequence: Option<SequenceRange>) -> Result<()> {
365        let seq_range = match sequence {
366            None => return Ok(()),
367            Some(seq_range) => {
368                let (Some(first), Some(last)) = (self.first_sequence(), self.last_sequence())
369                else {
370                    return Ok(());
371                };
372                let is_subset = match seq_range {
373                    SequenceRange::Gt { min } => min < first,
374                    SequenceRange::LtEq { max } => max >= last,
375                    SequenceRange::GtLtEq { min, max } => min < first && max >= last,
376                };
377                if is_subset {
378                    return Ok(());
379                }
380                seq_range
381            }
382        };
383
384        let seqs = self.sequences.as_arrow();
385        let predicate = seq_range.filter(seqs).context(ComputeArrowSnafu)?;
386
387        let predicate = BooleanVector::from(predicate);
388        self.filter(&predicate)?;
389
390        Ok(())
391    }
392
393    /// Sorts rows in the batch. If `dedup` is true, it also removes
394    /// duplicated rows according to primary keys.
395    ///
396    /// It orders rows by timestamp, sequence desc and only keep the latest
397    /// row for the same timestamp. It doesn't consider op type as sequence
398    /// should already provide uniqueness for a row.
399    pub fn sort(&mut self, dedup: bool) -> Result<()> {
400        // If building a converter each time is costly, we may allow passing a
401        // converter.
402        let converter = RowConverter::new(vec![
403            SortField::new(self.timestamps.data_type().as_arrow_type()),
404            SortField::new_with_options(
405                self.sequences.data_type().as_arrow_type(),
406                SortOptions {
407                    descending: true,
408                    ..Default::default()
409                },
410            ),
411        ])
412        .context(ComputeArrowSnafu)?;
413        // Columns to sort.
414        let columns = [
415            self.timestamps.to_arrow_array(),
416            self.sequences.to_arrow_array(),
417        ];
418        let rows = converter.convert_columns(&columns).unwrap();
419        let mut to_sort: Vec<_> = rows.iter().enumerate().collect();
420
421        let was_sorted = to_sort.is_sorted_by_key(|x| x.1);
422        if !was_sorted {
423            to_sort.sort_unstable_by_key(|x| x.1);
424        }
425
426        let num_rows = to_sort.len();
427        if dedup {
428            // Dedup by timestamps.
429            to_sort.dedup_by(|left, right| {
430                debug_assert_eq!(18, left.1.as_ref().len());
431                debug_assert_eq!(18, right.1.as_ref().len());
432                let (left_key, right_key) = (left.1.as_ref(), right.1.as_ref());
433                // We only compare the timestamp part and ignore sequence.
434                left_key[..TIMESTAMP_KEY_LEN] == right_key[..TIMESTAMP_KEY_LEN]
435            });
436        }
437        let no_dedup = to_sort.len() == num_rows;
438
439        if was_sorted && no_dedup {
440            return Ok(());
441        }
442        let indices = UInt32Vector::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
443        self.take_in_place(&indices)
444    }
445
446    /// Returns the estimated memory size of the batch.
447    pub fn memory_size(&self) -> usize {
448        let mut size = std::mem::size_of::<Self>();
449        size += self.primary_key.len();
450        size += self.timestamps.memory_size();
451        size += self.sequences.memory_size();
452        size += self.op_types.memory_size();
453        for batch_column in &self.fields {
454            size += batch_column.data.memory_size();
455        }
456        size
457    }
458
459    /// Returns ids and datatypes of fields in the [Batch] after applying the `projection`.
460    pub(crate) fn projected_fields(
461        metadata: &RegionMetadata,
462        projection: &[ColumnId],
463    ) -> Vec<(ColumnId, ConcreteDataType)> {
464        let projected_ids: HashSet<_> = projection.iter().copied().collect();
465        metadata
466            .field_columns()
467            .filter_map(|column| {
468                if projected_ids.contains(&column.column_id) {
469                    Some((column.column_id, column.column_schema.data_type.clone()))
470                } else {
471                    None
472                }
473            })
474            .collect()
475    }
476
    /// Returns timestamps in a native slice or `None` if the batch is empty.
    ///
    /// The values are the raw `i64` timestamps in the unit of the timestamp column.
    ///
    /// # Panics
    /// Panics if the timestamp column is not of a timestamp type.
    pub(crate) fn timestamps_native(&self) -> Option<&[i64]> {
        if self.timestamps.is_empty() {
            return None;
        }

        // Downcast to the concrete vector that matches the time unit. The unwraps
        // rely on the data type reported by the vector matching its concrete type.
        let values = match self.timestamps.data_type() {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampSecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampMicrosecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampNanosecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            other => panic!("timestamps in a Batch has other type {:?}", other),
        };

        Some(values)
    }
517
518    /// Takes the batch in place.
519    fn take_in_place(&mut self, indices: &UInt32Vector) -> Result<()> {
520        self.timestamps = self.timestamps.take(indices).context(ComputeVectorSnafu)?;
521        let array = arrow::compute::take(self.sequences.as_arrow(), indices.as_arrow(), None)
522            .context(ComputeArrowSnafu)?;
523        // Safety: we know the array and vector type.
524        self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
525        let array = arrow::compute::take(self.op_types.as_arrow(), indices.as_arrow(), None)
526            .context(ComputeArrowSnafu)?;
527        self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
528        for batch_column in &mut self.fields {
529            batch_column.data = batch_column
530                .data
531                .take(indices)
532                .context(ComputeVectorSnafu)?;
533        }
534
535        Ok(())
536    }
537
538    /// Gets a timestamp at given `index`.
539    ///
540    /// # Panics
541    /// Panics if `index` is out-of-bound or the timestamp vector returns null.
542    fn get_timestamp(&self, index: usize) -> Timestamp {
543        match self.timestamps.get_ref(index) {
544            ValueRef::Timestamp(timestamp) => timestamp,
545
546            // We have check the data type is timestamp compatible in the [BatchBuilder] so it's safe to panic.
547            value => panic!("{:?} is not a timestamp", value),
548        }
549    }
550
    /// Gets a sequence at given `index`.
    ///
    /// # Panics
    /// Panics if `index` is out-of-bound or the sequence vector returns null.
    pub(crate) fn get_sequence(&self, index: usize) -> SequenceNumber {
        // Safety: the sequences column is not null so `get_data` actually returns `Some`
        // for every valid index.
        self.sequences.get_data(index).unwrap()
    }
559
560    /// Checks the batch is monotonic by timestamps.
561    #[cfg(debug_assertions)]
562    pub(crate) fn check_monotonic(&self) -> Result<(), String> {
563        use std::cmp::Ordering;
564        if self.timestamps_native().is_none() {
565            return Ok(());
566        }
567
568        let timestamps = self.timestamps_native().unwrap();
569        let sequences = self.sequences.as_arrow().values();
570        for (i, window) in timestamps.windows(2).enumerate() {
571            let current = window[0];
572            let next = window[1];
573            let current_sequence = sequences[i];
574            let next_sequence = sequences[i + 1];
575            match current.cmp(&next) {
576                Ordering::Less => {
577                    // The current timestamp is less than the next timestamp.
578                    continue;
579                }
580                Ordering::Equal => {
581                    // The current timestamp is equal to the next timestamp.
582                    if current_sequence < next_sequence {
583                        return Err(format!(
584                            "sequence are not monotonic: ts {} == {} but current sequence {} < {}, index: {}",
585                            current, next, current_sequence, next_sequence, i
586                        ));
587                    }
588                }
589                Ordering::Greater => {
590                    // The current timestamp is greater than the next timestamp.
591                    return Err(format!(
592                        "timestamps are not monotonic: {} > {}, index: {}",
593                        current, next, i
594                    ));
595                }
596            }
597        }
598
599        Ok(())
600    }
601
602    /// Returns Ok if the given batch is behind the current batch.
603    #[cfg(debug_assertions)]
604    pub(crate) fn check_next_batch(&self, other: &Batch) -> Result<(), String> {
605        // Checks the primary key
606        if self.primary_key() < other.primary_key() {
607            return Ok(());
608        }
609        if self.primary_key() > other.primary_key() {
610            return Err(format!(
611                "primary key is not monotonic: {:?} > {:?}",
612                self.primary_key(),
613                other.primary_key()
614            ));
615        }
616        // Checks the timestamp.
617        if self.last_timestamp() < other.first_timestamp() {
618            return Ok(());
619        }
620        if self.last_timestamp() > other.first_timestamp() {
621            return Err(format!(
622                "timestamps are not monotonic: {:?} > {:?}",
623                self.last_timestamp(),
624                other.first_timestamp()
625            ));
626        }
627        // Checks the sequence.
628        if self.last_sequence() >= other.first_sequence() {
629            return Ok(());
630        }
631        Err(format!(
632            "sequences are not monotonic: {:?} < {:?}",
633            self.last_sequence(),
634            other.first_sequence()
635        ))
636    }
637
638    /// Returns the value of the column in the primary key.
639    ///
640    /// Lazily decodes the primary key and caches the result.
641    pub fn pk_col_value(
642        &mut self,
643        codec: &dyn PrimaryKeyCodec,
644        col_idx_in_pk: usize,
645        column_id: ColumnId,
646    ) -> Result<Option<&Value>> {
647        if self.pk_values.is_none() {
648            self.pk_values = Some(codec.decode(&self.primary_key).context(DecodeSnafu)?);
649        }
650
651        let pk_values = self.pk_values.as_ref().unwrap();
652        Ok(match pk_values {
653            CompositeValues::Dense(values) => values.get(col_idx_in_pk).map(|(_, v)| v),
654            CompositeValues::Sparse(values) => values.get(&column_id),
655        })
656    }
657
658    /// Returns values of the field in the batch.
659    ///
660    /// Lazily caches the field index.
661    pub fn field_col_value(&mut self, column_id: ColumnId) -> Option<&BatchColumn> {
662        if self.fields_idx.is_none() {
663            self.fields_idx = Some(
664                self.fields
665                    .iter()
666                    .enumerate()
667                    .map(|(i, c)| (c.column_id, i))
668                    .collect(),
669            );
670        }
671
672        self.fields_idx
673            .as_ref()
674            .unwrap()
675            .get(&column_id)
676            .map(|&idx| &self.fields[idx])
677    }
678}
679
/// A struct to check the batch is monotonic.
///
/// It remembers the last batch it has checked so consecutive batches can be
/// compared. Only available in builds with debug assertions.
#[cfg(debug_assertions)]
#[derive(Default)]
pub(crate) struct BatchChecker {
    /// The last batch passed to the cross-batch check, if any.
    last_batch: Option<Batch>,
    /// Optional lower bound: a batch must not start before it.
    start: Option<Timestamp>,
    /// Optional upper bound: a batch must end before it.
    end: Option<Timestamp>,
}
688
689#[cfg(debug_assertions)]
690impl BatchChecker {
691    /// Attaches the given start timestamp to the checker.
692    pub(crate) fn with_start(mut self, start: Option<Timestamp>) -> Self {
693        self.start = start;
694        self
695    }
696
697    /// Attaches the given end timestamp to the checker.
698    pub(crate) fn with_end(mut self, end: Option<Timestamp>) -> Self {
699        self.end = end;
700        self
701    }
702
703    /// Returns true if the given batch is monotonic and behind
704    /// the last batch.
705    pub(crate) fn check_monotonic(&mut self, batch: &Batch) -> Result<(), String> {
706        batch.check_monotonic()?;
707
708        if let (Some(start), Some(first)) = (self.start, batch.first_timestamp())
709            && start > first
710        {
711            return Err(format!(
712                "batch's first timestamp is before the start timestamp: {:?} > {:?}",
713                start, first
714            ));
715        }
716        if let (Some(end), Some(last)) = (self.end, batch.last_timestamp())
717            && end <= last
718        {
719            return Err(format!(
720                "batch's last timestamp is after the end timestamp: {:?} <= {:?}",
721                end, last
722            ));
723        }
724
725        // Checks the batch is behind the last batch.
726        // Then Updates the last batch.
727        let res = self
728            .last_batch
729            .as_ref()
730            .map(|last| last.check_next_batch(batch))
731            .unwrap_or(Ok(()));
732        self.last_batch = Some(batch.clone());
733        res
734    }
735
736    /// Formats current batch and last batch for debug.
737    pub(crate) fn format_batch(&self, batch: &Batch) -> String {
738        use std::fmt::Write;
739
740        let mut message = String::new();
741        if let Some(last) = &self.last_batch {
742            write!(
743                message,
744                "last_pk: {:?}, last_ts: {:?}, last_seq: {:?}, ",
745                last.primary_key(),
746                last.last_timestamp(),
747                last.last_sequence()
748            )
749            .unwrap();
750        }
751        write!(
752            message,
753            "batch_pk: {:?}, batch_ts: {:?}, batch_seq: {:?}",
754            batch.primary_key(),
755            batch.timestamps(),
756            batch.sequences()
757        )
758        .unwrap();
759
760        message
761    }
762
763    /// Checks batches from the part range are monotonic. Otherwise, panics.
764    pub(crate) fn ensure_part_range_batch(
765        &mut self,
766        scanner: &str,
767        region_id: store_api::storage::RegionId,
768        partition: usize,
769        part_range: store_api::region_engine::PartitionRange,
770        batch: &Batch,
771    ) {
772        if let Err(e) = self.check_monotonic(batch) {
773            let err_msg = format!(
774                "{}: batch is not sorted, {}, region_id: {}, partition: {}, part_range: {:?}",
775                scanner, e, region_id, partition, part_range,
776            );
777            common_telemetry::error!("{err_msg}, {}", self.format_batch(batch));
778            // Only print the number of row in the panic message.
779            panic!("{err_msg}, batch rows: {}", batch.num_rows());
780        }
781    }
782}
783
/// Len of timestamp in arrow row format.
///
/// [Batch::sort] uses it to compare only the timestamp prefix of a row key.
// NOTE(review): presumably 1 byte for the null marker plus 8 bytes for the i64
// value - confirm against the arrow row format.
const TIMESTAMP_KEY_LEN: usize = 9;
786
787/// Helper function to concat arrays from `iter`.
788fn concat_arrays(iter: impl Iterator<Item = ArrayRef>) -> Result<ArrayRef> {
789    let arrays: Vec<_> = iter.collect();
790    let dyn_arrays: Vec<_> = arrays.iter().map(|array| array.as_ref()).collect();
791    arrow::compute::concat(&dyn_arrays).context(ComputeArrowSnafu)
792}
793
/// A column in a [Batch].
///
/// It pairs the values of a field column with the id of that column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct BatchColumn {
    /// Id of the column.
    pub column_id: ColumnId,
    /// Data of the column.
    pub data: VectorRef,
}
802
/// Builder to build [Batch].
///
/// The timestamps, sequences and op types start as `None` unless the builder is
/// created by [BatchBuilder::with_required_columns].
pub struct BatchBuilder {
    /// Encoded primary key of the batch.
    primary_key: Vec<u8>,
    /// Timestamps of rows, if set.
    timestamps: Option<VectorRef>,
    /// Sequences of rows, if set.
    sequences: Option<Arc<UInt64Vector>>,
    /// Op types of rows, if set.
    op_types: Option<Arc<UInt8Vector>>,
    /// Field columns pushed so far.
    fields: Vec<BatchColumn>,
}
811
812impl BatchBuilder {
813    /// Creates a new [BatchBuilder] with primary key.
814    pub fn new(primary_key: Vec<u8>) -> BatchBuilder {
815        BatchBuilder {
816            primary_key,
817            timestamps: None,
818            sequences: None,
819            op_types: None,
820            fields: Vec::new(),
821        }
822    }
823
824    /// Creates a new [BatchBuilder] with all required columns.
825    pub fn with_required_columns(
826        primary_key: Vec<u8>,
827        timestamps: VectorRef,
828        sequences: Arc<UInt64Vector>,
829        op_types: Arc<UInt8Vector>,
830    ) -> BatchBuilder {
831        BatchBuilder {
832            primary_key,
833            timestamps: Some(timestamps),
834            sequences: Some(sequences),
835            op_types: Some(op_types),
836            fields: Vec::new(),
837        }
838    }
839
840    /// Set all field columns.
841    pub fn with_fields(mut self, fields: Vec<BatchColumn>) -> Self {
842        self.fields = fields;
843        self
844    }
845
    /// Push a field column.
    ///
    /// The column is appended after the fields already in the builder. Returns the
    /// builder so calls can be chained.
    pub fn push_field(&mut self, column: BatchColumn) -> &mut Self {
        self.fields.push(column);
        self
    }
851
852    /// Push an array as a field.
853    pub fn push_field_array(&mut self, column_id: ColumnId, array: ArrayRef) -> Result<&mut Self> {
854        let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
855        self.fields.push(BatchColumn {
856            column_id,
857            data: vector,
858        });
859
860        Ok(self)
861    }
862
863    /// Try to set an array as timestamps.
864    pub fn timestamps_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
865        let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
866        ensure!(
867            vector.data_type().is_timestamp(),
868            InvalidBatchSnafu {
869                reason: format!("{:?} is not a timestamp type", vector.data_type()),
870            }
871        );
872
873        self.timestamps = Some(vector);
874        Ok(self)
875    }
876
877    /// Try to set an array as sequences.
878    pub fn sequences_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
879        ensure!(
880            *array.data_type() == arrow::datatypes::DataType::UInt64,
881            InvalidBatchSnafu {
882                reason: "sequence array is not UInt64 type",
883            }
884        );
885        // Safety: The cast must success as we have ensured it is uint64 type.
886        let vector = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
887        self.sequences = Some(vector);
888
889        Ok(self)
890    }
891
892    /// Try to set an array as op types.
893    pub fn op_types_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
894        ensure!(
895            *array.data_type() == arrow::datatypes::DataType::UInt8,
896            InvalidBatchSnafu {
897                reason: "sequence array is not UInt8 type",
898            }
899        );
900        // Safety: The cast must success as we have ensured it is uint64 type.
901        let vector = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
902        self.op_types = Some(vector);
903
904        Ok(self)
905    }
906
907    /// Builds the [Batch].
908    pub fn build(self) -> Result<Batch> {
909        let timestamps = self.timestamps.context(InvalidBatchSnafu {
910            reason: "missing timestamps",
911        })?;
912        let sequences = self.sequences.context(InvalidBatchSnafu {
913            reason: "missing sequences",
914        })?;
915        let op_types = self.op_types.context(InvalidBatchSnafu {
916            reason: "missing op_types",
917        })?;
918        // Our storage format ensure these columns are not nullable so
919        // we use assert here.
920        assert_eq!(0, timestamps.null_count());
921        assert_eq!(0, sequences.null_count());
922        assert_eq!(0, op_types.null_count());
923
924        let ts_len = timestamps.len();
925        ensure!(
926            sequences.len() == ts_len,
927            InvalidBatchSnafu {
928                reason: format!(
929                    "sequence have different len {} != {}",
930                    sequences.len(),
931                    ts_len
932                ),
933            }
934        );
935        ensure!(
936            op_types.len() == ts_len,
937            InvalidBatchSnafu {
938                reason: format!(
939                    "op type have different len {} != {}",
940                    op_types.len(),
941                    ts_len
942                ),
943            }
944        );
945        for column in &self.fields {
946            ensure!(
947                column.data.len() == ts_len,
948                InvalidBatchSnafu {
949                    reason: format!(
950                        "column {} has different len {} != {}",
951                        column.column_id,
952                        column.data.len(),
953                        ts_len
954                    ),
955                }
956            );
957        }
958
959        Ok(Batch {
960            primary_key: self.primary_key,
961            pk_values: None,
962            timestamps,
963            sequences,
964            op_types,
965            fields: self.fields,
966            fields_idx: None,
967        })
968    }
969}
970
971impl From<Batch> for BatchBuilder {
972    fn from(batch: Batch) -> Self {
973        Self {
974            primary_key: batch.primary_key,
975            timestamps: Some(batch.timestamps),
976            sequences: Some(batch.sequences),
977            op_types: Some(batch.op_types),
978            fields: batch.fields,
979        }
980    }
981}
982
/// Async [Batch] reader and iterator wrapper.
///
/// This is the data source for SST writers or internal readers. Each variant
/// wraps one kind of batch producer and [Source::next_batch] pulls the next
/// batch from whichever producer is held.
pub enum Source {
    /// Source from a [BoxedBatchReader].
    Reader(BoxedBatchReader),
    /// Source from a [BoxedBatchIterator].
    Iter(BoxedBatchIterator),
    /// Source from a [BoxedBatchStream].
    Stream(BoxedBatchStream),
    /// Source from a [PruneReader].
    PruneReader(PruneReader),
}
996
997impl Source {
998    /// Returns next [Batch] from this data source.
999    pub async fn next_batch(&mut self) -> Result<Option<Batch>> {
1000        match self {
1001            Source::Reader(reader) => reader.next_batch().await,
1002            Source::Iter(iter) => iter.next().transpose(),
1003            Source::Stream(stream) => stream.try_next().await,
1004            Source::PruneReader(reader) => reader.next_batch().await,
1005        }
1006    }
1007}
1008
/// Async [RecordBatch] reader and iterator wrapper for flat format.
///
/// [FlatSource::next_batch] pulls the next record batch from whichever
/// producer the variant holds.
pub enum FlatSource {
    /// Source from a [BoxedRecordBatchIterator].
    Iter(BoxedRecordBatchIterator),
    /// Source from a [BoxedRecordBatchStream].
    Stream(BoxedRecordBatchStream),
}
1016
1017impl FlatSource {
1018    /// Returns next [RecordBatch] from this data source.
1019    pub async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
1020        match self {
1021            FlatSource::Iter(iter) => iter.next().transpose(),
1022            FlatSource::Stream(stream) => stream.try_next().await,
1023        }
1024    }
1025}
1026
/// Async batch reader.
///
/// The reader must guarantee that all [Batch]es it returns have the same schema.
#[async_trait]
pub trait BatchReader: Send {
    /// Fetches the next [Batch].
    ///
    /// Returns `Ok(None)` once the reader has reached its end; calling `next_batch()`
    /// again after that will not return another batch.
    ///
    /// If `Err` is returned, the caller should not call this method again: the
    /// implementor may or may not panic in that case.
    async fn next_batch(&mut self) -> Result<Option<Batch>>;
}
1041
/// Boxed, dynamically dispatched [BatchReader].
pub type BoxedBatchReader = Box<dyn BatchReader>;
1044
/// Boxed `'static` stream whose items are `Result<Batch>`.
pub type BoxedBatchStream = BoxStream<'static, Result<Batch>>;
1047
/// Boxed `'static` stream whose items are `Result<RecordBatch>`.
pub type BoxedRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
1050
1051#[async_trait::async_trait]
1052impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
1053    async fn next_batch(&mut self) -> Result<Option<Batch>> {
1054        (**self).next_batch().await
1055    }
1056}
1057
/// Local metrics for scanners.
///
/// All durations and counters start at zero through the derived `Default`.
#[derive(Debug, Default)]
pub(crate) struct ScannerMetrics {
    /// Duration to prepare the scan task.
    prepare_scan_cost: Duration,
    /// Duration to build the (merge) reader.
    build_reader_cost: Duration,
    /// Duration to scan data.
    scan_cost: Duration,
    /// Duration spent waiting for `yield`.
    yield_cost: Duration,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of mem ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,
}
1078
1079#[cfg(test)]
1080mod tests {
1081    use mito_codec::row_converter::{self, build_primary_key_codec_with_fields};
1082    use store_api::codec::PrimaryKeyEncoding;
1083    use store_api::storage::consts::ReservedColumnId;
1084
1085    use super::*;
1086    use crate::error::Error;
1087    use crate::test_util::new_batch_builder;
1088
1089    fn new_batch(
1090        timestamps: &[i64],
1091        sequences: &[u64],
1092        op_types: &[OpType],
1093        field: &[u64],
1094    ) -> Batch {
1095        new_batch_builder(b"test", timestamps, sequences, op_types, 1, field)
1096            .build()
1097            .unwrap()
1098    }
1099
1100    #[test]
1101    fn test_empty_batch() {
1102        let batch = Batch::empty();
1103        assert!(batch.is_empty());
1104        assert_eq!(None, batch.first_timestamp());
1105        assert_eq!(None, batch.last_timestamp());
1106        assert_eq!(None, batch.first_sequence());
1107        assert_eq!(None, batch.last_sequence());
1108        assert!(batch.timestamps_native().is_none());
1109    }
1110
1111    #[test]
1112    fn test_first_last_one() {
1113        let batch = new_batch(&[1], &[2], &[OpType::Put], &[4]);
1114        assert_eq!(
1115            Timestamp::new_millisecond(1),
1116            batch.first_timestamp().unwrap()
1117        );
1118        assert_eq!(
1119            Timestamp::new_millisecond(1),
1120            batch.last_timestamp().unwrap()
1121        );
1122        assert_eq!(2, batch.first_sequence().unwrap());
1123        assert_eq!(2, batch.last_sequence().unwrap());
1124    }
1125
1126    #[test]
1127    fn test_first_last_multiple() {
1128        let batch = new_batch(
1129            &[1, 2, 3],
1130            &[11, 12, 13],
1131            &[OpType::Put, OpType::Put, OpType::Put],
1132            &[21, 22, 23],
1133        );
1134        assert_eq!(
1135            Timestamp::new_millisecond(1),
1136            batch.first_timestamp().unwrap()
1137        );
1138        assert_eq!(
1139            Timestamp::new_millisecond(3),
1140            batch.last_timestamp().unwrap()
1141        );
1142        assert_eq!(11, batch.first_sequence().unwrap());
1143        assert_eq!(13, batch.last_sequence().unwrap());
1144    }
1145
1146    #[test]
1147    fn test_slice() {
1148        let batch = new_batch(
1149            &[1, 2, 3, 4],
1150            &[11, 12, 13, 14],
1151            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1152            &[21, 22, 23, 24],
1153        );
1154        let batch = batch.slice(1, 2);
1155        let expect = new_batch(
1156            &[2, 3],
1157            &[12, 13],
1158            &[OpType::Delete, OpType::Put],
1159            &[22, 23],
1160        );
1161        assert_eq!(expect, batch);
1162    }
1163
1164    #[test]
1165    fn test_timestamps_native() {
1166        let batch = new_batch(
1167            &[1, 2, 3, 4],
1168            &[11, 12, 13, 14],
1169            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1170            &[21, 22, 23, 24],
1171        );
1172        assert_eq!(&[1, 2, 3, 4], batch.timestamps_native().unwrap());
1173    }
1174
1175    #[test]
1176    fn test_concat_empty() {
1177        let err = Batch::concat(vec![]).unwrap_err();
1178        assert!(
1179            matches!(err, Error::InvalidBatch { .. }),
1180            "unexpected err: {err}"
1181        );
1182    }
1183
1184    #[test]
1185    fn test_concat_one() {
1186        let batch = new_batch(&[], &[], &[], &[]);
1187        let actual = Batch::concat(vec![batch.clone()]).unwrap();
1188        assert_eq!(batch, actual);
1189
1190        let batch = new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]);
1191        let actual = Batch::concat(vec![batch.clone()]).unwrap();
1192        assert_eq!(batch, actual);
1193    }
1194
1195    #[test]
1196    fn test_concat_multiple() {
1197        let batches = vec![
1198            new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]),
1199            new_batch(
1200                &[3, 4, 5],
1201                &[13, 14, 15],
1202                &[OpType::Put, OpType::Delete, OpType::Put],
1203                &[23, 24, 25],
1204            ),
1205            new_batch(&[], &[], &[], &[]),
1206            new_batch(&[6], &[16], &[OpType::Put], &[26]),
1207        ];
1208        let batch = Batch::concat(batches).unwrap();
1209        let expect = new_batch(
1210            &[1, 2, 3, 4, 5, 6],
1211            &[11, 12, 13, 14, 15, 16],
1212            &[
1213                OpType::Put,
1214                OpType::Put,
1215                OpType::Put,
1216                OpType::Delete,
1217                OpType::Put,
1218                OpType::Put,
1219            ],
1220            &[21, 22, 23, 24, 25, 26],
1221        );
1222        assert_eq!(expect, batch);
1223    }
1224
1225    #[test]
1226    fn test_concat_different() {
1227        let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
1228        let mut batch2 = new_batch(&[2], &[2], &[OpType::Put], &[2]);
1229        batch2.primary_key = b"hello".to_vec();
1230        let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
1231        assert!(
1232            matches!(err, Error::InvalidBatch { .. }),
1233            "unexpected err: {err}"
1234        );
1235    }
1236
1237    #[test]
1238    fn test_concat_different_fields() {
1239        let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
1240        let fields = vec![
1241            batch1.fields()[0].clone(),
1242            BatchColumn {
1243                column_id: 2,
1244                data: Arc::new(UInt64Vector::from_slice([2])),
1245            },
1246        ];
1247        // Batch 2 has more fields.
1248        let batch2 = batch1.clone().with_fields(fields).unwrap();
1249        let err = Batch::concat(vec![batch1.clone(), batch2]).unwrap_err();
1250        assert!(
1251            matches!(err, Error::InvalidBatch { .. }),
1252            "unexpected err: {err}"
1253        );
1254
1255        // Batch 2 has different field.
1256        let fields = vec![BatchColumn {
1257            column_id: 2,
1258            data: Arc::new(UInt64Vector::from_slice([2])),
1259        }];
1260        let batch2 = batch1.clone().with_fields(fields).unwrap();
1261        let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
1262        assert!(
1263            matches!(err, Error::InvalidBatch { .. }),
1264            "unexpected err: {err}"
1265        );
1266    }
1267
1268    #[test]
1269    fn test_filter_deleted_empty() {
1270        let mut batch = new_batch(&[], &[], &[], &[]);
1271        batch.filter_deleted().unwrap();
1272        assert!(batch.is_empty());
1273    }
1274
1275    #[test]
1276    fn test_filter_deleted() {
1277        let mut batch = new_batch(
1278            &[1, 2, 3, 4],
1279            &[11, 12, 13, 14],
1280            &[OpType::Delete, OpType::Put, OpType::Delete, OpType::Put],
1281            &[21, 22, 23, 24],
1282        );
1283        batch.filter_deleted().unwrap();
1284        let expect = new_batch(&[2, 4], &[12, 14], &[OpType::Put, OpType::Put], &[22, 24]);
1285        assert_eq!(expect, batch);
1286
1287        let mut batch = new_batch(
1288            &[1, 2, 3, 4],
1289            &[11, 12, 13, 14],
1290            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
1291            &[21, 22, 23, 24],
1292        );
1293        let expect = batch.clone();
1294        batch.filter_deleted().unwrap();
1295        assert_eq!(expect, batch);
1296    }
1297
1298    #[test]
1299    fn test_filter_by_sequence() {
1300        // Filters put only.
1301        let mut batch = new_batch(
1302            &[1, 2, 3, 4],
1303            &[11, 12, 13, 14],
1304            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
1305            &[21, 22, 23, 24],
1306        );
1307        batch
1308            .filter_by_sequence(Some(SequenceRange::LtEq { max: 13 }))
1309            .unwrap();
1310        let expect = new_batch(
1311            &[1, 2, 3],
1312            &[11, 12, 13],
1313            &[OpType::Put, OpType::Put, OpType::Put],
1314            &[21, 22, 23],
1315        );
1316        assert_eq!(expect, batch);
1317
1318        // Filters to empty.
1319        let mut batch = new_batch(
1320            &[1, 2, 3, 4],
1321            &[11, 12, 13, 14],
1322            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1323            &[21, 22, 23, 24],
1324        );
1325
1326        batch
1327            .filter_by_sequence(Some(SequenceRange::LtEq { max: 10 }))
1328            .unwrap();
1329        assert!(batch.is_empty());
1330
1331        // None filter.
1332        let mut batch = new_batch(
1333            &[1, 2, 3, 4],
1334            &[11, 12, 13, 14],
1335            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1336            &[21, 22, 23, 24],
1337        );
1338        let expect = batch.clone();
1339        batch.filter_by_sequence(None).unwrap();
1340        assert_eq!(expect, batch);
1341
1342        // Filter a empty batch
1343        let mut batch = new_batch(&[], &[], &[], &[]);
1344        batch
1345            .filter_by_sequence(Some(SequenceRange::LtEq { max: 10 }))
1346            .unwrap();
1347        assert!(batch.is_empty());
1348
1349        // Filter a empty batch with None
1350        let mut batch = new_batch(&[], &[], &[], &[]);
1351        batch.filter_by_sequence(None).unwrap();
1352        assert!(batch.is_empty());
1353
1354        // Test From variant - exclusive lower bound
1355        let mut batch = new_batch(
1356            &[1, 2, 3, 4],
1357            &[11, 12, 13, 14],
1358            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
1359            &[21, 22, 23, 24],
1360        );
1361        batch
1362            .filter_by_sequence(Some(SequenceRange::Gt { min: 12 }))
1363            .unwrap();
1364        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
1365        assert_eq!(expect, batch);
1366
1367        // Test From variant with no matches
1368        let mut batch = new_batch(
1369            &[1, 2, 3, 4],
1370            &[11, 12, 13, 14],
1371            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1372            &[21, 22, 23, 24],
1373        );
1374        batch
1375            .filter_by_sequence(Some(SequenceRange::Gt { min: 20 }))
1376            .unwrap();
1377        assert!(batch.is_empty());
1378
1379        // Test Range variant - exclusive lower bound, inclusive upper bound
1380        let mut batch = new_batch(
1381            &[1, 2, 3, 4, 5],
1382            &[11, 12, 13, 14, 15],
1383            &[
1384                OpType::Put,
1385                OpType::Put,
1386                OpType::Put,
1387                OpType::Put,
1388                OpType::Put,
1389            ],
1390            &[21, 22, 23, 24, 25],
1391        );
1392        batch
1393            .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 12, max: 14 }))
1394            .unwrap();
1395        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
1396        assert_eq!(expect, batch);
1397
1398        // Test Range variant with mixed operations
1399        let mut batch = new_batch(
1400            &[1, 2, 3, 4, 5],
1401            &[11, 12, 13, 14, 15],
1402            &[
1403                OpType::Put,
1404                OpType::Delete,
1405                OpType::Put,
1406                OpType::Delete,
1407                OpType::Put,
1408            ],
1409            &[21, 22, 23, 24, 25],
1410        );
1411        batch
1412            .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 11, max: 13 }))
1413            .unwrap();
1414        let expect = new_batch(
1415            &[2, 3],
1416            &[12, 13],
1417            &[OpType::Delete, OpType::Put],
1418            &[22, 23],
1419        );
1420        assert_eq!(expect, batch);
1421
1422        // Test Range variant with no matches
1423        let mut batch = new_batch(
1424            &[1, 2, 3, 4],
1425            &[11, 12, 13, 14],
1426            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
1427            &[21, 22, 23, 24],
1428        );
1429        batch
1430            .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 20, max: 25 }))
1431            .unwrap();
1432        assert!(batch.is_empty());
1433    }
1434
1435    #[test]
1436    fn test_filter() {
1437        // Filters put only.
1438        let mut batch = new_batch(
1439            &[1, 2, 3, 4],
1440            &[11, 12, 13, 14],
1441            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
1442            &[21, 22, 23, 24],
1443        );
1444        let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
1445        batch.filter(&predicate).unwrap();
1446        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
1447        assert_eq!(expect, batch);
1448
1449        // Filters deletion.
1450        let mut batch = new_batch(
1451            &[1, 2, 3, 4],
1452            &[11, 12, 13, 14],
1453            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1454            &[21, 22, 23, 24],
1455        );
1456        let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
1457        batch.filter(&predicate).unwrap();
1458        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
1459        assert_eq!(expect, batch);
1460
1461        // Filters to empty.
1462        let predicate = BooleanVector::from_vec(vec![false, false]);
1463        batch.filter(&predicate).unwrap();
1464        assert!(batch.is_empty());
1465    }
1466
1467    #[test]
1468    fn test_sort_and_dedup() {
1469        let original = new_batch(
1470            &[2, 3, 1, 4, 5, 2],
1471            &[1, 2, 3, 4, 5, 6],
1472            &[
1473                OpType::Put,
1474                OpType::Put,
1475                OpType::Put,
1476                OpType::Put,
1477                OpType::Put,
1478                OpType::Put,
1479            ],
1480            &[21, 22, 23, 24, 25, 26],
1481        );
1482
1483        let mut batch = original.clone();
1484        batch.sort(true).unwrap();
1485        // It should only keep one timestamp 2.
1486        assert_eq!(
1487            new_batch(
1488                &[1, 2, 3, 4, 5],
1489                &[3, 6, 2, 4, 5],
1490                &[
1491                    OpType::Put,
1492                    OpType::Put,
1493                    OpType::Put,
1494                    OpType::Put,
1495                    OpType::Put,
1496                ],
1497                &[23, 26, 22, 24, 25],
1498            ),
1499            batch
1500        );
1501
1502        let mut batch = original.clone();
1503        batch.sort(false).unwrap();
1504
1505        // It should only keep one timestamp 2.
1506        assert_eq!(
1507            new_batch(
1508                &[1, 2, 2, 3, 4, 5],
1509                &[3, 6, 1, 2, 4, 5],
1510                &[
1511                    OpType::Put,
1512                    OpType::Put,
1513                    OpType::Put,
1514                    OpType::Put,
1515                    OpType::Put,
1516                    OpType::Put,
1517                ],
1518                &[23, 26, 21, 22, 24, 25],
1519            ),
1520            batch
1521        );
1522
1523        let original = new_batch(
1524            &[2, 2, 1],
1525            &[1, 6, 1],
1526            &[OpType::Delete, OpType::Put, OpType::Put],
1527            &[21, 22, 23],
1528        );
1529
1530        let mut batch = original.clone();
1531        batch.sort(true).unwrap();
1532        let expect = new_batch(&[1, 2], &[1, 6], &[OpType::Put, OpType::Put], &[23, 22]);
1533        assert_eq!(expect, batch);
1534
1535        let mut batch = original.clone();
1536        batch.sort(false).unwrap();
1537        let expect = new_batch(
1538            &[1, 2, 2],
1539            &[1, 6, 1],
1540            &[OpType::Put, OpType::Put, OpType::Delete],
1541            &[23, 22, 21],
1542        );
1543        assert_eq!(expect, batch);
1544    }
1545
1546    #[test]
1547    fn test_get_value() {
1548        let encodings = [PrimaryKeyEncoding::Dense, PrimaryKeyEncoding::Sparse];
1549
1550        for encoding in encodings {
1551            let codec = build_primary_key_codec_with_fields(
1552                encoding,
1553                [
1554                    (
1555                        ReservedColumnId::table_id(),
1556                        row_converter::SortField::new(ConcreteDataType::uint32_datatype()),
1557                    ),
1558                    (
1559                        ReservedColumnId::tsid(),
1560                        row_converter::SortField::new(ConcreteDataType::uint64_datatype()),
1561                    ),
1562                    (
1563                        100,
1564                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
1565                    ),
1566                    (
1567                        200,
1568                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
1569                    ),
1570                ]
1571                .into_iter(),
1572            );
1573
1574            let values = [
1575                Value::UInt32(1000),
1576                Value::UInt64(2000),
1577                Value::String("abcdefgh".into()),
1578                Value::String("zyxwvu".into()),
1579            ];
1580            let mut buf = vec![];
1581            codec
1582                .encode_values(
1583                    &[
1584                        (ReservedColumnId::table_id(), values[0].clone()),
1585                        (ReservedColumnId::tsid(), values[1].clone()),
1586                        (100, values[2].clone()),
1587                        (200, values[3].clone()),
1588                    ],
1589                    &mut buf,
1590                )
1591                .unwrap();
1592
1593            let field_col_id = 2;
1594            let mut batch = new_batch_builder(
1595                &buf,
1596                &[1, 2, 3],
1597                &[1, 1, 1],
1598                &[OpType::Put, OpType::Put, OpType::Put],
1599                field_col_id,
1600                &[42, 43, 44],
1601            )
1602            .build()
1603            .unwrap();
1604
1605            let v = batch
1606                .pk_col_value(&*codec, 0, ReservedColumnId::table_id())
1607                .unwrap()
1608                .unwrap();
1609            assert_eq!(values[0], *v);
1610
1611            let v = batch
1612                .pk_col_value(&*codec, 1, ReservedColumnId::tsid())
1613                .unwrap()
1614                .unwrap();
1615            assert_eq!(values[1], *v);
1616
1617            let v = batch.pk_col_value(&*codec, 2, 100).unwrap().unwrap();
1618            assert_eq!(values[2], *v);
1619
1620            let v = batch.pk_col_value(&*codec, 3, 200).unwrap().unwrap();
1621            assert_eq!(values[3], *v);
1622
1623            let v = batch.field_col_value(field_col_id).unwrap();
1624            assert_eq!(v.data.get(0), Value::UInt64(42));
1625            assert_eq!(v.data.get(1), Value::UInt64(43));
1626            assert_eq!(v.data.get(2), Value::UInt64(44));
1627        }
1628    }
1629}