1use std::ops::Range;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::v1::OpType;
22use async_stream::try_stream;
23use common_telemetry::debug;
24use datatypes::arrow::array::{
25 Array, ArrayRef, BinaryArray, BooleanArray, BooleanBufferBuilder, UInt8Array, UInt64Array,
26 make_comparator,
27};
28use datatypes::arrow::buffer::BooleanBuffer;
29use datatypes::arrow::compute::kernels::cmp::distinct;
30use datatypes::arrow::compute::kernels::partition::{Partitions, partition};
31use datatypes::arrow::compute::kernels::take::take;
32use datatypes::arrow::compute::{
33 SortOptions, TakeOptions, concat_batches, filter_record_batch, take_record_batch,
34};
35use datatypes::arrow::error::ArrowError;
36use datatypes::arrow::record_batch::RecordBatch;
37use futures::{Stream, TryStreamExt};
38use snafu::ResultExt;
39
40use crate::error::{ComputeArrowSnafu, NewRecordBatchSnafu, Result};
41use crate::memtable::partition_tree::data::timestamp_array_to_i64_slice;
42use crate::metrics::MERGE_FILTER_ROWS_TOTAL;
43use crate::read::dedup::{DedupMetrics, DedupMetricsReport};
44use crate::sst::parquet::flat_format::{
45 op_type_column_index, primary_key_column_index, time_index_column_index,
46};
47use crate::sst::parquet::format::{FIXED_POS_COLUMN_NUM, PrimaryKeyArray};
48
49pub struct FlatDedupIterator<I, S> {
51 iter: I,
52 strategy: S,
53 metrics: DedupMetrics,
54}
55
56impl<I, S> FlatDedupIterator<I, S> {
57 pub fn new(iter: I, strategy: S) -> Self {
59 Self {
60 iter,
61 strategy,
62 metrics: DedupMetrics::default(),
63 }
64 }
65}
66
67impl<I: Iterator<Item = Result<RecordBatch>>, S: RecordBatchDedupStrategy> FlatDedupIterator<I, S> {
68 fn fetch_next_batch(&mut self) -> Result<Option<RecordBatch>> {
70 while let Some(batch) = self.iter.next().transpose()? {
71 if let Some(batch) = self.strategy.push_batch(batch, &mut self.metrics)? {
72 return Ok(Some(batch));
73 }
74 }
75
76 self.strategy.finish(&mut self.metrics)
77 }
78}
79
80impl<I: Iterator<Item = Result<RecordBatch>>, S: RecordBatchDedupStrategy> Iterator
81 for FlatDedupIterator<I, S>
82{
83 type Item = Result<RecordBatch>;
84
85 fn next(&mut self) -> Option<Self::Item> {
86 self.fetch_next_batch().transpose()
87 }
88}
89
90pub struct FlatDedupReader<I, S> {
92 stream: I,
93 strategy: S,
94 metrics: DedupMetrics,
95 metrics_reporter: Option<Arc<dyn DedupMetricsReport>>,
97}
98
99impl<I, S> FlatDedupReader<I, S> {
100 pub fn new(
102 stream: I,
103 strategy: S,
104 metrics_reporter: Option<Arc<dyn DedupMetricsReport>>,
105 ) -> Self {
106 Self {
107 stream,
108 strategy,
109 metrics: DedupMetrics::default(),
110 metrics_reporter,
111 }
112 }
113}
114
115impl<I: Stream<Item = Result<RecordBatch>> + Unpin, S: RecordBatchDedupStrategy>
116 FlatDedupReader<I, S>
117{
118 async fn fetch_next_batch(&mut self) -> Result<Option<RecordBatch>> {
120 while let Some(batch) = self.stream.try_next().await? {
121 if let Some(batch) = self.strategy.push_batch(batch, &mut self.metrics)? {
122 self.metrics.maybe_report(&self.metrics_reporter);
123 return Ok(Some(batch));
124 }
125 }
126
127 let result = self.strategy.finish(&mut self.metrics)?;
128 self.metrics.maybe_report(&self.metrics_reporter);
129 Ok(result)
130 }
131
132 pub fn into_stream(mut self) -> impl Stream<Item = Result<RecordBatch>> {
134 try_stream! {
135 while let Some(batch) = self.fetch_next_batch().await? {
136 yield batch;
137 }
138 }
139 }
140}
141
142impl<I, S> Drop for FlatDedupReader<I, S> {
143 fn drop(&mut self) {
144 debug!("Flat dedup reader finished, metrics: {:?}", self.metrics);
145
146 MERGE_FILTER_ROWS_TOTAL
147 .with_label_values(&["dedup"])
148 .inc_by(self.metrics.num_unselected_rows as u64);
149 MERGE_FILTER_ROWS_TOTAL
150 .with_label_values(&["delete"])
151 .inc_by(self.metrics.num_deleted_rows as u64);
152
153 if let Some(reporter) = &self.metrics_reporter {
155 reporter.report(&mut self.metrics);
156 }
157 }
158}
159
160pub trait RecordBatchDedupStrategy: Send {
162 fn push_batch(
166 &mut self,
167 batch: RecordBatch,
168 metrics: &mut DedupMetrics,
169 ) -> Result<Option<RecordBatch>>;
170
171 fn finish(&mut self, metrics: &mut DedupMetrics) -> Result<Option<RecordBatch>>;
176}
177
178pub struct FlatLastRow {
180 prev_batch: Option<BatchLastRow>,
183 filter_deleted: bool,
185}
186
187impl FlatLastRow {
188 pub fn new(filter_deleted: bool) -> Self {
190 Self {
191 prev_batch: None,
192 filter_deleted,
193 }
194 }
195
196 fn dedup_one_batch(batch: RecordBatch) -> Result<RecordBatch> {
198 let num_rows = batch.num_rows();
199 if num_rows < 2 {
200 return Ok(batch);
201 }
202
203 let num_columns = batch.num_columns();
204 let timestamps = batch.column(time_index_column_index(num_columns));
205 let mask = find_boundaries(timestamps).context(ComputeArrowSnafu)?;
207 if mask.count_set_bits() == num_rows - 1 {
208 return Ok(batch);
210 }
211
212 let columns: Vec<_> = [
216 primary_key_column_index(num_columns),
217 time_index_column_index(num_columns),
218 ]
219 .iter()
220 .map(|index| batch.column(*index).clone())
221 .collect();
222 let partitions = partition(&columns).context(ComputeArrowSnafu)?;
223
224 Self::dedup_by_partitions(batch, &partitions)
225 }
226
227 fn dedup_by_partitions(batch: RecordBatch, partitions: &Partitions) -> Result<RecordBatch> {
229 let ranges = partitions.ranges();
230 let num_duplications: usize = ranges.iter().map(|r| r.end - r.start - 1).sum();
232 if num_duplications == 0 {
233 return Ok(batch);
235 }
236
237 let take_indices: UInt64Array = ranges.iter().map(|r| Some(r.start as u64)).collect();
239 take_record_batch(&batch, &take_indices).context(ComputeArrowSnafu)
240 }
241}
242
243impl RecordBatchDedupStrategy for FlatLastRow {
244 fn push_batch(
245 &mut self,
246 batch: RecordBatch,
247 metrics: &mut DedupMetrics,
248 ) -> Result<Option<RecordBatch>> {
249 let start = Instant::now();
250
251 if batch.num_rows() == 0 {
252 return Ok(None);
253 }
254
255 let row_before_dedup = batch.num_rows();
257 let mut batch = Self::dedup_one_batch(batch)?;
258
259 if let Some(prev_batch) = &self.prev_batch {
260 if prev_batch.is_last_row_duplicated(&batch) {
262 batch = batch.slice(1, batch.num_rows() - 1);
264 }
265 }
266 metrics.num_unselected_rows += row_before_dedup - batch.num_rows();
267
268 let Some(batch_last_row) = BatchLastRow::try_new(batch.clone()) else {
269 metrics.dedup_cost += start.elapsed();
273 return Ok(None);
274 };
275
276 self.prev_batch = Some(batch_last_row);
282
283 let result = maybe_filter_deleted(batch, self.filter_deleted, metrics);
285
286 metrics.dedup_cost += start.elapsed();
287
288 result
289 }
290
291 fn finish(&mut self, _metrics: &mut DedupMetrics) -> Result<Option<RecordBatch>> {
292 Ok(None)
293 }
294}
295
296pub struct FlatLastNonNull {
298 field_column_start: usize,
300 filter_deleted: bool,
302 buffer: Option<BatchLastRow>,
306 contains_delete: bool,
309}
310
311impl RecordBatchDedupStrategy for FlatLastNonNull {
312 fn push_batch(
313 &mut self,
314 batch: RecordBatch,
315 metrics: &mut DedupMetrics,
316 ) -> Result<Option<RecordBatch>> {
317 let start = Instant::now();
318
319 if batch.num_rows() == 0 {
320 return Ok(None);
321 }
322
323 let row_before_dedup = batch.num_rows();
324
325 let Some(buffer) = self.buffer.take() else {
326 let (record_batch, contains_delete) =
329 Self::dedup_one_batch(batch, self.field_column_start, false)?;
330 metrics.num_unselected_rows += row_before_dedup - record_batch.num_rows();
331 self.buffer = BatchLastRow::try_new(record_batch);
332 self.contains_delete = contains_delete;
333
334 metrics.dedup_cost += start.elapsed();
335 return Ok(None);
336 };
337
338 if !buffer.is_last_row_duplicated(&batch) {
339 let (record_batch, contains_delete) =
344 Self::dedup_one_batch(batch, self.field_column_start, false)?;
345 metrics.num_unselected_rows += row_before_dedup - record_batch.num_rows();
346 debug_assert!(record_batch.num_rows() > 0);
347 self.buffer = BatchLastRow::try_new(record_batch);
348 self.contains_delete = contains_delete;
349
350 let result = maybe_filter_deleted(buffer.last_batch, self.filter_deleted, metrics);
351 metrics.dedup_cost += start.elapsed();
352 return result;
353 }
354
355 let output = if buffer.last_batch.num_rows() > 1 {
358 let dedup_batch = buffer.last_batch.slice(0, buffer.last_batch.num_rows() - 1);
359 debug_assert_eq!(buffer.last_batch.num_rows() - 1, dedup_batch.num_rows());
360
361 maybe_filter_deleted(dedup_batch, self.filter_deleted, metrics)?
362 } else {
363 None
364 };
365 let last_row = buffer.last_batch.slice(buffer.last_batch.num_rows() - 1, 1);
366
367 let schema = batch.schema();
369 let merged = concat_batches(&schema, &[last_row, batch]).context(ComputeArrowSnafu)?;
370 let merged_row_count = merged.num_rows();
371 let (record_batch, contains_delete) =
373 Self::dedup_one_batch(merged, self.field_column_start, self.contains_delete)?;
374 metrics.num_unselected_rows += merged_row_count - record_batch.num_rows();
375 debug_assert!(record_batch.num_rows() > 0);
376 self.buffer = BatchLastRow::try_new(record_batch);
377 self.contains_delete = contains_delete;
378
379 metrics.dedup_cost += start.elapsed();
380
381 Ok(output)
382 }
383
384 fn finish(&mut self, metrics: &mut DedupMetrics) -> Result<Option<RecordBatch>> {
385 let Some(buffer) = self.buffer.take() else {
386 return Ok(None);
387 };
388
389 let start = Instant::now();
390
391 let result = maybe_filter_deleted(buffer.last_batch, self.filter_deleted, metrics);
392
393 metrics.dedup_cost += start.elapsed();
394
395 result
396 }
397}
398
399impl FlatLastNonNull {
400 pub fn new(field_column_start: usize, filter_deleted: bool) -> Self {
402 Self {
403 field_column_start,
404 filter_deleted,
405 buffer: None,
406 contains_delete: false,
407 }
408 }
409
410 fn dedup_one_batch(
413 batch: RecordBatch,
414 field_column_start: usize,
415 prev_batch_contains_delete: bool,
416 ) -> Result<(RecordBatch, bool)> {
417 let op_type_column = batch
419 .column(op_type_column_index(batch.num_columns()))
420 .clone();
421 let op_types = op_type_column
422 .as_any()
423 .downcast_ref::<UInt8Array>()
424 .unwrap();
425 let num_rows = batch.num_rows();
426 if num_rows < 2 {
427 let contains_delete = if num_rows > 0 {
428 op_types.value(0) == OpType::Delete as u8
429 } else {
430 false
431 };
432 return Ok((batch, contains_delete));
433 }
434
435 let num_columns = batch.num_columns();
436 let timestamps = batch.column(time_index_column_index(num_columns));
437 let mask = find_boundaries(timestamps).context(ComputeArrowSnafu)?;
439 if mask.count_set_bits() == num_rows - 1 {
440 let contains_delete = op_types.value(num_rows - 1) == OpType::Delete as u8;
441 return Ok((batch, contains_delete));
443 }
444
445 let columns: Vec<_> = [
449 primary_key_column_index(num_columns),
450 time_index_column_index(num_columns),
451 ]
452 .iter()
453 .map(|index| batch.column(*index).clone())
454 .collect();
455 let partitions = partition(&columns).context(ComputeArrowSnafu)?;
456
457 Self::dedup_by_partitions(
458 batch,
459 &partitions,
460 field_column_start,
461 op_types,
462 prev_batch_contains_delete,
463 )
464 }
465
466 fn dedup_by_partitions(
469 batch: RecordBatch,
470 partitions: &Partitions,
471 field_column_start: usize,
472 op_types: &UInt8Array,
473 first_range_contains_delete: bool,
474 ) -> Result<(RecordBatch, bool)> {
475 let ranges = partitions.ranges();
476 let contains_delete = Self::last_range_has_delete(&ranges, op_types);
477
478 let num_duplications: usize = ranges.iter().map(|r| r.end - r.start - 1).sum();
480 if num_duplications == 0 {
481 return Ok((batch, contains_delete));
483 }
484
485 let field_column_end = batch.num_columns() - FIXED_POS_COLUMN_NUM;
486 let take_options = Some(TakeOptions {
487 check_bounds: false,
488 });
489 let non_field_indices: UInt64Array = ranges.iter().map(|r| Some(r.start as u64)).collect();
491 let new_columns = batch
492 .columns()
493 .iter()
494 .enumerate()
495 .map(|(col_idx, column)| {
496 if col_idx >= field_column_start && col_idx < field_column_end {
497 let field_indices = Self::compute_field_indices(
498 &ranges,
499 column,
500 op_types,
501 first_range_contains_delete,
502 );
503 take(column, &field_indices, take_options.clone()).context(ComputeArrowSnafu)
504 } else {
505 take(column, &non_field_indices, take_options.clone())
506 .context(ComputeArrowSnafu)
507 }
508 })
509 .collect::<Result<Vec<ArrayRef>>>()?;
510
511 let record_batch =
512 RecordBatch::try_new(batch.schema(), new_columns).context(NewRecordBatchSnafu)?;
513 Ok((record_batch, contains_delete))
514 }
515
516 fn compute_field_indices(
521 ranges: &[Range<usize>],
522 field_array: &ArrayRef,
523 op_types: &UInt8Array,
524 first_range_contains_delete: bool,
525 ) -> UInt64Array {
526 ranges
527 .iter()
528 .enumerate()
529 .map(|(range_idx, r)| {
530 let mut value_index = r.start as u64;
531 if range_idx == 0 && first_range_contains_delete {
532 return Some(value_index);
533 }
534
535 for i in r.clone() {
538 if op_types.value(i) == OpType::Delete as u8 {
539 break;
540 }
541 if field_array.is_valid(i) {
542 value_index = i as u64;
543 break;
544 }
545 }
546
547 Some(value_index)
548 })
549 .collect()
550 }
551
552 fn last_range_has_delete(ranges: &[Range<usize>], op_types: &UInt8Array) -> bool {
554 if let Some(last_range) = ranges.last() {
555 last_range
556 .clone()
557 .any(|i| op_types.value(i) == OpType::Delete as u8)
558 } else {
559 false
560 }
561 }
562}
563
564struct BatchLastRow {
566 last_batch: RecordBatch,
569 primary_key: PrimaryKeyArray,
571 timestamp: i64,
573}
574
575impl BatchLastRow {
576 fn try_new(record_batch: RecordBatch) -> Option<Self> {
578 if record_batch.num_rows() > 0 {
579 let num_columns = record_batch.num_columns();
580 let primary_key = record_batch
581 .column(primary_key_column_index(num_columns))
582 .as_any()
583 .downcast_ref::<PrimaryKeyArray>()
584 .unwrap()
585 .clone();
586 let timestamp_array = record_batch.column(time_index_column_index(num_columns));
587 let timestamp = timestamp_value(timestamp_array, timestamp_array.len() - 1);
588
589 Some(Self {
590 last_batch: record_batch,
591 primary_key,
592 timestamp,
593 })
594 } else {
595 None
596 }
597 }
598
599 fn is_last_row_duplicated(&self, batch: &RecordBatch) -> bool {
601 if batch.num_rows() == 0 {
602 return false;
603 }
604
605 let batch_timestamp = timestamp_value(
607 batch.column(time_index_column_index(batch.num_columns())),
608 0,
609 );
610 if batch_timestamp != self.timestamp {
611 return false;
612 }
613
614 let last_key = primary_key_at(&self.primary_key, self.last_batch.num_rows() - 1);
615 let primary_key = batch
616 .column(primary_key_column_index(batch.num_columns()))
617 .as_any()
618 .downcast_ref::<PrimaryKeyArray>()
619 .unwrap();
620 let batch_key = primary_key_at(primary_key, 0);
622
623 last_key == batch_key
624 }
625}
626
627fn find_boundaries(v: &dyn Array) -> Result<BooleanBuffer, ArrowError> {
632 let slice_len = v.len() - 1;
633 let v1 = v.slice(0, slice_len);
634 let v2 = v.slice(1, slice_len);
635
636 if !v.data_type().is_nested() {
637 return Ok(distinct(&v1, &v2)?.values().clone());
638 }
639 let cmp = make_comparator(&v1, &v2, SortOptions::default())?;
642 Ok((0..slice_len).map(|i| !cmp(i, i).is_eq()).collect())
643}
644
645fn maybe_filter_deleted(
647 record_batch: RecordBatch,
648 filter_deleted: bool,
649 metrics: &mut DedupMetrics,
650) -> Result<Option<RecordBatch>> {
651 if !filter_deleted {
652 return Ok(Some(record_batch));
653 }
654 let batch = filter_deleted_from_batch(record_batch, metrics)?;
655 if batch.num_rows() == 0 {
657 return Ok(None);
658 }
659 Ok(Some(batch))
660}
661
662fn filter_deleted_from_batch(
664 batch: RecordBatch,
665 metrics: &mut DedupMetrics,
666) -> Result<RecordBatch> {
667 let num_rows = batch.num_rows();
668 let op_type_column = batch.column(op_type_column_index(batch.num_columns()));
669 let op_types = op_type_column
671 .as_any()
672 .downcast_ref::<UInt8Array>()
673 .unwrap();
674 let has_delete = op_types
675 .values()
676 .iter()
677 .any(|op_type| *op_type != OpType::Put as u8);
678 if !has_delete {
679 return Ok(batch);
680 }
681
682 let mut builder = BooleanBufferBuilder::new(op_types.len());
683 for op_type in op_types.values() {
684 if *op_type == OpType::Delete as u8 {
685 builder.append(false);
686 } else {
687 builder.append(true);
688 }
689 }
690 let predicate = BooleanArray::new(builder.into(), None);
691 let new_batch = filter_record_batch(&batch, &predicate).context(ComputeArrowSnafu)?;
692 let num_deleted = num_rows - new_batch.num_rows();
693 metrics.num_deleted_rows += num_deleted;
694 metrics.num_unselected_rows += num_deleted;
695
696 Ok(new_batch)
697}
698
699fn primary_key_at(primary_key: &PrimaryKeyArray, index: usize) -> &[u8] {
701 let key = primary_key.keys().value(index);
702 let binary_values = primary_key
703 .values()
704 .as_any()
705 .downcast_ref::<BinaryArray>()
706 .unwrap();
707 binary_values.value(key as usize)
708}
709
710pub(crate) fn timestamp_value(array: &ArrayRef, idx: usize) -> i64 {
716 timestamp_array_to_i64_slice(array)[idx]
717}
718
719#[cfg(test)]
720mod tests {
721 use std::sync::Arc;
722
723 use api::v1::OpType;
724 use datatypes::arrow::array::{
725 ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
726 TimestampMillisecondArray, UInt8Array, UInt64Array,
727 };
728 use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type};
729 use datatypes::arrow::record_batch::RecordBatch;
730
731 use super::*;
732
733 fn new_record_batch(
735 primary_keys: &[&[u8]],
736 timestamps: &[i64],
737 sequences: &[u64],
738 op_types: &[OpType],
739 fields: &[u64],
740 ) -> RecordBatch {
741 let num_rows = timestamps.len();
742 debug_assert_eq!(sequences.len(), num_rows);
743 debug_assert_eq!(op_types.len(), num_rows);
744 debug_assert_eq!(fields.len(), num_rows);
745 debug_assert_eq!(primary_keys.len(), num_rows);
746
747 let columns: Vec<ArrayRef> = vec![
748 build_test_pk_string_dict_array(primary_keys),
750 Arc::new(Int64Array::from_iter(
752 fields.iter().map(|v| Some(*v as i64)),
753 )),
754 Arc::new(TimestampMillisecondArray::from_iter_values(
756 timestamps.iter().copied(),
757 )),
758 build_test_pk_array(primary_keys),
760 Arc::new(UInt64Array::from_iter_values(sequences.iter().copied())),
762 Arc::new(UInt8Array::from_iter_values(
764 op_types.iter().map(|v| *v as u8),
765 )),
766 ];
767
768 RecordBatch::try_new(build_test_flat_schema(), columns).unwrap()
769 }
770
771 fn new_record_batch_multi_fields(
773 primary_keys: &[&[u8]],
774 timestamps: &[i64],
775 sequences: &[u64],
776 op_types: &[OpType],
777 fields: &[(Option<u64>, Option<u64>)],
778 ) -> RecordBatch {
779 let num_rows = timestamps.len();
780 debug_assert_eq!(sequences.len(), num_rows);
781 debug_assert_eq!(op_types.len(), num_rows);
782 debug_assert_eq!(fields.len(), num_rows);
783 debug_assert_eq!(primary_keys.len(), num_rows);
784
785 let columns: Vec<ArrayRef> = vec![
786 build_test_pk_string_dict_array(primary_keys),
788 Arc::new(Int64Array::from_iter(
790 fields.iter().map(|field| field.0.map(|v| v as i64)),
791 )),
792 Arc::new(Int64Array::from_iter(
794 fields.iter().map(|field| field.1.map(|v| v as i64)),
795 )),
796 Arc::new(TimestampMillisecondArray::from_iter_values(
798 timestamps.iter().copied(),
799 )),
800 build_test_pk_array(primary_keys),
802 Arc::new(UInt64Array::from_iter_values(sequences.iter().copied())),
804 Arc::new(UInt8Array::from_iter_values(
806 op_types.iter().map(|v| *v as u8),
807 )),
808 ];
809
810 RecordBatch::try_new(build_test_multi_field_schema(), columns).unwrap()
811 }
812
813 fn build_test_pk_string_dict_array(primary_keys: &[&[u8]]) -> ArrayRef {
815 let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
816 for &pk in primary_keys {
817 let pk_str = std::str::from_utf8(pk).unwrap();
818 builder.append(pk_str).unwrap();
819 }
820 Arc::new(builder.finish())
821 }
822
823 fn build_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
825 let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
826 for &pk in primary_keys {
827 builder.append(pk).unwrap();
828 }
829 Arc::new(builder.finish())
830 }
831
832 fn build_test_flat_schema() -> SchemaRef {
834 let fields = vec![
835 Field::new(
836 "k0",
837 DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
838 false,
839 ),
840 Field::new("field0", DataType::Int64, true),
841 Field::new(
842 "ts",
843 DataType::Timestamp(TimeUnit::Millisecond, None),
844 false,
845 ),
846 Field::new(
847 "__primary_key",
848 DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
849 false,
850 ),
851 Field::new("__sequence", DataType::UInt64, false),
852 Field::new("__op_type", DataType::UInt8, false),
853 ];
854 Arc::new(Schema::new(fields))
855 }
856
857 fn build_test_multi_field_schema() -> SchemaRef {
859 let fields = vec![
860 Field::new(
861 "k0",
862 DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
863 false,
864 ),
865 Field::new("field0", DataType::Int64, true),
866 Field::new("field1", DataType::Int64, true),
867 Field::new(
868 "ts",
869 DataType::Timestamp(TimeUnit::Millisecond, None),
870 false,
871 ),
872 Field::new(
873 "__primary_key",
874 DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
875 false,
876 ),
877 Field::new("__sequence", DataType::UInt64, false),
878 Field::new("__op_type", DataType::UInt8, false),
879 ];
880 Arc::new(Schema::new(fields))
881 }
882
883 fn check_record_batches_equal(expected: &[RecordBatch], actual: &[RecordBatch]) {
885 for (i, (exp, act)) in expected.iter().zip(actual.iter()).enumerate() {
886 assert_eq!(exp, act, "RecordBatch {} differs", i);
887 }
888 assert_eq!(
889 expected.len(),
890 actual.len(),
891 "Number of batches don't match"
892 );
893 }
894
895 fn collect_iterator_results<I>(iter: I) -> Vec<RecordBatch>
897 where
898 I: Iterator<Item = Result<RecordBatch>>,
899 {
900 iter.map(|result| result.unwrap()).collect()
901 }
902
903 #[test]
904 fn test_flat_last_row_no_duplications() {
905 let input = vec![
906 new_record_batch(
907 &[b"k1", b"k1"],
908 &[1, 2],
909 &[11, 12],
910 &[OpType::Put, OpType::Put],
911 &[21, 22],
912 ),
913 new_record_batch(&[b"k1"], &[3], &[13], &[OpType::Put], &[23]),
914 new_record_batch(
915 &[b"k2", b"k2"],
916 &[1, 2],
917 &[111, 112],
918 &[OpType::Put, OpType::Put],
919 &[31, 32],
920 ),
921 ];
922
923 let iter = input.clone().into_iter().map(Ok);
925 let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastRow::new(true));
926 let result = collect_iterator_results(&mut dedup_iter);
927 check_record_batches_equal(&input, &result);
928 assert_eq!(0, dedup_iter.metrics.num_unselected_rows);
929 assert_eq!(0, dedup_iter.metrics.num_deleted_rows);
930
931 let iter = input.clone().into_iter().map(Ok);
933 let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastRow::new(false));
934 let result = collect_iterator_results(&mut dedup_iter);
935 check_record_batches_equal(&input, &result);
936 assert_eq!(0, dedup_iter.metrics.num_unselected_rows);
937 assert_eq!(0, dedup_iter.metrics.num_deleted_rows);
938 }
939
940 #[test]
941 fn test_flat_last_row_duplications() {
942 let input = vec![
943 new_record_batch(
944 &[b"k1", b"k1"],
945 &[1, 2],
946 &[13, 11],
947 &[OpType::Put, OpType::Put],
948 &[11, 12],
949 ),
950 new_record_batch(&[], &[], &[], &[], &[]),
952 new_record_batch(
954 &[b"k1", b"k1", b"k1"],
955 &[2, 3, 4],
956 &[10, 13, 13],
957 &[OpType::Put, OpType::Put, OpType::Delete],
958 &[2, 13, 14],
959 ),
960 new_record_batch(
961 &[b"k2", b"k2"],
962 &[1, 2],
963 &[20, 20],
964 &[OpType::Put, OpType::Delete],
965 &[101, 0],
966 ),
967 new_record_batch(&[b"k2"], &[2], &[19], &[OpType::Put], &[102]),
968 new_record_batch(&[b"k3"], &[2], &[20], &[OpType::Put], &[202]),
969 new_record_batch(&[b"k3"], &[2], &[19], &[OpType::Delete], &[0]),
972 ];
973
974 let expected_filter_deleted = vec![
976 new_record_batch(
977 &[b"k1", b"k1"],
978 &[1, 2],
979 &[13, 11],
980 &[OpType::Put, OpType::Put],
981 &[11, 12],
982 ),
983 new_record_batch(&[b"k1"], &[3], &[13], &[OpType::Put], &[13]),
984 new_record_batch(&[b"k2"], &[1], &[20], &[OpType::Put], &[101]),
985 new_record_batch(&[b"k3"], &[2], &[20], &[OpType::Put], &[202]),
986 ];
987
988 let iter = input.clone().into_iter().map(Ok);
989 let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastRow::new(true));
990 let result = collect_iterator_results(&mut dedup_iter);
991 check_record_batches_equal(&expected_filter_deleted, &result);
992 assert_eq!(5, dedup_iter.metrics.num_unselected_rows);
993 assert_eq!(2, dedup_iter.metrics.num_deleted_rows);
994
995 let expected_no_filter = vec![
997 new_record_batch(
998 &[b"k1", b"k1"],
999 &[1, 2],
1000 &[13, 11],
1001 &[OpType::Put, OpType::Put],
1002 &[11, 12],
1003 ),
1004 new_record_batch(
1005 &[b"k1", b"k1"],
1006 &[3, 4],
1007 &[13, 13],
1008 &[OpType::Put, OpType::Delete],
1009 &[13, 14],
1010 ),
1011 new_record_batch(
1012 &[b"k2", b"k2"],
1013 &[1, 2],
1014 &[20, 20],
1015 &[OpType::Put, OpType::Delete],
1016 &[101, 0],
1017 ),
1018 new_record_batch(&[b"k3"], &[2], &[20], &[OpType::Put], &[202]),
1019 ];
1020
1021 let iter = input.clone().into_iter().map(Ok);
1022 let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastRow::new(false));
1023 let result = collect_iterator_results(&mut dedup_iter);
1024 check_record_batches_equal(&expected_no_filter, &result);
1025 assert_eq!(3, dedup_iter.metrics.num_unselected_rows);
1026 assert_eq!(0, dedup_iter.metrics.num_deleted_rows);
1027 }
1028
1029 #[test]
1030 fn test_flat_last_non_null_no_duplications() {
1031 let input = vec![
1032 new_record_batch(
1033 &[b"k1", b"k1"],
1034 &[1, 2],
1035 &[11, 12],
1036 &[OpType::Put, OpType::Put],
1037 &[21, 22],
1038 ),
1039 new_record_batch(&[b"k1"], &[3], &[13], &[OpType::Put], &[23]),
1040 new_record_batch(
1041 &[b"k2", b"k2"],
1042 &[1, 2],
1043 &[111, 112],
1044 &[OpType::Put, OpType::Put],
1045 &[31, 32],
1046 ),
1047 ];
1048
1049 let iter = input.clone().into_iter().map(Ok);
1051 let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, true));
1052 let result = collect_iterator_results(&mut dedup_iter);
1053 check_record_batches_equal(&input, &result);
1054 assert_eq!(0, dedup_iter.metrics.num_unselected_rows);
1055 assert_eq!(0, dedup_iter.metrics.num_deleted_rows);
1056
1057 let iter = input.clone().into_iter().map(Ok);
1059 let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, false));
1060 let result = collect_iterator_results(&mut dedup_iter);
1061 check_record_batches_equal(&input, &result);
1062 assert_eq!(0, dedup_iter.metrics.num_unselected_rows);
1063 assert_eq!(0, dedup_iter.metrics.num_deleted_rows);
1064 }
1065
1066 #[test]
1067 fn test_flat_last_non_null_field_merging() {
1068 let input = vec![
1069 new_record_batch_multi_fields(
1070 &[b"k1", b"k1"],
1071 &[1, 2],
1072 &[13, 11],
1073 &[OpType::Put, OpType::Put],
1074 &[(Some(11), Some(11)), (None, None)],
1075 ),
1076 new_record_batch_multi_fields(&[], &[], &[], &[], &[]),
1078 new_record_batch_multi_fields(
1080 &[b"k1"],
1081 &[2],
1082 &[10],
1083 &[OpType::Put],
1084 &[(Some(12), None)],
1085 ),
1086 new_record_batch_multi_fields(
1087 &[b"k1", b"k1", b"k1"],
1088 &[2, 3, 4],
1089 &[10, 13, 13],
1090 &[OpType::Put, OpType::Put, OpType::Delete],
1091 &[(Some(2), Some(22)), (Some(13), None), (None, Some(14))],
1092 ),
1093 new_record_batch_multi_fields(
1094 &[b"k2", b"k2"],
1095 &[1, 2],
1096 &[20, 20],
1097 &[OpType::Put, OpType::Delete],
1098 &[(Some(101), Some(101)), (None, None)],
1099 ),
1100 new_record_batch_multi_fields(
1101 &[b"k2"],
1102 &[2],
1103 &[19],
1104 &[OpType::Put],
1105 &[(Some(102), Some(102))],
1106 ),
1107 new_record_batch_multi_fields(
1108 &[b"k3"],
1109 &[2],
1110 &[20],
1111 &[OpType::Put],
1112 &[(Some(202), Some(202))],
1113 ),
1114 new_record_batch_multi_fields(
1117 &[b"k3"],
1118 &[2],
1119 &[19],
1120 &[OpType::Delete],
1121 &[(None, None)],
1122 ),
1123 ];
1124
1125 let expected_filter_deleted = vec![
1127 new_record_batch_multi_fields(
1128 &[b"k1"],
1129 &[1],
1130 &[13],
1131 &[OpType::Put],
1132 &[(Some(11), Some(11))],
1133 ),
1134 new_record_batch_multi_fields(
1135 &[b"k1", b"k1"],
1136 &[2, 3],
1137 &[11, 13],
1138 &[OpType::Put, OpType::Put],
1139 &[(Some(12), Some(22)), (Some(13), None)],
1140 ),
1141 new_record_batch_multi_fields(
1142 &[b"k2"],
1143 &[1],
1144 &[20],
1145 &[OpType::Put],
1146 &[(Some(101), Some(101))],
1147 ),
1148 new_record_batch_multi_fields(
1149 &[b"k3"],
1150 &[2],
1151 &[20],
1152 &[OpType::Put],
1153 &[(Some(202), Some(202))],
1154 ),
1155 ];
1156
1157 let iter = input.clone().into_iter().map(Ok);
1158 let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, true));
1159 let result = collect_iterator_results(&mut dedup_iter);
1160 check_record_batches_equal(&expected_filter_deleted, &result);
1161 assert_eq!(6, dedup_iter.metrics.num_unselected_rows);
1162 assert_eq!(2, dedup_iter.metrics.num_deleted_rows);
1163
1164 let expected_no_filter = vec![
1166 new_record_batch_multi_fields(
1167 &[b"k1"],
1168 &[1],
1169 &[13],
1170 &[OpType::Put],
1171 &[(Some(11), Some(11))],
1172 ),
1173 new_record_batch_multi_fields(
1174 &[b"k1", b"k1", b"k1"],
1175 &[2, 3, 4],
1176 &[11, 13, 13],
1177 &[OpType::Put, OpType::Put, OpType::Delete],
1178 &[(Some(12), Some(22)), (Some(13), None), (None, Some(14))],
1179 ),
1180 new_record_batch_multi_fields(
1181 &[b"k2"],
1182 &[1],
1183 &[20],
1184 &[OpType::Put],
1185 &[(Some(101), Some(101))],
1186 ),
1187 new_record_batch_multi_fields(
1188 &[b"k2"],
1189 &[2],
1190 &[20],
1191 &[OpType::Delete],
1192 &[(None, None)],
1193 ),
1194 new_record_batch_multi_fields(
1195 &[b"k3"],
1196 &[2],
1197 &[20],
1198 &[OpType::Put],
1199 &[(Some(202), Some(202))],
1200 ),
1201 ];
1202
1203 let iter = input.clone().into_iter().map(Ok);
1204 let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, false));
1205 let result = collect_iterator_results(&mut dedup_iter);
1206 check_record_batches_equal(&expected_no_filter, &result);
1207 assert_eq!(4, dedup_iter.metrics.num_unselected_rows);
1208 assert_eq!(0, dedup_iter.metrics.num_deleted_rows);
1209 }
1210
1211 #[test]
1212 fn test_flat_last_non_null_skip_merge_no_null() {
1213 let input = vec![
1214 new_record_batch_multi_fields(
1215 &[b"k1", b"k1"],
1216 &[1, 2],
1217 &[13, 11],
1218 &[OpType::Put, OpType::Put],
1219 &[(Some(11), Some(11)), (Some(12), Some(12))],
1220 ),
1221 new_record_batch_multi_fields(
1222 &[b"k1"],
1223 &[2],
1224 &[10],
1225 &[OpType::Put],
1226 &[(None, Some(22))],
1227 ),
1228 new_record_batch_multi_fields(
1229 &[b"k1", b"k1"],
1230 &[2, 3],
1231 &[9, 13],
1232 &[OpType::Put, OpType::Put],
1233 &[(Some(32), None), (Some(13), Some(13))],
1234 ),
1235 ];
1236
1237 let expected = vec![
1238 new_record_batch_multi_fields(
1239 &[b"k1"],
1240 &[1],
1241 &[13],
1242 &[OpType::Put],
1243 &[(Some(11), Some(11))],
1244 ),
1245 new_record_batch_multi_fields(
1246 &[b"k1", b"k1"],
1247 &[2, 3],
1248 &[11, 13],
1249 &[OpType::Put, OpType::Put],
1250 &[(Some(12), Some(12)), (Some(13), Some(13))],
1251 ),
1252 ];
1253
1254 let iter = input.into_iter().map(Ok);
1255 let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, true));
1256 let result = collect_iterator_results(&mut dedup_iter);
1257 check_record_batches_equal(&expected, &result);
1258 assert_eq!(2, dedup_iter.metrics.num_unselected_rows);
1259 assert_eq!(0, dedup_iter.metrics.num_deleted_rows);
1260 }
1261
/// The newest ts 2 row (seq 11) has both fields null. Field 2 is filled from
/// the older seq 10 row, field 1 stays null, and the output keeps seq 11.
#[test]
fn test_flat_last_non_null_merge_null() {
    let batches = vec![
        new_record_batch_multi_fields(
            &[b"k1", b"k1"],
            &[1, 2],
            &[13, 11],
            &[OpType::Put, OpType::Put],
            &[(Some(11), Some(11)), (None, None)],
        ),
        new_record_batch_multi_fields(&[b"k1"], &[2], &[10], &[OpType::Put], &[(None, Some(22))]),
        new_record_batch_multi_fields(&[b"k1"], &[3], &[13], &[OpType::Put], &[(Some(33), None)]),
    ];
    let expected_batches = vec![
        new_record_batch_multi_fields(
            &[b"k1"],
            &[1],
            &[13],
            &[OpType::Put],
            &[(Some(11), Some(11))],
        ),
        new_record_batch_multi_fields(&[b"k1"], &[2], &[11], &[OpType::Put], &[(None, Some(22))]),
        new_record_batch_multi_fields(&[b"k1"], &[3], &[13], &[OpType::Put], &[(Some(33), None)]),
    ];

    let mut dedup_iter =
        FlatDedupIterator::new(batches.into_iter().map(Ok), FlatLastNonNull::new(1, true));
    let output = collect_iterator_results(&mut dedup_iter);
    check_record_batches_equal(&expected_batches, &output);

    // Only the merged-away seq 10 row is unselected.
    let metrics = &dedup_iter.metrics;
    assert_eq!(1, metrics.num_unselected_rows);
    assert_eq!(0, metrics.num_deleted_rows);
}
1319
1320 fn check_flat_dedup_strategy(
1322 input: &[RecordBatch],
1323 strategy: &mut dyn RecordBatchDedupStrategy,
1324 expect: &[RecordBatch],
1325 ) {
1326 let mut actual = Vec::new();
1327 let mut metrics = DedupMetrics::default();
1328 for batch in input {
1329 if let Some(out) = strategy.push_batch(batch.clone(), &mut metrics).unwrap() {
1330 actual.push(out);
1331 }
1332 }
1333 if let Some(out) = strategy.finish(&mut metrics).unwrap() {
1334 actual.push(out);
1335 }
1336
1337 check_record_batches_equal(expect, &actual);
1338 }
1339
/// The newest `k2` row at ts 3 (seq 5) is a delete, so ts 3 of `k2` is dropped
/// even though an older put (seq 3) exists. The other timestamps keep the
/// values of their newest row.
#[test]
fn test_flat_last_non_null_strategy_delete_last() {
    let batches = vec![
        new_record_batch_multi_fields(&[b"k1"], &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
        new_record_batch_multi_fields(
            &[b"k1", b"k1"],
            &[1, 2],
            &[1, 7],
            &[OpType::Put, OpType::Put],
            &[(Some(1), None), (Some(22), Some(222))],
        ),
        new_record_batch_multi_fields(&[b"k1"], &[2], &[4], &[OpType::Put], &[(Some(12), None)]),
        new_record_batch_multi_fields(
            &[b"k2", b"k2"],
            &[2, 3],
            &[2, 5],
            &[OpType::Put, OpType::Delete],
            &[(None, None), (Some(13), None)],
        ),
        new_record_batch_multi_fields(&[b"k2"], &[3], &[3], &[OpType::Put], &[(None, Some(3))]),
    ];
    let expected = vec![
        new_record_batch_multi_fields(&[b"k1"], &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
        new_record_batch_multi_fields(
            &[b"k1"],
            &[2],
            &[7],
            &[OpType::Put],
            &[(Some(22), Some(222))],
        ),
        new_record_batch_multi_fields(&[b"k2"], &[2], &[2], &[OpType::Put], &[(None, None)]),
    ];

    check_flat_dedup_strategy(&batches, &mut FlatLastNonNull::new(1, true), &expected);
}
1403
/// A lone delete for `k1` yields nothing for that key; the put for `k2` is
/// emitted unchanged.
#[test]
fn test_flat_last_non_null_strategy_delete_one() {
    let batches = vec![
        new_record_batch_multi_fields(&[b"k1"], &[1], &[1], &[OpType::Delete], &[(None, None)]),
        new_record_batch_multi_fields(&[b"k2"], &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
    ];
    let expected = vec![new_record_batch_multi_fields(
        &[b"k2"],
        &[1],
        &[6],
        &[OpType::Put],
        &[(Some(11), None)],
    )];

    check_flat_dedup_strategy(&batches, &mut FlatLastNonNull::new(1, true), &expected);
}
1430
/// Every input row is a delete, so the strategy emits no batches at all.
#[test]
fn test_flat_last_non_null_strategy_delete_all() {
    let batches = vec![
        new_record_batch_multi_fields(&[b"k1"], &[1], &[1], &[OpType::Delete], &[(None, None)]),
        new_record_batch_multi_fields(
            &[b"k2"],
            &[1],
            &[6],
            &[OpType::Delete],
            &[(Some(11), None)],
        ),
    ];
    let expected: Vec<RecordBatch> = Vec::new();

    check_flat_dedup_strategy(&batches, &mut FlatLastNonNull::new(1, true), &expected);
}
1447
/// All rows belong to `k1` and are spread over several batches. At ts 3 the
/// newest row (seq 5) lacks field 2, which is taken from the older seq 3 row.
#[test]
fn test_flat_last_non_null_strategy_same_batch() {
    let batches = vec![
        new_record_batch_multi_fields(&[b"k1"], &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
        new_record_batch_multi_fields(
            &[b"k1", b"k1"],
            &[1, 2],
            &[1, 7],
            &[OpType::Put, OpType::Put],
            &[(Some(1), None), (Some(22), Some(222))],
        ),
        new_record_batch_multi_fields(&[b"k1"], &[2], &[4], &[OpType::Put], &[(Some(12), None)]),
        new_record_batch_multi_fields(
            &[b"k1", b"k1"],
            &[2, 3],
            &[2, 5],
            &[OpType::Put, OpType::Put],
            &[(None, None), (Some(13), None)],
        ),
        new_record_batch_multi_fields(&[b"k1"], &[3], &[3], &[OpType::Put], &[(None, Some(3))]),
    ];
    let expected = vec![
        new_record_batch_multi_fields(&[b"k1"], &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
        new_record_batch_multi_fields(
            &[b"k1"],
            &[2],
            &[7],
            &[OpType::Put],
            &[(Some(22), Some(222))],
        ),
        new_record_batch_multi_fields(
            &[b"k1"],
            &[3],
            &[5],
            &[OpType::Put],
            &[(Some(13), Some(3))],
        ),
    ];

    check_flat_dedup_strategy(&batches, &mut FlatLastNonNull::new(1, true), &expected);
}
1511
/// For each timestamp the sequence is: newest put, then a delete, then an
/// older put. The delete stops merging, so the null field 2 of the newest put
/// is not filled from the older put.
#[test]
fn test_flat_last_non_null_strategy_delete_middle() {
    let batches = vec![
        // ts 1
        new_record_batch_multi_fields(&[b"k1"], &[1], &[7], &[OpType::Put], &[(Some(11), None)]),
        new_record_batch_multi_fields(&[b"k1"], &[1], &[4], &[OpType::Delete], &[(None, None)]),
        new_record_batch_multi_fields(
            &[b"k1"],
            &[1],
            &[1],
            &[OpType::Put],
            &[(Some(12), Some(1))],
        ),
        // ts 2
        new_record_batch_multi_fields(&[b"k1"], &[2], &[8], &[OpType::Put], &[(Some(21), None)]),
        new_record_batch_multi_fields(&[b"k1"], &[2], &[5], &[OpType::Delete], &[(None, None)]),
        new_record_batch_multi_fields(
            &[b"k1"],
            &[2],
            &[2],
            &[OpType::Put],
            &[(Some(22), Some(2))],
        ),
        // ts 3
        new_record_batch_multi_fields(&[b"k1"], &[3], &[9], &[OpType::Put], &[(Some(31), None)]),
        new_record_batch_multi_fields(&[b"k1"], &[3], &[6], &[OpType::Delete], &[(None, None)]),
        new_record_batch_multi_fields(
            &[b"k1"],
            &[3],
            &[3],
            &[OpType::Put],
            &[(Some(32), Some(3))],
        ),
    ];
    let expected = vec![
        new_record_batch_multi_fields(&[b"k1"], &[1], &[7], &[OpType::Put], &[(Some(11), None)]),
        new_record_batch_multi_fields(&[b"k1"], &[2], &[8], &[OpType::Put], &[(Some(21), None)]),
        new_record_batch_multi_fields(&[b"k1"], &[3], &[9], &[OpType::Put], &[(Some(31), None)]),
    ];

    check_flat_dedup_strategy(&batches, &mut FlatLastNonNull::new(1, true), &expected);
}
1591}