1pub mod batch_adapter;
18pub mod compat;
19pub mod dedup;
20pub mod flat_dedup;
21pub mod flat_merge;
22pub mod flat_projection;
23pub mod last_row;
24pub mod projection;
25pub(crate) mod prune;
26pub(crate) mod pruner;
27pub mod range;
28#[cfg(feature = "test")]
29pub mod range_cache;
30#[cfg(not(feature = "test"))]
31pub(crate) mod range_cache;
32pub(crate) mod read_columns;
33pub mod scan_region;
34pub mod scan_util;
35pub(crate) mod seq_scan;
36pub mod series_scan;
37pub mod stream;
38pub(crate) mod unordered_scan;
39
40use std::collections::HashMap;
41use std::sync::Arc;
42use std::time::Duration;
43
44use api::v1::OpType;
45use async_trait::async_trait;
46use common_time::Timestamp;
47use datafusion_common::arrow::array::UInt8Array;
48use datatypes::arrow;
49use datatypes::arrow::array::{Array, ArrayRef};
50use datatypes::arrow::compute::SortOptions;
51use datatypes::arrow::record_batch::RecordBatch;
52use datatypes::arrow::row::{RowConverter, SortField};
53use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector};
54use datatypes::scalars::ScalarVectorBuilder;
55use datatypes::types::TimestampType;
56use datatypes::value::{Value, ValueRef};
57use datatypes::vectors::{
58 BooleanVector, Helper, TimestampMicrosecondVector, TimestampMillisecondVector,
59 TimestampMillisecondVectorBuilder, TimestampNanosecondVector, TimestampSecondVector,
60 UInt8Vector, UInt8VectorBuilder, UInt32Vector, UInt64Vector, UInt64VectorBuilder, Vector,
61 VectorRef,
62};
63use futures::TryStreamExt;
64use futures::stream::BoxStream;
65use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
66use snafu::{OptionExt, ResultExt, ensure};
67use store_api::storage::{ColumnId, SequenceNumber, SequenceRange};
68
69use crate::error::{
70 ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu,
71 Result,
72};
73use crate::memtable::{BoxedBatchIterator, BoxedRecordBatchIterator};
#[derive(Debug, PartialEq, Clone)]
/// A batch of rows that share the same primary key.
///
/// Rows are expected to be ordered by timestamp ascending and sequence
/// descending — NOTE(review): the ordering is established by [`Batch::sort`],
/// not by construction; confirm producers uphold it.
pub struct Batch {
    /// Encoded primary key shared by every row in this batch.
    primary_key: Vec<u8>,
    /// Lazily decoded primary key values; populated on demand
    /// (see `pk_col_value` / `set_pk_values`).
    pk_values: Option<CompositeValues>,
    /// Timestamp of each row; always a timestamp-typed vector
    /// (checked by `BatchBuilder::timestamps_array`).
    timestamps: VectorRef,
    /// Sequence number of each row.
    sequences: Arc<UInt64Vector>,
    /// Operation type (put/delete) of each row.
    op_types: Arc<UInt8Vector>,
    /// Field (non primary key) columns.
    fields: Vec<BatchColumn>,
    /// Lazily built index from column id to position in `fields`
    /// (see `field_col_value`).
    fields_idx: Option<HashMap<ColumnId, usize>>,
}
99
impl Batch {
    /// Creates a new [Batch], validating the columns via [BatchBuilder].
    ///
    /// # Errors
    /// Returns an error if the columns do not all have the same length or the
    /// timestamp column is not of a timestamp type.
    pub fn new(
        primary_key: Vec<u8>,
        timestamps: VectorRef,
        sequences: Arc<UInt64Vector>,
        op_types: Arc<UInt8Vector>,
        fields: Vec<BatchColumn>,
    ) -> Result<Batch> {
        BatchBuilder::with_required_columns(primary_key, timestamps, sequences, op_types)
            .with_fields(fields)
            .build()
    }

    /// Returns a new [Batch] keeping this batch's key and required columns but
    /// replacing the field columns, re-running the builder validation.
    pub fn with_fields(self, fields: Vec<BatchColumn>) -> Result<Batch> {
        Batch::new(
            self.primary_key,
            self.timestamps,
            self.sequences,
            self.op_types,
            fields,
        )
    }

    /// Returns the encoded primary key shared by all rows in this batch.
    pub fn primary_key(&self) -> &[u8] {
        &self.primary_key
    }

    /// Returns the cached decoded primary key values, if present.
    pub fn pk_values(&self) -> Option<&CompositeValues> {
        self.pk_values.as_ref()
    }

    /// Caches decoded primary key values on this batch.
    pub fn set_pk_values(&mut self, pk_values: CompositeValues) {
        self.pk_values = Some(pk_values);
    }

    /// Clears the cached decoded primary key values (test helper).
    #[cfg(any(test, feature = "test"))]
    pub fn remove_pk_values(&mut self) {
        self.pk_values = None;
    }

    /// Returns the field (non primary key) columns.
    pub fn fields(&self) -> &[BatchColumn] {
        &self.fields
    }

    /// Returns the timestamp column.
    pub fn timestamps(&self) -> &VectorRef {
        &self.timestamps
    }

    /// Returns the sequence number column.
    pub fn sequences(&self) -> &Arc<UInt64Vector> {
        &self.sequences
    }

    /// Returns the op type (put/delete) column.
    pub fn op_types(&self) -> &Arc<UInt8Vector> {
        &self.op_types
    }

    /// Returns the number of rows. All columns share this length (enforced by
    /// [BatchBuilder::build]); the sequence column is used as the reference.
    pub fn num_rows(&self) -> usize {
        self.sequences.len()
    }

    /// Creates a batch with zero rows, an empty key and no field columns.
    /// The timestamp column defaults to millisecond precision.
    #[allow(dead_code)]
    pub(crate) fn empty() -> Self {
        Self {
            primary_key: vec![],
            pk_values: None,
            timestamps: Arc::new(TimestampMillisecondVectorBuilder::with_capacity(0).finish()),
            sequences: Arc::new(UInt64VectorBuilder::with_capacity(0).finish()),
            op_types: Arc::new(UInt8VectorBuilder::with_capacity(0).finish()),
            fields: vec![],
            fields_idx: None,
        }
    }

    /// Returns true if the batch has no rows.
    pub fn is_empty(&self) -> bool {
        self.num_rows() == 0
    }

    /// Returns the timestamp of the first row, or `None` if the batch is empty.
    pub fn first_timestamp(&self) -> Option<Timestamp> {
        if self.timestamps.is_empty() {
            return None;
        }

        Some(self.get_timestamp(0))
    }

    /// Returns the timestamp of the last row, or `None` if the batch is empty.
    pub fn last_timestamp(&self) -> Option<Timestamp> {
        if self.timestamps.is_empty() {
            return None;
        }

        Some(self.get_timestamp(self.timestamps.len() - 1))
    }

    /// Returns the sequence of the first row, or `None` if the batch is empty.
    pub fn first_sequence(&self) -> Option<SequenceNumber> {
        if self.sequences.is_empty() {
            return None;
        }

        Some(self.get_sequence(0))
    }

    /// Returns the sequence of the last row, or `None` if the batch is empty.
    pub fn last_sequence(&self) -> Option<SequenceNumber> {
        if self.sequences.is_empty() {
            return None;
        }

        Some(self.get_sequence(self.sequences.len() - 1))
    }

    /// Replaces the primary key of this batch.
    ///
    /// NOTE(review): this does not clear the cached `pk_values`, so callers
    /// must keep key and cached values consistent — confirm intended.
    pub fn set_primary_key(&mut self, primary_key: Vec<u8>) {
        self.primary_key = primary_key;
    }

    /// Returns a view of rows `[offset, offset + length)`. Column data is
    /// sliced; the key and any cached values are cloned.
    pub fn slice(&self, offset: usize, length: usize) -> Batch {
        let fields = self
            .fields
            .iter()
            .map(|column| BatchColumn {
                column_id: column.column_id,
                data: column.data.slice(offset, length),
            })
            .collect();
        // Construct the struct directly (not via the builder) because slicing
        // cannot break the equal-length invariant.
        Batch {
            primary_key: self.primary_key.clone(),
            pk_values: self.pk_values.clone(),
            timestamps: self.timestamps.slice(offset, length),
            sequences: Arc::new(self.sequences.get_slice(offset, length)),
            op_types: Arc::new(self.op_types.get_slice(offset, length)),
            fields,
            fields_idx: self.fields_idx.clone(),
        }
    }

    /// Concatenates `batches` into a single batch.
    ///
    /// # Errors
    /// Returns an error if `batches` is empty, or the batches do not all share
    /// the same primary key and field layout (count and column ids).
    pub fn concat(mut batches: Vec<Batch>) -> Result<Batch> {
        ensure!(
            !batches.is_empty(),
            InvalidBatchSnafu {
                reason: "empty batches",
            }
        );
        if batches.len() == 1 {
            // Fast path: a single batch is returned unchanged.
            return Ok(batches.pop().unwrap());
        }

        // Take (not clone) the first batch's key; it is moved into the builder
        // below, the remaining batches are only read.
        let primary_key = std::mem::take(&mut batches[0].primary_key);
        let first = &batches[0];
        // All batches must share the same primary key.
        ensure!(
            batches
                .iter()
                .skip(1)
                .all(|b| b.primary_key() == primary_key),
            InvalidBatchSnafu {
                reason: "batches have different primary key",
            }
        );
        // All batches must have the same field columns, in the same order.
        for b in batches.iter().skip(1) {
            ensure!(
                b.fields.len() == first.fields.len(),
                InvalidBatchSnafu {
                    reason: "batches have different field num",
                }
            );
            for (l, r) in b.fields.iter().zip(&first.fields) {
                ensure!(
                    l.column_id == r.column_id,
                    InvalidBatchSnafu {
                        reason: "batches have different fields",
                    }
                );
            }
        }

        // Concatenate each column across all batches.
        let mut builder = BatchBuilder::new(primary_key);
        let array = concat_arrays(batches.iter().map(|b| b.timestamps().to_arrow_array()))?;
        builder.timestamps_array(array)?;
        let array = concat_arrays(batches.iter().map(|b| b.sequences().to_arrow_array()))?;
        builder.sequences_array(array)?;
        let array = concat_arrays(batches.iter().map(|b| b.op_types().to_arrow_array()))?;
        builder.op_types_array(array)?;
        for (i, batch_column) in first.fields.iter().enumerate() {
            let array = concat_arrays(batches.iter().map(|b| b.fields()[i].data.to_arrow_array()))?;
            builder.push_field_array(batch_column.column_id, array)?;
        }

        builder.build()
    }

    /// Removes all rows whose op type is `Delete`, in place.
    pub fn filter_deleted(&mut self) -> Result<()> {
        // Build a predicate that is true for rows whose op type != Delete.
        let array = self.op_types.as_arrow();
        let rhs = UInt8Array::new_scalar(OpType::Delete as u8);
        let predicate =
            arrow::compute::kernels::cmp::neq(array, &rhs).context(ComputeArrowSnafu)?;
        self.filter(&BooleanVector::from(predicate))
    }

    /// Keeps only the rows selected by `predicate`, in place. The predicate
    /// must have one element per row.
    pub fn filter(&mut self, predicate: &BooleanVector) -> Result<()> {
        self.timestamps = self
            .timestamps
            .filter(predicate)
            .context(ComputeVectorSnafu)?;
        self.sequences = Arc::new(
            UInt64Vector::try_from_arrow_array(
                arrow::compute::filter(self.sequences.as_arrow(), predicate.as_boolean_array())
                    .context(ComputeArrowSnafu)?,
            )
            // Filtering a UInt64 array yields a UInt64 array, so this cannot fail.
            .unwrap(),
        );
        self.op_types = Arc::new(
            UInt8Vector::try_from_arrow_array(
                arrow::compute::filter(self.op_types.as_arrow(), predicate.as_boolean_array())
                    .context(ComputeArrowSnafu)?,
            )
            // Filtering a UInt8 array yields a UInt8 array, so this cannot fail.
            .unwrap(),
        );
        for batch_column in &mut self.fields {
            batch_column.data = batch_column
                .data
                .filter(predicate)
                .context(ComputeVectorSnafu)?;
        }

        Ok(())
    }

    /// Keeps only the rows whose sequence falls inside `sequence`, in place.
    /// `None` means no filtering.
    pub fn filter_by_sequence(&mut self, sequence: Option<SequenceRange>) -> Result<()> {
        let seq_range = match sequence {
            None => return Ok(()),
            Some(seq_range) => {
                let (Some(first), Some(last)) = (self.first_sequence(), self.last_sequence())
                else {
                    // Empty batch: nothing to filter.
                    return Ok(());
                };
                // Skip filtering when the whole batch already satisfies the
                // predicate. NOTE(review): this subset test only checks the
                // first/last rows' sequences — confirm batches bound their
                // sequence range by those rows.
                let is_subset = match seq_range {
                    SequenceRange::Gt { min } => min < first,
                    SequenceRange::LtEq { max } => max >= last,
                    SequenceRange::GtLtEq { min, max } => min < first && max >= last,
                };
                if is_subset {
                    return Ok(());
                }
                seq_range
            }
        };

        let seqs = self.sequences.as_arrow();
        let predicate = seq_range.filter(seqs).context(ComputeArrowSnafu)?;

        let predicate = BooleanVector::from(predicate);
        self.filter(&predicate)?;

        Ok(())
    }

    /// Sorts rows by `(timestamp asc, sequence desc)`. When `dedup` is true,
    /// also removes rows with duplicate timestamps, keeping the row with the
    /// largest sequence of each run.
    ///
    /// Skips the data copy entirely when the batch is already sorted and
    /// nothing was deduplicated.
    pub fn sort(&mut self, dedup: bool) -> Result<()> {
        // Convert (timestamp, sequence) pairs into byte-comparable row keys;
        // sequence uses descending order so the newest row sorts first within
        // a timestamp.
        let converter = RowConverter::new(vec![
            SortField::new(self.timestamps.data_type().as_arrow_type()),
            SortField::new_with_options(
                self.sequences.data_type().as_arrow_type(),
                SortOptions {
                    descending: true,
                    ..Default::default()
                },
            ),
        ])
        .context(ComputeArrowSnafu)?;
        let columns = [
            self.timestamps.to_arrow_array(),
            self.sequences.to_arrow_array(),
        ];
        let rows = converter.convert_columns(&columns).unwrap();
        let mut to_sort: Vec<_> = rows.iter().enumerate().collect();

        let was_sorted = to_sort.is_sorted_by_key(|x| x.1);
        if !was_sorted {
            to_sort.sort_unstable_by_key(|x| x.1);
        }

        let num_rows = to_sort.len();
        if dedup {
            // Duplicate timestamps are adjacent after sorting; `dedup_by`
            // keeps the first (largest sequence) row of each run.
            to_sort.dedup_by(|left, right| {
                // Each key is 9 bytes of timestamp + 9 bytes of sequence
                // (arrow row format: 1 validity byte + 8 data bytes each).
                debug_assert_eq!(18, left.1.as_ref().len());
                debug_assert_eq!(18, right.1.as_ref().len());
                let (left_key, right_key) = (left.1.as_ref(), right.1.as_ref());
                // Compare only the timestamp prefix of the keys.
                left_key[..TIMESTAMP_KEY_LEN] == right_key[..TIMESTAMP_KEY_LEN]
            });
        }
        let no_dedup = to_sort.len() == num_rows;

        if was_sorted && no_dedup {
            // Nothing changed; avoid rewriting the columns.
            return Ok(());
        }
        let indices = UInt32Vector::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
        self.take_in_place(&indices)
    }

    /// Merges duplicate-timestamp rows with "last non-null" semantics: within
    /// each run of equal timestamps, the first row is kept as the base and
    /// each of its null fields is filled with the first non-null value found
    /// in later rows of the run, stopping at a `Delete` row.
    ///
    /// Assumes duplicate timestamps are adjacent with the highest-sequence
    /// row first in each run (the order [Self::sort] produces) —
    /// NOTE(review): not re-validated here.
    pub(crate) fn merge_last_non_null(&mut self) -> Result<()> {
        let num_rows = self.num_rows();
        if num_rows < 2 {
            // Zero or one row: no duplicates possible.
            return Ok(());
        }

        let Some(timestamps) = self.timestamps_native() else {
            return Ok(());
        };

        // First pass: detect duplicates and count distinct timestamps so the
        // index buffers can be sized exactly.
        let mut has_dup = false;
        let mut group_count = 1;
        for i in 1..num_rows {
            has_dup |= timestamps[i] == timestamps[i - 1];
            group_count += (timestamps[i] != timestamps[i - 1]) as usize;
        }
        if !has_dup {
            return Ok(());
        }

        let num_fields = self.fields.len();
        let op_types = self.op_types.as_arrow().values();

        // Per group: the row taken for key/timestamp/sequence/op_type columns,
        // plus one index per field that may point at a later row of the group
        // when the base row's value is null.
        let mut base_indices: Vec<u32> = Vec::with_capacity(group_count);
        let mut field_indices: Vec<Vec<u32>> = (0..num_fields)
            .map(|_| Vec::with_capacity(group_count))
            .collect();

        let mut start = 0;
        while start < num_rows {
            // Find the run [start, end) of rows sharing one timestamp.
            let ts = timestamps[start];
            let mut end = start + 1;
            while end < num_rows && timestamps[end] == ts {
                end += 1;
            }

            let group_pos = base_indices.len();
            base_indices.push(start as u32);

            if num_fields > 0 {
                // Default every field to the base row of the group.
                for idx in &mut field_indices {
                    idx.push(start as u32);
                }

                let base_deleted = op_types[start] == OpType::Delete as u8;
                if !base_deleted {
                    // Collect the fields that are null in the base row.
                    let mut missing_fields = Vec::new();
                    for (field_idx, col) in self.fields.iter().enumerate() {
                        if col.data.is_null(start) {
                            missing_fields.push(field_idx);
                        }
                    }

                    if !missing_fields.is_empty() {
                        // Scan the rest of the run for non-null values.
                        for row_idx in (start + 1)..end {
                            if op_types[row_idx] == OpType::Delete as u8 {
                                // A delete masks everything beyond it.
                                break;
                            }

                            missing_fields.retain(|&field_idx| {
                                if self.fields[field_idx].data.is_null(row_idx) {
                                    true
                                } else {
                                    // Found a value: point this field at it.
                                    field_indices[field_idx][group_pos] = row_idx as u32;
                                    false
                                }
                            });

                            if missing_fields.is_empty() {
                                break;
                            }
                        }
                    }
                }
            }

            start = end;
        }

        // Gather the merged columns using the computed indices.
        let base_indices = UInt32Vector::from_vec(base_indices);
        self.timestamps = self
            .timestamps
            .take(&base_indices)
            .context(ComputeVectorSnafu)?;
        let array = arrow::compute::take(self.sequences.as_arrow(), base_indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        // Taking from a UInt64 array yields a UInt64 array, so this cannot fail.
        self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
        let array = arrow::compute::take(self.op_types.as_arrow(), base_indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());

        for (field_idx, batch_column) in self.fields.iter_mut().enumerate() {
            let idx = UInt32Vector::from_vec(std::mem::take(&mut field_indices[field_idx]));
            batch_column.data = batch_column.data.take(&idx).context(ComputeVectorSnafu)?;
        }

        Ok(())
    }

    /// Returns an estimate of the memory used by this batch: struct size,
    /// key bytes and each column's reported memory size.
    pub fn memory_size(&self) -> usize {
        let mut size = std::mem::size_of::<Self>();
        size += self.primary_key.len();
        size += self.timestamps.memory_size();
        size += self.sequences.memory_size();
        size += self.op_types.memory_size();
        for batch_column in &self.fields {
            size += batch_column.data.memory_size();
        }
        size
    }

    /// Returns the timestamps as a raw `i64` slice (values are in the
    /// column's own time unit), or `None` if the batch is empty.
    ///
    /// # Panics
    /// Panics if the column is not a timestamp vector; the builder guarantees
    /// it is, so this is unreachable for well-formed batches.
    pub(crate) fn timestamps_native(&self) -> Option<&[i64]> {
        if self.timestamps.is_empty() {
            return None;
        }

        // Downcast per time unit; all four timestamp vectors store i64 values.
        let values = match self.timestamps.data_type() {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampSecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampMicrosecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampNanosecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            other => panic!("timestamps in a Batch has other type {:?}", other),
        };

        Some(values)
    }

    /// Reorders (and possibly drops) rows of every column by `indices`.
    fn take_in_place(&mut self, indices: &UInt32Vector) -> Result<()> {
        self.timestamps = self.timestamps.take(indices).context(ComputeVectorSnafu)?;
        let array = arrow::compute::take(self.sequences.as_arrow(), indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        // Taking from a UInt64 array yields a UInt64 array, so this cannot fail.
        self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
        let array = arrow::compute::take(self.op_types.as_arrow(), indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
        for batch_column in &mut self.fields {
            batch_column.data = batch_column
                .data
                .take(indices)
                .context(ComputeVectorSnafu)?;
        }

        Ok(())
    }

    /// Returns the timestamp at `index`.
    ///
    /// # Panics
    /// Panics if `index` is out of bounds or the value is not a timestamp
    /// (unreachable for batches produced by the builder).
    fn get_timestamp(&self, index: usize) -> Timestamp {
        match self.timestamps.get_ref(index) {
            ValueRef::Timestamp(timestamp) => timestamp,

            value => panic!("{:?} is not a timestamp", value),
        }
    }

    /// Returns the sequence at `index`.
    ///
    /// # Panics
    /// Panics if `index` is out of bounds; the value itself is never null
    /// (the builder asserts a zero null count).
    pub(crate) fn get_sequence(&self, index: usize) -> SequenceNumber {
        self.sequences.get_data(index).unwrap()
    }

    /// Debug-only check that rows are ordered by timestamp ascending with
    /// sequence non-increasing within equal timestamps.
    #[cfg(debug_assertions)]
    #[allow(dead_code)]
    pub(crate) fn check_monotonic(&self) -> Result<(), String> {
        use std::cmp::Ordering;
        if self.timestamps_native().is_none() {
            return Ok(());
        }

        let timestamps = self.timestamps_native().unwrap();
        let sequences = self.sequences.as_arrow().values();
        for (i, window) in timestamps.windows(2).enumerate() {
            let current = window[0];
            let next = window[1];
            let current_sequence = sequences[i];
            let next_sequence = sequences[i + 1];
            match current.cmp(&next) {
                Ordering::Less => {
                    // Strictly increasing timestamp: sequences unconstrained.
                    continue;
                }
                Ordering::Equal => {
                    // Same timestamp: sequence must not increase.
                    if current_sequence < next_sequence {
                        return Err(format!(
                            "sequence are not monotonic: ts {} == {} but current sequence {} < {}, index: {}",
                            current, next, current_sequence, next_sequence, i
                        ));
                    }
                }
                Ordering::Greater => {
                    return Err(format!(
                        "timestamps are not monotonic: {} > {}, index: {}",
                        current, next, i
                    ));
                }
            }
        }

        Ok(())
    }

    /// Debug-only check that `other` sorts after `self` in
    /// (primary key asc, timestamp asc, sequence desc) order.
    #[cfg(debug_assertions)]
    #[allow(dead_code)]
    pub(crate) fn check_next_batch(&self, other: &Batch) -> Result<(), String> {
        if self.primary_key() < other.primary_key() {
            return Ok(());
        }
        if self.primary_key() > other.primary_key() {
            return Err(format!(
                "primary key is not monotonic: {:?} > {:?}",
                self.primary_key(),
                other.primary_key()
            ));
        }
        // Same primary key: compare timestamps.
        if self.last_timestamp() < other.first_timestamp() {
            return Ok(());
        }
        if self.last_timestamp() > other.first_timestamp() {
            return Err(format!(
                "timestamps are not monotonic: {:?} > {:?}",
                self.last_timestamp(),
                other.first_timestamp()
            ));
        }
        // Same timestamp at the boundary: sequence must not increase.
        if self.last_sequence() >= other.first_sequence() {
            return Ok(());
        }
        Err(format!(
            "sequences are not monotonic: {:?} < {:?}",
            self.last_sequence(),
            other.first_sequence()
        ))
    }

    /// Returns the primary key value of the column at `col_idx_in_pk`
    /// (dense encoding) or with `column_id` (sparse encoding), decoding the
    /// key with `codec` on first use and caching the result.
    ///
    /// # Errors
    /// Returns an error if the primary key cannot be decoded.
    pub fn pk_col_value(
        &mut self,
        codec: &dyn PrimaryKeyCodec,
        col_idx_in_pk: usize,
        column_id: ColumnId,
    ) -> Result<Option<&Value>> {
        if self.pk_values.is_none() {
            self.pk_values = Some(codec.decode(&self.primary_key).context(DecodeSnafu)?);
        }

        let pk_values = self.pk_values.as_ref().unwrap();
        Ok(match pk_values {
            CompositeValues::Dense(values) => values.get(col_idx_in_pk).map(|(_, v)| v),
            CompositeValues::Sparse(values) => values.get(&column_id),
        })
    }

    /// Returns the field column with `column_id`, lazily building (and
    /// caching) the column-id-to-position index on first use.
    pub fn field_col_value(&mut self, column_id: ColumnId) -> Option<&BatchColumn> {
        if self.fields_idx.is_none() {
            self.fields_idx = Some(
                self.fields
                    .iter()
                    .enumerate()
                    .map(|(i, c)| (c.column_id, i))
                    .collect(),
            );
        }

        self.fields_idx
            .as_ref()
            .unwrap()
            .get(&column_id)
            .map(|&idx| &self.fields[idx])
    }
}
778
#[cfg(debug_assertions)]
#[derive(Default)]
#[allow(dead_code)]
/// Debug-only helper that validates batches produced by a scanner arrive in
/// sorted order and within an optional `[start, end)` timestamp range.
pub(crate) struct BatchChecker {
    /// The most recently checked batch, kept to compare against the next one.
    last_batch: Option<Batch>,
    /// Optional inclusive lower bound for timestamps.
    start: Option<Timestamp>,
    /// Optional exclusive upper bound for timestamps.
    end: Option<Timestamp>,
}
788
#[cfg(debug_assertions)]
#[allow(dead_code)]
impl BatchChecker {
    /// Sets the expected inclusive lower bound for timestamps.
    pub(crate) fn with_start(mut self, start: Option<Timestamp>) -> Self {
        self.start = start;
        self
    }

    /// Sets the expected exclusive upper bound for timestamps.
    pub(crate) fn with_end(mut self, end: Option<Timestamp>) -> Self {
        self.end = end;
        self
    }

    /// Validates that `batch` is internally sorted, lies inside the
    /// `[start, end)` range, and sorts after the previously checked batch.
    /// Remembers `batch` as the new "last batch" in all cases.
    pub(crate) fn check_monotonic(&mut self, batch: &Batch) -> Result<(), String> {
        batch.check_monotonic()?;

        if let (Some(start), Some(first)) = (self.start, batch.first_timestamp())
            && start > first
        {
            return Err(format!(
                "batch's first timestamp is before the start timestamp: {:?} > {:?}",
                start, first
            ));
        }
        if let (Some(end), Some(last)) = (self.end, batch.last_timestamp())
            && end <= last
        {
            return Err(format!(
                "batch's last timestamp is after the end timestamp: {:?} <= {:?}",
                end, last
            ));
        }

        // Update `last_batch` before returning so the next check compares
        // against this batch even when the current check fails.
        let res = self
            .last_batch
            .as_ref()
            .map(|last| last.check_next_batch(batch))
            .unwrap_or(Ok(()));
        self.last_batch = Some(batch.clone());
        res
    }

    /// Renders the last batch's and the given batch's key/timestamp/sequence
    /// information for diagnostic messages.
    pub(crate) fn format_batch(&self, batch: &Batch) -> String {
        use std::fmt::Write;

        let mut message = String::new();
        if let Some(last) = &self.last_batch {
            write!(
                message,
                "last_pk: {:?}, last_ts: {:?}, last_seq: {:?}, ",
                last.primary_key(),
                last.last_timestamp(),
                last.last_sequence()
            )
            .unwrap();
        }
        write!(
            message,
            "batch_pk: {:?}, batch_ts: {:?}, batch_seq: {:?}",
            batch.primary_key(),
            batch.timestamps(),
            batch.sequences()
        )
        .unwrap();

        message
    }

    /// Checks `batch` against the ordering invariants and panics (after
    /// logging the offending batches) if they are violated. Debug-only
    /// safety net used by scanners.
    pub(crate) fn ensure_part_range_batch(
        &mut self,
        scanner: &str,
        region_id: store_api::storage::RegionId,
        partition: usize,
        part_range: store_api::region_engine::PartitionRange,
        batch: &Batch,
    ) {
        if let Err(e) = self.check_monotonic(batch) {
            let err_msg = format!(
                "{}: batch is not sorted, {}, region_id: {}, partition: {}, part_range: {:?}",
                scanner, e, region_id, partition, part_range,
            );
            common_telemetry::error!("{err_msg}, {}", self.format_batch(batch));
            panic!("{err_msg}, batch rows: {}", batch.num_rows());
        }
    }
}
884
/// Byte length of the timestamp portion of a sort key produced by the arrow
/// `RowConverter` in [Batch::sort]: the row format encodes a non-null 64-bit
/// timestamp as 1 validity byte + 8 data bytes.
const TIMESTAMP_KEY_LEN: usize = 9;
887
888fn concat_arrays(iter: impl Iterator<Item = ArrayRef>) -> Result<ArrayRef> {
890 let arrays: Vec<_> = iter.collect();
891 let dyn_arrays: Vec<_> = arrays.iter().map(|array| array.as_ref()).collect();
892 arrow::compute::concat(&dyn_arrays).context(ComputeArrowSnafu)
893}
894
#[derive(Debug, PartialEq, Eq, Clone)]
/// A field (non primary key) column in a [Batch].
pub struct BatchColumn {
    /// Id of the column.
    pub column_id: ColumnId,
    /// Column data; same length as the batch's other columns.
    pub data: VectorRef,
}
903
/// Builder for [Batch]; validates column types and lengths in [Self::build].
pub struct BatchBuilder {
    /// Encoded primary key of the batch being built.
    primary_key: Vec<u8>,
    /// Timestamp column; required before `build`.
    timestamps: Option<VectorRef>,
    /// Sequence column; required before `build`.
    sequences: Option<Arc<UInt64Vector>>,
    /// Op type column; required before `build`.
    op_types: Option<Arc<UInt8Vector>>,
    /// Field columns; optional.
    fields: Vec<BatchColumn>,
}
912
913impl BatchBuilder {
914 pub fn new(primary_key: Vec<u8>) -> BatchBuilder {
916 BatchBuilder {
917 primary_key,
918 timestamps: None,
919 sequences: None,
920 op_types: None,
921 fields: Vec::new(),
922 }
923 }
924
925 pub fn with_required_columns(
927 primary_key: Vec<u8>,
928 timestamps: VectorRef,
929 sequences: Arc<UInt64Vector>,
930 op_types: Arc<UInt8Vector>,
931 ) -> BatchBuilder {
932 BatchBuilder {
933 primary_key,
934 timestamps: Some(timestamps),
935 sequences: Some(sequences),
936 op_types: Some(op_types),
937 fields: Vec::new(),
938 }
939 }
940
941 pub fn with_fields(mut self, fields: Vec<BatchColumn>) -> Self {
943 self.fields = fields;
944 self
945 }
946
947 pub fn push_field(&mut self, column: BatchColumn) -> &mut Self {
949 self.fields.push(column);
950 self
951 }
952
953 pub fn push_field_array(&mut self, column_id: ColumnId, array: ArrayRef) -> Result<&mut Self> {
955 let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
956 self.fields.push(BatchColumn {
957 column_id,
958 data: vector,
959 });
960
961 Ok(self)
962 }
963
964 pub fn timestamps_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
966 let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
967 ensure!(
968 vector.data_type().is_timestamp(),
969 InvalidBatchSnafu {
970 reason: format!("{:?} is not a timestamp type", vector.data_type()),
971 }
972 );
973
974 self.timestamps = Some(vector);
975 Ok(self)
976 }
977
978 pub fn sequences_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
980 ensure!(
981 *array.data_type() == arrow::datatypes::DataType::UInt64,
982 InvalidBatchSnafu {
983 reason: "sequence array is not UInt64 type",
984 }
985 );
986 let vector = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
988 self.sequences = Some(vector);
989
990 Ok(self)
991 }
992
993 pub fn op_types_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
995 ensure!(
996 *array.data_type() == arrow::datatypes::DataType::UInt8,
997 InvalidBatchSnafu {
998 reason: "sequence array is not UInt8 type",
999 }
1000 );
1001 let vector = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
1003 self.op_types = Some(vector);
1004
1005 Ok(self)
1006 }
1007
1008 pub fn build(self) -> Result<Batch> {
1010 let timestamps = self.timestamps.context(InvalidBatchSnafu {
1011 reason: "missing timestamps",
1012 })?;
1013 let sequences = self.sequences.context(InvalidBatchSnafu {
1014 reason: "missing sequences",
1015 })?;
1016 let op_types = self.op_types.context(InvalidBatchSnafu {
1017 reason: "missing op_types",
1018 })?;
1019 assert_eq!(0, timestamps.null_count());
1022 assert_eq!(0, sequences.null_count());
1023 assert_eq!(0, op_types.null_count());
1024
1025 let ts_len = timestamps.len();
1026 ensure!(
1027 sequences.len() == ts_len,
1028 InvalidBatchSnafu {
1029 reason: format!(
1030 "sequence have different len {} != {}",
1031 sequences.len(),
1032 ts_len
1033 ),
1034 }
1035 );
1036 ensure!(
1037 op_types.len() == ts_len,
1038 InvalidBatchSnafu {
1039 reason: format!(
1040 "op type have different len {} != {}",
1041 op_types.len(),
1042 ts_len
1043 ),
1044 }
1045 );
1046 for column in &self.fields {
1047 ensure!(
1048 column.data.len() == ts_len,
1049 InvalidBatchSnafu {
1050 reason: format!(
1051 "column {} has different len {} != {}",
1052 column.column_id,
1053 column.data.len(),
1054 ts_len
1055 ),
1056 }
1057 );
1058 }
1059
1060 Ok(Batch {
1061 primary_key: self.primary_key,
1062 pk_values: None,
1063 timestamps,
1064 sequences,
1065 op_types,
1066 fields: self.fields,
1067 fields_idx: None,
1068 })
1069 }
1070}
1071
/// Converts a [Batch] back into a builder, reusing its columns.
/// The cached `pk_values` and `fields_idx` are discarded.
impl From<Batch> for BatchBuilder {
    fn from(batch: Batch) -> Self {
        Self {
            primary_key: batch.primary_key,
            timestamps: Some(batch.timestamps),
            sequences: Some(batch.sequences),
            op_types: Some(batch.op_types),
            fields: batch.fields,
        }
    }
}
1083
/// Async source of [Batch]es, abstracting over the different producers.
pub enum Source {
    /// Source backed by a [BatchReader].
    Reader(BoxedBatchReader),
    /// Source backed by a synchronous batch iterator (see `crate::memtable`).
    Iter(BoxedBatchIterator),
    /// Source backed by an async stream of batches.
    Stream(BoxedBatchStream),
}
1095
impl Source {
    /// Fetches the next [Batch] from the underlying source, or `None` when
    /// the source is exhausted.
    pub async fn next_batch(&mut self) -> Result<Option<Batch>> {
        match self {
            Source::Reader(reader) => reader.next_batch().await,
            // The iterator yields `Option<Result<_>>`; transpose converts it
            // to `Result<Option<_>>`.
            Source::Iter(iter) => iter.next().transpose(),
            Source::Stream(stream) => stream.try_next().await,
        }
    }
}
1106
/// Async source of arrow [RecordBatch]es (the flat, non-[Batch] format).
pub enum FlatSource {
    /// Source backed by a synchronous record batch iterator.
    Iter(BoxedRecordBatchIterator),
    /// Source backed by an async stream of record batches.
    Stream(BoxedRecordBatchStream),
}
1114
impl FlatSource {
    /// Fetches the next [RecordBatch] from the underlying source, or `None`
    /// when the source is exhausted.
    pub async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        match self {
            // The iterator yields `Option<Result<_>>`; transpose converts it
            // to `Result<Option<_>>`.
            FlatSource::Iter(iter) => iter.next().transpose(),
            FlatSource::Stream(stream) => stream.try_next().await,
        }
    }
}
1124
/// Async reader that yields [Batch]es.
#[async_trait]
pub trait BatchReader: Send {
    /// Fetches the next [Batch], or `None` when the reader is exhausted.
    async fn next_batch(&mut self) -> Result<Option<Batch>>;
}
1139
/// Owned, boxed [BatchReader].
pub type BoxedBatchReader = Box<dyn BatchReader>;

/// Owned async stream of [Batch]es.
pub type BoxedBatchStream = BoxStream<'static, Result<Batch>>;

/// Owned async stream of arrow [RecordBatch]es.
pub type BoxedRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
1148
1149#[async_trait::async_trait]
1150impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
1151 async fn next_batch(&mut self) -> Result<Option<Batch>> {
1152 (**self).next_batch().await
1153 }
1154}
1155
#[derive(Debug, Default)]
/// Volume and timing metrics collected by scanners.
/// NOTE(review): accounting happens in the scanners outside this chunk.
pub(crate) struct ScannerMetrics {
    /// Time spent scanning (fetching batches).
    scan_cost: Duration,
    /// Time spent yielding batches to the consumer.
    yield_cost: Duration,
    /// Number of batches produced.
    num_batches: usize,
    /// Number of rows produced.
    num_rows: usize,
}
1168
1169#[cfg(test)]
1170mod tests {
1171 use datatypes::arrow::array::{TimestampMillisecondArray, UInt8Array, UInt64Array};
1172 use mito_codec::row_converter::{self, build_primary_key_codec_with_fields};
1173 use store_api::codec::PrimaryKeyEncoding;
1174 use store_api::storage::consts::ReservedColumnId;
1175
1176 use super::*;
1177 use crate::error::Error;
1178 use crate::test_util::new_batch_builder;
1179
1180 fn new_batch(
1181 timestamps: &[i64],
1182 sequences: &[u64],
1183 op_types: &[OpType],
1184 field: &[u64],
1185 ) -> Batch {
1186 new_batch_builder(b"test", timestamps, sequences, op_types, 1, field)
1187 .build()
1188 .unwrap()
1189 }
1190
1191 fn new_batch_with_u64_fields(
1192 timestamps: &[i64],
1193 sequences: &[u64],
1194 op_types: &[OpType],
1195 fields: &[(ColumnId, &[Option<u64>])],
1196 ) -> Batch {
1197 assert_eq!(timestamps.len(), sequences.len());
1198 assert_eq!(timestamps.len(), op_types.len());
1199 for (_, values) in fields {
1200 assert_eq!(timestamps.len(), values.len());
1201 }
1202
1203 let mut builder = BatchBuilder::new(b"test".to_vec());
1204 builder
1205 .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
1206 timestamps.iter().copied(),
1207 )))
1208 .unwrap()
1209 .sequences_array(Arc::new(UInt64Array::from_iter_values(
1210 sequences.iter().copied(),
1211 )))
1212 .unwrap()
1213 .op_types_array(Arc::new(UInt8Array::from_iter_values(
1214 op_types.iter().map(|v| *v as u8),
1215 )))
1216 .unwrap();
1217
1218 for (col_id, values) in fields {
1219 builder
1220 .push_field_array(*col_id, Arc::new(UInt64Array::from(values.to_vec())))
1221 .unwrap();
1222 }
1223
1224 builder.build().unwrap()
1225 }
1226
1227 fn new_batch_without_fields(
1228 timestamps: &[i64],
1229 sequences: &[u64],
1230 op_types: &[OpType],
1231 ) -> Batch {
1232 assert_eq!(timestamps.len(), sequences.len());
1233 assert_eq!(timestamps.len(), op_types.len());
1234
1235 let mut builder = BatchBuilder::new(b"test".to_vec());
1236 builder
1237 .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
1238 timestamps.iter().copied(),
1239 )))
1240 .unwrap()
1241 .sequences_array(Arc::new(UInt64Array::from_iter_values(
1242 sequences.iter().copied(),
1243 )))
1244 .unwrap()
1245 .op_types_array(Arc::new(UInt8Array::from_iter_values(
1246 op_types.iter().map(|v| *v as u8),
1247 )))
1248 .unwrap();
1249
1250 builder.build().unwrap()
1251 }
1252
1253 #[test]
1254 fn test_empty_batch() {
1255 let batch = Batch::empty();
1256 assert!(batch.is_empty());
1257 assert_eq!(None, batch.first_timestamp());
1258 assert_eq!(None, batch.last_timestamp());
1259 assert_eq!(None, batch.first_sequence());
1260 assert_eq!(None, batch.last_sequence());
1261 assert!(batch.timestamps_native().is_none());
1262 }
1263
1264 #[test]
1265 fn test_first_last_one() {
1266 let batch = new_batch(&[1], &[2], &[OpType::Put], &[4]);
1267 assert_eq!(
1268 Timestamp::new_millisecond(1),
1269 batch.first_timestamp().unwrap()
1270 );
1271 assert_eq!(
1272 Timestamp::new_millisecond(1),
1273 batch.last_timestamp().unwrap()
1274 );
1275 assert_eq!(2, batch.first_sequence().unwrap());
1276 assert_eq!(2, batch.last_sequence().unwrap());
1277 }
1278
1279 #[test]
1280 fn test_first_last_multiple() {
1281 let batch = new_batch(
1282 &[1, 2, 3],
1283 &[11, 12, 13],
1284 &[OpType::Put, OpType::Put, OpType::Put],
1285 &[21, 22, 23],
1286 );
1287 assert_eq!(
1288 Timestamp::new_millisecond(1),
1289 batch.first_timestamp().unwrap()
1290 );
1291 assert_eq!(
1292 Timestamp::new_millisecond(3),
1293 batch.last_timestamp().unwrap()
1294 );
1295 assert_eq!(11, batch.first_sequence().unwrap());
1296 assert_eq!(13, batch.last_sequence().unwrap());
1297 }
1298
1299 #[test]
1300 fn test_slice() {
1301 let batch = new_batch(
1302 &[1, 2, 3, 4],
1303 &[11, 12, 13, 14],
1304 &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1305 &[21, 22, 23, 24],
1306 );
1307 let batch = batch.slice(1, 2);
1308 let expect = new_batch(
1309 &[2, 3],
1310 &[12, 13],
1311 &[OpType::Delete, OpType::Put],
1312 &[22, 23],
1313 );
1314 assert_eq!(expect, batch);
1315 }
1316
1317 #[test]
1318 fn test_timestamps_native() {
1319 let batch = new_batch(
1320 &[1, 2, 3, 4],
1321 &[11, 12, 13, 14],
1322 &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1323 &[21, 22, 23, 24],
1324 );
1325 assert_eq!(&[1, 2, 3, 4], batch.timestamps_native().unwrap());
1326 }
1327
1328 #[test]
1329 fn test_concat_empty() {
1330 let err = Batch::concat(vec![]).unwrap_err();
1331 assert!(
1332 matches!(err, Error::InvalidBatch { .. }),
1333 "unexpected err: {err}"
1334 );
1335 }
1336
1337 #[test]
1338 fn test_concat_one() {
1339 let batch = new_batch(&[], &[], &[], &[]);
1340 let actual = Batch::concat(vec![batch.clone()]).unwrap();
1341 assert_eq!(batch, actual);
1342
1343 let batch = new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]);
1344 let actual = Batch::concat(vec![batch.clone()]).unwrap();
1345 assert_eq!(batch, actual);
1346 }
1347
1348 #[test]
1349 fn test_concat_multiple() {
1350 let batches = vec![
1351 new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]),
1352 new_batch(
1353 &[3, 4, 5],
1354 &[13, 14, 15],
1355 &[OpType::Put, OpType::Delete, OpType::Put],
1356 &[23, 24, 25],
1357 ),
1358 new_batch(&[], &[], &[], &[]),
1359 new_batch(&[6], &[16], &[OpType::Put], &[26]),
1360 ];
1361 let batch = Batch::concat(batches).unwrap();
1362 let expect = new_batch(
1363 &[1, 2, 3, 4, 5, 6],
1364 &[11, 12, 13, 14, 15, 16],
1365 &[
1366 OpType::Put,
1367 OpType::Put,
1368 OpType::Put,
1369 OpType::Delete,
1370 OpType::Put,
1371 OpType::Put,
1372 ],
1373 &[21, 22, 23, 24, 25, 26],
1374 );
1375 assert_eq!(expect, batch);
1376 }
1377
1378 #[test]
1379 fn test_concat_different() {
1380 let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
1381 let mut batch2 = new_batch(&[2], &[2], &[OpType::Put], &[2]);
1382 batch2.primary_key = b"hello".to_vec();
1383 let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
1384 assert!(
1385 matches!(err, Error::InvalidBatch { .. }),
1386 "unexpected err: {err}"
1387 );
1388 }
1389
1390 #[test]
1391 fn test_concat_different_fields() {
1392 let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
1393 let fields = vec![
1394 batch1.fields()[0].clone(),
1395 BatchColumn {
1396 column_id: 2,
1397 data: Arc::new(UInt64Vector::from_slice([2])),
1398 },
1399 ];
1400 let batch2 = batch1.clone().with_fields(fields).unwrap();
1402 let err = Batch::concat(vec![batch1.clone(), batch2]).unwrap_err();
1403 assert!(
1404 matches!(err, Error::InvalidBatch { .. }),
1405 "unexpected err: {err}"
1406 );
1407
1408 let fields = vec![BatchColumn {
1410 column_id: 2,
1411 data: Arc::new(UInt64Vector::from_slice([2])),
1412 }];
1413 let batch2 = batch1.clone().with_fields(fields).unwrap();
1414 let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
1415 assert!(
1416 matches!(err, Error::InvalidBatch { .. }),
1417 "unexpected err: {err}"
1418 );
1419 }
1420
1421 #[test]
1422 fn test_filter_deleted_empty() {
1423 let mut batch = new_batch(&[], &[], &[], &[]);
1424 batch.filter_deleted().unwrap();
1425 assert!(batch.is_empty());
1426 }
1427
1428 #[test]
1429 fn test_filter_deleted() {
1430 let mut batch = new_batch(
1431 &[1, 2, 3, 4],
1432 &[11, 12, 13, 14],
1433 &[OpType::Delete, OpType::Put, OpType::Delete, OpType::Put],
1434 &[21, 22, 23, 24],
1435 );
1436 batch.filter_deleted().unwrap();
1437 let expect = new_batch(&[2, 4], &[12, 14], &[OpType::Put, OpType::Put], &[22, 24]);
1438 assert_eq!(expect, batch);
1439
1440 let mut batch = new_batch(
1441 &[1, 2, 3, 4],
1442 &[11, 12, 13, 14],
1443 &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
1444 &[21, 22, 23, 24],
1445 );
1446 let expect = batch.clone();
1447 batch.filter_deleted().unwrap();
1448 assert_eq!(expect, batch);
1449 }
1450
1451 #[test]
1452 fn test_filter_by_sequence() {
1453 let mut batch = new_batch(
1455 &[1, 2, 3, 4],
1456 &[11, 12, 13, 14],
1457 &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
1458 &[21, 22, 23, 24],
1459 );
1460 batch
1461 .filter_by_sequence(Some(SequenceRange::LtEq { max: 13 }))
1462 .unwrap();
1463 let expect = new_batch(
1464 &[1, 2, 3],
1465 &[11, 12, 13],
1466 &[OpType::Put, OpType::Put, OpType::Put],
1467 &[21, 22, 23],
1468 );
1469 assert_eq!(expect, batch);
1470
1471 let mut batch = new_batch(
1473 &[1, 2, 3, 4],
1474 &[11, 12, 13, 14],
1475 &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1476 &[21, 22, 23, 24],
1477 );
1478
1479 batch
1480 .filter_by_sequence(Some(SequenceRange::LtEq { max: 10 }))
1481 .unwrap();
1482 assert!(batch.is_empty());
1483
1484 let mut batch = new_batch(
1486 &[1, 2, 3, 4],
1487 &[11, 12, 13, 14],
1488 &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1489 &[21, 22, 23, 24],
1490 );
1491 let expect = batch.clone();
1492 batch.filter_by_sequence(None).unwrap();
1493 assert_eq!(expect, batch);
1494
1495 let mut batch = new_batch(&[], &[], &[], &[]);
1497 batch
1498 .filter_by_sequence(Some(SequenceRange::LtEq { max: 10 }))
1499 .unwrap();
1500 assert!(batch.is_empty());
1501
1502 let mut batch = new_batch(&[], &[], &[], &[]);
1504 batch.filter_by_sequence(None).unwrap();
1505 assert!(batch.is_empty());
1506
1507 let mut batch = new_batch(
1509 &[1, 2, 3, 4],
1510 &[11, 12, 13, 14],
1511 &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
1512 &[21, 22, 23, 24],
1513 );
1514 batch
1515 .filter_by_sequence(Some(SequenceRange::Gt { min: 12 }))
1516 .unwrap();
1517 let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
1518 assert_eq!(expect, batch);
1519
1520 let mut batch = new_batch(
1522 &[1, 2, 3, 4],
1523 &[11, 12, 13, 14],
1524 &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1525 &[21, 22, 23, 24],
1526 );
1527 batch
1528 .filter_by_sequence(Some(SequenceRange::Gt { min: 20 }))
1529 .unwrap();
1530 assert!(batch.is_empty());
1531
1532 let mut batch = new_batch(
1534 &[1, 2, 3, 4, 5],
1535 &[11, 12, 13, 14, 15],
1536 &[
1537 OpType::Put,
1538 OpType::Put,
1539 OpType::Put,
1540 OpType::Put,
1541 OpType::Put,
1542 ],
1543 &[21, 22, 23, 24, 25],
1544 );
1545 batch
1546 .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 12, max: 14 }))
1547 .unwrap();
1548 let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
1549 assert_eq!(expect, batch);
1550
1551 let mut batch = new_batch(
1553 &[1, 2, 3, 4, 5],
1554 &[11, 12, 13, 14, 15],
1555 &[
1556 OpType::Put,
1557 OpType::Delete,
1558 OpType::Put,
1559 OpType::Delete,
1560 OpType::Put,
1561 ],
1562 &[21, 22, 23, 24, 25],
1563 );
1564 batch
1565 .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 11, max: 13 }))
1566 .unwrap();
1567 let expect = new_batch(
1568 &[2, 3],
1569 &[12, 13],
1570 &[OpType::Delete, OpType::Put],
1571 &[22, 23],
1572 );
1573 assert_eq!(expect, batch);
1574
1575 let mut batch = new_batch(
1577 &[1, 2, 3, 4],
1578 &[11, 12, 13, 14],
1579 &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
1580 &[21, 22, 23, 24],
1581 );
1582 batch
1583 .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 20, max: 25 }))
1584 .unwrap();
1585 assert!(batch.is_empty());
1586 }
1587
1588 #[test]
1589 fn test_merge_last_non_null_no_dup() {
1590 let mut batch = new_batch_with_u64_fields(
1591 &[1, 2],
1592 &[2, 1],
1593 &[OpType::Put, OpType::Put],
1594 &[(1, &[Some(10), None]), (2, &[Some(100), Some(200)])],
1595 );
1596 let expect = batch.clone();
1597 batch.merge_last_non_null().unwrap();
1598 assert_eq!(expect, batch);
1599 }
1600
1601 #[test]
1602 fn test_merge_last_non_null_fill_null_fields() {
1603 let mut batch = new_batch_with_u64_fields(
1605 &[1, 1, 1],
1606 &[3, 2, 1],
1607 &[OpType::Put, OpType::Put, OpType::Put],
1608 &[
1609 (1, &[None, Some(10), Some(11)]),
1610 (2, &[Some(100), Some(200), Some(300)]),
1611 ],
1612 );
1613 batch.merge_last_non_null().unwrap();
1614
1615 let expect = new_batch_with_u64_fields(
1618 &[1],
1619 &[3],
1620 &[OpType::Put],
1621 &[(1, &[Some(10)]), (2, &[Some(100)])],
1622 );
1623 assert_eq!(expect, batch);
1624 }
1625
1626 #[test]
1627 fn test_merge_last_non_null_stop_at_delete_row() {
1628 let mut batch = new_batch_with_u64_fields(
1631 &[1, 1, 1],
1632 &[3, 2, 1],
1633 &[OpType::Put, OpType::Delete, OpType::Put],
1634 &[
1635 (1, &[None, Some(10), Some(11)]),
1636 (2, &[Some(100), Some(200), Some(300)]),
1637 ],
1638 );
1639 batch.merge_last_non_null().unwrap();
1640
1641 let expect = new_batch_with_u64_fields(
1642 &[1],
1643 &[3],
1644 &[OpType::Put],
1645 &[(1, &[None]), (2, &[Some(100)])],
1646 );
1647 assert_eq!(expect, batch);
1648 }
1649
1650 #[test]
1651 fn test_merge_last_non_null_base_delete_no_merge() {
1652 let mut batch = new_batch_with_u64_fields(
1653 &[1, 1],
1654 &[3, 2],
1655 &[OpType::Delete, OpType::Put],
1656 &[(1, &[None, Some(10)]), (2, &[None, Some(200)])],
1657 );
1658 batch.merge_last_non_null().unwrap();
1659
1660 let expect =
1662 new_batch_with_u64_fields(&[1], &[3], &[OpType::Delete], &[(1, &[None]), (2, &[None])]);
1663 assert_eq!(expect, batch);
1664 }
1665
1666 #[test]
1667 fn test_merge_last_non_null_multiple_timestamp_groups() {
1668 let mut batch = new_batch_with_u64_fields(
1669 &[1, 1, 2, 3, 3],
1670 &[5, 4, 3, 2, 1],
1671 &[
1672 OpType::Put,
1673 OpType::Put,
1674 OpType::Put,
1675 OpType::Put,
1676 OpType::Put,
1677 ],
1678 &[
1679 (1, &[None, Some(10), Some(20), None, Some(30)]),
1680 (2, &[Some(100), Some(110), Some(120), None, Some(130)]),
1681 ],
1682 );
1683 batch.merge_last_non_null().unwrap();
1684
1685 let expect = new_batch_with_u64_fields(
1686 &[1, 2, 3],
1687 &[5, 3, 2],
1688 &[OpType::Put, OpType::Put, OpType::Put],
1689 &[
1690 (1, &[Some(10), Some(20), Some(30)]),
1691 (2, &[Some(100), Some(120), Some(130)]),
1692 ],
1693 );
1694 assert_eq!(expect, batch);
1695 }
1696
1697 #[test]
1698 fn test_merge_last_non_null_no_fields() {
1699 let mut batch = new_batch_without_fields(
1700 &[1, 1, 2],
1701 &[3, 2, 1],
1702 &[OpType::Put, OpType::Put, OpType::Put],
1703 );
1704 batch.merge_last_non_null().unwrap();
1705
1706 let expect = new_batch_without_fields(&[1, 2], &[3, 1], &[OpType::Put, OpType::Put]);
1707 assert_eq!(expect, batch);
1708 }
1709
1710 #[test]
1711 fn test_filter() {
1712 let mut batch = new_batch(
1714 &[1, 2, 3, 4],
1715 &[11, 12, 13, 14],
1716 &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
1717 &[21, 22, 23, 24],
1718 );
1719 let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
1720 batch.filter(&predicate).unwrap();
1721 let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
1722 assert_eq!(expect, batch);
1723
1724 let mut batch = new_batch(
1726 &[1, 2, 3, 4],
1727 &[11, 12, 13, 14],
1728 &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1729 &[21, 22, 23, 24],
1730 );
1731 let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
1732 batch.filter(&predicate).unwrap();
1733 let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
1734 assert_eq!(expect, batch);
1735
1736 let predicate = BooleanVector::from_vec(vec![false, false]);
1738 batch.filter(&predicate).unwrap();
1739 assert!(batch.is_empty());
1740 }
1741
1742 #[test]
1743 fn test_sort_and_dedup() {
1744 let original = new_batch(
1745 &[2, 3, 1, 4, 5, 2],
1746 &[1, 2, 3, 4, 5, 6],
1747 &[
1748 OpType::Put,
1749 OpType::Put,
1750 OpType::Put,
1751 OpType::Put,
1752 OpType::Put,
1753 OpType::Put,
1754 ],
1755 &[21, 22, 23, 24, 25, 26],
1756 );
1757
1758 let mut batch = original.clone();
1759 batch.sort(true).unwrap();
1760 assert_eq!(
1762 new_batch(
1763 &[1, 2, 3, 4, 5],
1764 &[3, 6, 2, 4, 5],
1765 &[
1766 OpType::Put,
1767 OpType::Put,
1768 OpType::Put,
1769 OpType::Put,
1770 OpType::Put,
1771 ],
1772 &[23, 26, 22, 24, 25],
1773 ),
1774 batch
1775 );
1776
1777 let mut batch = original.clone();
1778 batch.sort(false).unwrap();
1779
1780 assert_eq!(
1782 new_batch(
1783 &[1, 2, 2, 3, 4, 5],
1784 &[3, 6, 1, 2, 4, 5],
1785 &[
1786 OpType::Put,
1787 OpType::Put,
1788 OpType::Put,
1789 OpType::Put,
1790 OpType::Put,
1791 OpType::Put,
1792 ],
1793 &[23, 26, 21, 22, 24, 25],
1794 ),
1795 batch
1796 );
1797
1798 let original = new_batch(
1799 &[2, 2, 1],
1800 &[1, 6, 1],
1801 &[OpType::Delete, OpType::Put, OpType::Put],
1802 &[21, 22, 23],
1803 );
1804
1805 let mut batch = original.clone();
1806 batch.sort(true).unwrap();
1807 let expect = new_batch(&[1, 2], &[1, 6], &[OpType::Put, OpType::Put], &[23, 22]);
1808 assert_eq!(expect, batch);
1809
1810 let mut batch = original.clone();
1811 batch.sort(false).unwrap();
1812 let expect = new_batch(
1813 &[1, 2, 2],
1814 &[1, 6, 1],
1815 &[OpType::Put, OpType::Put, OpType::Delete],
1816 &[23, 22, 21],
1817 );
1818 assert_eq!(expect, batch);
1819 }
1820
1821 #[test]
1822 fn test_get_value() {
1823 let encodings = [PrimaryKeyEncoding::Dense, PrimaryKeyEncoding::Sparse];
1824
1825 for encoding in encodings {
1826 let codec = build_primary_key_codec_with_fields(
1827 encoding,
1828 [
1829 (
1830 ReservedColumnId::table_id(),
1831 row_converter::SortField::new(ConcreteDataType::uint32_datatype()),
1832 ),
1833 (
1834 ReservedColumnId::tsid(),
1835 row_converter::SortField::new(ConcreteDataType::uint64_datatype()),
1836 ),
1837 (
1838 100,
1839 row_converter::SortField::new(ConcreteDataType::string_datatype()),
1840 ),
1841 (
1842 200,
1843 row_converter::SortField::new(ConcreteDataType::string_datatype()),
1844 ),
1845 ]
1846 .into_iter(),
1847 );
1848
1849 let values = [
1850 Value::UInt32(1000),
1851 Value::UInt64(2000),
1852 Value::String("abcdefgh".into()),
1853 Value::String("zyxwvu".into()),
1854 ];
1855 let mut buf = vec![];
1856 codec
1857 .encode_values(
1858 &[
1859 (ReservedColumnId::table_id(), values[0].clone()),
1860 (ReservedColumnId::tsid(), values[1].clone()),
1861 (100, values[2].clone()),
1862 (200, values[3].clone()),
1863 ],
1864 &mut buf,
1865 )
1866 .unwrap();
1867
1868 let field_col_id = 2;
1869 let mut batch = new_batch_builder(
1870 &buf,
1871 &[1, 2, 3],
1872 &[1, 1, 1],
1873 &[OpType::Put, OpType::Put, OpType::Put],
1874 field_col_id,
1875 &[42, 43, 44],
1876 )
1877 .build()
1878 .unwrap();
1879
1880 let v = batch
1881 .pk_col_value(&*codec, 0, ReservedColumnId::table_id())
1882 .unwrap()
1883 .unwrap();
1884 assert_eq!(values[0], *v);
1885
1886 let v = batch
1887 .pk_col_value(&*codec, 1, ReservedColumnId::tsid())
1888 .unwrap()
1889 .unwrap();
1890 assert_eq!(values[1], *v);
1891
1892 let v = batch.pk_col_value(&*codec, 2, 100).unwrap().unwrap();
1893 assert_eq!(values[2], *v);
1894
1895 let v = batch.pk_col_value(&*codec, 3, 200).unwrap().unwrap();
1896 assert_eq!(values[3], *v);
1897
1898 let v = batch.field_col_value(field_col_id).unwrap();
1899 assert_eq!(v.data.get(0), Value::UInt64(42));
1900 assert_eq!(v.data.get(1), Value::UInt64(43));
1901 assert_eq!(v.data.get(2), Value::UInt64(44));
1902 }
1903 }
1904}