1pub mod compat;
18pub mod dedup;
19pub mod flat_dedup;
20pub mod flat_merge;
21pub mod flat_projection;
22pub mod last_row;
23pub mod merge;
24pub mod plain_batch;
25pub mod projection;
26pub(crate) mod prune;
27pub mod range;
28pub mod scan_region;
29pub mod scan_util;
30pub(crate) mod seq_scan;
31pub mod series_scan;
32pub mod stream;
33pub(crate) mod unordered_scan;
34
35use std::collections::{HashMap, HashSet};
36use std::sync::Arc;
37use std::time::Duration;
38
39use api::v1::OpType;
40use async_trait::async_trait;
41use common_time::Timestamp;
42use datafusion_common::arrow::array::UInt8Array;
43use datatypes::arrow;
44use datatypes::arrow::array::{Array, ArrayRef};
45use datatypes::arrow::compute::SortOptions;
46use datatypes::arrow::record_batch::RecordBatch;
47use datatypes::arrow::row::{RowConverter, SortField};
48use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector};
49use datatypes::scalars::ScalarVectorBuilder;
50use datatypes::types::TimestampType;
51use datatypes::value::{Value, ValueRef};
52use datatypes::vectors::{
53 BooleanVector, Helper, TimestampMicrosecondVector, TimestampMillisecondVector,
54 TimestampMillisecondVectorBuilder, TimestampNanosecondVector, TimestampSecondVector,
55 UInt8Vector, UInt8VectorBuilder, UInt32Vector, UInt64Vector, UInt64VectorBuilder, Vector,
56 VectorRef,
57};
58use futures::TryStreamExt;
59use futures::stream::BoxStream;
60use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
61use snafu::{OptionExt, ResultExt, ensure};
62use store_api::metadata::RegionMetadata;
63use store_api::storage::{ColumnId, SequenceNumber, SequenceRange};
64
65use crate::error::{
66 ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu,
67 Result,
68};
69use crate::memtable::{BoxedBatchIterator, BoxedRecordBatchIterator};
70use crate::read::prune::PruneReader;
71
/// A batch of rows that all share the same encoded primary key.
///
/// Columns are stored column-wise as vectors of equal length (enforced by
/// [BatchBuilder::build]). `Batch::sort` orders rows by timestamp ascending,
/// then sequence descending.
#[derive(Debug, PartialEq, Clone)]
pub struct Batch {
    /// Encoded primary key shared by every row in the batch.
    primary_key: Vec<u8>,
    /// Decoded primary key values, populated lazily (see `pk_col_value`).
    pk_values: Option<CompositeValues>,
    /// Timestamp column; must be a timestamp-typed vector.
    timestamps: VectorRef,
    /// Sequence number of each row.
    sequences: Arc<UInt64Vector>,
    /// Operation type (put/delete) of each row.
    op_types: Arc<UInt8Vector>,
    /// Field (value) columns.
    fields: Vec<BatchColumn>,
    /// Lazily built map from column id to its index in `fields`
    /// (see `field_col_value`).
    fields_idx: Option<HashMap<ColumnId, usize>>,
}
97
98impl Batch {
99 pub fn new(
101 primary_key: Vec<u8>,
102 timestamps: VectorRef,
103 sequences: Arc<UInt64Vector>,
104 op_types: Arc<UInt8Vector>,
105 fields: Vec<BatchColumn>,
106 ) -> Result<Batch> {
107 BatchBuilder::with_required_columns(primary_key, timestamps, sequences, op_types)
108 .with_fields(fields)
109 .build()
110 }
111
    /// Rebuilds this batch with the given field columns, re-running the
    /// builder's validation.
    pub fn with_fields(self, fields: Vec<BatchColumn>) -> Result<Batch> {
        Batch::new(
            self.primary_key,
            self.timestamps,
            self.sequences,
            self.op_types,
            fields,
        )
    }

    /// Returns the encoded primary key shared by all rows.
    pub fn primary_key(&self) -> &[u8] {
        &self.primary_key
    }

    /// Returns the decoded primary key values, if already decoded.
    pub fn pk_values(&self) -> Option<&CompositeValues> {
        self.pk_values.as_ref()
    }

    /// Caches decoded primary key values on the batch.
    pub fn set_pk_values(&mut self, pk_values: CompositeValues) {
        self.pk_values = Some(pk_values);
    }

    /// Clears the cached primary key values (test helper).
    #[cfg(any(test, feature = "test"))]
    pub fn remove_pk_values(&mut self) {
        self.pk_values = None;
    }

    /// Returns the field (value) columns.
    pub fn fields(&self) -> &[BatchColumn] {
        &self.fields
    }

    /// Returns the timestamp column.
    pub fn timestamps(&self) -> &VectorRef {
        &self.timestamps
    }

    /// Returns the sequence column.
    pub fn sequences(&self) -> &Arc<UInt64Vector> {
        &self.sequences
    }

    /// Returns the op type column.
    pub fn op_types(&self) -> &Arc<UInt8Vector> {
        &self.op_types
    }

    /// Returns the number of rows. All columns share this length, so any
    /// column works here.
    pub fn num_rows(&self) -> usize {
        self.sequences.len()
    }

    /// Creates a batch with zero rows and an empty primary key. The
    /// timestamp column defaults to millisecond precision.
    pub(crate) fn empty() -> Self {
        Self {
            primary_key: vec![],
            pk_values: None,
            timestamps: Arc::new(TimestampMillisecondVectorBuilder::with_capacity(0).finish()),
            sequences: Arc::new(UInt64VectorBuilder::with_capacity(0).finish()),
            op_types: Arc::new(UInt8VectorBuilder::with_capacity(0).finish()),
            fields: vec![],
            fields_idx: None,
        }
    }

    /// Returns true if the batch has no rows.
    pub fn is_empty(&self) -> bool {
        self.num_rows() == 0
    }

    /// Returns the first timestamp, or `None` if the batch is empty.
    pub fn first_timestamp(&self) -> Option<Timestamp> {
        if self.timestamps.is_empty() {
            return None;
        }

        Some(self.get_timestamp(0))
    }

    /// Returns the last timestamp, or `None` if the batch is empty.
    pub fn last_timestamp(&self) -> Option<Timestamp> {
        if self.timestamps.is_empty() {
            return None;
        }

        Some(self.get_timestamp(self.timestamps.len() - 1))
    }

    /// Returns the first sequence number, or `None` if the batch is empty.
    pub fn first_sequence(&self) -> Option<SequenceNumber> {
        if self.sequences.is_empty() {
            return None;
        }

        Some(self.get_sequence(0))
    }

    /// Returns the last sequence number, or `None` if the batch is empty.
    pub fn last_sequence(&self) -> Option<SequenceNumber> {
        if self.sequences.is_empty() {
            return None;
        }

        Some(self.get_sequence(self.sequences.len() - 1))
    }

    /// Replaces the primary key without touching the cached `pk_values`;
    /// callers are responsible for keeping the two consistent.
    pub fn set_primary_key(&mut self, primary_key: Vec<u8>) {
        self.primary_key = primary_key;
    }
232
    /// Returns a slice of `length` rows starting at `offset`.
    ///
    /// # Panics
    /// Out-of-bounds `offset + length` is delegated to the underlying
    /// vectors' `slice`, which presumably panics — behavior not checked here.
    pub fn slice(&self, offset: usize, length: usize) -> Batch {
        let fields = self
            .fields
            .iter()
            .map(|column| BatchColumn {
                column_id: column.column_id,
                data: column.data.slice(offset, length),
            })
            .collect();
        Batch {
            // The primary key is shared by all rows, so it is kept as-is.
            primary_key: self.primary_key.clone(),
            pk_values: self.pk_values.clone(),
            timestamps: self.timestamps.slice(offset, length),
            sequences: Arc::new(self.sequences.get_slice(offset, length)),
            op_types: Arc::new(self.op_types.get_slice(offset, length)),
            fields,
            fields_idx: self.fields_idx.clone(),
        }
    }
259
    /// Concatenates `batches` into a single batch.
    ///
    /// # Errors
    /// Returns `InvalidBatch` if `batches` is empty, or if the batches do
    /// not all share the same primary key and the same field columns
    /// (same ids, same order).
    pub fn concat(mut batches: Vec<Batch>) -> Result<Batch> {
        ensure!(
            !batches.is_empty(),
            InvalidBatchSnafu {
                reason: "empty batches",
            }
        );
        if batches.len() == 1 {
            // Fast path: a single batch needs no concatenation.
            return Ok(batches.pop().unwrap());
        }

        // Move the key out of the first batch instead of cloning it; the
        // remaining batches are compared against the moved key.
        let primary_key = std::mem::take(&mut batches[0].primary_key);
        let first = &batches[0];
        ensure!(
            batches
                .iter()
                .skip(1)
                .all(|b| b.primary_key() == primary_key),
            InvalidBatchSnafu {
                reason: "batches have different primary key",
            }
        );
        for b in batches.iter().skip(1) {
            ensure!(
                b.fields.len() == first.fields.len(),
                InvalidBatchSnafu {
                    reason: "batches have different field num",
                }
            );
            for (l, r) in b.fields.iter().zip(&first.fields) {
                ensure!(
                    l.column_id == r.column_id,
                    InvalidBatchSnafu {
                        reason: "batches have different fields",
                    }
                );
            }
        }

        // Concatenate column by column and let the builder revalidate.
        let mut builder = BatchBuilder::new(primary_key);
        let array = concat_arrays(batches.iter().map(|b| b.timestamps().to_arrow_array()))?;
        builder.timestamps_array(array)?;
        let array = concat_arrays(batches.iter().map(|b| b.sequences().to_arrow_array()))?;
        builder.sequences_array(array)?;
        let array = concat_arrays(batches.iter().map(|b| b.op_types().to_arrow_array()))?;
        builder.op_types_array(array)?;
        for (i, batch_column) in first.fields.iter().enumerate() {
            let array = concat_arrays(batches.iter().map(|b| b.fields()[i].data.to_arrow_array()))?;
            builder.push_field_array(batch_column.column_id, array)?;
        }

        builder.build()
    }
320
    /// Removes rows whose op type is `Delete`.
    pub fn filter_deleted(&mut self) -> Result<()> {
        // The op type column has no nulls (asserted in BatchBuilder::build).
        let array = self.op_types.as_arrow();
        // Keep only rows whose op type differs from Delete.
        let rhs = UInt8Array::new_scalar(OpType::Delete as u8);
        let predicate =
            arrow::compute::kernels::cmp::neq(array, &rhs).context(ComputeArrowSnafu)?;
        self.filter(&BooleanVector::from(predicate))
    }
331
    /// Keeps only rows where `predicate` is true; every column is filtered
    /// with the same boolean mask, so lengths stay consistent.
    pub fn filter(&mut self, predicate: &BooleanVector) -> Result<()> {
        self.timestamps = self
            .timestamps
            .filter(predicate)
            .context(ComputeVectorSnafu)?;
        self.sequences = Arc::new(
            UInt64Vector::try_from_arrow_array(
                arrow::compute::filter(self.sequences.as_arrow(), predicate.as_boolean_array())
                    .context(ComputeArrowSnafu)?,
            )
            // Filtering preserves the UInt64 type, so conversion can't fail.
            .unwrap(),
        );
        self.op_types = Arc::new(
            UInt8Vector::try_from_arrow_array(
                arrow::compute::filter(self.op_types.as_arrow(), predicate.as_boolean_array())
                    .context(ComputeArrowSnafu)?,
            )
            // Filtering preserves the UInt8 type, so conversion can't fail.
            .unwrap(),
        );
        for batch_column in &mut self.fields {
            batch_column.data = batch_column
                .data
                .filter(predicate)
                .context(ComputeVectorSnafu)?;
        }

        Ok(())
    }
362
    /// Filters rows by the given sequence range; `None` keeps every row.
    pub fn filter_by_sequence(&mut self, sequence: Option<SequenceRange>) -> Result<()> {
        let seq_range = match sequence {
            None => return Ok(()),
            Some(seq_range) => {
                // Empty batch: nothing to filter.
                let (Some(first), Some(last)) = (self.first_sequence(), self.last_sequence())
                else {
                    return Ok(());
                };
                // Shortcut when the whole batch already satisfies the range.
                // NOTE(review): this treats `first`/`last` as bounds for all
                // sequences in the batch — confirm the upstream ordering
                // guarantees that.
                let is_subset = match seq_range {
                    SequenceRange::Gt { min } => min < first,
                    SequenceRange::LtEq { max } => max >= last,
                    SequenceRange::GtLtEq { min, max } => min < first && max >= last,
                };
                if is_subset {
                    return Ok(());
                }
                seq_range
            }
        };

        let seqs = self.sequences.as_arrow();
        let predicate = seq_range.filter(seqs).context(ComputeArrowSnafu)?;

        let predicate = BooleanVector::from(predicate);
        self.filter(&predicate)?;

        Ok(())
    }
392
    /// Sorts rows by (timestamp asc, sequence desc). When `dedup` is true,
    /// rows with equal timestamps collapse to the first row of each run,
    /// which carries the largest sequence because sequence sorts descending.
    pub fn sort(&mut self, dedup: bool) -> Result<()> {
        // Encode (timestamp, sequence) into byte rows that compare
        // lexicographically; the sequence field is encoded descending.
        let converter = RowConverter::new(vec![
            SortField::new(self.timestamps.data_type().as_arrow_type()),
            SortField::new_with_options(
                self.sequences.data_type().as_arrow_type(),
                SortOptions {
                    descending: true,
                    ..Default::default()
                },
            ),
        ])
        .context(ComputeArrowSnafu)?;
        let columns = [
            self.timestamps.to_arrow_array(),
            self.sequences.to_arrow_array(),
        ];
        let rows = converter.convert_columns(&columns).unwrap();
        let mut to_sort: Vec<_> = rows.iter().enumerate().collect();

        // Avoid the sort when rows are already in order.
        let was_sorted = to_sort.is_sorted_by_key(|x| x.1);
        if !was_sorted {
            to_sort.sort_unstable_by_key(|x| x.1);
        }

        let num_rows = to_sort.len();
        if dedup {
            // Compare only the timestamp prefix of the encoded row; each
            // encoded field is 9 bytes (TIMESTAMP_KEY_LEN), two fields = 18.
            to_sort.dedup_by(|left, right| {
                debug_assert_eq!(18, left.1.as_ref().len());
                debug_assert_eq!(18, right.1.as_ref().len());
                let (left_key, right_key) = (left.1.as_ref(), right.1.as_ref());
                left_key[..TIMESTAMP_KEY_LEN] == right_key[..TIMESTAMP_KEY_LEN]
            });
        }
        let no_dedup = to_sort.len() == num_rows;

        if was_sorted && no_dedup {
            // Already sorted and no duplicates removed: nothing to reorder.
            return Ok(());
        }
        let indices = UInt32Vector::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
        self.take_in_place(&indices)
    }
445
    /// Merges runs of rows that share the same timestamp using
    /// "last non-null" semantics: the first row of each run is kept as the
    /// base row, and each of its null fields is filled from the first
    /// subsequent non-null row in the run, stopping early at a `Delete` row.
    ///
    /// Assumes rows are grouped/sorted so equal timestamps are adjacent and
    /// the base row is the winning one — TODO confirm callers sort first.
    pub(crate) fn merge_last_non_null(&mut self) -> Result<()> {
        let num_rows = self.num_rows();
        if num_rows < 2 {
            return Ok(());
        }

        let Some(timestamps) = self.timestamps_native() else {
            return Ok(());
        };

        // One pass to detect duplicates and count distinct-timestamp groups
        // (used to pre-size the index buffers below).
        let mut has_dup = false;
        let mut group_count = 1;
        for i in 1..num_rows {
            has_dup |= timestamps[i] == timestamps[i - 1];
            group_count += (timestamps[i] != timestamps[i - 1]) as usize;
        }
        if !has_dup {
            return Ok(());
        }

        let num_fields = self.fields.len();
        let op_types = self.op_types.as_arrow().values();

        // For each group: the base row index, plus a per-field row index
        // that may be redirected to a later row supplying a non-null value.
        let mut base_indices: Vec<u32> = Vec::with_capacity(group_count);
        let mut field_indices: Vec<Vec<u32>> = (0..num_fields)
            .map(|_| Vec::with_capacity(group_count))
            .collect();

        let mut start = 0;
        while start < num_rows {
            // Find the end of the run of equal timestamps.
            let ts = timestamps[start];
            let mut end = start + 1;
            while end < num_rows && timestamps[end] == ts {
                end += 1;
            }

            let group_pos = base_indices.len();
            base_indices.push(start as u32);

            if num_fields > 0 {
                // Each field initially points at the base row.
                for idx in &mut field_indices {
                    idx.push(start as u32);
                }

                // A deleted base row is kept as-is; no fields are filled.
                let base_deleted = op_types[start] == OpType::Delete as u8;
                if !base_deleted {
                    // Collect the fields that are null in the base row.
                    let mut missing_fields = Vec::new();
                    for (field_idx, col) in self.fields.iter().enumerate() {
                        if col.data.is_null(start) {
                            missing_fields.push(field_idx);
                        }
                    }

                    if !missing_fields.is_empty() {
                        // Scan later rows of the run for non-null values,
                        // stopping at a Delete (older values are invisible
                        // past a delete).
                        for row_idx in (start + 1)..end {
                            if op_types[row_idx] == OpType::Delete as u8 {
                                break;
                            }

                            missing_fields.retain(|&field_idx| {
                                if self.fields[field_idx].data.is_null(row_idx) {
                                    true
                                } else {
                                    field_indices[field_idx][group_pos] = row_idx as u32;
                                    false
                                }
                            });

                            if missing_fields.is_empty() {
                                break;
                            }
                        }
                    }
                }
            }

            start = end;
        }

        // Gather the surviving rows: key columns from base rows, field
        // columns from their (possibly redirected) per-field indices.
        let base_indices = UInt32Vector::from_vec(base_indices);
        self.timestamps = self
            .timestamps
            .take(&base_indices)
            .context(ComputeVectorSnafu)?;
        let array = arrow::compute::take(self.sequences.as_arrow(), base_indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
        let array = arrow::compute::take(self.op_types.as_arrow(), base_indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());

        for (field_idx, batch_column) in self.fields.iter_mut().enumerate() {
            let idx = UInt32Vector::from_vec(std::mem::take(&mut field_indices[field_idx]));
            batch_column.data = batch_column.data.take(&idx).context(ComputeVectorSnafu)?;
        }

        Ok(())
    }
557
558 pub fn memory_size(&self) -> usize {
560 let mut size = std::mem::size_of::<Self>();
561 size += self.primary_key.len();
562 size += self.timestamps.memory_size();
563 size += self.sequences.memory_size();
564 size += self.op_types.memory_size();
565 for batch_column in &self.fields {
566 size += batch_column.data.memory_size();
567 }
568 size
569 }
570
571 pub(crate) fn projected_fields(
573 metadata: &RegionMetadata,
574 projection: &[ColumnId],
575 ) -> Vec<(ColumnId, ConcreteDataType)> {
576 let projected_ids: HashSet<_> = projection.iter().copied().collect();
577 metadata
578 .field_columns()
579 .filter_map(|column| {
580 if projected_ids.contains(&column.column_id) {
581 Some((column.column_id, column.column_schema.data_type.clone()))
582 } else {
583 None
584 }
585 })
586 .collect()
587 }
588
    /// Returns the timestamps as a native `i64` slice, or `None` when the
    /// batch is empty. The unit of the values is whatever unit the
    /// timestamp vector carries (second/milli/micro/nano).
    ///
    /// # Panics
    /// Panics if the timestamp column is not a timestamp type.
    pub(crate) fn timestamps_native(&self) -> Option<&[i64]> {
        if self.timestamps.is_empty() {
            return None;
        }

        let values = match self.timestamps.data_type() {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampSecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampMicrosecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampNanosecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            // The builder guarantees a timestamp type, so this is a bug.
            other => panic!("timestamps in a Batch has other type {:?}", other),
        };

        Some(values)
    }
629
    /// Reorders every column in place according to `indices`.
    fn take_in_place(&mut self, indices: &UInt32Vector) -> Result<()> {
        self.timestamps = self.timestamps.take(indices).context(ComputeVectorSnafu)?;
        let array = arrow::compute::take(self.sequences.as_arrow(), indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        // Taking from a UInt64 array yields a UInt64 array, so this can't fail.
        self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
        let array = arrow::compute::take(self.op_types.as_arrow(), indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        // Same reasoning for the UInt8 op type array.
        self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
        for batch_column in &mut self.fields {
            batch_column.data = batch_column
                .data
                .take(indices)
                .context(ComputeVectorSnafu)?;
        }

        Ok(())
    }
649
    /// Returns the timestamp at `index`.
    ///
    /// # Panics
    /// Panics if `index` is out of bounds or the value is not a timestamp.
    fn get_timestamp(&self, index: usize) -> Timestamp {
        match self.timestamps.get_ref(index) {
            ValueRef::Timestamp(timestamp) => timestamp,

            // The builder validates the timestamp type, so this is a bug.
            value => panic!("{:?} is not a timestamp", value),
        }
    }
662
    /// Returns the sequence number at `index`.
    ///
    /// # Panics
    /// Panics if `index` is out of bounds. The column contains no nulls
    /// (asserted by the builder), so unwrapping the value is safe.
    pub(crate) fn get_sequence(&self, index: usize) -> SequenceNumber {
        self.sequences.get_data(index).unwrap()
    }
671
    /// Debug-only check that rows are ordered by timestamp ascending and,
    /// within equal timestamps, by sequence descending. Returns a message
    /// describing the first violation.
    #[cfg(debug_assertions)]
    pub(crate) fn check_monotonic(&self) -> Result<(), String> {
        use std::cmp::Ordering;
        if self.timestamps_native().is_none() {
            return Ok(());
        }

        let timestamps = self.timestamps_native().unwrap();
        let sequences = self.sequences.as_arrow().values();
        for (i, window) in timestamps.windows(2).enumerate() {
            let current = window[0];
            let next = window[1];
            let current_sequence = sequences[i];
            let next_sequence = sequences[i + 1];
            match current.cmp(&next) {
                Ordering::Less => {
                    // Strictly increasing timestamp: always valid.
                    continue;
                }
                Ordering::Equal => {
                    // Equal timestamps must have non-increasing sequences.
                    if current_sequence < next_sequence {
                        return Err(format!(
                            "sequence are not monotonic: ts {} == {} but current sequence {} < {}, index: {}",
                            current, next, current_sequence, next_sequence, i
                        ));
                    }
                }
                Ordering::Greater => {
                    // Decreasing timestamp: ordering violated.
                    return Err(format!(
                        "timestamps are not monotonic: {} > {}, index: {}",
                        current, next, i
                    ));
                }
            }
        }

        Ok(())
    }
713
    /// Debug-only check that `other` may follow `self` in a sorted scan:
    /// ordered by primary key, then timestamp, then sequence descending
    /// across the batch boundary.
    #[cfg(debug_assertions)]
    pub(crate) fn check_next_batch(&self, other: &Batch) -> Result<(), String> {
        // Primary key must be non-decreasing across batches.
        if self.primary_key() < other.primary_key() {
            return Ok(());
        }
        if self.primary_key() > other.primary_key() {
            return Err(format!(
                "primary key is not monotonic: {:?} > {:?}",
                self.primary_key(),
                other.primary_key()
            ));
        }
        // Same key: timestamps must be non-decreasing.
        if self.last_timestamp() < other.first_timestamp() {
            return Ok(());
        }
        if self.last_timestamp() > other.first_timestamp() {
            return Err(format!(
                "timestamps are not monotonic: {:?} > {:?}",
                self.last_timestamp(),
                other.first_timestamp()
            ));
        }
        // Same key and timestamp: sequences must be non-increasing.
        if self.last_sequence() >= other.first_sequence() {
            return Ok(());
        }
        Err(format!(
            "sequences are not monotonic: {:?} < {:?}",
            self.last_sequence(),
            other.first_sequence()
        ))
    }
749
    /// Returns the value of a primary key column, decoding the encoded key
    /// with `codec` on first use and caching the result in `pk_values`.
    ///
    /// `col_idx_in_pk` addresses dense encodings; `column_id` addresses
    /// sparse encodings. Returns `Ok(None)` when the column is absent.
    pub fn pk_col_value(
        &mut self,
        codec: &dyn PrimaryKeyCodec,
        col_idx_in_pk: usize,
        column_id: ColumnId,
    ) -> Result<Option<&Value>> {
        if self.pk_values.is_none() {
            self.pk_values = Some(codec.decode(&self.primary_key).context(DecodeSnafu)?);
        }

        let pk_values = self.pk_values.as_ref().unwrap();
        Ok(match pk_values {
            CompositeValues::Dense(values) => values.get(col_idx_in_pk).map(|(_, v)| v),
            CompositeValues::Sparse(values) => values.get(&column_id),
        })
    }
769
770 pub fn field_col_value(&mut self, column_id: ColumnId) -> Option<&BatchColumn> {
774 if self.fields_idx.is_none() {
775 self.fields_idx = Some(
776 self.fields
777 .iter()
778 .enumerate()
779 .map(|(i, c)| (c.column_id, i))
780 .collect(),
781 );
782 }
783
784 self.fields_idx
785 .as_ref()
786 .unwrap()
787 .get(&column_id)
788 .map(|&idx| &self.fields[idx])
789 }
790}
791
/// Debug-only helper that validates batches produced by a scanner are
/// sorted and fall inside an expected time range.
#[cfg(debug_assertions)]
#[derive(Default)]
pub(crate) struct BatchChecker {
    /// Previously checked batch, used for cross-batch ordering checks.
    last_batch: Option<Batch>,
    /// Inclusive lower bound for timestamps, if any.
    start: Option<Timestamp>,
    /// Exclusive upper bound for timestamps, if any.
    end: Option<Timestamp>,
}

#[cfg(debug_assertions)]
impl BatchChecker {
    /// Sets the expected inclusive start timestamp.
    pub(crate) fn with_start(mut self, start: Option<Timestamp>) -> Self {
        self.start = start;
        self
    }

    /// Sets the expected exclusive end timestamp.
    pub(crate) fn with_end(mut self, end: Option<Timestamp>) -> Self {
        self.end = end;
        self
    }

    /// Validates that `batch` is internally sorted, lies within
    /// `[start, end)`, and is ordered after the previously checked batch.
    /// Remembers `batch` for the next call.
    pub(crate) fn check_monotonic(&mut self, batch: &Batch) -> Result<(), String> {
        batch.check_monotonic()?;

        if let (Some(start), Some(first)) = (self.start, batch.first_timestamp())
            && start > first
        {
            return Err(format!(
                "batch's first timestamp is before the start timestamp: {:?} > {:?}",
                start, first
            ));
        }
        if let (Some(end), Some(last)) = (self.end, batch.last_timestamp())
            && end <= last
        {
            return Err(format!(
                "batch's last timestamp is after the end timestamp: {:?} <= {:?}",
                end, last
            ));
        }

        // Validate ordering against the last batch, then remember the
        // current batch regardless of the outcome.
        let res = self
            .last_batch
            .as_ref()
            .map(|last| last.check_next_batch(batch))
            .unwrap_or(Ok(()));
        self.last_batch = Some(batch.clone());
        res
    }

    /// Formats the previous and current batch for error messages.
    pub(crate) fn format_batch(&self, batch: &Batch) -> String {
        use std::fmt::Write;

        let mut message = String::new();
        if let Some(last) = &self.last_batch {
            write!(
                message,
                "last_pk: {:?}, last_ts: {:?}, last_seq: {:?}, ",
                last.primary_key(),
                last.last_timestamp(),
                last.last_sequence()
            )
            .unwrap();
        }
        write!(
            message,
            "batch_pk: {:?}, batch_ts: {:?}, batch_seq: {:?}",
            batch.primary_key(),
            batch.timestamps(),
            batch.sequences()
        )
        .unwrap();

        message
    }

    /// Logs and panics with detailed context when `batch` violates the
    /// expected ordering for the given partition range.
    pub(crate) fn ensure_part_range_batch(
        &mut self,
        scanner: &str,
        region_id: store_api::storage::RegionId,
        partition: usize,
        part_range: store_api::region_engine::PartitionRange,
        batch: &Batch,
    ) {
        if let Err(e) = self.check_monotonic(batch) {
            let err_msg = format!(
                "{}: batch is not sorted, {}, region_id: {}, partition: {}, part_range: {:?}",
                scanner, e, region_id, partition, part_range,
            );
            common_telemetry::error!("{err_msg}, {}", self.format_batch(batch));
            // Only the row count goes into the panic message; the full batch
            // may be large and is logged above instead.
            panic!("{err_msg}, batch rows: {}", batch.num_rows());
        }
    }
}
895
/// Byte length of the encoded timestamp prefix in a converted sort row
/// used by `Batch::sort` (each encoded field there is 9 bytes; the
/// debug assertions in `sort` expect 18 bytes for the two fields).
const TIMESTAMP_KEY_LEN: usize = 9;
898
899fn concat_arrays(iter: impl Iterator<Item = ArrayRef>) -> Result<ArrayRef> {
901 let arrays: Vec<_> = iter.collect();
902 let dyn_arrays: Vec<_> = arrays.iter().map(|array| array.as_ref()).collect();
903 arrow::compute::concat(&dyn_arrays).context(ComputeArrowSnafu)
904}
905
/// A column in a [Batch].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct BatchColumn {
    /// Id of the column.
    pub column_id: ColumnId,
    /// Data of the column.
    pub data: VectorRef,
}

/// Builder that assembles and validates a [Batch].
pub struct BatchBuilder {
    /// Encoded primary key for the batch being built.
    primary_key: Vec<u8>,
    /// Timestamp column; required before `build`.
    timestamps: Option<VectorRef>,
    /// Sequence column; required before `build`.
    sequences: Option<Arc<UInt64Vector>>,
    /// Op type column; required before `build`.
    op_types: Option<Arc<UInt8Vector>>,
    /// Field columns; optional.
    fields: Vec<BatchColumn>,
}
923
924impl BatchBuilder {
    /// Creates a new [BatchBuilder] with the required columns unset.
    pub fn new(primary_key: Vec<u8>) -> BatchBuilder {
        BatchBuilder {
            primary_key,
            timestamps: None,
            sequences: None,
            op_types: None,
            fields: Vec::new(),
        }
    }

    /// Creates a new [BatchBuilder] with all required columns set.
    pub fn with_required_columns(
        primary_key: Vec<u8>,
        timestamps: VectorRef,
        sequences: Arc<UInt64Vector>,
        op_types: Arc<UInt8Vector>,
    ) -> BatchBuilder {
        BatchBuilder {
            primary_key,
            timestamps: Some(timestamps),
            sequences: Some(sequences),
            op_types: Some(op_types),
            fields: Vec::new(),
        }
    }

    /// Replaces the field columns, consuming and returning the builder.
    pub fn with_fields(mut self, fields: Vec<BatchColumn>) -> Self {
        self.fields = fields;
        self
    }

    /// Appends a field column.
    pub fn push_field(&mut self, column: BatchColumn) -> &mut Self {
        self.fields.push(column);
        self
    }

    /// Appends a field column built from an arrow array.
    ///
    /// # Errors
    /// Returns an error if `array` cannot be converted to a vector.
    pub fn push_field_array(&mut self, column_id: ColumnId, array: ArrayRef) -> Result<&mut Self> {
        let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
        self.fields.push(BatchColumn {
            column_id,
            data: vector,
        });

        Ok(self)
    }

    /// Sets the timestamp column from an arrow array.
    ///
    /// # Errors
    /// Returns `InvalidBatch` if the array is not a timestamp type.
    pub fn timestamps_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
        let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
        ensure!(
            vector.data_type().is_timestamp(),
            InvalidBatchSnafu {
                reason: format!("{:?} is not a timestamp type", vector.data_type()),
            }
        );

        self.timestamps = Some(vector);
        Ok(self)
    }

    /// Sets the sequence column from an arrow array.
    ///
    /// # Errors
    /// Returns `InvalidBatch` if the array is not of `UInt64` type.
    pub fn sequences_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
        ensure!(
            *array.data_type() == arrow::datatypes::DataType::UInt64,
            InvalidBatchSnafu {
                reason: "sequence array is not UInt64 type",
            }
        );
        // The data type is checked above, so conversion can't fail.
        let vector = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
        self.sequences = Some(vector);

        Ok(self)
    }
1003
1004 pub fn op_types_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
1006 ensure!(
1007 *array.data_type() == arrow::datatypes::DataType::UInt8,
1008 InvalidBatchSnafu {
1009 reason: "sequence array is not UInt8 type",
1010 }
1011 );
1012 let vector = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
1014 self.op_types = Some(vector);
1015
1016 Ok(self)
1017 }
1018
    /// Builds the [Batch], validating that all required columns are present
    /// and every column has the same length.
    ///
    /// # Panics
    /// Asserts (debug and release) that the timestamp, sequence and op type
    /// columns contain no nulls — a null there is a bug in the caller.
    pub fn build(self) -> Result<Batch> {
        let timestamps = self.timestamps.context(InvalidBatchSnafu {
            reason: "missing timestamps",
        })?;
        let sequences = self.sequences.context(InvalidBatchSnafu {
            reason: "missing sequences",
        })?;
        let op_types = self.op_types.context(InvalidBatchSnafu {
            reason: "missing op_types",
        })?;
        // These columns must never contain nulls; other code (e.g.
        // filter_deleted, get_sequence) relies on this invariant.
        assert_eq!(0, timestamps.null_count());
        assert_eq!(0, sequences.null_count());
        assert_eq!(0, op_types.null_count());

        let ts_len = timestamps.len();
        ensure!(
            sequences.len() == ts_len,
            InvalidBatchSnafu {
                reason: format!(
                    "sequence have different len {} != {}",
                    sequences.len(),
                    ts_len
                ),
            }
        );
        ensure!(
            op_types.len() == ts_len,
            InvalidBatchSnafu {
                reason: format!(
                    "op type have different len {} != {}",
                    op_types.len(),
                    ts_len
                ),
            }
        );
        for column in &self.fields {
            ensure!(
                column.data.len() == ts_len,
                InvalidBatchSnafu {
                    reason: format!(
                        "column {} has different len {} != {}",
                        column.column_id,
                        column.data.len(),
                        ts_len
                    ),
                }
            );
        }

        Ok(Batch {
            primary_key: self.primary_key,
            pk_values: None,
            timestamps,
            sequences,
            op_types,
            fields: self.fields,
            fields_idx: None,
        })
    }
1081}
1082
1083impl From<Batch> for BatchBuilder {
1084 fn from(batch: Batch) -> Self {
1085 Self {
1086 primary_key: batch.primary_key,
1087 timestamps: Some(batch.timestamps),
1088 sequences: Some(batch.sequences),
1089 op_types: Some(batch.op_types),
1090 fields: batch.fields,
1091 }
1092 }
1093}
1094
/// An async source of [Batch]es backed by one of several producers.
pub enum Source {
    /// Batches from a [BatchReader].
    Reader(BoxedBatchReader),
    /// Batches from a synchronous iterator.
    Iter(BoxedBatchIterator),
    /// Batches from an async stream.
    Stream(BoxedBatchStream),
    /// Batches from a [PruneReader].
    PruneReader(PruneReader),
}

impl Source {
    /// Returns the next batch, or `Ok(None)` when the source is exhausted.
    pub async fn next_batch(&mut self) -> Result<Option<Batch>> {
        match self {
            Source::Reader(reader) => reader.next_batch().await,
            Source::Iter(iter) => iter.next().transpose(),
            Source::Stream(stream) => stream.try_next().await,
            Source::PruneReader(reader) => reader.next_batch().await,
        }
    }
}

/// A source of arrow [RecordBatch]es.
pub enum FlatSource {
    /// Record batches from a synchronous iterator.
    Iter(BoxedRecordBatchIterator),
    /// Record batches from an async stream.
    Stream(BoxedRecordBatchStream),
}

impl FlatSource {
    /// Returns the next record batch, or `Ok(None)` when exhausted.
    pub async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        match self {
            FlatSource::Iter(iter) => iter.next().transpose(),
            FlatSource::Stream(stream) => stream.try_next().await,
        }
    }
}

/// Async reader that yields [Batch]es.
#[async_trait]
pub trait BatchReader: Send {
    /// Fetches the next batch; `Ok(None)` signals end of stream.
    async fn next_batch(&mut self) -> Result<Option<Batch>>;
}

/// Boxed [BatchReader].
pub type BoxedBatchReader = Box<dyn BatchReader>;

/// Boxed stream of [Batch] results.
pub type BoxedBatchStream = BoxStream<'static, Result<Batch>>;

/// Boxed stream of arrow [RecordBatch] results.
pub type BoxedRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;

#[async_trait::async_trait]
impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
    // Forward to the inner reader so boxed readers are readers too.
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        (**self).next_batch().await
    }
}

/// Per-scanner metrics collected locally.
#[derive(Debug, Default)]
pub(crate) struct ScannerMetrics {
    /// Time spent scanning data.
    scan_cost: Duration,
    /// Time spent yielding batches downstream.
    yield_cost: Duration,
    /// Number of batches produced.
    num_batches: usize,
    /// Number of rows produced.
    num_rows: usize,
}
1182
1183#[cfg(test)]
1184mod tests {
1185 use datatypes::arrow::array::{TimestampMillisecondArray, UInt8Array, UInt64Array};
1186 use mito_codec::row_converter::{self, build_primary_key_codec_with_fields};
1187 use store_api::codec::PrimaryKeyEncoding;
1188 use store_api::storage::consts::ReservedColumnId;
1189
1190 use super::*;
1191 use crate::error::Error;
1192 use crate::test_util::new_batch_builder;
1193
1194 fn new_batch(
1195 timestamps: &[i64],
1196 sequences: &[u64],
1197 op_types: &[OpType],
1198 field: &[u64],
1199 ) -> Batch {
1200 new_batch_builder(b"test", timestamps, sequences, op_types, 1, field)
1201 .build()
1202 .unwrap()
1203 }
1204
1205 fn new_batch_with_u64_fields(
1206 timestamps: &[i64],
1207 sequences: &[u64],
1208 op_types: &[OpType],
1209 fields: &[(ColumnId, &[Option<u64>])],
1210 ) -> Batch {
1211 assert_eq!(timestamps.len(), sequences.len());
1212 assert_eq!(timestamps.len(), op_types.len());
1213 for (_, values) in fields {
1214 assert_eq!(timestamps.len(), values.len());
1215 }
1216
1217 let mut builder = BatchBuilder::new(b"test".to_vec());
1218 builder
1219 .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
1220 timestamps.iter().copied(),
1221 )))
1222 .unwrap()
1223 .sequences_array(Arc::new(UInt64Array::from_iter_values(
1224 sequences.iter().copied(),
1225 )))
1226 .unwrap()
1227 .op_types_array(Arc::new(UInt8Array::from_iter_values(
1228 op_types.iter().map(|v| *v as u8),
1229 )))
1230 .unwrap();
1231
1232 for (col_id, values) in fields {
1233 builder
1234 .push_field_array(*col_id, Arc::new(UInt64Array::from(values.to_vec())))
1235 .unwrap();
1236 }
1237
1238 builder.build().unwrap()
1239 }
1240
1241 fn new_batch_without_fields(
1242 timestamps: &[i64],
1243 sequences: &[u64],
1244 op_types: &[OpType],
1245 ) -> Batch {
1246 assert_eq!(timestamps.len(), sequences.len());
1247 assert_eq!(timestamps.len(), op_types.len());
1248
1249 let mut builder = BatchBuilder::new(b"test".to_vec());
1250 builder
1251 .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
1252 timestamps.iter().copied(),
1253 )))
1254 .unwrap()
1255 .sequences_array(Arc::new(UInt64Array::from_iter_values(
1256 sequences.iter().copied(),
1257 )))
1258 .unwrap()
1259 .op_types_array(Arc::new(UInt8Array::from_iter_values(
1260 op_types.iter().map(|v| *v as u8),
1261 )))
1262 .unwrap();
1263
1264 builder.build().unwrap()
1265 }
1266
1267 #[test]
1268 fn test_empty_batch() {
1269 let batch = Batch::empty();
1270 assert!(batch.is_empty());
1271 assert_eq!(None, batch.first_timestamp());
1272 assert_eq!(None, batch.last_timestamp());
1273 assert_eq!(None, batch.first_sequence());
1274 assert_eq!(None, batch.last_sequence());
1275 assert!(batch.timestamps_native().is_none());
1276 }
1277
1278 #[test]
1279 fn test_first_last_one() {
1280 let batch = new_batch(&[1], &[2], &[OpType::Put], &[4]);
1281 assert_eq!(
1282 Timestamp::new_millisecond(1),
1283 batch.first_timestamp().unwrap()
1284 );
1285 assert_eq!(
1286 Timestamp::new_millisecond(1),
1287 batch.last_timestamp().unwrap()
1288 );
1289 assert_eq!(2, batch.first_sequence().unwrap());
1290 assert_eq!(2, batch.last_sequence().unwrap());
1291 }
1292
1293 #[test]
1294 fn test_first_last_multiple() {
1295 let batch = new_batch(
1296 &[1, 2, 3],
1297 &[11, 12, 13],
1298 &[OpType::Put, OpType::Put, OpType::Put],
1299 &[21, 22, 23],
1300 );
1301 assert_eq!(
1302 Timestamp::new_millisecond(1),
1303 batch.first_timestamp().unwrap()
1304 );
1305 assert_eq!(
1306 Timestamp::new_millisecond(3),
1307 batch.last_timestamp().unwrap()
1308 );
1309 assert_eq!(11, batch.first_sequence().unwrap());
1310 assert_eq!(13, batch.last_sequence().unwrap());
1311 }
1312
1313 #[test]
1314 fn test_slice() {
1315 let batch = new_batch(
1316 &[1, 2, 3, 4],
1317 &[11, 12, 13, 14],
1318 &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1319 &[21, 22, 23, 24],
1320 );
1321 let batch = batch.slice(1, 2);
1322 let expect = new_batch(
1323 &[2, 3],
1324 &[12, 13],
1325 &[OpType::Delete, OpType::Put],
1326 &[22, 23],
1327 );
1328 assert_eq!(expect, batch);
1329 }
1330
1331 #[test]
1332 fn test_timestamps_native() {
1333 let batch = new_batch(
1334 &[1, 2, 3, 4],
1335 &[11, 12, 13, 14],
1336 &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1337 &[21, 22, 23, 24],
1338 );
1339 assert_eq!(&[1, 2, 3, 4], batch.timestamps_native().unwrap());
1340 }
1341
1342 #[test]
1343 fn test_concat_empty() {
1344 let err = Batch::concat(vec![]).unwrap_err();
1345 assert!(
1346 matches!(err, Error::InvalidBatch { .. }),
1347 "unexpected err: {err}"
1348 );
1349 }
1350
1351 #[test]
1352 fn test_concat_one() {
1353 let batch = new_batch(&[], &[], &[], &[]);
1354 let actual = Batch::concat(vec![batch.clone()]).unwrap();
1355 assert_eq!(batch, actual);
1356
1357 let batch = new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]);
1358 let actual = Batch::concat(vec![batch.clone()]).unwrap();
1359 assert_eq!(batch, actual);
1360 }
1361
1362 #[test]
1363 fn test_concat_multiple() {
1364 let batches = vec![
1365 new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]),
1366 new_batch(
1367 &[3, 4, 5],
1368 &[13, 14, 15],
1369 &[OpType::Put, OpType::Delete, OpType::Put],
1370 &[23, 24, 25],
1371 ),
1372 new_batch(&[], &[], &[], &[]),
1373 new_batch(&[6], &[16], &[OpType::Put], &[26]),
1374 ];
1375 let batch = Batch::concat(batches).unwrap();
1376 let expect = new_batch(
1377 &[1, 2, 3, 4, 5, 6],
1378 &[11, 12, 13, 14, 15, 16],
1379 &[
1380 OpType::Put,
1381 OpType::Put,
1382 OpType::Put,
1383 OpType::Delete,
1384 OpType::Put,
1385 OpType::Put,
1386 ],
1387 &[21, 22, 23, 24, 25, 26],
1388 );
1389 assert_eq!(expect, batch);
1390 }
1391
1392 #[test]
1393 fn test_concat_different() {
1394 let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
1395 let mut batch2 = new_batch(&[2], &[2], &[OpType::Put], &[2]);
1396 batch2.primary_key = b"hello".to_vec();
1397 let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
1398 assert!(
1399 matches!(err, Error::InvalidBatch { .. }),
1400 "unexpected err: {err}"
1401 );
1402 }
1403
1404 #[test]
1405 fn test_concat_different_fields() {
1406 let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
1407 let fields = vec![
1408 batch1.fields()[0].clone(),
1409 BatchColumn {
1410 column_id: 2,
1411 data: Arc::new(UInt64Vector::from_slice([2])),
1412 },
1413 ];
1414 let batch2 = batch1.clone().with_fields(fields).unwrap();
1416 let err = Batch::concat(vec![batch1.clone(), batch2]).unwrap_err();
1417 assert!(
1418 matches!(err, Error::InvalidBatch { .. }),
1419 "unexpected err: {err}"
1420 );
1421
1422 let fields = vec![BatchColumn {
1424 column_id: 2,
1425 data: Arc::new(UInt64Vector::from_slice([2])),
1426 }];
1427 let batch2 = batch1.clone().with_fields(fields).unwrap();
1428 let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
1429 assert!(
1430 matches!(err, Error::InvalidBatch { .. }),
1431 "unexpected err: {err}"
1432 );
1433 }
1434
1435 #[test]
1436 fn test_filter_deleted_empty() {
1437 let mut batch = new_batch(&[], &[], &[], &[]);
1438 batch.filter_deleted().unwrap();
1439 assert!(batch.is_empty());
1440 }
1441
1442 #[test]
1443 fn test_filter_deleted() {
1444 let mut batch = new_batch(
1445 &[1, 2, 3, 4],
1446 &[11, 12, 13, 14],
1447 &[OpType::Delete, OpType::Put, OpType::Delete, OpType::Put],
1448 &[21, 22, 23, 24],
1449 );
1450 batch.filter_deleted().unwrap();
1451 let expect = new_batch(&[2, 4], &[12, 14], &[OpType::Put, OpType::Put], &[22, 24]);
1452 assert_eq!(expect, batch);
1453
1454 let mut batch = new_batch(
1455 &[1, 2, 3, 4],
1456 &[11, 12, 13, 14],
1457 &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
1458 &[21, 22, 23, 24],
1459 );
1460 let expect = batch.clone();
1461 batch.filter_deleted().unwrap();
1462 assert_eq!(expect, batch);
1463 }
1464
    /// Exercises `filter_by_sequence` over every `SequenceRange` variant,
    /// plus the `None` (no filtering) and empty-batch cases.
    #[test]
    fn test_filter_by_sequence() {
        // LtEq: keep rows with sequence <= 13.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::LtEq { max: 13 }))
            .unwrap();
        let expect = new_batch(
            &[1, 2, 3],
            &[11, 12, 13],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );
        assert_eq!(expect, batch);

        // LtEq with a bound below every sequence: everything is filtered out.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );

        batch
            .filter_by_sequence(Some(SequenceRange::LtEq { max: 10 }))
            .unwrap();
        assert!(batch.is_empty());

        // None: no filtering at all; the batch is unchanged.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let expect = batch.clone();
        batch.filter_by_sequence(None).unwrap();
        assert_eq!(expect, batch);

        // Empty batch + a range: still empty, no error.
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch
            .filter_by_sequence(Some(SequenceRange::LtEq { max: 10 }))
            .unwrap();
        assert!(batch.is_empty());

        // Empty batch + None: still empty, no error.
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_by_sequence(None).unwrap();
        assert!(batch.is_empty());

        // Gt: keep rows with sequence > 12.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::Gt { min: 12 }))
            .unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // Gt with a bound above every sequence: everything is filtered out.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::Gt { min: 20 }))
            .unwrap();
        assert!(batch.is_empty());

        // GtLtEq: keep rows with 12 < sequence <= 14.
        let mut batch = new_batch(
            &[1, 2, 3, 4, 5],
            &[11, 12, 13, 14, 15],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 12, max: 14 }))
            .unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // GtLtEq keeps delete rows too — filtering is by sequence only.
        let mut batch = new_batch(
            &[1, 2, 3, 4, 5],
            &[11, 12, 13, 14, 15],
            &[
                OpType::Put,
                OpType::Delete,
                OpType::Put,
                OpType::Delete,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 11, max: 13 }))
            .unwrap();
        let expect = new_batch(
            &[2, 3],
            &[12, 13],
            &[OpType::Delete, OpType::Put],
            &[22, 23],
        );
        assert_eq!(expect, batch);

        // GtLtEq entirely above the sequence range: everything is filtered out.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 20, max: 25 }))
            .unwrap();
        assert!(batch.is_empty());
    }
1601
1602 #[test]
1603 fn test_merge_last_non_null_no_dup() {
1604 let mut batch = new_batch_with_u64_fields(
1605 &[1, 2],
1606 &[2, 1],
1607 &[OpType::Put, OpType::Put],
1608 &[(1, &[Some(10), None]), (2, &[Some(100), Some(200)])],
1609 );
1610 let expect = batch.clone();
1611 batch.merge_last_non_null().unwrap();
1612 assert_eq!(expect, batch);
1613 }
1614
1615 #[test]
1616 fn test_merge_last_non_null_fill_null_fields() {
1617 let mut batch = new_batch_with_u64_fields(
1619 &[1, 1, 1],
1620 &[3, 2, 1],
1621 &[OpType::Put, OpType::Put, OpType::Put],
1622 &[
1623 (1, &[None, Some(10), Some(11)]),
1624 (2, &[Some(100), Some(200), Some(300)]),
1625 ],
1626 );
1627 batch.merge_last_non_null().unwrap();
1628
1629 let expect = new_batch_with_u64_fields(
1632 &[1],
1633 &[3],
1634 &[OpType::Put],
1635 &[(1, &[Some(10)]), (2, &[Some(100)])],
1636 );
1637 assert_eq!(expect, batch);
1638 }
1639
1640 #[test]
1641 fn test_merge_last_non_null_stop_at_delete_row() {
1642 let mut batch = new_batch_with_u64_fields(
1645 &[1, 1, 1],
1646 &[3, 2, 1],
1647 &[OpType::Put, OpType::Delete, OpType::Put],
1648 &[
1649 (1, &[None, Some(10), Some(11)]),
1650 (2, &[Some(100), Some(200), Some(300)]),
1651 ],
1652 );
1653 batch.merge_last_non_null().unwrap();
1654
1655 let expect = new_batch_with_u64_fields(
1656 &[1],
1657 &[3],
1658 &[OpType::Put],
1659 &[(1, &[None]), (2, &[Some(100)])],
1660 );
1661 assert_eq!(expect, batch);
1662 }
1663
1664 #[test]
1665 fn test_merge_last_non_null_base_delete_no_merge() {
1666 let mut batch = new_batch_with_u64_fields(
1667 &[1, 1],
1668 &[3, 2],
1669 &[OpType::Delete, OpType::Put],
1670 &[(1, &[None, Some(10)]), (2, &[None, Some(200)])],
1671 );
1672 batch.merge_last_non_null().unwrap();
1673
1674 let expect =
1676 new_batch_with_u64_fields(&[1], &[3], &[OpType::Delete], &[(1, &[None]), (2, &[None])]);
1677 assert_eq!(expect, batch);
1678 }
1679
1680 #[test]
1681 fn test_merge_last_non_null_multiple_timestamp_groups() {
1682 let mut batch = new_batch_with_u64_fields(
1683 &[1, 1, 2, 3, 3],
1684 &[5, 4, 3, 2, 1],
1685 &[
1686 OpType::Put,
1687 OpType::Put,
1688 OpType::Put,
1689 OpType::Put,
1690 OpType::Put,
1691 ],
1692 &[
1693 (1, &[None, Some(10), Some(20), None, Some(30)]),
1694 (2, &[Some(100), Some(110), Some(120), None, Some(130)]),
1695 ],
1696 );
1697 batch.merge_last_non_null().unwrap();
1698
1699 let expect = new_batch_with_u64_fields(
1700 &[1, 2, 3],
1701 &[5, 3, 2],
1702 &[OpType::Put, OpType::Put, OpType::Put],
1703 &[
1704 (1, &[Some(10), Some(20), Some(30)]),
1705 (2, &[Some(100), Some(120), Some(130)]),
1706 ],
1707 );
1708 assert_eq!(expect, batch);
1709 }
1710
1711 #[test]
1712 fn test_merge_last_non_null_no_fields() {
1713 let mut batch = new_batch_without_fields(
1714 &[1, 1, 2],
1715 &[3, 2, 1],
1716 &[OpType::Put, OpType::Put, OpType::Put],
1717 );
1718 batch.merge_last_non_null().unwrap();
1719
1720 let expect = new_batch_without_fields(&[1, 2], &[3, 1], &[OpType::Put, OpType::Put]);
1721 assert_eq!(expect, batch);
1722 }
1723
1724 #[test]
1725 fn test_filter() {
1726 let mut batch = new_batch(
1728 &[1, 2, 3, 4],
1729 &[11, 12, 13, 14],
1730 &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
1731 &[21, 22, 23, 24],
1732 );
1733 let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
1734 batch.filter(&predicate).unwrap();
1735 let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
1736 assert_eq!(expect, batch);
1737
1738 let mut batch = new_batch(
1740 &[1, 2, 3, 4],
1741 &[11, 12, 13, 14],
1742 &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1743 &[21, 22, 23, 24],
1744 );
1745 let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
1746 batch.filter(&predicate).unwrap();
1747 let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
1748 assert_eq!(expect, batch);
1749
1750 let predicate = BooleanVector::from_vec(vec![false, false]);
1752 batch.filter(&predicate).unwrap();
1753 assert!(batch.is_empty());
1754 }
1755
    /// Exercises `Batch::sort` with and without deduplication.
    ///
    /// Rows are ordered by (timestamp asc, sequence desc); when `dedup` is
    /// true, only the largest-sequence row of each timestamp is kept.
    #[test]
    fn test_sort_and_dedup() {
        // Unsorted input; ts=2 appears twice (seq 1 and seq 6).
        let original = new_batch(
            &[2, 3, 1, 4, 5, 2],
            &[1, 2, 3, 4, 5, 6],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25, 26],
        );

        let mut batch = original.clone();
        batch.sort(true).unwrap();
        // Deduplicated: for ts=2 only the seq-6 row survives.
        assert_eq!(
            new_batch(
                &[1, 2, 3, 4, 5],
                &[3, 6, 2, 4, 5],
                &[
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                ],
                &[23, 26, 22, 24, 25],
            ),
            batch
        );

        let mut batch = original.clone();
        batch.sort(false).unwrap();

        // Without dedup both ts=2 rows remain, newest (seq 6) first.
        assert_eq!(
            new_batch(
                &[1, 2, 2, 3, 4, 5],
                &[3, 6, 1, 2, 4, 5],
                &[
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                ],
                &[23, 26, 21, 22, 24, 25],
            ),
            batch
        );

        // A duplicated timestamp whose older entry is a delete.
        let original = new_batch(
            &[2, 2, 1],
            &[1, 6, 1],
            &[OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );

        let mut batch = original.clone();
        batch.sort(true).unwrap();
        // Dedup keeps the seq-6 put for ts=2; the older seq-1 delete is dropped.
        let expect = new_batch(&[1, 2], &[1, 6], &[OpType::Put, OpType::Put], &[23, 22]);
        assert_eq!(expect, batch);

        let mut batch = original.clone();
        batch.sort(false).unwrap();
        // No dedup: both ts=2 rows are kept, ordered by descending sequence.
        let expect = new_batch(
            &[1, 2, 2],
            &[1, 6, 1],
            &[OpType::Put, OpType::Put, OpType::Delete],
            &[23, 22, 21],
        );
        assert_eq!(expect, batch);
    }
1834
    /// Round-trips primary key column values through encode + `pk_col_value`,
    /// and reads a field column via `field_col_value`, for both dense and
    /// sparse primary key encodings.
    #[test]
    fn test_get_value() {
        let encodings = [PrimaryKeyEncoding::Dense, PrimaryKeyEncoding::Sparse];

        for encoding in encodings {
            // Codec over two reserved pk columns (table_id, tsid) and two
            // user columns (ids 100 and 200) of string type.
            let codec = build_primary_key_codec_with_fields(
                encoding,
                [
                    (
                        ReservedColumnId::table_id(),
                        row_converter::SortField::new(ConcreteDataType::uint32_datatype()),
                    ),
                    (
                        ReservedColumnId::tsid(),
                        row_converter::SortField::new(ConcreteDataType::uint64_datatype()),
                    ),
                    (
                        100,
                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
                    ),
                    (
                        200,
                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
                    ),
                ]
                .into_iter(),
            );

            // Encode one value per pk column into the batch's primary key bytes.
            let values = [
                Value::UInt32(1000),
                Value::UInt64(2000),
                Value::String("abcdefgh".into()),
                Value::String("zyxwvu".into()),
            ];
            let mut buf = vec![];
            codec
                .encode_values(
                    &[
                        (ReservedColumnId::table_id(), values[0].clone()),
                        (ReservedColumnId::tsid(), values[1].clone()),
                        (100, values[2].clone()),
                        (200, values[3].clone()),
                    ],
                    &mut buf,
                )
                .unwrap();

            // Three rows sharing that primary key, with one u64 field column.
            let field_col_id = 2;
            let mut batch = new_batch_builder(
                &buf,
                &[1, 2, 3],
                &[1, 1, 1],
                &[OpType::Put, OpType::Put, OpType::Put],
                field_col_id,
                &[42, 43, 44],
            )
            .build()
            .unwrap();

            // Each pk column (by index + id) decodes back to the value encoded
            // above. NOTE(review): the `mut` binding suggests `pk_col_value`
            // caches decoded pk values on the batch — confirm in its definition.
            let v = batch
                .pk_col_value(&*codec, 0, ReservedColumnId::table_id())
                .unwrap()
                .unwrap();
            assert_eq!(values[0], *v);

            let v = batch
                .pk_col_value(&*codec, 1, ReservedColumnId::tsid())
                .unwrap()
                .unwrap();
            assert_eq!(values[1], *v);

            let v = batch.pk_col_value(&*codec, 2, 100).unwrap().unwrap();
            assert_eq!(values[2], *v);

            let v = batch.pk_col_value(&*codec, 3, 200).unwrap().unwrap();
            assert_eq!(values[3], *v);

            // Field lookup by column id returns the whole column; check each row.
            let v = batch.field_col_value(field_col_id).unwrap();
            assert_eq!(v.data.get(0), Value::UInt64(42));
            assert_eq!(v.data.get(1), Value::UInt64(43));
            assert_eq!(v.data.get(2), Value::UInt64(44));
        }
    }
1918}