mito2/read/
dedup.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Utilities to remove duplicate rows from a sorted batch.
16
17use api::v1::OpType;
18use async_trait::async_trait;
19use common_telemetry::debug;
20use common_time::Timestamp;
21use datatypes::data_type::DataType;
22use datatypes::prelude::ScalarVector;
23use datatypes::value::Value;
24use datatypes::vectors::MutableVector;
25
26use crate::error::Result;
27use crate::metrics::MERGE_FILTER_ROWS_TOTAL;
28use crate::read::{Batch, BatchColumn, BatchReader};
29
30/// A reader that dedup sorted batches from a source based on the
31/// dedup strategy.
32pub(crate) struct DedupReader<R, S> {
33    source: R,
34    strategy: S,
35    metrics: DedupMetrics,
36}
37
38impl<R, S> DedupReader<R, S> {
39    /// Creates a new dedup reader.
40    pub(crate) fn new(source: R, strategy: S) -> Self {
41        Self {
42            source,
43            strategy,
44            metrics: DedupMetrics::default(),
45        }
46    }
47}
48
49impl<R: BatchReader, S: DedupStrategy> DedupReader<R, S> {
50    /// Returns the next deduplicated batch.
51    async fn fetch_next_batch(&mut self) -> Result<Option<Batch>> {
52        while let Some(batch) = self.source.next_batch().await? {
53            if let Some(batch) = self.strategy.push_batch(batch, &mut self.metrics)? {
54                return Ok(Some(batch));
55            }
56        }
57
58        self.strategy.finish(&mut self.metrics)
59    }
60}
61
62#[async_trait]
63impl<R: BatchReader, S: DedupStrategy> BatchReader for DedupReader<R, S> {
64    async fn next_batch(&mut self) -> Result<Option<Batch>> {
65        self.fetch_next_batch().await
66    }
67}
68
69impl<R, S> Drop for DedupReader<R, S> {
70    fn drop(&mut self) {
71        debug!("Dedup reader finished, metrics: {:?}", self.metrics);
72
73        MERGE_FILTER_ROWS_TOTAL
74            .with_label_values(&["dedup"])
75            .inc_by(self.metrics.num_unselected_rows as u64);
76        MERGE_FILTER_ROWS_TOTAL
77            .with_label_values(&["delete"])
78            .inc_by(self.metrics.num_unselected_rows as u64);
79    }
80}
81
#[cfg(test)]
impl<R, S> DedupReader<R, S> {
    /// Test-only accessor for the counters gathered by this reader so far.
    fn metrics(&self) -> &DedupMetrics {
        &self.metrics
    }
}
88
/// Strategy to remove duplicate rows from sorted batches.
///
/// Implementations may hold rows back across calls (e.g. [`LastNonNull`] buffers
/// a batch), so rows pushed in one call can be returned by a later `push_batch`
/// or by `finish`.
pub(crate) trait DedupStrategy: Send {
    /// Pushes a batch to the dedup strategy.
    ///
    /// Returns a deduplicated batch when one is ready, or `None` when the
    /// strategy needs more input or every row was removed. Removed rows are
    /// recorded in `metrics`.
    fn push_batch(&mut self, batch: Batch, metrics: &mut DedupMetrics) -> Result<Option<Batch>>;

    /// Finishes the deduplication and resets the strategy.
    ///
    /// Returns any rows the strategy still holds, or `None` if nothing is left.
    ///
    /// Users must ensure that `push_batch` is called for all batches before
    /// calling this method.
    fn finish(&mut self, metrics: &mut DedupMetrics) -> Result<Option<Batch>>;
}
101
/// State of the last row in a batch for dedup.
///
/// [`LastRow`] keeps this for the previously pushed batch, so it can tell whether
/// the first row of the next batch has the same key and timestamp.
struct BatchLastRow {
    /// Primary key of the batch the row belongs to.
    primary_key: Vec<u8>,
    /// The last timestamp of the batch.
    timestamp: Timestamp,
}
108
109/// Dedup strategy that keeps the row with latest sequence of each key.
110///
111/// This strategy is optimized specially based on the properties of the SST files,
112/// memtables and the merge reader. It assumes that batches from files and memtables
113/// don't contain duplicate rows and the merge reader never concatenates batches from
114/// different source.
115///
116/// We might implement a new strategy if we need to process files with duplicate rows.
117pub(crate) struct LastRow {
118    /// Meta of the last row in the previous batch that has the same key
119    /// as the batch to push.
120    prev_batch: Option<BatchLastRow>,
121    /// Filter deleted rows.
122    filter_deleted: bool,
123}
124
125impl LastRow {
126    /// Creates a new strategy with the given `filter_deleted` flag.
127    pub(crate) fn new(filter_deleted: bool) -> Self {
128        Self {
129            prev_batch: None,
130            filter_deleted,
131        }
132    }
133}
134
135impl DedupStrategy for LastRow {
136    fn push_batch(
137        &mut self,
138        mut batch: Batch,
139        metrics: &mut DedupMetrics,
140    ) -> Result<Option<Batch>> {
141        if batch.is_empty() {
142            return Ok(None);
143        }
144        debug_assert!(batch.first_timestamp().is_some());
145        let prev_timestamp = match &self.prev_batch {
146            Some(prev_batch) => {
147                if prev_batch.primary_key != batch.primary_key() {
148                    // The key has changed. This is the first batch of the
149                    // new key.
150                    None
151                } else {
152                    Some(prev_batch.timestamp)
153                }
154            }
155            None => None,
156        };
157        if batch.first_timestamp() == prev_timestamp {
158            metrics.num_unselected_rows += 1;
159            // This batch contains a duplicate row, skip it.
160            if batch.num_rows() == 1 {
161                // We don't need to update `prev_batch` because they have the same
162                // key and timestamp.
163                return Ok(None);
164            }
165            // Skips the first row.
166            batch = batch.slice(1, batch.num_rows() - 1);
167        }
168
169        // Store current batch to `prev_batch` so we could compare the next batch
170        // with this batch. We store batch before filtering it as rows with `OpType::Delete`
171        // would be removed from the batch after filter, then we may store an incorrect `last row`
172        // of previous batch.
173        match &mut self.prev_batch {
174            Some(prev) => {
175                // Reuse the primary key buffer.
176                prev.primary_key.clone_from(&batch.primary_key);
177                prev.timestamp = batch.last_timestamp().unwrap();
178            }
179            None => {
180                self.prev_batch = Some(BatchLastRow {
181                    primary_key: batch.primary_key().to_vec(),
182                    timestamp: batch.last_timestamp().unwrap(),
183                })
184            }
185        }
186
187        // Filters deleted rows.
188        if self.filter_deleted {
189            filter_deleted_from_batch(&mut batch, metrics)?;
190        }
191
192        // The batch can become empty if all rows are deleted.
193        if batch.is_empty() {
194            Ok(None)
195        } else {
196            Ok(Some(batch))
197        }
198    }
199
200    fn finish(&mut self, _metrics: &mut DedupMetrics) -> Result<Option<Batch>> {
201        Ok(None)
202    }
203}
204
205/// Removes deleted rows from the batch and updates metrics.
206fn filter_deleted_from_batch(batch: &mut Batch, metrics: &mut DedupMetrics) -> Result<()> {
207    let num_rows = batch.num_rows();
208    batch.filter_deleted()?;
209    let num_rows_after_filter = batch.num_rows();
210    let num_deleted = num_rows - num_rows_after_filter;
211    metrics.num_deleted_rows += num_deleted;
212    metrics.num_unselected_rows += num_deleted;
213
214    Ok(())
215}
216
/// Metrics for deduplication.
#[derive(Debug, Default)]
pub(crate) struct DedupMetrics {
    /// Number of rows removed during deduplication. This also includes rows
    /// removed because they were deleted.
    pub(crate) num_unselected_rows: usize,
    /// Number of deleted rows removed from the output.
    pub(crate) num_deleted_rows: usize,
}
225
/// Buffer to store fields in the last row to merge.
///
/// Usage:
/// We should call `maybe_init()` to initialize the builder. Then call `push_first_row()`
/// to push the first row of each batch whose timestamp is the same as the row in this
/// builder. Finally we should call `merge_last_non_null()` to merge the last non-null
/// fields and return the merged batch.
struct LastFieldsBuilder {
    /// Filter deleted rows.
    filter_deleted: bool,
    /// Field builders, lazily initialized. They are created the first time a
    /// merge is needed and reused afterwards.
    builders: Vec<Box<dyn MutableVector>>,
    /// Last fields to merge, lazily initialized.
    /// This field is only initialized when `skip_merge()` is false.
    last_fields: Vec<Value>,
    /// Whether the last row (including `last_fields`) has a null field.
    /// This field is only set when `contains_deletion` is false.
    contains_null: bool,
    /// Whether the last row has a delete op. If true, merging fields is skipped.
    contains_deletion: bool,
    /// Whether the builder is initialized.
    initialized: bool,
}
249
impl LastFieldsBuilder {
    /// Returns a new builder with the given `filter_deleted` flag.
    fn new(filter_deleted: bool) -> Self {
        Self {
            filter_deleted,
            builders: Vec::new(),
            last_fields: Vec::new(),
            contains_null: false,
            contains_deletion: false,
            initialized: false,
        }
    }

    /// Initializes the builder with the last row of the batch.
    ///
    /// If the builder is already initialized, this does nothing. Otherwise it
    /// marks the builder as initialized and records whether the last row is a
    /// deletion or has null fields. The last row's field values are only captured
    /// when a merge is actually needed (see `skip_merge()`).
    ///
    /// `batch` must not be empty.
    fn maybe_init(&mut self, batch: &Batch) {
        debug_assert!(!batch.is_empty());

        if self.initialized {
            // Already initialized.
            return;
        }

        self.initialized = true;

        if batch.fields().is_empty() {
            // No fields to merge.
            return;
        }

        let last_idx = batch.num_rows() - 1;
        let fields = batch.fields();
        // Safety: The last_idx is valid.
        self.contains_deletion =
            batch.op_types().get_data(last_idx).unwrap() == OpType::Delete as u8;
        // If the row has been deleted, then we don't need to merge fields.
        if !self.contains_deletion {
            self.contains_null = fields.iter().any(|col| col.data.is_null(last_idx));
        }

        if self.skip_merge() {
            // No null field or the row has been deleted, no need to merge.
            return;
        }
        // Lazily creates one builder per field column. Later merges reuse them.
        if self.builders.is_empty() {
            self.builders = fields
                .iter()
                .map(|col| col.data.data_type().create_mutable_vector(1))
                .collect();
        }
        // Captures the last row's field values. Null entries may later be filled
        // by `push_first_row()`.
        self.last_fields = fields.iter().map(|col| col.data.get(last_idx)).collect();
    }

    /// Returns true if the builder doesn't need to merge the rows.
    fn skip_merge(&self) -> bool {
        debug_assert!(self.initialized);

        // No null field or the row has been deleted, no need to merge.
        self.contains_deletion || !self.contains_null
    }

    /// Pushes the first row of a batch to the builder.
    ///
    /// Null values in `last_fields` are filled with the row's non-null values.
    /// If the row is a deletion, merging stops and nothing is filled from it.
    /// `batch` must not be empty.
    fn push_first_row(&mut self, batch: &Batch) {
        debug_assert!(self.initialized);
        debug_assert!(!batch.is_empty());

        if self.skip_merge() {
            // No remaining null field, skips this batch.
            return;
        }

        // Both `maybe_init()` and `push_first_row()` can update the builder. If the delete
        // op is not in the latest row, then we can't set the deletion flag in the `maybe_init()`.
        // We must check the batch and update the deletion flag here to prevent
        // the builder from merging non-null fields in rows that insert before the deleted row.
        self.contains_deletion = batch.op_types().get_data(0).unwrap() == OpType::Delete as u8;
        if self.contains_deletion {
            // Deletes this row.
            return;
        }

        let fields = batch.fields();
        for (idx, value) in self.last_fields.iter_mut().enumerate() {
            if value.is_null() && !fields[idx].data.is_null(0) {
                // Updates the value.
                *value = fields[idx].data.get(0);
            }
        }
        // Updates the flag.
        self.contains_null = self.last_fields.iter().any(Value::is_null);
    }

    /// Merges the last non-null fields, builds a new batch and resets the builder.
    ///
    /// It may overwrite the last row of the `buffer`. The `buffer` is the batch
    /// that initialized the builder. Deleted rows are removed from the output when
    /// `filter_deleted` is set. Returns `None` if no rows remain.
    fn merge_last_non_null(
        &mut self,
        buffer: Batch,
        metrics: &mut DedupMetrics,
    ) -> Result<Option<Batch>> {
        debug_assert!(self.initialized);

        let mut output = if self.last_fields.is_empty() {
            // No need to overwrite the last row.
            buffer
        } else {
            // Builds last fields.
            for (builder, value) in self.builders.iter_mut().zip(&self.last_fields) {
                // Safety: Vectors of the batch has the same type.
                builder.push_value_ref(value.as_value_ref());
            }
            let fields = self
                .builders
                .iter_mut()
                .zip(buffer.fields())
                .map(|(builder, col)| BatchColumn {
                    column_id: col.column_id,
                    data: builder.to_vector(),
                })
                .collect();

            if buffer.num_rows() == 1 {
                // Replaces the buffer directly if it only has one row.
                buffer.with_fields(fields)?
            } else {
                // Replaces the last row of the buffer.
                let front = buffer.slice(0, buffer.num_rows() - 1);
                let last = buffer.slice(buffer.num_rows() - 1, 1);
                let last = last.with_fields(fields)?;
                Batch::concat(vec![front, last])?
            }
        };

        // Resets itself. `self.builders` is already reset in `to_vector()`.
        self.clear();

        if self.filter_deleted {
            filter_deleted_from_batch(&mut output, metrics)?;
        }

        if output.is_empty() {
            Ok(None)
        } else {
            Ok(Some(output))
        }
    }

    /// Clears the builder so it can be initialized again.
    ///
    /// The `builders` are kept for reuse.
    fn clear(&mut self) {
        self.last_fields.clear();
        self.contains_null = false;
        self.contains_deletion = false;
        self.initialized = false;
    }
}
404
405/// Dedup strategy that keeps the last non-null field for the same key.
406///
407/// It assumes that batches from files and memtables don't contain duplicate rows
408/// and the merge reader never concatenates batches from different source.
409///
410/// We might implement a new strategy if we need to process files with duplicate rows.
411pub(crate) struct LastNonNull {
412    /// Buffered batch that fields in the last row may be updated.
413    buffer: Option<Batch>,
414    /// Fields that overlaps with the last row of the `buffer`.
415    last_fields: LastFieldsBuilder,
416}
417
418impl LastNonNull {
419    /// Creates a new strategy with the given `filter_deleted` flag.
420    pub(crate) fn new(filter_deleted: bool) -> Self {
421        Self {
422            buffer: None,
423            last_fields: LastFieldsBuilder::new(filter_deleted),
424        }
425    }
426}
427
428impl DedupStrategy for LastNonNull {
429    fn push_batch(&mut self, batch: Batch, metrics: &mut DedupMetrics) -> Result<Option<Batch>> {
430        if batch.is_empty() {
431            return Ok(None);
432        }
433
434        let Some(buffer) = self.buffer.as_mut() else {
435            // The buffer is empty, store the batch and return. We need to observe the next batch.
436            self.buffer = Some(batch);
437            return Ok(None);
438        };
439
440        // Initializes last fields with the first buffer.
441        self.last_fields.maybe_init(buffer);
442
443        if buffer.primary_key() != batch.primary_key() {
444            // Next key is different.
445            let buffer = std::mem::replace(buffer, batch);
446            let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;
447            return Ok(merged);
448        }
449
450        if buffer.last_timestamp() != batch.first_timestamp() {
451            // The next batch has a different timestamp.
452            let buffer = std::mem::replace(buffer, batch);
453            let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;
454            return Ok(merged);
455        }
456
457        // The next batch has the same key and timestamp.
458
459        metrics.num_unselected_rows += 1;
460        // We assumes each batch doesn't contain duplicate rows so we only need to check the first row.
461        if batch.num_rows() == 1 {
462            self.last_fields.push_first_row(&batch);
463            return Ok(None);
464        }
465
466        // The next batch has the same key and timestamp but contains multiple rows.
467        // We can merge the first row and buffer the remaining rows.
468        let first = batch.slice(0, 1);
469        self.last_fields.push_first_row(&first);
470        // Moves the remaining rows to the buffer.
471        let batch = batch.slice(1, batch.num_rows() - 1);
472        let buffer = std::mem::replace(buffer, batch);
473        let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;
474
475        Ok(merged)
476    }
477
478    fn finish(&mut self, metrics: &mut DedupMetrics) -> Result<Option<Batch>> {
479        let Some(buffer) = self.buffer.take() else {
480            return Ok(None);
481        };
482
483        // Initializes last fields with the first buffer.
484        self.last_fields.maybe_init(&buffer);
485
486        let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;
487
488        Ok(merged)
489    }
490}
491
492/// An iterator that dedup rows by [LastNonNull] strategy.
493/// The input iterator must returns sorted batches.
494pub(crate) struct LastNonNullIter<I> {
495    /// Inner iterator that returns sorted batches.
496    iter: Option<I>,
497    /// Dedup strategy.
498    strategy: LastNonNull,
499    /// Dedup metrics.
500    metrics: DedupMetrics,
501    /// The current batch returned by the iterator. If it is None, we need to
502    /// fetch a new batch.
503    /// The batch is always not empty.
504    current_batch: Option<Batch>,
505    /// The index of the current row in the current batch.
506    /// more to check issue #5229.
507    current_index: usize,
508}
509
510impl<I> LastNonNullIter<I> {
511    /// Creates a new iterator with the given inner iterator.
512    pub(crate) fn new(iter: I) -> Self {
513        Self {
514            iter: Some(iter),
515            // We only use the iter in memtables. Memtables never filter deleted.
516            strategy: LastNonNull::new(false),
517            metrics: DedupMetrics::default(),
518            current_batch: None,
519            current_index: 0,
520        }
521    }
522}
523
524impl<I: Iterator<Item = Result<Batch>>> LastNonNullIter<I> {
525    /// Fetches the next batch from the inner iterator. It will slice the batch if it
526    /// contains duplicate rows.
527    fn next_batch_for_merge(&mut self) -> Result<Option<Batch>> {
528        if self.current_batch.is_none() {
529            // No current batch. Fetches a new batch from the inner iterator.
530            let Some(iter) = self.iter.as_mut() else {
531                // The iterator is exhausted.
532                return Ok(None);
533            };
534
535            self.current_batch = iter.next().transpose()?;
536            self.current_index = 0;
537            if self.current_batch.is_none() {
538                // The iterator is exhausted.
539                self.iter = None;
540                return Ok(None);
541            }
542        }
543
544        if let Some(batch) = &self.current_batch {
545            let n = batch.num_rows();
546            // Safety: The batch is not empty when accessed.
547            let timestamps = batch.timestamps_native().unwrap();
548            let mut pos = self.current_index;
549            while pos + 1 < n && timestamps[pos] != timestamps[pos + 1] {
550                pos += 1;
551            }
552            let segment = batch.slice(self.current_index, pos - self.current_index + 1);
553            if pos + 1 < n && timestamps[pos] == timestamps[pos + 1] {
554                self.current_index = pos + 1;
555            } else {
556                self.current_batch = None;
557                self.current_index = 0;
558            }
559            return Ok(Some(segment));
560        }
561
562        Ok(None)
563    }
564
565    fn next_batch(&mut self) -> Result<Option<Batch>> {
566        while let Some(batch) = self.next_batch_for_merge()? {
567            if let Some(batch) = self.strategy.push_batch(batch, &mut self.metrics)? {
568                return Ok(Some(batch));
569            }
570        }
571
572        self.strategy.finish(&mut self.metrics)
573    }
574}
575
576impl<I: Iterator<Item = Result<Batch>>> Iterator for LastNonNullIter<I> {
577    type Item = Result<Batch>;
578
579    fn next(&mut self) -> Option<Self::Item> {
580        self.next_batch().transpose()
581    }
582}
583
584#[cfg(test)]
585mod tests {
586    use std::sync::Arc;
587
588    use api::v1::OpType;
589    use datatypes::arrow::array::{TimestampMillisecondArray, UInt64Array, UInt8Array};
590
591    use super::*;
592    use crate::read::BatchBuilder;
593    use crate::test_util::{check_reader_result, new_batch, VecBatchReader};
594
595    #[tokio::test]
596    async fn test_dedup_reader_no_duplications() {
597        let input = [
598            new_batch(
599                b"k1",
600                &[1, 2],
601                &[11, 12],
602                &[OpType::Put, OpType::Put],
603                &[21, 22],
604            ),
605            new_batch(b"k1", &[3], &[13], &[OpType::Put], &[23]),
606            new_batch(
607                b"k2",
608                &[1, 2],
609                &[111, 112],
610                &[OpType::Put, OpType::Put],
611                &[31, 32],
612            ),
613        ];
614
615        // Test last row.
616        let reader = VecBatchReader::new(&input);
617        let mut reader = DedupReader::new(reader, LastRow::new(true));
618        check_reader_result(&mut reader, &input).await;
619        assert_eq!(0, reader.metrics().num_unselected_rows);
620        assert_eq!(0, reader.metrics().num_deleted_rows);
621
622        // Test last non-null.
623        let reader = VecBatchReader::new(&input);
624        let mut reader = DedupReader::new(reader, LastNonNull::new(true));
625        check_reader_result(&mut reader, &input).await;
626        assert_eq!(0, reader.metrics().num_unselected_rows);
627        assert_eq!(0, reader.metrics().num_deleted_rows);
628    }
629
630    #[tokio::test]
631    async fn test_dedup_reader_duplications() {
632        let input = [
633            new_batch(
634                b"k1",
635                &[1, 2],
636                &[13, 11],
637                &[OpType::Put, OpType::Put],
638                &[11, 12],
639            ),
640            // empty batch.
641            new_batch(b"k1", &[], &[], &[], &[]),
642            // Duplicate with the previous batch.
643            new_batch(
644                b"k1",
645                &[2, 3, 4],
646                &[10, 13, 13],
647                &[OpType::Put, OpType::Put, OpType::Delete],
648                &[2, 13, 14],
649            ),
650            new_batch(
651                b"k2",
652                &[1, 2],
653                &[20, 20],
654                &[OpType::Put, OpType::Delete],
655                &[101, 0],
656            ),
657            new_batch(b"k2", &[2], &[19], &[OpType::Put], &[102]),
658            new_batch(b"k3", &[2], &[20], &[OpType::Put], &[202]),
659            // This batch won't increase the deleted rows count as it
660            // is filtered out by the previous batch.
661            new_batch(b"k3", &[2], &[19], &[OpType::Delete], &[0]),
662        ];
663        // Filter deleted.
664        let reader = VecBatchReader::new(&input);
665        let mut reader = DedupReader::new(reader, LastRow::new(true));
666        check_reader_result(
667            &mut reader,
668            &[
669                new_batch(
670                    b"k1",
671                    &[1, 2],
672                    &[13, 11],
673                    &[OpType::Put, OpType::Put],
674                    &[11, 12],
675                ),
676                new_batch(b"k1", &[3], &[13], &[OpType::Put], &[13]),
677                new_batch(b"k2", &[1], &[20], &[OpType::Put], &[101]),
678                new_batch(b"k3", &[2], &[20], &[OpType::Put], &[202]),
679            ],
680        )
681        .await;
682        assert_eq!(5, reader.metrics().num_unselected_rows);
683        assert_eq!(2, reader.metrics().num_deleted_rows);
684
685        // Does not filter deleted.
686        let reader = VecBatchReader::new(&input);
687        let mut reader = DedupReader::new(reader, LastRow::new(false));
688        check_reader_result(
689            &mut reader,
690            &[
691                new_batch(
692                    b"k1",
693                    &[1, 2],
694                    &[13, 11],
695                    &[OpType::Put, OpType::Put],
696                    &[11, 12],
697                ),
698                new_batch(
699                    b"k1",
700                    &[3, 4],
701                    &[13, 13],
702                    &[OpType::Put, OpType::Delete],
703                    &[13, 14],
704                ),
705                new_batch(
706                    b"k2",
707                    &[1, 2],
708                    &[20, 20],
709                    &[OpType::Put, OpType::Delete],
710                    &[101, 0],
711                ),
712                new_batch(b"k3", &[2], &[20], &[OpType::Put], &[202]),
713            ],
714        )
715        .await;
716        assert_eq!(3, reader.metrics().num_unselected_rows);
717        assert_eq!(0, reader.metrics().num_deleted_rows);
718    }
719
720    /// Returns a new [Batch] whose field has column id 1, 2.
721    fn new_batch_multi_fields(
722        primary_key: &[u8],
723        timestamps: &[i64],
724        sequences: &[u64],
725        op_types: &[OpType],
726        fields: &[(Option<u64>, Option<u64>)],
727    ) -> Batch {
728        let mut builder = BatchBuilder::new(primary_key.to_vec());
729        builder
730            .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
731                timestamps.iter().copied(),
732            )))
733            .unwrap()
734            .sequences_array(Arc::new(UInt64Array::from_iter_values(
735                sequences.iter().copied(),
736            )))
737            .unwrap()
738            .op_types_array(Arc::new(UInt8Array::from_iter_values(
739                op_types.iter().map(|v| *v as u8),
740            )))
741            .unwrap()
742            .push_field_array(
743                1,
744                Arc::new(UInt64Array::from_iter(fields.iter().map(|field| field.0))),
745            )
746            .unwrap()
747            .push_field_array(
748                2,
749                Arc::new(UInt64Array::from_iter(fields.iter().map(|field| field.1))),
750            )
751            .unwrap();
752        builder.build().unwrap()
753    }
754
755    #[tokio::test]
756    async fn test_last_non_null_merge() {
757        let input = [
758            new_batch_multi_fields(
759                b"k1",
760                &[1, 2],
761                &[13, 11],
762                &[OpType::Put, OpType::Put],
763                &[(Some(11), Some(11)), (None, None)],
764            ),
765            // empty batch.
766            new_batch_multi_fields(b"k1", &[], &[], &[], &[]),
767            // Duplicate with the previous batch.
768            new_batch_multi_fields(b"k1", &[2], &[10], &[OpType::Put], &[(Some(12), None)]),
769            new_batch_multi_fields(
770                b"k1",
771                &[2, 3, 4],
772                &[10, 13, 13],
773                &[OpType::Put, OpType::Put, OpType::Delete],
774                &[(Some(2), Some(22)), (Some(13), None), (None, Some(14))],
775            ),
776            new_batch_multi_fields(
777                b"k2",
778                &[1, 2],
779                &[20, 20],
780                &[OpType::Put, OpType::Delete],
781                &[(Some(101), Some(101)), (None, None)],
782            ),
783            new_batch_multi_fields(
784                b"k2",
785                &[2],
786                &[19],
787                &[OpType::Put],
788                &[(Some(102), Some(102))],
789            ),
790            new_batch_multi_fields(
791                b"k3",
792                &[2],
793                &[20],
794                &[OpType::Put],
795                &[(Some(202), Some(202))],
796            ),
797            // This batch won't increase the deleted rows count as it
798            // is filtered out by the previous batch. (All fields are null).
799            new_batch_multi_fields(b"k3", &[2], &[19], &[OpType::Delete], &[(None, None)]),
800        ];
801
802        // Filter deleted.
803        let reader = VecBatchReader::new(&input);
804        let mut reader = DedupReader::new(reader, LastNonNull::new(true));
805        check_reader_result(
806            &mut reader,
807            &[
808                new_batch_multi_fields(
809                    b"k1",
810                    &[1, 2],
811                    &[13, 11],
812                    &[OpType::Put, OpType::Put],
813                    &[(Some(11), Some(11)), (Some(12), Some(22))],
814                ),
815                new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(13), None)]),
816                new_batch_multi_fields(
817                    b"k2",
818                    &[1],
819                    &[20],
820                    &[OpType::Put],
821                    &[(Some(101), Some(101))],
822                ),
823                new_batch_multi_fields(
824                    b"k3",
825                    &[2],
826                    &[20],
827                    &[OpType::Put],
828                    &[(Some(202), Some(202))],
829                ),
830            ],
831        )
832        .await;
833        assert_eq!(6, reader.metrics().num_unselected_rows);
834        assert_eq!(2, reader.metrics().num_deleted_rows);
835
836        // Does not filter deleted.
837        let reader = VecBatchReader::new(&input);
838        let mut reader = DedupReader::new(reader, LastNonNull::new(false));
839        check_reader_result(
840            &mut reader,
841            &[
842                new_batch_multi_fields(
843                    b"k1",
844                    &[1, 2],
845                    &[13, 11],
846                    &[OpType::Put, OpType::Put],
847                    &[(Some(11), Some(11)), (Some(12), Some(22))],
848                ),
849                new_batch_multi_fields(
850                    b"k1",
851                    &[3, 4],
852                    &[13, 13],
853                    &[OpType::Put, OpType::Delete],
854                    &[(Some(13), None), (None, Some(14))],
855                ),
856                new_batch_multi_fields(
857                    b"k2",
858                    &[1, 2],
859                    &[20, 20],
860                    &[OpType::Put, OpType::Delete],
861                    &[(Some(101), Some(101)), (None, None)],
862                ),
863                new_batch_multi_fields(
864                    b"k3",
865                    &[2],
866                    &[20],
867                    &[OpType::Put],
868                    &[(Some(202), Some(202))],
869                ),
870            ],
871        )
872        .await;
873        assert_eq!(4, reader.metrics().num_unselected_rows);
874        assert_eq!(0, reader.metrics().num_deleted_rows);
875    }
876
877    #[tokio::test]
878    async fn test_last_non_null_skip_merge_single() {
879        let input = [new_batch_multi_fields(
880            b"k1",
881            &[1, 2, 3],
882            &[13, 11, 13],
883            &[OpType::Put, OpType::Delete, OpType::Put],
884            &[(Some(11), Some(11)), (None, None), (Some(13), Some(13))],
885        )];
886
887        let reader = VecBatchReader::new(&input);
888        let mut reader = DedupReader::new(reader, LastNonNull::new(true));
889        check_reader_result(
890            &mut reader,
891            &[new_batch_multi_fields(
892                b"k1",
893                &[1, 3],
894                &[13, 13],
895                &[OpType::Put, OpType::Put],
896                &[(Some(11), Some(11)), (Some(13), Some(13))],
897            )],
898        )
899        .await;
900        assert_eq!(1, reader.metrics().num_unselected_rows);
901        assert_eq!(1, reader.metrics().num_deleted_rows);
902
903        let reader = VecBatchReader::new(&input);
904        let mut reader = DedupReader::new(reader, LastNonNull::new(false));
905        check_reader_result(&mut reader, &input).await;
906        assert_eq!(0, reader.metrics().num_unselected_rows);
907        assert_eq!(0, reader.metrics().num_deleted_rows);
908    }
909
910    #[tokio::test]
911    async fn test_last_non_null_skip_merge_no_null() {
912        let input = [
913            new_batch_multi_fields(
914                b"k1",
915                &[1, 2],
916                &[13, 11],
917                &[OpType::Put, OpType::Put],
918                &[(Some(11), Some(11)), (Some(12), Some(12))],
919            ),
920            new_batch_multi_fields(b"k1", &[2], &[10], &[OpType::Put], &[(None, Some(22))]),
921            new_batch_multi_fields(
922                b"k1",
923                &[2, 3],
924                &[9, 13],
925                &[OpType::Put, OpType::Put],
926                &[(Some(32), None), (Some(13), Some(13))],
927            ),
928        ];
929
930        let reader = VecBatchReader::new(&input);
931        let mut reader = DedupReader::new(reader, LastNonNull::new(true));
932        check_reader_result(
933            &mut reader,
934            &[
935                new_batch_multi_fields(
936                    b"k1",
937                    &[1, 2],
938                    &[13, 11],
939                    &[OpType::Put, OpType::Put],
940                    &[(Some(11), Some(11)), (Some(12), Some(12))],
941                ),
942                new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(13), Some(13))]),
943            ],
944        )
945        .await;
946        assert_eq!(2, reader.metrics().num_unselected_rows);
947        assert_eq!(0, reader.metrics().num_deleted_rows);
948    }
949
950    #[tokio::test]
951    async fn test_last_non_null_merge_null() {
952        let input = [
953            new_batch_multi_fields(
954                b"k1",
955                &[1, 2],
956                &[13, 11],
957                &[OpType::Put, OpType::Put],
958                &[(Some(11), Some(11)), (None, None)],
959            ),
960            new_batch_multi_fields(b"k1", &[2], &[10], &[OpType::Put], &[(None, Some(22))]),
961            new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(33), None)]),
962        ];
963
964        let reader = VecBatchReader::new(&input);
965        let mut reader = DedupReader::new(reader, LastNonNull::new(true));
966        check_reader_result(
967            &mut reader,
968            &[
969                new_batch_multi_fields(
970                    b"k1",
971                    &[1, 2],
972                    &[13, 11],
973                    &[OpType::Put, OpType::Put],
974                    &[(Some(11), Some(11)), (None, Some(22))],
975                ),
976                new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(33), None)]),
977            ],
978        )
979        .await;
980        assert_eq!(1, reader.metrics().num_unselected_rows);
981        assert_eq!(0, reader.metrics().num_deleted_rows);
982    }
983
984    fn check_dedup_strategy(input: &[Batch], strategy: &mut dyn DedupStrategy, expect: &[Batch]) {
985        let mut actual = Vec::new();
986        let mut metrics = DedupMetrics::default();
987        for batch in input {
988            if let Some(out) = strategy.push_batch(batch.clone(), &mut metrics).unwrap() {
989                actual.push(out);
990            }
991        }
992        if let Some(out) = strategy.finish(&mut metrics).unwrap() {
993            actual.push(out);
994        }
995
996        assert_eq!(expect, actual);
997    }
998
999    #[test]
1000    fn test_last_non_null_strategy_delete_last() {
1001        let input = [
1002            new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
1003            new_batch_multi_fields(
1004                b"k1",
1005                &[1, 2],
1006                &[1, 7],
1007                &[OpType::Put, OpType::Put],
1008                &[(Some(1), None), (Some(22), Some(222))],
1009            ),
1010            new_batch_multi_fields(b"k1", &[2], &[4], &[OpType::Put], &[(Some(12), None)]),
1011            new_batch_multi_fields(
1012                b"k2",
1013                &[2, 3],
1014                &[2, 5],
1015                &[OpType::Put, OpType::Delete],
1016                &[(None, None), (Some(13), None)],
1017            ),
1018            new_batch_multi_fields(b"k2", &[3], &[3], &[OpType::Put], &[(None, Some(3))]),
1019        ];
1020
1021        let mut strategy = LastNonNull::new(true);
1022        check_dedup_strategy(
1023            &input,
1024            &mut strategy,
1025            &[
1026                new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
1027                new_batch_multi_fields(b"k1", &[2], &[7], &[OpType::Put], &[(Some(22), Some(222))]),
1028                new_batch_multi_fields(b"k2", &[2], &[2], &[OpType::Put], &[(None, None)]),
1029            ],
1030        );
1031    }
1032
1033    #[test]
1034    fn test_last_non_null_strategy_delete_one() {
1035        let input = [
1036            new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Delete], &[(None, None)]),
1037            new_batch_multi_fields(b"k2", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
1038        ];
1039
1040        let mut strategy = LastNonNull::new(true);
1041        check_dedup_strategy(
1042            &input,
1043            &mut strategy,
1044            &[new_batch_multi_fields(
1045                b"k2",
1046                &[1],
1047                &[6],
1048                &[OpType::Put],
1049                &[(Some(11), None)],
1050            )],
1051        );
1052    }
1053
1054    #[test]
1055    fn test_last_non_null_strategy_delete_all() {
1056        let input = [
1057            new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Delete], &[(None, None)]),
1058            new_batch_multi_fields(b"k2", &[1], &[6], &[OpType::Delete], &[(Some(11), None)]),
1059        ];
1060
1061        let mut strategy = LastNonNull::new(true);
1062        check_dedup_strategy(&input, &mut strategy, &[]);
1063    }
1064
1065    #[test]
1066    fn test_last_non_null_strategy_same_batch() {
1067        let input = [
1068            new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
1069            new_batch_multi_fields(
1070                b"k1",
1071                &[1, 2],
1072                &[1, 7],
1073                &[OpType::Put, OpType::Put],
1074                &[(Some(1), None), (Some(22), Some(222))],
1075            ),
1076            new_batch_multi_fields(b"k1", &[2], &[4], &[OpType::Put], &[(Some(12), None)]),
1077            new_batch_multi_fields(
1078                b"k1",
1079                &[2, 3],
1080                &[2, 5],
1081                &[OpType::Put, OpType::Put],
1082                &[(None, None), (Some(13), None)],
1083            ),
1084            new_batch_multi_fields(b"k1", &[3], &[3], &[OpType::Put], &[(None, Some(3))]),
1085        ];
1086
1087        let mut strategy = LastNonNull::new(true);
1088        check_dedup_strategy(
1089            &input,
1090            &mut strategy,
1091            &[
1092                new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
1093                new_batch_multi_fields(b"k1", &[2], &[7], &[OpType::Put], &[(Some(22), Some(222))]),
1094                new_batch_multi_fields(b"k1", &[3], &[5], &[OpType::Put], &[(Some(13), Some(3))]),
1095            ],
1096        );
1097    }
1098
1099    #[test]
1100    fn test_last_non_null_strategy_delete_middle() {
1101        let input = [
1102            new_batch_multi_fields(b"k1", &[1], &[7], &[OpType::Put], &[(Some(11), None)]),
1103            new_batch_multi_fields(b"k1", &[1], &[4], &[OpType::Delete], &[(None, None)]),
1104            new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Put], &[(Some(12), Some(1))]),
1105            new_batch_multi_fields(b"k1", &[2], &[8], &[OpType::Put], &[(Some(21), None)]),
1106            new_batch_multi_fields(b"k1", &[2], &[5], &[OpType::Delete], &[(None, None)]),
1107            new_batch_multi_fields(b"k1", &[2], &[2], &[OpType::Put], &[(Some(22), Some(2))]),
1108            new_batch_multi_fields(b"k1", &[3], &[9], &[OpType::Put], &[(Some(31), None)]),
1109            new_batch_multi_fields(b"k1", &[3], &[6], &[OpType::Delete], &[(None, None)]),
1110            new_batch_multi_fields(b"k1", &[3], &[3], &[OpType::Put], &[(Some(32), Some(3))]),
1111        ];
1112
1113        let mut strategy = LastNonNull::new(true);
1114        check_dedup_strategy(
1115            &input,
1116            &mut strategy,
1117            &[
1118                new_batch_multi_fields(b"k1", &[1], &[7], &[OpType::Put], &[(Some(11), None)]),
1119                new_batch_multi_fields(b"k1", &[2], &[8], &[OpType::Put], &[(Some(21), None)]),
1120                new_batch_multi_fields(b"k1", &[3], &[9], &[OpType::Put], &[(Some(31), None)]),
1121            ],
1122        );
1123    }
1124
1125    #[test]
1126    fn test_last_non_null_iter_on_batch() {
1127        let input = [new_batch_multi_fields(
1128            b"k1",
1129            &[1, 1, 2],
1130            &[13, 12, 13],
1131            &[OpType::Put, OpType::Put, OpType::Put],
1132            &[(None, None), (Some(1), None), (Some(2), Some(22))],
1133        )];
1134        let iter = input.into_iter().map(Ok);
1135        let iter = LastNonNullIter::new(iter);
1136        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
1137        let expect = [
1138            new_batch_multi_fields(b"k1", &[1], &[13], &[OpType::Put], &[(Some(1), None)]),
1139            new_batch_multi_fields(b"k1", &[2], &[13], &[OpType::Put], &[(Some(2), Some(22))]),
1140        ];
1141        assert_eq!(&expect, &actual[..]);
1142    }
1143
1144    #[test]
1145    fn test_last_non_null_iter_same_row() {
1146        let input = [
1147            new_batch_multi_fields(
1148                b"k1",
1149                &[1, 1, 1],
1150                &[13, 12, 11],
1151                &[OpType::Put, OpType::Put, OpType::Put],
1152                &[(None, None), (Some(1), None), (Some(11), None)],
1153            ),
1154            new_batch_multi_fields(
1155                b"k1",
1156                &[1, 1],
1157                &[10, 9],
1158                &[OpType::Put, OpType::Put],
1159                &[(None, Some(11)), (Some(21), Some(31))],
1160            ),
1161        ];
1162        let iter = input.into_iter().map(Ok);
1163        let iter = LastNonNullIter::new(iter);
1164        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
1165        let expect = [new_batch_multi_fields(
1166            b"k1",
1167            &[1],
1168            &[13],
1169            &[OpType::Put],
1170            &[(Some(1), Some(11))],
1171        )];
1172        assert_eq!(&expect, &actual[..]);
1173    }
1174
1175    #[test]
1176    fn test_last_non_null_iter_multi_batch() {
1177        let input = [
1178            new_batch_multi_fields(
1179                b"k1",
1180                &[1, 1, 2],
1181                &[13, 12, 13],
1182                &[OpType::Put, OpType::Put, OpType::Put],
1183                &[(None, None), (Some(1), None), (Some(2), Some(22))],
1184            ),
1185            new_batch_multi_fields(
1186                b"k1",
1187                &[2, 3],
1188                &[12, 13],
1189                &[OpType::Put, OpType::Delete],
1190                &[(None, Some(12)), (None, None)],
1191            ),
1192            new_batch_multi_fields(
1193                b"k2",
1194                &[1, 1, 2],
1195                &[13, 12, 13],
1196                &[OpType::Put, OpType::Put, OpType::Put],
1197                &[(None, None), (Some(1), None), (Some(2), Some(22))],
1198            ),
1199        ];
1200        let iter = input.into_iter().map(Ok);
1201        let iter = LastNonNullIter::new(iter);
1202        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
1203        let expect = [
1204            new_batch_multi_fields(b"k1", &[1], &[13], &[OpType::Put], &[(Some(1), None)]),
1205            new_batch_multi_fields(b"k1", &[2], &[13], &[OpType::Put], &[(Some(2), Some(22))]),
1206            new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Delete], &[(None, None)]),
1207            new_batch_multi_fields(b"k2", &[1], &[13], &[OpType::Put], &[(Some(1), None)]),
1208            new_batch_multi_fields(b"k2", &[2], &[13], &[OpType::Put], &[(Some(2), Some(22))]),
1209        ];
1210        assert_eq!(&expect, &actual[..]);
1211    }
1212
1213    /// Returns a new [Batch] without fields.
1214    fn new_batch_no_fields(
1215        primary_key: &[u8],
1216        timestamps: &[i64],
1217        sequences: &[u64],
1218        op_types: &[OpType],
1219    ) -> Batch {
1220        let mut builder = BatchBuilder::new(primary_key.to_vec());
1221        builder
1222            .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
1223                timestamps.iter().copied(),
1224            )))
1225            .unwrap()
1226            .sequences_array(Arc::new(UInt64Array::from_iter_values(
1227                sequences.iter().copied(),
1228            )))
1229            .unwrap()
1230            .op_types_array(Arc::new(UInt8Array::from_iter_values(
1231                op_types.iter().map(|v| *v as u8),
1232            )))
1233            .unwrap();
1234        builder.build().unwrap()
1235    }
1236
1237    #[test]
1238    fn test_last_non_null_iter_no_batch() {
1239        let input = [
1240            new_batch_no_fields(
1241                b"k1",
1242                &[1, 1, 2],
1243                &[13, 12, 13],
1244                &[OpType::Put, OpType::Put, OpType::Put],
1245            ),
1246            new_batch_no_fields(b"k1", &[2, 3], &[12, 13], &[OpType::Put, OpType::Delete]),
1247            new_batch_no_fields(
1248                b"k2",
1249                &[1, 1, 2],
1250                &[13, 12, 13],
1251                &[OpType::Put, OpType::Put, OpType::Put],
1252            ),
1253        ];
1254        let iter = input.into_iter().map(Ok);
1255        let iter = LastNonNullIter::new(iter);
1256        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
1257        let expect = [
1258            new_batch_no_fields(b"k1", &[1], &[13], &[OpType::Put]),
1259            new_batch_no_fields(b"k1", &[2], &[13], &[OpType::Put]),
1260            new_batch_no_fields(b"k1", &[3], &[13], &[OpType::Delete]),
1261            new_batch_no_fields(b"k2", &[1], &[13], &[OpType::Put]),
1262            new_batch_no_fields(b"k2", &[2], &[13], &[OpType::Put]),
1263        ];
1264        assert_eq!(&expect, &actual[..]);
1265    }
1266}