1pub mod compat;
18pub mod dedup;
19pub mod last_row;
20pub mod merge;
21pub mod projection;
22pub(crate) mod prune;
23pub(crate) mod range;
24pub(crate) mod scan_region;
25pub(crate) mod scan_util;
26pub(crate) mod seq_scan;
27pub(crate) mod unordered_scan;
28
29use std::collections::{HashMap, HashSet};
30use std::sync::Arc;
31use std::time::Duration;
32
33use api::v1::OpType;
34use async_trait::async_trait;
35use common_time::Timestamp;
36use datafusion_common::arrow::array::UInt8Array;
37use datatypes::arrow;
38use datatypes::arrow::array::{Array, ArrayRef, UInt64Array};
39use datatypes::arrow::compute::SortOptions;
40use datatypes::arrow::row::{RowConverter, SortField};
41use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector};
42use datatypes::types::TimestampType;
43use datatypes::value::{Value, ValueRef};
44use datatypes::vectors::{
45 BooleanVector, Helper, TimestampMicrosecondVector, TimestampMillisecondVector,
46 TimestampNanosecondVector, TimestampSecondVector, UInt32Vector, UInt64Vector, UInt8Vector,
47 Vector, VectorRef,
48};
49use futures::stream::BoxStream;
50use futures::TryStreamExt;
51use snafu::{ensure, OptionExt, ResultExt};
52use store_api::metadata::RegionMetadata;
53use store_api::storage::{ColumnId, SequenceNumber};
54
55use crate::error::{
56 ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, InvalidBatchSnafu, Result,
57};
58use crate::memtable::BoxedBatchIterator;
59use crate::read::prune::PruneReader;
60use crate::row_converter::{CompositeValues, PrimaryKeyCodec};
61
62#[derive(Debug, PartialEq, Clone)]
67pub struct Batch {
68 primary_key: Vec<u8>,
70 pk_values: Option<CompositeValues>,
72 timestamps: VectorRef,
74 sequences: Arc<UInt64Vector>,
78 op_types: Arc<UInt8Vector>,
82 fields: Vec<BatchColumn>,
84 fields_idx: Option<HashMap<ColumnId, usize>>,
86}
87
88impl Batch {
89 pub fn new(
91 primary_key: Vec<u8>,
92 timestamps: VectorRef,
93 sequences: Arc<UInt64Vector>,
94 op_types: Arc<UInt8Vector>,
95 fields: Vec<BatchColumn>,
96 ) -> Result<Batch> {
97 BatchBuilder::with_required_columns(primary_key, timestamps, sequences, op_types)
98 .with_fields(fields)
99 .build()
100 }
101
102 pub fn with_fields(self, fields: Vec<BatchColumn>) -> Result<Batch> {
104 Batch::new(
105 self.primary_key,
106 self.timestamps,
107 self.sequences,
108 self.op_types,
109 fields,
110 )
111 }
112
113 pub fn primary_key(&self) -> &[u8] {
115 &self.primary_key
116 }
117
118 pub fn pk_values(&self) -> Option<&CompositeValues> {
120 self.pk_values.as_ref()
121 }
122
123 pub fn set_pk_values(&mut self, pk_values: CompositeValues) {
125 self.pk_values = Some(pk_values);
126 }
127
128 #[cfg(any(test, feature = "test"))]
130 pub fn remove_pk_values(&mut self) {
131 self.pk_values = None;
132 }
133
134 pub fn fields(&self) -> &[BatchColumn] {
136 &self.fields
137 }
138
139 pub fn timestamps(&self) -> &VectorRef {
141 &self.timestamps
142 }
143
144 pub fn sequences(&self) -> &Arc<UInt64Vector> {
146 &self.sequences
147 }
148
149 pub fn op_types(&self) -> &Arc<UInt8Vector> {
151 &self.op_types
152 }
153
154 pub fn num_rows(&self) -> usize {
156 self.sequences.len()
159 }
160
161 pub fn is_empty(&self) -> bool {
163 self.num_rows() == 0
164 }
165
166 pub fn first_timestamp(&self) -> Option<Timestamp> {
168 if self.timestamps.is_empty() {
169 return None;
170 }
171
172 Some(self.get_timestamp(0))
173 }
174
175 pub fn last_timestamp(&self) -> Option<Timestamp> {
177 if self.timestamps.is_empty() {
178 return None;
179 }
180
181 Some(self.get_timestamp(self.timestamps.len() - 1))
182 }
183
184 pub fn first_sequence(&self) -> Option<SequenceNumber> {
186 if self.sequences.is_empty() {
187 return None;
188 }
189
190 Some(self.get_sequence(0))
191 }
192
193 pub fn last_sequence(&self) -> Option<SequenceNumber> {
195 if self.sequences.is_empty() {
196 return None;
197 }
198
199 Some(self.get_sequence(self.sequences.len() - 1))
200 }
201
202 pub fn set_primary_key(&mut self, primary_key: Vec<u8>) {
207 self.primary_key = primary_key;
208 }
209
210 pub fn slice(&self, offset: usize, length: usize) -> Batch {
215 let fields = self
216 .fields
217 .iter()
218 .map(|column| BatchColumn {
219 column_id: column.column_id,
220 data: column.data.slice(offset, length),
221 })
222 .collect();
223 Batch {
225 primary_key: self.primary_key.clone(),
228 pk_values: self.pk_values.clone(),
229 timestamps: self.timestamps.slice(offset, length),
230 sequences: Arc::new(self.sequences.get_slice(offset, length)),
231 op_types: Arc::new(self.op_types.get_slice(offset, length)),
232 fields,
233 fields_idx: self.fields_idx.clone(),
234 }
235 }
236
237 pub fn concat(mut batches: Vec<Batch>) -> Result<Batch> {
241 ensure!(
242 !batches.is_empty(),
243 InvalidBatchSnafu {
244 reason: "empty batches",
245 }
246 );
247 if batches.len() == 1 {
248 return Ok(batches.pop().unwrap());
250 }
251
252 let primary_key = std::mem::take(&mut batches[0].primary_key);
253 let first = &batches[0];
254 ensure!(
256 batches
257 .iter()
258 .skip(1)
259 .all(|b| b.primary_key() == primary_key),
260 InvalidBatchSnafu {
261 reason: "batches have different primary key",
262 }
263 );
264 for b in batches.iter().skip(1) {
265 ensure!(
266 b.fields.len() == first.fields.len(),
267 InvalidBatchSnafu {
268 reason: "batches have different field num",
269 }
270 );
271 for (l, r) in b.fields.iter().zip(&first.fields) {
272 ensure!(
273 l.column_id == r.column_id,
274 InvalidBatchSnafu {
275 reason: "batches have different fields",
276 }
277 );
278 }
279 }
280
281 let mut builder = BatchBuilder::new(primary_key);
283 let array = concat_arrays(batches.iter().map(|b| b.timestamps().to_arrow_array()))?;
285 builder.timestamps_array(array)?;
286 let array = concat_arrays(batches.iter().map(|b| b.sequences().to_arrow_array()))?;
287 builder.sequences_array(array)?;
288 let array = concat_arrays(batches.iter().map(|b| b.op_types().to_arrow_array()))?;
289 builder.op_types_array(array)?;
290 for (i, batch_column) in first.fields.iter().enumerate() {
291 let array = concat_arrays(batches.iter().map(|b| b.fields()[i].data.to_arrow_array()))?;
292 builder.push_field_array(batch_column.column_id, array)?;
293 }
294
295 builder.build()
296 }
297
298 pub fn filter_deleted(&mut self) -> Result<()> {
300 let array = self.op_types.as_arrow();
302 let rhs = UInt8Array::new_scalar(OpType::Delete as u8);
304 let predicate =
305 arrow::compute::kernels::cmp::neq(array, &rhs).context(ComputeArrowSnafu)?;
306 self.filter(&BooleanVector::from(predicate))
307 }
308
309 pub fn filter(&mut self, predicate: &BooleanVector) -> Result<()> {
312 self.timestamps = self
313 .timestamps
314 .filter(predicate)
315 .context(ComputeVectorSnafu)?;
316 self.sequences = Arc::new(
317 UInt64Vector::try_from_arrow_array(
318 arrow::compute::filter(self.sequences.as_arrow(), predicate.as_boolean_array())
319 .context(ComputeArrowSnafu)?,
320 )
321 .unwrap(),
322 );
323 self.op_types = Arc::new(
324 UInt8Vector::try_from_arrow_array(
325 arrow::compute::filter(self.op_types.as_arrow(), predicate.as_boolean_array())
326 .context(ComputeArrowSnafu)?,
327 )
328 .unwrap(),
329 );
330 for batch_column in &mut self.fields {
331 batch_column.data = batch_column
332 .data
333 .filter(predicate)
334 .context(ComputeVectorSnafu)?;
335 }
336
337 Ok(())
338 }
339
340 pub fn filter_by_sequence(&mut self, sequence: Option<SequenceNumber>) -> Result<()> {
342 let seq = match (sequence, self.last_sequence()) {
343 (None, _) | (_, None) => return Ok(()),
344 (Some(sequence), Some(last_sequence)) if sequence >= last_sequence => return Ok(()),
345 (Some(sequence), Some(_)) => sequence,
346 };
347
348 let seqs = self.sequences.as_arrow();
349 let sequence = UInt64Array::new_scalar(seq);
350 let predicate = datafusion_common::arrow::compute::kernels::cmp::lt_eq(seqs, &sequence)
351 .context(ComputeArrowSnafu)?;
352 let predicate = BooleanVector::from(predicate);
353 self.filter(&predicate)?;
354
355 Ok(())
356 }
357
358 pub fn sort(&mut self, dedup: bool) -> Result<()> {
365 let converter = RowConverter::new(vec![
368 SortField::new(self.timestamps.data_type().as_arrow_type()),
369 SortField::new_with_options(
370 self.sequences.data_type().as_arrow_type(),
371 SortOptions {
372 descending: true,
373 ..Default::default()
374 },
375 ),
376 ])
377 .context(ComputeArrowSnafu)?;
378 let columns = [
380 self.timestamps.to_arrow_array(),
381 self.sequences.to_arrow_array(),
382 ];
383 let rows = converter.convert_columns(&columns).unwrap();
384 let mut to_sort: Vec<_> = rows.iter().enumerate().collect();
385
386 let was_sorted = to_sort.is_sorted_by_key(|x| x.1);
387 if !was_sorted {
388 to_sort.sort_unstable_by_key(|x| x.1);
389 }
390
391 let num_rows = to_sort.len();
392 if dedup {
393 to_sort.dedup_by(|left, right| {
395 debug_assert_eq!(18, left.1.as_ref().len());
396 debug_assert_eq!(18, right.1.as_ref().len());
397 let (left_key, right_key) = (left.1.as_ref(), right.1.as_ref());
398 left_key[..TIMESTAMP_KEY_LEN] == right_key[..TIMESTAMP_KEY_LEN]
400 });
401 }
402 let no_dedup = to_sort.len() == num_rows;
403
404 if was_sorted && no_dedup {
405 return Ok(());
406 }
407 let indices = UInt32Vector::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
408 self.take_in_place(&indices)
409 }
410
411 pub fn memory_size(&self) -> usize {
413 let mut size = std::mem::size_of::<Self>();
414 size += self.primary_key.len();
415 size += self.timestamps.memory_size();
416 size += self.sequences.memory_size();
417 size += self.op_types.memory_size();
418 for batch_column in &self.fields {
419 size += batch_column.data.memory_size();
420 }
421 size
422 }
423
424 pub(crate) fn projected_fields(
426 metadata: &RegionMetadata,
427 projection: &[ColumnId],
428 ) -> Vec<(ColumnId, ConcreteDataType)> {
429 let projected_ids: HashSet<_> = projection.iter().copied().collect();
430 metadata
431 .field_columns()
432 .filter_map(|column| {
433 if projected_ids.contains(&column.column_id) {
434 Some((column.column_id, column.column_schema.data_type.clone()))
435 } else {
436 None
437 }
438 })
439 .collect()
440 }
441
442 pub(crate) fn timestamps_native(&self) -> Option<&[i64]> {
444 if self.timestamps.is_empty() {
445 return None;
446 }
447
448 let values = match self.timestamps.data_type() {
449 ConcreteDataType::Timestamp(TimestampType::Second(_)) => self
450 .timestamps
451 .as_any()
452 .downcast_ref::<TimestampSecondVector>()
453 .unwrap()
454 .as_arrow()
455 .values(),
456 ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => self
457 .timestamps
458 .as_any()
459 .downcast_ref::<TimestampMillisecondVector>()
460 .unwrap()
461 .as_arrow()
462 .values(),
463 ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => self
464 .timestamps
465 .as_any()
466 .downcast_ref::<TimestampMicrosecondVector>()
467 .unwrap()
468 .as_arrow()
469 .values(),
470 ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => self
471 .timestamps
472 .as_any()
473 .downcast_ref::<TimestampNanosecondVector>()
474 .unwrap()
475 .as_arrow()
476 .values(),
477 other => panic!("timestamps in a Batch has other type {:?}", other),
478 };
479
480 Some(values)
481 }
482
483 fn take_in_place(&mut self, indices: &UInt32Vector) -> Result<()> {
485 self.timestamps = self.timestamps.take(indices).context(ComputeVectorSnafu)?;
486 let array = arrow::compute::take(self.sequences.as_arrow(), indices.as_arrow(), None)
487 .context(ComputeArrowSnafu)?;
488 self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
490 let array = arrow::compute::take(self.op_types.as_arrow(), indices.as_arrow(), None)
491 .context(ComputeArrowSnafu)?;
492 self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
493 for batch_column in &mut self.fields {
494 batch_column.data = batch_column
495 .data
496 .take(indices)
497 .context(ComputeVectorSnafu)?;
498 }
499
500 Ok(())
501 }
502
503 fn get_timestamp(&self, index: usize) -> Timestamp {
508 match self.timestamps.get_ref(index) {
509 ValueRef::Timestamp(timestamp) => timestamp,
510
511 value => panic!("{:?} is not a timestamp", value),
513 }
514 }
515
516 pub(crate) fn get_sequence(&self, index: usize) -> SequenceNumber {
521 self.sequences.get_data(index).unwrap()
523 }
524
525 #[cfg(debug_assertions)]
527 pub(crate) fn check_monotonic(&self) -> Result<(), String> {
528 use std::cmp::Ordering;
529 if self.timestamps_native().is_none() {
530 return Ok(());
531 }
532
533 let timestamps = self.timestamps_native().unwrap();
534 let sequences = self.sequences.as_arrow().values();
535 for (i, window) in timestamps.windows(2).enumerate() {
536 let current = window[0];
537 let next = window[1];
538 let current_sequence = sequences[i];
539 let next_sequence = sequences[i + 1];
540 match current.cmp(&next) {
541 Ordering::Less => {
542 continue;
544 }
545 Ordering::Equal => {
546 if current_sequence < next_sequence {
548 return Err(format!(
549 "sequence are not monotonic: ts {} == {} but current sequence {} < {}, index: {}",
550 current, next, current_sequence, next_sequence, i
551 ));
552 }
553 }
554 Ordering::Greater => {
555 return Err(format!(
557 "timestamps are not monotonic: {} > {}, index: {}",
558 current, next, i
559 ));
560 }
561 }
562 }
563
564 Ok(())
565 }
566
567 #[cfg(debug_assertions)]
569 pub(crate) fn check_next_batch(&self, other: &Batch) -> Result<(), String> {
570 if self.primary_key() < other.primary_key() {
572 return Ok(());
573 }
574 if self.primary_key() > other.primary_key() {
575 return Err(format!(
576 "primary key is not monotonic: {:?} > {:?}",
577 self.primary_key(),
578 other.primary_key()
579 ));
580 }
581 if self.last_timestamp() < other.first_timestamp() {
583 return Ok(());
584 }
585 if self.last_timestamp() > other.first_timestamp() {
586 return Err(format!(
587 "timestamps are not monotonic: {:?} > {:?}",
588 self.last_timestamp(),
589 other.first_timestamp()
590 ));
591 }
592 if self.last_sequence() >= other.first_sequence() {
594 return Ok(());
595 }
596 Err(format!(
597 "sequences are not monotonic: {:?} < {:?}",
598 self.last_sequence(),
599 other.first_sequence()
600 ))
601 }
602
603 pub fn pk_col_value(
607 &mut self,
608 codec: &dyn PrimaryKeyCodec,
609 col_idx_in_pk: usize,
610 column_id: ColumnId,
611 ) -> Result<Option<&Value>> {
612 if self.pk_values.is_none() {
613 self.pk_values = Some(codec.decode(&self.primary_key)?);
614 }
615
616 let pk_values = self.pk_values.as_ref().unwrap();
617 Ok(match pk_values {
618 CompositeValues::Dense(values) => values.get(col_idx_in_pk).map(|(_, v)| v),
619 CompositeValues::Sparse(values) => values.get(&column_id),
620 })
621 }
622
623 pub fn field_col_value(&mut self, column_id: ColumnId) -> Option<&BatchColumn> {
627 if self.fields_idx.is_none() {
628 self.fields_idx = Some(
629 self.fields
630 .iter()
631 .enumerate()
632 .map(|(i, c)| (c.column_id, i))
633 .collect(),
634 );
635 }
636
637 self.fields_idx
638 .as_ref()
639 .unwrap()
640 .get(&column_id)
641 .map(|&idx| &self.fields[idx])
642 }
643}
644
645#[cfg(debug_assertions)]
647#[derive(Default)]
648pub(crate) struct BatchChecker {
649 last_batch: Option<Batch>,
650 start: Option<Timestamp>,
651 end: Option<Timestamp>,
652}
653
654#[cfg(debug_assertions)]
655impl BatchChecker {
656 pub(crate) fn with_start(mut self, start: Option<Timestamp>) -> Self {
658 self.start = start;
659 self
660 }
661
662 pub(crate) fn with_end(mut self, end: Option<Timestamp>) -> Self {
664 self.end = end;
665 self
666 }
667
668 pub(crate) fn check_monotonic(&mut self, batch: &Batch) -> Result<(), String> {
671 batch.check_monotonic()?;
672
673 if let (Some(start), Some(first)) = (self.start, batch.first_timestamp()) {
674 if start > first {
675 return Err(format!(
676 "batch's first timestamp is before the start timestamp: {:?} > {:?}",
677 start, first
678 ));
679 }
680 }
681 if let (Some(end), Some(last)) = (self.end, batch.last_timestamp()) {
682 if end <= last {
683 return Err(format!(
684 "batch's last timestamp is after the end timestamp: {:?} <= {:?}",
685 end, last
686 ));
687 }
688 }
689
690 let res = self
693 .last_batch
694 .as_ref()
695 .map(|last| last.check_next_batch(batch))
696 .unwrap_or(Ok(()));
697 self.last_batch = Some(batch.clone());
698 res
699 }
700
701 pub(crate) fn format_batch(&self, batch: &Batch) -> String {
703 use std::fmt::Write;
704
705 let mut message = String::new();
706 if let Some(last) = &self.last_batch {
707 write!(
708 message,
709 "last_pk: {:?}, last_ts: {:?}, last_seq: {:?}, ",
710 last.primary_key(),
711 last.last_timestamp(),
712 last.last_sequence()
713 )
714 .unwrap();
715 }
716 write!(
717 message,
718 "batch_pk: {:?}, batch_ts: {:?}, batch_seq: {:?}",
719 batch.primary_key(),
720 batch.timestamps(),
721 batch.sequences()
722 )
723 .unwrap();
724
725 message
726 }
727
728 pub(crate) fn ensure_part_range_batch(
730 &mut self,
731 scanner: &str,
732 region_id: store_api::storage::RegionId,
733 partition: usize,
734 part_range: store_api::region_engine::PartitionRange,
735 batch: &Batch,
736 ) {
737 if let Err(e) = self.check_monotonic(batch) {
738 let err_msg = format!(
739 "{}: batch is not sorted, {}, region_id: {}, partition: {}, part_range: {:?}",
740 scanner, e, region_id, partition, part_range,
741 );
742 common_telemetry::error!("{err_msg}, {}", self.format_batch(batch));
743 panic!("{err_msg}, batch rows: {}", batch.num_rows());
745 }
746 }
747}
748
749const TIMESTAMP_KEY_LEN: usize = 9;
751
752fn concat_arrays(iter: impl Iterator<Item = ArrayRef>) -> Result<ArrayRef> {
754 let arrays: Vec<_> = iter.collect();
755 let dyn_arrays: Vec<_> = arrays.iter().map(|array| array.as_ref()).collect();
756 arrow::compute::concat(&dyn_arrays).context(ComputeArrowSnafu)
757}
758
759#[derive(Debug, PartialEq, Eq, Clone)]
761pub struct BatchColumn {
762 pub column_id: ColumnId,
764 pub data: VectorRef,
766}
767
768pub struct BatchBuilder {
770 primary_key: Vec<u8>,
771 timestamps: Option<VectorRef>,
772 sequences: Option<Arc<UInt64Vector>>,
773 op_types: Option<Arc<UInt8Vector>>,
774 fields: Vec<BatchColumn>,
775}
776
777impl BatchBuilder {
778 pub fn new(primary_key: Vec<u8>) -> BatchBuilder {
780 BatchBuilder {
781 primary_key,
782 timestamps: None,
783 sequences: None,
784 op_types: None,
785 fields: Vec::new(),
786 }
787 }
788
789 pub fn with_required_columns(
791 primary_key: Vec<u8>,
792 timestamps: VectorRef,
793 sequences: Arc<UInt64Vector>,
794 op_types: Arc<UInt8Vector>,
795 ) -> BatchBuilder {
796 BatchBuilder {
797 primary_key,
798 timestamps: Some(timestamps),
799 sequences: Some(sequences),
800 op_types: Some(op_types),
801 fields: Vec::new(),
802 }
803 }
804
805 pub fn with_fields(mut self, fields: Vec<BatchColumn>) -> Self {
807 self.fields = fields;
808 self
809 }
810
811 pub fn push_field(&mut self, column: BatchColumn) -> &mut Self {
813 self.fields.push(column);
814 self
815 }
816
817 pub fn push_field_array(&mut self, column_id: ColumnId, array: ArrayRef) -> Result<&mut Self> {
819 let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
820 self.fields.push(BatchColumn {
821 column_id,
822 data: vector,
823 });
824
825 Ok(self)
826 }
827
828 pub fn timestamps_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
830 let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
831 ensure!(
832 vector.data_type().is_timestamp(),
833 InvalidBatchSnafu {
834 reason: format!("{:?} is not a timestamp type", vector.data_type()),
835 }
836 );
837
838 self.timestamps = Some(vector);
839 Ok(self)
840 }
841
842 pub fn sequences_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
844 ensure!(
845 *array.data_type() == arrow::datatypes::DataType::UInt64,
846 InvalidBatchSnafu {
847 reason: "sequence array is not UInt64 type",
848 }
849 );
850 let vector = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
852 self.sequences = Some(vector);
853
854 Ok(self)
855 }
856
857 pub fn op_types_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
859 ensure!(
860 *array.data_type() == arrow::datatypes::DataType::UInt8,
861 InvalidBatchSnafu {
862 reason: "sequence array is not UInt8 type",
863 }
864 );
865 let vector = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
867 self.op_types = Some(vector);
868
869 Ok(self)
870 }
871
872 pub fn build(self) -> Result<Batch> {
874 let timestamps = self.timestamps.context(InvalidBatchSnafu {
875 reason: "missing timestamps",
876 })?;
877 let sequences = self.sequences.context(InvalidBatchSnafu {
878 reason: "missing sequences",
879 })?;
880 let op_types = self.op_types.context(InvalidBatchSnafu {
881 reason: "missing op_types",
882 })?;
883 assert_eq!(0, timestamps.null_count());
886 assert_eq!(0, sequences.null_count());
887 assert_eq!(0, op_types.null_count());
888
889 let ts_len = timestamps.len();
890 ensure!(
891 sequences.len() == ts_len,
892 InvalidBatchSnafu {
893 reason: format!(
894 "sequence have different len {} != {}",
895 sequences.len(),
896 ts_len
897 ),
898 }
899 );
900 ensure!(
901 op_types.len() == ts_len,
902 InvalidBatchSnafu {
903 reason: format!(
904 "op type have different len {} != {}",
905 op_types.len(),
906 ts_len
907 ),
908 }
909 );
910 for column in &self.fields {
911 ensure!(
912 column.data.len() == ts_len,
913 InvalidBatchSnafu {
914 reason: format!(
915 "column {} has different len {} != {}",
916 column.column_id,
917 column.data.len(),
918 ts_len
919 ),
920 }
921 );
922 }
923
924 Ok(Batch {
925 primary_key: self.primary_key,
926 pk_values: None,
927 timestamps,
928 sequences,
929 op_types,
930 fields: self.fields,
931 fields_idx: None,
932 })
933 }
934}
935
936impl From<Batch> for BatchBuilder {
937 fn from(batch: Batch) -> Self {
938 Self {
939 primary_key: batch.primary_key,
940 timestamps: Some(batch.timestamps),
941 sequences: Some(batch.sequences),
942 op_types: Some(batch.op_types),
943 fields: batch.fields,
944 }
945 }
946}
947
948pub enum Source {
952 Reader(BoxedBatchReader),
954 Iter(BoxedBatchIterator),
956 Stream(BoxedBatchStream),
958 PruneReader(PruneReader),
960}
961
962impl Source {
963 pub(crate) async fn next_batch(&mut self) -> Result<Option<Batch>> {
965 match self {
966 Source::Reader(reader) => reader.next_batch().await,
967 Source::Iter(iter) => iter.next().transpose(),
968 Source::Stream(stream) => stream.try_next().await,
969 Source::PruneReader(reader) => reader.next_batch().await,
970 }
971 }
972}
973
974#[async_trait]
978pub trait BatchReader: Send {
979 async fn next_batch(&mut self) -> Result<Option<Batch>>;
987}
988
989pub type BoxedBatchReader = Box<dyn BatchReader>;
991
992pub type BoxedBatchStream = BoxStream<'static, Result<Batch>>;
994
995#[async_trait::async_trait]
996impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
997 async fn next_batch(&mut self) -> Result<Option<Batch>> {
998 (**self).next_batch().await
999 }
1000}
1001
1002#[derive(Debug, Default)]
1004pub(crate) struct ScannerMetrics {
1005 prepare_scan_cost: Duration,
1007 build_reader_cost: Duration,
1009 scan_cost: Duration,
1011 convert_cost: Duration,
1013 yield_cost: Duration,
1015 num_batches: usize,
1017 num_rows: usize,
1019 num_mem_ranges: usize,
1021 num_file_ranges: usize,
1023}
1024
#[cfg(test)]
mod tests {
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::storage::consts::ReservedColumnId;

    use super::*;
    use crate::error::Error;
    use crate::row_converter::{self, build_primary_key_codec_with_fields};
    use crate::test_util::new_batch_builder;

    /// Builds a batch with primary key `b"test"` and one `u64` field column (id 1).
    fn new_batch(
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        field: &[u64],
    ) -> Batch {
        new_batch_builder(b"test", timestamps, sequences, op_types, 1, field)
            .build()
            .unwrap()
    }

    /// Returns `n` put op types.
    fn puts(n: usize) -> Vec<OpType> {
        vec![OpType::Put; n]
    }

    /// Asserts that `err` is an `InvalidBatch` error.
    fn assert_invalid_batch(err: Error) {
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }

    #[test]
    fn test_empty_batch() {
        let batch = new_batch(&[], &[], &[], &[]);
        assert!(batch.first_timestamp().is_none());
        assert!(batch.last_timestamp().is_none());
        assert!(batch.first_sequence().is_none());
        assert!(batch.last_sequence().is_none());
        assert!(batch.timestamps_native().is_none());
    }

    #[test]
    fn test_first_last_one() {
        let batch = new_batch(&[1], &[2], &[OpType::Put], &[4]);
        let ts = Timestamp::new_millisecond(1);
        assert_eq!(Some(ts), batch.first_timestamp());
        assert_eq!(Some(ts), batch.last_timestamp());
        assert_eq!(Some(2), batch.first_sequence());
        assert_eq!(Some(2), batch.last_sequence());
    }

    #[test]
    fn test_first_last_multiple() {
        let batch = new_batch(&[1, 2, 3], &[11, 12, 13], &puts(3), &[21, 22, 23]);
        assert_eq!(
            Some(Timestamp::new_millisecond(1)),
            batch.first_timestamp()
        );
        assert_eq!(
            Some(Timestamp::new_millisecond(3)),
            batch.last_timestamp()
        );
        assert_eq!(Some(11), batch.first_sequence());
        assert_eq!(Some(13), batch.last_sequence());
    }

    #[test]
    fn test_slice() {
        let sliced = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        )
        .slice(1, 2);
        let expect = new_batch(
            &[2, 3],
            &[12, 13],
            &[OpType::Delete, OpType::Put],
            &[22, 23],
        );
        assert_eq!(expect, sliced);
    }

    #[test]
    fn test_timestamps_native() {
        let batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        assert_eq!(&[1, 2, 3, 4], batch.timestamps_native().unwrap());
    }

    #[test]
    fn test_concat_empty() {
        assert_invalid_batch(Batch::concat(vec![]).unwrap_err());
    }

    #[test]
    fn test_concat_one() {
        for batch in [
            new_batch(&[], &[], &[], &[]),
            new_batch(&[1, 2], &[11, 12], &puts(2), &[21, 22]),
        ] {
            let actual = Batch::concat(vec![batch.clone()]).unwrap();
            assert_eq!(batch, actual);
        }
    }

    #[test]
    fn test_concat_multiple() {
        let inputs = vec![
            new_batch(&[1, 2], &[11, 12], &puts(2), &[21, 22]),
            new_batch(
                &[3, 4, 5],
                &[13, 14, 15],
                &[OpType::Put, OpType::Delete, OpType::Put],
                &[23, 24, 25],
            ),
            new_batch(&[], &[], &[], &[]),
            new_batch(&[6], &[16], &[OpType::Put], &[26]),
        ];
        let concatenated = Batch::concat(inputs).unwrap();

        let mut op_types = puts(6);
        op_types[3] = OpType::Delete;
        let expect = new_batch(
            &[1, 2, 3, 4, 5, 6],
            &[11, 12, 13, 14, 15, 16],
            &op_types,
            &[21, 22, 23, 24, 25, 26],
        );
        assert_eq!(expect, concatenated);
    }

    #[test]
    fn test_concat_different() {
        let first = new_batch(&[1], &[1], &[OpType::Put], &[1]);
        let mut second = new_batch(&[2], &[2], &[OpType::Put], &[2]);
        second.primary_key = b"hello".to_vec();
        assert_invalid_batch(Batch::concat(vec![first, second]).unwrap_err());
    }

    #[test]
    fn test_concat_different_fields() {
        let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
        let extra_column = BatchColumn {
            column_id: 2,
            data: Arc::new(UInt64Vector::from_slice([2])),
        };

        // The second batch has one more field column.
        let batch2 = batch1
            .clone()
            .with_fields(vec![batch1.fields()[0].clone(), extra_column.clone()])
            .unwrap();
        assert_invalid_batch(Batch::concat(vec![batch1.clone(), batch2]).unwrap_err());

        // The second batch has a different field column.
        let batch2 = batch1.clone().with_fields(vec![extra_column]).unwrap();
        assert_invalid_batch(Batch::concat(vec![batch1, batch2]).unwrap_err());
    }

    #[test]
    fn test_filter_deleted_empty() {
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_deleted().unwrap();
        assert!(batch.is_empty());
    }

    #[test]
    fn test_filter_deleted() {
        let mut mixed = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Delete, OpType::Put, OpType::Delete, OpType::Put],
            &[21, 22, 23, 24],
        );
        mixed.filter_deleted().unwrap();
        assert_eq!(
            new_batch(&[2, 4], &[12, 14], &puts(2), &[22, 24]),
            mixed
        );

        let mut all_puts = new_batch(&[1, 2, 3, 4], &[11, 12, 13, 14], &puts(4), &[21, 22, 23, 24]);
        let expect = all_puts.clone();
        all_puts.filter_deleted().unwrap();
        assert_eq!(expect, all_puts);
    }

    #[test]
    fn test_filter_by_sequence() {
        let four_rows = |op_types: &[OpType]| {
            new_batch(&[1, 2, 3, 4], &[11, 12, 13, 14], op_types, &[21, 22, 23, 24])
        };
        let with_delete = [OpType::Put, OpType::Delete, OpType::Put, OpType::Put];

        let mut batch = four_rows(&puts(4));
        batch.filter_by_sequence(Some(13)).unwrap();
        assert_eq!(
            new_batch(&[1, 2, 3], &[11, 12, 13], &puts(3), &[21, 22, 23]),
            batch
        );

        let mut batch = four_rows(&with_delete);
        batch.filter_by_sequence(Some(10)).unwrap();
        assert!(batch.is_empty());

        let mut batch = four_rows(&with_delete);
        let expect = batch.clone();
        batch.filter_by_sequence(None).unwrap();
        assert_eq!(expect, batch);

        for sequence in [Some(10), None] {
            let mut empty = new_batch(&[], &[], &[], &[]);
            empty.filter_by_sequence(sequence).unwrap();
            assert!(empty.is_empty());
        }
    }

    #[test]
    fn test_filter() {
        let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
        let expect = new_batch(&[3, 4], &[13, 14], &puts(2), &[23, 24]);

        for op_types in [
            puts(4),
            vec![OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
        ] {
            let mut batch = new_batch(&[1, 2, 3, 4], &[11, 12, 13, 14], &op_types, &[21, 22, 23, 24]);
            batch.filter(&predicate).unwrap();
            assert_eq!(expect, batch);
        }

        let mut remaining = expect.clone();
        remaining
            .filter(&BooleanVector::from_vec(vec![false, false]))
            .unwrap();
        assert!(remaining.is_empty());
    }

    #[test]
    fn test_sort_and_dedup() {
        let original = new_batch(
            &[2, 3, 1, 4, 5, 2],
            &[1, 2, 3, 4, 5, 6],
            &puts(6),
            &[21, 22, 23, 24, 25, 26],
        );

        let mut deduped = original.clone();
        deduped.sort(true).unwrap();
        assert_eq!(
            new_batch(
                &[1, 2, 3, 4, 5],
                &[3, 6, 2, 4, 5],
                &puts(5),
                &[23, 26, 22, 24, 25],
            ),
            deduped
        );

        let mut sorted = original;
        sorted.sort(false).unwrap();
        assert_eq!(
            new_batch(
                &[1, 2, 2, 3, 4, 5],
                &[3, 6, 1, 2, 4, 5],
                &puts(6),
                &[23, 26, 21, 22, 24, 25],
            ),
            sorted
        );

        let original = new_batch(
            &[2, 2, 1],
            &[1, 6, 1],
            &[OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );

        let mut deduped = original.clone();
        deduped.sort(true).unwrap();
        assert_eq!(
            new_batch(&[1, 2], &[1, 6], &puts(2), &[23, 22]),
            deduped
        );

        let mut sorted = original;
        sorted.sort(false).unwrap();
        assert_eq!(
            new_batch(
                &[1, 2, 2],
                &[1, 6, 1],
                &[OpType::Put, OpType::Put, OpType::Delete],
                &[23, 22, 21],
            ),
            sorted
        );
    }

    #[test]
    fn test_get_value() {
        for encoding in [PrimaryKeyEncoding::Dense, PrimaryKeyEncoding::Sparse] {
            let codec = build_primary_key_codec_with_fields(
                encoding,
                [
                    (
                        ReservedColumnId::table_id(),
                        row_converter::SortField::new(ConcreteDataType::uint32_datatype()),
                    ),
                    (
                        ReservedColumnId::tsid(),
                        row_converter::SortField::new(ConcreteDataType::uint64_datatype()),
                    ),
                    (
                        100,
                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
                    ),
                    (
                        200,
                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
                    ),
                ]
                .into_iter(),
            );

            let values = [
                Value::UInt32(1000),
                Value::UInt64(2000),
                Value::String("abcdefgh".into()),
                Value::String("zyxwvu".into()),
            ];
            let mut buf = vec![];
            codec
                .encode_values(
                    &[
                        (ReservedColumnId::table_id(), values[0].clone()),
                        (ReservedColumnId::tsid(), values[1].clone()),
                        (100, values[2].clone()),
                        (200, values[3].clone()),
                    ],
                    &mut buf,
                )
                .unwrap();

            let field_col_id = 2;
            let mut batch =
                new_batch_builder(&buf, &[1, 2, 3], &[1, 1, 1], &puts(3), field_col_id, &[42, 43, 44])
                    .build()
                    .unwrap();

            let pk_columns = [
                (0, ReservedColumnId::table_id()),
                (1, ReservedColumnId::tsid()),
                (2, 100),
                (3, 200),
            ];
            for (col_idx_in_pk, column_id) in pk_columns {
                let v = batch
                    .pk_col_value(&*codec, col_idx_in_pk, column_id)
                    .unwrap()
                    .unwrap();
                assert_eq!(values[col_idx_in_pk], *v);
            }

            let column = batch.field_col_value(field_col_id).unwrap();
            for (row, expected) in [42u64, 43, 44].into_iter().enumerate() {
                assert_eq!(column.data.get(row), Value::UInt64(expected));
            }
        }
    }
}