1use std::fmt;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use api::v1::OpType;
22use datatypes::data_type::DataType;
23use datatypes::prelude::ScalarVector;
24use datatypes::value::Value;
25use datatypes::vectors::MutableVector;
26
27use crate::error::Result;
28use crate::read::{Batch, BatchColumn};
29
/// Sink that receives [`DedupMetrics`] gathered during deduplication.
///
/// Reporters are shared as `Arc<dyn DedupMetricsReport>` (see
/// [`DedupMetrics::maybe_report`]), hence the `Send + Sync` bound.
pub trait DedupMetricsReport: Send + Sync {
    /// Reports the given metrics.
    ///
    /// The metrics are passed mutably, so an implementation is able to modify them.
    /// NOTE(review): presumably implementors reset the metrics after reporting — confirm
    /// against the implementations.
    fn report(&self, metrics: &mut DedupMetrics);
}
35
/// Strategy that removes duplicate rows from a stream of batches.
///
/// Batches are fed one by one through [`DedupStrategy::push_batch`]. Once the input is
/// exhausted, [`DedupStrategy::finish`] must be called to flush whatever the strategy
/// still holds.
pub trait DedupStrategy: Send {
    /// Pushes a batch into the strategy.
    ///
    /// Returns `Ok(Some(batch))` when a deduplicated batch is ready and `Ok(None)` when
    /// the strategy has nothing to emit yet. Statistics are accumulated into `metrics`.
    fn push_batch(&mut self, batch: Batch, metrics: &mut DedupMetrics) -> Result<Option<Batch>>;

    /// Finishes the deduplication and returns the remaining output, if any.
    ///
    /// Statistics are accumulated into `metrics`.
    fn finish(&mut self, metrics: &mut DedupMetrics) -> Result<Option<Batch>>;
}
48
49fn filter_deleted_from_batch(batch: &mut Batch, metrics: &mut DedupMetrics) -> Result<()> {
51 let num_rows = batch.num_rows();
52 batch.filter_deleted()?;
53 let num_rows_after_filter = batch.num_rows();
54 let num_deleted = num_rows - num_rows_after_filter;
55 metrics.num_deleted_rows += num_deleted;
56 metrics.num_unselected_rows += num_deleted;
57
58 Ok(())
59}
60
/// Metrics collected during deduplication.
#[derive(Default)]
pub struct DedupMetrics {
    /// Number of rows that were not selected for the output. Deleted rows removed by
    /// `filter_deleted_from_batch` are counted here as well.
    pub(crate) num_unselected_rows: usize,
    /// Number of deleted rows that were filtered out.
    pub(crate) num_deleted_rows: usize,
    /// Accumulated time spent on deduplication.
    pub(crate) dedup_cost: Duration,
}
71
72impl fmt::Debug for DedupMetrics {
73 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74 if self.dedup_cost.is_zero() {
76 return write!(f, "{{}}");
77 }
78
79 write!(f, r#"{{"dedup_cost":"{:?}""#, self.dedup_cost)?;
80
81 if self.num_unselected_rows > 0 {
82 write!(f, r#", "num_unselected_rows":{}"#, self.num_unselected_rows)?;
83 }
84 if self.num_deleted_rows > 0 {
85 write!(f, r#", "num_deleted_rows":{}"#, self.num_deleted_rows)?;
86 }
87
88 write!(f, "}}")
89 }
90}
91
92impl DedupMetrics {
93 pub(crate) fn merge(&mut self, other: &DedupMetrics) {
95 let DedupMetrics {
96 num_unselected_rows,
97 num_deleted_rows,
98 dedup_cost,
99 } = other;
100
101 self.num_unselected_rows += *num_unselected_rows;
102 self.num_deleted_rows += *num_deleted_rows;
103 self.dedup_cost += *dedup_cost;
104 }
105
106 pub(crate) fn maybe_report(&mut self, reporter: &Option<Arc<dyn DedupMetricsReport>>) {
108 if self.dedup_cost.as_millis() > 10
109 && let Some(r) = reporter
110 {
111 r.report(self);
112 }
113 }
114}
115
/// Buffer for the field values of the last row of a batch, used to fill null fields
/// with values from following rows.
///
/// Usage, as driven by `LastNonNull`:
/// 1. `maybe_init()` captures the last row of the buffered batch.
/// 2. `push_first_row()` is called for every following batch whose first row has the
///    same key and timestamp; it fills fields that are still null.
/// 3. `merge_last_non_null()` writes the merged fields back into the last row of the
///    buffered batch and resets the builder.
struct LastFieldsBuilder {
    /// Whether `merge_last_non_null()` removes deleted rows from its output.
    filter_deleted: bool,
    /// One mutable vector per field column, created lazily in `maybe_init()`.
    builders: Vec<Box<dyn MutableVector>>,
    /// Field values of the row being merged. Stays empty when merging is skipped.
    last_fields: Vec<Value>,
    /// Whether the row being merged still has a null field.
    /// Only computed when the row is not a deletion.
    contains_null: bool,
    /// Whether a delete op was seen for the row being merged. Merging stops once set.
    contains_deletion: bool,
    /// Whether `maybe_init()` has run since construction or the last `clear()`.
    initialized: bool,
}
139
140impl LastFieldsBuilder {
141 fn new(filter_deleted: bool) -> Self {
143 Self {
144 filter_deleted,
145 builders: Vec::new(),
146 last_fields: Vec::new(),
147 contains_null: false,
148 contains_deletion: false,
149 initialized: false,
150 }
151 }
152
153 fn maybe_init(&mut self, batch: &Batch) {
155 debug_assert!(!batch.is_empty());
156
157 if self.initialized {
158 return;
160 }
161
162 self.initialized = true;
163
164 if batch.fields().is_empty() {
165 return;
167 }
168
169 let last_idx = batch.num_rows() - 1;
170 let fields = batch.fields();
171 self.contains_deletion =
173 batch.op_types().get_data(last_idx).unwrap() == OpType::Delete as u8;
174 if !self.contains_deletion {
176 self.contains_null = fields.iter().any(|col| col.data.is_null(last_idx));
177 }
178
179 if self.skip_merge() {
180 return;
182 }
183 if self.builders.is_empty() {
184 self.builders = fields
185 .iter()
186 .map(|col| col.data.data_type().create_mutable_vector(1))
187 .collect();
188 }
189 self.last_fields = fields.iter().map(|col| col.data.get(last_idx)).collect();
190 }
191
192 fn skip_merge(&self) -> bool {
194 debug_assert!(self.initialized);
195
196 self.contains_deletion || !self.contains_null
198 }
199
200 fn push_first_row(&mut self, batch: &Batch) {
202 debug_assert!(self.initialized);
203 debug_assert!(!batch.is_empty());
204
205 if self.skip_merge() {
206 return;
208 }
209
210 self.contains_deletion = batch.op_types().get_data(0).unwrap() == OpType::Delete as u8;
215 if self.contains_deletion {
216 return;
218 }
219
220 let fields = batch.fields();
221 for (idx, value) in self.last_fields.iter_mut().enumerate() {
222 if value.is_null() && !fields[idx].data.is_null(0) {
223 *value = fields[idx].data.get(0);
225 }
226 }
227 self.contains_null = self.last_fields.iter().any(Value::is_null);
229 }
230
231 fn merge_last_non_null(
235 &mut self,
236 buffer: Batch,
237 metrics: &mut DedupMetrics,
238 ) -> Result<Option<Batch>> {
239 debug_assert!(self.initialized);
240
241 let mut output = if self.last_fields.is_empty() {
242 buffer
244 } else {
245 for (builder, value) in self.builders.iter_mut().zip(&self.last_fields) {
247 builder.push_value_ref(&value.as_value_ref());
249 }
250 let fields = self
251 .builders
252 .iter_mut()
253 .zip(buffer.fields())
254 .map(|(builder, col)| BatchColumn {
255 column_id: col.column_id,
256 data: builder.to_vector(),
257 })
258 .collect();
259
260 if buffer.num_rows() == 1 {
261 buffer.with_fields(fields)?
263 } else {
264 let front = buffer.slice(0, buffer.num_rows() - 1);
266 let last = buffer.slice(buffer.num_rows() - 1, 1);
267 let last = last.with_fields(fields)?;
268 Batch::concat(vec![front, last])?
269 }
270 };
271
272 self.clear();
274
275 if self.filter_deleted {
276 filter_deleted_from_batch(&mut output, metrics)?;
277 }
278
279 if output.is_empty() {
280 Ok(None)
281 } else {
282 Ok(Some(output))
283 }
284 }
285
286 fn clear(&mut self) {
288 self.last_fields.clear();
289 self.contains_null = false;
290 self.contains_deletion = false;
291 self.initialized = false;
292 }
293}
294
/// Dedup strategy that merges rows sharing the same primary key and timestamp by
/// keeping the first row seen and filling its null fields with values from later rows.
///
/// NOTE(review): only the first row of each pushed batch is compared with the last row
/// of the buffered batch, so a single batch is presumably expected not to contain
/// duplicate rows — confirm against the callers.
pub(crate) struct LastNonNull {
    /// Buffered batch whose last row may still be updated by following batches.
    buffer: Option<Batch>,
    /// Merge state for the last row of `buffer`.
    last_fields: LastFieldsBuilder,
}
307
308impl LastNonNull {
309 pub(crate) fn new(filter_deleted: bool) -> Self {
311 Self {
312 buffer: None,
313 last_fields: LastFieldsBuilder::new(filter_deleted),
314 }
315 }
316}
317
318impl DedupStrategy for LastNonNull {
319 fn push_batch(&mut self, batch: Batch, metrics: &mut DedupMetrics) -> Result<Option<Batch>> {
320 let start = Instant::now();
321
322 if batch.is_empty() {
323 return Ok(None);
324 }
325
326 let Some(buffer) = self.buffer.as_mut() else {
327 self.buffer = Some(batch);
329 return Ok(None);
330 };
331
332 self.last_fields.maybe_init(buffer);
334
335 if buffer.primary_key() != batch.primary_key() {
336 let buffer = std::mem::replace(buffer, batch);
338 let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;
339 metrics.dedup_cost += start.elapsed();
340 return Ok(merged);
341 }
342
343 if buffer.last_timestamp() != batch.first_timestamp() {
344 let buffer = std::mem::replace(buffer, batch);
346 let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;
347 metrics.dedup_cost += start.elapsed();
348 return Ok(merged);
349 }
350
351 metrics.num_unselected_rows += 1;
354 if batch.num_rows() == 1 {
356 self.last_fields.push_first_row(&batch);
357 metrics.dedup_cost += start.elapsed();
358 return Ok(None);
359 }
360
361 let first = batch.slice(0, 1);
364 self.last_fields.push_first_row(&first);
365 let batch = batch.slice(1, batch.num_rows() - 1);
367 let buffer = std::mem::replace(buffer, batch);
368 let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;
369
370 metrics.dedup_cost += start.elapsed();
371
372 Ok(merged)
373 }
374
375 fn finish(&mut self, metrics: &mut DedupMetrics) -> Result<Option<Batch>> {
376 let start = Instant::now();
377
378 let Some(buffer) = self.buffer.take() else {
379 return Ok(None);
380 };
381
382 self.last_fields.maybe_init(&buffer);
384
385 let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;
386
387 metrics.dedup_cost += start.elapsed();
388
389 Ok(merged)
390 }
391}
392
/// Iterator adapter that deduplicates the batches of an inner iterator with the
/// [`LastNonNull`] strategy.
///
/// NOTE(review): only adjacent rows and batches are compared, so the inner iterator is
/// presumably expected to yield batches sorted by key and timestamp — confirm.
pub(crate) struct LastNonNullIter<I> {
    /// Inner iterator. Set to `None` once it is exhausted.
    iter: Option<I>,
    /// Strategy that merges rows with the same key and timestamp.
    strategy: LastNonNull,
    /// Metrics accumulated by `strategy`.
    metrics: DedupMetrics,
    /// Batch fetched from `iter` that has not been completely handed to `strategy` yet.
    /// `None` means a new batch has to be fetched.
    current_batch: Option<Batch>,
    /// Index of the first row in `current_batch` that still has to be handed to
    /// `strategy`.
    current_index: usize,
}
410
411impl<I> LastNonNullIter<I> {
412 pub(crate) fn new(iter: I) -> Self {
414 Self {
415 iter: Some(iter),
416 strategy: LastNonNull::new(false),
418 metrics: DedupMetrics::default(),
419 current_batch: None,
420 current_index: 0,
421 }
422 }
423}
424
425impl<I: Iterator<Item = Result<Batch>>> LastNonNullIter<I> {
426 fn next_batch_for_merge(&mut self) -> Result<Option<Batch>> {
429 if self.current_batch.is_none() {
430 let Some(iter) = self.iter.as_mut() else {
432 return Ok(None);
434 };
435
436 self.current_batch = iter.next().transpose()?;
437 self.current_index = 0;
438 if self.current_batch.is_none() {
439 self.iter = None;
441 return Ok(None);
442 }
443 }
444
445 if let Some(batch) = &self.current_batch {
446 let n = batch.num_rows();
447 let timestamps = batch.timestamps_native().unwrap();
449 let mut pos = self.current_index;
450 while pos + 1 < n && timestamps[pos] != timestamps[pos + 1] {
451 pos += 1;
452 }
453 let segment = batch.slice(self.current_index, pos - self.current_index + 1);
454 if pos + 1 < n && timestamps[pos] == timestamps[pos + 1] {
455 self.current_index = pos + 1;
456 } else {
457 self.current_batch = None;
458 self.current_index = 0;
459 }
460 return Ok(Some(segment));
461 }
462
463 Ok(None)
464 }
465
466 fn next_batch(&mut self) -> Result<Option<Batch>> {
467 while let Some(batch) = self.next_batch_for_merge()? {
468 if let Some(batch) = self.strategy.push_batch(batch, &mut self.metrics)? {
469 return Ok(Some(batch));
470 }
471 }
472
473 self.strategy.finish(&mut self.metrics)
474 }
475}
476
477impl<I: Iterator<Item = Result<Batch>>> Iterator for LastNonNullIter<I> {
478 type Item = Result<Batch>;
479
480 fn next(&mut self) -> Option<Self::Item> {
481 self.next_batch().transpose()
482 }
483}
484
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::arrow::array::{TimestampMillisecondArray, UInt8Array, UInt64Array};

    use super::*;
    use crate::read::BatchBuilder;

    /// Starts a builder for `primary_key` with the timestamp, sequence and op type
    /// columns already set.
    fn new_builder(
        primary_key: &[u8],
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
    ) -> BatchBuilder {
        let mut builder = BatchBuilder::new(primary_key.to_vec());
        builder
            .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
                timestamps.iter().copied(),
            )))
            .unwrap()
            .sequences_array(Arc::new(UInt64Array::from_iter_values(
                sequences.iter().copied(),
            )))
            .unwrap()
            .op_types_array(Arc::new(UInt8Array::from_iter_values(
                op_types.iter().map(|v| *v as u8),
            )))
            .unwrap();
        builder
    }

    /// Builds a batch with two nullable `u64` field columns whose column ids are 1 and 2.
    fn new_batch_multi_fields(
        primary_key: &[u8],
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        fields: &[(Option<u64>, Option<u64>)],
    ) -> Batch {
        let first_column = UInt64Array::from_iter(fields.iter().map(|field| field.0));
        let second_column = UInt64Array::from_iter(fields.iter().map(|field| field.1));

        let mut builder = new_builder(primary_key, timestamps, sequences, op_types);
        builder
            .push_field_array(1, Arc::new(first_column))
            .unwrap()
            .push_field_array(2, Arc::new(second_column))
            .unwrap();
        builder.build().unwrap()
    }

    /// Builds a batch without any field column.
    fn new_batch_no_fields(
        primary_key: &[u8],
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
    ) -> Batch {
        new_builder(primary_key, timestamps, sequences, op_types)
            .build()
            .unwrap()
    }

    /// Pushes every batch of `input` into `strategy`, finishes it and asserts that the
    /// emitted batches equal `expect`.
    fn check_dedup_strategy(input: &[Batch], strategy: &mut dyn DedupStrategy, expect: &[Batch]) {
        let mut metrics = DedupMetrics::default();
        let mut actual: Vec<Batch> = input
            .iter()
            .filter_map(|batch| strategy.push_batch(batch.clone(), &mut metrics).unwrap())
            .collect();
        actual.extend(strategy.finish(&mut metrics).unwrap());

        assert_eq!(expect, actual);
    }

    /// Runs a `LastNonNullIter` over `input` and collects every batch it yields.
    fn dedup_by_iter(input: impl IntoIterator<Item = Batch>) -> Vec<Batch> {
        LastNonNullIter::new(input.into_iter().map(Ok))
            .map(|batch| batch.unwrap())
            .collect()
    }

    #[test]
    fn test_last_non_null_strategy_delete_last() {
        let batches = [
            new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
            new_batch_multi_fields(
                b"k1",
                &[1, 2],
                &[1, 7],
                &[OpType::Put, OpType::Put],
                &[(Some(1), None), (Some(22), Some(222))],
            ),
            new_batch_multi_fields(b"k1", &[2], &[4], &[OpType::Put], &[(Some(12), None)]),
            new_batch_multi_fields(
                b"k2",
                &[2, 3],
                &[2, 5],
                &[OpType::Put, OpType::Delete],
                &[(None, None), (Some(13), None)],
            ),
            new_batch_multi_fields(b"k2", &[3], &[3], &[OpType::Put], &[(None, Some(3))]),
        ];
        let expected = [
            new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
            new_batch_multi_fields(b"k1", &[2], &[7], &[OpType::Put], &[(Some(22), Some(222))]),
            new_batch_multi_fields(b"k2", &[2], &[2], &[OpType::Put], &[(None, None)]),
        ];

        let mut dedup = LastNonNull::new(true);
        check_dedup_strategy(&batches, &mut dedup, &expected);
    }

    #[test]
    fn test_last_non_null_strategy_delete_one() {
        let batches = [
            new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Delete], &[(None, None)]),
            new_batch_multi_fields(b"k2", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
        ];
        let expected = [new_batch_multi_fields(
            b"k2",
            &[1],
            &[6],
            &[OpType::Put],
            &[(Some(11), None)],
        )];

        let mut dedup = LastNonNull::new(true);
        check_dedup_strategy(&batches, &mut dedup, &expected);
    }

    #[test]
    fn test_last_non_null_strategy_delete_all() {
        let batches = [
            new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Delete], &[(None, None)]),
            new_batch_multi_fields(b"k2", &[1], &[6], &[OpType::Delete], &[(Some(11), None)]),
        ];

        let mut dedup = LastNonNull::new(true);
        check_dedup_strategy(&batches, &mut dedup, &[]);
    }

    #[test]
    fn test_last_non_null_strategy_same_batch() {
        let batches = [
            new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
            new_batch_multi_fields(
                b"k1",
                &[1, 2],
                &[1, 7],
                &[OpType::Put, OpType::Put],
                &[(Some(1), None), (Some(22), Some(222))],
            ),
            new_batch_multi_fields(b"k1", &[2], &[4], &[OpType::Put], &[(Some(12), None)]),
            new_batch_multi_fields(
                b"k1",
                &[2, 3],
                &[2, 5],
                &[OpType::Put, OpType::Put],
                &[(None, None), (Some(13), None)],
            ),
            new_batch_multi_fields(b"k1", &[3], &[3], &[OpType::Put], &[(None, Some(3))]),
        ];
        let expected = [
            new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
            new_batch_multi_fields(b"k1", &[2], &[7], &[OpType::Put], &[(Some(22), Some(222))]),
            new_batch_multi_fields(b"k1", &[3], &[5], &[OpType::Put], &[(Some(13), Some(3))]),
        ];

        let mut dedup = LastNonNull::new(true);
        check_dedup_strategy(&batches, &mut dedup, &expected);
    }

    #[test]
    fn test_last_non_null_strategy_delete_middle() {
        let batches = [
            new_batch_multi_fields(b"k1", &[1], &[7], &[OpType::Put], &[(Some(11), None)]),
            new_batch_multi_fields(b"k1", &[1], &[4], &[OpType::Delete], &[(None, None)]),
            new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Put], &[(Some(12), Some(1))]),
            new_batch_multi_fields(b"k1", &[2], &[8], &[OpType::Put], &[(Some(21), None)]),
            new_batch_multi_fields(b"k1", &[2], &[5], &[OpType::Delete], &[(None, None)]),
            new_batch_multi_fields(b"k1", &[2], &[2], &[OpType::Put], &[(Some(22), Some(2))]),
            new_batch_multi_fields(b"k1", &[3], &[9], &[OpType::Put], &[(Some(31), None)]),
            new_batch_multi_fields(b"k1", &[3], &[6], &[OpType::Delete], &[(None, None)]),
            new_batch_multi_fields(b"k1", &[3], &[3], &[OpType::Put], &[(Some(32), Some(3))]),
        ];
        let expected = [
            new_batch_multi_fields(b"k1", &[1], &[7], &[OpType::Put], &[(Some(11), None)]),
            new_batch_multi_fields(b"k1", &[2], &[8], &[OpType::Put], &[(Some(21), None)]),
            new_batch_multi_fields(b"k1", &[3], &[9], &[OpType::Put], &[(Some(31), None)]),
        ];

        let mut dedup = LastNonNull::new(true);
        check_dedup_strategy(&batches, &mut dedup, &expected);
    }

    #[test]
    fn test_last_non_null_iter_on_batch() {
        let batches = [new_batch_multi_fields(
            b"k1",
            &[1, 1, 2],
            &[13, 12, 13],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[(None, None), (Some(1), None), (Some(2), Some(22))],
        )];
        let expected = [
            new_batch_multi_fields(b"k1", &[1], &[13], &[OpType::Put], &[(Some(1), None)]),
            new_batch_multi_fields(b"k1", &[2], &[13], &[OpType::Put], &[(Some(2), Some(22))]),
        ];

        let output = dedup_by_iter(batches);
        assert_eq!(&expected, &output[..]);
    }

    #[test]
    fn test_last_non_null_iter_same_row() {
        let batches = [
            new_batch_multi_fields(
                b"k1",
                &[1, 1, 1],
                &[13, 12, 11],
                &[OpType::Put, OpType::Put, OpType::Put],
                &[(None, None), (Some(1), None), (Some(11), None)],
            ),
            new_batch_multi_fields(
                b"k1",
                &[1, 1],
                &[10, 9],
                &[OpType::Put, OpType::Put],
                &[(None, Some(11)), (Some(21), Some(31))],
            ),
        ];
        let expected = [new_batch_multi_fields(
            b"k1",
            &[1],
            &[13],
            &[OpType::Put],
            &[(Some(1), Some(11))],
        )];

        let output = dedup_by_iter(batches);
        assert_eq!(&expected, &output[..]);
    }

    #[test]
    fn test_last_non_null_iter_multi_batch() {
        let batches = [
            new_batch_multi_fields(
                b"k1",
                &[1, 1, 2],
                &[13, 12, 13],
                &[OpType::Put, OpType::Put, OpType::Put],
                &[(None, None), (Some(1), None), (Some(2), Some(22))],
            ),
            new_batch_multi_fields(
                b"k1",
                &[2, 3],
                &[12, 13],
                &[OpType::Put, OpType::Delete],
                &[(None, Some(12)), (None, None)],
            ),
            new_batch_multi_fields(
                b"k2",
                &[1, 1, 2],
                &[13, 12, 13],
                &[OpType::Put, OpType::Put, OpType::Put],
                &[(None, None), (Some(1), None), (Some(2), Some(22))],
            ),
        ];
        let expected = [
            new_batch_multi_fields(b"k1", &[1], &[13], &[OpType::Put], &[(Some(1), None)]),
            new_batch_multi_fields(b"k1", &[2], &[13], &[OpType::Put], &[(Some(2), Some(22))]),
            new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Delete], &[(None, None)]),
            new_batch_multi_fields(b"k2", &[1], &[13], &[OpType::Put], &[(Some(1), None)]),
            new_batch_multi_fields(b"k2", &[2], &[13], &[OpType::Put], &[(Some(2), Some(22))]),
        ];

        let output = dedup_by_iter(batches);
        assert_eq!(&expected, &output[..]);
    }

    #[test]
    fn test_last_non_null_iter_no_batch() {
        let batches = [
            new_batch_no_fields(
                b"k1",
                &[1, 1, 2],
                &[13, 12, 13],
                &[OpType::Put, OpType::Put, OpType::Put],
            ),
            new_batch_no_fields(b"k1", &[2, 3], &[12, 13], &[OpType::Put, OpType::Delete]),
            new_batch_no_fields(
                b"k2",
                &[1, 1, 2],
                &[13, 12, 13],
                &[OpType::Put, OpType::Put, OpType::Put],
            ),
        ];
        let expected = [
            new_batch_no_fields(b"k1", &[1], &[13], &[OpType::Put]),
            new_batch_no_fields(b"k1", &[2], &[13], &[OpType::Put]),
            new_batch_no_fields(b"k1", &[3], &[13], &[OpType::Delete]),
            new_batch_no_fields(b"k2", &[1], &[13], &[OpType::Put]),
            new_batch_no_fields(b"k2", &[2], &[13], &[OpType::Put]),
        ];

        let output = dedup_by_iter(batches);
        assert_eq!(&expected, &output[..]);
    }
}