mito2/read/flat_dedup.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Dedup implementation for flat format.
16
17use std::ops::Range;
18
19use api::v1::OpType;
20use async_stream::try_stream;
21use datatypes::arrow::array::{
22    make_comparator, Array, ArrayRef, BinaryArray, BooleanArray, BooleanBufferBuilder, UInt64Array,
23    UInt8Array,
24};
25use datatypes::arrow::buffer::BooleanBuffer;
26use datatypes::arrow::compute::kernels::cmp::distinct;
27use datatypes::arrow::compute::kernels::partition::{partition, Partitions};
28use datatypes::arrow::compute::kernels::take::take;
29use datatypes::arrow::compute::{
30    concat_batches, filter_record_batch, take_record_batch, SortOptions, TakeOptions,
31};
32use datatypes::arrow::error::ArrowError;
33use datatypes::arrow::record_batch::RecordBatch;
34use futures::{Stream, TryStreamExt};
35use snafu::ResultExt;
36
37use crate::error::{ComputeArrowSnafu, NewRecordBatchSnafu, Result};
38use crate::memtable::partition_tree::data::timestamp_array_to_i64_slice;
39use crate::read::dedup::DedupMetrics;
40use crate::sst::parquet::flat_format::{
41    op_type_column_index, primary_key_column_index, time_index_column_index,
42};
43use crate::sst::parquet::format::{PrimaryKeyArray, FIXED_POS_COLUMN_NUM};
44
45/// An iterator to dedup sorted batches from an iterator based on the dedup strategy.
46pub struct FlatDedupIterator<I, S> {
47    iter: I,
48    strategy: S,
49    metrics: DedupMetrics,
50}
51
52impl<I, S> FlatDedupIterator<I, S> {
53    /// Creates a new dedup iterator.
54    pub fn new(iter: I, strategy: S) -> Self {
55        Self {
56            iter,
57            strategy,
58            metrics: DedupMetrics::default(),
59        }
60    }
61}
62
63impl<I: Iterator<Item = Result<RecordBatch>>, S: RecordBatchDedupStrategy> FlatDedupIterator<I, S> {
64    /// Returns the next deduplicated batch.
65    fn fetch_next_batch(&mut self) -> Result<Option<RecordBatch>> {
66        while let Some(batch) = self.iter.next().transpose()? {
67            if let Some(batch) = self.strategy.push_batch(batch, &mut self.metrics)? {
68                return Ok(Some(batch));
69            }
70        }
71
72        self.strategy.finish(&mut self.metrics)
73    }
74}
75
76impl<I: Iterator<Item = Result<RecordBatch>>, S: RecordBatchDedupStrategy> Iterator
77    for FlatDedupIterator<I, S>
78{
79    type Item = Result<RecordBatch>;
80
81    fn next(&mut self) -> Option<Self::Item> {
82        self.fetch_next_batch().transpose()
83    }
84}
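// Illustrative usage sketch (mirrors the tests at the end of this file);
// `sorted_batches` is a hypothetical `Vec<RecordBatch>` sorted by primary key
// and time index, with the row of the largest sequence first among duplicates:
//
//     let iter = sorted_batches.into_iter().map(Ok);
//     let dedup_iter = FlatDedupIterator::new(iter, FlatLastRow::new(true));
//     let deduped = dedup_iter.collect::<Result<Vec<_>>>()?;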
85
86/// An async reader to dedup sorted record batches from a stream based on the dedup strategy.
87pub struct FlatDedupReader<I, S> {
88    stream: I,
89    strategy: S,
90    metrics: DedupMetrics,
91}
92
93impl<I, S> FlatDedupReader<I, S> {
94    /// Creates a new dedup reader.
95    pub fn new(stream: I, strategy: S) -> Self {
96        Self {
97            stream,
98            strategy,
99            metrics: DedupMetrics::default(),
100        }
101    }
102}
103
104impl<I: Stream<Item = Result<RecordBatch>> + Unpin, S: RecordBatchDedupStrategy>
105    FlatDedupReader<I, S>
106{
107    /// Returns the next deduplicated batch.
108    async fn fetch_next_batch(&mut self) -> Result<Option<RecordBatch>> {
109        while let Some(batch) = self.stream.try_next().await? {
110            if let Some(batch) = self.strategy.push_batch(batch, &mut self.metrics)? {
111                return Ok(Some(batch));
112            }
113        }
114
115        self.strategy.finish(&mut self.metrics)
116    }
117
118    /// Converts the reader into a stream.
119    pub fn into_stream(mut self) -> impl Stream<Item = Result<RecordBatch>> {
120        try_stream! {
121            while let Some(batch) = self.fetch_next_batch().await? {
122                yield batch;
123            }
124        }
125    }
126}
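// Illustrative async usage sketch; `batch_stream` is a hypothetical
// `Stream<Item = Result<RecordBatch>> + Unpin` sorted the same way as above:
//
//     let reader = FlatDedupReader::new(batch_stream, FlatLastRow::new(true));
//     let mut deduped = Box::pin(reader.into_stream());
//     while let Some(batch) = deduped.try_next().await? {
//         // Consume the deduplicated batch.
//     }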
127
128/// Strategy to remove duplicate rows from sorted record batches.
129pub trait RecordBatchDedupStrategy: Send {
130    /// Pushes a batch to the dedup strategy.
131    /// Returns a batch if the strategy can ensure there are no duplications
132    /// based on the input batch.
133    fn push_batch(
134        &mut self,
135        batch: RecordBatch,
136        metrics: &mut DedupMetrics,
137    ) -> Result<Option<RecordBatch>>;
138
139    /// Finishes the deduplication process and returns any remaining batch.
140    ///
141    /// Users must ensure that `push_batch` is called for all batches before
142    /// calling this method.
143    fn finish(&mut self, metrics: &mut DedupMetrics) -> Result<Option<RecordBatch>>;
144}
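// A strategy is driven in two phases (see `FlatDedupIterator::fetch_next_batch`
// above and the `check_flat_dedup_strategy` helper in the tests): push every
// sorted batch and emit any `Some` output, then call `finish` once to flush the
// buffered tail. A minimal driver sketch:
//
//     for batch in sorted_batches {
//         if let Some(out) = strategy.push_batch(batch, &mut metrics)? {
//             output.push(out);
//         }
//     }
//     if let Some(out) = strategy.finish(&mut metrics)? {
//         output.push(out);
//     }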
145
146/// Dedup strategy that keeps the row with the latest sequence for each key.
147pub struct FlatLastRow {
148    /// Metadata of the last row in the previous batch, used to check whether
149    /// the first row of the batch to push duplicates it.
150    prev_batch: Option<BatchLastRow>,
151    /// Filter deleted rows.
152    filter_deleted: bool,
153}
154
155impl FlatLastRow {
156    /// Creates a new strategy with the given `filter_deleted` flag.
157    pub fn new(filter_deleted: bool) -> Self {
158        Self {
159            prev_batch: None,
160            filter_deleted,
161        }
162    }
163
164    /// Removes duplications from the batch without considering previous rows.
165    fn dedup_one_batch(batch: RecordBatch) -> Result<RecordBatch> {
166        let num_rows = batch.num_rows();
167        if num_rows < 2 {
168            return Ok(batch);
169        }
170
171        let num_columns = batch.num_columns();
172        let timestamps = batch.column(time_index_column_index(num_columns));
173        // Checks duplications based on the timestamp.
174        let mask = find_boundaries(timestamps).context(ComputeArrowSnafu)?;
175        if mask.count_set_bits() == num_rows - 1 {
176            // Fast path: No duplication.
177            return Ok(batch);
178        }
179
180        // The batch has duplicated timestamps, but that doesn't mean it must
181        // have duplicated rows.
182        // Partitions the batch by the primary key and time index.
183        let columns: Vec<_> = [
184            primary_key_column_index(num_columns),
185            time_index_column_index(num_columns),
186        ]
187        .iter()
188        .map(|index| batch.column(*index).clone())
189        .collect();
190        let partitions = partition(&columns).context(ComputeArrowSnafu)?;
191
192        Self::dedup_by_partitions(batch, &partitions)
193    }
194
195    /// Removes duplications for each partition.
196    fn dedup_by_partitions(batch: RecordBatch, partitions: &Partitions) -> Result<RecordBatch> {
197        let ranges = partitions.ranges();
198        // Each range has at least 1 row.
199        let num_duplications: usize = ranges.iter().map(|r| r.end - r.start - 1).sum();
200        if num_duplications == 0 {
201            // Fast path, no duplications.
202            return Ok(batch);
203        }
204
205        // Always takes the first row in each range.
206        let take_indices: UInt64Array = ranges.iter().map(|r| Some(r.start as u64)).collect();
207        take_record_batch(&batch, &take_indices).context(ComputeArrowSnafu)
208    }
209}
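// Worked example for `dedup_one_batch`: with rows sorted by primary key and
// time index, and the row with the largest sequence first among duplicates,
//
//     (k1, ts=1, seq=13), (k1, ts=1, seq=11), (k1, ts=2, seq=12)
//
// partitions into the ranges [0, 2) and [2, 3), so only rows 0 and 2 are kept.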
210
211impl RecordBatchDedupStrategy for FlatLastRow {
212    fn push_batch(
213        &mut self,
214        batch: RecordBatch,
215        metrics: &mut DedupMetrics,
216    ) -> Result<Option<RecordBatch>> {
217        if batch.num_rows() == 0 {
218            return Ok(None);
219        }
220
221        // Dedup the current batch to ensure no duplication before checking the previous row.
222        let row_before_dedup = batch.num_rows();
223        let mut batch = Self::dedup_one_batch(batch)?;
224
225        if let Some(prev_batch) = &self.prev_batch {
226            // If we have a previous batch.
227            if prev_batch.is_last_row_duplicated(&batch) {
228                // Duplicated with the last batch, skip the first row.
229                batch = batch.slice(1, batch.num_rows() - 1);
230            }
231        }
232        metrics.num_unselected_rows += row_before_dedup - batch.num_rows();
233
234        let Some(batch_last_row) = BatchLastRow::try_new(batch.clone()) else {
235            // The batch is empty after dedup.
236            // We don't need to update `prev_batch` because the removed row has the same
237            // key and timestamp as its last row.
238            return Ok(None);
239        };
240
241        // Store the current batch in `prev_batch` so we can compare the next batch
242        // with it. We store the batch before filtering it because rows with `OpType::Delete`
243        // would be removed by the filter, and then we might store an incorrect last row
244        // of the previous batch.
245        // Safety: We checked the batch is not empty before.
246        self.prev_batch = Some(batch_last_row);
247
248        // Filters deleted rows at last.
249        maybe_filter_deleted(batch, self.filter_deleted, metrics)
250    }
251
252    fn finish(&mut self, _metrics: &mut DedupMetrics) -> Result<Option<RecordBatch>> {
253        Ok(None)
254    }
255}
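// Cross-batch example for `FlatLastRow::push_batch`: if the previous batch ends
// with (k1, ts=2, seq=11) and the next batch starts with (k1, ts=2, seq=10),
// the first row of the next batch is sliced off because the earlier batch
// already holds the row with the larger sequence
// (see `test_flat_last_row_duplications`).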
256
257/// Dedup strategy that keeps the last non-null field value for the same key.
258pub struct FlatLastNonNull {
259    /// The start index of field columns.
260    field_column_start: usize,
261    /// Filter deleted rows.
262    filter_deleted: bool,
263    /// Buffered batch to check whether the next batch has duplicated rows with this batch.
264    /// Fields in the last row of this batch may be updated by the next batch.
265    /// The buffered batch should contain no duplications.
266    buffer: Option<BatchLastRow>,
267    /// Whether the last row range contains a delete operation.
268    /// If so, we don't need to update null fields.
269    contains_delete: bool,
270}
271
272impl RecordBatchDedupStrategy for FlatLastNonNull {
273    fn push_batch(
274        &mut self,
275        batch: RecordBatch,
276        metrics: &mut DedupMetrics,
277    ) -> Result<Option<RecordBatch>> {
278        if batch.num_rows() == 0 {
279            return Ok(None);
280        }
281
282        let row_before_dedup = batch.num_rows();
283
284        let Some(buffer) = self.buffer.take() else {
285            // If the buffer is None, dedup the batch, put the batch into the buffer and return.
286            // There is no previous batch with the same key, so we can pass contains_delete as false.
287            let (record_batch, contains_delete) =
288                Self::dedup_one_batch(batch, self.field_column_start, false)?;
289            metrics.num_unselected_rows += row_before_dedup - record_batch.num_rows();
290            self.buffer = BatchLastRow::try_new(record_batch);
291            self.contains_delete = contains_delete;
292
293            return Ok(None);
294        };
295
296        if !buffer.is_last_row_duplicated(&batch) {
297            // The first row of the batch has a different key from the buffer.
298            // We can replace the buffer with the new batch.
299            // Dedup the batch.
300            // There is no previous batch with the same key, so we can pass contains_delete as false.
301            let (record_batch, contains_delete) =
302                Self::dedup_one_batch(batch, self.field_column_start, false)?;
303            metrics.num_unselected_rows += row_before_dedup - record_batch.num_rows();
304            debug_assert!(record_batch.num_rows() > 0);
305            self.buffer = BatchLastRow::try_new(record_batch);
306            self.contains_delete = contains_delete;
307
308            return maybe_filter_deleted(buffer.last_batch, self.filter_deleted, metrics);
309        }
310
311        // The first row of the next batch duplicates the last row in the buffer.
312        // We can return all buffered rows except the last one.
313        let output = if buffer.last_batch.num_rows() > 1 {
314            let dedup_batch = buffer.last_batch.slice(0, buffer.last_batch.num_rows() - 1);
315            debug_assert_eq!(buffer.last_batch.num_rows() - 1, dedup_batch.num_rows());
316
317            maybe_filter_deleted(dedup_batch, self.filter_deleted, metrics)?
318        } else {
319            None
320        };
321        let last_row = buffer.last_batch.slice(buffer.last_batch.num_rows() - 1, 1);
322
323        // We concat the last row with the next batch.
324        let schema = batch.schema();
325        let merged = concat_batches(&schema, &[last_row, batch]).context(ComputeArrowSnafu)?;
326        let merged_row_count = merged.num_rows();
327        // Dedup the merged batch and update the buffer.
328        let (record_batch, contains_delete) =
329            Self::dedup_one_batch(merged, self.field_column_start, self.contains_delete)?;
330        metrics.num_unselected_rows += merged_row_count - record_batch.num_rows();
331        debug_assert!(record_batch.num_rows() > 0);
332        self.buffer = BatchLastRow::try_new(record_batch);
333        self.contains_delete = contains_delete;
334
335        Ok(output)
336    }
337
338    fn finish(&mut self, metrics: &mut DedupMetrics) -> Result<Option<RecordBatch>> {
339        let Some(buffer) = self.buffer.take() else {
340            return Ok(None);
341        };
342
343        maybe_filter_deleted(buffer.last_batch, self.filter_deleted, metrics)
344    }
345}
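// Cross-batch example for `FlatLastNonNull::push_batch` (mirrors
// `test_flat_last_non_null_merge_null`): if the buffer ends with
// (k1, ts=2, fields=(None, None)) and the next batch starts with
// (k1, ts=2, fields=(None, Some(22))), the earlier buffered rows are emitted,
// while the two duplicated rows are concatenated and deduplicated into a single
// buffered row whose fields are (None, Some(22)).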
346
347impl FlatLastNonNull {
348    /// Creates a new strategy with the given `filter_deleted` flag.
349    pub fn new(field_column_start: usize, filter_deleted: bool) -> Self {
350        Self {
351            field_column_start,
352            filter_deleted,
353            buffer: None,
354            contains_delete: false,
355        }
356    }
357
358    /// Removes duplications from the batch without considering the previous and next rows.
359    /// Returns a tuple containing the deduplicated batch and a boolean indicating whether the last range contains deleted rows.
360    fn dedup_one_batch(
361        batch: RecordBatch,
362        field_column_start: usize,
363        prev_batch_contains_delete: bool,
364    ) -> Result<(RecordBatch, bool)> {
365        // Get op type array for checking delete operations
366        let op_type_column = batch
367            .column(op_type_column_index(batch.num_columns()))
368            .clone();
369        let op_types = op_type_column
370            .as_any()
371            .downcast_ref::<UInt8Array>()
372            .unwrap();
373        let num_rows = batch.num_rows();
374        if num_rows < 2 {
375            let contains_delete = if num_rows > 0 {
376                op_types.value(0) == OpType::Delete as u8
377            } else {
378                false
379            };
380            return Ok((batch, contains_delete));
381        }
382
383        let num_columns = batch.num_columns();
384        let timestamps = batch.column(time_index_column_index(num_columns));
385        // Checks duplications based on the timestamp.
386        let mask = find_boundaries(timestamps).context(ComputeArrowSnafu)?;
387        if mask.count_set_bits() == num_rows - 1 {
388            let contains_delete = op_types.value(num_rows - 1) == OpType::Delete as u8;
389            // Fast path: No duplication.
390            return Ok((batch, contains_delete));
391        }
392
393        // The batch has duplicated timestamps, but that doesn't mean it must
394        // have duplicated rows.
395        // Partitions the batch by the primary key and time index.
396        let columns: Vec<_> = [
397            primary_key_column_index(num_columns),
398            time_index_column_index(num_columns),
399        ]
400        .iter()
401        .map(|index| batch.column(*index).clone())
402        .collect();
403        let partitions = partition(&columns).context(ComputeArrowSnafu)?;
404
405        Self::dedup_by_partitions(
406            batch,
407            &partitions,
408            field_column_start,
409            op_types,
410            prev_batch_contains_delete,
411        )
412    }
413
414    /// Removes duplications for each partition.
415    /// Returns a tuple containing the deduplicated batch and a boolean indicating whether the last range contains deleted rows.
416    fn dedup_by_partitions(
417        batch: RecordBatch,
418        partitions: &Partitions,
419        field_column_start: usize,
420        op_types: &UInt8Array,
421        first_range_contains_delete: bool,
422    ) -> Result<(RecordBatch, bool)> {
423        let ranges = partitions.ranges();
424        let contains_delete = Self::last_range_has_delete(&ranges, op_types);
425
426        // Each range has at least 1 row.
427        let num_duplications: usize = ranges.iter().map(|r| r.end - r.start - 1).sum();
428        if num_duplications == 0 {
429            // Fast path, no duplication.
430            return Ok((batch, contains_delete));
431        }
432
433        let field_column_end = batch.num_columns() - FIXED_POS_COLUMN_NUM;
434        let take_options = Some(TakeOptions {
435            check_bounds: false,
436        });
437        // Always takes the first value for non-field columns in each range.
438        let non_field_indices: UInt64Array = ranges.iter().map(|r| Some(r.start as u64)).collect();
439        let new_columns = batch
440            .columns()
441            .iter()
442            .enumerate()
443            .map(|(col_idx, column)| {
444                if col_idx >= field_column_start && col_idx < field_column_end {
445                    let field_indices = Self::compute_field_indices(
446                        &ranges,
447                        column,
448                        op_types,
449                        first_range_contains_delete,
450                    );
451                    take(column, &field_indices, take_options.clone()).context(ComputeArrowSnafu)
452                } else {
453                    take(column, &non_field_indices, take_options.clone())
454                        .context(ComputeArrowSnafu)
455                }
456            })
457            .collect::<Result<Vec<ArrayRef>>>()?;
458
459        let record_batch =
460            RecordBatch::try_new(batch.schema(), new_columns).context(NewRecordBatchSnafu)?;
461        Ok((record_batch, contains_delete))
462    }
463
464    /// Returns an array of indices of the latest non-null value for
465    /// each input range.
466    /// If all values in a range are null, the returned index is unspecified.
467    /// Stops at a delete operation and ignores all subsequent rows in the range.
468    fn compute_field_indices(
469        ranges: &[Range<usize>],
470        field_array: &ArrayRef,
471        op_types: &UInt8Array,
472        first_range_contains_delete: bool,
473    ) -> UInt64Array {
474        ranges
475            .iter()
476            .enumerate()
477            .map(|(range_idx, r)| {
478                let mut value_index = r.start as u64;
479                if range_idx == 0 && first_range_contains_delete {
480                    return Some(value_index);
481                }
482
483                // Iterate through the range to find the first valid non-null value
484                // but stop if we encounter a delete operation.
485                for i in r.clone() {
486                    if op_types.value(i) == OpType::Delete as u8 {
487                        break;
488                    }
489                    if field_array.is_valid(i) {
490                        value_index = i as u64;
491                        break;
492                    }
493                }
494
495                Some(value_index)
496            })
497            .collect()
498    }
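// Worked example for `compute_field_indices`: for a single range over rows
// [0, 3) with op types [Put, Put, Put] and field values [None, Some(22), Some(2)],
// the scan stops at row 1, so index 1 (the newest non-null value) is selected.
// If row 1 were a Delete, the scan would stop there and keep index 0 even
// though the field at row 0 is null.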
499
500    /// Checks whether the last range contains a delete operation.
501    fn last_range_has_delete(ranges: &[Range<usize>], op_types: &UInt8Array) -> bool {
502        if let Some(last_range) = ranges.last() {
503            last_range
504                .clone()
505                .any(|i| op_types.value(i) == OpType::Delete as u8)
506        } else {
507            false
508        }
509    }
510}
511
512/// State of the batch with the last row for dedup.
513struct BatchLastRow {
514    /// The record batch that contains the last row.
515    /// It must have at least one row.
516    last_batch: RecordBatch,
517    /// Primary keys of the last batch.
518    primary_key: PrimaryKeyArray,
519    /// Last timestamp value.
520    timestamp: i64,
521}
522
523impl BatchLastRow {
524    /// Returns a new [BatchLastRow] if the record batch is not empty.
525    fn try_new(record_batch: RecordBatch) -> Option<Self> {
526        if record_batch.num_rows() > 0 {
527            let num_columns = record_batch.num_columns();
528            let primary_key = record_batch
529                .column(primary_key_column_index(num_columns))
530                .as_any()
531                .downcast_ref::<PrimaryKeyArray>()
532                .unwrap()
533                .clone();
534            let timestamp_array = record_batch.column(time_index_column_index(num_columns));
535            let timestamp = timestamp_value(timestamp_array, timestamp_array.len() - 1);
536
537            Some(Self {
538                last_batch: record_batch,
539                primary_key,
540                timestamp,
541            })
542        } else {
543            None
544        }
545    }
546
547    /// Returns true if the first row of the input `batch` has the same key and timestamp as the last row.
548    fn is_last_row_duplicated(&self, batch: &RecordBatch) -> bool {
549        if batch.num_rows() == 0 {
550            return false;
551        }
552
553        // The first timestamp in the batch.
554        let batch_timestamp = timestamp_value(
555            batch.column(time_index_column_index(batch.num_columns())),
556            0,
557        );
558        if batch_timestamp != self.timestamp {
559            return false;
560        }
561
562        let last_key = primary_key_at(&self.primary_key, self.last_batch.num_rows() - 1);
563        let primary_key = batch
564            .column(primary_key_column_index(batch.num_columns()))
565            .as_any()
566            .downcast_ref::<PrimaryKeyArray>()
567            .unwrap();
568        // Primary key of the first row in the batch.
569        let batch_key = primary_key_at(primary_key, 0);
570
571        last_key == batch_key
572    }
573}
574
575// TODO(yingwen): We only compare timestamp arrays, so we can modify this function
576// to simplify the comparator.
577// Port from https://github.com/apache/arrow-rs/blob/55.0.0/arrow-ord/src/partition.rs#L155-L168
578/// Returns a mask with bits set whenever the value or nullability changes
579fn find_boundaries(v: &dyn Array) -> Result<BooleanBuffer, ArrowError> {
580    let slice_len = v.len() - 1;
581    let v1 = v.slice(0, slice_len);
582    let v2 = v.slice(1, slice_len);
583
584    if !v.data_type().is_nested() {
585        return Ok(distinct(&v1, &v2)?.values().clone());
586    }
587    // Given that we're only comparing values, null ordering in the input or
588    // sort options do not matter.
589    let cmp = make_comparator(&v1, &v2, SortOptions::default())?;
590    Ok((0..slice_len).map(|i| !cmp(i, i).is_eq()).collect())
591}
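// Example: for the timestamps [1, 1, 2, 3, 3], `find_boundaries` compares each
// row with the next one and returns [false, true, true, false]; only 2 bits are
// set while `num_rows - 1` is 4, so callers know the batch has duplicated
// timestamps.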
592
593/// Filters deleted rows from the record batch if `filter_deleted` is true.
594fn maybe_filter_deleted(
595    record_batch: RecordBatch,
596    filter_deleted: bool,
597    metrics: &mut DedupMetrics,
598) -> Result<Option<RecordBatch>> {
599    if !filter_deleted {
600        return Ok(Some(record_batch));
601    }
602    let batch = filter_deleted_from_batch(record_batch, metrics)?;
603    // Skips empty batches.
604    if batch.num_rows() == 0 {
605        return Ok(None);
606    }
607    Ok(Some(batch))
608}
609
610/// Removes deleted rows from the batch and updates metrics.
611fn filter_deleted_from_batch(
612    batch: RecordBatch,
613    metrics: &mut DedupMetrics,
614) -> Result<RecordBatch> {
615    let num_rows = batch.num_rows();
616    let op_type_column = batch.column(op_type_column_index(batch.num_columns()));
617    // Safety: The column should be op type.
618    let op_types = op_type_column
619        .as_any()
620        .downcast_ref::<UInt8Array>()
621        .unwrap();
622    let has_delete = op_types
623        .values()
624        .iter()
625        .any(|op_type| *op_type != OpType::Put as u8);
626    if !has_delete {
627        return Ok(batch);
628    }
629
630    let mut builder = BooleanBufferBuilder::new(op_types.len());
631    for op_type in op_types.values() {
632        if *op_type == OpType::Delete as u8 {
633            builder.append(false);
634        } else {
635            builder.append(true);
636        }
637    }
638    let predicate = BooleanArray::new(builder.into(), None);
639    let new_batch = filter_record_batch(&batch, &predicate).context(ComputeArrowSnafu)?;
640    let num_deleted = num_rows - new_batch.num_rows();
641    metrics.num_deleted_rows += num_deleted;
642    metrics.num_unselected_rows += num_deleted;
643
644    Ok(new_batch)
645}
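// Example: for op types [Put, Delete, Put], the predicate is
// [true, false, true], so one row is removed and both `num_deleted_rows` and
// `num_unselected_rows` grow by 1.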
646
647/// Gets the primary key at `index`.
648fn primary_key_at(primary_key: &PrimaryKeyArray, index: usize) -> &[u8] {
649    let key = primary_key.keys().value(index);
650    let binary_values = primary_key
651        .values()
652        .as_any()
653        .downcast_ref::<BinaryArray>()
654        .unwrap();
655    binary_values.value(key as usize)
656}
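// Example: for a dictionary array with binary values [b"k1", b"k2"] and keys
// [0, 0, 1], `primary_key_at(array, 2)` reads key 1 and returns b"k2".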
657
658/// Gets the timestamp value from the timestamp array.
659///
660/// # Panics
661/// Panics if the array is not a timestamp array or
662/// the index is out of bounds.
663pub(crate) fn timestamp_value(array: &ArrayRef, idx: usize) -> i64 {
664    timestamp_array_to_i64_slice(array)[idx]
665}
666
667#[cfg(test)]
668mod tests {
669    use std::sync::Arc;
670
671    use api::v1::OpType;
672    use datatypes::arrow::array::{
673        ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
674        TimestampMillisecondArray, UInt64Array, UInt8Array,
675    };
676    use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type};
677    use datatypes::arrow::record_batch::RecordBatch;
678
679    use super::*;
680
681    /// Creates a test RecordBatch in flat format with given parameters.
682    fn new_record_batch(
683        primary_keys: &[&[u8]],
684        timestamps: &[i64],
685        sequences: &[u64],
686        op_types: &[OpType],
687        fields: &[u64],
688    ) -> RecordBatch {
689        let num_rows = timestamps.len();
690        debug_assert_eq!(sequences.len(), num_rows);
691        debug_assert_eq!(op_types.len(), num_rows);
692        debug_assert_eq!(fields.len(), num_rows);
693        debug_assert_eq!(primary_keys.len(), num_rows);
694
695        let columns: Vec<ArrayRef> = vec![
696            // k0 column (primary key as string dictionary)
697            build_test_pk_string_dict_array(primary_keys),
698            // field0 column
699            Arc::new(Int64Array::from_iter(
700                fields.iter().map(|v| Some(*v as i64)),
701            )),
702            // ts column (time index)
703            Arc::new(TimestampMillisecondArray::from_iter_values(
704                timestamps.iter().copied(),
705            )),
706            // __primary_key column
707            build_test_pk_array(primary_keys),
708            // __sequence column
709            Arc::new(UInt64Array::from_iter_values(sequences.iter().copied())),
710            // __op_type column
711            Arc::new(UInt8Array::from_iter_values(
712                op_types.iter().map(|v| *v as u8),
713            )),
714        ];
715
716        RecordBatch::try_new(build_test_flat_schema(), columns).unwrap()
717    }
718
719    /// Creates a test RecordBatch in flat format with multiple fields for testing FlatLastNonNull.
720    fn new_record_batch_multi_fields(
721        primary_keys: &[&[u8]],
722        timestamps: &[i64],
723        sequences: &[u64],
724        op_types: &[OpType],
725        fields: &[(Option<u64>, Option<u64>)],
726    ) -> RecordBatch {
727        let num_rows = timestamps.len();
728        debug_assert_eq!(sequences.len(), num_rows);
729        debug_assert_eq!(op_types.len(), num_rows);
730        debug_assert_eq!(fields.len(), num_rows);
731        debug_assert_eq!(primary_keys.len(), num_rows);
732
733        let columns: Vec<ArrayRef> = vec![
734            // k0 column (primary key as string dictionary)
735            build_test_pk_string_dict_array(primary_keys),
736            // field0 column
737            Arc::new(Int64Array::from_iter(
738                fields.iter().map(|field| field.0.map(|v| v as i64)),
739            )),
740            // field1 column
741            Arc::new(Int64Array::from_iter(
742                fields.iter().map(|field| field.1.map(|v| v as i64)),
743            )),
744            // ts column (time index)
745            Arc::new(TimestampMillisecondArray::from_iter_values(
746                timestamps.iter().copied(),
747            )),
748            // __primary_key column
749            build_test_pk_array(primary_keys),
750            // __sequence column
751            Arc::new(UInt64Array::from_iter_values(sequences.iter().copied())),
752            // __op_type column
753            Arc::new(UInt8Array::from_iter_values(
754                op_types.iter().map(|v| *v as u8),
755            )),
756        ];
757
758        RecordBatch::try_new(build_test_multi_field_schema(), columns).unwrap()
759    }
760
761    /// Creates a test string dictionary primary key array for given primary keys.
762    fn build_test_pk_string_dict_array(primary_keys: &[&[u8]]) -> ArrayRef {
763        let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
764        for &pk in primary_keys {
765            let pk_str = std::str::from_utf8(pk).unwrap();
766            builder.append(pk_str).unwrap();
767        }
768        Arc::new(builder.finish())
769    }
770
771    /// Creates a test primary key array for given primary keys.
772    fn build_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
773        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
774        for &pk in primary_keys {
775            builder.append(pk).unwrap();
776        }
777        Arc::new(builder.finish())
778    }
779
780    /// Builds the arrow schema for test flat format.
781    fn build_test_flat_schema() -> SchemaRef {
782        let fields = vec![
783            Field::new(
784                "k0",
785                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
786                false,
787            ),
788            Field::new("field0", DataType::Int64, true),
789            Field::new(
790                "ts",
791                DataType::Timestamp(TimeUnit::Millisecond, None),
792                false,
793            ),
794            Field::new(
795                "__primary_key",
796                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
797                false,
798            ),
799            Field::new("__sequence", DataType::UInt64, false),
800            Field::new("__op_type", DataType::UInt8, false),
801        ];
802        Arc::new(Schema::new(fields))
803    }
804
805    /// Builds the arrow schema for test flat format with multiple fields.
806    fn build_test_multi_field_schema() -> SchemaRef {
807        let fields = vec![
808            Field::new(
809                "k0",
810                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
811                false,
812            ),
813            Field::new("field0", DataType::Int64, true),
814            Field::new("field1", DataType::Int64, true),
815            Field::new(
816                "ts",
817                DataType::Timestamp(TimeUnit::Millisecond, None),
818                false,
819            ),
820            Field::new(
821                "__primary_key",
822                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
823                false,
824            ),
825            Field::new("__sequence", DataType::UInt64, false),
826            Field::new("__op_type", DataType::UInt8, false),
827        ];
828        Arc::new(Schema::new(fields))
829    }
830
831    /// Asserts that two RecordBatch vectors are equal.
832    fn check_record_batches_equal(expected: &[RecordBatch], actual: &[RecordBatch]) {
833        for (i, (exp, act)) in expected.iter().zip(actual.iter()).enumerate() {
834            assert_eq!(exp, act, "RecordBatch {} differs", i);
835        }
836        assert_eq!(
837            expected.len(),
838            actual.len(),
839            "Number of batches don't match"
840        );
841    }
842
843    /// Helper function to collect iterator results.
844    fn collect_iterator_results<I>(iter: I) -> Vec<RecordBatch>
845    where
846        I: Iterator<Item = Result<RecordBatch>>,
847    {
848        iter.map(|result| result.unwrap()).collect()
849    }
850
851    #[test]
852    fn test_flat_last_row_no_duplications() {
853        let input = vec![
854            new_record_batch(
855                &[b"k1", b"k1"],
856                &[1, 2],
857                &[11, 12],
858                &[OpType::Put, OpType::Put],
859                &[21, 22],
860            ),
861            new_record_batch(&[b"k1"], &[3], &[13], &[OpType::Put], &[23]),
862            new_record_batch(
863                &[b"k2", b"k2"],
864                &[1, 2],
865                &[111, 112],
866                &[OpType::Put, OpType::Put],
867                &[31, 32],
868            ),
869        ];
870
871        // Test with filter_deleted = true
872        let iter = input.clone().into_iter().map(Ok);
873        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastRow::new(true));
874        let result = collect_iterator_results(&mut dedup_iter);
875        check_record_batches_equal(&input, &result);
876        assert_eq!(0, dedup_iter.metrics.num_unselected_rows);
877        assert_eq!(0, dedup_iter.metrics.num_deleted_rows);
878
879        // Test with filter_deleted = false
880        let iter = input.clone().into_iter().map(Ok);
881        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastRow::new(false));
882        let result = collect_iterator_results(&mut dedup_iter);
883        check_record_batches_equal(&input, &result);
884        assert_eq!(0, dedup_iter.metrics.num_unselected_rows);
885        assert_eq!(0, dedup_iter.metrics.num_deleted_rows);
886    }
887
888    #[test]
889    fn test_flat_last_row_duplications() {
890        let input = vec![
891            new_record_batch(
892                &[b"k1", b"k1"],
893                &[1, 2],
894                &[13, 11],
895                &[OpType::Put, OpType::Put],
896                &[11, 12],
897            ),
898            // empty batch.
899            new_record_batch(&[], &[], &[], &[], &[]),
900            // Duplicate with the previous batch.
901            new_record_batch(
902                &[b"k1", b"k1", b"k1"],
903                &[2, 3, 4],
904                &[10, 13, 13],
905                &[OpType::Put, OpType::Put, OpType::Delete],
906                &[2, 13, 14],
907            ),
908            new_record_batch(
909                &[b"k2", b"k2"],
910                &[1, 2],
911                &[20, 20],
912                &[OpType::Put, OpType::Delete],
913                &[101, 0],
914            ),
915            new_record_batch(&[b"k2"], &[2], &[19], &[OpType::Put], &[102]),
916            new_record_batch(&[b"k3"], &[2], &[20], &[OpType::Put], &[202]),
917            // This batch won't increase the deleted rows count as it
918            // is filtered out by the previous batch.
919            new_record_batch(&[b"k3"], &[2], &[19], &[OpType::Delete], &[0]),
920        ];
921
922        // Test with filter_deleted = true
923        let expected_filter_deleted = vec![
924            new_record_batch(
925                &[b"k1", b"k1"],
926                &[1, 2],
927                &[13, 11],
928                &[OpType::Put, OpType::Put],
929                &[11, 12],
930            ),
931            new_record_batch(&[b"k1"], &[3], &[13], &[OpType::Put], &[13]),
932            new_record_batch(&[b"k2"], &[1], &[20], &[OpType::Put], &[101]),
933            new_record_batch(&[b"k3"], &[2], &[20], &[OpType::Put], &[202]),
934        ];
935
936        let iter = input.clone().into_iter().map(Ok);
937        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastRow::new(true));
938        let result = collect_iterator_results(&mut dedup_iter);
939        check_record_batches_equal(&expected_filter_deleted, &result);
940        assert_eq!(5, dedup_iter.metrics.num_unselected_rows);
941        assert_eq!(2, dedup_iter.metrics.num_deleted_rows);
942
943        // Test with filter_deleted = false
944        let expected_no_filter = vec![
945            new_record_batch(
946                &[b"k1", b"k1"],
947                &[1, 2],
948                &[13, 11],
949                &[OpType::Put, OpType::Put],
950                &[11, 12],
951            ),
952            new_record_batch(
953                &[b"k1", b"k1"],
954                &[3, 4],
955                &[13, 13],
956                &[OpType::Put, OpType::Delete],
957                &[13, 14],
958            ),
959            new_record_batch(
960                &[b"k2", b"k2"],
961                &[1, 2],
962                &[20, 20],
963                &[OpType::Put, OpType::Delete],
964                &[101, 0],
965            ),
966            new_record_batch(&[b"k3"], &[2], &[20], &[OpType::Put], &[202]),
967        ];
968
969        let iter = input.clone().into_iter().map(Ok);
970        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastRow::new(false));
971        let result = collect_iterator_results(&mut dedup_iter);
972        check_record_batches_equal(&expected_no_filter, &result);
973        assert_eq!(3, dedup_iter.metrics.num_unselected_rows);
974        assert_eq!(0, dedup_iter.metrics.num_deleted_rows);
975    }
976
977    #[test]
978    fn test_flat_last_non_null_no_duplications() {
979        let input = vec![
980            new_record_batch(
981                &[b"k1", b"k1"],
982                &[1, 2],
983                &[11, 12],
984                &[OpType::Put, OpType::Put],
985                &[21, 22],
986            ),
987            new_record_batch(&[b"k1"], &[3], &[13], &[OpType::Put], &[23]),
988            new_record_batch(
989                &[b"k2", b"k2"],
990                &[1, 2],
991                &[111, 112],
992                &[OpType::Put, OpType::Put],
993                &[31, 32],
994            ),
995        ];
996
997        // Test with filter_deleted = true
998        let iter = input.clone().into_iter().map(Ok);
999        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, true));
1000        let result = collect_iterator_results(&mut dedup_iter);
1001        check_record_batches_equal(&input, &result);
1002        assert_eq!(0, dedup_iter.metrics.num_unselected_rows);
1003        assert_eq!(0, dedup_iter.metrics.num_deleted_rows);
1004
1005        // Test with filter_deleted = false
1006        let iter = input.clone().into_iter().map(Ok);
1007        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, false));
1008        let result = collect_iterator_results(&mut dedup_iter);
1009        check_record_batches_equal(&input, &result);
1010        assert_eq!(0, dedup_iter.metrics.num_unselected_rows);
1011        assert_eq!(0, dedup_iter.metrics.num_deleted_rows);
1012    }
1013
1014    #[test]
1015    fn test_flat_last_non_null_field_merging() {
1016        let input = vec![
1017            new_record_batch_multi_fields(
1018                &[b"k1", b"k1"],
1019                &[1, 2],
1020                &[13, 11],
1021                &[OpType::Put, OpType::Put],
1022                &[(Some(11), Some(11)), (None, None)],
1023            ),
1024            // empty batch
1025            new_record_batch_multi_fields(&[], &[], &[], &[], &[]),
1026            // Duplicate with the previous batch - should merge fields
1027            new_record_batch_multi_fields(
1028                &[b"k1"],
1029                &[2],
1030                &[10],
1031                &[OpType::Put],
1032                &[(Some(12), None)],
1033            ),
1034            new_record_batch_multi_fields(
1035                &[b"k1", b"k1", b"k1"],
1036                &[2, 3, 4],
1037                &[10, 13, 13],
1038                &[OpType::Put, OpType::Put, OpType::Delete],
1039                &[(Some(2), Some(22)), (Some(13), None), (None, Some(14))],
1040            ),
1041            new_record_batch_multi_fields(
1042                &[b"k2", b"k2"],
1043                &[1, 2],
1044                &[20, 20],
1045                &[OpType::Put, OpType::Delete],
1046                &[(Some(101), Some(101)), (None, None)],
1047            ),
1048            new_record_batch_multi_fields(
1049                &[b"k2"],
1050                &[2],
1051                &[19],
1052                &[OpType::Put],
1053                &[(Some(102), Some(102))],
1054            ),
1055            new_record_batch_multi_fields(
1056                &[b"k3"],
1057                &[2],
1058                &[20],
1059                &[OpType::Put],
1060                &[(Some(202), Some(202))],
1061            ),
1062            // This batch won't increase the deleted rows count as it
1063            // is filtered out by the previous batch. (All fields are null).
1064            new_record_batch_multi_fields(
1065                &[b"k3"],
1066                &[2],
1067                &[19],
1068                &[OpType::Delete],
1069                &[(None, None)],
1070            ),
1071        ];
1072
1073        // Test with filter_deleted = true
1074        let expected_filter_deleted = vec![
1075            new_record_batch_multi_fields(
1076                &[b"k1"],
1077                &[1],
1078                &[13],
1079                &[OpType::Put],
1080                &[(Some(11), Some(11))],
1081            ),
1082            new_record_batch_multi_fields(
1083                &[b"k1", b"k1"],
1084                &[2, 3],
1085                &[11, 13],
1086                &[OpType::Put, OpType::Put],
1087                &[(Some(12), Some(22)), (Some(13), None)],
1088            ),
1089            new_record_batch_multi_fields(
1090                &[b"k2"],
1091                &[1],
1092                &[20],
1093                &[OpType::Put],
1094                &[(Some(101), Some(101))],
1095            ),
1096            new_record_batch_multi_fields(
1097                &[b"k3"],
1098                &[2],
1099                &[20],
1100                &[OpType::Put],
1101                &[(Some(202), Some(202))],
1102            ),
1103        ];
1104
1105        let iter = input.clone().into_iter().map(Ok);
1106        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, true));
1107        let result = collect_iterator_results(&mut dedup_iter);
1108        check_record_batches_equal(&expected_filter_deleted, &result);
1109        assert_eq!(6, dedup_iter.metrics.num_unselected_rows);
1110        assert_eq!(2, dedup_iter.metrics.num_deleted_rows);
1111
1112        // Test with filter_deleted = false
1113        let expected_no_filter = vec![
1114            new_record_batch_multi_fields(
1115                &[b"k1"],
1116                &[1],
1117                &[13],
1118                &[OpType::Put],
1119                &[(Some(11), Some(11))],
1120            ),
1121            new_record_batch_multi_fields(
1122                &[b"k1", b"k1", b"k1"],
1123                &[2, 3, 4],
1124                &[11, 13, 13],
1125                &[OpType::Put, OpType::Put, OpType::Delete],
1126                &[(Some(12), Some(22)), (Some(13), None), (None, Some(14))],
1127            ),
1128            new_record_batch_multi_fields(
1129                &[b"k2"],
1130                &[1],
1131                &[20],
1132                &[OpType::Put],
1133                &[(Some(101), Some(101))],
1134            ),
1135            new_record_batch_multi_fields(
1136                &[b"k2"],
1137                &[2],
1138                &[20],
1139                &[OpType::Delete],
1140                &[(None, None)],
1141            ),
1142            new_record_batch_multi_fields(
1143                &[b"k3"],
1144                &[2],
1145                &[20],
1146                &[OpType::Put],
1147                &[(Some(202), Some(202))],
1148            ),
1149        ];
1150
1151        let iter = input.clone().into_iter().map(Ok);
1152        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, false));
1153        let result = collect_iterator_results(&mut dedup_iter);
1154        check_record_batches_equal(&expected_no_filter, &result);
1155        assert_eq!(4, dedup_iter.metrics.num_unselected_rows);
1156        assert_eq!(0, dedup_iter.metrics.num_deleted_rows);
1157    }
1158
1159    #[test]
1160    fn test_flat_last_non_null_skip_merge_no_null() {
1161        let input = vec![
1162            new_record_batch_multi_fields(
1163                &[b"k1", b"k1"],
1164                &[1, 2],
1165                &[13, 11],
1166                &[OpType::Put, OpType::Put],
1167                &[(Some(11), Some(11)), (Some(12), Some(12))],
1168            ),
1169            new_record_batch_multi_fields(
1170                &[b"k1"],
1171                &[2],
1172                &[10],
1173                &[OpType::Put],
1174                &[(None, Some(22))],
1175            ),
1176            new_record_batch_multi_fields(
1177                &[b"k1", b"k1"],
1178                &[2, 3],
1179                &[9, 13],
1180                &[OpType::Put, OpType::Put],
1181                &[(Some(32), None), (Some(13), Some(13))],
1182            ),
1183        ];
1184
1185        let expected = vec![
1186            new_record_batch_multi_fields(
1187                &[b"k1"],
1188                &[1],
1189                &[13],
1190                &[OpType::Put],
1191                &[(Some(11), Some(11))],
1192            ),
1193            new_record_batch_multi_fields(
1194                &[b"k1", b"k1"],
1195                &[2, 3],
1196                &[11, 13],
1197                &[OpType::Put, OpType::Put],
1198                &[(Some(12), Some(12)), (Some(13), Some(13))],
1199            ),
1200        ];
1201
1202        let iter = input.into_iter().map(Ok);
1203        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, true));
1204        let result = collect_iterator_results(&mut dedup_iter);
1205        check_record_batches_equal(&expected, &result);
1206        assert_eq!(2, dedup_iter.metrics.num_unselected_rows);
1207        assert_eq!(0, dedup_iter.metrics.num_deleted_rows);
1208    }
1209
1210    #[test]
1211    fn test_flat_last_non_null_merge_null() {
1212        let input = vec![
1213            new_record_batch_multi_fields(
1214                &[b"k1", b"k1"],
1215                &[1, 2],
1216                &[13, 11],
1217                &[OpType::Put, OpType::Put],
1218                &[(Some(11), Some(11)), (None, None)],
1219            ),
1220            new_record_batch_multi_fields(
1221                &[b"k1"],
1222                &[2],
1223                &[10],
1224                &[OpType::Put],
1225                &[(None, Some(22))],
1226            ),
1227            new_record_batch_multi_fields(
1228                &[b"k1"],
1229                &[3],
1230                &[13],
1231                &[OpType::Put],
1232                &[(Some(33), None)],
1233            ),
1234        ];
1235
1236        let expected = vec![
1237            new_record_batch_multi_fields(
1238                &[b"k1"],
1239                &[1],
1240                &[13],
1241                &[OpType::Put],
1242                &[(Some(11), Some(11))],
1243            ),
1244            new_record_batch_multi_fields(
1245                &[b"k1"],
1246                &[2],
1247                &[11],
1248                &[OpType::Put],
1249                &[(None, Some(22))],
1250            ),
1251            new_record_batch_multi_fields(
1252                &[b"k1"],
1253                &[3],
1254                &[13],
1255                &[OpType::Put],
1256                &[(Some(33), None)],
1257            ),
1258        ];
1259
1260        let iter = input.into_iter().map(Ok);
1261        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, true));
1262        let result = collect_iterator_results(&mut dedup_iter);
1263        check_record_batches_equal(&expected, &result);
1264        assert_eq!(1, dedup_iter.metrics.num_unselected_rows);
1265        assert_eq!(0, dedup_iter.metrics.num_deleted_rows);
1266    }
1267
1268    /// Helper function to check dedup strategy behavior directly.
1269    fn check_flat_dedup_strategy(
1270        input: &[RecordBatch],
1271        strategy: &mut dyn RecordBatchDedupStrategy,
1272        expect: &[RecordBatch],
1273    ) {
1274        let mut actual = Vec::new();
1275        let mut metrics = DedupMetrics::default();
1276        for batch in input {
1277            if let Some(out) = strategy.push_batch(batch.clone(), &mut metrics).unwrap() {
1278                actual.push(out);
1279            }
1280        }
1281        if let Some(out) = strategy.finish(&mut metrics).unwrap() {
1282            actual.push(out);
1283        }
1284
1285        check_record_batches_equal(expect, &actual);
1286    }
1287
1288    #[test]
1289    fn test_flat_last_non_null_strategy_delete_last() {
1290        let input = vec![
1291            new_record_batch_multi_fields(
1292                &[b"k1"],
1293                &[1],
1294                &[6],
1295                &[OpType::Put],
1296                &[(Some(11), None)],
1297            ),
1298            new_record_batch_multi_fields(
1299                &[b"k1", b"k1"],
1300                &[1, 2],
1301                &[1, 7],
1302                &[OpType::Put, OpType::Put],
1303                &[(Some(1), None), (Some(22), Some(222))],
1304            ),
1305            new_record_batch_multi_fields(
1306                &[b"k1"],
1307                &[2],
1308                &[4],
1309                &[OpType::Put],
1310                &[(Some(12), None)],
1311            ),
1312            new_record_batch_multi_fields(
1313                &[b"k2", b"k2"],
1314                &[2, 3],
1315                &[2, 5],
1316                &[OpType::Put, OpType::Delete],
1317                &[(None, None), (Some(13), None)],
1318            ),
1319            new_record_batch_multi_fields(&[b"k2"], &[3], &[3], &[OpType::Put], &[(None, Some(3))]),
1320        ];
1321
1322        let mut strategy = FlatLastNonNull::new(1, true);
1323        check_flat_dedup_strategy(
1324            &input,
1325            &mut strategy,
1326            &[
1327                new_record_batch_multi_fields(
1328                    &[b"k1"],
1329                    &[1],
1330                    &[6],
1331                    &[OpType::Put],
1332                    &[(Some(11), None)],
1333                ),
1334                new_record_batch_multi_fields(
1335                    &[b"k1"],
1336                    &[2],
1337                    &[7],
1338                    &[OpType::Put],
1339                    &[(Some(22), Some(222))],
1340                ),
1341                new_record_batch_multi_fields(
1342                    &[b"k2"],
1343                    &[2],
1344                    &[2],
1345                    &[OpType::Put],
1346                    &[(None, None)],
1347                ),
1348            ],
1349        );
1350    }
1351
1352    #[test]
1353    fn test_flat_last_non_null_strategy_delete_one() {
1354        let input = vec![
1355            new_record_batch_multi_fields(&[b"k1"], &[1], &[1], &[OpType::Delete], &[(None, None)]),
1356            new_record_batch_multi_fields(
1357                &[b"k2"],
1358                &[1],
1359                &[6],
1360                &[OpType::Put],
1361                &[(Some(11), None)],
1362            ),
1363        ];
1364
1365        let mut strategy = FlatLastNonNull::new(1, true);
1366        check_flat_dedup_strategy(
1367            &input,
1368            &mut strategy,
1369            &[new_record_batch_multi_fields(
1370                &[b"k2"],
1371                &[1],
1372                &[6],
1373                &[OpType::Put],
1374                &[(Some(11), None)],
1375            )],
1376        );
1377    }
1378
1379    #[test]
1380    fn test_flat_last_non_null_strategy_delete_all() {
1381        let input = vec![
1382            new_record_batch_multi_fields(&[b"k1"], &[1], &[1], &[OpType::Delete], &[(None, None)]),
1383            new_record_batch_multi_fields(
1384                &[b"k2"],
1385                &[1],
1386                &[6],
1387                &[OpType::Delete],
1388                &[(Some(11), None)],
1389            ),
1390        ];
1391
1392        let mut strategy = FlatLastNonNull::new(1, true);
1393        check_flat_dedup_strategy(&input, &mut strategy, &[]);
1394    }
1395
1396    #[test]
1397    fn test_flat_last_non_null_strategy_same_batch() {
1398        let input = vec![
1399            new_record_batch_multi_fields(
1400                &[b"k1"],
1401                &[1],
1402                &[6],
1403                &[OpType::Put],
1404                &[(Some(11), None)],
1405            ),
1406            new_record_batch_multi_fields(
1407                &[b"k1", b"k1"],
1408                &[1, 2],
1409                &[1, 7],
1410                &[OpType::Put, OpType::Put],
1411                &[(Some(1), None), (Some(22), Some(222))],
1412            ),
1413            new_record_batch_multi_fields(
1414                &[b"k1"],
1415                &[2],
1416                &[4],
1417                &[OpType::Put],
1418                &[(Some(12), None)],
1419            ),
1420            new_record_batch_multi_fields(
1421                &[b"k1", b"k1"],
1422                &[2, 3],
1423                &[2, 5],
1424                &[OpType::Put, OpType::Put],
1425                &[(None, None), (Some(13), None)],
1426            ),
1427            new_record_batch_multi_fields(&[b"k1"], &[3], &[3], &[OpType::Put], &[(None, Some(3))]),
1428        ];
1429
1430        let mut strategy = FlatLastNonNull::new(1, true);
1431        check_flat_dedup_strategy(
1432            &input,
1433            &mut strategy,
1434            &[
1435                new_record_batch_multi_fields(
1436                    &[b"k1"],
1437                    &[1],
1438                    &[6],
1439                    &[OpType::Put],
1440                    &[(Some(11), None)],
1441                ),
1442                new_record_batch_multi_fields(
1443                    &[b"k1"],
1444                    &[2],
1445                    &[7],
1446                    &[OpType::Put],
1447                    &[(Some(22), Some(222))],
1448                ),
1449                new_record_batch_multi_fields(
1450                    &[b"k1"],
1451                    &[3],
1452                    &[5],
1453                    &[OpType::Put],
1454                    &[(Some(13), Some(3))],
1455                ),
1456            ],
1457        );
1458    }
1459
1460    #[test]
1461    fn test_flat_last_non_null_strategy_delete_middle() {
1462        let input = vec![
1463            new_record_batch_multi_fields(
1464                &[b"k1"],
1465                &[1],
1466                &[7],
1467                &[OpType::Put],
1468                &[(Some(11), None)],
1469            ),
1470            new_record_batch_multi_fields(&[b"k1"], &[1], &[4], &[OpType::Delete], &[(None, None)]),
1471            new_record_batch_multi_fields(
1472                &[b"k1"],
1473                &[1],
1474                &[1],
1475                &[OpType::Put],
1476                &[(Some(12), Some(1))],
1477            ),
1478            new_record_batch_multi_fields(
1479                &[b"k1"],
1480                &[2],
1481                &[8],
1482                &[OpType::Put],
1483                &[(Some(21), None)],
1484            ),
1485            new_record_batch_multi_fields(&[b"k1"], &[2], &[5], &[OpType::Delete], &[(None, None)]),
1486            new_record_batch_multi_fields(
1487                &[b"k1"],
1488                &[2],
1489                &[2],
1490                &[OpType::Put],
1491                &[(Some(22), Some(2))],
1492            ),
1493            new_record_batch_multi_fields(
1494                &[b"k1"],
1495                &[3],
1496                &[9],
1497                &[OpType::Put],
1498                &[(Some(31), None)],
1499            ),
1500            new_record_batch_multi_fields(&[b"k1"], &[3], &[6], &[OpType::Delete], &[(None, None)]),
1501            new_record_batch_multi_fields(
1502                &[b"k1"],
1503                &[3],
1504                &[3],
1505                &[OpType::Put],
1506                &[(Some(32), Some(3))],
1507            ),
1508        ];
1509
1510        let mut strategy = FlatLastNonNull::new(1, true);
1511        check_flat_dedup_strategy(
1512            &input,
1513            &mut strategy,
1514            &[
1515                new_record_batch_multi_fields(
1516                    &[b"k1"],
1517                    &[1],
1518                    &[7],
1519                    &[OpType::Put],
1520                    &[(Some(11), None)],
1521                ),
1522                new_record_batch_multi_fields(
1523                    &[b"k1"],
1524                    &[2],
1525                    &[8],
1526                    &[OpType::Put],
1527                    &[(Some(21), None)],
1528                ),
1529                new_record_batch_multi_fields(
1530                    &[b"k1"],
1531                    &[3],
1532                    &[9],
1533                    &[OpType::Put],
1534                    &[(Some(31), None)],
1535                ),
1536            ],
1537        );
1538    }
1539}