pub mod compat;
pub mod dedup;
pub mod flat_dedup;
pub mod flat_merge;
pub mod flat_projection;
pub mod last_row;
pub mod merge;
pub mod plain_batch;
pub mod projection;
pub(crate) mod prune;
pub mod range;
pub mod scan_region;
pub mod scan_util;
pub(crate) mod seq_scan;
pub mod series_scan;
pub mod stream;
pub(crate) mod unordered_scan;

use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use api::v1::OpType;
use async_trait::async_trait;
use common_time::Timestamp;
use datafusion_common::arrow::array::UInt8Array;
use datatypes::arrow;
use datatypes::arrow::array::{Array, ArrayRef};
use datatypes::arrow::compute::SortOptions;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::arrow::row::{RowConverter, SortField};
use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector};
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::types::TimestampType;
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::{
    BooleanVector, Helper, TimestampMicrosecondVector, TimestampMillisecondVector,
    TimestampMillisecondVectorBuilder, TimestampNanosecondVector, TimestampSecondVector,
    UInt8Vector, UInt8VectorBuilder, UInt32Vector, UInt64Vector, UInt64VectorBuilder, Vector,
    VectorRef,
};
use futures::TryStreamExt;
use futures::stream::BoxStream;
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metadata::RegionMetadata;
use store_api::storage::{ColumnId, SequenceNumber, SequenceRange};

use crate::error::{
    ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu,
    Result,
};
use crate::memtable::{BoxedBatchIterator, BoxedRecordBatchIterator};
use crate::read::prune::PruneReader;

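/// A batch of rows that share the same primary key, stored as columnar vectors.
///
/// A minimal construction sketch (illustrative only; the `ts`, `seqs`, `ops` and
/// `fields` values are assumed to be vectors of equal length built by the caller):
///
/// ```ignore
/// let batch = Batch::new(b"pk".to_vec(), ts, seqs, ops, fields)?;
/// assert_eq!(b"pk", batch.primary_key());
/// ```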
#[derive(Debug, PartialEq, Clone)]
pub struct Batch {
    /// Primary key encoded in a comparable form.
    primary_key: Vec<u8>,
    /// Possibly decoded values of the primary key, filled lazily or in advance.
    pk_values: Option<CompositeValues>,
    /// Timestamps of rows.
    timestamps: VectorRef,
    /// Sequences of rows (UInt64, not null).
    sequences: Arc<UInt64Vector>,
    /// Op types of rows (UInt8, not null).
    op_types: Arc<UInt8Vector>,
    /// Field columns in columnar format.
    fields: Vec<BatchColumn>,
    /// Lazily built map from field column id to its index in `fields`.
    fields_idx: Option<HashMap<ColumnId, usize>>,
}

impl Batch {
    pub fn new(
        primary_key: Vec<u8>,
        timestamps: VectorRef,
        sequences: Arc<UInt64Vector>,
        op_types: Arc<UInt8Vector>,
        fields: Vec<BatchColumn>,
    ) -> Result<Batch> {
        BatchBuilder::with_required_columns(primary_key, timestamps, sequences, op_types)
            .with_fields(fields)
            .build()
    }

    pub fn with_fields(self, fields: Vec<BatchColumn>) -> Result<Batch> {
        Batch::new(
            self.primary_key,
            self.timestamps,
            self.sequences,
            self.op_types,
            fields,
        )
    }

    pub fn primary_key(&self) -> &[u8] {
        &self.primary_key
    }

    pub fn pk_values(&self) -> Option<&CompositeValues> {
        self.pk_values.as_ref()
    }

    pub fn set_pk_values(&mut self, pk_values: CompositeValues) {
        self.pk_values = Some(pk_values);
    }

    #[cfg(any(test, feature = "test"))]
    pub fn remove_pk_values(&mut self) {
        self.pk_values = None;
    }

    pub fn fields(&self) -> &[BatchColumn] {
        &self.fields
    }

    pub fn timestamps(&self) -> &VectorRef {
        &self.timestamps
    }

    pub fn sequences(&self) -> &Arc<UInt64Vector> {
        &self.sequences
    }

    pub fn op_types(&self) -> &Arc<UInt8Vector> {
        &self.op_types
    }

    pub fn num_rows(&self) -> usize {
        self.sequences.len()
    }

    pub(crate) fn empty() -> Self {
        Self {
            primary_key: vec![],
            pk_values: None,
            timestamps: Arc::new(TimestampMillisecondVectorBuilder::with_capacity(0).finish()),
            sequences: Arc::new(UInt64VectorBuilder::with_capacity(0).finish()),
            op_types: Arc::new(UInt8VectorBuilder::with_capacity(0).finish()),
            fields: vec![],
            fields_idx: None,
        }
    }

    pub fn is_empty(&self) -> bool {
        self.num_rows() == 0
    }

    pub fn first_timestamp(&self) -> Option<Timestamp> {
        if self.timestamps.is_empty() {
            return None;
        }

        Some(self.get_timestamp(0))
    }

    pub fn last_timestamp(&self) -> Option<Timestamp> {
        if self.timestamps.is_empty() {
            return None;
        }

        Some(self.get_timestamp(self.timestamps.len() - 1))
    }

    pub fn first_sequence(&self) -> Option<SequenceNumber> {
        if self.sequences.is_empty() {
            return None;
        }

        Some(self.get_sequence(0))
    }

    pub fn last_sequence(&self) -> Option<SequenceNumber> {
        if self.sequences.is_empty() {
            return None;
        }

        Some(self.get_sequence(self.sequences.len() - 1))
    }

    pub fn set_primary_key(&mut self, primary_key: Vec<u8>) {
        self.primary_key = primary_key;
    }

    pub fn slice(&self, offset: usize, length: usize) -> Batch {
        let fields = self
            .fields
            .iter()
            .map(|column| BatchColumn {
                column_id: column.column_id,
                data: column.data.slice(offset, length),
            })
            .collect();
        Batch {
            primary_key: self.primary_key.clone(),
            pk_values: self.pk_values.clone(),
            timestamps: self.timestamps.slice(offset, length),
            sequences: Arc::new(self.sequences.get_slice(offset, length)),
            op_types: Arc::new(self.op_types.get_slice(offset, length)),
            fields,
            fields_idx: self.fields_idx.clone(),
        }
    }

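    /// Concatenates `batches` that share the same primary key and field layout into a
    /// single [Batch]. Returns an error if `batches` is empty or the batches disagree
    /// on the primary key or fields.
    ///
    /// A usage sketch (illustrative only; `b1` and `b2` are assumed to be batches with
    /// the same primary key and fields):
    ///
    /// ```ignore
    /// let merged = Batch::concat(vec![b1, b2])?;
    /// ```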
    pub fn concat(mut batches: Vec<Batch>) -> Result<Batch> {
        ensure!(
            !batches.is_empty(),
            InvalidBatchSnafu {
                reason: "empty batches",
            }
        );
        if batches.len() == 1 {
            return Ok(batches.pop().unwrap());
        }

        let primary_key = std::mem::take(&mut batches[0].primary_key);
        let first = &batches[0];
        ensure!(
            batches
                .iter()
                .skip(1)
                .all(|b| b.primary_key() == primary_key),
            InvalidBatchSnafu {
                reason: "batches have different primary key",
            }
        );
        for b in batches.iter().skip(1) {
            ensure!(
                b.fields.len() == first.fields.len(),
                InvalidBatchSnafu {
                    reason: "batches have different field num",
                }
            );
            for (l, r) in b.fields.iter().zip(&first.fields) {
                ensure!(
                    l.column_id == r.column_id,
                    InvalidBatchSnafu {
                        reason: "batches have different fields",
                    }
                );
            }
        }

        let mut builder = BatchBuilder::new(primary_key);
        let array = concat_arrays(batches.iter().map(|b| b.timestamps().to_arrow_array()))?;
        builder.timestamps_array(array)?;
        let array = concat_arrays(batches.iter().map(|b| b.sequences().to_arrow_array()))?;
        builder.sequences_array(array)?;
        let array = concat_arrays(batches.iter().map(|b| b.op_types().to_arrow_array()))?;
        builder.op_types_array(array)?;
        for (i, batch_column) in first.fields.iter().enumerate() {
            let array = concat_arrays(batches.iter().map(|b| b.fields()[i].data.to_arrow_array()))?;
            builder.push_field_array(batch_column.column_id, array)?;
        }

        builder.build()
    }

    pub fn filter_deleted(&mut self) -> Result<()> {
        let array = self.op_types.as_arrow();
        let rhs = UInt8Array::new_scalar(OpType::Delete as u8);
        let predicate =
            arrow::compute::kernels::cmp::neq(array, &rhs).context(ComputeArrowSnafu)?;
        self.filter(&BooleanVector::from(predicate))
    }

    pub fn filter(&mut self, predicate: &BooleanVector) -> Result<()> {
        self.timestamps = self
            .timestamps
            .filter(predicate)
            .context(ComputeVectorSnafu)?;
        self.sequences = Arc::new(
            UInt64Vector::try_from_arrow_array(
                arrow::compute::filter(self.sequences.as_arrow(), predicate.as_boolean_array())
                    .context(ComputeArrowSnafu)?,
            )
            .unwrap(),
        );
        self.op_types = Arc::new(
            UInt8Vector::try_from_arrow_array(
                arrow::compute::filter(self.op_types.as_arrow(), predicate.as_boolean_array())
                    .context(ComputeArrowSnafu)?,
            )
            .unwrap(),
        );
        for batch_column in &mut self.fields {
            batch_column.data = batch_column
                .data
                .filter(predicate)
                .context(ComputeVectorSnafu)?;
        }

        Ok(())
    }

    pub fn filter_by_sequence(&mut self, sequence: Option<SequenceRange>) -> Result<()> {
        let seq_range = match sequence {
            None => return Ok(()),
            Some(seq_range) => {
                let (Some(first), Some(last)) = (self.first_sequence(), self.last_sequence())
                else {
                    return Ok(());
                };
                let is_subset = match seq_range {
                    SequenceRange::Gt { min } => min < first,
                    SequenceRange::LtEq { max } => max >= last,
                    SequenceRange::GtLtEq { min, max } => min < first && max >= last,
                };
                if is_subset {
                    return Ok(());
                }
                seq_range
            }
        };

        let seqs = self.sequences.as_arrow();
        let predicate = seq_range.filter(seqs).context(ComputeArrowSnafu)?;

        let predicate = BooleanVector::from(predicate);
        self.filter(&predicate)?;

        Ok(())
    }

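    /// Sorts rows by `(timestamp, sequence desc)`. When `dedup` is true, rows with the
    /// same timestamp are deduplicated and only the row with the largest sequence is kept.
    ///
    /// A usage sketch (illustrative only; `batch` is assumed to be a mutable [Batch]):
    ///
    /// ```ignore
    /// batch.sort(true)?; // keep only the latest row for each timestamp
    /// ```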
    pub fn sort(&mut self, dedup: bool) -> Result<()> {
        let converter = RowConverter::new(vec![
            SortField::new(self.timestamps.data_type().as_arrow_type()),
            SortField::new_with_options(
                self.sequences.data_type().as_arrow_type(),
                SortOptions {
                    descending: true,
                    ..Default::default()
                },
            ),
        ])
        .context(ComputeArrowSnafu)?;
        let columns = [
            self.timestamps.to_arrow_array(),
            self.sequences.to_arrow_array(),
        ];
        let rows = converter.convert_columns(&columns).unwrap();
        let mut to_sort: Vec<_> = rows.iter().enumerate().collect();

        let was_sorted = to_sort.is_sorted_by_key(|x| x.1);
        if !was_sorted {
            to_sort.sort_unstable_by_key(|x| x.1);
        }

        let num_rows = to_sort.len();
        if dedup {
            to_sort.dedup_by(|left, right| {
                debug_assert_eq!(18, left.1.as_ref().len());
                debug_assert_eq!(18, right.1.as_ref().len());
                let (left_key, right_key) = (left.1.as_ref(), right.1.as_ref());
                left_key[..TIMESTAMP_KEY_LEN] == right_key[..TIMESTAMP_KEY_LEN]
            });
        }
        let no_dedup = to_sort.len() == num_rows;

        if was_sorted && no_dedup {
            return Ok(());
        }
        let indices = UInt32Vector::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
        self.take_in_place(&indices)
    }

    pub fn memory_size(&self) -> usize {
        let mut size = std::mem::size_of::<Self>();
        size += self.primary_key.len();
        size += self.timestamps.memory_size();
        size += self.sequences.memory_size();
        size += self.op_types.memory_size();
        for batch_column in &self.fields {
            size += batch_column.data.memory_size();
        }
        size
    }

    pub(crate) fn projected_fields(
        metadata: &RegionMetadata,
        projection: &[ColumnId],
    ) -> Vec<(ColumnId, ConcreteDataType)> {
        let projected_ids: HashSet<_> = projection.iter().copied().collect();
        metadata
            .field_columns()
            .filter_map(|column| {
                if projected_ids.contains(&column.column_id) {
                    Some((column.column_id, column.column_schema.data_type.clone()))
                } else {
                    None
                }
            })
            .collect()
    }

    pub(crate) fn timestamps_native(&self) -> Option<&[i64]> {
        if self.timestamps.is_empty() {
            return None;
        }

        let values = match self.timestamps.data_type() {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampSecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampMicrosecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampNanosecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            other => panic!("timestamps in a Batch have unexpected type {:?}", other),
        };

        Some(values)
    }

    fn take_in_place(&mut self, indices: &UInt32Vector) -> Result<()> {
        self.timestamps = self.timestamps.take(indices).context(ComputeVectorSnafu)?;
        let array = arrow::compute::take(self.sequences.as_arrow(), indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
        let array = arrow::compute::take(self.op_types.as_arrow(), indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
        for batch_column in &mut self.fields {
            batch_column.data = batch_column
                .data
                .take(indices)
                .context(ComputeVectorSnafu)?;
        }

        Ok(())
    }

    fn get_timestamp(&self, index: usize) -> Timestamp {
        match self.timestamps.get_ref(index) {
            ValueRef::Timestamp(timestamp) => timestamp,
            value => panic!("{:?} is not a timestamp", value),
        }
    }

    pub(crate) fn get_sequence(&self, index: usize) -> SequenceNumber {
        self.sequences.get_data(index).unwrap()
    }

    #[cfg(debug_assertions)]
    pub(crate) fn check_monotonic(&self) -> Result<(), String> {
        use std::cmp::Ordering;
        if self.timestamps_native().is_none() {
            return Ok(());
        }

        let timestamps = self.timestamps_native().unwrap();
        let sequences = self.sequences.as_arrow().values();
        for (i, window) in timestamps.windows(2).enumerate() {
            let current = window[0];
            let next = window[1];
            let current_sequence = sequences[i];
            let next_sequence = sequences[i + 1];
            match current.cmp(&next) {
                Ordering::Less => {
                    continue;
                }
                Ordering::Equal => {
                    if current_sequence < next_sequence {
                        return Err(format!(
                            "sequences are not monotonic: ts {} == {} but current sequence {} < {}, index: {}",
                            current, next, current_sequence, next_sequence, i
                        ));
                    }
                }
                Ordering::Greater => {
                    return Err(format!(
                        "timestamps are not monotonic: {} > {}, index: {}",
                        current, next, i
                    ));
                }
            }
        }

        Ok(())
    }

    #[cfg(debug_assertions)]
    pub(crate) fn check_next_batch(&self, other: &Batch) -> Result<(), String> {
        if self.primary_key() < other.primary_key() {
            return Ok(());
        }
        if self.primary_key() > other.primary_key() {
            return Err(format!(
                "primary key is not monotonic: {:?} > {:?}",
                self.primary_key(),
                other.primary_key()
            ));
        }
        if self.last_timestamp() < other.first_timestamp() {
            return Ok(());
        }
        if self.last_timestamp() > other.first_timestamp() {
            return Err(format!(
                "timestamps are not monotonic: {:?} > {:?}",
                self.last_timestamp(),
                other.first_timestamp()
            ));
        }
        if self.last_sequence() >= other.first_sequence() {
            return Ok(());
        }
        Err(format!(
            "sequences are not monotonic: {:?} < {:?}",
            self.last_sequence(),
            other.first_sequence()
        ))
    }

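    /// Returns the value of a primary key column, decoding (and caching) the primary key
    /// on first access. Dense encodings are addressed by `col_idx_in_pk`, sparse encodings
    /// by `column_id`.
    ///
    /// A usage sketch (illustrative only; `codec` is assumed to match the encoding used
    /// for this batch's primary key):
    ///
    /// ```ignore
    /// let value = batch.pk_col_value(&*codec, col_idx_in_pk, column_id)?;
    /// ```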
    pub fn pk_col_value(
        &mut self,
        codec: &dyn PrimaryKeyCodec,
        col_idx_in_pk: usize,
        column_id: ColumnId,
    ) -> Result<Option<&Value>> {
        if self.pk_values.is_none() {
            self.pk_values = Some(codec.decode(&self.primary_key).context(DecodeSnafu)?);
        }

        let pk_values = self.pk_values.as_ref().unwrap();
        Ok(match pk_values {
            CompositeValues::Dense(values) => values.get(col_idx_in_pk).map(|(_, v)| v),
            CompositeValues::Sparse(values) => values.get(&column_id),
        })
    }

    pub fn field_col_value(&mut self, column_id: ColumnId) -> Option<&BatchColumn> {
        if self.fields_idx.is_none() {
            self.fields_idx = Some(
                self.fields
                    .iter()
                    .enumerate()
                    .map(|(i, c)| (c.column_id, i))
                    .collect(),
            );
        }

        self.fields_idx
            .as_ref()
            .unwrap()
            .get(&column_id)
            .map(|&idx| &self.fields[idx])
    }
}

#[cfg(debug_assertions)]
#[derive(Default)]
pub(crate) struct BatchChecker {
    last_batch: Option<Batch>,
    start: Option<Timestamp>,
    end: Option<Timestamp>,
}

#[cfg(debug_assertions)]
impl BatchChecker {
    pub(crate) fn with_start(mut self, start: Option<Timestamp>) -> Self {
        self.start = start;
        self
    }

    pub(crate) fn with_end(mut self, end: Option<Timestamp>) -> Self {
        self.end = end;
        self
    }

    pub(crate) fn check_monotonic(&mut self, batch: &Batch) -> Result<(), String> {
        batch.check_monotonic()?;

        if let (Some(start), Some(first)) = (self.start, batch.first_timestamp())
            && start > first
        {
            return Err(format!(
                "batch's first timestamp is before the start timestamp: {:?} > {:?}",
                start, first
            ));
        }
        if let (Some(end), Some(last)) = (self.end, batch.last_timestamp())
            && end <= last
        {
            return Err(format!(
                "batch's last timestamp is after the end timestamp: {:?} <= {:?}",
                end, last
            ));
        }

        let res = self
            .last_batch
            .as_ref()
            .map(|last| last.check_next_batch(batch))
            .unwrap_or(Ok(()));
        self.last_batch = Some(batch.clone());
        res
    }

    pub(crate) fn format_batch(&self, batch: &Batch) -> String {
        use std::fmt::Write;

        let mut message = String::new();
        if let Some(last) = &self.last_batch {
            write!(
                message,
                "last_pk: {:?}, last_ts: {:?}, last_seq: {:?}, ",
                last.primary_key(),
                last.last_timestamp(),
                last.last_sequence()
            )
            .unwrap();
        }
        write!(
            message,
            "batch_pk: {:?}, batch_ts: {:?}, batch_seq: {:?}",
            batch.primary_key(),
            batch.timestamps(),
            batch.sequences()
        )
        .unwrap();

        message
    }

    pub(crate) fn ensure_part_range_batch(
        &mut self,
        scanner: &str,
        region_id: store_api::storage::RegionId,
        partition: usize,
        part_range: store_api::region_engine::PartitionRange,
        batch: &Batch,
    ) {
        if let Err(e) = self.check_monotonic(batch) {
            let err_msg = format!(
                "{}: batch is not sorted, {}, region_id: {}, partition: {}, part_range: {:?}",
                scanner, e, region_id, partition, part_range,
            );
            common_telemetry::error!("{err_msg}, {}", self.format_batch(batch));
            panic!("{err_msg}, batch rows: {}", batch.num_rows());
        }
    }
}

/// Length of the encoded timestamp part in the row key built by [Batch::sort]; rows
/// whose first `TIMESTAMP_KEY_LEN` bytes match share the same timestamp.
const TIMESTAMP_KEY_LEN: usize = 9;

/// Concatenates arrow arrays into a single array.
fn concat_arrays(iter: impl Iterator<Item = ArrayRef>) -> Result<ArrayRef> {
    let arrays: Vec<_> = iter.collect();
    let dyn_arrays: Vec<_> = arrays.iter().map(|array| array.as_ref()).collect();
    arrow::compute::concat(&dyn_arrays).context(ComputeArrowSnafu)
}

#[derive(Debug, PartialEq, Eq, Clone)]
pub struct BatchColumn {
    /// Id of the column.
    pub column_id: ColumnId,
    /// Data of the column.
    pub data: VectorRef,
}

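/// Builder to construct a [Batch].
///
/// A minimal sketch (illustrative only; `ts_array`, `seq_array` and `op_array` are
/// assumed to be Arrow arrays of timestamp, UInt64 and UInt8 type respectively):
///
/// ```ignore
/// let mut builder = BatchBuilder::new(b"pk".to_vec());
/// builder.timestamps_array(ts_array)?;
/// builder.sequences_array(seq_array)?;
/// builder.op_types_array(op_array)?;
/// let batch = builder.build()?;
/// ```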
pub struct BatchBuilder {
    primary_key: Vec<u8>,
    timestamps: Option<VectorRef>,
    sequences: Option<Arc<UInt64Vector>>,
    op_types: Option<Arc<UInt8Vector>>,
    fields: Vec<BatchColumn>,
}

impl BatchBuilder {
    pub fn new(primary_key: Vec<u8>) -> BatchBuilder {
        BatchBuilder {
            primary_key,
            timestamps: None,
            sequences: None,
            op_types: None,
            fields: Vec::new(),
        }
    }

    pub fn with_required_columns(
        primary_key: Vec<u8>,
        timestamps: VectorRef,
        sequences: Arc<UInt64Vector>,
        op_types: Arc<UInt8Vector>,
    ) -> BatchBuilder {
        BatchBuilder {
            primary_key,
            timestamps: Some(timestamps),
            sequences: Some(sequences),
            op_types: Some(op_types),
            fields: Vec::new(),
        }
    }

    pub fn with_fields(mut self, fields: Vec<BatchColumn>) -> Self {
        self.fields = fields;
        self
    }

    pub fn push_field(&mut self, column: BatchColumn) -> &mut Self {
        self.fields.push(column);
        self
    }

    pub fn push_field_array(&mut self, column_id: ColumnId, array: ArrayRef) -> Result<&mut Self> {
        let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
        self.fields.push(BatchColumn {
            column_id,
            data: vector,
        });

        Ok(self)
    }

    pub fn timestamps_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
        let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
        ensure!(
            vector.data_type().is_timestamp(),
            InvalidBatchSnafu {
                reason: format!("{:?} is not a timestamp type", vector.data_type()),
            }
        );

        self.timestamps = Some(vector);
        Ok(self)
    }

    pub fn sequences_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
        ensure!(
            *array.data_type() == arrow::datatypes::DataType::UInt64,
            InvalidBatchSnafu {
                reason: "sequence array is not UInt64 type",
            }
        );
        let vector = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
        self.sequences = Some(vector);

        Ok(self)
    }

    pub fn op_types_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
        ensure!(
            *array.data_type() == arrow::datatypes::DataType::UInt8,
            InvalidBatchSnafu {
                reason: "op type array is not UInt8 type",
            }
        );
        let vector = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
        self.op_types = Some(vector);

        Ok(self)
    }

    pub fn build(self) -> Result<Batch> {
        let timestamps = self.timestamps.context(InvalidBatchSnafu {
            reason: "missing timestamps",
        })?;
        let sequences = self.sequences.context(InvalidBatchSnafu {
            reason: "missing sequences",
        })?;
        let op_types = self.op_types.context(InvalidBatchSnafu {
            reason: "missing op_types",
        })?;
        assert_eq!(0, timestamps.null_count());
        assert_eq!(0, sequences.null_count());
        assert_eq!(0, op_types.null_count());

        let ts_len = timestamps.len();
        ensure!(
            sequences.len() == ts_len,
            InvalidBatchSnafu {
                reason: format!(
                    "sequences have different len {} != {}",
                    sequences.len(),
                    ts_len
                ),
            }
        );
        ensure!(
            op_types.len() == ts_len,
            InvalidBatchSnafu {
                reason: format!(
                    "op types have different len {} != {}",
                    op_types.len(),
                    ts_len
                ),
            }
        );
        for column in &self.fields {
            ensure!(
                column.data.len() == ts_len,
                InvalidBatchSnafu {
                    reason: format!(
                        "column {} has different len {} != {}",
                        column.column_id,
                        column.data.len(),
                        ts_len
                    ),
                }
            );
        }

        Ok(Batch {
            primary_key: self.primary_key,
            pk_values: None,
            timestamps,
            sequences,
            op_types,
            fields: self.fields,
            fields_idx: None,
        })
    }
}

impl From<Batch> for BatchBuilder {
    fn from(batch: Batch) -> Self {
        Self {
            primary_key: batch.primary_key,
            timestamps: Some(batch.timestamps),
            sequences: Some(batch.sequences),
            op_types: Some(batch.op_types),
            fields: batch.fields,
        }
    }
}

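/// Async source of [Batch]es that unifies readers, iterators and streams.
///
/// A consumption sketch (illustrative only; `source` is assumed to be any [Source]
/// variant and `process` is a hypothetical consumer):
///
/// ```ignore
/// while let Some(batch) = source.next_batch().await? {
///     process(batch);
/// }
/// ```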
pub enum Source {
    /// Source from a [BoxedBatchReader].
    Reader(BoxedBatchReader),
    /// Source from a [BoxedBatchIterator].
    Iter(BoxedBatchIterator),
    /// Source from a [BoxedBatchStream].
    Stream(BoxedBatchStream),
    /// Source from a [PruneReader].
    PruneReader(PruneReader),
}

impl Source {
    pub async fn next_batch(&mut self) -> Result<Option<Batch>> {
        match self {
            Source::Reader(reader) => reader.next_batch().await,
            Source::Iter(iter) => iter.next().transpose(),
            Source::Stream(stream) => stream.try_next().await,
            Source::PruneReader(reader) => reader.next_batch().await,
        }
    }
}

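/// Async source that yields Arrow [RecordBatch]es instead of [Batch]es.
///
/// A consumption sketch (illustrative only; `source` is assumed to be any [FlatSource]
/// variant and `process` is a hypothetical consumer):
///
/// ```ignore
/// while let Some(record_batch) = source.next_batch().await? {
///     process(record_batch);
/// }
/// ```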
pub enum FlatSource {
    /// Source from a [BoxedRecordBatchIterator].
    Iter(BoxedRecordBatchIterator),
    /// Source from a [BoxedRecordBatchStream].
    Stream(BoxedRecordBatchStream),
}

impl FlatSource {
    pub async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        match self {
            FlatSource::Iter(iter) => iter.next().transpose(),
            FlatSource::Stream(stream) => stream.try_next().await,
        }
    }
}

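/// Async reader that returns [Batch]es until it reaches EOF, signalled by `Ok(None)`.
///
/// A consumption sketch (illustrative only; `reader` is assumed to implement
/// [BatchReader] and `process` is a hypothetical consumer):
///
/// ```ignore
/// while let Some(batch) = reader.next_batch().await? {
///     process(batch);
/// }
/// ```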
#[async_trait]
pub trait BatchReader: Send {
    /// Fetches the next [Batch], or `Ok(None)` when the reader reaches EOF.
    async fn next_batch(&mut self) -> Result<Option<Batch>>;
}

pub type BoxedBatchReader = Box<dyn BatchReader>;

pub type BoxedBatchStream = BoxStream<'static, Result<Batch>>;

pub type BoxedRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;

#[async_trait::async_trait]
impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        (**self).next_batch().await
    }
}

#[derive(Debug, Default)]
pub(crate) struct ScannerMetrics {
    /// Duration to prepare the scan.
    prepare_scan_cost: Duration,
    /// Duration to build the reader.
    build_reader_cost: Duration,
    /// Duration to scan data.
    scan_cost: Duration,
    /// Duration spent yielding batches to the caller.
    yield_cost: Duration,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of memtable ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,
}

#[cfg(test)]
mod tests {
    use mito_codec::row_converter::{self, build_primary_key_codec_with_fields};
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::storage::consts::ReservedColumnId;

    use super::*;
    use crate::error::Error;
    use crate::test_util::new_batch_builder;

    fn new_batch(
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        field: &[u64],
    ) -> Batch {
        new_batch_builder(b"test", timestamps, sequences, op_types, 1, field)
            .build()
            .unwrap()
    }

    #[test]
    fn test_empty_batch() {
        let batch = Batch::empty();
        assert!(batch.is_empty());
        assert_eq!(None, batch.first_timestamp());
        assert_eq!(None, batch.last_timestamp());
        assert_eq!(None, batch.first_sequence());
        assert_eq!(None, batch.last_sequence());
        assert!(batch.timestamps_native().is_none());
    }

    #[test]
    fn test_first_last_one() {
        let batch = new_batch(&[1], &[2], &[OpType::Put], &[4]);
        assert_eq!(
            Timestamp::new_millisecond(1),
            batch.first_timestamp().unwrap()
        );
        assert_eq!(
            Timestamp::new_millisecond(1),
            batch.last_timestamp().unwrap()
        );
        assert_eq!(2, batch.first_sequence().unwrap());
        assert_eq!(2, batch.last_sequence().unwrap());
    }

    #[test]
    fn test_first_last_multiple() {
        let batch = new_batch(
            &[1, 2, 3],
            &[11, 12, 13],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );
        assert_eq!(
            Timestamp::new_millisecond(1),
            batch.first_timestamp().unwrap()
        );
        assert_eq!(
            Timestamp::new_millisecond(3),
            batch.last_timestamp().unwrap()
        );
        assert_eq!(11, batch.first_sequence().unwrap());
        assert_eq!(13, batch.last_sequence().unwrap());
    }

    #[test]
    fn test_slice() {
        let batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let batch = batch.slice(1, 2);
        let expect = new_batch(
            &[2, 3],
            &[12, 13],
            &[OpType::Delete, OpType::Put],
            &[22, 23],
        );
        assert_eq!(expect, batch);
    }

    #[test]
    fn test_timestamps_native() {
        let batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        assert_eq!(&[1, 2, 3, 4], batch.timestamps_native().unwrap());
    }

    #[test]
    fn test_concat_empty() {
        let err = Batch::concat(vec![]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }

    #[test]
    fn test_concat_one() {
        let batch = new_batch(&[], &[], &[], &[]);
        let actual = Batch::concat(vec![batch.clone()]).unwrap();
        assert_eq!(batch, actual);

        let batch = new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]);
        let actual = Batch::concat(vec![batch.clone()]).unwrap();
        assert_eq!(batch, actual);
    }

    #[test]
    fn test_concat_multiple() {
        let batches = vec![
            new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]),
            new_batch(
                &[3, 4, 5],
                &[13, 14, 15],
                &[OpType::Put, OpType::Delete, OpType::Put],
                &[23, 24, 25],
            ),
            new_batch(&[], &[], &[], &[]),
            new_batch(&[6], &[16], &[OpType::Put], &[26]),
        ];
        let batch = Batch::concat(batches).unwrap();
        let expect = new_batch(
            &[1, 2, 3, 4, 5, 6],
            &[11, 12, 13, 14, 15, 16],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Delete,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25, 26],
        );
        assert_eq!(expect, batch);
    }

    #[test]
    fn test_concat_different() {
        let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
        let mut batch2 = new_batch(&[2], &[2], &[OpType::Put], &[2]);
        batch2.primary_key = b"hello".to_vec();
        let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }

    #[test]
    fn test_concat_different_fields() {
        let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
        let fields = vec![
            batch1.fields()[0].clone(),
            BatchColumn {
                column_id: 2,
                data: Arc::new(UInt64Vector::from_slice([2])),
            },
        ];
        let batch2 = batch1.clone().with_fields(fields).unwrap();
        let err = Batch::concat(vec![batch1.clone(), batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );

        let fields = vec![BatchColumn {
            column_id: 2,
            data: Arc::new(UInt64Vector::from_slice([2])),
        }];
        let batch2 = batch1.clone().with_fields(fields).unwrap();
        let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }

    #[test]
    fn test_filter_deleted_empty() {
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_deleted().unwrap();
        assert!(batch.is_empty());
    }

    #[test]
    fn test_filter_deleted() {
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Delete, OpType::Put, OpType::Delete, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch.filter_deleted().unwrap();
        let expect = new_batch(&[2, 4], &[12, 14], &[OpType::Put, OpType::Put], &[22, 24]);
        assert_eq!(expect, batch);

        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let expect = batch.clone();
        batch.filter_deleted().unwrap();
        assert_eq!(expect, batch);
    }

    #[test]
    fn test_filter_by_sequence() {
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::LtEq { max: 13 }))
            .unwrap();
        let expect = new_batch(
            &[1, 2, 3],
            &[11, 12, 13],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );
        assert_eq!(expect, batch);

        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );

        batch
            .filter_by_sequence(Some(SequenceRange::LtEq { max: 10 }))
            .unwrap();
        assert!(batch.is_empty());

        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let expect = batch.clone();
        batch.filter_by_sequence(None).unwrap();
        assert_eq!(expect, batch);

        let mut batch = new_batch(&[], &[], &[], &[]);
        batch
            .filter_by_sequence(Some(SequenceRange::LtEq { max: 10 }))
            .unwrap();
        assert!(batch.is_empty());

        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_by_sequence(None).unwrap();
        assert!(batch.is_empty());

        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::Gt { min: 12 }))
            .unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::Gt { min: 20 }))
            .unwrap();
        assert!(batch.is_empty());

        let mut batch = new_batch(
            &[1, 2, 3, 4, 5],
            &[11, 12, 13, 14, 15],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 12, max: 14 }))
            .unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        let mut batch = new_batch(
            &[1, 2, 3, 4, 5],
            &[11, 12, 13, 14, 15],
            &[
                OpType::Put,
                OpType::Delete,
                OpType::Put,
                OpType::Delete,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 11, max: 13 }))
            .unwrap();
        let expect = new_batch(
            &[2, 3],
            &[12, 13],
            &[OpType::Delete, OpType::Put],
            &[22, 23],
        );
        assert_eq!(expect, batch);

        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 20, max: 25 }))
            .unwrap();
        assert!(batch.is_empty());
    }

    #[test]
    fn test_filter() {
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
        batch.filter(&predicate).unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
        batch.filter(&predicate).unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        let predicate = BooleanVector::from_vec(vec![false, false]);
        batch.filter(&predicate).unwrap();
        assert!(batch.is_empty());
    }

    #[test]
    fn test_sort_and_dedup() {
        let original = new_batch(
            &[2, 3, 1, 4, 5, 2],
            &[1, 2, 3, 4, 5, 6],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25, 26],
        );

        let mut batch = original.clone();
        batch.sort(true).unwrap();
        assert_eq!(
            new_batch(
                &[1, 2, 3, 4, 5],
                &[3, 6, 2, 4, 5],
                &[
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                ],
                &[23, 26, 22, 24, 25],
            ),
            batch
        );

        let mut batch = original.clone();
        batch.sort(false).unwrap();

        assert_eq!(
            new_batch(
                &[1, 2, 2, 3, 4, 5],
                &[3, 6, 1, 2, 4, 5],
                &[
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                ],
                &[23, 26, 21, 22, 24, 25],
            ),
            batch
        );

        let original = new_batch(
            &[2, 2, 1],
            &[1, 6, 1],
            &[OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );

        let mut batch = original.clone();
        batch.sort(true).unwrap();
        let expect = new_batch(&[1, 2], &[1, 6], &[OpType::Put, OpType::Put], &[23, 22]);
        assert_eq!(expect, batch);

        let mut batch = original.clone();
        batch.sort(false).unwrap();
        let expect = new_batch(
            &[1, 2, 2],
            &[1, 6, 1],
            &[OpType::Put, OpType::Put, OpType::Delete],
            &[23, 22, 21],
        );
        assert_eq!(expect, batch);
    }

    #[test]
    fn test_get_value() {
        let encodings = [PrimaryKeyEncoding::Dense, PrimaryKeyEncoding::Sparse];

        for encoding in encodings {
            let codec = build_primary_key_codec_with_fields(
                encoding,
                [
                    (
                        ReservedColumnId::table_id(),
                        row_converter::SortField::new(ConcreteDataType::uint32_datatype()),
                    ),
                    (
                        ReservedColumnId::tsid(),
                        row_converter::SortField::new(ConcreteDataType::uint64_datatype()),
                    ),
                    (
                        100,
                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
                    ),
                    (
                        200,
                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
                    ),
                ]
                .into_iter(),
            );

            let values = [
                Value::UInt32(1000),
                Value::UInt64(2000),
                Value::String("abcdefgh".into()),
                Value::String("zyxwvu".into()),
            ];
            let mut buf = vec![];
            codec
                .encode_values(
                    &[
                        (ReservedColumnId::table_id(), values[0].clone()),
                        (ReservedColumnId::tsid(), values[1].clone()),
                        (100, values[2].clone()),
                        (200, values[3].clone()),
                    ],
                    &mut buf,
                )
                .unwrap();

            let field_col_id = 2;
            let mut batch = new_batch_builder(
                &buf,
                &[1, 2, 3],
                &[1, 1, 1],
                &[OpType::Put, OpType::Put, OpType::Put],
                field_col_id,
                &[42, 43, 44],
            )
            .build()
            .unwrap();

            let v = batch
                .pk_col_value(&*codec, 0, ReservedColumnId::table_id())
                .unwrap()
                .unwrap();
            assert_eq!(values[0], *v);

            let v = batch
                .pk_col_value(&*codec, 1, ReservedColumnId::tsid())
                .unwrap()
                .unwrap();
            assert_eq!(values[1], *v);

            let v = batch.pk_col_value(&*codec, 2, 100).unwrap().unwrap();
            assert_eq!(values[2], *v);

            let v = batch.pk_col_value(&*codec, 3, 200).unwrap().unwrap();
            assert_eq!(values[3], *v);

            let v = batch.field_col_value(field_col_id).unwrap();
            assert_eq!(v.data.get(0), Value::UInt64(42));
            assert_eq!(v.data.get(1), Value::UInt64(43));
            assert_eq!(v.data.get(2), Value::UInt64(44));
        }
    }
}