mito2/
read.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Common structs and utilities for reading data.
16
17pub mod compat;
18pub mod dedup;
19pub mod flat_dedup;
20pub mod flat_merge;
21pub mod flat_projection;
22pub mod last_row;
23pub mod merge;
24pub mod plain_batch;
25pub mod projection;
26pub(crate) mod prune;
27pub mod range;
28pub mod scan_region;
29pub mod scan_util;
30pub(crate) mod seq_scan;
31pub mod series_scan;
32pub mod stream;
33pub(crate) mod unordered_scan;
34
35use std::collections::{HashMap, HashSet};
36use std::sync::Arc;
37use std::time::Duration;
38
39use api::v1::OpType;
40use async_trait::async_trait;
41use common_time::Timestamp;
42use datafusion_common::arrow::array::UInt8Array;
43use datatypes::arrow;
44use datatypes::arrow::array::{Array, ArrayRef, UInt64Array};
45use datatypes::arrow::compute::SortOptions;
46use datatypes::arrow::record_batch::RecordBatch;
47use datatypes::arrow::row::{RowConverter, SortField};
48use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector};
49use datatypes::scalars::ScalarVectorBuilder;
50use datatypes::types::TimestampType;
51use datatypes::value::{Value, ValueRef};
52use datatypes::vectors::{
53    BooleanVector, Helper, TimestampMicrosecondVector, TimestampMillisecondVector,
54    TimestampMillisecondVectorBuilder, TimestampNanosecondVector, TimestampSecondVector,
55    UInt32Vector, UInt64Vector, UInt64VectorBuilder, UInt8Vector, UInt8VectorBuilder, Vector,
56    VectorRef,
57};
58use futures::stream::BoxStream;
59use futures::TryStreamExt;
60use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
61use snafu::{ensure, OptionExt, ResultExt};
62use store_api::metadata::RegionMetadata;
63use store_api::storage::{ColumnId, SequenceNumber};
64
65use crate::error::{
66    ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu,
67    Result,
68};
69use crate::memtable::{BoxedBatchIterator, BoxedRecordBatchIterator};
70use crate::read::prune::PruneReader;
71
/// Storage internal representation of a batch of rows for a primary key (time series).
///
/// Rows are sorted by primary key, timestamp, sequence desc, op_type desc. Fields
/// always keep the same relative order as fields in [RegionMetadata](store_api::metadata::RegionMetadata).
///
/// Note: the derived `PartialEq` also compares the cached `pk_values` and
/// `fields_idx`, so two batches holding the same rows may compare unequal when
/// only one of them has populated those caches.
#[derive(Debug, PartialEq, Clone)]
pub struct Batch {
    /// Primary key encoded in a comparable form.
    primary_key: Vec<u8>,
    /// Possibly decoded `primary_key` values. Some places would decode it in advance.
    /// When absent, `pk_col_value` decodes and stores it lazily.
    pk_values: Option<CompositeValues>,
    /// Timestamps of rows, should be sorted and not null.
    timestamps: VectorRef,
    /// Sequences of rows
    ///
    /// UInt64 type, not null.
    sequences: Arc<UInt64Vector>,
    /// Op types of rows
    ///
    /// UInt8 type, not null.
    op_types: Arc<UInt8Vector>,
    /// Fields organized in columnar format.
    fields: Vec<BatchColumn>,
    /// Cache for field index lookup (column id -> position in `fields`).
    /// Built lazily by `field_col_value`.
    fields_idx: Option<HashMap<ColumnId, usize>>,
}
97
98impl Batch {
99    /// Creates a new batch.
100    pub fn new(
101        primary_key: Vec<u8>,
102        timestamps: VectorRef,
103        sequences: Arc<UInt64Vector>,
104        op_types: Arc<UInt8Vector>,
105        fields: Vec<BatchColumn>,
106    ) -> Result<Batch> {
107        BatchBuilder::with_required_columns(primary_key, timestamps, sequences, op_types)
108            .with_fields(fields)
109            .build()
110    }
111
112    /// Tries to set fields for the batch.
113    pub fn with_fields(self, fields: Vec<BatchColumn>) -> Result<Batch> {
114        Batch::new(
115            self.primary_key,
116            self.timestamps,
117            self.sequences,
118            self.op_types,
119            fields,
120        )
121    }
122
123    /// Returns primary key of the batch.
124    pub fn primary_key(&self) -> &[u8] {
125        &self.primary_key
126    }
127
128    /// Returns possibly decoded primary-key values.
129    pub fn pk_values(&self) -> Option<&CompositeValues> {
130        self.pk_values.as_ref()
131    }
132
133    /// Sets possibly decoded primary-key values.
134    pub fn set_pk_values(&mut self, pk_values: CompositeValues) {
135        self.pk_values = Some(pk_values);
136    }
137
138    /// Removes possibly decoded primary-key values. For testing only.
139    #[cfg(any(test, feature = "test"))]
140    pub fn remove_pk_values(&mut self) {
141        self.pk_values = None;
142    }
143
144    /// Returns fields in the batch.
145    pub fn fields(&self) -> &[BatchColumn] {
146        &self.fields
147    }
148
149    /// Returns timestamps of the batch.
150    pub fn timestamps(&self) -> &VectorRef {
151        &self.timestamps
152    }
153
154    /// Returns sequences of the batch.
155    pub fn sequences(&self) -> &Arc<UInt64Vector> {
156        &self.sequences
157    }
158
159    /// Returns op types of the batch.
160    pub fn op_types(&self) -> &Arc<UInt8Vector> {
161        &self.op_types
162    }
163
164    /// Returns the number of rows in the batch.
165    pub fn num_rows(&self) -> usize {
166        // All vectors have the same length. We use the length of sequences vector
167        // since it has static type.
168        self.sequences.len()
169    }
170
171    /// Create an empty [`Batch`].
172    pub(crate) fn empty() -> Self {
173        Self {
174            primary_key: vec![],
175            pk_values: None,
176            timestamps: Arc::new(TimestampMillisecondVectorBuilder::with_capacity(0).finish()),
177            sequences: Arc::new(UInt64VectorBuilder::with_capacity(0).finish()),
178            op_types: Arc::new(UInt8VectorBuilder::with_capacity(0).finish()),
179            fields: vec![],
180            fields_idx: None,
181        }
182    }
183
184    /// Returns true if the number of rows in the batch is 0.
185    pub fn is_empty(&self) -> bool {
186        self.num_rows() == 0
187    }
188
189    /// Returns the first timestamp in the batch or `None` if the batch is empty.
190    pub fn first_timestamp(&self) -> Option<Timestamp> {
191        if self.timestamps.is_empty() {
192            return None;
193        }
194
195        Some(self.get_timestamp(0))
196    }
197
198    /// Returns the last timestamp in the batch or `None` if the batch is empty.
199    pub fn last_timestamp(&self) -> Option<Timestamp> {
200        if self.timestamps.is_empty() {
201            return None;
202        }
203
204        Some(self.get_timestamp(self.timestamps.len() - 1))
205    }
206
207    /// Returns the first sequence in the batch or `None` if the batch is empty.
208    pub fn first_sequence(&self) -> Option<SequenceNumber> {
209        if self.sequences.is_empty() {
210            return None;
211        }
212
213        Some(self.get_sequence(0))
214    }
215
216    /// Returns the last sequence in the batch or `None` if the batch is empty.
217    pub fn last_sequence(&self) -> Option<SequenceNumber> {
218        if self.sequences.is_empty() {
219            return None;
220        }
221
222        Some(self.get_sequence(self.sequences.len() - 1))
223    }
224
225    /// Replaces the primary key of the batch.
226    ///
227    /// Notice that this [Batch] also contains a maybe-exist `pk_values`.
228    /// Be sure to update that field as well.
229    pub fn set_primary_key(&mut self, primary_key: Vec<u8>) {
230        self.primary_key = primary_key;
231    }
232
233    /// Slice the batch, returning a new batch.
234    ///
235    /// # Panics
236    /// Panics if `offset + length > self.num_rows()`.
237    pub fn slice(&self, offset: usize, length: usize) -> Batch {
238        let fields = self
239            .fields
240            .iter()
241            .map(|column| BatchColumn {
242                column_id: column.column_id,
243                data: column.data.slice(offset, length),
244            })
245            .collect();
246        // We skip using the builder to avoid validating the batch again.
247        Batch {
248            // Now we need to clone the primary key. We could try `Bytes` if
249            // this becomes a bottleneck.
250            primary_key: self.primary_key.clone(),
251            pk_values: self.pk_values.clone(),
252            timestamps: self.timestamps.slice(offset, length),
253            sequences: Arc::new(self.sequences.get_slice(offset, length)),
254            op_types: Arc::new(self.op_types.get_slice(offset, length)),
255            fields,
256            fields_idx: self.fields_idx.clone(),
257        }
258    }
259
260    /// Takes `batches` and concat them into one batch.
261    ///
262    /// All `batches` must have the same primary key.
263    pub fn concat(mut batches: Vec<Batch>) -> Result<Batch> {
264        ensure!(
265            !batches.is_empty(),
266            InvalidBatchSnafu {
267                reason: "empty batches",
268            }
269        );
270        if batches.len() == 1 {
271            // Now we own the `batches` so we could pop it directly.
272            return Ok(batches.pop().unwrap());
273        }
274
275        let primary_key = std::mem::take(&mut batches[0].primary_key);
276        let first = &batches[0];
277        // We took the primary key from the first batch so we don't use `first.primary_key()`.
278        ensure!(
279            batches
280                .iter()
281                .skip(1)
282                .all(|b| b.primary_key() == primary_key),
283            InvalidBatchSnafu {
284                reason: "batches have different primary key",
285            }
286        );
287        for b in batches.iter().skip(1) {
288            ensure!(
289                b.fields.len() == first.fields.len(),
290                InvalidBatchSnafu {
291                    reason: "batches have different field num",
292                }
293            );
294            for (l, r) in b.fields.iter().zip(&first.fields) {
295                ensure!(
296                    l.column_id == r.column_id,
297                    InvalidBatchSnafu {
298                        reason: "batches have different fields",
299                    }
300                );
301            }
302        }
303
304        // We take the primary key from the first batch.
305        let mut builder = BatchBuilder::new(primary_key);
306        // Concat timestamps, sequences, op_types, fields.
307        let array = concat_arrays(batches.iter().map(|b| b.timestamps().to_arrow_array()))?;
308        builder.timestamps_array(array)?;
309        let array = concat_arrays(batches.iter().map(|b| b.sequences().to_arrow_array()))?;
310        builder.sequences_array(array)?;
311        let array = concat_arrays(batches.iter().map(|b| b.op_types().to_arrow_array()))?;
312        builder.op_types_array(array)?;
313        for (i, batch_column) in first.fields.iter().enumerate() {
314            let array = concat_arrays(batches.iter().map(|b| b.fields()[i].data.to_arrow_array()))?;
315            builder.push_field_array(batch_column.column_id, array)?;
316        }
317
318        builder.build()
319    }
320
321    /// Removes rows whose op type is delete.
322    pub fn filter_deleted(&mut self) -> Result<()> {
323        // Safety: op type column is not null.
324        let array = self.op_types.as_arrow();
325        // Find rows with non-delete op type.
326        let rhs = UInt8Array::new_scalar(OpType::Delete as u8);
327        let predicate =
328            arrow::compute::kernels::cmp::neq(array, &rhs).context(ComputeArrowSnafu)?;
329        self.filter(&BooleanVector::from(predicate))
330    }
331
332    // Applies the `predicate` to the batch.
333    // Safety: We know the array type so we unwrap on casting.
334    pub fn filter(&mut self, predicate: &BooleanVector) -> Result<()> {
335        self.timestamps = self
336            .timestamps
337            .filter(predicate)
338            .context(ComputeVectorSnafu)?;
339        self.sequences = Arc::new(
340            UInt64Vector::try_from_arrow_array(
341                arrow::compute::filter(self.sequences.as_arrow(), predicate.as_boolean_array())
342                    .context(ComputeArrowSnafu)?,
343            )
344            .unwrap(),
345        );
346        self.op_types = Arc::new(
347            UInt8Vector::try_from_arrow_array(
348                arrow::compute::filter(self.op_types.as_arrow(), predicate.as_boolean_array())
349                    .context(ComputeArrowSnafu)?,
350            )
351            .unwrap(),
352        );
353        for batch_column in &mut self.fields {
354            batch_column.data = batch_column
355                .data
356                .filter(predicate)
357                .context(ComputeVectorSnafu)?;
358        }
359
360        Ok(())
361    }
362
363    /// Filters rows by the given `sequence`. Only preserves rows with sequence less than or equal to `sequence`.
364    pub fn filter_by_sequence(&mut self, sequence: Option<SequenceNumber>) -> Result<()> {
365        let seq = match (sequence, self.last_sequence()) {
366            (None, _) | (_, None) => return Ok(()),
367            (Some(sequence), Some(last_sequence)) if sequence >= last_sequence => return Ok(()),
368            (Some(sequence), Some(_)) => sequence,
369        };
370
371        let seqs = self.sequences.as_arrow();
372        let sequence = UInt64Array::new_scalar(seq);
373        let predicate = datafusion_common::arrow::compute::kernels::cmp::lt_eq(seqs, &sequence)
374            .context(ComputeArrowSnafu)?;
375        let predicate = BooleanVector::from(predicate);
376        self.filter(&predicate)?;
377
378        Ok(())
379    }
380
381    /// Sorts rows in the batch. If `dedup` is true, it also removes
382    /// duplicated rows according to primary keys.
383    ///
384    /// It orders rows by timestamp, sequence desc and only keep the latest
385    /// row for the same timestamp. It doesn't consider op type as sequence
386    /// should already provide uniqueness for a row.
387    pub fn sort(&mut self, dedup: bool) -> Result<()> {
388        // If building a converter each time is costly, we may allow passing a
389        // converter.
390        let converter = RowConverter::new(vec![
391            SortField::new(self.timestamps.data_type().as_arrow_type()),
392            SortField::new_with_options(
393                self.sequences.data_type().as_arrow_type(),
394                SortOptions {
395                    descending: true,
396                    ..Default::default()
397                },
398            ),
399        ])
400        .context(ComputeArrowSnafu)?;
401        // Columns to sort.
402        let columns = [
403            self.timestamps.to_arrow_array(),
404            self.sequences.to_arrow_array(),
405        ];
406        let rows = converter.convert_columns(&columns).unwrap();
407        let mut to_sort: Vec<_> = rows.iter().enumerate().collect();
408
409        let was_sorted = to_sort.is_sorted_by_key(|x| x.1);
410        if !was_sorted {
411            to_sort.sort_unstable_by_key(|x| x.1);
412        }
413
414        let num_rows = to_sort.len();
415        if dedup {
416            // Dedup by timestamps.
417            to_sort.dedup_by(|left, right| {
418                debug_assert_eq!(18, left.1.as_ref().len());
419                debug_assert_eq!(18, right.1.as_ref().len());
420                let (left_key, right_key) = (left.1.as_ref(), right.1.as_ref());
421                // We only compare the timestamp part and ignore sequence.
422                left_key[..TIMESTAMP_KEY_LEN] == right_key[..TIMESTAMP_KEY_LEN]
423            });
424        }
425        let no_dedup = to_sort.len() == num_rows;
426
427        if was_sorted && no_dedup {
428            return Ok(());
429        }
430        let indices = UInt32Vector::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
431        self.take_in_place(&indices)
432    }
433
434    /// Returns the estimated memory size of the batch.
435    pub fn memory_size(&self) -> usize {
436        let mut size = std::mem::size_of::<Self>();
437        size += self.primary_key.len();
438        size += self.timestamps.memory_size();
439        size += self.sequences.memory_size();
440        size += self.op_types.memory_size();
441        for batch_column in &self.fields {
442            size += batch_column.data.memory_size();
443        }
444        size
445    }
446
447    /// Returns ids and datatypes of fields in the [Batch] after applying the `projection`.
448    pub(crate) fn projected_fields(
449        metadata: &RegionMetadata,
450        projection: &[ColumnId],
451    ) -> Vec<(ColumnId, ConcreteDataType)> {
452        let projected_ids: HashSet<_> = projection.iter().copied().collect();
453        metadata
454            .field_columns()
455            .filter_map(|column| {
456                if projected_ids.contains(&column.column_id) {
457                    Some((column.column_id, column.column_schema.data_type.clone()))
458                } else {
459                    None
460                }
461            })
462            .collect()
463    }
464
465    /// Returns timestamps in a native slice or `None` if the batch is empty.
466    pub(crate) fn timestamps_native(&self) -> Option<&[i64]> {
467        if self.timestamps.is_empty() {
468            return None;
469        }
470
471        let values = match self.timestamps.data_type() {
472            ConcreteDataType::Timestamp(TimestampType::Second(_)) => self
473                .timestamps
474                .as_any()
475                .downcast_ref::<TimestampSecondVector>()
476                .unwrap()
477                .as_arrow()
478                .values(),
479            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => self
480                .timestamps
481                .as_any()
482                .downcast_ref::<TimestampMillisecondVector>()
483                .unwrap()
484                .as_arrow()
485                .values(),
486            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => self
487                .timestamps
488                .as_any()
489                .downcast_ref::<TimestampMicrosecondVector>()
490                .unwrap()
491                .as_arrow()
492                .values(),
493            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => self
494                .timestamps
495                .as_any()
496                .downcast_ref::<TimestampNanosecondVector>()
497                .unwrap()
498                .as_arrow()
499                .values(),
500            other => panic!("timestamps in a Batch has other type {:?}", other),
501        };
502
503        Some(values)
504    }
505
506    /// Takes the batch in place.
507    fn take_in_place(&mut self, indices: &UInt32Vector) -> Result<()> {
508        self.timestamps = self.timestamps.take(indices).context(ComputeVectorSnafu)?;
509        let array = arrow::compute::take(self.sequences.as_arrow(), indices.as_arrow(), None)
510            .context(ComputeArrowSnafu)?;
511        // Safety: we know the array and vector type.
512        self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
513        let array = arrow::compute::take(self.op_types.as_arrow(), indices.as_arrow(), None)
514            .context(ComputeArrowSnafu)?;
515        self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
516        for batch_column in &mut self.fields {
517            batch_column.data = batch_column
518                .data
519                .take(indices)
520                .context(ComputeVectorSnafu)?;
521        }
522
523        Ok(())
524    }
525
526    /// Gets a timestamp at given `index`.
527    ///
528    /// # Panics
529    /// Panics if `index` is out-of-bound or the timestamp vector returns null.
530    fn get_timestamp(&self, index: usize) -> Timestamp {
531        match self.timestamps.get_ref(index) {
532            ValueRef::Timestamp(timestamp) => timestamp,
533
534            // We have check the data type is timestamp compatible in the [BatchBuilder] so it's safe to panic.
535            value => panic!("{:?} is not a timestamp", value),
536        }
537    }
538
539    /// Gets a sequence at given `index`.
540    ///
541    /// # Panics
542    /// Panics if `index` is out-of-bound or the sequence vector returns null.
543    pub(crate) fn get_sequence(&self, index: usize) -> SequenceNumber {
544        // Safety: sequences is not null so it actually returns Some.
545        self.sequences.get_data(index).unwrap()
546    }
547
548    /// Checks the batch is monotonic by timestamps.
549    #[cfg(debug_assertions)]
550    pub(crate) fn check_monotonic(&self) -> Result<(), String> {
551        use std::cmp::Ordering;
552        if self.timestamps_native().is_none() {
553            return Ok(());
554        }
555
556        let timestamps = self.timestamps_native().unwrap();
557        let sequences = self.sequences.as_arrow().values();
558        for (i, window) in timestamps.windows(2).enumerate() {
559            let current = window[0];
560            let next = window[1];
561            let current_sequence = sequences[i];
562            let next_sequence = sequences[i + 1];
563            match current.cmp(&next) {
564                Ordering::Less => {
565                    // The current timestamp is less than the next timestamp.
566                    continue;
567                }
568                Ordering::Equal => {
569                    // The current timestamp is equal to the next timestamp.
570                    if current_sequence < next_sequence {
571                        return Err(format!(
572                            "sequence are not monotonic: ts {} == {} but current sequence {} < {}, index: {}",
573                            current, next, current_sequence, next_sequence, i
574                        ));
575                    }
576                }
577                Ordering::Greater => {
578                    // The current timestamp is greater than the next timestamp.
579                    return Err(format!(
580                        "timestamps are not monotonic: {} > {}, index: {}",
581                        current, next, i
582                    ));
583                }
584            }
585        }
586
587        Ok(())
588    }
589
590    /// Returns Ok if the given batch is behind the current batch.
591    #[cfg(debug_assertions)]
592    pub(crate) fn check_next_batch(&self, other: &Batch) -> Result<(), String> {
593        // Checks the primary key
594        if self.primary_key() < other.primary_key() {
595            return Ok(());
596        }
597        if self.primary_key() > other.primary_key() {
598            return Err(format!(
599                "primary key is not monotonic: {:?} > {:?}",
600                self.primary_key(),
601                other.primary_key()
602            ));
603        }
604        // Checks the timestamp.
605        if self.last_timestamp() < other.first_timestamp() {
606            return Ok(());
607        }
608        if self.last_timestamp() > other.first_timestamp() {
609            return Err(format!(
610                "timestamps are not monotonic: {:?} > {:?}",
611                self.last_timestamp(),
612                other.first_timestamp()
613            ));
614        }
615        // Checks the sequence.
616        if self.last_sequence() >= other.first_sequence() {
617            return Ok(());
618        }
619        Err(format!(
620            "sequences are not monotonic: {:?} < {:?}",
621            self.last_sequence(),
622            other.first_sequence()
623        ))
624    }
625
626    /// Returns the value of the column in the primary key.
627    ///
628    /// Lazily decodes the primary key and caches the result.
629    pub fn pk_col_value(
630        &mut self,
631        codec: &dyn PrimaryKeyCodec,
632        col_idx_in_pk: usize,
633        column_id: ColumnId,
634    ) -> Result<Option<&Value>> {
635        if self.pk_values.is_none() {
636            self.pk_values = Some(codec.decode(&self.primary_key).context(DecodeSnafu)?);
637        }
638
639        let pk_values = self.pk_values.as_ref().unwrap();
640        Ok(match pk_values {
641            CompositeValues::Dense(values) => values.get(col_idx_in_pk).map(|(_, v)| v),
642            CompositeValues::Sparse(values) => values.get(&column_id),
643        })
644    }
645
646    /// Returns values of the field in the batch.
647    ///
648    /// Lazily caches the field index.
649    pub fn field_col_value(&mut self, column_id: ColumnId) -> Option<&BatchColumn> {
650        if self.fields_idx.is_none() {
651            self.fields_idx = Some(
652                self.fields
653                    .iter()
654                    .enumerate()
655                    .map(|(i, c)| (c.column_id, i))
656                    .collect(),
657            );
658        }
659
660        self.fields_idx
661            .as_ref()
662            .unwrap()
663            .get(&column_id)
664            .map(|&idx| &self.fields[idx])
665    }
666}
667
/// A struct to check the batch is monotonic.
///
/// Only compiled with debug assertions. It remembers the last checked batch so
/// consecutive batches can be validated against each other.
#[cfg(debug_assertions)]
#[derive(Default)]
pub(crate) struct BatchChecker {
    /// The last batch passed to `check_monotonic`, if any.
    last_batch: Option<Batch>,
    /// Inclusive lower bound for timestamps, if set.
    start: Option<Timestamp>,
    /// Exclusive upper bound for timestamps, if set.
    end: Option<Timestamp>,
}
676
677#[cfg(debug_assertions)]
678impl BatchChecker {
679    /// Attaches the given start timestamp to the checker.
680    pub(crate) fn with_start(mut self, start: Option<Timestamp>) -> Self {
681        self.start = start;
682        self
683    }
684
685    /// Attaches the given end timestamp to the checker.
686    pub(crate) fn with_end(mut self, end: Option<Timestamp>) -> Self {
687        self.end = end;
688        self
689    }
690
691    /// Returns true if the given batch is monotonic and behind
692    /// the last batch.
693    pub(crate) fn check_monotonic(&mut self, batch: &Batch) -> Result<(), String> {
694        batch.check_monotonic()?;
695
696        if let (Some(start), Some(first)) = (self.start, batch.first_timestamp()) {
697            if start > first {
698                return Err(format!(
699                    "batch's first timestamp is before the start timestamp: {:?} > {:?}",
700                    start, first
701                ));
702            }
703        }
704        if let (Some(end), Some(last)) = (self.end, batch.last_timestamp()) {
705            if end <= last {
706                return Err(format!(
707                    "batch's last timestamp is after the end timestamp: {:?} <= {:?}",
708                    end, last
709                ));
710            }
711        }
712
713        // Checks the batch is behind the last batch.
714        // Then Updates the last batch.
715        let res = self
716            .last_batch
717            .as_ref()
718            .map(|last| last.check_next_batch(batch))
719            .unwrap_or(Ok(()));
720        self.last_batch = Some(batch.clone());
721        res
722    }
723
724    /// Formats current batch and last batch for debug.
725    pub(crate) fn format_batch(&self, batch: &Batch) -> String {
726        use std::fmt::Write;
727
728        let mut message = String::new();
729        if let Some(last) = &self.last_batch {
730            write!(
731                message,
732                "last_pk: {:?}, last_ts: {:?}, last_seq: {:?}, ",
733                last.primary_key(),
734                last.last_timestamp(),
735                last.last_sequence()
736            )
737            .unwrap();
738        }
739        write!(
740            message,
741            "batch_pk: {:?}, batch_ts: {:?}, batch_seq: {:?}",
742            batch.primary_key(),
743            batch.timestamps(),
744            batch.sequences()
745        )
746        .unwrap();
747
748        message
749    }
750
751    /// Checks batches from the part range are monotonic. Otherwise, panics.
752    pub(crate) fn ensure_part_range_batch(
753        &mut self,
754        scanner: &str,
755        region_id: store_api::storage::RegionId,
756        partition: usize,
757        part_range: store_api::region_engine::PartitionRange,
758        batch: &Batch,
759    ) {
760        if let Err(e) = self.check_monotonic(batch) {
761            let err_msg = format!(
762                "{}: batch is not sorted, {}, region_id: {}, partition: {}, part_range: {:?}",
763                scanner, e, region_id, partition, part_range,
764            );
765            common_telemetry::error!("{err_msg}, {}", self.format_batch(batch));
766            // Only print the number of row in the panic message.
767            panic!("{err_msg}, batch rows: {}", batch.num_rows());
768        }
769    }
770}
771
/// Byte length of an encoded timestamp in the arrow row format: one null
/// marker byte followed by the 8-byte `i64` value.
const TIMESTAMP_KEY_LEN: usize = 1 + std::mem::size_of::<i64>();
774
775/// Helper function to concat arrays from `iter`.
776fn concat_arrays(iter: impl Iterator<Item = ArrayRef>) -> Result<ArrayRef> {
777    let arrays: Vec<_> = iter.collect();
778    let dyn_arrays: Vec<_> = arrays.iter().map(|array| array.as_ref()).collect();
779    arrow::compute::concat(&dyn_arrays).context(ComputeArrowSnafu)
780}
781
/// A column in a [Batch].
///
/// Pairs a field column's id with its data vector.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct BatchColumn {
    /// Id of the column.
    pub column_id: ColumnId,
    /// Data of the column, one value per row of the owning batch.
    pub data: VectorRef,
}
790
/// Builder to build [Batch].
///
/// The timestamp, sequence and op type columns start as `None` and must be set
/// before `build`, which reports a missing one as an error.
pub struct BatchBuilder {
    /// Encoded primary key of the batch to build.
    primary_key: Vec<u8>,
    /// Timestamp column, if set.
    timestamps: Option<VectorRef>,
    /// Sequence column, if set.
    sequences: Option<Arc<UInt64Vector>>,
    /// Op type column, if set.
    op_types: Option<Arc<UInt8Vector>>,
    /// Field columns pushed so far, in insertion order.
    fields: Vec<BatchColumn>,
}
799
800impl BatchBuilder {
801    /// Creates a new [BatchBuilder] with primary key.
802    pub fn new(primary_key: Vec<u8>) -> BatchBuilder {
803        BatchBuilder {
804            primary_key,
805            timestamps: None,
806            sequences: None,
807            op_types: None,
808            fields: Vec::new(),
809        }
810    }
811
812    /// Creates a new [BatchBuilder] with all required columns.
813    pub fn with_required_columns(
814        primary_key: Vec<u8>,
815        timestamps: VectorRef,
816        sequences: Arc<UInt64Vector>,
817        op_types: Arc<UInt8Vector>,
818    ) -> BatchBuilder {
819        BatchBuilder {
820            primary_key,
821            timestamps: Some(timestamps),
822            sequences: Some(sequences),
823            op_types: Some(op_types),
824            fields: Vec::new(),
825        }
826    }
827
828    /// Set all field columns.
829    pub fn with_fields(mut self, fields: Vec<BatchColumn>) -> Self {
830        self.fields = fields;
831        self
832    }
833
834    /// Push a field column.
835    pub fn push_field(&mut self, column: BatchColumn) -> &mut Self {
836        self.fields.push(column);
837        self
838    }
839
840    /// Push an array as a field.
841    pub fn push_field_array(&mut self, column_id: ColumnId, array: ArrayRef) -> Result<&mut Self> {
842        let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
843        self.fields.push(BatchColumn {
844            column_id,
845            data: vector,
846        });
847
848        Ok(self)
849    }
850
851    /// Try to set an array as timestamps.
852    pub fn timestamps_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
853        let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
854        ensure!(
855            vector.data_type().is_timestamp(),
856            InvalidBatchSnafu {
857                reason: format!("{:?} is not a timestamp type", vector.data_type()),
858            }
859        );
860
861        self.timestamps = Some(vector);
862        Ok(self)
863    }
864
865    /// Try to set an array as sequences.
866    pub fn sequences_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
867        ensure!(
868            *array.data_type() == arrow::datatypes::DataType::UInt64,
869            InvalidBatchSnafu {
870                reason: "sequence array is not UInt64 type",
871            }
872        );
873        // Safety: The cast must success as we have ensured it is uint64 type.
874        let vector = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
875        self.sequences = Some(vector);
876
877        Ok(self)
878    }
879
880    /// Try to set an array as op types.
881    pub fn op_types_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
882        ensure!(
883            *array.data_type() == arrow::datatypes::DataType::UInt8,
884            InvalidBatchSnafu {
885                reason: "sequence array is not UInt8 type",
886            }
887        );
888        // Safety: The cast must success as we have ensured it is uint64 type.
889        let vector = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
890        self.op_types = Some(vector);
891
892        Ok(self)
893    }
894
895    /// Builds the [Batch].
896    pub fn build(self) -> Result<Batch> {
897        let timestamps = self.timestamps.context(InvalidBatchSnafu {
898            reason: "missing timestamps",
899        })?;
900        let sequences = self.sequences.context(InvalidBatchSnafu {
901            reason: "missing sequences",
902        })?;
903        let op_types = self.op_types.context(InvalidBatchSnafu {
904            reason: "missing op_types",
905        })?;
906        // Our storage format ensure these columns are not nullable so
907        // we use assert here.
908        assert_eq!(0, timestamps.null_count());
909        assert_eq!(0, sequences.null_count());
910        assert_eq!(0, op_types.null_count());
911
912        let ts_len = timestamps.len();
913        ensure!(
914            sequences.len() == ts_len,
915            InvalidBatchSnafu {
916                reason: format!(
917                    "sequence have different len {} != {}",
918                    sequences.len(),
919                    ts_len
920                ),
921            }
922        );
923        ensure!(
924            op_types.len() == ts_len,
925            InvalidBatchSnafu {
926                reason: format!(
927                    "op type have different len {} != {}",
928                    op_types.len(),
929                    ts_len
930                ),
931            }
932        );
933        for column in &self.fields {
934            ensure!(
935                column.data.len() == ts_len,
936                InvalidBatchSnafu {
937                    reason: format!(
938                        "column {} has different len {} != {}",
939                        column.column_id,
940                        column.data.len(),
941                        ts_len
942                    ),
943                }
944            );
945        }
946
947        Ok(Batch {
948            primary_key: self.primary_key,
949            pk_values: None,
950            timestamps,
951            sequences,
952            op_types,
953            fields: self.fields,
954            fields_idx: None,
955        })
956    }
957}
958
959impl From<Batch> for BatchBuilder {
960    fn from(batch: Batch) -> Self {
961        Self {
962            primary_key: batch.primary_key,
963            timestamps: Some(batch.timestamps),
964            sequences: Some(batch.sequences),
965            op_types: Some(batch.op_types),
966            fields: batch.fields,
967        }
968    }
969}
970
/// Async [Batch] reader and iterator wrapper.
///
/// This is the data source for SST writers or internal readers. Every variant
/// is polled through [Source::next_batch].
pub enum Source {
    /// Source from a [BoxedBatchReader].
    Reader(BoxedBatchReader),
    /// Source from a [BoxedBatchIterator].
    Iter(BoxedBatchIterator),
    /// Source from a [BoxedBatchStream].
    Stream(BoxedBatchStream),
    /// Source from a [PruneReader].
    PruneReader(PruneReader),
}
984
985impl Source {
986    /// Returns next [Batch] from this data source.
987    pub async fn next_batch(&mut self) -> Result<Option<Batch>> {
988        match self {
989            Source::Reader(reader) => reader.next_batch().await,
990            Source::Iter(iter) => iter.next().transpose(),
991            Source::Stream(stream) => stream.try_next().await,
992            Source::PruneReader(reader) => reader.next_batch().await,
993        }
994    }
995}
996
/// Async [RecordBatch] reader and iterator wrapper for flat format.
///
/// Every variant is polled through [FlatSource::next_batch].
pub enum FlatSource {
    /// Source from a [BoxedRecordBatchIterator].
    Iter(BoxedRecordBatchIterator),
    /// Source from a [BoxedRecordBatchStream].
    Stream(BoxedRecordBatchStream),
}
1004
1005impl FlatSource {
1006    /// Returns next [RecordBatch] from this data source.
1007    pub async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
1008        match self {
1009            FlatSource::Iter(iter) => iter.next().transpose(),
1010            FlatSource::Stream(stream) => stream.try_next().await,
1011        }
1012    }
1013}
1014
/// Async batch reader.
///
/// The reader must guarantee [Batch]es returned by it have the same schema.
/// Implementors must be `Send`.
#[async_trait]
pub trait BatchReader: Send {
    /// Fetch next [Batch].
    ///
    /// Returns `Ok(None)` when the reader has reached its end and calling `next_batch()`
    /// again won't return batch again.
    ///
    /// If `Err` is returned, caller should not call this method again, the implementor
    /// may or may not panic in such case.
    async fn next_batch(&mut self) -> Result<Option<Batch>>;
}
1029
/// Pointer to [BatchReader].
///
/// A boxed trait object, so readers of different concrete types can be stored together.
pub type BoxedBatchReader = Box<dyn BatchReader>;

/// Pointer to a stream that yields [Batch].
///
/// Each item is a `Result`, so the stream can report errors per batch.
pub type BoxedBatchStream = BoxStream<'static, Result<Batch>>;

/// Pointer to a stream that yields [RecordBatch].
///
/// Each item is a `Result`, so the stream can report errors per record batch.
pub type BoxedRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
1038
1039#[async_trait::async_trait]
1040impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
1041    async fn next_batch(&mut self) -> Result<Option<Batch>> {
1042        (**self).next_batch().await
1043    }
1044}
1045
/// Local metrics for scanners.
///
/// All durations start at zero and all counters at 0 via the derived `Default`.
#[derive(Debug, Default)]
pub(crate) struct ScannerMetrics {
    /// Duration to prepare the scan task.
    prepare_scan_cost: Duration,
    /// Duration to build the (merge) reader.
    build_reader_cost: Duration,
    /// Duration to scan data.
    scan_cost: Duration,
    /// Duration while waiting for `yield`.
    yield_cost: Duration,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of mem ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,
}
1066
#[cfg(test)]
mod tests {
    use mito_codec::row_converter::{self, build_primary_key_codec_with_fields};
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::storage::consts::ReservedColumnId;

    use super::*;
    use crate::error::Error;
    use crate::test_util::new_batch_builder;

    /// Builds a batch with primary key `b"test"` and a single `u64` field (column id 1).
    fn new_batch(
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        field: &[u64],
    ) -> Batch {
        new_batch_builder(b"test", timestamps, sequences, op_types, 1, field)
            .build()
            .unwrap()
    }

    #[test]
    fn test_empty_batch() {
        let batch = Batch::empty();
        assert!(batch.is_empty());
        assert_eq!(None, batch.first_timestamp());
        assert_eq!(None, batch.last_timestamp());
        assert_eq!(None, batch.first_sequence());
        assert_eq!(None, batch.last_sequence());
        assert!(batch.timestamps_native().is_none());
    }

    #[test]
    fn test_first_last_one() {
        let batch = new_batch(&[1], &[2], &[OpType::Put], &[4]);
        assert_eq!(
            Timestamp::new_millisecond(1),
            batch.first_timestamp().unwrap()
        );
        assert_eq!(
            Timestamp::new_millisecond(1),
            batch.last_timestamp().unwrap()
        );
        assert_eq!(2, batch.first_sequence().unwrap());
        assert_eq!(2, batch.last_sequence().unwrap());
    }

    #[test]
    fn test_first_last_multiple() {
        let batch = new_batch(
            &[1, 2, 3],
            &[11, 12, 13],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );
        assert_eq!(
            Timestamp::new_millisecond(1),
            batch.first_timestamp().unwrap()
        );
        assert_eq!(
            Timestamp::new_millisecond(3),
            batch.last_timestamp().unwrap()
        );
        assert_eq!(11, batch.first_sequence().unwrap());
        assert_eq!(13, batch.last_sequence().unwrap());
    }

    #[test]
    fn test_slice() {
        let batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let batch = batch.slice(1, 2);
        let expect = new_batch(
            &[2, 3],
            &[12, 13],
            &[OpType::Delete, OpType::Put],
            &[22, 23],
        );
        assert_eq!(expect, batch);
    }

    #[test]
    fn test_timestamps_native() {
        let batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        assert_eq!(&[1, 2, 3, 4], batch.timestamps_native().unwrap());
    }

    #[test]
    fn test_concat_empty() {
        let err = Batch::concat(vec![]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }

    #[test]
    fn test_concat_one() {
        let batch = new_batch(&[], &[], &[], &[]);
        let actual = Batch::concat(vec![batch.clone()]).unwrap();
        assert_eq!(batch, actual);

        let batch = new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]);
        let actual = Batch::concat(vec![batch.clone()]).unwrap();
        assert_eq!(batch, actual);
    }

    #[test]
    fn test_concat_multiple() {
        let batches = vec![
            new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]),
            new_batch(
                &[3, 4, 5],
                &[13, 14, 15],
                &[OpType::Put, OpType::Delete, OpType::Put],
                &[23, 24, 25],
            ),
            new_batch(&[], &[], &[], &[]),
            new_batch(&[6], &[16], &[OpType::Put], &[26]),
        ];
        let batch = Batch::concat(batches).unwrap();
        let expect = new_batch(
            &[1, 2, 3, 4, 5, 6],
            &[11, 12, 13, 14, 15, 16],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Delete,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25, 26],
        );
        assert_eq!(expect, batch);
    }

    #[test]
    fn test_concat_different() {
        let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
        let mut batch2 = new_batch(&[2], &[2], &[OpType::Put], &[2]);
        batch2.primary_key = b"hello".to_vec();
        let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }

    #[test]
    fn test_concat_different_fields() {
        let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
        let fields = vec![
            batch1.fields()[0].clone(),
            BatchColumn {
                column_id: 2,
                data: Arc::new(UInt64Vector::from_slice([2])),
            },
        ];
        // Batch 2 has more fields.
        let batch2 = batch1.clone().with_fields(fields).unwrap();
        let err = Batch::concat(vec![batch1.clone(), batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );

        // Batch 2 has different field.
        let fields = vec![BatchColumn {
            column_id: 2,
            data: Arc::new(UInt64Vector::from_slice([2])),
        }];
        let batch2 = batch1.clone().with_fields(fields).unwrap();
        let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }

    #[test]
    fn test_filter_deleted_empty() {
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_deleted().unwrap();
        assert!(batch.is_empty());
    }

    #[test]
    fn test_filter_deleted() {
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Delete, OpType::Put, OpType::Delete, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch.filter_deleted().unwrap();
        let expect = new_batch(&[2, 4], &[12, 14], &[OpType::Put, OpType::Put], &[22, 24]);
        assert_eq!(expect, batch);

        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let expect = batch.clone();
        batch.filter_deleted().unwrap();
        assert_eq!(expect, batch);
    }

    #[test]
    fn test_filter_by_sequence() {
        // Keeps only rows whose sequence is <= 13.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch.filter_by_sequence(Some(13)).unwrap();
        let expect = new_batch(
            &[1, 2, 3],
            &[11, 12, 13],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );
        assert_eq!(expect, batch);

        // Filters to empty: every sequence is greater than 10.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );

        batch.filter_by_sequence(Some(10)).unwrap();
        assert!(batch.is_empty());

        // None filter keeps the batch unchanged.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let expect = batch.clone();
        batch.filter_by_sequence(None).unwrap();
        assert_eq!(expect, batch);

        // Filter an empty batch
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_by_sequence(Some(10)).unwrap();
        assert!(batch.is_empty());

        // Filter an empty batch with None
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_by_sequence(None).unwrap();
        assert!(batch.is_empty());
    }

    #[test]
    fn test_filter() {
        // Filters a batch that contains only puts.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
        batch.filter(&predicate).unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // Filters out a row that is a deletion.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
        batch.filter(&predicate).unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // Filters to empty.
        let predicate = BooleanVector::from_vec(vec![false, false]);
        batch.filter(&predicate).unwrap();
        assert!(batch.is_empty());
    }

    #[test]
    fn test_sort_and_dedup() {
        let original = new_batch(
            &[2, 3, 1, 4, 5, 2],
            &[1, 2, 3, 4, 5, 6],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25, 26],
        );

        let mut batch = original.clone();
        batch.sort(true).unwrap();
        // It should only keep one timestamp 2 (the row with the larger sequence).
        assert_eq!(
            new_batch(
                &[1, 2, 3, 4, 5],
                &[3, 6, 2, 4, 5],
                &[
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                ],
                &[23, 26, 22, 24, 25],
            ),
            batch
        );

        let mut batch = original.clone();
        batch.sort(false).unwrap();

        // Without dedup both rows with timestamp 2 are kept, the larger sequence first.
        assert_eq!(
            new_batch(
                &[1, 2, 2, 3, 4, 5],
                &[3, 6, 1, 2, 4, 5],
                &[
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                ],
                &[23, 26, 21, 22, 24, 25],
            ),
            batch
        );

        let original = new_batch(
            &[2, 2, 1],
            &[1, 6, 1],
            &[OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );

        let mut batch = original.clone();
        batch.sort(true).unwrap();
        let expect = new_batch(&[1, 2], &[1, 6], &[OpType::Put, OpType::Put], &[23, 22]);
        assert_eq!(expect, batch);

        let mut batch = original.clone();
        batch.sort(false).unwrap();
        let expect = new_batch(
            &[1, 2, 2],
            &[1, 6, 1],
            &[OpType::Put, OpType::Put, OpType::Delete],
            &[23, 22, 21],
        );
        assert_eq!(expect, batch);
    }

    #[test]
    fn test_get_value() {
        let encodings = [PrimaryKeyEncoding::Dense, PrimaryKeyEncoding::Sparse];

        for encoding in encodings {
            let codec = build_primary_key_codec_with_fields(
                encoding,
                [
                    (
                        ReservedColumnId::table_id(),
                        row_converter::SortField::new(ConcreteDataType::uint32_datatype()),
                    ),
                    (
                        ReservedColumnId::tsid(),
                        row_converter::SortField::new(ConcreteDataType::uint64_datatype()),
                    ),
                    (
                        100,
                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
                    ),
                    (
                        200,
                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
                    ),
                ]
                .into_iter(),
            );

            let values = [
                Value::UInt32(1000),
                Value::UInt64(2000),
                Value::String("abcdefgh".into()),
                Value::String("zyxwvu".into()),
            ];
            let mut buf = vec![];
            codec
                .encode_values(
                    &[
                        (ReservedColumnId::table_id(), values[0].clone()),
                        (ReservedColumnId::tsid(), values[1].clone()),
                        (100, values[2].clone()),
                        (200, values[3].clone()),
                    ],
                    &mut buf,
                )
                .unwrap();

            let field_col_id = 2;
            let mut batch = new_batch_builder(
                &buf,
                &[1, 2, 3],
                &[1, 1, 1],
                &[OpType::Put, OpType::Put, OpType::Put],
                field_col_id,
                &[42, 43, 44],
            )
            .build()
            .unwrap();

            let v = batch
                .pk_col_value(&*codec, 0, ReservedColumnId::table_id())
                .unwrap()
                .unwrap();
            assert_eq!(values[0], *v);

            let v = batch
                .pk_col_value(&*codec, 1, ReservedColumnId::tsid())
                .unwrap()
                .unwrap();
            assert_eq!(values[1], *v);

            let v = batch.pk_col_value(&*codec, 2, 100).unwrap().unwrap();
            assert_eq!(values[2], *v);

            let v = batch.pk_col_value(&*codec, 3, 200).unwrap().unwrap();
            assert_eq!(values[3], *v);

            let v = batch.field_col_value(field_col_id).unwrap();
            assert_eq!(v.data.get(0), Value::UInt64(42));
            assert_eq!(v.data.get(1), Value::UInt64(43));
            assert_eq!(v.data.get(2), Value::UInt64(44));
        }
    }

    #[test]
    fn test_builder_rejects_invalid_columns() {
        // Building without the required columns fails.
        let err = BatchBuilder::new(b"test".to_vec()).build().unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );

        let mut builder = BatchBuilder::new(b"test".to_vec());
        let u64_array = UInt64Vector::from_slice([1]).to_arrow_array();
        let u8_array = UInt8Vector::from_slice([1]).to_arrow_array();

        // Timestamps must have a timestamp type.
        let Err(err) = builder.timestamps_array(u64_array.clone()) else {
            panic!("a uint64 array should be rejected as timestamps");
        };
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );

        // Sequences must be uint64.
        let Err(err) = builder.sequences_array(u8_array) else {
            panic!("a uint8 array should be rejected as sequences");
        };
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );

        // Op types must be uint8.
        let Err(err) = builder.op_types_array(u64_array) else {
            panic!("a uint64 array should be rejected as op types");
        };
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }
}