1pub mod compat;
18pub mod dedup;
19pub mod flat_dedup;
20pub mod flat_merge;
21pub mod flat_projection;
22pub mod last_row;
23pub mod merge;
24pub mod plain_batch;
25pub mod projection;
26pub(crate) mod prune;
27pub(crate) mod pruner;
28pub mod range;
29pub mod scan_region;
30pub mod scan_util;
31pub(crate) mod seq_scan;
32pub mod series_scan;
33pub mod stream;
34pub(crate) mod unordered_scan;
35
36use std::collections::{HashMap, HashSet};
37use std::sync::Arc;
38use std::time::Duration;
39
40use api::v1::OpType;
41use async_trait::async_trait;
42use common_time::Timestamp;
43use datafusion_common::arrow::array::UInt8Array;
44use datatypes::arrow;
45use datatypes::arrow::array::{Array, ArrayRef};
46use datatypes::arrow::compute::SortOptions;
47use datatypes::arrow::record_batch::RecordBatch;
48use datatypes::arrow::row::{RowConverter, SortField};
49use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector};
50use datatypes::scalars::ScalarVectorBuilder;
51use datatypes::types::TimestampType;
52use datatypes::value::{Value, ValueRef};
53use datatypes::vectors::{
54 BooleanVector, Helper, TimestampMicrosecondVector, TimestampMillisecondVector,
55 TimestampMillisecondVectorBuilder, TimestampNanosecondVector, TimestampSecondVector,
56 UInt8Vector, UInt8VectorBuilder, UInt32Vector, UInt64Vector, UInt64VectorBuilder, Vector,
57 VectorRef,
58};
59use futures::TryStreamExt;
60use futures::stream::BoxStream;
61use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
62use snafu::{OptionExt, ResultExt, ensure};
63use store_api::metadata::RegionMetadata;
64use store_api::storage::{ColumnId, SequenceNumber, SequenceRange};
65
66use crate::error::{
67 ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu,
68 Result,
69};
70use crate::memtable::{BoxedBatchIterator, BoxedRecordBatchIterator};
71use crate::read::prune::PruneReader;
72
/// A batch of rows that share the same primary key.
///
/// Rows are expected to be sorted by timestamp (ascending) and, within equal
/// timestamps, by sequence (descending) — see [`Batch::sort`] and the
/// debug-only [`Batch::check_monotonic`].
#[derive(Debug, PartialEq, Clone)]
pub struct Batch {
    /// Encoded primary key shared by every row in this batch.
    primary_key: Vec<u8>,
    /// Lazily decoded values of `primary_key`; `None` until first decoded
    /// (see [`Batch::pk_col_value`] / [`Batch::set_pk_values`]).
    pk_values: Option<CompositeValues>,
    /// Timestamp column; always a timestamp-typed vector (enforced by the builder).
    timestamps: VectorRef,
    /// Sequence number of each row; no nulls (asserted by the builder).
    sequences: Arc<UInt64Vector>,
    /// Op type (e.g. put/delete) of each row; no nulls (asserted by the builder).
    op_types: Arc<UInt8Vector>,
    /// Field (value) columns, all with the same length as `timestamps`.
    fields: Vec<BatchColumn>,
    /// Lazily built index from column id to position in `fields`
    /// (see [`Batch::field_col_value`]).
    fields_idx: Option<HashMap<ColumnId, usize>>,
}
98
impl Batch {
    /// Creates a new batch from its required columns and field columns.
    ///
    /// Delegates to [`BatchBuilder`], which validates column lengths and types.
    pub fn new(
        primary_key: Vec<u8>,
        timestamps: VectorRef,
        sequences: Arc<UInt64Vector>,
        op_types: Arc<UInt8Vector>,
        fields: Vec<BatchColumn>,
    ) -> Result<Batch> {
        BatchBuilder::with_required_columns(primary_key, timestamps, sequences, op_types)
            .with_fields(fields)
            .build()
    }

    /// Consumes this batch and returns a new one with the given field columns,
    /// re-running the builder's validation (lengths must still match).
    pub fn with_fields(self, fields: Vec<BatchColumn>) -> Result<Batch> {
        Batch::new(
            self.primary_key,
            self.timestamps,
            self.sequences,
            self.op_types,
            fields,
        )
    }

    /// Returns the encoded primary key shared by all rows.
    pub fn primary_key(&self) -> &[u8] {
        &self.primary_key
    }

    /// Returns the cached decoded primary key values, if any.
    pub fn pk_values(&self) -> Option<&CompositeValues> {
        self.pk_values.as_ref()
    }

    /// Caches decoded primary key values for later lookups.
    pub fn set_pk_values(&mut self, pk_values: CompositeValues) {
        self.pk_values = Some(pk_values);
    }

    /// Clears the cached primary key values (test helper).
    #[cfg(any(test, feature = "test"))]
    pub fn remove_pk_values(&mut self) {
        self.pk_values = None;
    }

    /// Returns the field (value) columns.
    pub fn fields(&self) -> &[BatchColumn] {
        &self.fields
    }

    /// Returns the timestamp column.
    pub fn timestamps(&self) -> &VectorRef {
        &self.timestamps
    }

    /// Returns the sequence column.
    pub fn sequences(&self) -> &Arc<UInt64Vector> {
        &self.sequences
    }

    /// Returns the op type column.
    pub fn op_types(&self) -> &Arc<UInt8Vector> {
        &self.op_types
    }

    /// Returns the number of rows, taken from the sequence column
    /// (all columns share the same length by construction).
    pub fn num_rows(&self) -> usize {
        self.sequences.len()
    }

    /// Creates an empty batch with no rows, an empty primary key and no fields.
    pub(crate) fn empty() -> Self {
        Self {
            primary_key: vec![],
            pk_values: None,
            timestamps: Arc::new(TimestampMillisecondVectorBuilder::with_capacity(0).finish()),
            sequences: Arc::new(UInt64VectorBuilder::with_capacity(0).finish()),
            op_types: Arc::new(UInt8VectorBuilder::with_capacity(0).finish()),
            fields: vec![],
            fields_idx: None,
        }
    }

    /// Returns true if the batch has no rows.
    pub fn is_empty(&self) -> bool {
        self.num_rows() == 0
    }

    /// Returns the timestamp of the first row, or `None` if the batch is empty.
    pub fn first_timestamp(&self) -> Option<Timestamp> {
        if self.timestamps.is_empty() {
            return None;
        }

        Some(self.get_timestamp(0))
    }

    /// Returns the timestamp of the last row, or `None` if the batch is empty.
    pub fn last_timestamp(&self) -> Option<Timestamp> {
        if self.timestamps.is_empty() {
            return None;
        }

        Some(self.get_timestamp(self.timestamps.len() - 1))
    }

    /// Returns the sequence of the first row, or `None` if the batch is empty.
    pub fn first_sequence(&self) -> Option<SequenceNumber> {
        if self.sequences.is_empty() {
            return None;
        }

        Some(self.get_sequence(0))
    }

    /// Returns the sequence of the last row, or `None` if the batch is empty.
    pub fn last_sequence(&self) -> Option<SequenceNumber> {
        if self.sequences.is_empty() {
            return None;
        }

        Some(self.get_sequence(self.sequences.len() - 1))
    }

    /// Replaces the encoded primary key.
    ///
    /// NOTE(review): this does not clear the cached `pk_values`, so the cache
    /// can go stale if it was populated for the previous key — callers appear
    /// to be expected to keep the two consistent; confirm at call sites.
    pub fn set_primary_key(&mut self, primary_key: Vec<u8>) {
        self.primary_key = primary_key;
    }

    /// Returns a zero-copy-ish slice of rows `[offset, offset + length)`.
    ///
    /// The primary key, cached `pk_values` and `fields_idx` are cloned/shared;
    /// column data is sliced.
    pub fn slice(&self, offset: usize, length: usize) -> Batch {
        let fields = self
            .fields
            .iter()
            .map(|column| BatchColumn {
                column_id: column.column_id,
                data: column.data.slice(offset, length),
            })
            .collect();
        Batch {
            primary_key: self.primary_key.clone(),
            pk_values: self.pk_values.clone(),
            timestamps: self.timestamps.slice(offset, length),
            sequences: Arc::new(self.sequences.get_slice(offset, length)),
            op_types: Arc::new(self.op_types.get_slice(offset, length)),
            fields,
            fields_idx: self.fields_idx.clone(),
        }
    }

    /// Concatenates multiple batches into one.
    ///
    /// All batches must share the same primary key and have identical field
    /// layouts (same number of fields with matching column ids); otherwise an
    /// `InvalidBatch` error is returned. An empty input is also an error.
    pub fn concat(mut batches: Vec<Batch>) -> Result<Batch> {
        ensure!(
            !batches.is_empty(),
            InvalidBatchSnafu {
                reason: "empty batches",
            }
        );
        // Fast path: a single batch is returned as-is.
        if batches.len() == 1 {
            return Ok(batches.pop().unwrap());
        }

        // Take the key out of the first batch to avoid cloning it for the builder.
        let primary_key = std::mem::take(&mut batches[0].primary_key);
        let first = &batches[0];
        ensure!(
            batches
                .iter()
                .skip(1)
                .all(|b| b.primary_key() == primary_key),
            InvalidBatchSnafu {
                reason: "batches have different primary key",
            }
        );
        for b in batches.iter().skip(1) {
            ensure!(
                b.fields.len() == first.fields.len(),
                InvalidBatchSnafu {
                    reason: "batches have different field num",
                }
            );
            for (l, r) in b.fields.iter().zip(&first.fields) {
                ensure!(
                    l.column_id == r.column_id,
                    InvalidBatchSnafu {
                        reason: "batches have different fields",
                    }
                );
            }
        }

        // Concatenate each column across all batches via arrow kernels.
        let mut builder = BatchBuilder::new(primary_key);
        let array = concat_arrays(batches.iter().map(|b| b.timestamps().to_arrow_array()))?;
        builder.timestamps_array(array)?;
        let array = concat_arrays(batches.iter().map(|b| b.sequences().to_arrow_array()))?;
        builder.sequences_array(array)?;
        let array = concat_arrays(batches.iter().map(|b| b.op_types().to_arrow_array()))?;
        builder.op_types_array(array)?;
        for (i, batch_column) in first.fields.iter().enumerate() {
            let array = concat_arrays(batches.iter().map(|b| b.fields()[i].data.to_arrow_array()))?;
            builder.push_field_array(batch_column.column_id, array)?;
        }

        builder.build()
    }

    /// Removes rows whose op type is [`OpType::Delete`].
    pub fn filter_deleted(&mut self) -> Result<()> {
        let array = self.op_types.as_arrow();
        let rhs = UInt8Array::new_scalar(OpType::Delete as u8);
        // Keep rows where op_type != Delete.
        let predicate =
            arrow::compute::kernels::cmp::neq(array, &rhs).context(ComputeArrowSnafu)?;
        self.filter(&BooleanVector::from(predicate))
    }

    /// Keeps only the rows where `predicate` is true, applying the same
    /// filter to every column in place.
    pub fn filter(&mut self, predicate: &BooleanVector) -> Result<()> {
        self.timestamps = self
            .timestamps
            .filter(predicate)
            .context(ComputeVectorSnafu)?;
        self.sequences = Arc::new(
            UInt64Vector::try_from_arrow_array(
                arrow::compute::filter(self.sequences.as_arrow(), predicate.as_boolean_array())
                    .context(ComputeArrowSnafu)?,
            )
            .unwrap(),
        );
        self.op_types = Arc::new(
            UInt8Vector::try_from_arrow_array(
                arrow::compute::filter(self.op_types.as_arrow(), predicate.as_boolean_array())
                    .context(ComputeArrowSnafu)?,
            )
            .unwrap(),
        );
        for batch_column in &mut self.fields {
            batch_column.data = batch_column
                .data
                .filter(predicate)
                .context(ComputeVectorSnafu)?;
        }

        Ok(())
    }

    /// Filters rows by the given sequence range; `None` keeps all rows.
    ///
    /// Fast paths: an empty batch, or a batch whose boundary sequences already
    /// indicate every row satisfies the range, is left untouched.
    /// NOTE(review): the subset check only inspects the first/last sequences,
    /// which assumes they bound the batch's sequences — confirm with producers.
    pub fn filter_by_sequence(&mut self, sequence: Option<SequenceRange>) -> Result<()> {
        let seq_range = match sequence {
            None => return Ok(()),
            Some(seq_range) => {
                let (Some(first), Some(last)) = (self.first_sequence(), self.last_sequence())
                else {
                    return Ok(());
                };
                let is_subset = match seq_range {
                    SequenceRange::Gt { min } => min < first,
                    SequenceRange::LtEq { max } => max >= last,
                    SequenceRange::GtLtEq { min, max } => min < first && max >= last,
                };
                if is_subset {
                    return Ok(());
                }
                seq_range
            }
        };

        let seqs = self.sequences.as_arrow();
        let predicate = seq_range.filter(seqs).context(ComputeArrowSnafu)?;

        let predicate = BooleanVector::from(predicate);
        self.filter(&predicate)?;

        Ok(())
    }

    /// Sorts rows by (timestamp asc, sequence desc) and, when `dedup` is true,
    /// removes rows that duplicate the previous timestamp (keeping the first
    /// row of each timestamp group, i.e. the one with the largest sequence).
    ///
    /// Skips the physical reorder entirely when the batch is already sorted
    /// and no row was deduplicated.
    pub fn sort(&mut self, dedup: bool) -> Result<()> {
        // Encode (timestamp, sequence desc) into comparable row-format keys.
        let converter = RowConverter::new(vec![
            SortField::new(self.timestamps.data_type().as_arrow_type()),
            SortField::new_with_options(
                self.sequences.data_type().as_arrow_type(),
                SortOptions {
                    descending: true,
                    ..Default::default()
                },
            ),
        ])
        .context(ComputeArrowSnafu)?;
        let columns = [
            self.timestamps.to_arrow_array(),
            self.sequences.to_arrow_array(),
        ];
        let rows = converter.convert_columns(&columns).unwrap();
        let mut to_sort: Vec<_> = rows.iter().enumerate().collect();

        let was_sorted = to_sort.is_sorted_by_key(|x| x.1);
        if !was_sorted {
            to_sort.sort_unstable_by_key(|x| x.1);
        }

        let num_rows = to_sort.len();
        if dedup {
            // Rows are equal iff their timestamp key prefix matches; the 18-byte
            // key is the timestamp part plus the sequence part.
            to_sort.dedup_by(|left, right| {
                debug_assert_eq!(18, left.1.as_ref().len());
                debug_assert_eq!(18, right.1.as_ref().len());
                let (left_key, right_key) = (left.1.as_ref(), right.1.as_ref());
                left_key[..TIMESTAMP_KEY_LEN] == right_key[..TIMESTAMP_KEY_LEN]
            });
        }
        let no_dedup = to_sort.len() == num_rows;

        if was_sorted && no_dedup {
            return Ok(());
        }
        let indices = UInt32Vector::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
        self.take_in_place(&indices)
    }

    /// Merges rows that share a timestamp, keeping the first row of each group
    /// as the base row and filling its null fields from later rows in the
    /// group, stopping at the first delete op.
    ///
    /// Assumes the batch is sorted (timestamp asc, sequence desc) so the first
    /// row of a group is the newest. No-op for batches with < 2 rows, without
    /// native timestamps, or without duplicate timestamps.
    pub(crate) fn merge_last_non_null(&mut self) -> Result<()> {
        let num_rows = self.num_rows();
        if num_rows < 2 {
            return Ok(());
        }

        let Some(timestamps) = self.timestamps_native() else {
            return Ok(());
        };

        // One pass to detect duplicates and count timestamp groups.
        let mut has_dup = false;
        let mut group_count = 1;
        for i in 1..num_rows {
            has_dup |= timestamps[i] == timestamps[i - 1];
            group_count += (timestamps[i] != timestamps[i - 1]) as usize;
        }
        if !has_dup {
            return Ok(());
        }

        let num_fields = self.fields.len();
        let op_types = self.op_types.as_arrow().values();

        // One take-index per group for the key columns, and one per group per
        // field column (fields may pull values from different rows).
        let mut base_indices: Vec<u32> = Vec::with_capacity(group_count);
        let mut field_indices: Vec<Vec<u32>> = (0..num_fields)
            .map(|_| Vec::with_capacity(group_count))
            .collect();

        let mut start = 0;
        while start < num_rows {
            // [start, end) is one group of rows with equal timestamps.
            let ts = timestamps[start];
            let mut end = start + 1;
            while end < num_rows && timestamps[end] == ts {
                end += 1;
            }

            let group_pos = base_indices.len();
            base_indices.push(start as u32);

            if num_fields > 0 {
                // Default every field to the base row; overwrite below when a
                // later row supplies a non-null value.
                for idx in &mut field_indices {
                    idx.push(start as u32);
                }

                let base_deleted = op_types[start] == OpType::Delete as u8;
                if !base_deleted {
                    // Collect fields that are null in the base row.
                    let mut missing_fields = Vec::new();
                    for (field_idx, col) in self.fields.iter().enumerate() {
                        if col.data.is_null(start) {
                            missing_fields.push(field_idx);
                        }
                    }

                    if !missing_fields.is_empty() {
                        // Scan older rows of the group; a delete op masks
                        // everything older than it.
                        for row_idx in (start + 1)..end {
                            if op_types[row_idx] == OpType::Delete as u8 {
                                break;
                            }

                            missing_fields.retain(|&field_idx| {
                                if self.fields[field_idx].data.is_null(row_idx) {
                                    true
                                } else {
                                    field_indices[field_idx][group_pos] = row_idx as u32;
                                    false
                                }
                            });

                            if missing_fields.is_empty() {
                                break;
                            }
                        }
                    }
                }
            }

            start = end;
        }

        // Materialize the merged batch by taking the selected indices.
        let base_indices = UInt32Vector::from_vec(base_indices);
        self.timestamps = self
            .timestamps
            .take(&base_indices)
            .context(ComputeVectorSnafu)?;
        let array = arrow::compute::take(self.sequences.as_arrow(), base_indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
        let array = arrow::compute::take(self.op_types.as_arrow(), base_indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());

        for (field_idx, batch_column) in self.fields.iter_mut().enumerate() {
            let idx = UInt32Vector::from_vec(std::mem::take(&mut field_indices[field_idx]));
            batch_column.data = batch_column.data.take(&idx).context(ComputeVectorSnafu)?;
        }

        Ok(())
    }

    /// Returns an estimate of the heap memory used by this batch.
    pub fn memory_size(&self) -> usize {
        let mut size = std::mem::size_of::<Self>();
        size += self.primary_key.len();
        size += self.timestamps.memory_size();
        size += self.sequences.memory_size();
        size += self.op_types.memory_size();
        for batch_column in &self.fields {
            size += batch_column.data.memory_size();
        }
        size
    }

    /// Returns the ids and data types of the metadata's field columns that are
    /// selected by `projection`, in the metadata's field-column order.
    pub(crate) fn projected_fields(
        metadata: &RegionMetadata,
        projection: &[ColumnId],
    ) -> Vec<(ColumnId, ConcreteDataType)> {
        let projected_ids: HashSet<_> = projection.iter().copied().collect();
        metadata
            .field_columns()
            .filter_map(|column| {
                if projected_ids.contains(&column.column_id) {
                    Some((column.column_id, column.column_schema.data_type.clone()))
                } else {
                    None
                }
            })
            .collect()
    }

    /// Returns the raw `i64` timestamp values (in the vector's native time
    /// unit), or `None` if the batch is empty.
    ///
    /// # Panics
    /// Panics if the timestamp column is not a timestamp-typed vector
    /// (the builder guarantees it is).
    pub(crate) fn timestamps_native(&self) -> Option<&[i64]> {
        if self.timestamps.is_empty() {
            return None;
        }

        let values = match self.timestamps.data_type() {
            ConcreteDataType::Timestamp(TimestampType::Second(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampSecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampMicrosecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => self
                .timestamps
                .as_any()
                .downcast_ref::<TimestampNanosecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            other => panic!("timestamps in a Batch has other type {:?}", other),
        };

        Some(values)
    }

    /// Reorders every column of the batch by `indices` (also used to drop rows
    /// during sort/dedup).
    fn take_in_place(&mut self, indices: &UInt32Vector) -> Result<()> {
        self.timestamps = self.timestamps.take(indices).context(ComputeVectorSnafu)?;
        let array = arrow::compute::take(self.sequences.as_arrow(), indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
        let array = arrow::compute::take(self.op_types.as_arrow(), indices.as_arrow(), None)
            .context(ComputeArrowSnafu)?;
        self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
        for batch_column in &mut self.fields {
            batch_column.data = batch_column
                .data
                .take(indices)
                .context(ComputeVectorSnafu)?;
        }

        Ok(())
    }

    /// Returns the timestamp at `index`.
    ///
    /// # Panics
    /// Panics if `index` is out of bounds or the value is not a timestamp.
    fn get_timestamp(&self, index: usize) -> Timestamp {
        match self.timestamps.get_ref(index) {
            ValueRef::Timestamp(timestamp) => timestamp,

            value => panic!("{:?} is not a timestamp", value),
        }
    }

    /// Returns the sequence at `index`.
    ///
    /// # Panics
    /// Panics if `index` is out of bounds (sequences are never null).
    pub(crate) fn get_sequence(&self, index: usize) -> SequenceNumber {
        self.sequences.get_data(index).unwrap()
    }

    /// Debug-only check that timestamps are ascending and that, for equal
    /// timestamps, sequences are non-increasing. Returns a description of the
    /// first violation.
    #[cfg(debug_assertions)]
    pub(crate) fn check_monotonic(&self) -> Result<(), String> {
        use std::cmp::Ordering;
        if self.timestamps_native().is_none() {
            return Ok(());
        }

        let timestamps = self.timestamps_native().unwrap();
        let sequences = self.sequences.as_arrow().values();
        for (i, window) in timestamps.windows(2).enumerate() {
            let current = window[0];
            let next = window[1];
            let current_sequence = sequences[i];
            let next_sequence = sequences[i + 1];
            match current.cmp(&next) {
                Ordering::Less => {
                    continue;
                }
                Ordering::Equal => {
                    if current_sequence < next_sequence {
                        return Err(format!(
                            "sequence are not monotonic: ts {} == {} but current sequence {} < {}, index: {}",
                            current, next, current_sequence, next_sequence, i
                        ));
                    }
                }
                Ordering::Greater => {
                    return Err(format!(
                        "timestamps are not monotonic: {} > {}, index: {}",
                        current, next, i
                    ));
                }
            }
        }

        Ok(())
    }

    /// Debug-only check that `other` may follow `self` in a sorted stream:
    /// primary keys ascending, then timestamps ascending, then (for equal
    /// boundary timestamps) sequences non-increasing.
    #[cfg(debug_assertions)]
    pub(crate) fn check_next_batch(&self, other: &Batch) -> Result<(), String> {
        if self.primary_key() < other.primary_key() {
            return Ok(());
        }
        if self.primary_key() > other.primary_key() {
            return Err(format!(
                "primary key is not monotonic: {:?} > {:?}",
                self.primary_key(),
                other.primary_key()
            ));
        }
        if self.last_timestamp() < other.first_timestamp() {
            return Ok(());
        }
        if self.last_timestamp() > other.first_timestamp() {
            return Err(format!(
                "timestamps are not monotonic: {:?} > {:?}",
                self.last_timestamp(),
                other.first_timestamp()
            ));
        }
        if self.last_sequence() >= other.first_sequence() {
            return Ok(());
        }
        Err(format!(
            "sequences are not monotonic: {:?} < {:?}",
            self.last_sequence(),
            other.first_sequence()
        ))
    }

    /// Returns the value of a primary key column, decoding (and caching) the
    /// primary key on first use.
    ///
    /// Dense encodings look up by `col_idx_in_pk`; sparse encodings by
    /// `column_id`. Returns `Ok(None)` if the column is absent.
    pub fn pk_col_value(
        &mut self,
        codec: &dyn PrimaryKeyCodec,
        col_idx_in_pk: usize,
        column_id: ColumnId,
    ) -> Result<Option<&Value>> {
        if self.pk_values.is_none() {
            self.pk_values = Some(codec.decode(&self.primary_key).context(DecodeSnafu)?);
        }

        let pk_values = self.pk_values.as_ref().unwrap();
        Ok(match pk_values {
            CompositeValues::Dense(values) => values.get(col_idx_in_pk).map(|(_, v)| v),
            CompositeValues::Sparse(values) => values.get(&column_id),
        })
    }

    /// Returns the field column with the given id, building the id-to-index
    /// map lazily on first call.
    pub fn field_col_value(&mut self, column_id: ColumnId) -> Option<&BatchColumn> {
        if self.fields_idx.is_none() {
            self.fields_idx = Some(
                self.fields
                    .iter()
                    .enumerate()
                    .map(|(i, c)| (c.column_id, i))
                    .collect(),
            );
        }

        self.fields_idx
            .as_ref()
            .unwrap()
            .get(&column_id)
            .map(|&idx| &self.fields[idx])
    }
}
792
/// Debug-build-only helper that validates a stream of [`Batch`]es is sorted
/// and stays inside an optional time range.
#[cfg(debug_assertions)]
#[derive(Default)]
pub(crate) struct BatchChecker {
    /// The most recently checked batch, used to validate ordering across batches.
    last_batch: Option<Batch>,
    /// Optional inclusive lower bound for timestamps (violated when `start > first`).
    start: Option<Timestamp>,
    /// Optional exclusive upper bound for timestamps (violated when `end <= last`).
    end: Option<Timestamp>,
}
801
802#[cfg(debug_assertions)]
803impl BatchChecker {
804 pub(crate) fn with_start(mut self, start: Option<Timestamp>) -> Self {
806 self.start = start;
807 self
808 }
809
810 pub(crate) fn with_end(mut self, end: Option<Timestamp>) -> Self {
812 self.end = end;
813 self
814 }
815
816 pub(crate) fn check_monotonic(&mut self, batch: &Batch) -> Result<(), String> {
819 batch.check_monotonic()?;
820
821 if let (Some(start), Some(first)) = (self.start, batch.first_timestamp())
822 && start > first
823 {
824 return Err(format!(
825 "batch's first timestamp is before the start timestamp: {:?} > {:?}",
826 start, first
827 ));
828 }
829 if let (Some(end), Some(last)) = (self.end, batch.last_timestamp())
830 && end <= last
831 {
832 return Err(format!(
833 "batch's last timestamp is after the end timestamp: {:?} <= {:?}",
834 end, last
835 ));
836 }
837
838 let res = self
841 .last_batch
842 .as_ref()
843 .map(|last| last.check_next_batch(batch))
844 .unwrap_or(Ok(()));
845 self.last_batch = Some(batch.clone());
846 res
847 }
848
849 pub(crate) fn format_batch(&self, batch: &Batch) -> String {
851 use std::fmt::Write;
852
853 let mut message = String::new();
854 if let Some(last) = &self.last_batch {
855 write!(
856 message,
857 "last_pk: {:?}, last_ts: {:?}, last_seq: {:?}, ",
858 last.primary_key(),
859 last.last_timestamp(),
860 last.last_sequence()
861 )
862 .unwrap();
863 }
864 write!(
865 message,
866 "batch_pk: {:?}, batch_ts: {:?}, batch_seq: {:?}",
867 batch.primary_key(),
868 batch.timestamps(),
869 batch.sequences()
870 )
871 .unwrap();
872
873 message
874 }
875
876 pub(crate) fn ensure_part_range_batch(
878 &mut self,
879 scanner: &str,
880 region_id: store_api::storage::RegionId,
881 partition: usize,
882 part_range: store_api::region_engine::PartitionRange,
883 batch: &Batch,
884 ) {
885 if let Err(e) = self.check_monotonic(batch) {
886 let err_msg = format!(
887 "{}: batch is not sorted, {}, region_id: {}, partition: {}, part_range: {:?}",
888 scanner, e, region_id, partition, part_range,
889 );
890 common_telemetry::error!("{err_msg}, {}", self.format_batch(batch));
891 panic!("{err_msg}, batch rows: {}", batch.num_rows());
893 }
894 }
895}
896
/// Byte length of the timestamp portion of the row-format sort key built in
/// [`Batch::sort`] (presumably one validity byte plus the 8-byte timestamp —
/// TODO confirm against the arrow row format).
const TIMESTAMP_KEY_LEN: usize = 9;
899
900fn concat_arrays(iter: impl Iterator<Item = ArrayRef>) -> Result<ArrayRef> {
902 let arrays: Vec<_> = iter.collect();
903 let dyn_arrays: Vec<_> = arrays.iter().map(|array| array.as_ref()).collect();
904 arrow::compute::concat(&dyn_arrays).context(ComputeArrowSnafu)
905}
906
/// A column (id + data) in a [`Batch`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct BatchColumn {
    /// Id of the column.
    pub column_id: ColumnId,
    /// Column data; same length as the batch's other columns.
    pub data: VectorRef,
}
915
/// Builder for [`Batch`]; required columns may be supplied up front or set
/// individually before [`BatchBuilder::build`] validates them.
pub struct BatchBuilder {
    /// Encoded primary key of the batch being built.
    primary_key: Vec<u8>,
    /// Timestamp column; required before `build`.
    timestamps: Option<VectorRef>,
    /// Sequence column; required before `build`.
    sequences: Option<Arc<UInt64Vector>>,
    /// Op type column; required before `build`.
    op_types: Option<Arc<UInt8Vector>>,
    /// Field columns accumulated so far.
    fields: Vec<BatchColumn>,
}
924
925impl BatchBuilder {
926 pub fn new(primary_key: Vec<u8>) -> BatchBuilder {
928 BatchBuilder {
929 primary_key,
930 timestamps: None,
931 sequences: None,
932 op_types: None,
933 fields: Vec::new(),
934 }
935 }
936
937 pub fn with_required_columns(
939 primary_key: Vec<u8>,
940 timestamps: VectorRef,
941 sequences: Arc<UInt64Vector>,
942 op_types: Arc<UInt8Vector>,
943 ) -> BatchBuilder {
944 BatchBuilder {
945 primary_key,
946 timestamps: Some(timestamps),
947 sequences: Some(sequences),
948 op_types: Some(op_types),
949 fields: Vec::new(),
950 }
951 }
952
953 pub fn with_fields(mut self, fields: Vec<BatchColumn>) -> Self {
955 self.fields = fields;
956 self
957 }
958
959 pub fn push_field(&mut self, column: BatchColumn) -> &mut Self {
961 self.fields.push(column);
962 self
963 }
964
965 pub fn push_field_array(&mut self, column_id: ColumnId, array: ArrayRef) -> Result<&mut Self> {
967 let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
968 self.fields.push(BatchColumn {
969 column_id,
970 data: vector,
971 });
972
973 Ok(self)
974 }
975
976 pub fn timestamps_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
978 let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
979 ensure!(
980 vector.data_type().is_timestamp(),
981 InvalidBatchSnafu {
982 reason: format!("{:?} is not a timestamp type", vector.data_type()),
983 }
984 );
985
986 self.timestamps = Some(vector);
987 Ok(self)
988 }
989
990 pub fn sequences_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
992 ensure!(
993 *array.data_type() == arrow::datatypes::DataType::UInt64,
994 InvalidBatchSnafu {
995 reason: "sequence array is not UInt64 type",
996 }
997 );
998 let vector = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
1000 self.sequences = Some(vector);
1001
1002 Ok(self)
1003 }
1004
1005 pub fn op_types_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
1007 ensure!(
1008 *array.data_type() == arrow::datatypes::DataType::UInt8,
1009 InvalidBatchSnafu {
1010 reason: "sequence array is not UInt8 type",
1011 }
1012 );
1013 let vector = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
1015 self.op_types = Some(vector);
1016
1017 Ok(self)
1018 }
1019
1020 pub fn build(self) -> Result<Batch> {
1022 let timestamps = self.timestamps.context(InvalidBatchSnafu {
1023 reason: "missing timestamps",
1024 })?;
1025 let sequences = self.sequences.context(InvalidBatchSnafu {
1026 reason: "missing sequences",
1027 })?;
1028 let op_types = self.op_types.context(InvalidBatchSnafu {
1029 reason: "missing op_types",
1030 })?;
1031 assert_eq!(0, timestamps.null_count());
1034 assert_eq!(0, sequences.null_count());
1035 assert_eq!(0, op_types.null_count());
1036
1037 let ts_len = timestamps.len();
1038 ensure!(
1039 sequences.len() == ts_len,
1040 InvalidBatchSnafu {
1041 reason: format!(
1042 "sequence have different len {} != {}",
1043 sequences.len(),
1044 ts_len
1045 ),
1046 }
1047 );
1048 ensure!(
1049 op_types.len() == ts_len,
1050 InvalidBatchSnafu {
1051 reason: format!(
1052 "op type have different len {} != {}",
1053 op_types.len(),
1054 ts_len
1055 ),
1056 }
1057 );
1058 for column in &self.fields {
1059 ensure!(
1060 column.data.len() == ts_len,
1061 InvalidBatchSnafu {
1062 reason: format!(
1063 "column {} has different len {} != {}",
1064 column.column_id,
1065 column.data.len(),
1066 ts_len
1067 ),
1068 }
1069 );
1070 }
1071
1072 Ok(Batch {
1073 primary_key: self.primary_key,
1074 pk_values: None,
1075 timestamps,
1076 sequences,
1077 op_types,
1078 fields: self.fields,
1079 fields_idx: None,
1080 })
1081 }
1082}
1083
1084impl From<Batch> for BatchBuilder {
1085 fn from(batch: Batch) -> Self {
1086 Self {
1087 primary_key: batch.primary_key,
1088 timestamps: Some(batch.timestamps),
1089 sequences: Some(batch.sequences),
1090 op_types: Some(batch.op_types),
1091 fields: batch.fields,
1092 }
1093 }
1094}
1095
/// Async source of [`Batch`]es, abstracting over readers, iterators and streams.
pub enum Source {
    /// Source backed by a [`BoxedBatchReader`].
    Reader(BoxedBatchReader),
    /// Source backed by a [`BoxedBatchIterator`].
    Iter(BoxedBatchIterator),
    /// Source backed by a [`BoxedBatchStream`].
    Stream(BoxedBatchStream),
    /// Source backed by a [`PruneReader`].
    PruneReader(PruneReader),
}
1109
1110impl Source {
1111 pub async fn next_batch(&mut self) -> Result<Option<Batch>> {
1113 match self {
1114 Source::Reader(reader) => reader.next_batch().await,
1115 Source::Iter(iter) => iter.next().transpose(),
1116 Source::Stream(stream) => stream.try_next().await,
1117 Source::PruneReader(reader) => reader.next_batch().await,
1118 }
1119 }
1120}
1121
/// Async source of arrow [`RecordBatch`]es (the "flat" format counterpart of [`Source`]).
pub enum FlatSource {
    /// Source backed by a [`BoxedRecordBatchIterator`].
    Iter(BoxedRecordBatchIterator),
    /// Source backed by a [`BoxedRecordBatchStream`].
    Stream(BoxedRecordBatchStream),
}
1129
1130impl FlatSource {
1131 pub async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
1133 match self {
1134 FlatSource::Iter(iter) => iter.next().transpose(),
1135 FlatSource::Stream(stream) => stream.try_next().await,
1136 }
1137 }
1138}
1139
/// Async reader that yields [`Batch`]es until exhausted.
#[async_trait]
pub trait BatchReader: Send {
    /// Fetches the next [`Batch`]; `Ok(None)` signals the reader has reached the end.
    async fn next_batch(&mut self) -> Result<Option<Batch>>;
}
1154
/// Boxed, type-erased [`BatchReader`].
pub type BoxedBatchReader = Box<dyn BatchReader>;

/// Boxed stream of [`Batch`] results.
pub type BoxedBatchStream = BoxStream<'static, Result<Batch>>;

/// Boxed stream of arrow [`RecordBatch`] results.
pub type BoxedRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
1163
#[async_trait::async_trait]
impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
    /// Delegates to the boxed reader.
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        (**self).next_batch().await
    }
}
1170
/// Local metrics accumulated by scanners in this module's submodules.
#[derive(Debug, Default)]
pub(crate) struct ScannerMetrics {
    // Cumulative time attributed to scanning (reading from sources).
    scan_cost: Duration,
    // Cumulative time attributed to yielding batches to the consumer.
    yield_cost: Duration,
    // Number of batches produced.
    num_batches: usize,
    // Number of rows produced.
    num_rows: usize,
}
1183
1184#[cfg(test)]
1185mod tests {
1186 use datatypes::arrow::array::{TimestampMillisecondArray, UInt8Array, UInt64Array};
1187 use mito_codec::row_converter::{self, build_primary_key_codec_with_fields};
1188 use store_api::codec::PrimaryKeyEncoding;
1189 use store_api::storage::consts::ReservedColumnId;
1190
1191 use super::*;
1192 use crate::error::Error;
1193 use crate::test_util::new_batch_builder;
1194
1195 fn new_batch(
1196 timestamps: &[i64],
1197 sequences: &[u64],
1198 op_types: &[OpType],
1199 field: &[u64],
1200 ) -> Batch {
1201 new_batch_builder(b"test", timestamps, sequences, op_types, 1, field)
1202 .build()
1203 .unwrap()
1204 }
1205
1206 fn new_batch_with_u64_fields(
1207 timestamps: &[i64],
1208 sequences: &[u64],
1209 op_types: &[OpType],
1210 fields: &[(ColumnId, &[Option<u64>])],
1211 ) -> Batch {
1212 assert_eq!(timestamps.len(), sequences.len());
1213 assert_eq!(timestamps.len(), op_types.len());
1214 for (_, values) in fields {
1215 assert_eq!(timestamps.len(), values.len());
1216 }
1217
1218 let mut builder = BatchBuilder::new(b"test".to_vec());
1219 builder
1220 .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
1221 timestamps.iter().copied(),
1222 )))
1223 .unwrap()
1224 .sequences_array(Arc::new(UInt64Array::from_iter_values(
1225 sequences.iter().copied(),
1226 )))
1227 .unwrap()
1228 .op_types_array(Arc::new(UInt8Array::from_iter_values(
1229 op_types.iter().map(|v| *v as u8),
1230 )))
1231 .unwrap();
1232
1233 for (col_id, values) in fields {
1234 builder
1235 .push_field_array(*col_id, Arc::new(UInt64Array::from(values.to_vec())))
1236 .unwrap();
1237 }
1238
1239 builder.build().unwrap()
1240 }
1241
1242 fn new_batch_without_fields(
1243 timestamps: &[i64],
1244 sequences: &[u64],
1245 op_types: &[OpType],
1246 ) -> Batch {
1247 assert_eq!(timestamps.len(), sequences.len());
1248 assert_eq!(timestamps.len(), op_types.len());
1249
1250 let mut builder = BatchBuilder::new(b"test".to_vec());
1251 builder
1252 .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
1253 timestamps.iter().copied(),
1254 )))
1255 .unwrap()
1256 .sequences_array(Arc::new(UInt64Array::from_iter_values(
1257 sequences.iter().copied(),
1258 )))
1259 .unwrap()
1260 .op_types_array(Arc::new(UInt8Array::from_iter_values(
1261 op_types.iter().map(|v| *v as u8),
1262 )))
1263 .unwrap();
1264
1265 builder.build().unwrap()
1266 }
1267
1268 #[test]
1269 fn test_empty_batch() {
1270 let batch = Batch::empty();
1271 assert!(batch.is_empty());
1272 assert_eq!(None, batch.first_timestamp());
1273 assert_eq!(None, batch.last_timestamp());
1274 assert_eq!(None, batch.first_sequence());
1275 assert_eq!(None, batch.last_sequence());
1276 assert!(batch.timestamps_native().is_none());
1277 }
1278
1279 #[test]
1280 fn test_first_last_one() {
1281 let batch = new_batch(&[1], &[2], &[OpType::Put], &[4]);
1282 assert_eq!(
1283 Timestamp::new_millisecond(1),
1284 batch.first_timestamp().unwrap()
1285 );
1286 assert_eq!(
1287 Timestamp::new_millisecond(1),
1288 batch.last_timestamp().unwrap()
1289 );
1290 assert_eq!(2, batch.first_sequence().unwrap());
1291 assert_eq!(2, batch.last_sequence().unwrap());
1292 }
1293
1294 #[test]
1295 fn test_first_last_multiple() {
1296 let batch = new_batch(
1297 &[1, 2, 3],
1298 &[11, 12, 13],
1299 &[OpType::Put, OpType::Put, OpType::Put],
1300 &[21, 22, 23],
1301 );
1302 assert_eq!(
1303 Timestamp::new_millisecond(1),
1304 batch.first_timestamp().unwrap()
1305 );
1306 assert_eq!(
1307 Timestamp::new_millisecond(3),
1308 batch.last_timestamp().unwrap()
1309 );
1310 assert_eq!(11, batch.first_sequence().unwrap());
1311 assert_eq!(13, batch.last_sequence().unwrap());
1312 }
1313
1314 #[test]
1315 fn test_slice() {
1316 let batch = new_batch(
1317 &[1, 2, 3, 4],
1318 &[11, 12, 13, 14],
1319 &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1320 &[21, 22, 23, 24],
1321 );
1322 let batch = batch.slice(1, 2);
1323 let expect = new_batch(
1324 &[2, 3],
1325 &[12, 13],
1326 &[OpType::Delete, OpType::Put],
1327 &[22, 23],
1328 );
1329 assert_eq!(expect, batch);
1330 }
1331
1332 #[test]
1333 fn test_timestamps_native() {
1334 let batch = new_batch(
1335 &[1, 2, 3, 4],
1336 &[11, 12, 13, 14],
1337 &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1338 &[21, 22, 23, 24],
1339 );
1340 assert_eq!(&[1, 2, 3, 4], batch.timestamps_native().unwrap());
1341 }
1342
1343 #[test]
1344 fn test_concat_empty() {
1345 let err = Batch::concat(vec![]).unwrap_err();
1346 assert!(
1347 matches!(err, Error::InvalidBatch { .. }),
1348 "unexpected err: {err}"
1349 );
1350 }
1351
1352 #[test]
1353 fn test_concat_one() {
1354 let batch = new_batch(&[], &[], &[], &[]);
1355 let actual = Batch::concat(vec![batch.clone()]).unwrap();
1356 assert_eq!(batch, actual);
1357
1358 let batch = new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]);
1359 let actual = Batch::concat(vec![batch.clone()]).unwrap();
1360 assert_eq!(batch, actual);
1361 }
1362
1363 #[test]
1364 fn test_concat_multiple() {
1365 let batches = vec![
1366 new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]),
1367 new_batch(
1368 &[3, 4, 5],
1369 &[13, 14, 15],
1370 &[OpType::Put, OpType::Delete, OpType::Put],
1371 &[23, 24, 25],
1372 ),
1373 new_batch(&[], &[], &[], &[]),
1374 new_batch(&[6], &[16], &[OpType::Put], &[26]),
1375 ];
1376 let batch = Batch::concat(batches).unwrap();
1377 let expect = new_batch(
1378 &[1, 2, 3, 4, 5, 6],
1379 &[11, 12, 13, 14, 15, 16],
1380 &[
1381 OpType::Put,
1382 OpType::Put,
1383 OpType::Put,
1384 OpType::Delete,
1385 OpType::Put,
1386 OpType::Put,
1387 ],
1388 &[21, 22, 23, 24, 25, 26],
1389 );
1390 assert_eq!(expect, batch);
1391 }
1392
1393 #[test]
1394 fn test_concat_different() {
1395 let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
1396 let mut batch2 = new_batch(&[2], &[2], &[OpType::Put], &[2]);
1397 batch2.primary_key = b"hello".to_vec();
1398 let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
1399 assert!(
1400 matches!(err, Error::InvalidBatch { .. }),
1401 "unexpected err: {err}"
1402 );
1403 }
1404
1405 #[test]
1406 fn test_concat_different_fields() {
1407 let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
1408 let fields = vec![
1409 batch1.fields()[0].clone(),
1410 BatchColumn {
1411 column_id: 2,
1412 data: Arc::new(UInt64Vector::from_slice([2])),
1413 },
1414 ];
1415 let batch2 = batch1.clone().with_fields(fields).unwrap();
1417 let err = Batch::concat(vec![batch1.clone(), batch2]).unwrap_err();
1418 assert!(
1419 matches!(err, Error::InvalidBatch { .. }),
1420 "unexpected err: {err}"
1421 );
1422
1423 let fields = vec![BatchColumn {
1425 column_id: 2,
1426 data: Arc::new(UInt64Vector::from_slice([2])),
1427 }];
1428 let batch2 = batch1.clone().with_fields(fields).unwrap();
1429 let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
1430 assert!(
1431 matches!(err, Error::InvalidBatch { .. }),
1432 "unexpected err: {err}"
1433 );
1434 }
1435
1436 #[test]
1437 fn test_filter_deleted_empty() {
1438 let mut batch = new_batch(&[], &[], &[], &[]);
1439 batch.filter_deleted().unwrap();
1440 assert!(batch.is_empty());
1441 }
1442
1443 #[test]
1444 fn test_filter_deleted() {
1445 let mut batch = new_batch(
1446 &[1, 2, 3, 4],
1447 &[11, 12, 13, 14],
1448 &[OpType::Delete, OpType::Put, OpType::Delete, OpType::Put],
1449 &[21, 22, 23, 24],
1450 );
1451 batch.filter_deleted().unwrap();
1452 let expect = new_batch(&[2, 4], &[12, 14], &[OpType::Put, OpType::Put], &[22, 24]);
1453 assert_eq!(expect, batch);
1454
1455 let mut batch = new_batch(
1456 &[1, 2, 3, 4],
1457 &[11, 12, 13, 14],
1458 &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
1459 &[21, 22, 23, 24],
1460 );
1461 let expect = batch.clone();
1462 batch.filter_deleted().unwrap();
1463 assert_eq!(expect, batch);
1464 }
1465
    #[test]
    fn test_filter_by_sequence() {
        // `LtEq { max }` keeps rows whose sequence is <= max.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::LtEq { max: 13 }))
            .unwrap();
        let expect = new_batch(
            &[1, 2, 3],
            &[11, 12, 13],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );
        assert_eq!(expect, batch);

        // A max below every sequence in the batch filters out all rows.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );

        batch
            .filter_by_sequence(Some(SequenceRange::LtEq { max: 10 }))
            .unwrap();
        assert!(batch.is_empty());

        // `None` means no sequence filter: the batch is unchanged.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        let expect = batch.clone();
        batch.filter_by_sequence(None).unwrap();
        assert_eq!(expect, batch);

        // Filtering an empty batch stays empty, with or without a range.
        let mut batch = new_batch(&[], &[], &[], &[]);
        batch
            .filter_by_sequence(Some(SequenceRange::LtEq { max: 10 }))
            .unwrap();
        assert!(batch.is_empty());

        let mut batch = new_batch(&[], &[], &[], &[]);
        batch.filter_by_sequence(None).unwrap();
        assert!(batch.is_empty());

        // `Gt { min }` keeps rows whose sequence is strictly greater than min.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::Gt { min: 12 }))
            .unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // A min at or above every sequence filters out all rows.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::Gt { min: 20 }))
            .unwrap();
        assert!(batch.is_empty());

        // `GtLtEq { min, max }` keeps rows with min < sequence <= max.
        let mut batch = new_batch(
            &[1, 2, 3, 4, 5],
            &[11, 12, 13, 14, 15],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 12, max: 14 }))
            .unwrap();
        let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
        assert_eq!(expect, batch);

        // GtLtEq also keeps delete rows that fall inside the range.
        let mut batch = new_batch(
            &[1, 2, 3, 4, 5],
            &[11, 12, 13, 14, 15],
            &[
                OpType::Put,
                OpType::Delete,
                OpType::Put,
                OpType::Delete,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 11, max: 13 }))
            .unwrap();
        let expect = new_batch(
            &[2, 3],
            &[12, 13],
            &[OpType::Delete, OpType::Put],
            &[22, 23],
        );
        assert_eq!(expect, batch);

        // A GtLtEq range disjoint from all sequences filters out all rows.
        let mut batch = new_batch(
            &[1, 2, 3, 4],
            &[11, 12, 13, 14],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[21, 22, 23, 24],
        );
        batch
            .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 20, max: 25 }))
            .unwrap();
        assert!(batch.is_empty());
    }
1602
1603 #[test]
1604 fn test_merge_last_non_null_no_dup() {
1605 let mut batch = new_batch_with_u64_fields(
1606 &[1, 2],
1607 &[2, 1],
1608 &[OpType::Put, OpType::Put],
1609 &[(1, &[Some(10), None]), (2, &[Some(100), Some(200)])],
1610 );
1611 let expect = batch.clone();
1612 batch.merge_last_non_null().unwrap();
1613 assert_eq!(expect, batch);
1614 }
1615
    #[test]
    fn test_merge_last_non_null_fill_null_fields() {
        // Three rows share timestamp 1 (sequences 3, 2, 1 — newest first);
        // the newest row has a null in field 1.
        let mut batch = new_batch_with_u64_fields(
            &[1, 1, 1],
            &[3, 2, 1],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[
                (1, &[None, Some(10), Some(11)]),
                (2, &[Some(100), Some(200), Some(300)]),
            ],
        );
        batch.merge_last_non_null().unwrap();

        // Only the newest row (sequence 3) survives; its null field 1 is
        // backfilled with the next older non-null value (10), while field 2
        // keeps the newest value (100).
        let expect = new_batch_with_u64_fields(
            &[1],
            &[3],
            &[OpType::Put],
            &[(1, &[Some(10)]), (2, &[Some(100)])],
        );
        assert_eq!(expect, batch);
    }
1640
    #[test]
    fn test_merge_last_non_null_stop_at_delete_row() {
        // Three rows share timestamp 1; the middle row (sequence 2) is a
        // delete, which must stop backfilling from older rows.
        let mut batch = new_batch_with_u64_fields(
            &[1, 1, 1],
            &[3, 2, 1],
            &[OpType::Put, OpType::Delete, OpType::Put],
            &[
                (1, &[None, Some(10), Some(11)]),
                (2, &[Some(100), Some(200), Some(300)]),
            ],
        );
        batch.merge_last_non_null().unwrap();

        // The newest row survives, but field 1 stays null because the delete
        // row blocks merging values from rows at or beyond it.
        let expect = new_batch_with_u64_fields(
            &[1],
            &[3],
            &[OpType::Put],
            &[(1, &[None]), (2, &[Some(100)])],
        );
        assert_eq!(expect, batch);
    }
1664
    #[test]
    fn test_merge_last_non_null_base_delete_no_merge() {
        // The newest row for timestamp 1 is itself a delete: nothing is
        // merged into it from the older Put row.
        let mut batch = new_batch_with_u64_fields(
            &[1, 1],
            &[3, 2],
            &[OpType::Delete, OpType::Put],
            &[(1, &[None, Some(10)]), (2, &[None, Some(200)])],
        );
        batch.merge_last_non_null().unwrap();

        // The delete row survives unchanged with all-null fields.
        let expect =
            new_batch_with_u64_fields(&[1], &[3], &[OpType::Delete], &[(1, &[None]), (2, &[None])]);
        assert_eq!(expect, batch);
    }
1680
    #[test]
    fn test_merge_last_non_null_multiple_timestamp_groups() {
        // Rows group by timestamp: {1, 1}, {2}, {3, 3}; sequences descend
        // within each group (newest row first).
        let mut batch = new_batch_with_u64_fields(
            &[1, 1, 2, 3, 3],
            &[5, 4, 3, 2, 1],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
            ],
            &[
                (1, &[None, Some(10), Some(20), None, Some(30)]),
                (2, &[Some(100), Some(110), Some(120), None, Some(130)]),
            ],
        );
        batch.merge_last_non_null().unwrap();

        // Each timestamp keeps its newest row, with nulls backfilled from
        // older rows in the same group (ts 1: field 1 <- 10; ts 3: both
        // fields <- 30 / 130).
        let expect = new_batch_with_u64_fields(
            &[1, 2, 3],
            &[5, 3, 2],
            &[OpType::Put, OpType::Put, OpType::Put],
            &[
                (1, &[Some(10), Some(20), Some(30)]),
                (2, &[Some(100), Some(120), Some(130)]),
            ],
        );
        assert_eq!(expect, batch);
    }
1711
1712 #[test]
1713 fn test_merge_last_non_null_no_fields() {
1714 let mut batch = new_batch_without_fields(
1715 &[1, 1, 2],
1716 &[3, 2, 1],
1717 &[OpType::Put, OpType::Put, OpType::Put],
1718 );
1719 batch.merge_last_non_null().unwrap();
1720
1721 let expect = new_batch_without_fields(&[1, 2], &[3, 1], &[OpType::Put, OpType::Put]);
1722 assert_eq!(expect, batch);
1723 }
1724
1725 #[test]
1726 fn test_filter() {
1727 let mut batch = new_batch(
1729 &[1, 2, 3, 4],
1730 &[11, 12, 13, 14],
1731 &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
1732 &[21, 22, 23, 24],
1733 );
1734 let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
1735 batch.filter(&predicate).unwrap();
1736 let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
1737 assert_eq!(expect, batch);
1738
1739 let mut batch = new_batch(
1741 &[1, 2, 3, 4],
1742 &[11, 12, 13, 14],
1743 &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
1744 &[21, 22, 23, 24],
1745 );
1746 let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
1747 batch.filter(&predicate).unwrap();
1748 let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
1749 assert_eq!(expect, batch);
1750
1751 let predicate = BooleanVector::from_vec(vec![false, false]);
1753 batch.filter(&predicate).unwrap();
1754 assert!(batch.is_empty());
1755 }
1756
    #[test]
    fn test_sort_and_dedup() {
        // Unsorted batch; timestamp 2 appears twice (sequences 1 and 6).
        let original = new_batch(
            &[2, 3, 1, 4, 5, 2],
            &[1, 2, 3, 4, 5, 6],
            &[
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
                OpType::Put,
            ],
            &[21, 22, 23, 24, 25, 26],
        );

        let mut batch = original.clone();
        batch.sort(true).unwrap();
        // sort(true) dedups: the duplicate timestamp 2 keeps only the row
        // with the larger sequence (6).
        assert_eq!(
            new_batch(
                &[1, 2, 3, 4, 5],
                &[3, 6, 2, 4, 5],
                &[
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                ],
                &[23, 26, 22, 24, 25],
            ),
            batch
        );

        let mut batch = original.clone();
        batch.sort(false).unwrap();

        // sort(false) keeps duplicates: both rows for timestamp 2 remain,
        // larger sequence first (6 before 1).
        assert_eq!(
            new_batch(
                &[1, 2, 2, 3, 4, 5],
                &[3, 6, 1, 2, 4, 5],
                &[
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                    OpType::Put,
                ],
                &[23, 26, 21, 22, 24, 25],
            ),
            batch
        );

        // Duplicate timestamp 2 where the older row (sequence 1) is a delete.
        let original = new_batch(
            &[2, 2, 1],
            &[1, 6, 1],
            &[OpType::Delete, OpType::Put, OpType::Put],
            &[21, 22, 23],
        );

        let mut batch = original.clone();
        batch.sort(true).unwrap();
        // Dedup keeps the newer Put (sequence 6) and drops the older Delete.
        let expect = new_batch(&[1, 2], &[1, 6], &[OpType::Put, OpType::Put], &[23, 22]);
        assert_eq!(expect, batch);

        let mut batch = original.clone();
        batch.sort(false).unwrap();
        // Without dedup both rows for timestamp 2 stay, sequence 6 first.
        let expect = new_batch(
            &[1, 2, 2],
            &[1, 6, 1],
            &[OpType::Put, OpType::Put, OpType::Delete],
            &[23, 22, 21],
        );
        assert_eq!(expect, batch);
    }
1835
    #[test]
    fn test_get_value() {
        // Round-trip primary-key values through both pk encodings.
        let encodings = [PrimaryKeyEncoding::Dense, PrimaryKeyEncoding::Sparse];

        for encoding in encodings {
            // Codec over two reserved columns (table_id: u32, tsid: u64) and
            // two string columns (ids 100 and 200).
            let codec = build_primary_key_codec_with_fields(
                encoding,
                [
                    (
                        ReservedColumnId::table_id(),
                        row_converter::SortField::new(ConcreteDataType::uint32_datatype()),
                    ),
                    (
                        ReservedColumnId::tsid(),
                        row_converter::SortField::new(ConcreteDataType::uint64_datatype()),
                    ),
                    (
                        100,
                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
                    ),
                    (
                        200,
                        row_converter::SortField::new(ConcreteDataType::string_datatype()),
                    ),
                ]
                .into_iter(),
            );

            // Encode one value per pk column into a key buffer.
            let values = [
                Value::UInt32(1000),
                Value::UInt64(2000),
                Value::String("abcdefgh".into()),
                Value::String("zyxwvu".into()),
            ];
            let mut buf = vec![];
            codec
                .encode_values(
                    &[
                        (ReservedColumnId::table_id(), values[0].clone()),
                        (ReservedColumnId::tsid(), values[1].clone()),
                        (100, values[2].clone()),
                        (200, values[3].clone()),
                    ],
                    &mut buf,
                )
                .unwrap();

            // Build a 3-row batch carrying the encoded pk and one u64 field
            // column (id 2) with values 42..=44.
            let field_col_id = 2;
            let mut batch = new_batch_builder(
                &buf,
                &[1, 2, 3],
                &[1, 1, 1],
                &[OpType::Put, OpType::Put, OpType::Put],
                field_col_id,
                &[42, 43, 44],
            )
            .build()
            .unwrap();

            // pk_col_value must decode each pk column back to the value that
            // was encoded above.
            let v = batch
                .pk_col_value(&*codec, 0, ReservedColumnId::table_id())
                .unwrap()
                .unwrap();
            assert_eq!(values[0], *v);

            let v = batch
                .pk_col_value(&*codec, 1, ReservedColumnId::tsid())
                .unwrap()
                .unwrap();
            assert_eq!(values[1], *v);

            let v = batch.pk_col_value(&*codec, 2, 100).unwrap().unwrap();
            assert_eq!(values[2], *v);

            let v = batch.pk_col_value(&*codec, 3, 200).unwrap().unwrap();
            assert_eq!(values[3], *v);

            // field_col_value looks up the field column by id and exposes its
            // per-row values.
            let v = batch.field_col_value(field_col_id).unwrap();
            assert_eq!(v.data.get(0), Value::UInt64(42));
            assert_eq!(v.data.get(1), Value::UInt64(43));
            assert_eq!(v.data.get(2), Value::UInt64(44));
        }
    }
1919}