1use std::cmp::Ordering;
16use std::collections::BinaryHeap;
17use std::fmt;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use async_stream::try_stream;
22use common_telemetry::debug;
23use datatypes::arrow::array::{Array, AsArray, Int64Array, UInt64Array};
24use datatypes::arrow::compute::interleave;
25use datatypes::arrow::datatypes::{ArrowNativeType, BinaryType, DataType, SchemaRef, Utf8Type};
26use datatypes::arrow::error::ArrowError;
27use datatypes::arrow::record_batch::RecordBatch;
28use datatypes::arrow_array::BinaryArray;
29use datatypes::timestamp::timestamp_array_to_primitive;
30use futures::{Stream, TryStreamExt};
31use snafu::ResultExt;
32use store_api::storage::SequenceNumber;
33
34use crate::error::{ComputeArrowSnafu, Result};
35use crate::memtable::BoxedRecordBatchIterator;
36use crate::metrics::READ_STAGE_ELAPSED;
37use crate::read::BoxedRecordBatchStream;
38use crate::sst::parquet::flat_format::{
39 primary_key_column_index, sequence_column_index, time_index_column_index,
40};
41use crate::sst::parquet::format::PrimaryKeyArray;
42
43fn check_interleave_bytes_overflow<T: datatypes::arrow::datatypes::ByteArrayType>(
52 batches: &[(usize, RecordBatch)],
53 col_idx: usize,
54 indices: &[(usize, usize)],
55) -> std::result::Result<(), ArrowError> {
56 let total: usize = batches
59 .iter()
60 .map(|(_, batch)| batch.column(col_idx).as_bytes::<T>().value_data().len())
61 .sum();
62 if T::Offset::from_usize(total).is_some() {
63 return Ok(());
64 }
65 let mut capacity: usize = 0;
67 for &(a, b) in indices {
68 let array = batches[a].1.column(col_idx).as_bytes::<T>();
69 let o = array.value_offsets();
70 let element_len = o[b + 1].as_usize() - o[b].as_usize();
71 capacity += element_len;
72 T::Offset::from_usize(capacity).ok_or(ArrowError::OffsetOverflowError(capacity))?;
73 }
74 Ok(())
75}
76
77fn check_interleave_overflow(
79 batches: &[(usize, RecordBatch)],
80 schema: &SchemaRef,
81 indices: &[(usize, usize)],
82) -> Result<()> {
83 for (col_idx, field) in schema.fields.iter().enumerate() {
84 match field.data_type() {
85 DataType::Utf8 => {
86 check_interleave_bytes_overflow::<Utf8Type>(batches, col_idx, indices)
87 .context(ComputeArrowSnafu)?;
88 }
89 DataType::Binary => {
90 check_interleave_bytes_overflow::<BinaryType>(batches, col_idx, indices)
91 .context(ComputeArrowSnafu)?;
92 }
93 _ => continue,
94 }
95 }
96 Ok(())
97}
98
/// Read position of one input stream inside [`BatchBuilder`].
#[derive(Debug, Copy, Clone, Default)]
struct BatchCursor {
    // Index of this stream's current batch in `BatchBuilder::batches`.
    batch_idx: usize,
    // Next row to consume within that batch.
    row_idx: usize,
}

/// Callback for exporting [`MergeMetrics`] to an external collector.
pub trait MergeMetricsReport: Send + Sync {
    /// Reports the metrics. Takes `&mut` so implementations may reset fields.
    fn report(&self, metrics: &mut MergeMetrics);
}

/// Metrics collected while merging sorted record batches.
#[derive(Default)]
pub struct MergeMetrics {
    // Time spent building the reader (also counted into `scan_cost`).
    pub(crate) init_cost: Duration,
    // Total time spent inside `next_batch` calls.
    pub(crate) scan_cost: Duration,
    // Number of times a whole batch was forwarded without row interleaving.
    pub(crate) num_fetch_by_batches: usize,
    // Number of times a single row was merged from the hottest node.
    pub(crate) num_fetch_by_rows: usize,
    // Time spent awaiting the underlying inputs for more data.
    pub(crate) fetch_cost: Duration,
}
128
129impl fmt::Debug for MergeMetrics {
130 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
131 if self.scan_cost.is_zero() {
132 return write!(f, "{{}}");
133 }
134
135 write!(f, r#"{{"scan_cost":"{:?}""#, self.scan_cost)?;
136
137 if !self.init_cost.is_zero() {
138 write!(f, r#", "init_cost":"{:?}""#, self.init_cost)?;
139 }
140 if self.num_fetch_by_batches > 0 {
141 write!(
142 f,
143 r#", "num_fetch_by_batches":{}"#,
144 self.num_fetch_by_batches
145 )?;
146 }
147 if self.num_fetch_by_rows > 0 {
148 write!(f, r#", "num_fetch_by_rows":{}"#, self.num_fetch_by_rows)?;
149 }
150 if !self.fetch_cost.is_zero() {
151 write!(f, r#", "fetch_cost":"{:?}""#, self.fetch_cost)?;
152 }
153
154 write!(f, "}}")
155 }
156}
157
impl MergeMetrics {
    /// Accumulates all fields of `other` into `self`.
    pub(crate) fn merge(&mut self, other: &MergeMetrics) {
        // Destructure exhaustively so that adding a field to `MergeMetrics`
        // breaks this method at compile time instead of being silently missed.
        let MergeMetrics {
            init_cost,
            scan_cost,
            num_fetch_by_batches,
            num_fetch_by_rows,
            fetch_cost,
        } = other;

        self.init_cost += *init_cost;
        self.scan_cost += *scan_cost;
        self.num_fetch_by_batches += *num_fetch_by_batches;
        self.num_fetch_by_rows += *num_fetch_by_rows;
        self.fetch_cost += *fetch_cost;
    }

    /// Invokes `reporter` only when the accumulated scan cost is noticeable
    /// (more than 10ms), to avoid reporting noise for cheap scans.
    pub(crate) fn maybe_report(&mut self, reporter: &Option<Arc<dyn MergeMetricsReport>>) {
        if self.scan_cost.as_millis() > 10
            && let Some(r) = reporter
        {
            r.report(self);
        }
    }
}
185
/// Collects rows and batches from multiple sorted inputs and assembles them
/// into output [`RecordBatch`]es via arrow's `interleave`.
#[derive(Debug)]
pub struct BatchBuilder {
    // Schema of the output batches; all input batches must share it.
    schema: SchemaRef,

    // Source batches as `(stream_idx, batch)`, addressed by
    // `BatchCursor::batch_idx`.
    batches: Vec<(usize, RecordBatch)>,

    // Per-stream cursor pointing at that stream's current batch/row.
    cursors: Vec<BatchCursor>,

    // Pending `(batch_idx, row_idx)` pairs to interleave into the next output.
    indices: Vec<(usize, usize)>,
}

impl BatchBuilder {
    /// Creates a builder for `stream_count` inputs producing batches of up to
    /// `batch_size` rows.
    pub fn new(schema: SchemaRef, stream_count: usize, batch_size: usize) -> Self {
        Self {
            schema,
            // Heuristic: usually at most the current and the next batch per stream.
            batches: Vec::with_capacity(stream_count * 2),
            cursors: vec![BatchCursor::default(); stream_count],
            indices: Vec::with_capacity(batch_size),
        }
    }

    /// Registers `batch` as the current batch of `stream_idx` and resets that
    /// stream's cursor to its first row.
    pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) {
        let batch_idx = self.batches.len();
        self.batches.push((stream_idx, batch));
        self.cursors[stream_idx] = BatchCursor {
            batch_idx,
            row_idx: 0,
        };
    }

    /// Appends the current row of `stream_idx` to the pending output and
    /// advances that stream's row cursor.
    pub fn push_row(&mut self, stream_idx: usize) {
        let cursor = &mut self.cursors[stream_idx];
        let row_idx = cursor.row_idx;
        cursor.row_idx += 1;
        self.indices.push((cursor.batch_idx, row_idx));
    }

    /// Number of rows pending in the output.
    pub fn len(&self) -> usize {
        self.indices.len()
    }

    /// Whether no rows are pending.
    pub fn is_empty(&self) -> bool {
        self.indices.is_empty()
    }

    /// Returns the output schema.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Builds a record batch from the pending rows, or returns `Ok(None)`
    /// when nothing is pending. Clears the pending rows and drops batches
    /// that are no longer referenced by any cursor.
    pub fn build_record_batch(&mut self) -> Result<Option<RecordBatch>> {
        if self.is_empty() {
            return Ok(None);
        }

        // Guard against byte-array offset overflow before interleaving.
        check_interleave_overflow(&self.batches, &self.schema, &self.indices)?;

        let columns = (0..self.schema.fields.len())
            .map(|column_idx| {
                let arrays: Vec<_> = self
                    .batches
                    .iter()
                    .map(|(_, batch)| batch.column(column_idx).as_ref())
                    .collect();
                interleave(&arrays, &self.indices).context(ComputeArrowSnafu)
            })
            .collect::<Result<Vec<_>>>()?;

        self.indices.clear();

        self.retain_batches();

        RecordBatch::try_new(Arc::clone(&self.schema), columns)
            .context(ComputeArrowSnafu)
            .map(Some)
    }

    /// Takes all unconsumed rows of `stream_idx`'s current batch as a slice
    /// and, if `next` is provided, installs it as that stream's new current
    /// batch. Callers must ensure no pending indices reference the old rows
    /// (i.e. the builder is typically empty when this is called).
    pub fn take_remaining_rows(
        &mut self,
        stream_idx: usize,
        next: Option<RecordBatch>,
    ) -> RecordBatch {
        let cursor = &mut self.cursors[stream_idx];
        let batch = &self.batches[cursor.batch_idx];
        let output = batch
            .1
            .slice(cursor.row_idx, batch.1.num_rows() - cursor.row_idx);
        // Mark the old batch as fully consumed.
        cursor.row_idx = batch.1.num_rows();

        if let Some(b) = next {
            self.push_batch(stream_idx, b);
            self.retain_batches();
        }

        output
    }

    /// Drops batches that no stream cursor points at (each stream keeps only
    /// its current batch) and remaps `batch_idx` of the retained batches to
    /// their new, compacted positions.
    fn retain_batches(&mut self) {
        // Position of the batch being visited in the *old* vector.
        let mut batch_idx = 0;
        // Number of batches kept so far == new position of the next retained batch.
        let mut retained = 0;
        self.batches.retain(|(stream_idx, _)| {
            let stream_cursor = &mut self.cursors[*stream_idx];
            // A batch survives only while its owning stream's cursor still
            // references it; older batches of the same stream are dropped.
            let retain = stream_cursor.batch_idx == batch_idx;
            batch_idx += 1;

            if retain {
                stream_cursor.batch_idx = retained;
                retained += 1;
            }
            retain
        });
    }
}
323}
324
/// Comparison interface required by [`MergeAlgo`] nodes.
trait NodeCmp: Eq + Ord {
    /// Whether this node has no more rows.
    fn is_eof(&self) -> bool;

    /// Whether this node's next row sorts strictly after every row of
    /// `other`'s current batch, i.e. it cannot contribute to the current
    /// merge window.
    fn is_behind(&self, other: &Self) -> bool;
}

/// Heap-based merge over sorted nodes, split into two sets to skip
/// unnecessary per-row comparisons.
struct MergeAlgo<T> {
    // Nodes whose current batches overlap the merge window; rows must be
    // merged one by one from this heap.
    hot: BinaryHeap<T>,
    // Nodes entirely behind the merge window; they are ignored until the
    // window moves past them.
    cold: BinaryHeap<T>,
}
351
352impl<T: NodeCmp> MergeAlgo<T> {
353 fn new(mut nodes: Vec<T>) -> Self {
357 nodes.retain(|node| !node.is_eof());
359 let hot = BinaryHeap::with_capacity(nodes.len());
360 let cold = BinaryHeap::from(nodes);
361
362 let mut algo = MergeAlgo { hot, cold };
363 algo.refill_hot();
365
366 algo
367 }
368
369 fn refill_hot(&mut self) {
372 while !self.cold.is_empty() {
373 if let Some(merge_window) = self.hot.peek() {
374 let warmest = self.cold.peek().unwrap();
375 if warmest.is_behind(merge_window) {
376 break;
380 }
381 }
382
383 let warmest = self.cold.pop().unwrap();
384 self.hot.push(warmest);
385 }
386 }
387
388 fn reheap(&mut self, node: T) {
390 if node.is_eof() {
391 self.refill_hot();
394 } else {
395 let node_is_cold = if let Some(hottest) = self.hot.peek() {
397 node.is_behind(hottest)
400 } else {
401 true
404 };
405
406 if node_is_cold {
407 self.cold.push(node);
408 } else {
409 self.hot.push(node);
410 }
411 self.refill_hot();
413 }
414 }
415
416 fn pop_hot(&mut self) -> Option<T> {
418 self.hot.pop()
419 }
420
421 fn has_rows(&self) -> bool {
423 !self.hot.is_empty()
424 }
425
426 fn can_fetch_batch(&self) -> bool {
428 self.hot.len() == 1
429 }
430}
431
/// The sort-key columns (primary key, time index, sequence) extracted from a
/// flat-format record batch for cheap per-row comparisons.
struct SortColumns {
    primary_key: PrimaryKeyArray,
    // Time index values as raw i64; the time unit is dropped because all
    // batches in one merge share the same unit.
    timestamp: Int64Array,
    sequence: UInt64Array,
}

impl SortColumns {
    /// Extracts the sort columns from `batch`.
    ///
    /// Panics if the batch does not follow the flat format layout (primary
    /// key dictionary, timestamp and u64 sequence at their well-known column
    /// positions).
    fn new(batch: &RecordBatch) -> Self {
        let num_columns = batch.num_columns();
        let primary_key = batch
            .column(primary_key_column_index(num_columns))
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap()
            .clone();
        let timestamp = batch.column(time_index_column_index(num_columns));
        let (timestamp, _unit) = timestamp_array_to_primitive(timestamp).unwrap();
        let sequence = batch
            .column(sequence_column_index(num_columns))
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap()
            .clone();

        Self {
            primary_key,
            timestamp,
            sequence,
        }
    }

    /// Returns the encoded primary key bytes at `index`.
    fn primary_key_at(&self, index: usize) -> &[u8] {
        // Resolve the dictionary key, then look the bytes up in the values array.
        let key = self.primary_key.keys().value(index);
        let binary_values = self
            .primary_key
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        binary_values.value(key as usize)
    }

    /// Returns the time index value at `index`.
    fn timestamp_at(&self, index: usize) -> i64 {
        self.timestamp.value(index)
    }

    /// Returns the sequence number at `index`.
    fn sequence_at(&self, index: usize) -> SequenceNumber {
        self.sequence.value(index)
    }

    /// Number of rows in the batch these columns were taken from.
    fn num_rows(&self) -> usize {
        self.timestamp.len()
    }
}
492
/// A cursor over the sort columns of one batch, tracking the next row to
/// compare/consume.
struct RowCursor {
    // Index of the next row; may reach `columns.num_rows()` when finished.
    offset: usize,
    columns: SortColumns,
}

impl RowCursor {
    /// Creates a cursor at the first row; `columns` must be non-empty.
    fn new(columns: SortColumns) -> Self {
        debug_assert!(columns.num_rows() > 0);

        Self { offset: 0, columns }
    }

    /// Whether all rows have been consumed.
    fn is_finished(&self) -> bool {
        self.offset >= self.columns.num_rows()
    }

    /// Moves to the next row.
    fn advance(&mut self) {
        self.offset += 1;
    }

    /// Primary key of the current (first unconsumed) row.
    fn first_primary_key(&self) -> &[u8] {
        self.columns.primary_key_at(self.offset)
    }

    /// Timestamp of the current row.
    fn first_timestamp(&self) -> i64 {
        self.columns.timestamp_at(self.offset)
    }

    /// Sequence of the current row.
    fn first_sequence(&self) -> SequenceNumber {
        self.columns.sequence_at(self.offset)
    }

    /// Primary key of the batch's last row (used for overlap checks).
    fn last_primary_key(&self) -> &[u8] {
        self.columns.primary_key_at(self.columns.num_rows() - 1)
    }

    /// Timestamp of the batch's last row (used for overlap checks).
    fn last_timestamp(&self) -> i64 {
        self.columns.timestamp_at(self.columns.num_rows() - 1)
    }
}
539
540impl PartialEq for RowCursor {
541 fn eq(&self, other: &Self) -> bool {
542 self.first_primary_key() == other.first_primary_key()
543 && self.first_timestamp() == other.first_timestamp()
544 && self.first_sequence() == other.first_sequence()
545 }
546}
547
548impl Eq for RowCursor {}
549
550impl PartialOrd for RowCursor {
551 fn partial_cmp(&self, other: &RowCursor) -> Option<Ordering> {
552 Some(self.cmp(other))
553 }
554}
555
556impl Ord for RowCursor {
557 fn cmp(&self, other: &RowCursor) -> Ordering {
559 self.first_primary_key()
560 .cmp(other.first_primary_key())
561 .then_with(|| self.first_timestamp().cmp(&other.first_timestamp()))
562 .then_with(|| other.first_sequence().cmp(&self.first_sequence()))
563 }
564}
565
/// Synchronous merge over several sorted record batch iterators, producing a
/// single sorted stream of batches.
pub struct FlatMergeIterator {
    // Hot/cold merge state over the input iterators.
    algo: MergeAlgo<IterNode>,
    // Builder accumulating rows for the next output batch.
    in_progress: BatchBuilder,
    // A batch ready to be returned by `next_batch`.
    output_batch: Option<RecordBatch>,
    // Soft cap on output batch size (row-by-row path only; whole forwarded
    // batches may be larger).
    batch_size: usize,
}

impl FlatMergeIterator {
    /// Creates the iterator; eagerly pulls the first non-empty batch from
    /// every input so the merge heaps can be initialized.
    pub fn new(
        schema: SchemaRef,
        iters: Vec<BoxedRecordBatchIterator>,
        batch_size: usize,
    ) -> Result<Self> {
        let mut in_progress = BatchBuilder::new(schema, iters.len(), batch_size);
        let mut nodes = Vec::with_capacity(iters.len());
        for (node_index, iter) in iters.into_iter().enumerate() {
            let mut node = IterNode {
                node_index,
                iter,
                cursor: None,
            };
            // Empty inputs are dropped here and never enter the heap.
            if let Some(batch) = node.advance_batch()? {
                in_progress.push_batch(node_index, batch);
                nodes.push(node);
            }
        }

        let algo = MergeAlgo::new(nodes);

        let iter = Self {
            algo,
            in_progress,
            output_batch: None,
            batch_size,
        };

        Ok(iter)
    }

    /// Returns the next merged batch, or `Ok(None)` when all inputs are done.
    pub fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        while self.algo.has_rows() && self.output_batch.is_none() {
            if self.algo.can_fetch_batch() && !self.in_progress.is_empty() {
                // Flush buffered rows before forwarding whole batches, to keep
                // output ordered.
                self.output_batch = self.in_progress.build_record_batch()?;
                debug_assert!(self.output_batch.is_some());
            } else if self.algo.can_fetch_batch() {
                self.fetch_batch_from_hottest()?;
            } else {
                self.fetch_row_from_hottest()?;
            }
        }

        Ok(self.output_batch.take())
    }

    /// Forwards the remaining rows of the only hot node as one batch.
    fn fetch_batch_from_hottest(&mut self) -> Result<()> {
        debug_assert!(self.in_progress.is_empty());

        let mut hottest = self.algo.pop_hot().unwrap();
        debug_assert!(!hottest.current_cursor().is_finished());
        let next = hottest.advance_batch()?;
        let batch = self
            .in_progress
            .take_remaining_rows(hottest.node_index, next);
        Self::maybe_output_batch(batch, &mut self.output_batch);
        self.algo.reheap(hottest);

        Ok(())
    }

    /// Merges a single row from the hottest node, flushing the builder when it
    /// reaches `batch_size`.
    fn fetch_row_from_hottest(&mut self) -> Result<()> {
        let mut hottest = self.algo.pop_hot().unwrap();
        debug_assert!(!hottest.current_cursor().is_finished());
        self.in_progress.push_row(hottest.node_index);
        if self.in_progress.len() >= self.batch_size {
            if let Some(output) = self.in_progress.build_record_batch()? {
                Self::maybe_output_batch(output, &mut self.output_batch);
            }
        }

        // `Some` only when the node's current batch was drained and a fresh
        // batch was loaded.
        if let Some(next) = hottest.advance_row()? {
            self.in_progress.push_batch(hottest.node_index, next);
        }

        self.algo.reheap(hottest);
        Ok(())
    }

    /// Publishes `batch` as the pending output, dropping empty batches.
    fn maybe_output_batch(batch: RecordBatch, output_batch: &mut Option<RecordBatch>) {
        debug_assert!(output_batch.is_none());
        if batch.num_rows() > 0 {
            *output_batch = Some(batch);
        }
    }
}

impl Iterator for FlatMergeIterator {
    type Item = Result<RecordBatch>;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_batch().transpose()
    }
}
688
/// Asynchronous counterpart of [`FlatMergeIterator`]: merges several sorted
/// record batch streams and collects [`MergeMetrics`].
pub struct FlatMergeReader {
    // Hot/cold merge state over the input streams.
    algo: MergeAlgo<StreamNode>,
    // Builder accumulating rows for the next output batch.
    in_progress: BatchBuilder,
    // A batch ready to be returned by `next_batch`.
    output_batch: Option<RecordBatch>,
    // Soft cap on output batch size for the row-by-row path.
    batch_size: usize,
    // Metrics of this reader; flushed on drop and optionally via the reporter.
    metrics: MergeMetrics,
    // Optional sink that periodically receives the metrics.
    metrics_reporter: Option<Arc<dyn MergeMetricsReport>>,
}
708
709impl FlatMergeReader {
710 pub async fn new(
712 schema: SchemaRef,
713 iters: Vec<BoxedRecordBatchStream>,
714 batch_size: usize,
715 metrics_reporter: Option<Arc<dyn MergeMetricsReport>>,
716 ) -> Result<Self> {
717 let start = Instant::now();
718 let metrics = MergeMetrics::default();
719 let mut in_progress = BatchBuilder::new(schema, iters.len(), batch_size);
720 let mut nodes = Vec::with_capacity(iters.len());
721 for (node_index, iter) in iters.into_iter().enumerate() {
723 let mut node = StreamNode {
724 node_index,
725 iter,
726 cursor: None,
727 };
728 if let Some(batch) = node.advance_batch().await? {
729 in_progress.push_batch(node_index, batch);
730 nodes.push(node);
731 }
732 }
733
734 let algo = MergeAlgo::new(nodes);
735
736 let mut reader = Self {
737 algo,
738 in_progress,
739 output_batch: None,
740 batch_size,
741 metrics,
742 metrics_reporter,
743 };
744 let elapsed = start.elapsed();
745 reader.metrics.init_cost += elapsed;
746 reader.metrics.scan_cost += elapsed;
747
748 Ok(reader)
749 }
750
751 pub async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
753 let start = Instant::now();
754 while self.algo.has_rows() && self.output_batch.is_none() {
755 if self.algo.can_fetch_batch() && !self.in_progress.is_empty() {
756 self.output_batch = self.in_progress.build_record_batch()?;
758 debug_assert!(self.output_batch.is_some());
759 } else if self.algo.can_fetch_batch() {
760 self.fetch_batch_from_hottest().await?;
761 self.metrics.num_fetch_by_batches += 1;
762 } else {
763 self.fetch_row_from_hottest().await?;
764 self.metrics.num_fetch_by_rows += 1;
765 }
766 }
767
768 if let Some(batch) = self.output_batch.take() {
769 self.metrics.scan_cost += start.elapsed();
770 self.metrics.maybe_report(&self.metrics_reporter);
771 Ok(Some(batch))
772 } else {
773 self.metrics.scan_cost += start.elapsed();
775 self.metrics.maybe_report(&self.metrics_reporter);
776 Ok(None)
777 }
778 }
779
780 pub fn into_stream(mut self) -> impl Stream<Item = Result<RecordBatch>> {
782 try_stream! {
783 while let Some(batch) = self.next_batch().await? {
784 yield batch;
785 }
786 }
787 }
788
789 async fn fetch_batch_from_hottest(&mut self) -> Result<()> {
791 debug_assert!(self.in_progress.is_empty());
792
793 let mut hottest = self.algo.pop_hot().unwrap();
795 debug_assert!(!hottest.current_cursor().is_finished());
796 let start = Instant::now();
797 let next = hottest.advance_batch().await?;
798 self.metrics.fetch_cost += start.elapsed();
799 let batch = self
801 .in_progress
802 .take_remaining_rows(hottest.node_index, next);
803 Self::maybe_output_batch(batch, &mut self.output_batch);
804 self.algo.reheap(hottest);
805
806 Ok(())
807 }
808
809 async fn fetch_row_from_hottest(&mut self) -> Result<()> {
811 let mut hottest = self.algo.pop_hot().unwrap();
813 debug_assert!(!hottest.current_cursor().is_finished());
814 self.in_progress.push_row(hottest.node_index);
815 if self.in_progress.len() >= self.batch_size {
816 if let Some(output) = self.in_progress.build_record_batch()? {
818 Self::maybe_output_batch(output, &mut self.output_batch);
819 }
820 }
821
822 let start = Instant::now();
823 if let Some(next) = hottest.advance_row().await? {
824 self.metrics.fetch_cost += start.elapsed();
825 self.in_progress.push_batch(hottest.node_index, next);
826 } else {
827 self.metrics.fetch_cost += start.elapsed();
828 }
829
830 self.algo.reheap(hottest);
831 Ok(())
832 }
833
834 fn maybe_output_batch(batch: RecordBatch, output_batch: &mut Option<RecordBatch>) {
836 debug_assert!(output_batch.is_none());
837 if batch.num_rows() > 0 {
838 *output_batch = Some(batch);
839 }
840 }
841}
842
impl Drop for FlatMergeReader {
    /// Logs the final metrics, records them into the read-stage histograms and
    /// gives the reporter a last chance to observe them.
    fn drop(&mut self) {
        debug!("Flat merge reader finished, metrics: {:?}", self.metrics);

        READ_STAGE_ELAPSED
            .with_label_values(&["flat_merge"])
            .observe(self.metrics.scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["flat_merge_fetch"])
            .observe(self.metrics.fetch_cost.as_secs_f64());

        // Unconditional report on drop (unlike `maybe_report`, no threshold).
        if let Some(reporter) = &self.metrics_reporter {
            reporter.report(&mut self.metrics);
        }
    }
}
860
/// A merge node wrapping one input (`iter`) together with the cursor over its
/// current batch's sort columns.
struct GenericNode<T> {
    // Index of this input; also used as the stream index in `BatchBuilder`.
    node_index: usize,
    // The underlying input (sync iterator or async stream).
    iter: T,
    // Cursor over the current batch; `None` means the input is exhausted.
    cursor: Option<RowCursor>,
}

impl<T> NodeCmp for GenericNode<T> {
    fn is_eof(&self) -> bool {
        self.cursor.is_none()
    }

    /// `self` is behind `other` when `self`'s *first* pending row sorts
    /// strictly after `other`'s *last* row — i.e. `self` cannot contribute
    /// any row while `other`'s current batch is being merged. Compared by
    /// primary key, then (only on equal keys) timestamp; sequence is
    /// deliberately ignored here.
    fn is_behind(&self, other: &Self) -> bool {
        debug_assert!(!self.current_cursor().is_finished());
        debug_assert!(!other.current_cursor().is_finished());

        self.current_cursor()
            .first_primary_key()
            .cmp(other.current_cursor().last_primary_key())
            .then_with(|| {
                self.current_cursor()
                    .first_timestamp()
                    .cmp(&other.current_cursor().last_timestamp())
            })
            == Ordering::Greater
    }
}
897
898impl<T> PartialEq for GenericNode<T> {
899 fn eq(&self, other: &GenericNode<T>) -> bool {
900 self.cursor == other.cursor
901 }
902}
903
904impl<T> Eq for GenericNode<T> {}
905
906impl<T> PartialOrd for GenericNode<T> {
907 fn partial_cmp(&self, other: &GenericNode<T>) -> Option<Ordering> {
908 Some(self.cmp(other))
909 }
910}
911
912impl<T> Ord for GenericNode<T> {
913 fn cmp(&self, other: &GenericNode<T>) -> Ordering {
914 other.cursor.cmp(&self.cursor)
917 }
918}
919
impl<T> GenericNode<T> {
    /// Returns the cursor over the current batch.
    ///
    /// Panics if the node is at EOF (`cursor` is `None`); callers must check
    /// `is_eof` first.
    fn current_cursor(&self) -> &RowCursor {
        self.cursor.as_ref().unwrap()
    }
}
929
930impl GenericNode<BoxedRecordBatchIterator> {
931 fn advance_batch(&mut self) -> Result<Option<RecordBatch>> {
935 let batch = self.advance_inner_iter()?;
936 let columns = batch.as_ref().map(SortColumns::new);
937 self.cursor = columns.map(RowCursor::new);
938
939 Ok(batch)
940 }
941
942 fn advance_row(&mut self) -> Result<Option<RecordBatch>> {
945 let cursor = self.cursor.as_mut().unwrap();
946 cursor.advance();
947 if !cursor.is_finished() {
948 return Ok(None);
949 }
950
951 self.advance_batch()
953 }
954
955 fn advance_inner_iter(&mut self) -> Result<Option<RecordBatch>> {
957 while let Some(batch) = self.iter.next().transpose()? {
958 if batch.num_rows() > 0 {
959 return Ok(Some(batch));
960 }
961 }
962 Ok(None)
963 }
964}
965
/// Merge node over an async record batch stream.
type StreamNode = GenericNode<BoxedRecordBatchStream>;
/// Merge node over a synchronous record batch iterator.
type IterNode = GenericNode<BoxedRecordBatchIterator>;
968
969impl GenericNode<BoxedRecordBatchStream> {
970 async fn advance_batch(&mut self) -> Result<Option<RecordBatch>> {
974 let batch = self.advance_inner_iter().await?;
975 let columns = batch.as_ref().map(SortColumns::new);
976 self.cursor = columns.map(RowCursor::new);
977
978 Ok(batch)
979 }
980
981 async fn advance_row(&mut self) -> Result<Option<RecordBatch>> {
984 let cursor = self.cursor.as_mut().unwrap();
985 cursor.advance();
986 if !cursor.is_finished() {
987 return Ok(None);
988 }
989
990 self.advance_batch().await
992 }
993
994 async fn advance_inner_iter(&mut self) -> Result<Option<RecordBatch>> {
996 while let Some(batch) = self.iter.try_next().await? {
997 if batch.num_rows() > 0 {
998 return Ok(Some(batch));
999 }
1000 }
1001 Ok(None)
1002 }
1003}
1004
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::builder::BinaryDictionaryBuilder;
    use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt8Array, UInt64Array};
    use datatypes::arrow::datatypes::{DataType, Field, Schema, TimeUnit, UInt32Type};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;

    /// Builds a flat-format test batch with the given sort-key and field values.
    fn create_test_record_batch(
        primary_keys: &[&[u8]],
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        field_values: &[i64],
    ) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("field1", DataType::Int64, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ]));

        let field1 = Arc::new(Int64Array::from_iter_values(field_values.iter().copied()));
        let timestamp = Arc::new(TimestampMillisecondArray::from_iter_values(
            timestamps.iter().copied(),
        ));

        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for &key in primary_keys {
            builder.append(key).unwrap();
        }
        let primary_key = Arc::new(builder.finish());

        let sequence = Arc::new(UInt64Array::from_iter_values(sequences.iter().copied()));
        let op_type = Arc::new(UInt8Array::from_iter_values(
            op_types.iter().map(|&v| v as u8),
        ));

        RecordBatch::try_new(
            schema,
            vec![field1, timestamp, primary_key, sequence, op_type],
        )
        .unwrap()
    }

    /// Wraps `batches` into a boxed iterator yielding `Ok(batch)`.
    fn new_test_iter(batches: Vec<RecordBatch>) -> BoxedRecordBatchIterator {
        Box::new(batches.into_iter().map(Ok))
    }

    /// Asserts two batch slices are element-wise equal.
    fn assert_record_batches_eq(expected: &[RecordBatch], actual: &[RecordBatch]) {
        // Compare lengths first: `zip` alone truncates to the shorter slice,
        // which would silently pass when batch counts differ.
        assert_eq!(expected.len(), actual.len());
        for (exp, act) in expected.iter().zip(actual.iter()) {
            assert_eq!(exp, act);
        }
    }

    fn collect_merge_iterator_batches(iter: FlatMergeIterator) -> Vec<RecordBatch> {
        iter.map(|result| result.unwrap()).collect()
    }

    #[test]
    fn test_merge_iterator_empty() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("field1", DataType::Int64, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ]));

        let mut merge_iter = FlatMergeIterator::new(schema, vec![], 1024).unwrap();
        assert!(merge_iter.next_batch().unwrap().is_none());
    }

    #[test]
    fn test_merge_iterator_single_batch() {
        let batch = create_test_record_batch(
            &[b"k1", b"k1"],
            &[1000, 2000],
            &[21, 22],
            &[OpType::Put, OpType::Put],
            &[11, 12],
        );

        let schema = batch.schema();
        let iter = Box::new(new_test_iter(vec![batch.clone()]));

        let merge_iter = FlatMergeIterator::new(schema, vec![iter], 1024).unwrap();
        let result = collect_merge_iterator_batches(merge_iter);

        assert_eq!(result.len(), 1);
        assert_record_batches_eq(&[batch], &result);
    }

    #[test]
    fn test_merge_iterator_non_overlapping() {
        let batch1 = create_test_record_batch(
            &[b"k1", b"k1"],
            &[1000, 2000],
            &[21, 22],
            &[OpType::Put, OpType::Put],
            &[11, 12],
        );
        let batch2 = create_test_record_batch(
            &[b"k1", b"k1"],
            &[4000, 5000],
            &[24, 25],
            &[OpType::Put, OpType::Put],
            &[14, 15],
        );
        let batch3 = create_test_record_batch(
            &[b"k2", b"k2"],
            &[2000, 3000],
            &[22, 23],
            &[OpType::Delete, OpType::Put],
            &[12, 13],
        );

        let schema = batch1.schema();
        let iter1 = Box::new(new_test_iter(vec![batch1.clone(), batch3.clone()]));
        let iter2 = Box::new(new_test_iter(vec![batch2.clone()]));

        let merge_iter = FlatMergeIterator::new(schema, vec![iter1, iter2], 1024).unwrap();
        let result = collect_merge_iterator_batches(merge_iter);

        // Non-overlapping batches should be forwarded whole, in sorted order.
        let expected = vec![batch1, batch2, batch3];
        assert_record_batches_eq(&expected, &result);
    }

    #[test]
    fn test_merge_iterator_overlapping_timestamps() {
        let batch1 = create_test_record_batch(
            &[b"k1", b"k1"],
            &[1000, 2000],
            &[21, 22],
            &[OpType::Put, OpType::Put],
            &[11, 12],
        );
        let batch2 = create_test_record_batch(
            &[b"k1", b"k1"],
            &[1500, 2500],
            &[31, 32],
            &[OpType::Put, OpType::Put],
            &[15, 25],
        );

        let schema = batch1.schema();
        let iter1 = Box::new(new_test_iter(vec![batch1]));
        let iter2 = Box::new(new_test_iter(vec![batch2]));

        let merge_iter = FlatMergeIterator::new(schema, vec![iter1, iter2], 1024).unwrap();
        let result = collect_merge_iterator_batches(merge_iter);

        // Overlapping rows are interleaved by timestamp.
        let expected = vec![
            create_test_record_batch(
                &[b"k1", b"k1"],
                &[1000, 1500],
                &[21, 31],
                &[OpType::Put, OpType::Put],
                &[11, 15],
            ),
            create_test_record_batch(&[b"k1"], &[2000], &[22], &[OpType::Put], &[12]),
            create_test_record_batch(&[b"k1"], &[2500], &[32], &[OpType::Put], &[25]),
        ];
        assert_record_batches_eq(&expected, &result);
    }

    #[test]
    fn test_merge_iterator_duplicate_keys_sequences() {
        let batch1 = create_test_record_batch(
            &[b"k1", b"k1"],
            &[1000, 1000],
            &[20, 10],
            &[OpType::Put, OpType::Put],
            &[1, 2],
        );
        let batch2 = create_test_record_batch(
            &[b"k1"],
            &[1000],
            &[15],
            &[OpType::Put],
            &[3],
        );

        let schema = batch1.schema();
        let iter1 = Box::new(new_test_iter(vec![batch1]));
        let iter2 = Box::new(new_test_iter(vec![batch2]));

        let merge_iter = FlatMergeIterator::new(schema, vec![iter1, iter2], 1024).unwrap();
        let result = collect_merge_iterator_batches(merge_iter);

        // Same key and timestamp: larger sequence sorts first.
        let expected = vec![
            create_test_record_batch(
                &[b"k1", b"k1"],
                &[1000, 1000],
                &[20, 15],
                &[OpType::Put, OpType::Put],
                &[1, 3],
            ),
            create_test_record_batch(&[b"k1"], &[1000], &[10], &[OpType::Put], &[2]),
        ];
        assert_record_batches_eq(&expected, &result);
    }

    #[test]
    fn test_batch_builder_basic() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("field1", DataType::Int64, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        let mut builder = BatchBuilder::new(schema.clone(), 2, 1024);
        assert!(builder.is_empty());

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(TimestampMillisecondArray::from(vec![1000, 2000])),
            ],
        )
        .unwrap();

        builder.push_batch(0, batch);
        builder.push_row(0);
        builder.push_row(0);

        assert!(!builder.is_empty());
        assert_eq!(builder.len(), 2);

        let result_batch = builder.build_record_batch().unwrap().unwrap();
        assert_eq!(result_batch.num_rows(), 2);
    }

    #[test]
    fn test_row_cursor_comparison() {
        let batch1 = create_test_record_batch(
            &[b"k1", b"k1"],
            &[1000, 2000],
            &[22, 21],
            &[OpType::Put, OpType::Put],
            &[11, 12],
        );
        let batch2 = create_test_record_batch(
            &[b"k1", b"k1"],
            &[1000, 2000],
            &[23, 20],
            &[OpType::Put, OpType::Put],
            &[11, 12],
        );

        let columns1 = SortColumns::new(&batch1);
        let columns2 = SortColumns::new(&batch2);

        let cursor1 = RowCursor::new(columns1);
        let cursor2 = RowCursor::new(columns2);

        // Equal key/timestamp: the cursor with the larger sequence sorts first.
        assert!(cursor2 < cursor1);
    }
}