use std::collections::VecDeque;
use std::sync::Arc;

use api::helper::{value_to_grpc_value, ColumnDataTypeWrapper};
use api::v1::bulk_wal_entry::Body;
use api::v1::{bulk_wal_entry, ArrowIpc, BulkWalEntry, Mutation, OpType};
use bytes::Bytes;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_recordbatch::DfRecordBatch as RecordBatch;
use common_time::timestamp::TimeUnit;
use datatypes::arrow;
use datatypes::arrow::array::{
    Array, ArrayRef, BinaryBuilder, BinaryDictionaryBuilder, DictionaryArray, StringBuilder,
    StringDictionaryBuilder, TimestampMicrosecondArray, TimestampMillisecondArray,
    TimestampNanosecondArray, TimestampSecondArray, UInt32Array, UInt64Array, UInt64Builder,
    UInt8Array, UInt8Builder,
};
use datatypes::arrow::compute::{SortColumn, SortOptions, TakeOptions};
use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
use datatypes::arrow_array::BinaryArray;
use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::Helper;
use mito_codec::key_values::{KeyValue, KeyValues, KeyValuesRef};
use mito_codec::row_converter::{
    build_primary_key_codec, DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt,
};
use parquet::arrow::ArrowWriter;
use parquet::data_type::AsBytes;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use snafu::{OptionExt, ResultExt, Snafu};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
use store_api::storage::SequenceNumber;
use table::predicate::Predicate;

use crate::error::{
    self, ColumnNotFoundSnafu, ComputeArrowSnafu, DataTypeMismatchSnafu, EncodeMemtableSnafu,
    EncodeSnafu, NewRecordBatchSnafu, Result,
};
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::BulkPartIter;
use crate::memtable::time_series::{ValueBuilder, Values};
use crate::memtable::BoxedBatchIterator;
use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::to_sst_arrow_schema;

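/// Initial number of distinct dictionary values reserved for primary key column builders.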
const INIT_DICT_VALUE_CAPACITY: usize = 8;

/// A bulk part that carries rows as an Arrow [RecordBatch] in the flat format.
#[derive(Clone)]
pub struct BulkPart {
    /// The record batch that stores the rows.
    pub batch: RecordBatch,
    /// Max timestamp value in the batch.
    pub max_ts: i64,
    /// Min timestamp value in the batch.
    pub min_ts: i64,
    /// Sequence number of the part.
    pub sequence: u64,
    /// Index of the timestamp column in `batch`.
    pub timestamp_index: usize,
    /// Raw Arrow IPC payload, kept so the part can be converted back to a WAL entry
    /// without re-encoding.
    pub raw_data: Option<ArrowIpc>,
}

impl TryFrom<BulkWalEntry> for BulkPart {
    type Error = error::Error;

    fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
        match value.body.expect("Entry payload should be present") {
            Body::ArrowIpc(ipc) => {
                let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
                    .context(error::ConvertBulkWalEntrySnafu)?;
                let batch = decoder
                    .try_decode_record_batch(&ipc.data_header, &ipc.payload)
                    .context(error::ConvertBulkWalEntrySnafu)?;
                Ok(Self {
                    batch,
                    max_ts: value.max_ts,
                    min_ts: value.min_ts,
                    sequence: value.sequence,
                    timestamp_index: value.timestamp_index as usize,
                    raw_data: Some(ipc),
                })
            }
        }
    }
}

impl TryFrom<&BulkPart> for BulkWalEntry {
    type Error = error::Error;

    fn try_from(value: &BulkPart) -> Result<Self> {
        if let Some(ipc) = &value.raw_data {
            Ok(BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_ts,
                min_ts: value.min_ts,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ipc.clone())),
            })
        } else {
            let mut encoder = FlightEncoder::default();
            let schema_bytes = encoder
                .encode_schema(value.batch.schema().as_ref())
                .data_header;
            let [rb_data] = encoder
                .encode(FlightMessage::RecordBatch(value.batch.clone()))
                .try_into()
                .map_err(|_| {
                    error::UnsupportedOperationSnafu {
                        err_msg: "create BulkWalEntry from RecordBatch with dictionary arrays",
                    }
                    .build()
                })?;
            Ok(BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_ts,
                min_ts: value.min_ts,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ArrowIpc {
                    schema: schema_bytes,
                    data_header: rb_data.data_header,
                    payload: rb_data.data_body,
                })),
            })
        }
    }
}

impl BulkPart {
    /// Returns the estimated memory size of the part.
    pub(crate) fn estimated_size(&self) -> usize {
        self.batch
            .columns()
            .iter()
            .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
            .sum()
    }

    /// Converts this part to a [Mutation] with rows in the schema of `region_metadata`.
    pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
        let vectors = region_metadata
            .schema
            .column_schemas()
            .iter()
            .map(|col| match self.batch.column_by_name(&col.name) {
                None => Ok(None),
                Some(col) => Helper::try_into_vector(col).map(Some),
            })
            .collect::<datatypes::error::Result<Vec<_>>>()
            .context(error::ComputeVectorSnafu)?;

        let rows = (0..self.num_rows())
            .map(|row_idx| {
                let values = (0..self.batch.num_columns())
                    .map(|col_idx| {
                        if let Some(v) = &vectors[col_idx] {
                            value_to_grpc_value(v.get(row_idx))
                        } else {
                            api::v1::Value { value_data: None }
                        }
                    })
                    .collect::<Vec<_>>();
                api::v1::Row { values }
            })
            .collect::<Vec<_>>();

        let schema = region_metadata
            .column_metadatas
            .iter()
            .map(|c| {
                let data_type_wrapper =
                    ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
                Ok(api::v1::ColumnSchema {
                    column_name: c.column_schema.name.clone(),
                    datatype: data_type_wrapper.datatype() as i32,
                    semantic_type: c.semantic_type as i32,
                    ..Default::default()
                })
            })
            .collect::<api::error::Result<Vec<_>>>()
            .context(error::ConvertColumnDataTypeSnafu {
                reason: "failed to convert region metadata to column schema",
            })?;

        let rows = api::v1::Rows { schema, rows };

        Ok(Mutation {
            op_type: OpType::Put as i32,
            sequence: self.sequence,
            rows: Some(rows),
            write_hint: None,
        })
    }

    /// Returns the timestamp column of the part.
    pub fn timestamps(&self) -> &ArrayRef {
        self.batch.column(self.timestamp_index)
    }

    /// Returns the number of rows in the part.
    pub fn num_rows(&self) -> usize {
        self.batch.num_rows()
    }
}

/// Arrow builder type for the encoded primary key dictionary column.
type PrimaryKeyArrayBuilder = BinaryDictionaryBuilder<UInt32Type>;

/// Builder for a single primary key column.
enum PrimaryKeyColumnBuilder {
    /// Dictionary builder for string primary key columns.
    StringDict(StringDictionaryBuilder<UInt32Type>),
    /// Generic mutable vector builder for other primary key data types.
    Vector(Box<dyn MutableVector>),
}

impl PrimaryKeyColumnBuilder {
    /// Appends a value to the builder.
    fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
        match self {
            PrimaryKeyColumnBuilder::StringDict(builder) => {
                if let Some(s) = value.as_string().context(DataTypeMismatchSnafu)? {
                    builder.append_value(s);
                } else {
                    builder.append_null();
                }
            }
            PrimaryKeyColumnBuilder::Vector(builder) => {
                builder.push_value_ref(value);
            }
        }
        Ok(())
    }

    /// Finishes the builder and converts it into an Arrow array.
    fn into_arrow_array(self) -> ArrayRef {
        match self {
            PrimaryKeyColumnBuilder::StringDict(mut builder) => Arc::new(builder.finish()),
            PrimaryKeyColumnBuilder::Vector(mut builder) => builder.to_vector().to_arrow_array(),
        }
    }
}

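/// Converts [KeyValues] mutations into a [BulkPart] whose record batch uses the flat
/// layout: optional primary key columns, field columns, the time index column, then the
/// internal `__primary_key`, `__sequence` and `__op_type` columns.
///
/// A minimal usage sketch (assumes region metadata, a flat SST schema and key values are
/// built elsewhere, e.g. as in the tests below):
///
/// ```ignore
/// let codec = build_primary_key_codec(&metadata);
/// let schema = to_flat_sst_arrow_schema(
///     &metadata,
///     &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
/// );
/// let mut converter = BulkPartConverter::new(&metadata, schema, 1024, codec, true);
/// converter.append_key_values(&key_values)?;
/// let part = converter.convert()?;
/// ```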
pub struct BulkPartConverter {
    /// Metadata of the region.
    region_metadata: RegionMetadataRef,
    /// Arrow schema of the output record batch.
    schema: SchemaRef,
    /// Codec used to encode primary keys.
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
    /// Reusable buffer for encoding primary keys.
    key_buf: Vec<u8>,
    /// Builder for the encoded `__primary_key` dictionary column.
    key_array_builder: PrimaryKeyArrayBuilder,
    /// Builder for timestamp, sequence, op type and field columns.
    value_builder: ValueBuilder,
    /// Builders for individual primary key columns.
    /// Empty when primary key columns are not stored or the encoding is sparse.
    primary_key_column_builders: Vec<PrimaryKeyColumnBuilder>,

    /// Max timestamp value seen so far.
    max_ts: i64,
    /// Min timestamp value seen so far.
    min_ts: i64,
    /// Max sequence number seen so far.
    max_sequence: SequenceNumber,
}

impl BulkPartConverter {
    /// Creates a new [BulkPartConverter].
    ///
    /// When `store_primary_key_columns` is true and the primary key encoding is not sparse,
    /// individual primary key columns are also stored in the output batch.
    pub fn new(
        region_metadata: &RegionMetadataRef,
        schema: SchemaRef,
        capacity: usize,
        primary_key_codec: Arc<dyn PrimaryKeyCodec>,
        store_primary_key_columns: bool,
    ) -> Self {
        debug_assert_eq!(
            region_metadata.primary_key_encoding,
            primary_key_codec.encoding()
        );

        let primary_key_column_builders = if store_primary_key_columns
            && region_metadata.primary_key_encoding != PrimaryKeyEncoding::Sparse
        {
            new_primary_key_column_builders(region_metadata, capacity)
        } else {
            Vec::new()
        };

        Self {
            region_metadata: region_metadata.clone(),
            schema,
            primary_key_codec,
            key_buf: Vec::new(),
            key_array_builder: PrimaryKeyArrayBuilder::new(),
            value_builder: ValueBuilder::new(region_metadata, capacity),
            primary_key_column_builders,
            min_ts: i64::MAX,
            max_ts: i64::MIN,
            max_sequence: SequenceNumber::MIN,
        }
    }

    /// Appends all rows in [KeyValues] to the converter.
    pub fn append_key_values(&mut self, key_values: &KeyValues) -> Result<()> {
        for kv in key_values.iter() {
            self.append_key_value(&kv)?;
        }

        Ok(())
    }

    /// Appends a single [KeyValue] to the converter.
    fn append_key_value(&mut self, kv: &KeyValue) -> Result<()> {
        if self.primary_key_codec.encoding() == PrimaryKeyEncoding::Sparse {
            // Sparse encoding: the primary key is already encoded in the first primary key column.
            let mut primary_keys = kv.primary_keys();
            if let Some(encoded) = primary_keys
                .next()
                .context(ColumnNotFoundSnafu {
                    column: PRIMARY_KEY_COLUMN_NAME,
                })?
                .as_binary()
                .context(DataTypeMismatchSnafu)?
            {
                self.key_array_builder
                    .append(encoded)
                    .context(ComputeArrowSnafu)?;
            } else {
                self.key_array_builder
                    .append("")
                    .context(ComputeArrowSnafu)?;
            }
        } else {
            // Dense encoding: encode primary key values into the reusable buffer.
            self.key_buf.clear();
            self.primary_key_codec
                .encode_key_value(kv, &mut self.key_buf)
                .context(EncodeSnafu)?;
            self.key_array_builder
                .append(&self.key_buf)
                .context(ComputeArrowSnafu)?;
        };

        // Store values of individual primary key columns if required.
        if !self.primary_key_column_builders.is_empty() {
            for (builder, pk_value) in self
                .primary_key_column_builders
                .iter_mut()
                .zip(kv.primary_keys())
            {
                builder.push_value_ref(pk_value)?;
            }
        }

        self.value_builder.push(
            kv.timestamp(),
            kv.sequence(),
            kv.op_type() as u8,
            kv.fields(),
        );

        let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
        self.min_ts = self.min_ts.min(ts);
        self.max_ts = self.max_ts.max(ts);
        self.max_sequence = self.max_sequence.max(kv.sequence());

        Ok(())
    }

    /// Converts buffered rows into a [BulkPart] sorted by primary key, timestamp and sequence.
    pub fn convert(mut self) -> Result<BulkPart> {
        let values = Values::from(self.value_builder);
        let mut columns =
            Vec::with_capacity(4 + values.fields.len() + self.primary_key_column_builders.len());

        // Primary key columns come first, followed by fields, timestamp and internal columns.
        for builder in self.primary_key_column_builders {
            columns.push(builder.into_arrow_array());
        }
        columns.extend(values.fields.iter().map(|field| field.to_arrow_array()));
        let timestamp_index = columns.len();
        columns.push(values.timestamp.to_arrow_array());
        let pk_array = self.key_array_builder.finish();
        columns.push(Arc::new(pk_array));
        columns.push(values.sequence.to_arrow_array());
        columns.push(values.op_type.to_arrow_array());

        let batch = RecordBatch::try_new(self.schema, columns).context(NewRecordBatchSnafu)?;
        let batch = sort_primary_key_record_batch(&batch)?;

        Ok(BulkPart {
            batch,
            max_ts: self.max_ts,
            min_ts: self.min_ts,
            sequence: self.max_sequence,
            timestamp_index,
            raw_data: None,
        })
    }
}

fn new_primary_key_column_builders(
    metadata: &RegionMetadata,
    capacity: usize,
) -> Vec<PrimaryKeyColumnBuilder> {
    metadata
        .primary_key_columns()
        .map(|col| {
            if col.column_schema.data_type.is_string() {
                PrimaryKeyColumnBuilder::StringDict(StringDictionaryBuilder::with_capacity(
                    capacity,
                    INIT_DICT_VALUE_CAPACITY,
                    capacity,
                ))
            } else {
                PrimaryKeyColumnBuilder::Vector(
                    col.column_schema.data_type.create_mutable_vector(capacity),
                )
            }
        })
        .collect()
}

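/// Sorts a flat record batch by (`__primary_key` ASC, time index ASC, `__sequence` DESC),
/// assuming the last four columns are the time index, `__primary_key`, `__sequence` and
/// `__op_type` columns.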
fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
    let total_columns = batch.num_columns();
    let sort_columns = vec![
        SortColumn {
            values: batch.column(total_columns - 3).clone(),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
        SortColumn {
            values: batch.column(total_columns - 4).clone(),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
        SortColumn {
            values: batch.column(total_columns - 2).clone(),
            options: Some(SortOptions {
                descending: true,
                nulls_first: true,
            }),
        },
    ];

    let indices = datatypes::arrow::compute::lexsort_to_indices(&sort_columns, None)
        .context(ComputeArrowSnafu)?;

    datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
}

#[derive(Debug)]
pub struct EncodedBulkPart {
    data: Bytes,
    metadata: BulkPartMeta,
}

impl EncodedBulkPart {
    pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
        Self { data, metadata }
    }

    pub(crate) fn metadata(&self) -> &BulkPartMeta {
        &self.metadata
    }

    pub(crate) fn read(
        &self,
        context: BulkIterContextRef,
        sequence: Option<SequenceNumber>,
    ) -> Result<Option<BoxedBatchIterator>> {
        let row_groups_to_read = context.row_groups_to_read(&self.metadata.parquet_metadata);

        if row_groups_to_read.is_empty() {
            // All row groups are pruned, nothing to read.
            return Ok(None);
        }

        let iter = BulkPartIter::try_new(
            context,
            row_groups_to_read,
            self.metadata.parquet_metadata.clone(),
            self.data.clone(),
            sequence,
        )?;
        Ok(Some(Box::new(iter) as BoxedBatchIterator))
    }
}

#[derive(Debug)]
pub struct BulkPartMeta {
    /// Total number of rows in the part.
    pub num_rows: usize,
    /// Max timestamp in the part.
    pub max_timestamp: i64,
    /// Min timestamp in the part.
    pub min_timestamp: i64,
    /// Metadata of the encoded parquet file.
    pub parquet_metadata: Arc<ParquetMetaData>,
    /// Metadata of the region the part belongs to.
    pub region_metadata: RegionMetadataRef,
}

pub struct BulkPartEncoder {
    metadata: RegionMetadataRef,
    pk_encoder: DensePrimaryKeyCodec,
    row_group_size: usize,
    dedup: bool,
    writer_props: Option<WriterProperties>,
}

impl BulkPartEncoder {
    pub(crate) fn new(
        metadata: RegionMetadataRef,
        dedup: bool,
        row_group_size: usize,
    ) -> BulkPartEncoder {
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let writer_props = Some(
            WriterProperties::builder()
                .set_write_batch_size(row_group_size)
                .set_max_row_group_size(row_group_size)
                .build(),
        );
        Self {
            metadata,
            pk_encoder: codec,
            row_group_size,
            dedup,
            writer_props,
        }
    }
}

impl BulkPartEncoder {
    /// Encodes mutations into an [EncodedBulkPart], returning `None` if the mutations
    /// contain no rows.
    fn encode_mutations(&self, mutations: &[Mutation]) -> Result<Option<EncodedBulkPart>> {
        let Some((arrow_record_batch, min_ts, max_ts)) =
            mutations_to_record_batch(mutations, &self.metadata, &self.pk_encoder, self.dedup)?
        else {
            return Ok(None);
        };

        let mut buf = Vec::with_capacity(4096);
        let arrow_schema = arrow_record_batch.schema();

        let file_metadata = {
            let mut writer =
                ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
                    .context(EncodeMemtableSnafu)?;
            writer
                .write(&arrow_record_batch)
                .context(EncodeMemtableSnafu)?;
            writer.finish().context(EncodeMemtableSnafu)?
        };

        let buf = Bytes::from(buf);
        let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);

        Ok(Some(EncodedBulkPart {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: arrow_record_batch.num_rows(),
                max_timestamp: max_ts,
                min_timestamp: min_ts,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
            },
        }))
    }
}

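/// Converts mutations into a record batch in the SST format along with the min and max
/// timestamps, optionally deduplicating rows by (primary key, timestamp).
/// Returns `None` if the mutations contain no rows.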
fn mutations_to_record_batch(
    mutations: &[Mutation],
    metadata: &RegionMetadataRef,
    pk_encoder: &DensePrimaryKeyCodec,
    dedup: bool,
) -> Result<Option<(RecordBatch, i64, i64)>> {
    let total_rows: usize = mutations
        .iter()
        .map(|m| m.rows.as_ref().map(|r| r.rows.len()).unwrap_or(0))
        .sum();

    if total_rows == 0 {
        return Ok(None);
    }

    let mut pk_builder = BinaryBuilder::with_capacity(total_rows, 0);

    let mut ts_vector: Box<dyn MutableVector> = metadata
        .time_index_column()
        .column_schema
        .data_type
        .create_mutable_vector(total_rows);
    let mut sequence_builder = UInt64Builder::with_capacity(total_rows);
    let mut op_type_builder = UInt8Builder::with_capacity(total_rows);

    let mut field_builders: Vec<Box<dyn MutableVector>> = metadata
        .field_columns()
        .map(|f| f.column_schema.data_type.create_mutable_vector(total_rows))
        .collect();

    let mut pk_buffer = vec![];
    for m in mutations {
        let Some(key_values) = KeyValuesRef::new(metadata, m) else {
            continue;
        };

        for row in key_values.iter() {
            pk_buffer.clear();
            pk_encoder
                .encode_to_vec(row.primary_keys(), &mut pk_buffer)
                .context(EncodeSnafu)?;
            pk_builder.append_value(pk_buffer.as_bytes());
            ts_vector.push_value_ref(row.timestamp());
            sequence_builder.append_value(row.sequence());
            op_type_builder.append_value(row.op_type() as u8);
            for (builder, field) in field_builders.iter_mut().zip(row.fields()) {
                builder.push_value_ref(field);
            }
        }
    }

    let arrow_schema = to_sst_arrow_schema(metadata);
    let timestamp_unit = metadata
        .time_index_column()
        .column_schema
        .data_type
        .as_timestamp()
        .unwrap()
        .unit();
    let sorter = ArraysSorter {
        encoded_primary_keys: pk_builder.finish(),
        timestamp_unit,
        timestamp: ts_vector.to_vector().to_arrow_array(),
        sequence: sequence_builder.finish(),
        op_type: op_type_builder.finish(),
        fields: field_builders
            .iter_mut()
            .map(|f| f.to_vector().to_arrow_array()),
        dedup,
        arrow_schema,
    };

    sorter.sort().map(Some)
}

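/// Helper that sorts (and optionally dedups) encoded primary keys, timestamps, sequences,
/// op types and field arrays before assembling the final record batch.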
struct ArraysSorter<I> {
    encoded_primary_keys: BinaryArray,
    timestamp_unit: TimeUnit,
    timestamp: ArrayRef,
    sequence: UInt64Array,
    op_type: UInt8Array,
    fields: I,
    dedup: bool,
    arrow_schema: SchemaRef,
}

impl<I> ArraysSorter<I>
where
    I: Iterator<Item = ArrayRef>,
{
    /// Converts arrays to a sorted record batch and returns it with the min and max timestamps.
    fn sort(self) -> Result<(RecordBatch, i64, i64)> {
        debug_assert!(!self.timestamp.is_empty());
        debug_assert!(self.timestamp.len() == self.sequence.len());
        debug_assert!(self.timestamp.len() == self.op_type.len());
        debug_assert!(self.timestamp.len() == self.encoded_primary_keys.len());

        let timestamp_iter = timestamp_array_to_iter(self.timestamp_unit, &self.timestamp);
        let (mut min_timestamp, mut max_timestamp) = (i64::MAX, i64::MIN);
        let mut to_sort = self
            .encoded_primary_keys
            .iter()
            .zip(timestamp_iter)
            .zip(self.sequence.iter())
            .map(|((pk, timestamp), sequence)| {
                max_timestamp = max_timestamp.max(*timestamp);
                min_timestamp = min_timestamp.min(*timestamp);
                (pk, timestamp, sequence)
            })
            .enumerate()
            .collect::<Vec<_>>();

        to_sort.sort_unstable_by(|(_, (l_pk, l_ts, l_seq)), (_, (r_pk, r_ts, r_seq))| {
            l_pk.cmp(r_pk)
                .then(l_ts.cmp(r_ts))
                .then(l_seq.cmp(r_seq).reverse())
        });

        if self.dedup {
            // Dedup by (primary key, timestamp); the row with the largest sequence comes
            // first after sorting, so it is the one that is kept.
            to_sort.dedup_by(|(_, (l_pk, l_ts, _)), (_, (r_pk, r_ts, _))| {
                l_pk == r_pk && l_ts == r_ts
            });
        }

        let indices = UInt32Array::from_iter_values(to_sort.iter().map(|v| v.0 as u32));

        let pk_dictionary = Arc::new(binary_array_to_dictionary(
            arrow::compute::take(
                &self.encoded_primary_keys,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap(),
        )?) as ArrayRef;

        let mut arrays = Vec::with_capacity(self.arrow_schema.fields.len());
        for arr in self.fields {
            arrays.push(
                arrow::compute::take(
                    &arr,
                    &indices,
                    Some(TakeOptions {
                        check_bounds: false,
                    }),
                )
                .context(ComputeArrowSnafu)?,
            );
        }

        let timestamp = arrow::compute::take(
            &self.timestamp,
            &indices,
            Some(TakeOptions {
                check_bounds: false,
            }),
        )
        .context(ComputeArrowSnafu)?;

        arrays.push(timestamp);
        arrays.push(pk_dictionary);
        arrays.push(
            arrow::compute::take(
                &self.sequence,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?,
        );

        arrays.push(
            arrow::compute::take(
                &self.op_type,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?,
        );

        let batch = RecordBatch::try_new(self.arrow_schema, arrays).context(NewRecordBatchSnafu)?;
        Ok((batch, min_timestamp, max_timestamp))
    }
}

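/// Returns an iterator over the raw `i64` values of a timestamp array with the given time unit.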
fn timestamp_array_to_iter(
    timestamp_unit: TimeUnit,
    timestamp: &ArrayRef,
) -> impl Iterator<Item = &i64> {
    match timestamp_unit {
        TimeUnit::Second => timestamp
            .as_any()
            .downcast_ref::<TimestampSecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Millisecond => timestamp
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Microsecond => timestamp
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Nanosecond => timestamp
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap()
            .values()
            .iter(),
    }
}

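/// Converts a `BinaryArray` of encoded primary keys into a dictionary array, assuming
/// identical values are adjacent (the input is expected to be sorted by primary key).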
fn binary_array_to_dictionary(input: &BinaryArray) -> Result<PrimaryKeyArray> {
    if input.is_empty() {
        return Ok(DictionaryArray::new(
            UInt32Array::from(Vec::<u32>::new()),
            Arc::new(BinaryArray::from_vec(vec![])) as ArrayRef,
        ));
    }
    let mut keys = Vec::with_capacity(16);
    let mut values = BinaryBuilder::new();
    let mut prev: usize = 0;
    keys.push(prev as u32);
    values.append_value(input.value(prev));

    for current_bytes in input.iter().skip(1) {
        let current_bytes = current_bytes.unwrap();
        let prev_bytes = input.value(prev);
        if current_bytes != prev_bytes {
            values.append_value(current_bytes);
            prev += 1;
        }
        keys.push(prev as u32);
    }

    Ok(DictionaryArray::new(
        UInt32Array::from(keys),
        Arc::new(values.finish()) as ArrayRef,
    ))
}

#[cfg(test)]
mod tests {
    use std::collections::VecDeque;

    use api::v1::{Row, WriteHint};
    use datafusion_common::ScalarValue;
    use datatypes::prelude::{ConcreteDataType, ScalarVector, Value};
    use datatypes::vectors::{Float64Vector, TimestampMillisecondVector};
    use store_api::storage::consts::ReservedColumnId;

    use super::*;
    use crate::memtable::bulk::context::BulkIterContext;
    use crate::sst::parquet::format::ReadFormat;
    use crate::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions};
    use crate::test_util::memtable_util::{
        build_key_values_with_ts_seq_values, metadata_for_test, metadata_with_primary_key,
        region_metadata_to_row_schema,
    };

    fn check_binary_array_to_dictionary(
        input: &[&[u8]],
        expected_keys: &[u32],
        expected_values: &[&[u8]],
    ) {
        let input = BinaryArray::from_iter_values(input.iter());
        let array = binary_array_to_dictionary(&input).unwrap();
        assert_eq!(
            &expected_keys,
            &array.keys().iter().map(|v| v.unwrap()).collect::<Vec<_>>()
        );
        assert_eq!(
            expected_values,
            &array
                .values()
                .as_any()
                .downcast_ref::<BinaryArray>()
                .unwrap()
                .iter()
                .map(|v| v.unwrap())
                .collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_binary_array_to_dictionary() {
        check_binary_array_to_dictionary(&[], &[], &[]);

        check_binary_array_to_dictionary(&["a".as_bytes()], &[0], &["a".as_bytes()]);

        check_binary_array_to_dictionary(
            &["a".as_bytes(), "a".as_bytes()],
            &[0, 0],
            &["a".as_bytes()],
        );

        check_binary_array_to_dictionary(
            &["a".as_bytes(), "a".as_bytes(), "b".as_bytes()],
            &[0, 0, 1],
            &["a".as_bytes(), "b".as_bytes()],
        );

        check_binary_array_to_dictionary(
            &[
                "a".as_bytes(),
                "a".as_bytes(),
                "b".as_bytes(),
                "c".as_bytes(),
            ],
            &[0, 0, 1, 2],
            &["a".as_bytes(), "b".as_bytes(), "c".as_bytes()],
        );
    }

    struct MutationInput<'a> {
        k0: &'a str,
        k1: u32,
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
        sequence: u64,
    }

    #[derive(Debug, PartialOrd, PartialEq)]
    struct BatchOutput<'a> {
        pk_values: &'a [Value],
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
    }

    fn check_mutations_to_record_batches(
        input: &[MutationInput],
        expected: &[BatchOutput],
        expected_timestamp: (i64, i64),
        dedup: bool,
    ) {
        let metadata = metadata_for_test();
        let mutations = input
            .iter()
            .map(|m| {
                build_key_values_with_ts_seq_values(
                    &metadata,
                    m.k0.to_string(),
                    m.k1,
                    m.timestamps.iter().copied(),
                    m.v1.iter().copied(),
                    m.sequence,
                )
                .mutation
            })
            .collect::<Vec<_>>();
        let total_rows: usize = mutations
            .iter()
            .flat_map(|m| m.rows.iter())
            .map(|r| r.rows.len())
            .sum();

        let pk_encoder = DensePrimaryKeyCodec::new(&metadata);

        let (batch, _, _) = mutations_to_record_batch(&mutations, &metadata, &pk_encoder, dedup)
            .unwrap()
            .unwrap();
        let read_format = ReadFormat::new_with_all_columns(metadata.clone());
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&batch, None, &mut batches)
            .unwrap();
        if !dedup {
            assert_eq!(
                total_rows,
                batches.iter().map(|b| { b.num_rows() }).sum::<usize>()
            );
        }
        let batch_values = batches
            .into_iter()
            .map(|b| {
                let pk_values = pk_encoder.decode(b.primary_key()).unwrap().into_dense();
                let timestamps = b
                    .timestamps()
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap().0.value())
                    .collect::<Vec<_>>();
                let float_values = b.fields()[1]
                    .data
                    .as_any()
                    .downcast_ref::<Float64Vector>()
                    .unwrap()
                    .iter_data()
                    .collect::<Vec<_>>();

                (pk_values, timestamps, float_values)
            })
            .collect::<Vec<_>>();
        assert_eq!(expected.len(), batch_values.len());

        for idx in 0..expected.len() {
            assert_eq!(expected[idx].pk_values, &batch_values[idx].0);
            assert_eq!(expected[idx].timestamps, &batch_values[idx].1);
            assert_eq!(expected[idx].v1, &batch_values[idx].2);
        }
    }

    #[test]
    fn test_mutations_to_record_batch() {
        check_mutations_to_record_batches(
            &[MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[0],
                v1: &[Some(0.1)],
                sequence: 0,
            }],
            &[BatchOutput {
                pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                timestamps: &[0],
                v1: &[Some(0.1)],
            }],
            (0, 0),
            true,
        );

        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[1],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
                MutationInput {
                    k0: "a",
                    k1: 1,
                    timestamps: &[1],
                    v1: &[Some(0.3)],
                    sequence: 2,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0, 1],
                    v1: &[Some(0.1), Some(0.2)],
                },
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(1)],
                    timestamps: &[1],
                    v1: &[Some(0.3)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 1),
            true,
        );

        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 0),
            true,
        );
        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0, 0],
                    v1: &[Some(0.2), Some(0.1)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 0),
            false,
        );
    }

    fn encode(input: &[MutationInput]) -> EncodedBulkPart {
        let metadata = metadata_for_test();
        let mutations = input
            .iter()
            .map(|m| {
                build_key_values_with_ts_seq_values(
                    &metadata,
                    m.k0.to_string(),
                    m.k1,
                    m.timestamps.iter().copied(),
                    m.v1.iter().copied(),
                    m.sequence,
                )
                .mutation
            })
            .collect::<Vec<_>>();
        let encoder = BulkPartEncoder::new(metadata, true, 1024);
        encoder.encode_mutations(&mutations).unwrap().unwrap()
    }

    #[test]
    fn test_write_and_read_part_projection() {
        let part = encode(&[
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.1)],
                sequence: 0,
            },
            MutationInput {
                k0: "b",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.0)],
                sequence: 0,
            },
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[2],
                v1: &[Some(0.2)],
                sequence: 1,
            },
        ]);

        let projection = &[4u32];

        let mut reader = part
            .read(
                Arc::new(BulkIterContext::new(
                    part.metadata.region_metadata.clone(),
                    &Some(projection.as_slice()),
                    None,
                )),
                None,
            )
            .unwrap()
            .expect("expect at least one row group");

        let mut total_rows_read = 0;
        let mut field = vec![];
        for res in reader {
            let batch = res.unwrap();
            assert_eq!(1, batch.fields().len());
            assert_eq!(4, batch.fields()[0].column_id);
            field.extend(
                batch.fields()[0]
                    .data
                    .as_any()
                    .downcast_ref::<Float64Vector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap()),
            );
            total_rows_read += batch.num_rows();
        }
        assert_eq!(3, total_rows_read);
        assert_eq!(vec![0.1, 0.2, 0.0], field);
    }

    fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
        let metadata = metadata_for_test();
        let mutations = key_values
            .into_iter()
            .map(|(k0, k1, (start, end), sequence)| {
                let ts = (start..end);
                let v1 = (start..end).map(|_| None);
                build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
                    .mutation
            })
            .collect::<Vec<_>>();
        let encoder = BulkPartEncoder::new(metadata, true, 100);
        encoder.encode_mutations(&mutations).unwrap().unwrap()
    }

    fn check_prune_row_group(
        part: &EncodedBulkPart,
        predicate: Option<Predicate>,
        expected_rows: usize,
    ) {
        let context = Arc::new(BulkIterContext::new(
            part.metadata.region_metadata.clone(),
            &None,
            predicate,
        ));
        let mut reader = part
            .read(context, None)
            .unwrap()
            .expect("expect at least one row group");
        let mut total_rows_read = 0;
        for res in reader {
            let batch = res.unwrap();
            total_rows_read += batch.num_rows();
        }
        assert_eq!(expected_rows, total_rows_read);
    }

    #[test]
    fn test_prune_row_groups() {
        let part = prepare(vec![
            ("a", 0, (0, 40), 1),
            ("a", 1, (0, 60), 1),
            ("b", 0, (0, 100), 2),
            ("b", 1, (100, 180), 3),
            ("b", 1, (180, 210), 4),
        ]);

        let context = Arc::new(BulkIterContext::new(
            part.metadata.region_metadata.clone(),
            &None,
            Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
                datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
            )])),
        ));
        assert!(part.read(context, None).unwrap().is_none());

        check_prune_row_group(&part, None, 310);

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            40,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
            ])),
            60,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a"))
            ])),
            100,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            100,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64))
            ])),
            1,
        );
    }

    #[test]
    fn test_bulk_part_converter_append_and_convert() {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "key1".to_string(),
            1u32,
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "key2".to_string(),
            2u32,
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_ts, 1000);
        assert_eq!(bulk_part.max_ts, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );
    }

    #[test]
    fn test_bulk_part_converter_sorting() {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "z_key".to_string(),
            3u32,
            vec![3000].into_iter(),
            vec![Some(3.0)].into_iter(),
            3,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "a_key".to_string(),
            1u32,
            vec![1000].into_iter(),
            vec![Some(1.0)].into_iter(),
            1,
        );

        let key_values3 = build_key_values_with_ts_seq_values(
            &metadata,
            "m_key".to_string(),
            2u32,
            vec![2000].into_iter(),
            vec![Some(2.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();
        converter.append_key_values(&key_values3).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);

        let ts_column = bulk_part.batch.column(bulk_part.timestamp_index);
        let seq_column = bulk_part.batch.column(bulk_part.batch.num_columns() - 2);

        let ts_array = ts_column
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        let seq_array = seq_column.as_any().downcast_ref::<UInt64Array>().unwrap();

        assert_eq!(ts_array.values(), &[1000, 2000, 3000]);
        assert_eq!(seq_array.values(), &[1, 2, 3]);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );
    }

    #[test]
    fn test_bulk_part_converter_empty() {
        let metadata = metadata_for_test();
        let capacity = 10;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 0);
        assert_eq!(bulk_part.min_ts, i64::MAX);
        assert_eq!(bulk_part.max_ts, i64::MIN);
        assert_eq!(bulk_part.sequence, SequenceNumber::MIN);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );
    }

    #[test]
    fn test_bulk_part_converter_without_primary_key_columns() {
        let metadata = metadata_for_test();
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions {
                raw_pk_columns: false,
                string_pk_use_dict: true,
            },
        );

        let capacity = 100;
        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, false);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "key1".to_string(),
            1u32,
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "key2".to_string(),
            2u32,
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_ts, 1000);
        assert_eq!(bulk_part.max_ts, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );
    }

    #[allow(clippy::too_many_arguments)]
    fn build_key_values_with_sparse_encoding(
        metadata: &RegionMetadataRef,
        primary_key_codec: &Arc<dyn PrimaryKeyCodec>,
        table_id: u32,
        tsid: u64,
        k0: String,
        k1: String,
        timestamps: impl Iterator<Item = i64>,
        values: impl Iterator<Item = Option<f64>>,
        sequence: SequenceNumber,
    ) -> KeyValues {
        let pk_values = vec![
            (ReservedColumnId::table_id(), Value::UInt32(table_id)),
            (ReservedColumnId::tsid(), Value::UInt64(tsid)),
            (0, Value::String(k0.clone().into())),
            (1, Value::String(k1.clone().into())),
        ];
        let mut encoded_key = Vec::new();
        primary_key_codec
            .encode_values(&pk_values, &mut encoded_key)
            .unwrap();
        assert!(!encoded_key.is_empty());

        let column_schema = vec![
            api::v1::ColumnSchema {
                column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::binary_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Tag as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "ts".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::timestamp_millisecond_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Timestamp as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "v0".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::int64_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Field as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "v1".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::float64_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Field as i32,
                ..Default::default()
            },
        ];

        let rows = timestamps
            .zip(values)
            .map(|(ts, v)| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::BinaryValue(
                            encoded_key.clone(),
                        )),
                    },
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(ts)),
                    },
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::I64Value(ts)),
                    },
                    api::v1::Value {
                        value_data: v.map(api::v1::value::ValueData::F64Value),
                    },
                ],
            })
            .collect();

        let mutation = api::v1::Mutation {
            op_type: 1,
            sequence,
            rows: Some(api::v1::Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: Some(WriteHint {
                primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
            }),
        };
        KeyValues::new(metadata.as_ref(), mutation).unwrap()
    }

    #[test]
    fn test_bulk_part_converter_sparse_primary_key_encoding() {
        use api::v1::SemanticType;
        use datatypes::schema::ColumnSchema;
        use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
        use store_api::storage::RegionId;

        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![0, 1])
            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
        let metadata = Arc::new(builder.build().unwrap());

        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse);
        assert_eq!(primary_key_codec.encoding(), PrimaryKeyEncoding::Sparse);

        let capacity = 100;
        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec.clone(), true);

        let key_values1 = build_key_values_with_sparse_encoding(
            &metadata,
            &primary_key_codec,
            2048u32, // table_id
            100u64,  // tsid
            "key11".to_string(),
            "key21".to_string(),
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_sparse_encoding(
            &metadata,
            &primary_key_codec,
            4096u32, // table_id
            200u64,  // tsid
            "key12".to_string(),
            "key22".to_string(),
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_ts, 1000);
        assert_eq!(bulk_part.max_ts, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        let schema = bulk_part.batch.schema();
        // With sparse encoding, individual primary key columns are not stored;
        // only the encoded __primary_key column carries the keys.
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );

        let primary_key_column = bulk_part.batch.column_by_name("__primary_key").unwrap();
        let dict_array = primary_key_column
            .as_any()
            .downcast_ref::<DictionaryArray<UInt32Type>>()
            .unwrap();

        assert!(!dict_array.is_empty());
        assert_eq!(dict_array.len(), 3);

        // All encoded primary keys in the dictionary should be non-empty.
        let values = dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        for i in 0..values.len() {
            assert!(
                !values.value(i).is_empty(),
                "Encoded primary key should not be empty"
            );
        }
    }
}