1use std::fmt;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use api::v1::OpType;
22use async_trait::async_trait;
23use common_telemetry::debug;
24use common_time::Timestamp;
25use datatypes::data_type::DataType;
26use datatypes::prelude::ScalarVector;
27use datatypes::value::Value;
28use datatypes::vectors::MutableVector;
29
30use crate::error::Result;
31use crate::metrics::MERGE_FILTER_ROWS_TOTAL;
32use crate::read::{Batch, BatchColumn, BatchReader};
33
/// Sink that receives [`DedupMetrics`] collected by a dedup reader.
///
/// The trait requires `Send + Sync`; `DedupReader` holds it as
/// `Option<Arc<dyn DedupMetricsReport>>`.
pub trait DedupMetricsReport: Send + Sync {
    /// Reports `metrics`.
    ///
    /// `metrics` is passed mutably. NOTE(review): `DedupMetrics::maybe_report` may call this
    /// repeatedly on the same accumulator, so implementations presumably take/reset the
    /// reported values to avoid double counting — confirm against implementors.
    fn report(&self, metrics: &mut DedupMetrics);
}
39
40pub struct DedupReader<R, S> {
43 source: R,
44 strategy: S,
45 metrics: DedupMetrics,
46 metrics_reporter: Option<Arc<dyn DedupMetricsReport>>,
48}
49
50impl<R, S> DedupReader<R, S> {
51 pub fn new(
53 source: R,
54 strategy: S,
55 metrics_reporter: Option<Arc<dyn DedupMetricsReport>>,
56 ) -> Self {
57 Self {
58 source,
59 strategy,
60 metrics: DedupMetrics::default(),
61 metrics_reporter,
62 }
63 }
64}
65
66impl<R: BatchReader, S: DedupStrategy> DedupReader<R, S> {
67 async fn fetch_next_batch(&mut self) -> Result<Option<Batch>> {
69 while let Some(batch) = self.source.next_batch().await? {
70 if let Some(batch) = self.strategy.push_batch(batch, &mut self.metrics)? {
71 self.metrics.maybe_report(&self.metrics_reporter);
72 return Ok(Some(batch));
73 }
74 }
75
76 let result = self.strategy.finish(&mut self.metrics)?;
77 self.metrics.maybe_report(&self.metrics_reporter);
78 Ok(result)
79 }
80}
81
82#[async_trait]
83impl<R: BatchReader, S: DedupStrategy> BatchReader for DedupReader<R, S> {
84 async fn next_batch(&mut self) -> Result<Option<Batch>> {
85 self.fetch_next_batch().await
86 }
87}
88
89impl<R, S> Drop for DedupReader<R, S> {
90 fn drop(&mut self) {
91 debug!("Dedup reader finished, metrics: {:?}", self.metrics);
92
93 MERGE_FILTER_ROWS_TOTAL
94 .with_label_values(&["dedup"])
95 .inc_by(self.metrics.num_unselected_rows as u64);
96 MERGE_FILTER_ROWS_TOTAL
97 .with_label_values(&["delete"])
98 .inc_by(self.metrics.num_unselected_rows as u64);
99
100 if let Some(reporter) = &self.metrics_reporter {
102 reporter.report(&mut self.metrics);
103 }
104 }
105}
106
#[cfg(test)]
impl<R, S> DedupReader<R, S> {
    /// Returns the metrics collected so far (test-only accessor).
    fn metrics(&self) -> &DedupMetrics {
        &self.metrics
    }
}
113
/// Policy used by [`DedupReader`] to remove duplicate rows.
///
/// Batches are pushed one at a time. A strategy may hold data back and emit it on a later
/// call. The callers in this module call [`finish`](DedupStrategy::finish) once their input
/// is exhausted.
pub trait DedupStrategy: Send {
    /// Pushes `batch` into the strategy, updating `metrics`.
    ///
    /// Returns `Ok(Some(_))` when a deduplicated batch is ready. Returns `Ok(None)` when
    /// nothing can be emitted yet or every row of `batch` was dropped.
    fn push_batch(&mut self, batch: Batch, metrics: &mut DedupMetrics) -> Result<Option<Batch>>;

    /// Flushes any data still held by the strategy after the last `push_batch`.
    fn finish(&mut self, metrics: &mut DedupMetrics) -> Result<Option<Batch>>;
}
126
127struct BatchLastRow {
129 primary_key: Vec<u8>,
130 timestamp: Timestamp,
132}
133
134pub struct LastRow {
143 prev_batch: Option<BatchLastRow>,
146 filter_deleted: bool,
148}
149
150impl LastRow {
151 pub fn new(filter_deleted: bool) -> Self {
153 Self {
154 prev_batch: None,
155 filter_deleted,
156 }
157 }
158}
159
160impl DedupStrategy for LastRow {
161 fn push_batch(
162 &mut self,
163 mut batch: Batch,
164 metrics: &mut DedupMetrics,
165 ) -> Result<Option<Batch>> {
166 let start = Instant::now();
167
168 if batch.is_empty() {
169 return Ok(None);
170 }
171 debug_assert!(batch.first_timestamp().is_some());
172 let prev_timestamp = match &self.prev_batch {
173 Some(prev_batch) => {
174 if prev_batch.primary_key != batch.primary_key() {
175 None
178 } else {
179 Some(prev_batch.timestamp)
180 }
181 }
182 None => None,
183 };
184 if batch.first_timestamp() == prev_timestamp {
185 metrics.num_unselected_rows += 1;
186 if batch.num_rows() == 1 {
188 metrics.dedup_cost += start.elapsed();
191 return Ok(None);
192 }
193 batch = batch.slice(1, batch.num_rows() - 1);
195 }
196
197 match &mut self.prev_batch {
202 Some(prev) => {
203 prev.primary_key.clone_from(&batch.primary_key);
205 prev.timestamp = batch.last_timestamp().unwrap();
206 }
207 None => {
208 self.prev_batch = Some(BatchLastRow {
209 primary_key: batch.primary_key().to_vec(),
210 timestamp: batch.last_timestamp().unwrap(),
211 })
212 }
213 }
214
215 if self.filter_deleted {
217 filter_deleted_from_batch(&mut batch, metrics)?;
218 }
219
220 metrics.dedup_cost += start.elapsed();
221
222 if batch.is_empty() {
224 Ok(None)
225 } else {
226 Ok(Some(batch))
227 }
228 }
229
230 fn finish(&mut self, _metrics: &mut DedupMetrics) -> Result<Option<Batch>> {
231 Ok(None)
232 }
233}
234
235fn filter_deleted_from_batch(batch: &mut Batch, metrics: &mut DedupMetrics) -> Result<()> {
237 let num_rows = batch.num_rows();
238 batch.filter_deleted()?;
239 let num_rows_after_filter = batch.num_rows();
240 let num_deleted = num_rows - num_rows_after_filter;
241 metrics.num_deleted_rows += num_deleted;
242 metrics.num_unselected_rows += num_deleted;
243
244 Ok(())
245}
246
/// Counters collected while deduplicating rows.
#[derive(Default)]
pub struct DedupMetrics {
    /// Rows that were not selected. This covers rows dropped as duplicates and deleted rows.
    pub(crate) num_unselected_rows: usize,
    /// Rows removed by delete filtering.
    pub(crate) num_deleted_rows: usize,
    /// Total time spent in the dedup strategy.
    pub(crate) dedup_cost: Duration,
}

impl fmt::Debug for DedupMetrics {
    /// Formats the metrics as a compact JSON-like object.
    ///
    /// Prints `{}` when no dedup time was recorded. Counters are only printed when they are
    /// non-zero.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring: adding a field forces this formatter to be revisited.
        let Self {
            num_unselected_rows,
            num_deleted_rows,
            dedup_cost,
        } = self;

        if dedup_cost.is_zero() {
            return write!(f, "{{}}");
        }

        write!(f, r#"{{"dedup_cost":"{:?}""#, dedup_cost)?;
        if *num_unselected_rows > 0 {
            write!(f, r#", "num_unselected_rows":{}"#, num_unselected_rows)?;
        }
        if *num_deleted_rows > 0 {
            write!(f, r#", "num_deleted_rows":{}"#, num_deleted_rows)?;
        }
        write!(f, "}}")
    }
}
277
278impl DedupMetrics {
279 pub(crate) fn merge(&mut self, other: &DedupMetrics) {
281 let DedupMetrics {
282 num_unselected_rows,
283 num_deleted_rows,
284 dedup_cost,
285 } = other;
286
287 self.num_unselected_rows += *num_unselected_rows;
288 self.num_deleted_rows += *num_deleted_rows;
289 self.dedup_cost += *dedup_cost;
290 }
291
292 pub(crate) fn maybe_report(&mut self, reporter: &Option<Arc<dyn DedupMetricsReport>>) {
294 if self.dedup_cost.as_millis() > 10
295 && let Some(r) = reporter
296 {
297 r.report(self);
298 }
299 }
300}
301
/// Merge state for the [`LastNonNull`] strategy.
///
/// It tracks the buffered last row of a (primary key, timestamp) and fills that row's null
/// fields with non-null values taken from the duplicate rows pushed after it.
struct LastFieldsBuilder {
    /// Whether `Batch::filter_deleted` is applied to the merged output.
    filter_deleted: bool,
    /// One mutable vector per field column, used to build the merged last row.
    builders: Vec<Box<dyn MutableVector>>,
    /// Field values of the row being merged. Null entries may still be filled.
    last_fields: Vec<Value>,
    /// Whether `last_fields` still contains a null value.
    contains_null: bool,
    /// Whether a delete row was seen for the current key/timestamp. This stops merging.
    contains_deletion: bool,
    /// Whether `maybe_init` has run since the last `clear`.
    initialized: bool,
}

impl LastFieldsBuilder {
    /// Creates an empty, uninitialized builder.
    fn new(filter_deleted: bool) -> Self {
        Self {
            filter_deleted,
            builders: Vec::new(),
            last_fields: Vec::new(),
            contains_null: false,
            contains_deletion: false,
            initialized: false,
        }
    }

    /// Initializes the merge state from the last row of `batch`, unless already initialized.
    ///
    /// `batch` must not be empty.
    fn maybe_init(&mut self, batch: &Batch) {
        debug_assert!(!batch.is_empty());

        if self.initialized {
            // Already initialized since the last `clear()`.
            return;
        }

        self.initialized = true;

        if batch.fields().is_empty() {
            // No fields to merge.
            return;
        }

        let last_idx = batch.num_rows() - 1;
        let fields = batch.fields();
        // `last_idx` is in bounds because the batch is not empty.
        self.contains_deletion =
            batch.op_types().get_data(last_idx).unwrap() == OpType::Delete as u8;
        // A deleted row needs no merge, so nulls only matter for rows that are not deleted.
        if !self.contains_deletion {
            self.contains_null = fields.iter().any(|col| col.data.is_null(last_idx));
        }

        if self.skip_merge() {
            // The row was deleted or has no null field: nothing to fill in.
            return;
        }
        if self.builders.is_empty() {
            // Builders are created lazily once and reused across merges.
            self.builders = fields
                .iter()
                .map(|col| col.data.data_type().create_mutable_vector(1))
                .collect();
        }
        self.last_fields = fields.iter().map(|col| col.data.get(last_idx)).collect();
    }

    /// Returns true when the buffered last row needs no merging, because it was deleted or
    /// has no null field left.
    fn skip_merge(&self) -> bool {
        debug_assert!(self.initialized);

        self.contains_deletion || !self.contains_null
    }

    /// Merges the first row of `batch` into `last_fields`.
    ///
    /// The caller passes a duplicate of the buffered last row that arrived after it. Each
    /// still-null field is filled with the row's non-null value. A delete row stops any
    /// further merging, because older values are shadowed by it.
    fn push_first_row(&mut self, batch: &Batch) {
        debug_assert!(self.initialized);
        debug_assert!(!batch.is_empty());

        if self.skip_merge() {
            return;
        }

        // Index 0 is valid because the batch is not empty.
        self.contains_deletion = batch.op_types().get_data(0).unwrap() == OpType::Delete as u8;
        if self.contains_deletion {
            // Rows behind a delete must not contribute values.
            return;
        }

        let fields = batch.fields();
        for (idx, value) in self.last_fields.iter_mut().enumerate() {
            if value.is_null() && !fields[idx].data.is_null(0) {
                *value = fields[idx].data.get(0);
            }
        }
        self.contains_null = self.last_fields.iter().any(Value::is_null);
    }

    /// Writes the merged `last_fields` into the last row of `buffer` and resets the merge
    /// state. Then it optionally filters delete rows.
    ///
    /// Returns `None` if the resulting batch is empty.
    fn merge_last_non_null(
        &mut self,
        buffer: Batch,
        metrics: &mut DedupMetrics,
    ) -> Result<Option<Batch>> {
        debug_assert!(self.initialized);

        let mut output = if self.last_fields.is_empty() {
            // Nothing to overwrite: there are no fields, or `maybe_init` decided to skip
            // merging.
            buffer
        } else {
            // Build one single-row vector per field from the merged values.
            for (builder, value) in self.builders.iter_mut().zip(&self.last_fields) {
                builder.push_value_ref(&value.as_value_ref());
            }
            let fields = self
                .builders
                .iter_mut()
                .zip(buffer.fields())
                .map(|(builder, col)| BatchColumn {
                    column_id: col.column_id,
                    data: builder.to_vector(),
                })
                .collect();

            if buffer.num_rows() == 1 {
                // Single row: replace its fields directly.
                buffer.with_fields(fields)?
            } else {
                // Replace only the last row, then append it back after the other rows.
                let front = buffer.slice(0, buffer.num_rows() - 1);
                let last = buffer.slice(buffer.num_rows() - 1, 1);
                let last = last.with_fields(fields)?;
                Batch::concat(vec![front, last])?
            }
        };

        // NOTE(review): `builders` are not cleared here; presumably `to_vector()` above
        // resets them — confirm.
        self.clear();

        if self.filter_deleted {
            filter_deleted_from_batch(&mut output, metrics)?;
        }

        if output.is_empty() {
            Ok(None)
        } else {
            Ok(Some(output))
        }
    }

    /// Resets the merge state so that the next `maybe_init` starts fresh.
    ///
    /// `builders` are kept for reuse.
    fn clear(&mut self) {
        self.last_fields.clear();
        self.contains_null = false;
        self.contains_deletion = false;
        self.initialized = false;
    }
}
480
481pub(crate) struct LastNonNull {
488 buffer: Option<Batch>,
490 last_fields: LastFieldsBuilder,
492}
493
494impl LastNonNull {
495 pub(crate) fn new(filter_deleted: bool) -> Self {
497 Self {
498 buffer: None,
499 last_fields: LastFieldsBuilder::new(filter_deleted),
500 }
501 }
502}
503
504impl DedupStrategy for LastNonNull {
505 fn push_batch(&mut self, batch: Batch, metrics: &mut DedupMetrics) -> Result<Option<Batch>> {
506 let start = Instant::now();
507
508 if batch.is_empty() {
509 return Ok(None);
510 }
511
512 let Some(buffer) = self.buffer.as_mut() else {
513 self.buffer = Some(batch);
515 return Ok(None);
516 };
517
518 self.last_fields.maybe_init(buffer);
520
521 if buffer.primary_key() != batch.primary_key() {
522 let buffer = std::mem::replace(buffer, batch);
524 let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;
525 metrics.dedup_cost += start.elapsed();
526 return Ok(merged);
527 }
528
529 if buffer.last_timestamp() != batch.first_timestamp() {
530 let buffer = std::mem::replace(buffer, batch);
532 let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;
533 metrics.dedup_cost += start.elapsed();
534 return Ok(merged);
535 }
536
537 metrics.num_unselected_rows += 1;
540 if batch.num_rows() == 1 {
542 self.last_fields.push_first_row(&batch);
543 metrics.dedup_cost += start.elapsed();
544 return Ok(None);
545 }
546
547 let first = batch.slice(0, 1);
550 self.last_fields.push_first_row(&first);
551 let batch = batch.slice(1, batch.num_rows() - 1);
553 let buffer = std::mem::replace(buffer, batch);
554 let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;
555
556 metrics.dedup_cost += start.elapsed();
557
558 Ok(merged)
559 }
560
561 fn finish(&mut self, metrics: &mut DedupMetrics) -> Result<Option<Batch>> {
562 let start = Instant::now();
563
564 let Some(buffer) = self.buffer.take() else {
565 return Ok(None);
566 };
567
568 self.last_fields.maybe_init(&buffer);
570
571 let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;
572
573 metrics.dedup_cost += start.elapsed();
574
575 Ok(merged)
576 }
577}
578
579pub(crate) struct LastNonNullIter<I> {
582 iter: Option<I>,
584 strategy: LastNonNull,
586 metrics: DedupMetrics,
588 current_batch: Option<Batch>,
592 current_index: usize,
595}
596
597impl<I> LastNonNullIter<I> {
598 pub(crate) fn new(iter: I) -> Self {
600 Self {
601 iter: Some(iter),
602 strategy: LastNonNull::new(false),
604 metrics: DedupMetrics::default(),
605 current_batch: None,
606 current_index: 0,
607 }
608 }
609}
610
611impl<I: Iterator<Item = Result<Batch>>> LastNonNullIter<I> {
612 fn next_batch_for_merge(&mut self) -> Result<Option<Batch>> {
615 if self.current_batch.is_none() {
616 let Some(iter) = self.iter.as_mut() else {
618 return Ok(None);
620 };
621
622 self.current_batch = iter.next().transpose()?;
623 self.current_index = 0;
624 if self.current_batch.is_none() {
625 self.iter = None;
627 return Ok(None);
628 }
629 }
630
631 if let Some(batch) = &self.current_batch {
632 let n = batch.num_rows();
633 let timestamps = batch.timestamps_native().unwrap();
635 let mut pos = self.current_index;
636 while pos + 1 < n && timestamps[pos] != timestamps[pos + 1] {
637 pos += 1;
638 }
639 let segment = batch.slice(self.current_index, pos - self.current_index + 1);
640 if pos + 1 < n && timestamps[pos] == timestamps[pos + 1] {
641 self.current_index = pos + 1;
642 } else {
643 self.current_batch = None;
644 self.current_index = 0;
645 }
646 return Ok(Some(segment));
647 }
648
649 Ok(None)
650 }
651
652 fn next_batch(&mut self) -> Result<Option<Batch>> {
653 while let Some(batch) = self.next_batch_for_merge()? {
654 if let Some(batch) = self.strategy.push_batch(batch, &mut self.metrics)? {
655 return Ok(Some(batch));
656 }
657 }
658
659 self.strategy.finish(&mut self.metrics)
660 }
661}
662
663impl<I: Iterator<Item = Result<Batch>>> Iterator for LastNonNullIter<I> {
664 type Item = Result<Batch>;
665
666 fn next(&mut self) -> Option<Self::Item> {
667 self.next_batch().transpose()
668 }
669}
670
#[cfg(test)]
mod tests {
    // Tests for the `LastRow` / `LastNonNull` strategies, `DedupReader` and `LastNonNullIter`.

    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{TimestampMillisecondArray, UInt8Array, UInt64Array};

    use super::*;
    use crate::read::BatchBuilder;
    use crate::test_util::{VecBatchReader, check_reader_result, new_batch};

    // Input without duplicates or deletes passes through both strategies unchanged.
    #[tokio::test]
    async fn test_dedup_reader_no_duplications() {
        let input = [
            new_batch(
                b"k1",
                &[1, 2],
                &[11, 12],
                &[OpType::Put, OpType::Put],
                &[21, 22],
            ),
            new_batch(b"k1", &[3], &[13], &[OpType::Put], &[23]),
            new_batch(
                b"k2",
                &[1, 2],
                &[111, 112],
                &[OpType::Put, OpType::Put],
                &[31, 32],
            ),
        ];

        // LastRow strategy.
        let reader = VecBatchReader::new(&input);
        let mut reader = DedupReader::new(reader, LastRow::new(true), None);
        check_reader_result(&mut reader, &input).await;
        assert_eq!(0, reader.metrics().num_unselected_rows);
        assert_eq!(0, reader.metrics().num_deleted_rows);

        // LastNonNull strategy.
        let reader = VecBatchReader::new(&input);
        let mut reader = DedupReader::new(reader, LastNonNull::new(true), None);
        check_reader_result(&mut reader, &input).await;
        assert_eq!(0, reader.metrics().num_unselected_rows);
        assert_eq!(0, reader.metrics().num_deleted_rows);
    }

    // LastRow drops duplicates across batch boundaries, with and without delete filtering.
    #[tokio::test]
    async fn test_dedup_reader_duplications() {
        let input = [
            new_batch(
                b"k1",
                &[1, 2],
                &[13, 11],
                &[OpType::Put, OpType::Put],
                &[11, 12],
            ),
            // Empty batch.
            new_batch(b"k1", &[], &[], &[], &[]),
            // First row duplicates (k1, 2) of the first batch.
            new_batch(
                b"k1",
                &[2, 3, 4],
                &[10, 13, 13],
                &[OpType::Put, OpType::Put, OpType::Delete],
                &[2, 13, 14],
            ),
            new_batch(
                b"k2",
                &[1, 2],
                &[20, 20],
                &[OpType::Put, OpType::Delete],
                &[101, 0],
            ),
            new_batch(b"k2", &[2], &[19], &[OpType::Put], &[102]),
            new_batch(b"k3", &[2], &[20], &[OpType::Put], &[202]),
            // Duplicates (k3, 2) of the previous batch, so it is dropped as a duplicate and
            // does not count as a deleted row.
            new_batch(b"k3", &[2], &[19], &[OpType::Delete], &[0]),
        ];
        // Filter deleted rows.
        let reader = VecBatchReader::new(&input);
        let mut reader = DedupReader::new(reader, LastRow::new(true), None);
        check_reader_result(
            &mut reader,
            &[
                new_batch(
                    b"k1",
                    &[1, 2],
                    &[13, 11],
                    &[OpType::Put, OpType::Put],
                    &[11, 12],
                ),
                new_batch(b"k1", &[3], &[13], &[OpType::Put], &[13]),
                new_batch(b"k2", &[1], &[20], &[OpType::Put], &[101]),
                new_batch(b"k3", &[2], &[20], &[OpType::Put], &[202]),
            ],
        )
        .await;
        assert_eq!(5, reader.metrics().num_unselected_rows);
        assert_eq!(2, reader.metrics().num_deleted_rows);

        // Keep deleted rows.
        let reader = VecBatchReader::new(&input);
        let mut reader = DedupReader::new(reader, LastRow::new(false), None);
        check_reader_result(
            &mut reader,
            &[
                new_batch(
                    b"k1",
                    &[1, 2],
                    &[13, 11],
                    &[OpType::Put, OpType::Put],
                    &[11, 12],
                ),
                new_batch(
                    b"k1",
                    &[3, 4],
                    &[13, 13],
                    &[OpType::Put, OpType::Delete],
                    &[13, 14],
                ),
                new_batch(
                    b"k2",
                    &[1, 2],
                    &[20, 20],
                    &[OpType::Put, OpType::Delete],
                    &[101, 0],
                ),
                new_batch(b"k3", &[2], &[20], &[OpType::Put], &[202]),
            ],
        )
        .await;
        assert_eq!(3, reader.metrics().num_unselected_rows);
        assert_eq!(0, reader.metrics().num_deleted_rows);
    }

    /// Builds a batch with two nullable `u64` field columns (column ids 1 and 2).
    fn new_batch_multi_fields(
        primary_key: &[u8],
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        fields: &[(Option<u64>, Option<u64>)],
    ) -> Batch {
        let mut builder = BatchBuilder::new(primary_key.to_vec());
        builder
            .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
                timestamps.iter().copied(),
            )))
            .unwrap()
            .sequences_array(Arc::new(UInt64Array::from_iter_values(
                sequences.iter().copied(),
            )))
            .unwrap()
            .op_types_array(Arc::new(UInt8Array::from_iter_values(
                op_types.iter().map(|v| *v as u8),
            )))
            .unwrap()
            .push_field_array(
                1,
                Arc::new(UInt64Array::from_iter(fields.iter().map(|field| field.0))),
            )
            .unwrap()
            .push_field_array(
                2,
                Arc::new(UInt64Array::from_iter(fields.iter().map(|field| field.1))),
            )
            .unwrap();
        builder.build().unwrap()
    }

    // LastNonNull fills null fields of the kept row from the duplicates that follow it.
    #[tokio::test]
    async fn test_last_non_null_merge() {
        let input = [
            new_batch_multi_fields(
                b"k1",
                &[1, 2],
                &[13, 11],
                &[OpType::Put, OpType::Put],
                &[(Some(11), Some(11)), (None, None)],
            ),
            // Empty batch.
            new_batch_multi_fields(b"k1", &[], &[], &[], &[]),
            // Duplicate of (k1, 2): merged into the previous batch's last row.
            new_batch_multi_fields(b"k1", &[2], &[10], &[OpType::Put], &[(Some(12), None)]),
            // First row is another duplicate of (k1, 2).
            new_batch_multi_fields(
                b"k1",
                &[2, 3, 4],
                &[10, 13, 13],
                &[OpType::Put, OpType::Put, OpType::Delete],
                &[(Some(2), Some(22)), (Some(13), None), (None, Some(14))],
            ),
            new_batch_multi_fields(
                b"k2",
                &[1, 2],
                &[20, 20],
                &[OpType::Put, OpType::Delete],
                &[(Some(101), Some(101)), (None, None)],
            ),
            new_batch_multi_fields(
                b"k2",
                &[2],
                &[19],
                &[OpType::Put],
                &[(Some(102), Some(102))],
            ),
            new_batch_multi_fields(
                b"k3",
                &[2],
                &[20],
                &[OpType::Put],
                &[(Some(202), Some(202))],
            ),
            // Duplicates (k3, 2) of the previous batch, so it is dropped as a duplicate and
            // does not count as a deleted row.
            new_batch_multi_fields(b"k3", &[2], &[19], &[OpType::Delete], &[(None, None)]),
        ];

        // Filter deleted rows.
        let reader = VecBatchReader::new(&input);
        let mut reader = DedupReader::new(reader, LastNonNull::new(true), None);
        check_reader_result(
            &mut reader,
            &[
                new_batch_multi_fields(
                    b"k1",
                    &[1, 2],
                    &[13, 11],
                    &[OpType::Put, OpType::Put],
                    &[(Some(11), Some(11)), (Some(12), Some(22))],
                ),
                new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(13), None)]),
                new_batch_multi_fields(
                    b"k2",
                    &[1],
                    &[20],
                    &[OpType::Put],
                    &[(Some(101), Some(101))],
                ),
                new_batch_multi_fields(
                    b"k3",
                    &[2],
                    &[20],
                    &[OpType::Put],
                    &[(Some(202), Some(202))],
                ),
            ],
        )
        .await;
        assert_eq!(6, reader.metrics().num_unselected_rows);
        assert_eq!(2, reader.metrics().num_deleted_rows);

        // Keep deleted rows.
        let reader = VecBatchReader::new(&input);
        let mut reader = DedupReader::new(reader, LastNonNull::new(false), None);
        check_reader_result(
            &mut reader,
            &[
                new_batch_multi_fields(
                    b"k1",
                    &[1, 2],
                    &[13, 11],
                    &[OpType::Put, OpType::Put],
                    &[(Some(11), Some(11)), (Some(12), Some(22))],
                ),
                new_batch_multi_fields(
                    b"k1",
                    &[3, 4],
                    &[13, 13],
                    &[OpType::Put, OpType::Delete],
                    &[(Some(13), None), (None, Some(14))],
                ),
                new_batch_multi_fields(
                    b"k2",
                    &[1, 2],
                    &[20, 20],
                    &[OpType::Put, OpType::Delete],
                    &[(Some(101), Some(101)), (None, None)],
                ),
                new_batch_multi_fields(
                    b"k3",
                    &[2],
                    &[20],
                    &[OpType::Put],
                    &[(Some(202), Some(202))],
                ),
            ],
        )
        .await;
        assert_eq!(4, reader.metrics().num_unselected_rows);
        assert_eq!(0, reader.metrics().num_deleted_rows);
    }

    // A single batch without duplicates: only delete filtering changes the output.
    #[tokio::test]
    async fn test_last_non_null_skip_merge_single() {
        let input = [new_batch_multi_fields(
            b"k1",
            &[1, 2, 3],
            &[13, 11, 13],
            &[OpType::Put, OpType::Delete, OpType::Put],
            &[(Some(11), Some(11)), (None, None), (Some(13), Some(13))],
        )];

        let reader = VecBatchReader::new(&input);
        let mut reader = DedupReader::new(reader, LastNonNull::new(true), None);
        check_reader_result(
            &mut reader,
            &[new_batch_multi_fields(
                b"k1",
                &[1, 3],
                &[13, 13],
                &[OpType::Put, OpType::Put],
                &[(Some(11), Some(11)), (Some(13), Some(13))],
            )],
        )
        .await;
        assert_eq!(1, reader.metrics().num_unselected_rows);
        assert_eq!(1, reader.metrics().num_deleted_rows);

        let reader = VecBatchReader::new(&input);
        let mut reader = DedupReader::new(reader, LastNonNull::new(false), None);
        check_reader_result(&mut reader, &input).await;
        assert_eq!(0, reader.metrics().num_unselected_rows);
        assert_eq!(0, reader.metrics().num_deleted_rows);
    }

    // A kept row without nulls is not merged; its duplicates are simply dropped.
    #[tokio::test]
    async fn test_last_non_null_skip_merge_no_null() {
        let input = [
            new_batch_multi_fields(
                b"k1",
                &[1, 2],
                &[13, 11],
                &[OpType::Put, OpType::Put],
                &[(Some(11), Some(11)), (Some(12), Some(12))],
            ),
            new_batch_multi_fields(b"k1", &[2], &[10], &[OpType::Put], &[(None, Some(22))]),
            new_batch_multi_fields(
                b"k1",
                &[2, 3],
                &[9, 13],
                &[OpType::Put, OpType::Put],
                &[(Some(32), None), (Some(13), Some(13))],
            ),
        ];

        let reader = VecBatchReader::new(&input);
        let mut reader = DedupReader::new(reader, LastNonNull::new(true), None);
        check_reader_result(
            &mut reader,
            &[
                new_batch_multi_fields(
                    b"k1",
                    &[1, 2],
                    &[13, 11],
                    &[OpType::Put, OpType::Put],
                    &[(Some(11), Some(11)), (Some(12), Some(12))],
                ),
                new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(13), Some(13))]),
            ],
        )
        .await;
        assert_eq!(2, reader.metrics().num_unselected_rows);
        assert_eq!(0, reader.metrics().num_deleted_rows);
    }

    // A field that is null in every duplicate stays null after merging.
    #[tokio::test]
    async fn test_last_non_null_merge_null() {
        let input = [
            new_batch_multi_fields(
                b"k1",
                &[1, 2],
                &[13, 11],
                &[OpType::Put, OpType::Put],
                &[(Some(11), Some(11)), (None, None)],
            ),
            new_batch_multi_fields(b"k1", &[2], &[10], &[OpType::Put], &[(None, Some(22))]),
            new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(33), None)]),
        ];

        let reader = VecBatchReader::new(&input);
        let mut reader = DedupReader::new(reader, LastNonNull::new(true), None);
        check_reader_result(
            &mut reader,
            &[
                new_batch_multi_fields(
                    b"k1",
                    &[1, 2],
                    &[13, 11],
                    &[OpType::Put, OpType::Put],
                    &[(Some(11), Some(11)), (None, Some(22))],
                ),
                new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(33), None)]),
            ],
        )
        .await;
        assert_eq!(1, reader.metrics().num_unselected_rows);
        assert_eq!(0, reader.metrics().num_deleted_rows);
    }

    /// Drives `strategy` directly over `input`, including the final `finish`, and asserts
    /// that the emitted batches equal `expect`.
    fn check_dedup_strategy(input: &[Batch], strategy: &mut dyn DedupStrategy, expect: &[Batch]) {
        let mut actual = Vec::new();
        let mut metrics = DedupMetrics::default();
        for batch in input {
            if let Some(out) = strategy.push_batch(batch.clone(), &mut metrics).unwrap() {
                actual.push(out);
            }
        }
        if let Some(out) = strategy.finish(&mut metrics).unwrap() {
            actual.push(out);
        }

        assert_eq!(expect, actual);
    }

    // The last row of a key is a delete, so the buffered row is filtered out.
    #[test]
    fn test_last_non_null_strategy_delete_last() {
        let input = [
            new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
            new_batch_multi_fields(
                b"k1",
                &[1, 2],
                &[1, 7],
                &[OpType::Put, OpType::Put],
                &[(Some(1), None), (Some(22), Some(222))],
            ),
            new_batch_multi_fields(b"k1", &[2], &[4], &[OpType::Put], &[(Some(12), None)]),
            new_batch_multi_fields(
                b"k2",
                &[2, 3],
                &[2, 5],
                &[OpType::Put, OpType::Delete],
                &[(None, None), (Some(13), None)],
            ),
            new_batch_multi_fields(b"k2", &[3], &[3], &[OpType::Put], &[(None, Some(3))]),
        ];

        let mut strategy = LastNonNull::new(true);
        check_dedup_strategy(
            &input,
            &mut strategy,
            &[
                new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
                new_batch_multi_fields(b"k1", &[2], &[7], &[OpType::Put], &[(Some(22), Some(222))]),
                new_batch_multi_fields(b"k2", &[2], &[2], &[OpType::Put], &[(None, None)]),
            ],
        );
    }

    // A batch that only contains a delete produces no output.
    #[test]
    fn test_last_non_null_strategy_delete_one() {
        let input = [
            new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Delete], &[(None, None)]),
            new_batch_multi_fields(b"k2", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
        ];

        let mut strategy = LastNonNull::new(true);
        check_dedup_strategy(
            &input,
            &mut strategy,
            &[new_batch_multi_fields(
                b"k2",
                &[1],
                &[6],
                &[OpType::Put],
                &[(Some(11), None)],
            )],
        );
    }

    // Every row is a delete, so nothing is emitted.
    #[test]
    fn test_last_non_null_strategy_delete_all() {
        let input = [
            new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Delete], &[(None, None)]),
            new_batch_multi_fields(b"k2", &[1], &[6], &[OpType::Delete], &[(Some(11), None)]),
        ];

        let mut strategy = LastNonNull::new(true);
        check_dedup_strategy(&input, &mut strategy, &[]);
    }

    // Merges several duplicate timestamps of the same key across consecutive batches.
    #[test]
    fn test_last_non_null_strategy_same_batch() {
        let input = [
            new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
            new_batch_multi_fields(
                b"k1",
                &[1, 2],
                &[1, 7],
                &[OpType::Put, OpType::Put],
                &[(Some(1), None), (Some(22), Some(222))],
            ),
            new_batch_multi_fields(b"k1", &[2], &[4], &[OpType::Put], &[(Some(12), None)]),
            new_batch_multi_fields(
                b"k1",
                &[2, 3],
                &[2, 5],
                &[OpType::Put, OpType::Put],
                &[(None, None), (Some(13), None)],
            ),
            new_batch_multi_fields(b"k1", &[3], &[3], &[OpType::Put], &[(None, Some(3))]),
        ];

        let mut strategy = LastNonNull::new(true);
        check_dedup_strategy(
            &input,
            &mut strategy,
            &[
                new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
                new_batch_multi_fields(b"k1", &[2], &[7], &[OpType::Put], &[(Some(22), Some(222))]),
                new_batch_multi_fields(b"k1", &[3], &[5], &[OpType::Put], &[(Some(13), Some(3))]),
            ],
        );
    }

    // A delete between duplicates stops merging: values behind the delete are ignored.
    #[test]
    fn test_last_non_null_strategy_delete_middle() {
        let input = [
            new_batch_multi_fields(b"k1", &[1], &[7], &[OpType::Put], &[(Some(11), None)]),
            new_batch_multi_fields(b"k1", &[1], &[4], &[OpType::Delete], &[(None, None)]),
            new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Put], &[(Some(12), Some(1))]),
            new_batch_multi_fields(b"k1", &[2], &[8], &[OpType::Put], &[(Some(21), None)]),
            new_batch_multi_fields(b"k1", &[2], &[5], &[OpType::Delete], &[(None, None)]),
            new_batch_multi_fields(b"k1", &[2], &[2], &[OpType::Put], &[(Some(22), Some(2))]),
            new_batch_multi_fields(b"k1", &[3], &[9], &[OpType::Put], &[(Some(31), None)]),
            new_batch_multi_fields(b"k1", &[3], &[6], &[OpType::Delete], &[(None, None)]),
            new_batch_multi_fields(b"k1", &[3], &[3], &[OpType::Put], &[(Some(32), Some(3))]),
        ];

        let mut strategy = LastNonNull::new(true);
        check_dedup_strategy(
            &input,
            &mut strategy,
            &[
                new_batch_multi_fields(b"k1", &[1], &[7], &[OpType::Put], &[(Some(11), None)]),
                new_batch_multi_fields(b"k1", &[2], &[8], &[OpType::Put], &[(Some(21), None)]),
                new_batch_multi_fields(b"k1", &[3], &[9], &[OpType::Put], &[(Some(31), None)]),
            ],
        );
    }

    // The iterator merges duplicate timestamps that occur inside one batch.
    #[test]
    fn test_last_non_null_iter_on_batch() {
        let input = [new_batch_multi_fields(
            b"k1",
            &[1, 1, 2],
            &[13, 12, 13],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[(None, None), (Some(1), None), (Some(2), Some(22))],
        )];
        let iter = input.into_iter().map(Ok);
        let iter = LastNonNullIter::new(iter);
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        let expect = [
            new_batch_multi_fields(b"k1", &[1], &[13], &[OpType::Put], &[(Some(1), None)]),
            new_batch_multi_fields(b"k1", &[2], &[13], &[OpType::Put], &[(Some(2), Some(22))]),
        ];
        assert_eq!(&expect, &actual[..]);
    }

    // All rows share one timestamp across two batches and collapse into a single row.
    #[test]
    fn test_last_non_null_iter_same_row() {
        let input = [
            new_batch_multi_fields(
                b"k1",
                &[1, 1, 1],
                &[13, 12, 11],
                &[OpType::Put, OpType::Put, OpType::Put],
                &[(None, None), (Some(1), None), (Some(11), None)],
            ),
            new_batch_multi_fields(
                b"k1",
                &[1, 1],
                &[10, 9],
                &[OpType::Put, OpType::Put],
                &[(None, Some(11)), (Some(21), Some(31))],
            ),
        ];
        let iter = input.into_iter().map(Ok);
        let iter = LastNonNullIter::new(iter);
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        let expect = [new_batch_multi_fields(
            b"k1",
            &[1],
            &[13],
            &[OpType::Put],
            &[(Some(1), Some(11))],
        )];
        assert_eq!(&expect, &actual[..]);
    }

    // Duplicates inside and across batches and keys. Deletes are kept because the
    // iterator does not filter them.
    #[test]
    fn test_last_non_null_iter_multi_batch() {
        let input = [
            new_batch_multi_fields(
                b"k1",
                &[1, 1, 2],
                &[13, 12, 13],
                &[OpType::Put, OpType::Put, OpType::Put],
                &[(None, None), (Some(1), None), (Some(2), Some(22))],
            ),
            new_batch_multi_fields(
                b"k1",
                &[2, 3],
                &[12, 13],
                &[OpType::Put, OpType::Delete],
                &[(None, Some(12)), (None, None)],
            ),
            new_batch_multi_fields(
                b"k2",
                &[1, 1, 2],
                &[13, 12, 13],
                &[OpType::Put, OpType::Put, OpType::Put],
                &[(None, None), (Some(1), None), (Some(2), Some(22))],
            ),
        ];
        let iter = input.into_iter().map(Ok);
        let iter = LastNonNullIter::new(iter);
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        let expect = [
            new_batch_multi_fields(b"k1", &[1], &[13], &[OpType::Put], &[(Some(1), None)]),
            new_batch_multi_fields(b"k1", &[2], &[13], &[OpType::Put], &[(Some(2), Some(22))]),
            new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Delete], &[(None, None)]),
            new_batch_multi_fields(b"k2", &[1], &[13], &[OpType::Put], &[(Some(1), None)]),
            new_batch_multi_fields(b"k2", &[2], &[13], &[OpType::Put], &[(Some(2), Some(22))]),
        ];
        assert_eq!(&expect, &actual[..]);
    }

    /// Builds a batch that has no field columns.
    fn new_batch_no_fields(
        primary_key: &[u8],
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
    ) -> Batch {
        let mut builder = BatchBuilder::new(primary_key.to_vec());
        builder
            .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
                timestamps.iter().copied(),
            )))
            .unwrap()
            .sequences_array(Arc::new(UInt64Array::from_iter_values(
                sequences.iter().copied(),
            )))
            .unwrap()
            .op_types_array(Arc::new(UInt8Array::from_iter_values(
                op_types.iter().map(|v| *v as u8),
            )))
            .unwrap();
        builder.build().unwrap()
    }

    // Batches without field columns are deduplicated without any field merging.
    #[test]
    fn test_last_non_null_iter_no_batch() {
        let input = [
            new_batch_no_fields(
                b"k1",
                &[1, 1, 2],
                &[13, 12, 13],
                &[OpType::Put, OpType::Put, OpType::Put],
            ),
            new_batch_no_fields(b"k1", &[2, 3], &[12, 13], &[OpType::Put, OpType::Delete]),
            new_batch_no_fields(
                b"k2",
                &[1, 1, 2],
                &[13, 12, 13],
                &[OpType::Put, OpType::Put, OpType::Put],
            ),
        ];
        let iter = input.into_iter().map(Ok);
        let iter = LastNonNullIter::new(iter);
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        let expect = [
            new_batch_no_fields(b"k1", &[1], &[13], &[OpType::Put]),
            new_batch_no_fields(b"k1", &[2], &[13], &[OpType::Put]),
            new_batch_no_fields(b"k1", &[3], &[13], &[OpType::Delete]),
            new_batch_no_fields(b"k2", &[1], &[13], &[OpType::Put]),
            new_batch_no_fields(b"k2", &[2], &[13], &[OpType::Put]),
        ];
        assert_eq!(&expect, &actual[..]);
    }
}