1use std::ops::Range;
18
19use api::v1::OpType;
20use async_stream::try_stream;
21use datatypes::arrow::array::{
22 make_comparator, Array, ArrayRef, BinaryArray, BooleanArray, BooleanBufferBuilder, UInt64Array,
23 UInt8Array,
24};
25use datatypes::arrow::buffer::BooleanBuffer;
26use datatypes::arrow::compute::kernels::cmp::distinct;
27use datatypes::arrow::compute::kernels::partition::{partition, Partitions};
28use datatypes::arrow::compute::kernels::take::take;
29use datatypes::arrow::compute::{
30 concat_batches, filter_record_batch, take_record_batch, SortOptions, TakeOptions,
31};
32use datatypes::arrow::error::ArrowError;
33use datatypes::arrow::record_batch::RecordBatch;
34use futures::{Stream, TryStreamExt};
35use snafu::ResultExt;
36
37use crate::error::{ComputeArrowSnafu, NewRecordBatchSnafu, Result};
38use crate::memtable::partition_tree::data::timestamp_array_to_i64_slice;
39use crate::read::dedup::DedupMetrics;
40use crate::sst::parquet::flat_format::{
41 op_type_column_index, primary_key_column_index, time_index_column_index,
42};
43use crate::sst::parquet::format::{PrimaryKeyArray, FIXED_POS_COLUMN_NUM};
44
/// A synchronous iterator adapter that removes duplicated rows from the record
/// batches yielded by `iter`, delegating every dedup decision to `strategy`.
pub struct FlatDedupIterator<I, S> {
    /// Source of the input record batches.
    iter: I,
    /// Strategy each input batch is pushed into.
    strategy: S,
    /// Metrics passed to the strategy on every `push_batch`/`finish` call.
    metrics: DedupMetrics,
}
51
52impl<I, S> FlatDedupIterator<I, S> {
53 pub fn new(iter: I, strategy: S) -> Self {
55 Self {
56 iter,
57 strategy,
58 metrics: DedupMetrics::default(),
59 }
60 }
61}
62
63impl<I: Iterator<Item = Result<RecordBatch>>, S: RecordBatchDedupStrategy> FlatDedupIterator<I, S> {
64 fn fetch_next_batch(&mut self) -> Result<Option<RecordBatch>> {
66 while let Some(batch) = self.iter.next().transpose()? {
67 if let Some(batch) = self.strategy.push_batch(batch, &mut self.metrics)? {
68 return Ok(Some(batch));
69 }
70 }
71
72 self.strategy.finish(&mut self.metrics)
73 }
74}
75
76impl<I: Iterator<Item = Result<RecordBatch>>, S: RecordBatchDedupStrategy> Iterator
77 for FlatDedupIterator<I, S>
78{
79 type Item = Result<RecordBatch>;
80
81 fn next(&mut self) -> Option<Self::Item> {
82 self.fetch_next_batch().transpose()
83 }
84}
85
86pub struct FlatDedupReader<I, S> {
88 stream: I,
89 strategy: S,
90 metrics: DedupMetrics,
91}
92
93impl<I, S> FlatDedupReader<I, S> {
94 pub fn new(stream: I, strategy: S) -> Self {
96 Self {
97 stream,
98 strategy,
99 metrics: DedupMetrics::default(),
100 }
101 }
102}
103
104impl<I: Stream<Item = Result<RecordBatch>> + Unpin, S: RecordBatchDedupStrategy>
105 FlatDedupReader<I, S>
106{
107 async fn fetch_next_batch(&mut self) -> Result<Option<RecordBatch>> {
109 while let Some(batch) = self.stream.try_next().await? {
110 if let Some(batch) = self.strategy.push_batch(batch, &mut self.metrics)? {
111 return Ok(Some(batch));
112 }
113 }
114
115 self.strategy.finish(&mut self.metrics)
116 }
117
118 pub fn into_stream(mut self) -> impl Stream<Item = Result<RecordBatch>> {
120 try_stream! {
121 while let Some(batch) = self.fetch_next_batch().await? {
122 yield batch;
123 }
124 }
125 }
126}
127
/// Strategy used by [`FlatDedupIterator`] and [`FlatDedupReader`] to remove
/// duplicated rows from a sequence of record batches.
pub trait RecordBatchDedupStrategy: Send {
    /// Feeds one input batch to the strategy.
    ///
    /// Returns `Ok(Some(_))` when a deduplicated batch is ready to be output and
    /// `Ok(None)` when there is nothing to output yet. Row counters are
    /// accumulated into `metrics`.
    fn push_batch(
        &mut self,
        batch: RecordBatch,
        metrics: &mut DedupMetrics,
    ) -> Result<Option<RecordBatch>>;

    /// Called by the drivers once the input is exhausted.
    ///
    /// Returns a batch the strategy was still holding back, if any. The drivers
    /// in this file keep calling it until it returns `Ok(None)`.
    fn finish(&mut self, metrics: &mut DedupMetrics) -> Result<Option<RecordBatch>>;
}
145
/// Dedup strategy that keeps a single row per (primary key, timestamp) pair:
/// the first row of each run of duplicates in the input order.
pub struct FlatLastRow {
    /// The last non-empty batch after dedup (before delete filtering), kept to
    /// detect a duplicate at the start of the next batch.
    prev_batch: Option<BatchLastRow>,
    /// Whether rows with `OpType::Delete` are removed from the output.
    filter_deleted: bool,
}
154
155impl FlatLastRow {
156 pub fn new(filter_deleted: bool) -> Self {
158 Self {
159 prev_batch: None,
160 filter_deleted,
161 }
162 }
163
164 fn dedup_one_batch(batch: RecordBatch) -> Result<RecordBatch> {
166 let num_rows = batch.num_rows();
167 if num_rows < 2 {
168 return Ok(batch);
169 }
170
171 let num_columns = batch.num_columns();
172 let timestamps = batch.column(time_index_column_index(num_columns));
173 let mask = find_boundaries(timestamps).context(ComputeArrowSnafu)?;
175 if mask.count_set_bits() == num_rows - 1 {
176 return Ok(batch);
178 }
179
180 let columns: Vec<_> = [
184 primary_key_column_index(num_columns),
185 time_index_column_index(num_columns),
186 ]
187 .iter()
188 .map(|index| batch.column(*index).clone())
189 .collect();
190 let partitions = partition(&columns).context(ComputeArrowSnafu)?;
191
192 Self::dedup_by_partitions(batch, &partitions)
193 }
194
195 fn dedup_by_partitions(batch: RecordBatch, partitions: &Partitions) -> Result<RecordBatch> {
197 let ranges = partitions.ranges();
198 let num_duplications: usize = ranges.iter().map(|r| r.end - r.start - 1).sum();
200 if num_duplications == 0 {
201 return Ok(batch);
203 }
204
205 let take_indices: UInt64Array = ranges.iter().map(|r| Some(r.start as u64)).collect();
207 take_record_batch(&batch, &take_indices).context(ComputeArrowSnafu)
208 }
209}
210
211impl RecordBatchDedupStrategy for FlatLastRow {
212 fn push_batch(
213 &mut self,
214 batch: RecordBatch,
215 metrics: &mut DedupMetrics,
216 ) -> Result<Option<RecordBatch>> {
217 if batch.num_rows() == 0 {
218 return Ok(None);
219 }
220
221 let row_before_dedup = batch.num_rows();
223 let mut batch = Self::dedup_one_batch(batch)?;
224
225 if let Some(prev_batch) = &self.prev_batch {
226 if prev_batch.is_last_row_duplicated(&batch) {
228 batch = batch.slice(1, batch.num_rows() - 1);
230 }
231 }
232 metrics.num_unselected_rows += row_before_dedup - batch.num_rows();
233
234 let Some(batch_last_row) = BatchLastRow::try_new(batch.clone()) else {
235 return Ok(None);
239 };
240
241 self.prev_batch = Some(batch_last_row);
247
248 maybe_filter_deleted(batch, self.filter_deleted, metrics)
250 }
251
252 fn finish(&mut self, _metrics: &mut DedupMetrics) -> Result<Option<RecordBatch>> {
253 Ok(None)
254 }
255}
256
/// Dedup strategy that keeps one row per (primary key, timestamp) pair and, for
/// each field column, picks the first non-null value among the duplicated rows
/// (stopping at a delete operation).
pub struct FlatLastNonNull {
    /// Index of the first field column. Columns from this index up to
    /// `num_columns - FIXED_POS_COLUMN_NUM` (exclusive) are merged per field.
    field_column_start: usize,
    /// Whether rows with `OpType::Delete` are removed from the output.
    filter_deleted: bool,
    /// Deduplicated batch that has not been output yet. It is held back because
    /// the next input batch may start with a duplicate of its last row.
    buffer: Option<BatchLastRow>,
    /// Delete flag reported by `dedup_one_batch` for the buffered batch. It is
    /// forwarded when the buffer's last row is merged with the next batch.
    contains_delete: bool,
}
271
impl RecordBatchDedupStrategy for FlatLastNonNull {
    /// Deduplicates `batch` and returns the rows that are known to be final.
    ///
    /// The most recent deduplicated batch is always kept in `self.buffer`,
    /// because the next input may continue the (key, timestamp) group of its
    /// last row. Hence the output of a call comes from the previously buffered
    /// batch, never from `batch` itself.
    fn push_batch(
        &mut self,
        batch: RecordBatch,
        metrics: &mut DedupMetrics,
    ) -> Result<Option<RecordBatch>> {
        if batch.num_rows() == 0 {
            return Ok(None);
        }

        let row_before_dedup = batch.num_rows();

        let Some(buffer) = self.buffer.take() else {
            // Nothing is buffered: dedup the batch and buffer it.
            // No previous rows share its first key, so the delete flag is false.
            let (record_batch, contains_delete) =
                Self::dedup_one_batch(batch, self.field_column_start, false)?;
            metrics.num_unselected_rows += row_before_dedup - record_batch.num_rows();
            self.buffer = BatchLastRow::try_new(record_batch);
            self.contains_delete = contains_delete;

            return Ok(None);
        };

        if !buffer.is_last_row_duplicated(&batch) {
            // The new batch starts a different (key, timestamp) group, so the
            // buffered batch is final. Buffer the new batch and output the old one.
            let (record_batch, contains_delete) =
                Self::dedup_one_batch(batch, self.field_column_start, false)?;
            metrics.num_unselected_rows += row_before_dedup - record_batch.num_rows();
            debug_assert!(record_batch.num_rows() > 0);
            self.buffer = BatchLastRow::try_new(record_batch);
            self.contains_delete = contains_delete;

            return maybe_filter_deleted(buffer.last_batch, self.filter_deleted, metrics);
        }

        // The new batch continues the group of the buffer's last row.
        // Every buffered row except the last one is final and can be output.
        let output = if buffer.last_batch.num_rows() > 1 {
            let dedup_batch = buffer.last_batch.slice(0, buffer.last_batch.num_rows() - 1);
            debug_assert_eq!(buffer.last_batch.num_rows() - 1, dedup_batch.num_rows());

            maybe_filter_deleted(dedup_batch, self.filter_deleted, metrics)?
        } else {
            None
        };
        let last_row = buffer.last_batch.slice(buffer.last_batch.num_rows() - 1, 1);

        // Prepend the buffer's last row to the new batch so both are merged together.
        let schema = batch.schema();
        let merged = concat_batches(&schema, &[last_row, batch]).context(ComputeArrowSnafu)?;
        let merged_row_count = merged.num_rows();
        // The first group of `merged` starts with the buffered row, so pass on
        // whether that row's group already contained a delete.
        let (record_batch, contains_delete) =
            Self::dedup_one_batch(merged, self.field_column_start, self.contains_delete)?;
        metrics.num_unselected_rows += merged_row_count - record_batch.num_rows();
        debug_assert!(record_batch.num_rows() > 0);
        self.buffer = BatchLastRow::try_new(record_batch);
        self.contains_delete = contains_delete;

        Ok(output)
    }

    /// Outputs the buffered batch, if any, and leaves the buffer empty.
    fn finish(&mut self, metrics: &mut DedupMetrics) -> Result<Option<RecordBatch>> {
        let Some(buffer) = self.buffer.take() else {
            return Ok(None);
        };

        maybe_filter_deleted(buffer.last_batch, self.filter_deleted, metrics)
    }
}
346
347impl FlatLastNonNull {
348 pub fn new(field_column_start: usize, filter_deleted: bool) -> Self {
350 Self {
351 field_column_start,
352 filter_deleted,
353 buffer: None,
354 contains_delete: false,
355 }
356 }
357
358 fn dedup_one_batch(
361 batch: RecordBatch,
362 field_column_start: usize,
363 prev_batch_contains_delete: bool,
364 ) -> Result<(RecordBatch, bool)> {
365 let op_type_column = batch
367 .column(op_type_column_index(batch.num_columns()))
368 .clone();
369 let op_types = op_type_column
370 .as_any()
371 .downcast_ref::<UInt8Array>()
372 .unwrap();
373 let num_rows = batch.num_rows();
374 if num_rows < 2 {
375 let contains_delete = if num_rows > 0 {
376 op_types.value(0) == OpType::Delete as u8
377 } else {
378 false
379 };
380 return Ok((batch, contains_delete));
381 }
382
383 let num_columns = batch.num_columns();
384 let timestamps = batch.column(time_index_column_index(num_columns));
385 let mask = find_boundaries(timestamps).context(ComputeArrowSnafu)?;
387 if mask.count_set_bits() == num_rows - 1 {
388 let contains_delete = op_types.value(num_rows - 1) == OpType::Delete as u8;
389 return Ok((batch, contains_delete));
391 }
392
393 let columns: Vec<_> = [
397 primary_key_column_index(num_columns),
398 time_index_column_index(num_columns),
399 ]
400 .iter()
401 .map(|index| batch.column(*index).clone())
402 .collect();
403 let partitions = partition(&columns).context(ComputeArrowSnafu)?;
404
405 Self::dedup_by_partitions(
406 batch,
407 &partitions,
408 field_column_start,
409 op_types,
410 prev_batch_contains_delete,
411 )
412 }
413
414 fn dedup_by_partitions(
417 batch: RecordBatch,
418 partitions: &Partitions,
419 field_column_start: usize,
420 op_types: &UInt8Array,
421 first_range_contains_delete: bool,
422 ) -> Result<(RecordBatch, bool)> {
423 let ranges = partitions.ranges();
424 let contains_delete = Self::last_range_has_delete(&ranges, op_types);
425
426 let num_duplications: usize = ranges.iter().map(|r| r.end - r.start - 1).sum();
428 if num_duplications == 0 {
429 return Ok((batch, contains_delete));
431 }
432
433 let field_column_end = batch.num_columns() - FIXED_POS_COLUMN_NUM;
434 let take_options = Some(TakeOptions {
435 check_bounds: false,
436 });
437 let non_field_indices: UInt64Array = ranges.iter().map(|r| Some(r.start as u64)).collect();
439 let new_columns = batch
440 .columns()
441 .iter()
442 .enumerate()
443 .map(|(col_idx, column)| {
444 if col_idx >= field_column_start && col_idx < field_column_end {
445 let field_indices = Self::compute_field_indices(
446 &ranges,
447 column,
448 op_types,
449 first_range_contains_delete,
450 );
451 take(column, &field_indices, take_options.clone()).context(ComputeArrowSnafu)
452 } else {
453 take(column, &non_field_indices, take_options.clone())
454 .context(ComputeArrowSnafu)
455 }
456 })
457 .collect::<Result<Vec<ArrayRef>>>()?;
458
459 let record_batch =
460 RecordBatch::try_new(batch.schema(), new_columns).context(NewRecordBatchSnafu)?;
461 Ok((record_batch, contains_delete))
462 }
463
464 fn compute_field_indices(
469 ranges: &[Range<usize>],
470 field_array: &ArrayRef,
471 op_types: &UInt8Array,
472 first_range_contains_delete: bool,
473 ) -> UInt64Array {
474 ranges
475 .iter()
476 .enumerate()
477 .map(|(range_idx, r)| {
478 let mut value_index = r.start as u64;
479 if range_idx == 0 && first_range_contains_delete {
480 return Some(value_index);
481 }
482
483 for i in r.clone() {
486 if op_types.value(i) == OpType::Delete as u8 {
487 break;
488 }
489 if field_array.is_valid(i) {
490 value_index = i as u64;
491 break;
492 }
493 }
494
495 Some(value_index)
496 })
497 .collect()
498 }
499
500 fn last_range_has_delete(ranges: &[Range<usize>], op_types: &UInt8Array) -> bool {
502 if let Some(last_range) = ranges.last() {
503 last_range
504 .clone()
505 .any(|i| op_types.value(i) == OpType::Delete as u8)
506 } else {
507 false
508 }
509 }
510}
511
/// A non-empty record batch together with the data needed to compare its last
/// row with the first row of another batch.
struct BatchLastRow {
    /// The batch itself. `try_new` only builds this struct for non-empty batches.
    last_batch: RecordBatch,
    /// Primary key column of `last_batch`.
    primary_key: PrimaryKeyArray,
    /// Timestamp of the last row of `last_batch`.
    timestamp: i64,
}
522
523impl BatchLastRow {
524 fn try_new(record_batch: RecordBatch) -> Option<Self> {
526 if record_batch.num_rows() > 0 {
527 let num_columns = record_batch.num_columns();
528 let primary_key = record_batch
529 .column(primary_key_column_index(num_columns))
530 .as_any()
531 .downcast_ref::<PrimaryKeyArray>()
532 .unwrap()
533 .clone();
534 let timestamp_array = record_batch.column(time_index_column_index(num_columns));
535 let timestamp = timestamp_value(timestamp_array, timestamp_array.len() - 1);
536
537 Some(Self {
538 last_batch: record_batch,
539 primary_key,
540 timestamp,
541 })
542 } else {
543 None
544 }
545 }
546
547 fn is_last_row_duplicated(&self, batch: &RecordBatch) -> bool {
549 if batch.num_rows() == 0 {
550 return false;
551 }
552
553 let batch_timestamp = timestamp_value(
555 batch.column(time_index_column_index(batch.num_columns())),
556 0,
557 );
558 if batch_timestamp != self.timestamp {
559 return false;
560 }
561
562 let last_key = primary_key_at(&self.primary_key, self.last_batch.num_rows() - 1);
563 let primary_key = batch
564 .column(primary_key_column_index(batch.num_columns()))
565 .as_any()
566 .downcast_ref::<PrimaryKeyArray>()
567 .unwrap();
568 let batch_key = primary_key_at(primary_key, 0);
570
571 last_key == batch_key
572 }
573}
574
575fn find_boundaries(v: &dyn Array) -> Result<BooleanBuffer, ArrowError> {
580 let slice_len = v.len() - 1;
581 let v1 = v.slice(0, slice_len);
582 let v2 = v.slice(1, slice_len);
583
584 if !v.data_type().is_nested() {
585 return Ok(distinct(&v1, &v2)?.values().clone());
586 }
587 let cmp = make_comparator(&v1, &v2, SortOptions::default())?;
590 Ok((0..slice_len).map(|i| !cmp(i, i).is_eq()).collect())
591}
592
593fn maybe_filter_deleted(
595 record_batch: RecordBatch,
596 filter_deleted: bool,
597 metrics: &mut DedupMetrics,
598) -> Result<Option<RecordBatch>> {
599 if !filter_deleted {
600 return Ok(Some(record_batch));
601 }
602 let batch = filter_deleted_from_batch(record_batch, metrics)?;
603 if batch.num_rows() == 0 {
605 return Ok(None);
606 }
607 Ok(Some(batch))
608}
609
610fn filter_deleted_from_batch(
612 batch: RecordBatch,
613 metrics: &mut DedupMetrics,
614) -> Result<RecordBatch> {
615 let num_rows = batch.num_rows();
616 let op_type_column = batch.column(op_type_column_index(batch.num_columns()));
617 let op_types = op_type_column
619 .as_any()
620 .downcast_ref::<UInt8Array>()
621 .unwrap();
622 let has_delete = op_types
623 .values()
624 .iter()
625 .any(|op_type| *op_type != OpType::Put as u8);
626 if !has_delete {
627 return Ok(batch);
628 }
629
630 let mut builder = BooleanBufferBuilder::new(op_types.len());
631 for op_type in op_types.values() {
632 if *op_type == OpType::Delete as u8 {
633 builder.append(false);
634 } else {
635 builder.append(true);
636 }
637 }
638 let predicate = BooleanArray::new(builder.into(), None);
639 let new_batch = filter_record_batch(&batch, &predicate).context(ComputeArrowSnafu)?;
640 let num_deleted = num_rows - new_batch.num_rows();
641 metrics.num_deleted_rows += num_deleted;
642 metrics.num_unselected_rows += num_deleted;
643
644 Ok(new_batch)
645}
646
647fn primary_key_at(primary_key: &PrimaryKeyArray, index: usize) -> &[u8] {
649 let key = primary_key.keys().value(index);
650 let binary_values = primary_key
651 .values()
652 .as_any()
653 .downcast_ref::<BinaryArray>()
654 .unwrap();
655 binary_values.value(key as usize)
656}
657
658pub(crate) fn timestamp_value(array: &ArrayRef, idx: usize) -> i64 {
664 timestamp_array_to_i64_slice(array)[idx]
665}
666
667#[cfg(test)]
668mod tests {
669 use std::sync::Arc;
670
671 use api::v1::OpType;
672 use datatypes::arrow::array::{
673 ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
674 TimestampMillisecondArray, UInt64Array, UInt8Array,
675 };
676 use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type};
677 use datatypes::arrow::record_batch::RecordBatch;
678
679 use super::*;
680
681 fn new_record_batch(
683 primary_keys: &[&[u8]],
684 timestamps: &[i64],
685 sequences: &[u64],
686 op_types: &[OpType],
687 fields: &[u64],
688 ) -> RecordBatch {
689 let num_rows = timestamps.len();
690 debug_assert_eq!(sequences.len(), num_rows);
691 debug_assert_eq!(op_types.len(), num_rows);
692 debug_assert_eq!(fields.len(), num_rows);
693 debug_assert_eq!(primary_keys.len(), num_rows);
694
695 let columns: Vec<ArrayRef> = vec![
696 build_test_pk_string_dict_array(primary_keys),
698 Arc::new(Int64Array::from_iter(
700 fields.iter().map(|v| Some(*v as i64)),
701 )),
702 Arc::new(TimestampMillisecondArray::from_iter_values(
704 timestamps.iter().copied(),
705 )),
706 build_test_pk_array(primary_keys),
708 Arc::new(UInt64Array::from_iter_values(sequences.iter().copied())),
710 Arc::new(UInt8Array::from_iter_values(
712 op_types.iter().map(|v| *v as u8),
713 )),
714 ];
715
716 RecordBatch::try_new(build_test_flat_schema(), columns).unwrap()
717 }
718
719 fn new_record_batch_multi_fields(
721 primary_keys: &[&[u8]],
722 timestamps: &[i64],
723 sequences: &[u64],
724 op_types: &[OpType],
725 fields: &[(Option<u64>, Option<u64>)],
726 ) -> RecordBatch {
727 let num_rows = timestamps.len();
728 debug_assert_eq!(sequences.len(), num_rows);
729 debug_assert_eq!(op_types.len(), num_rows);
730 debug_assert_eq!(fields.len(), num_rows);
731 debug_assert_eq!(primary_keys.len(), num_rows);
732
733 let columns: Vec<ArrayRef> = vec![
734 build_test_pk_string_dict_array(primary_keys),
736 Arc::new(Int64Array::from_iter(
738 fields.iter().map(|field| field.0.map(|v| v as i64)),
739 )),
740 Arc::new(Int64Array::from_iter(
742 fields.iter().map(|field| field.1.map(|v| v as i64)),
743 )),
744 Arc::new(TimestampMillisecondArray::from_iter_values(
746 timestamps.iter().copied(),
747 )),
748 build_test_pk_array(primary_keys),
750 Arc::new(UInt64Array::from_iter_values(sequences.iter().copied())),
752 Arc::new(UInt8Array::from_iter_values(
754 op_types.iter().map(|v| *v as u8),
755 )),
756 ];
757
758 RecordBatch::try_new(build_test_multi_field_schema(), columns).unwrap()
759 }
760
761 fn build_test_pk_string_dict_array(primary_keys: &[&[u8]]) -> ArrayRef {
763 let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
764 for &pk in primary_keys {
765 let pk_str = std::str::from_utf8(pk).unwrap();
766 builder.append(pk_str).unwrap();
767 }
768 Arc::new(builder.finish())
769 }
770
771 fn build_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
773 let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
774 for &pk in primary_keys {
775 builder.append(pk).unwrap();
776 }
777 Arc::new(builder.finish())
778 }
779
    /// Schema of the batches built by `new_record_batch`: one tag column, one
    /// nullable field column, the time index and the three internal columns.
    fn build_test_flat_schema() -> SchemaRef {
        let fields = vec![
            // Tag column.
            Field::new(
                "k0",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
                false,
            ),
            // Field column (nullable).
            Field::new("field0", DataType::Int64, true),
            // Time index.
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            // Internal columns: primary key, sequence and op type.
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }
804
    /// Schema of the batches built by `new_record_batch_multi_fields`: one tag
    /// column, two nullable field columns, the time index and the three
    /// internal columns.
    fn build_test_multi_field_schema() -> SchemaRef {
        let fields = vec![
            // Tag column.
            Field::new(
                "k0",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
                false,
            ),
            // Field columns (nullable).
            Field::new("field0", DataType::Int64, true),
            Field::new("field1", DataType::Int64, true),
            // Time index.
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            // Internal columns: primary key, sequence and op type.
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }
830
831 fn check_record_batches_equal(expected: &[RecordBatch], actual: &[RecordBatch]) {
833 for (i, (exp, act)) in expected.iter().zip(actual.iter()).enumerate() {
834 assert_eq!(exp, act, "RecordBatch {} differs", i);
835 }
836 assert_eq!(
837 expected.len(),
838 actual.len(),
839 "Number of batches don't match"
840 );
841 }
842
843 fn collect_iterator_results<I>(iter: I) -> Vec<RecordBatch>
845 where
846 I: Iterator<Item = Result<RecordBatch>>,
847 {
848 iter.map(|result| result.unwrap()).collect()
849 }
850
    /// Without duplicated rows or deletes, `FlatLastRow` passes every batch
    /// through unchanged and leaves the metrics at zero.
    #[test]
    fn test_flat_last_row_no_duplications() {
        let input = vec![
            new_record_batch(
                &[b"k1", b"k1"],
                &[1, 2],
                &[11, 12],
                &[OpType::Put, OpType::Put],
                &[21, 22],
            ),
            new_record_batch(&[b"k1"], &[3], &[13], &[OpType::Put], &[23]),
            new_record_batch(
                &[b"k2", b"k2"],
                &[1, 2],
                &[111, 112],
                &[OpType::Put, OpType::Put],
                &[31, 32],
            ),
        ];

        // filter_deleted = true.
        let iter = input.clone().into_iter().map(Ok);
        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastRow::new(true));
        let result = collect_iterator_results(&mut dedup_iter);
        check_record_batches_equal(&input, &result);
        assert_eq!(0, dedup_iter.metrics.num_unselected_rows);
        assert_eq!(0, dedup_iter.metrics.num_deleted_rows);

        // filter_deleted = false.
        let iter = input.clone().into_iter().map(Ok);
        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastRow::new(false));
        let result = collect_iterator_results(&mut dedup_iter);
        check_record_batches_equal(&input, &result);
        assert_eq!(0, dedup_iter.metrics.num_unselected_rows);
        assert_eq!(0, dedup_iter.metrics.num_deleted_rows);
    }
887
    /// `FlatLastRow` drops rows that repeat the (key, timestamp) of the previous
    /// batch's last row, with and without delete filtering.
    #[test]
    fn test_flat_last_row_duplications() {
        let input = vec![
            new_record_batch(
                &[b"k1", b"k1"],
                &[1, 2],
                &[13, 11],
                &[OpType::Put, OpType::Put],
                &[11, 12],
            ),
            // Empty batch.
            new_record_batch(&[], &[], &[], &[], &[]),
            // First row duplicates (k1, 2) from the first batch.
            new_record_batch(
                &[b"k1", b"k1", b"k1"],
                &[2, 3, 4],
                &[10, 13, 13],
                &[OpType::Put, OpType::Put, OpType::Delete],
                &[2, 13, 14],
            ),
            new_record_batch(
                &[b"k2", b"k2"],
                &[1, 2],
                &[20, 20],
                &[OpType::Put, OpType::Delete],
                &[101, 0],
            ),
            // Duplicates (k2, 2) from the previous batch.
            new_record_batch(&[b"k2"], &[2], &[19], &[OpType::Put], &[102]),
            new_record_batch(&[b"k3"], &[2], &[20], &[OpType::Put], &[202]),
            // Duplicates (k3, 2): dropped as a duplicate, so it is not counted
            // as a deleted row.
            new_record_batch(&[b"k3"], &[2], &[19], &[OpType::Delete], &[0]),
        ];

        // filter_deleted = true.
        let expected_filter_deleted = vec![
            new_record_batch(
                &[b"k1", b"k1"],
                &[1, 2],
                &[13, 11],
                &[OpType::Put, OpType::Put],
                &[11, 12],
            ),
            new_record_batch(&[b"k1"], &[3], &[13], &[OpType::Put], &[13]),
            new_record_batch(&[b"k2"], &[1], &[20], &[OpType::Put], &[101]),
            new_record_batch(&[b"k3"], &[2], &[20], &[OpType::Put], &[202]),
        ];

        let iter = input.clone().into_iter().map(Ok);
        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastRow::new(true));
        let result = collect_iterator_results(&mut dedup_iter);
        check_record_batches_equal(&expected_filter_deleted, &result);
        // 3 duplicated rows + 2 deleted rows.
        assert_eq!(5, dedup_iter.metrics.num_unselected_rows);
        assert_eq!(2, dedup_iter.metrics.num_deleted_rows);

        // filter_deleted = false.
        let expected_no_filter = vec![
            new_record_batch(
                &[b"k1", b"k1"],
                &[1, 2],
                &[13, 11],
                &[OpType::Put, OpType::Put],
                &[11, 12],
            ),
            new_record_batch(
                &[b"k1", b"k1"],
                &[3, 4],
                &[13, 13],
                &[OpType::Put, OpType::Delete],
                &[13, 14],
            ),
            new_record_batch(
                &[b"k2", b"k2"],
                &[1, 2],
                &[20, 20],
                &[OpType::Put, OpType::Delete],
                &[101, 0],
            ),
            new_record_batch(&[b"k3"], &[2], &[20], &[OpType::Put], &[202]),
        ];

        let iter = input.clone().into_iter().map(Ok);
        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastRow::new(false));
        let result = collect_iterator_results(&mut dedup_iter);
        check_record_batches_equal(&expected_no_filter, &result);
        // Only the 3 duplicated rows are unselected.
        assert_eq!(3, dedup_iter.metrics.num_unselected_rows);
        assert_eq!(0, dedup_iter.metrics.num_deleted_rows);
    }
976
    /// Without duplicated rows or deletes, `FlatLastNonNull` outputs the input
    /// batches unchanged and leaves the metrics at zero.
    #[test]
    fn test_flat_last_non_null_no_duplications() {
        let input = vec![
            new_record_batch(
                &[b"k1", b"k1"],
                &[1, 2],
                &[11, 12],
                &[OpType::Put, OpType::Put],
                &[21, 22],
            ),
            new_record_batch(&[b"k1"], &[3], &[13], &[OpType::Put], &[23]),
            new_record_batch(
                &[b"k2", b"k2"],
                &[1, 2],
                &[111, 112],
                &[OpType::Put, OpType::Put],
                &[31, 32],
            ),
        ];

        // filter_deleted = true; field columns start at index 1.
        let iter = input.clone().into_iter().map(Ok);
        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, true));
        let result = collect_iterator_results(&mut dedup_iter);
        check_record_batches_equal(&input, &result);
        assert_eq!(0, dedup_iter.metrics.num_unselected_rows);
        assert_eq!(0, dedup_iter.metrics.num_deleted_rows);

        // filter_deleted = false.
        let iter = input.clone().into_iter().map(Ok);
        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, false));
        let result = collect_iterator_results(&mut dedup_iter);
        check_record_batches_equal(&input, &result);
        assert_eq!(0, dedup_iter.metrics.num_unselected_rows);
        assert_eq!(0, dedup_iter.metrics.num_deleted_rows);
    }
1013
    /// `FlatLastNonNull` merges the field values of duplicated rows that are
    /// spread over several batches, with and without delete filtering.
    #[test]
    fn test_flat_last_non_null_field_merging() {
        let input = vec![
            new_record_batch_multi_fields(
                &[b"k1", b"k1"],
                &[1, 2],
                &[13, 11],
                &[OpType::Put, OpType::Put],
                &[(Some(11), Some(11)), (None, None)],
            ),
            // Empty batch.
            new_record_batch_multi_fields(&[], &[], &[], &[], &[]),
            // Duplicates (k1, 2) from the first batch.
            new_record_batch_multi_fields(
                &[b"k1"],
                &[2],
                &[10],
                &[OpType::Put],
                &[(Some(12), None)],
            ),
            new_record_batch_multi_fields(
                &[b"k1", b"k1", b"k1"],
                &[2, 3, 4],
                &[10, 13, 13],
                &[OpType::Put, OpType::Put, OpType::Delete],
                &[(Some(2), Some(22)), (Some(13), None), (None, Some(14))],
            ),
            new_record_batch_multi_fields(
                &[b"k2", b"k2"],
                &[1, 2],
                &[20, 20],
                &[OpType::Put, OpType::Delete],
                &[(Some(101), Some(101)), (None, None)],
            ),
            // Duplicates (k2, 2) from the previous batch.
            new_record_batch_multi_fields(
                &[b"k2"],
                &[2],
                &[19],
                &[OpType::Put],
                &[(Some(102), Some(102))],
            ),
            new_record_batch_multi_fields(
                &[b"k3"],
                &[2],
                &[20],
                &[OpType::Put],
                &[(Some(202), Some(202))],
            ),
            // Duplicates (k3, 2) from the previous batch.
            new_record_batch_multi_fields(
                &[b"k3"],
                &[2],
                &[19],
                &[OpType::Delete],
                &[(None, None)],
            ),
        ];

        // filter_deleted = true.
        let expected_filter_deleted = vec![
            new_record_batch_multi_fields(
                &[b"k1"],
                &[1],
                &[13],
                &[OpType::Put],
                &[(Some(11), Some(11))],
            ),
            new_record_batch_multi_fields(
                &[b"k1", b"k1"],
                &[2, 3],
                &[11, 13],
                &[OpType::Put, OpType::Put],
                &[(Some(12), Some(22)), (Some(13), None)],
            ),
            new_record_batch_multi_fields(
                &[b"k2"],
                &[1],
                &[20],
                &[OpType::Put],
                &[(Some(101), Some(101))],
            ),
            new_record_batch_multi_fields(
                &[b"k3"],
                &[2],
                &[20],
                &[OpType::Put],
                &[(Some(202), Some(202))],
            ),
        ];

        let iter = input.clone().into_iter().map(Ok);
        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, true));
        let result = collect_iterator_results(&mut dedup_iter);
        check_record_batches_equal(&expected_filter_deleted, &result);
        assert_eq!(6, dedup_iter.metrics.num_unselected_rows);
        assert_eq!(2, dedup_iter.metrics.num_deleted_rows);

        // filter_deleted = false.
        let expected_no_filter = vec![
            new_record_batch_multi_fields(
                &[b"k1"],
                &[1],
                &[13],
                &[OpType::Put],
                &[(Some(11), Some(11))],
            ),
            new_record_batch_multi_fields(
                &[b"k1", b"k1", b"k1"],
                &[2, 3, 4],
                &[11, 13, 13],
                &[OpType::Put, OpType::Put, OpType::Delete],
                &[(Some(12), Some(22)), (Some(13), None), (None, Some(14))],
            ),
            new_record_batch_multi_fields(
                &[b"k2"],
                &[1],
                &[20],
                &[OpType::Put],
                &[(Some(101), Some(101))],
            ),
            new_record_batch_multi_fields(
                &[b"k2"],
                &[2],
                &[20],
                &[OpType::Delete],
                &[(None, None)],
            ),
            new_record_batch_multi_fields(
                &[b"k3"],
                &[2],
                &[20],
                &[OpType::Put],
                &[(Some(202), Some(202))],
            ),
        ];

        let iter = input.clone().into_iter().map(Ok);
        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, false));
        let result = collect_iterator_results(&mut dedup_iter);
        check_record_batches_equal(&expected_no_filter, &result);
        assert_eq!(4, dedup_iter.metrics.num_unselected_rows);
        assert_eq!(0, dedup_iter.metrics.num_deleted_rows);
    }
1158
    /// When the first row of a duplicated (key, timestamp) group has no null
    /// field, the values of the later duplicates are ignored.
    #[test]
    fn test_flat_last_non_null_skip_merge_no_null() {
        let input = vec![
            new_record_batch_multi_fields(
                &[b"k1", b"k1"],
                &[1, 2],
                &[13, 11],
                &[OpType::Put, OpType::Put],
                &[(Some(11), Some(11)), (Some(12), Some(12))],
            ),
            // Duplicates (k1, 2).
            new_record_batch_multi_fields(
                &[b"k1"],
                &[2],
                &[10],
                &[OpType::Put],
                &[(None, Some(22))],
            ),
            // First row duplicates (k1, 2) again.
            new_record_batch_multi_fields(
                &[b"k1", b"k1"],
                &[2, 3],
                &[9, 13],
                &[OpType::Put, OpType::Put],
                &[(Some(32), None), (Some(13), Some(13))],
            ),
        ];

        let expected = vec![
            new_record_batch_multi_fields(
                &[b"k1"],
                &[1],
                &[13],
                &[OpType::Put],
                &[(Some(11), Some(11))],
            ),
            new_record_batch_multi_fields(
                &[b"k1", b"k1"],
                &[2, 3],
                &[11, 13],
                &[OpType::Put, OpType::Put],
                &[(Some(12), Some(12)), (Some(13), Some(13))],
            ),
        ];

        let iter = input.into_iter().map(Ok);
        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, true));
        let result = collect_iterator_results(&mut dedup_iter);
        check_record_batches_equal(&expected, &result);
        assert_eq!(2, dedup_iter.metrics.num_unselected_rows);
        assert_eq!(0, dedup_iter.metrics.num_deleted_rows);
    }
1209
    /// Null fields of the first row of a duplicated group are filled from a
    /// later duplicate; fields that are null in every duplicate stay null.
    #[test]
    fn test_flat_last_non_null_merge_null() {
        let input = vec![
            new_record_batch_multi_fields(
                &[b"k1", b"k1"],
                &[1, 2],
                &[13, 11],
                &[OpType::Put, OpType::Put],
                &[(Some(11), Some(11)), (None, None)],
            ),
            // Duplicates (k1, 2) and supplies `field1`.
            new_record_batch_multi_fields(
                &[b"k1"],
                &[2],
                &[10],
                &[OpType::Put],
                &[(None, Some(22))],
            ),
            new_record_batch_multi_fields(
                &[b"k1"],
                &[3],
                &[13],
                &[OpType::Put],
                &[(Some(33), None)],
            ),
        ];

        let expected = vec![
            new_record_batch_multi_fields(
                &[b"k1"],
                &[1],
                &[13],
                &[OpType::Put],
                &[(Some(11), Some(11))],
            ),
            new_record_batch_multi_fields(
                &[b"k1"],
                &[2],
                &[11],
                &[OpType::Put],
                &[(None, Some(22))],
            ),
            new_record_batch_multi_fields(
                &[b"k1"],
                &[3],
                &[13],
                &[OpType::Put],
                &[(Some(33), None)],
            ),
        ];

        let iter = input.into_iter().map(Ok);
        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, true));
        let result = collect_iterator_results(&mut dedup_iter);
        check_record_batches_equal(&expected, &result);
        assert_eq!(1, dedup_iter.metrics.num_unselected_rows);
        assert_eq!(0, dedup_iter.metrics.num_deleted_rows);
    }
1267
1268 fn check_flat_dedup_strategy(
1270 input: &[RecordBatch],
1271 strategy: &mut dyn RecordBatchDedupStrategy,
1272 expect: &[RecordBatch],
1273 ) {
1274 let mut actual = Vec::new();
1275 let mut metrics = DedupMetrics::default();
1276 for batch in input {
1277 if let Some(out) = strategy.push_batch(batch.clone(), &mut metrics).unwrap() {
1278 actual.push(out);
1279 }
1280 }
1281 if let Some(out) = strategy.finish(&mut metrics).unwrap() {
1282 actual.push(out);
1283 }
1284
1285 check_record_batches_equal(expect, &actual);
1286 }
1287
    /// A delete that ends the input hides the later duplicate of its
    /// (key, timestamp) and is itself filtered out.
    #[test]
    fn test_flat_last_non_null_strategy_delete_last() {
        let input = vec![
            new_record_batch_multi_fields(
                &[b"k1"],
                &[1],
                &[6],
                &[OpType::Put],
                &[(Some(11), None)],
            ),
            // First row duplicates (k1, 1).
            new_record_batch_multi_fields(
                &[b"k1", b"k1"],
                &[1, 2],
                &[1, 7],
                &[OpType::Put, OpType::Put],
                &[(Some(1), None), (Some(22), Some(222))],
            ),
            // Duplicates (k1, 2).
            new_record_batch_multi_fields(
                &[b"k1"],
                &[2],
                &[4],
                &[OpType::Put],
                &[(Some(12), None)],
            ),
            new_record_batch_multi_fields(
                &[b"k2", b"k2"],
                &[2, 3],
                &[2, 5],
                &[OpType::Put, OpType::Delete],
                &[(None, None), (Some(13), None)],
            ),
            // Duplicates the deleted row (k2, 3).
            new_record_batch_multi_fields(&[b"k2"], &[3], &[3], &[OpType::Put], &[(None, Some(3))]),
        ];

        let mut strategy = FlatLastNonNull::new(1, true);
        check_flat_dedup_strategy(
            &input,
            &mut strategy,
            &[
                new_record_batch_multi_fields(
                    &[b"k1"],
                    &[1],
                    &[6],
                    &[OpType::Put],
                    &[(Some(11), None)],
                ),
                new_record_batch_multi_fields(
                    &[b"k1"],
                    &[2],
                    &[7],
                    &[OpType::Put],
                    &[(Some(22), Some(222))],
                ),
                new_record_batch_multi_fields(
                    &[b"k2"],
                    &[2],
                    &[2],
                    &[OpType::Put],
                    &[(None, None)],
                ),
            ],
        );
    }
1351
/// Feeds `FlatLastNonNull` one `OpType::Delete` batch for `k1` followed by one
/// `OpType::Put` batch for `k2`, and expects only the `k2` batch in the output.
#[test]
fn test_flat_last_non_null_strategy_delete_one() {
    let deleted =
        new_record_batch_multi_fields(&[b"k1"], &[1], &[1], &[OpType::Delete], &[(None, None)]);
    // This batch is both the second input and the sole expected output.
    let survivor = new_record_batch_multi_fields(
        &[b"k2"],
        &[1],
        &[6],
        &[OpType::Put],
        &[(Some(11), None)],
    );
    let input = vec![deleted, survivor.clone()];

    let mut strategy = FlatLastNonNull::new(1, true);
    check_flat_dedup_strategy(&input, &mut strategy, &[survivor]);
}
1378
/// Feeds `FlatLastNonNull` two batches that contain only `OpType::Delete` rows (one for
/// `k1`, one for `k2`) and expects the strategy to emit no batches at all.
#[test]
fn test_flat_last_non_null_strategy_delete_all() {
    let k1_delete =
        new_record_batch_multi_fields(&[b"k1"], &[1], &[1], &[OpType::Delete], &[(None, None)]);
    let k2_delete = new_record_batch_multi_fields(
        &[b"k2"],
        &[1],
        &[6],
        &[OpType::Delete],
        &[(Some(11), None)],
    );
    let input = vec![k1_delete, k2_delete];

    let mut strategy = FlatLastNonNull::new(1, true);
    // An empty expectation: nothing should come out of the strategy.
    check_flat_dedup_strategy(&input, &mut strategy, &[]);
}
1395
/// Feeds `FlatLastNonNull` five all-`Put` batches that share the key `k1`, two of which
/// hold two rows each, and expects three single-row batches back.
///
/// NOTE(review): the comments below assume the helper's arguments are, in order, keys,
/// timestamps, sequences, op types and field pairs — confirm against
/// `new_record_batch_multi_fields`.
#[test]
fn test_flat_last_non_null_strategy_same_batch() {
    let single_1 = new_record_batch_multi_fields(
        &[b"k1"],
        &[1],
        &[6],
        &[OpType::Put],
        &[(Some(11), None)],
    );
    let pair_1_2 = new_record_batch_multi_fields(
        &[b"k1", b"k1"],
        &[1, 2],
        &[1, 7],
        &[OpType::Put, OpType::Put],
        &[(Some(1), None), (Some(22), Some(222))],
    );
    let single_2 = new_record_batch_multi_fields(
        &[b"k1"],
        &[2],
        &[4],
        &[OpType::Put],
        &[(Some(12), None)],
    );
    let pair_2_3 = new_record_batch_multi_fields(
        &[b"k1", b"k1"],
        &[2, 3],
        &[2, 5],
        &[OpType::Put, OpType::Put],
        &[(None, None), (Some(13), None)],
    );
    let single_3 =
        new_record_batch_multi_fields(&[b"k1"], &[3], &[3], &[OpType::Put], &[(None, Some(3))]);
    let input = vec![single_1, pair_1_2, single_2, pair_2_3, single_3];

    // The last expected row combines `Some(13)` and `Some(3)`, which appear in two
    // different input rows for 3.
    let expect = [
        new_record_batch_multi_fields(
            &[b"k1"],
            &[1],
            &[6],
            &[OpType::Put],
            &[(Some(11), None)],
        ),
        new_record_batch_multi_fields(
            &[b"k1"],
            &[2],
            &[7],
            &[OpType::Put],
            &[(Some(22), Some(222))],
        ),
        new_record_batch_multi_fields(
            &[b"k1"],
            &[3],
            &[5],
            &[OpType::Put],
            &[(Some(13), Some(3))],
        ),
    ];

    let mut strategy = FlatLastNonNull::new(1, true);
    check_flat_dedup_strategy(&input, &mut strategy, &expect);
}
1459
/// Feeds `FlatLastNonNull` nine single-row batches for key `k1`, arranged as three
/// `Put` / `Delete` / `Put` triples, and expects only the first `Put` of each triple in
/// the output (its second field stays `None`).
///
/// NOTE(review): the grouping comments below assume the helper's second argument is the
/// row timestamp — confirm against `new_record_batch_multi_fields`.
#[test]
fn test_flat_last_non_null_strategy_delete_middle() {
    // Triple for 1.
    let put_a1 = new_record_batch_multi_fields(
        &[b"k1"],
        &[1],
        &[7],
        &[OpType::Put],
        &[(Some(11), None)],
    );
    let del_a =
        new_record_batch_multi_fields(&[b"k1"], &[1], &[4], &[OpType::Delete], &[(None, None)]);
    let put_a2 = new_record_batch_multi_fields(
        &[b"k1"],
        &[1],
        &[1],
        &[OpType::Put],
        &[(Some(12), Some(1))],
    );

    // Triple for 2.
    let put_b1 = new_record_batch_multi_fields(
        &[b"k1"],
        &[2],
        &[8],
        &[OpType::Put],
        &[(Some(21), None)],
    );
    let del_b =
        new_record_batch_multi_fields(&[b"k1"], &[2], &[5], &[OpType::Delete], &[(None, None)]);
    let put_b2 = new_record_batch_multi_fields(
        &[b"k1"],
        &[2],
        &[2],
        &[OpType::Put],
        &[(Some(22), Some(2))],
    );

    // Triple for 3.
    let put_c1 = new_record_batch_multi_fields(
        &[b"k1"],
        &[3],
        &[9],
        &[OpType::Put],
        &[(Some(31), None)],
    );
    let del_c =
        new_record_batch_multi_fields(&[b"k1"], &[3], &[6], &[OpType::Delete], &[(None, None)]);
    let put_c2 = new_record_batch_multi_fields(
        &[b"k1"],
        &[3],
        &[3],
        &[OpType::Put],
        &[(Some(32), Some(3))],
    );

    let input = vec![
        put_a1, del_a, put_a2, put_b1, del_b, put_b2, put_c1, del_c, put_c2,
    ];

    let expect = [
        new_record_batch_multi_fields(
            &[b"k1"],
            &[1],
            &[7],
            &[OpType::Put],
            &[(Some(11), None)],
        ),
        new_record_batch_multi_fields(
            &[b"k1"],
            &[2],
            &[8],
            &[OpType::Put],
            &[(Some(21), None)],
        ),
        new_record_batch_multi_fields(
            &[b"k1"],
            &[3],
            &[9],
            &[OpType::Put],
            &[(Some(31), None)],
        ),
    ];

    let mut strategy = FlatLastNonNull::new(1, true);
    check_flat_dedup_strategy(&input, &mut strategy, &expect);
}
1539}