use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::helper::{ColumnDataTypeWrapper, value_to_grpc_value};
use api::v1::bulk_wal_entry::Body;
use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType, bulk_wal_entry};
use bytes::Bytes;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_recordbatch::DfRecordBatch as RecordBatch;
use common_time::Timestamp;
use common_time::timestamp::TimeUnit;
use datatypes::arrow;
use datatypes::arrow::array::{
    Array, ArrayRef, BinaryBuilder, BinaryDictionaryBuilder, DictionaryArray, StringBuilder,
    StringDictionaryBuilder, TimestampMicrosecondArray, TimestampMillisecondArray,
    TimestampNanosecondArray, TimestampSecondArray, UInt8Array, UInt8Builder, UInt32Array,
    UInt64Array, UInt64Builder,
};
use datatypes::arrow::compute::{SortColumn, SortOptions, TakeOptions};
use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
use datatypes::arrow_array::BinaryArray;
use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::Helper;
use mito_codec::key_values::{KeyValue, KeyValues, KeyValuesRef};
use mito_codec::row_converter::{
    DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, build_primary_key_codec,
};
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, ZstdLevel};
use parquet::data_type::AsBytes;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use snafu::{OptionExt, ResultExt, Snafu};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
use store_api::storage::{FileId, SequenceNumber, SequenceRange};
use table::predicate::Predicate;

use crate::error::{
    self, ColumnNotFoundSnafu, ComputeArrowSnafu, DataTypeMismatchSnafu, EncodeMemtableSnafu,
    EncodeSnafu, InvalidMetadataSnafu, NewRecordBatchSnafu, Result,
};
use crate::memtable::BoxedRecordBatchIterator;
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
use crate::memtable::time_series::{ValueBuilder, Values};
use crate::sst::index::IndexOutput;
use crate::sst::parquet::file_range::{PreFilterMode, row_group_contains_delete};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo};
use crate::sst::{SeriesEstimator, to_sst_arrow_schema};

const INIT_DICT_VALUE_CAPACITY: usize = 8;

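/// Data of a bulk write stored as an Arrow [RecordBatch], along with its timestamp
/// range, sequence number and the optional raw Arrow IPC payload it was decoded from.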
#[derive(Clone)]
pub struct BulkPart {
    pub batch: RecordBatch,
    pub max_timestamp: i64,
    pub min_timestamp: i64,
    pub sequence: u64,
    pub timestamp_index: usize,
    pub raw_data: Option<ArrowIpc>,
}

impl TryFrom<BulkWalEntry> for BulkPart {
    type Error = error::Error;

    fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
        match value.body.expect("Entry payload should be present") {
            Body::ArrowIpc(ipc) => {
                let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
                    .context(error::ConvertBulkWalEntrySnafu)?;
                let batch = decoder
                    .try_decode_record_batch(&ipc.data_header, &ipc.payload)
                    .context(error::ConvertBulkWalEntrySnafu)?;
                Ok(Self {
                    batch,
                    max_timestamp: value.max_ts,
                    min_timestamp: value.min_ts,
                    sequence: value.sequence,
                    timestamp_index: value.timestamp_index as usize,
                    raw_data: Some(ipc),
                })
            }
        }
    }
}

impl TryFrom<&BulkPart> for BulkWalEntry {
    type Error = error::Error;

    fn try_from(value: &BulkPart) -> Result<Self> {
        if let Some(ipc) = &value.raw_data {
            Ok(BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_timestamp,
                min_ts: value.min_timestamp,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ipc.clone())),
            })
        } else {
            let mut encoder = FlightEncoder::default();
            let schema_bytes = encoder
                .encode_schema(value.batch.schema().as_ref())
                .data_header;
            let [rb_data] = encoder
                .encode(FlightMessage::RecordBatch(value.batch.clone()))
                .try_into()
                .map_err(|_| {
                    error::UnsupportedOperationSnafu {
                        err_msg: "create BulkWalEntry from RecordBatch with dictionary arrays",
                    }
                    .build()
                })?;
            Ok(BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_timestamp,
                min_ts: value.min_timestamp,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ArrowIpc {
                    schema: schema_bytes,
                    data_header: rb_data.data_header,
                    payload: rb_data.data_body,
                })),
            })
        }
    }
}

impl BulkPart {
    pub(crate) fn estimated_size(&self) -> usize {
        record_batch_estimated_size(&self.batch)
    }

    /// Estimates the number of series in this part from the dictionary values of
    /// the encoded primary key column.
    pub fn estimated_series_count(&self) -> usize {
        let pk_column_idx = primary_key_column_index(self.batch.num_columns());
        let pk_column = self.batch.column(pk_column_idx);
        if let Some(dict_array) = pk_column.as_any().downcast_ref::<PrimaryKeyArray>() {
            dict_array.values().len()
        } else {
            0
        }
    }

    /// Converts this part to a [Mutation] so it can be applied as a normal write.
    pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
        let vectors = region_metadata
            .schema
            .column_schemas()
            .iter()
            .map(|col| match self.batch.column_by_name(&col.name) {
                None => Ok(None),
                Some(col) => Helper::try_into_vector(col).map(Some),
            })
            .collect::<datatypes::error::Result<Vec<_>>>()
            .context(error::ComputeVectorSnafu)?;

        let rows = (0..self.num_rows())
            .map(|row_idx| {
                let values = (0..self.batch.num_columns())
                    .map(|col_idx| {
                        if let Some(v) = &vectors[col_idx] {
                            value_to_grpc_value(v.get(row_idx))
                        } else {
                            api::v1::Value { value_data: None }
                        }
                    })
                    .collect::<Vec<_>>();
                api::v1::Row { values }
            })
            .collect::<Vec<_>>();

        let schema = region_metadata
            .column_metadatas
            .iter()
            .map(|c| {
                let data_type_wrapper =
                    ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
                Ok(api::v1::ColumnSchema {
                    column_name: c.column_schema.name.clone(),
                    datatype: data_type_wrapper.datatype() as i32,
                    semantic_type: c.semantic_type as i32,
                    ..Default::default()
                })
            })
            .collect::<api::error::Result<Vec<_>>>()
            .context(error::ConvertColumnDataTypeSnafu {
                reason: "failed to convert region metadata to column schema",
            })?;

        let rows = api::v1::Rows { schema, rows };

        Ok(Mutation {
            op_type: OpType::Put as i32,
            sequence: self.sequence,
            rows: Some(rows),
            write_hint: None,
        })
    }

    pub fn timestamps(&self) -> &ArrayRef {
        self.batch.column(self.timestamp_index)
    }

    pub fn num_rows(&self) -> usize {
        self.batch.num_rows()
    }
}

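/// Estimates the memory size of `batch` by summing the sliced buffer sizes of all columns.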
pub(crate) fn record_batch_estimated_size(batch: &RecordBatch) -> usize {
    batch
        .columns()
        .iter()
        .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
        .sum()
}

/// Builder for a single primary key column that is stored as an individual column.
enum PrimaryKeyColumnBuilder {
    /// Dictionary builder for string primary key columns.
    StringDict(StringDictionaryBuilder<UInt32Type>),
    /// Generic mutable vector builder for other primary key data types.
    Vector(Box<dyn MutableVector>),
}

impl PrimaryKeyColumnBuilder {
    /// Appends a value to this builder.
    fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
        match self {
            PrimaryKeyColumnBuilder::StringDict(builder) => {
                if let Some(s) = value.try_into_string().context(DataTypeMismatchSnafu)? {
                    builder.append_value(s);
                } else {
                    builder.append_null();
                }
            }
            PrimaryKeyColumnBuilder::Vector(builder) => {
                builder.push_value_ref(&value);
            }
        }
        Ok(())
    }

    /// Finishes the builder and converts it into an arrow array.
    fn into_arrow_array(self) -> ArrayRef {
        match self {
            PrimaryKeyColumnBuilder::StringDict(mut builder) => Arc::new(builder.finish()),
            PrimaryKeyColumnBuilder::Vector(mut builder) => builder.to_vector().to_arrow_array(),
        }
    }
}

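/// Converts [KeyValues] into a [BulkPart]: primary keys are encoded into a dictionary
/// column and the resulting rows are sorted by primary key, timestamp and sequence.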
pub struct BulkPartConverter {
    /// Metadata of the region to convert key values for.
    region_metadata: RegionMetadataRef,
    /// Arrow schema of the output record batch.
    schema: SchemaRef,
    /// Codec to encode primary keys.
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
    /// Reused buffer for encoding primary keys.
    key_buf: Vec<u8>,
    /// Builder for the encoded primary key dictionary column.
    key_array_builder: PrimaryKeyArrayBuilder,
    /// Builder for timestamp, sequence, op type and field columns.
    value_builder: ValueBuilder,
    /// Builders for individual primary key columns.
    /// Empty when primary key columns are not stored separately.
    primary_key_column_builders: Vec<PrimaryKeyColumnBuilder>,

    /// Max timestamp value appended so far.
    max_ts: i64,
    /// Min timestamp value appended so far.
    min_ts: i64,
    /// Max sequence number appended so far.
    max_sequence: SequenceNumber,
}

impl BulkPartConverter {
    /// Creates a new converter.
    ///
    /// When `store_primary_key_columns` is true and the region does not use sparse
    /// primary key encoding, individual primary key columns are stored in addition
    /// to the encoded primary key column.
    pub fn new(
        region_metadata: &RegionMetadataRef,
        schema: SchemaRef,
        capacity: usize,
        primary_key_codec: Arc<dyn PrimaryKeyCodec>,
        store_primary_key_columns: bool,
    ) -> Self {
        debug_assert_eq!(
            region_metadata.primary_key_encoding,
            primary_key_codec.encoding()
        );

        let primary_key_column_builders = if store_primary_key_columns
            && region_metadata.primary_key_encoding != PrimaryKeyEncoding::Sparse
        {
            new_primary_key_column_builders(region_metadata, capacity)
        } else {
            Vec::new()
        };

        Self {
            region_metadata: region_metadata.clone(),
            schema,
            primary_key_codec,
            key_buf: Vec::new(),
            key_array_builder: PrimaryKeyArrayBuilder::new(),
            value_builder: ValueBuilder::new(region_metadata, capacity),
            primary_key_column_builders,
            min_ts: i64::MAX,
            max_ts: i64::MIN,
            max_sequence: SequenceNumber::MIN,
        }
    }

    /// Appends all rows in [KeyValues] to the converter.
    pub fn append_key_values(&mut self, key_values: &KeyValues) -> Result<()> {
        for kv in key_values.iter() {
            self.append_key_value(&kv)?;
        }

        Ok(())
    }

    /// Appends a single [KeyValue] to the converter.
    fn append_key_value(&mut self, kv: &KeyValue) -> Result<()> {
        if self.primary_key_codec.encoding() == PrimaryKeyEncoding::Sparse {
            // Sparse encoding: the primary key is already encoded, take it from the
            // `__primary_key` column directly.
            let mut primary_keys = kv.primary_keys();
            if let Some(encoded) = primary_keys
                .next()
                .context(ColumnNotFoundSnafu {
                    column: PRIMARY_KEY_COLUMN_NAME,
                })?
                .try_into_binary()
                .context(DataTypeMismatchSnafu)?
            {
                self.key_array_builder
                    .append(encoded)
                    .context(ComputeArrowSnafu)?;
            } else {
                self.key_array_builder
                    .append("")
                    .context(ComputeArrowSnafu)?;
            }
        } else {
            // Dense encoding: encode the primary key columns into the reused buffer.
            self.key_buf.clear();
            self.primary_key_codec
                .encode_key_value(kv, &mut self.key_buf)
                .context(EncodeSnafu)?;
            self.key_array_builder
                .append(&self.key_buf)
                .context(ComputeArrowSnafu)?;
        };

        if !self.primary_key_column_builders.is_empty() {
            for (builder, pk_value) in self
                .primary_key_column_builders
                .iter_mut()
                .zip(kv.primary_keys())
            {
                builder.push_value_ref(pk_value)?;
            }
        }

        self.value_builder.push(
            kv.timestamp(),
            kv.sequence(),
            kv.op_type() as u8,
            kv.fields(),
        );

        let ts = kv
            .timestamp()
            .try_into_timestamp()
            .unwrap()
            .unwrap()
            .value();
        self.min_ts = self.min_ts.min(ts);
        self.max_ts = self.max_ts.max(ts);
        self.max_sequence = self.max_sequence.max(kv.sequence());

        Ok(())
    }

    /// Converts the buffered rows into a [BulkPart] whose batch is sorted by primary key,
    /// timestamp and sequence.
    pub fn convert(mut self) -> Result<BulkPart> {
        let values = Values::from(self.value_builder);
        let mut columns =
            Vec::with_capacity(4 + values.fields.len() + self.primary_key_column_builders.len());

        // Individual primary key columns (if stored) come first.
        for builder in self.primary_key_column_builders {
            columns.push(builder.into_arrow_array());
        }
        // Followed by field columns, the time index, the encoded primary key,
        // sequence and op type columns.
        columns.extend(values.fields.iter().map(|field| field.to_arrow_array()));
        let timestamp_index = columns.len();
        columns.push(values.timestamp.to_arrow_array());
        let pk_array = self.key_array_builder.finish();
        columns.push(Arc::new(pk_array));
        columns.push(values.sequence.to_arrow_array());
        columns.push(values.op_type.to_arrow_array());

        let batch = RecordBatch::try_new(self.schema, columns).context(NewRecordBatchSnafu)?;
        let batch = sort_primary_key_record_batch(&batch)?;

        Ok(BulkPart {
            batch,
            max_timestamp: self.max_ts,
            min_timestamp: self.min_ts,
            sequence: self.max_sequence,
            timestamp_index,
            raw_data: None,
        })
    }
}

fn new_primary_key_column_builders(
    metadata: &RegionMetadata,
    capacity: usize,
) -> Vec<PrimaryKeyColumnBuilder> {
    metadata
        .primary_key_columns()
        .map(|col| {
            if col.column_schema.data_type.is_string() {
                PrimaryKeyColumnBuilder::StringDict(StringDictionaryBuilder::with_capacity(
                    capacity,
                    INIT_DICT_VALUE_CAPACITY,
                    capacity,
                ))
            } else {
                PrimaryKeyColumnBuilder::Vector(
                    col.column_schema.data_type.create_mutable_vector(capacity),
                )
            }
        })
        .collect()
}

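/// Sorts a record batch by its encoded primary key (ascending), time index (ascending)
/// and sequence (descending) columns.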
fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
    let total_columns = batch.num_columns();
    let sort_columns = vec![
        // Encoded primary key column, ascending.
        SortColumn {
            values: batch.column(total_columns - 3).clone(),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
        // Time index column, ascending.
        SortColumn {
            values: batch.column(total_columns - 4).clone(),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
        // Sequence column, descending.
        SortColumn {
            values: batch.column(total_columns - 2).clone(),
            options: Some(SortOptions {
                descending: true,
                nulls_first: true,
            }),
        },
    ];

    let indices = datatypes::arrow::compute::lexsort_to_indices(&sort_columns, None)
        .context(ComputeArrowSnafu)?;

    datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
}

#[derive(Debug, Clone)]
pub struct EncodedBulkPart {
    data: Bytes,
    metadata: BulkPartMeta,
}

impl EncodedBulkPart {
    pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
        Self { data, metadata }
    }

    pub(crate) fn metadata(&self) -> &BulkPartMeta {
        &self.metadata
    }

    /// Returns the size of the encoded data in bytes.
    pub(crate) fn size_bytes(&self) -> usize {
        self.data.len()
    }

    /// Returns the encoded parquet data.
    pub(crate) fn data(&self) -> &Bytes {
        &self.data
    }

    /// Builds the [SstInfo] describing this part when it is written as an SST file.
    pub(crate) fn to_sst_info(&self, file_id: FileId) -> SstInfo {
        let unit = self.metadata.region_metadata.time_index_type().unit();
        SstInfo {
            file_id,
            time_range: (
                Timestamp::new(self.metadata.min_timestamp, unit),
                Timestamp::new(self.metadata.max_timestamp, unit),
            ),
            file_size: self.data.len() as u64,
            num_rows: self.metadata.num_rows,
            num_row_groups: self.metadata.parquet_metadata.num_row_groups() as u64,
            file_metadata: Some(self.metadata.parquet_metadata.clone()),
            index_metadata: IndexOutput::default(),
            num_series: self.metadata.num_series,
        }
    }

    /// Reads the part with the given context and optional sequence range.
    /// Returns `None` if all row groups are pruned.
    pub(crate) fn read(
        &self,
        context: BulkIterContextRef,
        sequence: Option<SequenceRange>,
    ) -> Result<Option<BoxedRecordBatchIterator>> {
        let skip_fields_for_pruning =
            Self::compute_skip_fields(context.pre_filter_mode(), &self.metadata.parquet_metadata);

        // Prunes row groups by the predicate in the context.
        let row_groups_to_read =
            context.row_groups_to_read(&self.metadata.parquet_metadata, skip_fields_for_pruning);

        if row_groups_to_read.is_empty() {
            // All row groups are pruned.
            return Ok(None);
        }

        let iter = EncodedBulkPartIter::try_new(
            context,
            row_groups_to_read,
            self.metadata.parquet_metadata.clone(),
            self.data.clone(),
            sequence,
        )?;
        Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
    }

    /// Returns whether field columns should be skipped while pruning, based on the
    /// pre-filter mode and whether any row group contains delete op types.
    fn compute_skip_fields(pre_filter_mode: PreFilterMode, parquet_meta: &ParquetMetaData) -> bool {
        match pre_filter_mode {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => {
                (0..parquet_meta.num_row_groups()).any(|rg_idx| {
                    row_group_contains_delete(parquet_meta, rg_idx, "memtable").unwrap_or(true)
                })
            }
        }
    }
}

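/// Metadata of an [EncodedBulkPart].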
#[derive(Debug, Clone)]
pub struct BulkPartMeta {
    /// Total number of rows in the part.
    pub num_rows: usize,
    /// Max timestamp in the part.
    pub max_timestamp: i64,
    /// Min timestamp in the part.
    pub min_timestamp: i64,
    /// Metadata of the encoded parquet file.
    pub parquet_metadata: Arc<ParquetMetaData>,
    /// Metadata of the region this part belongs to.
    pub region_metadata: RegionMetadataRef,
    /// Estimated number of series in the part.
    pub num_series: u64,
}

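/// Metrics collected while encoding a bulk part.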
#[derive(Default, Debug)]
pub struct BulkPartEncodeMetrics {
    /// Time spent iterating input batches.
    pub iter_cost: Duration,
    /// Time spent writing batches to the parquet writer.
    pub write_cost: Duration,
    /// Estimated size of the input batches.
    pub raw_size: usize,
    /// Size of the encoded parquet data.
    pub encoded_size: usize,
    /// Number of rows encoded.
    pub num_rows: usize,
}

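/// Encoder that writes bulk data into an in-memory parquet buffer.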
pub struct BulkPartEncoder {
    metadata: RegionMetadataRef,
    row_group_size: usize,
    writer_props: Option<WriterProperties>,
}

impl BulkPartEncoder {
    pub(crate) fn new(
        metadata: RegionMetadataRef,
        row_group_size: usize,
    ) -> Result<BulkPartEncoder> {
        let json = metadata.to_json().context(InvalidMetadataSnafu)?;
        let key_value_meta =
            parquet::file::metadata::KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        let writer_props = Some(
            WriterProperties::builder()
                .set_key_value_metadata(Some(vec![key_value_meta]))
                .set_write_batch_size(row_group_size)
                .set_max_row_group_size(row_group_size)
                .set_compression(Compression::ZSTD(ZstdLevel::default()))
                .set_column_index_truncate_length(None)
                .set_statistics_truncate_length(None)
                .build(),
        );

        Ok(Self {
            metadata,
            row_group_size,
            writer_props,
        })
    }
}

impl BulkPartEncoder {
    /// Encodes batches yielded by `iter` into a single parquet buffer.
    /// Returns `None` if the iterator produces no rows.
    pub fn encode_record_batch_iter(
        &self,
        iter: BoxedRecordBatchIterator,
        arrow_schema: SchemaRef,
        min_timestamp: i64,
        max_timestamp: i64,
        metrics: &mut BulkPartEncodeMetrics,
    ) -> Result<Option<EncodedBulkPart>> {
        let mut buf = Vec::with_capacity(4096);
        let mut writer = ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
            .context(EncodeMemtableSnafu)?;
        let mut total_rows = 0;
        let mut series_estimator = SeriesEstimator::default();

        let mut iter_start = Instant::now();
        for batch_result in iter {
            metrics.iter_cost += iter_start.elapsed();
            let batch = batch_result?;
            if batch.num_rows() == 0 {
                continue;
            }

            series_estimator.update_flat(&batch);
            metrics.raw_size += record_batch_estimated_size(&batch);
            let write_start = Instant::now();
            writer.write(&batch).context(EncodeMemtableSnafu)?;
            metrics.write_cost += write_start.elapsed();
            total_rows += batch.num_rows();
            iter_start = Instant::now();
        }
        metrics.iter_cost += iter_start.elapsed();
        iter_start = Instant::now();

        if total_rows == 0 {
            return Ok(None);
        }

        let close_start = Instant::now();
        let file_metadata = writer.close().context(EncodeMemtableSnafu)?;
        metrics.write_cost += close_start.elapsed();
        metrics.encoded_size += buf.len();
        metrics.num_rows += total_rows;

        let buf = Bytes::from(buf);
        let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);
        let num_series = series_estimator.finish();

        Ok(Some(EncodedBulkPart {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: total_rows,
                max_timestamp,
                min_timestamp,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
                num_series,
            },
        }))
    }

    /// Encodes a [BulkPart] into parquet. Returns `None` if the part is empty.
    fn encode_part(&self, part: &BulkPart) -> Result<Option<EncodedBulkPart>> {
        if part.batch.num_rows() == 0 {
            return Ok(None);
        }

        let mut buf = Vec::with_capacity(4096);
        let arrow_schema = part.batch.schema();

        let file_metadata = {
            let mut writer =
                ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
                    .context(EncodeMemtableSnafu)?;
            writer.write(&part.batch).context(EncodeMemtableSnafu)?;
            writer.finish().context(EncodeMemtableSnafu)?
        };

        let buf = Bytes::from(buf);
        let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);

        Ok(Some(EncodedBulkPart {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: part.batch.num_rows(),
                max_timestamp: part.max_timestamp,
                min_timestamp: part.min_timestamp,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
                num_series: part.estimated_series_count() as u64,
            },
        }))
    }
}

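/// Converts `mutations` to a record batch sorted and (optionally) deduplicated by primary
/// key and timestamp. Returns the batch with the min and max timestamps, or `None` if the
/// mutations contain no rows.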
fn mutations_to_record_batch(
    mutations: &[Mutation],
    metadata: &RegionMetadataRef,
    pk_encoder: &DensePrimaryKeyCodec,
    dedup: bool,
) -> Result<Option<(RecordBatch, i64, i64)>> {
    let total_rows: usize = mutations
        .iter()
        .map(|m| m.rows.as_ref().map(|r| r.rows.len()).unwrap_or(0))
        .sum();

    if total_rows == 0 {
        return Ok(None);
    }

    let mut pk_builder = BinaryBuilder::with_capacity(total_rows, 0);

    let mut ts_vector: Box<dyn MutableVector> = metadata
        .time_index_column()
        .column_schema
        .data_type
        .create_mutable_vector(total_rows);
    let mut sequence_builder = UInt64Builder::with_capacity(total_rows);
    let mut op_type_builder = UInt8Builder::with_capacity(total_rows);

    let mut field_builders: Vec<Box<dyn MutableVector>> = metadata
        .field_columns()
        .map(|f| f.column_schema.data_type.create_mutable_vector(total_rows))
        .collect();

    let mut pk_buffer = vec![];
    for m in mutations {
        let Some(key_values) = KeyValuesRef::new(metadata, m) else {
            continue;
        };

        for row in key_values.iter() {
            pk_buffer.clear();
            pk_encoder
                .encode_to_vec(row.primary_keys(), &mut pk_buffer)
                .context(EncodeSnafu)?;
            pk_builder.append_value(pk_buffer.as_bytes());
            ts_vector.push_value_ref(&row.timestamp());
            sequence_builder.append_value(row.sequence());
            op_type_builder.append_value(row.op_type() as u8);
            for (builder, field) in field_builders.iter_mut().zip(row.fields()) {
                builder.push_value_ref(&field);
            }
        }
    }

    let arrow_schema = to_sst_arrow_schema(metadata);
    let timestamp_unit = metadata
        .time_index_column()
        .column_schema
        .data_type
        .as_timestamp()
        .unwrap()
        .unit();
    let sorter = ArraysSorter {
        encoded_primary_keys: pk_builder.finish(),
        timestamp_unit,
        timestamp: ts_vector.to_vector().to_arrow_array(),
        sequence: sequence_builder.finish(),
        op_type: op_type_builder.finish(),
        fields: field_builders
            .iter_mut()
            .map(|f| f.to_vector().to_arrow_array()),
        dedup,
        arrow_schema,
    };

    sorter.sort().map(Some)
}

struct ArraysSorter<I> {
    encoded_primary_keys: BinaryArray,
    timestamp_unit: TimeUnit,
    timestamp: ArrayRef,
    sequence: UInt64Array,
    op_type: UInt8Array,
    fields: I,
    dedup: bool,
    arrow_schema: SchemaRef,
}

impl<I> ArraysSorter<I>
where
    I: Iterator<Item = ArrayRef>,
{
    /// Sorts the arrays by (primary key, timestamp, sequence desc), optionally removing
    /// duplicated rows, and builds the record batch along with the min and max timestamps.
    fn sort(self) -> Result<(RecordBatch, i64, i64)> {
        debug_assert!(!self.timestamp.is_empty());
        debug_assert!(self.timestamp.len() == self.sequence.len());
        debug_assert!(self.timestamp.len() == self.op_type.len());
        debug_assert!(self.timestamp.len() == self.encoded_primary_keys.len());

        let timestamp_iter = timestamp_array_to_iter(self.timestamp_unit, &self.timestamp);
        let (mut min_timestamp, mut max_timestamp) = (i64::MAX, i64::MIN);
        let mut to_sort = self
            .encoded_primary_keys
            .iter()
            .zip(timestamp_iter)
            .zip(self.sequence.iter())
            .map(|((pk, timestamp), sequence)| {
                max_timestamp = max_timestamp.max(*timestamp);
                min_timestamp = min_timestamp.min(*timestamp);
                (pk, timestamp, sequence)
            })
            .enumerate()
            .collect::<Vec<_>>();

        to_sort.sort_unstable_by(|(_, (l_pk, l_ts, l_seq)), (_, (r_pk, r_ts, r_seq))| {
            l_pk.cmp(r_pk)
                .then(l_ts.cmp(r_ts))
                .then(l_seq.cmp(r_seq).reverse())
        });

        if self.dedup {
            // Keep only the first row (the one with the largest sequence) for each
            // (primary key, timestamp) pair.
            to_sort.dedup_by(|(_, (l_pk, l_ts, _)), (_, (r_pk, r_ts, _))| {
                l_pk == r_pk && l_ts == r_ts
            });
        }

        let indices = UInt32Array::from_iter_values(to_sort.iter().map(|v| v.0 as u32));

        let pk_dictionary = Arc::new(binary_array_to_dictionary(
            arrow::compute::take(
                &self.encoded_primary_keys,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap(),
        )?) as ArrayRef;

        let mut arrays = Vec::with_capacity(self.arrow_schema.fields.len());
        for arr in self.fields {
            arrays.push(
                arrow::compute::take(
                    &arr,
                    &indices,
                    Some(TakeOptions {
                        check_bounds: false,
                    }),
                )
                .context(ComputeArrowSnafu)?,
            );
        }

        let timestamp = arrow::compute::take(
            &self.timestamp,
            &indices,
            Some(TakeOptions {
                check_bounds: false,
            }),
        )
        .context(ComputeArrowSnafu)?;

        arrays.push(timestamp);
        arrays.push(pk_dictionary);
        arrays.push(
            arrow::compute::take(
                &self.sequence,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?,
        );

        arrays.push(
            arrow::compute::take(
                &self.op_type,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?,
        );

        let batch = RecordBatch::try_new(self.arrow_schema, arrays).context(NewRecordBatchSnafu)?;
        Ok((batch, min_timestamp, max_timestamp))
    }
}

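/// Returns an iterator over the raw `i64` values of a timestamp array of the given unit.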
fn timestamp_array_to_iter(
    timestamp_unit: TimeUnit,
    timestamp: &ArrayRef,
) -> impl Iterator<Item = &i64> {
    match timestamp_unit {
        TimeUnit::Second => timestamp
            .as_any()
            .downcast_ref::<TimestampSecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Millisecond => timestamp
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Microsecond => timestamp
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Nanosecond => timestamp
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap()
            .values()
            .iter(),
    }
}

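/// Converts a sorted [BinaryArray] of encoded primary keys into a [PrimaryKeyArray],
/// collapsing consecutive equal values into dictionary keys.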
fn binary_array_to_dictionary(input: &BinaryArray) -> Result<PrimaryKeyArray> {
    if input.is_empty() {
        return Ok(DictionaryArray::new(
            UInt32Array::from(Vec::<u32>::new()),
            Arc::new(BinaryArray::from_vec(vec![])) as ArrayRef,
        ));
    }
    let mut keys = Vec::with_capacity(16);
    let mut values = BinaryBuilder::new();
    let mut prev: usize = 0;
    keys.push(prev as u32);
    values.append_value(input.value(prev));

    for current_bytes in input.iter().skip(1) {
        // Safety: the input array is built from non-null values.
        let current_bytes = current_bytes.unwrap();
        let prev_bytes = input.value(prev);
        if current_bytes != prev_bytes {
            values.append_value(current_bytes);
            prev += 1;
        }
        keys.push(prev as u32);
    }

    Ok(DictionaryArray::new(
        UInt32Array::from(keys),
        Arc::new(values.finish()) as ArrayRef,
    ))
}

#[cfg(test)]
mod tests {
    use std::collections::VecDeque;

    use api::v1::{Row, WriteHint};
    use datafusion_common::ScalarValue;
    use datatypes::arrow::array::Float64Array;
    use datatypes::prelude::{ConcreteDataType, ScalarVector, Value};
    use datatypes::vectors::{Float64Vector, TimestampMillisecondVector};
    use store_api::storage::consts::ReservedColumnId;

    use super::*;
    use crate::memtable::bulk::context::BulkIterContext;
    use crate::sst::parquet::format::{PrimaryKeyReadFormat, ReadFormat};
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
    use crate::test_util::memtable_util::{
        build_key_values_with_ts_seq_values, metadata_for_test, region_metadata_to_row_schema,
    };

    fn check_binary_array_to_dictionary(
        input: &[&[u8]],
        expected_keys: &[u32],
        expected_values: &[&[u8]],
    ) {
        let input = BinaryArray::from_iter_values(input.iter());
        let array = binary_array_to_dictionary(&input).unwrap();
        assert_eq!(
            &expected_keys,
            &array.keys().iter().map(|v| v.unwrap()).collect::<Vec<_>>()
        );
        assert_eq!(
            expected_values,
            &array
                .values()
                .as_any()
                .downcast_ref::<BinaryArray>()
                .unwrap()
                .iter()
                .map(|v| v.unwrap())
                .collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_binary_array_to_dictionary() {
        check_binary_array_to_dictionary(&[], &[], &[]);

        check_binary_array_to_dictionary(&["a".as_bytes()], &[0], &["a".as_bytes()]);

        check_binary_array_to_dictionary(
            &["a".as_bytes(), "a".as_bytes()],
            &[0, 0],
            &["a".as_bytes()],
        );

        check_binary_array_to_dictionary(
            &["a".as_bytes(), "a".as_bytes(), "b".as_bytes()],
            &[0, 0, 1],
            &["a".as_bytes(), "b".as_bytes()],
        );

        check_binary_array_to_dictionary(
            &[
                "a".as_bytes(),
                "a".as_bytes(),
                "b".as_bytes(),
                "c".as_bytes(),
            ],
            &[0, 0, 1, 2],
            &["a".as_bytes(), "b".as_bytes(), "c".as_bytes()],
        );
    }

    struct MutationInput<'a> {
        k0: &'a str,
        k1: u32,
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
        sequence: u64,
    }

    #[derive(Debug, PartialOrd, PartialEq)]
    struct BatchOutput<'a> {
        pk_values: &'a [Value],
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
    }

    fn check_mutations_to_record_batches(
        input: &[MutationInput],
        expected: &[BatchOutput],
        expected_timestamp: (i64, i64),
        dedup: bool,
    ) {
        let metadata = metadata_for_test();
        let mutations = input
            .iter()
            .map(|m| {
                build_key_values_with_ts_seq_values(
                    &metadata,
                    m.k0.to_string(),
                    m.k1,
                    m.timestamps.iter().copied(),
                    m.v1.iter().copied(),
                    m.sequence,
                )
                .mutation
            })
            .collect::<Vec<_>>();
        let total_rows: usize = mutations
            .iter()
            .flat_map(|m| m.rows.iter())
            .map(|r| r.rows.len())
            .sum();

        let pk_encoder = DensePrimaryKeyCodec::new(&metadata);

        let (batch, _, _) = mutations_to_record_batch(&mutations, &metadata, &pk_encoder, dedup)
            .unwrap()
            .unwrap();
        let read_format = PrimaryKeyReadFormat::new_with_all_columns(metadata.clone());
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&batch, None, &mut batches)
            .unwrap();
        if !dedup {
            assert_eq!(
                total_rows,
                batches.iter().map(|b| { b.num_rows() }).sum::<usize>()
            );
        }
        let batch_values = batches
            .into_iter()
            .map(|b| {
                let pk_values = pk_encoder.decode(b.primary_key()).unwrap().into_dense();
                let timestamps = b
                    .timestamps()
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap().0.value())
                    .collect::<Vec<_>>();
                let float_values = b.fields()[1]
                    .data
                    .as_any()
                    .downcast_ref::<Float64Vector>()
                    .unwrap()
                    .iter_data()
                    .collect::<Vec<_>>();

                (pk_values, timestamps, float_values)
            })
            .collect::<Vec<_>>();
        assert_eq!(expected.len(), batch_values.len());

        for idx in 0..expected.len() {
            assert_eq!(expected[idx].pk_values, &batch_values[idx].0);
            assert_eq!(expected[idx].timestamps, &batch_values[idx].1);
            assert_eq!(expected[idx].v1, &batch_values[idx].2);
        }
    }

    #[test]
    fn test_mutations_to_record_batch() {
        check_mutations_to_record_batches(
            &[MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[0],
                v1: &[Some(0.1)],
                sequence: 0,
            }],
            &[BatchOutput {
                pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                timestamps: &[0],
                v1: &[Some(0.1)],
            }],
            (0, 0),
            true,
        );

        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[1],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
                MutationInput {
                    k0: "a",
                    k1: 1,
                    timestamps: &[1],
                    v1: &[Some(0.3)],
                    sequence: 2,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0, 1],
                    v1: &[Some(0.1), Some(0.2)],
                },
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(1)],
                    timestamps: &[1],
                    v1: &[Some(0.3)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 1),
            true,
        );

        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 0),
            true,
        );
        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0, 0],
                    v1: &[Some(0.2), Some(0.1)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 0),
            false,
        );
    }

    fn encode(input: &[MutationInput]) -> EncodedBulkPart {
        let metadata = metadata_for_test();
        let kvs = input
            .iter()
            .map(|m| {
                build_key_values_with_ts_seq_values(
                    &metadata,
                    m.k0.to_string(),
                    m.k1,
                    m.timestamps.iter().copied(),
                    m.v1.iter().copied(),
                    m.sequence,
                )
            })
            .collect::<Vec<_>>();
        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let primary_key_codec = build_primary_key_codec(&metadata);
        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
        for kv in kvs {
            converter.append_key_values(&kv).unwrap();
        }
        let part = converter.convert().unwrap();
        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
        encoder.encode_part(&part).unwrap().unwrap()
    }

    #[test]
    fn test_write_and_read_part_projection() {
        let part = encode(&[
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.1)],
                sequence: 0,
            },
            MutationInput {
                k0: "b",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.0)],
                sequence: 0,
            },
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[2],
                v1: &[Some(0.2)],
                sequence: 1,
            },
        ]);

        let projection = &[4u32];
        let mut reader = part
            .read(
                Arc::new(
                    BulkIterContext::new(
                        part.metadata.region_metadata.clone(),
                        Some(projection.as_slice()),
                        None,
                        false,
                    )
                    .unwrap(),
                ),
                None,
            )
            .unwrap()
            .expect("expect at least one row group");

        let mut total_rows_read = 0;
        let mut field: Vec<f64> = vec![];
        for res in reader {
            let batch = res.unwrap();
            assert_eq!(5, batch.num_columns());
            field.extend_from_slice(
                batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Float64Array>()
                    .unwrap()
                    .values(),
            );
            total_rows_read += batch.num_rows();
        }
        assert_eq!(3, total_rows_read);
        assert_eq!(vec![0.1, 0.2, 0.0], field);
    }

    fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
        let metadata = metadata_for_test();
        let kvs = key_values
            .into_iter()
            .map(|(k0, k1, (start, end), sequence)| {
                let ts = (start..end);
                let v1 = (start..end).map(|_| None);
                build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
            })
            .collect::<Vec<_>>();
        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let primary_key_codec = build_primary_key_codec(&metadata);
        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
        for kv in kvs {
            converter.append_key_values(&kv).unwrap();
        }
        let part = converter.convert().unwrap();
        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
        encoder.encode_part(&part).unwrap().unwrap()
    }

    fn check_prune_row_group(
        part: &EncodedBulkPart,
        predicate: Option<Predicate>,
        expected_rows: usize,
    ) {
        let context = Arc::new(
            BulkIterContext::new(
                part.metadata.region_metadata.clone(),
                None,
                predicate,
                false,
            )
            .unwrap(),
        );
        let mut reader = part
            .read(context, None)
            .unwrap()
            .expect("expect at least one row group");
        let mut total_rows_read = 0;
        for res in reader {
            let batch = res.unwrap();
            total_rows_read += batch.num_rows();
        }
        assert_eq!(expected_rows, total_rows_read);
    }

    #[test]
    fn test_prune_row_groups() {
        let part = prepare(vec![
            ("a", 0, (0, 40), 1),
            ("a", 1, (0, 60), 1),
            ("b", 0, (0, 100), 2),
            ("b", 1, (100, 180), 3),
            ("b", 1, (180, 210), 4),
        ]);

        let context = Arc::new(
            BulkIterContext::new(
                part.metadata.region_metadata.clone(),
                None,
                Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
                    datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
                )])),
                false,
            )
            .unwrap(),
        );
        assert!(part.read(context, None).unwrap().is_none());

        check_prune_row_group(&part, None, 310);

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            40,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
            ])),
            60,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
            ])),
            100,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            100,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64)),
            ])),
            1,
        );
    }

    #[test]
    fn test_bulk_part_converter_append_and_convert() {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "key1".to_string(),
            1u32,
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "key2".to_string(),
            2u32,
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_timestamp, 1000);
        assert_eq!(bulk_part.max_timestamp, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );
    }

    #[test]
    fn test_bulk_part_converter_sorting() {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "z_key".to_string(),
            3u32,
            vec![3000].into_iter(),
            vec![Some(3.0)].into_iter(),
            3,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "a_key".to_string(),
            1u32,
            vec![1000].into_iter(),
            vec![Some(1.0)].into_iter(),
            1,
        );

        let key_values3 = build_key_values_with_ts_seq_values(
            &metadata,
            "m_key".to_string(),
            2u32,
            vec![2000].into_iter(),
            vec![Some(2.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();
        converter.append_key_values(&key_values3).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);

        let ts_column = bulk_part.batch.column(bulk_part.timestamp_index);
        let seq_column = bulk_part.batch.column(bulk_part.batch.num_columns() - 2);

        let ts_array = ts_column
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        let seq_array = seq_column.as_any().downcast_ref::<UInt64Array>().unwrap();

        assert_eq!(ts_array.values(), &[1000, 2000, 3000]);
        assert_eq!(seq_array.values(), &[1, 2, 3]);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );
    }

    #[test]
    fn test_bulk_part_converter_empty() {
        let metadata = metadata_for_test();
        let capacity = 10;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 0);
        assert_eq!(bulk_part.min_timestamp, i64::MAX);
        assert_eq!(bulk_part.max_timestamp, i64::MIN);
        assert_eq!(bulk_part.sequence, SequenceNumber::MIN);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );
    }

    #[test]
    fn test_bulk_part_converter_without_primary_key_columns() {
        let metadata = metadata_for_test();
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions {
                raw_pk_columns: false,
                string_pk_use_dict: true,
            },
        );

        let capacity = 100;
        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, false);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "key1".to_string(),
            1u32,
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "key2".to_string(),
            2u32,
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_timestamp, 1000);
        assert_eq!(bulk_part.max_timestamp, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );
    }

    #[allow(clippy::too_many_arguments)]
    fn build_key_values_with_sparse_encoding(
        metadata: &RegionMetadataRef,
        primary_key_codec: &Arc<dyn PrimaryKeyCodec>,
        table_id: u32,
        tsid: u64,
        k0: String,
        k1: String,
        timestamps: impl Iterator<Item = i64>,
        values: impl Iterator<Item = Option<f64>>,
        sequence: SequenceNumber,
    ) -> KeyValues {
        let pk_values = vec![
            (ReservedColumnId::table_id(), Value::UInt32(table_id)),
            (ReservedColumnId::tsid(), Value::UInt64(tsid)),
            (0, Value::String(k0.clone().into())),
            (1, Value::String(k1.clone().into())),
        ];
        let mut encoded_key = Vec::new();
        primary_key_codec
            .encode_values(&pk_values, &mut encoded_key)
            .unwrap();
        assert!(!encoded_key.is_empty());

        let column_schema = vec![
            api::v1::ColumnSchema {
                column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::binary_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Tag as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "ts".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::timestamp_millisecond_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Timestamp as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "v0".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::int64_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Field as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "v1".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::float64_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Field as i32,
                ..Default::default()
            },
        ];

        let rows = timestamps
            .zip(values)
            .map(|(ts, v)| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::BinaryValue(
                            encoded_key.clone(),
                        )),
                    },
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(ts)),
                    },
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::I64Value(ts)),
                    },
                    api::v1::Value {
                        value_data: v.map(api::v1::value::ValueData::F64Value),
                    },
                ],
            })
            .collect();

        let mutation = api::v1::Mutation {
            op_type: 1,
            sequence,
            rows: Some(api::v1::Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: Some(WriteHint {
                primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
            }),
        };
        KeyValues::new(metadata.as_ref(), mutation).unwrap()
    }

    #[test]
    fn test_bulk_part_converter_sparse_primary_key_encoding() {
        use api::v1::SemanticType;
        use datatypes::schema::ColumnSchema;
        use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
        use store_api::storage::RegionId;

        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![0, 1])
            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
        let metadata = Arc::new(builder.build().unwrap());

        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse);
        assert_eq!(primary_key_codec.encoding(), PrimaryKeyEncoding::Sparse);

        let capacity = 100;
        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec.clone(), true);

        let key_values1 = build_key_values_with_sparse_encoding(
            &metadata,
            &primary_key_codec,
            2048u32,
            100u64,
            "key11".to_string(),
            "key21".to_string(),
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_sparse_encoding(
            &metadata,
            &primary_key_codec,
            4096u32,
            200u64,
            "key12".to_string(),
            "key22".to_string(),
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_timestamp, 1000);
        assert_eq!(bulk_part.max_timestamp, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );

        let primary_key_column = bulk_part.batch.column_by_name("__primary_key").unwrap();
        let dict_array = primary_key_column
            .as_any()
            .downcast_ref::<DictionaryArray<UInt32Type>>()
            .unwrap();

        assert!(!dict_array.is_empty());
        assert_eq!(dict_array.len(), 3);

        let values = dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        for i in 0..values.len() {
            assert!(
                !values.value(i).is_empty(),
                "Encoded primary key should not be empty"
            );
        }
    }
}