mito2/
read.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Common structs and utilities for reading data.
16
17pub mod compat;
18pub mod dedup;
19pub mod last_row;
20pub mod merge;
21pub mod plain_batch;
22pub mod projection;
23pub(crate) mod prune;
24pub mod range;
25pub mod scan_region;
26pub mod scan_util;
27pub(crate) mod seq_scan;
28pub mod series_scan;
29pub mod stream;
30pub(crate) mod unordered_scan;
31
32use std::collections::{HashMap, HashSet};
33use std::sync::Arc;
34use std::time::Duration;
35
36use api::v1::OpType;
37use async_trait::async_trait;
38use common_time::Timestamp;
39use datafusion_common::arrow::array::UInt8Array;
40use datatypes::arrow;
41use datatypes::arrow::array::{Array, ArrayRef, UInt64Array};
42use datatypes::arrow::compute::SortOptions;
43use datatypes::arrow::row::{RowConverter, SortField};
44use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector};
45use datatypes::scalars::ScalarVectorBuilder;
46use datatypes::types::TimestampType;
47use datatypes::value::{Value, ValueRef};
48use datatypes::vectors::{
49    BooleanVector, Helper, TimestampMicrosecondVector, TimestampMillisecondVector,
50    TimestampMillisecondVectorBuilder, TimestampNanosecondVector, TimestampSecondVector,
51    UInt32Vector, UInt64Vector, UInt64VectorBuilder, UInt8Vector, UInt8VectorBuilder, Vector,
52    VectorRef,
53};
54use futures::stream::BoxStream;
55use futures::TryStreamExt;
56use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
57use snafu::{ensure, OptionExt, ResultExt};
58use store_api::metadata::RegionMetadata;
59use store_api::storage::{ColumnId, SequenceNumber};
60
61use crate::error::{
62    ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu,
63    Result,
64};
65use crate::memtable::BoxedBatchIterator;
66use crate::read::prune::PruneReader;
67
/// Storage internal representation of a batch of rows for a primary key (time series).
///
/// Rows are sorted by primary key, timestamp, sequence desc, op_type desc. Fields
/// always keep the same relative order as fields in [RegionMetadata](store_api::metadata::RegionMetadata).
///
/// The `timestamps`, `sequences`, `op_types` and all field columns are expected
/// to have the same number of rows.
#[derive(Debug, PartialEq, Clone)]
pub struct Batch {
    /// Primary key encoded in a comparable form.
    primary_key: Vec<u8>,
    /// Possibly decoded `primary_key` values. Some places would decode it in advance.
    ///
    /// Acts as a cache: [`Batch::pk_col_value`] fills it on first use when it is `None`.
    pk_values: Option<CompositeValues>,
    /// Timestamps of rows, should be sorted and not null.
    timestamps: VectorRef,
    /// Sequences of rows
    ///
    /// UInt64 type, not null.
    sequences: Arc<UInt64Vector>,
    /// Op types of rows
    ///
    /// UInt8 type, not null.
    op_types: Arc<UInt8Vector>,
    /// Fields organized in columnar format.
    fields: Vec<BatchColumn>,
    /// Cache for field index lookup.
    ///
    /// Maps a column id to the position of that column in `fields`. It is built
    /// lazily by [`Batch::field_col_value`].
    fields_idx: Option<HashMap<ColumnId, usize>>,
}
93
94impl Batch {
95    /// Creates a new batch.
96    pub fn new(
97        primary_key: Vec<u8>,
98        timestamps: VectorRef,
99        sequences: Arc<UInt64Vector>,
100        op_types: Arc<UInt8Vector>,
101        fields: Vec<BatchColumn>,
102    ) -> Result<Batch> {
103        BatchBuilder::with_required_columns(primary_key, timestamps, sequences, op_types)
104            .with_fields(fields)
105            .build()
106    }
107
108    /// Tries to set fields for the batch.
109    pub fn with_fields(self, fields: Vec<BatchColumn>) -> Result<Batch> {
110        Batch::new(
111            self.primary_key,
112            self.timestamps,
113            self.sequences,
114            self.op_types,
115            fields,
116        )
117    }
118
119    /// Returns primary key of the batch.
120    pub fn primary_key(&self) -> &[u8] {
121        &self.primary_key
122    }
123
    /// Returns possibly decoded primary-key values.
    ///
    /// Returns `None` when no decoded values are cached in this batch.
    pub fn pk_values(&self) -> Option<&CompositeValues> {
        self.pk_values.as_ref()
    }
128
    /// Sets possibly decoded primary-key values.
    ///
    /// Overwrites any previously cached values. The encoded `primary_key` is not
    /// touched, so the caller is responsible for passing values that match it.
    pub fn set_pk_values(&mut self, pk_values: CompositeValues) {
        self.pk_values = Some(pk_values);
    }
133
    /// Removes possibly decoded primary-key values. For testing only.
    ///
    /// Only compiled for tests or when the `test` feature is enabled.
    #[cfg(any(test, feature = "test"))]
    pub fn remove_pk_values(&mut self) {
        self.pk_values = None;
    }
139
140    /// Returns fields in the batch.
141    pub fn fields(&self) -> &[BatchColumn] {
142        &self.fields
143    }
144
    /// Returns timestamps of the batch.
    ///
    /// The vector is returned by reference; clone the `VectorRef` to share it.
    pub fn timestamps(&self) -> &VectorRef {
        &self.timestamps
    }
149
    /// Returns sequences of the batch.
    ///
    /// The vector has one sequence number per row.
    pub fn sequences(&self) -> &Arc<UInt64Vector> {
        &self.sequences
    }
154
    /// Returns op types of the batch.
    ///
    /// The vector has one op type (stored as `u8`) per row.
    pub fn op_types(&self) -> &Arc<UInt8Vector> {
        &self.op_types
    }
159
    /// Returns the number of rows in the batch.
    ///
    /// The count is taken from the `sequences` column.
    pub fn num_rows(&self) -> usize {
        // All vectors have the same length. We use the length of sequences vector
        // since it has static type.
        self.sequences.len()
    }
166
167    /// Create an empty [`Batch`].
168    pub(crate) fn empty() -> Self {
169        Self {
170            primary_key: vec![],
171            pk_values: None,
172            timestamps: Arc::new(TimestampMillisecondVectorBuilder::with_capacity(0).finish()),
173            sequences: Arc::new(UInt64VectorBuilder::with_capacity(0).finish()),
174            op_types: Arc::new(UInt8VectorBuilder::with_capacity(0).finish()),
175            fields: vec![],
176            fields_idx: None,
177        }
178    }
179
    /// Returns true if the number of rows in the batch is 0.
    ///
    /// Equivalent to `self.num_rows() == 0`.
    pub fn is_empty(&self) -> bool {
        self.num_rows() == 0
    }
184
185    /// Returns the first timestamp in the batch or `None` if the batch is empty.
186    pub fn first_timestamp(&self) -> Option<Timestamp> {
187        if self.timestamps.is_empty() {
188            return None;
189        }
190
191        Some(self.get_timestamp(0))
192    }
193
194    /// Returns the last timestamp in the batch or `None` if the batch is empty.
195    pub fn last_timestamp(&self) -> Option<Timestamp> {
196        if self.timestamps.is_empty() {
197            return None;
198        }
199
200        Some(self.get_timestamp(self.timestamps.len() - 1))
201    }
202
203    /// Returns the first sequence in the batch or `None` if the batch is empty.
204    pub fn first_sequence(&self) -> Option<SequenceNumber> {
205        if self.sequences.is_empty() {
206            return None;
207        }
208
209        Some(self.get_sequence(0))
210    }
211
212    /// Returns the last sequence in the batch or `None` if the batch is empty.
213    pub fn last_sequence(&self) -> Option<SequenceNumber> {
214        if self.sequences.is_empty() {
215            return None;
216        }
217
218        Some(self.get_sequence(self.sequences.len() - 1))
219    }
220
    /// Replaces the primary key of the batch.
    ///
    /// Notice that this [Batch] also contains a maybe-exist `pk_values`.
    /// Be sure to update that field as well: this method only overwrites the
    /// encoded key and leaves the cached `pk_values` as it is.
    pub fn set_primary_key(&mut self, primary_key: Vec<u8>) {
        self.primary_key = primary_key;
    }
228
229    /// Slice the batch, returning a new batch.
230    ///
231    /// # Panics
232    /// Panics if `offset + length > self.num_rows()`.
233    pub fn slice(&self, offset: usize, length: usize) -> Batch {
234        let fields = self
235            .fields
236            .iter()
237            .map(|column| BatchColumn {
238                column_id: column.column_id,
239                data: column.data.slice(offset, length),
240            })
241            .collect();
242        // We skip using the builder to avoid validating the batch again.
243        Batch {
244            // Now we need to clone the primary key. We could try `Bytes` if
245            // this becomes a bottleneck.
246            primary_key: self.primary_key.clone(),
247            pk_values: self.pk_values.clone(),
248            timestamps: self.timestamps.slice(offset, length),
249            sequences: Arc::new(self.sequences.get_slice(offset, length)),
250            op_types: Arc::new(self.op_types.get_slice(offset, length)),
251            fields,
252            fields_idx: self.fields_idx.clone(),
253        }
254    }
255
256    /// Takes `batches` and concat them into one batch.
257    ///
258    /// All `batches` must have the same primary key.
259    pub fn concat(mut batches: Vec<Batch>) -> Result<Batch> {
260        ensure!(
261            !batches.is_empty(),
262            InvalidBatchSnafu {
263                reason: "empty batches",
264            }
265        );
266        if batches.len() == 1 {
267            // Now we own the `batches` so we could pop it directly.
268            return Ok(batches.pop().unwrap());
269        }
270
271        let primary_key = std::mem::take(&mut batches[0].primary_key);
272        let first = &batches[0];
273        // We took the primary key from the first batch so we don't use `first.primary_key()`.
274        ensure!(
275            batches
276                .iter()
277                .skip(1)
278                .all(|b| b.primary_key() == primary_key),
279            InvalidBatchSnafu {
280                reason: "batches have different primary key",
281            }
282        );
283        for b in batches.iter().skip(1) {
284            ensure!(
285                b.fields.len() == first.fields.len(),
286                InvalidBatchSnafu {
287                    reason: "batches have different field num",
288                }
289            );
290            for (l, r) in b.fields.iter().zip(&first.fields) {
291                ensure!(
292                    l.column_id == r.column_id,
293                    InvalidBatchSnafu {
294                        reason: "batches have different fields",
295                    }
296                );
297            }
298        }
299
300        // We take the primary key from the first batch.
301        let mut builder = BatchBuilder::new(primary_key);
302        // Concat timestamps, sequences, op_types, fields.
303        let array = concat_arrays(batches.iter().map(|b| b.timestamps().to_arrow_array()))?;
304        builder.timestamps_array(array)?;
305        let array = concat_arrays(batches.iter().map(|b| b.sequences().to_arrow_array()))?;
306        builder.sequences_array(array)?;
307        let array = concat_arrays(batches.iter().map(|b| b.op_types().to_arrow_array()))?;
308        builder.op_types_array(array)?;
309        for (i, batch_column) in first.fields.iter().enumerate() {
310            let array = concat_arrays(batches.iter().map(|b| b.fields()[i].data.to_arrow_array()))?;
311            builder.push_field_array(batch_column.column_id, array)?;
312        }
313
314        builder.build()
315    }
316
317    /// Removes rows whose op type is delete.
318    pub fn filter_deleted(&mut self) -> Result<()> {
319        // Safety: op type column is not null.
320        let array = self.op_types.as_arrow();
321        // Find rows with non-delete op type.
322        let rhs = UInt8Array::new_scalar(OpType::Delete as u8);
323        let predicate =
324            arrow::compute::kernels::cmp::neq(array, &rhs).context(ComputeArrowSnafu)?;
325        self.filter(&BooleanVector::from(predicate))
326    }
327
328    // Applies the `predicate` to the batch.
329    // Safety: We know the array type so we unwrap on casting.
330    pub fn filter(&mut self, predicate: &BooleanVector) -> Result<()> {
331        self.timestamps = self
332            .timestamps
333            .filter(predicate)
334            .context(ComputeVectorSnafu)?;
335        self.sequences = Arc::new(
336            UInt64Vector::try_from_arrow_array(
337                arrow::compute::filter(self.sequences.as_arrow(), predicate.as_boolean_array())
338                    .context(ComputeArrowSnafu)?,
339            )
340            .unwrap(),
341        );
342        self.op_types = Arc::new(
343            UInt8Vector::try_from_arrow_array(
344                arrow::compute::filter(self.op_types.as_arrow(), predicate.as_boolean_array())
345                    .context(ComputeArrowSnafu)?,
346            )
347            .unwrap(),
348        );
349        for batch_column in &mut self.fields {
350            batch_column.data = batch_column
351                .data
352                .filter(predicate)
353                .context(ComputeVectorSnafu)?;
354        }
355
356        Ok(())
357    }
358
359    /// Filters rows by the given `sequence`. Only preserves rows with sequence less than or equal to `sequence`.
360    pub fn filter_by_sequence(&mut self, sequence: Option<SequenceNumber>) -> Result<()> {
361        let seq = match (sequence, self.last_sequence()) {
362            (None, _) | (_, None) => return Ok(()),
363            (Some(sequence), Some(last_sequence)) if sequence >= last_sequence => return Ok(()),
364            (Some(sequence), Some(_)) => sequence,
365        };
366
367        let seqs = self.sequences.as_arrow();
368        let sequence = UInt64Array::new_scalar(seq);
369        let predicate = datafusion_common::arrow::compute::kernels::cmp::lt_eq(seqs, &sequence)
370            .context(ComputeArrowSnafu)?;
371        let predicate = BooleanVector::from(predicate);
372        self.filter(&predicate)?;
373
374        Ok(())
375    }
376
377    /// Sorts rows in the batch. If `dedup` is true, it also removes
378    /// duplicated rows according to primary keys.
379    ///
380    /// It orders rows by timestamp, sequence desc and only keep the latest
381    /// row for the same timestamp. It doesn't consider op type as sequence
382    /// should already provide uniqueness for a row.
383    pub fn sort(&mut self, dedup: bool) -> Result<()> {
384        // If building a converter each time is costly, we may allow passing a
385        // converter.
386        let converter = RowConverter::new(vec![
387            SortField::new(self.timestamps.data_type().as_arrow_type()),
388            SortField::new_with_options(
389                self.sequences.data_type().as_arrow_type(),
390                SortOptions {
391                    descending: true,
392                    ..Default::default()
393                },
394            ),
395        ])
396        .context(ComputeArrowSnafu)?;
397        // Columns to sort.
398        let columns = [
399            self.timestamps.to_arrow_array(),
400            self.sequences.to_arrow_array(),
401        ];
402        let rows = converter.convert_columns(&columns).unwrap();
403        let mut to_sort: Vec<_> = rows.iter().enumerate().collect();
404
405        let was_sorted = to_sort.is_sorted_by_key(|x| x.1);
406        if !was_sorted {
407            to_sort.sort_unstable_by_key(|x| x.1);
408        }
409
410        let num_rows = to_sort.len();
411        if dedup {
412            // Dedup by timestamps.
413            to_sort.dedup_by(|left, right| {
414                debug_assert_eq!(18, left.1.as_ref().len());
415                debug_assert_eq!(18, right.1.as_ref().len());
416                let (left_key, right_key) = (left.1.as_ref(), right.1.as_ref());
417                // We only compare the timestamp part and ignore sequence.
418                left_key[..TIMESTAMP_KEY_LEN] == right_key[..TIMESTAMP_KEY_LEN]
419            });
420        }
421        let no_dedup = to_sort.len() == num_rows;
422
423        if was_sorted && no_dedup {
424            return Ok(());
425        }
426        let indices = UInt32Vector::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
427        self.take_in_place(&indices)
428    }
429
430    /// Returns the estimated memory size of the batch.
431    pub fn memory_size(&self) -> usize {
432        let mut size = std::mem::size_of::<Self>();
433        size += self.primary_key.len();
434        size += self.timestamps.memory_size();
435        size += self.sequences.memory_size();
436        size += self.op_types.memory_size();
437        for batch_column in &self.fields {
438            size += batch_column.data.memory_size();
439        }
440        size
441    }
442
443    /// Returns ids and datatypes of fields in the [Batch] after applying the `projection`.
444    pub(crate) fn projected_fields(
445        metadata: &RegionMetadata,
446        projection: &[ColumnId],
447    ) -> Vec<(ColumnId, ConcreteDataType)> {
448        let projected_ids: HashSet<_> = projection.iter().copied().collect();
449        metadata
450            .field_columns()
451            .filter_map(|column| {
452                if projected_ids.contains(&column.column_id) {
453                    Some((column.column_id, column.column_schema.data_type.clone()))
454                } else {
455                    None
456                }
457            })
458            .collect()
459    }
460
461    /// Returns timestamps in a native slice or `None` if the batch is empty.
462    pub(crate) fn timestamps_native(&self) -> Option<&[i64]> {
463        if self.timestamps.is_empty() {
464            return None;
465        }
466
467        let values = match self.timestamps.data_type() {
468            ConcreteDataType::Timestamp(TimestampType::Second(_)) => self
469                .timestamps
470                .as_any()
471                .downcast_ref::<TimestampSecondVector>()
472                .unwrap()
473                .as_arrow()
474                .values(),
475            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => self
476                .timestamps
477                .as_any()
478                .downcast_ref::<TimestampMillisecondVector>()
479                .unwrap()
480                .as_arrow()
481                .values(),
482            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => self
483                .timestamps
484                .as_any()
485                .downcast_ref::<TimestampMicrosecondVector>()
486                .unwrap()
487                .as_arrow()
488                .values(),
489            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => self
490                .timestamps
491                .as_any()
492                .downcast_ref::<TimestampNanosecondVector>()
493                .unwrap()
494                .as_arrow()
495                .values(),
496            other => panic!("timestamps in a Batch has other type {:?}", other),
497        };
498
499        Some(values)
500    }
501
502    /// Takes the batch in place.
503    fn take_in_place(&mut self, indices: &UInt32Vector) -> Result<()> {
504        self.timestamps = self.timestamps.take(indices).context(ComputeVectorSnafu)?;
505        let array = arrow::compute::take(self.sequences.as_arrow(), indices.as_arrow(), None)
506            .context(ComputeArrowSnafu)?;
507        // Safety: we know the array and vector type.
508        self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
509        let array = arrow::compute::take(self.op_types.as_arrow(), indices.as_arrow(), None)
510            .context(ComputeArrowSnafu)?;
511        self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
512        for batch_column in &mut self.fields {
513            batch_column.data = batch_column
514                .data
515                .take(indices)
516                .context(ComputeVectorSnafu)?;
517        }
518
519        Ok(())
520    }
521
522    /// Gets a timestamp at given `index`.
523    ///
524    /// # Panics
525    /// Panics if `index` is out-of-bound or the timestamp vector returns null.
526    fn get_timestamp(&self, index: usize) -> Timestamp {
527        match self.timestamps.get_ref(index) {
528            ValueRef::Timestamp(timestamp) => timestamp,
529
530            // We have check the data type is timestamp compatible in the [BatchBuilder] so it's safe to panic.
531            value => panic!("{:?} is not a timestamp", value),
532        }
533    }
534
    /// Gets a sequence at given `index`.
    ///
    /// # Panics
    /// Panics if `index` is out-of-bound or the sequence vector returns null.
    pub(crate) fn get_sequence(&self, index: usize) -> SequenceNumber {
        // Safety: sequences is not null so it actually returns Some.
        // The `unwrap` therefore only fires if that invariant is broken.
        self.sequences.get_data(index).unwrap()
    }
543
544    /// Checks the batch is monotonic by timestamps.
545    #[cfg(debug_assertions)]
546    pub(crate) fn check_monotonic(&self) -> Result<(), String> {
547        use std::cmp::Ordering;
548        if self.timestamps_native().is_none() {
549            return Ok(());
550        }
551
552        let timestamps = self.timestamps_native().unwrap();
553        let sequences = self.sequences.as_arrow().values();
554        for (i, window) in timestamps.windows(2).enumerate() {
555            let current = window[0];
556            let next = window[1];
557            let current_sequence = sequences[i];
558            let next_sequence = sequences[i + 1];
559            match current.cmp(&next) {
560                Ordering::Less => {
561                    // The current timestamp is less than the next timestamp.
562                    continue;
563                }
564                Ordering::Equal => {
565                    // The current timestamp is equal to the next timestamp.
566                    if current_sequence < next_sequence {
567                        return Err(format!(
568                            "sequence are not monotonic: ts {} == {} but current sequence {} < {}, index: {}",
569                            current, next, current_sequence, next_sequence, i
570                        ));
571                    }
572                }
573                Ordering::Greater => {
574                    // The current timestamp is greater than the next timestamp.
575                    return Err(format!(
576                        "timestamps are not monotonic: {} > {}, index: {}",
577                        current, next, i
578                    ));
579                }
580            }
581        }
582
583        Ok(())
584    }
585
586    /// Returns Ok if the given batch is behind the current batch.
587    #[cfg(debug_assertions)]
588    pub(crate) fn check_next_batch(&self, other: &Batch) -> Result<(), String> {
589        // Checks the primary key
590        if self.primary_key() < other.primary_key() {
591            return Ok(());
592        }
593        if self.primary_key() > other.primary_key() {
594            return Err(format!(
595                "primary key is not monotonic: {:?} > {:?}",
596                self.primary_key(),
597                other.primary_key()
598            ));
599        }
600        // Checks the timestamp.
601        if self.last_timestamp() < other.first_timestamp() {
602            return Ok(());
603        }
604        if self.last_timestamp() > other.first_timestamp() {
605            return Err(format!(
606                "timestamps are not monotonic: {:?} > {:?}",
607                self.last_timestamp(),
608                other.first_timestamp()
609            ));
610        }
611        // Checks the sequence.
612        if self.last_sequence() >= other.first_sequence() {
613            return Ok(());
614        }
615        Err(format!(
616            "sequences are not monotonic: {:?} < {:?}",
617            self.last_sequence(),
618            other.first_sequence()
619        ))
620    }
621
622    /// Returns the value of the column in the primary key.
623    ///
624    /// Lazily decodes the primary key and caches the result.
625    pub fn pk_col_value(
626        &mut self,
627        codec: &dyn PrimaryKeyCodec,
628        col_idx_in_pk: usize,
629        column_id: ColumnId,
630    ) -> Result<Option<&Value>> {
631        if self.pk_values.is_none() {
632            self.pk_values = Some(codec.decode(&self.primary_key).context(DecodeSnafu)?);
633        }
634
635        let pk_values = self.pk_values.as_ref().unwrap();
636        Ok(match pk_values {
637            CompositeValues::Dense(values) => values.get(col_idx_in_pk).map(|(_, v)| v),
638            CompositeValues::Sparse(values) => values.get(&column_id),
639        })
640    }
641
642    /// Returns values of the field in the batch.
643    ///
644    /// Lazily caches the field index.
645    pub fn field_col_value(&mut self, column_id: ColumnId) -> Option<&BatchColumn> {
646        if self.fields_idx.is_none() {
647            self.fields_idx = Some(
648                self.fields
649                    .iter()
650                    .enumerate()
651                    .map(|(i, c)| (c.column_id, i))
652                    .collect(),
653            );
654        }
655
656        self.fields_idx
657            .as_ref()
658            .unwrap()
659            .get(&column_id)
660            .map(|&idx| &self.fields[idx])
661    }
662}
663
/// A struct to check the batch is monotonic.
///
/// It remembers the last checked batch so consecutive batches can be compared.
/// Only available in builds with debug assertions.
#[cfg(debug_assertions)]
#[derive(Default)]
pub(crate) struct BatchChecker {
    /// The batch passed to the most recent `check_monotonic` call, if any.
    last_batch: Option<Batch>,
    /// Optional lower bound: a batch must not start before it.
    start: Option<Timestamp>,
    /// Optional upper bound: a batch must end strictly before it.
    end: Option<Timestamp>,
}
672
673#[cfg(debug_assertions)]
674impl BatchChecker {
675    /// Attaches the given start timestamp to the checker.
676    pub(crate) fn with_start(mut self, start: Option<Timestamp>) -> Self {
677        self.start = start;
678        self
679    }
680
681    /// Attaches the given end timestamp to the checker.
682    pub(crate) fn with_end(mut self, end: Option<Timestamp>) -> Self {
683        self.end = end;
684        self
685    }
686
687    /// Returns true if the given batch is monotonic and behind
688    /// the last batch.
689    pub(crate) fn check_monotonic(&mut self, batch: &Batch) -> Result<(), String> {
690        batch.check_monotonic()?;
691
692        if let (Some(start), Some(first)) = (self.start, batch.first_timestamp()) {
693            if start > first {
694                return Err(format!(
695                    "batch's first timestamp is before the start timestamp: {:?} > {:?}",
696                    start, first
697                ));
698            }
699        }
700        if let (Some(end), Some(last)) = (self.end, batch.last_timestamp()) {
701            if end <= last {
702                return Err(format!(
703                    "batch's last timestamp is after the end timestamp: {:?} <= {:?}",
704                    end, last
705                ));
706            }
707        }
708
709        // Checks the batch is behind the last batch.
710        // Then Updates the last batch.
711        let res = self
712            .last_batch
713            .as_ref()
714            .map(|last| last.check_next_batch(batch))
715            .unwrap_or(Ok(()));
716        self.last_batch = Some(batch.clone());
717        res
718    }
719
720    /// Formats current batch and last batch for debug.
721    pub(crate) fn format_batch(&self, batch: &Batch) -> String {
722        use std::fmt::Write;
723
724        let mut message = String::new();
725        if let Some(last) = &self.last_batch {
726            write!(
727                message,
728                "last_pk: {:?}, last_ts: {:?}, last_seq: {:?}, ",
729                last.primary_key(),
730                last.last_timestamp(),
731                last.last_sequence()
732            )
733            .unwrap();
734        }
735        write!(
736            message,
737            "batch_pk: {:?}, batch_ts: {:?}, batch_seq: {:?}",
738            batch.primary_key(),
739            batch.timestamps(),
740            batch.sequences()
741        )
742        .unwrap();
743
744        message
745    }
746
747    /// Checks batches from the part range are monotonic. Otherwise, panics.
748    pub(crate) fn ensure_part_range_batch(
749        &mut self,
750        scanner: &str,
751        region_id: store_api::storage::RegionId,
752        partition: usize,
753        part_range: store_api::region_engine::PartitionRange,
754        batch: &Batch,
755    ) {
756        if let Err(e) = self.check_monotonic(batch) {
757            let err_msg = format!(
758                "{}: batch is not sorted, {}, region_id: {}, partition: {}, part_range: {:?}",
759                scanner, e, region_id, partition, part_range,
760            );
761            common_telemetry::error!("{err_msg}, {}", self.format_batch(batch));
762            // Only print the number of row in the panic message.
763            panic!("{err_msg}, batch rows: {}", batch.num_rows());
764        }
765    }
766}
767
/// Length in bytes of the timestamp part of a row encoded in the arrow row format.
///
/// `Batch::sort` compares only the first `TIMESTAMP_KEY_LEN` bytes of two encoded
/// rows to decide whether they share a timestamp.
const TIMESTAMP_KEY_LEN: usize = 9;
770
771/// Helper function to concat arrays from `iter`.
772fn concat_arrays(iter: impl Iterator<Item = ArrayRef>) -> Result<ArrayRef> {
773    let arrays: Vec<_> = iter.collect();
774    let dyn_arrays: Vec<_> = arrays.iter().map(|array| array.as_ref()).collect();
775    arrow::compute::concat(&dyn_arrays).context(ComputeArrowSnafu)
776}
777
/// A column in a [Batch].
///
/// Pairs the data of a field column with the id of the column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct BatchColumn {
    /// Id of the column.
    pub column_id: ColumnId,
    /// Data of the column.
    pub data: VectorRef,
}
786
/// Builder to build [Batch].
///
/// The timestamps, sequences and op types are required: `build` fails if any
/// of them has not been set.
pub struct BatchBuilder {
    /// Encoded primary key of the batch to build.
    primary_key: Vec<u8>,
    /// Timestamp column, `None` until it is set.
    timestamps: Option<VectorRef>,
    /// Sequence column, `None` until it is set.
    sequences: Option<Arc<UInt64Vector>>,
    /// Op type column, `None` until it is set.
    op_types: Option<Arc<UInt8Vector>>,
    /// Field columns in the order they were added.
    fields: Vec<BatchColumn>,
}
795
796impl BatchBuilder {
797    /// Creates a new [BatchBuilder] with primary key.
798    pub fn new(primary_key: Vec<u8>) -> BatchBuilder {
799        BatchBuilder {
800            primary_key,
801            timestamps: None,
802            sequences: None,
803            op_types: None,
804            fields: Vec::new(),
805        }
806    }
807
808    /// Creates a new [BatchBuilder] with all required columns.
809    pub fn with_required_columns(
810        primary_key: Vec<u8>,
811        timestamps: VectorRef,
812        sequences: Arc<UInt64Vector>,
813        op_types: Arc<UInt8Vector>,
814    ) -> BatchBuilder {
815        BatchBuilder {
816            primary_key,
817            timestamps: Some(timestamps),
818            sequences: Some(sequences),
819            op_types: Some(op_types),
820            fields: Vec::new(),
821        }
822    }
823
824    /// Set all field columns.
825    pub fn with_fields(mut self, fields: Vec<BatchColumn>) -> Self {
826        self.fields = fields;
827        self
828    }
829
    /// Push a field column.
    ///
    /// The column is appended after the fields already in the builder. Returns
    /// the builder so calls can be chained.
    pub fn push_field(&mut self, column: BatchColumn) -> &mut Self {
        self.fields.push(column);
        self
    }
835
836    /// Push an array as a field.
837    pub fn push_field_array(&mut self, column_id: ColumnId, array: ArrayRef) -> Result<&mut Self> {
838        let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
839        self.fields.push(BatchColumn {
840            column_id,
841            data: vector,
842        });
843
844        Ok(self)
845    }
846
847    /// Try to set an array as timestamps.
848    pub fn timestamps_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
849        let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
850        ensure!(
851            vector.data_type().is_timestamp(),
852            InvalidBatchSnafu {
853                reason: format!("{:?} is not a timestamp type", vector.data_type()),
854            }
855        );
856
857        self.timestamps = Some(vector);
858        Ok(self)
859    }
860
861    /// Try to set an array as sequences.
862    pub fn sequences_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
863        ensure!(
864            *array.data_type() == arrow::datatypes::DataType::UInt64,
865            InvalidBatchSnafu {
866                reason: "sequence array is not UInt64 type",
867            }
868        );
869        // Safety: The cast must success as we have ensured it is uint64 type.
870        let vector = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
871        self.sequences = Some(vector);
872
873        Ok(self)
874    }
875
876    /// Try to set an array as op types.
877    pub fn op_types_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
878        ensure!(
879            *array.data_type() == arrow::datatypes::DataType::UInt8,
880            InvalidBatchSnafu {
881                reason: "sequence array is not UInt8 type",
882            }
883        );
884        // Safety: The cast must success as we have ensured it is uint64 type.
885        let vector = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
886        self.op_types = Some(vector);
887
888        Ok(self)
889    }
890
891    /// Builds the [Batch].
892    pub fn build(self) -> Result<Batch> {
893        let timestamps = self.timestamps.context(InvalidBatchSnafu {
894            reason: "missing timestamps",
895        })?;
896        let sequences = self.sequences.context(InvalidBatchSnafu {
897            reason: "missing sequences",
898        })?;
899        let op_types = self.op_types.context(InvalidBatchSnafu {
900            reason: "missing op_types",
901        })?;
902        // Our storage format ensure these columns are not nullable so
903        // we use assert here.
904        assert_eq!(0, timestamps.null_count());
905        assert_eq!(0, sequences.null_count());
906        assert_eq!(0, op_types.null_count());
907
908        let ts_len = timestamps.len();
909        ensure!(
910            sequences.len() == ts_len,
911            InvalidBatchSnafu {
912                reason: format!(
913                    "sequence have different len {} != {}",
914                    sequences.len(),
915                    ts_len
916                ),
917            }
918        );
919        ensure!(
920            op_types.len() == ts_len,
921            InvalidBatchSnafu {
922                reason: format!(
923                    "op type have different len {} != {}",
924                    op_types.len(),
925                    ts_len
926                ),
927            }
928        );
929        for column in &self.fields {
930            ensure!(
931                column.data.len() == ts_len,
932                InvalidBatchSnafu {
933                    reason: format!(
934                        "column {} has different len {} != {}",
935                        column.column_id,
936                        column.data.len(),
937                        ts_len
938                    ),
939                }
940            );
941        }
942
943        Ok(Batch {
944            primary_key: self.primary_key,
945            pk_values: None,
946            timestamps,
947            sequences,
948            op_types,
949            fields: self.fields,
950            fields_idx: None,
951        })
952    }
953}
954
955impl From<Batch> for BatchBuilder {
956    fn from(batch: Batch) -> Self {
957        Self {
958            primary_key: batch.primary_key,
959            timestamps: Some(batch.timestamps),
960            sequences: Some(batch.sequences),
961            op_types: Some(batch.op_types),
962            fields: batch.fields,
963        }
964    }
965}
966
/// Async [Batch] reader and iterator wrapper.
///
/// This is the data source for SST writers or internal readers. Each variant
/// wraps one way of producing batches; `next_batch()` gives them a common
/// async interface.
pub enum Source {
    /// Source from a [BoxedBatchReader], polled through `BatchReader::next_batch()`.
    Reader(BoxedBatchReader),
    /// Source from a [BoxedBatchIterator], a synchronous iterator.
    Iter(BoxedBatchIterator),
    /// Source from a [BoxedBatchStream], an async stream of batches.
    Stream(BoxedBatchStream),
    /// Source from a [PruneReader].
    PruneReader(PruneReader),
}
980
981impl Source {
982    /// Returns next [Batch] from this data source.
983    pub async fn next_batch(&mut self) -> Result<Option<Batch>> {
984        match self {
985            Source::Reader(reader) => reader.next_batch().await,
986            Source::Iter(iter) => iter.next().transpose(),
987            Source::Stream(stream) => stream.try_next().await,
988            Source::PruneReader(reader) => reader.next_batch().await,
989        }
990    }
991}
992
/// Async batch reader.
///
/// The reader must guarantee that all [Batch]es it returns have the same schema.
#[async_trait]
pub trait BatchReader: Send {
    /// Fetches the next [Batch].
    ///
    /// Returns `Ok(None)` when the reader has reached its end; calling `next_batch()`
    /// again after that won't return any more batches.
    ///
    /// If `Err` is returned, the caller should not call this method again; the
    /// implementor may or may not panic in that case.
    async fn next_batch(&mut self) -> Result<Option<Batch>>;
}
1007
/// Boxed, dynamically dispatched [BatchReader].
pub type BoxedBatchReader = Box<dyn BatchReader>;

/// Boxed `'static` stream that yields `Result<Batch>` items.
pub type BoxedBatchStream = BoxStream<'static, Result<Batch>>;
1013
1014#[async_trait::async_trait]
1015impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
1016    async fn next_batch(&mut self) -> Result<Option<Batch>> {
1017        (**self).next_batch().await
1018    }
1019}
1020
/// Local metrics for scanners.
///
/// Derives `Default`, so every duration and counter starts at zero.
#[derive(Debug, Default)]
pub(crate) struct ScannerMetrics {
    /// Time spent preparing the scan task.
    prepare_scan_cost: Duration,
    /// Time spent building the (merge) reader.
    build_reader_cost: Duration,
    /// Time spent scanning data.
    scan_cost: Duration,
    /// Time spent waiting for `yield`.
    yield_cost: Duration,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of mem ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,
}
1041
1042#[cfg(test)]
1043mod tests {
1044    use mito_codec::row_converter::{self, build_primary_key_codec_with_fields};
1045    use store_api::codec::PrimaryKeyEncoding;
1046    use store_api::storage::consts::ReservedColumnId;
1047
1048    use super::*;
1049    use crate::error::Error;
1050    use crate::test_util::new_batch_builder;
1051
1052    fn new_batch(
1053        timestamps: &[i64],
1054        sequences: &[u64],
1055        op_types: &[OpType],
1056        field: &[u64],
1057    ) -> Batch {
1058        new_batch_builder(b"test", timestamps, sequences, op_types, 1, field)
1059            .build()
1060            .unwrap()
1061    }
1062
1063    #[test]
1064    fn test_empty_batch() {
1065        let batch = Batch::empty();
1066        assert!(batch.is_empty());
1067        assert_eq!(None, batch.first_timestamp());
1068        assert_eq!(None, batch.last_timestamp());
1069        assert_eq!(None, batch.first_sequence());
1070        assert_eq!(None, batch.last_sequence());
1071        assert!(batch.timestamps_native().is_none());
1072    }
1073
1074    #[test]
1075    fn test_first_last_one() {
1076        let batch = new_batch(&[1], &[2], &[OpType::Put], &[4]);
1077        assert_eq!(
1078            Timestamp::new_millisecond(1),
1079            batch.first_timestamp().unwrap()
1080        );
1081        assert_eq!(
1082            Timestamp::new_millisecond(1),
1083            batch.last_timestamp().unwrap()
1084        );
1085        assert_eq!(2, batch.first_sequence().unwrap());
1086        assert_eq!(2, batch.last_sequence().unwrap());
1087    }
1088
1089    #[test]
1090    fn test_first_last_multiple() {
1091        let batch = new_batch(
1092            &[1, 2, 3],
1093            &[11, 12, 13],
1094            &[OpType::Put, OpType::Put, OpType::Put],
1095            &[21, 22, 23],
1096        );
1097        assert_eq!(
1098            Timestamp::new_millisecond(1),
1099            batch.first_timestamp().unwrap()
1100        );
1101        assert_eq!(
1102            Timestamp::new_millisecond(3),
1103            batch.last_timestamp().unwrap()
1104        );
1105        assert_eq!(11, batch.first_sequence().unwrap());
1106        assert_eq!(13, batch.last_sequence().unwrap());
1107    }
1108
1109    #[test]
1110    fn test_slice() {
1111        let batch = new_batch(
1112            &[1, 2, 3, 4],
1113            &[11, 12, 13, 14],
1114            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1115            &[21, 22, 23, 24],
1116        );
1117        let batch = batch.slice(1, 2);
1118        let expect = new_batch(
1119            &[2, 3],
1120            &[12, 13],
1121            &[OpType::Delete, OpType::Put],
1122            &[22, 23],
1123        );
1124        assert_eq!(expect, batch);
1125    }
1126
1127    #[test]
1128    fn test_timestamps_native() {
1129        let batch = new_batch(
1130            &[1, 2, 3, 4],
1131            &[11, 12, 13, 14],
1132            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1133            &[21, 22, 23, 24],
1134        );
1135        assert_eq!(&[1, 2, 3, 4], batch.timestamps_native().unwrap());
1136    }
1137
1138    #[test]
1139    fn test_concat_empty() {
1140        let err = Batch::concat(vec![]).unwrap_err();
1141        assert!(
1142            matches!(err, Error::InvalidBatch { .. }),
1143            "unexpected err: {err}"
1144        );
1145    }
1146
1147    #[test]
1148    fn test_concat_one() {
1149        let batch = new_batch(&[], &[], &[], &[]);
1150        let actual = Batch::concat(vec![batch.clone()]).unwrap();
1151        assert_eq!(batch, actual);
1152
1153        let batch = new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]);
1154        let actual = Batch::concat(vec![batch.clone()]).unwrap();
1155        assert_eq!(batch, actual);
1156    }
1157
1158    #[test]
1159    fn test_concat_multiple() {
1160        let batches = vec![
1161            new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]),
1162            new_batch(
1163                &[3, 4, 5],
1164                &[13, 14, 15],
1165                &[OpType::Put, OpType::Delete, OpType::Put],
1166                &[23, 24, 25],
1167            ),
1168            new_batch(&[], &[], &[], &[]),
1169            new_batch(&[6], &[16], &[OpType::Put], &[26]),
1170        ];
1171        let batch = Batch::concat(batches).unwrap();
1172        let expect = new_batch(
1173            &[1, 2, 3, 4, 5, 6],
1174            &[11, 12, 13, 14, 15, 16],
1175            &[
1176                OpType::Put,
1177                OpType::Put,
1178                OpType::Put,
1179                OpType::Delete,
1180                OpType::Put,
1181                OpType::Put,
1182            ],
1183            &[21, 22, 23, 24, 25, 26],
1184        );
1185        assert_eq!(expect, batch);
1186    }
1187
1188    #[test]
1189    fn test_concat_different() {
1190        let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
1191        let mut batch2 = new_batch(&[2], &[2], &[OpType::Put], &[2]);
1192        batch2.primary_key = b"hello".to_vec();
1193        let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
1194        assert!(
1195            matches!(err, Error::InvalidBatch { .. }),
1196            "unexpected err: {err}"
1197        );
1198    }
1199
1200    #[test]
1201    fn test_concat_different_fields() {
1202        let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
1203        let fields = vec![
1204            batch1.fields()[0].clone(),
1205            BatchColumn {
1206                column_id: 2,
1207                data: Arc::new(UInt64Vector::from_slice([2])),
1208            },
1209        ];
1210        // Batch 2 has more fields.
1211        let batch2 = batch1.clone().with_fields(fields).unwrap();
1212        let err = Batch::concat(vec![batch1.clone(), batch2]).unwrap_err();
1213        assert!(
1214            matches!(err, Error::InvalidBatch { .. }),
1215            "unexpected err: {err}"
1216        );
1217
1218        // Batch 2 has different field.
1219        let fields = vec![BatchColumn {
1220            column_id: 2,
1221            data: Arc::new(UInt64Vector::from_slice([2])),
1222        }];
1223        let batch2 = batch1.clone().with_fields(fields).unwrap();
1224        let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
1225        assert!(
1226            matches!(err, Error::InvalidBatch { .. }),
1227            "unexpected err: {err}"
1228        );
1229    }
1230
1231    #[test]
1232    fn test_filter_deleted_empty() {
1233        let mut batch = new_batch(&[], &[], &[], &[]);
1234        batch.filter_deleted().unwrap();
1235        assert!(batch.is_empty());
1236    }
1237
1238    #[test]
1239    fn test_filter_deleted() {
1240        let mut batch = new_batch(
1241            &[1, 2, 3, 4],
1242            &[11, 12, 13, 14],
1243            &[OpType::Delete, OpType::Put, OpType::Delete, OpType::Put],
1244            &[21, 22, 23, 24],
1245        );
1246        batch.filter_deleted().unwrap();
1247        let expect = new_batch(&[2, 4], &[12, 14], &[OpType::Put, OpType::Put], &[22, 24]);
1248        assert_eq!(expect, batch);
1249
1250        let mut batch = new_batch(
1251            &[1, 2, 3, 4],
1252            &[11, 12, 13, 14],
1253            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
1254            &[21, 22, 23, 24],
1255        );
1256        let expect = batch.clone();
1257        batch.filter_deleted().unwrap();
1258        assert_eq!(expect, batch);
1259    }
1260
1261    #[test]
1262    fn test_filter_by_sequence() {
1263        // Filters put only.
1264        let mut batch = new_batch(
1265            &[1, 2, 3, 4],
1266            &[11, 12, 13, 14],
1267            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
1268            &[21, 22, 23, 24],
1269        );
1270        batch.filter_by_sequence(Some(13)).unwrap();
1271        let expect = new_batch(
1272            &[1, 2, 3],
1273            &[11, 12, 13],
1274            &[OpType::Put, OpType::Put, OpType::Put],
1275            &[21, 22, 23],
1276        );
1277        assert_eq!(expect, batch);
1278
1279        // Filters to empty.
1280        let mut batch = new_batch(
1281            &[1, 2, 3, 4],
1282            &[11, 12, 13, 14],
1283            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1284            &[21, 22, 23, 24],
1285        );
1286
1287        batch.filter_by_sequence(Some(10)).unwrap();
1288        assert!(batch.is_empty());
1289
1290        // None filter.
1291        let mut batch = new_batch(
1292            &[1, 2, 3, 4],
1293            &[11, 12, 13, 14],
1294            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1295            &[21, 22, 23, 24],
1296        );
1297        let expect = batch.clone();
1298        batch.filter_by_sequence(None).unwrap();
1299        assert_eq!(expect, batch);
1300
1301        // Filter a empty batch
1302        let mut batch = new_batch(&[], &[], &[], &[]);
1303        batch.filter_by_sequence(Some(10)).unwrap();
1304        assert!(batch.is_empty());
1305
1306        // Filter a empty batch with None
1307        let mut batch = new_batch(&[], &[], &[], &[]);
1308        batch.filter_by_sequence(None).unwrap();
1309        assert!(batch.is_empty());
1310    }
1311
1312    #[test]
1313    fn test_filter() {
1314        // Filters put only.
1315        let mut batch = new_batch(
1316            &[1, 2, 3, 4],
1317            &[11, 12, 13, 14],
1318            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
1319            &[21, 22, 23, 24],
1320        );
1321        let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
1322        batch.filter(&predicate).unwrap();
1323        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
1324        assert_eq!(expect, batch);
1325
1326        // Filters deletion.
1327        let mut batch = new_batch(
1328            &[1, 2, 3, 4],
1329            &[11, 12, 13, 14],
1330            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1331            &[21, 22, 23, 24],
1332        );
1333        let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
1334        batch.filter(&predicate).unwrap();
1335        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
1336        assert_eq!(expect, batch);
1337
1338        // Filters to empty.
1339        let predicate = BooleanVector::from_vec(vec![false, false]);
1340        batch.filter(&predicate).unwrap();
1341        assert!(batch.is_empty());
1342    }
1343
1344    #[test]
1345    fn test_sort_and_dedup() {
1346        let original = new_batch(
1347            &[2, 3, 1, 4, 5, 2],
1348            &[1, 2, 3, 4, 5, 6],
1349            &[
1350                OpType::Put,
1351                OpType::Put,
1352                OpType::Put,
1353                OpType::Put,
1354                OpType::Put,
1355                OpType::Put,
1356            ],
1357            &[21, 22, 23, 24, 25, 26],
1358        );
1359
1360        let mut batch = original.clone();
1361        batch.sort(true).unwrap();
1362        // It should only keep one timestamp 2.
1363        assert_eq!(
1364            new_batch(
1365                &[1, 2, 3, 4, 5],
1366                &[3, 6, 2, 4, 5],
1367                &[
1368                    OpType::Put,
1369                    OpType::Put,
1370                    OpType::Put,
1371                    OpType::Put,
1372                    OpType::Put,
1373                ],
1374                &[23, 26, 22, 24, 25],
1375            ),
1376            batch
1377        );
1378
1379        let mut batch = original.clone();
1380        batch.sort(false).unwrap();
1381
1382        // It should only keep one timestamp 2.
1383        assert_eq!(
1384            new_batch(
1385                &[1, 2, 2, 3, 4, 5],
1386                &[3, 6, 1, 2, 4, 5],
1387                &[
1388                    OpType::Put,
1389                    OpType::Put,
1390                    OpType::Put,
1391                    OpType::Put,
1392                    OpType::Put,
1393                    OpType::Put,
1394                ],
1395                &[23, 26, 21, 22, 24, 25],
1396            ),
1397            batch
1398        );
1399
1400        let original = new_batch(
1401            &[2, 2, 1],
1402            &[1, 6, 1],
1403            &[OpType::Delete, OpType::Put, OpType::Put],
1404            &[21, 22, 23],
1405        );
1406
1407        let mut batch = original.clone();
1408        batch.sort(true).unwrap();
1409        let expect = new_batch(&[1, 2], &[1, 6], &[OpType::Put, OpType::Put], &[23, 22]);
1410        assert_eq!(expect, batch);
1411
1412        let mut batch = original.clone();
1413        batch.sort(false).unwrap();
1414        let expect = new_batch(
1415            &[1, 2, 2],
1416            &[1, 6, 1],
1417            &[OpType::Put, OpType::Put, OpType::Delete],
1418            &[23, 22, 21],
1419        );
1420        assert_eq!(expect, batch);
1421    }
1422
1423    #[test]
1424    fn test_get_value() {
1425        let encodings = [PrimaryKeyEncoding::Dense, PrimaryKeyEncoding::Sparse];
1426
1427        for encoding in encodings {
1428            let codec = build_primary_key_codec_with_fields(
1429                encoding,
1430                [
1431                    (
1432                        ReservedColumnId::table_id(),
1433                        row_converter::SortField::new(ConcreteDataType::uint32_datatype()),
1434                    ),
1435                    (
1436                        ReservedColumnId::tsid(),
1437                        row_converter::SortField::new(ConcreteDataType::uint64_datatype()),
1438                    ),
1439                    (
1440                        100,
1441                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
1442                    ),
1443                    (
1444                        200,
1445                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
1446                    ),
1447                ]
1448                .into_iter(),
1449            );
1450
1451            let values = [
1452                Value::UInt32(1000),
1453                Value::UInt64(2000),
1454                Value::String("abcdefgh".into()),
1455                Value::String("zyxwvu".into()),
1456            ];
1457            let mut buf = vec![];
1458            codec
1459                .encode_values(
1460                    &[
1461                        (ReservedColumnId::table_id(), values[0].clone()),
1462                        (ReservedColumnId::tsid(), values[1].clone()),
1463                        (100, values[2].clone()),
1464                        (200, values[3].clone()),
1465                    ],
1466                    &mut buf,
1467                )
1468                .unwrap();
1469
1470            let field_col_id = 2;
1471            let mut batch = new_batch_builder(
1472                &buf,
1473                &[1, 2, 3],
1474                &[1, 1, 1],
1475                &[OpType::Put, OpType::Put, OpType::Put],
1476                field_col_id,
1477                &[42, 43, 44],
1478            )
1479            .build()
1480            .unwrap();
1481
1482            let v = batch
1483                .pk_col_value(&*codec, 0, ReservedColumnId::table_id())
1484                .unwrap()
1485                .unwrap();
1486            assert_eq!(values[0], *v);
1487
1488            let v = batch
1489                .pk_col_value(&*codec, 1, ReservedColumnId::tsid())
1490                .unwrap()
1491                .unwrap();
1492            assert_eq!(values[1], *v);
1493
1494            let v = batch.pk_col_value(&*codec, 2, 100).unwrap().unwrap();
1495            assert_eq!(values[2], *v);
1496
1497            let v = batch.pk_col_value(&*codec, 3, 200).unwrap().unwrap();
1498            assert_eq!(values[3], *v);
1499
1500            let v = batch.field_col_value(field_col_id).unwrap();
1501            assert_eq!(v.data.get(0), Value::UInt64(42));
1502            assert_eq!(v.data.get(1), Value::UInt64(43));
1503            assert_eq!(v.data.get(2), Value::UInt64(44));
1504        }
1505    }
1506}