mito2/
read.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Common structs and utilities for reading data.
16
17pub mod compat;
18pub mod dedup;
19pub mod last_row;
20pub mod merge;
21pub mod projection;
22pub(crate) mod prune;
23pub(crate) mod range;
24pub(crate) mod scan_region;
25pub(crate) mod scan_util;
26pub(crate) mod seq_scan;
27pub(crate) mod unordered_scan;
28
29use std::collections::{HashMap, HashSet};
30use std::sync::Arc;
31use std::time::Duration;
32
33use api::v1::OpType;
34use async_trait::async_trait;
35use common_time::Timestamp;
36use datafusion_common::arrow::array::UInt8Array;
37use datatypes::arrow;
38use datatypes::arrow::array::{Array, ArrayRef, UInt64Array};
39use datatypes::arrow::compute::SortOptions;
40use datatypes::arrow::row::{RowConverter, SortField};
41use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector};
42use datatypes::types::TimestampType;
43use datatypes::value::{Value, ValueRef};
44use datatypes::vectors::{
45    BooleanVector, Helper, TimestampMicrosecondVector, TimestampMillisecondVector,
46    TimestampNanosecondVector, TimestampSecondVector, UInt32Vector, UInt64Vector, UInt8Vector,
47    Vector, VectorRef,
48};
49use futures::stream::BoxStream;
50use futures::TryStreamExt;
51use snafu::{ensure, OptionExt, ResultExt};
52use store_api::metadata::RegionMetadata;
53use store_api::storage::{ColumnId, SequenceNumber};
54
55use crate::error::{
56    ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, InvalidBatchSnafu, Result,
57};
58use crate::memtable::BoxedBatchIterator;
59use crate::read::prune::PruneReader;
60use crate::row_converter::{CompositeValues, PrimaryKeyCodec};
61
62/// Storage internal representation of a batch of rows for a primary key (time series).
63///
64/// Rows are sorted by primary key, timestamp, sequence desc, op_type desc. Fields
65/// always keep the same relative order as fields in [RegionMetadata](store_api::metadata::RegionMetadata).
66#[derive(Debug, PartialEq, Clone)]
67pub struct Batch {
68    /// Primary key encoded in a comparable form.
69    primary_key: Vec<u8>,
70    /// Possibly decoded `primary_key` values. Some places would decode it in advance.
71    pk_values: Option<CompositeValues>,
72    /// Timestamps of rows, should be sorted and not null.
73    timestamps: VectorRef,
74    /// Sequences of rows
75    ///
76    /// UInt64 type, not null.
77    sequences: Arc<UInt64Vector>,
78    /// Op types of rows
79    ///
80    /// UInt8 type, not null.
81    op_types: Arc<UInt8Vector>,
82    /// Fields organized in columnar format.
83    fields: Vec<BatchColumn>,
84    /// Cache for field index lookup.
85    fields_idx: Option<HashMap<ColumnId, usize>>,
86}
87
88impl Batch {
89    /// Creates a new batch.
90    pub fn new(
91        primary_key: Vec<u8>,
92        timestamps: VectorRef,
93        sequences: Arc<UInt64Vector>,
94        op_types: Arc<UInt8Vector>,
95        fields: Vec<BatchColumn>,
96    ) -> Result<Batch> {
97        BatchBuilder::with_required_columns(primary_key, timestamps, sequences, op_types)
98            .with_fields(fields)
99            .build()
100    }
101
102    /// Tries to set fields for the batch.
103    pub fn with_fields(self, fields: Vec<BatchColumn>) -> Result<Batch> {
104        Batch::new(
105            self.primary_key,
106            self.timestamps,
107            self.sequences,
108            self.op_types,
109            fields,
110        )
111    }
112
113    /// Returns primary key of the batch.
114    pub fn primary_key(&self) -> &[u8] {
115        &self.primary_key
116    }
117
118    /// Returns possibly decoded primary-key values.
119    pub fn pk_values(&self) -> Option<&CompositeValues> {
120        self.pk_values.as_ref()
121    }
122
123    /// Sets possibly decoded primary-key values.
124    pub fn set_pk_values(&mut self, pk_values: CompositeValues) {
125        self.pk_values = Some(pk_values);
126    }
127
128    /// Removes possibly decoded primary-key values. For testing only.
129    #[cfg(any(test, feature = "test"))]
130    pub fn remove_pk_values(&mut self) {
131        self.pk_values = None;
132    }
133
134    /// Returns fields in the batch.
135    pub fn fields(&self) -> &[BatchColumn] {
136        &self.fields
137    }
138
139    /// Returns timestamps of the batch.
140    pub fn timestamps(&self) -> &VectorRef {
141        &self.timestamps
142    }
143
144    /// Returns sequences of the batch.
145    pub fn sequences(&self) -> &Arc<UInt64Vector> {
146        &self.sequences
147    }
148
149    /// Returns op types of the batch.
150    pub fn op_types(&self) -> &Arc<UInt8Vector> {
151        &self.op_types
152    }
153
154    /// Returns the number of rows in the batch.
155    pub fn num_rows(&self) -> usize {
156        // All vectors have the same length. We use the length of sequences vector
157        // since it has static type.
158        self.sequences.len()
159    }
160
161    /// Returns true if the number of rows in the batch is 0.
162    pub fn is_empty(&self) -> bool {
163        self.num_rows() == 0
164    }
165
166    /// Returns the first timestamp in the batch or `None` if the batch is empty.
167    pub fn first_timestamp(&self) -> Option<Timestamp> {
168        if self.timestamps.is_empty() {
169            return None;
170        }
171
172        Some(self.get_timestamp(0))
173    }
174
175    /// Returns the last timestamp in the batch or `None` if the batch is empty.
176    pub fn last_timestamp(&self) -> Option<Timestamp> {
177        if self.timestamps.is_empty() {
178            return None;
179        }
180
181        Some(self.get_timestamp(self.timestamps.len() - 1))
182    }
183
184    /// Returns the first sequence in the batch or `None` if the batch is empty.
185    pub fn first_sequence(&self) -> Option<SequenceNumber> {
186        if self.sequences.is_empty() {
187            return None;
188        }
189
190        Some(self.get_sequence(0))
191    }
192
193    /// Returns the last sequence in the batch or `None` if the batch is empty.
194    pub fn last_sequence(&self) -> Option<SequenceNumber> {
195        if self.sequences.is_empty() {
196            return None;
197        }
198
199        Some(self.get_sequence(self.sequences.len() - 1))
200    }
201
202    /// Replaces the primary key of the batch.
203    ///
204    /// Notice that this [Batch] also contains a maybe-exist `pk_values`.
205    /// Be sure to update that field as well.
206    pub fn set_primary_key(&mut self, primary_key: Vec<u8>) {
207        self.primary_key = primary_key;
208    }
209
210    /// Slice the batch, returning a new batch.
211    ///
212    /// # Panics
213    /// Panics if `offset + length > self.num_rows()`.
214    pub fn slice(&self, offset: usize, length: usize) -> Batch {
215        let fields = self
216            .fields
217            .iter()
218            .map(|column| BatchColumn {
219                column_id: column.column_id,
220                data: column.data.slice(offset, length),
221            })
222            .collect();
223        // We skip using the builder to avoid validating the batch again.
224        Batch {
225            // Now we need to clone the primary key. We could try `Bytes` if
226            // this becomes a bottleneck.
227            primary_key: self.primary_key.clone(),
228            pk_values: self.pk_values.clone(),
229            timestamps: self.timestamps.slice(offset, length),
230            sequences: Arc::new(self.sequences.get_slice(offset, length)),
231            op_types: Arc::new(self.op_types.get_slice(offset, length)),
232            fields,
233            fields_idx: self.fields_idx.clone(),
234        }
235    }
236
237    /// Takes `batches` and concat them into one batch.
238    ///
239    /// All `batches` must have the same primary key.
240    pub fn concat(mut batches: Vec<Batch>) -> Result<Batch> {
241        ensure!(
242            !batches.is_empty(),
243            InvalidBatchSnafu {
244                reason: "empty batches",
245            }
246        );
247        if batches.len() == 1 {
248            // Now we own the `batches` so we could pop it directly.
249            return Ok(batches.pop().unwrap());
250        }
251
252        let primary_key = std::mem::take(&mut batches[0].primary_key);
253        let first = &batches[0];
254        // We took the primary key from the first batch so we don't use `first.primary_key()`.
255        ensure!(
256            batches
257                .iter()
258                .skip(1)
259                .all(|b| b.primary_key() == primary_key),
260            InvalidBatchSnafu {
261                reason: "batches have different primary key",
262            }
263        );
264        for b in batches.iter().skip(1) {
265            ensure!(
266                b.fields.len() == first.fields.len(),
267                InvalidBatchSnafu {
268                    reason: "batches have different field num",
269                }
270            );
271            for (l, r) in b.fields.iter().zip(&first.fields) {
272                ensure!(
273                    l.column_id == r.column_id,
274                    InvalidBatchSnafu {
275                        reason: "batches have different fields",
276                    }
277                );
278            }
279        }
280
281        // We take the primary key from the first batch.
282        let mut builder = BatchBuilder::new(primary_key);
283        // Concat timestamps, sequences, op_types, fields.
284        let array = concat_arrays(batches.iter().map(|b| b.timestamps().to_arrow_array()))?;
285        builder.timestamps_array(array)?;
286        let array = concat_arrays(batches.iter().map(|b| b.sequences().to_arrow_array()))?;
287        builder.sequences_array(array)?;
288        let array = concat_arrays(batches.iter().map(|b| b.op_types().to_arrow_array()))?;
289        builder.op_types_array(array)?;
290        for (i, batch_column) in first.fields.iter().enumerate() {
291            let array = concat_arrays(batches.iter().map(|b| b.fields()[i].data.to_arrow_array()))?;
292            builder.push_field_array(batch_column.column_id, array)?;
293        }
294
295        builder.build()
296    }
297
298    /// Removes rows whose op type is delete.
299    pub fn filter_deleted(&mut self) -> Result<()> {
300        // Safety: op type column is not null.
301        let array = self.op_types.as_arrow();
302        // Find rows with non-delete op type.
303        let rhs = UInt8Array::new_scalar(OpType::Delete as u8);
304        let predicate =
305            arrow::compute::kernels::cmp::neq(array, &rhs).context(ComputeArrowSnafu)?;
306        self.filter(&BooleanVector::from(predicate))
307    }
308
309    // Applies the `predicate` to the batch.
310    // Safety: We know the array type so we unwrap on casting.
311    pub fn filter(&mut self, predicate: &BooleanVector) -> Result<()> {
312        self.timestamps = self
313            .timestamps
314            .filter(predicate)
315            .context(ComputeVectorSnafu)?;
316        self.sequences = Arc::new(
317            UInt64Vector::try_from_arrow_array(
318                arrow::compute::filter(self.sequences.as_arrow(), predicate.as_boolean_array())
319                    .context(ComputeArrowSnafu)?,
320            )
321            .unwrap(),
322        );
323        self.op_types = Arc::new(
324            UInt8Vector::try_from_arrow_array(
325                arrow::compute::filter(self.op_types.as_arrow(), predicate.as_boolean_array())
326                    .context(ComputeArrowSnafu)?,
327            )
328            .unwrap(),
329        );
330        for batch_column in &mut self.fields {
331            batch_column.data = batch_column
332                .data
333                .filter(predicate)
334                .context(ComputeVectorSnafu)?;
335        }
336
337        Ok(())
338    }
339
340    /// Filters rows by the given `sequence`. Only preserves rows with sequence less than or equal to `sequence`.
341    pub fn filter_by_sequence(&mut self, sequence: Option<SequenceNumber>) -> Result<()> {
342        let seq = match (sequence, self.last_sequence()) {
343            (None, _) | (_, None) => return Ok(()),
344            (Some(sequence), Some(last_sequence)) if sequence >= last_sequence => return Ok(()),
345            (Some(sequence), Some(_)) => sequence,
346        };
347
348        let seqs = self.sequences.as_arrow();
349        let sequence = UInt64Array::new_scalar(seq);
350        let predicate = datafusion_common::arrow::compute::kernels::cmp::lt_eq(seqs, &sequence)
351            .context(ComputeArrowSnafu)?;
352        let predicate = BooleanVector::from(predicate);
353        self.filter(&predicate)?;
354
355        Ok(())
356    }
357
358    /// Sorts rows in the batch. If `dedup` is true, it also removes
359    /// duplicated rows according to primary keys.
360    ///
361    /// It orders rows by timestamp, sequence desc and only keep the latest
362    /// row for the same timestamp. It doesn't consider op type as sequence
363    /// should already provide uniqueness for a row.
364    pub fn sort(&mut self, dedup: bool) -> Result<()> {
365        // If building a converter each time is costly, we may allow passing a
366        // converter.
367        let converter = RowConverter::new(vec![
368            SortField::new(self.timestamps.data_type().as_arrow_type()),
369            SortField::new_with_options(
370                self.sequences.data_type().as_arrow_type(),
371                SortOptions {
372                    descending: true,
373                    ..Default::default()
374                },
375            ),
376        ])
377        .context(ComputeArrowSnafu)?;
378        // Columns to sort.
379        let columns = [
380            self.timestamps.to_arrow_array(),
381            self.sequences.to_arrow_array(),
382        ];
383        let rows = converter.convert_columns(&columns).unwrap();
384        let mut to_sort: Vec<_> = rows.iter().enumerate().collect();
385
386        let was_sorted = to_sort.is_sorted_by_key(|x| x.1);
387        if !was_sorted {
388            to_sort.sort_unstable_by_key(|x| x.1);
389        }
390
391        let num_rows = to_sort.len();
392        if dedup {
393            // Dedup by timestamps.
394            to_sort.dedup_by(|left, right| {
395                debug_assert_eq!(18, left.1.as_ref().len());
396                debug_assert_eq!(18, right.1.as_ref().len());
397                let (left_key, right_key) = (left.1.as_ref(), right.1.as_ref());
398                // We only compare the timestamp part and ignore sequence.
399                left_key[..TIMESTAMP_KEY_LEN] == right_key[..TIMESTAMP_KEY_LEN]
400            });
401        }
402        let no_dedup = to_sort.len() == num_rows;
403
404        if was_sorted && no_dedup {
405            return Ok(());
406        }
407        let indices = UInt32Vector::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
408        self.take_in_place(&indices)
409    }
410
411    /// Returns the estimated memory size of the batch.
412    pub fn memory_size(&self) -> usize {
413        let mut size = std::mem::size_of::<Self>();
414        size += self.primary_key.len();
415        size += self.timestamps.memory_size();
416        size += self.sequences.memory_size();
417        size += self.op_types.memory_size();
418        for batch_column in &self.fields {
419            size += batch_column.data.memory_size();
420        }
421        size
422    }
423
424    /// Returns ids and datatypes of fields in the [Batch] after applying the `projection`.
425    pub(crate) fn projected_fields(
426        metadata: &RegionMetadata,
427        projection: &[ColumnId],
428    ) -> Vec<(ColumnId, ConcreteDataType)> {
429        let projected_ids: HashSet<_> = projection.iter().copied().collect();
430        metadata
431            .field_columns()
432            .filter_map(|column| {
433                if projected_ids.contains(&column.column_id) {
434                    Some((column.column_id, column.column_schema.data_type.clone()))
435                } else {
436                    None
437                }
438            })
439            .collect()
440    }
441
442    /// Returns timestamps in a native slice or `None` if the batch is empty.
443    pub(crate) fn timestamps_native(&self) -> Option<&[i64]> {
444        if self.timestamps.is_empty() {
445            return None;
446        }
447
448        let values = match self.timestamps.data_type() {
449            ConcreteDataType::Timestamp(TimestampType::Second(_)) => self
450                .timestamps
451                .as_any()
452                .downcast_ref::<TimestampSecondVector>()
453                .unwrap()
454                .as_arrow()
455                .values(),
456            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => self
457                .timestamps
458                .as_any()
459                .downcast_ref::<TimestampMillisecondVector>()
460                .unwrap()
461                .as_arrow()
462                .values(),
463            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => self
464                .timestamps
465                .as_any()
466                .downcast_ref::<TimestampMicrosecondVector>()
467                .unwrap()
468                .as_arrow()
469                .values(),
470            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => self
471                .timestamps
472                .as_any()
473                .downcast_ref::<TimestampNanosecondVector>()
474                .unwrap()
475                .as_arrow()
476                .values(),
477            other => panic!("timestamps in a Batch has other type {:?}", other),
478        };
479
480        Some(values)
481    }
482
483    /// Takes the batch in place.
484    fn take_in_place(&mut self, indices: &UInt32Vector) -> Result<()> {
485        self.timestamps = self.timestamps.take(indices).context(ComputeVectorSnafu)?;
486        let array = arrow::compute::take(self.sequences.as_arrow(), indices.as_arrow(), None)
487            .context(ComputeArrowSnafu)?;
488        // Safety: we know the array and vector type.
489        self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
490        let array = arrow::compute::take(self.op_types.as_arrow(), indices.as_arrow(), None)
491            .context(ComputeArrowSnafu)?;
492        self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
493        for batch_column in &mut self.fields {
494            batch_column.data = batch_column
495                .data
496                .take(indices)
497                .context(ComputeVectorSnafu)?;
498        }
499
500        Ok(())
501    }
502
503    /// Gets a timestamp at given `index`.
504    ///
505    /// # Panics
506    /// Panics if `index` is out-of-bound or the timestamp vector returns null.
507    fn get_timestamp(&self, index: usize) -> Timestamp {
508        match self.timestamps.get_ref(index) {
509            ValueRef::Timestamp(timestamp) => timestamp,
510
511            // We have check the data type is timestamp compatible in the [BatchBuilder] so it's safe to panic.
512            value => panic!("{:?} is not a timestamp", value),
513        }
514    }
515
516    /// Gets a sequence at given `index`.
517    ///
518    /// # Panics
519    /// Panics if `index` is out-of-bound or the sequence vector returns null.
520    pub(crate) fn get_sequence(&self, index: usize) -> SequenceNumber {
521        // Safety: sequences is not null so it actually returns Some.
522        self.sequences.get_data(index).unwrap()
523    }
524
525    /// Checks the batch is monotonic by timestamps.
526    #[cfg(debug_assertions)]
527    pub(crate) fn check_monotonic(&self) -> Result<(), String> {
528        use std::cmp::Ordering;
529        if self.timestamps_native().is_none() {
530            return Ok(());
531        }
532
533        let timestamps = self.timestamps_native().unwrap();
534        let sequences = self.sequences.as_arrow().values();
535        for (i, window) in timestamps.windows(2).enumerate() {
536            let current = window[0];
537            let next = window[1];
538            let current_sequence = sequences[i];
539            let next_sequence = sequences[i + 1];
540            match current.cmp(&next) {
541                Ordering::Less => {
542                    // The current timestamp is less than the next timestamp.
543                    continue;
544                }
545                Ordering::Equal => {
546                    // The current timestamp is equal to the next timestamp.
547                    if current_sequence < next_sequence {
548                        return Err(format!(
549                            "sequence are not monotonic: ts {} == {} but current sequence {} < {}, index: {}",
550                            current, next, current_sequence, next_sequence, i
551                        ));
552                    }
553                }
554                Ordering::Greater => {
555                    // The current timestamp is greater than the next timestamp.
556                    return Err(format!(
557                        "timestamps are not monotonic: {} > {}, index: {}",
558                        current, next, i
559                    ));
560                }
561            }
562        }
563
564        Ok(())
565    }
566
567    /// Returns Ok if the given batch is behind the current batch.
568    #[cfg(debug_assertions)]
569    pub(crate) fn check_next_batch(&self, other: &Batch) -> Result<(), String> {
570        // Checks the primary key
571        if self.primary_key() < other.primary_key() {
572            return Ok(());
573        }
574        if self.primary_key() > other.primary_key() {
575            return Err(format!(
576                "primary key is not monotonic: {:?} > {:?}",
577                self.primary_key(),
578                other.primary_key()
579            ));
580        }
581        // Checks the timestamp.
582        if self.last_timestamp() < other.first_timestamp() {
583            return Ok(());
584        }
585        if self.last_timestamp() > other.first_timestamp() {
586            return Err(format!(
587                "timestamps are not monotonic: {:?} > {:?}",
588                self.last_timestamp(),
589                other.first_timestamp()
590            ));
591        }
592        // Checks the sequence.
593        if self.last_sequence() >= other.first_sequence() {
594            return Ok(());
595        }
596        Err(format!(
597            "sequences are not monotonic: {:?} < {:?}",
598            self.last_sequence(),
599            other.first_sequence()
600        ))
601    }
602
603    /// Returns the value of the column in the primary key.
604    ///
605    /// Lazily decodes the primary key and caches the result.
606    pub fn pk_col_value(
607        &mut self,
608        codec: &dyn PrimaryKeyCodec,
609        col_idx_in_pk: usize,
610        column_id: ColumnId,
611    ) -> Result<Option<&Value>> {
612        if self.pk_values.is_none() {
613            self.pk_values = Some(codec.decode(&self.primary_key)?);
614        }
615
616        let pk_values = self.pk_values.as_ref().unwrap();
617        Ok(match pk_values {
618            CompositeValues::Dense(values) => values.get(col_idx_in_pk).map(|(_, v)| v),
619            CompositeValues::Sparse(values) => values.get(&column_id),
620        })
621    }
622
623    /// Returns values of the field in the batch.
624    ///
625    /// Lazily caches the field index.
626    pub fn field_col_value(&mut self, column_id: ColumnId) -> Option<&BatchColumn> {
627        if self.fields_idx.is_none() {
628            self.fields_idx = Some(
629                self.fields
630                    .iter()
631                    .enumerate()
632                    .map(|(i, c)| (c.column_id, i))
633                    .collect(),
634            );
635        }
636
637        self.fields_idx
638            .as_ref()
639            .unwrap()
640            .get(&column_id)
641            .map(|&idx| &self.fields[idx])
642    }
643}
644
645/// A struct to check the batch is monotonic.
646#[cfg(debug_assertions)]
647#[derive(Default)]
648pub(crate) struct BatchChecker {
649    last_batch: Option<Batch>,
650    start: Option<Timestamp>,
651    end: Option<Timestamp>,
652}
653
654#[cfg(debug_assertions)]
655impl BatchChecker {
656    /// Attaches the given start timestamp to the checker.
657    pub(crate) fn with_start(mut self, start: Option<Timestamp>) -> Self {
658        self.start = start;
659        self
660    }
661
662    /// Attaches the given end timestamp to the checker.
663    pub(crate) fn with_end(mut self, end: Option<Timestamp>) -> Self {
664        self.end = end;
665        self
666    }
667
668    /// Returns true if the given batch is monotonic and behind
669    /// the last batch.
670    pub(crate) fn check_monotonic(&mut self, batch: &Batch) -> Result<(), String> {
671        batch.check_monotonic()?;
672
673        if let (Some(start), Some(first)) = (self.start, batch.first_timestamp()) {
674            if start > first {
675                return Err(format!(
676                    "batch's first timestamp is before the start timestamp: {:?} > {:?}",
677                    start, first
678                ));
679            }
680        }
681        if let (Some(end), Some(last)) = (self.end, batch.last_timestamp()) {
682            if end <= last {
683                return Err(format!(
684                    "batch's last timestamp is after the end timestamp: {:?} <= {:?}",
685                    end, last
686                ));
687            }
688        }
689
690        // Checks the batch is behind the last batch.
691        // Then Updates the last batch.
692        let res = self
693            .last_batch
694            .as_ref()
695            .map(|last| last.check_next_batch(batch))
696            .unwrap_or(Ok(()));
697        self.last_batch = Some(batch.clone());
698        res
699    }
700
701    /// Formats current batch and last batch for debug.
702    pub(crate) fn format_batch(&self, batch: &Batch) -> String {
703        use std::fmt::Write;
704
705        let mut message = String::new();
706        if let Some(last) = &self.last_batch {
707            write!(
708                message,
709                "last_pk: {:?}, last_ts: {:?}, last_seq: {:?}, ",
710                last.primary_key(),
711                last.last_timestamp(),
712                last.last_sequence()
713            )
714            .unwrap();
715        }
716        write!(
717            message,
718            "batch_pk: {:?}, batch_ts: {:?}, batch_seq: {:?}",
719            batch.primary_key(),
720            batch.timestamps(),
721            batch.sequences()
722        )
723        .unwrap();
724
725        message
726    }
727
728    /// Checks batches from the part range are monotonic. Otherwise, panics.
729    pub(crate) fn ensure_part_range_batch(
730        &mut self,
731        scanner: &str,
732        region_id: store_api::storage::RegionId,
733        partition: usize,
734        part_range: store_api::region_engine::PartitionRange,
735        batch: &Batch,
736    ) {
737        if let Err(e) = self.check_monotonic(batch) {
738            let err_msg = format!(
739                "{}: batch is not sorted, {}, region_id: {}, partition: {}, part_range: {:?}",
740                scanner, e, region_id, partition, part_range,
741            );
742            common_telemetry::error!("{err_msg}, {}", self.format_batch(batch));
743            // Only print the number of row in the panic message.
744            panic!("{err_msg}, batch rows: {}", batch.num_rows());
745        }
746    }
747}
748
/// Length in bytes of the timestamp part of a row key in the arrow row format.
///
/// [Batch::sort] compares only this prefix of two row keys to decide whether
/// they have the same timestamp.
const TIMESTAMP_KEY_LEN: usize = 9;
751
752/// Helper function to concat arrays from `iter`.
753fn concat_arrays(iter: impl Iterator<Item = ArrayRef>) -> Result<ArrayRef> {
754    let arrays: Vec<_> = iter.collect();
755    let dyn_arrays: Vec<_> = arrays.iter().map(|array| array.as_ref()).collect();
756    arrow::compute::concat(&dyn_arrays).context(ComputeArrowSnafu)
757}
758
759/// A column in a [Batch].
760#[derive(Debug, PartialEq, Eq, Clone)]
761pub struct BatchColumn {
762    /// Id of the column.
763    pub column_id: ColumnId,
764    /// Data of the column.
765    pub data: VectorRef,
766}
767
768/// Builder to build [Batch].
769pub struct BatchBuilder {
770    primary_key: Vec<u8>,
771    timestamps: Option<VectorRef>,
772    sequences: Option<Arc<UInt64Vector>>,
773    op_types: Option<Arc<UInt8Vector>>,
774    fields: Vec<BatchColumn>,
775}
776
777impl BatchBuilder {
778    /// Creates a new [BatchBuilder] with primary key.
779    pub fn new(primary_key: Vec<u8>) -> BatchBuilder {
780        BatchBuilder {
781            primary_key,
782            timestamps: None,
783            sequences: None,
784            op_types: None,
785            fields: Vec::new(),
786        }
787    }
788
789    /// Creates a new [BatchBuilder] with all required columns.
790    pub fn with_required_columns(
791        primary_key: Vec<u8>,
792        timestamps: VectorRef,
793        sequences: Arc<UInt64Vector>,
794        op_types: Arc<UInt8Vector>,
795    ) -> BatchBuilder {
796        BatchBuilder {
797            primary_key,
798            timestamps: Some(timestamps),
799            sequences: Some(sequences),
800            op_types: Some(op_types),
801            fields: Vec::new(),
802        }
803    }
804
805    /// Set all field columns.
806    pub fn with_fields(mut self, fields: Vec<BatchColumn>) -> Self {
807        self.fields = fields;
808        self
809    }
810
811    /// Push a field column.
812    pub fn push_field(&mut self, column: BatchColumn) -> &mut Self {
813        self.fields.push(column);
814        self
815    }
816
817    /// Push an array as a field.
818    pub fn push_field_array(&mut self, column_id: ColumnId, array: ArrayRef) -> Result<&mut Self> {
819        let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
820        self.fields.push(BatchColumn {
821            column_id,
822            data: vector,
823        });
824
825        Ok(self)
826    }
827
828    /// Try to set an array as timestamps.
829    pub fn timestamps_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
830        let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
831        ensure!(
832            vector.data_type().is_timestamp(),
833            InvalidBatchSnafu {
834                reason: format!("{:?} is not a timestamp type", vector.data_type()),
835            }
836        );
837
838        self.timestamps = Some(vector);
839        Ok(self)
840    }
841
842    /// Try to set an array as sequences.
843    pub fn sequences_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
844        ensure!(
845            *array.data_type() == arrow::datatypes::DataType::UInt64,
846            InvalidBatchSnafu {
847                reason: "sequence array is not UInt64 type",
848            }
849        );
850        // Safety: The cast must success as we have ensured it is uint64 type.
851        let vector = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
852        self.sequences = Some(vector);
853
854        Ok(self)
855    }
856
857    /// Try to set an array as op types.
858    pub fn op_types_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
859        ensure!(
860            *array.data_type() == arrow::datatypes::DataType::UInt8,
861            InvalidBatchSnafu {
862                reason: "sequence array is not UInt8 type",
863            }
864        );
865        // Safety: The cast must success as we have ensured it is uint64 type.
866        let vector = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
867        self.op_types = Some(vector);
868
869        Ok(self)
870    }
871
872    /// Builds the [Batch].
873    pub fn build(self) -> Result<Batch> {
874        let timestamps = self.timestamps.context(InvalidBatchSnafu {
875            reason: "missing timestamps",
876        })?;
877        let sequences = self.sequences.context(InvalidBatchSnafu {
878            reason: "missing sequences",
879        })?;
880        let op_types = self.op_types.context(InvalidBatchSnafu {
881            reason: "missing op_types",
882        })?;
883        // Our storage format ensure these columns are not nullable so
884        // we use assert here.
885        assert_eq!(0, timestamps.null_count());
886        assert_eq!(0, sequences.null_count());
887        assert_eq!(0, op_types.null_count());
888
889        let ts_len = timestamps.len();
890        ensure!(
891            sequences.len() == ts_len,
892            InvalidBatchSnafu {
893                reason: format!(
894                    "sequence have different len {} != {}",
895                    sequences.len(),
896                    ts_len
897                ),
898            }
899        );
900        ensure!(
901            op_types.len() == ts_len,
902            InvalidBatchSnafu {
903                reason: format!(
904                    "op type have different len {} != {}",
905                    op_types.len(),
906                    ts_len
907                ),
908            }
909        );
910        for column in &self.fields {
911            ensure!(
912                column.data.len() == ts_len,
913                InvalidBatchSnafu {
914                    reason: format!(
915                        "column {} has different len {} != {}",
916                        column.column_id,
917                        column.data.len(),
918                        ts_len
919                    ),
920                }
921            );
922        }
923
924        Ok(Batch {
925            primary_key: self.primary_key,
926            pk_values: None,
927            timestamps,
928            sequences,
929            op_types,
930            fields: self.fields,
931            fields_idx: None,
932        })
933    }
934}
935
936impl From<Batch> for BatchBuilder {
937    fn from(batch: Batch) -> Self {
938        Self {
939            primary_key: batch.primary_key,
940            timestamps: Some(batch.timestamps),
941            sequences: Some(batch.sequences),
942            op_types: Some(batch.op_types),
943            fields: batch.fields,
944        }
945    }
946}
947
/// Async [Batch] reader and iterator wrapper.
///
/// This is the data source for SST writers or internal readers. Each variant wraps
/// one kind of batch producer so callers can hold any of them behind a single type.
pub enum Source {
    /// Source from a [BoxedBatchReader].
    Reader(BoxedBatchReader),
    /// Source from a [BoxedBatchIterator].
    Iter(BoxedBatchIterator),
    /// Source from a [BoxedBatchStream].
    Stream(BoxedBatchStream),
    /// Source from a [PruneReader].
    PruneReader(PruneReader),
}
961
962impl Source {
963    /// Returns next [Batch] from this data source.
964    pub(crate) async fn next_batch(&mut self) -> Result<Option<Batch>> {
965        match self {
966            Source::Reader(reader) => reader.next_batch().await,
967            Source::Iter(iter) => iter.next().transpose(),
968            Source::Stream(stream) => stream.try_next().await,
969            Source::PruneReader(reader) => reader.next_batch().await,
970        }
971    }
972}
973
/// Async batch reader.
///
/// The reader must guarantee that all [Batch]es it returns have the same schema.
#[async_trait]
pub trait BatchReader: Send {
    /// Fetches the next [Batch].
    ///
    /// Returns `Ok(None)` when the reader has reached its end; calling `next_batch()`
    /// again after that won't return any more batches.
    ///
    /// If `Err` is returned, the caller should not call this method again: the
    /// implementor may or may not panic in that case.
    async fn next_batch(&mut self) -> Result<Option<Batch>>;
}
988
/// Owned pointer to a dynamically dispatched [BatchReader].
pub type BoxedBatchReader = Box<dyn BatchReader>;
991
/// Pointer to a stream that yields [Batch]es.
///
/// Each item is a `Result`, so the stream can also yield errors.
pub type BoxedBatchStream = BoxStream<'static, Result<Batch>>;
994
995#[async_trait::async_trait]
996impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
997    async fn next_batch(&mut self) -> Result<Option<Batch>> {
998        (**self).next_batch().await
999    }
1000}
1001
/// Local metrics for scanners.
///
/// The derived `Default` starts every duration and counter at zero.
#[derive(Debug, Default)]
pub(crate) struct ScannerMetrics {
    /// Duration to prepare the scan task.
    prepare_scan_cost: Duration,
    /// Duration to build the (merge) reader.
    build_reader_cost: Duration,
    /// Duration to scan data.
    scan_cost: Duration,
    /// Duration to convert batches.
    convert_cost: Duration,
    /// Duration while waiting for `yield`.
    yield_cost: Duration,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of mem ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,
}
1024
1025#[cfg(test)]
1026mod tests {
1027    use store_api::codec::PrimaryKeyEncoding;
1028    use store_api::storage::consts::ReservedColumnId;
1029
1030    use super::*;
1031    use crate::error::Error;
1032    use crate::row_converter::{self, build_primary_key_codec_with_fields};
1033    use crate::test_util::new_batch_builder;
1034
1035    fn new_batch(
1036        timestamps: &[i64],
1037        sequences: &[u64],
1038        op_types: &[OpType],
1039        field: &[u64],
1040    ) -> Batch {
1041        new_batch_builder(b"test", timestamps, sequences, op_types, 1, field)
1042            .build()
1043            .unwrap()
1044    }
1045
1046    #[test]
1047    fn test_empty_batch() {
1048        let batch = new_batch(&[], &[], &[], &[]);
1049        assert_eq!(None, batch.first_timestamp());
1050        assert_eq!(None, batch.last_timestamp());
1051        assert_eq!(None, batch.first_sequence());
1052        assert_eq!(None, batch.last_sequence());
1053        assert!(batch.timestamps_native().is_none());
1054    }
1055
1056    #[test]
1057    fn test_first_last_one() {
1058        let batch = new_batch(&[1], &[2], &[OpType::Put], &[4]);
1059        assert_eq!(
1060            Timestamp::new_millisecond(1),
1061            batch.first_timestamp().unwrap()
1062        );
1063        assert_eq!(
1064            Timestamp::new_millisecond(1),
1065            batch.last_timestamp().unwrap()
1066        );
1067        assert_eq!(2, batch.first_sequence().unwrap());
1068        assert_eq!(2, batch.last_sequence().unwrap());
1069    }
1070
1071    #[test]
1072    fn test_first_last_multiple() {
1073        let batch = new_batch(
1074            &[1, 2, 3],
1075            &[11, 12, 13],
1076            &[OpType::Put, OpType::Put, OpType::Put],
1077            &[21, 22, 23],
1078        );
1079        assert_eq!(
1080            Timestamp::new_millisecond(1),
1081            batch.first_timestamp().unwrap()
1082        );
1083        assert_eq!(
1084            Timestamp::new_millisecond(3),
1085            batch.last_timestamp().unwrap()
1086        );
1087        assert_eq!(11, batch.first_sequence().unwrap());
1088        assert_eq!(13, batch.last_sequence().unwrap());
1089    }
1090
1091    #[test]
1092    fn test_slice() {
1093        let batch = new_batch(
1094            &[1, 2, 3, 4],
1095            &[11, 12, 13, 14],
1096            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1097            &[21, 22, 23, 24],
1098        );
1099        let batch = batch.slice(1, 2);
1100        let expect = new_batch(
1101            &[2, 3],
1102            &[12, 13],
1103            &[OpType::Delete, OpType::Put],
1104            &[22, 23],
1105        );
1106        assert_eq!(expect, batch);
1107    }
1108
1109    #[test]
1110    fn test_timestamps_native() {
1111        let batch = new_batch(
1112            &[1, 2, 3, 4],
1113            &[11, 12, 13, 14],
1114            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1115            &[21, 22, 23, 24],
1116        );
1117        assert_eq!(&[1, 2, 3, 4], batch.timestamps_native().unwrap());
1118    }
1119
1120    #[test]
1121    fn test_concat_empty() {
1122        let err = Batch::concat(vec![]).unwrap_err();
1123        assert!(
1124            matches!(err, Error::InvalidBatch { .. }),
1125            "unexpected err: {err}"
1126        );
1127    }
1128
1129    #[test]
1130    fn test_concat_one() {
1131        let batch = new_batch(&[], &[], &[], &[]);
1132        let actual = Batch::concat(vec![batch.clone()]).unwrap();
1133        assert_eq!(batch, actual);
1134
1135        let batch = new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]);
1136        let actual = Batch::concat(vec![batch.clone()]).unwrap();
1137        assert_eq!(batch, actual);
1138    }
1139
1140    #[test]
1141    fn test_concat_multiple() {
1142        let batches = vec![
1143            new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]),
1144            new_batch(
1145                &[3, 4, 5],
1146                &[13, 14, 15],
1147                &[OpType::Put, OpType::Delete, OpType::Put],
1148                &[23, 24, 25],
1149            ),
1150            new_batch(&[], &[], &[], &[]),
1151            new_batch(&[6], &[16], &[OpType::Put], &[26]),
1152        ];
1153        let batch = Batch::concat(batches).unwrap();
1154        let expect = new_batch(
1155            &[1, 2, 3, 4, 5, 6],
1156            &[11, 12, 13, 14, 15, 16],
1157            &[
1158                OpType::Put,
1159                OpType::Put,
1160                OpType::Put,
1161                OpType::Delete,
1162                OpType::Put,
1163                OpType::Put,
1164            ],
1165            &[21, 22, 23, 24, 25, 26],
1166        );
1167        assert_eq!(expect, batch);
1168    }
1169
1170    #[test]
1171    fn test_concat_different() {
1172        let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
1173        let mut batch2 = new_batch(&[2], &[2], &[OpType::Put], &[2]);
1174        batch2.primary_key = b"hello".to_vec();
1175        let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
1176        assert!(
1177            matches!(err, Error::InvalidBatch { .. }),
1178            "unexpected err: {err}"
1179        );
1180    }
1181
1182    #[test]
1183    fn test_concat_different_fields() {
1184        let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
1185        let fields = vec![
1186            batch1.fields()[0].clone(),
1187            BatchColumn {
1188                column_id: 2,
1189                data: Arc::new(UInt64Vector::from_slice([2])),
1190            },
1191        ];
1192        // Batch 2 has more fields.
1193        let batch2 = batch1.clone().with_fields(fields).unwrap();
1194        let err = Batch::concat(vec![batch1.clone(), batch2]).unwrap_err();
1195        assert!(
1196            matches!(err, Error::InvalidBatch { .. }),
1197            "unexpected err: {err}"
1198        );
1199
1200        // Batch 2 has different field.
1201        let fields = vec![BatchColumn {
1202            column_id: 2,
1203            data: Arc::new(UInt64Vector::from_slice([2])),
1204        }];
1205        let batch2 = batch1.clone().with_fields(fields).unwrap();
1206        let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
1207        assert!(
1208            matches!(err, Error::InvalidBatch { .. }),
1209            "unexpected err: {err}"
1210        );
1211    }
1212
1213    #[test]
1214    fn test_filter_deleted_empty() {
1215        let mut batch = new_batch(&[], &[], &[], &[]);
1216        batch.filter_deleted().unwrap();
1217        assert!(batch.is_empty());
1218    }
1219
1220    #[test]
1221    fn test_filter_deleted() {
1222        let mut batch = new_batch(
1223            &[1, 2, 3, 4],
1224            &[11, 12, 13, 14],
1225            &[OpType::Delete, OpType::Put, OpType::Delete, OpType::Put],
1226            &[21, 22, 23, 24],
1227        );
1228        batch.filter_deleted().unwrap();
1229        let expect = new_batch(&[2, 4], &[12, 14], &[OpType::Put, OpType::Put], &[22, 24]);
1230        assert_eq!(expect, batch);
1231
1232        let mut batch = new_batch(
1233            &[1, 2, 3, 4],
1234            &[11, 12, 13, 14],
1235            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
1236            &[21, 22, 23, 24],
1237        );
1238        let expect = batch.clone();
1239        batch.filter_deleted().unwrap();
1240        assert_eq!(expect, batch);
1241    }
1242
1243    #[test]
1244    fn test_filter_by_sequence() {
1245        // Filters put only.
1246        let mut batch = new_batch(
1247            &[1, 2, 3, 4],
1248            &[11, 12, 13, 14],
1249            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
1250            &[21, 22, 23, 24],
1251        );
1252        batch.filter_by_sequence(Some(13)).unwrap();
1253        let expect = new_batch(
1254            &[1, 2, 3],
1255            &[11, 12, 13],
1256            &[OpType::Put, OpType::Put, OpType::Put],
1257            &[21, 22, 23],
1258        );
1259        assert_eq!(expect, batch);
1260
1261        // Filters to empty.
1262        let mut batch = new_batch(
1263            &[1, 2, 3, 4],
1264            &[11, 12, 13, 14],
1265            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1266            &[21, 22, 23, 24],
1267        );
1268
1269        batch.filter_by_sequence(Some(10)).unwrap();
1270        assert!(batch.is_empty());
1271
1272        // None filter.
1273        let mut batch = new_batch(
1274            &[1, 2, 3, 4],
1275            &[11, 12, 13, 14],
1276            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1277            &[21, 22, 23, 24],
1278        );
1279        let expect = batch.clone();
1280        batch.filter_by_sequence(None).unwrap();
1281        assert_eq!(expect, batch);
1282
1283        // Filter a empty batch
1284        let mut batch = new_batch(&[], &[], &[], &[]);
1285        batch.filter_by_sequence(Some(10)).unwrap();
1286        assert!(batch.is_empty());
1287
1288        // Filter a empty batch with None
1289        let mut batch = new_batch(&[], &[], &[], &[]);
1290        batch.filter_by_sequence(None).unwrap();
1291        assert!(batch.is_empty());
1292    }
1293
1294    #[test]
1295    fn test_filter() {
1296        // Filters put only.
1297        let mut batch = new_batch(
1298            &[1, 2, 3, 4],
1299            &[11, 12, 13, 14],
1300            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
1301            &[21, 22, 23, 24],
1302        );
1303        let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
1304        batch.filter(&predicate).unwrap();
1305        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
1306        assert_eq!(expect, batch);
1307
1308        // Filters deletion.
1309        let mut batch = new_batch(
1310            &[1, 2, 3, 4],
1311            &[11, 12, 13, 14],
1312            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1313            &[21, 22, 23, 24],
1314        );
1315        let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
1316        batch.filter(&predicate).unwrap();
1317        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
1318        assert_eq!(expect, batch);
1319
1320        // Filters to empty.
1321        let predicate = BooleanVector::from_vec(vec![false, false]);
1322        batch.filter(&predicate).unwrap();
1323        assert!(batch.is_empty());
1324    }
1325
1326    #[test]
1327    fn test_sort_and_dedup() {
1328        let original = new_batch(
1329            &[2, 3, 1, 4, 5, 2],
1330            &[1, 2, 3, 4, 5, 6],
1331            &[
1332                OpType::Put,
1333                OpType::Put,
1334                OpType::Put,
1335                OpType::Put,
1336                OpType::Put,
1337                OpType::Put,
1338            ],
1339            &[21, 22, 23, 24, 25, 26],
1340        );
1341
1342        let mut batch = original.clone();
1343        batch.sort(true).unwrap();
1344        // It should only keep one timestamp 2.
1345        assert_eq!(
1346            new_batch(
1347                &[1, 2, 3, 4, 5],
1348                &[3, 6, 2, 4, 5],
1349                &[
1350                    OpType::Put,
1351                    OpType::Put,
1352                    OpType::Put,
1353                    OpType::Put,
1354                    OpType::Put,
1355                ],
1356                &[23, 26, 22, 24, 25],
1357            ),
1358            batch
1359        );
1360
1361        let mut batch = original.clone();
1362        batch.sort(false).unwrap();
1363
1364        // It should only keep one timestamp 2.
1365        assert_eq!(
1366            new_batch(
1367                &[1, 2, 2, 3, 4, 5],
1368                &[3, 6, 1, 2, 4, 5],
1369                &[
1370                    OpType::Put,
1371                    OpType::Put,
1372                    OpType::Put,
1373                    OpType::Put,
1374                    OpType::Put,
1375                    OpType::Put,
1376                ],
1377                &[23, 26, 21, 22, 24, 25],
1378            ),
1379            batch
1380        );
1381
1382        let original = new_batch(
1383            &[2, 2, 1],
1384            &[1, 6, 1],
1385            &[OpType::Delete, OpType::Put, OpType::Put],
1386            &[21, 22, 23],
1387        );
1388
1389        let mut batch = original.clone();
1390        batch.sort(true).unwrap();
1391        let expect = new_batch(&[1, 2], &[1, 6], &[OpType::Put, OpType::Put], &[23, 22]);
1392        assert_eq!(expect, batch);
1393
1394        let mut batch = original.clone();
1395        batch.sort(false).unwrap();
1396        let expect = new_batch(
1397            &[1, 2, 2],
1398            &[1, 6, 1],
1399            &[OpType::Put, OpType::Put, OpType::Delete],
1400            &[23, 22, 21],
1401        );
1402        assert_eq!(expect, batch);
1403    }
1404
1405    #[test]
1406    fn test_get_value() {
1407        let encodings = [PrimaryKeyEncoding::Dense, PrimaryKeyEncoding::Sparse];
1408
1409        for encoding in encodings {
1410            let codec = build_primary_key_codec_with_fields(
1411                encoding,
1412                [
1413                    (
1414                        ReservedColumnId::table_id(),
1415                        row_converter::SortField::new(ConcreteDataType::uint32_datatype()),
1416                    ),
1417                    (
1418                        ReservedColumnId::tsid(),
1419                        row_converter::SortField::new(ConcreteDataType::uint64_datatype()),
1420                    ),
1421                    (
1422                        100,
1423                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
1424                    ),
1425                    (
1426                        200,
1427                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
1428                    ),
1429                ]
1430                .into_iter(),
1431            );
1432
1433            let values = [
1434                Value::UInt32(1000),
1435                Value::UInt64(2000),
1436                Value::String("abcdefgh".into()),
1437                Value::String("zyxwvu".into()),
1438            ];
1439            let mut buf = vec![];
1440            codec
1441                .encode_values(
1442                    &[
1443                        (ReservedColumnId::table_id(), values[0].clone()),
1444                        (ReservedColumnId::tsid(), values[1].clone()),
1445                        (100, values[2].clone()),
1446                        (200, values[3].clone()),
1447                    ],
1448                    &mut buf,
1449                )
1450                .unwrap();
1451
1452            let field_col_id = 2;
1453            let mut batch = new_batch_builder(
1454                &buf,
1455                &[1, 2, 3],
1456                &[1, 1, 1],
1457                &[OpType::Put, OpType::Put, OpType::Put],
1458                field_col_id,
1459                &[42, 43, 44],
1460            )
1461            .build()
1462            .unwrap();
1463
1464            let v = batch
1465                .pk_col_value(&*codec, 0, ReservedColumnId::table_id())
1466                .unwrap()
1467                .unwrap();
1468            assert_eq!(values[0], *v);
1469
1470            let v = batch
1471                .pk_col_value(&*codec, 1, ReservedColumnId::tsid())
1472                .unwrap()
1473                .unwrap();
1474            assert_eq!(values[1], *v);
1475
1476            let v = batch.pk_col_value(&*codec, 2, 100).unwrap().unwrap();
1477            assert_eq!(values[2], *v);
1478
1479            let v = batch.pk_col_value(&*codec, 3, 200).unwrap().unwrap();
1480            assert_eq!(values[3], *v);
1481
1482            let v = batch.field_col_value(field_col_id).unwrap();
1483            assert_eq!(v.data.get(0), Value::UInt64(42));
1484            assert_eq!(v.data.get(1), Value::UInt64(43));
1485            assert_eq!(v.data.get(2), Value::UInt64(44));
1486        }
1487    }
1488}