use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::helper::{ColumnDataTypeWrapper, to_grpc_value};
use api::v1::bulk_wal_entry::Body;
use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType, bulk_wal_entry};
use bytes::Bytes;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_recordbatch::DfRecordBatch as RecordBatch;
use common_time::Timestamp;
use common_time::timestamp::TimeUnit;
use datatypes::arrow;
use datatypes::arrow::array::{
    Array, ArrayRef, BinaryBuilder, BinaryDictionaryBuilder, DictionaryArray, StringBuilder,
    StringDictionaryBuilder, TimestampMicrosecondArray, TimestampMillisecondArray,
    TimestampNanosecondArray, TimestampSecondArray, UInt8Array, UInt8Builder, UInt32Array,
    UInt64Array, UInt64Builder,
};
use datatypes::arrow::compute::{SortColumn, SortOptions, TakeOptions};
use datatypes::arrow::datatypes::{
    DataType as ArrowDataType, Field, Schema, SchemaRef, UInt32Type,
};
use datatypes::arrow_array::BinaryArray;
use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::Helper;
use mito_codec::key_values::{KeyValue, KeyValues, KeyValuesRef};
use mito_codec::row_converter::{
    DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, build_primary_key_codec,
};
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, ZstdLevel};
use parquet::data_type::AsBytes;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use snafu::{OptionExt, ResultExt, Snafu};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
use store_api::storage::{FileId, RegionId, SequenceNumber, SequenceRange};
use table::predicate::Predicate;

use crate::error::{
    self, ColumnNotFoundSnafu, ComputeArrowSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu,
    DataTypeMismatchSnafu, EncodeMemtableSnafu, EncodeSnafu, InvalidMetadataSnafu,
    InvalidRequestSnafu, NewRecordBatchSnafu, Result, UnexpectedSnafu,
};
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
use crate::memtable::time_series::{ValueBuilder, Values};
use crate::memtable::{BoxedRecordBatchIterator, MemScanMetrics};
use crate::sst::index::IndexOutput;
use crate::sst::parquet::file_range::{PreFilterMode, row_group_contains_delete};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo};
use crate::sst::{SeriesEstimator, to_sst_arrow_schema};

const INIT_DICT_VALUE_CAPACITY: usize = 8;

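/// A bulk write of rows stored as a single Arrow [`RecordBatch`], along with its timestamp
/// range, sequence number and the index of the time index column. `raw_data` keeps the
/// original Arrow IPC payload when the part was decoded from a WAL entry so it can be
/// converted back into a [`BulkWalEntry`] without re-encoding.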
#[derive(Clone)]
pub struct BulkPart {
    pub batch: RecordBatch,
    pub max_timestamp: i64,
    pub min_timestamp: i64,
    pub sequence: u64,
    pub timestamp_index: usize,
    pub raw_data: Option<ArrowIpc>,
}

impl TryFrom<BulkWalEntry> for BulkPart {
    type Error = error::Error;

    fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
        match value.body.expect("Entry payload should be present") {
            Body::ArrowIpc(ipc) => {
                let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
                    .context(error::ConvertBulkWalEntrySnafu)?;
                let batch = decoder
                    .try_decode_record_batch(&ipc.data_header, &ipc.payload)
                    .context(error::ConvertBulkWalEntrySnafu)?;
                Ok(Self {
                    batch,
                    max_timestamp: value.max_ts,
                    min_timestamp: value.min_ts,
                    sequence: value.sequence,
                    timestamp_index: value.timestamp_index as usize,
                    raw_data: Some(ipc),
                })
            }
        }
    }
}

impl TryFrom<&BulkPart> for BulkWalEntry {
    type Error = error::Error;

    fn try_from(value: &BulkPart) -> Result<Self> {
        if let Some(ipc) = &value.raw_data {
            Ok(BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_timestamp,
                min_ts: value.min_timestamp,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ipc.clone())),
            })
        } else {
            let mut encoder = FlightEncoder::default();
            let schema_bytes = encoder
                .encode_schema(value.batch.schema().as_ref())
                .data_header;
            let [rb_data] = encoder
                .encode(FlightMessage::RecordBatch(value.batch.clone()))
                .try_into()
                .map_err(|_| {
                    error::UnsupportedOperationSnafu {
                        err_msg: "create BulkWalEntry from RecordBatch with dictionary arrays",
                    }
                    .build()
                })?;
            Ok(BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_timestamp,
                min_ts: value.min_timestamp,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ArrowIpc {
                    schema: schema_bytes,
                    data_header: rb_data.data_header,
                    payload: rb_data.data_body,
                })),
            })
        }
    }
}

impl BulkPart {
    pub(crate) fn estimated_size(&self) -> usize {
        record_batch_estimated_size(&self.batch)
    }

    pub fn estimated_series_count(&self) -> usize {
        let pk_column_idx = primary_key_column_index(self.batch.num_columns());
        let pk_column = self.batch.column(pk_column_idx);
        if let Some(dict_array) = pk_column.as_any().downcast_ref::<PrimaryKeyArray>() {
            dict_array.values().len()
        } else {
            0
        }
    }

    pub fn fill_missing_columns(&mut self, region_metadata: &RegionMetadata) -> Result<()> {
        let batch_schema = self.batch.schema();
        let batch_columns: HashSet<_> = batch_schema
            .fields()
            .iter()
            .map(|f| f.name().as_str())
            .collect();

        let mut columns_to_fill = Vec::new();
        for column_meta in &region_metadata.column_metadatas {
            if !batch_columns.contains(column_meta.column_schema.name.as_str()) {
                columns_to_fill.push(column_meta);
            }
        }

        if columns_to_fill.is_empty() {
            return Ok(());
        }

        let num_rows = self.batch.num_rows();

        let mut new_columns = Vec::new();
        let mut new_fields = Vec::new();

        new_fields.extend(batch_schema.fields().iter().cloned());
        new_columns.extend_from_slice(self.batch.columns());

        let region_id = region_metadata.region_id;
        for column_meta in columns_to_fill {
            let default_vector = column_meta
                .column_schema
                .create_default_vector(num_rows)
                .context(CreateDefaultSnafu {
                    region_id,
                    column: &column_meta.column_schema.name,
                })?
                .with_context(|| InvalidRequestSnafu {
                    region_id,
                    reason: format!(
                        "column {} does not have default value",
                        column_meta.column_schema.name
                    ),
                })?;
            let arrow_array = default_vector.to_arrow_array();

            new_fields.push(Arc::new(Field::new(
                column_meta.column_schema.name.clone(),
                column_meta.column_schema.data_type.as_arrow_type(),
                column_meta.column_schema.is_nullable(),
            )));
            new_columns.push(arrow_array);
        }

        let new_schema = Arc::new(Schema::new(new_fields));
        let new_batch =
            RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)?;

        self.batch = new_batch;

        Ok(())
    }

    pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
        let vectors = region_metadata
            .schema
            .column_schemas()
            .iter()
            .map(|col| match self.batch.column_by_name(&col.name) {
                None => Ok(None),
                Some(col) => Helper::try_into_vector(col).map(Some),
            })
            .collect::<datatypes::error::Result<Vec<_>>>()
            .context(error::ComputeVectorSnafu)?;

        let rows = (0..self.num_rows())
            .map(|row_idx| {
                let values = (0..self.batch.num_columns())
                    .map(|col_idx| {
                        if let Some(v) = &vectors[col_idx] {
                            to_grpc_value(v.get(row_idx))
                        } else {
                            api::v1::Value { value_data: None }
                        }
                    })
                    .collect::<Vec<_>>();
                api::v1::Row { values }
            })
            .collect::<Vec<_>>();

        let schema = region_metadata
            .column_metadatas
            .iter()
            .map(|c| {
                let data_type_wrapper =
                    ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
                Ok(api::v1::ColumnSchema {
                    column_name: c.column_schema.name.clone(),
                    datatype: data_type_wrapper.datatype() as i32,
                    semantic_type: c.semantic_type as i32,
                    ..Default::default()
                })
            })
            .collect::<api::error::Result<Vec<_>>>()
            .context(error::ConvertColumnDataTypeSnafu {
                reason: "failed to convert region metadata to column schema",
            })?;

        let rows = api::v1::Rows { schema, rows };

        Ok(Mutation {
            op_type: OpType::Put as i32,
            sequence: self.sequence,
            rows: Some(rows),
            write_hint: None,
        })
    }

    pub fn timestamps(&self) -> &ArrayRef {
        self.batch.column(self.timestamp_index)
    }

    pub fn num_rows(&self) -> usize {
        self.batch.num_rows()
    }
}

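/// Buffers [`BulkPart`]s that are individually small and merges them on demand.
///
/// `should_accept` gates incoming parts by row count against `threshold`, while
/// `should_compact` reports when the buffered rows exceed `compact_threshold`;
/// `to_bulk_part` concatenates and sorts all buffered parts into a single [`BulkPart`].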
pub struct UnorderedPart {
    parts: Vec<BulkPart>,
    total_rows: usize,
    min_timestamp: i64,
    max_timestamp: i64,
    max_sequence: u64,
    threshold: usize,
    compact_threshold: usize,
}

impl Default for UnorderedPart {
    fn default() -> Self {
        Self::new()
    }
}

impl UnorderedPart {
    pub fn new() -> Self {
        Self {
            parts: Vec::new(),
            total_rows: 0,
            min_timestamp: i64::MAX,
            max_timestamp: i64::MIN,
            max_sequence: 0,
            threshold: 1024,
            compact_threshold: 4096,
        }
    }

    pub fn set_threshold(&mut self, threshold: usize) {
        self.threshold = threshold;
    }

    pub fn set_compact_threshold(&mut self, compact_threshold: usize) {
        self.compact_threshold = compact_threshold;
    }

    pub fn threshold(&self) -> usize {
        self.threshold
    }

    pub fn compact_threshold(&self) -> usize {
        self.compact_threshold
    }

    pub fn should_accept(&self, num_rows: usize) -> bool {
        num_rows < self.threshold
    }

    pub fn should_compact(&self) -> bool {
        self.total_rows >= self.compact_threshold
    }

    pub fn push(&mut self, part: BulkPart) {
        self.total_rows += part.num_rows();
        self.min_timestamp = self.min_timestamp.min(part.min_timestamp);
        self.max_timestamp = self.max_timestamp.max(part.max_timestamp);
        self.max_sequence = self.max_sequence.max(part.sequence);
        self.parts.push(part);
    }

    pub fn num_rows(&self) -> usize {
        self.total_rows
    }

    pub fn is_empty(&self) -> bool {
        self.parts.is_empty()
    }

    pub fn num_parts(&self) -> usize {
        self.parts.len()
    }

    pub fn concat_and_sort(&self) -> Result<Option<RecordBatch>> {
        if self.parts.is_empty() {
            return Ok(None);
        }

        if self.parts.len() == 1 {
            return Ok(Some(self.parts[0].batch.clone()));
        }

        let schema = self.parts[0].batch.schema();

        let batches: Vec<RecordBatch> = self.parts.iter().map(|p| p.batch.clone()).collect();
        let concatenated =
            arrow::compute::concat_batches(&schema, &batches).context(ComputeArrowSnafu)?;

        let sorted_batch = sort_primary_key_record_batch(&concatenated)?;

        Ok(Some(sorted_batch))
    }

    pub fn to_bulk_part(&self) -> Result<Option<BulkPart>> {
        let Some(sorted_batch) = self.concat_and_sort()? else {
            return Ok(None);
        };

        let timestamp_index = self.parts[0].timestamp_index;

        Ok(Some(BulkPart {
            batch: sorted_batch,
            max_timestamp: self.max_timestamp,
            min_timestamp: self.min_timestamp,
            sequence: self.max_sequence,
            timestamp_index,
            raw_data: None,
        }))
    }

    pub fn clear(&mut self) {
        self.parts.clear();
        self.total_rows = 0;
        self.min_timestamp = i64::MAX;
        self.max_timestamp = i64::MIN;
        self.max_sequence = 0;
    }
}

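/// Estimates the in-memory size of a record batch by summing the sliced buffer sizes of
/// its columns; columns whose size cannot be determined count as zero.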
pub(crate) fn record_batch_estimated_size(batch: &RecordBatch) -> usize {
    batch
        .columns()
        .iter()
        .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
        .sum()
}

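/// Builder for a single primary key column kept in decoded form: string tags use a
/// `UInt32`-keyed dictionary builder, other data types use a generic mutable vector.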
enum PrimaryKeyColumnBuilder {
    StringDict(StringDictionaryBuilder<UInt32Type>),
    Vector(Box<dyn MutableVector>),
}

impl PrimaryKeyColumnBuilder {
    fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
        match self {
            PrimaryKeyColumnBuilder::StringDict(builder) => {
                if let Some(s) = value.try_into_string().context(DataTypeMismatchSnafu)? {
                    builder.append_value(s);
                } else {
                    builder.append_null();
                }
            }
            PrimaryKeyColumnBuilder::Vector(builder) => {
                builder.push_value_ref(&value);
            }
        }
        Ok(())
    }

    fn into_arrow_array(self) -> ArrayRef {
        match self {
            PrimaryKeyColumnBuilder::StringDict(mut builder) => Arc::new(builder.finish()),
            PrimaryKeyColumnBuilder::Vector(mut builder) => builder.to_vector().to_arrow_array(),
        }
    }
}

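/// Converts [`KeyValues`] mutations into a single sorted [`BulkPart`].
///
/// Primary keys are encoded with the region's [`PrimaryKeyCodec`]; when
/// `store_primary_key_columns` is true and the encoding is dense, the individual
/// primary key columns are also materialized. The converter tracks the timestamp
/// range and the maximum sequence number of the appended rows.
///
/// A minimal usage sketch (assuming a `RegionMetadataRef` named `metadata` and
/// `key_values: KeyValues` for that region):
///
/// ```ignore
/// let codec = build_primary_key_codec(&metadata);
/// let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
/// let mut converter = BulkPartConverter::new(&metadata, schema, 1024, codec, true);
/// converter.append_key_values(&key_values)?;
/// let part = converter.convert()?;
/// ```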
pub struct BulkPartConverter {
    region_metadata: RegionMetadataRef,
    schema: SchemaRef,
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
    key_buf: Vec<u8>,
    key_array_builder: PrimaryKeyArrayBuilder,
    value_builder: ValueBuilder,
    primary_key_column_builders: Vec<PrimaryKeyColumnBuilder>,

    max_ts: i64,
    min_ts: i64,
    max_sequence: SequenceNumber,
}

impl BulkPartConverter {
    pub fn new(
        region_metadata: &RegionMetadataRef,
        schema: SchemaRef,
        capacity: usize,
        primary_key_codec: Arc<dyn PrimaryKeyCodec>,
        store_primary_key_columns: bool,
    ) -> Self {
        debug_assert_eq!(
            region_metadata.primary_key_encoding,
            primary_key_codec.encoding()
        );

        let primary_key_column_builders = if store_primary_key_columns
            && region_metadata.primary_key_encoding != PrimaryKeyEncoding::Sparse
        {
            new_primary_key_column_builders(region_metadata, capacity)
        } else {
            Vec::new()
        };

        Self {
            region_metadata: region_metadata.clone(),
            schema,
            primary_key_codec,
            key_buf: Vec::new(),
            key_array_builder: PrimaryKeyArrayBuilder::new(),
            value_builder: ValueBuilder::new(region_metadata, capacity),
            primary_key_column_builders,
            min_ts: i64::MAX,
            max_ts: i64::MIN,
            max_sequence: SequenceNumber::MIN,
        }
    }

    pub fn append_key_values(&mut self, key_values: &KeyValues) -> Result<()> {
        for kv in key_values.iter() {
            self.append_key_value(&kv)?;
        }

        Ok(())
    }

    fn append_key_value(&mut self, kv: &KeyValue) -> Result<()> {
        if self.primary_key_codec.encoding() == PrimaryKeyEncoding::Sparse {
            let mut primary_keys = kv.primary_keys();
            if let Some(encoded) = primary_keys
                .next()
                .context(ColumnNotFoundSnafu {
                    column: PRIMARY_KEY_COLUMN_NAME,
                })?
                .try_into_binary()
                .context(DataTypeMismatchSnafu)?
            {
                self.key_array_builder
                    .append(encoded)
                    .context(ComputeArrowSnafu)?;
            } else {
                self.key_array_builder
                    .append("")
                    .context(ComputeArrowSnafu)?;
            }
        } else {
            self.key_buf.clear();
            self.primary_key_codec
                .encode_key_value(kv, &mut self.key_buf)
                .context(EncodeSnafu)?;
            self.key_array_builder
                .append(&self.key_buf)
                .context(ComputeArrowSnafu)?;
        };

        if !self.primary_key_column_builders.is_empty() {
            for (builder, pk_value) in self
                .primary_key_column_builders
                .iter_mut()
                .zip(kv.primary_keys())
            {
                builder.push_value_ref(pk_value)?;
            }
        }

        self.value_builder.push(
            kv.timestamp(),
            kv.sequence(),
            kv.op_type() as u8,
            kv.fields(),
        );

        let ts = kv
            .timestamp()
            .try_into_timestamp()
            .unwrap()
            .unwrap()
            .value();
        self.min_ts = self.min_ts.min(ts);
        self.max_ts = self.max_ts.max(ts);
        self.max_sequence = self.max_sequence.max(kv.sequence());

        Ok(())
    }

    pub fn convert(mut self) -> Result<BulkPart> {
        let values = Values::from(self.value_builder);
        let mut columns =
            Vec::with_capacity(4 + values.fields.len() + self.primary_key_column_builders.len());

        for builder in self.primary_key_column_builders {
            columns.push(builder.into_arrow_array());
        }
        columns.extend(values.fields.iter().map(|field| field.to_arrow_array()));
        let timestamp_index = columns.len();
        columns.push(values.timestamp.to_arrow_array());
        let pk_array = self.key_array_builder.finish();
        columns.push(Arc::new(pk_array));
        columns.push(values.sequence.to_arrow_array());
        columns.push(values.op_type.to_arrow_array());

        let batch = RecordBatch::try_new(self.schema, columns).context(NewRecordBatchSnafu)?;
        let batch = sort_primary_key_record_batch(&batch)?;

        Ok(BulkPart {
            batch,
            max_timestamp: self.max_ts,
            min_timestamp: self.min_ts,
            sequence: self.max_sequence,
            timestamp_index,
            raw_data: None,
        })
    }
}

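/// Creates one [`PrimaryKeyColumnBuilder`] per primary key column of the region, using
/// dictionary builders for string columns and mutable vectors for everything else.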
fn new_primary_key_column_builders(
    metadata: &RegionMetadata,
    capacity: usize,
) -> Vec<PrimaryKeyColumnBuilder> {
    metadata
        .primary_key_columns()
        .map(|col| {
            if col.column_schema.data_type.is_string() {
                PrimaryKeyColumnBuilder::StringDict(StringDictionaryBuilder::with_capacity(
                    capacity,
                    INIT_DICT_VALUE_CAPACITY,
                    capacity,
                ))
            } else {
                PrimaryKeyColumnBuilder::Vector(
                    col.column_schema.data_type.create_mutable_vector(capacity),
                )
            }
        })
        .collect()
}

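/// Sorts a record batch in flat SST layout (`..., ts, __primary_key, __sequence, __op_type`)
/// by `(__primary_key ASC, ts ASC, __sequence DESC)` so that rows of the same series are
/// adjacent with the newest sequence first.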
fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
    let total_columns = batch.num_columns();
    let sort_columns = vec![
        SortColumn {
            values: batch.column(total_columns - 3).clone(),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
        SortColumn {
            values: batch.column(total_columns - 4).clone(),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
        SortColumn {
            values: batch.column(total_columns - 2).clone(),
            options: Some(SortOptions {
                descending: true,
                nulls_first: true,
            }),
        },
    ];

    let indices = datatypes::arrow::compute::lexsort_to_indices(&sort_columns, None)
        .context(ComputeArrowSnafu)?;

    datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
}

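/// Converts a [`BulkPart`] holding columns in request order into the flat SST layout
/// described by `schema`: optional raw primary key columns, fields, the time index, the
/// encoded `__primary_key` dictionary and per-row `__sequence`/`__op_type` columns, sorted
/// with [`sort_primary_key_record_batch`]. Returns `None` for empty parts.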
pub fn convert_bulk_part(
    part: BulkPart,
    region_metadata: &RegionMetadataRef,
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
    schema: SchemaRef,
    store_primary_key_columns: bool,
) -> Result<Option<BulkPart>> {
    if part.num_rows() == 0 {
        return Ok(None);
    }

    let num_rows = part.num_rows();
    let is_sparse = region_metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse;

    let input_schema = part.batch.schema();
    let column_indices: HashMap<&str, usize> = input_schema
        .fields()
        .iter()
        .enumerate()
        .map(|(idx, field)| (field.name().as_str(), idx))
        .collect();

    let mut output_columns = Vec::new();

    let pk_array = if is_sparse {
        None
    } else {
        let pk_vectors: Result<Vec<_>> = region_metadata
            .primary_key_columns()
            .map(|col_meta| {
                let col_idx = column_indices
                    .get(col_meta.column_schema.name.as_str())
                    .context(ColumnNotFoundSnafu {
                        column: &col_meta.column_schema.name,
                    })?;
                let col = part.batch.column(*col_idx);
                Helper::try_into_vector(col).context(error::ComputeVectorSnafu)
            })
            .collect();
        let pk_vectors = pk_vectors?;

        let mut key_array_builder = PrimaryKeyArrayBuilder::new();
        let mut encode_buf = Vec::new();

        for row_idx in 0..num_rows {
            encode_buf.clear();

            let pk_values_with_ids: Vec<_> = region_metadata
                .primary_key
                .iter()
                .zip(pk_vectors.iter())
                .map(|(col_id, vector)| (*col_id, vector.get_ref(row_idx)))
                .collect();

            primary_key_codec
                .encode_value_refs(&pk_values_with_ids, &mut encode_buf)
                .context(EncodeSnafu)?;

            key_array_builder
                .append(&encode_buf)
                .context(ComputeArrowSnafu)?;
        }

        Some(key_array_builder.finish())
    };

    if store_primary_key_columns && !is_sparse {
        for col_meta in region_metadata.primary_key_columns() {
            let col_idx = column_indices
                .get(col_meta.column_schema.name.as_str())
                .context(ColumnNotFoundSnafu {
                    column: &col_meta.column_schema.name,
                })?;
            let col = part.batch.column(*col_idx);

            let col = if col_meta.column_schema.data_type.is_string() {
                let target_type = ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Utf8),
                );
                arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
            } else {
                col.clone()
            };
            output_columns.push(col);
        }
    }

    for col_meta in region_metadata.field_columns() {
        let col_idx = column_indices
            .get(col_meta.column_schema.name.as_str())
            .context(ColumnNotFoundSnafu {
                column: &col_meta.column_schema.name,
            })?;
        output_columns.push(part.batch.column(*col_idx).clone());
    }

    let new_timestamp_index = output_columns.len();
    let ts_col_idx = column_indices
        .get(
            region_metadata
                .time_index_column()
                .column_schema
                .name
                .as_str(),
        )
        .context(ColumnNotFoundSnafu {
            column: &region_metadata.time_index_column().column_schema.name,
        })?;
    output_columns.push(part.batch.column(*ts_col_idx).clone());

    let pk_dictionary = if let Some(pk_dict_array) = pk_array {
        Arc::new(pk_dict_array) as ArrayRef
    } else {
        let pk_col_idx =
            column_indices
                .get(PRIMARY_KEY_COLUMN_NAME)
                .context(ColumnNotFoundSnafu {
                    column: PRIMARY_KEY_COLUMN_NAME,
                })?;
        let col = part.batch.column(*pk_col_idx);

        let target_type = ArrowDataType::Dictionary(
            Box::new(ArrowDataType::UInt32),
            Box::new(ArrowDataType::Binary),
        );
        arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
    };
    output_columns.push(pk_dictionary);

    let sequence_array = UInt64Array::from(vec![part.sequence; num_rows]);
    output_columns.push(Arc::new(sequence_array) as ArrayRef);

    let op_type_array = UInt8Array::from(vec![OpType::Put as u8; num_rows]);
    output_columns.push(Arc::new(op_type_array) as ArrayRef);

    let batch = RecordBatch::try_new(schema, output_columns).context(NewRecordBatchSnafu)?;

    let sorted_batch = sort_primary_key_record_batch(&batch)?;

    Ok(Some(BulkPart {
        batch: sorted_batch,
        max_timestamp: part.max_timestamp,
        min_timestamp: part.min_timestamp,
        sequence: part.sequence,
        timestamp_index: new_timestamp_index,
        raw_data: None,
    }))
}

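/// A bulk part encoded as Parquet bytes together with the metadata needed to read it
/// back or to register it as an SST file.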
#[derive(Debug, Clone)]
pub struct EncodedBulkPart {
    data: Bytes,
    metadata: BulkPartMeta,
}

impl EncodedBulkPart {
    pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
        Self { data, metadata }
    }

    pub(crate) fn metadata(&self) -> &BulkPartMeta {
        &self.metadata
    }

    pub(crate) fn size_bytes(&self) -> usize {
        self.data.len()
    }

    pub(crate) fn data(&self) -> &Bytes {
        &self.data
    }

    pub(crate) fn to_sst_info(&self, file_id: FileId) -> SstInfo {
        let unit = self.metadata.region_metadata.time_index_type().unit();
        SstInfo {
            file_id,
            time_range: (
                Timestamp::new(self.metadata.min_timestamp, unit),
                Timestamp::new(self.metadata.max_timestamp, unit),
            ),
            file_size: self.data.len() as u64,
            num_rows: self.metadata.num_rows,
            num_row_groups: self.metadata.parquet_metadata.num_row_groups() as u64,
            file_metadata: Some(self.metadata.parquet_metadata.clone()),
            index_metadata: IndexOutput::default(),
            num_series: self.metadata.num_series,
        }
    }

    pub(crate) fn read(
        &self,
        context: BulkIterContextRef,
        sequence: Option<SequenceRange>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Result<Option<BoxedRecordBatchIterator>> {
        let skip_fields_for_pruning =
            Self::compute_skip_fields(context.pre_filter_mode(), &self.metadata.parquet_metadata);

        let row_groups_to_read =
            context.row_groups_to_read(&self.metadata.parquet_metadata, skip_fields_for_pruning);

        if row_groups_to_read.is_empty() {
            return Ok(None);
        }

        let iter = EncodedBulkPartIter::try_new(
            self,
            context,
            row_groups_to_read,
            sequence,
            mem_scan_metrics,
        )?;
        Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
    }

    fn compute_skip_fields(pre_filter_mode: PreFilterMode, parquet_meta: &ParquetMetaData) -> bool {
        match pre_filter_mode {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => {
                (0..parquet_meta.num_row_groups()).any(|rg_idx| {
                    row_group_contains_delete(parquet_meta, rg_idx, "memtable").unwrap_or(true)
                })
            }
        }
    }
}

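/// Metadata of an [`EncodedBulkPart`]: row count, timestamp range, parsed Parquet
/// metadata, the region metadata at encoding time and the estimated series count.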
#[derive(Debug, Clone)]
pub struct BulkPartMeta {
    pub num_rows: usize,
    pub max_timestamp: i64,
    pub min_timestamp: i64,
    pub parquet_metadata: Arc<ParquetMetaData>,
    pub region_metadata: RegionMetadataRef,
    pub num_series: u64,
}

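/// Timing and size metrics collected while encoding a bulk part.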
#[derive(Default, Debug)]
pub struct BulkPartEncodeMetrics {
    pub iter_cost: Duration,
    pub write_cost: Duration,
    pub raw_size: usize,
    pub encoded_size: usize,
    pub num_rows: usize,
}

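/// Encodes bulk parts into Parquet bytes.
///
/// The writer embeds the region metadata JSON under [`PARQUET_METADATA_KEY`], writes
/// `row_group_size` rows per row group and compresses pages with ZSTD.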
pub struct BulkPartEncoder {
    metadata: RegionMetadataRef,
    row_group_size: usize,
    writer_props: Option<WriterProperties>,
}

impl BulkPartEncoder {
    pub(crate) fn new(
        metadata: RegionMetadataRef,
        row_group_size: usize,
    ) -> Result<BulkPartEncoder> {
        let json = metadata.to_json().context(InvalidMetadataSnafu)?;
        let key_value_meta =
            parquet::file::metadata::KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        let writer_props = Some(
            WriterProperties::builder()
                .set_key_value_metadata(Some(vec![key_value_meta]))
                .set_write_batch_size(row_group_size)
                .set_max_row_group_size(row_group_size)
                .set_compression(Compression::ZSTD(ZstdLevel::default()))
                .set_column_index_truncate_length(None)
                .set_statistics_truncate_length(None)
                .build(),
        );

        Ok(Self {
            metadata,
            row_group_size,
            writer_props,
        })
    }
}

impl BulkPartEncoder {
    pub fn encode_record_batch_iter(
        &self,
        iter: BoxedRecordBatchIterator,
        arrow_schema: SchemaRef,
        min_timestamp: i64,
        max_timestamp: i64,
        metrics: &mut BulkPartEncodeMetrics,
    ) -> Result<Option<EncodedBulkPart>> {
        let mut buf = Vec::with_capacity(4096);
        let mut writer = ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
            .context(EncodeMemtableSnafu)?;
        let mut total_rows = 0;
        let mut series_estimator = SeriesEstimator::default();

        let mut iter_start = Instant::now();
        for batch_result in iter {
            metrics.iter_cost += iter_start.elapsed();
            let batch = batch_result?;
            if batch.num_rows() == 0 {
                continue;
            }

            series_estimator.update_flat(&batch);
            metrics.raw_size += record_batch_estimated_size(&batch);
            let write_start = Instant::now();
            writer.write(&batch).context(EncodeMemtableSnafu)?;
            metrics.write_cost += write_start.elapsed();
            total_rows += batch.num_rows();
            iter_start = Instant::now();
        }
        metrics.iter_cost += iter_start.elapsed();
        iter_start = Instant::now();

        if total_rows == 0 {
            return Ok(None);
        }

        let close_start = Instant::now();
        let file_metadata = writer.close().context(EncodeMemtableSnafu)?;
        metrics.write_cost += close_start.elapsed();
        metrics.encoded_size += buf.len();
        metrics.num_rows += total_rows;

        let buf = Bytes::from(buf);
        let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);
        let num_series = series_estimator.finish();

        Ok(Some(EncodedBulkPart {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: total_rows,
                max_timestamp,
                min_timestamp,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
                num_series,
            },
        }))
    }

    fn encode_part(&self, part: &BulkPart) -> Result<Option<EncodedBulkPart>> {
        if part.batch.num_rows() == 0 {
            return Ok(None);
        }

        let mut buf = Vec::with_capacity(4096);
        let arrow_schema = part.batch.schema();

        let file_metadata = {
            let mut writer =
                ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
                    .context(EncodeMemtableSnafu)?;
            writer.write(&part.batch).context(EncodeMemtableSnafu)?;
            writer.finish().context(EncodeMemtableSnafu)?
        };

        let buf = Bytes::from(buf);
        let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);

        Ok(Some(EncodedBulkPart {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: part.batch.num_rows(),
                max_timestamp: part.max_timestamp,
                min_timestamp: part.min_timestamp,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
                num_series: part.estimated_series_count() as u64,
            },
        }))
    }
}

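/// Builds a sorted record batch in SST layout from WAL [`Mutation`]s.
///
/// Returns the batch together with its min and max timestamps, or `None` when the
/// mutations contain no rows. With `dedup`, rows sharing the same primary key and
/// timestamp keep only the entry with the highest sequence.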
fn mutations_to_record_batch(
    mutations: &[Mutation],
    metadata: &RegionMetadataRef,
    pk_encoder: &DensePrimaryKeyCodec,
    dedup: bool,
) -> Result<Option<(RecordBatch, i64, i64)>> {
    let total_rows: usize = mutations
        .iter()
        .map(|m| m.rows.as_ref().map(|r| r.rows.len()).unwrap_or(0))
        .sum();

    if total_rows == 0 {
        return Ok(None);
    }

    let mut pk_builder = BinaryBuilder::with_capacity(total_rows, 0);

    let mut ts_vector: Box<dyn MutableVector> = metadata
        .time_index_column()
        .column_schema
        .data_type
        .create_mutable_vector(total_rows);
    let mut sequence_builder = UInt64Builder::with_capacity(total_rows);
    let mut op_type_builder = UInt8Builder::with_capacity(total_rows);

    let mut field_builders: Vec<Box<dyn MutableVector>> = metadata
        .field_columns()
        .map(|f| f.column_schema.data_type.create_mutable_vector(total_rows))
        .collect();

    let mut pk_buffer = vec![];
    for m in mutations {
        let Some(key_values) = KeyValuesRef::new(metadata, m) else {
            continue;
        };

        for row in key_values.iter() {
            pk_buffer.clear();
            pk_encoder
                .encode_to_vec(row.primary_keys(), &mut pk_buffer)
                .context(EncodeSnafu)?;
            pk_builder.append_value(pk_buffer.as_bytes());
            ts_vector.push_value_ref(&row.timestamp());
            sequence_builder.append_value(row.sequence());
            op_type_builder.append_value(row.op_type() as u8);
            for (builder, field) in field_builders.iter_mut().zip(row.fields()) {
                builder.push_value_ref(&field);
            }
        }
    }

    let arrow_schema = to_sst_arrow_schema(metadata);
    let timestamp_unit = metadata
        .time_index_column()
        .column_schema
        .data_type
        .as_timestamp()
        .unwrap()
        .unit();
    let sorter = ArraysSorter {
        encoded_primary_keys: pk_builder.finish(),
        timestamp_unit,
        timestamp: ts_vector.to_vector().to_arrow_array(),
        sequence: sequence_builder.finish(),
        op_type: op_type_builder.finish(),
        fields: field_builders
            .iter_mut()
            .map(|f| f.to_vector().to_arrow_array()),
        dedup,
        arrow_schema,
    };

    sorter.sort().map(Some)
}

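/// Sorts column arrays by `(primary key, timestamp, sequence DESC)`, optionally
/// deduplicates rows with equal key and timestamp, and assembles the result into a
/// record batch matching `arrow_schema`.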
struct ArraysSorter<I> {
    encoded_primary_keys: BinaryArray,
    timestamp_unit: TimeUnit,
    timestamp: ArrayRef,
    sequence: UInt64Array,
    op_type: UInt8Array,
    fields: I,
    dedup: bool,
    arrow_schema: SchemaRef,
}

impl<I> ArraysSorter<I>
where
    I: Iterator<Item = ArrayRef>,
{
    fn sort(self) -> Result<(RecordBatch, i64, i64)> {
        debug_assert!(!self.timestamp.is_empty());
        debug_assert!(self.timestamp.len() == self.sequence.len());
        debug_assert!(self.timestamp.len() == self.op_type.len());
        debug_assert!(self.timestamp.len() == self.encoded_primary_keys.len());

        let timestamp_iter = timestamp_array_to_iter(self.timestamp_unit, &self.timestamp);
        let (mut min_timestamp, mut max_timestamp) = (i64::MAX, i64::MIN);
        let mut to_sort = self
            .encoded_primary_keys
            .iter()
            .zip(timestamp_iter)
            .zip(self.sequence.iter())
            .map(|((pk, timestamp), sequence)| {
                max_timestamp = max_timestamp.max(*timestamp);
                min_timestamp = min_timestamp.min(*timestamp);
                (pk, timestamp, sequence)
            })
            .enumerate()
            .collect::<Vec<_>>();

        to_sort.sort_unstable_by(|(_, (l_pk, l_ts, l_seq)), (_, (r_pk, r_ts, r_seq))| {
            l_pk.cmp(r_pk)
                .then(l_ts.cmp(r_ts))
                .then(l_seq.cmp(r_seq).reverse())
        });

        if self.dedup {
            to_sort.dedup_by(|(_, (l_pk, l_ts, _)), (_, (r_pk, r_ts, _))| {
                l_pk == r_pk && l_ts == r_ts
            });
        }

        let indices = UInt32Array::from_iter_values(to_sort.iter().map(|v| v.0 as u32));

        let pk_dictionary = Arc::new(binary_array_to_dictionary(
            arrow::compute::take(
                &self.encoded_primary_keys,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap(),
        )?) as ArrayRef;

        let mut arrays = Vec::with_capacity(self.arrow_schema.fields.len());
        for arr in self.fields {
            arrays.push(
                arrow::compute::take(
                    &arr,
                    &indices,
                    Some(TakeOptions {
                        check_bounds: false,
                    }),
                )
                .context(ComputeArrowSnafu)?,
            );
        }

        let timestamp = arrow::compute::take(
            &self.timestamp,
            &indices,
            Some(TakeOptions {
                check_bounds: false,
            }),
        )
        .context(ComputeArrowSnafu)?;

        arrays.push(timestamp);
        arrays.push(pk_dictionary);
        arrays.push(
            arrow::compute::take(
                &self.sequence,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?,
        );

        arrays.push(
            arrow::compute::take(
                &self.op_type,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?,
        );

        let batch = RecordBatch::try_new(self.arrow_schema, arrays).context(NewRecordBatchSnafu)?;
        Ok((batch, min_timestamp, max_timestamp))
    }
}

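/// Iterates the raw `i64` values of a timestamp array with the given time unit.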
fn timestamp_array_to_iter(
    timestamp_unit: TimeUnit,
    timestamp: &ArrayRef,
) -> impl Iterator<Item = &i64> {
    match timestamp_unit {
        TimeUnit::Second => timestamp
            .as_any()
            .downcast_ref::<TimestampSecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Millisecond => timestamp
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Microsecond => timestamp
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Nanosecond => timestamp
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap()
            .values()
            .iter(),
    }
}

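/// Converts a sorted [`BinaryArray`] of encoded primary keys into a dictionary array,
/// starting a new dictionary entry whenever the value differs from the previous row.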
fn binary_array_to_dictionary(input: &BinaryArray) -> Result<PrimaryKeyArray> {
    if input.is_empty() {
        return Ok(DictionaryArray::new(
            UInt32Array::from(Vec::<u32>::new()),
            Arc::new(BinaryArray::from_vec(vec![])) as ArrayRef,
        ));
    }
    let mut keys = Vec::with_capacity(16);
    let mut values = BinaryBuilder::new();
    // The first row always starts the first dictionary entry.
    let mut key: u32 = 0;
    keys.push(key);
    values.append_value(input.value(0));

    for (idx, current_bytes) in input.iter().enumerate().skip(1) {
        let current_bytes = current_bytes.unwrap();
        // The input is sorted, so equal keys are adjacent: compare with the immediately
        // preceding row and start a new dictionary entry only when the value changes.
        if current_bytes != input.value(idx - 1) {
            values.append_value(current_bytes);
            key += 1;
        }
        keys.push(key);
    }

    Ok(DictionaryArray::new(
        UInt32Array::from(keys),
        Arc::new(values.finish()) as ArrayRef,
    ))
}

#[cfg(test)]
mod tests {
    use std::collections::VecDeque;

    use api::v1::{Row, SemanticType, WriteHint};
    use datafusion_common::ScalarValue;
    use datatypes::arrow::array::Float64Array;
    use datatypes::prelude::{ConcreteDataType, ScalarVector, Value};
    use datatypes::schema::ColumnSchema;
    use datatypes::vectors::{Float64Vector, TimestampMillisecondVector};
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;
    use store_api::storage::consts::ReservedColumnId;

    use super::*;
    use crate::memtable::bulk::context::BulkIterContext;
    use crate::sst::parquet::format::{PrimaryKeyReadFormat, ReadFormat};
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
    use crate::test_util::memtable_util::{
        build_key_values_with_ts_seq_values, metadata_for_test, region_metadata_to_row_schema,
    };

    fn check_binary_array_to_dictionary(
        input: &[&[u8]],
        expected_keys: &[u32],
        expected_values: &[&[u8]],
    ) {
        let input = BinaryArray::from_iter_values(input.iter());
        let array = binary_array_to_dictionary(&input).unwrap();
        assert_eq!(
            &expected_keys,
            &array.keys().iter().map(|v| v.unwrap()).collect::<Vec<_>>()
        );
        assert_eq!(
            expected_values,
            &array
                .values()
                .as_any()
                .downcast_ref::<BinaryArray>()
                .unwrap()
                .iter()
                .map(|v| v.unwrap())
                .collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_binary_array_to_dictionary() {
        check_binary_array_to_dictionary(&[], &[], &[]);

        check_binary_array_to_dictionary(&["a".as_bytes()], &[0], &["a".as_bytes()]);

        check_binary_array_to_dictionary(
            &["a".as_bytes(), "a".as_bytes()],
            &[0, 0],
            &["a".as_bytes()],
        );

        check_binary_array_to_dictionary(
            &["a".as_bytes(), "a".as_bytes(), "b".as_bytes()],
            &[0, 0, 1],
            &["a".as_bytes(), "b".as_bytes()],
        );

        check_binary_array_to_dictionary(
            &[
                "a".as_bytes(),
                "a".as_bytes(),
                "b".as_bytes(),
                "c".as_bytes(),
            ],
            &[0, 0, 1, 2],
            &["a".as_bytes(), "b".as_bytes(), "c".as_bytes()],
        );
    }

    struct MutationInput<'a> {
        k0: &'a str,
        k1: u32,
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
        sequence: u64,
    }

    #[derive(Debug, PartialOrd, PartialEq)]
    struct BatchOutput<'a> {
        pk_values: &'a [Value],
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
    }

    fn check_mutations_to_record_batches(
        input: &[MutationInput],
        expected: &[BatchOutput],
        expected_timestamp: (i64, i64),
        dedup: bool,
    ) {
        let metadata = metadata_for_test();
        let mutations = input
            .iter()
            .map(|m| {
                build_key_values_with_ts_seq_values(
                    &metadata,
                    m.k0.to_string(),
                    m.k1,
                    m.timestamps.iter().copied(),
                    m.v1.iter().copied(),
                    m.sequence,
                )
                .mutation
            })
            .collect::<Vec<_>>();
        let total_rows: usize = mutations
            .iter()
            .flat_map(|m| m.rows.iter())
            .map(|r| r.rows.len())
            .sum();

        let pk_encoder = DensePrimaryKeyCodec::new(&metadata);

        let (batch, _, _) = mutations_to_record_batch(&mutations, &metadata, &pk_encoder, dedup)
            .unwrap()
            .unwrap();
        let read_format = PrimaryKeyReadFormat::new_with_all_columns(metadata.clone());
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&batch, None, &mut batches)
            .unwrap();
        if !dedup {
            assert_eq!(
                total_rows,
                batches.iter().map(|b| { b.num_rows() }).sum::<usize>()
            );
        }
        let batch_values = batches
            .into_iter()
            .map(|b| {
                let pk_values = pk_encoder.decode(b.primary_key()).unwrap().into_dense();
                let timestamps = b
                    .timestamps()
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap().0.value())
                    .collect::<Vec<_>>();
                let float_values = b.fields()[1]
                    .data
                    .as_any()
                    .downcast_ref::<Float64Vector>()
                    .unwrap()
                    .iter_data()
                    .collect::<Vec<_>>();

                (pk_values, timestamps, float_values)
            })
            .collect::<Vec<_>>();
        assert_eq!(expected.len(), batch_values.len());

        for idx in 0..expected.len() {
            assert_eq!(expected[idx].pk_values, &batch_values[idx].0);
            assert_eq!(expected[idx].timestamps, &batch_values[idx].1);
            assert_eq!(expected[idx].v1, &batch_values[idx].2);
        }
    }

    #[test]
    fn test_mutations_to_record_batch() {
        check_mutations_to_record_batches(
            &[MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[0],
                v1: &[Some(0.1)],
                sequence: 0,
            }],
            &[BatchOutput {
                pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                timestamps: &[0],
                v1: &[Some(0.1)],
            }],
            (0, 0),
            true,
        );

        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[1],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
                MutationInput {
                    k0: "a",
                    k1: 1,
                    timestamps: &[1],
                    v1: &[Some(0.3)],
                    sequence: 2,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0, 1],
                    v1: &[Some(0.1), Some(0.2)],
                },
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(1)],
                    timestamps: &[1],
                    v1: &[Some(0.3)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 1),
            true,
        );

        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 0),
            true,
        );
        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0, 0],
                    v1: &[Some(0.2), Some(0.1)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 0),
            false,
        );
    }

    fn encode(input: &[MutationInput]) -> EncodedBulkPart {
        let metadata = metadata_for_test();
        let kvs = input
            .iter()
            .map(|m| {
                build_key_values_with_ts_seq_values(
                    &metadata,
                    m.k0.to_string(),
                    m.k1,
                    m.timestamps.iter().copied(),
                    m.v1.iter().copied(),
                    m.sequence,
                )
            })
            .collect::<Vec<_>>();
        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let primary_key_codec = build_primary_key_codec(&metadata);
        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
        for kv in kvs {
            converter.append_key_values(&kv).unwrap();
        }
        let part = converter.convert().unwrap();
        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
        encoder.encode_part(&part).unwrap().unwrap()
    }

    #[test]
    fn test_write_and_read_part_projection() {
        let part = encode(&[
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.1)],
                sequence: 0,
            },
            MutationInput {
                k0: "b",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.0)],
                sequence: 0,
            },
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[2],
                v1: &[Some(0.2)],
                sequence: 1,
            },
        ]);

        let projection = &[4u32];
        let mut reader = part
            .read(
                Arc::new(
                    BulkIterContext::new(
                        part.metadata.region_metadata.clone(),
                        Some(projection.as_slice()),
                        None,
                        false,
                    )
                    .unwrap(),
                ),
                None,
                None,
            )
            .unwrap()
            .expect("expect at least one row group");

        let mut total_rows_read = 0;
        let mut field: Vec<f64> = vec![];
        for res in reader {
            let batch = res.unwrap();
            assert_eq!(5, batch.num_columns());
            field.extend_from_slice(
                batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Float64Array>()
                    .unwrap()
                    .values(),
            );
            total_rows_read += batch.num_rows();
        }
        assert_eq!(3, total_rows_read);
        assert_eq!(vec![0.1, 0.2, 0.0], field);
    }

    fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
        let metadata = metadata_for_test();
        let kvs = key_values
            .into_iter()
            .map(|(k0, k1, (start, end), sequence)| {
                let ts = (start..end);
                let v1 = (start..end).map(|_| None);
                build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
            })
            .collect::<Vec<_>>();
        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let primary_key_codec = build_primary_key_codec(&metadata);
        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
        for kv in kvs {
            converter.append_key_values(&kv).unwrap();
        }
        let part = converter.convert().unwrap();
        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
        encoder.encode_part(&part).unwrap().unwrap()
    }

    fn check_prune_row_group(
        part: &EncodedBulkPart,
        predicate: Option<Predicate>,
        expected_rows: usize,
    ) {
        let context = Arc::new(
            BulkIterContext::new(
                part.metadata.region_metadata.clone(),
                None,
                predicate,
                false,
            )
            .unwrap(),
        );
        let mut reader = part
            .read(context, None, None)
            .unwrap()
            .expect("expect at least one row group");
        let mut total_rows_read = 0;
        for res in reader {
            let batch = res.unwrap();
            total_rows_read += batch.num_rows();
        }
        assert_eq!(expected_rows, total_rows_read);
    }

    #[test]
    fn test_prune_row_groups() {
        let part = prepare(vec![
            ("a", 0, (0, 40), 1),
            ("a", 1, (0, 60), 1),
            ("b", 0, (0, 100), 2),
            ("b", 1, (100, 180), 3),
            ("b", 1, (180, 210), 4),
        ]);

        let context = Arc::new(
            BulkIterContext::new(
                part.metadata.region_metadata.clone(),
                None,
                Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
                    datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
                )])),
                false,
            )
            .unwrap(),
        );
        assert!(part.read(context, None, None).unwrap().is_none());

        check_prune_row_group(&part, None, 310);

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            40,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
            ])),
            60,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
            ])),
            100,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            100,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64)),
            ])),
            1,
        );
    }

    #[test]
    fn test_bulk_part_converter_append_and_convert() {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "key1".to_string(),
            1u32,
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "key2".to_string(),
            2u32,
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_timestamp, 1000);
        assert_eq!(bulk_part.max_timestamp, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );
    }

    #[test]
    fn test_bulk_part_converter_sorting() {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "z_key".to_string(),
            3u32,
            vec![3000].into_iter(),
            vec![Some(3.0)].into_iter(),
            3,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "a_key".to_string(),
            1u32,
            vec![1000].into_iter(),
            vec![Some(1.0)].into_iter(),
            1,
        );

        let key_values3 = build_key_values_with_ts_seq_values(
            &metadata,
            "m_key".to_string(),
            2u32,
            vec![2000].into_iter(),
            vec![Some(2.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();
        converter.append_key_values(&key_values3).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);

        let ts_column = bulk_part.batch.column(bulk_part.timestamp_index);
        let seq_column = bulk_part.batch.column(bulk_part.batch.num_columns() - 2);

        let ts_array = ts_column
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        let seq_array = seq_column.as_any().downcast_ref::<UInt64Array>().unwrap();

        assert_eq!(ts_array.values(), &[1000, 2000, 3000]);
        assert_eq!(seq_array.values(), &[1, 2, 3]);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );
    }

    #[test]
    fn test_bulk_part_converter_empty() {
        let metadata = metadata_for_test();
        let capacity = 10;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 0);
        assert_eq!(bulk_part.min_timestamp, i64::MAX);
        assert_eq!(bulk_part.max_timestamp, i64::MIN);
        assert_eq!(bulk_part.sequence, SequenceNumber::MIN);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );
    }

    #[test]
    fn test_bulk_part_converter_without_primary_key_columns() {
        let metadata = metadata_for_test();
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions {
                raw_pk_columns: false,
                string_pk_use_dict: true,
            },
        );

        let capacity = 100;
        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, false);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "key1".to_string(),
            1u32,
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "key2".to_string(),
            2u32,
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_timestamp, 1000);
        assert_eq!(bulk_part.max_timestamp, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );
    }

    #[allow(clippy::too_many_arguments)]
    fn build_key_values_with_sparse_encoding(
        metadata: &RegionMetadataRef,
        primary_key_codec: &Arc<dyn PrimaryKeyCodec>,
        table_id: u32,
        tsid: u64,
        k0: String,
        k1: String,
        timestamps: impl Iterator<Item = i64>,
        values: impl Iterator<Item = Option<f64>>,
        sequence: SequenceNumber,
    ) -> KeyValues {
        let pk_values = vec![
            (ReservedColumnId::table_id(), Value::UInt32(table_id)),
            (ReservedColumnId::tsid(), Value::UInt64(tsid)),
            (0, Value::String(k0.clone().into())),
            (1, Value::String(k1.clone().into())),
        ];
        let mut encoded_key = Vec::new();
        primary_key_codec
            .encode_values(&pk_values, &mut encoded_key)
            .unwrap();
        assert!(!encoded_key.is_empty());

        let column_schema = vec![
            api::v1::ColumnSchema {
                column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::binary_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Tag as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "ts".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::timestamp_millisecond_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Timestamp as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "v0".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::int64_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Field as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "v1".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::float64_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Field as i32,
                ..Default::default()
            },
        ];

        let rows = timestamps
            .zip(values)
            .map(|(ts, v)| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::BinaryValue(
                            encoded_key.clone(),
                        )),
                    },
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(ts)),
                    },
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::I64Value(ts)),
                    },
                    api::v1::Value {
                        value_data: v.map(api::v1::value::ValueData::F64Value),
                    },
                ],
            })
            .collect();

        let mutation = api::v1::Mutation {
            op_type: 1,
            sequence,
            rows: Some(api::v1::Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: Some(WriteHint {
                primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
            }),
        };
        KeyValues::new(metadata.as_ref(), mutation).unwrap()
    }

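    /// End-to-end check that `BulkPartConverter` accepts rows whose primary
    /// keys are already sparse-encoded and surfaces them as a non-empty binary
    /// dictionary `__primary_key` column.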
    #[test]
    fn test_bulk_part_converter_sparse_primary_key_encoding() {
        use api::v1::SemanticType;
        use datatypes::schema::ColumnSchema;
        use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
        use store_api::storage::RegionId;

        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![0, 1])
            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
        let metadata = Arc::new(builder.build().unwrap());

        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse);
        assert_eq!(primary_key_codec.encoding(), PrimaryKeyEncoding::Sparse);

        let capacity = 100;
        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec.clone(), true);

        let key_values1 = build_key_values_with_sparse_encoding(
            &metadata,
            &primary_key_codec,
            2048u32,
            100u64,
            "key11".to_string(),
            "key21".to_string(),
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_sparse_encoding(
            &metadata,
            &primary_key_codec,
            4096u32,
            200u64,
            "key12".to_string(),
            "key22".to_string(),
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_timestamp, 1000);
        assert_eq!(bulk_part.max_timestamp, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );

        let primary_key_column = bulk_part.batch.column_by_name("__primary_key").unwrap();
        let dict_array = primary_key_column
            .as_any()
            .downcast_ref::<DictionaryArray<UInt32Type>>()
            .unwrap();

        assert!(!dict_array.is_empty());
        assert_eq!(dict_array.len(), 3);

        let values = dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        for i in 0..values.len() {
            assert!(
                !values.value(i).is_empty(),
                "Encoded primary key should not be empty"
            );
        }
    }

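    /// An empty input batch should convert to `None` rather than an empty part.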
    #[test]
    fn test_convert_bulk_part_empty() {
        let metadata = metadata_for_test();
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );
        let primary_key_codec = build_primary_key_codec(&metadata);

        let empty_batch = RecordBatch::new_empty(schema.clone());
        let empty_part = BulkPart {
            batch: empty_batch,
            max_timestamp: 0,
            min_timestamp: 0,
            sequence: 0,
            timestamp_index: 0,
            raw_data: None,
        };

        let result =
            convert_bulk_part(empty_part, &metadata, primary_key_codec, schema, true).unwrap();
        assert!(result.is_none());
    }

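    /// Converts a plain record batch with dense-encoded tags while keeping the
    /// raw primary key columns; tag columns should come back dictionary-encoded
    /// and an encoded `__primary_key` dictionary column should be appended.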
    #[test]
    fn test_convert_bulk_part_dense_with_pk_columns() {
        let metadata = metadata_for_test();
        let primary_key_codec = build_primary_key_codec(&metadata);

        let k0_array = Arc::new(arrow::array::StringArray::from(vec![
            "key1", "key2", "key1",
        ]));
        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2, 1]));
        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200, 300]));
        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0, 3.0]));
        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000, 1500]));

        let input_schema = Arc::new(Schema::new(vec![
            Field::new("k0", ArrowDataType::Utf8, false),
            Field::new("k1", ArrowDataType::UInt32, false),
            Field::new("v0", ArrowDataType::Int64, true),
            Field::new("v1", ArrowDataType::Float64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        let input_batch = RecordBatch::try_new(
            input_schema,
            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
        )
        .unwrap();

        let part = BulkPart {
            batch: input_batch,
            max_timestamp: 2000,
            min_timestamp: 1000,
            sequence: 5,
            timestamp_index: 4,
            raw_data: None,
        };

        let output_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let result = convert_bulk_part(
            part,
            &metadata,
            primary_key_codec,
            output_schema,
            true,
        )
        .unwrap();

        let converted = result.unwrap();

        assert_eq!(converted.num_rows(), 3);
        assert_eq!(converted.max_timestamp, 2000);
        assert_eq!(converted.min_timestamp, 1000);
        assert_eq!(converted.sequence, 5);

        let schema = converted.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );

        let k0_col = converted.batch.column_by_name("k0").unwrap();
        assert!(matches!(
            k0_col.data_type(),
            ArrowDataType::Dictionary(_, _)
        ));

        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
        let dict_array = pk_col
            .as_any()
            .downcast_ref::<DictionaryArray<UInt32Type>>()
            .unwrap();
        let keys = dict_array.keys();

        assert_eq!(keys.len(), 3);
    }

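    /// Same conversion but without keeping raw primary key columns: the tag
    /// columns are dropped from the output and only the encoded
    /// `__primary_key` dictionary column remains.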
    #[test]
    fn test_convert_bulk_part_dense_without_pk_columns() {
        let metadata = metadata_for_test();
        let primary_key_codec = build_primary_key_codec(&metadata);

        let k0_array = Arc::new(arrow::array::StringArray::from(vec!["key1", "key2"]));
        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2]));
        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));

        let input_schema = Arc::new(Schema::new(vec![
            Field::new("k0", ArrowDataType::Utf8, false),
            Field::new("k1", ArrowDataType::UInt32, false),
            Field::new("v0", ArrowDataType::Int64, true),
            Field::new("v1", ArrowDataType::Float64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        let input_batch = RecordBatch::try_new(
            input_schema,
            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
        )
        .unwrap();

        let part = BulkPart {
            batch: input_batch,
            max_timestamp: 2000,
            min_timestamp: 1000,
            sequence: 3,
            timestamp_index: 4,
            raw_data: None,
        };

        let output_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions {
                raw_pk_columns: false,
                string_pk_use_dict: true,
            },
        );

        let result = convert_bulk_part(
            part,
            &metadata,
            primary_key_codec,
            output_schema,
            false,
        )
        .unwrap();

        let converted = result.unwrap();

        assert_eq!(converted.num_rows(), 2);
        assert_eq!(converted.max_timestamp, 2000);
        assert_eq!(converted.min_timestamp, 1000);
        assert_eq!(converted.sequence, 3);

        let schema = converted.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );

        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
        assert!(matches!(
            pk_col.data_type(),
            ArrowDataType::Dictionary(_, _)
        ));
    }

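    /// Converts a batch whose `__primary_key` column already holds
    /// sparse-encoded binary keys; the column should be dictionary-encoded in
    /// the output.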
    #[test]
    fn test_convert_bulk_part_sparse_encoding() {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![0, 1])
            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
        let metadata = Arc::new(builder.build().unwrap());

        let primary_key_codec = build_primary_key_codec(&metadata);

        let pk_array = Arc::new(arrow::array::BinaryArray::from(vec![
            b"encoded_key_1".as_slice(),
            b"encoded_key_2".as_slice(),
        ]));
        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));

        let input_schema = Arc::new(Schema::new(vec![
            Field::new("__primary_key", ArrowDataType::Binary, false),
            Field::new("v0", ArrowDataType::Int64, true),
            Field::new("v1", ArrowDataType::Float64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        let input_batch =
            RecordBatch::try_new(input_schema, vec![pk_array, v0_array, v1_array, ts_array])
                .unwrap();

        let part = BulkPart {
            batch: input_batch,
            max_timestamp: 2000,
            min_timestamp: 1000,
            sequence: 7,
            timestamp_index: 3,
            raw_data: None,
        };

        let output_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let result = convert_bulk_part(
            part,
            &metadata,
            primary_key_codec,
            output_schema,
            true,
        )
        .unwrap();

        let converted = result.unwrap();

        assert_eq!(converted.num_rows(), 2);
        assert_eq!(converted.max_timestamp, 2000);
        assert_eq!(converted.min_timestamp, 1000);
        assert_eq!(converted.sequence, 7);

        let schema = converted.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );

        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
        assert!(matches!(
            pk_col.data_type(),
            ArrowDataType::Dictionary(_, _)
        ));
    }

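    /// Rows that arrive interleaved across two series should be reordered by
    /// primary key and then by timestamp during conversion.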
    #[test]
    fn test_convert_bulk_part_sorting_with_multiple_series() {
        let metadata = metadata_for_test();
        let primary_key_codec = build_primary_key_codec(&metadata);

        let k0_array = Arc::new(arrow::array::StringArray::from(vec![
            "series_b", "series_a", "series_b", "series_a",
        ]));
        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![2, 1, 2, 1]));
        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![200, 100, 400, 300]));
        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![2.0, 1.0, 4.0, 3.0]));
        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
            2000, 1000, 4000, 3000,
        ]));

        let input_schema = Arc::new(Schema::new(vec![
            Field::new("k0", ArrowDataType::Utf8, false),
            Field::new("k1", ArrowDataType::UInt32, false),
            Field::new("v0", ArrowDataType::Int64, true),
            Field::new("v1", ArrowDataType::Float64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        let input_batch = RecordBatch::try_new(
            input_schema,
            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
        )
        .unwrap();

        let part = BulkPart {
            batch: input_batch,
            max_timestamp: 4000,
            min_timestamp: 1000,
            sequence: 10,
            timestamp_index: 4,
            raw_data: None,
        };

        let output_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let result =
            convert_bulk_part(part, &metadata, primary_key_codec, output_schema, true).unwrap();

        let converted = result.unwrap();

        assert_eq!(converted.num_rows(), 4);

        let ts_col = converted.batch.column(converted.timestamp_index);
        let ts_array = ts_col
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();

        // Rows are sorted by primary key first, then timestamp:
        // series_a (ts 1000, 3000) precedes series_b (ts 2000, 4000).
        let timestamps: Vec<i64> = ts_array.values().to_vec();
        assert_eq!(timestamps, vec![1000, 3000, 2000, 4000]);
    }
}