1pub mod batch_adapter;
18pub mod compat;
19pub mod dedup;
20pub mod flat_dedup;
21pub mod flat_merge;
22pub mod flat_projection;
23pub mod last_row;
24pub mod projection;
25pub(crate) mod prune;
26pub(crate) mod pruner;
27pub mod range;
28#[cfg(feature = "test")]
29pub mod range_cache;
30#[cfg(not(feature = "test"))]
31pub(crate) mod range_cache;
32pub mod scan_region;
33pub mod scan_util;
34pub(crate) mod seq_scan;
35pub mod series_scan;
36pub mod stream;
37pub(crate) mod unordered_scan;
38
39use std::collections::{HashMap, HashSet};
40use std::sync::Arc;
41use std::time::Duration;
42
43use api::v1::OpType;
44use async_trait::async_trait;
45use common_time::Timestamp;
46use datafusion_common::arrow::array::UInt8Array;
47use datatypes::arrow;
48use datatypes::arrow::array::{Array, ArrayRef};
49use datatypes::arrow::compute::SortOptions;
50use datatypes::arrow::record_batch::RecordBatch;
51use datatypes::arrow::row::{RowConverter, SortField};
52use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector};
53use datatypes::scalars::ScalarVectorBuilder;
54use datatypes::types::TimestampType;
55use datatypes::value::{Value, ValueRef};
56use datatypes::vectors::{
57 BooleanVector, Helper, TimestampMicrosecondVector, TimestampMillisecondVector,
58 TimestampMillisecondVectorBuilder, TimestampNanosecondVector, TimestampSecondVector,
59 UInt8Vector, UInt8VectorBuilder, UInt32Vector, UInt64Vector, UInt64VectorBuilder, Vector,
60 VectorRef,
61};
62use futures::TryStreamExt;
63use futures::stream::BoxStream;
64use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
65use snafu::{OptionExt, ResultExt, ensure};
66use store_api::metadata::RegionMetadata;
67use store_api::storage::{ColumnId, SequenceNumber, SequenceRange};
68
69use crate::error::{
70 ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu,
71 Result,
72};
73use crate::memtable::{BoxedBatchIterator, BoxedRecordBatchIterator};
74use crate::read::prune::PruneReader;
75
#[derive(Debug, PartialEq, Clone)]
/// A batch of rows that all share the same primary key.
///
/// Columns (`timestamps`, `sequences`, `op_types` and every field column)
/// have the same number of rows; `BatchBuilder::build` enforces this.
pub struct Batch {
    /// Encoded primary key shared by all rows of the batch.
    primary_key: Vec<u8>,
    /// Lazily decoded primary-key values (filled by `set_pk_values`/`pk_col_value`).
    pk_values: Option<CompositeValues>,
    /// Timestamp column, one value per row.
    timestamps: VectorRef,
    /// Sequence-number column, one value per row.
    sequences: Arc<UInt64Vector>,
    /// Op-type column (e.g. put/delete), one value per row.
    op_types: Arc<UInt8Vector>,
    /// Field (value) columns.
    fields: Vec<BatchColumn>,
    /// Lazily built index from column id to position in `fields`
    /// (filled by `field_col_value`).
    fields_idx: Option<HashMap<ColumnId, usize>>,
}
101
impl Batch {
    /// Creates a new [Batch], validating that all columns have the same length.
    pub fn new(
        primary_key: Vec<u8>,
        timestamps: VectorRef,
        sequences: Arc<UInt64Vector>,
        op_types: Arc<UInt8Vector>,
        fields: Vec<BatchColumn>,
    ) -> Result<Batch> {
        BatchBuilder::with_required_columns(primary_key, timestamps, sequences, op_types)
            .with_fields(fields)
            .build()
    }

    /// Returns a new [Batch] whose field columns are replaced by `fields`,
    /// re-running the builder's length validation.
    pub fn with_fields(self, fields: Vec<BatchColumn>) -> Result<Batch> {
        Batch::new(
            self.primary_key,
            self.timestamps,
            self.sequences,
            self.op_types,
            fields,
        )
    }

    /// Returns the encoded primary key of the batch.
    pub fn primary_key(&self) -> &[u8] {
        &self.primary_key
    }

    /// Returns the cached decoded primary-key values, if any.
    pub fn pk_values(&self) -> Option<&CompositeValues> {
        self.pk_values.as_ref()
    }

    /// Caches decoded primary-key values on the batch.
    pub fn set_pk_values(&mut self, pk_values: CompositeValues) {
        self.pk_values = Some(pk_values);
    }

    /// Clears cached primary-key values (test helper).
    #[cfg(any(test, feature = "test"))]
    pub fn remove_pk_values(&mut self) {
        self.pk_values = None;
    }

    /// Returns the field columns of the batch.
    pub fn fields(&self) -> &[BatchColumn] {
        &self.fields
    }

    /// Returns the timestamp column.
    pub fn timestamps(&self) -> &VectorRef {
        &self.timestamps
    }

    /// Returns the sequence-number column.
    pub fn sequences(&self) -> &Arc<UInt64Vector> {
        &self.sequences
    }

    /// Returns the op-type column.
    pub fn op_types(&self) -> &Arc<UInt8Vector> {
        &self.op_types
    }

    /// Returns the number of rows in the batch.
    pub fn num_rows(&self) -> usize {
        // All columns have the same length (enforced by the builder),
        // so any of them can serve as the row count.
        self.sequences.len()
    }

    /// Creates a zero-row batch with millisecond timestamps and no fields.
    #[allow(dead_code)]
    pub(crate) fn empty() -> Self {
        Self {
            primary_key: vec![],
            pk_values: None,
            timestamps: Arc::new(TimestampMillisecondVectorBuilder::with_capacity(0).finish()),
            sequences: Arc::new(UInt64VectorBuilder::with_capacity(0).finish()),
            op_types: Arc::new(UInt8VectorBuilder::with_capacity(0).finish()),
            fields: vec![],
            fields_idx: None,
        }
    }

    /// Returns true if the batch has no rows.
    pub fn is_empty(&self) -> bool {
        self.num_rows() == 0
    }

    /// Returns the timestamp of the first row, or `None` if the batch is empty.
    pub fn first_timestamp(&self) -> Option<Timestamp> {
        if self.timestamps.is_empty() {
            return None;
        }

        Some(self.get_timestamp(0))
    }

    /// Returns the timestamp of the last row, or `None` if the batch is empty.
    pub fn last_timestamp(&self) -> Option<Timestamp> {
        if self.timestamps.is_empty() {
            return None;
        }

        Some(self.get_timestamp(self.timestamps.len() - 1))
    }

    /// Returns the sequence of the first row, or `None` if the batch is empty.
    pub fn first_sequence(&self) -> Option<SequenceNumber> {
        if self.sequences.is_empty() {
            return None;
        }

        Some(self.get_sequence(0))
    }

    /// Returns the sequence of the last row, or `None` if the batch is empty.
    pub fn last_sequence(&self) -> Option<SequenceNumber> {
        if self.sequences.is_empty() {
            return None;
        }

        Some(self.get_sequence(self.sequences.len() - 1))
    }

    /// Replaces the encoded primary key of the batch.
    ///
    /// NOTE(review): this does not invalidate cached `pk_values`; callers
    /// presumably keep them consistent with the new key — confirm.
    pub fn set_primary_key(&mut self, primary_key: Vec<u8>) {
        self.primary_key = primary_key;
    }

    /// Returns a zero-copy-ish slice of the batch covering rows
    /// `[offset, offset + length)`. The primary key and cached values are cloned.
    pub fn slice(&self, offset: usize, length: usize) -> Batch {
        let fields = self
            .fields
            .iter()
            .map(|column| BatchColumn {
                column_id: column.column_id,
                data: column.data.slice(offset, length),
            })
            .collect();
        Batch {
            primary_key: self.primary_key.clone(),
            pk_values: self.pk_values.clone(),
            timestamps: self.timestamps.slice(offset, length),
            sequences: Arc::new(self.sequences.get_slice(offset, length)),
            op_types: Arc::new(self.op_types.get_slice(offset, length)),
            fields,
            fields_idx: self.fields_idx.clone(),
        }
    }

    /// Concatenates `batches` into a single batch.
    ///
    /// # Errors
    /// Returns an error if `batches` is empty, or the batches have different
    /// primary keys or different field columns.
    pub fn concat(mut batches: Vec<Batch>) -> Result<Batch> {
        ensure!(
            !batches.is_empty(),
            InvalidBatchSnafu {
                reason: "empty batches",
            }
        );
        // Fast path: a single batch needs no concatenation.
        if batches.len() == 1 {
            return Ok(batches.pop().unwrap());
        }

        // Takes the first primary key (leaving an empty one behind) so we
        // can move it into the builder without cloning.
        let primary_key = std::mem::take(&mut batches[0].primary_key);
        let first = &batches[0];
        ensure!(
            batches
                .iter()
                .skip(1)
                .all(|b| b.primary_key() == primary_key),
            InvalidBatchSnafu {
                reason: "batches have different primary key",
            }
        );
        for b in batches.iter().skip(1) {
            ensure!(
                b.fields.len() == first.fields.len(),
                InvalidBatchSnafu {
                    reason: "batches have different field num",
                }
            );
            for (l, r) in b.fields.iter().zip(&first.fields) {
                ensure!(
                    l.column_id == r.column_id,
                    InvalidBatchSnafu {
                        reason: "batches have different fields",
                    }
                );
            }
        }

        // Concatenates each column across batches and rebuilds the batch.
        let mut builder = BatchBuilder::new(primary_key);
        let array = concat_arrays(batches.iter().map(|b| b.timestamps().to_arrow_array()))?;
        builder.timestamps_array(array)?;
        let array = concat_arrays(batches.iter().map(|b| b.sequences().to_arrow_array()))?;
        builder.sequences_array(array)?;
        let array = concat_arrays(batches.iter().map(|b| b.op_types().to_arrow_array()))?;
        builder.op_types_array(array)?;
        for (i, batch_column) in first.fields.iter().enumerate() {
            let array = concat_arrays(batches.iter().map(|b| b.fields()[i].data.to_arrow_array()))?;
            builder.push_field_array(batch_column.column_id, array)?;
        }

        builder.build()
    }

    /// Removes all rows whose op type is `Delete`.
    pub fn filter_deleted(&mut self) -> Result<()> {
        // Keeps rows where `op_type != Delete`.
        let array = self.op_types.as_arrow();
        let rhs = UInt8Array::new_scalar(OpType::Delete as u8);
        let predicate =
            arrow::compute::kernels::cmp::neq(array, &rhs).context(ComputeArrowSnafu)?;
        self.filter(&BooleanVector::from(predicate))
    }

    /// Keeps only the rows selected by `predicate` (one bool per row),
    /// applying it to every column in place.
    pub fn filter(&mut self, predicate: &BooleanVector) -> Result<()> {
        self.timestamps = self
            .timestamps
            .filter(predicate)
            .context(ComputeVectorSnafu)?;
        self.sequences = Arc::new(
            UInt64Vector::try_from_arrow_array(
                arrow::compute::filter(self.sequences.as_arrow(), predicate.as_boolean_array())
                    .context(ComputeArrowSnafu)?,
            )
            // Safe: filtering a UInt64 array yields a UInt64 array.
            .unwrap(),
        );
        self.op_types = Arc::new(
            UInt8Vector::try_from_arrow_array(
                arrow::compute::filter(self.op_types.as_arrow(), predicate.as_boolean_array())
                    .context(ComputeArrowSnafu)?,
            )
            // Safe: filtering a UInt8 array yields a UInt8 array.
            .unwrap(),
        );
        for batch_column in &mut self.fields {
            batch_column.data = batch_column
                .data
                .filter(predicate)
                .context(ComputeVectorSnafu)?;
        }

        Ok(())
    }

    /// Keeps only rows whose sequence falls in `sequence`.
    ///
    /// `None` means no restriction. When the batch's first/last sequences show
    /// that every row already lies inside the range, the batch is left untouched.
    pub fn filter_by_sequence(&mut self, sequence: Option<SequenceRange>) -> Result<()> {
        let seq_range = match sequence {
            None => return Ok(()),
            Some(seq_range) => {
                let (Some(first), Some(last)) = (self.first_sequence(), self.last_sequence())
                else {
                    // Empty batch: nothing to filter.
                    return Ok(());
                };
                // Fast path: the whole batch is a subset of the range.
                // NOTE(review): the subset checks compare against first/last
                // sequences, which presumably bound all sequences in the batch
                // for the callers of this method — confirm.
                let is_subset = match seq_range {
                    SequenceRange::Gt { min } => min < first,
                    SequenceRange::LtEq { max } => max >= last,
                    SequenceRange::GtLtEq { min, max } => min < first && max >= last,
                };
                if is_subset {
                    return Ok(());
                }
                seq_range
            }
        };

        let seqs = self.sequences.as_arrow();
        let predicate = seq_range.filter(seqs).context(ComputeArrowSnafu)?;

        let predicate = BooleanVector::from(predicate);
        self.filter(&predicate)?;

        Ok(())
    }

    /// Sorts rows by `(timestamp asc, sequence desc)` in place.
    ///
    /// When `dedup` is true, rows sharing a timestamp are deduplicated and
    /// only the first one (the highest sequence, since sequences sort
    /// descending) is kept. No-op when the batch is already sorted and
    /// contains no duplicates.
    pub fn sort(&mut self, dedup: bool) -> Result<()> {
        // Uses the arrow row format so (timestamp, sequence desc) compares as
        // a single byte-comparable key.
        let converter = RowConverter::new(vec![
            SortField::new(self.timestamps.data_type().as_arrow_type()),
            SortField::new_with_options(
                self.sequences.data_type().as_arrow_type(),
                SortOptions {
                    descending: true,
                    ..Default::default()
                },
            ),
        ])
        .context(ComputeArrowSnafu)?;
        let columns = [
            self.timestamps.to_arrow_array(),
            self.sequences.to_arrow_array(),
        ];
        let rows = converter.convert_columns(&columns).unwrap();
        let mut to_sort: Vec<_> = rows.iter().enumerate().collect();

        let was_sorted = to_sort.is_sorted_by_key(|x| x.1);
        if !was_sorted {
            to_sort.sort_unstable_by_key(|x| x.1);
        }

        let num_rows = to_sort.len();
        if dedup {
            // Rows with equal timestamp prefix are duplicates; `dedup_by`
            // keeps the first (highest sequence) of each run.
            to_sort.dedup_by(|left, right| {
                // Each encoded row is the timestamp key plus the sequence key.
                debug_assert_eq!(18, left.1.as_ref().len());
                debug_assert_eq!(18, right.1.as_ref().len());
                let (left_key, right_key) = (left.1.as_ref(), right.1.as_ref());
                left_key[..TIMESTAMP_KEY_LEN] == right_key[..TIMESTAMP_KEY_LEN]
            });
        }
        let no_dedup = to_sort.len() == num_rows;

        if was_sorted && no_dedup {
            // Already sorted and nothing removed: avoid the take.
            return Ok(());
        }
        let indices = UInt32Vector::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
        self.take_in_place(&indices)
    }

    /// Merges rows with duplicate timestamps using "last non-null" semantics.
    ///
    /// For each run of rows sharing a timestamp, the first row of the run is
    /// kept as the base. If the base is not a delete, its null fields are
    /// filled from subsequent rows of the run (stopping at the first delete).
    /// Assumes rows are grouped so that duplicate timestamps are adjacent.
    pub(crate) fn merge_last_non_null(&mut self) -> Result<()> {
        let num_rows = self.num_rows();
        if num_rows < 2 {
            return Ok(());
        }

        let Some(timestamps) = self.timestamps_native() else {
            return Ok(());
        };

        // One pass to detect duplicates and count distinct-timestamp groups.
        let mut has_dup = false;
        let mut group_count = 1;
        for i in 1..num_rows {
            has_dup |= timestamps[i] == timestamps[i - 1];
            group_count += (timestamps[i] != timestamps[i - 1]) as usize;
        }
        if !has_dup {
            // Nothing to merge.
            return Ok(());
        }

        let num_fields = self.fields.len();
        let op_types = self.op_types.as_arrow().values();

        // `base_indices[g]` is the row used for timestamp/sequence/op_type of
        // group `g`; `field_indices[f][g]` is the row used for field `f`,
        // which may point to a later row of the group when the base is null.
        let mut base_indices: Vec<u32> = Vec::with_capacity(group_count);
        let mut field_indices: Vec<Vec<u32>> = (0..num_fields)
            .map(|_| Vec::with_capacity(group_count))
            .collect();

        let mut start = 0;
        while start < num_rows {
            // Finds the run [start, end) of rows sharing one timestamp.
            let ts = timestamps[start];
            let mut end = start + 1;
            while end < num_rows && timestamps[end] == ts {
                end += 1;
            }

            let group_pos = base_indices.len();
            base_indices.push(start as u32);

            if num_fields > 0 {
                // Default every field to the base row.
                for idx in &mut field_indices {
                    idx.push(start as u32);
                }

                let base_deleted = op_types[start] == OpType::Delete as u8;
                if !base_deleted {
                    // Collects fields that are null in the base row.
                    let mut missing_fields = Vec::new();
                    for (field_idx, col) in self.fields.iter().enumerate() {
                        if col.data.is_null(start) {
                            missing_fields.push(field_idx);
                        }
                    }

                    if !missing_fields.is_empty() {
                        // Scans later rows of the run for non-null values,
                        // stopping at a delete.
                        for row_idx in (start + 1)..end {
                            if op_types[row_idx] == OpType::Delete as u8 {
                                break;
                            }

                            missing_fields.retain(|&field_idx| {
                                if self.fields[field_idx].data.is_null(row_idx) {
                                    true
                                } else {
                                    field_indices[field_idx][group_pos] = row_idx as u32;
                                    false
                                }
                            });

                            if missing_fields.is_empty() {
                                break;
                            }
                        }
                    }
                }
            }

            start = end;
        }

        // Rebuilds every column from the computed index vectors.
        let base_indices = UInt32Vector::from_vec(base_indices);
        self.timestamps = self
            .timestamps
            .take(&base_indices)
            .context(ComputeVectorSnafu)?;
        let array = arrow::compute::take(self.sequences.as_arrow(), base_indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
        let array = arrow::compute::take(self.op_types.as_arrow(), base_indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());

        for (field_idx, batch_column) in self.fields.iter_mut().enumerate() {
            let idx = UInt32Vector::from_vec(std::mem::take(&mut field_indices[field_idx]));
            batch_column.data = batch_column.data.take(&idx).context(ComputeVectorSnafu)?;
        }

        Ok(())
    }

    /// Returns an estimate of the heap/struct memory used by this batch.
    pub fn memory_size(&self) -> usize {
        let mut size = std::mem::size_of::<Self>();
        size += self.primary_key.len();
        size += self.timestamps.memory_size();
        size += self.sequences.memory_size();
        size += self.op_types.memory_size();
        for batch_column in &self.fields {
            size += batch_column.data.memory_size();
        }
        size
    }

    /// Returns the `(column_id, data_type)` of the metadata's field columns
    /// that appear in `projection`, in the metadata's field order.
    pub(crate) fn projected_fields(
        metadata: &RegionMetadata,
        projection: &[ColumnId],
    ) -> Vec<(ColumnId, ConcreteDataType)> {
        let projected_ids: HashSet<_> = projection.iter().copied().collect();
        metadata
            .field_columns()
            .filter_map(|column| {
                if projected_ids.contains(&column.column_id) {
                    Some((column.column_id, column.column_schema.data_type.clone()))
                } else {
                    None
                }
            })
            .collect()
    }

    /// Returns the raw `i64` timestamp values (unit depends on the column's
    /// timestamp type), or `None` for an empty batch.
    ///
    /// # Panics
    /// Panics if the timestamp column is not a timestamp type.
    pub(crate) fn timestamps_native(&self) -> Option<&[i64]> {
        if self.timestamps.is_empty() {
            return None;
        }

        let values = match self.timestamps.data_type() {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampSecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampMicrosecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampNanosecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            other => panic!("timestamps in a Batch has other type {:?}", other),
        };

        Some(values)
    }

    /// Reorders every column in place according to `indices`.
    fn take_in_place(&mut self, indices: &UInt32Vector) -> Result<()> {
        self.timestamps = self.timestamps.take(indices).context(ComputeVectorSnafu)?;
        let array = arrow::compute::take(self.sequences.as_arrow(), indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
        let array = arrow::compute::take(self.op_types.as_arrow(), indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
        for batch_column in &mut self.fields {
            batch_column.data = batch_column
                .data
                .take(indices)
                .context(ComputeVectorSnafu)?;
        }

        Ok(())
    }

    /// Returns the timestamp at `index`.
    ///
    /// # Panics
    /// Panics if `index` is out of bounds or the value is not a timestamp.
    fn get_timestamp(&self, index: usize) -> Timestamp {
        match self.timestamps.get_ref(index) {
            ValueRef::Timestamp(timestamp) => timestamp,

            value => panic!("{:?} is not a timestamp", value),
        }
    }

    /// Returns the sequence at `index`.
    ///
    /// # Panics
    /// Panics if `index` is out of bounds or the value is null.
    pub(crate) fn get_sequence(&self, index: usize) -> SequenceNumber {
        self.sequences.get_data(index).unwrap()
    }

    /// Debug check that rows are sorted by `(timestamp asc, sequence desc)`.
    #[cfg(debug_assertions)]
    #[allow(dead_code)]
    pub(crate) fn check_monotonic(&self) -> Result<(), String> {
        use std::cmp::Ordering;
        if self.timestamps_native().is_none() {
            return Ok(());
        }

        let timestamps = self.timestamps_native().unwrap();
        let sequences = self.sequences.as_arrow().values();
        for (i, window) in timestamps.windows(2).enumerate() {
            let current = window[0];
            let next = window[1];
            let current_sequence = sequences[i];
            let next_sequence = sequences[i + 1];
            match current.cmp(&next) {
                Ordering::Less => {
                    continue;
                }
                Ordering::Equal => {
                    // Equal timestamps must have descending sequences.
                    if current_sequence < next_sequence {
                        return Err(format!(
                            "sequence are not monotonic: ts {} == {} but current sequence {} < {}, index: {}",
                            current, next, current_sequence, next_sequence, i
                        ));
                    }
                }
                Ordering::Greater => {
                    return Err(format!(
                        "timestamps are not monotonic: {} > {}, index: {}",
                        current, next, i
                    ));
                }
            }
        }

        Ok(())
    }

    /// Debug check that `other` may follow `self` in a sorted output:
    /// primary keys ascending, then timestamps ascending, then sequences
    /// descending at the boundary.
    #[cfg(debug_assertions)]
    #[allow(dead_code)]
    pub(crate) fn check_next_batch(&self, other: &Batch) -> Result<(), String> {
        if self.primary_key() < other.primary_key() {
            return Ok(());
        }
        if self.primary_key() > other.primary_key() {
            return Err(format!(
                "primary key is not monotonic: {:?} > {:?}",
                self.primary_key(),
                other.primary_key()
            ));
        }
        if self.last_timestamp() < other.first_timestamp() {
            return Ok(());
        }
        if self.last_timestamp() > other.first_timestamp() {
            return Err(format!(
                "timestamps are not monotonic: {:?} > {:?}",
                self.last_timestamp(),
                other.first_timestamp()
            ));
        }
        if self.last_sequence() >= other.first_sequence() {
            return Ok(());
        }
        Err(format!(
            "sequences are not monotonic: {:?} < {:?}",
            self.last_sequence(),
            other.first_sequence()
        ))
    }

    /// Returns the primary-key value for column `column_id` (dense encodings
    /// look it up by `col_idx_in_pk`), decoding and caching the key on first use.
    pub fn pk_col_value(
        &mut self,
        codec: &dyn PrimaryKeyCodec,
        col_idx_in_pk: usize,
        column_id: ColumnId,
    ) -> Result<Option<&Value>> {
        if self.pk_values.is_none() {
            self.pk_values = Some(codec.decode(&self.primary_key).context(DecodeSnafu)?);
        }

        let pk_values = self.pk_values.as_ref().unwrap();
        Ok(match pk_values {
            CompositeValues::Dense(values) => values.get(col_idx_in_pk).map(|(_, v)| v),
            CompositeValues::Sparse(values) => values.get(&column_id),
        })
    }

    /// Returns the field column with `column_id`, building the id-to-index
    /// lookup table on first use.
    pub fn field_col_value(&mut self, column_id: ColumnId) -> Option<&BatchColumn> {
        if self.fields_idx.is_none() {
            self.fields_idx = Some(
                self.fields
                    .iter()
                    .enumerate()
                    .map(|(i, c)| (c.column_id, i))
                    .collect(),
            );
        }

        self.fields_idx
            .as_ref()
            .unwrap()
            .get(&column_id)
            .map(|&idx| &self.fields[idx])
    }
}
798
#[cfg(debug_assertions)]
#[derive(Default)]
#[allow(dead_code)]
/// Debug-only helper that validates batches emitted by scanners are sorted
/// and fall inside an optional time range.
pub(crate) struct BatchChecker {
    /// The most recently checked batch, used to validate ordering across batches.
    last_batch: Option<Batch>,
    /// Inclusive lower bound for batch timestamps, if any.
    start: Option<Timestamp>,
    /// Exclusive upper bound for batch timestamps, if any.
    end: Option<Timestamp>,
}
808
#[cfg(debug_assertions)]
#[allow(dead_code)]
impl BatchChecker {
    /// Sets the inclusive start timestamp to check batches against.
    pub(crate) fn with_start(mut self, start: Option<Timestamp>) -> Self {
        self.start = start;
        self
    }

    /// Sets the exclusive end timestamp to check batches against.
    pub(crate) fn with_end(mut self, end: Option<Timestamp>) -> Self {
        self.end = end;
        self
    }

    /// Checks that `batch` is internally sorted, lies within `[start, end)`,
    /// and follows the previously checked batch in sort order. Remembers
    /// `batch` for the next call.
    pub(crate) fn check_monotonic(&mut self, batch: &Batch) -> Result<(), String> {
        batch.check_monotonic()?;

        if let (Some(start), Some(first)) = (self.start, batch.first_timestamp())
            && start > first
        {
            return Err(format!(
                "batch's first timestamp is before the start timestamp: {:?} > {:?}",
                start, first
            ));
        }
        if let (Some(end), Some(last)) = (self.end, batch.last_timestamp())
            && end <= last
        {
            return Err(format!(
                "batch's last timestamp is after the end timestamp: {:?} <= {:?}",
                end, last
            ));
        }

        // Compare with the previous batch (if any) and remember the current
        // one regardless of the outcome.
        let res = self
            .last_batch
            .as_ref()
            .map(|last| last.check_next_batch(batch))
            .unwrap_or(Ok(()));
        self.last_batch = Some(batch.clone());
        res
    }

    /// Formats the previous and current batch keys/timestamps/sequences for
    /// diagnostic messages.
    pub(crate) fn format_batch(&self, batch: &Batch) -> String {
        use std::fmt::Write;

        let mut message = String::new();
        if let Some(last) = &self.last_batch {
            write!(
                message,
                "last_pk: {:?}, last_ts: {:?}, last_seq: {:?}, ",
                last.primary_key(),
                last.last_timestamp(),
                last.last_sequence()
            )
            .unwrap();
        }
        write!(
            message,
            "batch_pk: {:?}, batch_ts: {:?}, batch_seq: {:?}",
            batch.primary_key(),
            batch.timestamps(),
            batch.sequences()
        )
        .unwrap();

        message
    }

    /// Asserts that `batch` is sorted within the given partition range.
    ///
    /// # Panics
    /// Logs and panics with scanner/region/partition context when the check fails.
    pub(crate) fn ensure_part_range_batch(
        &mut self,
        scanner: &str,
        region_id: store_api::storage::RegionId,
        partition: usize,
        part_range: store_api::region_engine::PartitionRange,
        batch: &Batch,
    ) {
        if let Err(e) = self.check_monotonic(batch) {
            let err_msg = format!(
                "{}: batch is not sorted, {}, region_id: {}, partition: {}, part_range: {:?}",
                scanner, e, region_id, partition, part_range,
            );
            common_telemetry::error!("{err_msg}, {}", self.format_batch(batch));
            panic!("{err_msg}, batch rows: {}", batch.num_rows());
        }
    }
}
904
905const TIMESTAMP_KEY_LEN: usize = 9;
907
908fn concat_arrays(iter: impl Iterator<Item = ArrayRef>) -> Result<ArrayRef> {
910 let arrays: Vec<_> = iter.collect();
911 let dyn_arrays: Vec<_> = arrays.iter().map(|array| array.as_ref()).collect();
912 arrow::compute::concat(&dyn_arrays).context(ComputeArrowSnafu)
913}
914
#[derive(Debug, PartialEq, Eq, Clone)]
/// A field column of a [Batch]: the column id plus its data vector.
pub struct BatchColumn {
    /// Id of the column in the region metadata.
    pub column_id: ColumnId,
    /// Column values, one per row of the batch.
    pub data: VectorRef,
}
923
/// Builder for [Batch]; collects columns and validates them in `build`.
pub struct BatchBuilder {
    /// Encoded primary key of the batch being built.
    primary_key: Vec<u8>,
    /// Timestamp column; required before `build`.
    timestamps: Option<VectorRef>,
    /// Sequence column; required before `build`.
    sequences: Option<Arc<UInt64Vector>>,
    /// Op-type column; required before `build`.
    op_types: Option<Arc<UInt8Vector>>,
    /// Field columns, appended in order.
    fields: Vec<BatchColumn>,
}
932
933impl BatchBuilder {
934 pub fn new(primary_key: Vec<u8>) -> BatchBuilder {
936 BatchBuilder {
937 primary_key,
938 timestamps: None,
939 sequences: None,
940 op_types: None,
941 fields: Vec::new(),
942 }
943 }
944
945 pub fn with_required_columns(
947 primary_key: Vec<u8>,
948 timestamps: VectorRef,
949 sequences: Arc<UInt64Vector>,
950 op_types: Arc<UInt8Vector>,
951 ) -> BatchBuilder {
952 BatchBuilder {
953 primary_key,
954 timestamps: Some(timestamps),
955 sequences: Some(sequences),
956 op_types: Some(op_types),
957 fields: Vec::new(),
958 }
959 }
960
961 pub fn with_fields(mut self, fields: Vec<BatchColumn>) -> Self {
963 self.fields = fields;
964 self
965 }
966
967 pub fn push_field(&mut self, column: BatchColumn) -> &mut Self {
969 self.fields.push(column);
970 self
971 }
972
973 pub fn push_field_array(&mut self, column_id: ColumnId, array: ArrayRef) -> Result<&mut Self> {
975 let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
976 self.fields.push(BatchColumn {
977 column_id,
978 data: vector,
979 });
980
981 Ok(self)
982 }
983
984 pub fn timestamps_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
986 let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
987 ensure!(
988 vector.data_type().is_timestamp(),
989 InvalidBatchSnafu {
990 reason: format!("{:?} is not a timestamp type", vector.data_type()),
991 }
992 );
993
994 self.timestamps = Some(vector);
995 Ok(self)
996 }
997
998 pub fn sequences_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
1000 ensure!(
1001 *array.data_type() == arrow::datatypes::DataType::UInt64,
1002 InvalidBatchSnafu {
1003 reason: "sequence array is not UInt64 type",
1004 }
1005 );
1006 let vector = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
1008 self.sequences = Some(vector);
1009
1010 Ok(self)
1011 }
1012
1013 pub fn op_types_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
1015 ensure!(
1016 *array.data_type() == arrow::datatypes::DataType::UInt8,
1017 InvalidBatchSnafu {
1018 reason: "sequence array is not UInt8 type",
1019 }
1020 );
1021 let vector = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
1023 self.op_types = Some(vector);
1024
1025 Ok(self)
1026 }
1027
1028 pub fn build(self) -> Result<Batch> {
1030 let timestamps = self.timestamps.context(InvalidBatchSnafu {
1031 reason: "missing timestamps",
1032 })?;
1033 let sequences = self.sequences.context(InvalidBatchSnafu {
1034 reason: "missing sequences",
1035 })?;
1036 let op_types = self.op_types.context(InvalidBatchSnafu {
1037 reason: "missing op_types",
1038 })?;
1039 assert_eq!(0, timestamps.null_count());
1042 assert_eq!(0, sequences.null_count());
1043 assert_eq!(0, op_types.null_count());
1044
1045 let ts_len = timestamps.len();
1046 ensure!(
1047 sequences.len() == ts_len,
1048 InvalidBatchSnafu {
1049 reason: format!(
1050 "sequence have different len {} != {}",
1051 sequences.len(),
1052 ts_len
1053 ),
1054 }
1055 );
1056 ensure!(
1057 op_types.len() == ts_len,
1058 InvalidBatchSnafu {
1059 reason: format!(
1060 "op type have different len {} != {}",
1061 op_types.len(),
1062 ts_len
1063 ),
1064 }
1065 );
1066 for column in &self.fields {
1067 ensure!(
1068 column.data.len() == ts_len,
1069 InvalidBatchSnafu {
1070 reason: format!(
1071 "column {} has different len {} != {}",
1072 column.column_id,
1073 column.data.len(),
1074 ts_len
1075 ),
1076 }
1077 );
1078 }
1079
1080 Ok(Batch {
1081 primary_key: self.primary_key,
1082 pk_values: None,
1083 timestamps,
1084 sequences,
1085 op_types,
1086 fields: self.fields,
1087 fields_idx: None,
1088 })
1089 }
1090}
1091
1092impl From<Batch> for BatchBuilder {
1093 fn from(batch: Batch) -> Self {
1094 Self {
1095 primary_key: batch.primary_key,
1096 timestamps: Some(batch.timestamps),
1097 sequences: Some(batch.sequences),
1098 op_types: Some(batch.op_types),
1099 fields: batch.fields,
1100 }
1101 }
1102}
1103
/// An async source of [Batch]es backed by one of several producers.
pub enum Source {
    /// Batches from a [BatchReader].
    Reader(BoxedBatchReader),
    /// Batches from a synchronous iterator.
    Iter(BoxedBatchIterator),
    /// Batches from an async stream.
    Stream(BoxedBatchStream),
    /// Batches from a [PruneReader].
    PruneReader(PruneReader),
}
1117
impl Source {
    /// Fetches the next batch from the underlying producer, or `None` when
    /// the source is exhausted.
    pub async fn next_batch(&mut self) -> Result<Option<Batch>> {
        match self {
            Source::Reader(reader) => reader.next_batch().await,
            // `transpose` turns Option<Result<_>> into Result<Option<_>>.
            Source::Iter(iter) => iter.next().transpose(),
            Source::Stream(stream) => stream.try_next().await,
            Source::PruneReader(reader) => reader.next_batch().await,
        }
    }
}
1129
/// An async source of arrow [RecordBatch]es.
pub enum FlatSource {
    /// Record batches from a synchronous iterator.
    Iter(BoxedRecordBatchIterator),
    /// Record batches from an async stream.
    Stream(BoxedRecordBatchStream),
}
1137
impl FlatSource {
    /// Fetches the next record batch, or `None` when the source is exhausted.
    pub async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        match self {
            // `transpose` turns Option<Result<_>> into Result<Option<_>>.
            FlatSource::Iter(iter) => iter.next().transpose(),
            FlatSource::Stream(stream) => stream.try_next().await,
        }
    }
}
1147
/// Async reader that yields [Batch]es.
#[async_trait]
pub trait BatchReader: Send {
    /// Fetches the next batch, or `None` when the reader is exhausted.
    async fn next_batch(&mut self) -> Result<Option<Batch>>;
}
1162
/// Boxed [BatchReader] trait object.
pub type BoxedBatchReader = Box<dyn BatchReader>;

/// Owned async stream of [Batch]es.
pub type BoxedBatchStream = BoxStream<'static, Result<Batch>>;

/// Owned async stream of arrow [RecordBatch]es.
pub type BoxedRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
1171
#[async_trait::async_trait]
impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
    /// Delegates to the boxed reader.
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        (**self).next_batch().await
    }
}
1178
#[derive(Debug, Default)]
/// Metrics collected while scanning a region.
pub(crate) struct ScannerMetrics {
    /// Time spent reading data from sources.
    scan_cost: Duration,
    /// Time spent yielding batches to the consumer.
    yield_cost: Duration,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of rows returned.
    num_rows: usize,
}
1191
1192#[cfg(test)]
1193mod tests {
1194 use datatypes::arrow::array::{TimestampMillisecondArray, UInt8Array, UInt64Array};
1195 use mito_codec::row_converter::{self, build_primary_key_codec_with_fields};
1196 use store_api::codec::PrimaryKeyEncoding;
1197 use store_api::storage::consts::ReservedColumnId;
1198
1199 use super::*;
1200 use crate::error::Error;
1201 use crate::test_util::new_batch_builder;
1202
1203 fn new_batch(
1204 timestamps: &[i64],
1205 sequences: &[u64],
1206 op_types: &[OpType],
1207 field: &[u64],
1208 ) -> Batch {
1209 new_batch_builder(b"test", timestamps, sequences, op_types, 1, field)
1210 .build()
1211 .unwrap()
1212 }
1213
1214 fn new_batch_with_u64_fields(
1215 timestamps: &[i64],
1216 sequences: &[u64],
1217 op_types: &[OpType],
1218 fields: &[(ColumnId, &[Option<u64>])],
1219 ) -> Batch {
1220 assert_eq!(timestamps.len(), sequences.len());
1221 assert_eq!(timestamps.len(), op_types.len());
1222 for (_, values) in fields {
1223 assert_eq!(timestamps.len(), values.len());
1224 }
1225
1226 let mut builder = BatchBuilder::new(b"test".to_vec());
1227 builder
1228 .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
1229 timestamps.iter().copied(),
1230 )))
1231 .unwrap()
1232 .sequences_array(Arc::new(UInt64Array::from_iter_values(
1233 sequences.iter().copied(),
1234 )))
1235 .unwrap()
1236 .op_types_array(Arc::new(UInt8Array::from_iter_values(
1237 op_types.iter().map(|v| *v as u8),
1238 )))
1239 .unwrap();
1240
1241 for (col_id, values) in fields {
1242 builder
1243 .push_field_array(*col_id, Arc::new(UInt64Array::from(values.to_vec())))
1244 .unwrap();
1245 }
1246
1247 builder.build().unwrap()
1248 }
1249
1250 fn new_batch_without_fields(
1251 timestamps: &[i64],
1252 sequences: &[u64],
1253 op_types: &[OpType],
1254 ) -> Batch {
1255 assert_eq!(timestamps.len(), sequences.len());
1256 assert_eq!(timestamps.len(), op_types.len());
1257
1258 let mut builder = BatchBuilder::new(b"test".to_vec());
1259 builder
1260 .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
1261 timestamps.iter().copied(),
1262 )))
1263 .unwrap()
1264 .sequences_array(Arc::new(UInt64Array::from_iter_values(
1265 sequences.iter().copied(),
1266 )))
1267 .unwrap()
1268 .op_types_array(Arc::new(UInt8Array::from_iter_values(
1269 op_types.iter().map(|v| *v as u8),
1270 )))
1271 .unwrap();
1272
1273 builder.build().unwrap()
1274 }
1275
1276 #[test]
1277 fn test_empty_batch() {
1278 let batch = Batch::empty();
1279 assert!(batch.is_empty());
1280 assert_eq!(None, batch.first_timestamp());
1281 assert_eq!(None, batch.last_timestamp());
1282 assert_eq!(None, batch.first_sequence());
1283 assert_eq!(None, batch.last_sequence());
1284 assert!(batch.timestamps_native().is_none());
1285 }
1286
1287 #[test]
1288 fn test_first_last_one() {
1289 let batch = new_batch(&[1], &[2], &[OpType::Put], &[4]);
1290 assert_eq!(
1291 Timestamp::new_millisecond(1),
1292 batch.first_timestamp().unwrap()
1293 );
1294 assert_eq!(
1295 Timestamp::new_millisecond(1),
1296 batch.last_timestamp().unwrap()
1297 );
1298 assert_eq!(2, batch.first_sequence().unwrap());
1299 assert_eq!(2, batch.last_sequence().unwrap());
1300 }
1301
1302 #[test]
1303 fn test_first_last_multiple() {
1304 let batch = new_batch(
1305 &[1, 2, 3],
1306 &[11, 12, 13],
1307 &[OpType::Put, OpType::Put, OpType::Put],
1308 &[21, 22, 23],
1309 );
1310 assert_eq!(
1311 Timestamp::new_millisecond(1),
1312 batch.first_timestamp().unwrap()
1313 );
1314 assert_eq!(
1315 Timestamp::new_millisecond(3),
1316 batch.last_timestamp().unwrap()
1317 );
1318 assert_eq!(11, batch.first_sequence().unwrap());
1319 assert_eq!(13, batch.last_sequence().unwrap());
1320 }
1321
1322 #[test]
1323 fn test_slice() {
1324 let batch = new_batch(
1325 &[1, 2, 3, 4],
1326 &[11, 12, 13, 14],
1327 &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1328 &[21, 22, 23, 24],
1329 );
1330 let batch = batch.slice(1, 2);
1331 let expect = new_batch(
1332 &[2, 3],
1333 &[12, 13],
1334 &[OpType::Delete, OpType::Put],
1335 &[22, 23],
1336 );
1337 assert_eq!(expect, batch);
1338 }
1339
1340 #[test]
1341 fn test_timestamps_native() {
1342 let batch = new_batch(
1343 &[1, 2, 3, 4],
1344 &[11, 12, 13, 14],
1345 &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1346 &[21, 22, 23, 24],
1347 );
1348 assert_eq!(&[1, 2, 3, 4], batch.timestamps_native().unwrap());
1349 }
1350
1351 #[test]
1352 fn test_concat_empty() {
1353 let err = Batch::concat(vec![]).unwrap_err();
1354 assert!(
1355 matches!(err, Error::InvalidBatch { .. }),
1356 "unexpected err: {err}"
1357 );
1358 }
1359
1360 #[test]
1361 fn test_concat_one() {
1362 let batch = new_batch(&[], &[], &[], &[]);
1363 let actual = Batch::concat(vec![batch.clone()]).unwrap();
1364 assert_eq!(batch, actual);
1365
1366 let batch = new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]);
1367 let actual = Batch::concat(vec![batch.clone()]).unwrap();
1368 assert_eq!(batch, actual);
1369 }
1370
1371 #[test]
1372 fn test_concat_multiple() {
1373 let batches = vec![
1374 new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]),
1375 new_batch(
1376 &[3, 4, 5],
1377 &[13, 14, 15],
1378 &[OpType::Put, OpType::Delete, OpType::Put],
1379 &[23, 24, 25],
1380 ),
1381 new_batch(&[], &[], &[], &[]),
1382 new_batch(&[6], &[16], &[OpType::Put], &[26]),
1383 ];
1384 let batch = Batch::concat(batches).unwrap();
1385 let expect = new_batch(
1386 &[1, 2, 3, 4, 5, 6],
1387 &[11, 12, 13, 14, 15, 16],
1388 &[
1389 OpType::Put,
1390 OpType::Put,
1391 OpType::Put,
1392 OpType::Delete,
1393 OpType::Put,
1394 OpType::Put,
1395 ],
1396 &[21, 22, 23, 24, 25, 26],
1397 );
1398 assert_eq!(expect, batch);
1399 }
1400
1401 #[test]
1402 fn test_concat_different() {
1403 let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
1404 let mut batch2 = new_batch(&[2], &[2], &[OpType::Put], &[2]);
1405 batch2.primary_key = b"hello".to_vec();
1406 let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
1407 assert!(
1408 matches!(err, Error::InvalidBatch { .. }),
1409 "unexpected err: {err}"
1410 );
1411 }
1412
1413 #[test]
1414 fn test_concat_different_fields() {
1415 let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
1416 let fields = vec![
1417 batch1.fields()[0].clone(),
1418 BatchColumn {
1419 column_id: 2,
1420 data: Arc::new(UInt64Vector::from_slice([2])),
1421 },
1422 ];
1423 let batch2 = batch1.clone().with_fields(fields).unwrap();
1425 let err = Batch::concat(vec![batch1.clone(), batch2]).unwrap_err();
1426 assert!(
1427 matches!(err, Error::InvalidBatch { .. }),
1428 "unexpected err: {err}"
1429 );
1430
1431 let fields = vec![BatchColumn {
1433 column_id: 2,
1434 data: Arc::new(UInt64Vector::from_slice([2])),
1435 }];
1436 let batch2 = batch1.clone().with_fields(fields).unwrap();
1437 let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
1438 assert!(
1439 matches!(err, Error::InvalidBatch { .. }),
1440 "unexpected err: {err}"
1441 );
1442 }
1443
1444 #[test]
1445 fn test_filter_deleted_empty() {
1446 let mut batch = new_batch(&[], &[], &[], &[]);
1447 batch.filter_deleted().unwrap();
1448 assert!(batch.is_empty());
1449 }
1450
1451 #[test]
1452 fn test_filter_deleted() {
1453 let mut batch = new_batch(
1454 &[1, 2, 3, 4],
1455 &[11, 12, 13, 14],
1456 &[OpType::Delete, OpType::Put, OpType::Delete, OpType::Put],
1457 &[21, 22, 23, 24],
1458 );
1459 batch.filter_deleted().unwrap();
1460 let expect = new_batch(&[2, 4], &[12, 14], &[OpType::Put, OpType::Put], &[22, 24]);
1461 assert_eq!(expect, batch);
1462
1463 let mut batch = new_batch(
1464 &[1, 2, 3, 4],
1465 &[11, 12, 13, 14],
1466 &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
1467 &[21, 22, 23, 24],
1468 );
1469 let expect = batch.clone();
1470 batch.filter_deleted().unwrap();
1471 assert_eq!(expect, batch);
1472 }
1473
    #[test]
    fn test_filter_by_sequence() {
        // LtEq keeps every row whose sequence is <= max.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::LtEq { max: 13 }))
            .unwrap();
        let expect = new_batch(
            &[1, 2, 3],
            &[11, 12, 13],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );
        assert_eq!(expect, batch);

        // LtEq with a max below every sequence removes all rows.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );

        batch
            .filter_by_sequence(Some(SequenceRange::LtEq { max: 10 }))
            .unwrap();
        assert!(batch.is_empty());

        // A `None` range means "no filter": the batch is unchanged.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let expect = batch.clone();
        batch.filter_by_sequence(None).unwrap();
        assert_eq!(expect, batch);

        // Filtering an empty batch (with or without a range) keeps it empty.
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch
            .filter_by_sequence(Some(SequenceRange::LtEq { max: 10 }))
            .unwrap();
        assert!(batch.is_empty());

        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_by_sequence(None).unwrap();
        assert!(batch.is_empty());

        // Gt keeps rows with sequence strictly greater than min.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::Gt { min: 12 }))
            .unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // Gt with a min above every sequence removes all rows.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::Gt { min: 20 }))
            .unwrap();
        assert!(batch.is_empty());

        // GtLtEq keeps rows with min < sequence <= max (exclusive/inclusive).
        let mut batch = new_batch(
            &[1, 2, 3, 4, 5],
            &[11, 12, 13, 14, 15],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 12, max: 14 }))
            .unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // GtLtEq applies to Delete rows the same as Put rows.
        let mut batch = new_batch(
            &[1, 2, 3, 4, 5],
            &[11, 12, 13, 14, 15],
            &[
                OpType::Put,
                OpType::Delete,
                OpType::Put,
                OpType::Delete,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 11, max: 13 }))
            .unwrap();
        let expect = new_batch(
            &[2, 3],
            &[12, 13],
            &[OpType::Delete, OpType::Put],
            &[22, 23],
        );
        assert_eq!(expect, batch);

        // GtLtEq entirely above the sequence range removes all rows.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 20, max: 25 }))
            .unwrap();
        assert!(batch.is_empty());
    }
1610
1611 #[test]
1612 fn test_merge_last_non_null_no_dup() {
1613 let mut batch = new_batch_with_u64_fields(
1614 &[1, 2],
1615 &[2, 1],
1616 &[OpType::Put, OpType::Put],
1617 &[(1, &[Some(10), None]), (2, &[Some(100), Some(200)])],
1618 );
1619 let expect = batch.clone();
1620 batch.merge_last_non_null().unwrap();
1621 assert_eq!(expect, batch);
1622 }
1623
1624 #[test]
1625 fn test_merge_last_non_null_fill_null_fields() {
1626 let mut batch = new_batch_with_u64_fields(
1628 &[1, 1, 1],
1629 &[3, 2, 1],
1630 &[OpType::Put, OpType::Put, OpType::Put],
1631 &[
1632 (1, &[None, Some(10), Some(11)]),
1633 (2, &[Some(100), Some(200), Some(300)]),
1634 ],
1635 );
1636 batch.merge_last_non_null().unwrap();
1637
1638 let expect = new_batch_with_u64_fields(
1641 &[1],
1642 &[3],
1643 &[OpType::Put],
1644 &[(1, &[Some(10)]), (2, &[Some(100)])],
1645 );
1646 assert_eq!(expect, batch);
1647 }
1648
1649 #[test]
1650 fn test_merge_last_non_null_stop_at_delete_row() {
1651 let mut batch = new_batch_with_u64_fields(
1654 &[1, 1, 1],
1655 &[3, 2, 1],
1656 &[OpType::Put, OpType::Delete, OpType::Put],
1657 &[
1658 (1, &[None, Some(10), Some(11)]),
1659 (2, &[Some(100), Some(200), Some(300)]),
1660 ],
1661 );
1662 batch.merge_last_non_null().unwrap();
1663
1664 let expect = new_batch_with_u64_fields(
1665 &[1],
1666 &[3],
1667 &[OpType::Put],
1668 &[(1, &[None]), (2, &[Some(100)])],
1669 );
1670 assert_eq!(expect, batch);
1671 }
1672
1673 #[test]
1674 fn test_merge_last_non_null_base_delete_no_merge() {
1675 let mut batch = new_batch_with_u64_fields(
1676 &[1, 1],
1677 &[3, 2],
1678 &[OpType::Delete, OpType::Put],
1679 &[(1, &[None, Some(10)]), (2, &[None, Some(200)])],
1680 );
1681 batch.merge_last_non_null().unwrap();
1682
1683 let expect =
1685 new_batch_with_u64_fields(&[1], &[3], &[OpType::Delete], &[(1, &[None]), (2, &[None])]);
1686 assert_eq!(expect, batch);
1687 }
1688
1689 #[test]
1690 fn test_merge_last_non_null_multiple_timestamp_groups() {
1691 let mut batch = new_batch_with_u64_fields(
1692 &[1, 1, 2, 3, 3],
1693 &[5, 4, 3, 2, 1],
1694 &[
1695 OpType::Put,
1696 OpType::Put,
1697 OpType::Put,
1698 OpType::Put,
1699 OpType::Put,
1700 ],
1701 &[
1702 (1, &[None, Some(10), Some(20), None, Some(30)]),
1703 (2, &[Some(100), Some(110), Some(120), None, Some(130)]),
1704 ],
1705 );
1706 batch.merge_last_non_null().unwrap();
1707
1708 let expect = new_batch_with_u64_fields(
1709 &[1, 2, 3],
1710 &[5, 3, 2],
1711 &[OpType::Put, OpType::Put, OpType::Put],
1712 &[
1713 (1, &[Some(10), Some(20), Some(30)]),
1714 (2, &[Some(100), Some(120), Some(130)]),
1715 ],
1716 );
1717 assert_eq!(expect, batch);
1718 }
1719
1720 #[test]
1721 fn test_merge_last_non_null_no_fields() {
1722 let mut batch = new_batch_without_fields(
1723 &[1, 1, 2],
1724 &[3, 2, 1],
1725 &[OpType::Put, OpType::Put, OpType::Put],
1726 );
1727 batch.merge_last_non_null().unwrap();
1728
1729 let expect = new_batch_without_fields(&[1, 2], &[3, 1], &[OpType::Put, OpType::Put]);
1730 assert_eq!(expect, batch);
1731 }
1732
1733 #[test]
1734 fn test_filter() {
1735 let mut batch = new_batch(
1737 &[1, 2, 3, 4],
1738 &[11, 12, 13, 14],
1739 &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
1740 &[21, 22, 23, 24],
1741 );
1742 let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
1743 batch.filter(&predicate).unwrap();
1744 let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
1745 assert_eq!(expect, batch);
1746
1747 let mut batch = new_batch(
1749 &[1, 2, 3, 4],
1750 &[11, 12, 13, 14],
1751 &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1752 &[21, 22, 23, 24],
1753 );
1754 let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
1755 batch.filter(&predicate).unwrap();
1756 let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
1757 assert_eq!(expect, batch);
1758
1759 let predicate = BooleanVector::from_vec(vec![false, false]);
1761 batch.filter(&predicate).unwrap();
1762 assert!(batch.is_empty());
1763 }
1764
    #[test]
    fn test_sort_and_dedup() {
        // Unsorted batch where timestamp 2 appears twice (seq 1 and seq 6).
        let original = new_batch(
            &[2, 3, 1, 4, 5, 2],
            &[1, 2, 3, 4, 5, 6],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25, 26],
        );

        // sort(dedup = true): rows end up ordered by timestamp and, for the
        // duplicated timestamp 2, only the row with the larger sequence (6)
        // is kept.
        let mut batch = original.clone();
        batch.sort(true).unwrap();
        assert_eq!(
            new_batch(
                &[1, 2, 3, 4, 5],
                &[3, 6, 2, 4, 5],
                &[
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                ],
                &[23, 26, 22, 24, 25],
            ),
            batch
        );

        // sort(dedup = false): both rows for timestamp 2 survive, with the
        // larger sequence (6) ordered before the smaller one (1).
        let mut batch = original.clone();
        batch.sort(false).unwrap();

        assert_eq!(
            new_batch(
                &[1, 2, 2, 3, 4, 5],
                &[3, 6, 1, 2, 4, 5],
                &[
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                ],
                &[23, 26, 21, 22, 24, 25],
            ),
            batch
        );

        // Same checks with a Delete row among the duplicates of timestamp 2.
        let original = new_batch(
            &[2, 2, 1],
            &[1, 6, 1],
            &[OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );

        // Dedup keeps the newest row (seq 6, a Put); the older Delete is dropped.
        let mut batch = original.clone();
        batch.sort(true).unwrap();
        let expect = new_batch(&[1, 2], &[1, 6], &[OpType::Put, OpType::Put], &[23, 22]);
        assert_eq!(expect, batch);

        // Without dedup, the Delete row is kept after the newer Put.
        let mut batch = original.clone();
        batch.sort(false).unwrap();
        let expect = new_batch(
            &[1, 2, 2],
            &[1, 6, 1],
            &[OpType::Put, OpType::Put, OpType::Delete],
            &[23, 22, 21],
        );
        assert_eq!(expect, batch);
    }
1843
    #[test]
    fn test_get_value() {
        // pk_col_value/field_col_value must work for both primary-key encodings.
        let encodings = [PrimaryKeyEncoding::Dense, PrimaryKeyEncoding::Sparse];

        for encoding in encodings {
            // Codec over two reserved columns (table_id, tsid) and two
            // ordinary string columns (ids 100 and 200).
            let codec = build_primary_key_codec_with_fields(
                encoding,
                [
                    (
                        ReservedColumnId::table_id(),
                        row_converter::SortField::new(ConcreteDataType::uint32_datatype()),
                    ),
                    (
                        ReservedColumnId::tsid(),
                        row_converter::SortField::new(ConcreteDataType::uint64_datatype()),
                    ),
                    (
                        100,
                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
                    ),
                    (
                        200,
                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
                    ),
                ]
                .into_iter(),
            );

            // Encode one value per pk column into the key buffer; the same
            // values are asserted back below after decoding.
            let values = [
                Value::UInt32(1000),
                Value::UInt64(2000),
                Value::String("abcdefgh".into()),
                Value::String("zyxwvu".into()),
            ];
            let mut buf = vec![];
            codec
                .encode_values(
                    &[
                        (ReservedColumnId::table_id(), values[0].clone()),
                        (ReservedColumnId::tsid(), values[1].clone()),
                        (100, values[2].clone()),
                        (200, values[3].clone()),
                    ],
                    &mut buf,
                )
                .unwrap();

            // Build a 3-row batch carrying the encoded primary key and one
            // u64 field column.
            let field_col_id = 2;
            let mut batch = new_batch_builder(
                &buf,
                &[1, 2, 3],
                &[1, 1, 1],
                &[OpType::Put, OpType::Put, OpType::Put],
                field_col_id,
                &[42, 43, 44],
            )
            .build()
            .unwrap();

            // Look up each pk column by id; the row index argument varies to
            // show it does not affect the decoded pk value.
            let v = batch
                .pk_col_value(&*codec, 0, ReservedColumnId::table_id())
                .unwrap()
                .unwrap();
            assert_eq!(values[0], *v);

            let v = batch
                .pk_col_value(&*codec, 1, ReservedColumnId::tsid())
                .unwrap()
                .unwrap();
            assert_eq!(values[1], *v);

            let v = batch.pk_col_value(&*codec, 2, 100).unwrap().unwrap();
            assert_eq!(values[2], *v);

            let v = batch.pk_col_value(&*codec, 3, 200).unwrap().unwrap();
            assert_eq!(values[3], *v);

            // Field lookup returns the whole column; check all three rows.
            let v = batch.field_col_value(field_col_id).unwrap();
            assert_eq!(v.data.get(0), Value::UInt64(42));
            assert_eq!(v.data.get(1), Value::UInt64(43));
            assert_eq!(v.data.get(2), Value::UInt64(44));
        }
    }
1927}