1pub mod compat;
18pub mod dedup;
19pub mod last_row;
20pub mod merge;
21pub mod plain_batch;
22pub mod projection;
23pub(crate) mod prune;
24pub mod range;
25pub mod scan_region;
26pub mod scan_util;
27pub(crate) mod seq_scan;
28pub mod series_scan;
29pub mod stream;
30pub(crate) mod unordered_scan;
31
32use std::collections::{HashMap, HashSet};
33use std::sync::Arc;
34use std::time::Duration;
35
36use api::v1::OpType;
37use async_trait::async_trait;
38use common_time::Timestamp;
39use datafusion_common::arrow::array::UInt8Array;
40use datatypes::arrow;
41use datatypes::arrow::array::{Array, ArrayRef, UInt64Array};
42use datatypes::arrow::compute::SortOptions;
43use datatypes::arrow::row::{RowConverter, SortField};
44use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector};
45use datatypes::scalars::ScalarVectorBuilder;
46use datatypes::types::TimestampType;
47use datatypes::value::{Value, ValueRef};
48use datatypes::vectors::{
49 BooleanVector, Helper, TimestampMicrosecondVector, TimestampMillisecondVector,
50 TimestampMillisecondVectorBuilder, TimestampNanosecondVector, TimestampSecondVector,
51 UInt32Vector, UInt64Vector, UInt64VectorBuilder, UInt8Vector, UInt8VectorBuilder, Vector,
52 VectorRef,
53};
54use futures::stream::BoxStream;
55use futures::TryStreamExt;
56use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
57use snafu::{ensure, OptionExt, ResultExt};
58use store_api::metadata::RegionMetadata;
59use store_api::storage::{ColumnId, SequenceNumber};
60
61use crate::error::{
62 ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu,
63 Result,
64};
65use crate::memtable::BoxedBatchIterator;
66use crate::read::prune::PruneReader;
67
/// A batch of rows that share one encoded primary key.
///
/// Every column (`timestamps`, `sequences`, `op_types` and each entry of `fields`)
/// has the same length; [`BatchBuilder::build`] enforces this when a batch is built.
/// Readers expect rows ordered by timestamp ascending and, for equal timestamps, by
/// sequence descending (this is the order produced by `Batch::sort`).
///
/// NOTE(review): the derived `PartialEq` also compares the lazily filled caches
/// `pk_values` and `fields_idx`, so two batches with identical rows can compare
/// unequal if only one of them has decoded its primary key or built its field index.
#[derive(Debug, PartialEq, Clone)]
pub struct Batch {
    /// Encoded primary key shared by all rows of the batch.
    primary_key: Vec<u8>,
    /// Decoded values of `primary_key`; `None` until decoded or explicitly set.
    pk_values: Option<CompositeValues>,
    /// Timestamp column; may use any timestamp unit.
    timestamps: VectorRef,
    /// Sequence number of each row.
    sequences: Arc<UInt64Vector>,
    /// Operation type of each row, stored as `OpType as u8`.
    op_types: Arc<UInt8Vector>,
    /// Field columns of the batch.
    fields: Vec<BatchColumn>,
    /// Lazily built map from a field's column id to its index in `fields`.
    fields_idx: Option<HashMap<ColumnId, usize>>,
}
93
94impl Batch {
95 pub fn new(
97 primary_key: Vec<u8>,
98 timestamps: VectorRef,
99 sequences: Arc<UInt64Vector>,
100 op_types: Arc<UInt8Vector>,
101 fields: Vec<BatchColumn>,
102 ) -> Result<Batch> {
103 BatchBuilder::with_required_columns(primary_key, timestamps, sequences, op_types)
104 .with_fields(fields)
105 .build()
106 }
107
108 pub fn with_fields(self, fields: Vec<BatchColumn>) -> Result<Batch> {
110 Batch::new(
111 self.primary_key,
112 self.timestamps,
113 self.sequences,
114 self.op_types,
115 fields,
116 )
117 }
118
119 pub fn primary_key(&self) -> &[u8] {
121 &self.primary_key
122 }
123
124 pub fn pk_values(&self) -> Option<&CompositeValues> {
126 self.pk_values.as_ref()
127 }
128
129 pub fn set_pk_values(&mut self, pk_values: CompositeValues) {
131 self.pk_values = Some(pk_values);
132 }
133
134 #[cfg(any(test, feature = "test"))]
136 pub fn remove_pk_values(&mut self) {
137 self.pk_values = None;
138 }
139
140 pub fn fields(&self) -> &[BatchColumn] {
142 &self.fields
143 }
144
145 pub fn timestamps(&self) -> &VectorRef {
147 &self.timestamps
148 }
149
150 pub fn sequences(&self) -> &Arc<UInt64Vector> {
152 &self.sequences
153 }
154
155 pub fn op_types(&self) -> &Arc<UInt8Vector> {
157 &self.op_types
158 }
159
160 pub fn num_rows(&self) -> usize {
162 self.sequences.len()
165 }
166
167 pub(crate) fn empty() -> Self {
169 Self {
170 primary_key: vec![],
171 pk_values: None,
172 timestamps: Arc::new(TimestampMillisecondVectorBuilder::with_capacity(0).finish()),
173 sequences: Arc::new(UInt64VectorBuilder::with_capacity(0).finish()),
174 op_types: Arc::new(UInt8VectorBuilder::with_capacity(0).finish()),
175 fields: vec![],
176 fields_idx: None,
177 }
178 }
179
180 pub fn is_empty(&self) -> bool {
182 self.num_rows() == 0
183 }
184
185 pub fn first_timestamp(&self) -> Option<Timestamp> {
187 if self.timestamps.is_empty() {
188 return None;
189 }
190
191 Some(self.get_timestamp(0))
192 }
193
194 pub fn last_timestamp(&self) -> Option<Timestamp> {
196 if self.timestamps.is_empty() {
197 return None;
198 }
199
200 Some(self.get_timestamp(self.timestamps.len() - 1))
201 }
202
203 pub fn first_sequence(&self) -> Option<SequenceNumber> {
205 if self.sequences.is_empty() {
206 return None;
207 }
208
209 Some(self.get_sequence(0))
210 }
211
212 pub fn last_sequence(&self) -> Option<SequenceNumber> {
214 if self.sequences.is_empty() {
215 return None;
216 }
217
218 Some(self.get_sequence(self.sequences.len() - 1))
219 }
220
221 pub fn set_primary_key(&mut self, primary_key: Vec<u8>) {
226 self.primary_key = primary_key;
227 }
228
229 pub fn slice(&self, offset: usize, length: usize) -> Batch {
234 let fields = self
235 .fields
236 .iter()
237 .map(|column| BatchColumn {
238 column_id: column.column_id,
239 data: column.data.slice(offset, length),
240 })
241 .collect();
242 Batch {
244 primary_key: self.primary_key.clone(),
247 pk_values: self.pk_values.clone(),
248 timestamps: self.timestamps.slice(offset, length),
249 sequences: Arc::new(self.sequences.get_slice(offset, length)),
250 op_types: Arc::new(self.op_types.get_slice(offset, length)),
251 fields,
252 fields_idx: self.fields_idx.clone(),
253 }
254 }
255
256 pub fn concat(mut batches: Vec<Batch>) -> Result<Batch> {
260 ensure!(
261 !batches.is_empty(),
262 InvalidBatchSnafu {
263 reason: "empty batches",
264 }
265 );
266 if batches.len() == 1 {
267 return Ok(batches.pop().unwrap());
269 }
270
271 let primary_key = std::mem::take(&mut batches[0].primary_key);
272 let first = &batches[0];
273 ensure!(
275 batches
276 .iter()
277 .skip(1)
278 .all(|b| b.primary_key() == primary_key),
279 InvalidBatchSnafu {
280 reason: "batches have different primary key",
281 }
282 );
283 for b in batches.iter().skip(1) {
284 ensure!(
285 b.fields.len() == first.fields.len(),
286 InvalidBatchSnafu {
287 reason: "batches have different field num",
288 }
289 );
290 for (l, r) in b.fields.iter().zip(&first.fields) {
291 ensure!(
292 l.column_id == r.column_id,
293 InvalidBatchSnafu {
294 reason: "batches have different fields",
295 }
296 );
297 }
298 }
299
300 let mut builder = BatchBuilder::new(primary_key);
302 let array = concat_arrays(batches.iter().map(|b| b.timestamps().to_arrow_array()))?;
304 builder.timestamps_array(array)?;
305 let array = concat_arrays(batches.iter().map(|b| b.sequences().to_arrow_array()))?;
306 builder.sequences_array(array)?;
307 let array = concat_arrays(batches.iter().map(|b| b.op_types().to_arrow_array()))?;
308 builder.op_types_array(array)?;
309 for (i, batch_column) in first.fields.iter().enumerate() {
310 let array = concat_arrays(batches.iter().map(|b| b.fields()[i].data.to_arrow_array()))?;
311 builder.push_field_array(batch_column.column_id, array)?;
312 }
313
314 builder.build()
315 }
316
317 pub fn filter_deleted(&mut self) -> Result<()> {
319 let array = self.op_types.as_arrow();
321 let rhs = UInt8Array::new_scalar(OpType::Delete as u8);
323 let predicate =
324 arrow::compute::kernels::cmp::neq(array, &rhs).context(ComputeArrowSnafu)?;
325 self.filter(&BooleanVector::from(predicate))
326 }
327
328 pub fn filter(&mut self, predicate: &BooleanVector) -> Result<()> {
331 self.timestamps = self
332 .timestamps
333 .filter(predicate)
334 .context(ComputeVectorSnafu)?;
335 self.sequences = Arc::new(
336 UInt64Vector::try_from_arrow_array(
337 arrow::compute::filter(self.sequences.as_arrow(), predicate.as_boolean_array())
338 .context(ComputeArrowSnafu)?,
339 )
340 .unwrap(),
341 );
342 self.op_types = Arc::new(
343 UInt8Vector::try_from_arrow_array(
344 arrow::compute::filter(self.op_types.as_arrow(), predicate.as_boolean_array())
345 .context(ComputeArrowSnafu)?,
346 )
347 .unwrap(),
348 );
349 for batch_column in &mut self.fields {
350 batch_column.data = batch_column
351 .data
352 .filter(predicate)
353 .context(ComputeVectorSnafu)?;
354 }
355
356 Ok(())
357 }
358
359 pub fn filter_by_sequence(&mut self, sequence: Option<SequenceNumber>) -> Result<()> {
361 let seq = match (sequence, self.last_sequence()) {
362 (None, _) | (_, None) => return Ok(()),
363 (Some(sequence), Some(last_sequence)) if sequence >= last_sequence => return Ok(()),
364 (Some(sequence), Some(_)) => sequence,
365 };
366
367 let seqs = self.sequences.as_arrow();
368 let sequence = UInt64Array::new_scalar(seq);
369 let predicate = datafusion_common::arrow::compute::kernels::cmp::lt_eq(seqs, &sequence)
370 .context(ComputeArrowSnafu)?;
371 let predicate = BooleanVector::from(predicate);
372 self.filter(&predicate)?;
373
374 Ok(())
375 }
376
377 pub fn sort(&mut self, dedup: bool) -> Result<()> {
384 let converter = RowConverter::new(vec![
387 SortField::new(self.timestamps.data_type().as_arrow_type()),
388 SortField::new_with_options(
389 self.sequences.data_type().as_arrow_type(),
390 SortOptions {
391 descending: true,
392 ..Default::default()
393 },
394 ),
395 ])
396 .context(ComputeArrowSnafu)?;
397 let columns = [
399 self.timestamps.to_arrow_array(),
400 self.sequences.to_arrow_array(),
401 ];
402 let rows = converter.convert_columns(&columns).unwrap();
403 let mut to_sort: Vec<_> = rows.iter().enumerate().collect();
404
405 let was_sorted = to_sort.is_sorted_by_key(|x| x.1);
406 if !was_sorted {
407 to_sort.sort_unstable_by_key(|x| x.1);
408 }
409
410 let num_rows = to_sort.len();
411 if dedup {
412 to_sort.dedup_by(|left, right| {
414 debug_assert_eq!(18, left.1.as_ref().len());
415 debug_assert_eq!(18, right.1.as_ref().len());
416 let (left_key, right_key) = (left.1.as_ref(), right.1.as_ref());
417 left_key[..TIMESTAMP_KEY_LEN] == right_key[..TIMESTAMP_KEY_LEN]
419 });
420 }
421 let no_dedup = to_sort.len() == num_rows;
422
423 if was_sorted && no_dedup {
424 return Ok(());
425 }
426 let indices = UInt32Vector::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
427 self.take_in_place(&indices)
428 }
429
430 pub fn memory_size(&self) -> usize {
432 let mut size = std::mem::size_of::<Self>();
433 size += self.primary_key.len();
434 size += self.timestamps.memory_size();
435 size += self.sequences.memory_size();
436 size += self.op_types.memory_size();
437 for batch_column in &self.fields {
438 size += batch_column.data.memory_size();
439 }
440 size
441 }
442
443 pub(crate) fn projected_fields(
445 metadata: &RegionMetadata,
446 projection: &[ColumnId],
447 ) -> Vec<(ColumnId, ConcreteDataType)> {
448 let projected_ids: HashSet<_> = projection.iter().copied().collect();
449 metadata
450 .field_columns()
451 .filter_map(|column| {
452 if projected_ids.contains(&column.column_id) {
453 Some((column.column_id, column.column_schema.data_type.clone()))
454 } else {
455 None
456 }
457 })
458 .collect()
459 }
460
461 pub(crate) fn timestamps_native(&self) -> Option<&[i64]> {
463 if self.timestamps.is_empty() {
464 return None;
465 }
466
467 let values = match self.timestamps.data_type() {
468 ConcreteDataType::Timestamp(TimestampType::Second(_)) => self
469 .timestamps
470 .as_any()
471 .downcast_ref::<TimestampSecondVector>()
472 .unwrap()
473 .as_arrow()
474 .values(),
475 ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => self
476 .timestamps
477 .as_any()
478 .downcast_ref::<TimestampMillisecondVector>()
479 .unwrap()
480 .as_arrow()
481 .values(),
482 ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => self
483 .timestamps
484 .as_any()
485 .downcast_ref::<TimestampMicrosecondVector>()
486 .unwrap()
487 .as_arrow()
488 .values(),
489 ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => self
490 .timestamps
491 .as_any()
492 .downcast_ref::<TimestampNanosecondVector>()
493 .unwrap()
494 .as_arrow()
495 .values(),
496 other => panic!("timestamps in a Batch has other type {:?}", other),
497 };
498
499 Some(values)
500 }
501
502 fn take_in_place(&mut self, indices: &UInt32Vector) -> Result<()> {
504 self.timestamps = self.timestamps.take(indices).context(ComputeVectorSnafu)?;
505 let array = arrow::compute::take(self.sequences.as_arrow(), indices.as_arrow(), None)
506 .context(ComputeArrowSnafu)?;
507 self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
509 let array = arrow::compute::take(self.op_types.as_arrow(), indices.as_arrow(), None)
510 .context(ComputeArrowSnafu)?;
511 self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
512 for batch_column in &mut self.fields {
513 batch_column.data = batch_column
514 .data
515 .take(indices)
516 .context(ComputeVectorSnafu)?;
517 }
518
519 Ok(())
520 }
521
522 fn get_timestamp(&self, index: usize) -> Timestamp {
527 match self.timestamps.get_ref(index) {
528 ValueRef::Timestamp(timestamp) => timestamp,
529
530 value => panic!("{:?} is not a timestamp", value),
532 }
533 }
534
535 pub(crate) fn get_sequence(&self, index: usize) -> SequenceNumber {
540 self.sequences.get_data(index).unwrap()
542 }
543
544 #[cfg(debug_assertions)]
546 pub(crate) fn check_monotonic(&self) -> Result<(), String> {
547 use std::cmp::Ordering;
548 if self.timestamps_native().is_none() {
549 return Ok(());
550 }
551
552 let timestamps = self.timestamps_native().unwrap();
553 let sequences = self.sequences.as_arrow().values();
554 for (i, window) in timestamps.windows(2).enumerate() {
555 let current = window[0];
556 let next = window[1];
557 let current_sequence = sequences[i];
558 let next_sequence = sequences[i + 1];
559 match current.cmp(&next) {
560 Ordering::Less => {
561 continue;
563 }
564 Ordering::Equal => {
565 if current_sequence < next_sequence {
567 return Err(format!(
568 "sequence are not monotonic: ts {} == {} but current sequence {} < {}, index: {}",
569 current, next, current_sequence, next_sequence, i
570 ));
571 }
572 }
573 Ordering::Greater => {
574 return Err(format!(
576 "timestamps are not monotonic: {} > {}, index: {}",
577 current, next, i
578 ));
579 }
580 }
581 }
582
583 Ok(())
584 }
585
586 #[cfg(debug_assertions)]
588 pub(crate) fn check_next_batch(&self, other: &Batch) -> Result<(), String> {
589 if self.primary_key() < other.primary_key() {
591 return Ok(());
592 }
593 if self.primary_key() > other.primary_key() {
594 return Err(format!(
595 "primary key is not monotonic: {:?} > {:?}",
596 self.primary_key(),
597 other.primary_key()
598 ));
599 }
600 if self.last_timestamp() < other.first_timestamp() {
602 return Ok(());
603 }
604 if self.last_timestamp() > other.first_timestamp() {
605 return Err(format!(
606 "timestamps are not monotonic: {:?} > {:?}",
607 self.last_timestamp(),
608 other.first_timestamp()
609 ));
610 }
611 if self.last_sequence() >= other.first_sequence() {
613 return Ok(());
614 }
615 Err(format!(
616 "sequences are not monotonic: {:?} < {:?}",
617 self.last_sequence(),
618 other.first_sequence()
619 ))
620 }
621
622 pub fn pk_col_value(
626 &mut self,
627 codec: &dyn PrimaryKeyCodec,
628 col_idx_in_pk: usize,
629 column_id: ColumnId,
630 ) -> Result<Option<&Value>> {
631 if self.pk_values.is_none() {
632 self.pk_values = Some(codec.decode(&self.primary_key).context(DecodeSnafu)?);
633 }
634
635 let pk_values = self.pk_values.as_ref().unwrap();
636 Ok(match pk_values {
637 CompositeValues::Dense(values) => values.get(col_idx_in_pk).map(|(_, v)| v),
638 CompositeValues::Sparse(values) => values.get(&column_id),
639 })
640 }
641
642 pub fn field_col_value(&mut self, column_id: ColumnId) -> Option<&BatchColumn> {
646 if self.fields_idx.is_none() {
647 self.fields_idx = Some(
648 self.fields
649 .iter()
650 .enumerate()
651 .map(|(i, c)| (c.column_id, i))
652 .collect(),
653 );
654 }
655
656 self.fields_idx
657 .as_ref()
658 .unwrap()
659 .get(&column_id)
660 .map(|&idx| &self.fields[idx])
661 }
662}
663
/// Debug-only helper that validates a stream of batches is sorted and within a
/// time range.
#[cfg(debug_assertions)]
#[derive(Default)]
pub(crate) struct BatchChecker {
    /// The last batch passed to the checker, used to check ordering across batches.
    last_batch: Option<Batch>,
    /// Inclusive lower bound for timestamps, if any.
    start: Option<Timestamp>,
    /// Exclusive upper bound for timestamps, if any.
    end: Option<Timestamp>,
}
672
673#[cfg(debug_assertions)]
674impl BatchChecker {
675 pub(crate) fn with_start(mut self, start: Option<Timestamp>) -> Self {
677 self.start = start;
678 self
679 }
680
681 pub(crate) fn with_end(mut self, end: Option<Timestamp>) -> Self {
683 self.end = end;
684 self
685 }
686
687 pub(crate) fn check_monotonic(&mut self, batch: &Batch) -> Result<(), String> {
690 batch.check_monotonic()?;
691
692 if let (Some(start), Some(first)) = (self.start, batch.first_timestamp()) {
693 if start > first {
694 return Err(format!(
695 "batch's first timestamp is before the start timestamp: {:?} > {:?}",
696 start, first
697 ));
698 }
699 }
700 if let (Some(end), Some(last)) = (self.end, batch.last_timestamp()) {
701 if end <= last {
702 return Err(format!(
703 "batch's last timestamp is after the end timestamp: {:?} <= {:?}",
704 end, last
705 ));
706 }
707 }
708
709 let res = self
712 .last_batch
713 .as_ref()
714 .map(|last| last.check_next_batch(batch))
715 .unwrap_or(Ok(()));
716 self.last_batch = Some(batch.clone());
717 res
718 }
719
720 pub(crate) fn format_batch(&self, batch: &Batch) -> String {
722 use std::fmt::Write;
723
724 let mut message = String::new();
725 if let Some(last) = &self.last_batch {
726 write!(
727 message,
728 "last_pk: {:?}, last_ts: {:?}, last_seq: {:?}, ",
729 last.primary_key(),
730 last.last_timestamp(),
731 last.last_sequence()
732 )
733 .unwrap();
734 }
735 write!(
736 message,
737 "batch_pk: {:?}, batch_ts: {:?}, batch_seq: {:?}",
738 batch.primary_key(),
739 batch.timestamps(),
740 batch.sequences()
741 )
742 .unwrap();
743
744 message
745 }
746
747 pub(crate) fn ensure_part_range_batch(
749 &mut self,
750 scanner: &str,
751 region_id: store_api::storage::RegionId,
752 partition: usize,
753 part_range: store_api::region_engine::PartitionRange,
754 batch: &Batch,
755 ) {
756 if let Err(e) = self.check_monotonic(batch) {
757 let err_msg = format!(
758 "{}: batch is not sorted, {}, region_id: {}, partition: {}, part_range: {:?}",
759 scanner, e, region_id, partition, part_range,
760 );
761 common_telemetry::error!("{err_msg}, {}", self.format_batch(batch));
762 panic!("{err_msg}, batch rows: {}", batch.num_rows());
764 }
765 }
766}
767
/// Length in bytes of the encoded timestamp key at the start of each row produced
/// by the row converter in `Batch::sort`.
///
/// Presumably a 1-byte validity marker followed by the 8-byte `i64` value, per the
/// arrow row format.
const TIMESTAMP_KEY_LEN: usize = 9;
770
771fn concat_arrays(iter: impl Iterator<Item = ArrayRef>) -> Result<ArrayRef> {
773 let arrays: Vec<_> = iter.collect();
774 let dyn_arrays: Vec<_> = arrays.iter().map(|array| array.as_ref()).collect();
775 arrow::compute::concat(&dyn_arrays).context(ComputeArrowSnafu)
776}
777
/// A field column of a [`Batch`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct BatchColumn {
    /// Id of the column in the region metadata.
    pub column_id: ColumnId,
    /// Values of the column, one per row of the batch.
    pub data: VectorRef,
}
786
/// Builder that assembles and validates a [`Batch`].
pub struct BatchBuilder {
    /// Encoded primary key of the batch.
    primary_key: Vec<u8>,
    /// Timestamp column; required by `build`.
    timestamps: Option<VectorRef>,
    /// Sequence column; required by `build`.
    sequences: Option<Arc<UInt64Vector>>,
    /// Op type column; required by `build`.
    op_types: Option<Arc<UInt8Vector>>,
    /// Field columns.
    fields: Vec<BatchColumn>,
}
795
796impl BatchBuilder {
797 pub fn new(primary_key: Vec<u8>) -> BatchBuilder {
799 BatchBuilder {
800 primary_key,
801 timestamps: None,
802 sequences: None,
803 op_types: None,
804 fields: Vec::new(),
805 }
806 }
807
808 pub fn with_required_columns(
810 primary_key: Vec<u8>,
811 timestamps: VectorRef,
812 sequences: Arc<UInt64Vector>,
813 op_types: Arc<UInt8Vector>,
814 ) -> BatchBuilder {
815 BatchBuilder {
816 primary_key,
817 timestamps: Some(timestamps),
818 sequences: Some(sequences),
819 op_types: Some(op_types),
820 fields: Vec::new(),
821 }
822 }
823
824 pub fn with_fields(mut self, fields: Vec<BatchColumn>) -> Self {
826 self.fields = fields;
827 self
828 }
829
830 pub fn push_field(&mut self, column: BatchColumn) -> &mut Self {
832 self.fields.push(column);
833 self
834 }
835
836 pub fn push_field_array(&mut self, column_id: ColumnId, array: ArrayRef) -> Result<&mut Self> {
838 let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
839 self.fields.push(BatchColumn {
840 column_id,
841 data: vector,
842 });
843
844 Ok(self)
845 }
846
847 pub fn timestamps_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
849 let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
850 ensure!(
851 vector.data_type().is_timestamp(),
852 InvalidBatchSnafu {
853 reason: format!("{:?} is not a timestamp type", vector.data_type()),
854 }
855 );
856
857 self.timestamps = Some(vector);
858 Ok(self)
859 }
860
861 pub fn sequences_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
863 ensure!(
864 *array.data_type() == arrow::datatypes::DataType::UInt64,
865 InvalidBatchSnafu {
866 reason: "sequence array is not UInt64 type",
867 }
868 );
869 let vector = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
871 self.sequences = Some(vector);
872
873 Ok(self)
874 }
875
876 pub fn op_types_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
878 ensure!(
879 *array.data_type() == arrow::datatypes::DataType::UInt8,
880 InvalidBatchSnafu {
881 reason: "sequence array is not UInt8 type",
882 }
883 );
884 let vector = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
886 self.op_types = Some(vector);
887
888 Ok(self)
889 }
890
891 pub fn build(self) -> Result<Batch> {
893 let timestamps = self.timestamps.context(InvalidBatchSnafu {
894 reason: "missing timestamps",
895 })?;
896 let sequences = self.sequences.context(InvalidBatchSnafu {
897 reason: "missing sequences",
898 })?;
899 let op_types = self.op_types.context(InvalidBatchSnafu {
900 reason: "missing op_types",
901 })?;
902 assert_eq!(0, timestamps.null_count());
905 assert_eq!(0, sequences.null_count());
906 assert_eq!(0, op_types.null_count());
907
908 let ts_len = timestamps.len();
909 ensure!(
910 sequences.len() == ts_len,
911 InvalidBatchSnafu {
912 reason: format!(
913 "sequence have different len {} != {}",
914 sequences.len(),
915 ts_len
916 ),
917 }
918 );
919 ensure!(
920 op_types.len() == ts_len,
921 InvalidBatchSnafu {
922 reason: format!(
923 "op type have different len {} != {}",
924 op_types.len(),
925 ts_len
926 ),
927 }
928 );
929 for column in &self.fields {
930 ensure!(
931 column.data.len() == ts_len,
932 InvalidBatchSnafu {
933 reason: format!(
934 "column {} has different len {} != {}",
935 column.column_id,
936 column.data.len(),
937 ts_len
938 ),
939 }
940 );
941 }
942
943 Ok(Batch {
944 primary_key: self.primary_key,
945 pk_values: None,
946 timestamps,
947 sequences,
948 op_types,
949 fields: self.fields,
950 fields_idx: None,
951 })
952 }
953}
954
955impl From<Batch> for BatchBuilder {
956 fn from(batch: Batch) -> Self {
957 Self {
958 primary_key: batch.primary_key,
959 timestamps: Some(batch.timestamps),
960 sequences: Some(batch.sequences),
961 op_types: Some(batch.op_types),
962 fields: batch.fields,
963 }
964 }
965}
966
/// A source of [`Batch`]es that can be one of several reader kinds.
pub enum Source {
    /// Source from a [`BoxedBatchReader`].
    Reader(BoxedBatchReader),
    /// Source from a [`BoxedBatchIterator`].
    Iter(BoxedBatchIterator),
    /// Source from a [`BoxedBatchStream`].
    Stream(BoxedBatchStream),
    /// Source from a [`PruneReader`].
    PruneReader(PruneReader),
}
980
981impl Source {
982 pub async fn next_batch(&mut self) -> Result<Option<Batch>> {
984 match self {
985 Source::Reader(reader) => reader.next_batch().await,
986 Source::Iter(iter) => iter.next().transpose(),
987 Source::Stream(stream) => stream.try_next().await,
988 Source::PruneReader(reader) => reader.next_batch().await,
989 }
990 }
991}
992
/// Async reader that yields [`Batch`]es one at a time.
///
/// NOTE(review): callers elsewhere likely rely on batches arriving in sorted order
/// (see `BatchChecker`); this trait itself does not enforce that.
#[async_trait]
pub trait BatchReader: Send {
    /// Fetches the next [`Batch`].
    ///
    /// Returns `Ok(None)` when the reader has reached its end.
    async fn next_batch(&mut self) -> Result<Option<Batch>>;
}
1007
/// Boxed, dynamically dispatched [`BatchReader`].
pub type BoxedBatchReader = Box<dyn BatchReader>;

/// Boxed `'static` stream of batches.
pub type BoxedBatchStream = BoxStream<'static, Result<Batch>>;
1013
1014#[async_trait::async_trait]
1015impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
1016 async fn next_batch(&mut self) -> Result<Option<Batch>> {
1017 (**self).next_batch().await
1018 }
1019}
1020
/// Metrics collected while scanning a region.
#[derive(Debug, Default)]
pub(crate) struct ScannerMetrics {
    /// Time spent preparing the scan.
    prepare_scan_cost: Duration,
    /// Time spent building readers.
    build_reader_cost: Duration,
    /// Time spent scanning (reading batches).
    scan_cost: Duration,
    /// Time spent yielding results to the caller.
    yield_cost: Duration,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of memtable ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,
}
1041
#[cfg(test)]
mod tests {
    use mito_codec::row_converter::{self, build_primary_key_codec_with_fields};
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::storage::consts::ReservedColumnId;

    use super::*;
    use crate::error::Error;
    use crate::test_util::new_batch_builder;

    /// Builds a batch with primary key `b"test"` and one `u64` field column whose
    /// column id is 1. Timestamps are in milliseconds.
    fn new_batch(
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        field: &[u64],
    ) -> Batch {
        new_batch_builder(b"test", timestamps, sequences, op_types, 1, field)
            .build()
            .unwrap()
    }

    /// An empty batch has no first/last values and no native timestamps.
    #[test]
    fn test_empty_batch() {
        let batch = Batch::empty();
        assert!(batch.is_empty());
        assert_eq!(None, batch.first_timestamp());
        assert_eq!(None, batch.last_timestamp());
        assert_eq!(None, batch.first_sequence());
        assert_eq!(None, batch.last_sequence());
        assert!(batch.timestamps_native().is_none());
    }

    /// With a single row, the first and last values are the same row.
    #[test]
    fn test_first_last_one() {
        let batch = new_batch(&[1], &[2], &[OpType::Put], &[4]);
        assert_eq!(
            Timestamp::new_millisecond(1),
            batch.first_timestamp().unwrap()
        );
        assert_eq!(
            Timestamp::new_millisecond(1),
            batch.last_timestamp().unwrap()
        );
        assert_eq!(2, batch.first_sequence().unwrap());
        assert_eq!(2, batch.last_sequence().unwrap());
    }

    #[test]
    fn test_first_last_multiple() {
        let batch = new_batch(
            &[1, 2, 3],
            &[11, 12, 13],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );
        assert_eq!(
            Timestamp::new_millisecond(1),
            batch.first_timestamp().unwrap()
        );
        assert_eq!(
            Timestamp::new_millisecond(3),
            batch.last_timestamp().unwrap()
        );
        assert_eq!(11, batch.first_sequence().unwrap());
        assert_eq!(13, batch.last_sequence().unwrap());
    }

    /// `slice(1, 2)` keeps rows 1 and 2 of every column.
    #[test]
    fn test_slice() {
        let batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let batch = batch.slice(1, 2);
        let expect = new_batch(
            &[2, 3],
            &[12, 13],
            &[OpType::Delete, OpType::Put],
            &[22, 23],
        );
        assert_eq!(expect, batch);
    }

    #[test]
    fn test_timestamps_native() {
        let batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        assert_eq!(&[1, 2, 3, 4], batch.timestamps_native().unwrap());
    }

    /// Concatenating no batches is an `InvalidBatch` error.
    #[test]
    fn test_concat_empty() {
        let err = Batch::concat(vec![]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }

    /// Concatenating a single batch returns it unchanged, even if it has no rows.
    #[test]
    fn test_concat_one() {
        let batch = new_batch(&[], &[], &[], &[]);
        let actual = Batch::concat(vec![batch.clone()]).unwrap();
        assert_eq!(batch, actual);

        let batch = new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]);
        let actual = Batch::concat(vec![batch.clone()]).unwrap();
        assert_eq!(batch, actual);
    }

    /// Concatenation appends rows in input order and tolerates empty batches.
    #[test]
    fn test_concat_multiple() {
        let batches = vec![
            new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]),
            new_batch(
                &[3, 4, 5],
                &[13, 14, 15],
                &[OpType::Put, OpType::Delete, OpType::Put],
                &[23, 24, 25],
            ),
            new_batch(&[], &[], &[], &[]),
            new_batch(&[6], &[16], &[OpType::Put], &[26]),
        ];
        let batch = Batch::concat(batches).unwrap();
        let expect = new_batch(
            &[1, 2, 3, 4, 5, 6],
            &[11, 12, 13, 14, 15, 16],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Delete,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25, 26],
        );
        assert_eq!(expect, batch);
    }

    /// Batches with different primary keys cannot be concatenated.
    #[test]
    fn test_concat_different() {
        let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
        let mut batch2 = new_batch(&[2], &[2], &[OpType::Put], &[2]);
        batch2.primary_key = b"hello".to_vec();
        let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }

    /// Batches with a different number of fields or different field ids cannot be
    /// concatenated.
    #[test]
    fn test_concat_different_fields() {
        let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
        let fields = vec![
            batch1.fields()[0].clone(),
            BatchColumn {
                column_id: 2,
                data: Arc::new(UInt64Vector::from_slice([2])),
            },
        ];
        // Batch 2 has more fields.
        let batch2 = batch1.clone().with_fields(fields).unwrap();
        let err = Batch::concat(vec![batch1.clone(), batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );

        // Batch 2 has a different field id.
        let fields = vec![BatchColumn {
            column_id: 2,
            data: Arc::new(UInt64Vector::from_slice([2])),
        }];
        let batch2 = batch1.clone().with_fields(fields).unwrap();
        let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }

    #[test]
    fn test_filter_deleted_empty() {
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_deleted().unwrap();
        assert!(batch.is_empty());
    }

    /// Delete rows are removed; a batch without deletes is unchanged.
    #[test]
    fn test_filter_deleted() {
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Delete, OpType::Put, OpType::Delete, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch.filter_deleted().unwrap();
        let expect = new_batch(&[2, 4], &[12, 14], &[OpType::Put, OpType::Put], &[22, 24]);
        assert_eq!(expect, batch);

        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let expect = batch.clone();
        batch.filter_deleted().unwrap();
        assert_eq!(expect, batch);
    }

    #[test]
    fn test_filter_by_sequence() {
        // Filters only some rows (sequence <= 13 are kept).
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch.filter_by_sequence(Some(13)).unwrap();
        let expect = new_batch(
            &[1, 2, 3],
            &[11, 12, 13],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );
        assert_eq!(expect, batch);

        // Filters out all rows.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );

        batch.filter_by_sequence(Some(10)).unwrap();
        assert!(batch.is_empty());

        // `None` sequence filters nothing.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let expect = batch.clone();
        batch.filter_by_sequence(None).unwrap();
        assert_eq!(expect, batch);

        // Filters an empty batch.
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_by_sequence(Some(10)).unwrap();
        assert!(batch.is_empty());

        // Filters an empty batch with `None`.
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_by_sequence(None).unwrap();
        assert!(batch.is_empty());
    }

    #[test]
    fn test_filter() {
        // Filters put only.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
        batch.filter(&predicate).unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // Filters deletion.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
        batch.filter(&predicate).unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // Filters to empty.
        let predicate = BooleanVector::from_vec(vec![false, false]);
        batch.filter(&predicate).unwrap();
        assert!(batch.is_empty());
    }

    /// Sorting orders by timestamp ascending then sequence descending; dedup keeps
    /// the row with the largest sequence for each timestamp.
    #[test]
    fn test_sort_and_dedup() {
        let original = new_batch(
            &[2, 3, 1, 4, 5, 2],
            &[1, 2, 3, 4, 5, 6],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25, 26],
        );

        let mut batch = original.clone();
        batch.sort(true).unwrap();
        // It should only keep one timestamp 2.
        assert_eq!(
            new_batch(
                &[1, 2, 3, 4, 5],
                &[3, 6, 2, 4, 5],
                &[
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                ],
                &[23, 26, 22, 24, 25],
            ),
            batch
        );

        let mut batch = original.clone();
        batch.sort(false).unwrap();

        // It should keep two timestamps 2.
        assert_eq!(
            new_batch(
                &[1, 2, 2, 3, 4, 5],
                &[3, 6, 1, 2, 4, 5],
                &[
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                ],
                &[23, 26, 21, 22, 24, 25],
            ),
            batch
        );

        let original = new_batch(
            &[2, 2, 1],
            &[1, 6, 1],
            &[OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );

        let mut batch = original.clone();
        batch.sort(true).unwrap();
        let expect = new_batch(&[1, 2], &[1, 6], &[OpType::Put, OpType::Put], &[23, 22]);
        assert_eq!(expect, batch);

        let mut batch = original.clone();
        batch.sort(false).unwrap();
        let expect = new_batch(
            &[1, 2, 2],
            &[1, 6, 1],
            &[OpType::Put, OpType::Put, OpType::Delete],
            &[23, 22, 21],
        );
        assert_eq!(expect, batch);
    }

    /// Primary key and field values can be looked up for both dense and sparse
    /// primary key encodings.
    #[test]
    fn test_get_value() {
        let encodings = [PrimaryKeyEncoding::Dense, PrimaryKeyEncoding::Sparse];

        for encoding in encodings {
            let codec = build_primary_key_codec_with_fields(
                encoding,
                [
                    (
                        ReservedColumnId::table_id(),
                        row_converter::SortField::new(ConcreteDataType::uint32_datatype()),
                    ),
                    (
                        ReservedColumnId::tsid(),
                        row_converter::SortField::new(ConcreteDataType::uint64_datatype()),
                    ),
                    (
                        100,
                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
                    ),
                    (
                        200,
                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
                    ),
                ]
                .into_iter(),
            );

            let values = [
                Value::UInt32(1000),
                Value::UInt64(2000),
                Value::String("abcdefgh".into()),
                Value::String("zyxwvu".into()),
            ];
            let mut buf = vec![];
            codec
                .encode_values(
                    &[
                        (ReservedColumnId::table_id(), values[0].clone()),
                        (ReservedColumnId::tsid(), values[1].clone()),
                        (100, values[2].clone()),
                        (200, values[3].clone()),
                    ],
                    &mut buf,
                )
                .unwrap();

            let field_col_id = 2;
            let mut batch = new_batch_builder(
                &buf,
                &[1, 2, 3],
                &[1, 1, 1],
                &[OpType::Put, OpType::Put, OpType::Put],
                field_col_id,
                &[42, 43, 44],
            )
            .build()
            .unwrap();

            let v = batch
                .pk_col_value(&*codec, 0, ReservedColumnId::table_id())
                .unwrap()
                .unwrap();
            assert_eq!(values[0], *v);

            let v = batch
                .pk_col_value(&*codec, 1, ReservedColumnId::tsid())
                .unwrap()
                .unwrap();
            assert_eq!(values[1], *v);

            let v = batch.pk_col_value(&*codec, 2, 100).unwrap().unwrap();
            assert_eq!(values[2], *v);

            let v = batch.pk_col_value(&*codec, 3, 200).unwrap().unwrap();
            assert_eq!(values[3], *v);

            let v = batch.field_col_value(field_col_id).unwrap();
            assert_eq!(v.data.get(0), Value::UInt64(42));
            assert_eq!(v.data.get(1), Value::UInt64(43));
            assert_eq!(v.data.get(2), Value::UInt64(44));
        }
    }
}