1pub mod batch_adapter;
18pub mod compat;
19pub mod dedup;
20pub mod flat_dedup;
21pub mod flat_merge;
22pub mod flat_projection;
23pub mod last_row;
24pub mod merge;
25pub mod plain_batch;
26pub mod projection;
27pub(crate) mod prune;
28pub(crate) mod pruner;
29pub mod range;
30pub(crate) mod range_cache;
31pub mod scan_region;
32pub mod scan_util;
33pub(crate) mod seq_scan;
34pub mod series_scan;
35pub mod stream;
36pub(crate) mod unordered_scan;
37
38use std::collections::{HashMap, HashSet};
39use std::sync::Arc;
40use std::time::Duration;
41
42use api::v1::OpType;
43use async_trait::async_trait;
44use common_time::Timestamp;
45use datafusion_common::arrow::array::UInt8Array;
46use datatypes::arrow;
47use datatypes::arrow::array::{Array, ArrayRef};
48use datatypes::arrow::compute::SortOptions;
49use datatypes::arrow::record_batch::RecordBatch;
50use datatypes::arrow::row::{RowConverter, SortField};
51use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector};
52use datatypes::scalars::ScalarVectorBuilder;
53use datatypes::types::TimestampType;
54use datatypes::value::{Value, ValueRef};
55use datatypes::vectors::{
56 BooleanVector, Helper, TimestampMicrosecondVector, TimestampMillisecondVector,
57 TimestampMillisecondVectorBuilder, TimestampNanosecondVector, TimestampSecondVector,
58 UInt8Vector, UInt8VectorBuilder, UInt32Vector, UInt64Vector, UInt64VectorBuilder, Vector,
59 VectorRef,
60};
61use futures::TryStreamExt;
62use futures::stream::BoxStream;
63use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
64use snafu::{OptionExt, ResultExt, ensure};
65use store_api::metadata::RegionMetadata;
66use store_api::storage::{ColumnId, SequenceNumber, SequenceRange};
67
68use crate::error::{
69 ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu,
70 Result,
71};
72use crate::memtable::{BoxedBatchIterator, BoxedRecordBatchIterator};
73use crate::read::prune::PruneReader;
74
/// A batch of rows, where every row shares the same encoded primary key.
#[derive(Debug, PartialEq, Clone)]
pub struct Batch {
    /// Encoded primary key shared by all rows in this batch.
    primary_key: Vec<u8>,
    /// Decoded values of the primary key, lazily populated
    /// (see [Batch::pk_col_value] / [Batch::set_pk_values]).
    pk_values: Option<CompositeValues>,
    /// Timestamp column; the builder guarantees this is a timestamp-typed vector.
    timestamps: VectorRef,
    /// Sequence number of each row; same length as `timestamps`.
    sequences: Arc<UInt64Vector>,
    /// Op type (see [OpType]) of each row; same length as `timestamps`.
    op_types: Arc<UInt8Vector>,
    /// Field (non-key) columns; each has the same length as `timestamps`.
    fields: Vec<BatchColumn>,
    /// Lazily built index from column id to position in `fields`
    /// (see [Batch::field_col_value]).
    fields_idx: Option<HashMap<ColumnId, usize>>,
}
100
impl Batch {
    /// Creates a new [Batch], validating that all columns share the same length.
    pub fn new(
        primary_key: Vec<u8>,
        timestamps: VectorRef,
        sequences: Arc<UInt64Vector>,
        op_types: Arc<UInt8Vector>,
        fields: Vec<BatchColumn>,
    ) -> Result<Batch> {
        BatchBuilder::with_required_columns(primary_key, timestamps, sequences, op_types)
            .with_fields(fields)
            .build()
    }

    /// Returns a new [Batch] whose field columns are replaced by `fields`,
    /// re-running the builder's length validation.
    pub fn with_fields(self, fields: Vec<BatchColumn>) -> Result<Batch> {
        Batch::new(
            self.primary_key,
            self.timestamps,
            self.sequences,
            self.op_types,
            fields,
        )
    }

    /// Returns the encoded primary key bytes.
    pub fn primary_key(&self) -> &[u8] {
        &self.primary_key
    }

    /// Returns the cached decoded primary key values, if any.
    pub fn pk_values(&self) -> Option<&CompositeValues> {
        self.pk_values.as_ref()
    }

    /// Caches decoded primary key values on this batch.
    pub fn set_pk_values(&mut self, pk_values: CompositeValues) {
        self.pk_values = Some(pk_values);
    }

    /// Clears the cached primary key values (test helper).
    #[cfg(any(test, feature = "test"))]
    pub fn remove_pk_values(&mut self) {
        self.pk_values = None;
    }

    /// Returns the field (non-key) columns.
    pub fn fields(&self) -> &[BatchColumn] {
        &self.fields
    }

    /// Returns the timestamp column.
    pub fn timestamps(&self) -> &VectorRef {
        &self.timestamps
    }

    /// Returns the sequence column.
    pub fn sequences(&self) -> &Arc<UInt64Vector> {
        &self.sequences
    }

    /// Returns the op type column.
    pub fn op_types(&self) -> &Arc<UInt8Vector> {
        &self.op_types
    }

    /// Returns the number of rows; all columns share this length.
    pub fn num_rows(&self) -> usize {
        self.sequences.len()
    }

    /// Creates a batch with zero rows and no field columns.
    pub(crate) fn empty() -> Self {
        Self {
            primary_key: vec![],
            pk_values: None,
            timestamps: Arc::new(TimestampMillisecondVectorBuilder::with_capacity(0).finish()),
            sequences: Arc::new(UInt64VectorBuilder::with_capacity(0).finish()),
            op_types: Arc::new(UInt8VectorBuilder::with_capacity(0).finish()),
            fields: vec![],
            fields_idx: None,
        }
    }

    /// Returns true if the batch has no rows.
    pub fn is_empty(&self) -> bool {
        self.num_rows() == 0
    }

    /// Returns the timestamp of the first row, or `None` if the batch is empty.
    pub fn first_timestamp(&self) -> Option<Timestamp> {
        if self.timestamps.is_empty() {
            return None;
        }

        Some(self.get_timestamp(0))
    }

    /// Returns the timestamp of the last row, or `None` if the batch is empty.
    pub fn last_timestamp(&self) -> Option<Timestamp> {
        if self.timestamps.is_empty() {
            return None;
        }

        Some(self.get_timestamp(self.timestamps.len() - 1))
    }

    /// Returns the sequence of the first row, or `None` if the batch is empty.
    pub fn first_sequence(&self) -> Option<SequenceNumber> {
        if self.sequences.is_empty() {
            return None;
        }

        Some(self.get_sequence(0))
    }

    /// Returns the sequence of the last row, or `None` if the batch is empty.
    pub fn last_sequence(&self) -> Option<SequenceNumber> {
        if self.sequences.is_empty() {
            return None;
        }

        Some(self.get_sequence(self.sequences.len() - 1))
    }

    /// Replaces the encoded primary key of this batch.
    ///
    /// NOTE(review): the cached `pk_values` is intentionally left untouched here —
    /// callers presumably keep it in sync; confirm at call sites.
    pub fn set_primary_key(&mut self, primary_key: Vec<u8>) {
        self.primary_key = primary_key;
    }

    /// Returns a zero-copy slice of this batch covering rows `[offset, offset + length)`.
    pub fn slice(&self, offset: usize, length: usize) -> Batch {
        let fields = self
            .fields
            .iter()
            .map(|column| BatchColumn {
                column_id: column.column_id,
                data: column.data.slice(offset, length),
            })
            .collect();
        // Constructed directly (not via the builder) since a slice of a valid
        // batch keeps all per-column lengths consistent.
        Batch {
            primary_key: self.primary_key.clone(),
            pk_values: self.pk_values.clone(),
            timestamps: self.timestamps.slice(offset, length),
            sequences: Arc::new(self.sequences.get_slice(offset, length)),
            op_types: Arc::new(self.op_types.get_slice(offset, length)),
            fields,
            fields_idx: self.fields_idx.clone(),
        }
    }

    /// Concatenates `batches` into a single batch.
    ///
    /// All batches must be non-empty as a list, share the same primary key,
    /// and have identical field columns (same ids in the same order).
    pub fn concat(mut batches: Vec<Batch>) -> Result<Batch> {
        ensure!(
            !batches.is_empty(),
            InvalidBatchSnafu {
                reason: "empty batches",
            }
        );
        if batches.len() == 1 {
            // Fast path: a single batch is returned as-is.
            return Ok(batches.pop().unwrap());
        }

        // Move the key out of the first batch to avoid cloning it.
        let primary_key = std::mem::take(&mut batches[0].primary_key);
        let first = &batches[0];
        // Ensure all batches have the same primary key.
        ensure!(
            batches
                .iter()
                .skip(1)
                .all(|b| b.primary_key() == primary_key),
            InvalidBatchSnafu {
                reason: "batches have different primary key",
            }
        );
        for b in batches.iter().skip(1) {
            ensure!(
                b.fields.len() == first.fields.len(),
                InvalidBatchSnafu {
                    reason: "batches have different field num",
                }
            );
            for (l, r) in b.fields.iter().zip(&first.fields) {
                ensure!(
                    l.column_id == r.column_id,
                    InvalidBatchSnafu {
                        reason: "batches have different fields",
                    }
                );
            }
        }

        // Concatenate each column across all batches; `build` re-validates lengths.
        let mut builder = BatchBuilder::new(primary_key);
        let array = concat_arrays(batches.iter().map(|b| b.timestamps().to_arrow_array()))?;
        builder.timestamps_array(array)?;
        let array = concat_arrays(batches.iter().map(|b| b.sequences().to_arrow_array()))?;
        builder.sequences_array(array)?;
        let array = concat_arrays(batches.iter().map(|b| b.op_types().to_arrow_array()))?;
        builder.op_types_array(array)?;
        for (i, batch_column) in first.fields.iter().enumerate() {
            let array = concat_arrays(batches.iter().map(|b| b.fields()[i].data.to_arrow_array()))?;
            builder.push_field_array(batch_column.column_id, array)?;
        }

        builder.build()
    }

    /// Removes rows whose op type is [OpType::Delete].
    pub fn filter_deleted(&mut self) -> Result<()> {
        let array = self.op_types.as_arrow();
        // Keep rows where op_type != Delete.
        let rhs = UInt8Array::new_scalar(OpType::Delete as u8);
        let predicate =
            arrow::compute::kernels::cmp::neq(array, &rhs).context(ComputeArrowSnafu)?;
        self.filter(&BooleanVector::from(predicate))
    }

    /// Keeps only the rows for which `predicate` is true, filtering every column in place.
    pub fn filter(&mut self, predicate: &BooleanVector) -> Result<()> {
        self.timestamps = self
            .timestamps
            .filter(predicate)
            .context(ComputeVectorSnafu)?;
        self.sequences = Arc::new(
            UInt64Vector::try_from_arrow_array(
                arrow::compute::filter(self.sequences.as_arrow(), predicate.as_boolean_array())
                    .context(ComputeArrowSnafu)?,
            )
            // The filtered array has the same (UInt64) type, so the conversion can't fail.
            .unwrap(),
        );
        self.op_types = Arc::new(
            UInt8Vector::try_from_arrow_array(
                arrow::compute::filter(self.op_types.as_arrow(), predicate.as_boolean_array())
                    .context(ComputeArrowSnafu)?,
            )
            // Same reasoning as above for the UInt8 op type array.
            .unwrap(),
        );
        for batch_column in &mut self.fields {
            batch_column.data = batch_column
                .data
                .filter(predicate)
                .context(ComputeVectorSnafu)?;
        }

        Ok(())
    }

    /// Filters rows by the given sequence range; `None` keeps everything.
    ///
    /// Skips the actual filtering when the batch's whole `[first, last]`
    /// sequence span already lies inside the range.
    pub fn filter_by_sequence(&mut self, sequence: Option<SequenceRange>) -> Result<()> {
        let seq_range = match sequence {
            None => return Ok(()),
            Some(seq_range) => {
                let (Some(first), Some(last)) = (self.first_sequence(), self.last_sequence())
                else {
                    // Empty batch: nothing to filter.
                    return Ok(());
                };
                // If every sequence in the batch is already inside the range,
                // filtering is a no-op.
                let is_subset = match seq_range {
                    SequenceRange::Gt { min } => min < first,
                    SequenceRange::LtEq { max } => max >= last,
                    SequenceRange::GtLtEq { min, max } => min < first && max >= last,
                };
                if is_subset {
                    return Ok(());
                }
                seq_range
            }
        };

        let seqs = self.sequences.as_arrow();
        let predicate = seq_range.filter(seqs).context(ComputeArrowSnafu)?;

        let predicate = BooleanVector::from(predicate);
        self.filter(&predicate)?;

        Ok(())
    }

    /// Sorts rows by `(timestamp asc, sequence desc)` in place.
    ///
    /// When `dedup` is true, rows with duplicate timestamps are removed,
    /// keeping the first (highest-sequence) row of each duplicate run.
    pub fn sort(&mut self, dedup: bool) -> Result<()> {
        // Encode (timestamp, sequence) into comparable byte rows; sequence is
        // encoded descending so byte order gives ts asc / seq desc.
        let converter = RowConverter::new(vec![
            SortField::new(self.timestamps.data_type().as_arrow_type()),
            SortField::new_with_options(
                self.sequences.data_type().as_arrow_type(),
                SortOptions {
                    descending: true,
                    ..Default::default()
                },
            ),
        ])
        .context(ComputeArrowSnafu)?;
        let columns = [
            self.timestamps.to_arrow_array(),
            self.sequences.to_arrow_array(),
        ];
        let rows = converter.convert_columns(&columns).unwrap();
        let mut to_sort: Vec<_> = rows.iter().enumerate().collect();

        let was_sorted = to_sort.is_sorted_by_key(|x| x.1);
        if !was_sorted {
            to_sort.sort_unstable_by_key(|x| x.1);
        }

        let num_rows = to_sort.len();
        if dedup {
            // Each encoded row is 18 bytes: TIMESTAMP_KEY_LEN (9) bytes for the
            // timestamp key followed by 9 bytes for the sequence key
            // (presumably 1 validity byte + 8 value bytes each — see arrow's
            // row format). Two rows are duplicates when their timestamp
            // prefixes match; `dedup_by` keeps the first of each run.
            to_sort.dedup_by(|left, right| {
                debug_assert_eq!(18, left.1.as_ref().len());
                debug_assert_eq!(18, right.1.as_ref().len());
                let (left_key, right_key) = (left.1.as_ref(), right.1.as_ref());
                left_key[..TIMESTAMP_KEY_LEN] == right_key[..TIMESTAMP_KEY_LEN]
            });
        }
        let no_dedup = to_sort.len() == num_rows;

        if was_sorted && no_dedup {
            // Already sorted and nothing removed: avoid the take.
            return Ok(());
        }
        let indices = UInt32Vector::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
        self.take_in_place(&indices)
    }

    /// Merges rows with duplicate timestamps using "last non-null" semantics.
    ///
    /// For each run of equal timestamps, the first row is kept as the base row;
    /// any field that is null in the base row is backfilled from the first
    /// later row in the run that has a non-null value, stopping at a Delete.
    /// Assumes rows are sorted so equal timestamps are adjacent (as after
    /// [Batch::sort]).
    pub(crate) fn merge_last_non_null(&mut self) -> Result<()> {
        let num_rows = self.num_rows();
        if num_rows < 2 {
            return Ok(());
        }

        let Some(timestamps) = self.timestamps_native() else {
            return Ok(());
        };

        // One pass to detect duplicates and count timestamp groups.
        let mut has_dup = false;
        let mut group_count = 1;
        for i in 1..num_rows {
            has_dup |= timestamps[i] == timestamps[i - 1];
            group_count += (timestamps[i] != timestamps[i - 1]) as usize;
        }
        if !has_dup {
            // No duplicate timestamps, nothing to merge.
            return Ok(());
        }

        let num_fields = self.fields.len();
        let op_types = self.op_types.as_arrow().values();

        // `base_indices[g]` is the row taken for ts/seq/op_type of group `g`;
        // `field_indices[f][g]` is the row taken for field `f` of group `g`.
        let mut base_indices: Vec<u32> = Vec::with_capacity(group_count);
        let mut field_indices: Vec<Vec<u32>> = (0..num_fields)
            .map(|_| Vec::with_capacity(group_count))
            .collect();

        let mut start = 0;
        while start < num_rows {
            // Find the run [start, end) of rows sharing the same timestamp.
            let ts = timestamps[start];
            let mut end = start + 1;
            while end < num_rows && timestamps[end] == ts {
                end += 1;
            }

            let group_pos = base_indices.len();
            base_indices.push(start as u32);

            if num_fields > 0 {
                // Default every field to the base row.
                for idx in &mut field_indices {
                    idx.push(start as u32);
                }

                let base_deleted = op_types[start] == OpType::Delete as u8;
                if !base_deleted {
                    // Collect the fields that are null in the base row.
                    let mut missing_fields = Vec::new();
                    for (field_idx, col) in self.fields.iter().enumerate() {
                        if col.data.is_null(start) {
                            missing_fields.push(field_idx);
                        }
                    }

                    if !missing_fields.is_empty() {
                        // Backfill from older rows in the run until a Delete
                        // or until every missing field is resolved.
                        for row_idx in (start + 1)..end {
                            if op_types[row_idx] == OpType::Delete as u8 {
                                break;
                            }

                            missing_fields.retain(|&field_idx| {
                                if self.fields[field_idx].data.is_null(row_idx) {
                                    true
                                } else {
                                    field_indices[field_idx][group_pos] = row_idx as u32;
                                    false
                                }
                            });

                            if missing_fields.is_empty() {
                                break;
                            }
                        }
                    }
                }
            }

            start = end;
        }

        // Materialize the merged columns via take.
        let base_indices = UInt32Vector::from_vec(base_indices);
        self.timestamps = self
            .timestamps
            .take(&base_indices)
            .context(ComputeVectorSnafu)?;
        let array = arrow::compute::take(self.sequences.as_arrow(), base_indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
        let array = arrow::compute::take(self.op_types.as_arrow(), base_indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());

        for (field_idx, batch_column) in self.fields.iter_mut().enumerate() {
            let idx = UInt32Vector::from_vec(std::mem::take(&mut field_indices[field_idx]));
            batch_column.data = batch_column.data.take(&idx).context(ComputeVectorSnafu)?;
        }

        Ok(())
    }

    /// Returns the approximate in-memory size of this batch in bytes.
    pub fn memory_size(&self) -> usize {
        let mut size = std::mem::size_of::<Self>();
        size += self.primary_key.len();
        size += self.timestamps.memory_size();
        size += self.sequences.memory_size();
        size += self.op_types.memory_size();
        for batch_column in &self.fields {
            size += batch_column.data.memory_size();
        }
        size
    }

    /// Returns `(column id, data type)` for the field columns of `metadata`
    /// that are present in `projection`, in the metadata's field order.
    pub(crate) fn projected_fields(
        metadata: &RegionMetadata,
        projection: &[ColumnId],
    ) -> Vec<(ColumnId, ConcreteDataType)> {
        let projected_ids: HashSet<_> = projection.iter().copied().collect();
        metadata
            .field_columns()
            .filter_map(|column| {
                if projected_ids.contains(&column.column_id) {
                    Some((column.column_id, column.column_schema.data_type.clone()))
                } else {
                    None
                }
            })
            .collect()
    }

    /// Returns the timestamp values as a raw `i64` slice (in the column's
    /// native time unit), or `None` if the batch is empty.
    ///
    /// # Panics
    /// Panics if the timestamp column is not a timestamp type (the builder
    /// prevents this).
    pub(crate) fn timestamps_native(&self) -> Option<&[i64]> {
        if self.timestamps.is_empty() {
            return None;
        }

        let values = match self.timestamps.data_type() {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampSecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampMicrosecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampNanosecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            other => panic!("timestamps in a Batch has other type {:?}", other),
        };

        Some(values)
    }

    /// Reorders every column of this batch by `indices`.
    fn take_in_place(&mut self, indices: &UInt32Vector) -> Result<()> {
        self.timestamps = self.timestamps.take(indices).context(ComputeVectorSnafu)?;
        let array = arrow::compute::take(self.sequences.as_arrow(), indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        // The taken array keeps its UInt64 type, so the conversion can't fail.
        self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
        let array = arrow::compute::take(self.op_types.as_arrow(), indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
        for batch_column in &mut self.fields {
            batch_column.data = batch_column
                .data
                .take(indices)
                .context(ComputeVectorSnafu)?;
        }

        Ok(())
    }

    /// Returns the timestamp at `index`.
    ///
    /// # Panics
    /// Panics if `index` is out of bounds or the value is not a timestamp.
    fn get_timestamp(&self, index: usize) -> Timestamp {
        match self.timestamps.get_ref(index) {
            ValueRef::Timestamp(timestamp) => timestamp,

            // The builder guarantees the column is a timestamp type.
            value => panic!("{:?} is not a timestamp", value),
        }
    }

    /// Returns the sequence at `index`.
    ///
    /// # Panics
    /// Panics if `index` is out of bounds (the builder asserts the column has
    /// no nulls, so `get_data` only fails on bad indices).
    pub(crate) fn get_sequence(&self, index: usize) -> SequenceNumber {
        self.sequences.get_data(index).unwrap()
    }

    /// Debug-only check that rows are ordered by `(ts asc, seq desc)`.
    #[cfg(debug_assertions)]
    pub(crate) fn check_monotonic(&self) -> Result<(), String> {
        use std::cmp::Ordering;
        if self.timestamps_native().is_none() {
            return Ok(());
        }

        let timestamps = self.timestamps_native().unwrap();
        let sequences = self.sequences.as_arrow().values();
        for (i, window) in timestamps.windows(2).enumerate() {
            let current = window[0];
            let next = window[1];
            let current_sequence = sequences[i];
            let next_sequence = sequences[i + 1];
            match current.cmp(&next) {
                Ordering::Less => {
                    continue;
                }
                Ordering::Equal => {
                    // Equal timestamps must have descending sequences.
                    if current_sequence < next_sequence {
                        return Err(format!(
                            "sequence are not monotonic: ts {} == {} but current sequence {} < {}, index: {}",
                            current, next, current_sequence, next_sequence, i
                        ));
                    }
                }
                Ordering::Greater => {
                    return Err(format!(
                        "timestamps are not monotonic: {} > {}, index: {}",
                        current, next, i
                    ));
                }
            }
        }

        Ok(())
    }

    /// Debug-only check that `other` may follow `self` in a sorted stream:
    /// primary key, then timestamp, then (for equal timestamps) descending
    /// sequence must all be non-decreasing in stream order.
    #[cfg(debug_assertions)]
    pub(crate) fn check_next_batch(&self, other: &Batch) -> Result<(), String> {
        // Checks the primary key
        if self.primary_key() < other.primary_key() {
            return Ok(());
        }
        if self.primary_key() > other.primary_key() {
            return Err(format!(
                "primary key is not monotonic: {:?} > {:?}",
                self.primary_key(),
                other.primary_key()
            ));
        }
        // Same primary key: checks timestamps.
        if self.last_timestamp() < other.first_timestamp() {
            return Ok(());
        }
        if self.last_timestamp() > other.first_timestamp() {
            return Err(format!(
                "timestamps are not monotonic: {:?} > {:?}",
                self.last_timestamp(),
                other.first_timestamp()
            ));
        }
        // Same timestamp on the boundary: sequences must not increase.
        if self.last_sequence() >= other.first_sequence() {
            return Ok(());
        }
        Err(format!(
            "sequences are not monotonic: {:?} < {:?}",
            self.last_sequence(),
            other.first_sequence()
        ))
    }

    /// Returns the decoded value of the primary key column identified by
    /// `col_idx_in_pk` (dense encoding) or `column_id` (sparse encoding),
    /// decoding and caching the primary key on first use.
    pub fn pk_col_value(
        &mut self,
        codec: &dyn PrimaryKeyCodec,
        col_idx_in_pk: usize,
        column_id: ColumnId,
    ) -> Result<Option<&Value>> {
        if self.pk_values.is_none() {
            self.pk_values = Some(codec.decode(&self.primary_key).context(DecodeSnafu)?);
        }

        let pk_values = self.pk_values.as_ref().unwrap();
        Ok(match pk_values {
            CompositeValues::Dense(values) => values.get(col_idx_in_pk).map(|(_, v)| v),
            CompositeValues::Sparse(values) => values.get(&column_id),
        })
    }

    /// Returns the field column with `column_id`, building the id -> index
    /// lookup table lazily on first use.
    pub fn field_col_value(&mut self, column_id: ColumnId) -> Option<&BatchColumn> {
        if self.fields_idx.is_none() {
            self.fields_idx = Some(
                self.fields
                    .iter()
                    .enumerate()
                    .map(|(i, c)| (c.column_id, i))
                    .collect(),
            );
        }

        self.fields_idx
            .as_ref()
            .unwrap()
            .get(&column_id)
            .map(|&idx| &self.fields[idx])
    }
}
794
/// Debug-only helper that validates batches yielded by a scanner are sorted
/// and fall inside an optional `[start, end)` time range.
#[cfg(debug_assertions)]
#[derive(Default)]
pub(crate) struct BatchChecker {
    /// Last batch seen, used to validate ordering across batches.
    last_batch: Option<Batch>,
    /// Inclusive lower bound for timestamps, if any.
    start: Option<Timestamp>,
    /// Exclusive upper bound for timestamps, if any.
    end: Option<Timestamp>,
}
803
#[cfg(debug_assertions)]
impl BatchChecker {
    /// Sets the inclusive start bound for timestamps.
    pub(crate) fn with_start(mut self, start: Option<Timestamp>) -> Self {
        self.start = start;
        self
    }

    /// Sets the exclusive end bound for timestamps.
    pub(crate) fn with_end(mut self, end: Option<Timestamp>) -> Self {
        self.end = end;
        self
    }

    /// Validates that `batch` is internally sorted, lies inside the configured
    /// time range, and correctly follows the previously checked batch.
    /// Remembers `batch` for the next call.
    pub(crate) fn check_monotonic(&mut self, batch: &Batch) -> Result<(), String> {
        batch.check_monotonic()?;

        if let (Some(start), Some(first)) = (self.start, batch.first_timestamp())
            && start > first
        {
            return Err(format!(
                "batch's first timestamp is before the start timestamp: {:?} > {:?}",
                start, first
            ));
        }
        if let (Some(end), Some(last)) = (self.end, batch.last_timestamp())
            && end <= last
        {
            return Err(format!(
                "batch's last timestamp is after the end timestamp: {:?} <= {:?}",
                end, last
            ));
        }

        // Validate the ordering against the previous batch, then remember the
        // current one even if the check failed.
        let res = self
            .last_batch
            .as_ref()
            .map(|last| last.check_next_batch(batch))
            .unwrap_or(Ok(()));
        self.last_batch = Some(batch.clone());
        res
    }

    /// Formats the previous and current batch for diagnostic messages.
    pub(crate) fn format_batch(&self, batch: &Batch) -> String {
        use std::fmt::Write;

        let mut message = String::new();
        if let Some(last) = &self.last_batch {
            write!(
                message,
                "last_pk: {:?}, last_ts: {:?}, last_seq: {:?}, ",
                last.primary_key(),
                last.last_timestamp(),
                last.last_sequence()
            )
            .unwrap();
        }
        write!(
            message,
            "batch_pk: {:?}, batch_ts: {:?}, batch_seq: {:?}",
            batch.primary_key(),
            batch.timestamps(),
            batch.sequences()
        )
        .unwrap();

        message
    }

    /// Checks `batch` with [BatchChecker::check_monotonic] and panics with a
    /// detailed message (scanner, region, partition, range) on failure.
    pub(crate) fn ensure_part_range_batch(
        &mut self,
        scanner: &str,
        region_id: store_api::storage::RegionId,
        partition: usize,
        part_range: store_api::region_engine::PartitionRange,
        batch: &Batch,
    ) {
        if let Err(e) = self.check_monotonic(batch) {
            let err_msg = format!(
                "{}: batch is not sorted, {}, region_id: {}, partition: {}, part_range: {:?}",
                scanner, e, region_id, partition, part_range,
            );
            common_telemetry::error!("{err_msg}, {}", self.format_batch(batch));
            panic!("{err_msg}, batch rows: {}", batch.num_rows());
        }
    }
}
898
/// Byte length of the encoded timestamp key inside a row produced by the
/// `RowConverter` in [Batch::sort] (presumably 1 validity byte + 8 value
/// bytes; matches the 18-byte ts+sequence rows asserted there).
const TIMESTAMP_KEY_LEN: usize = 9;
901
902fn concat_arrays(iter: impl Iterator<Item = ArrayRef>) -> Result<ArrayRef> {
904 let arrays: Vec<_> = iter.collect();
905 let dyn_arrays: Vec<_> = arrays.iter().map(|array| array.as_ref()).collect();
906 arrow::compute::concat(&dyn_arrays).context(ComputeArrowSnafu)
907}
908
/// A field column in a [Batch].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct BatchColumn {
    /// Id of the column.
    pub column_id: ColumnId,
    /// Data of the column.
    pub data: VectorRef,
}
917
/// Builder that assembles and validates a [Batch].
pub struct BatchBuilder {
    /// Encoded primary key of the batch.
    primary_key: Vec<u8>,
    /// Timestamp column; required before `build`.
    timestamps: Option<VectorRef>,
    /// Sequence column; required before `build`.
    sequences: Option<Arc<UInt64Vector>>,
    /// Op type column; required before `build`.
    op_types: Option<Arc<UInt8Vector>>,
    /// Field columns; optional.
    fields: Vec<BatchColumn>,
}
926
927impl BatchBuilder {
928 pub fn new(primary_key: Vec<u8>) -> BatchBuilder {
930 BatchBuilder {
931 primary_key,
932 timestamps: None,
933 sequences: None,
934 op_types: None,
935 fields: Vec::new(),
936 }
937 }
938
939 pub fn with_required_columns(
941 primary_key: Vec<u8>,
942 timestamps: VectorRef,
943 sequences: Arc<UInt64Vector>,
944 op_types: Arc<UInt8Vector>,
945 ) -> BatchBuilder {
946 BatchBuilder {
947 primary_key,
948 timestamps: Some(timestamps),
949 sequences: Some(sequences),
950 op_types: Some(op_types),
951 fields: Vec::new(),
952 }
953 }
954
955 pub fn with_fields(mut self, fields: Vec<BatchColumn>) -> Self {
957 self.fields = fields;
958 self
959 }
960
961 pub fn push_field(&mut self, column: BatchColumn) -> &mut Self {
963 self.fields.push(column);
964 self
965 }
966
967 pub fn push_field_array(&mut self, column_id: ColumnId, array: ArrayRef) -> Result<&mut Self> {
969 let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
970 self.fields.push(BatchColumn {
971 column_id,
972 data: vector,
973 });
974
975 Ok(self)
976 }
977
978 pub fn timestamps_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
980 let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
981 ensure!(
982 vector.data_type().is_timestamp(),
983 InvalidBatchSnafu {
984 reason: format!("{:?} is not a timestamp type", vector.data_type()),
985 }
986 );
987
988 self.timestamps = Some(vector);
989 Ok(self)
990 }
991
992 pub fn sequences_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
994 ensure!(
995 *array.data_type() == arrow::datatypes::DataType::UInt64,
996 InvalidBatchSnafu {
997 reason: "sequence array is not UInt64 type",
998 }
999 );
1000 let vector = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
1002 self.sequences = Some(vector);
1003
1004 Ok(self)
1005 }
1006
1007 pub fn op_types_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
1009 ensure!(
1010 *array.data_type() == arrow::datatypes::DataType::UInt8,
1011 InvalidBatchSnafu {
1012 reason: "sequence array is not UInt8 type",
1013 }
1014 );
1015 let vector = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
1017 self.op_types = Some(vector);
1018
1019 Ok(self)
1020 }
1021
1022 pub fn build(self) -> Result<Batch> {
1024 let timestamps = self.timestamps.context(InvalidBatchSnafu {
1025 reason: "missing timestamps",
1026 })?;
1027 let sequences = self.sequences.context(InvalidBatchSnafu {
1028 reason: "missing sequences",
1029 })?;
1030 let op_types = self.op_types.context(InvalidBatchSnafu {
1031 reason: "missing op_types",
1032 })?;
1033 assert_eq!(0, timestamps.null_count());
1036 assert_eq!(0, sequences.null_count());
1037 assert_eq!(0, op_types.null_count());
1038
1039 let ts_len = timestamps.len();
1040 ensure!(
1041 sequences.len() == ts_len,
1042 InvalidBatchSnafu {
1043 reason: format!(
1044 "sequence have different len {} != {}",
1045 sequences.len(),
1046 ts_len
1047 ),
1048 }
1049 );
1050 ensure!(
1051 op_types.len() == ts_len,
1052 InvalidBatchSnafu {
1053 reason: format!(
1054 "op type have different len {} != {}",
1055 op_types.len(),
1056 ts_len
1057 ),
1058 }
1059 );
1060 for column in &self.fields {
1061 ensure!(
1062 column.data.len() == ts_len,
1063 InvalidBatchSnafu {
1064 reason: format!(
1065 "column {} has different len {} != {}",
1066 column.column_id,
1067 column.data.len(),
1068 ts_len
1069 ),
1070 }
1071 );
1072 }
1073
1074 Ok(Batch {
1075 primary_key: self.primary_key,
1076 pk_values: None,
1077 timestamps,
1078 sequences,
1079 op_types,
1080 fields: self.fields,
1081 fields_idx: None,
1082 })
1083 }
1084}
1085
impl From<Batch> for BatchBuilder {
    // Converts a batch back into a builder so columns can be replaced and the
    // batch re-validated via `build`. Note the cached `pk_values` and
    // `fields_idx` are dropped; `build` recreates the batch with both unset.
    fn from(batch: Batch) -> Self {
        Self {
            primary_key: batch.primary_key,
            timestamps: Some(batch.timestamps),
            sequences: Some(batch.sequences),
            op_types: Some(batch.op_types),
            fields: batch.fields,
        }
    }
}
1097
/// Abstraction over the different providers a scan can pull [Batch]es from.
pub enum Source {
    /// An async [BatchReader].
    Reader(BoxedBatchReader),
    /// A synchronous batch iterator.
    Iter(BoxedBatchIterator),
    /// An async batch stream.
    Stream(BoxedBatchStream),
    /// A reader that prunes rows while reading.
    PruneReader(PruneReader),
}
1111
impl Source {
    /// Returns the next [Batch] from the underlying provider, or `Ok(None)`
    /// when the source is exhausted.
    pub async fn next_batch(&mut self) -> Result<Option<Batch>> {
        match self {
            Source::Reader(reader) => reader.next_batch().await,
            Source::Iter(iter) => iter.next().transpose(),
            Source::Stream(stream) => stream.try_next().await,
            Source::PruneReader(reader) => reader.next_batch().await,
        }
    }
}
1123
/// Source of arrow [RecordBatch]es (the flat format counterpart of [Source]).
pub enum FlatSource {
    /// A synchronous record batch iterator.
    Iter(BoxedRecordBatchIterator),
    /// An async record batch stream.
    Stream(BoxedRecordBatchStream),
}
1131
impl FlatSource {
    /// Returns the next [RecordBatch], or `Ok(None)` when the source is exhausted.
    pub async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        match self {
            FlatSource::Iter(iter) => iter.next().transpose(),
            FlatSource::Stream(stream) => stream.try_next().await,
        }
    }
}
1141
/// Async reader that yields [Batch]es.
#[async_trait]
pub trait BatchReader: Send {
    /// Fetches the next [Batch]; returns `Ok(None)` when the reader is exhausted.
    async fn next_batch(&mut self) -> Result<Option<Batch>>;
}
1156
/// A [BatchReader] boxed for dynamic dispatch.
pub type BoxedBatchReader = Box<dyn BatchReader>;

/// A boxed, `'static` async stream of [Batch] results.
pub type BoxedBatchStream = BoxStream<'static, Result<Batch>>;

/// A boxed, `'static` async stream of arrow [RecordBatch] results.
pub type BoxedRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
1165
// Forward the trait through `Box` so `BoxedBatchReader` itself is a `BatchReader`.
#[async_trait::async_trait]
impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        (**self).next_batch().await
    }
}
1172
/// Metrics collected while scanning.
#[derive(Debug, Default)]
pub(crate) struct ScannerMetrics {
    /// Time spent scanning (fetching batches).
    scan_cost: Duration,
    /// Time spent yielding batches to the consumer.
    yield_cost: Duration,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of rows returned.
    num_rows: usize,
}
1185
1186#[cfg(test)]
1187mod tests {
1188 use datatypes::arrow::array::{TimestampMillisecondArray, UInt8Array, UInt64Array};
1189 use mito_codec::row_converter::{self, build_primary_key_codec_with_fields};
1190 use store_api::codec::PrimaryKeyEncoding;
1191 use store_api::storage::consts::ReservedColumnId;
1192
1193 use super::*;
1194 use crate::error::Error;
1195 use crate::test_util::new_batch_builder;
1196
1197 fn new_batch(
1198 timestamps: &[i64],
1199 sequences: &[u64],
1200 op_types: &[OpType],
1201 field: &[u64],
1202 ) -> Batch {
1203 new_batch_builder(b"test", timestamps, sequences, op_types, 1, field)
1204 .build()
1205 .unwrap()
1206 }
1207
1208 fn new_batch_with_u64_fields(
1209 timestamps: &[i64],
1210 sequences: &[u64],
1211 op_types: &[OpType],
1212 fields: &[(ColumnId, &[Option<u64>])],
1213 ) -> Batch {
1214 assert_eq!(timestamps.len(), sequences.len());
1215 assert_eq!(timestamps.len(), op_types.len());
1216 for (_, values) in fields {
1217 assert_eq!(timestamps.len(), values.len());
1218 }
1219
1220 let mut builder = BatchBuilder::new(b"test".to_vec());
1221 builder
1222 .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
1223 timestamps.iter().copied(),
1224 )))
1225 .unwrap()
1226 .sequences_array(Arc::new(UInt64Array::from_iter_values(
1227 sequences.iter().copied(),
1228 )))
1229 .unwrap()
1230 .op_types_array(Arc::new(UInt8Array::from_iter_values(
1231 op_types.iter().map(|v| *v as u8),
1232 )))
1233 .unwrap();
1234
1235 for (col_id, values) in fields {
1236 builder
1237 .push_field_array(*col_id, Arc::new(UInt64Array::from(values.to_vec())))
1238 .unwrap();
1239 }
1240
1241 builder.build().unwrap()
1242 }
1243
1244 fn new_batch_without_fields(
1245 timestamps: &[i64],
1246 sequences: &[u64],
1247 op_types: &[OpType],
1248 ) -> Batch {
1249 assert_eq!(timestamps.len(), sequences.len());
1250 assert_eq!(timestamps.len(), op_types.len());
1251
1252 let mut builder = BatchBuilder::new(b"test".to_vec());
1253 builder
1254 .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
1255 timestamps.iter().copied(),
1256 )))
1257 .unwrap()
1258 .sequences_array(Arc::new(UInt64Array::from_iter_values(
1259 sequences.iter().copied(),
1260 )))
1261 .unwrap()
1262 .op_types_array(Arc::new(UInt8Array::from_iter_values(
1263 op_types.iter().map(|v| *v as u8),
1264 )))
1265 .unwrap();
1266
1267 builder.build().unwrap()
1268 }
1269
1270 #[test]
1271 fn test_empty_batch() {
1272 let batch = Batch::empty();
1273 assert!(batch.is_empty());
1274 assert_eq!(None, batch.first_timestamp());
1275 assert_eq!(None, batch.last_timestamp());
1276 assert_eq!(None, batch.first_sequence());
1277 assert_eq!(None, batch.last_sequence());
1278 assert!(batch.timestamps_native().is_none());
1279 }
1280
1281 #[test]
1282 fn test_first_last_one() {
1283 let batch = new_batch(&[1], &[2], &[OpType::Put], &[4]);
1284 assert_eq!(
1285 Timestamp::new_millisecond(1),
1286 batch.first_timestamp().unwrap()
1287 );
1288 assert_eq!(
1289 Timestamp::new_millisecond(1),
1290 batch.last_timestamp().unwrap()
1291 );
1292 assert_eq!(2, batch.first_sequence().unwrap());
1293 assert_eq!(2, batch.last_sequence().unwrap());
1294 }
1295
1296 #[test]
1297 fn test_first_last_multiple() {
1298 let batch = new_batch(
1299 &[1, 2, 3],
1300 &[11, 12, 13],
1301 &[OpType::Put, OpType::Put, OpType::Put],
1302 &[21, 22, 23],
1303 );
1304 assert_eq!(
1305 Timestamp::new_millisecond(1),
1306 batch.first_timestamp().unwrap()
1307 );
1308 assert_eq!(
1309 Timestamp::new_millisecond(3),
1310 batch.last_timestamp().unwrap()
1311 );
1312 assert_eq!(11, batch.first_sequence().unwrap());
1313 assert_eq!(13, batch.last_sequence().unwrap());
1314 }
1315
1316 #[test]
1317 fn test_slice() {
1318 let batch = new_batch(
1319 &[1, 2, 3, 4],
1320 &[11, 12, 13, 14],
1321 &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1322 &[21, 22, 23, 24],
1323 );
1324 let batch = batch.slice(1, 2);
1325 let expect = new_batch(
1326 &[2, 3],
1327 &[12, 13],
1328 &[OpType::Delete, OpType::Put],
1329 &[22, 23],
1330 );
1331 assert_eq!(expect, batch);
1332 }
1333
1334 #[test]
1335 fn test_timestamps_native() {
1336 let batch = new_batch(
1337 &[1, 2, 3, 4],
1338 &[11, 12, 13, 14],
1339 &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1340 &[21, 22, 23, 24],
1341 );
1342 assert_eq!(&[1, 2, 3, 4], batch.timestamps_native().unwrap());
1343 }
1344
1345 #[test]
1346 fn test_concat_empty() {
1347 let err = Batch::concat(vec![]).unwrap_err();
1348 assert!(
1349 matches!(err, Error::InvalidBatch { .. }),
1350 "unexpected err: {err}"
1351 );
1352 }
1353
1354 #[test]
1355 fn test_concat_one() {
1356 let batch = new_batch(&[], &[], &[], &[]);
1357 let actual = Batch::concat(vec![batch.clone()]).unwrap();
1358 assert_eq!(batch, actual);
1359
1360 let batch = new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]);
1361 let actual = Batch::concat(vec![batch.clone()]).unwrap();
1362 assert_eq!(batch, actual);
1363 }
1364
1365 #[test]
1366 fn test_concat_multiple() {
1367 let batches = vec![
1368 new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]),
1369 new_batch(
1370 &[3, 4, 5],
1371 &[13, 14, 15],
1372 &[OpType::Put, OpType::Delete, OpType::Put],
1373 &[23, 24, 25],
1374 ),
1375 new_batch(&[], &[], &[], &[]),
1376 new_batch(&[6], &[16], &[OpType::Put], &[26]),
1377 ];
1378 let batch = Batch::concat(batches).unwrap();
1379 let expect = new_batch(
1380 &[1, 2, 3, 4, 5, 6],
1381 &[11, 12, 13, 14, 15, 16],
1382 &[
1383 OpType::Put,
1384 OpType::Put,
1385 OpType::Put,
1386 OpType::Delete,
1387 OpType::Put,
1388 OpType::Put,
1389 ],
1390 &[21, 22, 23, 24, 25, 26],
1391 );
1392 assert_eq!(expect, batch);
1393 }
1394
1395 #[test]
1396 fn test_concat_different() {
1397 let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
1398 let mut batch2 = new_batch(&[2], &[2], &[OpType::Put], &[2]);
1399 batch2.primary_key = b"hello".to_vec();
1400 let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
1401 assert!(
1402 matches!(err, Error::InvalidBatch { .. }),
1403 "unexpected err: {err}"
1404 );
1405 }
1406
1407 #[test]
1408 fn test_concat_different_fields() {
1409 let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
1410 let fields = vec![
1411 batch1.fields()[0].clone(),
1412 BatchColumn {
1413 column_id: 2,
1414 data: Arc::new(UInt64Vector::from_slice([2])),
1415 },
1416 ];
1417 let batch2 = batch1.clone().with_fields(fields).unwrap();
1419 let err = Batch::concat(vec![batch1.clone(), batch2]).unwrap_err();
1420 assert!(
1421 matches!(err, Error::InvalidBatch { .. }),
1422 "unexpected err: {err}"
1423 );
1424
1425 let fields = vec![BatchColumn {
1427 column_id: 2,
1428 data: Arc::new(UInt64Vector::from_slice([2])),
1429 }];
1430 let batch2 = batch1.clone().with_fields(fields).unwrap();
1431 let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
1432 assert!(
1433 matches!(err, Error::InvalidBatch { .. }),
1434 "unexpected err: {err}"
1435 );
1436 }
1437
1438 #[test]
1439 fn test_filter_deleted_empty() {
1440 let mut batch = new_batch(&[], &[], &[], &[]);
1441 batch.filter_deleted().unwrap();
1442 assert!(batch.is_empty());
1443 }
1444
1445 #[test]
1446 fn test_filter_deleted() {
1447 let mut batch = new_batch(
1448 &[1, 2, 3, 4],
1449 &[11, 12, 13, 14],
1450 &[OpType::Delete, OpType::Put, OpType::Delete, OpType::Put],
1451 &[21, 22, 23, 24],
1452 );
1453 batch.filter_deleted().unwrap();
1454 let expect = new_batch(&[2, 4], &[12, 14], &[OpType::Put, OpType::Put], &[22, 24]);
1455 assert_eq!(expect, batch);
1456
1457 let mut batch = new_batch(
1458 &[1, 2, 3, 4],
1459 &[11, 12, 13, 14],
1460 &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
1461 &[21, 22, 23, 24],
1462 );
1463 let expect = batch.clone();
1464 batch.filter_deleted().unwrap();
1465 assert_eq!(expect, batch);
1466 }
1467
    /// Covers `filter_by_sequence` for all `SequenceRange` variants
    /// (`LtEq`, `Gt`, `GtLtEq`) plus the `None` (no filtering) and
    /// empty-batch cases.
    #[test]
    fn test_filter_by_sequence() {
        // LtEq: keep rows with sequence <= 13 (drops the last row).
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::LtEq { max: 13 }))
            .unwrap();
        let expect = new_batch(
            &[1, 2, 3],
            &[11, 12, 13],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );
        assert_eq!(expect, batch);

        // LtEq below every sequence empties the batch.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );

        batch
            .filter_by_sequence(Some(SequenceRange::LtEq { max: 10 }))
            .unwrap();
        assert!(batch.is_empty());

        // None means no filtering: the batch is unchanged.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let expect = batch.clone();
        batch.filter_by_sequence(None).unwrap();
        assert_eq!(expect, batch);

        // An empty batch stays empty, with and without a range.
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch
            .filter_by_sequence(Some(SequenceRange::LtEq { max: 10 }))
            .unwrap();
        assert!(batch.is_empty());

        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_by_sequence(None).unwrap();
        assert!(batch.is_empty());

        // Gt: keep rows with sequence > 12.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::Gt { min: 12 }))
            .unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // Gt above every sequence empties the batch.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::Gt { min: 20 }))
            .unwrap();
        assert!(batch.is_empty());

        // GtLtEq: keep rows with 12 < sequence <= 14.
        let mut batch = new_batch(
            &[1, 2, 3, 4, 5],
            &[11, 12, 13, 14, 15],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 12, max: 14 }))
            .unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // GtLtEq keeps matching rows regardless of op type (11 < seq <= 13).
        let mut batch = new_batch(
            &[1, 2, 3, 4, 5],
            &[11, 12, 13, 14, 15],
            &[
                OpType::Put,
                OpType::Delete,
                OpType::Put,
                OpType::Delete,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 11, max: 13 }))
            .unwrap();
        let expect = new_batch(
            &[2, 3],
            &[12, 13],
            &[OpType::Delete, OpType::Put],
            &[22, 23],
        );
        assert_eq!(expect, batch);

        // GtLtEq entirely above the sequences empties the batch.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 20, max: 25 }))
            .unwrap();
        assert!(batch.is_empty());
    }
1604
1605 #[test]
1606 fn test_merge_last_non_null_no_dup() {
1607 let mut batch = new_batch_with_u64_fields(
1608 &[1, 2],
1609 &[2, 1],
1610 &[OpType::Put, OpType::Put],
1611 &[(1, &[Some(10), None]), (2, &[Some(100), Some(200)])],
1612 );
1613 let expect = batch.clone();
1614 batch.merge_last_non_null().unwrap();
1615 assert_eq!(expect, batch);
1616 }
1617
1618 #[test]
1619 fn test_merge_last_non_null_fill_null_fields() {
1620 let mut batch = new_batch_with_u64_fields(
1622 &[1, 1, 1],
1623 &[3, 2, 1],
1624 &[OpType::Put, OpType::Put, OpType::Put],
1625 &[
1626 (1, &[None, Some(10), Some(11)]),
1627 (2, &[Some(100), Some(200), Some(300)]),
1628 ],
1629 );
1630 batch.merge_last_non_null().unwrap();
1631
1632 let expect = new_batch_with_u64_fields(
1635 &[1],
1636 &[3],
1637 &[OpType::Put],
1638 &[(1, &[Some(10)]), (2, &[Some(100)])],
1639 );
1640 assert_eq!(expect, batch);
1641 }
1642
1643 #[test]
1644 fn test_merge_last_non_null_stop_at_delete_row() {
1645 let mut batch = new_batch_with_u64_fields(
1648 &[1, 1, 1],
1649 &[3, 2, 1],
1650 &[OpType::Put, OpType::Delete, OpType::Put],
1651 &[
1652 (1, &[None, Some(10), Some(11)]),
1653 (2, &[Some(100), Some(200), Some(300)]),
1654 ],
1655 );
1656 batch.merge_last_non_null().unwrap();
1657
1658 let expect = new_batch_with_u64_fields(
1659 &[1],
1660 &[3],
1661 &[OpType::Put],
1662 &[(1, &[None]), (2, &[Some(100)])],
1663 );
1664 assert_eq!(expect, batch);
1665 }
1666
1667 #[test]
1668 fn test_merge_last_non_null_base_delete_no_merge() {
1669 let mut batch = new_batch_with_u64_fields(
1670 &[1, 1],
1671 &[3, 2],
1672 &[OpType::Delete, OpType::Put],
1673 &[(1, &[None, Some(10)]), (2, &[None, Some(200)])],
1674 );
1675 batch.merge_last_non_null().unwrap();
1676
1677 let expect =
1679 new_batch_with_u64_fields(&[1], &[3], &[OpType::Delete], &[(1, &[None]), (2, &[None])]);
1680 assert_eq!(expect, batch);
1681 }
1682
1683 #[test]
1684 fn test_merge_last_non_null_multiple_timestamp_groups() {
1685 let mut batch = new_batch_with_u64_fields(
1686 &[1, 1, 2, 3, 3],
1687 &[5, 4, 3, 2, 1],
1688 &[
1689 OpType::Put,
1690 OpType::Put,
1691 OpType::Put,
1692 OpType::Put,
1693 OpType::Put,
1694 ],
1695 &[
1696 (1, &[None, Some(10), Some(20), None, Some(30)]),
1697 (2, &[Some(100), Some(110), Some(120), None, Some(130)]),
1698 ],
1699 );
1700 batch.merge_last_non_null().unwrap();
1701
1702 let expect = new_batch_with_u64_fields(
1703 &[1, 2, 3],
1704 &[5, 3, 2],
1705 &[OpType::Put, OpType::Put, OpType::Put],
1706 &[
1707 (1, &[Some(10), Some(20), Some(30)]),
1708 (2, &[Some(100), Some(120), Some(130)]),
1709 ],
1710 );
1711 assert_eq!(expect, batch);
1712 }
1713
1714 #[test]
1715 fn test_merge_last_non_null_no_fields() {
1716 let mut batch = new_batch_without_fields(
1717 &[1, 1, 2],
1718 &[3, 2, 1],
1719 &[OpType::Put, OpType::Put, OpType::Put],
1720 );
1721 batch.merge_last_non_null().unwrap();
1722
1723 let expect = new_batch_without_fields(&[1, 2], &[3, 1], &[OpType::Put, OpType::Put]);
1724 assert_eq!(expect, batch);
1725 }
1726
1727 #[test]
1728 fn test_filter() {
1729 let mut batch = new_batch(
1731 &[1, 2, 3, 4],
1732 &[11, 12, 13, 14],
1733 &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
1734 &[21, 22, 23, 24],
1735 );
1736 let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
1737 batch.filter(&predicate).unwrap();
1738 let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
1739 assert_eq!(expect, batch);
1740
1741 let mut batch = new_batch(
1743 &[1, 2, 3, 4],
1744 &[11, 12, 13, 14],
1745 &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1746 &[21, 22, 23, 24],
1747 );
1748 let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
1749 batch.filter(&predicate).unwrap();
1750 let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
1751 assert_eq!(expect, batch);
1752
1753 let predicate = BooleanVector::from_vec(vec![false, false]);
1755 batch.filter(&predicate).unwrap();
1756 assert!(batch.is_empty());
1757 }
1758
    /// Verifies `sort`: rows end up ordered by timestamp ascending with the
    /// larger sequence first among duplicates; with `dedup == true` only the
    /// highest-sequence row of each timestamp survives.
    #[test]
    fn test_sort_and_dedup() {
        // Timestamp 2 appears twice (seq 1 and seq 6).
        let original = new_batch(
            &[2, 3, 1, 4, 5, 2],
            &[1, 2, 3, 4, 5, 6],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25, 26],
        );

        // sort(true): dedup keeps the seq-6 row for timestamp 2.
        let mut batch = original.clone();
        batch.sort(true).unwrap();
        assert_eq!(
            new_batch(
                &[1, 2, 3, 4, 5],
                &[3, 6, 2, 4, 5],
                &[
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                ],
                &[23, 26, 22, 24, 25],
            ),
            batch
        );

        // sort(false): both timestamp-2 rows remain, newer sequence first.
        let mut batch = original.clone();
        batch.sort(false).unwrap();

        assert_eq!(
            new_batch(
                &[1, 2, 2, 3, 4, 5],
                &[3, 6, 1, 2, 4, 5],
                &[
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                ],
                &[23, 26, 21, 22, 24, 25],
            ),
            batch
        );

        // Mixed op types: timestamp 2 has an old Delete (seq 1) and a newer Put (seq 6).
        let original = new_batch(
            &[2, 2, 1],
            &[1, 6, 1],
            &[OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );

        // sort(true): dedup keeps the newer Put and drops the older Delete.
        let mut batch = original.clone();
        batch.sort(true).unwrap();
        let expect = new_batch(&[1, 2], &[1, 6], &[OpType::Put, OpType::Put], &[23, 22]);
        assert_eq!(expect, batch);

        // sort(false): the Delete row is retained after the newer Put.
        let mut batch = original.clone();
        batch.sort(false).unwrap();
        let expect = new_batch(
            &[1, 2, 2],
            &[1, 6, 1],
            &[OpType::Put, OpType::Put, OpType::Delete],
            &[23, 22, 21],
        );
        assert_eq!(expect, batch);
    }
1837
    /// Round-trips primary key values through encode/decode and reads a field
    /// column back from a batch, for both dense and sparse key encodings.
    #[test]
    fn test_get_value() {
        let encodings = [PrimaryKeyEncoding::Dense, PrimaryKeyEncoding::Sparse];

        for encoding in encodings {
            // Codec over two reserved columns (table_id: u32, tsid: u64) and
            // two user columns with ids 100 and 200 (both strings).
            let codec = build_primary_key_codec_with_fields(
                encoding,
                [
                    (
                        ReservedColumnId::table_id(),
                        row_converter::SortField::new(ConcreteDataType::uint32_datatype()),
                    ),
                    (
                        ReservedColumnId::tsid(),
                        row_converter::SortField::new(ConcreteDataType::uint64_datatype()),
                    ),
                    (
                        100,
                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
                    ),
                    (
                        200,
                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
                    ),
                ]
                .into_iter(),
            );

            // Encode one composite primary key from these values.
            let values = [
                Value::UInt32(1000),
                Value::UInt64(2000),
                Value::String("abcdefgh".into()),
                Value::String("zyxwvu".into()),
            ];
            let mut buf = vec![];
            codec
                .encode_values(
                    &[
                        (ReservedColumnId::table_id(), values[0].clone()),
                        (ReservedColumnId::tsid(), values[1].clone()),
                        (100, values[2].clone()),
                        (200, values[3].clone()),
                    ],
                    &mut buf,
                )
                .unwrap();

            // Build a 3-row batch with that encoded key and one u64 field column.
            let field_col_id = 2;
            let mut batch = new_batch_builder(
                &buf,
                &[1, 2, 3],
                &[1, 1, 1],
                &[OpType::Put, OpType::Put, OpType::Put],
                field_col_id,
                &[42, 43, 44],
            )
            .build()
            .unwrap();

            // Each primary key column decodes back to the value it was encoded from.
            let v = batch
                .pk_col_value(&*codec, 0, ReservedColumnId::table_id())
                .unwrap()
                .unwrap();
            assert_eq!(values[0], *v);

            let v = batch
                .pk_col_value(&*codec, 1, ReservedColumnId::tsid())
                .unwrap()
                .unwrap();
            assert_eq!(values[1], *v);

            let v = batch.pk_col_value(&*codec, 2, 100).unwrap().unwrap();
            assert_eq!(values[2], *v);

            let v = batch.pk_col_value(&*codec, 3, 200).unwrap().unwrap();
            assert_eq!(values[3], *v);

            // The field column is looked up by id and holds the per-row values.
            let v = batch.field_col_value(field_col_id).unwrap();
            assert_eq!(v.data.get(0), Value::UInt64(42));
            assert_eq!(v.data.get(1), Value::UInt64(43));
            assert_eq!(v.data.get(2), Value::UInt64(44));
        }
    }
1921}