use api::v1::OpType;
use async_trait::async_trait;
use common_telemetry::debug;
use common_time::Timestamp;
use datatypes::data_type::DataType;
use datatypes::prelude::ScalarVector;
use datatypes::value::Value;
use datatypes::vectors::MutableVector;
use crate::error::Result;
use crate::metrics::MERGE_FILTER_ROWS_TOTAL;
use crate::read::{Batch, BatchColumn, BatchReader};
/// Reader that passes every batch produced by `source` through `strategy` and
/// returns whatever batches the strategy emits.
pub(crate) struct DedupReader<R, S> {
    /// Upstream reader providing the input batches.
    source: R,
    /// Policy deciding which rows of the input survive.
    strategy: S,
    /// Counters accumulated while reading; reported when the reader is dropped.
    metrics: DedupMetrics,
}

impl<R, S> DedupReader<R, S> {
    /// Wraps `source` so that its batches go through `strategy`, starting with zeroed metrics.
    pub(crate) fn new(source: R, strategy: S) -> Self {
        let metrics = DedupMetrics::default();
        Self {
            source,
            strategy,
            metrics,
        }
    }
}
impl<R: BatchReader, S: DedupStrategy> DedupReader<R, S> {
    /// Pulls batches from the source until the strategy emits one.
    ///
    /// Once the source is exhausted, the strategy is flushed through `finish` and its
    /// result is returned instead.
    async fn fetch_next_batch(&mut self) -> Result<Option<Batch>> {
        loop {
            let Some(input) = self.source.next_batch().await? else {
                return self.strategy.finish(&mut self.metrics);
            };
            if let Some(output) = self.strategy.push_batch(input, &mut self.metrics)? {
                return Ok(Some(output));
            }
        }
    }
}
#[async_trait]
impl<R: BatchReader, S: DedupStrategy> BatchReader for DedupReader<R, S> {
    /// Returns the next deduplicated batch, or `None` once the input and the strategy are drained.
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        Self::fetch_next_batch(self).await
    }
}
impl<R, S> Drop for DedupReader<R, S> {
    /// Logs the accumulated metrics and adds them to `MERGE_FILTER_ROWS_TOTAL`.
    fn drop(&mut self) {
        debug!("Dedup reader finished, metrics: {:?}", self.metrics);

        // `num_unselected_rows` counts every row dropped by the strategy, including
        // the deleted ones (see `filter_deleted_from_batch`).
        MERGE_FILTER_ROWS_TOTAL
            .with_label_values(&["dedup"])
            .inc_by(self.metrics.num_unselected_rows as u64);
        // The "delete" label must only count rows removed by the delete filter.
        // Previously it reused `num_unselected_rows`, so it over-reported.
        MERGE_FILTER_ROWS_TOTAL
            .with_label_values(&["delete"])
            .inc_by(self.metrics.num_deleted_rows as u64);
    }
}
#[cfg(test)]
impl<R, S> DedupReader<R, S> {
/// Test-only accessor for the metrics accumulated so far.
fn metrics(&self) -> &DedupMetrics {
&self.metrics
}
}
/// Policy used by `DedupReader` (and `LastNonNullIter`) to drop duplicate rows
/// from a stream of batches.
pub(crate) trait DedupStrategy: Send {
/// Pushes the next input batch and returns a batch ready to be emitted, if any.
/// `Ok(None)` only means nothing is ready yet; callers keep pushing batches.
fn push_batch(&mut self, batch: Batch, metrics: &mut DedupMetrics) -> Result<Option<Batch>>;
/// Called once the input has no more batches; returns any output the strategy
/// is still holding back.
fn finish(&mut self, metrics: &mut DedupMetrics) -> Result<Option<Batch>>;
}
/// Primary key and timestamp of the last row of the most recent batch pushed to
/// `LastRow`. It is compared with the first row of the next batch.
struct BatchLastRow {
primary_key: Vec<u8>,
timestamp: Timestamp,
}
/// Dedup strategy that drops the first row of a batch when its primary key and
/// timestamp equal those of the previous batch's last row, so the earlier row wins.
/// NOTE(review): this presumably relies on the input placing the newest version of
/// a row first; confirm with the upstream merge reader.
pub(crate) struct LastRow {
/// Last row of the previously pushed non-empty batch, if any.
prev_batch: Option<BatchLastRow>,
/// Whether to remove deleted rows from the output.
filter_deleted: bool,
}
impl LastRow {
/// Creates the strategy. When `filter_deleted` is true, rows removed by
/// `Batch::filter_deleted` are dropped from the output.
pub(crate) fn new(filter_deleted: bool) -> Self {
Self {
prev_batch: None,
filter_deleted,
}
}
}
impl DedupStrategy for LastRow {
    /// Drops the leading row of `batch` if it repeats the primary key and timestamp
    /// of the last row seen in the previous batch. Then records the new last row and
    /// optionally removes deleted rows.
    ///
    /// Only the first row is compared, so duplicates inside a single batch are not
    /// handled here.
    fn push_batch(
        &mut self,
        mut batch: Batch,
        metrics: &mut DedupMetrics,
    ) -> Result<Option<Batch>> {
        if batch.is_empty() {
            return Ok(None);
        }
        debug_assert!(batch.first_timestamp().is_some());

        // The previous timestamp is only comparable when the primary key matches.
        let prev_timestamp = self
            .prev_batch
            .as_ref()
            .filter(|prev| prev.primary_key == batch.primary_key())
            .map(|prev| prev.timestamp);

        if batch.first_timestamp() == prev_timestamp {
            // The first row duplicates the previous batch's last row; drop it.
            metrics.num_unselected_rows += 1;
            let remaining = batch.num_rows() - 1;
            if remaining == 0 {
                // Nothing left, and `prev_batch` already holds this key/timestamp.
                return Ok(None);
            }
            batch = batch.slice(1, remaining);
        }

        // Record the last row before deleted rows are filtered out. The remembered
        // row is then the batch's actual last row, even when that row is a delete.
        let last_timestamp = batch.last_timestamp().unwrap();
        if let Some(prev) = self.prev_batch.as_mut() {
            // Reuses the existing key allocation.
            prev.primary_key.clone_from(&batch.primary_key);
            prev.timestamp = last_timestamp;
        } else {
            self.prev_batch = Some(BatchLastRow {
                primary_key: batch.primary_key().to_vec(),
                timestamp: last_timestamp,
            });
        }

        if self.filter_deleted {
            filter_deleted_from_batch(&mut batch, metrics)?;
        }

        // Every remaining row may have been deleted.
        Ok((!batch.is_empty()).then_some(batch))
    }

    /// `LastRow` never holds rows back, so there is nothing to flush.
    fn finish(&mut self, _metrics: &mut DedupMetrics) -> Result<Option<Batch>> {
        Ok(None)
    }
}
/// Runs `Batch::filter_deleted` on `batch`.
///
/// The number of rows it removed is added to both `num_deleted_rows` and
/// `num_unselected_rows`.
fn filter_deleted_from_batch(batch: &mut Batch, metrics: &mut DedupMetrics) -> Result<()> {
    let rows_before = batch.num_rows();
    batch.filter_deleted()?;
    let removed = rows_before - batch.num_rows();

    metrics.num_deleted_rows += removed;
    metrics.num_unselected_rows += removed;
    Ok(())
}
/// Counters updated by dedup strategies.
#[derive(Debug, Default)]
pub(crate) struct DedupMetrics {
/// Rows dropped from the output, whether as duplicates or as deleted rows.
pub(crate) num_unselected_rows: usize,
/// Rows dropped by the delete filter (also included in `num_unselected_rows`).
pub(crate) num_deleted_rows: usize,
}
/// Collects non-null field values for the last row of a buffered batch. The values
/// come from the duplicate rows (same key and timestamp) that follow that row.
/// Used by `LastNonNull`.
struct LastFieldsBuilder {
/// Whether `merge_last_non_null` filters deleted rows from its output.
filter_deleted: bool,
/// One mutable vector per field column, created on the first merge and then reused.
builders: Vec<Box<dyn MutableVector>>,
/// Field values of the row being merged. Only filled when merging is not skipped.
last_fields: Vec<Value>,
/// Whether the row being merged still has a null field.
contains_null: bool,
/// Whether the row being merged, or a duplicate pushed after it, is a delete.
/// Once set, no more values are merged.
contains_deletion: bool,
/// Whether `maybe_init` has run since construction or the last `clear`.
initialized: bool,
}
impl LastFieldsBuilder {
/// Creates an uninitialized builder with no builders or captured values.
fn new(filter_deleted: bool) -> Self {
Self {
filter_deleted,
builders: Vec::new(),
last_fields: Vec::new(),
contains_null: false,
contains_deletion: false,
initialized: false,
}
}
/// Captures the last row of `batch` as the row to merge into, unless this already
/// happened since the last `clear`.
///
/// Merging is skipped, leaving `last_fields` empty, when:
/// - the batch has no field columns,
/// - the last row is a delete, or
/// - the last row has no null field.
fn maybe_init(&mut self, batch: &Batch) {
debug_assert!(!batch.is_empty());
if self.initialized {
return;
}
self.initialized = true;
if batch.fields().is_empty() {
// No fields to merge.
return;
}
let last_idx = batch.num_rows() - 1;
let fields = batch.fields();
// `last_idx` is in bounds for a non-empty batch (assumes op types have no nulls).
self.contains_deletion =
batch.op_types().get_data(last_idx).unwrap() == OpType::Delete as u8;
// Null fields only matter for a row that is not deleted.
if !self.contains_deletion {
self.contains_null = fields.iter().any(|col| col.data.is_null(last_idx));
}
if self.skip_merge() {
return;
}
// Builders are created once and kept across `clear` for reuse.
if self.builders.is_empty() {
self.builders = fields
.iter()
.map(|col| col.data.data_type().create_mutable_vector(1))
.collect();
}
self.last_fields = fields.iter().map(|col| col.data.get(last_idx)).collect();
}
/// Returns true when no further merging is needed: the row is deleted, or it has
/// no null field left.
fn skip_merge(&self) -> bool {
debug_assert!(self.initialized);
self.contains_deletion || !self.contains_null
}
/// Merges the first row of `batch`, a duplicate of the row being built, into
/// `last_fields`. Each null value takes the duplicate's non-null value.
/// A delete row stops all further merging.
fn push_first_row(&mut self, batch: &Batch) {
debug_assert!(self.initialized);
debug_assert!(!batch.is_empty());
if self.skip_merge() {
return;
}
// `maybe_init` only sees the buffered row. A delete appearing in a later duplicate
// must be detected here so that values behind it are not merged.
self.contains_deletion = batch.op_types().get_data(0).unwrap() == OpType::Delete as u8;
if self.contains_deletion {
return;
}
let fields = batch.fields();
for (idx, value) in self.last_fields.iter_mut().enumerate() {
if value.is_null() && !fields[idx].data.is_null(0) {
*value = fields[idx].data.get(0);
}
}
self.contains_null = self.last_fields.iter().any(Value::is_null);
}
/// Builds the output batch from `buffer`, then resets the builder.
///
/// If values were captured, they replace the field values of `buffer`'s last row.
/// Deleted rows are filtered out when `filter_deleted` is set. Returns `None` if
/// the result is empty.
fn merge_last_non_null(
&mut self,
buffer: Batch,
metrics: &mut DedupMetrics,
) -> Result<Option<Batch>> {
debug_assert!(self.initialized);
let mut output = if self.last_fields.is_empty() {
// Nothing captured: the last row is emitted unchanged.
buffer
} else {
// Builds one-row field vectors from the merged values.
for (builder, value) in self.builders.iter_mut().zip(&self.last_fields) {
builder.push_value_ref(value.as_value_ref());
}
// NOTE(review): `builders` are not cleared in `clear()`; this presumably relies on
// `to_vector()` resetting each builder — confirm.
let fields = self
.builders
.iter_mut()
.zip(buffer.fields())
.map(|(builder, col)| BatchColumn {
column_id: col.column_id,
data: builder.to_vector(),
})
.collect();
if buffer.num_rows() == 1 {
buffer.with_fields(fields)?
} else {
// Replaces only the last row: split it off, swap its fields, and rejoin.
let front = buffer.slice(0, buffer.num_rows() - 1);
let last = buffer.slice(buffer.num_rows() - 1, 1);
let last = last.with_fields(fields)?;
Batch::concat(vec![front, last])?
}
};
self.clear();
if self.filter_deleted {
filter_deleted_from_batch(&mut output, metrics)?;
}
if output.is_empty() {
Ok(None)
} else {
Ok(Some(output))
}
}
/// Resets the merge state so the next `maybe_init` captures a new row.
/// `builders` are kept for reuse.
fn clear(&mut self) {
self.last_fields.clear();
self.contains_null = false;
self.contains_deletion = false;
self.initialized = false;
}
}
/// Dedup strategy that keeps the first row among consecutive rows sharing a primary
/// key and timestamp. Null fields in that row are filled with non-null values from
/// the duplicate rows that follow, until a delete row is reached.
pub(crate) struct LastNonNull {
/// Pending batch whose last row may still be merged with duplicates in the next batch.
buffer: Option<Batch>,
/// Merge state for the last row of `buffer`.
last_fields: LastFieldsBuilder,
}
impl LastNonNull {
/// Creates the strategy. When `filter_deleted` is true, deleted rows are removed
/// from the merged output.
pub(crate) fn new(filter_deleted: bool) -> Self {
Self {
buffer: None,
last_fields: LastFieldsBuilder::new(filter_deleted),
}
}
}
impl DedupStrategy for LastNonNull {
    /// Buffers one batch at a time. When the incoming batch starts with a duplicate
    /// of the buffered last row, that duplicate is merged into it. The buffered batch
    /// is emitted once a different row (key or timestamp) arrives.
    fn push_batch(&mut self, batch: Batch, metrics: &mut DedupMetrics) -> Result<Option<Batch>> {
        if batch.is_empty() {
            return Ok(None);
        }

        // With nothing buffered yet, hold this batch and wait for the next one.
        let Some(buffered) = self.buffer.as_mut() else {
            self.buffer = Some(batch);
            return Ok(None);
        };

        self.last_fields.maybe_init(buffered);

        let starts_new_row = buffered.primary_key() != batch.primary_key()
            || buffered.last_timestamp() != batch.first_timestamp();
        if starts_new_row {
            // The buffered last row has no more duplicates: emit it and buffer `batch`.
            let finished = std::mem::replace(buffered, batch);
            return self.last_fields.merge_last_non_null(finished, metrics);
        }

        // Same key and timestamp: only the first row of `batch` duplicates the
        // buffered last row.
        metrics.num_unselected_rows += 1;
        let num_rows = batch.num_rows();
        if num_rows == 1 {
            self.last_fields.push_first_row(&batch);
            return Ok(None);
        }

        self.last_fields.push_first_row(&batch.slice(0, 1));
        let rest = batch.slice(1, num_rows - 1);
        let finished = std::mem::replace(buffered, rest);
        self.last_fields.merge_last_non_null(finished, metrics)
    }

    /// Emits the buffered batch, if any, after merging its last row.
    fn finish(&mut self, metrics: &mut DedupMetrics) -> Result<Option<Batch>> {
        match self.buffer.take() {
            Some(buffered) => {
                self.last_fields.maybe_init(&buffered);
                self.last_fields.merge_last_non_null(buffered, metrics)
            }
            None => Ok(None),
        }
    }
}
/// Iterator adapter that applies `LastNonNull` (without delete filtering) to the
/// batches of `iter`. Each input batch is split so that rows sharing a timestamp
/// are pushed to the strategy as separate segments.
pub(crate) struct LastNonNullIter<I> {
/// Inner iterator; set to `None` once it is exhausted.
iter: Option<I>,
/// Strategy merging the segments.
strategy: LastNonNull,
/// Counters passed to the strategy.
metrics: DedupMetrics,
/// Batch currently being split into segments.
current_batch: Option<Batch>,
/// Row of `current_batch` where the next segment starts.
current_index: usize,
}
impl<I> LastNonNullIter<I> {
/// Wraps `iter`; deleted rows are kept in the output (`LastNonNull::new(false)`).
pub(crate) fn new(iter: I) -> Self {
Self {
iter: Some(iter),
strategy: LastNonNull::new(false),
metrics: DedupMetrics::default(),
current_batch: None,
current_index: 0,
}
}
}
impl<I: Iterator<Item = Result<Batch>>> LastNonNullIter<I> {
    /// Returns the next segment to push into the strategy.
    ///
    /// A segment is a run of consecutive rows of the current batch in which no two
    /// neighboring rows share a timestamp. The batch is cut right after a row whose
    /// successor repeats its timestamp, so duplicates land in separate segments.
    /// Empty batches from the inner iterator are skipped. Returns `Ok(None)` once
    /// the inner iterator is exhausted.
    fn next_batch_for_merge(&mut self) -> Result<Option<Batch>> {
        // Pull batches until a non-empty one is found. An empty batch has no
        // timestamps to scan and cannot be sliced, so it must not become current.
        while self.current_batch.is_none() {
            let Some(iter) = self.iter.as_mut() else {
                return Ok(None);
            };
            match iter.next().transpose()? {
                Some(batch) if batch.is_empty() => continue,
                Some(batch) => {
                    self.current_batch = Some(batch);
                    self.current_index = 0;
                }
                None => {
                    // Never poll the inner iterator again once it is exhausted.
                    self.iter = None;
                    return Ok(None);
                }
            }
        }

        let Some(batch) = &self.current_batch else {
            return Ok(None);
        };
        let num_rows = batch.num_rows();
        // Safety: empty batches are skipped above, so the batch has timestamps.
        let timestamps = batch.timestamps_native().unwrap();
        let start = self.current_index;
        let mut end = start;
        while end + 1 < num_rows && timestamps[end] != timestamps[end + 1] {
            end += 1;
        }
        let segment = batch.slice(start, end - start + 1);
        if end + 1 < num_rows {
            // The scan stopped because row `end + 1` repeats the timestamp of row
            // `end`. The next segment starts there.
            self.current_index = end + 1;
        } else {
            self.current_batch = None;
            self.current_index = 0;
        }
        Ok(Some(segment))
    }

    /// Pushes segments into the strategy until it emits a batch. Once the input is
    /// exhausted, returns whatever the strategy's `finish` produces.
    fn next_batch(&mut self) -> Result<Option<Batch>> {
        while let Some(batch) = self.next_batch_for_merge()? {
            if let Some(batch) = self.strategy.push_batch(batch, &mut self.metrics)? {
                return Ok(Some(batch));
            }
        }
        self.strategy.finish(&mut self.metrics)
    }
}
impl<I: Iterator<Item = Result<Batch>>> Iterator for LastNonNullIter<I> {
    type Item = Result<Batch>;

    /// Yields merged batches until the input is drained. Errors are yielded as items.
    fn next(&mut self) -> Option<Self::Item> {
        match self.next_batch() {
            Ok(Some(batch)) => Some(Ok(batch)),
            Ok(None) => None,
            Err(err) => Some(Err(err)),
        }
    }
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::OpType;
use datatypes::arrow::array::{TimestampMillisecondArray, UInt64Array, UInt8Array};
use super::*;
use crate::read::BatchBuilder;
use crate::test_util::{check_reader_result, new_batch, VecBatchReader};
// Input without repeated (key, timestamp) pairs passes through both strategies
// unchanged, and no row is counted as unselected or deleted.
#[tokio::test]
async fn test_dedup_reader_no_duplications() {
let input = [
new_batch(
b"k1",
&[1, 2],
&[11, 12],
&[OpType::Put, OpType::Put],
&[21, 22],
),
new_batch(b"k1", &[3], &[13], &[OpType::Put], &[23]),
new_batch(
b"k2",
&[1, 2],
&[111, 112],
&[OpType::Put, OpType::Put],
&[31, 32],
),
];
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastRow::new(true));
check_reader_result(&mut reader, &input).await;
assert_eq!(0, reader.metrics().num_unselected_rows);
assert_eq!(0, reader.metrics().num_deleted_rows);
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNonNull::new(true));
check_reader_result(&mut reader, &input).await;
assert_eq!(0, reader.metrics().num_unselected_rows);
assert_eq!(0, reader.metrics().num_deleted_rows);
}
// LastRow keeps the first row of each duplicated (key, timestamp) across batches.
// With `filter_deleted` it also drops delete rows; without it they are kept.
#[tokio::test]
async fn test_dedup_reader_duplications() {
let input = [
new_batch(
b"k1",
&[1, 2],
&[13, 11],
&[OpType::Put, OpType::Put],
&[11, 12],
),
new_batch(b"k1", &[], &[], &[], &[]),
new_batch(
b"k1",
&[2, 3, 4],
&[10, 13, 13],
&[OpType::Put, OpType::Put, OpType::Delete],
&[2, 13, 14],
),
new_batch(
b"k2",
&[1, 2],
&[20, 20],
&[OpType::Put, OpType::Delete],
&[101, 0],
),
new_batch(b"k2", &[2], &[19], &[OpType::Put], &[102]),
new_batch(b"k3", &[2], &[20], &[OpType::Put], &[202]),
new_batch(b"k3", &[2], &[19], &[OpType::Delete], &[0]),
];
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastRow::new(true));
check_reader_result(
&mut reader,
&[
new_batch(
b"k1",
&[1, 2],
&[13, 11],
&[OpType::Put, OpType::Put],
&[11, 12],
),
new_batch(b"k1", &[3], &[13], &[OpType::Put], &[13]),
new_batch(b"k2", &[1], &[20], &[OpType::Put], &[101]),
new_batch(b"k3", &[2], &[20], &[OpType::Put], &[202]),
],
)
.await;
assert_eq!(5, reader.metrics().num_unselected_rows);
assert_eq!(2, reader.metrics().num_deleted_rows);
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastRow::new(false));
check_reader_result(
&mut reader,
&[
new_batch(
b"k1",
&[1, 2],
&[13, 11],
&[OpType::Put, OpType::Put],
&[11, 12],
),
new_batch(
b"k1",
&[3, 4],
&[13, 13],
&[OpType::Put, OpType::Delete],
&[13, 14],
),
new_batch(
b"k2",
&[1, 2],
&[20, 20],
&[OpType::Put, OpType::Delete],
&[101, 0],
),
new_batch(b"k3", &[2], &[20], &[OpType::Put], &[202]),
],
)
.await;
assert_eq!(3, reader.metrics().num_unselected_rows);
assert_eq!(0, reader.metrics().num_deleted_rows);
}
// Builds a batch with two nullable u64 field columns (column ids 1 and 2).
fn new_batch_multi_fields(
primary_key: &[u8],
timestamps: &[i64],
sequences: &[u64],
op_types: &[OpType],
fields: &[(Option<u64>, Option<u64>)],
) -> Batch {
let mut builder = BatchBuilder::new(primary_key.to_vec());
builder
.timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
timestamps.iter().copied(),
)))
.unwrap()
.sequences_array(Arc::new(UInt64Array::from_iter_values(
sequences.iter().copied(),
)))
.unwrap()
.op_types_array(Arc::new(UInt8Array::from_iter_values(
op_types.iter().map(|v| *v as u8),
)))
.unwrap()
.push_field_array(
1,
Arc::new(UInt64Array::from_iter(fields.iter().map(|field| field.0))),
)
.unwrap()
.push_field_array(
2,
Arc::new(UInt64Array::from_iter(fields.iter().map(|field| field.1))),
)
.unwrap();
builder.build().unwrap()
}
// LastNonNull fills null fields of the kept row with non-null values from the
// duplicates that follow it, with and without delete filtering.
#[tokio::test]
async fn test_last_non_null_merge() {
let input = [
new_batch_multi_fields(
b"k1",
&[1, 2],
&[13, 11],
&[OpType::Put, OpType::Put],
&[(Some(11), Some(11)), (None, None)],
),
new_batch_multi_fields(b"k1", &[], &[], &[], &[]),
new_batch_multi_fields(b"k1", &[2], &[10], &[OpType::Put], &[(Some(12), None)]),
new_batch_multi_fields(
b"k1",
&[2, 3, 4],
&[10, 13, 13],
&[OpType::Put, OpType::Put, OpType::Delete],
&[(Some(2), Some(22)), (Some(13), None), (None, Some(14))],
),
new_batch_multi_fields(
b"k2",
&[1, 2],
&[20, 20],
&[OpType::Put, OpType::Delete],
&[(Some(101), Some(101)), (None, None)],
),
new_batch_multi_fields(
b"k2",
&[2],
&[19],
&[OpType::Put],
&[(Some(102), Some(102))],
),
new_batch_multi_fields(
b"k3",
&[2],
&[20],
&[OpType::Put],
&[(Some(202), Some(202))],
),
new_batch_multi_fields(b"k3", &[2], &[19], &[OpType::Delete], &[(None, None)]),
];
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNonNull::new(true));
check_reader_result(
&mut reader,
&[
new_batch_multi_fields(
b"k1",
&[1, 2],
&[13, 11],
&[OpType::Put, OpType::Put],
&[(Some(11), Some(11)), (Some(12), Some(22))],
),
new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(13), None)]),
new_batch_multi_fields(
b"k2",
&[1],
&[20],
&[OpType::Put],
&[(Some(101), Some(101))],
),
new_batch_multi_fields(
b"k3",
&[2],
&[20],
&[OpType::Put],
&[(Some(202), Some(202))],
),
],
)
.await;
assert_eq!(6, reader.metrics().num_unselected_rows);
assert_eq!(2, reader.metrics().num_deleted_rows);
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNonNull::new(false));
check_reader_result(
&mut reader,
&[
new_batch_multi_fields(
b"k1",
&[1, 2],
&[13, 11],
&[OpType::Put, OpType::Put],
&[(Some(11), Some(11)), (Some(12), Some(22))],
),
new_batch_multi_fields(
b"k1",
&[3, 4],
&[13, 13],
&[OpType::Put, OpType::Delete],
&[(Some(13), None), (None, Some(14))],
),
new_batch_multi_fields(
b"k2",
&[1, 2],
&[20, 20],
&[OpType::Put, OpType::Delete],
&[(Some(101), Some(101)), (None, None)],
),
new_batch_multi_fields(
b"k3",
&[2],
&[20],
&[OpType::Put],
&[(Some(202), Some(202))],
),
],
)
.await;
assert_eq!(4, reader.metrics().num_unselected_rows);
assert_eq!(0, reader.metrics().num_deleted_rows);
}
// A single batch with distinct timestamps is not merged; only the delete filter
// changes it.
#[tokio::test]
async fn test_last_non_null_skip_merge_single() {
let input = [new_batch_multi_fields(
b"k1",
&[1, 2, 3],
&[13, 11, 13],
&[OpType::Put, OpType::Delete, OpType::Put],
&[(Some(11), Some(11)), (None, None), (Some(13), Some(13))],
)];
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNonNull::new(true));
check_reader_result(
&mut reader,
&[new_batch_multi_fields(
b"k1",
&[1, 3],
&[13, 13],
&[OpType::Put, OpType::Put],
&[(Some(11), Some(11)), (Some(13), Some(13))],
)],
)
.await;
assert_eq!(1, reader.metrics().num_unselected_rows);
assert_eq!(1, reader.metrics().num_deleted_rows);
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNonNull::new(false));
check_reader_result(&mut reader, &input).await;
assert_eq!(0, reader.metrics().num_unselected_rows);
assert_eq!(0, reader.metrics().num_deleted_rows);
}
// When the kept row has no null field, values from its duplicates are ignored.
#[tokio::test]
async fn test_last_non_null_skip_merge_no_null() {
let input = [
new_batch_multi_fields(
b"k1",
&[1, 2],
&[13, 11],
&[OpType::Put, OpType::Put],
&[(Some(11), Some(11)), (Some(12), Some(12))],
),
new_batch_multi_fields(b"k1", &[2], &[10], &[OpType::Put], &[(None, Some(22))]),
new_batch_multi_fields(
b"k1",
&[2, 3],
&[9, 13],
&[OpType::Put, OpType::Put],
&[(Some(32), None), (Some(13), Some(13))],
),
];
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNonNull::new(true));
check_reader_result(
&mut reader,
&[
new_batch_multi_fields(
b"k1",
&[1, 2],
&[13, 11],
&[OpType::Put, OpType::Put],
&[(Some(11), Some(11)), (Some(12), Some(12))],
),
new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(13), Some(13))]),
],
)
.await;
assert_eq!(2, reader.metrics().num_unselected_rows);
assert_eq!(0, reader.metrics().num_deleted_rows);
}
// A field that is null in every duplicate stays null after the merge.
#[tokio::test]
async fn test_last_non_null_merge_null() {
let input = [
new_batch_multi_fields(
b"k1",
&[1, 2],
&[13, 11],
&[OpType::Put, OpType::Put],
&[(Some(11), Some(11)), (None, None)],
),
new_batch_multi_fields(b"k1", &[2], &[10], &[OpType::Put], &[(None, Some(22))]),
new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(33), None)]),
];
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNonNull::new(true));
check_reader_result(
&mut reader,
&[
new_batch_multi_fields(
b"k1",
&[1, 2],
&[13, 11],
&[OpType::Put, OpType::Put],
&[(Some(11), Some(11)), (None, Some(22))],
),
new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(33), None)]),
],
)
.await;
assert_eq!(1, reader.metrics().num_unselected_rows);
assert_eq!(0, reader.metrics().num_deleted_rows);
}
// Drives `strategy` directly: `push_batch` for every input batch, then `finish`.
// Compares all emitted batches with `expect`.
fn check_dedup_strategy(input: &[Batch], strategy: &mut dyn DedupStrategy, expect: &[Batch]) {
let mut actual = Vec::new();
let mut metrics = DedupMetrics::default();
for batch in input {
if let Some(out) = strategy.push_batch(batch.clone(), &mut metrics).unwrap() {
actual.push(out);
}
}
if let Some(out) = strategy.finish(&mut metrics).unwrap() {
actual.push(out);
}
assert_eq!(expect, actual);
}
// A kept delete row is not merged with the rows behind it and is then filtered out.
#[test]
fn test_last_non_null_strategy_delete_last() {
let input = [
new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
new_batch_multi_fields(
b"k1",
&[1, 2],
&[1, 7],
&[OpType::Put, OpType::Put],
&[(Some(1), None), (Some(22), Some(222))],
),
new_batch_multi_fields(b"k1", &[2], &[4], &[OpType::Put], &[(Some(12), None)]),
new_batch_multi_fields(
b"k2",
&[2, 3],
&[2, 5],
&[OpType::Put, OpType::Delete],
&[(None, None), (Some(13), None)],
),
new_batch_multi_fields(b"k2", &[3], &[3], &[OpType::Put], &[(None, Some(3))]),
];
let mut strategy = LastNonNull::new(true);
check_dedup_strategy(
&input,
&mut strategy,
&[
new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
new_batch_multi_fields(b"k1", &[2], &[7], &[OpType::Put], &[(Some(22), Some(222))]),
new_batch_multi_fields(b"k2", &[2], &[2], &[OpType::Put], &[(None, None)]),
],
);
}
// A key whose only row is a delete produces no output.
#[test]
fn test_last_non_null_strategy_delete_one() {
let input = [
new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Delete], &[(None, None)]),
new_batch_multi_fields(b"k2", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
];
let mut strategy = LastNonNull::new(true);
check_dedup_strategy(
&input,
&mut strategy,
&[new_batch_multi_fields(
b"k2",
&[1],
&[6],
&[OpType::Put],
&[(Some(11), None)],
)],
);
}
// No output at all when every row is a delete.
#[test]
fn test_last_non_null_strategy_delete_all() {
let input = [
new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Delete], &[(None, None)]),
new_batch_multi_fields(b"k2", &[1], &[6], &[OpType::Delete], &[(Some(11), None)]),
];
let mut strategy = LastNonNull::new(true);
check_dedup_strategy(&input, &mut strategy, &[]);
}
// Duplicates of one key spread over several batches are merged per timestamp.
#[test]
fn test_last_non_null_strategy_same_batch() {
let input = [
new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
new_batch_multi_fields(
b"k1",
&[1, 2],
&[1, 7],
&[OpType::Put, OpType::Put],
&[(Some(1), None), (Some(22), Some(222))],
),
new_batch_multi_fields(b"k1", &[2], &[4], &[OpType::Put], &[(Some(12), None)]),
new_batch_multi_fields(
b"k1",
&[2, 3],
&[2, 5],
&[OpType::Put, OpType::Put],
&[(None, None), (Some(13), None)],
),
new_batch_multi_fields(b"k1", &[3], &[3], &[OpType::Put], &[(None, Some(3))]),
];
let mut strategy = LastNonNull::new(true);
check_dedup_strategy(
&input,
&mut strategy,
&[
new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
new_batch_multi_fields(b"k1", &[2], &[7], &[OpType::Put], &[(Some(22), Some(222))]),
new_batch_multi_fields(b"k1", &[3], &[5], &[OpType::Put], &[(Some(13), Some(3))]),
],
);
}
// A delete between the kept row and older puts stops merging: null fields of
// the kept row are not filled from rows behind the delete.
#[test]
fn test_last_non_null_strategy_delete_middle() {
let input = [
new_batch_multi_fields(b"k1", &[1], &[7], &[OpType::Put], &[(Some(11), None)]),
new_batch_multi_fields(b"k1", &[1], &[4], &[OpType::Delete], &[(None, None)]),
new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Put], &[(Some(12), Some(1))]),
new_batch_multi_fields(b"k1", &[2], &[8], &[OpType::Put], &[(Some(21), None)]),
new_batch_multi_fields(b"k1", &[2], &[5], &[OpType::Delete], &[(None, None)]),
new_batch_multi_fields(b"k1", &[2], &[2], &[OpType::Put], &[(Some(22), Some(2))]),
new_batch_multi_fields(b"k1", &[3], &[9], &[OpType::Put], &[(Some(31), None)]),
new_batch_multi_fields(b"k1", &[3], &[6], &[OpType::Delete], &[(None, None)]),
new_batch_multi_fields(b"k1", &[3], &[3], &[OpType::Put], &[(Some(32), Some(3))]),
];
let mut strategy = LastNonNull::new(true);
check_dedup_strategy(
&input,
&mut strategy,
&[
new_batch_multi_fields(b"k1", &[1], &[7], &[OpType::Put], &[(Some(11), None)]),
new_batch_multi_fields(b"k1", &[2], &[8], &[OpType::Put], &[(Some(21), None)]),
new_batch_multi_fields(b"k1", &[3], &[9], &[OpType::Put], &[(Some(31), None)]),
],
);
}
// LastNonNullIter splits a batch at repeated timestamps and merges the duplicates.
#[test]
fn test_last_non_null_iter_on_batch() {
let input = [new_batch_multi_fields(
b"k1",
&[1, 1, 2],
&[13, 12, 13],
&[OpType::Put, OpType::Put, OpType::Put],
&[(None, None), (Some(1), None), (Some(2), Some(22))],
)];
let iter = input.into_iter().map(Ok);
let iter = LastNonNullIter::new(iter);
let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
let expect = [
new_batch_multi_fields(b"k1", &[1], &[13], &[OpType::Put], &[(Some(1), None)]),
new_batch_multi_fields(b"k1", &[2], &[13], &[OpType::Put], &[(Some(2), Some(22))]),
];
assert_eq!(&expect, &actual[..]);
}
// Duplicates of one timestamp spanning two batches collapse into a single row.
#[test]
fn test_last_non_null_iter_same_row() {
let input = [
new_batch_multi_fields(
b"k1",
&[1, 1, 1],
&[13, 12, 11],
&[OpType::Put, OpType::Put, OpType::Put],
&[(None, None), (Some(1), None), (Some(11), None)],
),
new_batch_multi_fields(
b"k1",
&[1, 1],
&[10, 9],
&[OpType::Put, OpType::Put],
&[(None, Some(11)), (Some(21), Some(31))],
),
];
let iter = input.into_iter().map(Ok);
let iter = LastNonNullIter::new(iter);
let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
let expect = [new_batch_multi_fields(
b"k1",
&[1],
&[13],
&[OpType::Put],
&[(Some(1), Some(11))],
)];
assert_eq!(&expect, &actual[..]);
}
// Across multiple keys and batches; the iterator does not filter deletes, so
// the delete row for k1 at timestamp 3 is kept.
#[test]
fn test_last_non_null_iter_multi_batch() {
let input = [
new_batch_multi_fields(
b"k1",
&[1, 1, 2],
&[13, 12, 13],
&[OpType::Put, OpType::Put, OpType::Put],
&[(None, None), (Some(1), None), (Some(2), Some(22))],
),
new_batch_multi_fields(
b"k1",
&[2, 3],
&[12, 13],
&[OpType::Put, OpType::Delete],
&[(None, Some(12)), (None, None)],
),
new_batch_multi_fields(
b"k2",
&[1, 1, 2],
&[13, 12, 13],
&[OpType::Put, OpType::Put, OpType::Put],
&[(None, None), (Some(1), None), (Some(2), Some(22))],
),
];
let iter = input.into_iter().map(Ok);
let iter = LastNonNullIter::new(iter);
let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
let expect = [
new_batch_multi_fields(b"k1", &[1], &[13], &[OpType::Put], &[(Some(1), None)]),
new_batch_multi_fields(b"k1", &[2], &[13], &[OpType::Put], &[(Some(2), Some(22))]),
new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Delete], &[(None, None)]),
new_batch_multi_fields(b"k2", &[1], &[13], &[OpType::Put], &[(Some(1), None)]),
new_batch_multi_fields(b"k2", &[2], &[13], &[OpType::Put], &[(Some(2), Some(22))]),
];
assert_eq!(&expect, &actual[..]);
}
// Builds a batch without field columns.
fn new_batch_no_fields(
primary_key: &[u8],
timestamps: &[i64],
sequences: &[u64],
op_types: &[OpType],
) -> Batch {
let mut builder = BatchBuilder::new(primary_key.to_vec());
builder
.timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
timestamps.iter().copied(),
)))
.unwrap()
.sequences_array(Arc::new(UInt64Array::from_iter_values(
sequences.iter().copied(),
)))
.unwrap()
.op_types_array(Arc::new(UInt8Array::from_iter_values(
op_types.iter().map(|v| *v as u8),
)))
.unwrap();
builder.build().unwrap()
}
// The iterator also dedups batches that have no field columns.
#[test]
fn test_last_non_null_iter_no_batch() {
let input = [
new_batch_no_fields(
b"k1",
&[1, 1, 2],
&[13, 12, 13],
&[OpType::Put, OpType::Put, OpType::Put],
),
new_batch_no_fields(b"k1", &[2, 3], &[12, 13], &[OpType::Put, OpType::Delete]),
new_batch_no_fields(
b"k2",
&[1, 1, 2],
&[13, 12, 13],
&[OpType::Put, OpType::Put, OpType::Put],
),
];
let iter = input.into_iter().map(Ok);
let iter = LastNonNullIter::new(iter);
let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
let expect = [
new_batch_no_fields(b"k1", &[1], &[13], &[OpType::Put]),
new_batch_no_fields(b"k1", &[2], &[13], &[OpType::Put]),
new_batch_no_fields(b"k1", &[3], &[13], &[OpType::Delete]),
new_batch_no_fields(b"k2", &[1], &[13], &[OpType::Put]),
new_batch_no_fields(b"k2", &[2], &[13], &[OpType::Put]),
];
assert_eq!(&expect, &actual[..]);
}
}