use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::helper::{ColumnDataTypeWrapper, to_grpc_value};
use api::v1::bulk_wal_entry::Body;
use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType, bulk_wal_entry};
use bytes::Bytes;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_recordbatch::DfRecordBatch as RecordBatch;
use common_time::Timestamp;
use common_time::timestamp::TimeUnit;
use datatypes::arrow;
use datatypes::arrow::array::{
    Array, ArrayRef, BinaryBuilder, BinaryDictionaryBuilder, DictionaryArray, StringBuilder,
    StringDictionaryBuilder, TimestampMicrosecondArray, TimestampMillisecondArray,
    TimestampNanosecondArray, TimestampSecondArray, UInt8Array, UInt8Builder, UInt32Array,
    UInt64Array, UInt64Builder,
};
use datatypes::arrow::compute::{SortColumn, SortOptions, TakeOptions};
use datatypes::arrow::datatypes::{
    DataType as ArrowDataType, Field, Schema, SchemaRef, UInt32Type,
};
use datatypes::arrow_array::BinaryArray;
use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::Helper;
use mito_codec::key_values::{KeyValue, KeyValues, KeyValuesRef};
use mito_codec::row_converter::{
    DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, build_primary_key_codec,
};
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, ZstdLevel};
use parquet::data_type::AsBytes;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use smallvec::SmallVec;
use snafu::{OptionExt, ResultExt, Snafu};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
use store_api::storage::{FileId, RegionId, SequenceNumber, SequenceRange};
use table::predicate::Predicate;

use crate::error::{
    self, ColumnNotFoundSnafu, ComputeArrowSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu,
    DataTypeMismatchSnafu, EncodeMemtableSnafu, EncodeSnafu, InvalidMetadataSnafu,
    InvalidRequestSnafu, NewRecordBatchSnafu, Result, UnexpectedSnafu,
};
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
use crate::memtable::time_series::{ValueBuilder, Values};
use crate::memtable::{BoxedRecordBatchIterator, MemScanMetrics, MemtableStats};
use crate::sst::index::IndexOutput;
use crate::sst::parquet::file_range::{PreFilterMode, row_group_contains_delete};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder, ReadFormat};
use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo};
use crate::sst::{SeriesEstimator, to_sst_arrow_schema};

const INIT_DICT_VALUE_CAPACITY: usize = 8;

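/// A bulk write carried as a single Arrow record batch, together with its timestamp range,
/// sequence number and the index of the time index column in the batch.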
#[derive(Clone)]
pub struct BulkPart {
    pub batch: RecordBatch,
    pub max_timestamp: i64,
    pub min_timestamp: i64,
    pub sequence: u64,
    pub timestamp_index: usize,
    pub raw_data: Option<ArrowIpc>,
}

impl TryFrom<BulkWalEntry> for BulkPart {
    type Error = error::Error;

    fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
        match value.body.expect("Entry payload should be present") {
            Body::ArrowIpc(ipc) => {
                let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
                    .context(error::ConvertBulkWalEntrySnafu)?;
                let batch = decoder
                    .try_decode_record_batch(&ipc.data_header, &ipc.payload)
                    .context(error::ConvertBulkWalEntrySnafu)?;
                Ok(Self {
                    batch,
                    max_timestamp: value.max_ts,
                    min_timestamp: value.min_ts,
                    sequence: value.sequence,
                    timestamp_index: value.timestamp_index as usize,
                    raw_data: Some(ipc),
                })
            }
        }
    }
}

impl TryFrom<&BulkPart> for BulkWalEntry {
    type Error = error::Error;

    fn try_from(value: &BulkPart) -> Result<Self> {
        if let Some(ipc) = &value.raw_data {
            Ok(BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_timestamp,
                min_ts: value.min_timestamp,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ipc.clone())),
            })
        } else {
            let mut encoder = FlightEncoder::default();
            let schema_bytes = encoder
                .encode_schema(value.batch.schema().as_ref())
                .data_header;
            let [rb_data] = encoder
                .encode(FlightMessage::RecordBatch(value.batch.clone()))
                .try_into()
                .map_err(|_| {
                    error::UnsupportedOperationSnafu {
                        err_msg: "create BulkWalEntry from RecordBatch with dictionary arrays",
                    }
                    .build()
                })?;
            Ok(BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_timestamp,
                min_ts: value.min_timestamp,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ArrowIpc {
                    schema: schema_bytes,
                    data_header: rb_data.data_header,
                    payload: rb_data.data_body,
                })),
            })
        }
    }
}

impl BulkPart {
    pub(crate) fn estimated_size(&self) -> usize {
        record_batch_estimated_size(&self.batch)
    }

    /// Estimates the number of series in this part from the primary key dictionary values.
    pub fn estimated_series_count(&self) -> usize {
        let pk_column_idx = primary_key_column_index(self.batch.num_columns());
        let pk_column = self.batch.column(pk_column_idx);
        if let Some(dict_array) = pk_column.as_any().downcast_ref::<PrimaryKeyArray>() {
            dict_array.values().len()
        } else {
            0
        }
    }

    /// Builds [MemtableStats] for this part.
    pub fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
        let ts_type = region_metadata.time_index_type();
        let min_ts = ts_type.create_timestamp(self.min_timestamp);
        let max_ts = ts_type.create_timestamp(self.max_timestamp);

        MemtableStats {
            estimated_bytes: self.estimated_size(),
            time_range: Some((min_ts, max_ts)),
            num_rows: self.num_rows(),
            num_ranges: 1,
            max_sequence: self.sequence,
            series_count: self.estimated_series_count(),
        }
    }

    /// Appends columns that are present in the region metadata but missing from the batch,
    /// filling them with their default values.
    pub fn fill_missing_columns(&mut self, region_metadata: &RegionMetadata) -> Result<()> {
        let batch_schema = self.batch.schema();
        let batch_columns: HashSet<_> = batch_schema
            .fields()
            .iter()
            .map(|f| f.name().as_str())
            .collect();

        let mut columns_to_fill = Vec::new();
        for column_meta in &region_metadata.column_metadatas {
            if !batch_columns.contains(column_meta.column_schema.name.as_str()) {
                columns_to_fill.push(column_meta);
            }
        }

        if columns_to_fill.is_empty() {
            return Ok(());
        }

        let num_rows = self.batch.num_rows();

        let mut new_columns = Vec::new();
        let mut new_fields = Vec::new();

        new_fields.extend(batch_schema.fields().iter().cloned());
        new_columns.extend_from_slice(self.batch.columns());

        let region_id = region_metadata.region_id;
        for column_meta in columns_to_fill {
            let default_vector = column_meta
                .column_schema
                .create_default_vector(num_rows)
                .context(CreateDefaultSnafu {
                    region_id,
                    column: &column_meta.column_schema.name,
                })?
                .with_context(|| InvalidRequestSnafu {
                    region_id,
                    reason: format!(
                        "column {} does not have default value",
                        column_meta.column_schema.name
                    ),
                })?;
            let arrow_array = default_vector.to_arrow_array();

            new_fields.push(Arc::new(Field::new(
                column_meta.column_schema.name.clone(),
                column_meta.column_schema.data_type.as_arrow_type(),
                column_meta.column_schema.is_nullable(),
            )));
            new_columns.push(arrow_array);
        }

        let new_schema = Arc::new(Schema::new(new_fields));
        let new_batch =
            RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)?;

        self.batch = new_batch;

        Ok(())
    }

    /// Converts this part into a [Mutation] for replay through the row-based write path.
    pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
        let vectors = region_metadata
            .schema
            .column_schemas()
            .iter()
            .map(|col| match self.batch.column_by_name(&col.name) {
                None => Ok(None),
                Some(col) => Helper::try_into_vector(col).map(Some),
            })
            .collect::<datatypes::error::Result<Vec<_>>>()
            .context(error::ComputeVectorSnafu)?;

        let rows = (0..self.num_rows())
            .map(|row_idx| {
                let values = (0..self.batch.num_columns())
                    .map(|col_idx| {
                        if let Some(v) = &vectors[col_idx] {
                            to_grpc_value(v.get(row_idx))
                        } else {
                            api::v1::Value { value_data: None }
                        }
                    })
                    .collect::<Vec<_>>();
                api::v1::Row { values }
            })
            .collect::<Vec<_>>();

        let schema = region_metadata
            .column_metadatas
            .iter()
            .map(|c| {
                let data_type_wrapper =
                    ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
                Ok(api::v1::ColumnSchema {
                    column_name: c.column_schema.name.clone(),
                    datatype: data_type_wrapper.datatype() as i32,
                    semantic_type: c.semantic_type as i32,
                    ..Default::default()
                })
            })
            .collect::<api::error::Result<Vec<_>>>()
            .context(error::ConvertColumnDataTypeSnafu {
                reason: "failed to convert region metadata to column schema",
            })?;

        let rows = api::v1::Rows { schema, rows };

        Ok(Mutation {
            op_type: OpType::Put as i32,
            sequence: self.sequence,
            rows: Some(rows),
            write_hint: None,
        })
    }

    pub fn timestamps(&self) -> &ArrayRef {
        self.batch.column(self.timestamp_index)
    }

    pub fn num_rows(&self) -> usize {
        self.batch.num_rows()
    }
}

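/// Buffers small [BulkPart]s so they can later be merged into a single sorted part.
///
/// Illustrative usage (a minimal sketch; `part` is assumed to be a [BulkPart] built elsewhere):
///
/// ```ignore
/// let mut unordered = UnorderedPart::new();
/// if unordered.should_accept(part.num_rows()) {
///     unordered.push(part);
/// }
/// if unordered.should_compact() {
///     let merged = unordered.to_bulk_part()?;
///     unordered.clear();
/// }
/// ```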
pub struct UnorderedPart {
    /// Buffered parts that have not been merged yet.
    parts: Vec<BulkPart>,
    /// Total number of rows across all buffered parts.
    total_rows: usize,
    /// Minimum timestamp across all buffered parts.
    min_timestamp: i64,
    /// Maximum timestamp across all buffered parts.
    max_timestamp: i64,
    /// Maximum sequence number across all buffered parts.
    max_sequence: u64,
    /// Parts with fewer rows than this threshold are accepted into the buffer.
    threshold: usize,
    /// Once the buffered rows reach this threshold, the buffer should be compacted.
    compact_threshold: usize,
}

impl Default for UnorderedPart {
    fn default() -> Self {
        Self::new()
    }
}

impl UnorderedPart {
    pub fn new() -> Self {
        Self {
            parts: Vec::new(),
            total_rows: 0,
            min_timestamp: i64::MAX,
            max_timestamp: i64::MIN,
            max_sequence: 0,
            threshold: 1024,
            compact_threshold: 4096,
        }
    }

    pub fn set_threshold(&mut self, threshold: usize) {
        self.threshold = threshold;
    }

    pub fn set_compact_threshold(&mut self, compact_threshold: usize) {
        self.compact_threshold = compact_threshold;
    }

    pub fn threshold(&self) -> usize {
        self.threshold
    }

    pub fn compact_threshold(&self) -> usize {
        self.compact_threshold
    }

    /// Returns true if a part with `num_rows` rows is small enough to buffer here.
    pub fn should_accept(&self, num_rows: usize) -> bool {
        num_rows < self.threshold
    }

    /// Returns true if the buffered rows reach the compact threshold.
    pub fn should_compact(&self) -> bool {
        self.total_rows >= self.compact_threshold
    }

    /// Adds a part to the buffer and updates the aggregated statistics.
    pub fn push(&mut self, part: BulkPart) {
        self.total_rows += part.num_rows();
        self.min_timestamp = self.min_timestamp.min(part.min_timestamp);
        self.max_timestamp = self.max_timestamp.max(part.max_timestamp);
        self.max_sequence = self.max_sequence.max(part.sequence);
        self.parts.push(part);
    }

    pub fn num_rows(&self) -> usize {
        self.total_rows
    }

    pub fn is_empty(&self) -> bool {
        self.parts.is_empty()
    }

    pub fn num_parts(&self) -> usize {
        self.parts.len()
    }

    /// Concatenates all buffered batches into a single sorted batch.
    ///
    /// Returns `None` if the buffer is empty; a single buffered part is returned as-is.
    pub fn concat_and_sort(&self) -> Result<Option<RecordBatch>> {
        if self.parts.is_empty() {
            return Ok(None);
        }

        if self.parts.len() == 1 {
            return Ok(Some(self.parts[0].batch.clone()));
        }

        let schema = self.parts[0].batch.schema();

        let batches: Vec<RecordBatch> = self.parts.iter().map(|p| p.batch.clone()).collect();
        let concatenated =
            arrow::compute::concat_batches(&schema, &batches).context(ComputeArrowSnafu)?;

        let sorted_batch = sort_primary_key_record_batch(&concatenated)?;

        Ok(Some(sorted_batch))
    }

    /// Merges all buffered parts into a single sorted [BulkPart].
    pub fn to_bulk_part(&self) -> Result<Option<BulkPart>> {
        let Some(sorted_batch) = self.concat_and_sort()? else {
            return Ok(None);
        };

        let timestamp_index = self.parts[0].timestamp_index;

        Ok(Some(BulkPart {
            batch: sorted_batch,
            max_timestamp: self.max_timestamp,
            min_timestamp: self.min_timestamp,
            sequence: self.max_sequence,
            timestamp_index,
            raw_data: None,
        }))
    }

    /// Clears the buffer and resets the aggregated statistics.
    pub fn clear(&mut self) {
        self.parts.clear();
        self.total_rows = 0;
        self.min_timestamp = i64::MAX;
        self.max_timestamp = i64::MIN;
        self.max_sequence = 0;
    }
}

/// Estimates the in-memory size of a record batch by summing its column buffer sizes.
pub fn record_batch_estimated_size(batch: &RecordBatch) -> usize {
    batch
        .columns()
        .iter()
        .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
        .sum()
}

/// Builder for a single raw primary key column.
enum PrimaryKeyColumnBuilder {
    /// Builds a dictionary encoded string column.
    StringDict(StringDictionaryBuilder<UInt32Type>),
    /// Builds a plain column through a mutable vector.
    Vector(Box<dyn MutableVector>),
}

impl PrimaryKeyColumnBuilder {
    fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
        match self {
            PrimaryKeyColumnBuilder::StringDict(builder) => {
                if let Some(s) = value.try_into_string().context(DataTypeMismatchSnafu)? {
                    builder.append_value(s);
                } else {
                    builder.append_null();
                }
            }
            PrimaryKeyColumnBuilder::Vector(builder) => {
                builder.push_value_ref(&value);
            }
        }
        Ok(())
    }

    fn into_arrow_array(self) -> ArrayRef {
        match self {
            PrimaryKeyColumnBuilder::StringDict(mut builder) => Arc::new(builder.finish()),
            PrimaryKeyColumnBuilder::Vector(mut builder) => builder.to_vector().to_arrow_array(),
        }
    }
}

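/// Converts [KeyValues] mutations into a [BulkPart] whose batch follows the flat SST layout
/// (raw primary key columns, field columns, time index, `__primary_key`, `__sequence`,
/// `__op_type`).
///
/// Illustrative usage (a minimal sketch; assumes test helpers from this crate such as
/// `metadata_for_test()`, see the tests at the bottom of this file):
///
/// ```ignore
/// let metadata = metadata_for_test();
/// let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
/// let codec = build_primary_key_codec(&metadata);
/// let mut converter = BulkPartConverter::new(&metadata, schema, 64, codec, true);
/// converter.append_key_values(&key_values)?;
/// let part: BulkPart = converter.convert()?;
/// ```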
pub struct BulkPartConverter {
    /// Metadata of the destination region.
    region_metadata: RegionMetadataRef,
    /// Arrow schema of the output batch.
    schema: SchemaRef,
    /// Codec used to encode primary keys.
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
    /// Reusable buffer for encoded primary keys.
    key_buf: Vec<u8>,
    /// Builder for the encoded `__primary_key` dictionary column.
    key_array_builder: PrimaryKeyArrayBuilder,
    /// Builder for the time index, sequence, op type and field columns.
    value_builder: ValueBuilder,
    /// Builders for raw primary key columns; empty when they are not stored.
    primary_key_column_builders: Vec<PrimaryKeyColumnBuilder>,

    /// Maximum timestamp seen so far.
    max_ts: i64,
    /// Minimum timestamp seen so far.
    min_ts: i64,
    /// Maximum sequence number seen so far.
    max_sequence: SequenceNumber,
}

impl BulkPartConverter {
    /// Creates a new converter.
    ///
    /// When `store_primary_key_columns` is true and the region does not use sparse primary key
    /// encoding, raw primary key columns are stored in the output batch in addition to the
    /// encoded `__primary_key` column.
    pub fn new(
        region_metadata: &RegionMetadataRef,
        schema: SchemaRef,
        capacity: usize,
        primary_key_codec: Arc<dyn PrimaryKeyCodec>,
        store_primary_key_columns: bool,
    ) -> Self {
        debug_assert_eq!(
            region_metadata.primary_key_encoding,
            primary_key_codec.encoding()
        );

        let primary_key_column_builders = if store_primary_key_columns
            && region_metadata.primary_key_encoding != PrimaryKeyEncoding::Sparse
        {
            new_primary_key_column_builders(region_metadata, capacity)
        } else {
            Vec::new()
        };

        Self {
            region_metadata: region_metadata.clone(),
            schema,
            primary_key_codec,
            key_buf: Vec::new(),
            key_array_builder: PrimaryKeyArrayBuilder::new(),
            value_builder: ValueBuilder::new(region_metadata, capacity),
            primary_key_column_builders,
            min_ts: i64::MAX,
            max_ts: i64::MIN,
            max_sequence: SequenceNumber::MIN,
        }
    }

    /// Appends all rows in `key_values` to the converter.
    pub fn append_key_values(&mut self, key_values: &KeyValues) -> Result<()> {
        for kv in key_values.iter() {
            self.append_key_value(&kv)?;
        }

        Ok(())
    }

    fn append_key_value(&mut self, kv: &KeyValue) -> Result<()> {
        // For sparse encoding the primary key is already encoded and carried as the first
        // (and only) primary key column; otherwise encode the key columns here.
        if self.primary_key_codec.encoding() == PrimaryKeyEncoding::Sparse {
            let mut primary_keys = kv.primary_keys();
            if let Some(encoded) = primary_keys
                .next()
                .context(ColumnNotFoundSnafu {
                    column: PRIMARY_KEY_COLUMN_NAME,
                })?
                .try_into_binary()
                .context(DataTypeMismatchSnafu)?
            {
                self.key_array_builder
                    .append(encoded)
                    .context(ComputeArrowSnafu)?;
            } else {
                self.key_array_builder
                    .append("")
                    .context(ComputeArrowSnafu)?;
            }
        } else {
            self.key_buf.clear();
            self.primary_key_codec
                .encode_key_value(kv, &mut self.key_buf)
                .context(EncodeSnafu)?;
            self.key_array_builder
                .append(&self.key_buf)
                .context(ComputeArrowSnafu)?;
        };

        // Also store raw primary key column values when requested.
        if !self.primary_key_column_builders.is_empty() {
            for (builder, pk_value) in self
                .primary_key_column_builders
                .iter_mut()
                .zip(kv.primary_keys())
            {
                builder.push_value_ref(pk_value)?;
            }
        }

        self.value_builder.push(
            kv.timestamp(),
            kv.sequence(),
            kv.op_type() as u8,
            kv.fields(),
        );

        // The time index value is expected to be a valid, non-null timestamp.
        let ts = kv
            .timestamp()
            .try_into_timestamp()
            .unwrap()
            .unwrap()
            .value();
        self.min_ts = self.min_ts.min(ts);
        self.max_ts = self.max_ts.max(ts);
        self.max_sequence = self.max_sequence.max(kv.sequence());

        Ok(())
    }

    /// Finishes all builders and converts the collected rows into a sorted [BulkPart].
    pub fn convert(mut self) -> Result<BulkPart> {
        let values = Values::from(self.value_builder);
        let mut columns =
            Vec::with_capacity(4 + values.fields.len() + self.primary_key_column_builders.len());

        // Column order matches the flat SST schema: raw primary key columns (if stored),
        // fields, time index, encoded primary key, sequence and op type.
        for builder in self.primary_key_column_builders {
            columns.push(builder.into_arrow_array());
        }
        columns.extend(values.fields.iter().map(|field| field.to_arrow_array()));
        let timestamp_index = columns.len();
        columns.push(values.timestamp.to_arrow_array());
        let pk_array = self.key_array_builder.finish();
        columns.push(Arc::new(pk_array));
        columns.push(values.sequence.to_arrow_array());
        columns.push(values.op_type.to_arrow_array());

        let batch = RecordBatch::try_new(self.schema, columns).context(NewRecordBatchSnafu)?;
        let batch = sort_primary_key_record_batch(&batch)?;

        Ok(BulkPart {
            batch,
            max_timestamp: self.max_ts,
            min_timestamp: self.min_ts,
            sequence: self.max_sequence,
            timestamp_index,
            raw_data: None,
        })
    }
}

fn new_primary_key_column_builders(
    metadata: &RegionMetadata,
    capacity: usize,
) -> Vec<PrimaryKeyColumnBuilder> {
    metadata
        .primary_key_columns()
        .map(|col| {
            if col.column_schema.data_type.is_string() {
                PrimaryKeyColumnBuilder::StringDict(StringDictionaryBuilder::with_capacity(
                    capacity,
                    INIT_DICT_VALUE_CAPACITY,
                    capacity,
                ))
            } else {
                PrimaryKeyColumnBuilder::Vector(
                    col.column_schema.data_type.create_mutable_vector(capacity),
                )
            }
        })
        .collect()
}

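/// Sorts a flat-format record batch by (`__primary_key` asc, time index asc, `__sequence` desc).
///
/// The batch is expected to end with the time index, `__primary_key`, `__sequence` and
/// `__op_type` columns, in that order.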
pub fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
    let total_columns = batch.num_columns();
    let sort_columns = vec![
        // Encoded primary key column, ascending.
        SortColumn {
            values: batch.column(total_columns - 3).clone(),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
        // Time index column, ascending.
        SortColumn {
            values: batch.column(total_columns - 4).clone(),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
        // Sequence column, descending so that newer rows come first.
        SortColumn {
            values: batch.column(total_columns - 2).clone(),
            options: Some(SortOptions {
                descending: true,
                nulls_first: true,
            }),
        },
    ];

    let indices = datatypes::arrow::compute::lexsort_to_indices(&sort_columns, None)
        .context(ComputeArrowSnafu)?;

    datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
}

/// Converts a raw [BulkPart] (whose batch uses the user-facing column layout) into a part that
/// follows the flat SST layout: optional raw primary key columns, field columns, time index,
/// encoded `__primary_key`, `__sequence` and `__op_type`.
///
/// Returns `Ok(None)` when the input part is empty.
pub fn convert_bulk_part(
    part: BulkPart,
    region_metadata: &RegionMetadataRef,
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
    schema: SchemaRef,
    store_primary_key_columns: bool,
) -> Result<Option<BulkPart>> {
    if part.num_rows() == 0 {
        return Ok(None);
    }

    let num_rows = part.num_rows();
    let is_sparse = region_metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse;

    // Index input columns by name so they can be looked up by region metadata.
    let input_schema = part.batch.schema();
    let column_indices: HashMap<&str, usize> = input_schema
        .fields()
        .iter()
        .enumerate()
        .map(|(idx, field)| (field.name().as_str(), idx))
        .collect();

    let mut output_columns = Vec::new();

    // For dense encoding, encode the primary key columns row by row; for sparse encoding the
    // input already carries an encoded `__primary_key` column.
    let pk_array = if is_sparse {
        None
    } else {
        let pk_vectors: Result<Vec<_>> = region_metadata
            .primary_key_columns()
            .map(|col_meta| {
                let col_idx = column_indices
                    .get(col_meta.column_schema.name.as_str())
                    .context(ColumnNotFoundSnafu {
                        column: &col_meta.column_schema.name,
                    })?;
                let col = part.batch.column(*col_idx);
                Helper::try_into_vector(col).context(error::ComputeVectorSnafu)
            })
            .collect();
        let pk_vectors = pk_vectors?;

        let mut key_array_builder = PrimaryKeyArrayBuilder::new();
        let mut encode_buf = Vec::new();

        for row_idx in 0..num_rows {
            encode_buf.clear();

            let pk_values_with_ids: Vec<_> = region_metadata
                .primary_key
                .iter()
                .zip(pk_vectors.iter())
                .map(|(col_id, vector)| (*col_id, vector.get_ref(row_idx)))
                .collect();

            primary_key_codec
                .encode_value_refs(&pk_values_with_ids, &mut encode_buf)
                .context(EncodeSnafu)?;

            key_array_builder
                .append(&encode_buf)
                .context(ComputeArrowSnafu)?;
        }

        Some(key_array_builder.finish())
    };

    // Optionally keep the raw primary key columns, casting string columns to dictionaries.
    if store_primary_key_columns && !is_sparse {
        for col_meta in region_metadata.primary_key_columns() {
            let col_idx = column_indices
                .get(col_meta.column_schema.name.as_str())
                .context(ColumnNotFoundSnafu {
                    column: &col_meta.column_schema.name,
                })?;
            let col = part.batch.column(*col_idx);

            let col = if col_meta.column_schema.data_type.is_string() {
                let target_type = ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Utf8),
                );
                arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
            } else {
                col.clone()
            };
            output_columns.push(col);
        }
    }

    // Field columns.
    for col_meta in region_metadata.field_columns() {
        let col_idx = column_indices
            .get(col_meta.column_schema.name.as_str())
            .context(ColumnNotFoundSnafu {
                column: &col_meta.column_schema.name,
            })?;
        output_columns.push(part.batch.column(*col_idx).clone());
    }

    // Time index column.
    let new_timestamp_index = output_columns.len();
    let ts_col_idx = column_indices
        .get(
            region_metadata
                .time_index_column()
                .column_schema
                .name
                .as_str(),
        )
        .context(ColumnNotFoundSnafu {
            column: &region_metadata.time_index_column().column_schema.name,
        })?;
    output_columns.push(part.batch.column(*ts_col_idx).clone());

    // Encoded primary key column as a dictionary array.
    let pk_dictionary = if let Some(pk_dict_array) = pk_array {
        Arc::new(pk_dict_array) as ArrayRef
    } else {
        let pk_col_idx =
            column_indices
                .get(PRIMARY_KEY_COLUMN_NAME)
                .context(ColumnNotFoundSnafu {
                    column: PRIMARY_KEY_COLUMN_NAME,
                })?;
        let col = part.batch.column(*pk_col_idx);

        let target_type = ArrowDataType::Dictionary(
            Box::new(ArrowDataType::UInt32),
            Box::new(ArrowDataType::Binary),
        );
        arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
    };
    output_columns.push(pk_dictionary);

    // Sequence and op type columns share a single value for the whole part.
    let sequence_array = UInt64Array::from(vec![part.sequence; num_rows]);
    output_columns.push(Arc::new(sequence_array) as ArrayRef);

    let op_type_array = UInt8Array::from(vec![OpType::Put as u8; num_rows]);
    output_columns.push(Arc::new(op_type_array) as ArrayRef);

    let batch = RecordBatch::try_new(schema, output_columns).context(NewRecordBatchSnafu)?;

    let sorted_batch = sort_primary_key_record_batch(&batch)?;

    Ok(Some(BulkPart {
        batch: sorted_batch,
        max_timestamp: part.max_timestamp,
        min_timestamp: part.min_timestamp,
        sequence: part.sequence,
        timestamp_index: new_timestamp_index,
        raw_data: None,
    }))
}

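/// A bulk part encoded as Parquet bytes, together with its [BulkPartMeta].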
#[derive(Debug, Clone)]
pub struct EncodedBulkPart {
    data: Bytes,
    metadata: BulkPartMeta,
}

impl EncodedBulkPart {
    pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
        Self { data, metadata }
    }

    pub(crate) fn metadata(&self) -> &BulkPartMeta {
        &self.metadata
    }

    pub(crate) fn size_bytes(&self) -> usize {
        self.data.len()
    }

    pub(crate) fn data(&self) -> &Bytes {
        &self.data
    }

    /// Builds [MemtableStats] for this encoded part.
    pub fn to_memtable_stats(&self) -> MemtableStats {
        let meta = &self.metadata;
        let ts_type = meta.region_metadata.time_index_type();
        let min_ts = ts_type.create_timestamp(meta.min_timestamp);
        let max_ts = ts_type.create_timestamp(meta.max_timestamp);

        MemtableStats {
            estimated_bytes: self.size_bytes(),
            time_range: Some((min_ts, max_ts)),
            num_rows: meta.num_rows,
            num_ranges: 1,
            max_sequence: meta.max_sequence,
            series_count: meta.num_series as usize,
        }
    }

    /// Builds the [SstInfo] of this part as if it were flushed to an SST file with `file_id`.
    pub(crate) fn to_sst_info(&self, file_id: FileId) -> SstInfo {
        let unit = self.metadata.region_metadata.time_index_type().unit();
        let max_row_group_uncompressed_size: u64 = self
            .metadata
            .parquet_metadata
            .row_groups()
            .iter()
            .map(|rg| {
                rg.columns()
                    .iter()
                    .map(|c| c.uncompressed_size() as u64)
                    .sum::<u64>()
            })
            .max()
            .unwrap_or(0);
        SstInfo {
            file_id,
            time_range: (
                Timestamp::new(self.metadata.min_timestamp, unit),
                Timestamp::new(self.metadata.max_timestamp, unit),
            ),
            file_size: self.data.len() as u64,
            max_row_group_uncompressed_size,
            num_rows: self.metadata.num_rows,
            num_row_groups: self.metadata.parquet_metadata.num_row_groups() as u64,
            file_metadata: Some(self.metadata.parquet_metadata.clone()),
            index_metadata: IndexOutput::default(),
            num_series: self.metadata.num_series,
        }
    }

    /// Creates an iterator over the row groups selected by `context`, or `None` when all row
    /// groups are pruned.
    pub(crate) fn read(
        &self,
        context: BulkIterContextRef,
        sequence: Option<SequenceRange>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Result<Option<BoxedRecordBatchIterator>> {
        let skip_fields_for_pruning =
            Self::compute_skip_fields(context.pre_filter_mode(), &self.metadata.parquet_metadata);

        let row_groups_to_read =
            context.row_groups_to_read(&self.metadata.parquet_metadata, skip_fields_for_pruning);

        if row_groups_to_read.is_empty() {
            return Ok(None);
        }

        let iter = EncodedBulkPartIter::try_new(
            self,
            context,
            row_groups_to_read,
            sequence,
            mem_scan_metrics,
        )?;
        Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
    }

    /// Decides whether field columns can be skipped while pruning row groups.
    fn compute_skip_fields(pre_filter_mode: PreFilterMode, parquet_meta: &ParquetMetaData) -> bool {
        match pre_filter_mode {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => {
                (0..parquet_meta.num_row_groups()).any(|rg_idx| {
                    row_group_contains_delete(parquet_meta, rg_idx, "memtable").unwrap_or(true)
                })
            }
        }
    }
}

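/// Metadata describing an [EncodedBulkPart].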
#[derive(Debug, Clone)]
pub struct BulkPartMeta {
    /// Total number of rows in the part.
    pub num_rows: usize,
    /// Maximum timestamp in the part.
    pub max_timestamp: i64,
    /// Minimum timestamp in the part.
    pub min_timestamp: i64,
    /// Parquet metadata of the encoded data.
    pub parquet_metadata: Arc<ParquetMetaData>,
    /// Metadata of the region this part belongs to.
    pub region_metadata: RegionMetadataRef,
    /// Estimated number of series in the part.
    pub num_series: u64,
    /// Maximum sequence number in the part.
    pub max_sequence: u64,
}

/// Metrics collected while encoding a bulk part.
#[derive(Default, Debug)]
pub struct BulkPartEncodeMetrics {
    /// Time spent iterating the input batches.
    pub iter_cost: Duration,
    /// Time spent writing Parquet data.
    pub write_cost: Duration,
    /// Estimated size of the input batches in bytes.
    pub raw_size: usize,
    /// Size of the encoded Parquet data in bytes.
    pub encoded_size: usize,
    /// Number of rows encoded.
    pub num_rows: usize,
}

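/// Encodes bulk parts and record batch iterators into Parquet-backed [EncodedBulkPart]s.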
pub struct BulkPartEncoder {
    metadata: RegionMetadataRef,
    row_group_size: usize,
    writer_props: Option<WriterProperties>,
}

impl BulkPartEncoder {
    pub(crate) fn new(
        metadata: RegionMetadataRef,
        row_group_size: usize,
    ) -> Result<BulkPartEncoder> {
        let json = metadata.to_json().context(InvalidMetadataSnafu)?;
        let key_value_meta =
            parquet::file::metadata::KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        let writer_props = Some(
            WriterProperties::builder()
                .set_key_value_metadata(Some(vec![key_value_meta]))
                .set_write_batch_size(row_group_size)
                .set_max_row_group_size(row_group_size)
                .set_compression(Compression::ZSTD(ZstdLevel::default()))
                .set_column_index_truncate_length(None)
                .set_statistics_truncate_length(None)
                .build(),
        );

        Ok(Self {
            metadata,
            row_group_size,
            writer_props,
        })
    }
}

impl BulkPartEncoder {
    /// Encodes all batches yielded by `iter` into a single Parquet buffer.
    ///
    /// Returns `Ok(None)` if the iterator yields no rows.
    pub fn encode_record_batch_iter(
        &self,
        iter: BoxedRecordBatchIterator,
        arrow_schema: SchemaRef,
        min_timestamp: i64,
        max_timestamp: i64,
        max_sequence: u64,
        metrics: &mut BulkPartEncodeMetrics,
    ) -> Result<Option<EncodedBulkPart>> {
        let mut buf = Vec::with_capacity(4096);
        let mut writer = ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
            .context(EncodeMemtableSnafu)?;
        let mut total_rows = 0;
        let mut series_estimator = SeriesEstimator::default();

        let mut iter_start = Instant::now();
        for batch_result in iter {
            metrics.iter_cost += iter_start.elapsed();
            let batch = batch_result?;
            if batch.num_rows() == 0 {
                continue;
            }

            series_estimator.update_flat(&batch);
            metrics.raw_size += record_batch_estimated_size(&batch);
            let write_start = Instant::now();
            writer.write(&batch).context(EncodeMemtableSnafu)?;
            metrics.write_cost += write_start.elapsed();
            total_rows += batch.num_rows();
            iter_start = Instant::now();
        }
        metrics.iter_cost += iter_start.elapsed();

        if total_rows == 0 {
            return Ok(None);
        }

        let close_start = Instant::now();
        let file_metadata = writer.close().context(EncodeMemtableSnafu)?;
        metrics.write_cost += close_start.elapsed();
        metrics.encoded_size += buf.len();
        metrics.num_rows += total_rows;

        let buf = Bytes::from(buf);
        let parquet_metadata = Arc::new(file_metadata);
        let num_series = series_estimator.finish();

        Ok(Some(EncodedBulkPart {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: total_rows,
                max_timestamp,
                min_timestamp,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
                num_series,
                max_sequence,
            },
        }))
    }

    /// Encodes a single [BulkPart] into a Parquet buffer, or `Ok(None)` if the part is empty.
    fn encode_part(&self, part: &BulkPart) -> Result<Option<EncodedBulkPart>> {
        if part.batch.num_rows() == 0 {
            return Ok(None);
        }

        let mut buf = Vec::with_capacity(4096);
        let arrow_schema = part.batch.schema();

        let file_metadata = {
            let mut writer =
                ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
                    .context(EncodeMemtableSnafu)?;
            writer.write(&part.batch).context(EncodeMemtableSnafu)?;
            writer.finish().context(EncodeMemtableSnafu)?
        };

        let buf = Bytes::from(buf);
        let parquet_metadata = Arc::new(file_metadata);

        Ok(Some(EncodedBulkPart {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: part.batch.num_rows(),
                max_timestamp: part.max_timestamp,
                min_timestamp: part.min_timestamp,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
                num_series: part.estimated_series_count() as u64,
                max_sequence: part.sequence,
            },
        }))
    }
}

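/// A bulk part that keeps multiple record batches together with their aggregated statistics.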
#[derive(Debug, Clone)]
pub struct MultiBulkPart {
    /// Record batches in this part.
    batches: SmallVec<[RecordBatch; 4]>,
    /// Total number of rows across all batches.
    total_rows: usize,
    /// Maximum timestamp across all batches.
    max_timestamp: i64,
    /// Minimum timestamp across all batches.
    min_timestamp: i64,
    /// Maximum sequence number across all batches.
    max_sequence: SequenceNumber,
    /// Estimated number of series in this part.
    series_count: usize,
}

impl MultiBulkPart {
    /// Creates a [MultiBulkPart] containing the single batch of `part`.
    pub fn from_bulk_part(part: BulkPart) -> Self {
        let num_rows = part.num_rows();
        let series_count = part.estimated_series_count();
        let mut batches = SmallVec::new();
        batches.push(part.batch);

        Self {
            batches,
            total_rows: num_rows,
            max_timestamp: part.max_timestamp,
            min_timestamp: part.min_timestamp,
            max_sequence: part.sequence,
            series_count,
        }
    }

    /// Creates a [MultiBulkPart] from `batches` and their pre-computed statistics.
    ///
    /// # Panics
    /// Panics if `batches` is empty.
    pub fn new(
        batches: Vec<RecordBatch>,
        min_timestamp: i64,
        max_timestamp: i64,
        max_sequence: SequenceNumber,
        series_count: usize,
    ) -> Self {
        assert!(!batches.is_empty(), "batches must not be empty");

        let total_rows = batches.iter().map(|b| b.num_rows()).sum();

        Self {
            batches: SmallVec::from_vec(batches),
            total_rows,
            max_timestamp,
            min_timestamp,
            max_sequence,
            series_count,
        }
    }

    pub fn num_rows(&self) -> usize {
        self.total_rows
    }

    pub fn min_timestamp(&self) -> i64 {
        self.min_timestamp
    }

    pub fn max_timestamp(&self) -> i64 {
        self.max_timestamp
    }

    pub fn max_sequence(&self) -> SequenceNumber {
        self.max_sequence
    }

    pub fn series_count(&self) -> usize {
        self.series_count
    }

    pub fn num_batches(&self) -> usize {
        self.batches.len()
    }

    pub(crate) fn batches(&self) -> impl Iterator<Item = &RecordBatch> {
        self.batches.iter()
    }

    pub(crate) fn estimated_size(&self) -> usize {
        self.batches.iter().map(record_batch_estimated_size).sum()
    }

    /// Creates an iterator over the batches filtered by `context`, or `None` if there is
    /// nothing to read.
    pub(crate) fn read(
        &self,
        context: BulkIterContextRef,
        sequence: Option<SequenceRange>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Result<Option<BoxedRecordBatchIterator>> {
        if self.batches.is_empty() {
            return Ok(None);
        }

        let iter = crate::memtable::bulk::part_reader::BulkPartBatchIter::new(
            self.batches.iter().cloned().collect(),
            context,
            sequence,
            self.series_count,
            mem_scan_metrics,
        );
        Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
    }

    /// Builds [MemtableStats] for this part.
    pub fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
        let ts_type = region_metadata.time_index_type();
        let min_ts = ts_type.create_timestamp(self.min_timestamp);
        let max_ts = ts_type.create_timestamp(self.max_timestamp);

        MemtableStats {
            estimated_bytes: self.estimated_size(),
            time_range: Some((min_ts, max_ts)),
            num_rows: self.num_rows(),
            num_ranges: 1,
            max_sequence: self.max_sequence,
            series_count: self.series_count,
        }
    }
}

#[cfg(test)]
mod tests {
    use api::v1::{Row, SemanticType, WriteHint};
    use datafusion_common::ScalarValue;
    use datatypes::arrow::array::Float64Array;
    use datatypes::prelude::{ConcreteDataType, Value};
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;
    use store_api::storage::consts::ReservedColumnId;

    use super::*;
    use crate::memtable::bulk::context::BulkIterContext;
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
    use crate::test_util::memtable_util::{
        build_key_values_with_ts_seq_values, metadata_for_test, region_metadata_to_row_schema,
    };

    struct MutationInput<'a> {
        k0: &'a str,
        k1: u32,
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
        sequence: u64,
    }

    #[derive(Debug, PartialOrd, PartialEq)]
    struct BatchOutput<'a> {
        pk_values: &'a [Value],
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
    }

    fn encode(input: &[MutationInput]) -> EncodedBulkPart {
        let metadata = metadata_for_test();
        let kvs = input
            .iter()
            .map(|m| {
                build_key_values_with_ts_seq_values(
                    &metadata,
                    m.k0.to_string(),
                    m.k1,
                    m.timestamps.iter().copied(),
                    m.v1.iter().copied(),
                    m.sequence,
                )
            })
            .collect::<Vec<_>>();
        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let primary_key_codec = build_primary_key_codec(&metadata);
        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
        for kv in kvs {
            converter.append_key_values(&kv).unwrap();
        }
        let part = converter.convert().unwrap();
        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
        encoder.encode_part(&part).unwrap().unwrap()
    }

    #[test]
    fn test_write_and_read_part_projection() {
        let part = encode(&[
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.1)],
                sequence: 0,
            },
            MutationInput {
                k0: "b",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.0)],
                sequence: 0,
            },
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[2],
                v1: &[Some(0.2)],
                sequence: 1,
            },
        ]);

        let projection = &[4u32];
        let mut reader = part
            .read(
                Arc::new(
                    BulkIterContext::new(
                        part.metadata.region_metadata.clone(),
                        Some(projection.as_slice()),
                        None,
                        false,
                    )
                    .unwrap(),
                ),
                None,
                None,
            )
            .unwrap()
            .expect("expect at least one row group");

        let mut total_rows_read = 0;
        let mut field: Vec<f64> = vec![];
        for res in reader {
            let batch = res.unwrap();
            assert_eq!(5, batch.num_columns());
            field.extend_from_slice(
                batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Float64Array>()
                    .unwrap()
                    .values(),
            );
            total_rows_read += batch.num_rows();
        }
        assert_eq!(3, total_rows_read);
        assert_eq!(vec![0.1, 0.2, 0.0], field);
    }

    fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
        let metadata = metadata_for_test();
        let kvs = key_values
            .into_iter()
            .map(|(k0, k1, (start, end), sequence)| {
                let ts = start..end;
                let v1 = (start..end).map(|_| None);
                build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
            })
            .collect::<Vec<_>>();
        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let primary_key_codec = build_primary_key_codec(&metadata);
        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
        for kv in kvs {
            converter.append_key_values(&kv).unwrap();
        }
        let part = converter.convert().unwrap();
        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
        encoder.encode_part(&part).unwrap().unwrap()
    }

    fn check_prune_row_group(
        part: &EncodedBulkPart,
        predicate: Option<Predicate>,
        expected_rows: usize,
    ) {
        let context = Arc::new(
            BulkIterContext::new(
                part.metadata.region_metadata.clone(),
                None,
                predicate,
                false,
            )
            .unwrap(),
        );
        let mut reader = part
            .read(context, None, None)
            .unwrap()
            .expect("expect at least one row group");
        let mut total_rows_read = 0;
        for res in reader {
            let batch = res.unwrap();
            total_rows_read += batch.num_rows();
        }
        assert_eq!(expected_rows, total_rows_read);
    }

    #[test]
    fn test_prune_row_groups() {
        let part = prepare(vec![
            ("a", 0, (0, 40), 1),
            ("a", 1, (0, 60), 1),
            ("b", 0, (0, 100), 2),
            ("b", 1, (100, 180), 3),
            ("b", 1, (180, 210), 4),
        ]);

        let context = Arc::new(
            BulkIterContext::new(
                part.metadata.region_metadata.clone(),
                None,
                Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
                    datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
                )])),
                false,
            )
            .unwrap(),
        );
        assert!(part.read(context, None, None).unwrap().is_none());

        check_prune_row_group(&part, None, 310);

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            40,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
            ])),
            60,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
            ])),
            100,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            100,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64)),
            ])),
            1,
        );
    }

    #[test]
    fn test_bulk_part_converter_append_and_convert() {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "key1".to_string(),
            1u32,
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "key2".to_string(),
            2u32,
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_timestamp, 1000);
        assert_eq!(bulk_part.max_timestamp, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );
    }

    #[test]
    fn test_bulk_part_converter_sorting() {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "z_key".to_string(),
            3u32,
            vec![3000].into_iter(),
            vec![Some(3.0)].into_iter(),
            3,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "a_key".to_string(),
            1u32,
            vec![1000].into_iter(),
            vec![Some(1.0)].into_iter(),
            1,
        );

        let key_values3 = build_key_values_with_ts_seq_values(
            &metadata,
            "m_key".to_string(),
            2u32,
            vec![2000].into_iter(),
            vec![Some(2.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();
        converter.append_key_values(&key_values3).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);

        let ts_column = bulk_part.batch.column(bulk_part.timestamp_index);
        let seq_column = bulk_part.batch.column(bulk_part.batch.num_columns() - 2);

        let ts_array = ts_column
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        let seq_array = seq_column.as_any().downcast_ref::<UInt64Array>().unwrap();

        assert_eq!(ts_array.values(), &[1000, 2000, 3000]);
        assert_eq!(seq_array.values(), &[1, 2, 3]);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );
    }

    #[test]
    fn test_bulk_part_converter_empty() {
        let metadata = metadata_for_test();
        let capacity = 10;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 0);
        assert_eq!(bulk_part.min_timestamp, i64::MAX);
        assert_eq!(bulk_part.max_timestamp, i64::MIN);
        assert_eq!(bulk_part.sequence, SequenceNumber::MIN);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );
    }

    #[test]
    fn test_bulk_part_converter_without_primary_key_columns() {
        let metadata = metadata_for_test();
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions {
                raw_pk_columns: false,
                string_pk_use_dict: true,
            },
        );

        let capacity = 100;
        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, false);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "key1".to_string(),
            1u32,
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "key2".to_string(),
            2u32,
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_timestamp, 1000);
        assert_eq!(bulk_part.max_timestamp, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );
    }

    #[allow(clippy::too_many_arguments)]
    fn build_key_values_with_sparse_encoding(
        metadata: &RegionMetadataRef,
        primary_key_codec: &Arc<dyn PrimaryKeyCodec>,
        table_id: u32,
        tsid: u64,
        k0: String,
        k1: String,
        timestamps: impl Iterator<Item = i64>,
        values: impl Iterator<Item = Option<f64>>,
        sequence: SequenceNumber,
    ) -> KeyValues {
        let pk_values = vec![
            (ReservedColumnId::table_id(), Value::UInt32(table_id)),
            (ReservedColumnId::tsid(), Value::UInt64(tsid)),
            (0, Value::String(k0.clone().into())),
            (1, Value::String(k1.clone().into())),
        ];
        let mut encoded_key = Vec::new();
        primary_key_codec
            .encode_values(&pk_values, &mut encoded_key)
            .unwrap();
        assert!(!encoded_key.is_empty());

        let column_schema = vec![
            api::v1::ColumnSchema {
                column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::binary_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Tag as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "ts".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::timestamp_millisecond_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Timestamp as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "v0".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::int64_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Field as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "v1".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::float64_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Field as i32,
                ..Default::default()
            },
        ];

        let rows = timestamps
            .zip(values)
            .map(|(ts, v)| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::BinaryValue(
                            encoded_key.clone(),
                        )),
                    },
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(ts)),
                    },
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::I64Value(ts)),
                    },
                    api::v1::Value {
                        value_data: v.map(api::v1::value::ValueData::F64Value),
                    },
                ],
            })
            .collect();

        let mutation = api::v1::Mutation {
            op_type: 1,
            sequence,
            rows: Some(api::v1::Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: Some(WriteHint {
                primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
            }),
        };
        KeyValues::new(metadata.as_ref(), mutation).unwrap()
    }

    #[test]
    fn test_bulk_part_converter_sparse_primary_key_encoding() {
        use api::v1::SemanticType;
        use datatypes::schema::ColumnSchema;
        use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
        use store_api::storage::RegionId;

        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![0, 1])
            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
        let metadata = Arc::new(builder.build().unwrap());

        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse);
        assert_eq!(primary_key_codec.encoding(), PrimaryKeyEncoding::Sparse);

        let capacity = 100;
        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec.clone(), true);

        let key_values1 = build_key_values_with_sparse_encoding(
            &metadata,
            &primary_key_codec,
            2048u32, // table_id
            100u64,  // tsid
            "key11".to_string(),
            "key21".to_string(),
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_sparse_encoding(
            &metadata,
            &primary_key_codec,
            4096u32, // table_id
            200u64,  // tsid
            "key12".to_string(),
            "key22".to_string(),
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_timestamp, 1000);
        assert_eq!(bulk_part.max_timestamp, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );

        let primary_key_column = bulk_part.batch.column_by_name("__primary_key").unwrap();
        let dict_array = primary_key_column
            .as_any()
            .downcast_ref::<DictionaryArray<UInt32Type>>()
            .unwrap();

        assert!(!dict_array.is_empty());
        assert_eq!(dict_array.len(), 3);

        let values = dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        for i in 0..values.len() {
            assert!(
                !values.value(i).is_empty(),
                "Encoded primary key should not be empty"
            );
        }
    }

    #[test]
    fn test_convert_bulk_part_empty() {
        let metadata = metadata_for_test();
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );
        let primary_key_codec = build_primary_key_codec(&metadata);

        let empty_batch = RecordBatch::new_empty(schema.clone());
        let empty_part = BulkPart {
            batch: empty_batch,
            max_timestamp: 0,
            min_timestamp: 0,
            sequence: 0,
            timestamp_index: 0,
            raw_data: None,
        };

        let result =
            convert_bulk_part(empty_part, &metadata, primary_key_codec, schema, true).unwrap();
        assert!(result.is_none());
    }

2128 #[test]
2129 fn test_convert_bulk_part_dense_with_pk_columns() {
2130 let metadata = metadata_for_test();
2131 let primary_key_codec = build_primary_key_codec(&metadata);
2132
2133 let k0_array = Arc::new(arrow::array::StringArray::from(vec![
2134 "key1", "key2", "key1",
2135 ]));
2136 let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2, 1]));
2137 let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200, 300]));
2138 let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0, 3.0]));
2139 let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000, 1500]));
2140
2141 let input_schema = Arc::new(Schema::new(vec![
2142 Field::new("k0", ArrowDataType::Utf8, false),
2143 Field::new("k1", ArrowDataType::UInt32, false),
2144 Field::new("v0", ArrowDataType::Int64, true),
2145 Field::new("v1", ArrowDataType::Float64, true),
2146 Field::new(
2147 "ts",
2148 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2149 false,
2150 ),
2151 ]));
2152
2153 let input_batch = RecordBatch::try_new(
2154 input_schema,
2155 vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2156 )
2157 .unwrap();
2158
2159 let part = BulkPart {
2160 batch: input_batch,
2161 max_timestamp: 2000,
2162 min_timestamp: 1000,
2163 sequence: 5,
2164 timestamp_index: 4,
2165 raw_data: None,
2166 };
2167
2168 let output_schema = to_flat_sst_arrow_schema(
2169 &metadata,
2170 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2171 );
2172
2173 let result = convert_bulk_part(
2174 part,
2175 &metadata,
2176 primary_key_codec,
2177 output_schema,
2178 true, )
2180 .unwrap();
2181
2182 let converted = result.unwrap();
2183
2184 assert_eq!(converted.num_rows(), 3);
2185 assert_eq!(converted.max_timestamp, 2000);
2186 assert_eq!(converted.min_timestamp, 1000);
2187 assert_eq!(converted.sequence, 5);
2188
2189 let schema = converted.batch.schema();
2190 let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2191 assert_eq!(
2192 field_names,
2193 vec![
2194 "k0",
2195 "k1",
2196 "v0",
2197 "v1",
2198 "ts",
2199 "__primary_key",
2200 "__sequence",
2201 "__op_type"
2202 ]
2203 );
2204
2205 let k0_col = converted.batch.column_by_name("k0").unwrap();
2206 assert!(matches!(
2207 k0_col.data_type(),
2208 ArrowDataType::Dictionary(_, _)
2209 ));
2210
2211 let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2212 let dict_array = pk_col
2213 .as_any()
2214 .downcast_ref::<DictionaryArray<UInt32Type>>()
2215 .unwrap();
2216 let keys = dict_array.keys();
2217
2218 assert_eq!(keys.len(), 3);
2219 }
2220
    #[test]
    fn test_convert_bulk_part_dense_without_pk_columns() {
        let metadata = metadata_for_test();
        let primary_key_codec = build_primary_key_codec(&metadata);

        let k0_array = Arc::new(arrow::array::StringArray::from(vec!["key1", "key2"]));
        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2]));
        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));

        let input_schema = Arc::new(Schema::new(vec![
            Field::new("k0", ArrowDataType::Utf8, false),
            Field::new("k1", ArrowDataType::UInt32, false),
            Field::new("v0", ArrowDataType::Int64, true),
            Field::new("v1", ArrowDataType::Float64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        let input_batch = RecordBatch::try_new(
            input_schema,
            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
        )
        .unwrap();

        let part = BulkPart {
            batch: input_batch,
            max_timestamp: 2000,
            min_timestamp: 1000,
            sequence: 3,
            timestamp_index: 4,
            raw_data: None,
        };

        let output_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions {
                raw_pk_columns: false,
                string_pk_use_dict: true,
            },
        );

        let result = convert_bulk_part(
            part,
            &metadata,
            primary_key_codec,
            output_schema,
            false,
        )
        .unwrap();

        let converted = result.unwrap();

        assert_eq!(converted.num_rows(), 2);
        assert_eq!(converted.max_timestamp, 2000);
        assert_eq!(converted.min_timestamp, 1000);
        assert_eq!(converted.sequence, 3);

        let schema = converted.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );

        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
        assert!(matches!(
            pk_col.data_type(),
            ArrowDataType::Dictionary(_, _)
        ));
    }

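    // Sparse primary key encoding: the input already carries pre-encoded keys in a
    // binary `__primary_key` column, so no raw tag columns appear in the output schema.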
    #[test]
    fn test_convert_bulk_part_sparse_encoding() {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![0, 1])
            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
        let metadata = Arc::new(builder.build().unwrap());

        let primary_key_codec = build_primary_key_codec(&metadata);

        let pk_array = Arc::new(arrow::array::BinaryArray::from(vec![
            b"encoded_key_1".as_slice(),
            b"encoded_key_2".as_slice(),
        ]));
        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));

        let input_schema = Arc::new(Schema::new(vec![
            Field::new("__primary_key", ArrowDataType::Binary, false),
            Field::new("v0", ArrowDataType::Int64, true),
            Field::new("v1", ArrowDataType::Float64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        let input_batch =
            RecordBatch::try_new(input_schema, vec![pk_array, v0_array, v1_array, ts_array])
                .unwrap();

        let part = BulkPart {
            batch: input_batch,
            max_timestamp: 2000,
            min_timestamp: 1000,
            sequence: 7,
            timestamp_index: 3,
            raw_data: None,
        };

        let output_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let result = convert_bulk_part(
            part,
            &metadata,
            primary_key_codec,
            output_schema,
            true,
        )
        .unwrap();

        let converted = result.unwrap();

        assert_eq!(converted.num_rows(), 2);
        assert_eq!(converted.max_timestamp, 2000);
        assert_eq!(converted.min_timestamp, 1000);
        assert_eq!(converted.sequence, 7);

        let schema = converted.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );

        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
        assert!(matches!(
            pk_col.data_type(),
            ArrowDataType::Dictionary(_, _)
        ));
    }

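    // Rows arrive interleaved across two series; conversion should regroup them by
    // primary key and order them by timestamp within each series.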
    #[test]
    fn test_convert_bulk_part_sorting_with_multiple_series() {
        let metadata = metadata_for_test();
        let primary_key_codec = build_primary_key_codec(&metadata);

        let k0_array = Arc::new(arrow::array::StringArray::from(vec![
            "series_b", "series_a", "series_b", "series_a",
        ]));
        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![2, 1, 2, 1]));
        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![200, 100, 400, 300]));
        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![2.0, 1.0, 4.0, 3.0]));
        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
            2000, 1000, 4000, 3000,
        ]));

        let input_schema = Arc::new(Schema::new(vec![
            Field::new("k0", ArrowDataType::Utf8, false),
            Field::new("k1", ArrowDataType::UInt32, false),
            Field::new("v0", ArrowDataType::Int64, true),
            Field::new("v1", ArrowDataType::Float64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        let input_batch = RecordBatch::try_new(
            input_schema,
            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
        )
        .unwrap();

        let part = BulkPart {
            batch: input_batch,
            max_timestamp: 4000,
            min_timestamp: 1000,
            sequence: 10,
            timestamp_index: 4,
            raw_data: None,
        };

        let output_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let result =
            convert_bulk_part(part, &metadata, primary_key_codec, output_schema, true).unwrap();

        let converted = result.unwrap();

        assert_eq!(converted.num_rows(), 4);

        let ts_col = converted.batch.column(converted.timestamp_index);
        let ts_array = ts_col
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();

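        // "series_a" (k1 = 1) sorts before "series_b" (k1 = 2), and timestamps ascend
        // within each series, so the expected order is 1000, 3000, 2000, 4000.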
2471 let timestamps: Vec<i64> = ts_array.values().to_vec();
2475 assert_eq!(timestamps, vec![1000, 3000, 2000, 4000]);
2476 }
2477}