1use std::collections::VecDeque;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use api::helper::{ColumnDataTypeWrapper, value_to_grpc_value};
22use api::v1::bulk_wal_entry::Body;
23use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType, bulk_wal_entry};
24use bytes::Bytes;
25use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
26use common_recordbatch::DfRecordBatch as RecordBatch;
27use common_time::Timestamp;
28use common_time::timestamp::TimeUnit;
29use datatypes::arrow;
30use datatypes::arrow::array::{
31 Array, ArrayRef, BinaryBuilder, BinaryDictionaryBuilder, DictionaryArray, StringBuilder,
32 StringDictionaryBuilder, TimestampMicrosecondArray, TimestampMillisecondArray,
33 TimestampNanosecondArray, TimestampSecondArray, UInt8Array, UInt8Builder, UInt32Array,
34 UInt64Array, UInt64Builder,
35};
36use datatypes::arrow::compute::{SortColumn, SortOptions, TakeOptions};
37use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
38use datatypes::arrow_array::BinaryArray;
39use datatypes::data_type::DataType;
40use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
41use datatypes::value::{Value, ValueRef};
42use datatypes::vectors::Helper;
43use mito_codec::key_values::{KeyValue, KeyValues, KeyValuesRef};
44use mito_codec::row_converter::{
45 DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, build_primary_key_codec,
46};
47use parquet::arrow::ArrowWriter;
48use parquet::data_type::AsBytes;
49use parquet::file::metadata::ParquetMetaData;
50use parquet::file::properties::WriterProperties;
51use snafu::{OptionExt, ResultExt, Snafu};
52use store_api::codec::PrimaryKeyEncoding;
53use store_api::metadata::{RegionMetadata, RegionMetadataRef};
54use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
55use store_api::storage::{FileId, SequenceNumber};
56use table::predicate::Predicate;
57
58use crate::error::{
59 self, ColumnNotFoundSnafu, ComputeArrowSnafu, DataTypeMismatchSnafu, EncodeMemtableSnafu,
60 EncodeSnafu, InvalidMetadataSnafu, NewRecordBatchSnafu, Result,
61};
62use crate::memtable::BoxedRecordBatchIterator;
63use crate::memtable::bulk::context::BulkIterContextRef;
64use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
65use crate::memtable::time_series::{ValueBuilder, Values};
66use crate::sst::index::IndexOutput;
67use crate::sst::parquet::flat_format::primary_key_column_index;
68use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder, ReadFormat};
69use crate::sst::parquet::helper::parse_parquet_metadata;
70use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo};
71use crate::sst::to_sst_arrow_schema;
72
/// Initial number of distinct dictionary values reserved by each string primary
/// key column builder (passed as the `value_capacity` of
/// `StringDictionaryBuilder::with_capacity`).
const INIT_DICT_VALUE_CAPACITY: usize = 8;
74
75#[derive(Clone)]
77pub struct BulkPart {
78 pub batch: RecordBatch,
79 pub max_timestamp: i64,
80 pub min_timestamp: i64,
81 pub sequence: u64,
82 pub timestamp_index: usize,
83 pub raw_data: Option<ArrowIpc>,
84}
85
86impl TryFrom<BulkWalEntry> for BulkPart {
87 type Error = error::Error;
88
89 fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
90 match value.body.expect("Entry payload should be present") {
91 Body::ArrowIpc(ipc) => {
92 let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
93 .context(error::ConvertBulkWalEntrySnafu)?;
94 let batch = decoder
95 .try_decode_record_batch(&ipc.data_header, &ipc.payload)
96 .context(error::ConvertBulkWalEntrySnafu)?;
97 Ok(Self {
98 batch,
99 max_timestamp: value.max_ts,
100 min_timestamp: value.min_ts,
101 sequence: value.sequence,
102 timestamp_index: value.timestamp_index as usize,
103 raw_data: Some(ipc),
104 })
105 }
106 }
107 }
108}
109
110impl TryFrom<&BulkPart> for BulkWalEntry {
111 type Error = error::Error;
112
113 fn try_from(value: &BulkPart) -> Result<Self> {
114 if let Some(ipc) = &value.raw_data {
115 Ok(BulkWalEntry {
116 sequence: value.sequence,
117 max_ts: value.max_timestamp,
118 min_ts: value.min_timestamp,
119 timestamp_index: value.timestamp_index as u32,
120 body: Some(Body::ArrowIpc(ipc.clone())),
121 })
122 } else {
123 let mut encoder = FlightEncoder::default();
124 let schema_bytes = encoder
125 .encode_schema(value.batch.schema().as_ref())
126 .data_header;
127 let [rb_data] = encoder
128 .encode(FlightMessage::RecordBatch(value.batch.clone()))
129 .try_into()
130 .map_err(|_| {
131 error::UnsupportedOperationSnafu {
132 err_msg: "create BulkWalEntry from RecordBatch with dictionary arrays",
133 }
134 .build()
135 })?;
136 Ok(BulkWalEntry {
137 sequence: value.sequence,
138 max_ts: value.max_timestamp,
139 min_ts: value.min_timestamp,
140 timestamp_index: value.timestamp_index as u32,
141 body: Some(Body::ArrowIpc(ArrowIpc {
142 schema: schema_bytes,
143 data_header: rb_data.data_header,
144 payload: rb_data.data_body,
145 })),
146 })
147 }
148 }
149}
150
151impl BulkPart {
152 pub(crate) fn estimated_size(&self) -> usize {
153 record_batch_estimated_size(&self.batch)
154 }
155
156 pub fn estimated_series_count(&self) -> usize {
159 let pk_column_idx = primary_key_column_index(self.batch.num_columns());
160 let pk_column = self.batch.column(pk_column_idx);
161 if let Some(dict_array) = pk_column.as_any().downcast_ref::<PrimaryKeyArray>() {
162 dict_array.values().len()
163 } else {
164 0
165 }
166 }
167
168 pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
170 let vectors = region_metadata
171 .schema
172 .column_schemas()
173 .iter()
174 .map(|col| match self.batch.column_by_name(&col.name) {
175 None => Ok(None),
176 Some(col) => Helper::try_into_vector(col).map(Some),
177 })
178 .collect::<datatypes::error::Result<Vec<_>>>()
179 .context(error::ComputeVectorSnafu)?;
180
181 let rows = (0..self.num_rows())
182 .map(|row_idx| {
183 let values = (0..self.batch.num_columns())
184 .map(|col_idx| {
185 if let Some(v) = &vectors[col_idx] {
186 value_to_grpc_value(v.get(row_idx))
187 } else {
188 api::v1::Value { value_data: None }
189 }
190 })
191 .collect::<Vec<_>>();
192 api::v1::Row { values }
193 })
194 .collect::<Vec<_>>();
195
196 let schema = region_metadata
197 .column_metadatas
198 .iter()
199 .map(|c| {
200 let data_type_wrapper =
201 ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
202 Ok(api::v1::ColumnSchema {
203 column_name: c.column_schema.name.clone(),
204 datatype: data_type_wrapper.datatype() as i32,
205 semantic_type: c.semantic_type as i32,
206 ..Default::default()
207 })
208 })
209 .collect::<api::error::Result<Vec<_>>>()
210 .context(error::ConvertColumnDataTypeSnafu {
211 reason: "failed to convert region metadata to column schema",
212 })?;
213
214 let rows = api::v1::Rows { schema, rows };
215
216 Ok(Mutation {
217 op_type: OpType::Put as i32,
218 sequence: self.sequence,
219 rows: Some(rows),
220 write_hint: None,
221 })
222 }
223
224 pub fn timestamps(&self) -> &ArrayRef {
225 self.batch.column(self.timestamp_index)
226 }
227
228 pub fn num_rows(&self) -> usize {
229 self.batch.num_rows()
230 }
231}
232
233pub(crate) fn record_batch_estimated_size(batch: &RecordBatch) -> usize {
235 batch
236 .columns()
237 .iter()
238 .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
240 .sum()
241}
242
243enum PrimaryKeyColumnBuilder {
245 StringDict(StringDictionaryBuilder<UInt32Type>),
247 Vector(Box<dyn MutableVector>),
249}
250
251impl PrimaryKeyColumnBuilder {
252 fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
254 match self {
255 PrimaryKeyColumnBuilder::StringDict(builder) => {
256 if let Some(s) = value.as_string().context(DataTypeMismatchSnafu)? {
257 builder.append_value(s);
259 } else {
260 builder.append_null();
261 }
262 }
263 PrimaryKeyColumnBuilder::Vector(builder) => {
264 builder.push_value_ref(value);
265 }
266 }
267 Ok(())
268 }
269
270 fn into_arrow_array(self) -> ArrayRef {
272 match self {
273 PrimaryKeyColumnBuilder::StringDict(mut builder) => Arc::new(builder.finish()),
274 PrimaryKeyColumnBuilder::Vector(mut builder) => builder.to_vector().to_arrow_array(),
275 }
276 }
277}
278
279pub struct BulkPartConverter {
281 region_metadata: RegionMetadataRef,
283 schema: SchemaRef,
285 primary_key_codec: Arc<dyn PrimaryKeyCodec>,
287 key_buf: Vec<u8>,
289 key_array_builder: PrimaryKeyArrayBuilder,
291 value_builder: ValueBuilder,
293 primary_key_column_builders: Vec<PrimaryKeyColumnBuilder>,
296
297 max_ts: i64,
299 min_ts: i64,
301 max_sequence: SequenceNumber,
303}
304
305impl BulkPartConverter {
306 pub fn new(
311 region_metadata: &RegionMetadataRef,
312 schema: SchemaRef,
313 capacity: usize,
314 primary_key_codec: Arc<dyn PrimaryKeyCodec>,
315 store_primary_key_columns: bool,
316 ) -> Self {
317 debug_assert_eq!(
318 region_metadata.primary_key_encoding,
319 primary_key_codec.encoding()
320 );
321
322 let primary_key_column_builders = if store_primary_key_columns
323 && region_metadata.primary_key_encoding != PrimaryKeyEncoding::Sparse
324 {
325 new_primary_key_column_builders(region_metadata, capacity)
326 } else {
327 Vec::new()
328 };
329
330 Self {
331 region_metadata: region_metadata.clone(),
332 schema,
333 primary_key_codec,
334 key_buf: Vec::new(),
335 key_array_builder: PrimaryKeyArrayBuilder::new(),
336 value_builder: ValueBuilder::new(region_metadata, capacity),
337 primary_key_column_builders,
338 min_ts: i64::MAX,
339 max_ts: i64::MIN,
340 max_sequence: SequenceNumber::MIN,
341 }
342 }
343
344 pub fn append_key_values(&mut self, key_values: &KeyValues) -> Result<()> {
346 for kv in key_values.iter() {
347 self.append_key_value(&kv)?;
348 }
349
350 Ok(())
351 }
352
353 fn append_key_value(&mut self, kv: &KeyValue) -> Result<()> {
357 if self.primary_key_codec.encoding() == PrimaryKeyEncoding::Sparse {
359 let mut primary_keys = kv.primary_keys();
362 if let Some(encoded) = primary_keys
363 .next()
364 .context(ColumnNotFoundSnafu {
365 column: PRIMARY_KEY_COLUMN_NAME,
366 })?
367 .as_binary()
368 .context(DataTypeMismatchSnafu)?
369 {
370 self.key_array_builder
371 .append(encoded)
372 .context(ComputeArrowSnafu)?;
373 } else {
374 self.key_array_builder
375 .append("")
376 .context(ComputeArrowSnafu)?;
377 }
378 } else {
379 self.key_buf.clear();
381 self.primary_key_codec
382 .encode_key_value(kv, &mut self.key_buf)
383 .context(EncodeSnafu)?;
384 self.key_array_builder
385 .append(&self.key_buf)
386 .context(ComputeArrowSnafu)?;
387 };
388
389 if !self.primary_key_column_builders.is_empty() {
391 for (builder, pk_value) in self
392 .primary_key_column_builders
393 .iter_mut()
394 .zip(kv.primary_keys())
395 {
396 builder.push_value_ref(pk_value)?;
397 }
398 }
399
400 self.value_builder.push(
402 kv.timestamp(),
403 kv.sequence(),
404 kv.op_type() as u8,
405 kv.fields(),
406 );
407
408 let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
411 self.min_ts = self.min_ts.min(ts);
412 self.max_ts = self.max_ts.max(ts);
413 self.max_sequence = self.max_sequence.max(kv.sequence());
414
415 Ok(())
416 }
417
418 pub fn convert(mut self) -> Result<BulkPart> {
422 let values = Values::from(self.value_builder);
423 let mut columns =
424 Vec::with_capacity(4 + values.fields.len() + self.primary_key_column_builders.len());
425
426 for builder in self.primary_key_column_builders {
428 columns.push(builder.into_arrow_array());
429 }
430 columns.extend(values.fields.iter().map(|field| field.to_arrow_array()));
432 let timestamp_index = columns.len();
434 columns.push(values.timestamp.to_arrow_array());
435 let pk_array = self.key_array_builder.finish();
437 columns.push(Arc::new(pk_array));
438 columns.push(values.sequence.to_arrow_array());
440 columns.push(values.op_type.to_arrow_array());
441
442 let batch = RecordBatch::try_new(self.schema, columns).context(NewRecordBatchSnafu)?;
443 let batch = sort_primary_key_record_batch(&batch)?;
445
446 Ok(BulkPart {
447 batch,
448 max_timestamp: self.max_ts,
449 min_timestamp: self.min_ts,
450 sequence: self.max_sequence,
451 timestamp_index,
452 raw_data: None,
453 })
454 }
455}
456
457fn new_primary_key_column_builders(
458 metadata: &RegionMetadata,
459 capacity: usize,
460) -> Vec<PrimaryKeyColumnBuilder> {
461 metadata
462 .primary_key_columns()
463 .map(|col| {
464 if col.column_schema.data_type.is_string() {
465 PrimaryKeyColumnBuilder::StringDict(StringDictionaryBuilder::with_capacity(
466 capacity,
467 INIT_DICT_VALUE_CAPACITY,
468 capacity,
469 ))
470 } else {
471 PrimaryKeyColumnBuilder::Vector(
472 col.column_schema.data_type.create_mutable_vector(capacity),
473 )
474 }
475 })
476 .collect()
477}
478
479fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
481 let total_columns = batch.num_columns();
482 let sort_columns = vec![
483 SortColumn {
485 values: batch.column(total_columns - 3).clone(),
486 options: Some(SortOptions {
487 descending: false,
488 nulls_first: true,
489 }),
490 },
491 SortColumn {
493 values: batch.column(total_columns - 4).clone(),
494 options: Some(SortOptions {
495 descending: false,
496 nulls_first: true,
497 }),
498 },
499 SortColumn {
501 values: batch.column(total_columns - 2).clone(),
502 options: Some(SortOptions {
503 descending: true,
504 nulls_first: true,
505 }),
506 },
507 ];
508
509 let indices = datatypes::arrow::compute::lexsort_to_indices(&sort_columns, None)
510 .context(ComputeArrowSnafu)?;
511
512 datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
513}
514
515#[derive(Debug, Clone)]
516pub struct EncodedBulkPart {
517 data: Bytes,
518 metadata: BulkPartMeta,
519}
520
521impl EncodedBulkPart {
522 pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
523 Self { data, metadata }
524 }
525
526 pub(crate) fn metadata(&self) -> &BulkPartMeta {
527 &self.metadata
528 }
529
530 pub(crate) fn size_bytes(&self) -> usize {
532 self.data.len()
533 }
534
535 pub(crate) fn data(&self) -> &Bytes {
537 &self.data
538 }
539
540 pub(crate) fn to_sst_info(&self, file_id: FileId) -> SstInfo {
548 let unit = self.metadata.region_metadata.time_index_type().unit();
549 SstInfo {
550 file_id,
551 time_range: (
552 Timestamp::new(self.metadata.min_timestamp, unit),
553 Timestamp::new(self.metadata.max_timestamp, unit),
554 ),
555 file_size: self.data.len() as u64,
556 num_rows: self.metadata.num_rows,
557 num_row_groups: self.metadata.parquet_metadata.num_row_groups() as u64,
558 file_metadata: Some(self.metadata.parquet_metadata.clone()),
559 index_metadata: IndexOutput::default(),
560 }
561 }
562
563 pub(crate) fn read(
564 &self,
565 context: BulkIterContextRef,
566 sequence: Option<SequenceNumber>,
567 ) -> Result<Option<BoxedRecordBatchIterator>> {
568 let row_groups_to_read = context.row_groups_to_read(&self.metadata.parquet_metadata);
570
571 if row_groups_to_read.is_empty() {
572 return Ok(None);
574 }
575
576 let iter = EncodedBulkPartIter::try_new(
577 context,
578 row_groups_to_read,
579 self.metadata.parquet_metadata.clone(),
580 self.data.clone(),
581 sequence,
582 )?;
583 Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
584 }
585}
586
587#[derive(Debug, Clone)]
588pub struct BulkPartMeta {
589 pub num_rows: usize,
591 pub max_timestamp: i64,
593 pub min_timestamp: i64,
595 pub parquet_metadata: Arc<ParquetMetaData>,
597 pub region_metadata: RegionMetadataRef,
599}
600
/// Statistics accumulated by `BulkPartEncoder::encode_record_batch_iter`.
#[derive(Default, Debug)]
pub struct BulkPartEncodeMetrics {
    /// Time spent waiting on the input batch iterator.
    pub iter_cost: Duration,
    /// Time spent writing batches and closing the parquet writer.
    pub write_cost: Duration,
    /// Estimated in-memory size of the written batches, in bytes.
    pub raw_size: usize,
    /// Size of the encoded parquet output, in bytes.
    pub encoded_size: usize,
    /// Number of rows encoded.
    pub num_rows: usize,
}
615
616pub struct BulkPartEncoder {
617 metadata: RegionMetadataRef,
618 row_group_size: usize,
619 writer_props: Option<WriterProperties>,
620}
621
622impl BulkPartEncoder {
623 pub(crate) fn new(
624 metadata: RegionMetadataRef,
625 row_group_size: usize,
626 ) -> Result<BulkPartEncoder> {
627 let json = metadata.to_json().context(InvalidMetadataSnafu)?;
629 let key_value_meta =
630 parquet::file::metadata::KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
631
632 let writer_props = Some(
634 WriterProperties::builder()
635 .set_key_value_metadata(Some(vec![key_value_meta]))
636 .set_write_batch_size(row_group_size)
637 .set_max_row_group_size(row_group_size)
638 .set_column_index_truncate_length(None)
639 .set_statistics_truncate_length(None)
640 .build(),
641 );
642
643 Ok(Self {
644 metadata,
645 row_group_size,
646 writer_props,
647 })
648 }
649}
650
651impl BulkPartEncoder {
652 pub fn encode_record_batch_iter(
654 &self,
655 iter: BoxedRecordBatchIterator,
656 arrow_schema: SchemaRef,
657 min_timestamp: i64,
658 max_timestamp: i64,
659 metrics: &mut BulkPartEncodeMetrics,
660 ) -> Result<Option<EncodedBulkPart>> {
661 let mut buf = Vec::with_capacity(4096);
662 let mut writer = ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
663 .context(EncodeMemtableSnafu)?;
664 let mut total_rows = 0;
665
666 let mut iter_start = Instant::now();
668 for batch_result in iter {
669 metrics.iter_cost += iter_start.elapsed();
670 let batch = batch_result?;
671 if batch.num_rows() == 0 {
672 continue;
673 }
674
675 metrics.raw_size += record_batch_estimated_size(&batch);
676 let write_start = Instant::now();
677 writer.write(&batch).context(EncodeMemtableSnafu)?;
678 metrics.write_cost += write_start.elapsed();
679 total_rows += batch.num_rows();
680 iter_start = Instant::now();
681 }
682 metrics.iter_cost += iter_start.elapsed();
683 iter_start = Instant::now();
684
685 if total_rows == 0 {
686 return Ok(None);
687 }
688
689 let close_start = Instant::now();
690 let file_metadata = writer.close().context(EncodeMemtableSnafu)?;
691 metrics.write_cost += close_start.elapsed();
692 metrics.encoded_size += buf.len();
693 metrics.num_rows += total_rows;
694
695 let buf = Bytes::from(buf);
696 let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);
697
698 Ok(Some(EncodedBulkPart {
699 data: buf,
700 metadata: BulkPartMeta {
701 num_rows: total_rows,
702 max_timestamp,
703 min_timestamp,
704 parquet_metadata,
705 region_metadata: self.metadata.clone(),
706 },
707 }))
708 }
709
710 fn encode_part(&self, part: &BulkPart) -> Result<Option<EncodedBulkPart>> {
712 if part.batch.num_rows() == 0 {
713 return Ok(None);
714 }
715
716 let mut buf = Vec::with_capacity(4096);
717 let arrow_schema = part.batch.schema();
718
719 let file_metadata = {
720 let mut writer =
721 ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
722 .context(EncodeMemtableSnafu)?;
723 writer.write(&part.batch).context(EncodeMemtableSnafu)?;
724 writer.finish().context(EncodeMemtableSnafu)?
725 };
726
727 let buf = Bytes::from(buf);
728 let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);
729
730 Ok(Some(EncodedBulkPart {
731 data: buf,
732 metadata: BulkPartMeta {
733 num_rows: part.batch.num_rows(),
734 max_timestamp: part.max_timestamp,
735 min_timestamp: part.min_timestamp,
736 parquet_metadata,
737 region_metadata: self.metadata.clone(),
738 },
739 }))
740 }
741}
742
743fn mutations_to_record_batch(
745 mutations: &[Mutation],
746 metadata: &RegionMetadataRef,
747 pk_encoder: &DensePrimaryKeyCodec,
748 dedup: bool,
749) -> Result<Option<(RecordBatch, i64, i64)>> {
750 let total_rows: usize = mutations
751 .iter()
752 .map(|m| m.rows.as_ref().map(|r| r.rows.len()).unwrap_or(0))
753 .sum();
754
755 if total_rows == 0 {
756 return Ok(None);
757 }
758
759 let mut pk_builder = BinaryBuilder::with_capacity(total_rows, 0);
760
761 let mut ts_vector: Box<dyn MutableVector> = metadata
762 .time_index_column()
763 .column_schema
764 .data_type
765 .create_mutable_vector(total_rows);
766 let mut sequence_builder = UInt64Builder::with_capacity(total_rows);
767 let mut op_type_builder = UInt8Builder::with_capacity(total_rows);
768
769 let mut field_builders: Vec<Box<dyn MutableVector>> = metadata
770 .field_columns()
771 .map(|f| f.column_schema.data_type.create_mutable_vector(total_rows))
772 .collect();
773
774 let mut pk_buffer = vec![];
775 for m in mutations {
776 let Some(key_values) = KeyValuesRef::new(metadata, m) else {
777 continue;
778 };
779
780 for row in key_values.iter() {
781 pk_buffer.clear();
782 pk_encoder
783 .encode_to_vec(row.primary_keys(), &mut pk_buffer)
784 .context(EncodeSnafu)?;
785 pk_builder.append_value(pk_buffer.as_bytes());
786 ts_vector.push_value_ref(row.timestamp());
787 sequence_builder.append_value(row.sequence());
788 op_type_builder.append_value(row.op_type() as u8);
789 for (builder, field) in field_builders.iter_mut().zip(row.fields()) {
790 builder.push_value_ref(field);
791 }
792 }
793 }
794
795 let arrow_schema = to_sst_arrow_schema(metadata);
796 let timestamp_unit = metadata
798 .time_index_column()
799 .column_schema
800 .data_type
801 .as_timestamp()
802 .unwrap()
803 .unit();
804 let sorter = ArraysSorter {
805 encoded_primary_keys: pk_builder.finish(),
806 timestamp_unit,
807 timestamp: ts_vector.to_vector().to_arrow_array(),
808 sequence: sequence_builder.finish(),
809 op_type: op_type_builder.finish(),
810 fields: field_builders
811 .iter_mut()
812 .map(|f| f.to_vector().to_arrow_array()),
813 dedup,
814 arrow_schema,
815 };
816
817 sorter.sort().map(Some)
818}
819
820struct ArraysSorter<I> {
821 encoded_primary_keys: BinaryArray,
822 timestamp_unit: TimeUnit,
823 timestamp: ArrayRef,
824 sequence: UInt64Array,
825 op_type: UInt8Array,
826 fields: I,
827 dedup: bool,
828 arrow_schema: SchemaRef,
829}
830
831impl<I> ArraysSorter<I>
832where
833 I: Iterator<Item = ArrayRef>,
834{
835 fn sort(self) -> Result<(RecordBatch, i64, i64)> {
837 debug_assert!(!self.timestamp.is_empty());
838 debug_assert!(self.timestamp.len() == self.sequence.len());
839 debug_assert!(self.timestamp.len() == self.op_type.len());
840 debug_assert!(self.timestamp.len() == self.encoded_primary_keys.len());
841
842 let timestamp_iter = timestamp_array_to_iter(self.timestamp_unit, &self.timestamp);
843 let (mut min_timestamp, mut max_timestamp) = (i64::MAX, i64::MIN);
844 let mut to_sort = self
845 .encoded_primary_keys
846 .iter()
847 .zip(timestamp_iter)
848 .zip(self.sequence.iter())
849 .map(|((pk, timestamp), sequence)| {
850 max_timestamp = max_timestamp.max(*timestamp);
851 min_timestamp = min_timestamp.min(*timestamp);
852 (pk, timestamp, sequence)
853 })
854 .enumerate()
855 .collect::<Vec<_>>();
856
857 to_sort.sort_unstable_by(|(_, (l_pk, l_ts, l_seq)), (_, (r_pk, r_ts, r_seq))| {
858 l_pk.cmp(r_pk)
859 .then(l_ts.cmp(r_ts))
860 .then(l_seq.cmp(r_seq).reverse())
861 });
862
863 if self.dedup {
864 to_sort.dedup_by(|(_, (l_pk, l_ts, _)), (_, (r_pk, r_ts, _))| {
866 l_pk == r_pk && l_ts == r_ts
867 });
868 }
869
870 let indices = UInt32Array::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
871
872 let pk_dictionary = Arc::new(binary_array_to_dictionary(
873 arrow::compute::take(
875 &self.encoded_primary_keys,
876 &indices,
877 Some(TakeOptions {
878 check_bounds: false,
879 }),
880 )
881 .context(ComputeArrowSnafu)?
882 .as_any()
883 .downcast_ref::<BinaryArray>()
884 .unwrap(),
885 )?) as ArrayRef;
886
887 let mut arrays = Vec::with_capacity(self.arrow_schema.fields.len());
888 for arr in self.fields {
889 arrays.push(
890 arrow::compute::take(
891 &arr,
892 &indices,
893 Some(TakeOptions {
894 check_bounds: false,
895 }),
896 )
897 .context(ComputeArrowSnafu)?,
898 );
899 }
900
901 let timestamp = arrow::compute::take(
902 &self.timestamp,
903 &indices,
904 Some(TakeOptions {
905 check_bounds: false,
906 }),
907 )
908 .context(ComputeArrowSnafu)?;
909
910 arrays.push(timestamp);
911 arrays.push(pk_dictionary);
912 arrays.push(
913 arrow::compute::take(
914 &self.sequence,
915 &indices,
916 Some(TakeOptions {
917 check_bounds: false,
918 }),
919 )
920 .context(ComputeArrowSnafu)?,
921 );
922
923 arrays.push(
924 arrow::compute::take(
925 &self.op_type,
926 &indices,
927 Some(TakeOptions {
928 check_bounds: false,
929 }),
930 )
931 .context(ComputeArrowSnafu)?,
932 );
933
934 let batch = RecordBatch::try_new(self.arrow_schema, arrays).context(NewRecordBatchSnafu)?;
935 Ok((batch, min_timestamp, max_timestamp))
936 }
937}
938
939fn timestamp_array_to_iter(
941 timestamp_unit: TimeUnit,
942 timestamp: &ArrayRef,
943) -> impl Iterator<Item = &i64> {
944 match timestamp_unit {
945 TimeUnit::Second => timestamp
947 .as_any()
948 .downcast_ref::<TimestampSecondArray>()
949 .unwrap()
950 .values()
951 .iter(),
952 TimeUnit::Millisecond => timestamp
953 .as_any()
954 .downcast_ref::<TimestampMillisecondArray>()
955 .unwrap()
956 .values()
957 .iter(),
958 TimeUnit::Microsecond => timestamp
959 .as_any()
960 .downcast_ref::<TimestampMicrosecondArray>()
961 .unwrap()
962 .values()
963 .iter(),
964 TimeUnit::Nanosecond => timestamp
965 .as_any()
966 .downcast_ref::<TimestampNanosecondArray>()
967 .unwrap()
968 .values()
969 .iter(),
970 }
971}
972
973fn binary_array_to_dictionary(input: &BinaryArray) -> Result<PrimaryKeyArray> {
975 if input.is_empty() {
976 return Ok(DictionaryArray::new(
977 UInt32Array::from(Vec::<u32>::new()),
978 Arc::new(BinaryArray::from_vec(vec![])) as ArrayRef,
979 ));
980 }
981 let mut keys = Vec::with_capacity(16);
982 let mut values = BinaryBuilder::new();
983 let mut prev: usize = 0;
984 keys.push(prev as u32);
985 values.append_value(input.value(prev));
986
987 for current_bytes in input.iter().skip(1) {
988 let current_bytes = current_bytes.unwrap();
990 let prev_bytes = input.value(prev);
991 if current_bytes != prev_bytes {
992 values.append_value(current_bytes);
993 prev += 1;
994 }
995 keys.push(prev as u32);
996 }
997
998 Ok(DictionaryArray::new(
999 UInt32Array::from(keys),
1000 Arc::new(values.finish()) as ArrayRef,
1001 ))
1002}
1003
1004#[cfg(test)]
1005mod tests {
1006 use std::collections::VecDeque;
1007
1008 use api::v1::{Row, WriteHint};
1009 use datafusion_common::ScalarValue;
1010 use datatypes::arrow::array::Float64Array;
1011 use datatypes::prelude::{ConcreteDataType, ScalarVector, Value};
1012 use datatypes::vectors::{Float64Vector, TimestampMillisecondVector};
1013 use store_api::storage::consts::ReservedColumnId;
1014
1015 use super::*;
1016 use crate::memtable::bulk::context::BulkIterContext;
1017 use crate::sst::parquet::format::{PrimaryKeyReadFormat, ReadFormat};
1018 use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1019 use crate::test_util::memtable_util::{
1020 build_key_values_with_ts_seq_values, metadata_for_test, region_metadata_to_row_schema,
1021 };
1022
1023 fn check_binary_array_to_dictionary(
1024 input: &[&[u8]],
1025 expected_keys: &[u32],
1026 expected_values: &[&[u8]],
1027 ) {
1028 let input = BinaryArray::from_iter_values(input.iter());
1029 let array = binary_array_to_dictionary(&input).unwrap();
1030 assert_eq!(
1031 &expected_keys,
1032 &array.keys().iter().map(|v| v.unwrap()).collect::<Vec<_>>()
1033 );
1034 assert_eq!(
1035 expected_values,
1036 &array
1037 .values()
1038 .as_any()
1039 .downcast_ref::<BinaryArray>()
1040 .unwrap()
1041 .iter()
1042 .map(|v| v.unwrap())
1043 .collect::<Vec<_>>()
1044 );
1045 }
1046
1047 #[test]
1048 fn test_binary_array_to_dictionary() {
1049 check_binary_array_to_dictionary(&[], &[], &[]);
1050
1051 check_binary_array_to_dictionary(&["a".as_bytes()], &[0], &["a".as_bytes()]);
1052
1053 check_binary_array_to_dictionary(
1054 &["a".as_bytes(), "a".as_bytes()],
1055 &[0, 0],
1056 &["a".as_bytes()],
1057 );
1058
1059 check_binary_array_to_dictionary(
1060 &["a".as_bytes(), "a".as_bytes(), "b".as_bytes()],
1061 &[0, 0, 1],
1062 &["a".as_bytes(), "b".as_bytes()],
1063 );
1064
1065 check_binary_array_to_dictionary(
1066 &[
1067 "a".as_bytes(),
1068 "a".as_bytes(),
1069 "b".as_bytes(),
1070 "c".as_bytes(),
1071 ],
1072 &[0, 0, 1, 2],
1073 &["a".as_bytes(), "b".as_bytes(), "c".as_bytes()],
1074 );
1075 }
1076
1077 struct MutationInput<'a> {
1078 k0: &'a str,
1079 k1: u32,
1080 timestamps: &'a [i64],
1081 v1: &'a [Option<f64>],
1082 sequence: u64,
1083 }
1084
1085 #[derive(Debug, PartialOrd, PartialEq)]
1086 struct BatchOutput<'a> {
1087 pk_values: &'a [Value],
1088 timestamps: &'a [i64],
1089 v1: &'a [Option<f64>],
1090 }
1091
1092 fn check_mutations_to_record_batches(
1093 input: &[MutationInput],
1094 expected: &[BatchOutput],
1095 expected_timestamp: (i64, i64),
1096 dedup: bool,
1097 ) {
1098 let metadata = metadata_for_test();
1099 let mutations = input
1100 .iter()
1101 .map(|m| {
1102 build_key_values_with_ts_seq_values(
1103 &metadata,
1104 m.k0.to_string(),
1105 m.k1,
1106 m.timestamps.iter().copied(),
1107 m.v1.iter().copied(),
1108 m.sequence,
1109 )
1110 .mutation
1111 })
1112 .collect::<Vec<_>>();
1113 let total_rows: usize = mutations
1114 .iter()
1115 .flat_map(|m| m.rows.iter())
1116 .map(|r| r.rows.len())
1117 .sum();
1118
1119 let pk_encoder = DensePrimaryKeyCodec::new(&metadata);
1120
1121 let (batch, _, _) = mutations_to_record_batch(&mutations, &metadata, &pk_encoder, dedup)
1122 .unwrap()
1123 .unwrap();
1124 let read_format = PrimaryKeyReadFormat::new_with_all_columns(metadata.clone());
1125 let mut batches = VecDeque::new();
1126 read_format
1127 .convert_record_batch(&batch, None, &mut batches)
1128 .unwrap();
1129 if !dedup {
1130 assert_eq!(
1131 total_rows,
1132 batches.iter().map(|b| { b.num_rows() }).sum::<usize>()
1133 );
1134 }
1135 let batch_values = batches
1136 .into_iter()
1137 .map(|b| {
1138 let pk_values = pk_encoder.decode(b.primary_key()).unwrap().into_dense();
1139 let timestamps = b
1140 .timestamps()
1141 .as_any()
1142 .downcast_ref::<TimestampMillisecondVector>()
1143 .unwrap()
1144 .iter_data()
1145 .map(|v| v.unwrap().0.value())
1146 .collect::<Vec<_>>();
1147 let float_values = b.fields()[1]
1148 .data
1149 .as_any()
1150 .downcast_ref::<Float64Vector>()
1151 .unwrap()
1152 .iter_data()
1153 .collect::<Vec<_>>();
1154
1155 (pk_values, timestamps, float_values)
1156 })
1157 .collect::<Vec<_>>();
1158 assert_eq!(expected.len(), batch_values.len());
1159
1160 for idx in 0..expected.len() {
1161 assert_eq!(expected[idx].pk_values, &batch_values[idx].0);
1162 assert_eq!(expected[idx].timestamps, &batch_values[idx].1);
1163 assert_eq!(expected[idx].v1, &batch_values[idx].2);
1164 }
1165 }
1166
1167 #[test]
1168 fn test_mutations_to_record_batch() {
1169 check_mutations_to_record_batches(
1170 &[MutationInput {
1171 k0: "a",
1172 k1: 0,
1173 timestamps: &[0],
1174 v1: &[Some(0.1)],
1175 sequence: 0,
1176 }],
1177 &[BatchOutput {
1178 pk_values: &[Value::String("a".into()), Value::UInt32(0)],
1179 timestamps: &[0],
1180 v1: &[Some(0.1)],
1181 }],
1182 (0, 0),
1183 true,
1184 );
1185
1186 check_mutations_to_record_batches(
1187 &[
1188 MutationInput {
1189 k0: "a",
1190 k1: 0,
1191 timestamps: &[0],
1192 v1: &[Some(0.1)],
1193 sequence: 0,
1194 },
1195 MutationInput {
1196 k0: "b",
1197 k1: 0,
1198 timestamps: &[0],
1199 v1: &[Some(0.0)],
1200 sequence: 0,
1201 },
1202 MutationInput {
1203 k0: "a",
1204 k1: 0,
1205 timestamps: &[1],
1206 v1: &[Some(0.2)],
1207 sequence: 1,
1208 },
1209 MutationInput {
1210 k0: "a",
1211 k1: 1,
1212 timestamps: &[1],
1213 v1: &[Some(0.3)],
1214 sequence: 2,
1215 },
1216 ],
1217 &[
1218 BatchOutput {
1219 pk_values: &[Value::String("a".into()), Value::UInt32(0)],
1220 timestamps: &[0, 1],
1221 v1: &[Some(0.1), Some(0.2)],
1222 },
1223 BatchOutput {
1224 pk_values: &[Value::String("a".into()), Value::UInt32(1)],
1225 timestamps: &[1],
1226 v1: &[Some(0.3)],
1227 },
1228 BatchOutput {
1229 pk_values: &[Value::String("b".into()), Value::UInt32(0)],
1230 timestamps: &[0],
1231 v1: &[Some(0.0)],
1232 },
1233 ],
1234 (0, 1),
1235 true,
1236 );
1237
1238 check_mutations_to_record_batches(
1239 &[
1240 MutationInput {
1241 k0: "a",
1242 k1: 0,
1243 timestamps: &[0],
1244 v1: &[Some(0.1)],
1245 sequence: 0,
1246 },
1247 MutationInput {
1248 k0: "b",
1249 k1: 0,
1250 timestamps: &[0],
1251 v1: &[Some(0.0)],
1252 sequence: 0,
1253 },
1254 MutationInput {
1255 k0: "a",
1256 k1: 0,
1257 timestamps: &[0],
1258 v1: &[Some(0.2)],
1259 sequence: 1,
1260 },
1261 ],
1262 &[
1263 BatchOutput {
1264 pk_values: &[Value::String("a".into()), Value::UInt32(0)],
1265 timestamps: &[0],
1266 v1: &[Some(0.2)],
1267 },
1268 BatchOutput {
1269 pk_values: &[Value::String("b".into()), Value::UInt32(0)],
1270 timestamps: &[0],
1271 v1: &[Some(0.0)],
1272 },
1273 ],
1274 (0, 0),
1275 true,
1276 );
1277 check_mutations_to_record_batches(
1278 &[
1279 MutationInput {
1280 k0: "a",
1281 k1: 0,
1282 timestamps: &[0],
1283 v1: &[Some(0.1)],
1284 sequence: 0,
1285 },
1286 MutationInput {
1287 k0: "b",
1288 k1: 0,
1289 timestamps: &[0],
1290 v1: &[Some(0.0)],
1291 sequence: 0,
1292 },
1293 MutationInput {
1294 k0: "a",
1295 k1: 0,
1296 timestamps: &[0],
1297 v1: &[Some(0.2)],
1298 sequence: 1,
1299 },
1300 ],
1301 &[
1302 BatchOutput {
1303 pk_values: &[Value::String("a".into()), Value::UInt32(0)],
1304 timestamps: &[0, 0],
1305 v1: &[Some(0.2), Some(0.1)],
1306 },
1307 BatchOutput {
1308 pk_values: &[Value::String("b".into()), Value::UInt32(0)],
1309 timestamps: &[0],
1310 v1: &[Some(0.0)],
1311 },
1312 ],
1313 (0, 0),
1314 false,
1315 );
1316 }
1317
1318 fn encode(input: &[MutationInput]) -> EncodedBulkPart {
1319 let metadata = metadata_for_test();
1320 let kvs = input
1321 .iter()
1322 .map(|m| {
1323 build_key_values_with_ts_seq_values(
1324 &metadata,
1325 m.k0.to_string(),
1326 m.k1,
1327 m.timestamps.iter().copied(),
1328 m.v1.iter().copied(),
1329 m.sequence,
1330 )
1331 })
1332 .collect::<Vec<_>>();
1333 let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1334 let primary_key_codec = build_primary_key_codec(&metadata);
1335 let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1336 for kv in kvs {
1337 converter.append_key_values(&kv).unwrap();
1338 }
1339 let part = converter.convert().unwrap();
1340 let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1341 encoder.encode_part(&part).unwrap().unwrap()
1342 }
1343
#[test]
fn test_write_and_read_part_projection() {
    let part = encode(&[
        MutationInput {
            k0: "a",
            k1: 0,
            timestamps: &[1],
            v1: &[Some(0.1)],
            sequence: 0,
        },
        MutationInput {
            k0: "b",
            k1: 0,
            timestamps: &[1],
            v1: &[Some(0.0)],
            sequence: 0,
        },
        MutationInput {
            k0: "a",
            k1: 0,
            timestamps: &[2],
            v1: &[Some(0.2)],
            sequence: 1,
        },
    ]);

    // Project only column id 4. The output batches still carry 5 columns, and column 0
    // holds the projected Float64 values.
    let projection = &[4u32];
    // `reader` is consumed by the `for` loop below, so it does not need to be `mut`.
    let reader = part
        .read(
            Arc::new(BulkIterContext::new(
                part.metadata.region_metadata.clone(),
                &Some(projection.as_slice()),
                None,
            )),
            None,
        )
        .unwrap()
        .expect("expect at least one row group");

    let mut total_rows_read = 0;
    let mut field: Vec<f64> = vec![];
    for res in reader {
        let batch = res.unwrap();
        assert_eq!(5, batch.num_columns());
        field.extend_from_slice(
            batch
                .column(0)
                .as_any()
                .downcast_ref::<Float64Array>()
                .unwrap()
                .values(),
        );
        total_rows_read += batch.num_rows();
    }
    assert_eq!(3, total_rows_read);
    // Rows for key "a" (ordered by timestamp) come before the row for key "b".
    assert_eq!(vec![0.1, 0.2, 0.0], field);
}
1401
1402 fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
1403 let metadata = metadata_for_test();
1404 let kvs = key_values
1405 .into_iter()
1406 .map(|(k0, k1, (start, end), sequence)| {
1407 let ts = (start..end);
1408 let v1 = (start..end).map(|_| None);
1409 build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
1410 })
1411 .collect::<Vec<_>>();
1412 let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1413 let primary_key_codec = build_primary_key_codec(&metadata);
1414 let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1415 for kv in kvs {
1416 converter.append_key_values(&kv).unwrap();
1417 }
1418 let part = converter.convert().unwrap();
1419 let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1420 encoder.encode_part(&part).unwrap().unwrap()
1421 }
1422
1423 fn check_prune_row_group(
1424 part: &EncodedBulkPart,
1425 predicate: Option<Predicate>,
1426 expected_rows: usize,
1427 ) {
1428 let context = Arc::new(BulkIterContext::new(
1429 part.metadata.region_metadata.clone(),
1430 &None,
1431 predicate,
1432 ));
1433 let mut reader = part
1434 .read(context, None)
1435 .unwrap()
1436 .expect("expect at least one row group");
1437 let mut total_rows_read = 0;
1438 for res in reader {
1439 let batch = res.unwrap();
1440 total_rows_read += batch.num_rows();
1441 }
1442 assert_eq!(expected_rows, total_rows_read);
1444 }
1445
#[test]
fn test_prune_row_groups() {
    let part = prepare(vec![
        ("a", 0, (0, 40), 1),
        ("a", 1, (0, 60), 1),
        ("b", 0, (0, 100), 2),
        ("b", 1, (100, 180), 3),
        ("b", 1, (180, 210), 4),
    ]);

    // No timestamp reaches 300 (the largest is 209), so `read` yields no reader at all.
    let ts_is_300 = datafusion_expr::col("ts").eq(datafusion_expr::lit(
        ScalarValue::TimestampMillisecond(Some(300), None),
    ));
    let no_match_context = Arc::new(BulkIterContext::new(
        part.metadata.region_metadata.clone(),
        &None,
        Some(Predicate::new(vec![ts_is_300])),
    ));
    assert!(part.read(no_match_context, None).unwrap().is_none());

    // Without a predicate every row is read: 40 + 60 + 100 + 80 + 30 = 310.
    check_prune_row_group(&part, None, 310);

    // (filters, expected row count), checked in order.
    // NOTE(review): the `v0 == 150` case assumes `v0` mirrors the timestamp in
    // `build_key_values_with_ts_seq_values` — confirm.
    let predicate_cases = [
        (
            vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ],
            40,
        ),
        (
            vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
            ],
            60,
        ),
        (
            vec![datafusion_expr::col("k0").eq(datafusion_expr::lit("a"))],
            100,
        ),
        (
            vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ],
            100,
        ),
        (
            vec![datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64))],
            1,
        ),
    ];
    for (filters, expected_rows) in predicate_cases {
        check_prune_row_group(&part, Some(Predicate::new(filters)), expected_rows);
    }
}
1511
#[test]
fn test_bulk_part_converter_append_and_convert() {
    let metadata = metadata_for_test();
    let options = FlatSchemaOptions::from_encoding(metadata.primary_key_encoding);
    let codec = build_primary_key_codec(&metadata);
    let flat_schema = to_flat_sst_arrow_schema(&metadata, &options);
    let mut converter = BulkPartConverter::new(&metadata, flat_schema, 100, codec, true);

    let first = build_key_values_with_ts_seq_values(
        &metadata,
        "key1".to_string(),
        1u32,
        vec![1000, 2000].into_iter(),
        vec![Some(1.0), Some(2.0)].into_iter(),
        1,
    );
    let second = build_key_values_with_ts_seq_values(
        &metadata,
        "key2".to_string(),
        2u32,
        vec![1500].into_iter(),
        vec![Some(3.0)].into_iter(),
        2,
    );
    for key_values in [&first, &second] {
        converter.append_key_values(key_values).unwrap();
    }

    let bulk_part = converter.convert().unwrap();

    // Three rows in total; the timestamp bounds and max sequence span both inputs.
    assert_eq!(bulk_part.num_rows(), 3);
    assert_eq!(bulk_part.min_timestamp, 1000);
    assert_eq!(bulk_part.max_timestamp, 2000);
    assert_eq!(bulk_part.sequence, 2);
    // The time index sits right before the three internal columns.
    assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

    let schema = bulk_part.batch.schema();
    let names = schema
        .fields()
        .iter()
        .map(|field| field.name().as_str())
        .collect::<Vec<_>>();
    assert_eq!(
        names,
        [
            "k0",
            "k1",
            "v0",
            "v1",
            "ts",
            "__primary_key",
            "__sequence",
            "__op_type"
        ]
    );
}
1572
#[test]
fn test_bulk_part_converter_sorting() {
    let metadata = metadata_for_test();
    let flat_schema = to_flat_sst_arrow_schema(
        &metadata,
        &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
    );
    let mut converter = BulkPartConverter::new(
        &metadata,
        flat_schema,
        100,
        build_primary_key_codec(&metadata),
        true,
    );

    // (k0, k1, ts, v1, sequence), appended deliberately out of primary-key order.
    let rows = [
        ("z_key", 3u32, 3000, 3.0, 3),
        ("a_key", 1u32, 1000, 1.0, 1),
        ("m_key", 2u32, 2000, 2.0, 2),
    ];
    for (k0, k1, ts, v1, sequence) in rows {
        let key_values = build_key_values_with_ts_seq_values(
            &metadata,
            k0.to_string(),
            k1,
            vec![ts].into_iter(),
            vec![Some(v1)].into_iter(),
            sequence,
        );
        converter.append_key_values(&key_values).unwrap();
    }

    let bulk_part = converter.convert().unwrap();
    assert_eq!(bulk_part.num_rows(), 3);

    // After conversion the rows follow key order a_key, m_key, z_key.
    let batch = &bulk_part.batch;
    let timestamps = batch
        .column(bulk_part.timestamp_index)
        .as_any()
        .downcast_ref::<TimestampMillisecondArray>()
        .unwrap();
    let sequences = batch
        .column(batch.num_columns() - 2)
        .as_any()
        .downcast_ref::<UInt64Array>()
        .unwrap();
    assert_eq!(timestamps.values(), &[1000, 2000, 3000]);
    assert_eq!(sequences.values(), &[1, 2, 3]);

    let schema = bulk_part.batch.schema();
    let names = schema
        .fields()
        .iter()
        .map(|field| field.name().as_str())
        .collect::<Vec<_>>();
    assert_eq!(
        names,
        [
            "k0",
            "k1",
            "v0",
            "v1",
            "ts",
            "__primary_key",
            "__sequence",
            "__op_type"
        ]
    );
}
1650
#[test]
fn test_bulk_part_converter_empty() {
    let metadata = metadata_for_test();
    let converter = BulkPartConverter::new(
        &metadata,
        to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        ),
        10,
        build_primary_key_codec(&metadata),
        true,
    );

    // Converting without appending anything yields an empty part whose bounds keep
    // their initial sentinel values.
    let bulk_part = converter.convert().unwrap();
    assert_eq!(bulk_part.num_rows(), 0);
    assert_eq!(bulk_part.min_timestamp, i64::MAX);
    assert_eq!(bulk_part.max_timestamp, i64::MIN);
    assert_eq!(bulk_part.sequence, SequenceNumber::MIN);

    // The schema is still complete even with zero rows.
    let schema = bulk_part.batch.schema();
    let names = schema
        .fields()
        .iter()
        .map(|field| field.name().as_str())
        .collect::<Vec<_>>();
    assert_eq!(
        names,
        [
            "k0",
            "k1",
            "v0",
            "v1",
            "ts",
            "__primary_key",
            "__sequence",
            "__op_type"
        ]
    );
}
1688
#[test]
fn test_bulk_part_converter_without_primary_key_columns() {
    let metadata = metadata_for_test();
    // Omit the raw tag columns; only the encoded `__primary_key` column remains.
    let options = FlatSchemaOptions {
        raw_pk_columns: false,
        string_pk_use_dict: true,
    };
    let mut converter = BulkPartConverter::new(
        &metadata,
        to_flat_sst_arrow_schema(&metadata, &options),
        100,
        build_primary_key_codec(&metadata),
        false,
    );

    let first = build_key_values_with_ts_seq_values(
        &metadata,
        "key1".to_string(),
        1u32,
        vec![1000, 2000].into_iter(),
        vec![Some(1.0), Some(2.0)].into_iter(),
        1,
    );
    let second = build_key_values_with_ts_seq_values(
        &metadata,
        "key2".to_string(),
        2u32,
        vec![1500].into_iter(),
        vec![Some(3.0)].into_iter(),
        2,
    );
    for key_values in [&first, &second] {
        converter.append_key_values(key_values).unwrap();
    }

    let bulk_part = converter.convert().unwrap();
    assert_eq!(bulk_part.num_rows(), 3);
    assert_eq!(bulk_part.min_timestamp, 1000);
    assert_eq!(bulk_part.max_timestamp, 2000);
    assert_eq!(bulk_part.sequence, 2);
    assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

    let schema = bulk_part.batch.schema();
    let names = schema
        .fields()
        .iter()
        .map(|field| field.name().as_str())
        .collect::<Vec<_>>();
    assert_eq!(
        names,
        ["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
    );
}
1742
1743 #[allow(clippy::too_many_arguments)]
1744 fn build_key_values_with_sparse_encoding(
1745 metadata: &RegionMetadataRef,
1746 primary_key_codec: &Arc<dyn PrimaryKeyCodec>,
1747 table_id: u32,
1748 tsid: u64,
1749 k0: String,
1750 k1: String,
1751 timestamps: impl Iterator<Item = i64>,
1752 values: impl Iterator<Item = Option<f64>>,
1753 sequence: SequenceNumber,
1754 ) -> KeyValues {
1755 let pk_values = vec![
1757 (ReservedColumnId::table_id(), Value::UInt32(table_id)),
1758 (ReservedColumnId::tsid(), Value::UInt64(tsid)),
1759 (0, Value::String(k0.clone().into())),
1760 (1, Value::String(k1.clone().into())),
1761 ];
1762 let mut encoded_key = Vec::new();
1763 primary_key_codec
1764 .encode_values(&pk_values, &mut encoded_key)
1765 .unwrap();
1766 assert!(!encoded_key.is_empty());
1767
1768 let column_schema = vec![
1770 api::v1::ColumnSchema {
1771 column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
1772 datatype: api::helper::ColumnDataTypeWrapper::try_from(
1773 ConcreteDataType::binary_datatype(),
1774 )
1775 .unwrap()
1776 .datatype() as i32,
1777 semantic_type: api::v1::SemanticType::Tag as i32,
1778 ..Default::default()
1779 },
1780 api::v1::ColumnSchema {
1781 column_name: "ts".to_string(),
1782 datatype: api::helper::ColumnDataTypeWrapper::try_from(
1783 ConcreteDataType::timestamp_millisecond_datatype(),
1784 )
1785 .unwrap()
1786 .datatype() as i32,
1787 semantic_type: api::v1::SemanticType::Timestamp as i32,
1788 ..Default::default()
1789 },
1790 api::v1::ColumnSchema {
1791 column_name: "v0".to_string(),
1792 datatype: api::helper::ColumnDataTypeWrapper::try_from(
1793 ConcreteDataType::int64_datatype(),
1794 )
1795 .unwrap()
1796 .datatype() as i32,
1797 semantic_type: api::v1::SemanticType::Field as i32,
1798 ..Default::default()
1799 },
1800 api::v1::ColumnSchema {
1801 column_name: "v1".to_string(),
1802 datatype: api::helper::ColumnDataTypeWrapper::try_from(
1803 ConcreteDataType::float64_datatype(),
1804 )
1805 .unwrap()
1806 .datatype() as i32,
1807 semantic_type: api::v1::SemanticType::Field as i32,
1808 ..Default::default()
1809 },
1810 ];
1811
1812 let rows = timestamps
1813 .zip(values)
1814 .map(|(ts, v)| Row {
1815 values: vec![
1816 api::v1::Value {
1817 value_data: Some(api::v1::value::ValueData::BinaryValue(
1818 encoded_key.clone(),
1819 )),
1820 },
1821 api::v1::Value {
1822 value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(ts)),
1823 },
1824 api::v1::Value {
1825 value_data: Some(api::v1::value::ValueData::I64Value(ts)),
1826 },
1827 api::v1::Value {
1828 value_data: v.map(api::v1::value::ValueData::F64Value),
1829 },
1830 ],
1831 })
1832 .collect();
1833
1834 let mutation = api::v1::Mutation {
1835 op_type: 1,
1836 sequence,
1837 rows: Some(api::v1::Rows {
1838 schema: column_schema,
1839 rows,
1840 }),
1841 write_hint: Some(WriteHint {
1842 primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
1843 }),
1844 };
1845 KeyValues::new(metadata.as_ref(), mutation).unwrap()
1846 }
1847
#[test]
fn test_bulk_part_converter_sparse_primary_key_encoding() {
    use api::v1::SemanticType;
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    // (name, data type, nullable, semantic type, column id): two string tags, a
    // millisecond time index and two nullable fields.
    let columns = [
        ("k0", ConcreteDataType::string_datatype(), false, SemanticType::Tag, 0),
        ("k1", ConcreteDataType::string_datatype(), false, SemanticType::Tag, 1),
        (
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
            SemanticType::Timestamp,
            2,
        ),
        ("v0", ConcreteDataType::int64_datatype(), true, SemanticType::Field, 3),
        ("v1", ConcreteDataType::float64_datatype(), true, SemanticType::Field, 4),
    ];
    let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
    for (name, data_type, nullable, semantic_type, column_id) in columns {
        builder.push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new(name, data_type, nullable),
            semantic_type,
            column_id,
        });
    }
    builder
        .primary_key(vec![0, 1])
        .primary_key_encoding(PrimaryKeyEncoding::Sparse);
    let metadata = Arc::new(builder.build().unwrap());

    let primary_key_codec = build_primary_key_codec(&metadata);
    assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse);
    assert_eq!(primary_key_codec.encoding(), PrimaryKeyEncoding::Sparse);

    let flat_schema = to_flat_sst_arrow_schema(
        &metadata,
        &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
    );
    let mut converter = BulkPartConverter::new(
        &metadata,
        flat_schema,
        100,
        primary_key_codec.clone(),
        true,
    );

    let first = build_key_values_with_sparse_encoding(
        &metadata,
        &primary_key_codec,
        2048u32,
        100u64,
        "key11".to_string(),
        "key21".to_string(),
        vec![1000, 2000].into_iter(),
        vec![Some(1.0), Some(2.0)].into_iter(),
        1,
    );
    let second = build_key_values_with_sparse_encoding(
        &metadata,
        &primary_key_codec,
        4096u32,
        200u64,
        "key12".to_string(),
        "key22".to_string(),
        vec![1500].into_iter(),
        vec![Some(3.0)].into_iter(),
        2,
    );
    converter.append_key_values(&first).unwrap();
    converter.append_key_values(&second).unwrap();

    let bulk_part = converter.convert().unwrap();
    assert_eq!(bulk_part.num_rows(), 3);
    assert_eq!(bulk_part.min_timestamp, 1000);
    assert_eq!(bulk_part.max_timestamp, 2000);
    assert_eq!(bulk_part.sequence, 2);
    assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

    // Sparse encoding keeps no raw tag columns in the flat schema.
    let schema = bulk_part.batch.schema();
    let names = schema
        .fields()
        .iter()
        .map(|field| field.name().as_str())
        .collect::<Vec<_>>();
    assert_eq!(
        names,
        ["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
    );

    // `__primary_key` is a UInt32-keyed dictionary over the encoded binary keys.
    let dict_array = bulk_part
        .batch
        .column_by_name("__primary_key")
        .unwrap()
        .as_any()
        .downcast_ref::<DictionaryArray<UInt32Type>>()
        .unwrap();
    assert!(!dict_array.is_empty());
    assert_eq!(dict_array.len(), 3);

    let encoded_keys = dict_array
        .values()
        .as_any()
        .downcast_ref::<BinaryArray>()
        .unwrap();
    for idx in 0..encoded_keys.len() {
        assert!(
            !encoded_keys.value(idx).is_empty(),
            "Encoded primary key should not be empty"
        );
    }
}
1972}