mito2/
read.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Common structs and utilities for reading data.
16
17pub mod compat;
18pub mod dedup;
19pub mod last_row;
20pub mod merge;
21pub mod plain_batch;
22pub mod projection;
23pub(crate) mod prune;
24pub(crate) mod range;
25pub(crate) mod scan_region;
26pub(crate) mod scan_util;
27pub(crate) mod seq_scan;
28pub(crate) mod series_scan;
29pub(crate) mod unordered_scan;
30
31use std::collections::{HashMap, HashSet};
32use std::sync::Arc;
33use std::time::Duration;
34
35use api::v1::OpType;
36use async_trait::async_trait;
37use common_time::Timestamp;
38use datafusion_common::arrow::array::UInt8Array;
39use datatypes::arrow;
40use datatypes::arrow::array::{Array, ArrayRef, UInt64Array};
41use datatypes::arrow::compute::SortOptions;
42use datatypes::arrow::row::{RowConverter, SortField};
43use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector};
44use datatypes::types::TimestampType;
45use datatypes::value::{Value, ValueRef};
46use datatypes::vectors::{
47    BooleanVector, Helper, TimestampMicrosecondVector, TimestampMillisecondVector,
48    TimestampNanosecondVector, TimestampSecondVector, UInt32Vector, UInt64Vector, UInt8Vector,
49    Vector, VectorRef,
50};
51use futures::stream::BoxStream;
52use futures::TryStreamExt;
53use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
54use snafu::{ensure, OptionExt, ResultExt};
55use store_api::metadata::RegionMetadata;
56use store_api::storage::{ColumnId, SequenceNumber};
57
58use crate::error::{
59    ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu,
60    Result,
61};
62use crate::memtable::BoxedBatchIterator;
63use crate::read::prune::PruneReader;
64
/// Storage internal representation of a batch of rows for a primary key (time series).
///
/// Every row in a batch shares the single `primary_key` stored here.
///
/// Rows are sorted by primary key, timestamp, sequence desc, op_type desc. Fields
/// always keep the same relative order as fields in [RegionMetadata](store_api::metadata::RegionMetadata).
#[derive(Debug, PartialEq, Clone)]
pub struct Batch {
    /// Primary key encoded in a comparable form.
    primary_key: Vec<u8>,
    /// Possibly decoded `primary_key` values. Some places would decode it in advance;
    /// `pk_col_value()` also fills it lazily on first use.
    pk_values: Option<CompositeValues>,
    /// Timestamps of rows, should be sorted and not null.
    timestamps: VectorRef,
    /// Sequences of rows
    ///
    /// UInt64 type, not null.
    sequences: Arc<UInt64Vector>,
    /// Op types of rows
    ///
    /// UInt8 type, not null.
    op_types: Arc<UInt8Vector>,
    /// Fields organized in columnar format.
    fields: Vec<BatchColumn>,
    /// Cache mapping a field's column id to its position in `fields`.
    ///
    /// `None` until `field_col_value()` builds it on first lookup.
    fields_idx: Option<HashMap<ColumnId, usize>>,
}
90
91impl Batch {
92    /// Creates a new batch.
93    pub fn new(
94        primary_key: Vec<u8>,
95        timestamps: VectorRef,
96        sequences: Arc<UInt64Vector>,
97        op_types: Arc<UInt8Vector>,
98        fields: Vec<BatchColumn>,
99    ) -> Result<Batch> {
100        BatchBuilder::with_required_columns(primary_key, timestamps, sequences, op_types)
101            .with_fields(fields)
102            .build()
103    }
104
105    /// Tries to set fields for the batch.
106    pub fn with_fields(self, fields: Vec<BatchColumn>) -> Result<Batch> {
107        Batch::new(
108            self.primary_key,
109            self.timestamps,
110            self.sequences,
111            self.op_types,
112            fields,
113        )
114    }
115
116    /// Returns primary key of the batch.
117    pub fn primary_key(&self) -> &[u8] {
118        &self.primary_key
119    }
120
121    /// Returns possibly decoded primary-key values.
122    pub fn pk_values(&self) -> Option<&CompositeValues> {
123        self.pk_values.as_ref()
124    }
125
126    /// Sets possibly decoded primary-key values.
127    pub fn set_pk_values(&mut self, pk_values: CompositeValues) {
128        self.pk_values = Some(pk_values);
129    }
130
131    /// Removes possibly decoded primary-key values. For testing only.
132    #[cfg(any(test, feature = "test"))]
133    pub fn remove_pk_values(&mut self) {
134        self.pk_values = None;
135    }
136
137    /// Returns fields in the batch.
138    pub fn fields(&self) -> &[BatchColumn] {
139        &self.fields
140    }
141
142    /// Returns timestamps of the batch.
143    pub fn timestamps(&self) -> &VectorRef {
144        &self.timestamps
145    }
146
147    /// Returns sequences of the batch.
148    pub fn sequences(&self) -> &Arc<UInt64Vector> {
149        &self.sequences
150    }
151
152    /// Returns op types of the batch.
153    pub fn op_types(&self) -> &Arc<UInt8Vector> {
154        &self.op_types
155    }
156
157    /// Returns the number of rows in the batch.
158    pub fn num_rows(&self) -> usize {
159        // All vectors have the same length. We use the length of sequences vector
160        // since it has static type.
161        self.sequences.len()
162    }
163
164    /// Returns true if the number of rows in the batch is 0.
165    pub fn is_empty(&self) -> bool {
166        self.num_rows() == 0
167    }
168
169    /// Returns the first timestamp in the batch or `None` if the batch is empty.
170    pub fn first_timestamp(&self) -> Option<Timestamp> {
171        if self.timestamps.is_empty() {
172            return None;
173        }
174
175        Some(self.get_timestamp(0))
176    }
177
178    /// Returns the last timestamp in the batch or `None` if the batch is empty.
179    pub fn last_timestamp(&self) -> Option<Timestamp> {
180        if self.timestamps.is_empty() {
181            return None;
182        }
183
184        Some(self.get_timestamp(self.timestamps.len() - 1))
185    }
186
187    /// Returns the first sequence in the batch or `None` if the batch is empty.
188    pub fn first_sequence(&self) -> Option<SequenceNumber> {
189        if self.sequences.is_empty() {
190            return None;
191        }
192
193        Some(self.get_sequence(0))
194    }
195
196    /// Returns the last sequence in the batch or `None` if the batch is empty.
197    pub fn last_sequence(&self) -> Option<SequenceNumber> {
198        if self.sequences.is_empty() {
199            return None;
200        }
201
202        Some(self.get_sequence(self.sequences.len() - 1))
203    }
204
205    /// Replaces the primary key of the batch.
206    ///
207    /// Notice that this [Batch] also contains a maybe-exist `pk_values`.
208    /// Be sure to update that field as well.
209    pub fn set_primary_key(&mut self, primary_key: Vec<u8>) {
210        self.primary_key = primary_key;
211    }
212
213    /// Slice the batch, returning a new batch.
214    ///
215    /// # Panics
216    /// Panics if `offset + length > self.num_rows()`.
217    pub fn slice(&self, offset: usize, length: usize) -> Batch {
218        let fields = self
219            .fields
220            .iter()
221            .map(|column| BatchColumn {
222                column_id: column.column_id,
223                data: column.data.slice(offset, length),
224            })
225            .collect();
226        // We skip using the builder to avoid validating the batch again.
227        Batch {
228            // Now we need to clone the primary key. We could try `Bytes` if
229            // this becomes a bottleneck.
230            primary_key: self.primary_key.clone(),
231            pk_values: self.pk_values.clone(),
232            timestamps: self.timestamps.slice(offset, length),
233            sequences: Arc::new(self.sequences.get_slice(offset, length)),
234            op_types: Arc::new(self.op_types.get_slice(offset, length)),
235            fields,
236            fields_idx: self.fields_idx.clone(),
237        }
238    }
239
240    /// Takes `batches` and concat them into one batch.
241    ///
242    /// All `batches` must have the same primary key.
243    pub fn concat(mut batches: Vec<Batch>) -> Result<Batch> {
244        ensure!(
245            !batches.is_empty(),
246            InvalidBatchSnafu {
247                reason: "empty batches",
248            }
249        );
250        if batches.len() == 1 {
251            // Now we own the `batches` so we could pop it directly.
252            return Ok(batches.pop().unwrap());
253        }
254
255        let primary_key = std::mem::take(&mut batches[0].primary_key);
256        let first = &batches[0];
257        // We took the primary key from the first batch so we don't use `first.primary_key()`.
258        ensure!(
259            batches
260                .iter()
261                .skip(1)
262                .all(|b| b.primary_key() == primary_key),
263            InvalidBatchSnafu {
264                reason: "batches have different primary key",
265            }
266        );
267        for b in batches.iter().skip(1) {
268            ensure!(
269                b.fields.len() == first.fields.len(),
270                InvalidBatchSnafu {
271                    reason: "batches have different field num",
272                }
273            );
274            for (l, r) in b.fields.iter().zip(&first.fields) {
275                ensure!(
276                    l.column_id == r.column_id,
277                    InvalidBatchSnafu {
278                        reason: "batches have different fields",
279                    }
280                );
281            }
282        }
283
284        // We take the primary key from the first batch.
285        let mut builder = BatchBuilder::new(primary_key);
286        // Concat timestamps, sequences, op_types, fields.
287        let array = concat_arrays(batches.iter().map(|b| b.timestamps().to_arrow_array()))?;
288        builder.timestamps_array(array)?;
289        let array = concat_arrays(batches.iter().map(|b| b.sequences().to_arrow_array()))?;
290        builder.sequences_array(array)?;
291        let array = concat_arrays(batches.iter().map(|b| b.op_types().to_arrow_array()))?;
292        builder.op_types_array(array)?;
293        for (i, batch_column) in first.fields.iter().enumerate() {
294            let array = concat_arrays(batches.iter().map(|b| b.fields()[i].data.to_arrow_array()))?;
295            builder.push_field_array(batch_column.column_id, array)?;
296        }
297
298        builder.build()
299    }
300
301    /// Removes rows whose op type is delete.
302    pub fn filter_deleted(&mut self) -> Result<()> {
303        // Safety: op type column is not null.
304        let array = self.op_types.as_arrow();
305        // Find rows with non-delete op type.
306        let rhs = UInt8Array::new_scalar(OpType::Delete as u8);
307        let predicate =
308            arrow::compute::kernels::cmp::neq(array, &rhs).context(ComputeArrowSnafu)?;
309        self.filter(&BooleanVector::from(predicate))
310    }
311
312    // Applies the `predicate` to the batch.
313    // Safety: We know the array type so we unwrap on casting.
314    pub fn filter(&mut self, predicate: &BooleanVector) -> Result<()> {
315        self.timestamps = self
316            .timestamps
317            .filter(predicate)
318            .context(ComputeVectorSnafu)?;
319        self.sequences = Arc::new(
320            UInt64Vector::try_from_arrow_array(
321                arrow::compute::filter(self.sequences.as_arrow(), predicate.as_boolean_array())
322                    .context(ComputeArrowSnafu)?,
323            )
324            .unwrap(),
325        );
326        self.op_types = Arc::new(
327            UInt8Vector::try_from_arrow_array(
328                arrow::compute::filter(self.op_types.as_arrow(), predicate.as_boolean_array())
329                    .context(ComputeArrowSnafu)?,
330            )
331            .unwrap(),
332        );
333        for batch_column in &mut self.fields {
334            batch_column.data = batch_column
335                .data
336                .filter(predicate)
337                .context(ComputeVectorSnafu)?;
338        }
339
340        Ok(())
341    }
342
343    /// Filters rows by the given `sequence`. Only preserves rows with sequence less than or equal to `sequence`.
344    pub fn filter_by_sequence(&mut self, sequence: Option<SequenceNumber>) -> Result<()> {
345        let seq = match (sequence, self.last_sequence()) {
346            (None, _) | (_, None) => return Ok(()),
347            (Some(sequence), Some(last_sequence)) if sequence >= last_sequence => return Ok(()),
348            (Some(sequence), Some(_)) => sequence,
349        };
350
351        let seqs = self.sequences.as_arrow();
352        let sequence = UInt64Array::new_scalar(seq);
353        let predicate = datafusion_common::arrow::compute::kernels::cmp::lt_eq(seqs, &sequence)
354            .context(ComputeArrowSnafu)?;
355        let predicate = BooleanVector::from(predicate);
356        self.filter(&predicate)?;
357
358        Ok(())
359    }
360
361    /// Sorts rows in the batch. If `dedup` is true, it also removes
362    /// duplicated rows according to primary keys.
363    ///
364    /// It orders rows by timestamp, sequence desc and only keep the latest
365    /// row for the same timestamp. It doesn't consider op type as sequence
366    /// should already provide uniqueness for a row.
367    pub fn sort(&mut self, dedup: bool) -> Result<()> {
368        // If building a converter each time is costly, we may allow passing a
369        // converter.
370        let converter = RowConverter::new(vec![
371            SortField::new(self.timestamps.data_type().as_arrow_type()),
372            SortField::new_with_options(
373                self.sequences.data_type().as_arrow_type(),
374                SortOptions {
375                    descending: true,
376                    ..Default::default()
377                },
378            ),
379        ])
380        .context(ComputeArrowSnafu)?;
381        // Columns to sort.
382        let columns = [
383            self.timestamps.to_arrow_array(),
384            self.sequences.to_arrow_array(),
385        ];
386        let rows = converter.convert_columns(&columns).unwrap();
387        let mut to_sort: Vec<_> = rows.iter().enumerate().collect();
388
389        let was_sorted = to_sort.is_sorted_by_key(|x| x.1);
390        if !was_sorted {
391            to_sort.sort_unstable_by_key(|x| x.1);
392        }
393
394        let num_rows = to_sort.len();
395        if dedup {
396            // Dedup by timestamps.
397            to_sort.dedup_by(|left, right| {
398                debug_assert_eq!(18, left.1.as_ref().len());
399                debug_assert_eq!(18, right.1.as_ref().len());
400                let (left_key, right_key) = (left.1.as_ref(), right.1.as_ref());
401                // We only compare the timestamp part and ignore sequence.
402                left_key[..TIMESTAMP_KEY_LEN] == right_key[..TIMESTAMP_KEY_LEN]
403            });
404        }
405        let no_dedup = to_sort.len() == num_rows;
406
407        if was_sorted && no_dedup {
408            return Ok(());
409        }
410        let indices = UInt32Vector::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
411        self.take_in_place(&indices)
412    }
413
414    /// Returns the estimated memory size of the batch.
415    pub fn memory_size(&self) -> usize {
416        let mut size = std::mem::size_of::<Self>();
417        size += self.primary_key.len();
418        size += self.timestamps.memory_size();
419        size += self.sequences.memory_size();
420        size += self.op_types.memory_size();
421        for batch_column in &self.fields {
422            size += batch_column.data.memory_size();
423        }
424        size
425    }
426
427    /// Returns ids and datatypes of fields in the [Batch] after applying the `projection`.
428    pub(crate) fn projected_fields(
429        metadata: &RegionMetadata,
430        projection: &[ColumnId],
431    ) -> Vec<(ColumnId, ConcreteDataType)> {
432        let projected_ids: HashSet<_> = projection.iter().copied().collect();
433        metadata
434            .field_columns()
435            .filter_map(|column| {
436                if projected_ids.contains(&column.column_id) {
437                    Some((column.column_id, column.column_schema.data_type.clone()))
438                } else {
439                    None
440                }
441            })
442            .collect()
443    }
444
445    /// Returns timestamps in a native slice or `None` if the batch is empty.
446    pub(crate) fn timestamps_native(&self) -> Option<&[i64]> {
447        if self.timestamps.is_empty() {
448            return None;
449        }
450
451        let values = match self.timestamps.data_type() {
452            ConcreteDataType::Timestamp(TimestampType::Second(_)) => self
453                .timestamps
454                .as_any()
455                .downcast_ref::<TimestampSecondVector>()
456                .unwrap()
457                .as_arrow()
458                .values(),
459            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => self
460                .timestamps
461                .as_any()
462                .downcast_ref::<TimestampMillisecondVector>()
463                .unwrap()
464                .as_arrow()
465                .values(),
466            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => self
467                .timestamps
468                .as_any()
469                .downcast_ref::<TimestampMicrosecondVector>()
470                .unwrap()
471                .as_arrow()
472                .values(),
473            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => self
474                .timestamps
475                .as_any()
476                .downcast_ref::<TimestampNanosecondVector>()
477                .unwrap()
478                .as_arrow()
479                .values(),
480            other => panic!("timestamps in a Batch has other type {:?}", other),
481        };
482
483        Some(values)
484    }
485
486    /// Takes the batch in place.
487    fn take_in_place(&mut self, indices: &UInt32Vector) -> Result<()> {
488        self.timestamps = self.timestamps.take(indices).context(ComputeVectorSnafu)?;
489        let array = arrow::compute::take(self.sequences.as_arrow(), indices.as_arrow(), None)
490            .context(ComputeArrowSnafu)?;
491        // Safety: we know the array and vector type.
492        self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
493        let array = arrow::compute::take(self.op_types.as_arrow(), indices.as_arrow(), None)
494            .context(ComputeArrowSnafu)?;
495        self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
496        for batch_column in &mut self.fields {
497            batch_column.data = batch_column
498                .data
499                .take(indices)
500                .context(ComputeVectorSnafu)?;
501        }
502
503        Ok(())
504    }
505
506    /// Gets a timestamp at given `index`.
507    ///
508    /// # Panics
509    /// Panics if `index` is out-of-bound or the timestamp vector returns null.
510    fn get_timestamp(&self, index: usize) -> Timestamp {
511        match self.timestamps.get_ref(index) {
512            ValueRef::Timestamp(timestamp) => timestamp,
513
514            // We have check the data type is timestamp compatible in the [BatchBuilder] so it's safe to panic.
515            value => panic!("{:?} is not a timestamp", value),
516        }
517    }
518
519    /// Gets a sequence at given `index`.
520    ///
521    /// # Panics
522    /// Panics if `index` is out-of-bound or the sequence vector returns null.
523    pub(crate) fn get_sequence(&self, index: usize) -> SequenceNumber {
524        // Safety: sequences is not null so it actually returns Some.
525        self.sequences.get_data(index).unwrap()
526    }
527
528    /// Checks the batch is monotonic by timestamps.
529    #[cfg(debug_assertions)]
530    pub(crate) fn check_monotonic(&self) -> Result<(), String> {
531        use std::cmp::Ordering;
532        if self.timestamps_native().is_none() {
533            return Ok(());
534        }
535
536        let timestamps = self.timestamps_native().unwrap();
537        let sequences = self.sequences.as_arrow().values();
538        for (i, window) in timestamps.windows(2).enumerate() {
539            let current = window[0];
540            let next = window[1];
541            let current_sequence = sequences[i];
542            let next_sequence = sequences[i + 1];
543            match current.cmp(&next) {
544                Ordering::Less => {
545                    // The current timestamp is less than the next timestamp.
546                    continue;
547                }
548                Ordering::Equal => {
549                    // The current timestamp is equal to the next timestamp.
550                    if current_sequence < next_sequence {
551                        return Err(format!(
552                            "sequence are not monotonic: ts {} == {} but current sequence {} < {}, index: {}",
553                            current, next, current_sequence, next_sequence, i
554                        ));
555                    }
556                }
557                Ordering::Greater => {
558                    // The current timestamp is greater than the next timestamp.
559                    return Err(format!(
560                        "timestamps are not monotonic: {} > {}, index: {}",
561                        current, next, i
562                    ));
563                }
564            }
565        }
566
567        Ok(())
568    }
569
570    /// Returns Ok if the given batch is behind the current batch.
571    #[cfg(debug_assertions)]
572    pub(crate) fn check_next_batch(&self, other: &Batch) -> Result<(), String> {
573        // Checks the primary key
574        if self.primary_key() < other.primary_key() {
575            return Ok(());
576        }
577        if self.primary_key() > other.primary_key() {
578            return Err(format!(
579                "primary key is not monotonic: {:?} > {:?}",
580                self.primary_key(),
581                other.primary_key()
582            ));
583        }
584        // Checks the timestamp.
585        if self.last_timestamp() < other.first_timestamp() {
586            return Ok(());
587        }
588        if self.last_timestamp() > other.first_timestamp() {
589            return Err(format!(
590                "timestamps are not monotonic: {:?} > {:?}",
591                self.last_timestamp(),
592                other.first_timestamp()
593            ));
594        }
595        // Checks the sequence.
596        if self.last_sequence() >= other.first_sequence() {
597            return Ok(());
598        }
599        Err(format!(
600            "sequences are not monotonic: {:?} < {:?}",
601            self.last_sequence(),
602            other.first_sequence()
603        ))
604    }
605
606    /// Returns the value of the column in the primary key.
607    ///
608    /// Lazily decodes the primary key and caches the result.
609    pub fn pk_col_value(
610        &mut self,
611        codec: &dyn PrimaryKeyCodec,
612        col_idx_in_pk: usize,
613        column_id: ColumnId,
614    ) -> Result<Option<&Value>> {
615        if self.pk_values.is_none() {
616            self.pk_values = Some(codec.decode(&self.primary_key).context(DecodeSnafu)?);
617        }
618
619        let pk_values = self.pk_values.as_ref().unwrap();
620        Ok(match pk_values {
621            CompositeValues::Dense(values) => values.get(col_idx_in_pk).map(|(_, v)| v),
622            CompositeValues::Sparse(values) => values.get(&column_id),
623        })
624    }
625
626    /// Returns values of the field in the batch.
627    ///
628    /// Lazily caches the field index.
629    pub fn field_col_value(&mut self, column_id: ColumnId) -> Option<&BatchColumn> {
630        if self.fields_idx.is_none() {
631            self.fields_idx = Some(
632                self.fields
633                    .iter()
634                    .enumerate()
635                    .map(|(i, c)| (c.column_id, i))
636                    .collect(),
637            );
638        }
639
640        self.fields_idx
641            .as_ref()
642            .unwrap()
643            .get(&column_id)
644            .map(|&idx| &self.fields[idx])
645    }
646}
647
/// A struct to check the batch is monotonic.
///
/// It remembers the previously checked batch so consecutive batches can be
/// compared against each other, and optionally a timestamp range the batches
/// must stay within.
#[cfg(debug_assertions)]
#[derive(Default)]
pub(crate) struct BatchChecker {
    /// Clone of the last batch passed to `check_monotonic()`, if any.
    last_batch: Option<Batch>,
    /// Optional lower bound: a batch's first timestamp must not be before it.
    start: Option<Timestamp>,
    /// Optional upper bound: a batch's last timestamp must be strictly before it.
    end: Option<Timestamp>,
}
656
657#[cfg(debug_assertions)]
658impl BatchChecker {
659    /// Attaches the given start timestamp to the checker.
660    pub(crate) fn with_start(mut self, start: Option<Timestamp>) -> Self {
661        self.start = start;
662        self
663    }
664
665    /// Attaches the given end timestamp to the checker.
666    pub(crate) fn with_end(mut self, end: Option<Timestamp>) -> Self {
667        self.end = end;
668        self
669    }
670
671    /// Returns true if the given batch is monotonic and behind
672    /// the last batch.
673    pub(crate) fn check_monotonic(&mut self, batch: &Batch) -> Result<(), String> {
674        batch.check_monotonic()?;
675
676        if let (Some(start), Some(first)) = (self.start, batch.first_timestamp()) {
677            if start > first {
678                return Err(format!(
679                    "batch's first timestamp is before the start timestamp: {:?} > {:?}",
680                    start, first
681                ));
682            }
683        }
684        if let (Some(end), Some(last)) = (self.end, batch.last_timestamp()) {
685            if end <= last {
686                return Err(format!(
687                    "batch's last timestamp is after the end timestamp: {:?} <= {:?}",
688                    end, last
689                ));
690            }
691        }
692
693        // Checks the batch is behind the last batch.
694        // Then Updates the last batch.
695        let res = self
696            .last_batch
697            .as_ref()
698            .map(|last| last.check_next_batch(batch))
699            .unwrap_or(Ok(()));
700        self.last_batch = Some(batch.clone());
701        res
702    }
703
704    /// Formats current batch and last batch for debug.
705    pub(crate) fn format_batch(&self, batch: &Batch) -> String {
706        use std::fmt::Write;
707
708        let mut message = String::new();
709        if let Some(last) = &self.last_batch {
710            write!(
711                message,
712                "last_pk: {:?}, last_ts: {:?}, last_seq: {:?}, ",
713                last.primary_key(),
714                last.last_timestamp(),
715                last.last_sequence()
716            )
717            .unwrap();
718        }
719        write!(
720            message,
721            "batch_pk: {:?}, batch_ts: {:?}, batch_seq: {:?}",
722            batch.primary_key(),
723            batch.timestamps(),
724            batch.sequences()
725        )
726        .unwrap();
727
728        message
729    }
730
731    /// Checks batches from the part range are monotonic. Otherwise, panics.
732    pub(crate) fn ensure_part_range_batch(
733        &mut self,
734        scanner: &str,
735        region_id: store_api::storage::RegionId,
736        partition: usize,
737        part_range: store_api::region_engine::PartitionRange,
738        batch: &Batch,
739    ) {
740        if let Err(e) = self.check_monotonic(batch) {
741            let err_msg = format!(
742                "{}: batch is not sorted, {}, region_id: {}, partition: {}, part_range: {:?}",
743                scanner, e, region_id, partition, part_range,
744            );
745            common_telemetry::error!("{err_msg}, {}", self.format_batch(batch));
746            // Only print the number of row in the panic message.
747            panic!("{err_msg}, batch rows: {}", batch.num_rows());
748        }
749    }
750}
751
/// Length in bytes of the timestamp part of an arrow row key.
///
/// `Batch::sort()` encodes each row as (timestamp, sequence) and compares only the
/// first `TIMESTAMP_KEY_LEN` bytes of two keys to detect rows with equal timestamps.
// NOTE(review): 9 is presumably a 1-byte null marker plus the 8-byte value — confirm
// against the arrow row format.
const TIMESTAMP_KEY_LEN: usize = 9;
754
755/// Helper function to concat arrays from `iter`.
756fn concat_arrays(iter: impl Iterator<Item = ArrayRef>) -> Result<ArrayRef> {
757    let arrays: Vec<_> = iter.collect();
758    let dyn_arrays: Vec<_> = arrays.iter().map(|array| array.as_ref()).collect();
759    arrow::compute::concat(&dyn_arrays).context(ComputeArrowSnafu)
760}
761
/// A column in a [Batch].
///
/// Pairs the values of one field column with the id identifying that column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct BatchColumn {
    /// Id of the column.
    pub column_id: ColumnId,
    /// Data of the column, one value per row of the batch.
    pub data: VectorRef,
}
770
/// Builder to build [Batch].
///
/// The timestamp, sequence and op type columns are required: `build()` fails if
/// any of them has not been set.
pub struct BatchBuilder {
    /// Encoded primary key of the batch being built.
    primary_key: Vec<u8>,
    /// Timestamp column; `None` until set.
    timestamps: Option<VectorRef>,
    /// Sequence column; `None` until set.
    sequences: Option<Arc<UInt64Vector>>,
    /// Op type column; `None` until set.
    op_types: Option<Arc<UInt8Vector>>,
    /// Field columns collected so far, in insertion order.
    fields: Vec<BatchColumn>,
}
779
780impl BatchBuilder {
781    /// Creates a new [BatchBuilder] with primary key.
782    pub fn new(primary_key: Vec<u8>) -> BatchBuilder {
783        BatchBuilder {
784            primary_key,
785            timestamps: None,
786            sequences: None,
787            op_types: None,
788            fields: Vec::new(),
789        }
790    }
791
792    /// Creates a new [BatchBuilder] with all required columns.
793    pub fn with_required_columns(
794        primary_key: Vec<u8>,
795        timestamps: VectorRef,
796        sequences: Arc<UInt64Vector>,
797        op_types: Arc<UInt8Vector>,
798    ) -> BatchBuilder {
799        BatchBuilder {
800            primary_key,
801            timestamps: Some(timestamps),
802            sequences: Some(sequences),
803            op_types: Some(op_types),
804            fields: Vec::new(),
805        }
806    }
807
808    /// Set all field columns.
809    pub fn with_fields(mut self, fields: Vec<BatchColumn>) -> Self {
810        self.fields = fields;
811        self
812    }
813
814    /// Push a field column.
815    pub fn push_field(&mut self, column: BatchColumn) -> &mut Self {
816        self.fields.push(column);
817        self
818    }
819
820    /// Push an array as a field.
821    pub fn push_field_array(&mut self, column_id: ColumnId, array: ArrayRef) -> Result<&mut Self> {
822        let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
823        self.fields.push(BatchColumn {
824            column_id,
825            data: vector,
826        });
827
828        Ok(self)
829    }
830
831    /// Try to set an array as timestamps.
832    pub fn timestamps_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
833        let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
834        ensure!(
835            vector.data_type().is_timestamp(),
836            InvalidBatchSnafu {
837                reason: format!("{:?} is not a timestamp type", vector.data_type()),
838            }
839        );
840
841        self.timestamps = Some(vector);
842        Ok(self)
843    }
844
845    /// Try to set an array as sequences.
846    pub fn sequences_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
847        ensure!(
848            *array.data_type() == arrow::datatypes::DataType::UInt64,
849            InvalidBatchSnafu {
850                reason: "sequence array is not UInt64 type",
851            }
852        );
853        // Safety: The cast must success as we have ensured it is uint64 type.
854        let vector = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
855        self.sequences = Some(vector);
856
857        Ok(self)
858    }
859
860    /// Try to set an array as op types.
861    pub fn op_types_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
862        ensure!(
863            *array.data_type() == arrow::datatypes::DataType::UInt8,
864            InvalidBatchSnafu {
865                reason: "sequence array is not UInt8 type",
866            }
867        );
868        // Safety: The cast must success as we have ensured it is uint64 type.
869        let vector = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
870        self.op_types = Some(vector);
871
872        Ok(self)
873    }
874
875    /// Builds the [Batch].
876    pub fn build(self) -> Result<Batch> {
877        let timestamps = self.timestamps.context(InvalidBatchSnafu {
878            reason: "missing timestamps",
879        })?;
880        let sequences = self.sequences.context(InvalidBatchSnafu {
881            reason: "missing sequences",
882        })?;
883        let op_types = self.op_types.context(InvalidBatchSnafu {
884            reason: "missing op_types",
885        })?;
886        // Our storage format ensure these columns are not nullable so
887        // we use assert here.
888        assert_eq!(0, timestamps.null_count());
889        assert_eq!(0, sequences.null_count());
890        assert_eq!(0, op_types.null_count());
891
892        let ts_len = timestamps.len();
893        ensure!(
894            sequences.len() == ts_len,
895            InvalidBatchSnafu {
896                reason: format!(
897                    "sequence have different len {} != {}",
898                    sequences.len(),
899                    ts_len
900                ),
901            }
902        );
903        ensure!(
904            op_types.len() == ts_len,
905            InvalidBatchSnafu {
906                reason: format!(
907                    "op type have different len {} != {}",
908                    op_types.len(),
909                    ts_len
910                ),
911            }
912        );
913        for column in &self.fields {
914            ensure!(
915                column.data.len() == ts_len,
916                InvalidBatchSnafu {
917                    reason: format!(
918                        "column {} has different len {} != {}",
919                        column.column_id,
920                        column.data.len(),
921                        ts_len
922                    ),
923                }
924            );
925        }
926
927        Ok(Batch {
928            primary_key: self.primary_key,
929            pk_values: None,
930            timestamps,
931            sequences,
932            op_types,
933            fields: self.fields,
934            fields_idx: None,
935        })
936    }
937}
938
939impl From<Batch> for BatchBuilder {
940    fn from(batch: Batch) -> Self {
941        Self {
942            primary_key: batch.primary_key,
943            timestamps: Some(batch.timestamps),
944            sequences: Some(batch.sequences),
945            op_types: Some(batch.op_types),
946            fields: batch.fields,
947        }
948    }
949}
950
/// Async [Batch] reader and iterator wrapper.
///
/// This is the data source for SST writers or internal readers. Each variant
/// wraps one kind of batch producer.
pub enum Source {
    /// Source from a [BoxedBatchReader].
    Reader(BoxedBatchReader),
    /// Source from a [BoxedBatchIterator].
    Iter(BoxedBatchIterator),
    /// Source from a [BoxedBatchStream].
    Stream(BoxedBatchStream),
    /// Source from a [PruneReader].
    PruneReader(PruneReader),
}
964
965impl Source {
966    /// Returns next [Batch] from this data source.
967    pub(crate) async fn next_batch(&mut self) -> Result<Option<Batch>> {
968        match self {
969            Source::Reader(reader) => reader.next_batch().await,
970            Source::Iter(iter) => iter.next().transpose(),
971            Source::Stream(stream) => stream.try_next().await,
972            Source::PruneReader(reader) => reader.next_batch().await,
973        }
974    }
975}
976
/// Async batch reader.
///
/// The reader must guarantee that all [Batch]es it returns have the same schema.
#[async_trait]
pub trait BatchReader: Send {
    /// Fetches the next [Batch].
    ///
    /// Returns `Ok(None)` when the reader has reached its end; calling `next_batch()`
    /// again after that won't return any more batches.
    ///
    /// If `Err` is returned, the caller should not call this method again; the
    /// implementor may or may not panic in that case.
    async fn next_batch(&mut self) -> Result<Option<Batch>>;
}
991
/// Boxed, dynamically dispatched [BatchReader].
pub type BoxedBatchReader = Box<dyn BatchReader>;
994
/// Boxed `'static` stream that yields `Result<Batch>` items.
pub type BoxedBatchStream = BoxStream<'static, Result<Batch>>;
997
998#[async_trait::async_trait]
999impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
1000    async fn next_batch(&mut self) -> Result<Option<Batch>> {
1001        (**self).next_batch().await
1002    }
1003}
1004
/// Local metrics for scanners.
///
/// Holds elapsed-time and count measurements; the derived `Default` starts
/// every duration at zero and every counter at `0`.
#[derive(Debug, Default)]
pub(crate) struct ScannerMetrics {
    /// Duration to prepare the scan task.
    prepare_scan_cost: Duration,
    /// Duration to build the (merge) reader.
    build_reader_cost: Duration,
    /// Duration to scan data.
    scan_cost: Duration,
    /// Duration to convert batches.
    convert_cost: Duration,
    /// Duration while waiting for `yield`.
    yield_cost: Duration,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of mem ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,
}
1027
#[cfg(test)]
mod tests {
    use mito_codec::row_converter::{self, build_primary_key_codec_with_fields};
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::storage::consts::ReservedColumnId;

    use super::*;
    use crate::error::Error;
    use crate::test_util::new_batch_builder;

    /// Builds a [Batch] with primary key `b"test"` and one field column (column id `1`)
    /// from the given timestamps, sequences, op types and field values.
    fn new_batch(
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        field: &[u64],
    ) -> Batch {
        new_batch_builder(b"test", timestamps, sequences, op_types, 1, field)
            .build()
            .unwrap()
    }

    /// An empty batch has no first/last timestamp or sequence and no native timestamps.
    #[test]
    fn test_empty_batch() {
        let batch = new_batch(&[], &[], &[], &[]);
        assert_eq!(None, batch.first_timestamp());
        assert_eq!(None, batch.last_timestamp());
        assert_eq!(None, batch.first_sequence());
        assert_eq!(None, batch.last_sequence());
        assert!(batch.timestamps_native().is_none());
    }

    /// With a single row, the first and last accessors return that same row's values.
    #[test]
    fn test_first_last_one() {
        let batch = new_batch(&[1], &[2], &[OpType::Put], &[4]);
        assert_eq!(
            Timestamp::new_millisecond(1),
            batch.first_timestamp().unwrap()
        );
        assert_eq!(
            Timestamp::new_millisecond(1),
            batch.last_timestamp().unwrap()
        );
        assert_eq!(2, batch.first_sequence().unwrap());
        assert_eq!(2, batch.last_sequence().unwrap());
    }

    /// With several rows, the first and last accessors return the values of the
    /// first and last row respectively.
    #[test]
    fn test_first_last_multiple() {
        let batch = new_batch(
            &[1, 2, 3],
            &[11, 12, 13],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );
        assert_eq!(
            Timestamp::new_millisecond(1),
            batch.first_timestamp().unwrap()
        );
        assert_eq!(
            Timestamp::new_millisecond(3),
            batch.last_timestamp().unwrap()
        );
        assert_eq!(11, batch.first_sequence().unwrap());
        assert_eq!(13, batch.last_sequence().unwrap());
    }

    /// `slice(1, 2)` keeps two rows starting at row index 1, across all columns.
    #[test]
    fn test_slice() {
        let batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let batch = batch.slice(1, 2);
        let expect = new_batch(
            &[2, 3],
            &[12, 13],
            &[OpType::Delete, OpType::Put],
            &[22, 23],
        );
        assert_eq!(expect, batch);
    }

    /// `timestamps_native()` exposes the raw `i64` timestamp values of a non-empty batch.
    #[test]
    fn test_timestamps_native() {
        let batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        assert_eq!(&[1, 2, 3, 4], batch.timestamps_native().unwrap());
    }

    /// Concatenating an empty list of batches is an `InvalidBatch` error.
    #[test]
    fn test_concat_empty() {
        let err = Batch::concat(vec![]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }

    /// Concatenating a single batch (empty or not) returns an equal batch.
    #[test]
    fn test_concat_one() {
        let batch = new_batch(&[], &[], &[], &[]);
        let actual = Batch::concat(vec![batch.clone()]).unwrap();
        assert_eq!(batch, actual);

        let batch = new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]);
        let actual = Batch::concat(vec![batch.clone()]).unwrap();
        assert_eq!(batch, actual);
    }

    /// Concatenating several batches appends their rows in order; an empty batch
    /// in the middle contributes nothing.
    #[test]
    fn test_concat_multiple() {
        let batches = vec![
            new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]),
            new_batch(
                &[3, 4, 5],
                &[13, 14, 15],
                &[OpType::Put, OpType::Delete, OpType::Put],
                &[23, 24, 25],
            ),
            new_batch(&[], &[], &[], &[]),
            new_batch(&[6], &[16], &[OpType::Put], &[26]),
        ];
        let batch = Batch::concat(batches).unwrap();
        let expect = new_batch(
            &[1, 2, 3, 4, 5, 6],
            &[11, 12, 13, 14, 15, 16],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Delete,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25, 26],
        );
        assert_eq!(expect, batch);
    }

    /// Concatenating batches with different primary keys is an `InvalidBatch` error.
    #[test]
    fn test_concat_different() {
        let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
        let mut batch2 = new_batch(&[2], &[2], &[OpType::Put], &[2]);
        batch2.primary_key = b"hello".to_vec();
        let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }

    /// Concatenating batches whose field columns differ (extra column, or a
    /// different column id) is an `InvalidBatch` error.
    #[test]
    fn test_concat_different_fields() {
        let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
        let fields = vec![
            batch1.fields()[0].clone(),
            BatchColumn {
                column_id: 2,
                data: Arc::new(UInt64Vector::from_slice([2])),
            },
        ];
        // Batch 2 has more fields.
        let batch2 = batch1.clone().with_fields(fields).unwrap();
        let err = Batch::concat(vec![batch1.clone(), batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );

        // Batch 2 has different field.
        let fields = vec![BatchColumn {
            column_id: 2,
            data: Arc::new(UInt64Vector::from_slice([2])),
        }];
        let batch2 = batch1.clone().with_fields(fields).unwrap();
        let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }

    /// Filtering deleted rows out of an empty batch succeeds and leaves it empty.
    #[test]
    fn test_filter_deleted_empty() {
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_deleted().unwrap();
        assert!(batch.is_empty());
    }

    /// `filter_deleted()` removes rows whose op type is `Delete` and leaves a
    /// batch of only `Put` rows unchanged.
    #[test]
    fn test_filter_deleted() {
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Delete, OpType::Put, OpType::Delete, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch.filter_deleted().unwrap();
        let expect = new_batch(&[2, 4], &[12, 14], &[OpType::Put, OpType::Put], &[22, 24]);
        assert_eq!(expect, batch);

        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let expect = batch.clone();
        batch.filter_deleted().unwrap();
        assert_eq!(expect, batch);
    }

    /// `filter_by_sequence(Some(n))` keeps rows whose sequence is at most `n`;
    /// `None` keeps every row.
    #[test]
    fn test_filter_by_sequence() {
        // Filters put only.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch.filter_by_sequence(Some(13)).unwrap();
        let expect = new_batch(
            &[1, 2, 3],
            &[11, 12, 13],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );
        assert_eq!(expect, batch);

        // Filters to empty.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );

        batch.filter_by_sequence(Some(10)).unwrap();
        assert!(batch.is_empty());

        // None filter.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let expect = batch.clone();
        batch.filter_by_sequence(None).unwrap();
        assert_eq!(expect, batch);

        // Filter an empty batch.
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_by_sequence(Some(10)).unwrap();
        assert!(batch.is_empty());

        // Filter an empty batch with None.
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_by_sequence(None).unwrap();
        assert!(batch.is_empty());
    }

    /// `filter()` keeps exactly the rows whose predicate value is `true`.
    #[test]
    fn test_filter() {
        // Filters put only.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
        batch.filter(&predicate).unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // Filters deletion.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
        batch.filter(&predicate).unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // Filters to empty: the batch has two rows left and both are rejected.
        let predicate = BooleanVector::from_vec(vec![false, false]);
        batch.filter(&predicate).unwrap();
        assert!(batch.is_empty());
    }

    /// `sort(true)` orders rows by timestamp and keeps one row per timestamp;
    /// `sort(false)` orders rows without removing duplicates.
    #[test]
    fn test_sort_and_dedup() {
        let original = new_batch(
            &[2, 3, 1, 4, 5, 2],
            &[1, 2, 3, 4, 5, 6],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25, 26],
        );

        let mut batch = original.clone();
        batch.sort(true).unwrap();
        // It should only keep one timestamp 2 (the row with the larger sequence).
        assert_eq!(
            new_batch(
                &[1, 2, 3, 4, 5],
                &[3, 6, 2, 4, 5],
                &[
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                ],
                &[23, 26, 22, 24, 25],
            ),
            batch
        );

        let mut batch = original.clone();
        batch.sort(false).unwrap();

        // Without dedup, both rows with timestamp 2 are kept; the one with the
        // larger sequence comes first.
        assert_eq!(
            new_batch(
                &[1, 2, 2, 3, 4, 5],
                &[3, 6, 1, 2, 4, 5],
                &[
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                ],
                &[23, 26, 21, 22, 24, 25],
            ),
            batch
        );

        // Same checks with a delete and a put sharing timestamp 2.
        let original = new_batch(
            &[2, 2, 1],
            &[1, 6, 1],
            &[OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );

        let mut batch = original.clone();
        batch.sort(true).unwrap();
        let expect = new_batch(&[1, 2], &[1, 6], &[OpType::Put, OpType::Put], &[23, 22]);
        assert_eq!(expect, batch);

        let mut batch = original.clone();
        batch.sort(false).unwrap();
        let expect = new_batch(
            &[1, 2, 2],
            &[1, 6, 1],
            &[OpType::Put, OpType::Put, OpType::Delete],
            &[23, 22, 21],
        );
        assert_eq!(expect, batch);
    }

    /// Primary key column values and field columns can be read back from a batch,
    /// for both the dense and the sparse primary key encoding.
    #[test]
    fn test_get_value() {
        let encodings = [PrimaryKeyEncoding::Dense, PrimaryKeyEncoding::Sparse];

        for encoding in encodings {
            let codec = build_primary_key_codec_with_fields(
                encoding,
                [
                    (
                        ReservedColumnId::table_id(),
                        row_converter::SortField::new(ConcreteDataType::uint32_datatype()),
                    ),
                    (
                        ReservedColumnId::tsid(),
                        row_converter::SortField::new(ConcreteDataType::uint64_datatype()),
                    ),
                    (
                        100,
                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
                    ),
                    (
                        200,
                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
                    ),
                ]
                .into_iter(),
            );

            // Encode one primary key made of the four columns declared above.
            let values = [
                Value::UInt32(1000),
                Value::UInt64(2000),
                Value::String("abcdefgh".into()),
                Value::String("zyxwvu".into()),
            ];
            let mut buf = vec![];
            codec
                .encode_values(
                    &[
                        (ReservedColumnId::table_id(), values[0].clone()),
                        (ReservedColumnId::tsid(), values[1].clone()),
                        (100, values[2].clone()),
                        (200, values[3].clone()),
                    ],
                    &mut buf,
                )
                .unwrap();

            let field_col_id = 2;
            let mut batch = new_batch_builder(
                &buf,
                &[1, 2, 3],
                &[1, 1, 1],
                &[OpType::Put, OpType::Put, OpType::Put],
                field_col_id,
                &[42, 43, 44],
            )
            .build()
            .unwrap();

            // Each primary key column is looked up by its index and its column id.
            let v = batch
                .pk_col_value(&*codec, 0, ReservedColumnId::table_id())
                .unwrap()
                .unwrap();
            assert_eq!(values[0], *v);

            let v = batch
                .pk_col_value(&*codec, 1, ReservedColumnId::tsid())
                .unwrap()
                .unwrap();
            assert_eq!(values[1], *v);

            let v = batch.pk_col_value(&*codec, 2, 100).unwrap().unwrap();
            assert_eq!(values[2], *v);

            let v = batch.pk_col_value(&*codec, 3, 200).unwrap().unwrap();
            assert_eq!(values[3], *v);

            // The field column is looked up by column id and holds the three field values.
            let v = batch.field_col_value(field_col_id).unwrap();
            assert_eq!(v.data.get(0), Value::UInt64(42));
            assert_eq!(v.data.get(1), Value::UInt64(43));
            assert_eq!(v.data.get(2), Value::UInt64(44));
        }
    }
}