pub mod compat;
pub mod dedup;
pub mod last_row;
pub mod merge;
pub mod projection;
pub(crate) mod prune;
pub(crate) mod range;
pub(crate) mod scan_region;
pub(crate) mod scan_util;
pub(crate) mod seq_scan;
pub(crate) mod unordered_scan;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use api::v1::OpType;
use async_trait::async_trait;
use common_time::Timestamp;
use datafusion_common::arrow::array::UInt8Array;
use datatypes::arrow;
use datatypes::arrow::array::{Array, ArrayRef, UInt64Array};
use datatypes::arrow::compute::SortOptions;
use datatypes::arrow::row::{RowConverter, SortField};
use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector};
use datatypes::types::TimestampType;
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::{
BooleanVector, Helper, TimestampMicrosecondVector, TimestampMillisecondVector,
TimestampNanosecondVector, TimestampSecondVector, UInt32Vector, UInt64Vector, UInt8Vector,
Vector, VectorRef,
};
use futures::stream::BoxStream;
use futures::TryStreamExt;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::RegionMetadata;
use store_api::storage::{ColumnId, SequenceNumber};
use crate::error::{
ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, InvalidBatchSnafu, Result,
};
use crate::memtable::BoxedBatchIterator;
use crate::read::prune::PruneReader;
use crate::row_converter::{CompositeValues, PrimaryKeyCodec};
/// A set of rows that share one encoded primary key, stored column by column.
///
/// `timestamps`, `sequences` and `op_types` are required columns; `fields` holds the
/// remaining field columns. [`BatchBuilder::build`] checks that all columns have the same
/// length and asserts that the required columns contain no nulls.
///
/// Note: the derived `PartialEq` also compares the two lazily-filled caches
/// (`pk_values`, `fields_idx`), so batches with identical data can compare unequal
/// depending on which accessors have been called on them.
#[derive(Debug, PartialEq, Clone)]
pub struct Batch {
    /// Encoded primary key shared by every row in the batch.
    primary_key: Vec<u8>,
    /// Decoded primary key values; filled by `set_pk_values` or lazily by `pk_col_value`.
    pk_values: Option<CompositeValues>,
    /// Timestamp column (expected to be one of the timestamp vector types).
    timestamps: VectorRef,
    /// Sequence number of each row.
    sequences: Arc<UInt64Vector>,
    /// Operation type of each row, stored as the `u8` value of `OpType`.
    op_types: Arc<UInt8Vector>,
    /// Field columns.
    fields: Vec<BatchColumn>,
    /// Lazily built map from field column id to its index in `fields`.
    fields_idx: Option<HashMap<ColumnId, usize>>,
}
impl Batch {
pub fn new(
primary_key: Vec<u8>,
timestamps: VectorRef,
sequences: Arc<UInt64Vector>,
op_types: Arc<UInt8Vector>,
fields: Vec<BatchColumn>,
) -> Result<Batch> {
BatchBuilder::with_required_columns(primary_key, timestamps, sequences, op_types)
.with_fields(fields)
.build()
}
pub fn with_fields(self, fields: Vec<BatchColumn>) -> Result<Batch> {
Batch::new(
self.primary_key,
self.timestamps,
self.sequences,
self.op_types,
fields,
)
}
pub fn primary_key(&self) -> &[u8] {
&self.primary_key
}
pub fn pk_values(&self) -> Option<&CompositeValues> {
self.pk_values.as_ref()
}
pub fn set_pk_values(&mut self, pk_values: CompositeValues) {
self.pk_values = Some(pk_values);
}
#[cfg(any(test, feature = "test"))]
pub fn remove_pk_values(&mut self) {
self.pk_values = None;
}
pub fn fields(&self) -> &[BatchColumn] {
&self.fields
}
pub fn timestamps(&self) -> &VectorRef {
&self.timestamps
}
pub fn sequences(&self) -> &Arc<UInt64Vector> {
&self.sequences
}
pub fn op_types(&self) -> &Arc<UInt8Vector> {
&self.op_types
}
pub fn num_rows(&self) -> usize {
self.sequences.len()
}
pub fn is_empty(&self) -> bool {
self.num_rows() == 0
}
pub fn first_timestamp(&self) -> Option<Timestamp> {
if self.timestamps.is_empty() {
return None;
}
Some(self.get_timestamp(0))
}
pub fn last_timestamp(&self) -> Option<Timestamp> {
if self.timestamps.is_empty() {
return None;
}
Some(self.get_timestamp(self.timestamps.len() - 1))
}
pub fn first_sequence(&self) -> Option<SequenceNumber> {
if self.sequences.is_empty() {
return None;
}
Some(self.get_sequence(0))
}
pub fn last_sequence(&self) -> Option<SequenceNumber> {
if self.sequences.is_empty() {
return None;
}
Some(self.get_sequence(self.sequences.len() - 1))
}
pub fn set_primary_key(&mut self, primary_key: Vec<u8>) {
self.primary_key = primary_key;
}
pub fn slice(&self, offset: usize, length: usize) -> Batch {
let fields = self
.fields
.iter()
.map(|column| BatchColumn {
column_id: column.column_id,
data: column.data.slice(offset, length),
})
.collect();
Batch {
primary_key: self.primary_key.clone(),
pk_values: self.pk_values.clone(),
timestamps: self.timestamps.slice(offset, length),
sequences: Arc::new(self.sequences.get_slice(offset, length)),
op_types: Arc::new(self.op_types.get_slice(offset, length)),
fields,
fields_idx: self.fields_idx.clone(),
}
}
pub fn concat(mut batches: Vec<Batch>) -> Result<Batch> {
ensure!(
!batches.is_empty(),
InvalidBatchSnafu {
reason: "empty batches",
}
);
if batches.len() == 1 {
return Ok(batches.pop().unwrap());
}
let primary_key = std::mem::take(&mut batches[0].primary_key);
let first = &batches[0];
ensure!(
batches
.iter()
.skip(1)
.all(|b| b.primary_key() == primary_key),
InvalidBatchSnafu {
reason: "batches have different primary key",
}
);
for b in batches.iter().skip(1) {
ensure!(
b.fields.len() == first.fields.len(),
InvalidBatchSnafu {
reason: "batches have different field num",
}
);
for (l, r) in b.fields.iter().zip(&first.fields) {
ensure!(
l.column_id == r.column_id,
InvalidBatchSnafu {
reason: "batches have different fields",
}
);
}
}
let mut builder = BatchBuilder::new(primary_key);
let array = concat_arrays(batches.iter().map(|b| b.timestamps().to_arrow_array()))?;
builder.timestamps_array(array)?;
let array = concat_arrays(batches.iter().map(|b| b.sequences().to_arrow_array()))?;
builder.sequences_array(array)?;
let array = concat_arrays(batches.iter().map(|b| b.op_types().to_arrow_array()))?;
builder.op_types_array(array)?;
for (i, batch_column) in first.fields.iter().enumerate() {
let array = concat_arrays(batches.iter().map(|b| b.fields()[i].data.to_arrow_array()))?;
builder.push_field_array(batch_column.column_id, array)?;
}
builder.build()
}
pub fn filter_deleted(&mut self) -> Result<()> {
let array = self.op_types.as_arrow();
let rhs = UInt8Array::new_scalar(OpType::Delete as u8);
let predicate =
arrow::compute::kernels::cmp::neq(array, &rhs).context(ComputeArrowSnafu)?;
self.filter(&BooleanVector::from(predicate))
}
pub fn filter(&mut self, predicate: &BooleanVector) -> Result<()> {
self.timestamps = self
.timestamps
.filter(predicate)
.context(ComputeVectorSnafu)?;
self.sequences = Arc::new(
UInt64Vector::try_from_arrow_array(
arrow::compute::filter(self.sequences.as_arrow(), predicate.as_boolean_array())
.context(ComputeArrowSnafu)?,
)
.unwrap(),
);
self.op_types = Arc::new(
UInt8Vector::try_from_arrow_array(
arrow::compute::filter(self.op_types.as_arrow(), predicate.as_boolean_array())
.context(ComputeArrowSnafu)?,
)
.unwrap(),
);
for batch_column in &mut self.fields {
batch_column.data = batch_column
.data
.filter(predicate)
.context(ComputeVectorSnafu)?;
}
Ok(())
}
pub fn filter_by_sequence(&mut self, sequence: Option<SequenceNumber>) -> Result<()> {
let seq = match (sequence, self.last_sequence()) {
(None, _) | (_, None) => return Ok(()),
(Some(sequence), Some(last_sequence)) if sequence >= last_sequence => return Ok(()),
(Some(sequence), Some(_)) => sequence,
};
let seqs = self.sequences.as_arrow();
let sequence = UInt64Array::new_scalar(seq);
let predicate = datafusion_common::arrow::compute::kernels::cmp::lt_eq(seqs, &sequence)
.context(ComputeArrowSnafu)?;
let predicate = BooleanVector::from(predicate);
self.filter(&predicate)?;
Ok(())
}
pub fn sort(&mut self, dedup: bool) -> Result<()> {
let converter = RowConverter::new(vec![
SortField::new(self.timestamps.data_type().as_arrow_type()),
SortField::new_with_options(
self.sequences.data_type().as_arrow_type(),
SortOptions {
descending: true,
..Default::default()
},
),
])
.context(ComputeArrowSnafu)?;
let columns = [
self.timestamps.to_arrow_array(),
self.sequences.to_arrow_array(),
];
let rows = converter.convert_columns(&columns).unwrap();
let mut to_sort: Vec<_> = rows.iter().enumerate().collect();
let was_sorted = to_sort.is_sorted_by_key(|x| x.1);
if !was_sorted {
to_sort.sort_unstable_by_key(|x| x.1);
}
let num_rows = to_sort.len();
if dedup {
to_sort.dedup_by(|left, right| {
debug_assert_eq!(18, left.1.as_ref().len());
debug_assert_eq!(18, right.1.as_ref().len());
let (left_key, right_key) = (left.1.as_ref(), right.1.as_ref());
left_key[..TIMESTAMP_KEY_LEN] == right_key[..TIMESTAMP_KEY_LEN]
});
}
let no_dedup = to_sort.len() == num_rows;
if was_sorted && no_dedup {
return Ok(());
}
let indices = UInt32Vector::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
self.take_in_place(&indices)
}
pub fn memory_size(&self) -> usize {
let mut size = std::mem::size_of::<Self>();
size += self.primary_key.len();
size += self.timestamps.memory_size();
size += self.sequences.memory_size();
size += self.op_types.memory_size();
for batch_column in &self.fields {
size += batch_column.data.memory_size();
}
size
}
pub(crate) fn projected_fields(
metadata: &RegionMetadata,
projection: &[ColumnId],
) -> Vec<(ColumnId, ConcreteDataType)> {
let projected_ids: HashSet<_> = projection.iter().copied().collect();
metadata
.field_columns()
.filter_map(|column| {
if projected_ids.contains(&column.column_id) {
Some((column.column_id, column.column_schema.data_type.clone()))
} else {
None
}
})
.collect()
}
pub(crate) fn timestamps_native(&self) -> Option<&[i64]> {
if self.timestamps.is_empty() {
return None;
}
let values = match self.timestamps.data_type() {
ConcreteDataType::Timestamp(TimestampType::Second(_)) => self
.timestamps
.as_any()
.downcast_ref::<TimestampSecondVector>()
.unwrap()
.as_arrow()
.values(),
ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => self
.timestamps
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap()
.as_arrow()
.values(),
ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => self
.timestamps
.as_any()
.downcast_ref::<TimestampMicrosecondVector>()
.unwrap()
.as_arrow()
.values(),
ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => self
.timestamps
.as_any()
.downcast_ref::<TimestampNanosecondVector>()
.unwrap()
.as_arrow()
.values(),
other => panic!("timestamps in a Batch has other type {:?}", other),
};
Some(values)
}
fn take_in_place(&mut self, indices: &UInt32Vector) -> Result<()> {
self.timestamps = self.timestamps.take(indices).context(ComputeVectorSnafu)?;
let array = arrow::compute::take(self.sequences.as_arrow(), indices.as_arrow(), None)
.context(ComputeArrowSnafu)?;
self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
let array = arrow::compute::take(self.op_types.as_arrow(), indices.as_arrow(), None)
.context(ComputeArrowSnafu)?;
self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
for batch_column in &mut self.fields {
batch_column.data = batch_column
.data
.take(indices)
.context(ComputeVectorSnafu)?;
}
Ok(())
}
fn get_timestamp(&self, index: usize) -> Timestamp {
match self.timestamps.get_ref(index) {
ValueRef::Timestamp(timestamp) => timestamp,
value => panic!("{:?} is not a timestamp", value),
}
}
pub(crate) fn get_sequence(&self, index: usize) -> SequenceNumber {
self.sequences.get_data(index).unwrap()
}
#[cfg(debug_assertions)]
pub(crate) fn check_monotonic(&self) -> Result<(), String> {
use std::cmp::Ordering;
if self.timestamps_native().is_none() {
return Ok(());
}
let timestamps = self.timestamps_native().unwrap();
let sequences = self.sequences.as_arrow().values();
for (i, window) in timestamps.windows(2).enumerate() {
let current = window[0];
let next = window[1];
let current_sequence = sequences[i];
let next_sequence = sequences[i + 1];
match current.cmp(&next) {
Ordering::Less => {
continue;
}
Ordering::Equal => {
if current_sequence < next_sequence {
return Err(format!(
"sequence are not monotonic: ts {} == {} but current sequence {} < {}, index: {}",
current, next, current_sequence, next_sequence, i
));
}
}
Ordering::Greater => {
return Err(format!(
"timestamps are not monotonic: {} > {}, index: {}",
current, next, i
));
}
}
}
Ok(())
}
#[cfg(debug_assertions)]
pub(crate) fn check_next_batch(&self, other: &Batch) -> Result<(), String> {
if self.primary_key() < other.primary_key() {
return Ok(());
}
if self.primary_key() > other.primary_key() {
return Err(format!(
"primary key is not monotonic: {:?} > {:?}",
self.primary_key(),
other.primary_key()
));
}
if self.last_timestamp() < other.first_timestamp() {
return Ok(());
}
if self.last_timestamp() > other.first_timestamp() {
return Err(format!(
"timestamps are not monotonic: {:?} > {:?}",
self.last_timestamp(),
other.first_timestamp()
));
}
if self.last_sequence() >= other.first_sequence() {
return Ok(());
}
Err(format!(
"sequences are not monotonic: {:?} < {:?}",
self.last_sequence(),
other.first_sequence()
))
}
pub fn pk_col_value(
&mut self,
codec: &dyn PrimaryKeyCodec,
col_idx_in_pk: usize,
column_id: ColumnId,
) -> Result<Option<&Value>> {
if self.pk_values.is_none() {
self.pk_values = Some(codec.decode(&self.primary_key)?);
}
let pk_values = self.pk_values.as_ref().unwrap();
Ok(match pk_values {
CompositeValues::Dense(values) => values.get(col_idx_in_pk).map(|(_, v)| v),
CompositeValues::Sparse(values) => values.get(&column_id),
})
}
pub fn field_col_value(&mut self, column_id: ColumnId) -> Option<&BatchColumn> {
if self.fields_idx.is_none() {
self.fields_idx = Some(
self.fields
.iter()
.enumerate()
.map(|(i, c)| (c.column_id, i))
.collect(),
);
}
self.fields_idx
.as_ref()
.unwrap()
.get(&column_id)
.map(|&idx| &self.fields[idx])
}
}
/// Debug-only helper that validates the ordering of a sequence of batches.
///
/// It remembers the last batch it checked and an optional time range `[start, end)` that
/// every batch must fall in.
#[cfg(debug_assertions)]
#[derive(Default)]
pub(crate) struct BatchChecker {
    /// The previously checked batch, compared against the next one.
    last_batch: Option<Batch>,
    /// Inclusive lower bound for a batch's first timestamp.
    start: Option<Timestamp>,
    /// Exclusive upper bound for a batch's last timestamp.
    end: Option<Timestamp>,
}
#[cfg(debug_assertions)]
impl BatchChecker {
    /// Sets the inclusive lower bound that each batch's first timestamp must respect.
    pub(crate) fn with_start(self, start: Option<Timestamp>) -> Self {
        Self { start, ..self }
    }

    /// Sets the exclusive upper bound that each batch's last timestamp must stay below.
    pub(crate) fn with_end(self, end: Option<Timestamp>) -> Self {
        Self { end, ..self }
    }

    /// Validates `batch` on its own, against the configured time range, and against the
    /// previously checked batch.
    ///
    /// If the batch passes its own and the range checks, it is remembered as the previous
    /// batch even when the cross-batch check fails.
    pub(crate) fn check_monotonic(&mut self, batch: &Batch) -> Result<(), String> {
        batch.check_monotonic()?;

        match (self.start, batch.first_timestamp()) {
            (Some(start), Some(first)) if start > first => {
                return Err(format!(
                    "batch's first timestamp is before the start timestamp: {:?} > {:?}",
                    start, first
                ));
            }
            _ => {}
        }

        match (self.end, batch.last_timestamp()) {
            (Some(end), Some(last)) if end <= last => {
                return Err(format!(
                    "batch's last timestamp is after the end timestamp: {:?} <= {:?}",
                    end, last
                ));
            }
            _ => {}
        }

        let outcome = match &self.last_batch {
            Some(previous) => previous.check_next_batch(batch),
            None => Ok(()),
        };
        self.last_batch = Some(batch.clone());
        outcome
    }

    /// Renders the key, timestamps and sequences of `batch` (prefixed by a summary of the
    /// previous batch, if any) for diagnostics.
    pub(crate) fn format_batch(&self, batch: &Batch) -> String {
        use std::fmt::Write;

        let mut out = String::new();
        if let Some(previous) = self.last_batch.as_ref() {
            write!(
                &mut out,
                "last_pk: {:?}, last_ts: {:?}, last_seq: {:?}, ",
                previous.primary_key(),
                previous.last_timestamp(),
                previous.last_sequence()
            )
            .unwrap();
        }
        write!(
            &mut out,
            "batch_pk: {:?}, batch_ts: {:?}, batch_seq: {:?}",
            batch.primary_key(),
            batch.timestamps(),
            batch.sequences()
        )
        .unwrap();
        out
    }

    /// Runs [`BatchChecker::check_monotonic`] and, on failure, logs the details and panics.
    ///
    /// # Panics
    /// Panics when `batch` violates any ordering check.
    pub(crate) fn ensure_part_range_batch(
        &mut self,
        scanner: &str,
        region_id: store_api::storage::RegionId,
        partition: usize,
        part_range: store_api::region_engine::PartitionRange,
        batch: &Batch,
    ) {
        let Err(e) = self.check_monotonic(batch) else {
            return;
        };
        let err_msg = format!(
            "{}: batch is not sorted, {}, region_id: {}, partition: {}, part_range: {:?}",
            scanner, e, region_id, partition, part_range,
        );
        common_telemetry::error!("{err_msg}, {}", self.format_batch(batch));
        panic!("{err_msg}, batch rows: {}", batch.num_rows());
    }
}
/// Byte length of the timestamp prefix in a row key built by `Batch::sort`.
///
/// Arrow's row format encodes a non-null fixed-width value as a one-byte marker followed
/// by the value bytes; timestamps are 64-bit integers.
const TIMESTAMP_KEY_LEN: usize = 1 + std::mem::size_of::<i64>();
fn concat_arrays(iter: impl Iterator<Item = ArrayRef>) -> Result<ArrayRef> {
let arrays: Vec<_> = iter.collect();
let dyn_arrays: Vec<_> = arrays.iter().map(|array| array.as_ref()).collect();
arrow::compute::concat(&dyn_arrays).context(ComputeArrowSnafu)
}
/// A field column of a [`Batch`]: its column id and its data.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct BatchColumn {
    /// Id of the field column.
    pub column_id: ColumnId,
    /// Column values, one per row of the batch.
    pub data: VectorRef,
}
/// Builder for [`Batch`].
///
/// The required columns are `Option`s until [`BatchBuilder::build`], which reports any
/// missing one as an `InvalidBatch` error.
pub struct BatchBuilder {
    /// Encoded primary key of the batch.
    primary_key: Vec<u8>,
    /// Timestamp column, if set.
    timestamps: Option<VectorRef>,
    /// Sequence column, if set.
    sequences: Option<Arc<UInt64Vector>>,
    /// Op type column, if set.
    op_types: Option<Arc<UInt8Vector>>,
    /// Field columns collected so far.
    fields: Vec<BatchColumn>,
}
impl BatchBuilder {
    /// Creates a builder with only the primary key set.
    ///
    /// The required columns must be supplied (for example with
    /// [`BatchBuilder::timestamps_array`]) before calling [`BatchBuilder::build`].
    pub fn new(primary_key: Vec<u8>) -> BatchBuilder {
        BatchBuilder {
            primary_key,
            timestamps: None,
            sequences: None,
            op_types: None,
            fields: Vec::new(),
        }
    }

    /// Creates a builder with the primary key and all required columns set.
    pub fn with_required_columns(
        primary_key: Vec<u8>,
        timestamps: VectorRef,
        sequences: Arc<UInt64Vector>,
        op_types: Arc<UInt8Vector>,
    ) -> BatchBuilder {
        BatchBuilder {
            primary_key,
            timestamps: Some(timestamps),
            sequences: Some(sequences),
            op_types: Some(op_types),
            fields: Vec::new(),
        }
    }

    /// Replaces all field columns.
    pub fn with_fields(mut self, fields: Vec<BatchColumn>) -> Self {
        self.fields = fields;
        self
    }

    /// Appends a field column.
    pub fn push_field(&mut self, column: BatchColumn) -> &mut Self {
        self.fields.push(column);
        self
    }

    /// Appends a field column built from an arrow array.
    ///
    /// # Errors
    /// Returns a `ConvertVector` error if the array cannot be converted into a vector.
    pub fn push_field_array(&mut self, column_id: ColumnId, array: ArrayRef) -> Result<&mut Self> {
        let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
        self.fields.push(BatchColumn {
            column_id,
            data: vector,
        });
        Ok(self)
    }

    /// Sets the timestamp column from an arrow array.
    ///
    /// # Errors
    /// Returns `ConvertVector` if the conversion fails, or `InvalidBatch` if the resulting
    /// vector is not a timestamp type.
    pub fn timestamps_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
        let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
        ensure!(
            vector.data_type().is_timestamp(),
            InvalidBatchSnafu {
                reason: format!("{:?} is not a timestamp type", vector.data_type()),
            }
        );
        self.timestamps = Some(vector);
        Ok(self)
    }

    /// Sets the sequence column from an arrow array.
    ///
    /// # Errors
    /// Returns `InvalidBatch` if the array is not `UInt64`.
    pub fn sequences_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
        ensure!(
            *array.data_type() == arrow::datatypes::DataType::UInt64,
            InvalidBatchSnafu {
                reason: "sequence array is not UInt64 type",
            }
        );
        // The data type was checked above, so this conversion cannot fail.
        let vector = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
        self.sequences = Some(vector);
        Ok(self)
    }

    /// Sets the op type column from an arrow array.
    ///
    /// # Errors
    /// Returns `InvalidBatch` if the array is not `UInt8`.
    pub fn op_types_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
        ensure!(
            *array.data_type() == arrow::datatypes::DataType::UInt8,
            InvalidBatchSnafu {
                reason: "op type array is not UInt8 type",
            }
        );
        // The data type was checked above, so this conversion cannot fail.
        let vector = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
        self.op_types = Some(vector);
        Ok(self)
    }

    /// Builds the [`Batch`], with empty `pk_values` and `fields_idx` caches.
    ///
    /// # Errors
    /// Returns `InvalidBatch` if a required column is missing or if any column's length
    /// differs from the timestamp column's length.
    ///
    /// # Panics
    /// Panics if a required column contains nulls.
    pub fn build(self) -> Result<Batch> {
        let timestamps = self.timestamps.context(InvalidBatchSnafu {
            reason: "missing timestamps",
        })?;
        let sequences = self.sequences.context(InvalidBatchSnafu {
            reason: "missing sequences",
        })?;
        let op_types = self.op_types.context(InvalidBatchSnafu {
            reason: "missing op_types",
        })?;
        assert_eq!(0, timestamps.null_count());
        assert_eq!(0, sequences.null_count());
        assert_eq!(0, op_types.null_count());
        let ts_len = timestamps.len();
        ensure!(
            sequences.len() == ts_len,
            InvalidBatchSnafu {
                reason: format!(
                    "sequence have different len {} != {}",
                    sequences.len(),
                    ts_len
                ),
            }
        );
        ensure!(
            op_types.len() == ts_len,
            InvalidBatchSnafu {
                reason: format!(
                    "op type have different len {} != {}",
                    op_types.len(),
                    ts_len
                ),
            }
        );
        for column in &self.fields {
            ensure!(
                column.data.len() == ts_len,
                InvalidBatchSnafu {
                    reason: format!(
                        "column {} has different len {} != {}",
                        column.column_id,
                        column.data.len(),
                        ts_len
                    ),
                }
            );
        }
        Ok(Batch {
            primary_key: self.primary_key,
            pk_values: None,
            timestamps,
            sequences,
            op_types,
            fields: self.fields,
            fields_idx: None,
        })
    }
}
impl From<Batch> for BatchBuilder {
fn from(batch: Batch) -> Self {
Self {
primary_key: batch.primary_key,
timestamps: Some(batch.timestamps),
sequences: Some(batch.sequences),
op_types: Some(batch.op_types),
fields: batch.fields,
}
}
}
/// A source of [`Batch`]es that wraps one of several kinds of producer.
pub enum Source {
    /// An async [`BatchReader`].
    Reader(BoxedBatchReader),
    /// A synchronous iterator of batch results.
    Iter(BoxedBatchIterator),
    /// A boxed async stream of batch results.
    Stream(BoxedBatchStream),
    /// A `PruneReader`.
    PruneReader(PruneReader),
}
impl Source {
    /// Fetches the next batch from the wrapped producer.
    ///
    /// For the iterator variant, an exhausted iterator yields `Ok(None)`. Errors from any
    /// producer are propagated.
    pub(crate) async fn next_batch(&mut self) -> Result<Option<Batch>> {
        let next = match self {
            Self::Iter(batches) => batches.next().transpose()?,
            Self::Reader(inner) => inner.next_batch().await?,
            Self::Stream(inner) => inner.try_next().await?,
            Self::PruneReader(inner) => inner.next_batch().await?,
        };
        Ok(next)
    }
}
/// An async reader that yields [`Batch`]es one at a time.
#[async_trait]
pub trait BatchReader: Send {
    /// Fetches the next batch.
    ///
    /// NOTE(review): `Ok(None)` presumably marks the end of input (as `Source::next_batch`
    /// does for exhausted iterators); confirm with implementors.
    async fn next_batch(&mut self) -> Result<Option<Batch>>;
}
/// A boxed, dynamically dispatched [`BatchReader`].
pub type BoxedBatchReader = Box<dyn BatchReader>;
/// A boxed `'static` stream of batch results.
pub type BoxedBatchStream = BoxStream<'static, Result<Batch>>;
#[async_trait::async_trait]
impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
async fn next_batch(&mut self) -> Result<Option<Batch>> {
(**self).next_batch().await
}
}
/// Timing and counting metrics for a scanner.
///
/// NOTE(review): these fields are not read or written in this chunk; the per-field notes
/// below are inferred from the field names. Confirm them at the use sites.
#[derive(Debug, Default)]
pub(crate) struct ScannerMetrics {
    /// Presumably the time spent preparing the scan.
    prepare_scan_cost: Duration,
    /// Presumably the time spent building readers.
    build_reader_cost: Duration,
    /// Presumably the time spent scanning.
    scan_cost: Duration,
    /// Presumably the time spent converting batches.
    convert_cost: Duration,
    /// Presumably the time spent yielding results.
    yield_cost: Duration,
    /// Presumably the number of batches returned.
    num_batches: usize,
    /// Presumably the number of rows returned.
    num_rows: usize,
    /// Presumably the number of memtable ranges scanned.
    num_mem_ranges: usize,
    /// Presumably the number of file ranges scanned.
    num_file_ranges: usize,
}
// Unit tests for `Batch`, covering construction, accessors, slicing, concatenation,
// filtering, sorting/dedup and primary-key/field value lookup.
#[cfg(test)]
mod tests {
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::storage::consts::ReservedColumnId;

    use super::*;
    use crate::error::Error;
    use crate::row_converter::{self, build_primary_key_codec_with_fields};
    use crate::test_util::new_batch_builder;

    // Builds a test batch with primary key `b"test"` and a single field column whose id
    // is 1. The tests below compare its timestamps against millisecond timestamps.
    fn new_batch(
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        field: &[u64],
    ) -> Batch {
        new_batch_builder(b"test", timestamps, sequences, op_types, 1, field)
            .build()
            .unwrap()
    }

    // An empty batch has no first/last timestamp or sequence and no native timestamps.
    #[test]
    fn test_empty_batch() {
        let batch = new_batch(&[], &[], &[], &[]);
        assert_eq!(None, batch.first_timestamp());
        assert_eq!(None, batch.last_timestamp());
        assert_eq!(None, batch.first_sequence());
        assert_eq!(None, batch.last_sequence());
        assert!(batch.timestamps_native().is_none());
    }

    // With a single row, the first and last accessors return the same row.
    #[test]
    fn test_first_last_one() {
        let batch = new_batch(&[1], &[2], &[OpType::Put], &[4]);
        assert_eq!(
            Timestamp::new_millisecond(1),
            batch.first_timestamp().unwrap()
        );
        assert_eq!(
            Timestamp::new_millisecond(1),
            batch.last_timestamp().unwrap()
        );
        assert_eq!(2, batch.first_sequence().unwrap());
        assert_eq!(2, batch.last_sequence().unwrap());
    }

    // With several rows, first/last come from the first and last positions.
    #[test]
    fn test_first_last_multiple() {
        let batch = new_batch(
            &[1, 2, 3],
            &[11, 12, 13],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );
        assert_eq!(
            Timestamp::new_millisecond(1),
            batch.first_timestamp().unwrap()
        );
        assert_eq!(
            Timestamp::new_millisecond(3),
            batch.last_timestamp().unwrap()
        );
        assert_eq!(11, batch.first_sequence().unwrap());
        assert_eq!(13, batch.last_sequence().unwrap());
    }

    // `slice(1, 2)` keeps rows 1 and 2 in every column.
    #[test]
    fn test_slice() {
        let batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let batch = batch.slice(1, 2);
        let expect = new_batch(
            &[2, 3],
            &[12, 13],
            &[OpType::Delete, OpType::Put],
            &[22, 23],
        );
        assert_eq!(expect, batch);
    }

    // `timestamps_native` exposes the raw i64 timestamp values.
    #[test]
    fn test_timestamps_native() {
        let batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        assert_eq!(&[1, 2, 3, 4], batch.timestamps_native().unwrap());
    }

    // Concatenating no batches is an `InvalidBatch` error.
    #[test]
    fn test_concat_empty() {
        let err = Batch::concat(vec![]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }

    // Concatenating a single batch (empty or not) returns it unchanged.
    #[test]
    fn test_concat_one() {
        let batch = new_batch(&[], &[], &[], &[]);
        let actual = Batch::concat(vec![batch.clone()]).unwrap();
        assert_eq!(batch, actual);

        let batch = new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]);
        let actual = Batch::concat(vec![batch.clone()]).unwrap();
        assert_eq!(batch, actual);
    }

    // Concatenation appends rows in input order and tolerates empty batches in between.
    #[test]
    fn test_concat_multiple() {
        let batches = vec![
            new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]),
            new_batch(
                &[3, 4, 5],
                &[13, 14, 15],
                &[OpType::Put, OpType::Delete, OpType::Put],
                &[23, 24, 25],
            ),
            new_batch(&[], &[], &[], &[]),
            new_batch(&[6], &[16], &[OpType::Put], &[26]),
        ];
        let batch = Batch::concat(batches).unwrap();
        let expect = new_batch(
            &[1, 2, 3, 4, 5, 6],
            &[11, 12, 13, 14, 15, 16],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Delete,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25, 26],
        );
        assert_eq!(expect, batch);
    }

    // Batches with different primary keys cannot be concatenated.
    #[test]
    fn test_concat_different() {
        let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
        let mut batch2 = new_batch(&[2], &[2], &[OpType::Put], &[2]);
        batch2.primary_key = b"hello".to_vec();
        let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }

    // Batches whose field columns differ in count or in column id cannot be concatenated.
    #[test]
    fn test_concat_different_fields() {
        let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
        let fields = vec![
            batch1.fields()[0].clone(),
            BatchColumn {
                column_id: 2,
                data: Arc::new(UInt64Vector::from_slice([2])),
            },
        ];
        // Add a new field.
        let batch2 = batch1.clone().with_fields(fields).unwrap();
        let err = Batch::concat(vec![batch1.clone(), batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );

        // Replace the field with one that has a different column id.
        let fields = vec![BatchColumn {
            column_id: 2,
            data: Arc::new(UInt64Vector::from_slice([2])),
        }];
        let batch2 = batch1.clone().with_fields(fields).unwrap();
        let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }

    // Filtering deletes out of an empty batch keeps it empty.
    #[test]
    fn test_filter_deleted_empty() {
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_deleted().unwrap();
        assert!(batch.is_empty());
    }

    // Only `Delete` rows are removed; a batch of only `Put`s is unchanged.
    #[test]
    fn test_filter_deleted() {
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Delete, OpType::Put, OpType::Delete, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch.filter_deleted().unwrap();
        let expect = new_batch(&[2, 4], &[12, 14], &[OpType::Put, OpType::Put], &[22, 24]);
        assert_eq!(expect, batch);

        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let expect = batch.clone();
        batch.filter_deleted().unwrap();
        assert_eq!(expect, batch);
    }

    // Rows with sequence > the bound are dropped; `None` or an empty batch is a no-op.
    #[test]
    fn test_filter_by_sequence() {
        // Filters put only.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch.filter_by_sequence(Some(13)).unwrap();
        let expect = new_batch(
            &[1, 2, 3],
            &[11, 12, 13],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );
        assert_eq!(expect, batch);

        // Filters to empty.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );

        batch.filter_by_sequence(Some(10)).unwrap();
        assert!(batch.is_empty());

        // None filter.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let expect = batch.clone();
        batch.filter_by_sequence(None).unwrap();
        assert_eq!(expect, batch);

        // Filter an empty batch.
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_by_sequence(Some(10)).unwrap();
        assert!(batch.is_empty());

        // Filter an empty batch with None.
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_by_sequence(None).unwrap();
        assert!(batch.is_empty());
    }

    // `filter` keeps rows where the predicate is true, regardless of op type.
    #[test]
    fn test_filter() {
        // Filters put only.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
        batch.filter(&predicate).unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // Filters deletion.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
        batch.filter(&predicate).unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // Filters to empty.
        let predicate = BooleanVector::from_vec(vec![false, false]);
        batch.filter(&predicate).unwrap();
        assert!(batch.is_empty());
    }

    // Sorting orders by timestamp ascending, then sequence descending; with dedup only the
    // row with the largest sequence per timestamp survives, whatever its op type.
    #[test]
    fn test_sort_and_dedup() {
        let original = new_batch(
            &[2, 3, 1, 4, 5, 2],
            &[1, 2, 3, 4, 5, 6],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25, 26],
        );

        let mut batch = original.clone();
        batch.sort(true).unwrap();
        // Timestamp 2 keeps only the row with sequence 6.
        assert_eq!(
            new_batch(
                &[1, 2, 3, 4, 5],
                &[3, 6, 2, 4, 5],
                &[
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                ],
                &[23, 26, 22, 24, 25],
            ),
            batch
        );

        let mut batch = original.clone();
        batch.sort(false).unwrap();

        // Without dedup both timestamp-2 rows remain, newest sequence first.
        assert_eq!(
            new_batch(
                &[1, 2, 2, 3, 4, 5],
                &[3, 6, 1, 2, 4, 5],
                &[
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                ],
                &[23, 26, 21, 22, 24, 25],
            ),
            batch
        );

        let original = new_batch(
            &[2, 2, 1],
            &[1, 6, 1],
            &[OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );

        let mut batch = original.clone();
        batch.sort(true).unwrap();
        let expect = new_batch(&[1, 2], &[1, 6], &[OpType::Put, OpType::Put], &[23, 22]);
        assert_eq!(expect, batch);

        let mut batch = original.clone();
        batch.sort(false).unwrap();
        let expect = new_batch(
            &[1, 2, 2],
            &[1, 6, 1],
            &[OpType::Put, OpType::Put, OpType::Delete],
            &[23, 22, 21],
        );
        assert_eq!(expect, batch);
    }

    // Primary-key and field column lookups return the encoded values, for both dense and
    // sparse key encodings.
    #[test]
    fn test_get_value() {
        let encodings = [PrimaryKeyEncoding::Dense, PrimaryKeyEncoding::Sparse];

        for encoding in encodings {
            let codec = build_primary_key_codec_with_fields(
                encoding,
                [
                    (
                        ReservedColumnId::table_id(),
                        row_converter::SortField::new(ConcreteDataType::uint32_datatype()),
                    ),
                    (
                        ReservedColumnId::tsid(),
                        row_converter::SortField::new(ConcreteDataType::uint64_datatype()),
                    ),
                    (
                        100,
                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
                    ),
                    (
                        200,
                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
                    ),
                ]
                .into_iter(),
            );

            let values = [
                Value::UInt32(1000),
                Value::UInt64(2000),
                Value::String("abcdefgh".into()),
                Value::String("zyxwvu".into()),
            ];
            let mut buf = vec![];
            codec
                .encode_values(
                    &[
                        (ReservedColumnId::table_id(), values[0].clone()),
                        (ReservedColumnId::tsid(), values[1].clone()),
                        (100, values[2].clone()),
                        (200, values[3].clone()),
                    ],
                    &mut buf,
                )
                .unwrap();

            let field_col_id = 2;
            let mut batch = new_batch_builder(
                &buf,
                &[1, 2, 3],
                &[1, 1, 1],
                &[OpType::Put, OpType::Put, OpType::Put],
                field_col_id,
                &[42, 43, 44],
            )
            .build()
            .unwrap();

            let v = batch
                .pk_col_value(&*codec, 0, ReservedColumnId::table_id())
                .unwrap()
                .unwrap();
            assert_eq!(values[0], *v);

            let v = batch
                .pk_col_value(&*codec, 1, ReservedColumnId::tsid())
                .unwrap()
                .unwrap();
            assert_eq!(values[1], *v);

            let v = batch.pk_col_value(&*codec, 2, 100).unwrap().unwrap();
            assert_eq!(values[2], *v);

            let v = batch.pk_col_value(&*codec, 3, 200).unwrap().unwrap();
            assert_eq!(values[3], *v);

            let v = batch.field_col_value(field_col_id).unwrap();
            assert_eq!(v.data.get(0), Value::UInt64(42));
            assert_eq!(v.data.get(1), Value::UInt64(43));
            assert_eq!(v.data.get(2), Value::UInt64(44));
        }
    }
}