1use std::collections::VecDeque;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use api::helper::{ColumnDataTypeWrapper, value_to_grpc_value};
22use api::v1::bulk_wal_entry::Body;
23use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType, bulk_wal_entry};
24use bytes::Bytes;
25use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
26use common_recordbatch::DfRecordBatch as RecordBatch;
27use common_time::Timestamp;
28use common_time::timestamp::TimeUnit;
29use datatypes::arrow;
30use datatypes::arrow::array::{
31 Array, ArrayRef, BinaryBuilder, BinaryDictionaryBuilder, DictionaryArray, StringBuilder,
32 StringDictionaryBuilder, TimestampMicrosecondArray, TimestampMillisecondArray,
33 TimestampNanosecondArray, TimestampSecondArray, UInt8Array, UInt8Builder, UInt32Array,
34 UInt64Array, UInt64Builder,
35};
36use datatypes::arrow::compute::{SortColumn, SortOptions, TakeOptions};
37use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
38use datatypes::arrow_array::BinaryArray;
39use datatypes::data_type::DataType;
40use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
41use datatypes::value::{Value, ValueRef};
42use datatypes::vectors::Helper;
43use mito_codec::key_values::{KeyValue, KeyValues, KeyValuesRef};
44use mito_codec::row_converter::{
45 DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, build_primary_key_codec,
46};
47use parquet::arrow::ArrowWriter;
48use parquet::basic::{Compression, ZstdLevel};
49use parquet::data_type::AsBytes;
50use parquet::file::metadata::ParquetMetaData;
51use parquet::file::properties::WriterProperties;
52use snafu::{OptionExt, ResultExt, Snafu};
53use store_api::codec::PrimaryKeyEncoding;
54use store_api::metadata::{RegionMetadata, RegionMetadataRef};
55use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
56use store_api::storage::{FileId, SequenceNumber};
57use table::predicate::Predicate;
58
59use crate::error::{
60 self, ColumnNotFoundSnafu, ComputeArrowSnafu, DataTypeMismatchSnafu, EncodeMemtableSnafu,
61 EncodeSnafu, InvalidMetadataSnafu, NewRecordBatchSnafu, Result,
62};
63use crate::memtable::BoxedRecordBatchIterator;
64use crate::memtable::bulk::context::BulkIterContextRef;
65use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
66use crate::memtable::time_series::{ValueBuilder, Values};
67use crate::sst::index::IndexOutput;
68use crate::sst::parquet::flat_format::primary_key_column_index;
69use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder, ReadFormat};
70use crate::sst::parquet::helper::parse_parquet_metadata;
71use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo};
72use crate::sst::to_sst_arrow_schema;
73
74const INIT_DICT_VALUE_CAPACITY: usize = 8;
75
76#[derive(Clone)]
78pub struct BulkPart {
79 pub batch: RecordBatch,
80 pub max_timestamp: i64,
81 pub min_timestamp: i64,
82 pub sequence: u64,
83 pub timestamp_index: usize,
84 pub raw_data: Option<ArrowIpc>,
85}
86
87impl TryFrom<BulkWalEntry> for BulkPart {
88 type Error = error::Error;
89
90 fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
91 match value.body.expect("Entry payload should be present") {
92 Body::ArrowIpc(ipc) => {
93 let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
94 .context(error::ConvertBulkWalEntrySnafu)?;
95 let batch = decoder
96 .try_decode_record_batch(&ipc.data_header, &ipc.payload)
97 .context(error::ConvertBulkWalEntrySnafu)?;
98 Ok(Self {
99 batch,
100 max_timestamp: value.max_ts,
101 min_timestamp: value.min_ts,
102 sequence: value.sequence,
103 timestamp_index: value.timestamp_index as usize,
104 raw_data: Some(ipc),
105 })
106 }
107 }
108 }
109}
110
111impl TryFrom<&BulkPart> for BulkWalEntry {
112 type Error = error::Error;
113
114 fn try_from(value: &BulkPart) -> Result<Self> {
115 if let Some(ipc) = &value.raw_data {
116 Ok(BulkWalEntry {
117 sequence: value.sequence,
118 max_ts: value.max_timestamp,
119 min_ts: value.min_timestamp,
120 timestamp_index: value.timestamp_index as u32,
121 body: Some(Body::ArrowIpc(ipc.clone())),
122 })
123 } else {
124 let mut encoder = FlightEncoder::default();
125 let schema_bytes = encoder
126 .encode_schema(value.batch.schema().as_ref())
127 .data_header;
128 let [rb_data] = encoder
129 .encode(FlightMessage::RecordBatch(value.batch.clone()))
130 .try_into()
131 .map_err(|_| {
132 error::UnsupportedOperationSnafu {
133 err_msg: "create BulkWalEntry from RecordBatch with dictionary arrays",
134 }
135 .build()
136 })?;
137 Ok(BulkWalEntry {
138 sequence: value.sequence,
139 max_ts: value.max_timestamp,
140 min_ts: value.min_timestamp,
141 timestamp_index: value.timestamp_index as u32,
142 body: Some(Body::ArrowIpc(ArrowIpc {
143 schema: schema_bytes,
144 data_header: rb_data.data_header,
145 payload: rb_data.data_body,
146 })),
147 })
148 }
149 }
150}
151
152impl BulkPart {
153 pub(crate) fn estimated_size(&self) -> usize {
154 record_batch_estimated_size(&self.batch)
155 }
156
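    /// Estimates the number of series from the length of the primary key dictionary values;
    /// returns 0 when the primary key column is not a dictionary array.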
157 pub fn estimated_series_count(&self) -> usize {
160 let pk_column_idx = primary_key_column_index(self.batch.num_columns());
161 let pk_column = self.batch.column(pk_column_idx);
162 if let Some(dict_array) = pk_column.as_any().downcast_ref::<PrimaryKeyArray>() {
163 dict_array.values().len()
164 } else {
165 0
166 }
167 }
168
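    /// Converts this part into a gRPC [Mutation] with `OpType::Put`, filling columns
    /// that are missing from the batch with null values.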
169 pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
171 let vectors = region_metadata
172 .schema
173 .column_schemas()
174 .iter()
175 .map(|col| match self.batch.column_by_name(&col.name) {
176 None => Ok(None),
177 Some(col) => Helper::try_into_vector(col).map(Some),
178 })
179 .collect::<datatypes::error::Result<Vec<_>>>()
180 .context(error::ComputeVectorSnafu)?;
181
182 let rows = (0..self.num_rows())
183 .map(|row_idx| {
184 let values = (0..self.batch.num_columns())
185 .map(|col_idx| {
186 if let Some(v) = &vectors[col_idx] {
187 value_to_grpc_value(v.get(row_idx))
188 } else {
189 api::v1::Value { value_data: None }
190 }
191 })
192 .collect::<Vec<_>>();
193 api::v1::Row { values }
194 })
195 .collect::<Vec<_>>();
196
197 let schema = region_metadata
198 .column_metadatas
199 .iter()
200 .map(|c| {
201 let data_type_wrapper =
202 ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
203 Ok(api::v1::ColumnSchema {
204 column_name: c.column_schema.name.clone(),
205 datatype: data_type_wrapper.datatype() as i32,
206 semantic_type: c.semantic_type as i32,
207 ..Default::default()
208 })
209 })
210 .collect::<api::error::Result<Vec<_>>>()
211 .context(error::ConvertColumnDataTypeSnafu {
212 reason: "failed to convert region metadata to column schema",
213 })?;
214
215 let rows = api::v1::Rows { schema, rows };
216
217 Ok(Mutation {
218 op_type: OpType::Put as i32,
219 sequence: self.sequence,
220 rows: Some(rows),
221 write_hint: None,
222 })
223 }
224
225 pub fn timestamps(&self) -> &ArrayRef {
226 self.batch.column(self.timestamp_index)
227 }
228
229 pub fn num_rows(&self) -> usize {
230 self.batch.num_rows()
231 }
232}
233
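/// Estimates the memory size of `batch` by summing each column's sliced data size.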
234pub(crate) fn record_batch_estimated_size(batch: &RecordBatch) -> usize {
236 batch
237 .columns()
238 .iter()
239 .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
241 .sum()
242}
243
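/// Builder for a single primary key column, backed by either a string dictionary builder
/// or a generic mutable vector.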
244enum PrimaryKeyColumnBuilder {
246 StringDict(StringDictionaryBuilder<UInt32Type>),
248 Vector(Box<dyn MutableVector>),
250}
251
252impl PrimaryKeyColumnBuilder {
253 fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
255 match self {
256 PrimaryKeyColumnBuilder::StringDict(builder) => {
257 if let Some(s) = value.as_string().context(DataTypeMismatchSnafu)? {
258 builder.append_value(s);
260 } else {
261 builder.append_null();
262 }
263 }
264 PrimaryKeyColumnBuilder::Vector(builder) => {
265 builder.push_value_ref(&value);
266 }
267 }
268 Ok(())
269 }
270
271 fn into_arrow_array(self) -> ArrayRef {
273 match self {
274 PrimaryKeyColumnBuilder::StringDict(mut builder) => Arc::new(builder.finish()),
275 PrimaryKeyColumnBuilder::Vector(mut builder) => builder.to_vector().to_arrow_array(),
276 }
277 }
278}
279
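/// Converts [KeyValues] into a [BulkPart] whose record batch follows the flat SST layout:
/// optional raw primary key columns, field columns, timestamp, encoded `__primary_key`,
/// `__sequence` and `__op_type`.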
280pub struct BulkPartConverter {
282 region_metadata: RegionMetadataRef,
284 schema: SchemaRef,
286 primary_key_codec: Arc<dyn PrimaryKeyCodec>,
288 key_buf: Vec<u8>,
290 key_array_builder: PrimaryKeyArrayBuilder,
292 value_builder: ValueBuilder,
294 primary_key_column_builders: Vec<PrimaryKeyColumnBuilder>,
297
298 max_ts: i64,
300 min_ts: i64,
302 max_sequence: SequenceNumber,
304}
305
306impl BulkPartConverter {
307 pub fn new(
312 region_metadata: &RegionMetadataRef,
313 schema: SchemaRef,
314 capacity: usize,
315 primary_key_codec: Arc<dyn PrimaryKeyCodec>,
316 store_primary_key_columns: bool,
317 ) -> Self {
318 debug_assert_eq!(
319 region_metadata.primary_key_encoding,
320 primary_key_codec.encoding()
321 );
322
323 let primary_key_column_builders = if store_primary_key_columns
324 && region_metadata.primary_key_encoding != PrimaryKeyEncoding::Sparse
325 {
326 new_primary_key_column_builders(region_metadata, capacity)
327 } else {
328 Vec::new()
329 };
330
331 Self {
332 region_metadata: region_metadata.clone(),
333 schema,
334 primary_key_codec,
335 key_buf: Vec::new(),
336 key_array_builder: PrimaryKeyArrayBuilder::new(),
337 value_builder: ValueBuilder::new(region_metadata, capacity),
338 primary_key_column_builders,
339 min_ts: i64::MAX,
340 max_ts: i64::MIN,
341 max_sequence: SequenceNumber::MIN,
342 }
343 }
344
345 pub fn append_key_values(&mut self, key_values: &KeyValues) -> Result<()> {
347 for kv in key_values.iter() {
348 self.append_key_value(&kv)?;
349 }
350
351 Ok(())
352 }
353
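    /// Appends a single row. With sparse encoding the already-encoded key is taken from the
    /// `__primary_key` column; otherwise the primary key is encoded with the configured codec.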
354 fn append_key_value(&mut self, kv: &KeyValue) -> Result<()> {
358 if self.primary_key_codec.encoding() == PrimaryKeyEncoding::Sparse {
360 let mut primary_keys = kv.primary_keys();
363 if let Some(encoded) = primary_keys
364 .next()
365 .context(ColumnNotFoundSnafu {
366 column: PRIMARY_KEY_COLUMN_NAME,
367 })?
368 .as_binary()
369 .context(DataTypeMismatchSnafu)?
370 {
371 self.key_array_builder
372 .append(encoded)
373 .context(ComputeArrowSnafu)?;
374 } else {
375 self.key_array_builder
376 .append("")
377 .context(ComputeArrowSnafu)?;
378 }
379 } else {
380 self.key_buf.clear();
382 self.primary_key_codec
383 .encode_key_value(kv, &mut self.key_buf)
384 .context(EncodeSnafu)?;
385 self.key_array_builder
386 .append(&self.key_buf)
387 .context(ComputeArrowSnafu)?;
388 };
389
390 if !self.primary_key_column_builders.is_empty() {
392 for (builder, pk_value) in self
393 .primary_key_column_builders
394 .iter_mut()
395 .zip(kv.primary_keys())
396 {
397 builder.push_value_ref(pk_value)?;
398 }
399 }
400
401 self.value_builder.push(
403 kv.timestamp(),
404 kv.sequence(),
405 kv.op_type() as u8,
406 kv.fields(),
407 );
408
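        // The time index value is expected to be a non-null timestamp; the unwraps below rely on that.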
409 let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
412 self.min_ts = self.min_ts.min(ts);
413 self.max_ts = self.max_ts.max(ts);
414 self.max_sequence = self.max_sequence.max(kv.sequence());
415
416 Ok(())
417 }
418
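    /// Finishes all builders and converts the buffered rows into a [BulkPart], sorting rows
    /// by primary key, timestamp and sequence.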
419 pub fn convert(mut self) -> Result<BulkPart> {
423 let values = Values::from(self.value_builder);
424 let mut columns =
425 Vec::with_capacity(4 + values.fields.len() + self.primary_key_column_builders.len());
426
427 for builder in self.primary_key_column_builders {
429 columns.push(builder.into_arrow_array());
430 }
431 columns.extend(values.fields.iter().map(|field| field.to_arrow_array()));
433 let timestamp_index = columns.len();
435 columns.push(values.timestamp.to_arrow_array());
436 let pk_array = self.key_array_builder.finish();
438 columns.push(Arc::new(pk_array));
439 columns.push(values.sequence.to_arrow_array());
441 columns.push(values.op_type.to_arrow_array());
442
443 let batch = RecordBatch::try_new(self.schema, columns).context(NewRecordBatchSnafu)?;
444 let batch = sort_primary_key_record_batch(&batch)?;
446
447 Ok(BulkPart {
448 batch,
449 max_timestamp: self.max_ts,
450 min_timestamp: self.min_ts,
451 sequence: self.max_sequence,
452 timestamp_index,
453 raw_data: None,
454 })
455 }
456}
457
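/// Creates one builder per primary key column; string columns use dictionary builders,
/// other types use generic mutable vectors.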
458fn new_primary_key_column_builders(
459 metadata: &RegionMetadata,
460 capacity: usize,
461) -> Vec<PrimaryKeyColumnBuilder> {
462 metadata
463 .primary_key_columns()
464 .map(|col| {
465 if col.column_schema.data_type.is_string() {
466 PrimaryKeyColumnBuilder::StringDict(StringDictionaryBuilder::with_capacity(
467 capacity,
468 INIT_DICT_VALUE_CAPACITY,
469 capacity,
470 ))
471 } else {
472 PrimaryKeyColumnBuilder::Vector(
473 col.column_schema.data_type.create_mutable_vector(capacity),
474 )
475 }
476 })
477 .collect()
478}
479
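/// Sorts the record batch by (primary key ASC, timestamp ASC, sequence DESC), assuming the
/// trailing columns are timestamp, `__primary_key`, `__sequence` and `__op_type`.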
480fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
482 let total_columns = batch.num_columns();
483 let sort_columns = vec![
484 SortColumn {
486 values: batch.column(total_columns - 3).clone(),
487 options: Some(SortOptions {
488 descending: false,
489 nulls_first: true,
490 }),
491 },
492 SortColumn {
494 values: batch.column(total_columns - 4).clone(),
495 options: Some(SortOptions {
496 descending: false,
497 nulls_first: true,
498 }),
499 },
500 SortColumn {
502 values: batch.column(total_columns - 2).clone(),
503 options: Some(SortOptions {
504 descending: true,
505 nulls_first: true,
506 }),
507 },
508 ];
509
510 let indices = datatypes::arrow::compute::lexsort_to_indices(&sort_columns, None)
511 .context(ComputeArrowSnafu)?;
512
513 datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
514}
515
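/// A bulk part encoded as an in-memory Parquet file together with its metadata.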
516#[derive(Debug, Clone)]
517pub struct EncodedBulkPart {
518 data: Bytes,
519 metadata: BulkPartMeta,
520}
521
522impl EncodedBulkPart {
523 pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
524 Self { data, metadata }
525 }
526
527 pub(crate) fn metadata(&self) -> &BulkPartMeta {
528 &self.metadata
529 }
530
531 pub(crate) fn size_bytes(&self) -> usize {
533 self.data.len()
534 }
535
536 pub(crate) fn data(&self) -> &Bytes {
538 &self.data
539 }
540
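    /// Builds an [SstInfo] describing this encoded part, using its time range, size and Parquet metadata.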
541 pub(crate) fn to_sst_info(&self, file_id: FileId) -> SstInfo {
549 let unit = self.metadata.region_metadata.time_index_type().unit();
550 SstInfo {
551 file_id,
552 time_range: (
553 Timestamp::new(self.metadata.min_timestamp, unit),
554 Timestamp::new(self.metadata.max_timestamp, unit),
555 ),
556 file_size: self.data.len() as u64,
557 num_rows: self.metadata.num_rows,
558 num_row_groups: self.metadata.parquet_metadata.num_row_groups() as u64,
559 file_metadata: Some(self.metadata.parquet_metadata.clone()),
560 index_metadata: IndexOutput::default(),
561 }
562 }
563
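    /// Creates an iterator over this part under `context`, returning `None` when pruning
    /// eliminates all row groups.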
564 pub(crate) fn read(
565 &self,
566 context: BulkIterContextRef,
567 sequence: Option<SequenceNumber>,
568 ) -> Result<Option<BoxedRecordBatchIterator>> {
569 let row_groups_to_read = context.row_groups_to_read(&self.metadata.parquet_metadata);
571
572 if row_groups_to_read.is_empty() {
573 return Ok(None);
575 }
576
577 let iter = EncodedBulkPartIter::try_new(
578 context,
579 row_groups_to_read,
580 self.metadata.parquet_metadata.clone(),
581 self.data.clone(),
582 sequence,
583 )?;
584 Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
585 }
586}
587
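/// Metadata of an [EncodedBulkPart].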
588#[derive(Debug, Clone)]
589pub struct BulkPartMeta {
590 pub num_rows: usize,
592 pub max_timestamp: i64,
594 pub min_timestamp: i64,
596 pub parquet_metadata: Arc<ParquetMetaData>,
598 pub region_metadata: RegionMetadataRef,
600}
601
602#[derive(Default, Debug)]
604pub struct BulkPartEncodeMetrics {
605 pub iter_cost: Duration,
607 pub write_cost: Duration,
609 pub raw_size: usize,
611 pub encoded_size: usize,
613 pub num_rows: usize,
615}
616
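/// Encoder that writes bulk parts as Parquet, embedding the region metadata JSON under
/// the [PARQUET_METADATA_KEY] key-value entry.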
617pub struct BulkPartEncoder {
618 metadata: RegionMetadataRef,
619 row_group_size: usize,
620 writer_props: Option<WriterProperties>,
621}
622
623impl BulkPartEncoder {
624 pub(crate) fn new(
625 metadata: RegionMetadataRef,
626 row_group_size: usize,
627 ) -> Result<BulkPartEncoder> {
628 let json = metadata.to_json().context(InvalidMetadataSnafu)?;
630 let key_value_meta =
631 parquet::file::metadata::KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
632
633 let writer_props = Some(
635 WriterProperties::builder()
636 .set_key_value_metadata(Some(vec![key_value_meta]))
637 .set_write_batch_size(row_group_size)
638 .set_max_row_group_size(row_group_size)
639 .set_compression(Compression::ZSTD(ZstdLevel::default()))
640 .set_column_index_truncate_length(None)
641 .set_statistics_truncate_length(None)
642 .build(),
643 );
644
645 Ok(Self {
646 metadata,
647 row_group_size,
648 writer_props,
649 })
650 }
651}
652
653impl BulkPartEncoder {
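    /// Encodes an iterator of record batches into an in-memory Parquet file, returning `None`
    /// if the iterator yields no rows.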
654 pub fn encode_record_batch_iter(
656 &self,
657 iter: BoxedRecordBatchIterator,
658 arrow_schema: SchemaRef,
659 min_timestamp: i64,
660 max_timestamp: i64,
661 metrics: &mut BulkPartEncodeMetrics,
662 ) -> Result<Option<EncodedBulkPart>> {
663 let mut buf = Vec::with_capacity(4096);
664 let mut writer = ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
665 .context(EncodeMemtableSnafu)?;
666 let mut total_rows = 0;
667
668 let mut iter_start = Instant::now();
670 for batch_result in iter {
671 metrics.iter_cost += iter_start.elapsed();
672 let batch = batch_result?;
673 if batch.num_rows() == 0 {
674 continue;
675 }
676
677 metrics.raw_size += record_batch_estimated_size(&batch);
678 let write_start = Instant::now();
679 writer.write(&batch).context(EncodeMemtableSnafu)?;
680 metrics.write_cost += write_start.elapsed();
681 total_rows += batch.num_rows();
682 iter_start = Instant::now();
683 }
        metrics.iter_cost += iter_start.elapsed();
686
687 if total_rows == 0 {
688 return Ok(None);
689 }
690
691 let close_start = Instant::now();
692 let file_metadata = writer.close().context(EncodeMemtableSnafu)?;
693 metrics.write_cost += close_start.elapsed();
694 metrics.encoded_size += buf.len();
695 metrics.num_rows += total_rows;
696
697 let buf = Bytes::from(buf);
698 let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);
699
700 Ok(Some(EncodedBulkPart {
701 data: buf,
702 metadata: BulkPartMeta {
703 num_rows: total_rows,
704 max_timestamp,
705 min_timestamp,
706 parquet_metadata,
707 region_metadata: self.metadata.clone(),
708 },
709 }))
710 }
711
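    /// Encodes a [BulkPart] into Parquet bytes, returning `None` when the part is empty.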
712 fn encode_part(&self, part: &BulkPart) -> Result<Option<EncodedBulkPart>> {
714 if part.batch.num_rows() == 0 {
715 return Ok(None);
716 }
717
718 let mut buf = Vec::with_capacity(4096);
719 let arrow_schema = part.batch.schema();
720
721 let file_metadata = {
722 let mut writer =
723 ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
724 .context(EncodeMemtableSnafu)?;
725 writer.write(&part.batch).context(EncodeMemtableSnafu)?;
726 writer.finish().context(EncodeMemtableSnafu)?
727 };
728
729 let buf = Bytes::from(buf);
730 let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);
731
732 Ok(Some(EncodedBulkPart {
733 data: buf,
734 metadata: BulkPartMeta {
735 num_rows: part.batch.num_rows(),
736 max_timestamp: part.max_timestamp,
737 min_timestamp: part.min_timestamp,
738 parquet_metadata,
739 region_metadata: self.metadata.clone(),
740 },
741 }))
742 }
743}
744
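/// Converts mutations into a record batch in SST layout sorted by (primary key, timestamp,
/// sequence DESC), deduplicating rows with the same key and timestamp when `dedup` is true.
/// Returns the batch together with its min and max timestamps, or `None` if there are no rows.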
745fn mutations_to_record_batch(
747 mutations: &[Mutation],
748 metadata: &RegionMetadataRef,
749 pk_encoder: &DensePrimaryKeyCodec,
750 dedup: bool,
751) -> Result<Option<(RecordBatch, i64, i64)>> {
752 let total_rows: usize = mutations
753 .iter()
754 .map(|m| m.rows.as_ref().map(|r| r.rows.len()).unwrap_or(0))
755 .sum();
756
757 if total_rows == 0 {
758 return Ok(None);
759 }
760
761 let mut pk_builder = BinaryBuilder::with_capacity(total_rows, 0);
762
763 let mut ts_vector: Box<dyn MutableVector> = metadata
764 .time_index_column()
765 .column_schema
766 .data_type
767 .create_mutable_vector(total_rows);
768 let mut sequence_builder = UInt64Builder::with_capacity(total_rows);
769 let mut op_type_builder = UInt8Builder::with_capacity(total_rows);
770
771 let mut field_builders: Vec<Box<dyn MutableVector>> = metadata
772 .field_columns()
773 .map(|f| f.column_schema.data_type.create_mutable_vector(total_rows))
774 .collect();
775
776 let mut pk_buffer = vec![];
777 for m in mutations {
778 let Some(key_values) = KeyValuesRef::new(metadata, m) else {
779 continue;
780 };
781
782 for row in key_values.iter() {
783 pk_buffer.clear();
784 pk_encoder
785 .encode_to_vec(row.primary_keys(), &mut pk_buffer)
786 .context(EncodeSnafu)?;
787 pk_builder.append_value(pk_buffer.as_bytes());
788 ts_vector.push_value_ref(&row.timestamp());
789 sequence_builder.append_value(row.sequence());
790 op_type_builder.append_value(row.op_type() as u8);
791 for (builder, field) in field_builders.iter_mut().zip(row.fields()) {
792 builder.push_value_ref(&field);
793 }
794 }
795 }
796
797 let arrow_schema = to_sst_arrow_schema(metadata);
798 let timestamp_unit = metadata
800 .time_index_column()
801 .column_schema
802 .data_type
803 .as_timestamp()
804 .unwrap()
805 .unit();
806 let sorter = ArraysSorter {
807 encoded_primary_keys: pk_builder.finish(),
808 timestamp_unit,
809 timestamp: ts_vector.to_vector().to_arrow_array(),
810 sequence: sequence_builder.finish(),
811 op_type: op_type_builder.finish(),
812 fields: field_builders
813 .iter_mut()
814 .map(|f| f.to_vector().to_arrow_array()),
815 dedup,
816 arrow_schema,
817 };
818
819 sorter.sort().map(Some)
820}
821
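/// Helper that sorts the collected arrays by (primary key, timestamp, sequence DESC) and
/// assembles them into a record batch.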
822struct ArraysSorter<I> {
823 encoded_primary_keys: BinaryArray,
824 timestamp_unit: TimeUnit,
825 timestamp: ArrayRef,
826 sequence: UInt64Array,
827 op_type: UInt8Array,
828 fields: I,
829 dedup: bool,
830 arrow_schema: SchemaRef,
831}
832
833impl<I> ArraysSorter<I>
834where
835 I: Iterator<Item = ArrayRef>,
836{
837 fn sort(self) -> Result<(RecordBatch, i64, i64)> {
839 debug_assert!(!self.timestamp.is_empty());
840 debug_assert!(self.timestamp.len() == self.sequence.len());
841 debug_assert!(self.timestamp.len() == self.op_type.len());
842 debug_assert!(self.timestamp.len() == self.encoded_primary_keys.len());
843
844 let timestamp_iter = timestamp_array_to_iter(self.timestamp_unit, &self.timestamp);
845 let (mut min_timestamp, mut max_timestamp) = (i64::MAX, i64::MIN);
846 let mut to_sort = self
847 .encoded_primary_keys
848 .iter()
849 .zip(timestamp_iter)
850 .zip(self.sequence.iter())
851 .map(|((pk, timestamp), sequence)| {
852 max_timestamp = max_timestamp.max(*timestamp);
853 min_timestamp = min_timestamp.min(*timestamp);
854 (pk, timestamp, sequence)
855 })
856 .enumerate()
857 .collect::<Vec<_>>();
858
859 to_sort.sort_unstable_by(|(_, (l_pk, l_ts, l_seq)), (_, (r_pk, r_ts, r_seq))| {
860 l_pk.cmp(r_pk)
861 .then(l_ts.cmp(r_ts))
862 .then(l_seq.cmp(r_seq).reverse())
863 });
864
865 if self.dedup {
866 to_sort.dedup_by(|(_, (l_pk, l_ts, _)), (_, (r_pk, r_ts, _))| {
868 l_pk == r_pk && l_ts == r_ts
869 });
870 }
871
872 let indices = UInt32Array::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
873
874 let pk_dictionary = Arc::new(binary_array_to_dictionary(
875 arrow::compute::take(
877 &self.encoded_primary_keys,
878 &indices,
879 Some(TakeOptions {
880 check_bounds: false,
881 }),
882 )
883 .context(ComputeArrowSnafu)?
884 .as_any()
885 .downcast_ref::<BinaryArray>()
886 .unwrap(),
887 )?) as ArrayRef;
888
889 let mut arrays = Vec::with_capacity(self.arrow_schema.fields.len());
890 for arr in self.fields {
891 arrays.push(
892 arrow::compute::take(
893 &arr,
894 &indices,
895 Some(TakeOptions {
896 check_bounds: false,
897 }),
898 )
899 .context(ComputeArrowSnafu)?,
900 );
901 }
902
903 let timestamp = arrow::compute::take(
904 &self.timestamp,
905 &indices,
906 Some(TakeOptions {
907 check_bounds: false,
908 }),
909 )
910 .context(ComputeArrowSnafu)?;
911
912 arrays.push(timestamp);
913 arrays.push(pk_dictionary);
914 arrays.push(
915 arrow::compute::take(
916 &self.sequence,
917 &indices,
918 Some(TakeOptions {
919 check_bounds: false,
920 }),
921 )
922 .context(ComputeArrowSnafu)?,
923 );
924
925 arrays.push(
926 arrow::compute::take(
927 &self.op_type,
928 &indices,
929 Some(TakeOptions {
930 check_bounds: false,
931 }),
932 )
933 .context(ComputeArrowSnafu)?,
934 );
935
936 let batch = RecordBatch::try_new(self.arrow_schema, arrays).context(NewRecordBatchSnafu)?;
937 Ok((batch, min_timestamp, max_timestamp))
938 }
939}
940
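/// Returns an iterator over the raw `i64` values of a timestamp array with the given unit.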
941fn timestamp_array_to_iter(
943 timestamp_unit: TimeUnit,
944 timestamp: &ArrayRef,
945) -> impl Iterator<Item = &i64> {
946 match timestamp_unit {
947 TimeUnit::Second => timestamp
949 .as_any()
950 .downcast_ref::<TimestampSecondArray>()
951 .unwrap()
952 .values()
953 .iter(),
954 TimeUnit::Millisecond => timestamp
955 .as_any()
956 .downcast_ref::<TimestampMillisecondArray>()
957 .unwrap()
958 .values()
959 .iter(),
960 TimeUnit::Microsecond => timestamp
961 .as_any()
962 .downcast_ref::<TimestampMicrosecondArray>()
963 .unwrap()
964 .values()
965 .iter(),
966 TimeUnit::Nanosecond => timestamp
967 .as_any()
968 .downcast_ref::<TimestampNanosecondArray>()
969 .unwrap()
970 .values()
971 .iter(),
972 }
973}
974
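/// Converts a sorted [BinaryArray] of encoded primary keys into a dictionary array,
/// assigning one dictionary value per run of equal keys.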
975fn binary_array_to_dictionary(input: &BinaryArray) -> Result<PrimaryKeyArray> {
977 if input.is_empty() {
978 return Ok(DictionaryArray::new(
979 UInt32Array::from(Vec::<u32>::new()),
980 Arc::new(BinaryArray::from_vec(vec![])) as ArrayRef,
981 ));
982 }
983 let mut keys = Vec::with_capacity(16);
984 let mut values = BinaryBuilder::new();
985 let mut prev: usize = 0;
986 keys.push(prev as u32);
987 values.append_value(input.value(prev));
988
989 for current_bytes in input.iter().skip(1) {
990 let current_bytes = current_bytes.unwrap();
992 let prev_bytes = input.value(prev);
993 if current_bytes != prev_bytes {
994 values.append_value(current_bytes);
995 prev += 1;
996 }
997 keys.push(prev as u32);
998 }
999
1000 Ok(DictionaryArray::new(
1001 UInt32Array::from(keys),
1002 Arc::new(values.finish()) as ArrayRef,
1003 ))
1004}
1005
1006#[cfg(test)]
1007mod tests {
1008 use std::collections::VecDeque;
1009
1010 use api::v1::{Row, WriteHint};
1011 use datafusion_common::ScalarValue;
1012 use datatypes::arrow::array::Float64Array;
1013 use datatypes::prelude::{ConcreteDataType, ScalarVector, Value};
1014 use datatypes::vectors::{Float64Vector, TimestampMillisecondVector};
1015 use store_api::storage::consts::ReservedColumnId;
1016
1017 use super::*;
1018 use crate::memtable::bulk::context::BulkIterContext;
1019 use crate::sst::parquet::format::{PrimaryKeyReadFormat, ReadFormat};
1020 use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1021 use crate::test_util::memtable_util::{
1022 build_key_values_with_ts_seq_values, metadata_for_test, region_metadata_to_row_schema,
1023 };
1024
1025 fn check_binary_array_to_dictionary(
1026 input: &[&[u8]],
1027 expected_keys: &[u32],
1028 expected_values: &[&[u8]],
1029 ) {
1030 let input = BinaryArray::from_iter_values(input.iter());
1031 let array = binary_array_to_dictionary(&input).unwrap();
1032 assert_eq!(
1033 &expected_keys,
1034 &array.keys().iter().map(|v| v.unwrap()).collect::<Vec<_>>()
1035 );
1036 assert_eq!(
1037 expected_values,
1038 &array
1039 .values()
1040 .as_any()
1041 .downcast_ref::<BinaryArray>()
1042 .unwrap()
1043 .iter()
1044 .map(|v| v.unwrap())
1045 .collect::<Vec<_>>()
1046 );
1047 }
1048
1049 #[test]
1050 fn test_binary_array_to_dictionary() {
1051 check_binary_array_to_dictionary(&[], &[], &[]);
1052
1053 check_binary_array_to_dictionary(&["a".as_bytes()], &[0], &["a".as_bytes()]);
1054
1055 check_binary_array_to_dictionary(
1056 &["a".as_bytes(), "a".as_bytes()],
1057 &[0, 0],
1058 &["a".as_bytes()],
1059 );
1060
1061 check_binary_array_to_dictionary(
1062 &["a".as_bytes(), "a".as_bytes(), "b".as_bytes()],
1063 &[0, 0, 1],
1064 &["a".as_bytes(), "b".as_bytes()],
1065 );
1066
1067 check_binary_array_to_dictionary(
1068 &[
1069 "a".as_bytes(),
1070 "a".as_bytes(),
1071 "b".as_bytes(),
1072 "c".as_bytes(),
1073 ],
1074 &[0, 0, 1, 2],
1075 &["a".as_bytes(), "b".as_bytes(), "c".as_bytes()],
1076 );
1077 }
1078
1079 struct MutationInput<'a> {
1080 k0: &'a str,
1081 k1: u32,
1082 timestamps: &'a [i64],
1083 v1: &'a [Option<f64>],
1084 sequence: u64,
1085 }
1086
1087 #[derive(Debug, PartialOrd, PartialEq)]
1088 struct BatchOutput<'a> {
1089 pk_values: &'a [Value],
1090 timestamps: &'a [i64],
1091 v1: &'a [Option<f64>],
1092 }
1093
1094 fn check_mutations_to_record_batches(
1095 input: &[MutationInput],
1096 expected: &[BatchOutput],
1097 expected_timestamp: (i64, i64),
1098 dedup: bool,
1099 ) {
1100 let metadata = metadata_for_test();
1101 let mutations = input
1102 .iter()
1103 .map(|m| {
1104 build_key_values_with_ts_seq_values(
1105 &metadata,
1106 m.k0.to_string(),
1107 m.k1,
1108 m.timestamps.iter().copied(),
1109 m.v1.iter().copied(),
1110 m.sequence,
1111 )
1112 .mutation
1113 })
1114 .collect::<Vec<_>>();
1115 let total_rows: usize = mutations
1116 .iter()
1117 .flat_map(|m| m.rows.iter())
1118 .map(|r| r.rows.len())
1119 .sum();
1120
1121 let pk_encoder = DensePrimaryKeyCodec::new(&metadata);
1122
1123 let (batch, _, _) = mutations_to_record_batch(&mutations, &metadata, &pk_encoder, dedup)
1124 .unwrap()
1125 .unwrap();
1126 let read_format = PrimaryKeyReadFormat::new_with_all_columns(metadata.clone());
1127 let mut batches = VecDeque::new();
1128 read_format
1129 .convert_record_batch(&batch, None, &mut batches)
1130 .unwrap();
1131 if !dedup {
1132 assert_eq!(
1133 total_rows,
1134 batches.iter().map(|b| { b.num_rows() }).sum::<usize>()
1135 );
1136 }
1137 let batch_values = batches
1138 .into_iter()
1139 .map(|b| {
1140 let pk_values = pk_encoder.decode(b.primary_key()).unwrap().into_dense();
1141 let timestamps = b
1142 .timestamps()
1143 .as_any()
1144 .downcast_ref::<TimestampMillisecondVector>()
1145 .unwrap()
1146 .iter_data()
1147 .map(|v| v.unwrap().0.value())
1148 .collect::<Vec<_>>();
1149 let float_values = b.fields()[1]
1150 .data
1151 .as_any()
1152 .downcast_ref::<Float64Vector>()
1153 .unwrap()
1154 .iter_data()
1155 .collect::<Vec<_>>();
1156
1157 (pk_values, timestamps, float_values)
1158 })
1159 .collect::<Vec<_>>();
1160 assert_eq!(expected.len(), batch_values.len());
1161
1162 for idx in 0..expected.len() {
1163 assert_eq!(expected[idx].pk_values, &batch_values[idx].0);
1164 assert_eq!(expected[idx].timestamps, &batch_values[idx].1);
1165 assert_eq!(expected[idx].v1, &batch_values[idx].2);
1166 }
1167 }
1168
1169 #[test]
1170 fn test_mutations_to_record_batch() {
1171 check_mutations_to_record_batches(
1172 &[MutationInput {
1173 k0: "a",
1174 k1: 0,
1175 timestamps: &[0],
1176 v1: &[Some(0.1)],
1177 sequence: 0,
1178 }],
1179 &[BatchOutput {
1180 pk_values: &[Value::String("a".into()), Value::UInt32(0)],
1181 timestamps: &[0],
1182 v1: &[Some(0.1)],
1183 }],
1184 (0, 0),
1185 true,
1186 );
1187
1188 check_mutations_to_record_batches(
1189 &[
1190 MutationInput {
1191 k0: "a",
1192 k1: 0,
1193 timestamps: &[0],
1194 v1: &[Some(0.1)],
1195 sequence: 0,
1196 },
1197 MutationInput {
1198 k0: "b",
1199 k1: 0,
1200 timestamps: &[0],
1201 v1: &[Some(0.0)],
1202 sequence: 0,
1203 },
1204 MutationInput {
1205 k0: "a",
1206 k1: 0,
1207 timestamps: &[1],
1208 v1: &[Some(0.2)],
1209 sequence: 1,
1210 },
1211 MutationInput {
1212 k0: "a",
1213 k1: 1,
1214 timestamps: &[1],
1215 v1: &[Some(0.3)],
1216 sequence: 2,
1217 },
1218 ],
1219 &[
1220 BatchOutput {
1221 pk_values: &[Value::String("a".into()), Value::UInt32(0)],
1222 timestamps: &[0, 1],
1223 v1: &[Some(0.1), Some(0.2)],
1224 },
1225 BatchOutput {
1226 pk_values: &[Value::String("a".into()), Value::UInt32(1)],
1227 timestamps: &[1],
1228 v1: &[Some(0.3)],
1229 },
1230 BatchOutput {
1231 pk_values: &[Value::String("b".into()), Value::UInt32(0)],
1232 timestamps: &[0],
1233 v1: &[Some(0.0)],
1234 },
1235 ],
1236 (0, 1),
1237 true,
1238 );
1239
1240 check_mutations_to_record_batches(
1241 &[
1242 MutationInput {
1243 k0: "a",
1244 k1: 0,
1245 timestamps: &[0],
1246 v1: &[Some(0.1)],
1247 sequence: 0,
1248 },
1249 MutationInput {
1250 k0: "b",
1251 k1: 0,
1252 timestamps: &[0],
1253 v1: &[Some(0.0)],
1254 sequence: 0,
1255 },
1256 MutationInput {
1257 k0: "a",
1258 k1: 0,
1259 timestamps: &[0],
1260 v1: &[Some(0.2)],
1261 sequence: 1,
1262 },
1263 ],
1264 &[
1265 BatchOutput {
1266 pk_values: &[Value::String("a".into()), Value::UInt32(0)],
1267 timestamps: &[0],
1268 v1: &[Some(0.2)],
1269 },
1270 BatchOutput {
1271 pk_values: &[Value::String("b".into()), Value::UInt32(0)],
1272 timestamps: &[0],
1273 v1: &[Some(0.0)],
1274 },
1275 ],
1276 (0, 0),
1277 true,
1278 );
1279 check_mutations_to_record_batches(
1280 &[
1281 MutationInput {
1282 k0: "a",
1283 k1: 0,
1284 timestamps: &[0],
1285 v1: &[Some(0.1)],
1286 sequence: 0,
1287 },
1288 MutationInput {
1289 k0: "b",
1290 k1: 0,
1291 timestamps: &[0],
1292 v1: &[Some(0.0)],
1293 sequence: 0,
1294 },
1295 MutationInput {
1296 k0: "a",
1297 k1: 0,
1298 timestamps: &[0],
1299 v1: &[Some(0.2)],
1300 sequence: 1,
1301 },
1302 ],
1303 &[
1304 BatchOutput {
1305 pk_values: &[Value::String("a".into()), Value::UInt32(0)],
1306 timestamps: &[0, 0],
1307 v1: &[Some(0.2), Some(0.1)],
1308 },
1309 BatchOutput {
1310 pk_values: &[Value::String("b".into()), Value::UInt32(0)],
1311 timestamps: &[0],
1312 v1: &[Some(0.0)],
1313 },
1314 ],
1315 (0, 0),
1316 false,
1317 );
1318 }
1319
1320 fn encode(input: &[MutationInput]) -> EncodedBulkPart {
1321 let metadata = metadata_for_test();
1322 let kvs = input
1323 .iter()
1324 .map(|m| {
1325 build_key_values_with_ts_seq_values(
1326 &metadata,
1327 m.k0.to_string(),
1328 m.k1,
1329 m.timestamps.iter().copied(),
1330 m.v1.iter().copied(),
1331 m.sequence,
1332 )
1333 })
1334 .collect::<Vec<_>>();
1335 let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1336 let primary_key_codec = build_primary_key_codec(&metadata);
1337 let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1338 for kv in kvs {
1339 converter.append_key_values(&kv).unwrap();
1340 }
1341 let part = converter.convert().unwrap();
1342 let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1343 encoder.encode_part(&part).unwrap().unwrap()
1344 }
1345
1346 #[test]
1347 fn test_write_and_read_part_projection() {
1348 let part = encode(&[
1349 MutationInput {
1350 k0: "a",
1351 k1: 0,
1352 timestamps: &[1],
1353 v1: &[Some(0.1)],
1354 sequence: 0,
1355 },
1356 MutationInput {
1357 k0: "b",
1358 k1: 0,
1359 timestamps: &[1],
1360 v1: &[Some(0.0)],
1361 sequence: 0,
1362 },
1363 MutationInput {
1364 k0: "a",
1365 k1: 0,
1366 timestamps: &[2],
1367 v1: &[Some(0.2)],
1368 sequence: 1,
1369 },
1370 ]);
1371
1372 let projection = &[4u32];
        let reader = part
1374 .read(
1375 Arc::new(
1376 BulkIterContext::new(
1377 part.metadata.region_metadata.clone(),
1378 Some(projection.as_slice()),
1379 None,
1380 false,
1381 )
1382 .unwrap(),
1383 ),
1384 None,
1385 )
1386 .unwrap()
1387 .expect("expect at least one row group");
1388
1389 let mut total_rows_read = 0;
1390 let mut field: Vec<f64> = vec![];
1391 for res in reader {
1392 let batch = res.unwrap();
1393 assert_eq!(5, batch.num_columns());
1394 field.extend_from_slice(
1395 batch
1396 .column(0)
1397 .as_any()
1398 .downcast_ref::<Float64Array>()
1399 .unwrap()
1400 .values(),
1401 );
1402 total_rows_read += batch.num_rows();
1403 }
1404 assert_eq!(3, total_rows_read);
1405 assert_eq!(vec![0.1, 0.2, 0.0], field);
1406 }
1407
1408 fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
1409 let metadata = metadata_for_test();
1410 let kvs = key_values
1411 .into_iter()
1412 .map(|(k0, k1, (start, end), sequence)| {
                let ts = start..end;
1414 let v1 = (start..end).map(|_| None);
1415 build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
1416 })
1417 .collect::<Vec<_>>();
1418 let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1419 let primary_key_codec = build_primary_key_codec(&metadata);
1420 let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1421 for kv in kvs {
1422 converter.append_key_values(&kv).unwrap();
1423 }
1424 let part = converter.convert().unwrap();
1425 let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1426 encoder.encode_part(&part).unwrap().unwrap()
1427 }
1428
1429 fn check_prune_row_group(
1430 part: &EncodedBulkPart,
1431 predicate: Option<Predicate>,
1432 expected_rows: usize,
1433 ) {
1434 let context = Arc::new(
1435 BulkIterContext::new(
1436 part.metadata.region_metadata.clone(),
1437 None,
1438 predicate,
1439 false,
1440 )
1441 .unwrap(),
1442 );
        let reader = part
1444 .read(context, None)
1445 .unwrap()
1446 .expect("expect at least one row group");
1447 let mut total_rows_read = 0;
1448 for res in reader {
1449 let batch = res.unwrap();
1450 total_rows_read += batch.num_rows();
1451 }
1452 assert_eq!(expected_rows, total_rows_read);
1454 }
1455
1456 #[test]
1457 fn test_prune_row_groups() {
1458 let part = prepare(vec![
1459 ("a", 0, (0, 40), 1),
1460 ("a", 1, (0, 60), 1),
1461 ("b", 0, (0, 100), 2),
1462 ("b", 1, (100, 180), 3),
1463 ("b", 1, (180, 210), 4),
1464 ]);
1465
1466 let context = Arc::new(
1467 BulkIterContext::new(
1468 part.metadata.region_metadata.clone(),
1469 None,
1470 Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
1471 datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
1472 )])),
1473 false,
1474 )
1475 .unwrap(),
1476 );
1477 assert!(part.read(context, None).unwrap().is_none());
1478
1479 check_prune_row_group(&part, None, 310);
1480
1481 check_prune_row_group(
1482 &part,
1483 Some(Predicate::new(vec![
1484 datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1485 datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
1486 ])),
1487 40,
1488 );
1489
1490 check_prune_row_group(
1491 &part,
1492 Some(Predicate::new(vec![
1493 datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1494 datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
1495 ])),
1496 60,
1497 );
1498
1499 check_prune_row_group(
1500 &part,
1501 Some(Predicate::new(vec![
1502 datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1503 ])),
1504 100,
1505 );
1506
1507 check_prune_row_group(
1508 &part,
1509 Some(Predicate::new(vec![
1510 datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
1511 datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
1512 ])),
1513 100,
1514 );
1515
1516 check_prune_row_group(
1518 &part,
1519 Some(Predicate::new(vec![
1520 datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64)),
1521 ])),
1522 1,
1523 );
1524 }
1525
1526 #[test]
1527 fn test_bulk_part_converter_append_and_convert() {
1528 let metadata = metadata_for_test();
1529 let capacity = 100;
1530 let primary_key_codec = build_primary_key_codec(&metadata);
1531 let schema = to_flat_sst_arrow_schema(
1532 &metadata,
1533 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1534 );
1535
1536 let mut converter =
1537 BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1538
1539 let key_values1 = build_key_values_with_ts_seq_values(
1540 &metadata,
1541 "key1".to_string(),
1542 1u32,
1543 vec![1000, 2000].into_iter(),
1544 vec![Some(1.0), Some(2.0)].into_iter(),
1545 1,
1546 );
1547
1548 let key_values2 = build_key_values_with_ts_seq_values(
1549 &metadata,
1550 "key2".to_string(),
1551 2u32,
1552 vec![1500].into_iter(),
1553 vec![Some(3.0)].into_iter(),
1554 2,
1555 );
1556
1557 converter.append_key_values(&key_values1).unwrap();
1558 converter.append_key_values(&key_values2).unwrap();
1559
1560 let bulk_part = converter.convert().unwrap();
1561
1562 assert_eq!(bulk_part.num_rows(), 3);
1563 assert_eq!(bulk_part.min_timestamp, 1000);
1564 assert_eq!(bulk_part.max_timestamp, 2000);
1565 assert_eq!(bulk_part.sequence, 2);
1566 assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
1567
1568 let schema = bulk_part.batch.schema();
1571 let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1572 assert_eq!(
1573 field_names,
1574 vec![
1575 "k0",
1576 "k1",
1577 "v0",
1578 "v1",
1579 "ts",
1580 "__primary_key",
1581 "__sequence",
1582 "__op_type"
1583 ]
1584 );
1585 }
1586
1587 #[test]
1588 fn test_bulk_part_converter_sorting() {
1589 let metadata = metadata_for_test();
1590 let capacity = 100;
1591 let primary_key_codec = build_primary_key_codec(&metadata);
1592 let schema = to_flat_sst_arrow_schema(
1593 &metadata,
1594 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1595 );
1596
1597 let mut converter =
1598 BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1599
1600 let key_values1 = build_key_values_with_ts_seq_values(
1601 &metadata,
1602 "z_key".to_string(),
1603 3u32,
1604 vec![3000].into_iter(),
1605 vec![Some(3.0)].into_iter(),
1606 3,
1607 );
1608
1609 let key_values2 = build_key_values_with_ts_seq_values(
1610 &metadata,
1611 "a_key".to_string(),
1612 1u32,
1613 vec![1000].into_iter(),
1614 vec![Some(1.0)].into_iter(),
1615 1,
1616 );
1617
1618 let key_values3 = build_key_values_with_ts_seq_values(
1619 &metadata,
1620 "m_key".to_string(),
1621 2u32,
1622 vec![2000].into_iter(),
1623 vec![Some(2.0)].into_iter(),
1624 2,
1625 );
1626
1627 converter.append_key_values(&key_values1).unwrap();
1628 converter.append_key_values(&key_values2).unwrap();
1629 converter.append_key_values(&key_values3).unwrap();
1630
1631 let bulk_part = converter.convert().unwrap();
1632
1633 assert_eq!(bulk_part.num_rows(), 3);
1634
1635 let ts_column = bulk_part.batch.column(bulk_part.timestamp_index);
1636 let seq_column = bulk_part.batch.column(bulk_part.batch.num_columns() - 2);
1637
1638 let ts_array = ts_column
1639 .as_any()
1640 .downcast_ref::<TimestampMillisecondArray>()
1641 .unwrap();
1642 let seq_array = seq_column.as_any().downcast_ref::<UInt64Array>().unwrap();
1643
1644 assert_eq!(ts_array.values(), &[1000, 2000, 3000]);
1645 assert_eq!(seq_array.values(), &[1, 2, 3]);
1646
1647 let schema = bulk_part.batch.schema();
1649 let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1650 assert_eq!(
1651 field_names,
1652 vec![
1653 "k0",
1654 "k1",
1655 "v0",
1656 "v1",
1657 "ts",
1658 "__primary_key",
1659 "__sequence",
1660 "__op_type"
1661 ]
1662 );
1663 }
1664
1665 #[test]
1666 fn test_bulk_part_converter_empty() {
1667 let metadata = metadata_for_test();
1668 let capacity = 10;
1669 let primary_key_codec = build_primary_key_codec(&metadata);
1670 let schema = to_flat_sst_arrow_schema(
1671 &metadata,
1672 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1673 );
1674
1675 let converter =
1676 BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1677
1678 let bulk_part = converter.convert().unwrap();
1679
1680 assert_eq!(bulk_part.num_rows(), 0);
1681 assert_eq!(bulk_part.min_timestamp, i64::MAX);
1682 assert_eq!(bulk_part.max_timestamp, i64::MIN);
1683 assert_eq!(bulk_part.sequence, SequenceNumber::MIN);
1684
1685 let schema = bulk_part.batch.schema();
1687 let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1688 assert_eq!(
1689 field_names,
1690 vec![
1691 "k0",
1692 "k1",
1693 "v0",
1694 "v1",
1695 "ts",
1696 "__primary_key",
1697 "__sequence",
1698 "__op_type"
1699 ]
1700 );
1701 }
1702
1703 #[test]
1704 fn test_bulk_part_converter_without_primary_key_columns() {
1705 let metadata = metadata_for_test();
1706 let primary_key_codec = build_primary_key_codec(&metadata);
1707 let schema = to_flat_sst_arrow_schema(
1708 &metadata,
1709 &FlatSchemaOptions {
1710 raw_pk_columns: false,
1711 string_pk_use_dict: true,
1712 },
1713 );
1714
1715 let capacity = 100;
1716 let mut converter =
1717 BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, false);
1718
1719 let key_values1 = build_key_values_with_ts_seq_values(
1720 &metadata,
1721 "key1".to_string(),
1722 1u32,
1723 vec![1000, 2000].into_iter(),
1724 vec![Some(1.0), Some(2.0)].into_iter(),
1725 1,
1726 );
1727
1728 let key_values2 = build_key_values_with_ts_seq_values(
1729 &metadata,
1730 "key2".to_string(),
1731 2u32,
1732 vec![1500].into_iter(),
1733 vec![Some(3.0)].into_iter(),
1734 2,
1735 );
1736
1737 converter.append_key_values(&key_values1).unwrap();
1738 converter.append_key_values(&key_values2).unwrap();
1739
1740 let bulk_part = converter.convert().unwrap();
1741
1742 assert_eq!(bulk_part.num_rows(), 3);
1743 assert_eq!(bulk_part.min_timestamp, 1000);
1744 assert_eq!(bulk_part.max_timestamp, 2000);
1745 assert_eq!(bulk_part.sequence, 2);
1746 assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
1747
1748 let schema = bulk_part.batch.schema();
1750 let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1751 assert_eq!(
1752 field_names,
1753 vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
1754 );
1755 }
1756
1757 #[allow(clippy::too_many_arguments)]
1758 fn build_key_values_with_sparse_encoding(
1759 metadata: &RegionMetadataRef,
1760 primary_key_codec: &Arc<dyn PrimaryKeyCodec>,
1761 table_id: u32,
1762 tsid: u64,
1763 k0: String,
1764 k1: String,
1765 timestamps: impl Iterator<Item = i64>,
1766 values: impl Iterator<Item = Option<f64>>,
1767 sequence: SequenceNumber,
1768 ) -> KeyValues {
1769 let pk_values = vec![
1771 (ReservedColumnId::table_id(), Value::UInt32(table_id)),
1772 (ReservedColumnId::tsid(), Value::UInt64(tsid)),
1773 (0, Value::String(k0.clone().into())),
1774 (1, Value::String(k1.clone().into())),
1775 ];
1776 let mut encoded_key = Vec::new();
1777 primary_key_codec
1778 .encode_values(&pk_values, &mut encoded_key)
1779 .unwrap();
1780 assert!(!encoded_key.is_empty());
1781
1782 let column_schema = vec![
1784 api::v1::ColumnSchema {
1785 column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
1786 datatype: api::helper::ColumnDataTypeWrapper::try_from(
1787 ConcreteDataType::binary_datatype(),
1788 )
1789 .unwrap()
1790 .datatype() as i32,
1791 semantic_type: api::v1::SemanticType::Tag as i32,
1792 ..Default::default()
1793 },
1794 api::v1::ColumnSchema {
1795 column_name: "ts".to_string(),
1796 datatype: api::helper::ColumnDataTypeWrapper::try_from(
1797 ConcreteDataType::timestamp_millisecond_datatype(),
1798 )
1799 .unwrap()
1800 .datatype() as i32,
1801 semantic_type: api::v1::SemanticType::Timestamp as i32,
1802 ..Default::default()
1803 },
1804 api::v1::ColumnSchema {
1805 column_name: "v0".to_string(),
1806 datatype: api::helper::ColumnDataTypeWrapper::try_from(
1807 ConcreteDataType::int64_datatype(),
1808 )
1809 .unwrap()
1810 .datatype() as i32,
1811 semantic_type: api::v1::SemanticType::Field as i32,
1812 ..Default::default()
1813 },
1814 api::v1::ColumnSchema {
1815 column_name: "v1".to_string(),
1816 datatype: api::helper::ColumnDataTypeWrapper::try_from(
1817 ConcreteDataType::float64_datatype(),
1818 )
1819 .unwrap()
1820 .datatype() as i32,
1821 semantic_type: api::v1::SemanticType::Field as i32,
1822 ..Default::default()
1823 },
1824 ];
1825
1826 let rows = timestamps
1827 .zip(values)
1828 .map(|(ts, v)| Row {
1829 values: vec![
1830 api::v1::Value {
1831 value_data: Some(api::v1::value::ValueData::BinaryValue(
1832 encoded_key.clone(),
1833 )),
1834 },
1835 api::v1::Value {
1836 value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(ts)),
1837 },
1838 api::v1::Value {
1839 value_data: Some(api::v1::value::ValueData::I64Value(ts)),
1840 },
1841 api::v1::Value {
1842 value_data: v.map(api::v1::value::ValueData::F64Value),
1843 },
1844 ],
1845 })
1846 .collect();
1847
1848 let mutation = api::v1::Mutation {
1849 op_type: 1,
1850 sequence,
1851 rows: Some(api::v1::Rows {
1852 schema: column_schema,
1853 rows,
1854 }),
1855 write_hint: Some(WriteHint {
1856 primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
1857 }),
1858 };
1859 KeyValues::new(metadata.as_ref(), mutation).unwrap()
1860 }
1861
1862 #[test]
1863 fn test_bulk_part_converter_sparse_primary_key_encoding() {
1864 use api::v1::SemanticType;
1865 use datatypes::schema::ColumnSchema;
1866 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1867 use store_api::storage::RegionId;
1868
1869 let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
1870 builder
1871 .push_column_metadata(ColumnMetadata {
1872 column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
1873 semantic_type: SemanticType::Tag,
1874 column_id: 0,
1875 })
1876 .push_column_metadata(ColumnMetadata {
1877 column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
1878 semantic_type: SemanticType::Tag,
1879 column_id: 1,
1880 })
1881 .push_column_metadata(ColumnMetadata {
1882 column_schema: ColumnSchema::new(
1883 "ts",
1884 ConcreteDataType::timestamp_millisecond_datatype(),
1885 false,
1886 ),
1887 semantic_type: SemanticType::Timestamp,
1888 column_id: 2,
1889 })
1890 .push_column_metadata(ColumnMetadata {
1891 column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
1892 semantic_type: SemanticType::Field,
1893 column_id: 3,
1894 })
1895 .push_column_metadata(ColumnMetadata {
1896 column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
1897 semantic_type: SemanticType::Field,
1898 column_id: 4,
1899 })
1900 .primary_key(vec![0, 1])
1901 .primary_key_encoding(PrimaryKeyEncoding::Sparse);
1902 let metadata = Arc::new(builder.build().unwrap());
1903
1904 let primary_key_codec = build_primary_key_codec(&metadata);
1905 let schema = to_flat_sst_arrow_schema(
1906 &metadata,
1907 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1908 );
1909
1910 assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse);
1911 assert_eq!(primary_key_codec.encoding(), PrimaryKeyEncoding::Sparse);
1912
1913 let capacity = 100;
1914 let mut converter =
1915 BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec.clone(), true);
1916
1917 let key_values1 = build_key_values_with_sparse_encoding(
1918 &metadata,
1919 &primary_key_codec,
1920 2048u32, 100u64, "key11".to_string(),
1923 "key21".to_string(),
1924 vec![1000, 2000].into_iter(),
1925 vec![Some(1.0), Some(2.0)].into_iter(),
1926 1,
1927 );
1928
1929 let key_values2 = build_key_values_with_sparse_encoding(
1930 &metadata,
1931 &primary_key_codec,
1932 4096u32, 200u64, "key12".to_string(),
1935 "key22".to_string(),
1936 vec![1500].into_iter(),
1937 vec![Some(3.0)].into_iter(),
1938 2,
1939 );
1940
1941 converter.append_key_values(&key_values1).unwrap();
1942 converter.append_key_values(&key_values2).unwrap();
1943
1944 let bulk_part = converter.convert().unwrap();
1945
1946 assert_eq!(bulk_part.num_rows(), 3);
1947 assert_eq!(bulk_part.min_timestamp, 1000);
1948 assert_eq!(bulk_part.max_timestamp, 2000);
1949 assert_eq!(bulk_part.sequence, 2);
1950 assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
1951
1952 let schema = bulk_part.batch.schema();
1956 let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1957 assert_eq!(
1958 field_names,
1959 vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
1960 );
1961
1962 let primary_key_column = bulk_part.batch.column_by_name("__primary_key").unwrap();
1964 let dict_array = primary_key_column
1965 .as_any()
1966 .downcast_ref::<DictionaryArray<UInt32Type>>()
1967 .unwrap();
1968
1969 assert!(!dict_array.is_empty());
        assert_eq!(dict_array.len(), 3);

        let values = dict_array
1975 .values()
1976 .as_any()
1977 .downcast_ref::<BinaryArray>()
1978 .unwrap();
1979 for i in 0..values.len() {
1980 assert!(
1981 !values.value(i).is_empty(),
1982 "Encoded primary key should not be empty"
1983 );
1984 }
1985 }
1986}