mito2/
read.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Common structs and utilities for reading data.
16
17pub mod compat;
18pub mod dedup;
19pub mod last_row;
20pub mod merge;
21pub mod plain_batch;
22pub mod projection;
23pub(crate) mod prune;
24pub(crate) mod range;
25pub(crate) mod scan_region;
26pub(crate) mod scan_util;
27pub(crate) mod seq_scan;
28pub(crate) mod series_scan;
29pub(crate) mod unordered_scan;
30
31use std::collections::{HashMap, HashSet};
32use std::sync::Arc;
33use std::time::Duration;
34
35use api::v1::OpType;
36use async_trait::async_trait;
37use common_time::Timestamp;
38use datafusion_common::arrow::array::UInt8Array;
39use datatypes::arrow;
40use datatypes::arrow::array::{Array, ArrayRef, UInt64Array};
41use datatypes::arrow::compute::SortOptions;
42use datatypes::arrow::row::{RowConverter, SortField};
43use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector};
44use datatypes::types::TimestampType;
45use datatypes::value::{Value, ValueRef};
46use datatypes::vectors::{
47    BooleanVector, Helper, TimestampMicrosecondVector, TimestampMillisecondVector,
48    TimestampNanosecondVector, TimestampSecondVector, UInt32Vector, UInt64Vector, UInt8Vector,
49    Vector, VectorRef,
50};
51use futures::stream::BoxStream;
52use futures::TryStreamExt;
53use snafu::{ensure, OptionExt, ResultExt};
54use store_api::metadata::RegionMetadata;
55use store_api::storage::{ColumnId, SequenceNumber};
56
57use crate::error::{
58    ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, InvalidBatchSnafu, Result,
59};
60use crate::memtable::BoxedBatchIterator;
61use crate::read::prune::PruneReader;
62use crate::row_converter::{CompositeValues, PrimaryKeyCodec};
63
/// Storage internal representation of a batch of rows for a primary key (time series).
///
/// Rows are sorted by primary key, timestamp, sequence desc, op_type desc. Fields
/// always keep the same relative order as fields in [RegionMetadata](store_api::metadata::RegionMetadata).
///
/// Note that rows are ordered by timestamp first, so sequences are only ordered
/// (descending) among rows sharing the same timestamp, not across the whole batch.
#[derive(Debug, PartialEq, Clone)]
pub struct Batch {
    /// Primary key encoded in a comparable form.
    primary_key: Vec<u8>,
    /// Possibly decoded `primary_key` values. Some places would decode it in advance;
    /// otherwise it is lazily filled by [Batch::pk_col_value].
    pk_values: Option<CompositeValues>,
    /// Timestamps of rows, should be sorted and not null.
    timestamps: VectorRef,
    /// Sequences of rows
    ///
    /// UInt64 type, not null.
    sequences: Arc<UInt64Vector>,
    /// Op types of rows
    ///
    /// UInt8 type, not null.
    op_types: Arc<UInt8Vector>,
    /// Fields organized in columnar format.
    fields: Vec<BatchColumn>,
    /// Cache for field index lookup (column id -> index into `fields`), lazily
    /// built by [Batch::field_col_value].
    fields_idx: Option<HashMap<ColumnId, usize>>,
}
89
90impl Batch {
91    /// Creates a new batch.
92    pub fn new(
93        primary_key: Vec<u8>,
94        timestamps: VectorRef,
95        sequences: Arc<UInt64Vector>,
96        op_types: Arc<UInt8Vector>,
97        fields: Vec<BatchColumn>,
98    ) -> Result<Batch> {
99        BatchBuilder::with_required_columns(primary_key, timestamps, sequences, op_types)
100            .with_fields(fields)
101            .build()
102    }
103
104    /// Tries to set fields for the batch.
105    pub fn with_fields(self, fields: Vec<BatchColumn>) -> Result<Batch> {
106        Batch::new(
107            self.primary_key,
108            self.timestamps,
109            self.sequences,
110            self.op_types,
111            fields,
112        )
113    }
114
115    /// Returns primary key of the batch.
116    pub fn primary_key(&self) -> &[u8] {
117        &self.primary_key
118    }
119
120    /// Returns possibly decoded primary-key values.
121    pub fn pk_values(&self) -> Option<&CompositeValues> {
122        self.pk_values.as_ref()
123    }
124
125    /// Sets possibly decoded primary-key values.
126    pub fn set_pk_values(&mut self, pk_values: CompositeValues) {
127        self.pk_values = Some(pk_values);
128    }
129
130    /// Removes possibly decoded primary-key values. For testing only.
131    #[cfg(any(test, feature = "test"))]
132    pub fn remove_pk_values(&mut self) {
133        self.pk_values = None;
134    }
135
136    /// Returns fields in the batch.
137    pub fn fields(&self) -> &[BatchColumn] {
138        &self.fields
139    }
140
141    /// Returns timestamps of the batch.
142    pub fn timestamps(&self) -> &VectorRef {
143        &self.timestamps
144    }
145
146    /// Returns sequences of the batch.
147    pub fn sequences(&self) -> &Arc<UInt64Vector> {
148        &self.sequences
149    }
150
151    /// Returns op types of the batch.
152    pub fn op_types(&self) -> &Arc<UInt8Vector> {
153        &self.op_types
154    }
155
156    /// Returns the number of rows in the batch.
157    pub fn num_rows(&self) -> usize {
158        // All vectors have the same length. We use the length of sequences vector
159        // since it has static type.
160        self.sequences.len()
161    }
162
163    /// Returns true if the number of rows in the batch is 0.
164    pub fn is_empty(&self) -> bool {
165        self.num_rows() == 0
166    }
167
168    /// Returns the first timestamp in the batch or `None` if the batch is empty.
169    pub fn first_timestamp(&self) -> Option<Timestamp> {
170        if self.timestamps.is_empty() {
171            return None;
172        }
173
174        Some(self.get_timestamp(0))
175    }
176
177    /// Returns the last timestamp in the batch or `None` if the batch is empty.
178    pub fn last_timestamp(&self) -> Option<Timestamp> {
179        if self.timestamps.is_empty() {
180            return None;
181        }
182
183        Some(self.get_timestamp(self.timestamps.len() - 1))
184    }
185
186    /// Returns the first sequence in the batch or `None` if the batch is empty.
187    pub fn first_sequence(&self) -> Option<SequenceNumber> {
188        if self.sequences.is_empty() {
189            return None;
190        }
191
192        Some(self.get_sequence(0))
193    }
194
195    /// Returns the last sequence in the batch or `None` if the batch is empty.
196    pub fn last_sequence(&self) -> Option<SequenceNumber> {
197        if self.sequences.is_empty() {
198            return None;
199        }
200
201        Some(self.get_sequence(self.sequences.len() - 1))
202    }
203
204    /// Replaces the primary key of the batch.
205    ///
206    /// Notice that this [Batch] also contains a maybe-exist `pk_values`.
207    /// Be sure to update that field as well.
208    pub fn set_primary_key(&mut self, primary_key: Vec<u8>) {
209        self.primary_key = primary_key;
210    }
211
212    /// Slice the batch, returning a new batch.
213    ///
214    /// # Panics
215    /// Panics if `offset + length > self.num_rows()`.
216    pub fn slice(&self, offset: usize, length: usize) -> Batch {
217        let fields = self
218            .fields
219            .iter()
220            .map(|column| BatchColumn {
221                column_id: column.column_id,
222                data: column.data.slice(offset, length),
223            })
224            .collect();
225        // We skip using the builder to avoid validating the batch again.
226        Batch {
227            // Now we need to clone the primary key. We could try `Bytes` if
228            // this becomes a bottleneck.
229            primary_key: self.primary_key.clone(),
230            pk_values: self.pk_values.clone(),
231            timestamps: self.timestamps.slice(offset, length),
232            sequences: Arc::new(self.sequences.get_slice(offset, length)),
233            op_types: Arc::new(self.op_types.get_slice(offset, length)),
234            fields,
235            fields_idx: self.fields_idx.clone(),
236        }
237    }
238
239    /// Takes `batches` and concat them into one batch.
240    ///
241    /// All `batches` must have the same primary key.
242    pub fn concat(mut batches: Vec<Batch>) -> Result<Batch> {
243        ensure!(
244            !batches.is_empty(),
245            InvalidBatchSnafu {
246                reason: "empty batches",
247            }
248        );
249        if batches.len() == 1 {
250            // Now we own the `batches` so we could pop it directly.
251            return Ok(batches.pop().unwrap());
252        }
253
254        let primary_key = std::mem::take(&mut batches[0].primary_key);
255        let first = &batches[0];
256        // We took the primary key from the first batch so we don't use `first.primary_key()`.
257        ensure!(
258            batches
259                .iter()
260                .skip(1)
261                .all(|b| b.primary_key() == primary_key),
262            InvalidBatchSnafu {
263                reason: "batches have different primary key",
264            }
265        );
266        for b in batches.iter().skip(1) {
267            ensure!(
268                b.fields.len() == first.fields.len(),
269                InvalidBatchSnafu {
270                    reason: "batches have different field num",
271                }
272            );
273            for (l, r) in b.fields.iter().zip(&first.fields) {
274                ensure!(
275                    l.column_id == r.column_id,
276                    InvalidBatchSnafu {
277                        reason: "batches have different fields",
278                    }
279                );
280            }
281        }
282
283        // We take the primary key from the first batch.
284        let mut builder = BatchBuilder::new(primary_key);
285        // Concat timestamps, sequences, op_types, fields.
286        let array = concat_arrays(batches.iter().map(|b| b.timestamps().to_arrow_array()))?;
287        builder.timestamps_array(array)?;
288        let array = concat_arrays(batches.iter().map(|b| b.sequences().to_arrow_array()))?;
289        builder.sequences_array(array)?;
290        let array = concat_arrays(batches.iter().map(|b| b.op_types().to_arrow_array()))?;
291        builder.op_types_array(array)?;
292        for (i, batch_column) in first.fields.iter().enumerate() {
293            let array = concat_arrays(batches.iter().map(|b| b.fields()[i].data.to_arrow_array()))?;
294            builder.push_field_array(batch_column.column_id, array)?;
295        }
296
297        builder.build()
298    }
299
300    /// Removes rows whose op type is delete.
301    pub fn filter_deleted(&mut self) -> Result<()> {
302        // Safety: op type column is not null.
303        let array = self.op_types.as_arrow();
304        // Find rows with non-delete op type.
305        let rhs = UInt8Array::new_scalar(OpType::Delete as u8);
306        let predicate =
307            arrow::compute::kernels::cmp::neq(array, &rhs).context(ComputeArrowSnafu)?;
308        self.filter(&BooleanVector::from(predicate))
309    }
310
311    // Applies the `predicate` to the batch.
312    // Safety: We know the array type so we unwrap on casting.
313    pub fn filter(&mut self, predicate: &BooleanVector) -> Result<()> {
314        self.timestamps = self
315            .timestamps
316            .filter(predicate)
317            .context(ComputeVectorSnafu)?;
318        self.sequences = Arc::new(
319            UInt64Vector::try_from_arrow_array(
320                arrow::compute::filter(self.sequences.as_arrow(), predicate.as_boolean_array())
321                    .context(ComputeArrowSnafu)?,
322            )
323            .unwrap(),
324        );
325        self.op_types = Arc::new(
326            UInt8Vector::try_from_arrow_array(
327                arrow::compute::filter(self.op_types.as_arrow(), predicate.as_boolean_array())
328                    .context(ComputeArrowSnafu)?,
329            )
330            .unwrap(),
331        );
332        for batch_column in &mut self.fields {
333            batch_column.data = batch_column
334                .data
335                .filter(predicate)
336                .context(ComputeVectorSnafu)?;
337        }
338
339        Ok(())
340    }
341
342    /// Filters rows by the given `sequence`. Only preserves rows with sequence less than or equal to `sequence`.
343    pub fn filter_by_sequence(&mut self, sequence: Option<SequenceNumber>) -> Result<()> {
344        let seq = match (sequence, self.last_sequence()) {
345            (None, _) | (_, None) => return Ok(()),
346            (Some(sequence), Some(last_sequence)) if sequence >= last_sequence => return Ok(()),
347            (Some(sequence), Some(_)) => sequence,
348        };
349
350        let seqs = self.sequences.as_arrow();
351        let sequence = UInt64Array::new_scalar(seq);
352        let predicate = datafusion_common::arrow::compute::kernels::cmp::lt_eq(seqs, &sequence)
353            .context(ComputeArrowSnafu)?;
354        let predicate = BooleanVector::from(predicate);
355        self.filter(&predicate)?;
356
357        Ok(())
358    }
359
360    /// Sorts rows in the batch. If `dedup` is true, it also removes
361    /// duplicated rows according to primary keys.
362    ///
363    /// It orders rows by timestamp, sequence desc and only keep the latest
364    /// row for the same timestamp. It doesn't consider op type as sequence
365    /// should already provide uniqueness for a row.
366    pub fn sort(&mut self, dedup: bool) -> Result<()> {
367        // If building a converter each time is costly, we may allow passing a
368        // converter.
369        let converter = RowConverter::new(vec![
370            SortField::new(self.timestamps.data_type().as_arrow_type()),
371            SortField::new_with_options(
372                self.sequences.data_type().as_arrow_type(),
373                SortOptions {
374                    descending: true,
375                    ..Default::default()
376                },
377            ),
378        ])
379        .context(ComputeArrowSnafu)?;
380        // Columns to sort.
381        let columns = [
382            self.timestamps.to_arrow_array(),
383            self.sequences.to_arrow_array(),
384        ];
385        let rows = converter.convert_columns(&columns).unwrap();
386        let mut to_sort: Vec<_> = rows.iter().enumerate().collect();
387
388        let was_sorted = to_sort.is_sorted_by_key(|x| x.1);
389        if !was_sorted {
390            to_sort.sort_unstable_by_key(|x| x.1);
391        }
392
393        let num_rows = to_sort.len();
394        if dedup {
395            // Dedup by timestamps.
396            to_sort.dedup_by(|left, right| {
397                debug_assert_eq!(18, left.1.as_ref().len());
398                debug_assert_eq!(18, right.1.as_ref().len());
399                let (left_key, right_key) = (left.1.as_ref(), right.1.as_ref());
400                // We only compare the timestamp part and ignore sequence.
401                left_key[..TIMESTAMP_KEY_LEN] == right_key[..TIMESTAMP_KEY_LEN]
402            });
403        }
404        let no_dedup = to_sort.len() == num_rows;
405
406        if was_sorted && no_dedup {
407            return Ok(());
408        }
409        let indices = UInt32Vector::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
410        self.take_in_place(&indices)
411    }
412
413    /// Returns the estimated memory size of the batch.
414    pub fn memory_size(&self) -> usize {
415        let mut size = std::mem::size_of::<Self>();
416        size += self.primary_key.len();
417        size += self.timestamps.memory_size();
418        size += self.sequences.memory_size();
419        size += self.op_types.memory_size();
420        for batch_column in &self.fields {
421            size += batch_column.data.memory_size();
422        }
423        size
424    }
425
426    /// Returns ids and datatypes of fields in the [Batch] after applying the `projection`.
427    pub(crate) fn projected_fields(
428        metadata: &RegionMetadata,
429        projection: &[ColumnId],
430    ) -> Vec<(ColumnId, ConcreteDataType)> {
431        let projected_ids: HashSet<_> = projection.iter().copied().collect();
432        metadata
433            .field_columns()
434            .filter_map(|column| {
435                if projected_ids.contains(&column.column_id) {
436                    Some((column.column_id, column.column_schema.data_type.clone()))
437                } else {
438                    None
439                }
440            })
441            .collect()
442    }
443
444    /// Returns timestamps in a native slice or `None` if the batch is empty.
445    pub(crate) fn timestamps_native(&self) -> Option<&[i64]> {
446        if self.timestamps.is_empty() {
447            return None;
448        }
449
450        let values = match self.timestamps.data_type() {
451            ConcreteDataType::Timestamp(TimestampType::Second(_)) => self
452                .timestamps
453                .as_any()
454                .downcast_ref::<TimestampSecondVector>()
455                .unwrap()
456                .as_arrow()
457                .values(),
458            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => self
459                .timestamps
460                .as_any()
461                .downcast_ref::<TimestampMillisecondVector>()
462                .unwrap()
463                .as_arrow()
464                .values(),
465            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => self
466                .timestamps
467                .as_any()
468                .downcast_ref::<TimestampMicrosecondVector>()
469                .unwrap()
470                .as_arrow()
471                .values(),
472            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => self
473                .timestamps
474                .as_any()
475                .downcast_ref::<TimestampNanosecondVector>()
476                .unwrap()
477                .as_arrow()
478                .values(),
479            other => panic!("timestamps in a Batch has other type {:?}", other),
480        };
481
482        Some(values)
483    }
484
485    /// Takes the batch in place.
486    fn take_in_place(&mut self, indices: &UInt32Vector) -> Result<()> {
487        self.timestamps = self.timestamps.take(indices).context(ComputeVectorSnafu)?;
488        let array = arrow::compute::take(self.sequences.as_arrow(), indices.as_arrow(), None)
489            .context(ComputeArrowSnafu)?;
490        // Safety: we know the array and vector type.
491        self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
492        let array = arrow::compute::take(self.op_types.as_arrow(), indices.as_arrow(), None)
493            .context(ComputeArrowSnafu)?;
494        self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
495        for batch_column in &mut self.fields {
496            batch_column.data = batch_column
497                .data
498                .take(indices)
499                .context(ComputeVectorSnafu)?;
500        }
501
502        Ok(())
503    }
504
505    /// Gets a timestamp at given `index`.
506    ///
507    /// # Panics
508    /// Panics if `index` is out-of-bound or the timestamp vector returns null.
509    fn get_timestamp(&self, index: usize) -> Timestamp {
510        match self.timestamps.get_ref(index) {
511            ValueRef::Timestamp(timestamp) => timestamp,
512
513            // We have check the data type is timestamp compatible in the [BatchBuilder] so it's safe to panic.
514            value => panic!("{:?} is not a timestamp", value),
515        }
516    }
517
518    /// Gets a sequence at given `index`.
519    ///
520    /// # Panics
521    /// Panics if `index` is out-of-bound or the sequence vector returns null.
522    pub(crate) fn get_sequence(&self, index: usize) -> SequenceNumber {
523        // Safety: sequences is not null so it actually returns Some.
524        self.sequences.get_data(index).unwrap()
525    }
526
527    /// Checks the batch is monotonic by timestamps.
528    #[cfg(debug_assertions)]
529    pub(crate) fn check_monotonic(&self) -> Result<(), String> {
530        use std::cmp::Ordering;
531        if self.timestamps_native().is_none() {
532            return Ok(());
533        }
534
535        let timestamps = self.timestamps_native().unwrap();
536        let sequences = self.sequences.as_arrow().values();
537        for (i, window) in timestamps.windows(2).enumerate() {
538            let current = window[0];
539            let next = window[1];
540            let current_sequence = sequences[i];
541            let next_sequence = sequences[i + 1];
542            match current.cmp(&next) {
543                Ordering::Less => {
544                    // The current timestamp is less than the next timestamp.
545                    continue;
546                }
547                Ordering::Equal => {
548                    // The current timestamp is equal to the next timestamp.
549                    if current_sequence < next_sequence {
550                        return Err(format!(
551                            "sequence are not monotonic: ts {} == {} but current sequence {} < {}, index: {}",
552                            current, next, current_sequence, next_sequence, i
553                        ));
554                    }
555                }
556                Ordering::Greater => {
557                    // The current timestamp is greater than the next timestamp.
558                    return Err(format!(
559                        "timestamps are not monotonic: {} > {}, index: {}",
560                        current, next, i
561                    ));
562                }
563            }
564        }
565
566        Ok(())
567    }
568
569    /// Returns Ok if the given batch is behind the current batch.
570    #[cfg(debug_assertions)]
571    pub(crate) fn check_next_batch(&self, other: &Batch) -> Result<(), String> {
572        // Checks the primary key
573        if self.primary_key() < other.primary_key() {
574            return Ok(());
575        }
576        if self.primary_key() > other.primary_key() {
577            return Err(format!(
578                "primary key is not monotonic: {:?} > {:?}",
579                self.primary_key(),
580                other.primary_key()
581            ));
582        }
583        // Checks the timestamp.
584        if self.last_timestamp() < other.first_timestamp() {
585            return Ok(());
586        }
587        if self.last_timestamp() > other.first_timestamp() {
588            return Err(format!(
589                "timestamps are not monotonic: {:?} > {:?}",
590                self.last_timestamp(),
591                other.first_timestamp()
592            ));
593        }
594        // Checks the sequence.
595        if self.last_sequence() >= other.first_sequence() {
596            return Ok(());
597        }
598        Err(format!(
599            "sequences are not monotonic: {:?} < {:?}",
600            self.last_sequence(),
601            other.first_sequence()
602        ))
603    }
604
605    /// Returns the value of the column in the primary key.
606    ///
607    /// Lazily decodes the primary key and caches the result.
608    pub fn pk_col_value(
609        &mut self,
610        codec: &dyn PrimaryKeyCodec,
611        col_idx_in_pk: usize,
612        column_id: ColumnId,
613    ) -> Result<Option<&Value>> {
614        if self.pk_values.is_none() {
615            self.pk_values = Some(codec.decode(&self.primary_key)?);
616        }
617
618        let pk_values = self.pk_values.as_ref().unwrap();
619        Ok(match pk_values {
620            CompositeValues::Dense(values) => values.get(col_idx_in_pk).map(|(_, v)| v),
621            CompositeValues::Sparse(values) => values.get(&column_id),
622        })
623    }
624
625    /// Returns values of the field in the batch.
626    ///
627    /// Lazily caches the field index.
628    pub fn field_col_value(&mut self, column_id: ColumnId) -> Option<&BatchColumn> {
629        if self.fields_idx.is_none() {
630            self.fields_idx = Some(
631                self.fields
632                    .iter()
633                    .enumerate()
634                    .map(|(i, c)| (c.column_id, i))
635                    .collect(),
636            );
637        }
638
639        self.fields_idx
640            .as_ref()
641            .unwrap()
642            .get(&column_id)
643            .map(|&idx| &self.fields[idx])
644    }
645}
646
647/// A struct to check the batch is monotonic.
648#[cfg(debug_assertions)]
649#[derive(Default)]
650pub(crate) struct BatchChecker {
651    last_batch: Option<Batch>,
652    start: Option<Timestamp>,
653    end: Option<Timestamp>,
654}
655
656#[cfg(debug_assertions)]
657impl BatchChecker {
658    /// Attaches the given start timestamp to the checker.
659    pub(crate) fn with_start(mut self, start: Option<Timestamp>) -> Self {
660        self.start = start;
661        self
662    }
663
664    /// Attaches the given end timestamp to the checker.
665    pub(crate) fn with_end(mut self, end: Option<Timestamp>) -> Self {
666        self.end = end;
667        self
668    }
669
670    /// Returns true if the given batch is monotonic and behind
671    /// the last batch.
672    pub(crate) fn check_monotonic(&mut self, batch: &Batch) -> Result<(), String> {
673        batch.check_monotonic()?;
674
675        if let (Some(start), Some(first)) = (self.start, batch.first_timestamp()) {
676            if start > first {
677                return Err(format!(
678                    "batch's first timestamp is before the start timestamp: {:?} > {:?}",
679                    start, first
680                ));
681            }
682        }
683        if let (Some(end), Some(last)) = (self.end, batch.last_timestamp()) {
684            if end <= last {
685                return Err(format!(
686                    "batch's last timestamp is after the end timestamp: {:?} <= {:?}",
687                    end, last
688                ));
689            }
690        }
691
692        // Checks the batch is behind the last batch.
693        // Then Updates the last batch.
694        let res = self
695            .last_batch
696            .as_ref()
697            .map(|last| last.check_next_batch(batch))
698            .unwrap_or(Ok(()));
699        self.last_batch = Some(batch.clone());
700        res
701    }
702
703    /// Formats current batch and last batch for debug.
704    pub(crate) fn format_batch(&self, batch: &Batch) -> String {
705        use std::fmt::Write;
706
707        let mut message = String::new();
708        if let Some(last) = &self.last_batch {
709            write!(
710                message,
711                "last_pk: {:?}, last_ts: {:?}, last_seq: {:?}, ",
712                last.primary_key(),
713                last.last_timestamp(),
714                last.last_sequence()
715            )
716            .unwrap();
717        }
718        write!(
719            message,
720            "batch_pk: {:?}, batch_ts: {:?}, batch_seq: {:?}",
721            batch.primary_key(),
722            batch.timestamps(),
723            batch.sequences()
724        )
725        .unwrap();
726
727        message
728    }
729
730    /// Checks batches from the part range are monotonic. Otherwise, panics.
731    pub(crate) fn ensure_part_range_batch(
732        &mut self,
733        scanner: &str,
734        region_id: store_api::storage::RegionId,
735        partition: usize,
736        part_range: store_api::region_engine::PartitionRange,
737        batch: &Batch,
738    ) {
739        if let Err(e) = self.check_monotonic(batch) {
740            let err_msg = format!(
741                "{}: batch is not sorted, {}, region_id: {}, partition: {}, part_range: {:?}",
742                scanner, e, region_id, partition, part_range,
743            );
744            common_telemetry::error!("{err_msg}, {}", self.format_batch(batch));
745            // Only print the number of row in the panic message.
746            panic!("{err_msg}, batch rows: {}", batch.num_rows());
747        }
748    }
749}
750
/// Number of bytes an encoded timestamp occupies in the arrow row format.
///
/// Arrow encodes a fixed-width value as one null-sentinel byte followed by the
/// value bytes; timestamps are stored as `i64`. [Batch::sort] compares only
/// this prefix of an encoded `(timestamp, sequence)` row to dedup by timestamp.
const TIMESTAMP_KEY_LEN: usize = 1 + std::mem::size_of::<i64>();
753
754/// Helper function to concat arrays from `iter`.
755fn concat_arrays(iter: impl Iterator<Item = ArrayRef>) -> Result<ArrayRef> {
756    let arrays: Vec<_> = iter.collect();
757    let dyn_arrays: Vec<_> = arrays.iter().map(|array| array.as_ref()).collect();
758    arrow::compute::concat(&dyn_arrays).context(ComputeArrowSnafu)
759}
760
/// A field column in a [Batch].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct BatchColumn {
    /// Id of the column.
    pub column_id: ColumnId,
    /// Data of the column, one value per row of the batch.
    pub data: VectorRef,
}
769
770/// Builder to build [Batch].
771pub struct BatchBuilder {
772    primary_key: Vec<u8>,
773    timestamps: Option<VectorRef>,
774    sequences: Option<Arc<UInt64Vector>>,
775    op_types: Option<Arc<UInt8Vector>>,
776    fields: Vec<BatchColumn>,
777}
778
779impl BatchBuilder {
780    /// Creates a new [BatchBuilder] with primary key.
781    pub fn new(primary_key: Vec<u8>) -> BatchBuilder {
782        BatchBuilder {
783            primary_key,
784            timestamps: None,
785            sequences: None,
786            op_types: None,
787            fields: Vec::new(),
788        }
789    }
790
791    /// Creates a new [BatchBuilder] with all required columns.
792    pub fn with_required_columns(
793        primary_key: Vec<u8>,
794        timestamps: VectorRef,
795        sequences: Arc<UInt64Vector>,
796        op_types: Arc<UInt8Vector>,
797    ) -> BatchBuilder {
798        BatchBuilder {
799            primary_key,
800            timestamps: Some(timestamps),
801            sequences: Some(sequences),
802            op_types: Some(op_types),
803            fields: Vec::new(),
804        }
805    }
806
807    /// Set all field columns.
808    pub fn with_fields(mut self, fields: Vec<BatchColumn>) -> Self {
809        self.fields = fields;
810        self
811    }
812
813    /// Push a field column.
814    pub fn push_field(&mut self, column: BatchColumn) -> &mut Self {
815        self.fields.push(column);
816        self
817    }
818
819    /// Push an array as a field.
820    pub fn push_field_array(&mut self, column_id: ColumnId, array: ArrayRef) -> Result<&mut Self> {
821        let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
822        self.fields.push(BatchColumn {
823            column_id,
824            data: vector,
825        });
826
827        Ok(self)
828    }
829
830    /// Try to set an array as timestamps.
831    pub fn timestamps_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
832        let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
833        ensure!(
834            vector.data_type().is_timestamp(),
835            InvalidBatchSnafu {
836                reason: format!("{:?} is not a timestamp type", vector.data_type()),
837            }
838        );
839
840        self.timestamps = Some(vector);
841        Ok(self)
842    }
843
844    /// Try to set an array as sequences.
845    pub fn sequences_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
846        ensure!(
847            *array.data_type() == arrow::datatypes::DataType::UInt64,
848            InvalidBatchSnafu {
849                reason: "sequence array is not UInt64 type",
850            }
851        );
852        // Safety: The cast must success as we have ensured it is uint64 type.
853        let vector = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
854        self.sequences = Some(vector);
855
856        Ok(self)
857    }
858
859    /// Try to set an array as op types.
860    pub fn op_types_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
861        ensure!(
862            *array.data_type() == arrow::datatypes::DataType::UInt8,
863            InvalidBatchSnafu {
864                reason: "sequence array is not UInt8 type",
865            }
866        );
867        // Safety: The cast must success as we have ensured it is uint64 type.
868        let vector = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
869        self.op_types = Some(vector);
870
871        Ok(self)
872    }
873
874    /// Builds the [Batch].
875    pub fn build(self) -> Result<Batch> {
876        let timestamps = self.timestamps.context(InvalidBatchSnafu {
877            reason: "missing timestamps",
878        })?;
879        let sequences = self.sequences.context(InvalidBatchSnafu {
880            reason: "missing sequences",
881        })?;
882        let op_types = self.op_types.context(InvalidBatchSnafu {
883            reason: "missing op_types",
884        })?;
885        // Our storage format ensure these columns are not nullable so
886        // we use assert here.
887        assert_eq!(0, timestamps.null_count());
888        assert_eq!(0, sequences.null_count());
889        assert_eq!(0, op_types.null_count());
890
891        let ts_len = timestamps.len();
892        ensure!(
893            sequences.len() == ts_len,
894            InvalidBatchSnafu {
895                reason: format!(
896                    "sequence have different len {} != {}",
897                    sequences.len(),
898                    ts_len
899                ),
900            }
901        );
902        ensure!(
903            op_types.len() == ts_len,
904            InvalidBatchSnafu {
905                reason: format!(
906                    "op type have different len {} != {}",
907                    op_types.len(),
908                    ts_len
909                ),
910            }
911        );
912        for column in &self.fields {
913            ensure!(
914                column.data.len() == ts_len,
915                InvalidBatchSnafu {
916                    reason: format!(
917                        "column {} has different len {} != {}",
918                        column.column_id,
919                        column.data.len(),
920                        ts_len
921                    ),
922                }
923            );
924        }
925
926        Ok(Batch {
927            primary_key: self.primary_key,
928            pk_values: None,
929            timestamps,
930            sequences,
931            op_types,
932            fields: self.fields,
933            fields_idx: None,
934        })
935    }
936}
937
938impl From<Batch> for BatchBuilder {
939    fn from(batch: Batch) -> Self {
940        Self {
941            primary_key: batch.primary_key,
942            timestamps: Some(batch.timestamps),
943            sequences: Some(batch.sequences),
944            op_types: Some(batch.op_types),
945            fields: batch.fields,
946        }
947    }
948}
949
/// Async [Batch] reader and iterator wrapper.
///
/// This is the data source for SST writers or internal readers. Each variant wraps a
/// different kind of batch producer; `Source::next_batch` pulls from whichever one
/// is held, so callers can treat them uniformly.
pub enum Source {
    /// Source from a [BoxedBatchReader].
    Reader(BoxedBatchReader),
    /// Source from a [BoxedBatchIterator] (polled synchronously, without `.await`).
    Iter(BoxedBatchIterator),
    /// Source from a [BoxedBatchStream].
    Stream(BoxedBatchStream),
    /// Source from a [PruneReader].
    PruneReader(PruneReader),
}
963
964impl Source {
965    /// Returns next [Batch] from this data source.
966    pub(crate) async fn next_batch(&mut self) -> Result<Option<Batch>> {
967        match self {
968            Source::Reader(reader) => reader.next_batch().await,
969            Source::Iter(iter) => iter.next().transpose(),
970            Source::Stream(stream) => stream.try_next().await,
971            Source::PruneReader(reader) => reader.next_batch().await,
972        }
973    }
974}
975
/// Async batch reader.
///
/// The reader must guarantee [Batch]es returned by it have the same schema.
///
/// The `Send` supertrait lets readers (including boxed `dyn BatchReader` objects) be
/// moved between threads/tasks.
#[async_trait]
pub trait BatchReader: Send {
    /// Fetch next [Batch].
    ///
    /// Returns `Ok(None)` when the reader has reached its end and calling `next_batch()`
    /// again won't return batch again.
    ///
    /// If `Err` is returned, caller should not call this method again, the implementor
    /// may or may not panic in such case.
    async fn next_batch(&mut self) -> Result<Option<Batch>>;
}
990
/// Pointer to [BatchReader].
///
/// Because `BatchReader` has a `Send` supertrait, this boxed trait object is `Send`.
pub type BoxedBatchReader = Box<dyn BatchReader>;

/// Pointer to a stream that yields [Batch].
///
/// `BoxStream` is a pinned, boxed, `Send` stream; the `'static` lifetime means the
/// stream cannot borrow from shorter-lived data.
pub type BoxedBatchStream = BoxStream<'static, Result<Batch>>;
996
997#[async_trait::async_trait]
998impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
999    async fn next_batch(&mut self) -> Result<Option<Batch>> {
1000        (**self).next_batch().await
1001    }
1002}
1003
/// Local metrics for scanners.
///
/// The durations record time spent in each phase of a scan; the counters record
/// returned batches/rows and scanned ranges. `Default` initializes every value to zero.
#[derive(Debug, Default)]
pub(crate) struct ScannerMetrics {
    /// Duration to prepare the scan task.
    prepare_scan_cost: Duration,
    /// Duration to build the (merge) reader.
    build_reader_cost: Duration,
    /// Duration to scan data.
    scan_cost: Duration,
    /// Duration to convert batches.
    convert_cost: Duration,
    /// Duration while waiting for `yield`.
    yield_cost: Duration,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of mem ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,
}
1026
#[cfg(test)]
mod tests {
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::storage::consts::ReservedColumnId;

    use super::*;
    use crate::error::Error;
    use crate::row_converter::{self, build_primary_key_codec_with_fields};
    use crate::test_util::new_batch_builder;

    /// Builds a [Batch] with primary key `b"test"` and a single `u64` field column
    /// whose column id is 1.
    fn new_batch(
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        field: &[u64],
    ) -> Batch {
        new_batch_builder(b"test", timestamps, sequences, op_types, 1, field)
            .build()
            .unwrap()
    }

    /// An empty batch has no first/last timestamp or sequence.
    #[test]
    fn test_empty_batch() {
        let batch = new_batch(&[], &[], &[], &[]);
        assert_eq!(None, batch.first_timestamp());
        assert_eq!(None, batch.last_timestamp());
        assert_eq!(None, batch.first_sequence());
        assert_eq!(None, batch.last_sequence());
        assert!(batch.timestamps_native().is_none());
    }

    /// With a single row, first and last refer to the same row.
    #[test]
    fn test_first_last_one() {
        let batch = new_batch(&[1], &[2], &[OpType::Put], &[4]);
        assert_eq!(
            Timestamp::new_millisecond(1),
            batch.first_timestamp().unwrap()
        );
        assert_eq!(
            Timestamp::new_millisecond(1),
            batch.last_timestamp().unwrap()
        );
        assert_eq!(2, batch.first_sequence().unwrap());
        assert_eq!(2, batch.last_sequence().unwrap());
    }

    #[test]
    fn test_first_last_multiple() {
        let batch = new_batch(
            &[1, 2, 3],
            &[11, 12, 13],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );
        assert_eq!(
            Timestamp::new_millisecond(1),
            batch.first_timestamp().unwrap()
        );
        assert_eq!(
            Timestamp::new_millisecond(3),
            batch.last_timestamp().unwrap()
        );
        assert_eq!(11, batch.first_sequence().unwrap());
        assert_eq!(13, batch.last_sequence().unwrap());
    }

    /// `slice(1, 2)` keeps the two rows starting at index 1.
    #[test]
    fn test_slice() {
        let batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let batch = batch.slice(1, 2);
        let expect = new_batch(
            &[2, 3],
            &[12, 13],
            &[OpType::Delete, OpType::Put],
            &[22, 23],
        );
        assert_eq!(expect, batch);
    }

    #[test]
    fn test_timestamps_native() {
        let batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        assert_eq!(&[1, 2, 3, 4], batch.timestamps_native().unwrap());
    }

    /// Concatenating zero batches is an error.
    #[test]
    fn test_concat_empty() {
        let err = Batch::concat(vec![]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }

    /// Concatenating a single batch (empty or not) returns an equal batch.
    #[test]
    fn test_concat_one() {
        let batch = new_batch(&[], &[], &[], &[]);
        let actual = Batch::concat(vec![batch.clone()]).unwrap();
        assert_eq!(batch, actual);

        let batch = new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]);
        let actual = Batch::concat(vec![batch.clone()]).unwrap();
        assert_eq!(batch, actual);
    }

    /// Rows are appended in input order; empty inputs contribute nothing.
    #[test]
    fn test_concat_multiple() {
        let batches = vec![
            new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]),
            new_batch(
                &[3, 4, 5],
                &[13, 14, 15],
                &[OpType::Put, OpType::Delete, OpType::Put],
                &[23, 24, 25],
            ),
            new_batch(&[], &[], &[], &[]),
            new_batch(&[6], &[16], &[OpType::Put], &[26]),
        ];
        let batch = Batch::concat(batches).unwrap();
        let expect = new_batch(
            &[1, 2, 3, 4, 5, 6],
            &[11, 12, 13, 14, 15, 16],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Delete,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25, 26],
        );
        assert_eq!(expect, batch);
    }

    /// Batches with different primary keys cannot be concatenated.
    #[test]
    fn test_concat_different() {
        let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
        let mut batch2 = new_batch(&[2], &[2], &[OpType::Put], &[2]);
        batch2.primary_key = b"hello".to_vec();
        let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }

    /// Batches whose field columns differ (in count or column id) cannot be concatenated.
    #[test]
    fn test_concat_different_fields() {
        let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
        let fields = vec![
            batch1.fields()[0].clone(),
            BatchColumn {
                column_id: 2,
                data: Arc::new(UInt64Vector::from_slice([2])),
            },
        ];
        // Batch 2 has more fields.
        let batch2 = batch1.clone().with_fields(fields).unwrap();
        let err = Batch::concat(vec![batch1.clone(), batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );

        // Batch 2 has different field.
        let fields = vec![BatchColumn {
            column_id: 2,
            data: Arc::new(UInt64Vector::from_slice([2])),
        }];
        let batch2 = batch1.clone().with_fields(fields).unwrap();
        let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }

    #[test]
    fn test_filter_deleted_empty() {
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_deleted().unwrap();
        assert!(batch.is_empty());
    }

    /// `filter_deleted` drops rows whose op type is `Delete` and keeps all `Put` rows.
    #[test]
    fn test_filter_deleted() {
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Delete, OpType::Put, OpType::Delete, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch.filter_deleted().unwrap();
        let expect = new_batch(&[2, 4], &[12, 14], &[OpType::Put, OpType::Put], &[22, 24]);
        assert_eq!(expect, batch);

        // A batch without deletes is left unchanged.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let expect = batch.clone();
        batch.filter_deleted().unwrap();
        assert_eq!(expect, batch);
    }

    /// `filter_by_sequence(Some(s))` keeps rows with sequence <= `s`; `None` keeps all rows.
    #[test]
    fn test_filter_by_sequence() {
        // Batch with puts only: rows with sequence 11..=13 survive.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch.filter_by_sequence(Some(13)).unwrap();
        let expect = new_batch(
            &[1, 2, 3],
            &[11, 12, 13],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );
        assert_eq!(expect, batch);

        // Filters to empty.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );

        batch.filter_by_sequence(Some(10)).unwrap();
        assert!(batch.is_empty());

        // None filter.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let expect = batch.clone();
        batch.filter_by_sequence(None).unwrap();
        assert_eq!(expect, batch);

        // Filter an empty batch
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_by_sequence(Some(10)).unwrap();
        assert!(batch.is_empty());

        // Filter an empty batch with None
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_by_sequence(None).unwrap();
        assert!(batch.is_empty());
    }

    /// `filter` keeps exactly the rows where the predicate is `true`.
    #[test]
    fn test_filter() {
        // Batch with puts only.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
        batch.filter(&predicate).unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // Filters deletion.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
        batch.filter(&predicate).unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // Filters to empty.
        let predicate = BooleanVector::from_vec(vec![false, false]);
        batch.filter(&predicate).unwrap();
        assert!(batch.is_empty());
    }

    /// `sort` orders rows by timestamp ascending, then sequence descending;
    /// `sort(true)` additionally drops duplicate timestamps, keeping the first row
    /// (the one with the larger sequence).
    #[test]
    fn test_sort_and_dedup() {
        let original = new_batch(
            &[2, 3, 1, 4, 5, 2],
            &[1, 2, 3, 4, 5, 6],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25, 26],
        );

        let mut batch = original.clone();
        batch.sort(true).unwrap();
        // It should only keep one timestamp 2 (the row with sequence 6).
        assert_eq!(
            new_batch(
                &[1, 2, 3, 4, 5],
                &[3, 6, 2, 4, 5],
                &[
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                ],
                &[23, 26, 22, 24, 25],
            ),
            batch
        );

        let mut batch = original.clone();
        batch.sort(false).unwrap();

        // Without dedup, both rows with timestamp 2 are kept, sequence 6 before sequence 1.
        assert_eq!(
            new_batch(
                &[1, 2, 2, 3, 4, 5],
                &[3, 6, 1, 2, 4, 5],
                &[
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                ],
                &[23, 26, 21, 22, 24, 25],
            ),
            batch
        );

        // Timestamp 2 has a Delete (sequence 1) and a Put (sequence 6).
        let original = new_batch(
            &[2, 2, 1],
            &[1, 6, 1],
            &[OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );

        // Dedup keeps the Put with the larger sequence.
        let mut batch = original.clone();
        batch.sort(true).unwrap();
        let expect = new_batch(&[1, 2], &[1, 6], &[OpType::Put, OpType::Put], &[23, 22]);
        assert_eq!(expect, batch);

        // Without dedup the Delete row is kept after the Put.
        let mut batch = original.clone();
        batch.sort(false).unwrap();
        let expect = new_batch(
            &[1, 2, 2],
            &[1, 6, 1],
            &[OpType::Put, OpType::Put, OpType::Delete],
            &[23, 22, 21],
        );
        assert_eq!(expect, batch);
    }

    /// For both primary key encodings, each primary key column can be decoded by its
    /// position and column id, and the field column can be looked up by column id.
    #[test]
    fn test_get_value() {
        let encodings = [PrimaryKeyEncoding::Dense, PrimaryKeyEncoding::Sparse];

        for encoding in encodings {
            let codec = build_primary_key_codec_with_fields(
                encoding,
                [
                    (
                        ReservedColumnId::table_id(),
                        row_converter::SortField::new(ConcreteDataType::uint32_datatype()),
                    ),
                    (
                        ReservedColumnId::tsid(),
                        row_converter::SortField::new(ConcreteDataType::uint64_datatype()),
                    ),
                    (
                        100,
                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
                    ),
                    (
                        200,
                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
                    ),
                ]
                .into_iter(),
            );

            let values = [
                Value::UInt32(1000),
                Value::UInt64(2000),
                Value::String("abcdefgh".into()),
                Value::String("zyxwvu".into()),
            ];
            // Encode the primary key values into `buf`, which becomes the batch's key.
            let mut buf = vec![];
            codec
                .encode_values(
                    &[
                        (ReservedColumnId::table_id(), values[0].clone()),
                        (ReservedColumnId::tsid(), values[1].clone()),
                        (100, values[2].clone()),
                        (200, values[3].clone()),
                    ],
                    &mut buf,
                )
                .unwrap();

            let field_col_id = 2;
            let mut batch = new_batch_builder(
                &buf,
                &[1, 2, 3],
                &[1, 1, 1],
                &[OpType::Put, OpType::Put, OpType::Put],
                field_col_id,
                &[42, 43, 44],
            )
            .build()
            .unwrap();

            let v = batch
                .pk_col_value(&*codec, 0, ReservedColumnId::table_id())
                .unwrap()
                .unwrap();
            assert_eq!(values[0], *v);

            let v = batch
                .pk_col_value(&*codec, 1, ReservedColumnId::tsid())
                .unwrap()
                .unwrap();
            assert_eq!(values[1], *v);

            let v = batch.pk_col_value(&*codec, 2, 100).unwrap().unwrap();
            assert_eq!(values[2], *v);

            let v = batch.pk_col_value(&*codec, 3, 200).unwrap().unwrap();
            assert_eq!(values[3], *v);

            let v = batch.field_col_value(field_col_id).unwrap();
            assert_eq!(v.data.get(0), Value::UInt64(42));
            assert_eq!(v.data.get(1), Value::UInt64(43));
            assert_eq!(v.data.get(2), Value::UInt64(44));
        }
    }
}