Skip to main content

mito2/read/
dedup.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Utilities to remove duplicate rows from a sorted batch.
16
17use std::fmt;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use api::v1::OpType;
22use datatypes::data_type::DataType;
23use datatypes::prelude::ScalarVector;
24use datatypes::value::Value;
25use datatypes::vectors::MutableVector;
26
27use crate::error::Result;
28use crate::read::{Batch, BatchColumn};
29
30/// Trait for reporting dedup metrics.
31pub trait DedupMetricsReport: Send + Sync {
32    /// Reports and resets the metrics.
33    fn report(&self, metrics: &mut DedupMetrics);
34}
35
36/// Strategy to remove duplicate rows from sorted batches.
37pub trait DedupStrategy: Send {
38    /// Pushes a batch to the dedup strategy.
39    /// Returns the deduplicated batch.
40    fn push_batch(&mut self, batch: Batch, metrics: &mut DedupMetrics) -> Result<Option<Batch>>;
41
42    /// Finishes the deduplication and resets the strategy.
43    ///
44    /// Users must ensure that `push_batch` is called for all batches before
45    /// calling this method.
46    fn finish(&mut self, metrics: &mut DedupMetrics) -> Result<Option<Batch>>;
47}
48
49/// Removes deleted rows from the batch and updates metrics.
50fn filter_deleted_from_batch(batch: &mut Batch, metrics: &mut DedupMetrics) -> Result<()> {
51    let num_rows = batch.num_rows();
52    batch.filter_deleted()?;
53    let num_rows_after_filter = batch.num_rows();
54    let num_deleted = num_rows - num_rows_after_filter;
55    metrics.num_deleted_rows += num_deleted;
56    metrics.num_unselected_rows += num_deleted;
57
58    Ok(())
59}
60
/// Counters and timings collected during deduplication.
///
/// All values start at zero (see the `Default` derive).
#[derive(Default)]
pub struct DedupMetrics {
    /// Number of rows removed during deduplication.
    pub(crate) num_unselected_rows: usize,
    /// Number of deleted rows.
    pub(crate) num_deleted_rows: usize,
    /// Total time spent on deduplication.
    pub(crate) dedup_cost: Duration,
}
71
72impl fmt::Debug for DedupMetrics {
73    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74        // Skip output if dedup_cost is zero
75        if self.dedup_cost.is_zero() {
76            return write!(f, "{{}}");
77        }
78
79        write!(f, r#"{{"dedup_cost":"{:?}""#, self.dedup_cost)?;
80
81        if self.num_unselected_rows > 0 {
82            write!(f, r#", "num_unselected_rows":{}"#, self.num_unselected_rows)?;
83        }
84        if self.num_deleted_rows > 0 {
85            write!(f, r#", "num_deleted_rows":{}"#, self.num_deleted_rows)?;
86        }
87
88        write!(f, "}}")
89    }
90}
91
92impl DedupMetrics {
93    /// Merges metrics from another DedupMetrics instance.
94    pub(crate) fn merge(&mut self, other: &DedupMetrics) {
95        let DedupMetrics {
96            num_unselected_rows,
97            num_deleted_rows,
98            dedup_cost,
99        } = other;
100
101        self.num_unselected_rows += *num_unselected_rows;
102        self.num_deleted_rows += *num_deleted_rows;
103        self.dedup_cost += *dedup_cost;
104    }
105
106    /// Reports the metrics if dedup_cost exceeds 10ms and resets them.
107    pub(crate) fn maybe_report(&mut self, reporter: &Option<Arc<dyn DedupMetricsReport>>) {
108        if self.dedup_cost.as_millis() > 10
109            && let Some(r) = reporter
110        {
111            r.report(self);
112        }
113    }
114}
115
116/// Buffer to store fields in the last row to merge.
117///
118/// Usage:
119/// We should call `maybe_init()` to initialize the builder and then call `push_first_row()`
120/// to push the first row of batches that the timestamp is the same as the row in this builder.
121/// Finally we should call `merge_last_non_null()` to merge the last non-null fields and
122/// return the merged batch.
123struct LastFieldsBuilder {
124    /// Filter deleted rows.
125    filter_deleted: bool,
126    /// Fields builders, lazy initialized.
127    builders: Vec<Box<dyn MutableVector>>,
128    /// Last fields to merge, lazy initialized.
129    /// Only initializes this field when `skip_merge()` is false.
130    last_fields: Vec<Value>,
131    /// Whether the last row (including `last_fields`) has null field.
132    /// Only sets this field when `contains_deletion` is false.
133    contains_null: bool,
134    /// Whether the last row has delete op. If true, skips merging fields.
135    contains_deletion: bool,
136    /// Whether the builder is initialized.
137    initialized: bool,
138}
139
140impl LastFieldsBuilder {
141    /// Returns a new builder with the given `filter_deleted` flag.
142    fn new(filter_deleted: bool) -> Self {
143        Self {
144            filter_deleted,
145            builders: Vec::new(),
146            last_fields: Vec::new(),
147            contains_null: false,
148            contains_deletion: false,
149            initialized: false,
150        }
151    }
152
153    /// Initializes the builders with the last row of the batch.
154    fn maybe_init(&mut self, batch: &Batch) {
155        debug_assert!(!batch.is_empty());
156
157        if self.initialized {
158            // Already initialized or no fields to merge.
159            return;
160        }
161
162        self.initialized = true;
163
164        if batch.fields().is_empty() {
165            // No fields to merge.
166            return;
167        }
168
169        let last_idx = batch.num_rows() - 1;
170        let fields = batch.fields();
171        // Safety: The last_idx is valid.
172        self.contains_deletion =
173            batch.op_types().get_data(last_idx).unwrap() == OpType::Delete as u8;
174        // If the row has been deleted, then we don't need to merge fields.
175        if !self.contains_deletion {
176            self.contains_null = fields.iter().any(|col| col.data.is_null(last_idx));
177        }
178
179        if self.skip_merge() {
180            // No null field or the row has been deleted, no need to merge.
181            return;
182        }
183        if self.builders.is_empty() {
184            self.builders = fields
185                .iter()
186                .map(|col| col.data.data_type().create_mutable_vector(1))
187                .collect();
188        }
189        self.last_fields = fields.iter().map(|col| col.data.get(last_idx)).collect();
190    }
191
192    /// Returns true if the builder don't need to merge the rows.
193    fn skip_merge(&self) -> bool {
194        debug_assert!(self.initialized);
195
196        // No null field or the row has been deleted, no need to merge.
197        self.contains_deletion || !self.contains_null
198    }
199
200    /// Pushes first row of a batch to the builder.
201    fn push_first_row(&mut self, batch: &Batch) {
202        debug_assert!(self.initialized);
203        debug_assert!(!batch.is_empty());
204
205        if self.skip_merge() {
206            // No remaining null field, skips this batch.
207            return;
208        }
209
210        // Both `maybe_init()` and `push_first_row()` can update the builder. If the delete
211        // op is not in the latest row, then we can't set the deletion flag in the `maybe_init()`.
212        // We must check the batch and update the deletion flag here to prevent
213        // the builder from merging non-null fields in rows that insert before the deleted row.
214        self.contains_deletion = batch.op_types().get_data(0).unwrap() == OpType::Delete as u8;
215        if self.contains_deletion {
216            // Deletes this row.
217            return;
218        }
219
220        let fields = batch.fields();
221        for (idx, value) in self.last_fields.iter_mut().enumerate() {
222            if value.is_null() && !fields[idx].data.is_null(0) {
223                // Updates the value.
224                *value = fields[idx].data.get(0);
225            }
226        }
227        // Updates the flag.
228        self.contains_null = self.last_fields.iter().any(Value::is_null);
229    }
230
231    /// Merges last non-null fields, builds a new batch and resets the builder.
232    /// It may overwrites the last row of the `buffer`. The `buffer` is the batch
233    /// that initialized the builder.
234    fn merge_last_non_null(
235        &mut self,
236        buffer: Batch,
237        metrics: &mut DedupMetrics,
238    ) -> Result<Option<Batch>> {
239        debug_assert!(self.initialized);
240
241        let mut output = if self.last_fields.is_empty() {
242            // No need to overwrite the last row.
243            buffer
244        } else {
245            // Builds last fields.
246            for (builder, value) in self.builders.iter_mut().zip(&self.last_fields) {
247                // Safety: Vectors of the batch has the same type.
248                builder.push_value_ref(&value.as_value_ref());
249            }
250            let fields = self
251                .builders
252                .iter_mut()
253                .zip(buffer.fields())
254                .map(|(builder, col)| BatchColumn {
255                    column_id: col.column_id,
256                    data: builder.to_vector(),
257                })
258                .collect();
259
260            if buffer.num_rows() == 1 {
261                // Replaces the buffer directly if it only has one row.
262                buffer.with_fields(fields)?
263            } else {
264                // Replaces the last row of the buffer.
265                let front = buffer.slice(0, buffer.num_rows() - 1);
266                let last = buffer.slice(buffer.num_rows() - 1, 1);
267                let last = last.with_fields(fields)?;
268                Batch::concat(vec![front, last])?
269            }
270        };
271
272        // Resets itself. `self.builders` is already reset in `to_vector()`.
273        self.clear();
274
275        if self.filter_deleted {
276            filter_deleted_from_batch(&mut output, metrics)?;
277        }
278
279        if output.is_empty() {
280            Ok(None)
281        } else {
282            Ok(Some(output))
283        }
284    }
285
286    /// Clears the builder.
287    fn clear(&mut self) {
288        self.last_fields.clear();
289        self.contains_null = false;
290        self.contains_deletion = false;
291        self.initialized = false;
292    }
293}
294
295/// Dedup strategy that keeps the last non-null field for the same key.
296///
297/// It assumes that batches from files and memtables don't contain duplicate rows
298/// and the merge reader never concatenates batches from different source.
299///
300/// We might implement a new strategy if we need to process files with duplicate rows.
301pub(crate) struct LastNonNull {
302    /// Buffered batch that fields in the last row may be updated.
303    buffer: Option<Batch>,
304    /// Fields that overlaps with the last row of the `buffer`.
305    last_fields: LastFieldsBuilder,
306}
307
308impl LastNonNull {
309    /// Creates a new strategy with the given `filter_deleted` flag.
310    pub(crate) fn new(filter_deleted: bool) -> Self {
311        Self {
312            buffer: None,
313            last_fields: LastFieldsBuilder::new(filter_deleted),
314        }
315    }
316}
317
318impl DedupStrategy for LastNonNull {
319    fn push_batch(&mut self, batch: Batch, metrics: &mut DedupMetrics) -> Result<Option<Batch>> {
320        let start = Instant::now();
321
322        if batch.is_empty() {
323            return Ok(None);
324        }
325
326        let Some(buffer) = self.buffer.as_mut() else {
327            // The buffer is empty, store the batch and return. We need to observe the next batch.
328            self.buffer = Some(batch);
329            return Ok(None);
330        };
331
332        // Initializes last fields with the first buffer.
333        self.last_fields.maybe_init(buffer);
334
335        if buffer.primary_key() != batch.primary_key() {
336            // Next key is different.
337            let buffer = std::mem::replace(buffer, batch);
338            let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;
339            metrics.dedup_cost += start.elapsed();
340            return Ok(merged);
341        }
342
343        if buffer.last_timestamp() != batch.first_timestamp() {
344            // The next batch has a different timestamp.
345            let buffer = std::mem::replace(buffer, batch);
346            let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;
347            metrics.dedup_cost += start.elapsed();
348            return Ok(merged);
349        }
350
351        // The next batch has the same key and timestamp.
352
353        metrics.num_unselected_rows += 1;
354        // We assumes each batch doesn't contain duplicate rows so we only need to check the first row.
355        if batch.num_rows() == 1 {
356            self.last_fields.push_first_row(&batch);
357            metrics.dedup_cost += start.elapsed();
358            return Ok(None);
359        }
360
361        // The next batch has the same key and timestamp but contains multiple rows.
362        // We can merge the first row and buffer the remaining rows.
363        let first = batch.slice(0, 1);
364        self.last_fields.push_first_row(&first);
365        // Moves the remaining rows to the buffer.
366        let batch = batch.slice(1, batch.num_rows() - 1);
367        let buffer = std::mem::replace(buffer, batch);
368        let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;
369
370        metrics.dedup_cost += start.elapsed();
371
372        Ok(merged)
373    }
374
375    fn finish(&mut self, metrics: &mut DedupMetrics) -> Result<Option<Batch>> {
376        let start = Instant::now();
377
378        let Some(buffer) = self.buffer.take() else {
379            return Ok(None);
380        };
381
382        // Initializes last fields with the first buffer.
383        self.last_fields.maybe_init(&buffer);
384
385        let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;
386
387        metrics.dedup_cost += start.elapsed();
388
389        Ok(merged)
390    }
391}
392
393/// An iterator that dedup rows by [LastNonNull] strategy.
394/// The input iterator must returns sorted batches.
395pub(crate) struct LastNonNullIter<I> {
396    /// Inner iterator that returns sorted batches.
397    iter: Option<I>,
398    /// Dedup strategy.
399    strategy: LastNonNull,
400    /// Dedup metrics.
401    metrics: DedupMetrics,
402    /// The current batch returned by the iterator. If it is None, we need to
403    /// fetch a new batch.
404    /// The batch is always not empty.
405    current_batch: Option<Batch>,
406    /// The index of the current row in the current batch.
407    /// more to check issue #5229.
408    current_index: usize,
409}
410
411impl<I> LastNonNullIter<I> {
412    /// Creates a new iterator with the given inner iterator.
413    pub(crate) fn new(iter: I) -> Self {
414        Self {
415            iter: Some(iter),
416            // We only use the iter in memtables. Memtables never filter deleted.
417            strategy: LastNonNull::new(false),
418            metrics: DedupMetrics::default(),
419            current_batch: None,
420            current_index: 0,
421        }
422    }
423}
424
425impl<I: Iterator<Item = Result<Batch>>> LastNonNullIter<I> {
426    /// Fetches the next batch from the inner iterator. It will slice the batch if it
427    /// contains duplicate rows.
428    fn next_batch_for_merge(&mut self) -> Result<Option<Batch>> {
429        if self.current_batch.is_none() {
430            // No current batch. Fetches a new batch from the inner iterator.
431            let Some(iter) = self.iter.as_mut() else {
432                // The iterator is exhausted.
433                return Ok(None);
434            };
435
436            self.current_batch = iter.next().transpose()?;
437            self.current_index = 0;
438            if self.current_batch.is_none() {
439                // The iterator is exhausted.
440                self.iter = None;
441                return Ok(None);
442            }
443        }
444
445        if let Some(batch) = &self.current_batch {
446            let n = batch.num_rows();
447            // Safety: The batch is not empty when accessed.
448            let timestamps = batch.timestamps_native().unwrap();
449            let mut pos = self.current_index;
450            while pos + 1 < n && timestamps[pos] != timestamps[pos + 1] {
451                pos += 1;
452            }
453            let segment = batch.slice(self.current_index, pos - self.current_index + 1);
454            if pos + 1 < n && timestamps[pos] == timestamps[pos + 1] {
455                self.current_index = pos + 1;
456            } else {
457                self.current_batch = None;
458                self.current_index = 0;
459            }
460            return Ok(Some(segment));
461        }
462
463        Ok(None)
464    }
465
466    fn next_batch(&mut self) -> Result<Option<Batch>> {
467        while let Some(batch) = self.next_batch_for_merge()? {
468            if let Some(batch) = self.strategy.push_batch(batch, &mut self.metrics)? {
469                return Ok(Some(batch));
470            }
471        }
472
473        self.strategy.finish(&mut self.metrics)
474    }
475}
476
477impl<I: Iterator<Item = Result<Batch>>> Iterator for LastNonNullIter<I> {
478    type Item = Result<Batch>;
479
480    fn next(&mut self) -> Option<Self::Item> {
481        self.next_batch().transpose()
482    }
483}
484
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::arrow::array::{TimestampMillisecondArray, UInt8Array, UInt64Array};

    use super::*;
    use crate::read::BatchBuilder;

    /// Builds a [Batch] with two nullable `u64` fields whose column ids are 1 and 2.
    fn new_batch_multi_fields(
        primary_key: &[u8],
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        fields: &[(Option<u64>, Option<u64>)],
    ) -> Batch {
        let timestamp_array = TimestampMillisecondArray::from_iter_values(timestamps.iter().copied());
        let sequence_array = UInt64Array::from_iter_values(sequences.iter().copied());
        let op_type_array = UInt8Array::from_iter_values(op_types.iter().map(|v| *v as u8));
        let first_field = UInt64Array::from_iter(fields.iter().map(|field| field.0));
        let second_field = UInt64Array::from_iter(fields.iter().map(|field| field.1));

        let mut builder = BatchBuilder::new(primary_key.to_vec());
        builder
            .timestamps_array(Arc::new(timestamp_array))
            .unwrap()
            .sequences_array(Arc::new(sequence_array))
            .unwrap()
            .op_types_array(Arc::new(op_type_array))
            .unwrap()
            .push_field_array(1, Arc::new(first_field))
            .unwrap()
            .push_field_array(2, Arc::new(second_field))
            .unwrap();
        builder.build().unwrap()
    }

    /// Builds a [Batch] that has no field columns.
    fn new_batch_no_fields(
        primary_key: &[u8],
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
    ) -> Batch {
        let timestamp_array = TimestampMillisecondArray::from_iter_values(timestamps.iter().copied());
        let sequence_array = UInt64Array::from_iter_values(sequences.iter().copied());
        let op_type_array = UInt8Array::from_iter_values(op_types.iter().map(|v| *v as u8));

        let mut builder = BatchBuilder::new(primary_key.to_vec());
        builder
            .timestamps_array(Arc::new(timestamp_array))
            .unwrap()
            .sequences_array(Arc::new(sequence_array))
            .unwrap()
            .op_types_array(Arc::new(op_type_array))
            .unwrap();
        builder.build().unwrap()
    }

    /// Pushes every batch of `input` into `strategy`, finishes it and compares
    /// all emitted batches with `expect`.
    fn check_dedup_strategy(input: &[Batch], strategy: &mut dyn DedupStrategy, expect: &[Batch]) {
        let mut metrics = DedupMetrics::default();
        let mut actual: Vec<Batch> = Vec::new();
        for batch in input {
            actual.extend(strategy.push_batch(batch.clone(), &mut metrics).unwrap());
        }
        actual.extend(strategy.finish(&mut metrics).unwrap());

        assert_eq!(expect, actual);
    }

    /// Runs `input` through a [LastNonNullIter] and collects every batch it yields.
    fn collect_deduped(input: impl IntoIterator<Item = Batch>) -> Vec<Batch> {
        LastNonNullIter::new(input.into_iter().map(Ok))
            .map(|batch| batch.unwrap())
            .collect()
    }

    #[test]
    fn test_last_non_null_strategy_delete_last() {
        let batches = [
            new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
            new_batch_multi_fields(
                b"k1",
                &[1, 2],
                &[1, 7],
                &[OpType::Put, OpType::Put],
                &[(Some(1), None), (Some(22), Some(222))],
            ),
            new_batch_multi_fields(b"k1", &[2], &[4], &[OpType::Put], &[(Some(12), None)]),
            new_batch_multi_fields(
                b"k2",
                &[2, 3],
                &[2, 5],
                &[OpType::Put, OpType::Delete],
                &[(None, None), (Some(13), None)],
            ),
            new_batch_multi_fields(b"k2", &[3], &[3], &[OpType::Put], &[(None, Some(3))]),
        ];
        let expected = [
            new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
            new_batch_multi_fields(b"k1", &[2], &[7], &[OpType::Put], &[(Some(22), Some(222))]),
            new_batch_multi_fields(b"k2", &[2], &[2], &[OpType::Put], &[(None, None)]),
        ];

        let mut strategy = LastNonNull::new(true);
        check_dedup_strategy(&batches, &mut strategy, &expected);
    }

    #[test]
    fn test_last_non_null_strategy_delete_one() {
        let batches = [
            new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Delete], &[(None, None)]),
            new_batch_multi_fields(b"k2", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
        ];
        let expected = [new_batch_multi_fields(
            b"k2",
            &[1],
            &[6],
            &[OpType::Put],
            &[(Some(11), None)],
        )];

        let mut strategy = LastNonNull::new(true);
        check_dedup_strategy(&batches, &mut strategy, &expected);
    }

    #[test]
    fn test_last_non_null_strategy_delete_all() {
        let batches = [
            new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Delete], &[(None, None)]),
            new_batch_multi_fields(b"k2", &[1], &[6], &[OpType::Delete], &[(Some(11), None)]),
        ];

        let mut strategy = LastNonNull::new(true);
        check_dedup_strategy(&batches, &mut strategy, &[]);
    }

    #[test]
    fn test_last_non_null_strategy_same_batch() {
        let batches = [
            new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
            new_batch_multi_fields(
                b"k1",
                &[1, 2],
                &[1, 7],
                &[OpType::Put, OpType::Put],
                &[(Some(1), None), (Some(22), Some(222))],
            ),
            new_batch_multi_fields(b"k1", &[2], &[4], &[OpType::Put], &[(Some(12), None)]),
            new_batch_multi_fields(
                b"k1",
                &[2, 3],
                &[2, 5],
                &[OpType::Put, OpType::Put],
                &[(None, None), (Some(13), None)],
            ),
            new_batch_multi_fields(b"k1", &[3], &[3], &[OpType::Put], &[(None, Some(3))]),
        ];
        let expected = [
            new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
            new_batch_multi_fields(b"k1", &[2], &[7], &[OpType::Put], &[(Some(22), Some(222))]),
            new_batch_multi_fields(b"k1", &[3], &[5], &[OpType::Put], &[(Some(13), Some(3))]),
        ];

        let mut strategy = LastNonNull::new(true);
        check_dedup_strategy(&batches, &mut strategy, &expected);
    }

    #[test]
    fn test_last_non_null_strategy_delete_middle() {
        let batches = [
            new_batch_multi_fields(b"k1", &[1], &[7], &[OpType::Put], &[(Some(11), None)]),
            new_batch_multi_fields(b"k1", &[1], &[4], &[OpType::Delete], &[(None, None)]),
            new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Put], &[(Some(12), Some(1))]),
            new_batch_multi_fields(b"k1", &[2], &[8], &[OpType::Put], &[(Some(21), None)]),
            new_batch_multi_fields(b"k1", &[2], &[5], &[OpType::Delete], &[(None, None)]),
            new_batch_multi_fields(b"k1", &[2], &[2], &[OpType::Put], &[(Some(22), Some(2))]),
            new_batch_multi_fields(b"k1", &[3], &[9], &[OpType::Put], &[(Some(31), None)]),
            new_batch_multi_fields(b"k1", &[3], &[6], &[OpType::Delete], &[(None, None)]),
            new_batch_multi_fields(b"k1", &[3], &[3], &[OpType::Put], &[(Some(32), Some(3))]),
        ];
        let expected = [
            new_batch_multi_fields(b"k1", &[1], &[7], &[OpType::Put], &[(Some(11), None)]),
            new_batch_multi_fields(b"k1", &[2], &[8], &[OpType::Put], &[(Some(21), None)]),
            new_batch_multi_fields(b"k1", &[3], &[9], &[OpType::Put], &[(Some(31), None)]),
        ];

        let mut strategy = LastNonNull::new(true);
        check_dedup_strategy(&batches, &mut strategy, &expected);
    }

    #[test]
    fn test_last_non_null_iter_on_batch() {
        let batches = [new_batch_multi_fields(
            b"k1",
            &[1, 1, 2],
            &[13, 12, 13],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[(None, None), (Some(1), None), (Some(2), Some(22))],
        )];
        let expected = [
            new_batch_multi_fields(b"k1", &[1], &[13], &[OpType::Put], &[(Some(1), None)]),
            new_batch_multi_fields(b"k1", &[2], &[13], &[OpType::Put], &[(Some(2), Some(22))]),
        ];

        let output = collect_deduped(batches);
        assert_eq!(&expected, &output[..]);
    }

    #[test]
    fn test_last_non_null_iter_same_row() {
        let batches = [
            new_batch_multi_fields(
                b"k1",
                &[1, 1, 1],
                &[13, 12, 11],
                &[OpType::Put, OpType::Put, OpType::Put],
                &[(None, None), (Some(1), None), (Some(11), None)],
            ),
            new_batch_multi_fields(
                b"k1",
                &[1, 1],
                &[10, 9],
                &[OpType::Put, OpType::Put],
                &[(None, Some(11)), (Some(21), Some(31))],
            ),
        ];
        let expected = [new_batch_multi_fields(
            b"k1",
            &[1],
            &[13],
            &[OpType::Put],
            &[(Some(1), Some(11))],
        )];

        let output = collect_deduped(batches);
        assert_eq!(&expected, &output[..]);
    }

    #[test]
    fn test_last_non_null_iter_multi_batch() {
        let batches = [
            new_batch_multi_fields(
                b"k1",
                &[1, 1, 2],
                &[13, 12, 13],
                &[OpType::Put, OpType::Put, OpType::Put],
                &[(None, None), (Some(1), None), (Some(2), Some(22))],
            ),
            new_batch_multi_fields(
                b"k1",
                &[2, 3],
                &[12, 13],
                &[OpType::Put, OpType::Delete],
                &[(None, Some(12)), (None, None)],
            ),
            new_batch_multi_fields(
                b"k2",
                &[1, 1, 2],
                &[13, 12, 13],
                &[OpType::Put, OpType::Put, OpType::Put],
                &[(None, None), (Some(1), None), (Some(2), Some(22))],
            ),
        ];
        let expected = [
            new_batch_multi_fields(b"k1", &[1], &[13], &[OpType::Put], &[(Some(1), None)]),
            new_batch_multi_fields(b"k1", &[2], &[13], &[OpType::Put], &[(Some(2), Some(22))]),
            new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Delete], &[(None, None)]),
            new_batch_multi_fields(b"k2", &[1], &[13], &[OpType::Put], &[(Some(1), None)]),
            new_batch_multi_fields(b"k2", &[2], &[13], &[OpType::Put], &[(Some(2), Some(22))]),
        ];

        let output = collect_deduped(batches);
        assert_eq!(&expected, &output[..]);
    }

    #[test]
    fn test_last_non_null_iter_no_batch() {
        let batches = [
            new_batch_no_fields(
                b"k1",
                &[1, 1, 2],
                &[13, 12, 13],
                &[OpType::Put, OpType::Put, OpType::Put],
            ),
            new_batch_no_fields(b"k1", &[2, 3], &[12, 13], &[OpType::Put, OpType::Delete]),
            new_batch_no_fields(
                b"k2",
                &[1, 1, 2],
                &[13, 12, 13],
                &[OpType::Put, OpType::Put, OpType::Put],
            ),
        ];
        let expected = [
            new_batch_no_fields(b"k1", &[1], &[13], &[OpType::Put]),
            new_batch_no_fields(b"k1", &[2], &[13], &[OpType::Put]),
            new_batch_no_fields(b"k1", &[3], &[13], &[OpType::Delete]),
            new_batch_no_fields(b"k2", &[1], &[13], &[OpType::Put]),
            new_batch_no_fields(b"k2", &[2], &[13], &[OpType::Put]),
        ];

        let output = collect_deduped(batches);
        assert_eq!(&expected, &output[..]);
    }
}
811}