use std::collections::VecDeque;
use std::sync::Arc;

use api::helper::{value_to_grpc_value, ColumnDataTypeWrapper};
use api::v1::bulk_wal_entry::Body;
use api::v1::{bulk_wal_entry, ArrowIpc, BulkWalEntry, Mutation, OpType};
use bytes::Bytes;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_recordbatch::DfRecordBatch as RecordBatch;
use common_time::timestamp::TimeUnit;
use datatypes::arrow;
use datatypes::arrow::array::{
    Array, ArrayRef, BinaryBuilder, BinaryDictionaryBuilder, DictionaryArray, StringBuilder,
    StringDictionaryBuilder, TimestampMicrosecondArray, TimestampMillisecondArray,
    TimestampNanosecondArray, TimestampSecondArray, UInt32Array, UInt64Array, UInt64Builder,
    UInt8Array, UInt8Builder,
};
use datatypes::arrow::compute::{SortColumn, SortOptions, TakeOptions};
use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
use datatypes::arrow_array::BinaryArray;
use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::Helper;
use mito_codec::key_values::{KeyValue, KeyValues, KeyValuesRef};
use mito_codec::row_converter::{
    build_primary_key_codec, DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt,
};
use parquet::arrow::ArrowWriter;
use parquet::data_type::AsBytes;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use snafu::{OptionExt, ResultExt, Snafu};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
use store_api::storage::SequenceNumber;
use table::predicate::Predicate;

use crate::error::{
    self, ColumnNotFoundSnafu, ComputeArrowSnafu, DataTypeMismatchSnafu, EncodeMemtableSnafu,
    EncodeSnafu, NewRecordBatchSnafu, Result,
};
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
use crate::memtable::time_series::{ValueBuilder, Values};
use crate::memtable::BoxedRecordBatchIterator;
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::to_sst_arrow_schema;

const INIT_DICT_VALUE_CAPACITY: usize = 8;

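/// A bulk write part holding rows as a [RecordBatch] in SST layout, together
/// with the timestamp range, the maximum sequence number, and the raw Arrow
/// IPC payload when the part was decoded from a WAL entry.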
#[derive(Clone)]
pub struct BulkPart {
    pub batch: RecordBatch,
    pub max_ts: i64,
    pub min_ts: i64,
    pub sequence: u64,
    pub timestamp_index: usize,
    pub raw_data: Option<ArrowIpc>,
}

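/// Decodes a [BulkWalEntry] carrying an Arrow IPC payload into a [BulkPart].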
impl TryFrom<BulkWalEntry> for BulkPart {
    type Error = error::Error;

    fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
        match value.body.expect("Entry payload should be present") {
            Body::ArrowIpc(ipc) => {
                let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
                    .context(error::ConvertBulkWalEntrySnafu)?;
                let batch = decoder
                    .try_decode_record_batch(&ipc.data_header, &ipc.payload)
                    .context(error::ConvertBulkWalEntrySnafu)?;
                Ok(Self {
                    batch,
                    max_ts: value.max_ts,
                    min_ts: value.min_ts,
                    sequence: value.sequence,
                    timestamp_index: value.timestamp_index as usize,
                    raw_data: Some(ipc),
                })
            }
        }
    }
}

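/// Encodes a [BulkPart] into a [BulkWalEntry], reusing the raw Arrow IPC
/// payload when present and re-encoding the batch with [FlightEncoder]
/// otherwise.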
impl TryFrom<&BulkPart> for BulkWalEntry {
    type Error = error::Error;

    fn try_from(value: &BulkPart) -> Result<Self> {
        if let Some(ipc) = &value.raw_data {
            Ok(BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_ts,
                min_ts: value.min_ts,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ipc.clone())),
            })
        } else {
            let mut encoder = FlightEncoder::default();
            let schema_bytes = encoder
                .encode_schema(value.batch.schema().as_ref())
                .data_header;
            // `encode` yields multiple messages when the batch contains
            // dictionary arrays; that layout cannot fit in a single WAL entry.
            let [rb_data] = encoder
                .encode(FlightMessage::RecordBatch(value.batch.clone()))
                .try_into()
                .map_err(|_| {
                    error::UnsupportedOperationSnafu {
                        err_msg: "create BulkWalEntry from RecordBatch with dictionary arrays",
                    }
                    .build()
                })?;
            Ok(BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_ts,
                min_ts: value.min_ts,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ArrowIpc {
                    schema: schema_bytes,
                    data_header: rb_data.data_header,
                    payload: rb_data.data_body,
                })),
            })
        }
    }
}

impl BulkPart {
    pub(crate) fn estimated_size(&self) -> usize {
        self.batch
            .columns()
            .iter()
            .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
            .sum()
    }

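    /// Estimates the number of series in this part from the dictionary size
    /// of the encoded primary key column; returns 0 if that column is not a
    /// dictionary array.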
    pub fn estimated_series_count(&self) -> usize {
        let pk_column_idx = primary_key_column_index(self.batch.num_columns());
        let pk_column = self.batch.column(pk_column_idx);
        if let Some(dict_array) = pk_column.as_any().downcast_ref::<PrimaryKeyArray>() {
            dict_array.values().len()
        } else {
            0
        }
    }

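    /// Converts this part into a gRPC [Mutation] so it can be replayed via
    /// the row-based write path, looking up each region column in the batch
    /// by name and filling missing columns with nulls.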
    pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
        let vectors = region_metadata
            .schema
            .column_schemas()
            .iter()
            .map(|col| match self.batch.column_by_name(&col.name) {
                None => Ok(None),
                Some(col) => Helper::try_into_vector(col).map(Some),
            })
            .collect::<datatypes::error::Result<Vec<_>>>()
            .context(error::ComputeVectorSnafu)?;

        let rows = (0..self.num_rows())
            .map(|row_idx| {
                // Row values must align with the region schema below; columns
                // missing from the batch become nulls.
                let values = (0..vectors.len())
                    .map(|col_idx| {
                        if let Some(v) = &vectors[col_idx] {
                            value_to_grpc_value(v.get(row_idx))
                        } else {
                            api::v1::Value { value_data: None }
                        }
                    })
                    .collect::<Vec<_>>();
                api::v1::Row { values }
            })
            .collect::<Vec<_>>();

        let schema = region_metadata
            .column_metadatas
            .iter()
            .map(|c| {
                let data_type_wrapper =
                    ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
                Ok(api::v1::ColumnSchema {
                    column_name: c.column_schema.name.clone(),
                    datatype: data_type_wrapper.datatype() as i32,
                    semantic_type: c.semantic_type as i32,
                    ..Default::default()
                })
            })
            .collect::<api::error::Result<Vec<_>>>()
            .context(error::ConvertColumnDataTypeSnafu {
                reason: "failed to convert region metadata to column schema",
            })?;

        let rows = api::v1::Rows { schema, rows };

        Ok(Mutation {
            op_type: OpType::Put as i32,
            sequence: self.sequence,
            rows: Some(rows),
            write_hint: None,
        })
    }

    pub fn timestamps(&self) -> &ArrayRef {
        self.batch.column(self.timestamp_index)
    }

    pub fn num_rows(&self) -> usize {
        self.batch.num_rows()
    }
}

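/// Builder for one primary key column stored as a plain column in the output
/// batch; string tags use a dictionary builder, other types fall back to a
/// generic mutable vector.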
enum PrimaryKeyColumnBuilder {
    /// Dictionary builder for string tag columns.
    StringDict(StringDictionaryBuilder<UInt32Type>),
    /// Fallback builder for non-string tag columns.
    Vector(Box<dyn MutableVector>),
}

impl PrimaryKeyColumnBuilder {
    /// Appends a [ValueRef] to this builder.
    fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
        match self {
            PrimaryKeyColumnBuilder::StringDict(builder) => {
                if let Some(s) = value.as_string().context(DataTypeMismatchSnafu)? {
                    builder.append_value(s);
                } else {
                    builder.append_null();
                }
            }
            PrimaryKeyColumnBuilder::Vector(builder) => {
                builder.push_value_ref(value);
            }
        }
        Ok(())
    }

    /// Finishes the builder and converts it into an arrow array.
    fn into_arrow_array(self) -> ArrayRef {
        match self {
            PrimaryKeyColumnBuilder::StringDict(mut builder) => Arc::new(builder.finish()),
            PrimaryKeyColumnBuilder::Vector(mut builder) => builder.to_vector().to_arrow_array(),
        }
    }
}

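/// Converts [KeyValues] from the row-based write path into a [BulkPart].
///
/// A minimal usage sketch (assuming `metadata`, `schema` and `codec` are
/// prepared by the caller):
///
/// ```ignore
/// let mut converter = BulkPartConverter::new(&metadata, schema, 1024, codec, true);
/// converter.append_key_values(&key_values)?;
/// let part = converter.convert()?;
/// ```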
pub struct BulkPartConverter {
    region_metadata: RegionMetadataRef,
    /// Arrow schema of the output record batch.
    schema: SchemaRef,
    /// Codec for encoding primary keys.
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
    /// Reused buffer for encoded primary keys.
    key_buf: Vec<u8>,
    /// Builder for the encoded `__primary_key` dictionary column.
    key_array_builder: PrimaryKeyArrayBuilder,
    /// Builder for timestamp, sequence, op type and field columns.
    value_builder: ValueBuilder,
    /// Builders for individual primary key columns; empty when primary key
    /// columns are not stored or the encoding is sparse.
    primary_key_column_builders: Vec<PrimaryKeyColumnBuilder>,

    max_ts: i64,
    min_ts: i64,
    max_sequence: SequenceNumber,
}

impl BulkPartConverter {
    pub fn new(
        region_metadata: &RegionMetadataRef,
        schema: SchemaRef,
        capacity: usize,
        primary_key_codec: Arc<dyn PrimaryKeyCodec>,
        store_primary_key_columns: bool,
    ) -> Self {
        debug_assert_eq!(
            region_metadata.primary_key_encoding,
            primary_key_codec.encoding()
        );

        // Sparse encoding only writes the encoded `__primary_key` column, so
        // individual key column builders are skipped in that case.
        let primary_key_column_builders = if store_primary_key_columns
            && region_metadata.primary_key_encoding != PrimaryKeyEncoding::Sparse
        {
            new_primary_key_column_builders(region_metadata, capacity)
        } else {
            Vec::new()
        };

        Self {
            region_metadata: region_metadata.clone(),
            schema,
            primary_key_codec,
            key_buf: Vec::new(),
            key_array_builder: PrimaryKeyArrayBuilder::new(),
            value_builder: ValueBuilder::new(region_metadata, capacity),
            primary_key_column_builders,
            min_ts: i64::MAX,
            max_ts: i64::MIN,
            max_sequence: SequenceNumber::MIN,
        }
    }

    /// Appends all rows in `key_values` to the converter.
    pub fn append_key_values(&mut self, key_values: &KeyValues) -> Result<()> {
        for kv in key_values.iter() {
            self.append_key_value(&kv)?;
        }

        Ok(())
    }

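    /// Appends a single row, encoding its primary key according to the
    /// region's primary key encoding.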
    fn append_key_value(&mut self, kv: &KeyValue) -> Result<()> {
        if self.primary_key_codec.encoding() == PrimaryKeyEncoding::Sparse {
            // With sparse encoding the primary key arrives pre-encoded as a
            // single binary column.
            let mut primary_keys = kv.primary_keys();
            if let Some(encoded) = primary_keys
                .next()
                .context(ColumnNotFoundSnafu {
                    column: PRIMARY_KEY_COLUMN_NAME,
                })?
                .as_binary()
                .context(DataTypeMismatchSnafu)?
            {
                self.key_array_builder
                    .append(encoded)
                    .context(ComputeArrowSnafu)?;
            } else {
                self.key_array_builder
                    .append("")
                    .context(ComputeArrowSnafu)?;
            }
        } else {
            // Dense encoding: encode all primary key columns into the reused
            // key buffer.
            self.key_buf.clear();
            self.primary_key_codec
                .encode_key_value(kv, &mut self.key_buf)
                .context(EncodeSnafu)?;
            self.key_array_builder
                .append(&self.key_buf)
                .context(ComputeArrowSnafu)?;
        }

        if !self.primary_key_column_builders.is_empty() {
            for (builder, pk_value) in self
                .primary_key_column_builders
                .iter_mut()
                .zip(kv.primary_keys())
            {
                builder.push_value_ref(pk_value)?;
            }
        }

        self.value_builder.push(
            kv.timestamp(),
            kv.sequence(),
            kv.op_type() as u8,
            kv.fields(),
        );

        // Safety: the time index column is always a valid timestamp.
        let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
        self.min_ts = self.min_ts.min(ts);
        self.max_ts = self.max_ts.max(ts);
        self.max_sequence = self.max_sequence.max(kv.sequence());

        Ok(())
    }

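    /// Finishes all builders and produces a [BulkPart] sorted by
    /// (primary key, timestamp, sequence desc). The output column order is:
    /// primary key columns (if stored), field columns, timestamp,
    /// `__primary_key`, `__sequence`, `__op_type`.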
    pub fn convert(mut self) -> Result<BulkPart> {
        let values = Values::from(self.value_builder);
        let mut columns =
            Vec::with_capacity(4 + values.fields.len() + self.primary_key_column_builders.len());

        for builder in self.primary_key_column_builders {
            columns.push(builder.into_arrow_array());
        }
        columns.extend(values.fields.iter().map(|field| field.to_arrow_array()));
        let timestamp_index = columns.len();
        columns.push(values.timestamp.to_arrow_array());
        let pk_array = self.key_array_builder.finish();
        columns.push(Arc::new(pk_array));
        columns.push(values.sequence.to_arrow_array());
        columns.push(values.op_type.to_arrow_array());

        let batch = RecordBatch::try_new(self.schema, columns).context(NewRecordBatchSnafu)?;
        let batch = sort_primary_key_record_batch(&batch)?;

        Ok(BulkPart {
            batch,
            max_ts: self.max_ts,
            min_ts: self.min_ts,
            sequence: self.max_sequence,
            timestamp_index,
            raw_data: None,
        })
    }
}

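/// Creates one [PrimaryKeyColumnBuilder] per primary key column, using
/// dictionary builders for string columns.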
fn new_primary_key_column_builders(
    metadata: &RegionMetadata,
    capacity: usize,
) -> Vec<PrimaryKeyColumnBuilder> {
    metadata
        .primary_key_columns()
        .map(|col| {
            if col.column_schema.data_type.is_string() {
                PrimaryKeyColumnBuilder::StringDict(StringDictionaryBuilder::with_capacity(
                    capacity,
                    INIT_DICT_VALUE_CAPACITY,
                    capacity,
                ))
            } else {
                PrimaryKeyColumnBuilder::Vector(
                    col.column_schema.data_type.create_mutable_vector(capacity),
                )
            }
        })
        .collect()
}

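/// Sorts a record batch in SST layout by (`__primary_key`, timestamp,
/// `__sequence` desc) so the newest row of each key/timestamp pair comes
/// first.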
fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
    let total_columns = batch.num_columns();
    // Trailing column layout: ..., timestamp, __primary_key, __sequence, __op_type.
    let sort_columns = vec![
        SortColumn {
            values: batch.column(total_columns - 3).clone(),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
        SortColumn {
            values: batch.column(total_columns - 4).clone(),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
        SortColumn {
            values: batch.column(total_columns - 2).clone(),
            options: Some(SortOptions {
                descending: true,
                nulls_first: true,
            }),
        },
    ];

    let indices = datatypes::arrow::compute::lexsort_to_indices(&sort_columns, None)
        .context(ComputeArrowSnafu)?;

    datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
}

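/// A bulk part encoded as Parquet bytes, plus its metadata.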
#[derive(Debug, Clone)]
pub struct EncodedBulkPart {
    data: Bytes,
    metadata: BulkPartMeta,
}

impl EncodedBulkPart {
    pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
        Self { data, metadata }
    }

    pub(crate) fn metadata(&self) -> &BulkPartMeta {
        &self.metadata
    }

    /// Returns an iterator over the part's rows, or `None` when predicate
    /// pruning eliminates every row group.
    pub(crate) fn read(
        &self,
        context: BulkIterContextRef,
        sequence: Option<SequenceNumber>,
    ) -> Result<Option<BoxedRecordBatchIterator>> {
        let row_groups_to_read = context.row_groups_to_read(&self.metadata.parquet_metadata);

        if row_groups_to_read.is_empty() {
            // All row groups are pruned out.
            return Ok(None);
        }

        let iter = EncodedBulkPartIter::try_new(
            context,
            row_groups_to_read,
            self.metadata.parquet_metadata.clone(),
            self.data.clone(),
            sequence,
        )?;
        Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
    }
}

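/// Metadata of an [EncodedBulkPart].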
#[derive(Debug, Clone)]
pub struct BulkPartMeta {
    pub num_rows: usize,
    /// Maximum timestamp in the part.
    pub max_timestamp: i64,
    /// Minimum timestamp in the part.
    pub min_timestamp: i64,
    /// Parquet metadata of the encoded bytes.
    pub parquet_metadata: Arc<ParquetMetaData>,
    /// Metadata of the region this part belongs to.
    pub region_metadata: RegionMetadataRef,
}

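/// Encoder that writes a [BulkPart] into Parquet bytes with a fixed row group
/// size.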
pub struct BulkPartEncoder {
    metadata: RegionMetadataRef,
    row_group_size: usize,
    writer_props: Option<WriterProperties>,
}

impl BulkPartEncoder {
    pub(crate) fn new(metadata: RegionMetadataRef, row_group_size: usize) -> BulkPartEncoder {
        let writer_props = Some(
            WriterProperties::builder()
                .set_write_batch_size(row_group_size)
                .set_max_row_group_size(row_group_size)
                .build(),
        );
        Self {
            metadata,
            row_group_size,
            writer_props,
        }
    }
}

impl BulkPartEncoder {
    /// Encodes a [BulkPart] to Parquet bytes; returns `None` if the part is
    /// empty.
    fn encode_part(&self, part: &BulkPart) -> Result<Option<EncodedBulkPart>> {
        if part.batch.num_rows() == 0 {
            return Ok(None);
        }

        let mut buf = Vec::with_capacity(4096);
        let arrow_schema = part.batch.schema();

        let file_metadata = {
            let mut writer =
                ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
                    .context(EncodeMemtableSnafu)?;
            writer.write(&part.batch).context(EncodeMemtableSnafu)?;
            writer.finish().context(EncodeMemtableSnafu)?
        };

        let buf = Bytes::from(buf);
        let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);

        Ok(Some(EncodedBulkPart {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: part.batch.num_rows(),
                max_timestamp: part.max_ts,
                min_timestamp: part.min_ts,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
            },
        }))
    }
}

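/// Converts mutations to a record batch in SST layout sorted by
/// (primary key, timestamp, sequence desc), deduplicating rows by
/// (primary key, timestamp) when `dedup` is true. Returns the batch together
/// with the min and max timestamps, or `None` if the mutations carry no rows.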
fn mutations_to_record_batch(
    mutations: &[Mutation],
    metadata: &RegionMetadataRef,
    pk_encoder: &DensePrimaryKeyCodec,
    dedup: bool,
) -> Result<Option<(RecordBatch, i64, i64)>> {
    let total_rows: usize = mutations
        .iter()
        .map(|m| m.rows.as_ref().map(|r| r.rows.len()).unwrap_or(0))
        .sum();

    if total_rows == 0 {
        return Ok(None);
    }

    let mut pk_builder = BinaryBuilder::with_capacity(total_rows, 0);

    let mut ts_vector: Box<dyn MutableVector> = metadata
        .time_index_column()
        .column_schema
        .data_type
        .create_mutable_vector(total_rows);
    let mut sequence_builder = UInt64Builder::with_capacity(total_rows);
    let mut op_type_builder = UInt8Builder::with_capacity(total_rows);

    let mut field_builders: Vec<Box<dyn MutableVector>> = metadata
        .field_columns()
        .map(|f| f.column_schema.data_type.create_mutable_vector(total_rows))
        .collect();

    let mut pk_buffer = vec![];
    for m in mutations {
        let Some(key_values) = KeyValuesRef::new(metadata, m) else {
            continue;
        };

        for row in key_values.iter() {
            pk_buffer.clear();
            pk_encoder
                .encode_to_vec(row.primary_keys(), &mut pk_buffer)
                .context(EncodeSnafu)?;
            pk_builder.append_value(pk_buffer.as_bytes());
            ts_vector.push_value_ref(row.timestamp());
            sequence_builder.append_value(row.sequence());
            op_type_builder.append_value(row.op_type() as u8);
            for (builder, field) in field_builders.iter_mut().zip(row.fields()) {
                builder.push_value_ref(field);
            }
        }
    }

    let arrow_schema = to_sst_arrow_schema(metadata);
    // Safety: the time index column must be a timestamp type.
    let timestamp_unit = metadata
        .time_index_column()
        .column_schema
        .data_type
        .as_timestamp()
        .unwrap()
        .unit();
    let sorter = ArraysSorter {
        encoded_primary_keys: pk_builder.finish(),
        timestamp_unit,
        timestamp: ts_vector.to_vector().to_arrow_array(),
        sequence: sequence_builder.finish(),
        op_type: op_type_builder.finish(),
        fields: field_builders
            .iter_mut()
            .map(|f| f.to_vector().to_arrow_array()),
        dedup,
        arrow_schema,
    };

    sorter.sort().map(Some)
}

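/// Helper that sorts (and optionally deduplicates) the column arrays built
/// from mutations and assembles them into a record batch.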
struct ArraysSorter<I> {
    encoded_primary_keys: BinaryArray,
    timestamp_unit: TimeUnit,
    timestamp: ArrayRef,
    sequence: UInt64Array,
    op_type: UInt8Array,
    fields: I,
    dedup: bool,
    arrow_schema: SchemaRef,
}

impl<I> ArraysSorter<I>
where
    I: Iterator<Item = ArrayRef>,
{
    /// Converts the arrays to a sorted record batch and returns it together
    /// with the min and max timestamps.
    fn sort(self) -> Result<(RecordBatch, i64, i64)> {
        debug_assert!(!self.timestamp.is_empty());
        debug_assert!(self.timestamp.len() == self.sequence.len());
        debug_assert!(self.timestamp.len() == self.op_type.len());
        debug_assert!(self.timestamp.len() == self.encoded_primary_keys.len());

        let timestamp_iter = timestamp_array_to_iter(self.timestamp_unit, &self.timestamp);
        let (mut min_timestamp, mut max_timestamp) = (i64::MAX, i64::MIN);
        let mut to_sort = self
            .encoded_primary_keys
            .iter()
            .zip(timestamp_iter)
            .zip(self.sequence.iter())
            .map(|((pk, timestamp), sequence)| {
                max_timestamp = max_timestamp.max(*timestamp);
                min_timestamp = min_timestamp.min(*timestamp);
                (pk, timestamp, sequence)
            })
            .enumerate()
            .collect::<Vec<_>>();

        // Sort by (primary key, timestamp, sequence desc) so that for equal
        // keys and timestamps the row with the largest sequence comes first.
        to_sort.sort_unstable_by(|(_, (l_pk, l_ts, l_seq)), (_, (r_pk, r_ts, r_seq))| {
            l_pk.cmp(r_pk)
                .then(l_ts.cmp(r_ts))
                .then(l_seq.cmp(r_seq).reverse())
        });

        if self.dedup {
            // Keep only the first (largest sequence) row of each
            // (primary key, timestamp) pair.
            to_sort.dedup_by(|(_, (l_pk, l_ts, _)), (_, (r_pk, r_ts, _))| {
                l_pk == r_pk && l_ts == r_ts
            });
        }

        let indices = UInt32Array::from_iter_values(to_sort.iter().map(|v| v.0 as u32));

        let pk_dictionary = Arc::new(binary_array_to_dictionary(
            arrow::compute::take(
                &self.encoded_primary_keys,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap(),
        )?) as ArrayRef;

        let mut arrays = Vec::with_capacity(self.arrow_schema.fields.len());
        for arr in self.fields {
            arrays.push(
                arrow::compute::take(
                    &arr,
                    &indices,
                    Some(TakeOptions {
                        check_bounds: false,
                    }),
                )
                .context(ComputeArrowSnafu)?,
            );
        }

        let timestamp = arrow::compute::take(
            &self.timestamp,
            &indices,
            Some(TakeOptions {
                check_bounds: false,
            }),
        )
        .context(ComputeArrowSnafu)?;

        arrays.push(timestamp);
        arrays.push(pk_dictionary);
        arrays.push(
            arrow::compute::take(
                &self.sequence,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?,
        );

        arrays.push(
            arrow::compute::take(
                &self.op_type,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?,
        );

        let batch = RecordBatch::try_new(self.arrow_schema, arrays).context(NewRecordBatchSnafu)?;
        Ok((batch, min_timestamp, max_timestamp))
    }
}

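/// Iterates over the raw `i64` values of a timestamp array of the given unit.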
fn timestamp_array_to_iter(
    timestamp_unit: TimeUnit,
    timestamp: &ArrayRef,
) -> impl Iterator<Item = &i64> {
    // Safety: the array type always matches the time index unit.
    match timestamp_unit {
        TimeUnit::Second => timestamp
            .as_any()
            .downcast_ref::<TimestampSecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Millisecond => timestamp
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Microsecond => timestamp
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Nanosecond => timestamp
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap()
            .values()
            .iter(),
    }
}

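/// Converts a sorted [BinaryArray] of encoded primary keys into a
/// dictionary-encoded [PrimaryKeyArray], assigning one dictionary key per run
/// of equal values.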
fn binary_array_to_dictionary(input: &BinaryArray) -> Result<PrimaryKeyArray> {
    if input.is_empty() {
        return Ok(DictionaryArray::new(
            UInt32Array::from(Vec::<u32>::new()),
            Arc::new(BinaryArray::from_vec(vec![])) as ArrayRef,
        ));
    }
    let mut keys = Vec::with_capacity(16);
    let mut values = BinaryBuilder::new();
    // The input is sorted, so equal values are adjacent; a new dictionary
    // entry starts whenever the current value differs from the previous one.
    let mut key: u32 = 0;
    let mut prev_bytes = input.value(0);
    keys.push(key);
    values.append_value(prev_bytes);

    for current_bytes in input.iter().skip(1) {
        // Safety: the encoded primary key column contains no nulls.
        let current_bytes = current_bytes.unwrap();
        if current_bytes != prev_bytes {
            values.append_value(current_bytes);
            key += 1;
            prev_bytes = current_bytes;
        }
        keys.push(key);
    }

    Ok(DictionaryArray::new(
        UInt32Array::from(keys),
        Arc::new(values.finish()) as ArrayRef,
    ))
}

#[cfg(test)]
mod tests {
    use std::collections::VecDeque;

    use api::v1::{Row, WriteHint};
    use datafusion_common::ScalarValue;
    use datatypes::arrow::array::Float64Array;
    use datatypes::prelude::{ConcreteDataType, ScalarVector, Value};
    use datatypes::vectors::{Float64Vector, TimestampMillisecondVector};
    use store_api::storage::consts::ReservedColumnId;

    use super::*;
    use crate::memtable::bulk::context::BulkIterContext;
    use crate::sst::parquet::format::{PrimaryKeyReadFormat, ReadFormat};
    use crate::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions};
    use crate::test_util::memtable_util::{
        build_key_values_with_ts_seq_values, metadata_for_test, region_metadata_to_row_schema,
    };

    fn check_binary_array_to_dictionary(
        input: &[&[u8]],
        expected_keys: &[u32],
        expected_values: &[&[u8]],
    ) {
        let input = BinaryArray::from_iter_values(input.iter());
        let array = binary_array_to_dictionary(&input).unwrap();
        assert_eq!(
            &expected_keys,
            &array.keys().iter().map(|v| v.unwrap()).collect::<Vec<_>>()
        );
        assert_eq!(
            expected_values,
            &array
                .values()
                .as_any()
                .downcast_ref::<BinaryArray>()
                .unwrap()
                .iter()
                .map(|v| v.unwrap())
                .collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_binary_array_to_dictionary() {
        check_binary_array_to_dictionary(&[], &[], &[]);

        check_binary_array_to_dictionary(&["a".as_bytes()], &[0], &["a".as_bytes()]);

        check_binary_array_to_dictionary(
            &["a".as_bytes(), "a".as_bytes()],
            &[0, 0],
            &["a".as_bytes()],
        );

        check_binary_array_to_dictionary(
            &["a".as_bytes(), "a".as_bytes(), "b".as_bytes()],
            &[0, 0, 1],
            &["a".as_bytes(), "b".as_bytes()],
        );

        check_binary_array_to_dictionary(
            &[
                "a".as_bytes(),
                "a".as_bytes(),
                "b".as_bytes(),
                "c".as_bytes(),
            ],
            &[0, 0, 1, 2],
            &["a".as_bytes(), "b".as_bytes(), "c".as_bytes()],
        );
    }

    struct MutationInput<'a> {
        k0: &'a str,
        k1: u32,
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
        sequence: u64,
    }

    #[derive(Debug, PartialOrd, PartialEq)]
    struct BatchOutput<'a> {
        pk_values: &'a [Value],
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
    }

    fn check_mutations_to_record_batches(
        input: &[MutationInput],
        expected: &[BatchOutput],
        expected_timestamp: (i64, i64),
        dedup: bool,
    ) {
        let metadata = metadata_for_test();
        let mutations = input
            .iter()
            .map(|m| {
                build_key_values_with_ts_seq_values(
                    &metadata,
                    m.k0.to_string(),
                    m.k1,
                    m.timestamps.iter().copied(),
                    m.v1.iter().copied(),
                    m.sequence,
                )
                .mutation
            })
            .collect::<Vec<_>>();
        let total_rows: usize = mutations
            .iter()
            .flat_map(|m| m.rows.iter())
            .map(|r| r.rows.len())
            .sum();

        let pk_encoder = DensePrimaryKeyCodec::new(&metadata);

        let (batch, min_ts, max_ts) =
            mutations_to_record_batch(&mutations, &metadata, &pk_encoder, dedup)
                .unwrap()
                .unwrap();
        assert_eq!(expected_timestamp, (min_ts, max_ts));
        let read_format = PrimaryKeyReadFormat::new_with_all_columns(metadata.clone());
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&batch, None, &mut batches)
            .unwrap();
        if !dedup {
            assert_eq!(
                total_rows,
                batches.iter().map(|b| b.num_rows()).sum::<usize>()
            );
        }
        let batch_values = batches
            .into_iter()
            .map(|b| {
                let pk_values = pk_encoder.decode(b.primary_key()).unwrap().into_dense();
                let timestamps = b
                    .timestamps()
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap().0.value())
                    .collect::<Vec<_>>();
                let float_values = b.fields()[1]
                    .data
                    .as_any()
                    .downcast_ref::<Float64Vector>()
                    .unwrap()
                    .iter_data()
                    .collect::<Vec<_>>();

                (pk_values, timestamps, float_values)
            })
            .collect::<Vec<_>>();
        assert_eq!(expected.len(), batch_values.len());

        for idx in 0..expected.len() {
            assert_eq!(expected[idx].pk_values, &batch_values[idx].0);
            assert_eq!(expected[idx].timestamps, &batch_values[idx].1);
            assert_eq!(expected[idx].v1, &batch_values[idx].2);
        }
    }

    #[test]
    fn test_mutations_to_record_batch() {
        check_mutations_to_record_batches(
            &[MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[0],
                v1: &[Some(0.1)],
                sequence: 0,
            }],
            &[BatchOutput {
                pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                timestamps: &[0],
                v1: &[Some(0.1)],
            }],
            (0, 0),
            true,
        );

        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[1],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
                MutationInput {
                    k0: "a",
                    k1: 1,
                    timestamps: &[1],
                    v1: &[Some(0.3)],
                    sequence: 2,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0, 1],
                    v1: &[Some(0.1), Some(0.2)],
                },
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(1)],
                    timestamps: &[1],
                    v1: &[Some(0.3)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 1),
            true,
        );

        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 0),
            true,
        );
        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0, 0],
                    v1: &[Some(0.2), Some(0.1)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 0),
            false,
        );
    }

    fn encode(input: &[MutationInput]) -> EncodedBulkPart {
        let metadata = metadata_for_test();
        let kvs = input
            .iter()
            .map(|m| {
                build_key_values_with_ts_seq_values(
                    &metadata,
                    m.k0.to_string(),
                    m.k1,
                    m.timestamps.iter().copied(),
                    m.v1.iter().copied(),
                    m.sequence,
                )
            })
            .collect::<Vec<_>>();
        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let primary_key_codec = build_primary_key_codec(&metadata);
        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
        for kv in kvs {
            converter.append_key_values(&kv).unwrap();
        }
        let part = converter.convert().unwrap();
        let encoder = BulkPartEncoder::new(metadata, 1024);
        encoder.encode_part(&part).unwrap().unwrap()
    }

    #[test]
    fn test_write_and_read_part_projection() {
        let part = encode(&[
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.1)],
                sequence: 0,
            },
            MutationInput {
                k0: "b",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.0)],
                sequence: 0,
            },
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[2],
                v1: &[Some(0.2)],
                sequence: 1,
            },
        ]);

        let projection = &[4u32];
        let reader = part
            .read(
                Arc::new(BulkIterContext::new(
                    part.metadata.region_metadata.clone(),
                    &Some(projection.as_slice()),
                    None,
                )),
                None,
            )
            .unwrap()
            .expect("expect at least one row group");

        let mut total_rows_read = 0;
        let mut field: Vec<f64> = vec![];
        for res in reader {
            let batch = res.unwrap();
            assert_eq!(5, batch.num_columns());
            field.extend_from_slice(
                batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Float64Array>()
                    .unwrap()
                    .values(),
            );
            total_rows_read += batch.num_rows();
        }
        assert_eq!(3, total_rows_read);
        assert_eq!(vec![0.1, 0.2, 0.0], field);
    }

    fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
        let metadata = metadata_for_test();
        let kvs = key_values
            .into_iter()
            .map(|(k0, k1, (start, end), sequence)| {
                let ts = start..end;
                let v1 = (start..end).map(|_| None);
                build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
            })
            .collect::<Vec<_>>();
        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let primary_key_codec = build_primary_key_codec(&metadata);
        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
        for kv in kvs {
            converter.append_key_values(&kv).unwrap();
        }
        let part = converter.convert().unwrap();
        let encoder = BulkPartEncoder::new(metadata, 1024);
        encoder.encode_part(&part).unwrap().unwrap()
    }

    fn check_prune_row_group(
        part: &EncodedBulkPart,
        predicate: Option<Predicate>,
        expected_rows: usize,
    ) {
        let context = Arc::new(BulkIterContext::new(
            part.metadata.region_metadata.clone(),
            &None,
            predicate,
        ));
        let reader = part
            .read(context, None)
            .unwrap()
            .expect("expect at least one row group");
        let mut total_rows_read = 0;
        for res in reader {
            let batch = res.unwrap();
            total_rows_read += batch.num_rows();
        }
        assert_eq!(expected_rows, total_rows_read);
    }

    #[test]
    fn test_prune_row_groups() {
        let part = prepare(vec![
            ("a", 0, (0, 40), 1),
            ("a", 1, (0, 60), 1),
            ("b", 0, (0, 100), 2),
            ("b", 1, (100, 180), 3),
            ("b", 1, (180, 210), 4),
        ]);

        let context = Arc::new(BulkIterContext::new(
            part.metadata.region_metadata.clone(),
            &None,
            Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
                datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
            )])),
        ));
        // A timestamp outside the part's range prunes every row group.
        assert!(part.read(context, None).unwrap().is_none());

        check_prune_row_group(&part, None, 310);

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            40,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
            ])),
            60,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a"))
            ])),
            100,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            100,
        );

        // Predicate on a field column matches exactly one row.
        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64))
            ])),
            1,
        );
    }

    #[test]
    fn test_bulk_part_converter_append_and_convert() {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "key1".to_string(),
            1u32,
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "key2".to_string(),
            2u32,
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_ts, 1000);
        assert_eq!(bulk_part.max_ts, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        // Verify the column order of the converted batch.
        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );
    }

    #[test]
    fn test_bulk_part_converter_sorting() {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "z_key".to_string(),
            3u32,
            vec![3000].into_iter(),
            vec![Some(3.0)].into_iter(),
            3,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "a_key".to_string(),
            1u32,
            vec![1000].into_iter(),
            vec![Some(1.0)].into_iter(),
            1,
        );

        let key_values3 = build_key_values_with_ts_seq_values(
            &metadata,
            "m_key".to_string(),
            2u32,
            vec![2000].into_iter(),
            vec![Some(2.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();
        converter.append_key_values(&key_values3).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);

        // Rows are sorted by primary key, so timestamps and sequences follow
        // the key order a_key < m_key < z_key.
        let ts_column = bulk_part.batch.column(bulk_part.timestamp_index);
        let seq_column = bulk_part.batch.column(bulk_part.batch.num_columns() - 2);

        let ts_array = ts_column
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        let seq_array = seq_column.as_any().downcast_ref::<UInt64Array>().unwrap();

        assert_eq!(ts_array.values(), &[1000, 2000, 3000]);
        assert_eq!(seq_array.values(), &[1, 2, 3]);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );
    }

    #[test]
    fn test_bulk_part_converter_empty() {
        let metadata = metadata_for_test();
        let capacity = 10;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 0);
        assert_eq!(bulk_part.min_ts, i64::MAX);
        assert_eq!(bulk_part.max_ts, i64::MIN);
        assert_eq!(bulk_part.sequence, SequenceNumber::MIN);

        // An empty converter still yields a batch with the full column order.
        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );
    }

    #[test]
    fn test_bulk_part_converter_without_primary_key_columns() {
        let metadata = metadata_for_test();
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions {
                raw_pk_columns: false,
                string_pk_use_dict: true,
            },
        );

        let capacity = 100;
        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, false);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "key1".to_string(),
            1u32,
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "key2".to_string(),
            2u32,
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_ts, 1000);
        assert_eq!(bulk_part.max_ts, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        // Raw primary key columns are not stored, so only field columns, the
        // timestamp and the internal columns remain.
        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );
    }

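    /// Builds [KeyValues] whose rows carry a pre-encoded sparse primary key
    /// in the `__primary_key` column, mimicking writes from the metric
    /// engine.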
    #[allow(clippy::too_many_arguments)]
    fn build_key_values_with_sparse_encoding(
        metadata: &RegionMetadataRef,
        primary_key_codec: &Arc<dyn PrimaryKeyCodec>,
        table_id: u32,
        tsid: u64,
        k0: String,
        k1: String,
        timestamps: impl Iterator<Item = i64>,
        values: impl Iterator<Item = Option<f64>>,
        sequence: SequenceNumber,
    ) -> KeyValues {
        // Encode the primary key in sparse format: (table_id, tsid, tags...).
        let pk_values = vec![
            (ReservedColumnId::table_id(), Value::UInt32(table_id)),
            (ReservedColumnId::tsid(), Value::UInt64(tsid)),
            (0, Value::String(k0.clone().into())),
            (1, Value::String(k1.clone().into())),
        ];
        let mut encoded_key = Vec::new();
        primary_key_codec
            .encode_values(&pk_values, &mut encoded_key)
            .unwrap();
        assert!(!encoded_key.is_empty());

        // The row schema carries the encoded `__primary_key` column instead
        // of individual tag columns.
        let column_schema = vec![
            api::v1::ColumnSchema {
                column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::binary_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Tag as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "ts".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::timestamp_millisecond_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Timestamp as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "v0".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::int64_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Field as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "v1".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::float64_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Field as i32,
                ..Default::default()
            },
        ];

        let rows = timestamps
            .zip(values)
            .map(|(ts, v)| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::BinaryValue(
                            encoded_key.clone(),
                        )),
                    },
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(ts)),
                    },
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::I64Value(ts)),
                    },
                    api::v1::Value {
                        value_data: v.map(api::v1::value::ValueData::F64Value),
                    },
                ],
            })
            .collect();

        let mutation = api::v1::Mutation {
            op_type: 1,
            sequence,
            rows: Some(api::v1::Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: Some(WriteHint {
                primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
            }),
        };
        KeyValues::new(metadata.as_ref(), mutation).unwrap()
    }

    #[test]
    fn test_bulk_part_converter_sparse_primary_key_encoding() {
        use api::v1::SemanticType;
        use datatypes::schema::ColumnSchema;
        use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
        use store_api::storage::RegionId;

        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![0, 1])
            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
        let metadata = Arc::new(builder.build().unwrap());

        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse);
        assert_eq!(primary_key_codec.encoding(), PrimaryKeyEncoding::Sparse);

        let capacity = 100;
        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec.clone(), true);

        let key_values1 = build_key_values_with_sparse_encoding(
            &metadata,
            &primary_key_codec,
            2048u32,
            100u64,
            "key11".to_string(),
            "key21".to_string(),
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_sparse_encoding(
            &metadata,
            &primary_key_codec,
            4096u32,
            200u64,
            "key12".to_string(),
            "key22".to_string(),
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_ts, 1000);
        assert_eq!(bulk_part.max_ts, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        // Sparse encoding stores no individual tag columns: only fields, the
        // timestamp and the internal columns remain.
        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );

        let primary_key_column = bulk_part.batch.column_by_name("__primary_key").unwrap();
        let dict_array = primary_key_column
            .as_any()
            .downcast_ref::<DictionaryArray<UInt32Type>>()
            .unwrap();

        assert!(!dict_array.is_empty());
        assert_eq!(dict_array.len(), 3);

        // Every encoded primary key must be non-empty.
        let values = dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        for i in 0..values.len() {
            assert!(
                !values.value(i).is_empty(),
                "Encoded primary key should not be empty"
            );
        }
    }
}