mito2/read/flat_dedup.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Dedup implementation for flat format.
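//!
//! In the flat format (see `crate::sst::parquet::flat_format`), each record batch carries the
//! time index column and the internal `__primary_key`, `__sequence` and `__op_type` columns at
//! fixed trailing positions. The strategies below assume input batches are sorted so that rows
//! with the same primary key and timestamp are adjacent and the first of them is the latest one.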
16
17use std::ops::Range;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::v1::OpType;
22use async_stream::try_stream;
23use common_telemetry::debug;
24use datatypes::arrow::array::{
25    Array, ArrayRef, BinaryArray, BooleanArray, BooleanBufferBuilder, UInt8Array, UInt64Array,
26    make_comparator,
27};
28use datatypes::arrow::buffer::BooleanBuffer;
29use datatypes::arrow::compute::kernels::cmp::distinct;
30use datatypes::arrow::compute::kernels::partition::{Partitions, partition};
31use datatypes::arrow::compute::kernels::take::take;
32use datatypes::arrow::compute::{
33    SortOptions, TakeOptions, concat_batches, filter_record_batch, take_record_batch,
34};
35use datatypes::arrow::error::ArrowError;
36use datatypes::arrow::record_batch::RecordBatch;
37use futures::{Stream, TryStreamExt};
38use snafu::ResultExt;
39
40use crate::error::{ComputeArrowSnafu, NewRecordBatchSnafu, Result};
41use crate::memtable::partition_tree::data::timestamp_array_to_i64_slice;
42use crate::metrics::MERGE_FILTER_ROWS_TOTAL;
43use crate::read::dedup::{DedupMetrics, DedupMetricsReport};
44use crate::sst::parquet::flat_format::{
45    op_type_column_index, primary_key_column_index, time_index_column_index,
46};
47use crate::sst::parquet::format::{FIXED_POS_COLUMN_NUM, PrimaryKeyArray};
48
49/// An iterator to dedup sorted batches from an iterator based on the dedup strategy.
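///
/// # Example
///
/// A minimal sketch mirroring the tests in this module; `batches` is assumed to be a
/// `Vec<RecordBatch>` in flat format, already sorted as the strategy requires:
///
/// ```ignore
/// let iter = batches.into_iter().map(Ok);
/// let dedup_iter = FlatDedupIterator::new(iter, FlatLastRow::new(true));
/// let deduped: Vec<RecordBatch> = dedup_iter.map(|batch| batch.unwrap()).collect();
/// ```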
50pub struct FlatDedupIterator<I, S> {
51    iter: I,
52    strategy: S,
53    metrics: DedupMetrics,
54}
55
56impl<I, S> FlatDedupIterator<I, S> {
57    /// Creates a new dedup iterator.
58    pub fn new(iter: I, strategy: S) -> Self {
59        Self {
60            iter,
61            strategy,
62            metrics: DedupMetrics::default(),
63        }
64    }
65}
66
67impl<I: Iterator<Item = Result<RecordBatch>>, S: RecordBatchDedupStrategy> FlatDedupIterator<I, S> {
68    /// Returns the next deduplicated batch.
69    fn fetch_next_batch(&mut self) -> Result<Option<RecordBatch>> {
70        while let Some(batch) = self.iter.next().transpose()? {
71            if let Some(batch) = self.strategy.push_batch(batch, &mut self.metrics)? {
72                return Ok(Some(batch));
73            }
74        }
75
76        self.strategy.finish(&mut self.metrics)
77    }
78}
79
80impl<I: Iterator<Item = Result<RecordBatch>>, S: RecordBatchDedupStrategy> Iterator
81    for FlatDedupIterator<I, S>
82{
83    type Item = Result<RecordBatch>;
84
85    fn next(&mut self) -> Option<Self::Item> {
86        self.fetch_next_batch().transpose()
87    }
88}
89
90/// An async reader to dedup sorted record batches from a stream based on the dedup strategy.
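///
/// A hedged usage sketch, assuming `stream` yields sorted `Result<RecordBatch>` items:
///
/// ```ignore
/// let reader = FlatDedupReader::new(stream, FlatLastRow::new(true), None);
/// let mut deduped = Box::pin(reader.into_stream());
/// while let Some(batch) = deduped.try_next().await? {
///     // Consume the deduplicated batch.
/// }
/// ```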
91pub struct FlatDedupReader<I, S> {
92    stream: I,
93    strategy: S,
94    metrics: DedupMetrics,
95    /// Optional metrics reporter.
96    metrics_reporter: Option<Arc<dyn DedupMetricsReport>>,
97}
98
99impl<I, S> FlatDedupReader<I, S> {
100    /// Creates a new dedup reader.
101    pub fn new(
102        stream: I,
103        strategy: S,
104        metrics_reporter: Option<Arc<dyn DedupMetricsReport>>,
105    ) -> Self {
106        Self {
107            stream,
108            strategy,
109            metrics: DedupMetrics::default(),
110            metrics_reporter,
111        }
112    }
113}
114
115impl<I: Stream<Item = Result<RecordBatch>> + Unpin, S: RecordBatchDedupStrategy>
116    FlatDedupReader<I, S>
117{
118    /// Returns the next deduplicated batch.
119    async fn fetch_next_batch(&mut self) -> Result<Option<RecordBatch>> {
120        while let Some(batch) = self.stream.try_next().await? {
121            if let Some(batch) = self.strategy.push_batch(batch, &mut self.metrics)? {
122                self.metrics.maybe_report(&self.metrics_reporter);
123                return Ok(Some(batch));
124            }
125        }
126
127        let result = self.strategy.finish(&mut self.metrics)?;
128        self.metrics.maybe_report(&self.metrics_reporter);
129        Ok(result)
130    }
131
132    /// Converts the reader into a stream.
133    pub fn into_stream(mut self) -> impl Stream<Item = Result<RecordBatch>> {
134        try_stream! {
135            while let Some(batch) = self.fetch_next_batch().await? {
136                yield batch;
137            }
138        }
139    }
140}
141
142impl<I, S> Drop for FlatDedupReader<I, S> {
143    fn drop(&mut self) {
144        debug!("Flat dedup reader finished, metrics: {:?}", self.metrics);
145
146        MERGE_FILTER_ROWS_TOTAL
147            .with_label_values(&["dedup"])
148            .inc_by(self.metrics.num_unselected_rows as u64);
149        MERGE_FILTER_ROWS_TOTAL
150            .with_label_values(&["delete"])
151            .inc_by(self.metrics.num_deleted_rows as u64);
152
153        // Report any remaining metrics.
154        if let Some(reporter) = &self.metrics_reporter {
155            reporter.report(&mut self.metrics);
156        }
157    }
158}
159
160/// Strategy to remove duplicate rows from sorted record batches.
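///
/// A typical driver pushes every batch in order and then flushes the strategy, roughly as
/// sketched below (`sorted_batches`, `output` and `metrics` are placeholder names):
///
/// ```ignore
/// for batch in sorted_batches {
///     if let Some(out) = strategy.push_batch(batch, &mut metrics)? {
///         output.push(out);
///     }
/// }
/// if let Some(out) = strategy.finish(&mut metrics)? {
///     output.push(out);
/// }
/// ```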
161pub trait RecordBatchDedupStrategy: Send {
162    /// Pushes a batch to the dedup strategy.
163    /// Returns a batch if the strategy ensures there are no duplications based on
164    /// the input batch.
165    fn push_batch(
166        &mut self,
167        batch: RecordBatch,
168        metrics: &mut DedupMetrics,
169    ) -> Result<Option<RecordBatch>>;
170
171    /// Finishes the deduplication process and returns any remaining batch.
172    ///
173    /// Users must ensure that `push_batch` is called for all batches before
174    /// calling this method.
175    fn finish(&mut self, metrics: &mut DedupMetrics) -> Result<Option<RecordBatch>>;
176}
177
178/// Dedup strategy that keeps the row with the latest sequence for each key.
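///
/// Within each (primary key, timestamp) group, only the first row is kept, which is the
/// latest row when the input is sorted by sequence in descending order.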
179pub struct FlatLastRow {
180    /// Meta of the last row in the previous batch that has the same key
181    /// as the batch to push.
182    prev_batch: Option<BatchLastRow>,
183    /// Filter deleted rows.
184    filter_deleted: bool,
185}
186
187impl FlatLastRow {
188    /// Creates a new strategy with the given `filter_deleted` flag.
189    pub fn new(filter_deleted: bool) -> Self {
190        Self {
191            prev_batch: None,
192            filter_deleted,
193        }
194    }
195
196    /// Removes duplications from the batch without considering previous rows.
197    fn dedup_one_batch(batch: RecordBatch) -> Result<RecordBatch> {
198        let num_rows = batch.num_rows();
199        if num_rows < 2 {
200            return Ok(batch);
201        }
202
203        let num_columns = batch.num_columns();
204        let timestamps = batch.column(time_index_column_index(num_columns));
205        // Checks for duplications based on the timestamps.
206        let mask = find_boundaries(timestamps).context(ComputeArrowSnafu)?;
207        if mask.count_set_bits() == num_rows - 1 {
208            // Fast path: No duplication.
209            return Ok(batch);
210        }
211
212        // The batch has duplicated timestamps, but that doesn't mean it must
213        // have duplicated rows, since different keys may share a timestamp.
214        // Partitions the batch by the primary key and time index.
215        let columns: Vec<_> = [
216            primary_key_column_index(num_columns),
217            time_index_column_index(num_columns),
218        ]
219        .iter()
220        .map(|index| batch.column(*index).clone())
221        .collect();
222        let partitions = partition(&columns).context(ComputeArrowSnafu)?;
223
224        Self::dedup_by_partitions(batch, &partitions)
225    }
226
227    /// Removes duplications within each partition.
228    fn dedup_by_partitions(batch: RecordBatch, partitions: &Partitions) -> Result<RecordBatch> {
229        let ranges = partitions.ranges();
230        // Each range has at least 1 row.
231        let num_duplications: usize = ranges.iter().map(|r| r.end - r.start - 1).sum();
232        if num_duplications == 0 {
233            // Fast path, no duplications.
234            return Ok(batch);
235        }
236
237        // Always takes the first row in each range.
238        let take_indices: UInt64Array = ranges.iter().map(|r| Some(r.start as u64)).collect();
239        take_record_batch(&batch, &take_indices).context(ComputeArrowSnafu)
240    }
241}
242
243impl RecordBatchDedupStrategy for FlatLastRow {
244    fn push_batch(
245        &mut self,
246        batch: RecordBatch,
247        metrics: &mut DedupMetrics,
248    ) -> Result<Option<RecordBatch>> {
249        let start = Instant::now();
250
251        if batch.num_rows() == 0 {
252            return Ok(None);
253        }
254
255        // Dedups the current batch to ensure no duplication before checking against the previous row.
256        let row_before_dedup = batch.num_rows();
257        let mut batch = Self::dedup_one_batch(batch)?;
258
259        if let Some(prev_batch) = &self.prev_batch {
260            // If we have a previous batch.
261            if prev_batch.is_last_row_duplicated(&batch) {
262                // The first row duplicates the last row of the previous batch, skip it.
263                batch = batch.slice(1, batch.num_rows() - 1);
264            }
265        }
266        metrics.num_unselected_rows += row_before_dedup - batch.num_rows();
267
268        let Some(batch_last_row) = BatchLastRow::try_new(batch.clone()) else {
269            // The batch after dedup is empty.
270            // We don't need to update `prev_batch` because the skipped rows have the same
271            // key and timestamp as the stored last row.
272            metrics.dedup_cost += start.elapsed();
273            return Ok(None);
274        };
275
276        // Stores the current batch in `prev_batch` so we can compare the next batch
277        // with this one. We store the batch before filtering it because rows with `OpType::Delete`
278        // would be removed by the filter, and we might then store an incorrect `last row`
279        // of the previous batch.
280        // Safety: We checked the batch is not empty before.
281        self.prev_batch = Some(batch_last_row);
282
283        // Finally, filters deleted rows.
284        let result = maybe_filter_deleted(batch, self.filter_deleted, metrics);
285
286        metrics.dedup_cost += start.elapsed();
287
288        result
289    }
290
291    fn finish(&mut self, _metrics: &mut DedupMetrics) -> Result<Option<RecordBatch>> {
292        Ok(None)
293    }
294}
295
296/// Dedup strategy that keeps the latest non-null value of each field for the same key.
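///
/// For rows with the same primary key and timestamp, the newest row's null fields are filled
/// from older rows, stopping at the first `Delete` operation. For example, if the newest row
/// has fields `(12, null)` and an older row has `(2, 22)`, the output row is `(12, 22)`.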
297pub struct FlatLastNonNull {
298    /// The start index of field columns.
299    field_column_start: usize,
300    /// Filter deleted rows.
301    filter_deleted: bool,
302    /// Buffered batch to check whether the next batch has rows duplicated with this batch.
303    /// Fields in the last row of this batch may be updated by the next batch.
304    /// The buffered batch should contain no duplication.
305    buffer: Option<BatchLastRow>,
306    /// Whether the last row range contains a delete operation.
307    /// If so, we don't need to update null fields.
308    contains_delete: bool,
309}
310
311impl RecordBatchDedupStrategy for FlatLastNonNull {
312    fn push_batch(
313        &mut self,
314        batch: RecordBatch,
315        metrics: &mut DedupMetrics,
316    ) -> Result<Option<RecordBatch>> {
317        let start = Instant::now();
318
319        if batch.num_rows() == 0 {
320            return Ok(None);
321        }
322
323        let row_before_dedup = batch.num_rows();
324
325        let Some(buffer) = self.buffer.take() else {
326            // If the buffer is None, dedup the batch, put the batch into the buffer and return.
327            // There is no previous batch with the same key, so we can pass contains_delete as false.
328            let (record_batch, contains_delete) =
329                Self::dedup_one_batch(batch, self.field_column_start, false)?;
330            metrics.num_unselected_rows += row_before_dedup - record_batch.num_rows();
331            self.buffer = BatchLastRow::try_new(record_batch);
332            self.contains_delete = contains_delete;
333
334            metrics.dedup_cost += start.elapsed();
335            return Ok(None);
336        };
337
338        if !buffer.is_last_row_duplicated(&batch) {
339            // The first row of the batch has a different key or timestamp from the buffered last row.
340            // We can flush the buffer and replace it with the new batch.
341            // Dedup the batch.
342            // There is no previous batch with the same key, so we can pass contains_delete as false.
343            let (record_batch, contains_delete) =
344                Self::dedup_one_batch(batch, self.field_column_start, false)?;
345            metrics.num_unselected_rows += row_before_dedup - record_batch.num_rows();
346            debug_assert!(record_batch.num_rows() > 0);
347            self.buffer = BatchLastRow::try_new(record_batch);
348            self.contains_delete = contains_delete;
349
350            let result = maybe_filter_deleted(buffer.last_batch, self.filter_deleted, metrics);
351            metrics.dedup_cost += start.elapsed();
352            return result;
353        }
354
355        // The first row of the batch duplicates the last row in the buffer.
356        // We can return all buffered rows except the last one.
357        let output = if buffer.last_batch.num_rows() > 1 {
358            let dedup_batch = buffer.last_batch.slice(0, buffer.last_batch.num_rows() - 1);
359            debug_assert_eq!(buffer.last_batch.num_rows() - 1, dedup_batch.num_rows());
360
361            maybe_filter_deleted(dedup_batch, self.filter_deleted, metrics)?
362        } else {
363            None
364        };
365        let last_row = buffer.last_batch.slice(buffer.last_batch.num_rows() - 1, 1);
366
367        // We concat the last row with the next batch.
368        let schema = batch.schema();
369        let merged = concat_batches(&schema, &[last_row, batch]).context(ComputeArrowSnafu)?;
370        let merged_row_count = merged.num_rows();
371        // Dedup the merged batch and update the buffer.
372        let (record_batch, contains_delete) =
373            Self::dedup_one_batch(merged, self.field_column_start, self.contains_delete)?;
374        metrics.num_unselected_rows += merged_row_count - record_batch.num_rows();
375        debug_assert!(record_batch.num_rows() > 0);
376        self.buffer = BatchLastRow::try_new(record_batch);
377        self.contains_delete = contains_delete;
378
379        metrics.dedup_cost += start.elapsed();
380
381        Ok(output)
382    }
383
384    fn finish(&mut self, metrics: &mut DedupMetrics) -> Result<Option<RecordBatch>> {
385        let Some(buffer) = self.buffer.take() else {
386            return Ok(None);
387        };
388
389        let start = Instant::now();
390
391        let result = maybe_filter_deleted(buffer.last_batch, self.filter_deleted, metrics);
392
393        metrics.dedup_cost += start.elapsed();
394
395        result
396    }
397}
398
399impl FlatLastNonNull {
400    /// Creates a new strategy with the given field column start index and `filter_deleted` flag.
401    pub fn new(field_column_start: usize, filter_deleted: bool) -> Self {
402        Self {
403            field_column_start,
404            filter_deleted,
405            buffer: None,
406            contains_delete: false,
407        }
408    }
409
410    /// Removes duplications from the batch without considering rows in the previous and next batches.
411    /// Returns a tuple containing the deduplicated batch and a boolean indicating whether the last range contains a delete operation.
412    fn dedup_one_batch(
413        batch: RecordBatch,
414        field_column_start: usize,
415        prev_batch_contains_delete: bool,
416    ) -> Result<(RecordBatch, bool)> {
417        // Gets the op type array for checking delete operations.
418        let op_type_column = batch
419            .column(op_type_column_index(batch.num_columns()))
420            .clone();
421        let op_types = op_type_column
422            .as_any()
423            .downcast_ref::<UInt8Array>()
424            .unwrap();
425        let num_rows = batch.num_rows();
426        if num_rows < 2 {
427            let contains_delete = if num_rows > 0 {
428                op_types.value(0) == OpType::Delete as u8
429            } else {
430                false
431            };
432            return Ok((batch, contains_delete));
433        }
434
435        let num_columns = batch.num_columns();
436        let timestamps = batch.column(time_index_column_index(num_columns));
437        // Checks for duplications based on the timestamps.
438        let mask = find_boundaries(timestamps).context(ComputeArrowSnafu)?;
439        if mask.count_set_bits() == num_rows - 1 {
440            let contains_delete = op_types.value(num_rows - 1) == OpType::Delete as u8;
441            // Fast path: No duplication.
442            return Ok((batch, contains_delete));
443        }
444
445        // The batch has duplicated timestamps, but that doesn't mean it must
446        // have duplicated rows, since different keys may share a timestamp.
447        // Partitions the batch by the primary key and time index.
448        let columns: Vec<_> = [
449            primary_key_column_index(num_columns),
450            time_index_column_index(num_columns),
451        ]
452        .iter()
453        .map(|index| batch.column(*index).clone())
454        .collect();
455        let partitions = partition(&columns).context(ComputeArrowSnafu)?;
456
457        Self::dedup_by_partitions(
458            batch,
459            &partitions,
460            field_column_start,
461            op_types,
462            prev_batch_contains_delete,
463        )
464    }
465
466    /// Removes duplications within each partition.
467    /// Returns a tuple containing the deduplicated batch and a boolean indicating whether the last range contains a delete operation.
468    fn dedup_by_partitions(
469        batch: RecordBatch,
470        partitions: &Partitions,
471        field_column_start: usize,
472        op_types: &UInt8Array,
473        first_range_contains_delete: bool,
474    ) -> Result<(RecordBatch, bool)> {
475        let ranges = partitions.ranges();
476        let contains_delete = Self::last_range_has_delete(&ranges, op_types);
477
478        // Each range has at least 1 row.
479        let num_duplications: usize = ranges.iter().map(|r| r.end - r.start - 1).sum();
480        if num_duplications == 0 {
481            // Fast path, no duplication.
482            return Ok((batch, contains_delete));
483        }
484
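        // Only the field columns in [field_column_start, field_column_end) merge values across
        // a range; all other columns take the value from the first (latest) row of each range.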
485        let field_column_end = batch.num_columns() - FIXED_POS_COLUMN_NUM;
486        let take_options = Some(TakeOptions {
487            check_bounds: false,
488        });
489        // Always takes the first value for non-field columns in each range.
490        let non_field_indices: UInt64Array = ranges.iter().map(|r| Some(r.start as u64)).collect();
491        let new_columns = batch
492            .columns()
493            .iter()
494            .enumerate()
495            .map(|(col_idx, column)| {
496                if col_idx >= field_column_start && col_idx < field_column_end {
497                    let field_indices = Self::compute_field_indices(
498                        &ranges,
499                        column,
500                        op_types,
501                        first_range_contains_delete,
502                    );
503                    take(column, &field_indices, take_options.clone()).context(ComputeArrowSnafu)
504                } else {
505                    take(column, &non_field_indices, take_options.clone())
506                        .context(ComputeArrowSnafu)
507                }
508            })
509            .collect::<Result<Vec<ArrayRef>>>()?;
510
511        let record_batch =
512            RecordBatch::try_new(batch.schema(), new_columns).context(NewRecordBatchSnafu)?;
513        Ok((record_batch, contains_delete))
514    }
515
516    /// Returns an array of indices of the latest non-null value for
517    /// each input range.
518    /// If all values in a range are null, the returned index is unspecified.
519    /// Stops when encountering a delete operation and ignores all subsequent rows in the range.
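    ///
    /// For example (a sketch of the intended behavior, assuming no delete carried over from a
    /// previous batch): for a range of three rows whose field values are `[null, 7, 9]` and whose
    /// op types are all `Put`, the index of the second row (value `7`) is selected; if that second
    /// row were a `Delete`, the index of the first row would be selected instead.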
520    fn compute_field_indices(
521        ranges: &[Range<usize>],
522        field_array: &ArrayRef,
523        op_types: &UInt8Array,
524        first_range_contains_delete: bool,
525    ) -> UInt64Array {
526        ranges
527            .iter()
528            .enumerate()
529            .map(|(range_idx, r)| {
530                let mut value_index = r.start as u64;
531                if range_idx == 0 && first_range_contains_delete {
532                    return Some(value_index);
533                }
534
535                // Iterate through the range to find the first valid non-null value
536                // but stop if we encounter a delete operation.
537                for i in r.clone() {
538                    if op_types.value(i) == OpType::Delete as u8 {
539                        break;
540                    }
541                    if field_array.is_valid(i) {
542                        value_index = i as u64;
543                        break;
544                    }
545                }
546
547                Some(value_index)
548            })
549            .collect()
550    }
551
552    /// Checks whether the last range contains a delete operation.
553    fn last_range_has_delete(ranges: &[Range<usize>], op_types: &UInt8Array) -> bool {
554        if let Some(last_range) = ranges.last() {
555            last_range
556                .clone()
557                .any(|i| op_types.value(i) == OpType::Delete as u8)
558        } else {
559            false
560        }
561    }
562}
563
564/// State of the batch with the last row for dedup.
565struct BatchLastRow {
566    /// The record batch that contains the last row.
567    /// It must have at least one row.
568    last_batch: RecordBatch,
569    /// Primary keys of the last batch.
570    primary_key: PrimaryKeyArray,
571    /// Last timestamp value.
572    timestamp: i64,
573}
574
575impl BatchLastRow {
576    /// Returns a new [BatchLastRow] if the record batch is not empty.
577    fn try_new(record_batch: RecordBatch) -> Option<Self> {
578        if record_batch.num_rows() > 0 {
579            let num_columns = record_batch.num_columns();
580            let primary_key = record_batch
581                .column(primary_key_column_index(num_columns))
582                .as_any()
583                .downcast_ref::<PrimaryKeyArray>()
584                .unwrap()
585                .clone();
586            let timestamp_array = record_batch.column(time_index_column_index(num_columns));
587            let timestamp = timestamp_value(timestamp_array, timestamp_array.len() - 1);
588
589            Some(Self {
590                last_batch: record_batch,
591                primary_key,
592                timestamp,
593            })
594        } else {
595            None
596        }
597    }
598
599    /// Returns true if the first row of the input `batch` duplicates the last row of this batch.
600    fn is_last_row_duplicated(&self, batch: &RecordBatch) -> bool {
601        if batch.num_rows() == 0 {
602            return false;
603        }
604
605        // The first timestamp in the batch.
606        let batch_timestamp = timestamp_value(
607            batch.column(time_index_column_index(batch.num_columns())),
608            0,
609        );
610        if batch_timestamp != self.timestamp {
611            return false;
612        }
613
614        let last_key = primary_key_at(&self.primary_key, self.last_batch.num_rows() - 1);
615        let primary_key = batch
616            .column(primary_key_column_index(batch.num_columns()))
617            .as_any()
618            .downcast_ref::<PrimaryKeyArray>()
619            .unwrap();
620        // Primary key of the first row in the batch.
621        let batch_key = primary_key_at(primary_key, 0);
622
623        last_key == batch_key
624    }
625}
626
627// TODO(yingwen): We only compare timestamp arrays, so we can modify this function
628// to simplify the comparator.
629// Ported from https://github.com/apache/arrow-rs/blob/55.0.0/arrow-ord/src/partition.rs#L155-L168
630/// Returns a mask with bits set whenever the value or nullability changes between adjacent rows.
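/// For example, timestamps `[1, 1, 2, 3, 3]` produce the mask `[false, true, true, false]`.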
631fn find_boundaries(v: &dyn Array) -> Result<BooleanBuffer, ArrowError> {
632    let slice_len = v.len() - 1;
633    let v1 = v.slice(0, slice_len);
634    let v2 = v.slice(1, slice_len);
635
636    if !v.data_type().is_nested() {
637        return Ok(distinct(&v1, &v2)?.values().clone());
638    }
639    // Given that we're only comparing values, null ordering in the input or
640    // sort options do not matter.
641    let cmp = make_comparator(&v1, &v2, SortOptions::default())?;
642    Ok((0..slice_len).map(|i| !cmp(i, i).is_eq()).collect())
643}
644
645/// Filters deleted rows from the record batch if `filter_deleted` is true.
646fn maybe_filter_deleted(
647    record_batch: RecordBatch,
648    filter_deleted: bool,
649    metrics: &mut DedupMetrics,
650) -> Result<Option<RecordBatch>> {
651    if !filter_deleted {
652        return Ok(Some(record_batch));
653    }
654    let batch = filter_deleted_from_batch(record_batch, metrics)?;
655    // Skips empty batches.
656    if batch.num_rows() == 0 {
657        return Ok(None);
658    }
659    Ok(Some(batch))
660}
661
662/// Removes deleted rows from the batch and updates metrics.
663fn filter_deleted_from_batch(
664    batch: RecordBatch,
665    metrics: &mut DedupMetrics,
666) -> Result<RecordBatch> {
667    let num_rows = batch.num_rows();
668    let op_type_column = batch.column(op_type_column_index(batch.num_columns()));
669    // Safety: The column should be op type.
670    let op_types = op_type_column
671        .as_any()
672        .downcast_ref::<UInt8Array>()
673        .unwrap();
674    let has_delete = op_types
675        .values()
676        .iter()
677        .any(|op_type| *op_type != OpType::Put as u8);
678    if !has_delete {
679        return Ok(batch);
680    }
681
682    let mut builder = BooleanBufferBuilder::new(op_types.len());
683    for op_type in op_types.values() {
684        if *op_type == OpType::Delete as u8 {
685            builder.append(false);
686        } else {
687            builder.append(true);
688        }
689    }
690    let predicate = BooleanArray::new(builder.into(), None);
691    let new_batch = filter_record_batch(&batch, &predicate).context(ComputeArrowSnafu)?;
692    let num_deleted = num_rows - new_batch.num_rows();
693    metrics.num_deleted_rows += num_deleted;
694    metrics.num_unselected_rows += num_deleted;
695
696    Ok(new_batch)
697}
698
699/// Gets the primary key at `index`.
700fn primary_key_at(primary_key: &PrimaryKeyArray, index: usize) -> &[u8] {
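    // The primary key column is a dictionary array: look up the dictionary key at `index`,
    // then resolve it against the binary dictionary values.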
701    let key = primary_key.keys().value(index);
702    let binary_values = primary_key
703        .values()
704        .as_any()
705        .downcast_ref::<BinaryArray>()
706        .unwrap();
707    binary_values.value(key as usize)
708}
709
710/// Gets the timestamp value from the timestamp array.
711///
712/// # Panics
713/// Panics if the array is not a timestamp array or
714/// the index is out of bound.
715pub(crate) fn timestamp_value(array: &ArrayRef, idx: usize) -> i64 {
716    timestamp_array_to_i64_slice(array)[idx]
717}
718
719#[cfg(test)]
720mod tests {
721    use std::sync::Arc;
722
723    use api::v1::OpType;
724    use datatypes::arrow::array::{
725        ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
726        TimestampMillisecondArray, UInt8Array, UInt64Array,
727    };
728    use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type};
729    use datatypes::arrow::record_batch::RecordBatch;
730
731    use super::*;
732
733    /// Creates a test RecordBatch in flat format with given parameters.
734    fn new_record_batch(
735        primary_keys: &[&[u8]],
736        timestamps: &[i64],
737        sequences: &[u64],
738        op_types: &[OpType],
739        fields: &[u64],
740    ) -> RecordBatch {
741        let num_rows = timestamps.len();
742        debug_assert_eq!(sequences.len(), num_rows);
743        debug_assert_eq!(op_types.len(), num_rows);
744        debug_assert_eq!(fields.len(), num_rows);
745        debug_assert_eq!(primary_keys.len(), num_rows);
746
747        let columns: Vec<ArrayRef> = vec![
748            // k0 column (primary key as string dictionary)
749            build_test_pk_string_dict_array(primary_keys),
750            // field0 column
751            Arc::new(Int64Array::from_iter(
752                fields.iter().map(|v| Some(*v as i64)),
753            )),
754            // ts column (time index)
755            Arc::new(TimestampMillisecondArray::from_iter_values(
756                timestamps.iter().copied(),
757            )),
758            // __primary_key column
759            build_test_pk_array(primary_keys),
760            // __sequence column
761            Arc::new(UInt64Array::from_iter_values(sequences.iter().copied())),
762            // __op_type column
763            Arc::new(UInt8Array::from_iter_values(
764                op_types.iter().map(|v| *v as u8),
765            )),
766        ];
767
768        RecordBatch::try_new(build_test_flat_schema(), columns).unwrap()
769    }
770
771    /// Creates a test RecordBatch in flat format with multiple fields for testing FlatLastNonNull.
772    fn new_record_batch_multi_fields(
773        primary_keys: &[&[u8]],
774        timestamps: &[i64],
775        sequences: &[u64],
776        op_types: &[OpType],
777        fields: &[(Option<u64>, Option<u64>)],
778    ) -> RecordBatch {
779        let num_rows = timestamps.len();
780        debug_assert_eq!(sequences.len(), num_rows);
781        debug_assert_eq!(op_types.len(), num_rows);
782        debug_assert_eq!(fields.len(), num_rows);
783        debug_assert_eq!(primary_keys.len(), num_rows);
784
785        let columns: Vec<ArrayRef> = vec![
786            // k0 column (primary key as string dictionary)
787            build_test_pk_string_dict_array(primary_keys),
788            // field0 column
789            Arc::new(Int64Array::from_iter(
790                fields.iter().map(|field| field.0.map(|v| v as i64)),
791            )),
792            // field1 column
793            Arc::new(Int64Array::from_iter(
794                fields.iter().map(|field| field.1.map(|v| v as i64)),
795            )),
796            // ts column (time index)
797            Arc::new(TimestampMillisecondArray::from_iter_values(
798                timestamps.iter().copied(),
799            )),
800            // __primary_key column
801            build_test_pk_array(primary_keys),
802            // __sequence column
803            Arc::new(UInt64Array::from_iter_values(sequences.iter().copied())),
804            // __op_type column
805            Arc::new(UInt8Array::from_iter_values(
806                op_types.iter().map(|v| *v as u8),
807            )),
808        ];
809
810        RecordBatch::try_new(build_test_multi_field_schema(), columns).unwrap()
811    }
812
813    /// Creates a test string dictionary primary key array for given primary keys.
814    fn build_test_pk_string_dict_array(primary_keys: &[&[u8]]) -> ArrayRef {
815        let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
816        for &pk in primary_keys {
817            let pk_str = std::str::from_utf8(pk).unwrap();
818            builder.append(pk_str).unwrap();
819        }
820        Arc::new(builder.finish())
821    }
822
823    /// Creates a test primary key array for given primary keys.
824    fn build_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
825        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
826        for &pk in primary_keys {
827            builder.append(pk).unwrap();
828        }
829        Arc::new(builder.finish())
830    }
831
832    /// Builds the arrow schema for test flat format.
833    fn build_test_flat_schema() -> SchemaRef {
834        let fields = vec![
835            Field::new(
836                "k0",
837                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
838                false,
839            ),
840            Field::new("field0", DataType::Int64, true),
841            Field::new(
842                "ts",
843                DataType::Timestamp(TimeUnit::Millisecond, None),
844                false,
845            ),
846            Field::new(
847                "__primary_key",
848                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
849                false,
850            ),
851            Field::new("__sequence", DataType::UInt64, false),
852            Field::new("__op_type", DataType::UInt8, false),
853        ];
854        Arc::new(Schema::new(fields))
855    }
856
857    /// Builds the arrow schema for test flat format with multiple fields.
858    fn build_test_multi_field_schema() -> SchemaRef {
859        let fields = vec![
860            Field::new(
861                "k0",
862                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
863                false,
864            ),
865            Field::new("field0", DataType::Int64, true),
866            Field::new("field1", DataType::Int64, true),
867            Field::new(
868                "ts",
869                DataType::Timestamp(TimeUnit::Millisecond, None),
870                false,
871            ),
872            Field::new(
873                "__primary_key",
874                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
875                false,
876            ),
877            Field::new("__sequence", DataType::UInt64, false),
878            Field::new("__op_type", DataType::UInt8, false),
879        ];
880        Arc::new(Schema::new(fields))
881    }
882
883    /// Asserts that two RecordBatch vectors are equal.
884    fn check_record_batches_equal(expected: &[RecordBatch], actual: &[RecordBatch]) {
885        for (i, (exp, act)) in expected.iter().zip(actual.iter()).enumerate() {
886            assert_eq!(exp, act, "RecordBatch {} differs", i);
887        }
888        assert_eq!(
889            expected.len(),
890            actual.len(),
891            "Number of batches doesn't match"
892        );
893    }
894
895    /// Helper function to collect iterator results.
896    fn collect_iterator_results<I>(iter: I) -> Vec<RecordBatch>
897    where
898        I: Iterator<Item = Result<RecordBatch>>,
899    {
900        iter.map(|result| result.unwrap()).collect()
901    }
902
903    #[test]
904    fn test_flat_last_row_no_duplications() {
905        let input = vec![
906            new_record_batch(
907                &[b"k1", b"k1"],
908                &[1, 2],
909                &[11, 12],
910                &[OpType::Put, OpType::Put],
911                &[21, 22],
912            ),
913            new_record_batch(&[b"k1"], &[3], &[13], &[OpType::Put], &[23]),
914            new_record_batch(
915                &[b"k2", b"k2"],
916                &[1, 2],
917                &[111, 112],
918                &[OpType::Put, OpType::Put],
919                &[31, 32],
920            ),
921        ];
922
923        // Test with filter_deleted = true
924        let iter = input.clone().into_iter().map(Ok);
925        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastRow::new(true));
926        let result = collect_iterator_results(&mut dedup_iter);
927        check_record_batches_equal(&input, &result);
928        assert_eq!(0, dedup_iter.metrics.num_unselected_rows);
929        assert_eq!(0, dedup_iter.metrics.num_deleted_rows);
930
931        // Test with filter_deleted = false
932        let iter = input.clone().into_iter().map(Ok);
933        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastRow::new(false));
934        let result = collect_iterator_results(&mut dedup_iter);
935        check_record_batches_equal(&input, &result);
936        assert_eq!(0, dedup_iter.metrics.num_unselected_rows);
937        assert_eq!(0, dedup_iter.metrics.num_deleted_rows);
938    }
939
940    #[test]
941    fn test_flat_last_row_duplications() {
942        let input = vec![
943            new_record_batch(
944                &[b"k1", b"k1"],
945                &[1, 2],
946                &[13, 11],
947                &[OpType::Put, OpType::Put],
948                &[11, 12],
949            ),
950            // empty batch.
951            new_record_batch(&[], &[], &[], &[], &[]),
952            // Duplicate with the previous batch.
953            new_record_batch(
954                &[b"k1", b"k1", b"k1"],
955                &[2, 3, 4],
956                &[10, 13, 13],
957                &[OpType::Put, OpType::Put, OpType::Delete],
958                &[2, 13, 14],
959            ),
960            new_record_batch(
961                &[b"k2", b"k2"],
962                &[1, 2],
963                &[20, 20],
964                &[OpType::Put, OpType::Delete],
965                &[101, 0],
966            ),
967            new_record_batch(&[b"k2"], &[2], &[19], &[OpType::Put], &[102]),
968            new_record_batch(&[b"k3"], &[2], &[20], &[OpType::Put], &[202]),
969            // This batch won't increase the deleted rows count as it
970            // is filtered out by the previous batch.
971            new_record_batch(&[b"k3"], &[2], &[19], &[OpType::Delete], &[0]),
972        ];
973
974        // Test with filter_deleted = true
975        let expected_filter_deleted = vec![
976            new_record_batch(
977                &[b"k1", b"k1"],
978                &[1, 2],
979                &[13, 11],
980                &[OpType::Put, OpType::Put],
981                &[11, 12],
982            ),
983            new_record_batch(&[b"k1"], &[3], &[13], &[OpType::Put], &[13]),
984            new_record_batch(&[b"k2"], &[1], &[20], &[OpType::Put], &[101]),
985            new_record_batch(&[b"k3"], &[2], &[20], &[OpType::Put], &[202]),
986        ];
987
988        let iter = input.clone().into_iter().map(Ok);
989        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastRow::new(true));
990        let result = collect_iterator_results(&mut dedup_iter);
991        check_record_batches_equal(&expected_filter_deleted, &result);
992        assert_eq!(5, dedup_iter.metrics.num_unselected_rows);
993        assert_eq!(2, dedup_iter.metrics.num_deleted_rows);
994
995        // Test with filter_deleted = false
996        let expected_no_filter = vec![
997            new_record_batch(
998                &[b"k1", b"k1"],
999                &[1, 2],
1000                &[13, 11],
1001                &[OpType::Put, OpType::Put],
1002                &[11, 12],
1003            ),
1004            new_record_batch(
1005                &[b"k1", b"k1"],
1006                &[3, 4],
1007                &[13, 13],
1008                &[OpType::Put, OpType::Delete],
1009                &[13, 14],
1010            ),
1011            new_record_batch(
1012                &[b"k2", b"k2"],
1013                &[1, 2],
1014                &[20, 20],
1015                &[OpType::Put, OpType::Delete],
1016                &[101, 0],
1017            ),
1018            new_record_batch(&[b"k3"], &[2], &[20], &[OpType::Put], &[202]),
1019        ];
1020
1021        let iter = input.clone().into_iter().map(Ok);
1022        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastRow::new(false));
1023        let result = collect_iterator_results(&mut dedup_iter);
1024        check_record_batches_equal(&expected_no_filter, &result);
1025        assert_eq!(3, dedup_iter.metrics.num_unselected_rows);
1026        assert_eq!(0, dedup_iter.metrics.num_deleted_rows);
1027    }
1028
1029    #[test]
1030    fn test_flat_last_non_null_no_duplications() {
1031        let input = vec![
1032            new_record_batch(
1033                &[b"k1", b"k1"],
1034                &[1, 2],
1035                &[11, 12],
1036                &[OpType::Put, OpType::Put],
1037                &[21, 22],
1038            ),
1039            new_record_batch(&[b"k1"], &[3], &[13], &[OpType::Put], &[23]),
1040            new_record_batch(
1041                &[b"k2", b"k2"],
1042                &[1, 2],
1043                &[111, 112],
1044                &[OpType::Put, OpType::Put],
1045                &[31, 32],
1046            ),
1047        ];
1048
1049        // Test with filter_deleted = true
1050        let iter = input.clone().into_iter().map(Ok);
1051        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, true));
1052        let result = collect_iterator_results(&mut dedup_iter);
1053        check_record_batches_equal(&input, &result);
1054        assert_eq!(0, dedup_iter.metrics.num_unselected_rows);
1055        assert_eq!(0, dedup_iter.metrics.num_deleted_rows);
1056
1057        // Test with filter_deleted = false
1058        let iter = input.clone().into_iter().map(Ok);
1059        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, false));
1060        let result = collect_iterator_results(&mut dedup_iter);
1061        check_record_batches_equal(&input, &result);
1062        assert_eq!(0, dedup_iter.metrics.num_unselected_rows);
1063        assert_eq!(0, dedup_iter.metrics.num_deleted_rows);
1064    }
1065
1066    #[test]
1067    fn test_flat_last_non_null_field_merging() {
1068        let input = vec![
1069            new_record_batch_multi_fields(
1070                &[b"k1", b"k1"],
1071                &[1, 2],
1072                &[13, 11],
1073                &[OpType::Put, OpType::Put],
1074                &[(Some(11), Some(11)), (None, None)],
1075            ),
1076            // empty batch
1077            new_record_batch_multi_fields(&[], &[], &[], &[], &[]),
1078            // Duplicate with the previous batch - should merge fields
1079            new_record_batch_multi_fields(
1080                &[b"k1"],
1081                &[2],
1082                &[10],
1083                &[OpType::Put],
1084                &[(Some(12), None)],
1085            ),
1086            new_record_batch_multi_fields(
1087                &[b"k1", b"k1", b"k1"],
1088                &[2, 3, 4],
1089                &[10, 13, 13],
1090                &[OpType::Put, OpType::Put, OpType::Delete],
1091                &[(Some(2), Some(22)), (Some(13), None), (None, Some(14))],
1092            ),
1093            new_record_batch_multi_fields(
1094                &[b"k2", b"k2"],
1095                &[1, 2],
1096                &[20, 20],
1097                &[OpType::Put, OpType::Delete],
1098                &[(Some(101), Some(101)), (None, None)],
1099            ),
1100            new_record_batch_multi_fields(
1101                &[b"k2"],
1102                &[2],
1103                &[19],
1104                &[OpType::Put],
1105                &[(Some(102), Some(102))],
1106            ),
1107            new_record_batch_multi_fields(
1108                &[b"k3"],
1109                &[2],
1110                &[20],
1111                &[OpType::Put],
1112                &[(Some(202), Some(202))],
1113            ),
1114            // This batch won't increase the deleted rows count as it
1115            // is filtered out by the previous batch. (All fields are null).
1116            new_record_batch_multi_fields(
1117                &[b"k3"],
1118                &[2],
1119                &[19],
1120                &[OpType::Delete],
1121                &[(None, None)],
1122            ),
1123        ];
1124
1125        // Test with filter_deleted = true
1126        let expected_filter_deleted = vec![
1127            new_record_batch_multi_fields(
1128                &[b"k1"],
1129                &[1],
1130                &[13],
1131                &[OpType::Put],
1132                &[(Some(11), Some(11))],
1133            ),
1134            new_record_batch_multi_fields(
1135                &[b"k1", b"k1"],
1136                &[2, 3],
1137                &[11, 13],
1138                &[OpType::Put, OpType::Put],
1139                &[(Some(12), Some(22)), (Some(13), None)],
1140            ),
1141            new_record_batch_multi_fields(
1142                &[b"k2"],
1143                &[1],
1144                &[20],
1145                &[OpType::Put],
1146                &[(Some(101), Some(101))],
1147            ),
1148            new_record_batch_multi_fields(
1149                &[b"k3"],
1150                &[2],
1151                &[20],
1152                &[OpType::Put],
1153                &[(Some(202), Some(202))],
1154            ),
1155        ];
1156
1157        let iter = input.clone().into_iter().map(Ok);
1158        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, true));
1159        let result = collect_iterator_results(&mut dedup_iter);
1160        check_record_batches_equal(&expected_filter_deleted, &result);
1161        assert_eq!(6, dedup_iter.metrics.num_unselected_rows);
1162        assert_eq!(2, dedup_iter.metrics.num_deleted_rows);
1163
1164        // Test with filter_deleted = false
1165        let expected_no_filter = vec![
1166            new_record_batch_multi_fields(
1167                &[b"k1"],
1168                &[1],
1169                &[13],
1170                &[OpType::Put],
1171                &[(Some(11), Some(11))],
1172            ),
1173            new_record_batch_multi_fields(
1174                &[b"k1", b"k1", b"k1"],
1175                &[2, 3, 4],
1176                &[11, 13, 13],
1177                &[OpType::Put, OpType::Put, OpType::Delete],
1178                &[(Some(12), Some(22)), (Some(13), None), (None, Some(14))],
1179            ),
1180            new_record_batch_multi_fields(
1181                &[b"k2"],
1182                &[1],
1183                &[20],
1184                &[OpType::Put],
1185                &[(Some(101), Some(101))],
1186            ),
1187            new_record_batch_multi_fields(
1188                &[b"k2"],
1189                &[2],
1190                &[20],
1191                &[OpType::Delete],
1192                &[(None, None)],
1193            ),
1194            new_record_batch_multi_fields(
1195                &[b"k3"],
1196                &[2],
1197                &[20],
1198                &[OpType::Put],
1199                &[(Some(202), Some(202))],
1200            ),
1201        ];
1202
1203        let iter = input.clone().into_iter().map(Ok);
1204        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, false));
1205        let result = collect_iterator_results(&mut dedup_iter);
1206        check_record_batches_equal(&expected_no_filter, &result);
1207        assert_eq!(4, dedup_iter.metrics.num_unselected_rows);
1208        assert_eq!(0, dedup_iter.metrics.num_deleted_rows);
1209    }
1210
1211    #[test]
1212    fn test_flat_last_non_null_skip_merge_no_null() {
1213        let input = vec![
1214            new_record_batch_multi_fields(
1215                &[b"k1", b"k1"],
1216                &[1, 2],
1217                &[13, 11],
1218                &[OpType::Put, OpType::Put],
1219                &[(Some(11), Some(11)), (Some(12), Some(12))],
1220            ),
1221            new_record_batch_multi_fields(
1222                &[b"k1"],
1223                &[2],
1224                &[10],
1225                &[OpType::Put],
1226                &[(None, Some(22))],
1227            ),
1228            new_record_batch_multi_fields(
1229                &[b"k1", b"k1"],
1230                &[2, 3],
1231                &[9, 13],
1232                &[OpType::Put, OpType::Put],
1233                &[(Some(32), None), (Some(13), Some(13))],
1234            ),
1235        ];
1236
1237        let expected = vec![
1238            new_record_batch_multi_fields(
1239                &[b"k1"],
1240                &[1],
1241                &[13],
1242                &[OpType::Put],
1243                &[(Some(11), Some(11))],
1244            ),
1245            new_record_batch_multi_fields(
1246                &[b"k1", b"k1"],
1247                &[2, 3],
1248                &[11, 13],
1249                &[OpType::Put, OpType::Put],
1250                &[(Some(12), Some(12)), (Some(13), Some(13))],
1251            ),
1252        ];
1253
1254        let iter = input.into_iter().map(Ok);
1255        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, true));
1256        let result = collect_iterator_results(&mut dedup_iter);
1257        check_record_batches_equal(&expected, &result);
1258        assert_eq!(2, dedup_iter.metrics.num_unselected_rows);
1259        assert_eq!(0, dedup_iter.metrics.num_deleted_rows);
1260    }
1261
1262    #[test]
1263    fn test_flat_last_non_null_merge_null() {
1264        let input = vec![
1265            new_record_batch_multi_fields(
1266                &[b"k1", b"k1"],
1267                &[1, 2],
1268                &[13, 11],
1269                &[OpType::Put, OpType::Put],
1270                &[(Some(11), Some(11)), (None, None)],
1271            ),
1272            new_record_batch_multi_fields(
1273                &[b"k1"],
1274                &[2],
1275                &[10],
1276                &[OpType::Put],
1277                &[(None, Some(22))],
1278            ),
1279            new_record_batch_multi_fields(
1280                &[b"k1"],
1281                &[3],
1282                &[13],
1283                &[OpType::Put],
1284                &[(Some(33), None)],
1285            ),
1286        ];
1287
1288        let expected = vec![
1289            new_record_batch_multi_fields(
1290                &[b"k1"],
1291                &[1],
1292                &[13],
1293                &[OpType::Put],
1294                &[(Some(11), Some(11))],
1295            ),
1296            new_record_batch_multi_fields(
1297                &[b"k1"],
1298                &[2],
1299                &[11],
1300                &[OpType::Put],
1301                &[(None, Some(22))],
1302            ),
1303            new_record_batch_multi_fields(
1304                &[b"k1"],
1305                &[3],
1306                &[13],
1307                &[OpType::Put],
1308                &[(Some(33), None)],
1309            ),
1310        ];
1311
1312        let iter = input.into_iter().map(Ok);
1313        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, true));
1314        let result = collect_iterator_results(&mut dedup_iter);
1315        check_record_batches_equal(&expected, &result);
1316        assert_eq!(1, dedup_iter.metrics.num_unselected_rows);
1317        assert_eq!(0, dedup_iter.metrics.num_deleted_rows);
1318    }
1319
1320    /// Helper function to check dedup strategy behavior directly.
1321    fn check_flat_dedup_strategy(
1322        input: &[RecordBatch],
1323        strategy: &mut dyn RecordBatchDedupStrategy,
1324        expect: &[RecordBatch],
1325    ) {
1326        let mut actual = Vec::new();
1327        let mut metrics = DedupMetrics::default();
1328        for batch in input {
1329            if let Some(out) = strategy.push_batch(batch.clone(), &mut metrics).unwrap() {
1330                actual.push(out);
1331            }
1332        }
1333        if let Some(out) = strategy.finish(&mut metrics).unwrap() {
1334            actual.push(out);
1335        }
1336
1337        check_record_batches_equal(expect, &actual);
1338    }
1339
1340    #[test]
1341    fn test_flat_last_non_null_strategy_delete_last() {
1342        let input = vec![
1343            new_record_batch_multi_fields(
1344                &[b"k1"],
1345                &[1],
1346                &[6],
1347                &[OpType::Put],
1348                &[(Some(11), None)],
1349            ),
1350            new_record_batch_multi_fields(
1351                &[b"k1", b"k1"],
1352                &[1, 2],
1353                &[1, 7],
1354                &[OpType::Put, OpType::Put],
1355                &[(Some(1), None), (Some(22), Some(222))],
1356            ),
1357            new_record_batch_multi_fields(
1358                &[b"k1"],
1359                &[2],
1360                &[4],
1361                &[OpType::Put],
1362                &[(Some(12), None)],
1363            ),
1364            new_record_batch_multi_fields(
1365                &[b"k2", b"k2"],
1366                &[2, 3],
1367                &[2, 5],
1368                &[OpType::Put, OpType::Delete],
1369                &[(None, None), (Some(13), None)],
1370            ),
1371            new_record_batch_multi_fields(&[b"k2"], &[3], &[3], &[OpType::Put], &[(None, Some(3))]),
1372        ];
1373
1374        let mut strategy = FlatLastNonNull::new(1, true);
1375        check_flat_dedup_strategy(
1376            &input,
1377            &mut strategy,
1378            &[
1379                new_record_batch_multi_fields(
1380                    &[b"k1"],
1381                    &[1],
1382                    &[6],
1383                    &[OpType::Put],
1384                    &[(Some(11), None)],
1385                ),
1386                new_record_batch_multi_fields(
1387                    &[b"k1"],
1388                    &[2],
1389                    &[7],
1390                    &[OpType::Put],
1391                    &[(Some(22), Some(222))],
1392                ),
1393                new_record_batch_multi_fields(
1394                    &[b"k2"],
1395                    &[2],
1396                    &[2],
1397                    &[OpType::Put],
1398                    &[(None, None)],
1399                ),
1400            ],
1401        );
1402    }
1403
1404    #[test]
1405    fn test_flat_last_non_null_strategy_delete_one() {
1406        let input = vec![
1407            new_record_batch_multi_fields(&[b"k1"], &[1], &[1], &[OpType::Delete], &[(None, None)]),
1408            new_record_batch_multi_fields(
1409                &[b"k2"],
1410                &[1],
1411                &[6],
1412                &[OpType::Put],
1413                &[(Some(11), None)],
1414            ),
1415        ];
1416
1417        let mut strategy = FlatLastNonNull::new(1, true);
1418        check_flat_dedup_strategy(
1419            &input,
1420            &mut strategy,
1421            &[new_record_batch_multi_fields(
1422                &[b"k2"],
1423                &[1],
1424                &[6],
1425                &[OpType::Put],
1426                &[(Some(11), None)],
1427            )],
1428        );
1429    }
1430
1431    #[test]
1432    fn test_flat_last_non_null_strategy_delete_all() {
1433        let input = vec![
1434            new_record_batch_multi_fields(&[b"k1"], &[1], &[1], &[OpType::Delete], &[(None, None)]),
1435            new_record_batch_multi_fields(
1436                &[b"k2"],
1437                &[1],
1438                &[6],
1439                &[OpType::Delete],
1440                &[(Some(11), None)],
1441            ),
1442        ];
1443
1444        let mut strategy = FlatLastNonNull::new(1, true);
1445        check_flat_dedup_strategy(&input, &mut strategy, &[]);
1446    }
1447
1448    #[test]
1449    fn test_flat_last_non_null_strategy_same_batch() {
1450        let input = vec![
1451            new_record_batch_multi_fields(
1452                &[b"k1"],
1453                &[1],
1454                &[6],
1455                &[OpType::Put],
1456                &[(Some(11), None)],
1457            ),
1458            new_record_batch_multi_fields(
1459                &[b"k1", b"k1"],
1460                &[1, 2],
1461                &[1, 7],
1462                &[OpType::Put, OpType::Put],
1463                &[(Some(1), None), (Some(22), Some(222))],
1464            ),
1465            new_record_batch_multi_fields(
1466                &[b"k1"],
1467                &[2],
1468                &[4],
1469                &[OpType::Put],
1470                &[(Some(12), None)],
1471            ),
1472            new_record_batch_multi_fields(
1473                &[b"k1", b"k1"],
1474                &[2, 3],
1475                &[2, 5],
1476                &[OpType::Put, OpType::Put],
1477                &[(None, None), (Some(13), None)],
1478            ),
1479            new_record_batch_multi_fields(&[b"k1"], &[3], &[3], &[OpType::Put], &[(None, Some(3))]),
1480        ];
1481
1482        let mut strategy = FlatLastNonNull::new(1, true);
1483        check_flat_dedup_strategy(
1484            &input,
1485            &mut strategy,
1486            &[
1487                new_record_batch_multi_fields(
1488                    &[b"k1"],
1489                    &[1],
1490                    &[6],
1491                    &[OpType::Put],
1492                    &[(Some(11), None)],
1493                ),
1494                new_record_batch_multi_fields(
1495                    &[b"k1"],
1496                    &[2],
1497                    &[7],
1498                    &[OpType::Put],
1499                    &[(Some(22), Some(222))],
1500                ),
1501                new_record_batch_multi_fields(
1502                    &[b"k1"],
1503                    &[3],
1504                    &[5],
1505                    &[OpType::Put],
1506                    &[(Some(13), Some(3))],
1507                ),
1508            ],
1509        );
1510    }
1511
1512    #[test]
1513    fn test_flat_last_non_null_strategy_delete_middle() {
1514        let input = vec![
1515            new_record_batch_multi_fields(
1516                &[b"k1"],
1517                &[1],
1518                &[7],
1519                &[OpType::Put],
1520                &[(Some(11), None)],
1521            ),
1522            new_record_batch_multi_fields(&[b"k1"], &[1], &[4], &[OpType::Delete], &[(None, None)]),
1523            new_record_batch_multi_fields(
1524                &[b"k1"],
1525                &[1],
1526                &[1],
1527                &[OpType::Put],
1528                &[(Some(12), Some(1))],
1529            ),
1530            new_record_batch_multi_fields(
1531                &[b"k1"],
1532                &[2],
1533                &[8],
1534                &[OpType::Put],
1535                &[(Some(21), None)],
1536            ),
1537            new_record_batch_multi_fields(&[b"k1"], &[2], &[5], &[OpType::Delete], &[(None, None)]),
1538            new_record_batch_multi_fields(
1539                &[b"k1"],
1540                &[2],
1541                &[2],
1542                &[OpType::Put],
1543                &[(Some(22), Some(2))],
1544            ),
1545            new_record_batch_multi_fields(
1546                &[b"k1"],
1547                &[3],
1548                &[9],
1549                &[OpType::Put],
1550                &[(Some(31), None)],
1551            ),
1552            new_record_batch_multi_fields(&[b"k1"], &[3], &[6], &[OpType::Delete], &[(None, None)]),
1553            new_record_batch_multi_fields(
1554                &[b"k1"],
1555                &[3],
1556                &[3],
1557                &[OpType::Put],
1558                &[(Some(32), Some(3))],
1559            ),
1560        ];
1561
1562        let mut strategy = FlatLastNonNull::new(1, true);
1563        check_flat_dedup_strategy(
1564            &input,
1565            &mut strategy,
1566            &[
1567                new_record_batch_multi_fields(
1568                    &[b"k1"],
1569                    &[1],
1570                    &[7],
1571                    &[OpType::Put],
1572                    &[(Some(11), None)],
1573                ),
1574                new_record_batch_multi_fields(
1575                    &[b"k1"],
1576                    &[2],
1577                    &[8],
1578                    &[OpType::Put],
1579                    &[(Some(21), None)],
1580                ),
1581                new_record_batch_multi_fields(
1582                    &[b"k1"],
1583                    &[3],
1584                    &[9],
1585                    &[OpType::Put],
1586                    &[(Some(31), None)],
1587                ),
1588            ],
1589        );
1590    }
1591}