1pub mod compat;
18pub mod dedup;
19pub mod flat_dedup;
20pub mod flat_merge;
21pub mod flat_projection;
22pub mod last_row;
23pub mod merge;
24pub mod plain_batch;
25pub mod projection;
26pub(crate) mod prune;
27pub mod range;
28pub mod scan_region;
29pub mod scan_util;
30pub(crate) mod seq_scan;
31pub mod series_scan;
32pub mod stream;
33pub(crate) mod unordered_scan;
34
35use std::collections::{HashMap, HashSet};
36use std::sync::Arc;
37use std::time::Duration;
38
39use api::v1::OpType;
40use async_trait::async_trait;
41use common_time::Timestamp;
42use datafusion_common::arrow::array::UInt8Array;
43use datatypes::arrow;
44use datatypes::arrow::array::{Array, ArrayRef, UInt64Array};
45use datatypes::arrow::compute::SortOptions;
46use datatypes::arrow::record_batch::RecordBatch;
47use datatypes::arrow::row::{RowConverter, SortField};
48use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector};
49use datatypes::scalars::ScalarVectorBuilder;
50use datatypes::types::TimestampType;
51use datatypes::value::{Value, ValueRef};
52use datatypes::vectors::{
53 BooleanVector, Helper, TimestampMicrosecondVector, TimestampMillisecondVector,
54 TimestampMillisecondVectorBuilder, TimestampNanosecondVector, TimestampSecondVector,
55 UInt32Vector, UInt64Vector, UInt64VectorBuilder, UInt8Vector, UInt8VectorBuilder, Vector,
56 VectorRef,
57};
58use futures::stream::BoxStream;
59use futures::TryStreamExt;
60use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
61use snafu::{ensure, OptionExt, ResultExt};
62use store_api::metadata::RegionMetadata;
63use store_api::storage::{ColumnId, SequenceNumber};
64
65use crate::error::{
66 ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu,
67 Result,
68};
69use crate::memtable::{BoxedBatchIterator, BoxedRecordBatchIterator};
70use crate::read::prune::PruneReader;
71
/// A batch of rows that all share one encoded primary key.
///
/// Every column holds one entry per row; [`BatchBuilder::build`] checks that the
/// sequence, op type and field columns have the same length as the timestamp column.
///
/// Note: the derived `PartialEq` also compares the lazily built caches (`pk_values`,
/// `fields_idx`), so two batches with identical rows may compare unequal when only one
/// of them has populated a cache.
#[derive(Debug, PartialEq, Clone)]
pub struct Batch {
    /// Encoded primary key shared by every row of the batch.
    primary_key: Vec<u8>,
    /// Decoded values of `primary_key`; filled lazily by [`Batch::pk_col_value`] or set
    /// explicitly through [`Batch::set_pk_values`].
    pk_values: Option<CompositeValues>,
    /// Timestamp column; expected to be a timestamp type (see `timestamps_native`) and,
    /// when built through [`BatchBuilder`], asserted to contain no nulls.
    timestamps: VectorRef,
    /// Sequence number of each row (asserted non-null by [`BatchBuilder::build`]).
    sequences: Arc<UInt64Vector>,
    /// Op type of each row stored as the `u8` value of [`OpType`]
    /// (asserted non-null by [`BatchBuilder::build`]).
    op_types: Arc<UInt8Vector>,
    /// Field columns of the batch.
    fields: Vec<BatchColumn>,
    /// Lazily built map from field column id to its index in `fields`;
    /// filled by [`Batch::field_col_value`].
    fields_idx: Option<HashMap<ColumnId, usize>>,
}
97
98impl Batch {
99 pub fn new(
101 primary_key: Vec<u8>,
102 timestamps: VectorRef,
103 sequences: Arc<UInt64Vector>,
104 op_types: Arc<UInt8Vector>,
105 fields: Vec<BatchColumn>,
106 ) -> Result<Batch> {
107 BatchBuilder::with_required_columns(primary_key, timestamps, sequences, op_types)
108 .with_fields(fields)
109 .build()
110 }
111
112 pub fn with_fields(self, fields: Vec<BatchColumn>) -> Result<Batch> {
114 Batch::new(
115 self.primary_key,
116 self.timestamps,
117 self.sequences,
118 self.op_types,
119 fields,
120 )
121 }
122
123 pub fn primary_key(&self) -> &[u8] {
125 &self.primary_key
126 }
127
128 pub fn pk_values(&self) -> Option<&CompositeValues> {
130 self.pk_values.as_ref()
131 }
132
133 pub fn set_pk_values(&mut self, pk_values: CompositeValues) {
135 self.pk_values = Some(pk_values);
136 }
137
138 #[cfg(any(test, feature = "test"))]
140 pub fn remove_pk_values(&mut self) {
141 self.pk_values = None;
142 }
143
144 pub fn fields(&self) -> &[BatchColumn] {
146 &self.fields
147 }
148
149 pub fn timestamps(&self) -> &VectorRef {
151 &self.timestamps
152 }
153
154 pub fn sequences(&self) -> &Arc<UInt64Vector> {
156 &self.sequences
157 }
158
159 pub fn op_types(&self) -> &Arc<UInt8Vector> {
161 &self.op_types
162 }
163
164 pub fn num_rows(&self) -> usize {
166 self.sequences.len()
169 }
170
171 pub(crate) fn empty() -> Self {
173 Self {
174 primary_key: vec![],
175 pk_values: None,
176 timestamps: Arc::new(TimestampMillisecondVectorBuilder::with_capacity(0).finish()),
177 sequences: Arc::new(UInt64VectorBuilder::with_capacity(0).finish()),
178 op_types: Arc::new(UInt8VectorBuilder::with_capacity(0).finish()),
179 fields: vec![],
180 fields_idx: None,
181 }
182 }
183
184 pub fn is_empty(&self) -> bool {
186 self.num_rows() == 0
187 }
188
189 pub fn first_timestamp(&self) -> Option<Timestamp> {
191 if self.timestamps.is_empty() {
192 return None;
193 }
194
195 Some(self.get_timestamp(0))
196 }
197
198 pub fn last_timestamp(&self) -> Option<Timestamp> {
200 if self.timestamps.is_empty() {
201 return None;
202 }
203
204 Some(self.get_timestamp(self.timestamps.len() - 1))
205 }
206
207 pub fn first_sequence(&self) -> Option<SequenceNumber> {
209 if self.sequences.is_empty() {
210 return None;
211 }
212
213 Some(self.get_sequence(0))
214 }
215
216 pub fn last_sequence(&self) -> Option<SequenceNumber> {
218 if self.sequences.is_empty() {
219 return None;
220 }
221
222 Some(self.get_sequence(self.sequences.len() - 1))
223 }
224
225 pub fn set_primary_key(&mut self, primary_key: Vec<u8>) {
230 self.primary_key = primary_key;
231 }
232
233 pub fn slice(&self, offset: usize, length: usize) -> Batch {
238 let fields = self
239 .fields
240 .iter()
241 .map(|column| BatchColumn {
242 column_id: column.column_id,
243 data: column.data.slice(offset, length),
244 })
245 .collect();
246 Batch {
248 primary_key: self.primary_key.clone(),
251 pk_values: self.pk_values.clone(),
252 timestamps: self.timestamps.slice(offset, length),
253 sequences: Arc::new(self.sequences.get_slice(offset, length)),
254 op_types: Arc::new(self.op_types.get_slice(offset, length)),
255 fields,
256 fields_idx: self.fields_idx.clone(),
257 }
258 }
259
260 pub fn concat(mut batches: Vec<Batch>) -> Result<Batch> {
264 ensure!(
265 !batches.is_empty(),
266 InvalidBatchSnafu {
267 reason: "empty batches",
268 }
269 );
270 if batches.len() == 1 {
271 return Ok(batches.pop().unwrap());
273 }
274
275 let primary_key = std::mem::take(&mut batches[0].primary_key);
276 let first = &batches[0];
277 ensure!(
279 batches
280 .iter()
281 .skip(1)
282 .all(|b| b.primary_key() == primary_key),
283 InvalidBatchSnafu {
284 reason: "batches have different primary key",
285 }
286 );
287 for b in batches.iter().skip(1) {
288 ensure!(
289 b.fields.len() == first.fields.len(),
290 InvalidBatchSnafu {
291 reason: "batches have different field num",
292 }
293 );
294 for (l, r) in b.fields.iter().zip(&first.fields) {
295 ensure!(
296 l.column_id == r.column_id,
297 InvalidBatchSnafu {
298 reason: "batches have different fields",
299 }
300 );
301 }
302 }
303
304 let mut builder = BatchBuilder::new(primary_key);
306 let array = concat_arrays(batches.iter().map(|b| b.timestamps().to_arrow_array()))?;
308 builder.timestamps_array(array)?;
309 let array = concat_arrays(batches.iter().map(|b| b.sequences().to_arrow_array()))?;
310 builder.sequences_array(array)?;
311 let array = concat_arrays(batches.iter().map(|b| b.op_types().to_arrow_array()))?;
312 builder.op_types_array(array)?;
313 for (i, batch_column) in first.fields.iter().enumerate() {
314 let array = concat_arrays(batches.iter().map(|b| b.fields()[i].data.to_arrow_array()))?;
315 builder.push_field_array(batch_column.column_id, array)?;
316 }
317
318 builder.build()
319 }
320
321 pub fn filter_deleted(&mut self) -> Result<()> {
323 let array = self.op_types.as_arrow();
325 let rhs = UInt8Array::new_scalar(OpType::Delete as u8);
327 let predicate =
328 arrow::compute::kernels::cmp::neq(array, &rhs).context(ComputeArrowSnafu)?;
329 self.filter(&BooleanVector::from(predicate))
330 }
331
332 pub fn filter(&mut self, predicate: &BooleanVector) -> Result<()> {
335 self.timestamps = self
336 .timestamps
337 .filter(predicate)
338 .context(ComputeVectorSnafu)?;
339 self.sequences = Arc::new(
340 UInt64Vector::try_from_arrow_array(
341 arrow::compute::filter(self.sequences.as_arrow(), predicate.as_boolean_array())
342 .context(ComputeArrowSnafu)?,
343 )
344 .unwrap(),
345 );
346 self.op_types = Arc::new(
347 UInt8Vector::try_from_arrow_array(
348 arrow::compute::filter(self.op_types.as_arrow(), predicate.as_boolean_array())
349 .context(ComputeArrowSnafu)?,
350 )
351 .unwrap(),
352 );
353 for batch_column in &mut self.fields {
354 batch_column.data = batch_column
355 .data
356 .filter(predicate)
357 .context(ComputeVectorSnafu)?;
358 }
359
360 Ok(())
361 }
362
363 pub fn filter_by_sequence(&mut self, sequence: Option<SequenceNumber>) -> Result<()> {
365 let seq = match (sequence, self.last_sequence()) {
366 (None, _) | (_, None) => return Ok(()),
367 (Some(sequence), Some(last_sequence)) if sequence >= last_sequence => return Ok(()),
368 (Some(sequence), Some(_)) => sequence,
369 };
370
371 let seqs = self.sequences.as_arrow();
372 let sequence = UInt64Array::new_scalar(seq);
373 let predicate = datafusion_common::arrow::compute::kernels::cmp::lt_eq(seqs, &sequence)
374 .context(ComputeArrowSnafu)?;
375 let predicate = BooleanVector::from(predicate);
376 self.filter(&predicate)?;
377
378 Ok(())
379 }
380
381 pub fn sort(&mut self, dedup: bool) -> Result<()> {
388 let converter = RowConverter::new(vec![
391 SortField::new(self.timestamps.data_type().as_arrow_type()),
392 SortField::new_with_options(
393 self.sequences.data_type().as_arrow_type(),
394 SortOptions {
395 descending: true,
396 ..Default::default()
397 },
398 ),
399 ])
400 .context(ComputeArrowSnafu)?;
401 let columns = [
403 self.timestamps.to_arrow_array(),
404 self.sequences.to_arrow_array(),
405 ];
406 let rows = converter.convert_columns(&columns).unwrap();
407 let mut to_sort: Vec<_> = rows.iter().enumerate().collect();
408
409 let was_sorted = to_sort.is_sorted_by_key(|x| x.1);
410 if !was_sorted {
411 to_sort.sort_unstable_by_key(|x| x.1);
412 }
413
414 let num_rows = to_sort.len();
415 if dedup {
416 to_sort.dedup_by(|left, right| {
418 debug_assert_eq!(18, left.1.as_ref().len());
419 debug_assert_eq!(18, right.1.as_ref().len());
420 let (left_key, right_key) = (left.1.as_ref(), right.1.as_ref());
421 left_key[..TIMESTAMP_KEY_LEN] == right_key[..TIMESTAMP_KEY_LEN]
423 });
424 }
425 let no_dedup = to_sort.len() == num_rows;
426
427 if was_sorted && no_dedup {
428 return Ok(());
429 }
430 let indices = UInt32Vector::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
431 self.take_in_place(&indices)
432 }
433
434 pub fn memory_size(&self) -> usize {
436 let mut size = std::mem::size_of::<Self>();
437 size += self.primary_key.len();
438 size += self.timestamps.memory_size();
439 size += self.sequences.memory_size();
440 size += self.op_types.memory_size();
441 for batch_column in &self.fields {
442 size += batch_column.data.memory_size();
443 }
444 size
445 }
446
447 pub(crate) fn projected_fields(
449 metadata: &RegionMetadata,
450 projection: &[ColumnId],
451 ) -> Vec<(ColumnId, ConcreteDataType)> {
452 let projected_ids: HashSet<_> = projection.iter().copied().collect();
453 metadata
454 .field_columns()
455 .filter_map(|column| {
456 if projected_ids.contains(&column.column_id) {
457 Some((column.column_id, column.column_schema.data_type.clone()))
458 } else {
459 None
460 }
461 })
462 .collect()
463 }
464
465 pub(crate) fn timestamps_native(&self) -> Option<&[i64]> {
467 if self.timestamps.is_empty() {
468 return None;
469 }
470
471 let values = match self.timestamps.data_type() {
472 ConcreteDataType::Timestamp(TimestampType::Second(_)) => self
473 .timestamps
474 .as_any()
475 .downcast_ref::<TimestampSecondVector>()
476 .unwrap()
477 .as_arrow()
478 .values(),
479 ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => self
480 .timestamps
481 .as_any()
482 .downcast_ref::<TimestampMillisecondVector>()
483 .unwrap()
484 .as_arrow()
485 .values(),
486 ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => self
487 .timestamps
488 .as_any()
489 .downcast_ref::<TimestampMicrosecondVector>()
490 .unwrap()
491 .as_arrow()
492 .values(),
493 ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => self
494 .timestamps
495 .as_any()
496 .downcast_ref::<TimestampNanosecondVector>()
497 .unwrap()
498 .as_arrow()
499 .values(),
500 other => panic!("timestamps in a Batch has other type {:?}", other),
501 };
502
503 Some(values)
504 }
505
506 fn take_in_place(&mut self, indices: &UInt32Vector) -> Result<()> {
508 self.timestamps = self.timestamps.take(indices).context(ComputeVectorSnafu)?;
509 let array = arrow::compute::take(self.sequences.as_arrow(), indices.as_arrow(), None)
510 .context(ComputeArrowSnafu)?;
511 self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
513 let array = arrow::compute::take(self.op_types.as_arrow(), indices.as_arrow(), None)
514 .context(ComputeArrowSnafu)?;
515 self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
516 for batch_column in &mut self.fields {
517 batch_column.data = batch_column
518 .data
519 .take(indices)
520 .context(ComputeVectorSnafu)?;
521 }
522
523 Ok(())
524 }
525
526 fn get_timestamp(&self, index: usize) -> Timestamp {
531 match self.timestamps.get_ref(index) {
532 ValueRef::Timestamp(timestamp) => timestamp,
533
534 value => panic!("{:?} is not a timestamp", value),
536 }
537 }
538
539 pub(crate) fn get_sequence(&self, index: usize) -> SequenceNumber {
544 self.sequences.get_data(index).unwrap()
546 }
547
548 #[cfg(debug_assertions)]
550 pub(crate) fn check_monotonic(&self) -> Result<(), String> {
551 use std::cmp::Ordering;
552 if self.timestamps_native().is_none() {
553 return Ok(());
554 }
555
556 let timestamps = self.timestamps_native().unwrap();
557 let sequences = self.sequences.as_arrow().values();
558 for (i, window) in timestamps.windows(2).enumerate() {
559 let current = window[0];
560 let next = window[1];
561 let current_sequence = sequences[i];
562 let next_sequence = sequences[i + 1];
563 match current.cmp(&next) {
564 Ordering::Less => {
565 continue;
567 }
568 Ordering::Equal => {
569 if current_sequence < next_sequence {
571 return Err(format!(
572 "sequence are not monotonic: ts {} == {} but current sequence {} < {}, index: {}",
573 current, next, current_sequence, next_sequence, i
574 ));
575 }
576 }
577 Ordering::Greater => {
578 return Err(format!(
580 "timestamps are not monotonic: {} > {}, index: {}",
581 current, next, i
582 ));
583 }
584 }
585 }
586
587 Ok(())
588 }
589
590 #[cfg(debug_assertions)]
592 pub(crate) fn check_next_batch(&self, other: &Batch) -> Result<(), String> {
593 if self.primary_key() < other.primary_key() {
595 return Ok(());
596 }
597 if self.primary_key() > other.primary_key() {
598 return Err(format!(
599 "primary key is not monotonic: {:?} > {:?}",
600 self.primary_key(),
601 other.primary_key()
602 ));
603 }
604 if self.last_timestamp() < other.first_timestamp() {
606 return Ok(());
607 }
608 if self.last_timestamp() > other.first_timestamp() {
609 return Err(format!(
610 "timestamps are not monotonic: {:?} > {:?}",
611 self.last_timestamp(),
612 other.first_timestamp()
613 ));
614 }
615 if self.last_sequence() >= other.first_sequence() {
617 return Ok(());
618 }
619 Err(format!(
620 "sequences are not monotonic: {:?} < {:?}",
621 self.last_sequence(),
622 other.first_sequence()
623 ))
624 }
625
626 pub fn pk_col_value(
630 &mut self,
631 codec: &dyn PrimaryKeyCodec,
632 col_idx_in_pk: usize,
633 column_id: ColumnId,
634 ) -> Result<Option<&Value>> {
635 if self.pk_values.is_none() {
636 self.pk_values = Some(codec.decode(&self.primary_key).context(DecodeSnafu)?);
637 }
638
639 let pk_values = self.pk_values.as_ref().unwrap();
640 Ok(match pk_values {
641 CompositeValues::Dense(values) => values.get(col_idx_in_pk).map(|(_, v)| v),
642 CompositeValues::Sparse(values) => values.get(&column_id),
643 })
644 }
645
646 pub fn field_col_value(&mut self, column_id: ColumnId) -> Option<&BatchColumn> {
650 if self.fields_idx.is_none() {
651 self.fields_idx = Some(
652 self.fields
653 .iter()
654 .enumerate()
655 .map(|(i, c)| (c.column_id, i))
656 .collect(),
657 );
658 }
659
660 self.fields_idx
661 .as_ref()
662 .unwrap()
663 .get(&column_id)
664 .map(|&idx| &self.fields[idx])
665 }
666}
667
/// Debug-only checker that validates the order of batches returned by a reader.
#[cfg(debug_assertions)]
#[derive(Default)]
pub(crate) struct BatchChecker {
    /// The last batch passed to `check_monotonic` (when it got past the per-batch checks).
    last_batch: Option<Batch>,
    /// Optional lower bound; a batch's first timestamp must not be before it.
    start: Option<Timestamp>,
    /// Optional exclusive upper bound; a batch's last timestamp must be before it.
    end: Option<Timestamp>,
}
676
677#[cfg(debug_assertions)]
678impl BatchChecker {
679 pub(crate) fn with_start(mut self, start: Option<Timestamp>) -> Self {
681 self.start = start;
682 self
683 }
684
685 pub(crate) fn with_end(mut self, end: Option<Timestamp>) -> Self {
687 self.end = end;
688 self
689 }
690
691 pub(crate) fn check_monotonic(&mut self, batch: &Batch) -> Result<(), String> {
694 batch.check_monotonic()?;
695
696 if let (Some(start), Some(first)) = (self.start, batch.first_timestamp()) {
697 if start > first {
698 return Err(format!(
699 "batch's first timestamp is before the start timestamp: {:?} > {:?}",
700 start, first
701 ));
702 }
703 }
704 if let (Some(end), Some(last)) = (self.end, batch.last_timestamp()) {
705 if end <= last {
706 return Err(format!(
707 "batch's last timestamp is after the end timestamp: {:?} <= {:?}",
708 end, last
709 ));
710 }
711 }
712
713 let res = self
716 .last_batch
717 .as_ref()
718 .map(|last| last.check_next_batch(batch))
719 .unwrap_or(Ok(()));
720 self.last_batch = Some(batch.clone());
721 res
722 }
723
724 pub(crate) fn format_batch(&self, batch: &Batch) -> String {
726 use std::fmt::Write;
727
728 let mut message = String::new();
729 if let Some(last) = &self.last_batch {
730 write!(
731 message,
732 "last_pk: {:?}, last_ts: {:?}, last_seq: {:?}, ",
733 last.primary_key(),
734 last.last_timestamp(),
735 last.last_sequence()
736 )
737 .unwrap();
738 }
739 write!(
740 message,
741 "batch_pk: {:?}, batch_ts: {:?}, batch_seq: {:?}",
742 batch.primary_key(),
743 batch.timestamps(),
744 batch.sequences()
745 )
746 .unwrap();
747
748 message
749 }
750
751 pub(crate) fn ensure_part_range_batch(
753 &mut self,
754 scanner: &str,
755 region_id: store_api::storage::RegionId,
756 partition: usize,
757 part_range: store_api::region_engine::PartitionRange,
758 batch: &Batch,
759 ) {
760 if let Err(e) = self.check_monotonic(batch) {
761 let err_msg = format!(
762 "{}: batch is not sorted, {}, region_id: {}, partition: {}, part_range: {:?}",
763 scanner, e, region_id, partition, part_range,
764 );
765 common_telemetry::error!("{err_msg}, {}", self.format_batch(batch));
766 panic!("{err_msg}, batch rows: {}", batch.num_rows());
768 }
769 }
770}
771
/// Length in bytes of the encoded timestamp prefix in a row produced by the
/// `RowConverter` in `Batch::sort`.
///
/// The arrow row format encodes a non-null 64-bit value as one validity byte followed by
/// 8 value bytes. `sort` therefore expects 18-byte rows (timestamp + sequence).
const TIMESTAMP_KEY_LEN: usize = 9;
774
775fn concat_arrays(iter: impl Iterator<Item = ArrayRef>) -> Result<ArrayRef> {
777 let arrays: Vec<_> = iter.collect();
778 let dyn_arrays: Vec<_> = arrays.iter().map(|array| array.as_ref()).collect();
779 arrow::compute::concat(&dyn_arrays).context(ComputeArrowSnafu)
780}
781
/// A field column of a [`Batch`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct BatchColumn {
    /// Id of the column.
    pub column_id: ColumnId,
    /// Values of the column; [`BatchBuilder::build`] requires one value per row.
    pub data: VectorRef,
}
790
/// Builder that assembles and validates a [`Batch`].
pub struct BatchBuilder {
    /// Encoded primary key of the batch.
    primary_key: Vec<u8>,
    /// Timestamp column; required by `build`.
    timestamps: Option<VectorRef>,
    /// Sequence column; required by `build`.
    sequences: Option<Arc<UInt64Vector>>,
    /// Op type column; required by `build`.
    op_types: Option<Arc<UInt8Vector>>,
    /// Field columns, in insertion order.
    fields: Vec<BatchColumn>,
}
799
800impl BatchBuilder {
801 pub fn new(primary_key: Vec<u8>) -> BatchBuilder {
803 BatchBuilder {
804 primary_key,
805 timestamps: None,
806 sequences: None,
807 op_types: None,
808 fields: Vec::new(),
809 }
810 }
811
812 pub fn with_required_columns(
814 primary_key: Vec<u8>,
815 timestamps: VectorRef,
816 sequences: Arc<UInt64Vector>,
817 op_types: Arc<UInt8Vector>,
818 ) -> BatchBuilder {
819 BatchBuilder {
820 primary_key,
821 timestamps: Some(timestamps),
822 sequences: Some(sequences),
823 op_types: Some(op_types),
824 fields: Vec::new(),
825 }
826 }
827
828 pub fn with_fields(mut self, fields: Vec<BatchColumn>) -> Self {
830 self.fields = fields;
831 self
832 }
833
834 pub fn push_field(&mut self, column: BatchColumn) -> &mut Self {
836 self.fields.push(column);
837 self
838 }
839
840 pub fn push_field_array(&mut self, column_id: ColumnId, array: ArrayRef) -> Result<&mut Self> {
842 let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
843 self.fields.push(BatchColumn {
844 column_id,
845 data: vector,
846 });
847
848 Ok(self)
849 }
850
851 pub fn timestamps_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
853 let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
854 ensure!(
855 vector.data_type().is_timestamp(),
856 InvalidBatchSnafu {
857 reason: format!("{:?} is not a timestamp type", vector.data_type()),
858 }
859 );
860
861 self.timestamps = Some(vector);
862 Ok(self)
863 }
864
865 pub fn sequences_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
867 ensure!(
868 *array.data_type() == arrow::datatypes::DataType::UInt64,
869 InvalidBatchSnafu {
870 reason: "sequence array is not UInt64 type",
871 }
872 );
873 let vector = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
875 self.sequences = Some(vector);
876
877 Ok(self)
878 }
879
880 pub fn op_types_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
882 ensure!(
883 *array.data_type() == arrow::datatypes::DataType::UInt8,
884 InvalidBatchSnafu {
885 reason: "sequence array is not UInt8 type",
886 }
887 );
888 let vector = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
890 self.op_types = Some(vector);
891
892 Ok(self)
893 }
894
895 pub fn build(self) -> Result<Batch> {
897 let timestamps = self.timestamps.context(InvalidBatchSnafu {
898 reason: "missing timestamps",
899 })?;
900 let sequences = self.sequences.context(InvalidBatchSnafu {
901 reason: "missing sequences",
902 })?;
903 let op_types = self.op_types.context(InvalidBatchSnafu {
904 reason: "missing op_types",
905 })?;
906 assert_eq!(0, timestamps.null_count());
909 assert_eq!(0, sequences.null_count());
910 assert_eq!(0, op_types.null_count());
911
912 let ts_len = timestamps.len();
913 ensure!(
914 sequences.len() == ts_len,
915 InvalidBatchSnafu {
916 reason: format!(
917 "sequence have different len {} != {}",
918 sequences.len(),
919 ts_len
920 ),
921 }
922 );
923 ensure!(
924 op_types.len() == ts_len,
925 InvalidBatchSnafu {
926 reason: format!(
927 "op type have different len {} != {}",
928 op_types.len(),
929 ts_len
930 ),
931 }
932 );
933 for column in &self.fields {
934 ensure!(
935 column.data.len() == ts_len,
936 InvalidBatchSnafu {
937 reason: format!(
938 "column {} has different len {} != {}",
939 column.column_id,
940 column.data.len(),
941 ts_len
942 ),
943 }
944 );
945 }
946
947 Ok(Batch {
948 primary_key: self.primary_key,
949 pk_values: None,
950 timestamps,
951 sequences,
952 op_types,
953 fields: self.fields,
954 fields_idx: None,
955 })
956 }
957}
958
959impl From<Batch> for BatchBuilder {
960 fn from(batch: Batch) -> Self {
961 Self {
962 primary_key: batch.primary_key,
963 timestamps: Some(batch.timestamps),
964 sequences: Some(batch.sequences),
965 op_types: Some(batch.op_types),
966 fields: batch.fields,
967 }
968 }
969}
970
/// A source of [`Batch`]es, read through [`Source::next_batch`].
pub enum Source {
    /// Reads batches from a [`BoxedBatchReader`].
    Reader(BoxedBatchReader),
    /// Reads batches from a [`BoxedBatchIterator`].
    Iter(BoxedBatchIterator),
    /// Reads batches from a [`BoxedBatchStream`].
    Stream(BoxedBatchStream),
    /// Reads batches from a [`PruneReader`].
    PruneReader(PruneReader),
}
984
985impl Source {
986 pub async fn next_batch(&mut self) -> Result<Option<Batch>> {
988 match self {
989 Source::Reader(reader) => reader.next_batch().await,
990 Source::Iter(iter) => iter.next().transpose(),
991 Source::Stream(stream) => stream.try_next().await,
992 Source::PruneReader(reader) => reader.next_batch().await,
993 }
994 }
995}
996
/// A source of arrow [`RecordBatch`]es, read through [`FlatSource::next_batch`].
pub enum FlatSource {
    /// Reads record batches from a [`BoxedRecordBatchIterator`].
    Iter(BoxedRecordBatchIterator),
    /// Reads record batches from a [`BoxedRecordBatchStream`].
    Stream(BoxedRecordBatchStream),
}
1004
1005impl FlatSource {
1006 pub async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
1008 match self {
1009 FlatSource::Iter(iter) => iter.next().transpose(),
1010 FlatSource::Stream(stream) => stream.try_next().await,
1011 }
1012 }
1013}
1014
/// Asynchronous reader that yields [`Batch`]es.
#[async_trait]
pub trait BatchReader: Send {
    /// Fetches the next [`Batch`].
    ///
    /// NOTE(review): presumably `Ok(None)` signals that the reader is exhausted (this is
    /// how [`Source::next_batch`] treats iterators and streams). Confirm the end-of-input
    /// and after-error contract with the implementors.
    async fn next_batch(&mut self) -> Result<Option<Batch>>;
}
1029
/// Boxed [`BatchReader`] trait object.
pub type BoxedBatchReader = Box<dyn BatchReader>;

/// Boxed `'static` stream of [`Batch`] results.
pub type BoxedBatchStream = BoxStream<'static, Result<Batch>>;

/// Boxed `'static` stream of arrow [`RecordBatch`] results.
pub type BoxedRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
1038
1039#[async_trait::async_trait]
1040impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
1041 async fn next_batch(&mut self) -> Result<Option<Batch>> {
1042 (**self).next_batch().await
1043 }
1044}
1045
/// Metrics collected by a scanner.
///
/// NOTE(review): the fields are only declared here. The descriptions below follow the
/// field names; confirm them against the scanner code that fills them in.
#[derive(Debug, Default)]
pub(crate) struct ScannerMetrics {
    /// Time spent preparing the scan.
    prepare_scan_cost: Duration,
    /// Time spent building readers.
    build_reader_cost: Duration,
    /// Time spent scanning.
    scan_cost: Duration,
    /// Time spent yielding results.
    yield_cost: Duration,
    /// Number of batches produced.
    num_batches: usize,
    /// Number of rows produced.
    num_rows: usize,
    /// Number of memtable ranges.
    num_mem_ranges: usize,
    /// Number of file ranges.
    num_file_ranges: usize,
}
1066
1067#[cfg(test)]
1068mod tests {
1069 use mito_codec::row_converter::{self, build_primary_key_codec_with_fields};
1070 use store_api::codec::PrimaryKeyEncoding;
1071 use store_api::storage::consts::ReservedColumnId;
1072
1073 use super::*;
1074 use crate::error::Error;
1075 use crate::test_util::new_batch_builder;
1076
1077 fn new_batch(
1078 timestamps: &[i64],
1079 sequences: &[u64],
1080 op_types: &[OpType],
1081 field: &[u64],
1082 ) -> Batch {
1083 new_batch_builder(b"test", timestamps, sequences, op_types, 1, field)
1084 .build()
1085 .unwrap()
1086 }
1087
1088 #[test]
1089 fn test_empty_batch() {
1090 let batch = Batch::empty();
1091 assert!(batch.is_empty());
1092 assert_eq!(None, batch.first_timestamp());
1093 assert_eq!(None, batch.last_timestamp());
1094 assert_eq!(None, batch.first_sequence());
1095 assert_eq!(None, batch.last_sequence());
1096 assert!(batch.timestamps_native().is_none());
1097 }
1098
1099 #[test]
1100 fn test_first_last_one() {
1101 let batch = new_batch(&[1], &[2], &[OpType::Put], &[4]);
1102 assert_eq!(
1103 Timestamp::new_millisecond(1),
1104 batch.first_timestamp().unwrap()
1105 );
1106 assert_eq!(
1107 Timestamp::new_millisecond(1),
1108 batch.last_timestamp().unwrap()
1109 );
1110 assert_eq!(2, batch.first_sequence().unwrap());
1111 assert_eq!(2, batch.last_sequence().unwrap());
1112 }
1113
1114 #[test]
1115 fn test_first_last_multiple() {
1116 let batch = new_batch(
1117 &[1, 2, 3],
1118 &[11, 12, 13],
1119 &[OpType::Put, OpType::Put, OpType::Put],
1120 &[21, 22, 23],
1121 );
1122 assert_eq!(
1123 Timestamp::new_millisecond(1),
1124 batch.first_timestamp().unwrap()
1125 );
1126 assert_eq!(
1127 Timestamp::new_millisecond(3),
1128 batch.last_timestamp().unwrap()
1129 );
1130 assert_eq!(11, batch.first_sequence().unwrap());
1131 assert_eq!(13, batch.last_sequence().unwrap());
1132 }
1133
1134 #[test]
1135 fn test_slice() {
1136 let batch = new_batch(
1137 &[1, 2, 3, 4],
1138 &[11, 12, 13, 14],
1139 &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1140 &[21, 22, 23, 24],
1141 );
1142 let batch = batch.slice(1, 2);
1143 let expect = new_batch(
1144 &[2, 3],
1145 &[12, 13],
1146 &[OpType::Delete, OpType::Put],
1147 &[22, 23],
1148 );
1149 assert_eq!(expect, batch);
1150 }
1151
1152 #[test]
1153 fn test_timestamps_native() {
1154 let batch = new_batch(
1155 &[1, 2, 3, 4],
1156 &[11, 12, 13, 14],
1157 &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1158 &[21, 22, 23, 24],
1159 );
1160 assert_eq!(&[1, 2, 3, 4], batch.timestamps_native().unwrap());
1161 }
1162
1163 #[test]
1164 fn test_concat_empty() {
1165 let err = Batch::concat(vec![]).unwrap_err();
1166 assert!(
1167 matches!(err, Error::InvalidBatch { .. }),
1168 "unexpected err: {err}"
1169 );
1170 }
1171
1172 #[test]
1173 fn test_concat_one() {
1174 let batch = new_batch(&[], &[], &[], &[]);
1175 let actual = Batch::concat(vec![batch.clone()]).unwrap();
1176 assert_eq!(batch, actual);
1177
1178 let batch = new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]);
1179 let actual = Batch::concat(vec![batch.clone()]).unwrap();
1180 assert_eq!(batch, actual);
1181 }
1182
1183 #[test]
1184 fn test_concat_multiple() {
1185 let batches = vec![
1186 new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]),
1187 new_batch(
1188 &[3, 4, 5],
1189 &[13, 14, 15],
1190 &[OpType::Put, OpType::Delete, OpType::Put],
1191 &[23, 24, 25],
1192 ),
1193 new_batch(&[], &[], &[], &[]),
1194 new_batch(&[6], &[16], &[OpType::Put], &[26]),
1195 ];
1196 let batch = Batch::concat(batches).unwrap();
1197 let expect = new_batch(
1198 &[1, 2, 3, 4, 5, 6],
1199 &[11, 12, 13, 14, 15, 16],
1200 &[
1201 OpType::Put,
1202 OpType::Put,
1203 OpType::Put,
1204 OpType::Delete,
1205 OpType::Put,
1206 OpType::Put,
1207 ],
1208 &[21, 22, 23, 24, 25, 26],
1209 );
1210 assert_eq!(expect, batch);
1211 }
1212
1213 #[test]
1214 fn test_concat_different() {
1215 let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
1216 let mut batch2 = new_batch(&[2], &[2], &[OpType::Put], &[2]);
1217 batch2.primary_key = b"hello".to_vec();
1218 let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
1219 assert!(
1220 matches!(err, Error::InvalidBatch { .. }),
1221 "unexpected err: {err}"
1222 );
1223 }
1224
1225 #[test]
1226 fn test_concat_different_fields() {
1227 let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
1228 let fields = vec![
1229 batch1.fields()[0].clone(),
1230 BatchColumn {
1231 column_id: 2,
1232 data: Arc::new(UInt64Vector::from_slice([2])),
1233 },
1234 ];
1235 let batch2 = batch1.clone().with_fields(fields).unwrap();
1237 let err = Batch::concat(vec![batch1.clone(), batch2]).unwrap_err();
1238 assert!(
1239 matches!(err, Error::InvalidBatch { .. }),
1240 "unexpected err: {err}"
1241 );
1242
1243 let fields = vec![BatchColumn {
1245 column_id: 2,
1246 data: Arc::new(UInt64Vector::from_slice([2])),
1247 }];
1248 let batch2 = batch1.clone().with_fields(fields).unwrap();
1249 let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
1250 assert!(
1251 matches!(err, Error::InvalidBatch { .. }),
1252 "unexpected err: {err}"
1253 );
1254 }
1255
1256 #[test]
1257 fn test_filter_deleted_empty() {
1258 let mut batch = new_batch(&[], &[], &[], &[]);
1259 batch.filter_deleted().unwrap();
1260 assert!(batch.is_empty());
1261 }
1262
1263 #[test]
1264 fn test_filter_deleted() {
1265 let mut batch = new_batch(
1266 &[1, 2, 3, 4],
1267 &[11, 12, 13, 14],
1268 &[OpType::Delete, OpType::Put, OpType::Delete, OpType::Put],
1269 &[21, 22, 23, 24],
1270 );
1271 batch.filter_deleted().unwrap();
1272 let expect = new_batch(&[2, 4], &[12, 14], &[OpType::Put, OpType::Put], &[22, 24]);
1273 assert_eq!(expect, batch);
1274
1275 let mut batch = new_batch(
1276 &[1, 2, 3, 4],
1277 &[11, 12, 13, 14],
1278 &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
1279 &[21, 22, 23, 24],
1280 );
1281 let expect = batch.clone();
1282 batch.filter_deleted().unwrap();
1283 assert_eq!(expect, batch);
1284 }
1285
1286 #[test]
1287 fn test_filter_by_sequence() {
1288 let mut batch = new_batch(
1290 &[1, 2, 3, 4],
1291 &[11, 12, 13, 14],
1292 &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
1293 &[21, 22, 23, 24],
1294 );
1295 batch.filter_by_sequence(Some(13)).unwrap();
1296 let expect = new_batch(
1297 &[1, 2, 3],
1298 &[11, 12, 13],
1299 &[OpType::Put, OpType::Put, OpType::Put],
1300 &[21, 22, 23],
1301 );
1302 assert_eq!(expect, batch);
1303
1304 let mut batch = new_batch(
1306 &[1, 2, 3, 4],
1307 &[11, 12, 13, 14],
1308 &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1309 &[21, 22, 23, 24],
1310 );
1311
1312 batch.filter_by_sequence(Some(10)).unwrap();
1313 assert!(batch.is_empty());
1314
1315 let mut batch = new_batch(
1317 &[1, 2, 3, 4],
1318 &[11, 12, 13, 14],
1319 &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1320 &[21, 22, 23, 24],
1321 );
1322 let expect = batch.clone();
1323 batch.filter_by_sequence(None).unwrap();
1324 assert_eq!(expect, batch);
1325
1326 let mut batch = new_batch(&[], &[], &[], &[]);
1328 batch.filter_by_sequence(Some(10)).unwrap();
1329 assert!(batch.is_empty());
1330
1331 let mut batch = new_batch(&[], &[], &[], &[]);
1333 batch.filter_by_sequence(None).unwrap();
1334 assert!(batch.is_empty());
1335 }
1336
/// Exercises `Batch::filter` with a boolean mask. The mask keeps the last two
/// rows of a 4-row batch, both when every row is a Put and when row 1 is a
/// Delete. An all-false mask then empties the already-filtered batch.
#[test]
fn test_filter() {
    // Builds the 4-row fixture with the given op types and keeps rows 2 and 3.
    let filter_last_two = |op_types: &[OpType]| {
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            op_types,
            &[21, 22, 23, 24],
        );
        batch
            .filter(&BooleanVector::from_vec(vec![false, false, true, true]))
            .unwrap();
        batch
    };
    let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);

    let put_only = filter_last_two(&[OpType::Put, OpType::Put, OpType::Put, OpType::Put]);
    assert_eq!(expect, put_only);

    let mut with_delete =
        filter_last_two(&[OpType::Put, OpType::Delete, OpType::Put, OpType::Put]);
    assert_eq!(expect, with_delete);

    // Masking out both remaining rows leaves nothing behind.
    with_delete
        .filter(&BooleanVector::from_vec(vec![false, false]))
        .unwrap();
    assert!(with_delete.is_empty());
}
1368
/// Exercises `Batch::sort`.
/// - With `dedup == true`, rows sharing a timestamp collapse to the one with
///   the highest sequence; in the second fixture that is a Put over an older
///   Delete.
/// - With `dedup == false`, all rows are kept, ordered by timestamp ascending
///   and then by sequence descending.
#[test]
fn test_sort_and_dedup() {
    let all_puts = |len: usize| vec![OpType::Put; len];
    // Sorts a copy of `original` and compares it with `expect`.
    let check_sort = |original: &Batch, dedup: bool, expect: Batch| {
        let mut sorted = original.clone();
        sorted.sort(dedup).unwrap();
        assert_eq!(expect, sorted);
    };

    let puts_only = new_batch(
        &[2, 3, 1, 4, 5, 2],
        &[1, 2, 3, 4, 5, 6],
        &all_puts(6),
        &[21, 22, 23, 24, 25, 26],
    );
    check_sort(
        &puts_only,
        true,
        new_batch(
            &[1, 2, 3, 4, 5],
            &[3, 6, 2, 4, 5],
            &all_puts(5),
            &[23, 26, 22, 24, 25],
        ),
    );
    check_sort(
        &puts_only,
        false,
        new_batch(
            &[1, 2, 2, 3, 4, 5],
            &[3, 6, 1, 2, 4, 5],
            &all_puts(6),
            &[23, 26, 21, 22, 24, 25],
        ),
    );

    let with_delete = new_batch(
        &[2, 2, 1],
        &[1, 6, 1],
        &[OpType::Delete, OpType::Put, OpType::Put],
        &[21, 22, 23],
    );
    check_sort(
        &with_delete,
        true,
        new_batch(&[1, 2], &[1, 6], &all_puts(2), &[23, 22]),
    );
    check_sort(
        &with_delete,
        false,
        new_batch(
            &[1, 2, 2],
            &[1, 6, 1],
            &[OpType::Put, OpType::Put, OpType::Delete],
            &[23, 22, 21],
        ),
    );
}
1447
/// For both the dense and the sparse primary-key encodings:
/// - encodes a four-column key into a batch;
/// - checks that `pk_col_value` decodes each key column back to its value;
/// - checks that `field_col_value` returns the field column written by the
///   builder.
#[test]
fn test_get_value() {
    for encoding in [PrimaryKeyEncoding::Dense, PrimaryKeyEncoding::Sparse] {
        // (column id, data type) of every primary-key column, in key order.
        let key_types = [
            (ReservedColumnId::table_id(), ConcreteDataType::uint32_datatype()),
            (ReservedColumnId::tsid(), ConcreteDataType::uint64_datatype()),
            (100, ConcreteDataType::string_datatype()),
            (200, ConcreteDataType::string_datatype()),
        ];
        let codec = build_primary_key_codec_with_fields(
            encoding,
            key_types.into_iter().map(|(column_id, data_type)| {
                (column_id, row_converter::SortField::new(data_type))
            }),
        );

        // (column id, value) pairs that make up the encoded primary key.
        let key_values = [
            (ReservedColumnId::table_id(), Value::UInt32(1000)),
            (ReservedColumnId::tsid(), Value::UInt64(2000)),
            (100, Value::String("abcdefgh".into())),
            (200, Value::String("zyxwvu".into())),
        ];
        let mut primary_key = Vec::new();
        codec.encode_values(&key_values, &mut primary_key).unwrap();

        let field_col_id = 2;
        let mut batch = new_batch_builder(
            &primary_key,
            &[1, 2, 3],
            &[1, 1, 1],
            &[OpType::Put, OpType::Put, OpType::Put],
            field_col_id,
            &[42, 43, 44],
        )
        .build()
        .unwrap();

        // Each key column is looked up by its position in the key and its id.
        for (index_in_pk, (column_id, expected)) in key_values.iter().enumerate() {
            let decoded = batch
                .pk_col_value(&*codec, index_in_pk, *column_id)
                .unwrap()
                .unwrap();
            assert_eq!(*expected, *decoded);
        }

        let field = batch.field_col_value(field_col_id).unwrap();
        for (row, expected) in [42_u64, 43, 44].into_iter().enumerate() {
            assert_eq!(field.data.get(row), Value::UInt64(expected));
        }
    }
}
1531}