1pub mod compat;
18pub mod dedup;
19pub mod last_row;
20pub mod merge;
21pub mod plain_batch;
22pub mod projection;
23pub(crate) mod prune;
24pub(crate) mod range;
25pub(crate) mod scan_region;
26pub(crate) mod scan_util;
27pub(crate) mod seq_scan;
28pub(crate) mod series_scan;
29pub(crate) mod unordered_scan;
30
31use std::collections::{HashMap, HashSet};
32use std::sync::Arc;
33use std::time::Duration;
34
35use api::v1::OpType;
36use async_trait::async_trait;
37use common_time::Timestamp;
38use datafusion_common::arrow::array::UInt8Array;
39use datatypes::arrow;
40use datatypes::arrow::array::{Array, ArrayRef, UInt64Array};
41use datatypes::arrow::compute::SortOptions;
42use datatypes::arrow::row::{RowConverter, SortField};
43use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector};
44use datatypes::types::TimestampType;
45use datatypes::value::{Value, ValueRef};
46use datatypes::vectors::{
47 BooleanVector, Helper, TimestampMicrosecondVector, TimestampMillisecondVector,
48 TimestampNanosecondVector, TimestampSecondVector, UInt32Vector, UInt64Vector, UInt8Vector,
49 Vector, VectorRef,
50};
51use futures::stream::BoxStream;
52use futures::TryStreamExt;
53use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
54use snafu::{ensure, OptionExt, ResultExt};
55use store_api::metadata::RegionMetadata;
56use store_api::storage::{ColumnId, SequenceNumber};
57
58use crate::error::{
59 ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu,
60 Result,
61};
62use crate::memtable::BoxedBatchIterator;
63use crate::read::prune::PruneReader;
64
/// A batch of rows that all share the same encoded primary key.
///
/// Every column of a batch has the same number of rows, and the timestamp, sequence and
/// op type columns contain no nulls (both are enforced by [BatchBuilder::build]).
///
/// NOTE(review): the derived `PartialEq` also compares the lazily populated caches
/// (`pk_values` and `fields_idx`), so two batches holding identical rows can compare
/// unequal when only one of them has populated its caches.
#[derive(Debug, PartialEq, Clone)]
pub struct Batch {
    /// Encoded primary key shared by all rows of the batch.
    primary_key: Vec<u8>,
    /// Decoded primary key values; cached by [Batch::pk_col_value] or set through
    /// [Batch::set_pk_values].
    pk_values: Option<CompositeValues>,
    /// Timestamp column of the rows.
    timestamps: VectorRef,
    /// Sequence number of each row.
    sequences: Arc<UInt64Vector>,
    /// Op type of each row, stored as the `u8` value of an `OpType`.
    op_types: Arc<UInt8Vector>,
    /// Field columns of the batch.
    fields: Vec<BatchColumn>,
    /// Lazily built map from field column id to its index in `fields`, populated by
    /// [Batch::field_col_value].
    fields_idx: Option<HashMap<ColumnId, usize>>,
}
90
91impl Batch {
92 pub fn new(
94 primary_key: Vec<u8>,
95 timestamps: VectorRef,
96 sequences: Arc<UInt64Vector>,
97 op_types: Arc<UInt8Vector>,
98 fields: Vec<BatchColumn>,
99 ) -> Result<Batch> {
100 BatchBuilder::with_required_columns(primary_key, timestamps, sequences, op_types)
101 .with_fields(fields)
102 .build()
103 }
104
105 pub fn with_fields(self, fields: Vec<BatchColumn>) -> Result<Batch> {
107 Batch::new(
108 self.primary_key,
109 self.timestamps,
110 self.sequences,
111 self.op_types,
112 fields,
113 )
114 }
115
116 pub fn primary_key(&self) -> &[u8] {
118 &self.primary_key
119 }
120
121 pub fn pk_values(&self) -> Option<&CompositeValues> {
123 self.pk_values.as_ref()
124 }
125
126 pub fn set_pk_values(&mut self, pk_values: CompositeValues) {
128 self.pk_values = Some(pk_values);
129 }
130
131 #[cfg(any(test, feature = "test"))]
133 pub fn remove_pk_values(&mut self) {
134 self.pk_values = None;
135 }
136
137 pub fn fields(&self) -> &[BatchColumn] {
139 &self.fields
140 }
141
142 pub fn timestamps(&self) -> &VectorRef {
144 &self.timestamps
145 }
146
147 pub fn sequences(&self) -> &Arc<UInt64Vector> {
149 &self.sequences
150 }
151
152 pub fn op_types(&self) -> &Arc<UInt8Vector> {
154 &self.op_types
155 }
156
157 pub fn num_rows(&self) -> usize {
159 self.sequences.len()
162 }
163
164 pub fn is_empty(&self) -> bool {
166 self.num_rows() == 0
167 }
168
169 pub fn first_timestamp(&self) -> Option<Timestamp> {
171 if self.timestamps.is_empty() {
172 return None;
173 }
174
175 Some(self.get_timestamp(0))
176 }
177
178 pub fn last_timestamp(&self) -> Option<Timestamp> {
180 if self.timestamps.is_empty() {
181 return None;
182 }
183
184 Some(self.get_timestamp(self.timestamps.len() - 1))
185 }
186
187 pub fn first_sequence(&self) -> Option<SequenceNumber> {
189 if self.sequences.is_empty() {
190 return None;
191 }
192
193 Some(self.get_sequence(0))
194 }
195
196 pub fn last_sequence(&self) -> Option<SequenceNumber> {
198 if self.sequences.is_empty() {
199 return None;
200 }
201
202 Some(self.get_sequence(self.sequences.len() - 1))
203 }
204
205 pub fn set_primary_key(&mut self, primary_key: Vec<u8>) {
210 self.primary_key = primary_key;
211 }
212
213 pub fn slice(&self, offset: usize, length: usize) -> Batch {
218 let fields = self
219 .fields
220 .iter()
221 .map(|column| BatchColumn {
222 column_id: column.column_id,
223 data: column.data.slice(offset, length),
224 })
225 .collect();
226 Batch {
228 primary_key: self.primary_key.clone(),
231 pk_values: self.pk_values.clone(),
232 timestamps: self.timestamps.slice(offset, length),
233 sequences: Arc::new(self.sequences.get_slice(offset, length)),
234 op_types: Arc::new(self.op_types.get_slice(offset, length)),
235 fields,
236 fields_idx: self.fields_idx.clone(),
237 }
238 }
239
240 pub fn concat(mut batches: Vec<Batch>) -> Result<Batch> {
244 ensure!(
245 !batches.is_empty(),
246 InvalidBatchSnafu {
247 reason: "empty batches",
248 }
249 );
250 if batches.len() == 1 {
251 return Ok(batches.pop().unwrap());
253 }
254
255 let primary_key = std::mem::take(&mut batches[0].primary_key);
256 let first = &batches[0];
257 ensure!(
259 batches
260 .iter()
261 .skip(1)
262 .all(|b| b.primary_key() == primary_key),
263 InvalidBatchSnafu {
264 reason: "batches have different primary key",
265 }
266 );
267 for b in batches.iter().skip(1) {
268 ensure!(
269 b.fields.len() == first.fields.len(),
270 InvalidBatchSnafu {
271 reason: "batches have different field num",
272 }
273 );
274 for (l, r) in b.fields.iter().zip(&first.fields) {
275 ensure!(
276 l.column_id == r.column_id,
277 InvalidBatchSnafu {
278 reason: "batches have different fields",
279 }
280 );
281 }
282 }
283
284 let mut builder = BatchBuilder::new(primary_key);
286 let array = concat_arrays(batches.iter().map(|b| b.timestamps().to_arrow_array()))?;
288 builder.timestamps_array(array)?;
289 let array = concat_arrays(batches.iter().map(|b| b.sequences().to_arrow_array()))?;
290 builder.sequences_array(array)?;
291 let array = concat_arrays(batches.iter().map(|b| b.op_types().to_arrow_array()))?;
292 builder.op_types_array(array)?;
293 for (i, batch_column) in first.fields.iter().enumerate() {
294 let array = concat_arrays(batches.iter().map(|b| b.fields()[i].data.to_arrow_array()))?;
295 builder.push_field_array(batch_column.column_id, array)?;
296 }
297
298 builder.build()
299 }
300
301 pub fn filter_deleted(&mut self) -> Result<()> {
303 let array = self.op_types.as_arrow();
305 let rhs = UInt8Array::new_scalar(OpType::Delete as u8);
307 let predicate =
308 arrow::compute::kernels::cmp::neq(array, &rhs).context(ComputeArrowSnafu)?;
309 self.filter(&BooleanVector::from(predicate))
310 }
311
312 pub fn filter(&mut self, predicate: &BooleanVector) -> Result<()> {
315 self.timestamps = self
316 .timestamps
317 .filter(predicate)
318 .context(ComputeVectorSnafu)?;
319 self.sequences = Arc::new(
320 UInt64Vector::try_from_arrow_array(
321 arrow::compute::filter(self.sequences.as_arrow(), predicate.as_boolean_array())
322 .context(ComputeArrowSnafu)?,
323 )
324 .unwrap(),
325 );
326 self.op_types = Arc::new(
327 UInt8Vector::try_from_arrow_array(
328 arrow::compute::filter(self.op_types.as_arrow(), predicate.as_boolean_array())
329 .context(ComputeArrowSnafu)?,
330 )
331 .unwrap(),
332 );
333 for batch_column in &mut self.fields {
334 batch_column.data = batch_column
335 .data
336 .filter(predicate)
337 .context(ComputeVectorSnafu)?;
338 }
339
340 Ok(())
341 }
342
343 pub fn filter_by_sequence(&mut self, sequence: Option<SequenceNumber>) -> Result<()> {
345 let seq = match (sequence, self.last_sequence()) {
346 (None, _) | (_, None) => return Ok(()),
347 (Some(sequence), Some(last_sequence)) if sequence >= last_sequence => return Ok(()),
348 (Some(sequence), Some(_)) => sequence,
349 };
350
351 let seqs = self.sequences.as_arrow();
352 let sequence = UInt64Array::new_scalar(seq);
353 let predicate = datafusion_common::arrow::compute::kernels::cmp::lt_eq(seqs, &sequence)
354 .context(ComputeArrowSnafu)?;
355 let predicate = BooleanVector::from(predicate);
356 self.filter(&predicate)?;
357
358 Ok(())
359 }
360
361 pub fn sort(&mut self, dedup: bool) -> Result<()> {
368 let converter = RowConverter::new(vec![
371 SortField::new(self.timestamps.data_type().as_arrow_type()),
372 SortField::new_with_options(
373 self.sequences.data_type().as_arrow_type(),
374 SortOptions {
375 descending: true,
376 ..Default::default()
377 },
378 ),
379 ])
380 .context(ComputeArrowSnafu)?;
381 let columns = [
383 self.timestamps.to_arrow_array(),
384 self.sequences.to_arrow_array(),
385 ];
386 let rows = converter.convert_columns(&columns).unwrap();
387 let mut to_sort: Vec<_> = rows.iter().enumerate().collect();
388
389 let was_sorted = to_sort.is_sorted_by_key(|x| x.1);
390 if !was_sorted {
391 to_sort.sort_unstable_by_key(|x| x.1);
392 }
393
394 let num_rows = to_sort.len();
395 if dedup {
396 to_sort.dedup_by(|left, right| {
398 debug_assert_eq!(18, left.1.as_ref().len());
399 debug_assert_eq!(18, right.1.as_ref().len());
400 let (left_key, right_key) = (left.1.as_ref(), right.1.as_ref());
401 left_key[..TIMESTAMP_KEY_LEN] == right_key[..TIMESTAMP_KEY_LEN]
403 });
404 }
405 let no_dedup = to_sort.len() == num_rows;
406
407 if was_sorted && no_dedup {
408 return Ok(());
409 }
410 let indices = UInt32Vector::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
411 self.take_in_place(&indices)
412 }
413
414 pub fn memory_size(&self) -> usize {
416 let mut size = std::mem::size_of::<Self>();
417 size += self.primary_key.len();
418 size += self.timestamps.memory_size();
419 size += self.sequences.memory_size();
420 size += self.op_types.memory_size();
421 for batch_column in &self.fields {
422 size += batch_column.data.memory_size();
423 }
424 size
425 }
426
427 pub(crate) fn projected_fields(
429 metadata: &RegionMetadata,
430 projection: &[ColumnId],
431 ) -> Vec<(ColumnId, ConcreteDataType)> {
432 let projected_ids: HashSet<_> = projection.iter().copied().collect();
433 metadata
434 .field_columns()
435 .filter_map(|column| {
436 if projected_ids.contains(&column.column_id) {
437 Some((column.column_id, column.column_schema.data_type.clone()))
438 } else {
439 None
440 }
441 })
442 .collect()
443 }
444
445 pub(crate) fn timestamps_native(&self) -> Option<&[i64]> {
447 if self.timestamps.is_empty() {
448 return None;
449 }
450
451 let values = match self.timestamps.data_type() {
452 ConcreteDataType::Timestamp(TimestampType::Second(_)) => self
453 .timestamps
454 .as_any()
455 .downcast_ref::<TimestampSecondVector>()
456 .unwrap()
457 .as_arrow()
458 .values(),
459 ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => self
460 .timestamps
461 .as_any()
462 .downcast_ref::<TimestampMillisecondVector>()
463 .unwrap()
464 .as_arrow()
465 .values(),
466 ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => self
467 .timestamps
468 .as_any()
469 .downcast_ref::<TimestampMicrosecondVector>()
470 .unwrap()
471 .as_arrow()
472 .values(),
473 ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => self
474 .timestamps
475 .as_any()
476 .downcast_ref::<TimestampNanosecondVector>()
477 .unwrap()
478 .as_arrow()
479 .values(),
480 other => panic!("timestamps in a Batch has other type {:?}", other),
481 };
482
483 Some(values)
484 }
485
486 fn take_in_place(&mut self, indices: &UInt32Vector) -> Result<()> {
488 self.timestamps = self.timestamps.take(indices).context(ComputeVectorSnafu)?;
489 let array = arrow::compute::take(self.sequences.as_arrow(), indices.as_arrow(), None)
490 .context(ComputeArrowSnafu)?;
491 self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
493 let array = arrow::compute::take(self.op_types.as_arrow(), indices.as_arrow(), None)
494 .context(ComputeArrowSnafu)?;
495 self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
496 for batch_column in &mut self.fields {
497 batch_column.data = batch_column
498 .data
499 .take(indices)
500 .context(ComputeVectorSnafu)?;
501 }
502
503 Ok(())
504 }
505
506 fn get_timestamp(&self, index: usize) -> Timestamp {
511 match self.timestamps.get_ref(index) {
512 ValueRef::Timestamp(timestamp) => timestamp,
513
514 value => panic!("{:?} is not a timestamp", value),
516 }
517 }
518
519 pub(crate) fn get_sequence(&self, index: usize) -> SequenceNumber {
524 self.sequences.get_data(index).unwrap()
526 }
527
528 #[cfg(debug_assertions)]
530 pub(crate) fn check_monotonic(&self) -> Result<(), String> {
531 use std::cmp::Ordering;
532 if self.timestamps_native().is_none() {
533 return Ok(());
534 }
535
536 let timestamps = self.timestamps_native().unwrap();
537 let sequences = self.sequences.as_arrow().values();
538 for (i, window) in timestamps.windows(2).enumerate() {
539 let current = window[0];
540 let next = window[1];
541 let current_sequence = sequences[i];
542 let next_sequence = sequences[i + 1];
543 match current.cmp(&next) {
544 Ordering::Less => {
545 continue;
547 }
548 Ordering::Equal => {
549 if current_sequence < next_sequence {
551 return Err(format!(
552 "sequence are not monotonic: ts {} == {} but current sequence {} < {}, index: {}",
553 current, next, current_sequence, next_sequence, i
554 ));
555 }
556 }
557 Ordering::Greater => {
558 return Err(format!(
560 "timestamps are not monotonic: {} > {}, index: {}",
561 current, next, i
562 ));
563 }
564 }
565 }
566
567 Ok(())
568 }
569
570 #[cfg(debug_assertions)]
572 pub(crate) fn check_next_batch(&self, other: &Batch) -> Result<(), String> {
573 if self.primary_key() < other.primary_key() {
575 return Ok(());
576 }
577 if self.primary_key() > other.primary_key() {
578 return Err(format!(
579 "primary key is not monotonic: {:?} > {:?}",
580 self.primary_key(),
581 other.primary_key()
582 ));
583 }
584 if self.last_timestamp() < other.first_timestamp() {
586 return Ok(());
587 }
588 if self.last_timestamp() > other.first_timestamp() {
589 return Err(format!(
590 "timestamps are not monotonic: {:?} > {:?}",
591 self.last_timestamp(),
592 other.first_timestamp()
593 ));
594 }
595 if self.last_sequence() >= other.first_sequence() {
597 return Ok(());
598 }
599 Err(format!(
600 "sequences are not monotonic: {:?} < {:?}",
601 self.last_sequence(),
602 other.first_sequence()
603 ))
604 }
605
606 pub fn pk_col_value(
610 &mut self,
611 codec: &dyn PrimaryKeyCodec,
612 col_idx_in_pk: usize,
613 column_id: ColumnId,
614 ) -> Result<Option<&Value>> {
615 if self.pk_values.is_none() {
616 self.pk_values = Some(codec.decode(&self.primary_key).context(DecodeSnafu)?);
617 }
618
619 let pk_values = self.pk_values.as_ref().unwrap();
620 Ok(match pk_values {
621 CompositeValues::Dense(values) => values.get(col_idx_in_pk).map(|(_, v)| v),
622 CompositeValues::Sparse(values) => values.get(&column_id),
623 })
624 }
625
626 pub fn field_col_value(&mut self, column_id: ColumnId) -> Option<&BatchColumn> {
630 if self.fields_idx.is_none() {
631 self.fields_idx = Some(
632 self.fields
633 .iter()
634 .enumerate()
635 .map(|(i, c)| (c.column_id, i))
636 .collect(),
637 );
638 }
639
640 self.fields_idx
641 .as_ref()
642 .unwrap()
643 .get(&column_id)
644 .map(|&idx| &self.fields[idx])
645 }
646}
647
/// Debug-only checker that validates batches produced by a scan.
///
/// It checks that each batch is internally sorted, that it lies within the optional
/// `[start, end)` timestamp range, and that it correctly follows the previously checked
/// batch.
#[cfg(debug_assertions)]
#[derive(Default)]
pub(crate) struct BatchChecker {
    /// The last batch passed to `check_monotonic` that passed the per-batch checks.
    last_batch: Option<Batch>,
    /// Inclusive lower bound for timestamps, if any.
    start: Option<Timestamp>,
    /// Exclusive upper bound for timestamps, if any.
    end: Option<Timestamp>,
}
656
657#[cfg(debug_assertions)]
658impl BatchChecker {
659 pub(crate) fn with_start(mut self, start: Option<Timestamp>) -> Self {
661 self.start = start;
662 self
663 }
664
665 pub(crate) fn with_end(mut self, end: Option<Timestamp>) -> Self {
667 self.end = end;
668 self
669 }
670
671 pub(crate) fn check_monotonic(&mut self, batch: &Batch) -> Result<(), String> {
674 batch.check_monotonic()?;
675
676 if let (Some(start), Some(first)) = (self.start, batch.first_timestamp()) {
677 if start > first {
678 return Err(format!(
679 "batch's first timestamp is before the start timestamp: {:?} > {:?}",
680 start, first
681 ));
682 }
683 }
684 if let (Some(end), Some(last)) = (self.end, batch.last_timestamp()) {
685 if end <= last {
686 return Err(format!(
687 "batch's last timestamp is after the end timestamp: {:?} <= {:?}",
688 end, last
689 ));
690 }
691 }
692
693 let res = self
696 .last_batch
697 .as_ref()
698 .map(|last| last.check_next_batch(batch))
699 .unwrap_or(Ok(()));
700 self.last_batch = Some(batch.clone());
701 res
702 }
703
704 pub(crate) fn format_batch(&self, batch: &Batch) -> String {
706 use std::fmt::Write;
707
708 let mut message = String::new();
709 if let Some(last) = &self.last_batch {
710 write!(
711 message,
712 "last_pk: {:?}, last_ts: {:?}, last_seq: {:?}, ",
713 last.primary_key(),
714 last.last_timestamp(),
715 last.last_sequence()
716 )
717 .unwrap();
718 }
719 write!(
720 message,
721 "batch_pk: {:?}, batch_ts: {:?}, batch_seq: {:?}",
722 batch.primary_key(),
723 batch.timestamps(),
724 batch.sequences()
725 )
726 .unwrap();
727
728 message
729 }
730
731 pub(crate) fn ensure_part_range_batch(
733 &mut self,
734 scanner: &str,
735 region_id: store_api::storage::RegionId,
736 partition: usize,
737 part_range: store_api::region_engine::PartitionRange,
738 batch: &Batch,
739 ) {
740 if let Err(e) = self.check_monotonic(batch) {
741 let err_msg = format!(
742 "{}: batch is not sorted, {}, region_id: {}, partition: {}, part_range: {:?}",
743 scanner, e, region_id, partition, part_range,
744 );
745 common_telemetry::error!("{err_msg}, {}", self.format_batch(batch));
746 panic!("{err_msg}, batch rows: {}", batch.num_rows());
748 }
749 }
750}
751
/// Length in bytes of the encoded timestamp prefix of a row built by the `RowConverter` in
/// [Batch::sort]. In the arrow row format, a fixed-width value is encoded as a one-byte null
/// sentinel followed by the 8 value bytes of the `i64`. `Batch::sort` debug-asserts a total
/// row length of 18, i.e. two such 9-byte columns (timestamp and sequence).
const TIMESTAMP_KEY_LEN: usize = 9;
754
755fn concat_arrays(iter: impl Iterator<Item = ArrayRef>) -> Result<ArrayRef> {
757 let arrays: Vec<_> = iter.collect();
758 let dyn_arrays: Vec<_> = arrays.iter().map(|array| array.as_ref()).collect();
759 arrow::compute::concat(&dyn_arrays).context(ComputeArrowSnafu)
760}
761
/// A field column of a [Batch].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct BatchColumn {
    /// Id of the column.
    pub column_id: ColumnId,
    /// Values of the column, one per row of the batch.
    pub data: VectorRef,
}
770
/// Builder for [Batch]; validates column lengths and null counts in `build`.
pub struct BatchBuilder {
    /// Encoded primary key of the batch.
    primary_key: Vec<u8>,
    /// Timestamp column; required by `build`.
    timestamps: Option<VectorRef>,
    /// Sequence column; required by `build`.
    sequences: Option<Arc<UInt64Vector>>,
    /// Op type column; required by `build`.
    op_types: Option<Arc<UInt8Vector>>,
    /// Field columns, in insertion order.
    fields: Vec<BatchColumn>,
}
779
780impl BatchBuilder {
781 pub fn new(primary_key: Vec<u8>) -> BatchBuilder {
783 BatchBuilder {
784 primary_key,
785 timestamps: None,
786 sequences: None,
787 op_types: None,
788 fields: Vec::new(),
789 }
790 }
791
792 pub fn with_required_columns(
794 primary_key: Vec<u8>,
795 timestamps: VectorRef,
796 sequences: Arc<UInt64Vector>,
797 op_types: Arc<UInt8Vector>,
798 ) -> BatchBuilder {
799 BatchBuilder {
800 primary_key,
801 timestamps: Some(timestamps),
802 sequences: Some(sequences),
803 op_types: Some(op_types),
804 fields: Vec::new(),
805 }
806 }
807
808 pub fn with_fields(mut self, fields: Vec<BatchColumn>) -> Self {
810 self.fields = fields;
811 self
812 }
813
814 pub fn push_field(&mut self, column: BatchColumn) -> &mut Self {
816 self.fields.push(column);
817 self
818 }
819
820 pub fn push_field_array(&mut self, column_id: ColumnId, array: ArrayRef) -> Result<&mut Self> {
822 let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
823 self.fields.push(BatchColumn {
824 column_id,
825 data: vector,
826 });
827
828 Ok(self)
829 }
830
831 pub fn timestamps_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
833 let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
834 ensure!(
835 vector.data_type().is_timestamp(),
836 InvalidBatchSnafu {
837 reason: format!("{:?} is not a timestamp type", vector.data_type()),
838 }
839 );
840
841 self.timestamps = Some(vector);
842 Ok(self)
843 }
844
845 pub fn sequences_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
847 ensure!(
848 *array.data_type() == arrow::datatypes::DataType::UInt64,
849 InvalidBatchSnafu {
850 reason: "sequence array is not UInt64 type",
851 }
852 );
853 let vector = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
855 self.sequences = Some(vector);
856
857 Ok(self)
858 }
859
860 pub fn op_types_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
862 ensure!(
863 *array.data_type() == arrow::datatypes::DataType::UInt8,
864 InvalidBatchSnafu {
865 reason: "sequence array is not UInt8 type",
866 }
867 );
868 let vector = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
870 self.op_types = Some(vector);
871
872 Ok(self)
873 }
874
875 pub fn build(self) -> Result<Batch> {
877 let timestamps = self.timestamps.context(InvalidBatchSnafu {
878 reason: "missing timestamps",
879 })?;
880 let sequences = self.sequences.context(InvalidBatchSnafu {
881 reason: "missing sequences",
882 })?;
883 let op_types = self.op_types.context(InvalidBatchSnafu {
884 reason: "missing op_types",
885 })?;
886 assert_eq!(0, timestamps.null_count());
889 assert_eq!(0, sequences.null_count());
890 assert_eq!(0, op_types.null_count());
891
892 let ts_len = timestamps.len();
893 ensure!(
894 sequences.len() == ts_len,
895 InvalidBatchSnafu {
896 reason: format!(
897 "sequence have different len {} != {}",
898 sequences.len(),
899 ts_len
900 ),
901 }
902 );
903 ensure!(
904 op_types.len() == ts_len,
905 InvalidBatchSnafu {
906 reason: format!(
907 "op type have different len {} != {}",
908 op_types.len(),
909 ts_len
910 ),
911 }
912 );
913 for column in &self.fields {
914 ensure!(
915 column.data.len() == ts_len,
916 InvalidBatchSnafu {
917 reason: format!(
918 "column {} has different len {} != {}",
919 column.column_id,
920 column.data.len(),
921 ts_len
922 ),
923 }
924 );
925 }
926
927 Ok(Batch {
928 primary_key: self.primary_key,
929 pk_values: None,
930 timestamps,
931 sequences,
932 op_types,
933 fields: self.fields,
934 fields_idx: None,
935 })
936 }
937}
938
939impl From<Batch> for BatchBuilder {
940 fn from(batch: Batch) -> Self {
941 Self {
942 primary_key: batch.primary_key,
943 timestamps: Some(batch.timestamps),
944 sequences: Some(batch.sequences),
945 op_types: Some(batch.op_types),
946 fields: batch.fields,
947 }
948 }
949}
950
/// A source of [Batch]es, wrapping the different reader kinds behind one interface.
pub enum Source {
    /// Source backed by a [BoxedBatchReader].
    Reader(BoxedBatchReader),
    /// Source backed by a [BoxedBatchIterator].
    Iter(BoxedBatchIterator),
    /// Source backed by a [BoxedBatchStream].
    Stream(BoxedBatchStream),
    /// Source backed by a [PruneReader].
    PruneReader(PruneReader),
}
964
965impl Source {
966 pub(crate) async fn next_batch(&mut self) -> Result<Option<Batch>> {
968 match self {
969 Source::Reader(reader) => reader.next_batch().await,
970 Source::Iter(iter) => iter.next().transpose(),
971 Source::Stream(stream) => stream.try_next().await,
972 Source::PruneReader(reader) => reader.next_batch().await,
973 }
974 }
975}
976
/// Async reader that yields [Batch]es one at a time.
#[async_trait]
pub trait BatchReader: Send {
    /// Fetches the next batch.
    ///
    /// Returns `Ok(None)` when there are no more batches. This is the convention
    /// [Source::next_batch] also uses for iterators and streams.
    async fn next_batch(&mut self) -> Result<Option<Batch>>;
}
991
/// Boxed, dynamically dispatched [BatchReader].
pub type BoxedBatchReader = Box<dyn BatchReader>;
994
/// Boxed `'static` stream of [Batch] results.
pub type BoxedBatchStream = BoxStream<'static, Result<Batch>>;
997
998#[async_trait::async_trait]
999impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
1000 async fn next_batch(&mut self) -> Result<Option<Batch>> {
1001 (**self).next_batch().await
1002 }
1003}
1004
/// Metrics collected by scanners.
///
/// NOTE(review): these fields are not updated in this file. The descriptions below follow
/// the field names and should be confirmed against the scanners that record them.
#[derive(Debug, Default)]
pub(crate) struct ScannerMetrics {
    /// Time spent preparing the scan.
    prepare_scan_cost: Duration,
    /// Time spent building readers.
    build_reader_cost: Duration,
    /// Time spent scanning.
    scan_cost: Duration,
    /// Time spent converting results.
    convert_cost: Duration,
    /// Time spent yielding results.
    yield_cost: Duration,
    /// Number of batches.
    num_batches: usize,
    /// Number of rows.
    num_rows: usize,
    /// Number of memtable ranges.
    num_mem_ranges: usize,
    /// Number of file ranges.
    num_file_ranges: usize,
}
1027
#[cfg(test)]
mod tests {
    use mito_codec::row_converter::{self, build_primary_key_codec_with_fields};
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::storage::consts::ReservedColumnId;

    use super::*;
    use crate::error::Error;
    use crate::test_util::new_batch_builder;

    /// Builds a batch with primary key `b"test"` and one field column (id 1) holding
    /// `field`. Timestamps are in milliseconds.
    fn new_batch(
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        field: &[u64],
    ) -> Batch {
        new_batch_builder(b"test", timestamps, sequences, op_types, 1, field)
            .build()
            .unwrap()
    }

    #[test]
    fn test_empty_batch() {
        // An empty batch has no first/last timestamp or sequence.
        let batch = new_batch(&[], &[], &[], &[]);
        assert_eq!(None, batch.first_timestamp());
        assert_eq!(None, batch.last_timestamp());
        assert_eq!(None, batch.first_sequence());
        assert_eq!(None, batch.last_sequence());
        assert!(batch.timestamps_native().is_none());
    }

    #[test]
    fn test_first_last_one() {
        // With a single row, first and last refer to the same row.
        let batch = new_batch(&[1], &[2], &[OpType::Put], &[4]);
        assert_eq!(
            Timestamp::new_millisecond(1),
            batch.first_timestamp().unwrap()
        );
        assert_eq!(
            Timestamp::new_millisecond(1),
            batch.last_timestamp().unwrap()
        );
        assert_eq!(2, batch.first_sequence().unwrap());
        assert_eq!(2, batch.last_sequence().unwrap());
    }

    #[test]
    fn test_first_last_multiple() {
        let batch = new_batch(
            &[1, 2, 3],
            &[11, 12, 13],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );
        assert_eq!(
            Timestamp::new_millisecond(1),
            batch.first_timestamp().unwrap()
        );
        assert_eq!(
            Timestamp::new_millisecond(3),
            batch.last_timestamp().unwrap()
        );
        assert_eq!(11, batch.first_sequence().unwrap());
        assert_eq!(13, batch.last_sequence().unwrap());
    }

    #[test]
    fn test_slice() {
        let batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        // Two rows starting at offset 1, i.e. the 2nd and 3rd rows.
        let batch = batch.slice(1, 2);
        let expect = new_batch(
            &[2, 3],
            &[12, 13],
            &[OpType::Delete, OpType::Put],
            &[22, 23],
        );
        assert_eq!(expect, batch);
    }

    #[test]
    fn test_timestamps_native() {
        let batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        assert_eq!(&[1, 2, 3, 4], batch.timestamps_native().unwrap());
    }

    #[test]
    fn test_concat_empty() {
        // Concatenating no batches is an error.
        let err = Batch::concat(vec![]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }

    #[test]
    fn test_concat_one() {
        // A single batch is returned unchanged, whether empty or not.
        let batch = new_batch(&[], &[], &[], &[]);
        let actual = Batch::concat(vec![batch.clone()]).unwrap();
        assert_eq!(batch, actual);

        let batch = new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]);
        let actual = Batch::concat(vec![batch.clone()]).unwrap();
        assert_eq!(batch, actual);
    }

    #[test]
    fn test_concat_multiple() {
        // The empty batch in the middle contributes no rows.
        let batches = vec![
            new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]),
            new_batch(
                &[3, 4, 5],
                &[13, 14, 15],
                &[OpType::Put, OpType::Delete, OpType::Put],
                &[23, 24, 25],
            ),
            new_batch(&[], &[], &[], &[]),
            new_batch(&[6], &[16], &[OpType::Put], &[26]),
        ];
        let batch = Batch::concat(batches).unwrap();
        let expect = new_batch(
            &[1, 2, 3, 4, 5, 6],
            &[11, 12, 13, 14, 15, 16],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Delete,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25, 26],
        );
        assert_eq!(expect, batch);
    }

    #[test]
    fn test_concat_different() {
        // Batches with different primary keys cannot be concatenated.
        let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
        let mut batch2 = new_batch(&[2], &[2], &[OpType::Put], &[2]);
        batch2.primary_key = b"hello".to_vec();
        let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }

    #[test]
    fn test_concat_different_fields() {
        // `batch2` has an extra field column, so the number of fields differs.
        let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
        let fields = vec![
            batch1.fields()[0].clone(),
            BatchColumn {
                column_id: 2,
                data: Arc::new(UInt64Vector::from_slice([2])),
            },
        ];
        let batch2 = batch1.clone().with_fields(fields).unwrap();
        let err = Batch::concat(vec![batch1.clone(), batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );

        // `batch2` has the same number of fields but a different column id.
        let fields = vec![BatchColumn {
            column_id: 2,
            data: Arc::new(UInt64Vector::from_slice([2])),
        }];
        let batch2 = batch1.clone().with_fields(fields).unwrap();
        let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }

    #[test]
    fn test_filter_deleted_empty() {
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_deleted().unwrap();
        assert!(batch.is_empty());
    }

    #[test]
    fn test_filter_deleted() {
        // Rows with `OpType::Delete` are removed.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Delete, OpType::Put, OpType::Delete, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch.filter_deleted().unwrap();
        let expect = new_batch(&[2, 4], &[12, 14], &[OpType::Put, OpType::Put], &[22, 24]);
        assert_eq!(expect, batch);

        // A batch without deletes is left unchanged.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let expect = batch.clone();
        batch.filter_deleted().unwrap();
        assert_eq!(expect, batch);
    }

    #[test]
    fn test_filter_by_sequence() {
        // Keeps rows whose sequence is <= 13.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch.filter_by_sequence(Some(13)).unwrap();
        let expect = new_batch(
            &[1, 2, 3],
            &[11, 12, 13],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );
        assert_eq!(expect, batch);

        // Every sequence is above 10, so all rows are removed.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );

        batch.filter_by_sequence(Some(10)).unwrap();
        assert!(batch.is_empty());

        // `None` disables filtering.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let expect = batch.clone();
        batch.filter_by_sequence(None).unwrap();
        assert_eq!(expect, batch);

        // Empty batches stay empty with a sequence bound...
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_by_sequence(Some(10)).unwrap();
        assert!(batch.is_empty());

        // ...and without one.
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_by_sequence(None).unwrap();
        assert!(batch.is_empty());
    }

    #[test]
    fn test_filter() {
        // Keeps only the rows where the predicate is true.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
        batch.filter(&predicate).unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // Op types are filtered like any other column.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
        batch.filter(&predicate).unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // An all-false predicate removes every row.
        let predicate = BooleanVector::from_vec(vec![false, false]);
        batch.filter(&predicate).unwrap();
        assert!(batch.is_empty());
    }

    #[test]
    fn test_sort_and_dedup() {
        let original = new_batch(
            &[2, 3, 1, 4, 5, 2],
            &[1, 2, 3, 4, 5, 6],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25, 26],
        );

        // With dedup, only the row with the largest sequence (6) is kept for timestamp 2.
        let mut batch = original.clone();
        batch.sort(true).unwrap();
        assert_eq!(
            new_batch(
                &[1, 2, 3, 4, 5],
                &[3, 6, 2, 4, 5],
                &[
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                ],
                &[23, 26, 22, 24, 25],
            ),
            batch
        );

        // Without dedup, rows with equal timestamps are ordered by sequence descending.
        let mut batch = original.clone();
        batch.sort(false).unwrap();

        assert_eq!(
            new_batch(
                &[1, 2, 2, 3, 4, 5],
                &[3, 6, 1, 2, 4, 5],
                &[
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                ],
                &[23, 26, 21, 22, 24, 25],
            ),
            batch
        );

        let original = new_batch(
            &[2, 2, 1],
            &[1, 6, 1],
            &[OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );

        // Dedup keeps the put with sequence 6 over the delete with sequence 1; the op type
        // does not affect which row is kept.
        let mut batch = original.clone();
        batch.sort(true).unwrap();
        let expect = new_batch(&[1, 2], &[1, 6], &[OpType::Put, OpType::Put], &[23, 22]);
        assert_eq!(expect, batch);

        let mut batch = original.clone();
        batch.sort(false).unwrap();
        let expect = new_batch(
            &[1, 2, 2],
            &[1, 6, 1],
            &[OpType::Put, OpType::Put, OpType::Delete],
            &[23, 22, 21],
        );
        assert_eq!(expect, batch);
    }

    #[test]
    fn test_get_value() {
        // Primary key columns are looked up by position (dense) or column id (sparse), so
        // both encodings must return the same values.
        let encodings = [PrimaryKeyEncoding::Dense, PrimaryKeyEncoding::Sparse];

        for encoding in encodings {
            let codec = build_primary_key_codec_with_fields(
                encoding,
                [
                    (
                        ReservedColumnId::table_id(),
                        row_converter::SortField::new(ConcreteDataType::uint32_datatype()),
                    ),
                    (
                        ReservedColumnId::tsid(),
                        row_converter::SortField::new(ConcreteDataType::uint64_datatype()),
                    ),
                    (
                        100,
                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
                    ),
                    (
                        200,
                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
                    ),
                ]
                .into_iter(),
            );

            let values = [
                Value::UInt32(1000),
                Value::UInt64(2000),
                Value::String("abcdefgh".into()),
                Value::String("zyxwvu".into()),
            ];
            let mut buf = vec![];
            codec
                .encode_values(
                    &[
                        (ReservedColumnId::table_id(), values[0].clone()),
                        (ReservedColumnId::tsid(), values[1].clone()),
                        (100, values[2].clone()),
                        (200, values[3].clone()),
                    ],
                    &mut buf,
                )
                .unwrap();

            let field_col_id = 2;
            let mut batch = new_batch_builder(
                &buf,
                &[1, 2, 3],
                &[1, 1, 1],
                &[OpType::Put, OpType::Put, OpType::Put],
                field_col_id,
                &[42, 43, 44],
            )
            .build()
            .unwrap();

            let v = batch
                .pk_col_value(&*codec, 0, ReservedColumnId::table_id())
                .unwrap()
                .unwrap();
            assert_eq!(values[0], *v);

            let v = batch
                .pk_col_value(&*codec, 1, ReservedColumnId::tsid())
                .unwrap()
                .unwrap();
            assert_eq!(values[1], *v);

            let v = batch.pk_col_value(&*codec, 2, 100).unwrap().unwrap();
            assert_eq!(values[2], *v);

            let v = batch.pk_col_value(&*codec, 3, 200).unwrap().unwrap();
            assert_eq!(values[3], *v);

            // Field columns are looked up by column id.
            let v = batch.field_col_value(field_col_id).unwrap();
            assert_eq!(v.data.get(0), Value::UInt64(42));
            assert_eq!(v.data.get(1), Value::UInt64(43));
            assert_eq!(v.data.get(2), Value::UInt64(44));
        }
    }
}