1pub mod compat;
18pub mod dedup;
19pub mod last_row;
20pub mod merge;
21pub mod plain_batch;
22pub mod projection;
23pub(crate) mod prune;
24pub(crate) mod range;
25pub(crate) mod scan_region;
26pub(crate) mod scan_util;
27pub(crate) mod seq_scan;
28pub(crate) mod series_scan;
29pub(crate) mod unordered_scan;
30
31use std::collections::{HashMap, HashSet};
32use std::sync::Arc;
33use std::time::Duration;
34
35use api::v1::OpType;
36use async_trait::async_trait;
37use common_time::Timestamp;
38use datafusion_common::arrow::array::UInt8Array;
39use datatypes::arrow;
40use datatypes::arrow::array::{Array, ArrayRef, UInt64Array};
41use datatypes::arrow::compute::SortOptions;
42use datatypes::arrow::row::{RowConverter, SortField};
43use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector};
44use datatypes::types::TimestampType;
45use datatypes::value::{Value, ValueRef};
46use datatypes::vectors::{
47 BooleanVector, Helper, TimestampMicrosecondVector, TimestampMillisecondVector,
48 TimestampNanosecondVector, TimestampSecondVector, UInt32Vector, UInt64Vector, UInt8Vector,
49 Vector, VectorRef,
50};
51use futures::stream::BoxStream;
52use futures::TryStreamExt;
53use snafu::{ensure, OptionExt, ResultExt};
54use store_api::metadata::RegionMetadata;
55use store_api::storage::{ColumnId, SequenceNumber};
56
57use crate::error::{
58 ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, InvalidBatchSnafu, Result,
59};
60use crate::memtable::BoxedBatchIterator;
61use crate::read::prune::PruneReader;
62use crate::row_converter::{CompositeValues, PrimaryKeyCodec};
63
/// A batch of rows that all share one encoded primary key.
///
/// Besides the primary key, every batch carries three required columns
/// (timestamps, sequences and op types) plus zero or more field columns. All
/// columns have the same number of rows; `BatchBuilder::build` enforces this.
#[derive(Debug, PartialEq, Clone)]
pub struct Batch {
    /// Encoded primary key shared by every row of this batch.
    primary_key: Vec<u8>,
    /// Decoded primary key values. Either set explicitly or decoded lazily
    /// from `primary_key` by `pk_col_value`.
    pk_values: Option<CompositeValues>,
    /// Timestamp column.
    timestamps: VectorRef,
    /// Sequence number of each row.
    sequences: Arc<UInt64Vector>,
    /// Op type (e.g. put or delete) of each row, stored as `u8`.
    op_types: Arc<UInt8Vector>,
    /// Field columns.
    fields: Vec<BatchColumn>,
    /// Lazily built map from a field's column id to its index in `fields`.
    fields_idx: Option<HashMap<ColumnId, usize>>,
}
89
90impl Batch {
91 pub fn new(
93 primary_key: Vec<u8>,
94 timestamps: VectorRef,
95 sequences: Arc<UInt64Vector>,
96 op_types: Arc<UInt8Vector>,
97 fields: Vec<BatchColumn>,
98 ) -> Result<Batch> {
99 BatchBuilder::with_required_columns(primary_key, timestamps, sequences, op_types)
100 .with_fields(fields)
101 .build()
102 }
103
104 pub fn with_fields(self, fields: Vec<BatchColumn>) -> Result<Batch> {
106 Batch::new(
107 self.primary_key,
108 self.timestamps,
109 self.sequences,
110 self.op_types,
111 fields,
112 )
113 }
114
115 pub fn primary_key(&self) -> &[u8] {
117 &self.primary_key
118 }
119
120 pub fn pk_values(&self) -> Option<&CompositeValues> {
122 self.pk_values.as_ref()
123 }
124
125 pub fn set_pk_values(&mut self, pk_values: CompositeValues) {
127 self.pk_values = Some(pk_values);
128 }
129
130 #[cfg(any(test, feature = "test"))]
132 pub fn remove_pk_values(&mut self) {
133 self.pk_values = None;
134 }
135
136 pub fn fields(&self) -> &[BatchColumn] {
138 &self.fields
139 }
140
141 pub fn timestamps(&self) -> &VectorRef {
143 &self.timestamps
144 }
145
146 pub fn sequences(&self) -> &Arc<UInt64Vector> {
148 &self.sequences
149 }
150
151 pub fn op_types(&self) -> &Arc<UInt8Vector> {
153 &self.op_types
154 }
155
156 pub fn num_rows(&self) -> usize {
158 self.sequences.len()
161 }
162
163 pub fn is_empty(&self) -> bool {
165 self.num_rows() == 0
166 }
167
168 pub fn first_timestamp(&self) -> Option<Timestamp> {
170 if self.timestamps.is_empty() {
171 return None;
172 }
173
174 Some(self.get_timestamp(0))
175 }
176
177 pub fn last_timestamp(&self) -> Option<Timestamp> {
179 if self.timestamps.is_empty() {
180 return None;
181 }
182
183 Some(self.get_timestamp(self.timestamps.len() - 1))
184 }
185
186 pub fn first_sequence(&self) -> Option<SequenceNumber> {
188 if self.sequences.is_empty() {
189 return None;
190 }
191
192 Some(self.get_sequence(0))
193 }
194
195 pub fn last_sequence(&self) -> Option<SequenceNumber> {
197 if self.sequences.is_empty() {
198 return None;
199 }
200
201 Some(self.get_sequence(self.sequences.len() - 1))
202 }
203
204 pub fn set_primary_key(&mut self, primary_key: Vec<u8>) {
209 self.primary_key = primary_key;
210 }
211
212 pub fn slice(&self, offset: usize, length: usize) -> Batch {
217 let fields = self
218 .fields
219 .iter()
220 .map(|column| BatchColumn {
221 column_id: column.column_id,
222 data: column.data.slice(offset, length),
223 })
224 .collect();
225 Batch {
227 primary_key: self.primary_key.clone(),
230 pk_values: self.pk_values.clone(),
231 timestamps: self.timestamps.slice(offset, length),
232 sequences: Arc::new(self.sequences.get_slice(offset, length)),
233 op_types: Arc::new(self.op_types.get_slice(offset, length)),
234 fields,
235 fields_idx: self.fields_idx.clone(),
236 }
237 }
238
239 pub fn concat(mut batches: Vec<Batch>) -> Result<Batch> {
243 ensure!(
244 !batches.is_empty(),
245 InvalidBatchSnafu {
246 reason: "empty batches",
247 }
248 );
249 if batches.len() == 1 {
250 return Ok(batches.pop().unwrap());
252 }
253
254 let primary_key = std::mem::take(&mut batches[0].primary_key);
255 let first = &batches[0];
256 ensure!(
258 batches
259 .iter()
260 .skip(1)
261 .all(|b| b.primary_key() == primary_key),
262 InvalidBatchSnafu {
263 reason: "batches have different primary key",
264 }
265 );
266 for b in batches.iter().skip(1) {
267 ensure!(
268 b.fields.len() == first.fields.len(),
269 InvalidBatchSnafu {
270 reason: "batches have different field num",
271 }
272 );
273 for (l, r) in b.fields.iter().zip(&first.fields) {
274 ensure!(
275 l.column_id == r.column_id,
276 InvalidBatchSnafu {
277 reason: "batches have different fields",
278 }
279 );
280 }
281 }
282
283 let mut builder = BatchBuilder::new(primary_key);
285 let array = concat_arrays(batches.iter().map(|b| b.timestamps().to_arrow_array()))?;
287 builder.timestamps_array(array)?;
288 let array = concat_arrays(batches.iter().map(|b| b.sequences().to_arrow_array()))?;
289 builder.sequences_array(array)?;
290 let array = concat_arrays(batches.iter().map(|b| b.op_types().to_arrow_array()))?;
291 builder.op_types_array(array)?;
292 for (i, batch_column) in first.fields.iter().enumerate() {
293 let array = concat_arrays(batches.iter().map(|b| b.fields()[i].data.to_arrow_array()))?;
294 builder.push_field_array(batch_column.column_id, array)?;
295 }
296
297 builder.build()
298 }
299
300 pub fn filter_deleted(&mut self) -> Result<()> {
302 let array = self.op_types.as_arrow();
304 let rhs = UInt8Array::new_scalar(OpType::Delete as u8);
306 let predicate =
307 arrow::compute::kernels::cmp::neq(array, &rhs).context(ComputeArrowSnafu)?;
308 self.filter(&BooleanVector::from(predicate))
309 }
310
311 pub fn filter(&mut self, predicate: &BooleanVector) -> Result<()> {
314 self.timestamps = self
315 .timestamps
316 .filter(predicate)
317 .context(ComputeVectorSnafu)?;
318 self.sequences = Arc::new(
319 UInt64Vector::try_from_arrow_array(
320 arrow::compute::filter(self.sequences.as_arrow(), predicate.as_boolean_array())
321 .context(ComputeArrowSnafu)?,
322 )
323 .unwrap(),
324 );
325 self.op_types = Arc::new(
326 UInt8Vector::try_from_arrow_array(
327 arrow::compute::filter(self.op_types.as_arrow(), predicate.as_boolean_array())
328 .context(ComputeArrowSnafu)?,
329 )
330 .unwrap(),
331 );
332 for batch_column in &mut self.fields {
333 batch_column.data = batch_column
334 .data
335 .filter(predicate)
336 .context(ComputeVectorSnafu)?;
337 }
338
339 Ok(())
340 }
341
342 pub fn filter_by_sequence(&mut self, sequence: Option<SequenceNumber>) -> Result<()> {
344 let seq = match (sequence, self.last_sequence()) {
345 (None, _) | (_, None) => return Ok(()),
346 (Some(sequence), Some(last_sequence)) if sequence >= last_sequence => return Ok(()),
347 (Some(sequence), Some(_)) => sequence,
348 };
349
350 let seqs = self.sequences.as_arrow();
351 let sequence = UInt64Array::new_scalar(seq);
352 let predicate = datafusion_common::arrow::compute::kernels::cmp::lt_eq(seqs, &sequence)
353 .context(ComputeArrowSnafu)?;
354 let predicate = BooleanVector::from(predicate);
355 self.filter(&predicate)?;
356
357 Ok(())
358 }
359
360 pub fn sort(&mut self, dedup: bool) -> Result<()> {
367 let converter = RowConverter::new(vec![
370 SortField::new(self.timestamps.data_type().as_arrow_type()),
371 SortField::new_with_options(
372 self.sequences.data_type().as_arrow_type(),
373 SortOptions {
374 descending: true,
375 ..Default::default()
376 },
377 ),
378 ])
379 .context(ComputeArrowSnafu)?;
380 let columns = [
382 self.timestamps.to_arrow_array(),
383 self.sequences.to_arrow_array(),
384 ];
385 let rows = converter.convert_columns(&columns).unwrap();
386 let mut to_sort: Vec<_> = rows.iter().enumerate().collect();
387
388 let was_sorted = to_sort.is_sorted_by_key(|x| x.1);
389 if !was_sorted {
390 to_sort.sort_unstable_by_key(|x| x.1);
391 }
392
393 let num_rows = to_sort.len();
394 if dedup {
395 to_sort.dedup_by(|left, right| {
397 debug_assert_eq!(18, left.1.as_ref().len());
398 debug_assert_eq!(18, right.1.as_ref().len());
399 let (left_key, right_key) = (left.1.as_ref(), right.1.as_ref());
400 left_key[..TIMESTAMP_KEY_LEN] == right_key[..TIMESTAMP_KEY_LEN]
402 });
403 }
404 let no_dedup = to_sort.len() == num_rows;
405
406 if was_sorted && no_dedup {
407 return Ok(());
408 }
409 let indices = UInt32Vector::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
410 self.take_in_place(&indices)
411 }
412
413 pub fn memory_size(&self) -> usize {
415 let mut size = std::mem::size_of::<Self>();
416 size += self.primary_key.len();
417 size += self.timestamps.memory_size();
418 size += self.sequences.memory_size();
419 size += self.op_types.memory_size();
420 for batch_column in &self.fields {
421 size += batch_column.data.memory_size();
422 }
423 size
424 }
425
426 pub(crate) fn projected_fields(
428 metadata: &RegionMetadata,
429 projection: &[ColumnId],
430 ) -> Vec<(ColumnId, ConcreteDataType)> {
431 let projected_ids: HashSet<_> = projection.iter().copied().collect();
432 metadata
433 .field_columns()
434 .filter_map(|column| {
435 if projected_ids.contains(&column.column_id) {
436 Some((column.column_id, column.column_schema.data_type.clone()))
437 } else {
438 None
439 }
440 })
441 .collect()
442 }
443
444 pub(crate) fn timestamps_native(&self) -> Option<&[i64]> {
446 if self.timestamps.is_empty() {
447 return None;
448 }
449
450 let values = match self.timestamps.data_type() {
451 ConcreteDataType::Timestamp(TimestampType::Second(_)) => self
452 .timestamps
453 .as_any()
454 .downcast_ref::<TimestampSecondVector>()
455 .unwrap()
456 .as_arrow()
457 .values(),
458 ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => self
459 .timestamps
460 .as_any()
461 .downcast_ref::<TimestampMillisecondVector>()
462 .unwrap()
463 .as_arrow()
464 .values(),
465 ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => self
466 .timestamps
467 .as_any()
468 .downcast_ref::<TimestampMicrosecondVector>()
469 .unwrap()
470 .as_arrow()
471 .values(),
472 ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => self
473 .timestamps
474 .as_any()
475 .downcast_ref::<TimestampNanosecondVector>()
476 .unwrap()
477 .as_arrow()
478 .values(),
479 other => panic!("timestamps in a Batch has other type {:?}", other),
480 };
481
482 Some(values)
483 }
484
485 fn take_in_place(&mut self, indices: &UInt32Vector) -> Result<()> {
487 self.timestamps = self.timestamps.take(indices).context(ComputeVectorSnafu)?;
488 let array = arrow::compute::take(self.sequences.as_arrow(), indices.as_arrow(), None)
489 .context(ComputeArrowSnafu)?;
490 self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
492 let array = arrow::compute::take(self.op_types.as_arrow(), indices.as_arrow(), None)
493 .context(ComputeArrowSnafu)?;
494 self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
495 for batch_column in &mut self.fields {
496 batch_column.data = batch_column
497 .data
498 .take(indices)
499 .context(ComputeVectorSnafu)?;
500 }
501
502 Ok(())
503 }
504
505 fn get_timestamp(&self, index: usize) -> Timestamp {
510 match self.timestamps.get_ref(index) {
511 ValueRef::Timestamp(timestamp) => timestamp,
512
513 value => panic!("{:?} is not a timestamp", value),
515 }
516 }
517
518 pub(crate) fn get_sequence(&self, index: usize) -> SequenceNumber {
523 self.sequences.get_data(index).unwrap()
525 }
526
527 #[cfg(debug_assertions)]
529 pub(crate) fn check_monotonic(&self) -> Result<(), String> {
530 use std::cmp::Ordering;
531 if self.timestamps_native().is_none() {
532 return Ok(());
533 }
534
535 let timestamps = self.timestamps_native().unwrap();
536 let sequences = self.sequences.as_arrow().values();
537 for (i, window) in timestamps.windows(2).enumerate() {
538 let current = window[0];
539 let next = window[1];
540 let current_sequence = sequences[i];
541 let next_sequence = sequences[i + 1];
542 match current.cmp(&next) {
543 Ordering::Less => {
544 continue;
546 }
547 Ordering::Equal => {
548 if current_sequence < next_sequence {
550 return Err(format!(
551 "sequence are not monotonic: ts {} == {} but current sequence {} < {}, index: {}",
552 current, next, current_sequence, next_sequence, i
553 ));
554 }
555 }
556 Ordering::Greater => {
557 return Err(format!(
559 "timestamps are not monotonic: {} > {}, index: {}",
560 current, next, i
561 ));
562 }
563 }
564 }
565
566 Ok(())
567 }
568
569 #[cfg(debug_assertions)]
571 pub(crate) fn check_next_batch(&self, other: &Batch) -> Result<(), String> {
572 if self.primary_key() < other.primary_key() {
574 return Ok(());
575 }
576 if self.primary_key() > other.primary_key() {
577 return Err(format!(
578 "primary key is not monotonic: {:?} > {:?}",
579 self.primary_key(),
580 other.primary_key()
581 ));
582 }
583 if self.last_timestamp() < other.first_timestamp() {
585 return Ok(());
586 }
587 if self.last_timestamp() > other.first_timestamp() {
588 return Err(format!(
589 "timestamps are not monotonic: {:?} > {:?}",
590 self.last_timestamp(),
591 other.first_timestamp()
592 ));
593 }
594 if self.last_sequence() >= other.first_sequence() {
596 return Ok(());
597 }
598 Err(format!(
599 "sequences are not monotonic: {:?} < {:?}",
600 self.last_sequence(),
601 other.first_sequence()
602 ))
603 }
604
605 pub fn pk_col_value(
609 &mut self,
610 codec: &dyn PrimaryKeyCodec,
611 col_idx_in_pk: usize,
612 column_id: ColumnId,
613 ) -> Result<Option<&Value>> {
614 if self.pk_values.is_none() {
615 self.pk_values = Some(codec.decode(&self.primary_key)?);
616 }
617
618 let pk_values = self.pk_values.as_ref().unwrap();
619 Ok(match pk_values {
620 CompositeValues::Dense(values) => values.get(col_idx_in_pk).map(|(_, v)| v),
621 CompositeValues::Sparse(values) => values.get(&column_id),
622 })
623 }
624
625 pub fn field_col_value(&mut self, column_id: ColumnId) -> Option<&BatchColumn> {
629 if self.fields_idx.is_none() {
630 self.fields_idx = Some(
631 self.fields
632 .iter()
633 .enumerate()
634 .map(|(i, c)| (c.column_id, i))
635 .collect(),
636 );
637 }
638
639 self.fields_idx
640 .as_ref()
641 .unwrap()
642 .get(&column_id)
643 .map(|&idx| &self.fields[idx])
644 }
645}
646
/// Debug-only checker that validates the batches of a stream arrive in
/// sorted order and stay within an optional time range.
#[cfg(debug_assertions)]
#[derive(Default)]
pub(crate) struct BatchChecker {
    /// The previously checked batch, compared against the next one.
    last_batch: Option<Batch>,
    /// Inclusive lower bound for row timestamps, if set.
    start: Option<Timestamp>,
    /// Exclusive upper bound for row timestamps, if set.
    end: Option<Timestamp>,
}
655
656#[cfg(debug_assertions)]
657impl BatchChecker {
658 pub(crate) fn with_start(mut self, start: Option<Timestamp>) -> Self {
660 self.start = start;
661 self
662 }
663
664 pub(crate) fn with_end(mut self, end: Option<Timestamp>) -> Self {
666 self.end = end;
667 self
668 }
669
670 pub(crate) fn check_monotonic(&mut self, batch: &Batch) -> Result<(), String> {
673 batch.check_monotonic()?;
674
675 if let (Some(start), Some(first)) = (self.start, batch.first_timestamp()) {
676 if start > first {
677 return Err(format!(
678 "batch's first timestamp is before the start timestamp: {:?} > {:?}",
679 start, first
680 ));
681 }
682 }
683 if let (Some(end), Some(last)) = (self.end, batch.last_timestamp()) {
684 if end <= last {
685 return Err(format!(
686 "batch's last timestamp is after the end timestamp: {:?} <= {:?}",
687 end, last
688 ));
689 }
690 }
691
692 let res = self
695 .last_batch
696 .as_ref()
697 .map(|last| last.check_next_batch(batch))
698 .unwrap_or(Ok(()));
699 self.last_batch = Some(batch.clone());
700 res
701 }
702
703 pub(crate) fn format_batch(&self, batch: &Batch) -> String {
705 use std::fmt::Write;
706
707 let mut message = String::new();
708 if let Some(last) = &self.last_batch {
709 write!(
710 message,
711 "last_pk: {:?}, last_ts: {:?}, last_seq: {:?}, ",
712 last.primary_key(),
713 last.last_timestamp(),
714 last.last_sequence()
715 )
716 .unwrap();
717 }
718 write!(
719 message,
720 "batch_pk: {:?}, batch_ts: {:?}, batch_seq: {:?}",
721 batch.primary_key(),
722 batch.timestamps(),
723 batch.sequences()
724 )
725 .unwrap();
726
727 message
728 }
729
730 pub(crate) fn ensure_part_range_batch(
732 &mut self,
733 scanner: &str,
734 region_id: store_api::storage::RegionId,
735 partition: usize,
736 part_range: store_api::region_engine::PartitionRange,
737 batch: &Batch,
738 ) {
739 if let Err(e) = self.check_monotonic(batch) {
740 let err_msg = format!(
741 "{}: batch is not sorted, {}, region_id: {}, partition: {}, part_range: {:?}",
742 scanner, e, region_id, partition, part_range,
743 );
744 common_telemetry::error!("{err_msg}, {}", self.format_batch(batch));
745 panic!("{err_msg}, batch rows: {}", batch.num_rows());
747 }
748 }
749}
750
/// Byte length of the timestamp prefix of a row produced by the
/// `RowConverter` in `Batch::sort`: one leading byte followed by the 8-byte
/// `i64` value.
const TIMESTAMP_KEY_LEN: usize = 1 + std::mem::size_of::<i64>();
753
754fn concat_arrays(iter: impl Iterator<Item = ArrayRef>) -> Result<ArrayRef> {
756 let arrays: Vec<_> = iter.collect();
757 let dyn_arrays: Vec<_> = arrays.iter().map(|array| array.as_ref()).collect();
758 arrow::compute::concat(&dyn_arrays).context(ComputeArrowSnafu)
759}
760
/// A field column of a [`Batch`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct BatchColumn {
    /// Id of the column.
    pub column_id: ColumnId,
    /// Column data. It must hold one value per row of the owning batch, which
    /// `BatchBuilder::build` checks.
    pub data: VectorRef,
}
769
/// Incremental builder for a [`Batch`].
///
/// Timestamps, sequences and op types are required; `build` fails if any of
/// them is missing.
pub struct BatchBuilder {
    /// Encoded primary key of the batch.
    primary_key: Vec<u8>,
    /// Timestamp column, if set.
    timestamps: Option<VectorRef>,
    /// Sequence column, if set.
    sequences: Option<Arc<UInt64Vector>>,
    /// Op type column, if set.
    op_types: Option<Arc<UInt8Vector>>,
    /// Field columns added so far.
    fields: Vec<BatchColumn>,
}
778
779impl BatchBuilder {
780 pub fn new(primary_key: Vec<u8>) -> BatchBuilder {
782 BatchBuilder {
783 primary_key,
784 timestamps: None,
785 sequences: None,
786 op_types: None,
787 fields: Vec::new(),
788 }
789 }
790
791 pub fn with_required_columns(
793 primary_key: Vec<u8>,
794 timestamps: VectorRef,
795 sequences: Arc<UInt64Vector>,
796 op_types: Arc<UInt8Vector>,
797 ) -> BatchBuilder {
798 BatchBuilder {
799 primary_key,
800 timestamps: Some(timestamps),
801 sequences: Some(sequences),
802 op_types: Some(op_types),
803 fields: Vec::new(),
804 }
805 }
806
807 pub fn with_fields(mut self, fields: Vec<BatchColumn>) -> Self {
809 self.fields = fields;
810 self
811 }
812
813 pub fn push_field(&mut self, column: BatchColumn) -> &mut Self {
815 self.fields.push(column);
816 self
817 }
818
819 pub fn push_field_array(&mut self, column_id: ColumnId, array: ArrayRef) -> Result<&mut Self> {
821 let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
822 self.fields.push(BatchColumn {
823 column_id,
824 data: vector,
825 });
826
827 Ok(self)
828 }
829
830 pub fn timestamps_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
832 let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
833 ensure!(
834 vector.data_type().is_timestamp(),
835 InvalidBatchSnafu {
836 reason: format!("{:?} is not a timestamp type", vector.data_type()),
837 }
838 );
839
840 self.timestamps = Some(vector);
841 Ok(self)
842 }
843
844 pub fn sequences_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
846 ensure!(
847 *array.data_type() == arrow::datatypes::DataType::UInt64,
848 InvalidBatchSnafu {
849 reason: "sequence array is not UInt64 type",
850 }
851 );
852 let vector = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
854 self.sequences = Some(vector);
855
856 Ok(self)
857 }
858
859 pub fn op_types_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
861 ensure!(
862 *array.data_type() == arrow::datatypes::DataType::UInt8,
863 InvalidBatchSnafu {
864 reason: "sequence array is not UInt8 type",
865 }
866 );
867 let vector = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
869 self.op_types = Some(vector);
870
871 Ok(self)
872 }
873
874 pub fn build(self) -> Result<Batch> {
876 let timestamps = self.timestamps.context(InvalidBatchSnafu {
877 reason: "missing timestamps",
878 })?;
879 let sequences = self.sequences.context(InvalidBatchSnafu {
880 reason: "missing sequences",
881 })?;
882 let op_types = self.op_types.context(InvalidBatchSnafu {
883 reason: "missing op_types",
884 })?;
885 assert_eq!(0, timestamps.null_count());
888 assert_eq!(0, sequences.null_count());
889 assert_eq!(0, op_types.null_count());
890
891 let ts_len = timestamps.len();
892 ensure!(
893 sequences.len() == ts_len,
894 InvalidBatchSnafu {
895 reason: format!(
896 "sequence have different len {} != {}",
897 sequences.len(),
898 ts_len
899 ),
900 }
901 );
902 ensure!(
903 op_types.len() == ts_len,
904 InvalidBatchSnafu {
905 reason: format!(
906 "op type have different len {} != {}",
907 op_types.len(),
908 ts_len
909 ),
910 }
911 );
912 for column in &self.fields {
913 ensure!(
914 column.data.len() == ts_len,
915 InvalidBatchSnafu {
916 reason: format!(
917 "column {} has different len {} != {}",
918 column.column_id,
919 column.data.len(),
920 ts_len
921 ),
922 }
923 );
924 }
925
926 Ok(Batch {
927 primary_key: self.primary_key,
928 pk_values: None,
929 timestamps,
930 sequences,
931 op_types,
932 fields: self.fields,
933 fields_idx: None,
934 })
935 }
936}
937
938impl From<Batch> for BatchBuilder {
939 fn from(batch: Batch) -> Self {
940 Self {
941 primary_key: batch.primary_key,
942 timestamps: Some(batch.timestamps),
943 sequences: Some(batch.sequences),
944 op_types: Some(batch.op_types),
945 fields: batch.fields,
946 }
947 }
948}
949
/// A source of [`Batch`]es, wrapping one of several kinds of reader.
pub enum Source {
    /// An async batch reader.
    Reader(BoxedBatchReader),
    /// A synchronous iterator of batches.
    Iter(BoxedBatchIterator),
    /// A stream of batches.
    Stream(BoxedBatchStream),
    /// A prune reader.
    PruneReader(PruneReader),
}
963
964impl Source {
965 pub(crate) async fn next_batch(&mut self) -> Result<Option<Batch>> {
967 match self {
968 Source::Reader(reader) => reader.next_batch().await,
969 Source::Iter(iter) => iter.next().transpose(),
970 Source::Stream(stream) => stream.try_next().await,
971 Source::PruneReader(reader) => reader.next_batch().await,
972 }
973 }
974}
975
/// Async reader that yields [`Batch`]es one at a time.
#[async_trait]
pub trait BatchReader: Send {
    /// Fetches the next batch.
    ///
    /// `Ok(None)` indicates that the reader has no more batches.
    async fn next_batch(&mut self) -> Result<Option<Batch>>;
}
990
/// Boxed [`BatchReader`] trait object.
pub type BoxedBatchReader = Box<dyn BatchReader>;

/// Boxed `'static` stream of batches, where each item may be an error.
pub type BoxedBatchStream = BoxStream<'static, Result<Batch>>;
996
997#[async_trait::async_trait]
998impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
999 async fn next_batch(&mut self) -> Result<Option<Batch>> {
1000 (**self).next_batch().await
1001 }
1002}
1003
/// Metrics collected by scanners.
///
/// NOTE(review): these fields are populated outside the visible part of this
/// file. The descriptions below follow the field names and should be
/// confirmed against the scanners that update them.
#[derive(Debug, Default)]
pub(crate) struct ScannerMetrics {
    /// Time spent preparing the scan (presumably).
    prepare_scan_cost: Duration,
    /// Time spent building readers (presumably).
    build_reader_cost: Duration,
    /// Time spent scanning (presumably).
    scan_cost: Duration,
    /// Time spent converting batches (presumably).
    convert_cost: Duration,
    /// Time spent yielding results (presumably).
    yield_cost: Duration,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of memtable ranges.
    num_mem_ranges: usize,
    /// Number of file ranges.
    num_file_ranges: usize,
}
1026
/// Unit tests for [`Batch`] construction, slicing, concatenation, filtering,
/// sorting and primary key / field lookups.
#[cfg(test)]
mod tests {
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::storage::consts::ReservedColumnId;

    use super::*;
    use crate::error::Error;
    use crate::row_converter::{self, build_primary_key_codec_with_fields};
    use crate::test_util::new_batch_builder;

    /// Builds a batch with primary key `b"test"` and a single field column
    /// with id 1.
    fn new_batch(
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        field: &[u64],
    ) -> Batch {
        new_batch_builder(b"test", timestamps, sequences, op_types, 1, field)
            .build()
            .unwrap()
    }

    /// An empty batch has no first or last timestamp or sequence.
    #[test]
    fn test_empty_batch() {
        let batch = new_batch(&[], &[], &[], &[]);
        assert_eq!(None, batch.first_timestamp());
        assert_eq!(None, batch.last_timestamp());
        assert_eq!(None, batch.first_sequence());
        assert_eq!(None, batch.last_sequence());
        assert!(batch.timestamps_native().is_none());
    }

    /// In a single-row batch, first and last refer to the same row.
    #[test]
    fn test_first_last_one() {
        let batch = new_batch(&[1], &[2], &[OpType::Put], &[4]);
        assert_eq!(
            Timestamp::new_millisecond(1),
            batch.first_timestamp().unwrap()
        );
        assert_eq!(
            Timestamp::new_millisecond(1),
            batch.last_timestamp().unwrap()
        );
        assert_eq!(2, batch.first_sequence().unwrap());
        assert_eq!(2, batch.last_sequence().unwrap());
    }

    /// First and last timestamps and sequences of a multi-row batch.
    #[test]
    fn test_first_last_multiple() {
        let batch = new_batch(
            &[1, 2, 3],
            &[11, 12, 13],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );
        assert_eq!(
            Timestamp::new_millisecond(1),
            batch.first_timestamp().unwrap()
        );
        assert_eq!(
            Timestamp::new_millisecond(3),
            batch.last_timestamp().unwrap()
        );
        assert_eq!(11, batch.first_sequence().unwrap());
        assert_eq!(13, batch.last_sequence().unwrap());
    }

    /// Slicing selects the same row range from every column.
    #[test]
    fn test_slice() {
        let batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let batch = batch.slice(1, 2);
        let expect = new_batch(
            &[2, 3],
            &[12, 13],
            &[OpType::Delete, OpType::Put],
            &[22, 23],
        );
        assert_eq!(expect, batch);
    }

    /// `timestamps_native` exposes the raw i64 timestamp values.
    #[test]
    fn test_timestamps_native() {
        let batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        assert_eq!(&[1, 2, 3, 4], batch.timestamps_native().unwrap());
    }

    /// Concatenating zero batches is an error.
    #[test]
    fn test_concat_empty() {
        let err = Batch::concat(vec![]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }

    /// Concatenating a single batch returns it unchanged, even when empty.
    #[test]
    fn test_concat_one() {
        let batch = new_batch(&[], &[], &[], &[]);
        let actual = Batch::concat(vec![batch.clone()]).unwrap();
        assert_eq!(batch, actual);

        let batch = new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]);
        let actual = Batch::concat(vec![batch.clone()]).unwrap();
        assert_eq!(batch, actual);
    }

    /// Concatenating several batches, including an empty one, keeps row order.
    #[test]
    fn test_concat_multiple() {
        let batches = vec![
            new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]),
            new_batch(
                &[3, 4, 5],
                &[13, 14, 15],
                &[OpType::Put, OpType::Delete, OpType::Put],
                &[23, 24, 25],
            ),
            new_batch(&[], &[], &[], &[]),
            new_batch(&[6], &[16], &[OpType::Put], &[26]),
        ];
        let batch = Batch::concat(batches).unwrap();
        let expect = new_batch(
            &[1, 2, 3, 4, 5, 6],
            &[11, 12, 13, 14, 15, 16],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Delete,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25, 26],
        );
        assert_eq!(expect, batch);
    }

    /// Batches with different primary keys cannot be concatenated.
    #[test]
    fn test_concat_different() {
        let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
        let mut batch2 = new_batch(&[2], &[2], &[OpType::Put], &[2]);
        batch2.primary_key = b"hello".to_vec();
        let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }

    /// Batches with a different number of fields, or different field ids,
    /// cannot be concatenated.
    #[test]
    fn test_concat_different_fields() {
        let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
        let fields = vec![
            batch1.fields()[0].clone(),
            BatchColumn {
                column_id: 2,
                data: Arc::new(UInt64Vector::from_slice([2])),
            },
        ];
        // Extra field column.
        let batch2 = batch1.clone().with_fields(fields).unwrap();
        let err = Batch::concat(vec![batch1.clone(), batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );

        // Same field count but a different column id.
        let fields = vec![BatchColumn {
            column_id: 2,
            data: Arc::new(UInt64Vector::from_slice([2])),
        }];
        let batch2 = batch1.clone().with_fields(fields).unwrap();
        let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
        assert!(
            matches!(err, Error::InvalidBatch { .. }),
            "unexpected err: {err}"
        );
    }

    /// Filtering deleted rows from an empty batch leaves it empty.
    #[test]
    fn test_filter_deleted_empty() {
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_deleted().unwrap();
        assert!(batch.is_empty());
    }

    /// Delete rows are removed; a batch with only puts is unchanged.
    #[test]
    fn test_filter_deleted() {
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Delete, OpType::Put, OpType::Delete, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch.filter_deleted().unwrap();
        let expect = new_batch(&[2, 4], &[12, 14], &[OpType::Put, OpType::Put], &[22, 24]);
        assert_eq!(expect, batch);

        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let expect = batch.clone();
        batch.filter_deleted().unwrap();
        assert_eq!(expect, batch);
    }

    /// Only rows with sequence <= the given sequence are kept; `None` and
    /// empty batches are no-ops.
    #[test]
    fn test_filter_by_sequence() {
        // Keeps rows with sequence <= 13.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch.filter_by_sequence(Some(13)).unwrap();
        let expect = new_batch(
            &[1, 2, 3],
            &[11, 12, 13],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );
        assert_eq!(expect, batch);

        // Every sequence is above the bound, so all rows are removed.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );

        batch.filter_by_sequence(Some(10)).unwrap();
        assert!(batch.is_empty());

        // No bound: the batch is unchanged.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let expect = batch.clone();
        batch.filter_by_sequence(None).unwrap();
        assert_eq!(expect, batch);

        // Empty batch with a bound.
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_by_sequence(Some(10)).unwrap();
        assert!(batch.is_empty());

        // Empty batch without a bound.
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_by_sequence(None).unwrap();
        assert!(batch.is_empty());
    }

    /// `filter` keeps only the rows selected by the predicate.
    #[test]
    fn test_filter() {
        // Filters only by the predicate.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
        batch.filter(&predicate).unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // The predicate also drops the delete row.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
        batch.filter(&predicate).unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // An all-false predicate removes every row.
        let predicate = BooleanVector::from_vec(vec![false, false]);
        batch.filter(&predicate).unwrap();
        assert!(batch.is_empty());
    }

    /// Sorting orders rows by timestamp ascending and sequence descending;
    /// dedup keeps the highest-sequence row per timestamp.
    #[test]
    fn test_sort_and_dedup() {
        let original = new_batch(
            &[2, 3, 1, 4, 5, 2],
            &[1, 2, 3, 4, 5, 6],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25, 26],
        );

        let mut batch = original.clone();
        batch.sort(true).unwrap();
        // Of the two rows with timestamp 2, the one with sequence 6 is kept.
        assert_eq!(
            new_batch(
                &[1, 2, 3, 4, 5],
                &[3, 6, 2, 4, 5],
                &[
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                ],
                &[23, 26, 22, 24, 25],
            ),
            batch
        );

        let mut batch = original.clone();
        batch.sort(false).unwrap();

        // Without dedup, both timestamp-2 rows are kept, sequence descending.
        assert_eq!(
            new_batch(
                &[1, 2, 2, 3, 4, 5],
                &[3, 6, 1, 2, 4, 5],
                &[
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                ],
                &[23, 26, 21, 22, 24, 25],
            ),
            batch
        );

        let original = new_batch(
            &[2, 2, 1],
            &[1, 6, 1],
            &[OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );

        let mut batch = original.clone();
        batch.sort(true).unwrap();
        let expect = new_batch(&[1, 2], &[1, 6], &[OpType::Put, OpType::Put], &[23, 22]);
        assert_eq!(expect, batch);

        let mut batch = original.clone();
        batch.sort(false).unwrap();
        let expect = new_batch(
            &[1, 2, 2],
            &[1, 6, 1],
            &[OpType::Put, OpType::Put, OpType::Delete],
            &[23, 22, 21],
        );
        assert_eq!(expect, batch);
    }

    /// Primary key and field values can be looked up for both dense and
    /// sparse primary key encodings.
    #[test]
    fn test_get_value() {
        let encodings = [PrimaryKeyEncoding::Dense, PrimaryKeyEncoding::Sparse];

        for encoding in encodings {
            let codec = build_primary_key_codec_with_fields(
                encoding,
                [
                    (
                        ReservedColumnId::table_id(),
                        row_converter::SortField::new(ConcreteDataType::uint32_datatype()),
                    ),
                    (
                        ReservedColumnId::tsid(),
                        row_converter::SortField::new(ConcreteDataType::uint64_datatype()),
                    ),
                    (
                        100,
                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
                    ),
                    (
                        200,
                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
                    ),
                ]
                .into_iter(),
            );

            let values = [
                Value::UInt32(1000),
                Value::UInt64(2000),
                Value::String("abcdefgh".into()),
                Value::String("zyxwvu".into()),
            ];
            let mut buf = vec![];
            codec
                .encode_values(
                    &[
                        (ReservedColumnId::table_id(), values[0].clone()),
                        (ReservedColumnId::tsid(), values[1].clone()),
                        (100, values[2].clone()),
                        (200, values[3].clone()),
                    ],
                    &mut buf,
                )
                .unwrap();

            let field_col_id = 2;
            let mut batch = new_batch_builder(
                &buf,
                &[1, 2, 3],
                &[1, 1, 1],
                &[OpType::Put, OpType::Put, OpType::Put],
                field_col_id,
                &[42, 43, 44],
            )
            .build()
            .unwrap();

            let v = batch
                .pk_col_value(&*codec, 0, ReservedColumnId::table_id())
                .unwrap()
                .unwrap();
            assert_eq!(values[0], *v);

            let v = batch
                .pk_col_value(&*codec, 1, ReservedColumnId::tsid())
                .unwrap()
                .unwrap();
            assert_eq!(values[1], *v);

            let v = batch.pk_col_value(&*codec, 2, 100).unwrap().unwrap();
            assert_eq!(values[2], *v);

            let v = batch.pk_col_value(&*codec, 3, 200).unwrap().unwrap();
            assert_eq!(values[3], *v);

            let v = batch.field_col_value(field_col_id).unwrap();
            assert_eq!(v.data.get(0), Value::UInt64(42));
            assert_eq!(v.data.get(1), Value::UInt64(43));
            assert_eq!(v.data.get(2), Value::UInt64(44));
        }
    }
}