1use std::cmp::Ordering;
16use std::collections::BinaryHeap;
17use std::sync::Arc;
18use std::time::Instant;
19
20use async_stream::try_stream;
21use common_telemetry::debug;
22use datatypes::arrow::array::{Int64Array, UInt64Array};
23use datatypes::arrow::compute::interleave;
24use datatypes::arrow::datatypes::SchemaRef;
25use datatypes::arrow::record_batch::RecordBatch;
26use datatypes::arrow_array::BinaryArray;
27use datatypes::timestamp::timestamp_array_to_primitive;
28use futures::{Stream, TryStreamExt};
29use snafu::ResultExt;
30use store_api::storage::SequenceNumber;
31
32use crate::error::{ComputeArrowSnafu, Result};
33use crate::memtable::BoxedRecordBatchIterator;
34use crate::metrics::READ_STAGE_ELAPSED;
35use crate::read::BoxedRecordBatchStream;
36use crate::read::merge::{MergeMetrics, MergeMetricsReport};
37use crate::sst::parquet::flat_format::{
38 primary_key_column_index, sequence_column_index, time_index_column_index,
39};
40use crate::sst::parquet::format::PrimaryKeyArray;
41
/// Read position of one input stream inside a `BatchBuilder`.
///
/// The default value points at row 0 of batch 0.
#[derive(Clone, Copy, Debug, Default)]
struct BatchCursor {
    /// Index into the builder's buffered batch list of the batch this stream
    /// is currently reading.
    batch_idx: usize,
    /// Index of the next row in that batch that has not been consumed yet.
    row_idx: usize,
}
50
51#[derive(Debug)]
55pub struct BatchBuilder {
56 schema: SchemaRef,
58
59 batches: Vec<(usize, RecordBatch)>,
61
62 cursors: Vec<BatchCursor>,
64
65 indices: Vec<(usize, usize)>,
68}
69
70impl BatchBuilder {
71 pub fn new(schema: SchemaRef, stream_count: usize, batch_size: usize) -> Self {
73 Self {
74 schema,
75 batches: Vec::with_capacity(stream_count * 2),
76 cursors: vec![BatchCursor::default(); stream_count],
77 indices: Vec::with_capacity(batch_size),
78 }
79 }
80
81 pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) {
83 let batch_idx = self.batches.len();
84 self.batches.push((stream_idx, batch));
85 self.cursors[stream_idx] = BatchCursor {
86 batch_idx,
87 row_idx: 0,
88 };
89 }
90
91 pub fn push_row(&mut self, stream_idx: usize) {
93 let cursor = &mut self.cursors[stream_idx];
94 let row_idx = cursor.row_idx;
95 cursor.row_idx += 1;
96 self.indices.push((cursor.batch_idx, row_idx));
97 }
98
99 pub fn len(&self) -> usize {
101 self.indices.len()
102 }
103
104 pub fn is_empty(&self) -> bool {
106 self.indices.is_empty()
107 }
108
109 pub fn schema(&self) -> &SchemaRef {
111 &self.schema
112 }
113
114 pub fn build_record_batch(&mut self) -> Result<Option<RecordBatch>> {
120 if self.is_empty() {
121 return Ok(None);
122 }
123
124 let columns = (0..self.schema.fields.len())
125 .map(|column_idx| {
126 let arrays: Vec<_> = self
127 .batches
128 .iter()
129 .map(|(_, batch)| batch.column(column_idx).as_ref())
130 .collect();
131 interleave(&arrays, &self.indices).context(ComputeArrowSnafu)
132 })
133 .collect::<Result<Vec<_>>>()?;
134
135 self.indices.clear();
136
137 self.retain_batches();
143
144 RecordBatch::try_new(Arc::clone(&self.schema), columns)
145 .context(ComputeArrowSnafu)
146 .map(Some)
147 }
148
149 pub fn take_remaining_rows(
152 &mut self,
153 stream_idx: usize,
154 next: Option<RecordBatch>,
155 ) -> RecordBatch {
156 let cursor = &mut self.cursors[stream_idx];
157 let batch = &self.batches[cursor.batch_idx];
158 let output = batch
159 .1
160 .slice(cursor.row_idx, batch.1.num_rows() - cursor.row_idx);
161 cursor.row_idx = batch.1.num_rows();
162
163 if let Some(b) = next {
164 self.push_batch(stream_idx, b);
165 self.retain_batches();
166 }
167
168 output
169 }
170
171 fn retain_batches(&mut self) {
172 let mut batch_idx = 0;
173 let mut retained = 0;
174 self.batches.retain(|(stream_idx, _)| {
175 let stream_cursor = &mut self.cursors[*stream_idx];
176 let retain = stream_cursor.batch_idx == batch_idx;
177 batch_idx += 1;
178
179 if retain {
180 stream_cursor.batch_idx = retained;
181 retained += 1;
182 }
183 retain
184 });
185 }
186}
187
/// Operations a merge node has to provide besides its ordering.
trait NodeCmp: Eq + Ord {
    /// Returns `true` when the node has no more data to offer.
    fn is_eof(&self) -> bool;

    /// Returns `true` when this node should wait behind `other`, i.e. it does
    /// not need to be merged row by row together with `other` yet.
    ///
    /// The exact criterion is defined by the implementor.
    fn is_behind(&self, other: &Self) -> bool;
}
200
201struct MergeAlgo<T> {
203 hot: BinaryHeap<T>,
209 cold: BinaryHeap<T>,
213}
214
215impl<T: NodeCmp> MergeAlgo<T> {
216 fn new(mut nodes: Vec<T>) -> Self {
220 nodes.retain(|node| !node.is_eof());
222 let hot = BinaryHeap::with_capacity(nodes.len());
223 let cold = BinaryHeap::from(nodes);
224
225 let mut algo = MergeAlgo { hot, cold };
226 algo.refill_hot();
228
229 algo
230 }
231
232 fn refill_hot(&mut self) {
235 while !self.cold.is_empty() {
236 if let Some(merge_window) = self.hot.peek() {
237 let warmest = self.cold.peek().unwrap();
238 if warmest.is_behind(merge_window) {
239 break;
243 }
244 }
245
246 let warmest = self.cold.pop().unwrap();
247 self.hot.push(warmest);
248 }
249 }
250
251 fn reheap(&mut self, node: T) {
253 if node.is_eof() {
254 self.refill_hot();
257 } else {
258 let node_is_cold = if let Some(hottest) = self.hot.peek() {
260 node.is_behind(hottest)
263 } else {
264 true
267 };
268
269 if node_is_cold {
270 self.cold.push(node);
271 } else {
272 self.hot.push(node);
273 }
274 self.refill_hot();
276 }
277 }
278
279 fn pop_hot(&mut self) -> Option<T> {
281 self.hot.pop()
282 }
283
284 fn has_rows(&self) -> bool {
286 !self.hot.is_empty()
287 }
288
289 fn can_fetch_batch(&self) -> bool {
291 self.hot.len() == 1
292 }
293}
294
295struct SortColumns {
298 primary_key: PrimaryKeyArray,
299 timestamp: Int64Array,
300 sequence: UInt64Array,
301}
302
303impl SortColumns {
304 fn new(batch: &RecordBatch) -> Self {
309 let num_columns = batch.num_columns();
310 let primary_key = batch
311 .column(primary_key_column_index(num_columns))
312 .as_any()
313 .downcast_ref::<PrimaryKeyArray>()
314 .unwrap()
315 .clone();
316 let timestamp = batch.column(time_index_column_index(num_columns));
317 let (timestamp, _unit) = timestamp_array_to_primitive(timestamp).unwrap();
318 let sequence = batch
319 .column(sequence_column_index(num_columns))
320 .as_any()
321 .downcast_ref::<UInt64Array>()
322 .unwrap()
323 .clone();
324
325 Self {
326 primary_key,
327 timestamp,
328 sequence,
329 }
330 }
331
332 fn primary_key_at(&self, index: usize) -> &[u8] {
333 let key = self.primary_key.keys().value(index);
334 let binary_values = self
335 .primary_key
336 .values()
337 .as_any()
338 .downcast_ref::<BinaryArray>()
339 .unwrap();
340 binary_values.value(key as usize)
341 }
342
343 fn timestamp_at(&self, index: usize) -> i64 {
344 self.timestamp.value(index)
345 }
346
347 fn sequence_at(&self, index: usize) -> SequenceNumber {
348 self.sequence.value(index)
349 }
350
351 fn num_rows(&self) -> usize {
352 self.timestamp.len()
353 }
354}
355
356struct RowCursor {
361 offset: usize,
363 columns: SortColumns,
365}
366
367impl RowCursor {
368 fn new(columns: SortColumns) -> Self {
369 debug_assert!(columns.num_rows() > 0);
370
371 Self { offset: 0, columns }
372 }
373
374 fn is_finished(&self) -> bool {
375 self.offset >= self.columns.num_rows()
376 }
377
378 fn advance(&mut self) {
379 self.offset += 1;
380 }
381
382 fn first_primary_key(&self) -> &[u8] {
383 self.columns.primary_key_at(self.offset)
384 }
385
386 fn first_timestamp(&self) -> i64 {
387 self.columns.timestamp_at(self.offset)
388 }
389
390 fn first_sequence(&self) -> SequenceNumber {
391 self.columns.sequence_at(self.offset)
392 }
393
394 fn last_primary_key(&self) -> &[u8] {
395 self.columns.primary_key_at(self.columns.num_rows() - 1)
396 }
397
398 fn last_timestamp(&self) -> i64 {
399 self.columns.timestamp_at(self.columns.num_rows() - 1)
400 }
401}
402
403impl PartialEq for RowCursor {
404 fn eq(&self, other: &Self) -> bool {
405 self.first_primary_key() == other.first_primary_key()
406 && self.first_timestamp() == other.first_timestamp()
407 && self.first_sequence() == other.first_sequence()
408 }
409}
410
411impl Eq for RowCursor {}
412
413impl PartialOrd for RowCursor {
414 fn partial_cmp(&self, other: &RowCursor) -> Option<Ordering> {
415 Some(self.cmp(other))
416 }
417}
418
419impl Ord for RowCursor {
420 fn cmp(&self, other: &RowCursor) -> Ordering {
422 self.first_primary_key()
423 .cmp(other.first_primary_key())
424 .then_with(|| self.first_timestamp().cmp(&other.first_timestamp()))
425 .then_with(|| other.first_sequence().cmp(&self.first_sequence()))
426 }
427}
428
429pub struct FlatMergeIterator {
433 algo: MergeAlgo<IterNode>,
435 in_progress: BatchBuilder,
437 output_batch: Option<RecordBatch>,
439 batch_size: usize,
443}
444
445impl FlatMergeIterator {
446 pub fn new(
448 schema: SchemaRef,
449 iters: Vec<BoxedRecordBatchIterator>,
450 batch_size: usize,
451 ) -> Result<Self> {
452 let mut in_progress = BatchBuilder::new(schema, iters.len(), batch_size);
453 let mut nodes = Vec::with_capacity(iters.len());
454 for (node_index, iter) in iters.into_iter().enumerate() {
456 let mut node = IterNode {
457 node_index,
458 iter,
459 cursor: None,
460 };
461 if let Some(batch) = node.advance_batch()? {
462 in_progress.push_batch(node_index, batch);
463 nodes.push(node);
464 }
465 }
466
467 let algo = MergeAlgo::new(nodes);
468
469 let iter = Self {
470 algo,
471 in_progress,
472 output_batch: None,
473 batch_size,
474 };
475
476 Ok(iter)
477 }
478
479 pub fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
481 while self.algo.has_rows() && self.output_batch.is_none() {
482 if self.algo.can_fetch_batch() && !self.in_progress.is_empty() {
483 self.output_batch = self.in_progress.build_record_batch()?;
485 debug_assert!(self.output_batch.is_some());
486 } else if self.algo.can_fetch_batch() {
487 self.fetch_batch_from_hottest()?;
488 } else {
489 self.fetch_row_from_hottest()?;
490 }
491 }
492
493 Ok(self.output_batch.take())
494 }
495
496 fn fetch_batch_from_hottest(&mut self) -> Result<()> {
498 debug_assert!(self.in_progress.is_empty());
499
500 let mut hottest = self.algo.pop_hot().unwrap();
502 debug_assert!(!hottest.current_cursor().is_finished());
503 let next = hottest.advance_batch()?;
504 let batch = self
506 .in_progress
507 .take_remaining_rows(hottest.node_index, next);
508 Self::maybe_output_batch(batch, &mut self.output_batch);
509 self.algo.reheap(hottest);
510
511 Ok(())
512 }
513
514 fn fetch_row_from_hottest(&mut self) -> Result<()> {
516 let mut hottest = self.algo.pop_hot().unwrap();
518 debug_assert!(!hottest.current_cursor().is_finished());
519 self.in_progress.push_row(hottest.node_index);
520 if self.in_progress.len() >= self.batch_size {
521 if let Some(output) = self.in_progress.build_record_batch()? {
523 Self::maybe_output_batch(output, &mut self.output_batch);
524 }
525 }
526
527 if let Some(next) = hottest.advance_row()? {
528 self.in_progress.push_batch(hottest.node_index, next);
529 }
530
531 self.algo.reheap(hottest);
532 Ok(())
533 }
534
535 fn maybe_output_batch(batch: RecordBatch, output_batch: &mut Option<RecordBatch>) {
537 debug_assert!(output_batch.is_none());
538 if batch.num_rows() > 0 {
539 *output_batch = Some(batch);
540 }
541 }
542}
543
544impl Iterator for FlatMergeIterator {
545 type Item = Result<RecordBatch>;
546
547 fn next(&mut self) -> Option<Self::Item> {
548 self.next_batch().transpose()
549 }
550}
551
552pub struct FlatMergeReader {
556 algo: MergeAlgo<StreamNode>,
558 in_progress: BatchBuilder,
560 output_batch: Option<RecordBatch>,
562 batch_size: usize,
566 metrics: MergeMetrics,
568 metrics_reporter: Option<Arc<dyn MergeMetricsReport>>,
570}
571
572impl FlatMergeReader {
573 pub async fn new(
575 schema: SchemaRef,
576 iters: Vec<BoxedRecordBatchStream>,
577 batch_size: usize,
578 metrics_reporter: Option<Arc<dyn MergeMetricsReport>>,
579 ) -> Result<Self> {
580 let start = Instant::now();
581 let metrics = MergeMetrics::default();
582 let mut in_progress = BatchBuilder::new(schema, iters.len(), batch_size);
583 let mut nodes = Vec::with_capacity(iters.len());
584 for (node_index, iter) in iters.into_iter().enumerate() {
586 let mut node = StreamNode {
587 node_index,
588 iter,
589 cursor: None,
590 };
591 if let Some(batch) = node.advance_batch().await? {
592 in_progress.push_batch(node_index, batch);
593 nodes.push(node);
594 }
595 }
596
597 let algo = MergeAlgo::new(nodes);
598
599 let mut reader = Self {
600 algo,
601 in_progress,
602 output_batch: None,
603 batch_size,
604 metrics,
605 metrics_reporter,
606 };
607 let elapsed = start.elapsed();
608 reader.metrics.init_cost += elapsed;
609 reader.metrics.scan_cost += elapsed;
610
611 Ok(reader)
612 }
613
614 pub async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
616 let start = Instant::now();
617 while self.algo.has_rows() && self.output_batch.is_none() {
618 if self.algo.can_fetch_batch() && !self.in_progress.is_empty() {
619 self.output_batch = self.in_progress.build_record_batch()?;
621 debug_assert!(self.output_batch.is_some());
622 } else if self.algo.can_fetch_batch() {
623 self.fetch_batch_from_hottest().await?;
624 self.metrics.num_fetch_by_batches += 1;
625 } else {
626 self.fetch_row_from_hottest().await?;
627 self.metrics.num_fetch_by_rows += 1;
628 }
629 }
630
631 if let Some(batch) = self.output_batch.take() {
632 self.metrics.scan_cost += start.elapsed();
633 self.metrics.maybe_report(&self.metrics_reporter);
634 Ok(Some(batch))
635 } else {
636 self.metrics.scan_cost += start.elapsed();
638 self.metrics.maybe_report(&self.metrics_reporter);
639 Ok(None)
640 }
641 }
642
643 pub fn into_stream(mut self) -> impl Stream<Item = Result<RecordBatch>> {
645 try_stream! {
646 while let Some(batch) = self.next_batch().await? {
647 yield batch;
648 }
649 }
650 }
651
652 async fn fetch_batch_from_hottest(&mut self) -> Result<()> {
654 debug_assert!(self.in_progress.is_empty());
655
656 let mut hottest = self.algo.pop_hot().unwrap();
658 debug_assert!(!hottest.current_cursor().is_finished());
659 let start = Instant::now();
660 let next = hottest.advance_batch().await?;
661 self.metrics.fetch_cost += start.elapsed();
662 let batch = self
664 .in_progress
665 .take_remaining_rows(hottest.node_index, next);
666 Self::maybe_output_batch(batch, &mut self.output_batch);
667 self.algo.reheap(hottest);
668
669 Ok(())
670 }
671
672 async fn fetch_row_from_hottest(&mut self) -> Result<()> {
674 let mut hottest = self.algo.pop_hot().unwrap();
676 debug_assert!(!hottest.current_cursor().is_finished());
677 self.in_progress.push_row(hottest.node_index);
678 if self.in_progress.len() >= self.batch_size {
679 if let Some(output) = self.in_progress.build_record_batch()? {
681 Self::maybe_output_batch(output, &mut self.output_batch);
682 }
683 }
684
685 let start = Instant::now();
686 if let Some(next) = hottest.advance_row().await? {
687 self.metrics.fetch_cost += start.elapsed();
688 self.in_progress.push_batch(hottest.node_index, next);
689 } else {
690 self.metrics.fetch_cost += start.elapsed();
691 }
692
693 self.algo.reheap(hottest);
694 Ok(())
695 }
696
697 fn maybe_output_batch(batch: RecordBatch, output_batch: &mut Option<RecordBatch>) {
699 debug_assert!(output_batch.is_none());
700 if batch.num_rows() > 0 {
701 *output_batch = Some(batch);
702 }
703 }
704}
705
706impl Drop for FlatMergeReader {
707 fn drop(&mut self) {
708 debug!("Flat merge reader finished, metrics: {:?}", self.metrics);
709
710 READ_STAGE_ELAPSED
711 .with_label_values(&["flat_merge"])
712 .observe(self.metrics.scan_cost.as_secs_f64());
713 READ_STAGE_ELAPSED
714 .with_label_values(&["flat_merge_fetch"])
715 .observe(self.metrics.fetch_cost.as_secs_f64());
716
717 if let Some(reporter) = &self.metrics_reporter {
719 reporter.report(&mut self.metrics);
720 }
721 }
722}
723
724struct GenericNode<T> {
726 node_index: usize,
728 iter: T,
730 cursor: Option<RowCursor>,
735}
736
737impl<T> NodeCmp for GenericNode<T> {
738 fn is_eof(&self) -> bool {
739 self.cursor.is_none()
740 }
741
742 fn is_behind(&self, other: &Self) -> bool {
743 debug_assert!(!self.current_cursor().is_finished());
744 debug_assert!(!other.current_cursor().is_finished());
745
746 self.current_cursor()
750 .first_primary_key()
751 .cmp(other.current_cursor().last_primary_key())
752 .then_with(|| {
753 self.current_cursor()
754 .first_timestamp()
755 .cmp(&other.current_cursor().last_timestamp())
756 })
757 == Ordering::Greater
758 }
759}
760
761impl<T> PartialEq for GenericNode<T> {
762 fn eq(&self, other: &GenericNode<T>) -> bool {
763 self.cursor == other.cursor
764 }
765}
766
767impl<T> Eq for GenericNode<T> {}
768
769impl<T> PartialOrd for GenericNode<T> {
770 fn partial_cmp(&self, other: &GenericNode<T>) -> Option<Ordering> {
771 Some(self.cmp(other))
772 }
773}
774
775impl<T> Ord for GenericNode<T> {
776 fn cmp(&self, other: &GenericNode<T>) -> Ordering {
777 other.cursor.cmp(&self.cursor)
780 }
781}
782
783impl<T> GenericNode<T> {
784 fn current_cursor(&self) -> &RowCursor {
789 self.cursor.as_ref().unwrap()
790 }
791}
792
793impl GenericNode<BoxedRecordBatchIterator> {
794 fn advance_batch(&mut self) -> Result<Option<RecordBatch>> {
798 let batch = self.advance_inner_iter()?;
799 let columns = batch.as_ref().map(SortColumns::new);
800 self.cursor = columns.map(RowCursor::new);
801
802 Ok(batch)
803 }
804
805 fn advance_row(&mut self) -> Result<Option<RecordBatch>> {
808 let cursor = self.cursor.as_mut().unwrap();
809 cursor.advance();
810 if !cursor.is_finished() {
811 return Ok(None);
812 }
813
814 self.advance_batch()
816 }
817
818 fn advance_inner_iter(&mut self) -> Result<Option<RecordBatch>> {
820 while let Some(batch) = self.iter.next().transpose()? {
821 if batch.num_rows() > 0 {
822 return Ok(Some(batch));
823 }
824 }
825 Ok(None)
826 }
827}
828
829type StreamNode = GenericNode<BoxedRecordBatchStream>;
830type IterNode = GenericNode<BoxedRecordBatchIterator>;
831
832impl GenericNode<BoxedRecordBatchStream> {
833 async fn advance_batch(&mut self) -> Result<Option<RecordBatch>> {
837 let batch = self.advance_inner_iter().await?;
838 let columns = batch.as_ref().map(SortColumns::new);
839 self.cursor = columns.map(RowCursor::new);
840
841 Ok(batch)
842 }
843
844 async fn advance_row(&mut self) -> Result<Option<RecordBatch>> {
847 let cursor = self.cursor.as_mut().unwrap();
848 cursor.advance();
849 if !cursor.is_finished() {
850 return Ok(None);
851 }
852
853 self.advance_batch().await
855 }
856
857 async fn advance_inner_iter(&mut self) -> Result<Option<RecordBatch>> {
859 while let Some(batch) = self.iter.try_next().await? {
860 if batch.num_rows() > 0 {
861 return Ok(Some(batch));
862 }
863 }
864 Ok(None)
865 }
866}
867
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::builder::BinaryDictionaryBuilder;
    use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt8Array, UInt64Array};
    use datatypes::arrow::datatypes::{DataType, Field, Schema, TimeUnit, UInt32Type};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;

    /// Schema shared by the merge tests: one field column, the time index and
    /// the three internal columns (primary key, sequence, op type).
    fn new_test_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("field1", DataType::Int64, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ]))
    }

    /// Builds a record batch with the test schema from column slices.
    ///
    /// All slices are expected to have the same length; building the batch
    /// panics otherwise.
    fn create_test_record_batch(
        primary_keys: &[&[u8]],
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        field_values: &[i64],
    ) -> RecordBatch {
        let field1 = Arc::new(Int64Array::from_iter_values(field_values.iter().copied()));
        let timestamp = Arc::new(TimestampMillisecondArray::from_iter_values(
            timestamps.iter().copied(),
        ));

        // The primary key column is dictionary encoded.
        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for &key in primary_keys {
            builder.append(key).unwrap();
        }
        let primary_key = Arc::new(builder.finish());

        let sequence = Arc::new(UInt64Array::from_iter_values(sequences.iter().copied()));
        let op_type = Arc::new(UInt8Array::from_iter_values(
            op_types.iter().map(|&v| v as u8),
        ));

        RecordBatch::try_new(
            new_test_schema(),
            vec![field1, timestamp, primary_key, sequence, op_type],
        )
        .unwrap()
    }

    /// Wraps `batches` into an infallible boxed record batch iterator.
    fn new_test_iter(batches: Vec<RecordBatch>) -> BoxedRecordBatchIterator {
        Box::new(batches.into_iter().map(Ok))
    }

    /// Asserts that `actual` contains exactly the batches of `expected`, in
    /// the same order.
    fn assert_record_batches_eq(expected: &[RecordBatch], actual: &[RecordBatch]) {
        // `zip` stops at the shorter slice, so missing or surplus batches
        // would go unnoticed without comparing the lengths first.
        assert_eq!(
            expected.len(),
            actual.len(),
            "number of record batches differs"
        );
        for (idx, (exp, act)) in expected.iter().zip(actual.iter()).enumerate() {
            assert_eq!(exp, act, "record batch {} differs", idx);
        }
    }

    /// Drains `iter`, panicking on the first error.
    fn collect_merge_iterator_batches(iter: FlatMergeIterator) -> Vec<RecordBatch> {
        iter.map(|result| result.unwrap()).collect()
    }

    #[test]
    fn test_merge_iterator_empty() {
        let schema = new_test_schema();

        let mut merge_iter = FlatMergeIterator::new(schema, vec![], 1024).unwrap();
        assert!(merge_iter.next_batch().unwrap().is_none());
    }

    #[test]
    fn test_merge_iterator_single_batch() {
        let batch = create_test_record_batch(
            &[b"k1", b"k1"],
            &[1000, 2000],
            &[21, 22],
            &[OpType::Put, OpType::Put],
            &[11, 12],
        );

        let schema = batch.schema();
        let iter = new_test_iter(vec![batch.clone()]);

        let merge_iter = FlatMergeIterator::new(schema, vec![iter], 1024).unwrap();
        let result = collect_merge_iterator_batches(merge_iter);

        assert_eq!(result.len(), 1);
        assert_record_batches_eq(&[batch], &result);
    }

    #[test]
    fn test_merge_iterator_non_overlapping() {
        let batch1 = create_test_record_batch(
            &[b"k1", b"k1"],
            &[1000, 2000],
            &[21, 22],
            &[OpType::Put, OpType::Put],
            &[11, 12],
        );
        let batch2 = create_test_record_batch(
            &[b"k1", b"k1"],
            &[4000, 5000],
            &[24, 25],
            &[OpType::Put, OpType::Put],
            &[14, 15],
        );
        let batch3 = create_test_record_batch(
            &[b"k2", b"k2"],
            &[2000, 3000],
            &[22, 23],
            &[OpType::Delete, OpType::Put],
            &[12, 13],
        );

        let schema = batch1.schema();
        let iter1 = new_test_iter(vec![batch1.clone(), batch3.clone()]);
        let iter2 = new_test_iter(vec![batch2.clone()]);

        let merge_iter = FlatMergeIterator::new(schema, vec![iter1, iter2], 1024).unwrap();
        let result = collect_merge_iterator_batches(merge_iter);

        // Non-overlapping batches pass through unchanged, ordered by key range.
        let expected = vec![batch1, batch2, batch3];
        assert_record_batches_eq(&expected, &result);
    }

    #[test]
    fn test_merge_iterator_overlapping_timestamps() {
        // Same primary key, interleaved timestamps.
        let batch1 = create_test_record_batch(
            &[b"k1", b"k1"],
            &[1000, 2000],
            &[21, 22],
            &[OpType::Put, OpType::Put],
            &[11, 12],
        );
        let batch2 = create_test_record_batch(
            &[b"k1", b"k1"],
            &[1500, 2500],
            &[31, 32],
            &[OpType::Put, OpType::Put],
            &[15, 25],
        );

        let schema = batch1.schema();
        let iter1 = new_test_iter(vec![batch1]);
        let iter2 = new_test_iter(vec![batch2]);

        let merge_iter = FlatMergeIterator::new(schema, vec![iter1, iter2], 1024).unwrap();
        let result = collect_merge_iterator_batches(merge_iter);

        let expected = vec![
            create_test_record_batch(
                &[b"k1", b"k1"],
                &[1000, 1500],
                &[21, 31],
                &[OpType::Put, OpType::Put],
                &[11, 15],
            ),
            create_test_record_batch(&[b"k1"], &[2000], &[22], &[OpType::Put], &[12]),
            create_test_record_batch(&[b"k1"], &[2500], &[32], &[OpType::Put], &[25]),
        ];
        assert_record_batches_eq(&expected, &result);
    }

    #[test]
    fn test_merge_iterator_duplicate_keys_sequences() {
        // Same primary key and timestamp everywhere, only the sequences differ.
        let batch1 = create_test_record_batch(
            &[b"k1", b"k1"],
            &[1000, 1000],
            &[20, 10],
            &[OpType::Put, OpType::Put],
            &[1, 2],
        );
        let batch2 = create_test_record_batch(&[b"k1"], &[1000], &[15], &[OpType::Put], &[3]);

        let schema = batch1.schema();
        let iter1 = new_test_iter(vec![batch1]);
        let iter2 = new_test_iter(vec![batch2]);

        let merge_iter = FlatMergeIterator::new(schema, vec![iter1, iter2], 1024).unwrap();
        let result = collect_merge_iterator_batches(merge_iter);

        // All rows are kept, ordered by descending sequence: 20, 15, 10.
        let expected = vec![
            create_test_record_batch(
                &[b"k1", b"k1"],
                &[1000, 1000],
                &[20, 15],
                &[OpType::Put, OpType::Put],
                &[1, 3],
            ),
            create_test_record_batch(&[b"k1"], &[1000], &[10], &[OpType::Put], &[2]),
        ];
        assert_record_batches_eq(&expected, &result);
    }

    #[test]
    fn test_batch_builder_basic() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("field1", DataType::Int64, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        let mut builder = BatchBuilder::new(schema.clone(), 2, 1024);
        assert!(builder.is_empty());

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(TimestampMillisecondArray::from(vec![1000, 2000])),
            ],
        )
        .unwrap();

        builder.push_batch(0, batch);
        builder.push_row(0);
        builder.push_row(0);

        assert!(!builder.is_empty());
        assert_eq!(builder.len(), 2);

        let result_batch = builder.build_record_batch().unwrap().unwrap();
        assert_eq!(result_batch.num_rows(), 2);
    }

    #[test]
    fn test_row_cursor_comparison() {
        // Both batches start with the same key and timestamp.
        let batch1 = create_test_record_batch(
            &[b"k1", b"k1"],
            &[1000, 2000],
            &[22, 21],
            &[OpType::Put, OpType::Put],
            &[11, 12],
        );
        let batch2 = create_test_record_batch(
            &[b"k1", b"k1"],
            &[1000, 2000],
            &[23, 20],
            &[OpType::Put, OpType::Put],
            &[11, 12],
        );

        let cursor1 = RowCursor::new(SortColumns::new(&batch1));
        let cursor2 = RowCursor::new(SortColumns::new(&batch2));

        // The first row of batch2 has the larger sequence (23 > 22), so its
        // cursor sorts first.
        assert!(cursor2 < cursor1);
    }
}