mito2/read/
dedup.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Utilities to remove duplicate rows from a sorted batch.
16
17use std::fmt;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use api::v1::OpType;
22use async_trait::async_trait;
23use common_telemetry::debug;
24use common_time::Timestamp;
25use datatypes::data_type::DataType;
26use datatypes::prelude::ScalarVector;
27use datatypes::value::Value;
28use datatypes::vectors::MutableVector;
29
30use crate::error::Result;
31use crate::metrics::MERGE_FILTER_ROWS_TOTAL;
32use crate::read::{Batch, BatchColumn, BatchReader};
33
34/// Trait for reporting dedup metrics.
35pub trait DedupMetricsReport: Send + Sync {
36    /// Reports and resets the metrics.
37    fn report(&self, metrics: &mut DedupMetrics);
38}
39
40/// A reader that dedup sorted batches from a source based on the
41/// dedup strategy.
42pub struct DedupReader<R, S> {
43    source: R,
44    strategy: S,
45    metrics: DedupMetrics,
46    /// Optional metrics reporter.
47    metrics_reporter: Option<Arc<dyn DedupMetricsReport>>,
48}
49
50impl<R, S> DedupReader<R, S> {
51    /// Creates a new dedup reader.
52    pub fn new(
53        source: R,
54        strategy: S,
55        metrics_reporter: Option<Arc<dyn DedupMetricsReport>>,
56    ) -> Self {
57        Self {
58            source,
59            strategy,
60            metrics: DedupMetrics::default(),
61            metrics_reporter,
62        }
63    }
64}
65
66impl<R: BatchReader, S: DedupStrategy> DedupReader<R, S> {
67    /// Returns the next deduplicated batch.
68    async fn fetch_next_batch(&mut self) -> Result<Option<Batch>> {
69        while let Some(batch) = self.source.next_batch().await? {
70            if let Some(batch) = self.strategy.push_batch(batch, &mut self.metrics)? {
71                self.metrics.maybe_report(&self.metrics_reporter);
72                return Ok(Some(batch));
73            }
74        }
75
76        let result = self.strategy.finish(&mut self.metrics)?;
77        self.metrics.maybe_report(&self.metrics_reporter);
78        Ok(result)
79    }
80}
81
82#[async_trait]
83impl<R: BatchReader, S: DedupStrategy> BatchReader for DedupReader<R, S> {
84    async fn next_batch(&mut self) -> Result<Option<Batch>> {
85        self.fetch_next_batch().await
86    }
87}
88
89impl<R, S> Drop for DedupReader<R, S> {
90    fn drop(&mut self) {
91        debug!("Dedup reader finished, metrics: {:?}", self.metrics);
92
93        MERGE_FILTER_ROWS_TOTAL
94            .with_label_values(&["dedup"])
95            .inc_by(self.metrics.num_unselected_rows as u64);
96        MERGE_FILTER_ROWS_TOTAL
97            .with_label_values(&["delete"])
98            .inc_by(self.metrics.num_unselected_rows as u64);
99
100        // Report any remaining metrics.
101        if let Some(reporter) = &self.metrics_reporter {
102            reporter.report(&mut self.metrics);
103        }
104    }
105}
106
#[cfg(test)]
impl<R, S> DedupReader<R, S> {
    /// Test-only view of the metrics collected so far.
    fn metrics(&self) -> &DedupMetrics {
        &self.metrics
    }
}
113
114/// Strategy to remove duplicate rows from sorted batches.
115pub trait DedupStrategy: Send {
116    /// Pushes a batch to the dedup strategy.
117    /// Returns the deduplicated batch.
118    fn push_batch(&mut self, batch: Batch, metrics: &mut DedupMetrics) -> Result<Option<Batch>>;
119
120    /// Finishes the deduplication and resets the strategy.
121    ///
122    /// Users must ensure that `push_batch` is called for all batches before
123    /// calling this method.
124    fn finish(&mut self, metrics: &mut DedupMetrics) -> Result<Option<Batch>>;
125}
126
127/// State of the last row in a batch for dedup.
128struct BatchLastRow {
129    primary_key: Vec<u8>,
130    /// The last timestamp of the batch.
131    timestamp: Timestamp,
132}
133
134/// Dedup strategy that keeps the row with latest sequence of each key.
135///
136/// This strategy is optimized specially based on the properties of the SST files,
137/// memtables and the merge reader. It assumes that batches from files and memtables
138/// don't contain duplicate rows and the merge reader never concatenates batches from
139/// different source.
140///
141/// We might implement a new strategy if we need to process files with duplicate rows.
142pub struct LastRow {
143    /// Meta of the last row in the previous batch that has the same key
144    /// as the batch to push.
145    prev_batch: Option<BatchLastRow>,
146    /// Filter deleted rows.
147    filter_deleted: bool,
148}
149
150impl LastRow {
151    /// Creates a new strategy with the given `filter_deleted` flag.
152    pub fn new(filter_deleted: bool) -> Self {
153        Self {
154            prev_batch: None,
155            filter_deleted,
156        }
157    }
158}
159
160impl DedupStrategy for LastRow {
161    fn push_batch(
162        &mut self,
163        mut batch: Batch,
164        metrics: &mut DedupMetrics,
165    ) -> Result<Option<Batch>> {
166        let start = Instant::now();
167
168        if batch.is_empty() {
169            return Ok(None);
170        }
171        debug_assert!(batch.first_timestamp().is_some());
172        let prev_timestamp = match &self.prev_batch {
173            Some(prev_batch) => {
174                if prev_batch.primary_key != batch.primary_key() {
175                    // The key has changed. This is the first batch of the
176                    // new key.
177                    None
178                } else {
179                    Some(prev_batch.timestamp)
180                }
181            }
182            None => None,
183        };
184        if batch.first_timestamp() == prev_timestamp {
185            metrics.num_unselected_rows += 1;
186            // This batch contains a duplicate row, skip it.
187            if batch.num_rows() == 1 {
188                // We don't need to update `prev_batch` because they have the same
189                // key and timestamp.
190                metrics.dedup_cost += start.elapsed();
191                return Ok(None);
192            }
193            // Skips the first row.
194            batch = batch.slice(1, batch.num_rows() - 1);
195        }
196
197        // Store current batch to `prev_batch` so we could compare the next batch
198        // with this batch. We store batch before filtering it as rows with `OpType::Delete`
199        // would be removed from the batch after filter, then we may store an incorrect `last row`
200        // of previous batch.
201        match &mut self.prev_batch {
202            Some(prev) => {
203                // Reuse the primary key buffer.
204                prev.primary_key.clone_from(&batch.primary_key);
205                prev.timestamp = batch.last_timestamp().unwrap();
206            }
207            None => {
208                self.prev_batch = Some(BatchLastRow {
209                    primary_key: batch.primary_key().to_vec(),
210                    timestamp: batch.last_timestamp().unwrap(),
211                })
212            }
213        }
214
215        // Filters deleted rows.
216        if self.filter_deleted {
217            filter_deleted_from_batch(&mut batch, metrics)?;
218        }
219
220        metrics.dedup_cost += start.elapsed();
221
222        // The batch can become empty if all rows are deleted.
223        if batch.is_empty() {
224            Ok(None)
225        } else {
226            Ok(Some(batch))
227        }
228    }
229
230    fn finish(&mut self, _metrics: &mut DedupMetrics) -> Result<Option<Batch>> {
231        Ok(None)
232    }
233}
234
235/// Removes deleted rows from the batch and updates metrics.
236fn filter_deleted_from_batch(batch: &mut Batch, metrics: &mut DedupMetrics) -> Result<()> {
237    let num_rows = batch.num_rows();
238    batch.filter_deleted()?;
239    let num_rows_after_filter = batch.num_rows();
240    let num_deleted = num_rows - num_rows_after_filter;
241    metrics.num_deleted_rows += num_deleted;
242    metrics.num_unselected_rows += num_deleted;
243
244    Ok(())
245}
246
/// Counters and timing collected while deduplicating rows.
#[derive(Default)]
pub struct DedupMetrics {
    /// Rows removed during deduplication, including deleted rows.
    pub(crate) num_unselected_rows: usize,
    /// Rows removed because they were deleted.
    pub(crate) num_deleted_rows: usize,
    /// Total time spent on deduplication.
    pub(crate) dedup_cost: Duration,
}

impl fmt::Debug for DedupMetrics {
    /// Renders the metrics as a compact JSON-like object and omits zero
    /// counters. Prints just `{}` while no dedup time has been recorded.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let DedupMetrics {
            num_unselected_rows,
            num_deleted_rows,
            dedup_cost,
        } = self;

        if dedup_cost.is_zero() {
            // Nothing measured yet: an empty object keeps logs short.
            return write!(f, "{{}}");
        }

        write!(f, r#"{{"dedup_cost":"{:?}""#, dedup_cost)?;
        if *num_unselected_rows != 0 {
            write!(f, r#", "num_unselected_rows":{}"#, num_unselected_rows)?;
        }
        if *num_deleted_rows != 0 {
            write!(f, r#", "num_deleted_rows":{}"#, num_deleted_rows)?;
        }
        write!(f, "}}")
    }
}
277
278impl DedupMetrics {
279    /// Merges metrics from another DedupMetrics instance.
280    pub(crate) fn merge(&mut self, other: &DedupMetrics) {
281        let DedupMetrics {
282            num_unselected_rows,
283            num_deleted_rows,
284            dedup_cost,
285        } = other;
286
287        self.num_unselected_rows += *num_unselected_rows;
288        self.num_deleted_rows += *num_deleted_rows;
289        self.dedup_cost += *dedup_cost;
290    }
291
292    /// Reports the metrics if dedup_cost exceeds 10ms and resets them.
293    pub(crate) fn maybe_report(&mut self, reporter: &Option<Arc<dyn DedupMetricsReport>>) {
294        if self.dedup_cost.as_millis() > 10
295            && let Some(r) = reporter
296        {
297            r.report(self);
298        }
299    }
300}
301
302/// Buffer to store fields in the last row to merge.
303///
304/// Usage:
305/// We should call `maybe_init()` to initialize the builder and then call `push_first_row()`
306/// to push the first row of batches that the timestamp is the same as the row in this builder.
307/// Finally we should call `merge_last_non_null()` to merge the last non-null fields and
308/// return the merged batch.
309struct LastFieldsBuilder {
310    /// Filter deleted rows.
311    filter_deleted: bool,
312    /// Fields builders, lazy initialized.
313    builders: Vec<Box<dyn MutableVector>>,
314    /// Last fields to merge, lazy initialized.
315    /// Only initializes this field when `skip_merge()` is false.
316    last_fields: Vec<Value>,
317    /// Whether the last row (including `last_fields`) has null field.
318    /// Only sets this field when `contains_deletion` is false.
319    contains_null: bool,
320    /// Whether the last row has delete op. If true, skips merging fields.
321    contains_deletion: bool,
322    /// Whether the builder is initialized.
323    initialized: bool,
324}
325
326impl LastFieldsBuilder {
327    /// Returns a new builder with the given `filter_deleted` flag.
328    fn new(filter_deleted: bool) -> Self {
329        Self {
330            filter_deleted,
331            builders: Vec::new(),
332            last_fields: Vec::new(),
333            contains_null: false,
334            contains_deletion: false,
335            initialized: false,
336        }
337    }
338
339    /// Initializes the builders with the last row of the batch.
340    fn maybe_init(&mut self, batch: &Batch) {
341        debug_assert!(!batch.is_empty());
342
343        if self.initialized {
344            // Already initialized or no fields to merge.
345            return;
346        }
347
348        self.initialized = true;
349
350        if batch.fields().is_empty() {
351            // No fields to merge.
352            return;
353        }
354
355        let last_idx = batch.num_rows() - 1;
356        let fields = batch.fields();
357        // Safety: The last_idx is valid.
358        self.contains_deletion =
359            batch.op_types().get_data(last_idx).unwrap() == OpType::Delete as u8;
360        // If the row has been deleted, then we don't need to merge fields.
361        if !self.contains_deletion {
362            self.contains_null = fields.iter().any(|col| col.data.is_null(last_idx));
363        }
364
365        if self.skip_merge() {
366            // No null field or the row has been deleted, no need to merge.
367            return;
368        }
369        if self.builders.is_empty() {
370            self.builders = fields
371                .iter()
372                .map(|col| col.data.data_type().create_mutable_vector(1))
373                .collect();
374        }
375        self.last_fields = fields.iter().map(|col| col.data.get(last_idx)).collect();
376    }
377
378    /// Returns true if the builder don't need to merge the rows.
379    fn skip_merge(&self) -> bool {
380        debug_assert!(self.initialized);
381
382        // No null field or the row has been deleted, no need to merge.
383        self.contains_deletion || !self.contains_null
384    }
385
386    /// Pushes first row of a batch to the builder.
387    fn push_first_row(&mut self, batch: &Batch) {
388        debug_assert!(self.initialized);
389        debug_assert!(!batch.is_empty());
390
391        if self.skip_merge() {
392            // No remaining null field, skips this batch.
393            return;
394        }
395
396        // Both `maybe_init()` and `push_first_row()` can update the builder. If the delete
397        // op is not in the latest row, then we can't set the deletion flag in the `maybe_init()`.
398        // We must check the batch and update the deletion flag here to prevent
399        // the builder from merging non-null fields in rows that insert before the deleted row.
400        self.contains_deletion = batch.op_types().get_data(0).unwrap() == OpType::Delete as u8;
401        if self.contains_deletion {
402            // Deletes this row.
403            return;
404        }
405
406        let fields = batch.fields();
407        for (idx, value) in self.last_fields.iter_mut().enumerate() {
408            if value.is_null() && !fields[idx].data.is_null(0) {
409                // Updates the value.
410                *value = fields[idx].data.get(0);
411            }
412        }
413        // Updates the flag.
414        self.contains_null = self.last_fields.iter().any(Value::is_null);
415    }
416
417    /// Merges last non-null fields, builds a new batch and resets the builder.
418    /// It may overwrites the last row of the `buffer`. The `buffer` is the batch
419    /// that initialized the builder.
420    fn merge_last_non_null(
421        &mut self,
422        buffer: Batch,
423        metrics: &mut DedupMetrics,
424    ) -> Result<Option<Batch>> {
425        debug_assert!(self.initialized);
426
427        let mut output = if self.last_fields.is_empty() {
428            // No need to overwrite the last row.
429            buffer
430        } else {
431            // Builds last fields.
432            for (builder, value) in self.builders.iter_mut().zip(&self.last_fields) {
433                // Safety: Vectors of the batch has the same type.
434                builder.push_value_ref(&value.as_value_ref());
435            }
436            let fields = self
437                .builders
438                .iter_mut()
439                .zip(buffer.fields())
440                .map(|(builder, col)| BatchColumn {
441                    column_id: col.column_id,
442                    data: builder.to_vector(),
443                })
444                .collect();
445
446            if buffer.num_rows() == 1 {
447                // Replaces the buffer directly if it only has one row.
448                buffer.with_fields(fields)?
449            } else {
450                // Replaces the last row of the buffer.
451                let front = buffer.slice(0, buffer.num_rows() - 1);
452                let last = buffer.slice(buffer.num_rows() - 1, 1);
453                let last = last.with_fields(fields)?;
454                Batch::concat(vec![front, last])?
455            }
456        };
457
458        // Resets itself. `self.builders` is already reset in `to_vector()`.
459        self.clear();
460
461        if self.filter_deleted {
462            filter_deleted_from_batch(&mut output, metrics)?;
463        }
464
465        if output.is_empty() {
466            Ok(None)
467        } else {
468            Ok(Some(output))
469        }
470    }
471
472    /// Clears the builder.
473    fn clear(&mut self) {
474        self.last_fields.clear();
475        self.contains_null = false;
476        self.contains_deletion = false;
477        self.initialized = false;
478    }
479}
480
481/// Dedup strategy that keeps the last non-null field for the same key.
482///
483/// It assumes that batches from files and memtables don't contain duplicate rows
484/// and the merge reader never concatenates batches from different source.
485///
486/// We might implement a new strategy if we need to process files with duplicate rows.
487pub(crate) struct LastNonNull {
488    /// Buffered batch that fields in the last row may be updated.
489    buffer: Option<Batch>,
490    /// Fields that overlaps with the last row of the `buffer`.
491    last_fields: LastFieldsBuilder,
492}
493
494impl LastNonNull {
495    /// Creates a new strategy with the given `filter_deleted` flag.
496    pub(crate) fn new(filter_deleted: bool) -> Self {
497        Self {
498            buffer: None,
499            last_fields: LastFieldsBuilder::new(filter_deleted),
500        }
501    }
502}
503
504impl DedupStrategy for LastNonNull {
505    fn push_batch(&mut self, batch: Batch, metrics: &mut DedupMetrics) -> Result<Option<Batch>> {
506        let start = Instant::now();
507
508        if batch.is_empty() {
509            return Ok(None);
510        }
511
512        let Some(buffer) = self.buffer.as_mut() else {
513            // The buffer is empty, store the batch and return. We need to observe the next batch.
514            self.buffer = Some(batch);
515            return Ok(None);
516        };
517
518        // Initializes last fields with the first buffer.
519        self.last_fields.maybe_init(buffer);
520
521        if buffer.primary_key() != batch.primary_key() {
522            // Next key is different.
523            let buffer = std::mem::replace(buffer, batch);
524            let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;
525            metrics.dedup_cost += start.elapsed();
526            return Ok(merged);
527        }
528
529        if buffer.last_timestamp() != batch.first_timestamp() {
530            // The next batch has a different timestamp.
531            let buffer = std::mem::replace(buffer, batch);
532            let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;
533            metrics.dedup_cost += start.elapsed();
534            return Ok(merged);
535        }
536
537        // The next batch has the same key and timestamp.
538
539        metrics.num_unselected_rows += 1;
540        // We assumes each batch doesn't contain duplicate rows so we only need to check the first row.
541        if batch.num_rows() == 1 {
542            self.last_fields.push_first_row(&batch);
543            metrics.dedup_cost += start.elapsed();
544            return Ok(None);
545        }
546
547        // The next batch has the same key and timestamp but contains multiple rows.
548        // We can merge the first row and buffer the remaining rows.
549        let first = batch.slice(0, 1);
550        self.last_fields.push_first_row(&first);
551        // Moves the remaining rows to the buffer.
552        let batch = batch.slice(1, batch.num_rows() - 1);
553        let buffer = std::mem::replace(buffer, batch);
554        let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;
555
556        metrics.dedup_cost += start.elapsed();
557
558        Ok(merged)
559    }
560
561    fn finish(&mut self, metrics: &mut DedupMetrics) -> Result<Option<Batch>> {
562        let start = Instant::now();
563
564        let Some(buffer) = self.buffer.take() else {
565            return Ok(None);
566        };
567
568        // Initializes last fields with the first buffer.
569        self.last_fields.maybe_init(&buffer);
570
571        let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;
572
573        metrics.dedup_cost += start.elapsed();
574
575        Ok(merged)
576    }
577}
578
579/// An iterator that dedup rows by [LastNonNull] strategy.
580/// The input iterator must returns sorted batches.
581pub(crate) struct LastNonNullIter<I> {
582    /// Inner iterator that returns sorted batches.
583    iter: Option<I>,
584    /// Dedup strategy.
585    strategy: LastNonNull,
586    /// Dedup metrics.
587    metrics: DedupMetrics,
588    /// The current batch returned by the iterator. If it is None, we need to
589    /// fetch a new batch.
590    /// The batch is always not empty.
591    current_batch: Option<Batch>,
592    /// The index of the current row in the current batch.
593    /// more to check issue #5229.
594    current_index: usize,
595}
596
597impl<I> LastNonNullIter<I> {
598    /// Creates a new iterator with the given inner iterator.
599    pub(crate) fn new(iter: I) -> Self {
600        Self {
601            iter: Some(iter),
602            // We only use the iter in memtables. Memtables never filter deleted.
603            strategy: LastNonNull::new(false),
604            metrics: DedupMetrics::default(),
605            current_batch: None,
606            current_index: 0,
607        }
608    }
609}
610
611impl<I: Iterator<Item = Result<Batch>>> LastNonNullIter<I> {
612    /// Fetches the next batch from the inner iterator. It will slice the batch if it
613    /// contains duplicate rows.
614    fn next_batch_for_merge(&mut self) -> Result<Option<Batch>> {
615        if self.current_batch.is_none() {
616            // No current batch. Fetches a new batch from the inner iterator.
617            let Some(iter) = self.iter.as_mut() else {
618                // The iterator is exhausted.
619                return Ok(None);
620            };
621
622            self.current_batch = iter.next().transpose()?;
623            self.current_index = 0;
624            if self.current_batch.is_none() {
625                // The iterator is exhausted.
626                self.iter = None;
627                return Ok(None);
628            }
629        }
630
631        if let Some(batch) = &self.current_batch {
632            let n = batch.num_rows();
633            // Safety: The batch is not empty when accessed.
634            let timestamps = batch.timestamps_native().unwrap();
635            let mut pos = self.current_index;
636            while pos + 1 < n && timestamps[pos] != timestamps[pos + 1] {
637                pos += 1;
638            }
639            let segment = batch.slice(self.current_index, pos - self.current_index + 1);
640            if pos + 1 < n && timestamps[pos] == timestamps[pos + 1] {
641                self.current_index = pos + 1;
642            } else {
643                self.current_batch = None;
644                self.current_index = 0;
645            }
646            return Ok(Some(segment));
647        }
648
649        Ok(None)
650    }
651
652    fn next_batch(&mut self) -> Result<Option<Batch>> {
653        while let Some(batch) = self.next_batch_for_merge()? {
654            if let Some(batch) = self.strategy.push_batch(batch, &mut self.metrics)? {
655                return Ok(Some(batch));
656            }
657        }
658
659        self.strategy.finish(&mut self.metrics)
660    }
661}
662
663impl<I: Iterator<Item = Result<Batch>>> Iterator for LastNonNullIter<I> {
664    type Item = Result<Batch>;
665
666    fn next(&mut self) -> Option<Self::Item> {
667        self.next_batch().transpose()
668    }
669}
670
671#[cfg(test)]
672mod tests {
673    use std::sync::Arc;
674
675    use api::v1::OpType;
676    use datatypes::arrow::array::{TimestampMillisecondArray, UInt8Array, UInt64Array};
677
678    use super::*;
679    use crate::read::BatchBuilder;
680    use crate::test_util::{VecBatchReader, check_reader_result, new_batch};
681
682    #[tokio::test]
683    async fn test_dedup_reader_no_duplications() {
684        let input = [
685            new_batch(
686                b"k1",
687                &[1, 2],
688                &[11, 12],
689                &[OpType::Put, OpType::Put],
690                &[21, 22],
691            ),
692            new_batch(b"k1", &[3], &[13], &[OpType::Put], &[23]),
693            new_batch(
694                b"k2",
695                &[1, 2],
696                &[111, 112],
697                &[OpType::Put, OpType::Put],
698                &[31, 32],
699            ),
700        ];
701
702        // Test last row.
703        let reader = VecBatchReader::new(&input);
704        let mut reader = DedupReader::new(reader, LastRow::new(true), None);
705        check_reader_result(&mut reader, &input).await;
706        assert_eq!(0, reader.metrics().num_unselected_rows);
707        assert_eq!(0, reader.metrics().num_deleted_rows);
708
709        // Test last non-null.
710        let reader = VecBatchReader::new(&input);
711        let mut reader = DedupReader::new(reader, LastNonNull::new(true), None);
712        check_reader_result(&mut reader, &input).await;
713        assert_eq!(0, reader.metrics().num_unselected_rows);
714        assert_eq!(0, reader.metrics().num_deleted_rows);
715    }
716
717    #[tokio::test]
718    async fn test_dedup_reader_duplications() {
719        let input = [
720            new_batch(
721                b"k1",
722                &[1, 2],
723                &[13, 11],
724                &[OpType::Put, OpType::Put],
725                &[11, 12],
726            ),
727            // empty batch.
728            new_batch(b"k1", &[], &[], &[], &[]),
729            // Duplicate with the previous batch.
730            new_batch(
731                b"k1",
732                &[2, 3, 4],
733                &[10, 13, 13],
734                &[OpType::Put, OpType::Put, OpType::Delete],
735                &[2, 13, 14],
736            ),
737            new_batch(
738                b"k2",
739                &[1, 2],
740                &[20, 20],
741                &[OpType::Put, OpType::Delete],
742                &[101, 0],
743            ),
744            new_batch(b"k2", &[2], &[19], &[OpType::Put], &[102]),
745            new_batch(b"k3", &[2], &[20], &[OpType::Put], &[202]),
746            // This batch won't increase the deleted rows count as it
747            // is filtered out by the previous batch.
748            new_batch(b"k3", &[2], &[19], &[OpType::Delete], &[0]),
749        ];
750        // Filter deleted.
751        let reader = VecBatchReader::new(&input);
752        let mut reader = DedupReader::new(reader, LastRow::new(true), None);
753        check_reader_result(
754            &mut reader,
755            &[
756                new_batch(
757                    b"k1",
758                    &[1, 2],
759                    &[13, 11],
760                    &[OpType::Put, OpType::Put],
761                    &[11, 12],
762                ),
763                new_batch(b"k1", &[3], &[13], &[OpType::Put], &[13]),
764                new_batch(b"k2", &[1], &[20], &[OpType::Put], &[101]),
765                new_batch(b"k3", &[2], &[20], &[OpType::Put], &[202]),
766            ],
767        )
768        .await;
769        assert_eq!(5, reader.metrics().num_unselected_rows);
770        assert_eq!(2, reader.metrics().num_deleted_rows);
771
772        // Does not filter deleted.
773        let reader = VecBatchReader::new(&input);
774        let mut reader = DedupReader::new(reader, LastRow::new(false), None);
775        check_reader_result(
776            &mut reader,
777            &[
778                new_batch(
779                    b"k1",
780                    &[1, 2],
781                    &[13, 11],
782                    &[OpType::Put, OpType::Put],
783                    &[11, 12],
784                ),
785                new_batch(
786                    b"k1",
787                    &[3, 4],
788                    &[13, 13],
789                    &[OpType::Put, OpType::Delete],
790                    &[13, 14],
791                ),
792                new_batch(
793                    b"k2",
794                    &[1, 2],
795                    &[20, 20],
796                    &[OpType::Put, OpType::Delete],
797                    &[101, 0],
798                ),
799                new_batch(b"k3", &[2], &[20], &[OpType::Put], &[202]),
800            ],
801        )
802        .await;
803        assert_eq!(3, reader.metrics().num_unselected_rows);
804        assert_eq!(0, reader.metrics().num_deleted_rows);
805    }
806
807    /// Returns a new [Batch] whose field has column id 1, 2.
808    fn new_batch_multi_fields(
809        primary_key: &[u8],
810        timestamps: &[i64],
811        sequences: &[u64],
812        op_types: &[OpType],
813        fields: &[(Option<u64>, Option<u64>)],
814    ) -> Batch {
815        let mut builder = BatchBuilder::new(primary_key.to_vec());
816        builder
817            .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
818                timestamps.iter().copied(),
819            )))
820            .unwrap()
821            .sequences_array(Arc::new(UInt64Array::from_iter_values(
822                sequences.iter().copied(),
823            )))
824            .unwrap()
825            .op_types_array(Arc::new(UInt8Array::from_iter_values(
826                op_types.iter().map(|v| *v as u8),
827            )))
828            .unwrap()
829            .push_field_array(
830                1,
831                Arc::new(UInt64Array::from_iter(fields.iter().map(|field| field.0))),
832            )
833            .unwrap()
834            .push_field_array(
835                2,
836                Arc::new(UInt64Array::from_iter(fields.iter().map(|field| field.1))),
837            )
838            .unwrap();
839        builder.build().unwrap()
840    }
841
    /// Exercises `DedupReader` with the `LastNonNull` strategy on input where rows
    /// for the same key and timestamp are spread across several batches, including
    /// an empty batch in the middle of the stream.
    ///
    /// The same input is read twice: once with `LastNonNull::new(true)` (deleted
    /// rows filtered out) and once with `LastNonNull::new(false)` (deleted rows
    /// kept in the output).
    #[tokio::test]
    async fn test_last_non_null_merge() {
        let input = [
            new_batch_multi_fields(
                b"k1",
                &[1, 2],
                &[13, 11],
                &[OpType::Put, OpType::Put],
                &[(Some(11), Some(11)), (None, None)],
            ),
            // empty batch.
            new_batch_multi_fields(b"k1", &[], &[], &[], &[]),
            // Duplicates k1 at timestamp 2 from the first (non-empty) batch, with an
            // older sequence (10 < 11).
            new_batch_multi_fields(b"k1", &[2], &[10], &[OpType::Put], &[(Some(12), None)]),
            // k1 at timestamp 2 appears again, with the same sequence (10) as the batch
            // above. The expected merged row takes its first field (12) from the batch
            // above and its second field (22) from this one.
            new_batch_multi_fields(
                b"k1",
                &[2, 3, 4],
                &[10, 13, 13],
                &[OpType::Put, OpType::Put, OpType::Delete],
                &[(Some(2), Some(22)), (Some(13), None), (None, Some(14))],
            ),
            // The newest version of k2 at timestamp 2 is a Delete (sequence 20).
            new_batch_multi_fields(
                b"k2",
                &[1, 2],
                &[20, 20],
                &[OpType::Put, OpType::Delete],
                &[(Some(101), Some(101)), (None, None)],
            ),
            // Older put (sequence 19) for k2 at timestamp 2; neither expected output
            // contains its values.
            new_batch_multi_fields(
                b"k2",
                &[2],
                &[19],
                &[OpType::Put],
                &[(Some(102), Some(102))],
            ),
            new_batch_multi_fields(
                b"k3",
                &[2],
                &[20],
                &[OpType::Put],
                &[(Some(202), Some(202))],
            ),
            // This batch won't increase the deleted rows count: it is an older version
            // (sequence 19) of k3 at timestamp 2 than the put above (sequence 20), so it
            // is filtered out by the previous batch. (All fields are null).
            new_batch_multi_fields(b"k3", &[2], &[19], &[OpType::Delete], &[(None, None)]),
        ];

        // Filter deleted.
        let reader = VecBatchReader::new(&input);
        let mut reader = DedupReader::new(reader, LastNonNull::new(true), None);
        check_reader_result(
            &mut reader,
            &[
                new_batch_multi_fields(
                    b"k1",
                    &[1, 2],
                    &[13, 11],
                    &[OpType::Put, OpType::Put],
                    &[(Some(11), Some(11)), (Some(12), Some(22))],
                ),
                new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(13), None)]),
                new_batch_multi_fields(
                    b"k2",
                    &[1],
                    &[20],
                    &[OpType::Put],
                    &[(Some(101), Some(101))],
                ),
                new_batch_multi_fields(
                    b"k3",
                    &[2],
                    &[20],
                    &[OpType::Put],
                    &[(Some(202), Some(202))],
                ),
            ],
        )
        .await;
        assert_eq!(6, reader.metrics().num_unselected_rows);
        // The k1@4 and k2@2 deletes are absent from the output above; the older
        // k3 delete is not expected to be counted (see the comment on that batch).
        assert_eq!(2, reader.metrics().num_deleted_rows);

        // Does not filter deleted.
        let reader = VecBatchReader::new(&input);
        let mut reader = DedupReader::new(reader, LastNonNull::new(false), None);
        check_reader_result(
            &mut reader,
            &[
                new_batch_multi_fields(
                    b"k1",
                    &[1, 2],
                    &[13, 11],
                    &[OpType::Put, OpType::Put],
                    &[(Some(11), Some(11)), (Some(12), Some(22))],
                ),
                new_batch_multi_fields(
                    b"k1",
                    &[3, 4],
                    &[13, 13],
                    &[OpType::Put, OpType::Delete],
                    &[(Some(13), None), (None, Some(14))],
                ),
                new_batch_multi_fields(
                    b"k2",
                    &[1, 2],
                    &[20, 20],
                    &[OpType::Put, OpType::Delete],
                    &[(Some(101), Some(101)), (None, None)],
                ),
                new_batch_multi_fields(
                    b"k3",
                    &[2],
                    &[20],
                    &[OpType::Put],
                    &[(Some(202), Some(202))],
                ),
            ],
        )
        .await;
        assert_eq!(4, reader.metrics().num_unselected_rows);
        // Deleted rows are kept in this mode, so none are counted as deleted.
        assert_eq!(0, reader.metrics().num_deleted_rows);
    }
963
964    #[tokio::test]
965    async fn test_last_non_null_skip_merge_single() {
966        let input = [new_batch_multi_fields(
967            b"k1",
968            &[1, 2, 3],
969            &[13, 11, 13],
970            &[OpType::Put, OpType::Delete, OpType::Put],
971            &[(Some(11), Some(11)), (None, None), (Some(13), Some(13))],
972        )];
973
974        let reader = VecBatchReader::new(&input);
975        let mut reader = DedupReader::new(reader, LastNonNull::new(true), None);
976        check_reader_result(
977            &mut reader,
978            &[new_batch_multi_fields(
979                b"k1",
980                &[1, 3],
981                &[13, 13],
982                &[OpType::Put, OpType::Put],
983                &[(Some(11), Some(11)), (Some(13), Some(13))],
984            )],
985        )
986        .await;
987        assert_eq!(1, reader.metrics().num_unselected_rows);
988        assert_eq!(1, reader.metrics().num_deleted_rows);
989
990        let reader = VecBatchReader::new(&input);
991        let mut reader = DedupReader::new(reader, LastNonNull::new(false), None);
992        check_reader_result(&mut reader, &input).await;
993        assert_eq!(0, reader.metrics().num_unselected_rows);
994        assert_eq!(0, reader.metrics().num_deleted_rows);
995    }
996
997    #[tokio::test]
998    async fn test_last_non_null_skip_merge_no_null() {
999        let input = [
1000            new_batch_multi_fields(
1001                b"k1",
1002                &[1, 2],
1003                &[13, 11],
1004                &[OpType::Put, OpType::Put],
1005                &[(Some(11), Some(11)), (Some(12), Some(12))],
1006            ),
1007            new_batch_multi_fields(b"k1", &[2], &[10], &[OpType::Put], &[(None, Some(22))]),
1008            new_batch_multi_fields(
1009                b"k1",
1010                &[2, 3],
1011                &[9, 13],
1012                &[OpType::Put, OpType::Put],
1013                &[(Some(32), None), (Some(13), Some(13))],
1014            ),
1015        ];
1016
1017        let reader = VecBatchReader::new(&input);
1018        let mut reader = DedupReader::new(reader, LastNonNull::new(true), None);
1019        check_reader_result(
1020            &mut reader,
1021            &[
1022                new_batch_multi_fields(
1023                    b"k1",
1024                    &[1, 2],
1025                    &[13, 11],
1026                    &[OpType::Put, OpType::Put],
1027                    &[(Some(11), Some(11)), (Some(12), Some(12))],
1028                ),
1029                new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(13), Some(13))]),
1030            ],
1031        )
1032        .await;
1033        assert_eq!(2, reader.metrics().num_unselected_rows);
1034        assert_eq!(0, reader.metrics().num_deleted_rows);
1035    }
1036
1037    #[tokio::test]
1038    async fn test_last_non_null_merge_null() {
1039        let input = [
1040            new_batch_multi_fields(
1041                b"k1",
1042                &[1, 2],
1043                &[13, 11],
1044                &[OpType::Put, OpType::Put],
1045                &[(Some(11), Some(11)), (None, None)],
1046            ),
1047            new_batch_multi_fields(b"k1", &[2], &[10], &[OpType::Put], &[(None, Some(22))]),
1048            new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(33), None)]),
1049        ];
1050
1051        let reader = VecBatchReader::new(&input);
1052        let mut reader = DedupReader::new(reader, LastNonNull::new(true), None);
1053        check_reader_result(
1054            &mut reader,
1055            &[
1056                new_batch_multi_fields(
1057                    b"k1",
1058                    &[1, 2],
1059                    &[13, 11],
1060                    &[OpType::Put, OpType::Put],
1061                    &[(Some(11), Some(11)), (None, Some(22))],
1062                ),
1063                new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(33), None)]),
1064            ],
1065        )
1066        .await;
1067        assert_eq!(1, reader.metrics().num_unselected_rows);
1068        assert_eq!(0, reader.metrics().num_deleted_rows);
1069    }
1070
1071    fn check_dedup_strategy(input: &[Batch], strategy: &mut dyn DedupStrategy, expect: &[Batch]) {
1072        let mut actual = Vec::new();
1073        let mut metrics = DedupMetrics::default();
1074        for batch in input {
1075            if let Some(out) = strategy.push_batch(batch.clone(), &mut metrics).unwrap() {
1076                actual.push(out);
1077            }
1078        }
1079        if let Some(out) = strategy.finish(&mut metrics).unwrap() {
1080            actual.push(out);
1081        }
1082
1083        assert_eq!(expect, actual);
1084    }
1085
1086    #[test]
1087    fn test_last_non_null_strategy_delete_last() {
1088        let input = [
1089            new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
1090            new_batch_multi_fields(
1091                b"k1",
1092                &[1, 2],
1093                &[1, 7],
1094                &[OpType::Put, OpType::Put],
1095                &[(Some(1), None), (Some(22), Some(222))],
1096            ),
1097            new_batch_multi_fields(b"k1", &[2], &[4], &[OpType::Put], &[(Some(12), None)]),
1098            new_batch_multi_fields(
1099                b"k2",
1100                &[2, 3],
1101                &[2, 5],
1102                &[OpType::Put, OpType::Delete],
1103                &[(None, None), (Some(13), None)],
1104            ),
1105            new_batch_multi_fields(b"k2", &[3], &[3], &[OpType::Put], &[(None, Some(3))]),
1106        ];
1107
1108        let mut strategy = LastNonNull::new(true);
1109        check_dedup_strategy(
1110            &input,
1111            &mut strategy,
1112            &[
1113                new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
1114                new_batch_multi_fields(b"k1", &[2], &[7], &[OpType::Put], &[(Some(22), Some(222))]),
1115                new_batch_multi_fields(b"k2", &[2], &[2], &[OpType::Put], &[(None, None)]),
1116            ],
1117        );
1118    }
1119
1120    #[test]
1121    fn test_last_non_null_strategy_delete_one() {
1122        let input = [
1123            new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Delete], &[(None, None)]),
1124            new_batch_multi_fields(b"k2", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
1125        ];
1126
1127        let mut strategy = LastNonNull::new(true);
1128        check_dedup_strategy(
1129            &input,
1130            &mut strategy,
1131            &[new_batch_multi_fields(
1132                b"k2",
1133                &[1],
1134                &[6],
1135                &[OpType::Put],
1136                &[(Some(11), None)],
1137            )],
1138        );
1139    }
1140
1141    #[test]
1142    fn test_last_non_null_strategy_delete_all() {
1143        let input = [
1144            new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Delete], &[(None, None)]),
1145            new_batch_multi_fields(b"k2", &[1], &[6], &[OpType::Delete], &[(Some(11), None)]),
1146        ];
1147
1148        let mut strategy = LastNonNull::new(true);
1149        check_dedup_strategy(&input, &mut strategy, &[]);
1150    }
1151
1152    #[test]
1153    fn test_last_non_null_strategy_same_batch() {
1154        let input = [
1155            new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
1156            new_batch_multi_fields(
1157                b"k1",
1158                &[1, 2],
1159                &[1, 7],
1160                &[OpType::Put, OpType::Put],
1161                &[(Some(1), None), (Some(22), Some(222))],
1162            ),
1163            new_batch_multi_fields(b"k1", &[2], &[4], &[OpType::Put], &[(Some(12), None)]),
1164            new_batch_multi_fields(
1165                b"k1",
1166                &[2, 3],
1167                &[2, 5],
1168                &[OpType::Put, OpType::Put],
1169                &[(None, None), (Some(13), None)],
1170            ),
1171            new_batch_multi_fields(b"k1", &[3], &[3], &[OpType::Put], &[(None, Some(3))]),
1172        ];
1173
1174        let mut strategy = LastNonNull::new(true);
1175        check_dedup_strategy(
1176            &input,
1177            &mut strategy,
1178            &[
1179                new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
1180                new_batch_multi_fields(b"k1", &[2], &[7], &[OpType::Put], &[(Some(22), Some(222))]),
1181                new_batch_multi_fields(b"k1", &[3], &[5], &[OpType::Put], &[(Some(13), Some(3))]),
1182            ],
1183        );
1184    }
1185
1186    #[test]
1187    fn test_last_non_null_strategy_delete_middle() {
1188        let input = [
1189            new_batch_multi_fields(b"k1", &[1], &[7], &[OpType::Put], &[(Some(11), None)]),
1190            new_batch_multi_fields(b"k1", &[1], &[4], &[OpType::Delete], &[(None, None)]),
1191            new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Put], &[(Some(12), Some(1))]),
1192            new_batch_multi_fields(b"k1", &[2], &[8], &[OpType::Put], &[(Some(21), None)]),
1193            new_batch_multi_fields(b"k1", &[2], &[5], &[OpType::Delete], &[(None, None)]),
1194            new_batch_multi_fields(b"k1", &[2], &[2], &[OpType::Put], &[(Some(22), Some(2))]),
1195            new_batch_multi_fields(b"k1", &[3], &[9], &[OpType::Put], &[(Some(31), None)]),
1196            new_batch_multi_fields(b"k1", &[3], &[6], &[OpType::Delete], &[(None, None)]),
1197            new_batch_multi_fields(b"k1", &[3], &[3], &[OpType::Put], &[(Some(32), Some(3))]),
1198        ];
1199
1200        let mut strategy = LastNonNull::new(true);
1201        check_dedup_strategy(
1202            &input,
1203            &mut strategy,
1204            &[
1205                new_batch_multi_fields(b"k1", &[1], &[7], &[OpType::Put], &[(Some(11), None)]),
1206                new_batch_multi_fields(b"k1", &[2], &[8], &[OpType::Put], &[(Some(21), None)]),
1207                new_batch_multi_fields(b"k1", &[3], &[9], &[OpType::Put], &[(Some(31), None)]),
1208            ],
1209        );
1210    }
1211
1212    #[test]
1213    fn test_last_non_null_iter_on_batch() {
1214        let input = [new_batch_multi_fields(
1215            b"k1",
1216            &[1, 1, 2],
1217            &[13, 12, 13],
1218            &[OpType::Put, OpType::Put, OpType::Put],
1219            &[(None, None), (Some(1), None), (Some(2), Some(22))],
1220        )];
1221        let iter = input.into_iter().map(Ok);
1222        let iter = LastNonNullIter::new(iter);
1223        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
1224        let expect = [
1225            new_batch_multi_fields(b"k1", &[1], &[13], &[OpType::Put], &[(Some(1), None)]),
1226            new_batch_multi_fields(b"k1", &[2], &[13], &[OpType::Put], &[(Some(2), Some(22))]),
1227        ];
1228        assert_eq!(&expect, &actual[..]);
1229    }
1230
1231    #[test]
1232    fn test_last_non_null_iter_same_row() {
1233        let input = [
1234            new_batch_multi_fields(
1235                b"k1",
1236                &[1, 1, 1],
1237                &[13, 12, 11],
1238                &[OpType::Put, OpType::Put, OpType::Put],
1239                &[(None, None), (Some(1), None), (Some(11), None)],
1240            ),
1241            new_batch_multi_fields(
1242                b"k1",
1243                &[1, 1],
1244                &[10, 9],
1245                &[OpType::Put, OpType::Put],
1246                &[(None, Some(11)), (Some(21), Some(31))],
1247            ),
1248        ];
1249        let iter = input.into_iter().map(Ok);
1250        let iter = LastNonNullIter::new(iter);
1251        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
1252        let expect = [new_batch_multi_fields(
1253            b"k1",
1254            &[1],
1255            &[13],
1256            &[OpType::Put],
1257            &[(Some(1), Some(11))],
1258        )];
1259        assert_eq!(&expect, &actual[..]);
1260    }
1261
1262    #[test]
1263    fn test_last_non_null_iter_multi_batch() {
1264        let input = [
1265            new_batch_multi_fields(
1266                b"k1",
1267                &[1, 1, 2],
1268                &[13, 12, 13],
1269                &[OpType::Put, OpType::Put, OpType::Put],
1270                &[(None, None), (Some(1), None), (Some(2), Some(22))],
1271            ),
1272            new_batch_multi_fields(
1273                b"k1",
1274                &[2, 3],
1275                &[12, 13],
1276                &[OpType::Put, OpType::Delete],
1277                &[(None, Some(12)), (None, None)],
1278            ),
1279            new_batch_multi_fields(
1280                b"k2",
1281                &[1, 1, 2],
1282                &[13, 12, 13],
1283                &[OpType::Put, OpType::Put, OpType::Put],
1284                &[(None, None), (Some(1), None), (Some(2), Some(22))],
1285            ),
1286        ];
1287        let iter = input.into_iter().map(Ok);
1288        let iter = LastNonNullIter::new(iter);
1289        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
1290        let expect = [
1291            new_batch_multi_fields(b"k1", &[1], &[13], &[OpType::Put], &[(Some(1), None)]),
1292            new_batch_multi_fields(b"k1", &[2], &[13], &[OpType::Put], &[(Some(2), Some(22))]),
1293            new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Delete], &[(None, None)]),
1294            new_batch_multi_fields(b"k2", &[1], &[13], &[OpType::Put], &[(Some(1), None)]),
1295            new_batch_multi_fields(b"k2", &[2], &[13], &[OpType::Put], &[(Some(2), Some(22))]),
1296        ];
1297        assert_eq!(&expect, &actual[..]);
1298    }
1299
1300    /// Returns a new [Batch] without fields.
1301    fn new_batch_no_fields(
1302        primary_key: &[u8],
1303        timestamps: &[i64],
1304        sequences: &[u64],
1305        op_types: &[OpType],
1306    ) -> Batch {
1307        let mut builder = BatchBuilder::new(primary_key.to_vec());
1308        builder
1309            .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
1310                timestamps.iter().copied(),
1311            )))
1312            .unwrap()
1313            .sequences_array(Arc::new(UInt64Array::from_iter_values(
1314                sequences.iter().copied(),
1315            )))
1316            .unwrap()
1317            .op_types_array(Arc::new(UInt8Array::from_iter_values(
1318                op_types.iter().map(|v| *v as u8),
1319            )))
1320            .unwrap();
1321        builder.build().unwrap()
1322    }
1323
1324    #[test]
1325    fn test_last_non_null_iter_no_batch() {
1326        let input = [
1327            new_batch_no_fields(
1328                b"k1",
1329                &[1, 1, 2],
1330                &[13, 12, 13],
1331                &[OpType::Put, OpType::Put, OpType::Put],
1332            ),
1333            new_batch_no_fields(b"k1", &[2, 3], &[12, 13], &[OpType::Put, OpType::Delete]),
1334            new_batch_no_fields(
1335                b"k2",
1336                &[1, 1, 2],
1337                &[13, 12, 13],
1338                &[OpType::Put, OpType::Put, OpType::Put],
1339            ),
1340        ];
1341        let iter = input.into_iter().map(Ok);
1342        let iter = LastNonNullIter::new(iter);
1343        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
1344        let expect = [
1345            new_batch_no_fields(b"k1", &[1], &[13], &[OpType::Put]),
1346            new_batch_no_fields(b"k1", &[2], &[13], &[OpType::Put]),
1347            new_batch_no_fields(b"k1", &[3], &[13], &[OpType::Delete]),
1348            new_batch_no_fields(b"k2", &[1], &[13], &[OpType::Put]),
1349            new_batch_no_fields(b"k2", &[2], &[13], &[OpType::Put]),
1350        ];
1351        assert_eq!(&expect, &actual[..]);
1352    }
1353}