1use api::v1::OpType;
18use async_trait::async_trait;
19use common_telemetry::debug;
20use common_time::Timestamp;
21use datatypes::data_type::DataType;
22use datatypes::prelude::ScalarVector;
23use datatypes::value::Value;
24use datatypes::vectors::MutableVector;
25
26use crate::error::Result;
27use crate::metrics::MERGE_FILTER_ROWS_TOTAL;
28use crate::read::{Batch, BatchColumn, BatchReader};
29
30pub(crate) struct DedupReader<R, S> {
33 source: R,
34 strategy: S,
35 metrics: DedupMetrics,
36}
37
38impl<R, S> DedupReader<R, S> {
39 pub(crate) fn new(source: R, strategy: S) -> Self {
41 Self {
42 source,
43 strategy,
44 metrics: DedupMetrics::default(),
45 }
46 }
47}
48
49impl<R: BatchReader, S: DedupStrategy> DedupReader<R, S> {
50 async fn fetch_next_batch(&mut self) -> Result<Option<Batch>> {
52 while let Some(batch) = self.source.next_batch().await? {
53 if let Some(batch) = self.strategy.push_batch(batch, &mut self.metrics)? {
54 return Ok(Some(batch));
55 }
56 }
57
58 self.strategy.finish(&mut self.metrics)
59 }
60}
61
62#[async_trait]
63impl<R: BatchReader, S: DedupStrategy> BatchReader for DedupReader<R, S> {
64 async fn next_batch(&mut self) -> Result<Option<Batch>> {
65 self.fetch_next_batch().await
66 }
67}
68
69impl<R, S> Drop for DedupReader<R, S> {
70 fn drop(&mut self) {
71 debug!("Dedup reader finished, metrics: {:?}", self.metrics);
72
73 MERGE_FILTER_ROWS_TOTAL
74 .with_label_values(&["dedup"])
75 .inc_by(self.metrics.num_unselected_rows as u64);
76 MERGE_FILTER_ROWS_TOTAL
77 .with_label_values(&["delete"])
78 .inc_by(self.metrics.num_unselected_rows as u64);
79 }
80}
81
#[cfg(test)]
impl<R, S> DedupReader<R, S> {
    /// Test-only view of the metrics gathered so far.
    fn metrics(&self) -> &DedupMetrics {
        let DedupReader { metrics, .. } = self;
        metrics
    }
}
88
/// Strategy that removes duplicate rows from a stream of batches.
///
/// Callers push batches one at a time and call `finish` once the input is
/// exhausted (see `DedupReader::fetch_next_batch`).
pub(crate) trait DedupStrategy: Send {
    /// Pushes `batch` into the strategy.
    ///
    /// Returns a batch that is ready to be emitted, or `Ok(None)` when nothing
    /// is ready yet (an implementation may hold rows back). Counters about
    /// dropped rows are added to `metrics`.
    fn push_batch(&mut self, batch: Batch, metrics: &mut DedupMetrics) -> Result<Option<Batch>>;

    /// Called after the last `push_batch`; returns any rows still held back.
    fn finish(&mut self, metrics: &mut DedupMetrics) -> Result<Option<Batch>>;
}
101
102struct BatchLastRow {
104 primary_key: Vec<u8>,
105 timestamp: Timestamp,
107}
108
109pub(crate) struct LastRow {
118 prev_batch: Option<BatchLastRow>,
121 filter_deleted: bool,
123}
124
125impl LastRow {
126 pub(crate) fn new(filter_deleted: bool) -> Self {
128 Self {
129 prev_batch: None,
130 filter_deleted,
131 }
132 }
133}
134
135impl DedupStrategy for LastRow {
136 fn push_batch(
137 &mut self,
138 mut batch: Batch,
139 metrics: &mut DedupMetrics,
140 ) -> Result<Option<Batch>> {
141 if batch.is_empty() {
142 return Ok(None);
143 }
144 debug_assert!(batch.first_timestamp().is_some());
145 let prev_timestamp = match &self.prev_batch {
146 Some(prev_batch) => {
147 if prev_batch.primary_key != batch.primary_key() {
148 None
151 } else {
152 Some(prev_batch.timestamp)
153 }
154 }
155 None => None,
156 };
157 if batch.first_timestamp() == prev_timestamp {
158 metrics.num_unselected_rows += 1;
159 if batch.num_rows() == 1 {
161 return Ok(None);
164 }
165 batch = batch.slice(1, batch.num_rows() - 1);
167 }
168
169 match &mut self.prev_batch {
174 Some(prev) => {
175 prev.primary_key.clone_from(&batch.primary_key);
177 prev.timestamp = batch.last_timestamp().unwrap();
178 }
179 None => {
180 self.prev_batch = Some(BatchLastRow {
181 primary_key: batch.primary_key().to_vec(),
182 timestamp: batch.last_timestamp().unwrap(),
183 })
184 }
185 }
186
187 if self.filter_deleted {
189 filter_deleted_from_batch(&mut batch, metrics)?;
190 }
191
192 if batch.is_empty() {
194 Ok(None)
195 } else {
196 Ok(Some(batch))
197 }
198 }
199
200 fn finish(&mut self, _metrics: &mut DedupMetrics) -> Result<Option<Batch>> {
201 Ok(None)
202 }
203}
204
205fn filter_deleted_from_batch(batch: &mut Batch, metrics: &mut DedupMetrics) -> Result<()> {
207 let num_rows = batch.num_rows();
208 batch.filter_deleted()?;
209 let num_rows_after_filter = batch.num_rows();
210 let num_deleted = num_rows - num_rows_after_filter;
211 metrics.num_deleted_rows += num_deleted;
212 metrics.num_unselected_rows += num_deleted;
213
214 Ok(())
215}
216
/// Counters collected while deduplicating batches.
#[derive(Debug, Default)]
pub(crate) struct DedupMetrics {
    /// Rows dropped for any reason; deleted rows are counted here as well.
    pub(crate) num_unselected_rows: usize,
    /// Rows dropped because they were filtered out as deletions.
    pub(crate) num_deleted_rows: usize,
}
225
/// Tracks the merged field values of the last row of a buffered batch.
///
/// Later rows that share that row's primary key and timestamp can fill fields
/// that are still null. Used by `LastNonNull`.
struct LastFieldsBuilder {
    /// Whether deletion rows are removed from the merged output.
    filter_deleted: bool,
    /// One mutable vector per field column, used to build the merged last row.
    builders: Vec<Box<dyn MutableVector>>,
    /// Current merged values of the tracked row, one per field column.
    last_fields: Vec<Value>,
    /// Whether the tracked row still has a null field.
    contains_null: bool,
    /// Whether a deletion row has been seen; once set, no more merging happens.
    contains_deletion: bool,
    /// Whether `maybe_init` has run since the last `clear`.
    initialized: bool,
}

impl LastFieldsBuilder {
    /// Creates an uninitialized builder.
    fn new(filter_deleted: bool) -> Self {
        Self {
            filter_deleted,
            builders: Vec::new(),
            last_fields: Vec::new(),
            contains_null: false,
            contains_deletion: false,
            initialized: false,
        }
    }

    /// Captures the last row of `batch` as the merge target, unless already
    /// initialized since the last `clear`.
    ///
    /// Field values are only captured when merging can matter: the last row is
    /// not a deletion and has at least one null field. Otherwise `last_fields`
    /// stays empty, and `merge_last_non_null` emits the batch unchanged.
    fn maybe_init(&mut self, batch: &Batch) {
        debug_assert!(!batch.is_empty());

        if self.initialized {
            // Already initialized for the current buffered batch.
            return;
        }

        self.initialized = true;

        if batch.fields().is_empty() {
            // No field columns: nothing to merge.
            return;
        }

        let last_idx = batch.num_rows() - 1;
        let fields = batch.fields();
        // A trailing deletion is never merged with older rows.
        self.contains_deletion =
            batch.op_types().get_data(last_idx).unwrap() == OpType::Delete as u8;
        if !self.contains_deletion {
            // Merging is only useful if some field of the last row is null.
            self.contains_null = fields.iter().any(|col| col.data.is_null(last_idx));
        }

        if self.skip_merge() {
            return;
        }
        if self.builders.is_empty() {
            // Builders are created once and kept across `clear` calls.
            self.builders = fields
                .iter()
                .map(|col| col.data.data_type().create_mutable_vector(1))
                .collect();
        }
        self.last_fields = fields.iter().map(|col| col.data.get(last_idx)).collect();
    }

    /// Returns true when no merge is needed: a deletion was seen, or no field is null.
    fn skip_merge(&self) -> bool {
        debug_assert!(self.initialized);

        self.contains_deletion || !self.contains_null
    }

    /// Merges the first row of `batch`, a duplicate of the tracked row, by
    /// filling fields of `last_fields` that are still null.
    fn push_first_row(&mut self, batch: &Batch) {
        debug_assert!(self.initialized);
        debug_assert!(!batch.is_empty());

        if self.skip_merge() {
            // Nothing left to fill, or a deletion already stopped merging.
            return;
        }

        // A deletion row stops merging; the values collected so far are kept
        // and still applied by `merge_last_non_null`.
        self.contains_deletion = batch.op_types().get_data(0).unwrap() == OpType::Delete as u8;
        if self.contains_deletion {
            return;
        }

        let fields = batch.fields();
        for (idx, value) in self.last_fields.iter_mut().enumerate() {
            if value.is_null() && !fields[idx].data.is_null(0) {
                // Fill the null field from this duplicate row.
                *value = fields[idx].data.get(0);
            }
        }
        self.contains_null = self.last_fields.iter().any(Value::is_null);
    }

    /// Produces the output for `buffer` and resets the builder.
    ///
    /// If merged values were captured, the last row's fields are replaced with
    /// them. When `filter_deleted` is set, deletion rows are then removed.
    /// Returns `Ok(None)` if nothing is left.
    fn merge_last_non_null(
        &mut self,
        buffer: Batch,
        metrics: &mut DedupMetrics,
    ) -> Result<Option<Batch>> {
        debug_assert!(self.initialized);

        let mut output = if self.last_fields.is_empty() {
            // No merge needed: emit the buffer as is.
            buffer
        } else {
            // Build one-row field columns from the merged values.
            for (builder, value) in self.builders.iter_mut().zip(&self.last_fields) {
                builder.push_value_ref(value.as_value_ref());
            }
            let fields = self
                .builders
                .iter_mut()
                .zip(buffer.fields())
                .map(|(builder, col)| BatchColumn {
                    column_id: col.column_id,
                    data: builder.to_vector(),
                })
                .collect();

            if buffer.num_rows() == 1 {
                buffer.with_fields(fields)?
            } else {
                // Replace only the last row's fields; earlier rows are kept intact.
                let front = buffer.slice(0, buffer.num_rows() - 1);
                let last = buffer.slice(buffer.num_rows() - 1, 1);
                let last = last.with_fields(fields)?;
                Batch::concat(vec![front, last])?
            }
        };

        // Ready for the next buffered batch.
        self.clear();

        if self.filter_deleted {
            filter_deleted_from_batch(&mut output, metrics)?;
        }

        // Filtering may have removed every row.
        if output.is_empty() {
            Ok(None)
        } else {
            Ok(Some(output))
        }
    }

    /// Resets per-row state; `builders` are kept for reuse.
    fn clear(&mut self) {
        self.last_fields.clear();
        self.contains_null = false;
        self.contains_deletion = false;
        self.initialized = false;
    }
}
404
405pub(crate) struct LastNonNull {
412 buffer: Option<Batch>,
414 last_fields: LastFieldsBuilder,
416}
417
418impl LastNonNull {
419 pub(crate) fn new(filter_deleted: bool) -> Self {
421 Self {
422 buffer: None,
423 last_fields: LastFieldsBuilder::new(filter_deleted),
424 }
425 }
426}
427
428impl DedupStrategy for LastNonNull {
429 fn push_batch(&mut self, batch: Batch, metrics: &mut DedupMetrics) -> Result<Option<Batch>> {
430 if batch.is_empty() {
431 return Ok(None);
432 }
433
434 let Some(buffer) = self.buffer.as_mut() else {
435 self.buffer = Some(batch);
437 return Ok(None);
438 };
439
440 self.last_fields.maybe_init(buffer);
442
443 if buffer.primary_key() != batch.primary_key() {
444 let buffer = std::mem::replace(buffer, batch);
446 let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;
447 return Ok(merged);
448 }
449
450 if buffer.last_timestamp() != batch.first_timestamp() {
451 let buffer = std::mem::replace(buffer, batch);
453 let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;
454 return Ok(merged);
455 }
456
457 metrics.num_unselected_rows += 1;
460 if batch.num_rows() == 1 {
462 self.last_fields.push_first_row(&batch);
463 return Ok(None);
464 }
465
466 let first = batch.slice(0, 1);
469 self.last_fields.push_first_row(&first);
470 let batch = batch.slice(1, batch.num_rows() - 1);
472 let buffer = std::mem::replace(buffer, batch);
473 let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;
474
475 Ok(merged)
476 }
477
478 fn finish(&mut self, metrics: &mut DedupMetrics) -> Result<Option<Batch>> {
479 let Some(buffer) = self.buffer.take() else {
480 return Ok(None);
481 };
482
483 self.last_fields.maybe_init(&buffer);
485
486 let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;
487
488 Ok(merged)
489 }
490}
491
492pub(crate) struct LastNonNullIter<I> {
495 iter: Option<I>,
497 strategy: LastNonNull,
499 metrics: DedupMetrics,
501 current_batch: Option<Batch>,
505 current_index: usize,
508}
509
510impl<I> LastNonNullIter<I> {
511 pub(crate) fn new(iter: I) -> Self {
513 Self {
514 iter: Some(iter),
515 strategy: LastNonNull::new(false),
517 metrics: DedupMetrics::default(),
518 current_batch: None,
519 current_index: 0,
520 }
521 }
522}
523
524impl<I: Iterator<Item = Result<Batch>>> LastNonNullIter<I> {
525 fn next_batch_for_merge(&mut self) -> Result<Option<Batch>> {
528 if self.current_batch.is_none() {
529 let Some(iter) = self.iter.as_mut() else {
531 return Ok(None);
533 };
534
535 self.current_batch = iter.next().transpose()?;
536 self.current_index = 0;
537 if self.current_batch.is_none() {
538 self.iter = None;
540 return Ok(None);
541 }
542 }
543
544 if let Some(batch) = &self.current_batch {
545 let n = batch.num_rows();
546 let timestamps = batch.timestamps_native().unwrap();
548 let mut pos = self.current_index;
549 while pos + 1 < n && timestamps[pos] != timestamps[pos + 1] {
550 pos += 1;
551 }
552 let segment = batch.slice(self.current_index, pos - self.current_index + 1);
553 if pos + 1 < n && timestamps[pos] == timestamps[pos + 1] {
554 self.current_index = pos + 1;
555 } else {
556 self.current_batch = None;
557 self.current_index = 0;
558 }
559 return Ok(Some(segment));
560 }
561
562 Ok(None)
563 }
564
565 fn next_batch(&mut self) -> Result<Option<Batch>> {
566 while let Some(batch) = self.next_batch_for_merge()? {
567 if let Some(batch) = self.strategy.push_batch(batch, &mut self.metrics)? {
568 return Ok(Some(batch));
569 }
570 }
571
572 self.strategy.finish(&mut self.metrics)
573 }
574}
575
576impl<I: Iterator<Item = Result<Batch>>> Iterator for LastNonNullIter<I> {
577 type Item = Result<Batch>;
578
579 fn next(&mut self) -> Option<Self::Item> {
580 self.next_batch().transpose()
581 }
582}
583
/// Tests for the dedup strategies, `DedupReader` and `LastNonNullIter`.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{TimestampMillisecondArray, UInt64Array, UInt8Array};

    use super::*;
    use crate::read::BatchBuilder;
    use crate::test_util::{check_reader_result, new_batch, VecBatchReader};

    #[tokio::test]
    async fn test_dedup_reader_no_duplications() {
        let input = [
            new_batch(
                b"k1",
                &[1, 2],
                &[11, 12],
                &[OpType::Put, OpType::Put],
                &[21, 22],
            ),
            new_batch(b"k1", &[3], &[13], &[OpType::Put], &[23]),
            new_batch(
                b"k2",
                &[1, 2],
                &[111, 112],
                &[OpType::Put, OpType::Put],
                &[31, 32],
            ),
        ];

        // No repeated (key, timestamp) pairs: both strategies return the input unchanged.
        let reader = VecBatchReader::new(&input);
        let mut reader = DedupReader::new(reader, LastRow::new(true));
        check_reader_result(&mut reader, &input).await;
        assert_eq!(0, reader.metrics().num_unselected_rows);
        assert_eq!(0, reader.metrics().num_deleted_rows);

        let reader = VecBatchReader::new(&input);
        let mut reader = DedupReader::new(reader, LastNonNull::new(true));
        check_reader_result(&mut reader, &input).await;
        assert_eq!(0, reader.metrics().num_unselected_rows);
        assert_eq!(0, reader.metrics().num_deleted_rows);
    }

    #[tokio::test]
    async fn test_dedup_reader_duplications() {
        let input = [
            new_batch(
                b"k1",
                &[1, 2],
                &[13, 11],
                &[OpType::Put, OpType::Put],
                &[11, 12],
            ),
            // Empty batch, must be ignored.
            new_batch(b"k1", &[], &[], &[], &[]),
            new_batch(
                // First row repeats (k1, 2) from the first batch.
                b"k1",
                &[2, 3, 4],
                &[10, 13, 13],
                &[OpType::Put, OpType::Put, OpType::Delete],
                &[2, 13, 14],
            ),
            new_batch(
                b"k2",
                &[1, 2],
                &[20, 20],
                &[OpType::Put, OpType::Delete],
                &[101, 0],
            ),
            new_batch(b"k2", &[2], &[19], &[OpType::Put], &[102]),
            new_batch(b"k3", &[2], &[20], &[OpType::Put], &[202]),
            // Duplicate of the previous row, dropped as a duplicate rather than
            // counted as a deletion.
            new_batch(b"k3", &[2], &[19], &[OpType::Delete], &[0]),
        ];
        let reader = VecBatchReader::new(&input);
        // Filter deleted rows.
        let mut reader = DedupReader::new(reader, LastRow::new(true));
        check_reader_result(
            &mut reader,
            &[
                new_batch(
                    b"k1",
                    &[1, 2],
                    &[13, 11],
                    &[OpType::Put, OpType::Put],
                    &[11, 12],
                ),
                new_batch(b"k1", &[3], &[13], &[OpType::Put], &[13]),
                new_batch(b"k2", &[1], &[20], &[OpType::Put], &[101]),
                new_batch(b"k3", &[2], &[20], &[OpType::Put], &[202]),
            ],
        )
        .await;
        // 3 duplicates + 2 deletions.
        assert_eq!(5, reader.metrics().num_unselected_rows);
        assert_eq!(2, reader.metrics().num_deleted_rows);

        let reader = VecBatchReader::new(&input);
        // Keep deleted rows.
        let mut reader = DedupReader::new(reader, LastRow::new(false));
        check_reader_result(
            &mut reader,
            &[
                new_batch(
                    b"k1",
                    &[1, 2],
                    &[13, 11],
                    &[OpType::Put, OpType::Put],
                    &[11, 12],
                ),
                new_batch(
                    b"k1",
                    &[3, 4],
                    &[13, 13],
                    &[OpType::Put, OpType::Delete],
                    &[13, 14],
                ),
                new_batch(
                    b"k2",
                    &[1, 2],
                    &[20, 20],
                    &[OpType::Put, OpType::Delete],
                    &[101, 0],
                ),
                new_batch(b"k3", &[2], &[20], &[OpType::Put], &[202]),
            ],
        )
        .await;
        // Only the 3 duplicates are dropped.
        assert_eq!(3, reader.metrics().num_unselected_rows);
        assert_eq!(0, reader.metrics().num_deleted_rows);
    }

    /// Builds a batch with two nullable u64 field columns (column ids 1 and 2).
    fn new_batch_multi_fields(
        primary_key: &[u8],
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        fields: &[(Option<u64>, Option<u64>)],
    ) -> Batch {
        let mut builder = BatchBuilder::new(primary_key.to_vec());
        builder
            .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
                timestamps.iter().copied(),
            )))
            .unwrap()
            .sequences_array(Arc::new(UInt64Array::from_iter_values(
                sequences.iter().copied(),
            )))
            .unwrap()
            .op_types_array(Arc::new(UInt8Array::from_iter_values(
                op_types.iter().map(|v| *v as u8),
            )))
            .unwrap()
            .push_field_array(
                1,
                Arc::new(UInt64Array::from_iter(fields.iter().map(|field| field.0))),
            )
            .unwrap()
            .push_field_array(
                2,
                Arc::new(UInt64Array::from_iter(fields.iter().map(|field| field.1))),
            )
            .unwrap();
        builder.build().unwrap()
    }

    #[tokio::test]
    async fn test_last_non_null_merge() {
        let input = [
            new_batch_multi_fields(
                b"k1",
                &[1, 2],
                &[13, 11],
                &[OpType::Put, OpType::Put],
                &[(Some(11), Some(11)), (None, None)],
            ),
            // Empty batch, must be ignored.
            new_batch_multi_fields(b"k1", &[], &[], &[], &[]),
            // Duplicates of (k1, 2): fill the nulls of the first (k1, 2) row.
            new_batch_multi_fields(b"k1", &[2], &[10], &[OpType::Put], &[(Some(12), None)]),
            new_batch_multi_fields(
                b"k1",
                &[2, 3, 4],
                &[10, 13, 13],
                &[OpType::Put, OpType::Put, OpType::Delete],
                &[(Some(2), Some(22)), (Some(13), None), (None, Some(14))],
            ),
            new_batch_multi_fields(
                b"k2",
                &[1, 2],
                &[20, 20],
                &[OpType::Put, OpType::Delete],
                &[(Some(101), Some(101)), (None, None)],
            ),
            new_batch_multi_fields(
                b"k2",
                &[2],
                &[19],
                &[OpType::Put],
                &[(Some(102), Some(102))],
            ),
            new_batch_multi_fields(
                b"k3",
                &[2],
                &[20],
                &[OpType::Put],
                &[(Some(202), Some(202))],
            ),
            new_batch_multi_fields(b"k3", &[2], &[19], &[OpType::Delete], &[(None, None)]),
        ];

        let reader = VecBatchReader::new(&input);
        // Filter deleted rows.
        let mut reader = DedupReader::new(reader, LastNonNull::new(true));
        check_reader_result(
            &mut reader,
            &[
                new_batch_multi_fields(
                    b"k1",
                    &[1, 2],
                    &[13, 11],
                    &[OpType::Put, OpType::Put],
                    &[(Some(11), Some(11)), (Some(12), Some(22))],
                ),
                new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(13), None)]),
                new_batch_multi_fields(
                    b"k2",
                    &[1],
                    &[20],
                    &[OpType::Put],
                    &[(Some(101), Some(101))],
                ),
                new_batch_multi_fields(
                    b"k3",
                    &[2],
                    &[20],
                    &[OpType::Put],
                    &[(Some(202), Some(202))],
                ),
            ],
        )
        .await;
        assert_eq!(6, reader.metrics().num_unselected_rows);
        assert_eq!(2, reader.metrics().num_deleted_rows);

        let reader = VecBatchReader::new(&input);
        // Keep deleted rows.
        let mut reader = DedupReader::new(reader, LastNonNull::new(false));
        check_reader_result(
            &mut reader,
            &[
                new_batch_multi_fields(
                    b"k1",
                    &[1, 2],
                    &[13, 11],
                    &[OpType::Put, OpType::Put],
                    &[(Some(11), Some(11)), (Some(12), Some(22))],
                ),
                new_batch_multi_fields(
                    b"k1",
                    &[3, 4],
                    &[13, 13],
                    &[OpType::Put, OpType::Delete],
                    &[(Some(13), None), (None, Some(14))],
                ),
                new_batch_multi_fields(
                    b"k2",
                    &[1, 2],
                    &[20, 20],
                    &[OpType::Put, OpType::Delete],
                    &[(Some(101), Some(101)), (None, None)],
                ),
                new_batch_multi_fields(
                    b"k3",
                    &[2],
                    &[20],
                    &[OpType::Put],
                    &[(Some(202), Some(202))],
                ),
            ],
        )
        .await;
        assert_eq!(4, reader.metrics().num_unselected_rows);
        assert_eq!(0, reader.metrics().num_deleted_rows);
    }

    #[tokio::test]
    async fn test_last_non_null_skip_merge_single() {
        // A single batch has nothing to merge with; only the deletion is affected.
        let input = [new_batch_multi_fields(
            b"k1",
            &[1, 2, 3],
            &[13, 11, 13],
            &[OpType::Put, OpType::Delete, OpType::Put],
            &[(Some(11), Some(11)), (None, None), (Some(13), Some(13))],
        )];

        let reader = VecBatchReader::new(&input);
        let mut reader = DedupReader::new(reader, LastNonNull::new(true));
        check_reader_result(
            &mut reader,
            &[new_batch_multi_fields(
                b"k1",
                &[1, 3],
                &[13, 13],
                &[OpType::Put, OpType::Put],
                &[(Some(11), Some(11)), (Some(13), Some(13))],
            )],
        )
        .await;
        assert_eq!(1, reader.metrics().num_unselected_rows);
        assert_eq!(1, reader.metrics().num_deleted_rows);

        let reader = VecBatchReader::new(&input);
        let mut reader = DedupReader::new(reader, LastNonNull::new(false));
        check_reader_result(&mut reader, &input).await;
        assert_eq!(0, reader.metrics().num_unselected_rows);
        assert_eq!(0, reader.metrics().num_deleted_rows);
    }

    #[tokio::test]
    async fn test_last_non_null_skip_merge_no_null() {
        // The kept (k1, 2) row has no null field, so duplicates are dropped
        // without filling anything.
        let input = [
            new_batch_multi_fields(
                b"k1",
                &[1, 2],
                &[13, 11],
                &[OpType::Put, OpType::Put],
                &[(Some(11), Some(11)), (Some(12), Some(12))],
            ),
            new_batch_multi_fields(b"k1", &[2], &[10], &[OpType::Put], &[(None, Some(22))]),
            new_batch_multi_fields(
                b"k1",
                &[2, 3],
                &[9, 13],
                &[OpType::Put, OpType::Put],
                &[(Some(32), None), (Some(13), Some(13))],
            ),
        ];

        let reader = VecBatchReader::new(&input);
        let mut reader = DedupReader::new(reader, LastNonNull::new(true));
        check_reader_result(
            &mut reader,
            &[
                new_batch_multi_fields(
                    b"k1",
                    &[1, 2],
                    &[13, 11],
                    &[OpType::Put, OpType::Put],
                    &[(Some(11), Some(11)), (Some(12), Some(12))],
                ),
                new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(13), Some(13))]),
            ],
        )
        .await;
        assert_eq!(2, reader.metrics().num_unselected_rows);
        assert_eq!(0, reader.metrics().num_deleted_rows);
    }

    #[tokio::test]
    async fn test_last_non_null_merge_null() {
        // A duplicate whose value is itself null leaves that field null.
        let input = [
            new_batch_multi_fields(
                b"k1",
                &[1, 2],
                &[13, 11],
                &[OpType::Put, OpType::Put],
                &[(Some(11), Some(11)), (None, None)],
            ),
            new_batch_multi_fields(b"k1", &[2], &[10], &[OpType::Put], &[(None, Some(22))]),
            new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(33), None)]),
        ];

        let reader = VecBatchReader::new(&input);
        let mut reader = DedupReader::new(reader, LastNonNull::new(true));
        check_reader_result(
            &mut reader,
            &[
                new_batch_multi_fields(
                    b"k1",
                    &[1, 2],
                    &[13, 11],
                    &[OpType::Put, OpType::Put],
                    &[(Some(11), Some(11)), (None, Some(22))],
                ),
                new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(33), None)]),
            ],
        )
        .await;
        assert_eq!(1, reader.metrics().num_unselected_rows);
        assert_eq!(0, reader.metrics().num_deleted_rows);
    }

    /// Feeds `input` to `strategy` batch by batch, calls `finish`, and compares
    /// all produced batches with `expect`.
    fn check_dedup_strategy(input: &[Batch], strategy: &mut dyn DedupStrategy, expect: &[Batch]) {
        let mut actual = Vec::new();
        let mut metrics = DedupMetrics::default();
        for batch in input {
            if let Some(out) = strategy.push_batch(batch.clone(), &mut metrics).unwrap() {
                actual.push(out);
            }
        }
        if let Some(out) = strategy.finish(&mut metrics).unwrap() {
            actual.push(out);
        }

        assert_eq!(expect, actual);
    }

    #[test]
    fn test_last_non_null_strategy_delete_last() {
        let input = [
            new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
            new_batch_multi_fields(
                b"k1",
                &[1, 2],
                &[1, 7],
                &[OpType::Put, OpType::Put],
                &[(Some(1), None), (Some(22), Some(222))],
            ),
            new_batch_multi_fields(b"k1", &[2], &[4], &[OpType::Put], &[(Some(12), None)]),
            new_batch_multi_fields(
                b"k2",
                &[2, 3],
                &[2, 5],
                &[OpType::Put, OpType::Delete],
                &[(None, None), (Some(13), None)],
            ),
            new_batch_multi_fields(b"k2", &[3], &[3], &[OpType::Put], &[(None, Some(3))]),
        ];

        // The (k2, 3) row kept is a deletion, so it is filtered out and its
        // older duplicate is not merged into it.
        let mut strategy = LastNonNull::new(true);
        check_dedup_strategy(
            &input,
            &mut strategy,
            &[
                new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
                new_batch_multi_fields(b"k1", &[2], &[7], &[OpType::Put], &[(Some(22), Some(222))]),
                new_batch_multi_fields(b"k2", &[2], &[2], &[OpType::Put], &[(None, None)]),
            ],
        );
    }

    #[test]
    fn test_last_non_null_strategy_delete_one() {
        let input = [
            new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Delete], &[(None, None)]),
            new_batch_multi_fields(b"k2", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
        ];

        let mut strategy = LastNonNull::new(true);
        check_dedup_strategy(
            &input,
            &mut strategy,
            &[new_batch_multi_fields(
                b"k2",
                &[1],
                &[6],
                &[OpType::Put],
                &[(Some(11), None)],
            )],
        );
    }

    #[test]
    fn test_last_non_null_strategy_delete_all() {
        let input = [
            new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Delete], &[(None, None)]),
            new_batch_multi_fields(b"k2", &[1], &[6], &[OpType::Delete], &[(Some(11), None)]),
        ];

        let mut strategy = LastNonNull::new(true);
        check_dedup_strategy(&input, &mut strategy, &[]);
    }

    #[test]
    fn test_last_non_null_strategy_same_batch() {
        let input = [
            new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
            new_batch_multi_fields(
                b"k1",
                &[1, 2],
                &[1, 7],
                &[OpType::Put, OpType::Put],
                &[(Some(1), None), (Some(22), Some(222))],
            ),
            new_batch_multi_fields(b"k1", &[2], &[4], &[OpType::Put], &[(Some(12), None)]),
            new_batch_multi_fields(
                b"k1",
                &[2, 3],
                &[2, 5],
                &[OpType::Put, OpType::Put],
                &[(None, None), (Some(13), None)],
            ),
            new_batch_multi_fields(b"k1", &[3], &[3], &[OpType::Put], &[(None, Some(3))]),
        ];

        let mut strategy = LastNonNull::new(true);
        check_dedup_strategy(
            &input,
            &mut strategy,
            &[
                new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
                new_batch_multi_fields(b"k1", &[2], &[7], &[OpType::Put], &[(Some(22), Some(222))]),
                new_batch_multi_fields(b"k1", &[3], &[5], &[OpType::Put], &[(Some(13), Some(3))]),
            ],
        );
    }

    #[test]
    fn test_last_non_null_strategy_delete_middle() {
        // A deletion between puts of the same (key, timestamp) stops older puts
        // from filling nulls of the kept row.
        let input = [
            new_batch_multi_fields(b"k1", &[1], &[7], &[OpType::Put], &[(Some(11), None)]),
            new_batch_multi_fields(b"k1", &[1], &[4], &[OpType::Delete], &[(None, None)]),
            new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Put], &[(Some(12), Some(1))]),
            new_batch_multi_fields(b"k1", &[2], &[8], &[OpType::Put], &[(Some(21), None)]),
            new_batch_multi_fields(b"k1", &[2], &[5], &[OpType::Delete], &[(None, None)]),
            new_batch_multi_fields(b"k1", &[2], &[2], &[OpType::Put], &[(Some(22), Some(2))]),
            new_batch_multi_fields(b"k1", &[3], &[9], &[OpType::Put], &[(Some(31), None)]),
            new_batch_multi_fields(b"k1", &[3], &[6], &[OpType::Delete], &[(None, None)]),
            new_batch_multi_fields(b"k1", &[3], &[3], &[OpType::Put], &[(Some(32), Some(3))]),
        ];

        let mut strategy = LastNonNull::new(true);
        check_dedup_strategy(
            &input,
            &mut strategy,
            &[
                new_batch_multi_fields(b"k1", &[1], &[7], &[OpType::Put], &[(Some(11), None)]),
                new_batch_multi_fields(b"k1", &[2], &[8], &[OpType::Put], &[(Some(21), None)]),
                new_batch_multi_fields(b"k1", &[3], &[9], &[OpType::Put], &[(Some(31), None)]),
            ],
        );
    }

    #[test]
    fn test_last_non_null_iter_on_batch() {
        // Rows repeating a timestamp inside one batch are merged by the iterator.
        let input = [new_batch_multi_fields(
            b"k1",
            &[1, 1, 2],
            &[13, 12, 13],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[(None, None), (Some(1), None), (Some(2), Some(22))],
        )];
        let iter = input.into_iter().map(Ok);
        let iter = LastNonNullIter::new(iter);
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        let expect = [
            new_batch_multi_fields(b"k1", &[1], &[13], &[OpType::Put], &[(Some(1), None)]),
            new_batch_multi_fields(b"k1", &[2], &[13], &[OpType::Put], &[(Some(2), Some(22))]),
        ];
        assert_eq!(&expect, &actual[..]);
    }

    #[test]
    fn test_last_non_null_iter_same_row() {
        // Every row shares (k1, 1) across two batches; all merge into one row.
        let input = [
            new_batch_multi_fields(
                b"k1",
                &[1, 1, 1],
                &[13, 12, 11],
                &[OpType::Put, OpType::Put, OpType::Put],
                &[(None, None), (Some(1), None), (Some(11), None)],
            ),
            new_batch_multi_fields(
                b"k1",
                &[1, 1],
                &[10, 9],
                &[OpType::Put, OpType::Put],
                &[(None, Some(11)), (Some(21), Some(31))],
            ),
        ];
        let iter = input.into_iter().map(Ok);
        let iter = LastNonNullIter::new(iter);
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        let expect = [new_batch_multi_fields(
            b"k1",
            &[1],
            &[13],
            &[OpType::Put],
            &[(Some(1), Some(11))],
        )];
        assert_eq!(&expect, &actual[..]);
    }

    #[test]
    fn test_last_non_null_iter_multi_batch() {
        // The iterator does not filter deletions, so (k1, 3) is emitted as a Delete.
        let input = [
            new_batch_multi_fields(
                b"k1",
                &[1, 1, 2],
                &[13, 12, 13],
                &[OpType::Put, OpType::Put, OpType::Put],
                &[(None, None), (Some(1), None), (Some(2), Some(22))],
            ),
            new_batch_multi_fields(
                b"k1",
                &[2, 3],
                &[12, 13],
                &[OpType::Put, OpType::Delete],
                &[(None, Some(12)), (None, None)],
            ),
            new_batch_multi_fields(
                b"k2",
                &[1, 1, 2],
                &[13, 12, 13],
                &[OpType::Put, OpType::Put, OpType::Put],
                &[(None, None), (Some(1), None), (Some(2), Some(22))],
            ),
        ];
        let iter = input.into_iter().map(Ok);
        let iter = LastNonNullIter::new(iter);
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        let expect = [
            new_batch_multi_fields(b"k1", &[1], &[13], &[OpType::Put], &[(Some(1), None)]),
            new_batch_multi_fields(b"k1", &[2], &[13], &[OpType::Put], &[(Some(2), Some(22))]),
            new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Delete], &[(None, None)]),
            new_batch_multi_fields(b"k2", &[1], &[13], &[OpType::Put], &[(Some(1), None)]),
            new_batch_multi_fields(b"k2", &[2], &[13], &[OpType::Put], &[(Some(2), Some(22))]),
        ];
        assert_eq!(&expect, &actual[..]);
    }

    /// Builds a batch that has no field columns.
    fn new_batch_no_fields(
        primary_key: &[u8],
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
    ) -> Batch {
        let mut builder = BatchBuilder::new(primary_key.to_vec());
        builder
            .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
                timestamps.iter().copied(),
            )))
            .unwrap()
            .sequences_array(Arc::new(UInt64Array::from_iter_values(
                sequences.iter().copied(),
            )))
            .unwrap()
            .op_types_array(Arc::new(UInt8Array::from_iter_values(
                op_types.iter().map(|v| *v as u8),
            )))
            .unwrap();
        builder.build().unwrap()
    }

    #[test]
    fn test_last_non_null_iter_no_batch() {
        // Without field columns, duplicates are still collapsed to the first row.
        let input = [
            new_batch_no_fields(
                b"k1",
                &[1, 1, 2],
                &[13, 12, 13],
                &[OpType::Put, OpType::Put, OpType::Put],
            ),
            new_batch_no_fields(b"k1", &[2, 3], &[12, 13], &[OpType::Put, OpType::Delete]),
            new_batch_no_fields(
                b"k2",
                &[1, 1, 2],
                &[13, 12, 13],
                &[OpType::Put, OpType::Put, OpType::Put],
            ),
        ];
        let iter = input.into_iter().map(Ok);
        let iter = LastNonNullIter::new(iter);
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        let expect = [
            new_batch_no_fields(b"k1", &[1], &[13], &[OpType::Put]),
            new_batch_no_fields(b"k1", &[2], &[13], &[OpType::Put]),
            new_batch_no_fields(b"k1", &[3], &[13], &[OpType::Delete]),
            new_batch_no_fields(b"k2", &[1], &[13], &[OpType::Put]),
            new_batch_no_fields(b"k2", &[2], &[13], &[OpType::Put]),
        ];
        assert_eq!(&expect, &actual[..]);
    }
}