use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::helper::{ColumnDataTypeWrapper, to_grpc_value};
use api::v1::bulk_wal_entry::Body;
use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType};
use bytes::Bytes;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_recordbatch::DfRecordBatch as RecordBatch;
use common_time::Timestamp;
use datatypes::arrow;
use datatypes::arrow::array::{Array, ArrayRef, StringDictionaryBuilder, UInt8Array, UInt64Array};
use datatypes::arrow::compute::{SortColumn, SortOptions};
use datatypes::arrow::datatypes::{
    DataType as ArrowDataType, Field, Schema, SchemaRef, UInt32Type,
};
use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, Vector};
use datatypes::value::ValueRef;
use datatypes::vectors::Helper;
use mito_codec::key_values::{KeyValue, KeyValues};
use mito_codec::row_converter::PrimaryKeyCodec;
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, ZstdLevel};
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use smallvec::SmallVec;
use snafu::{OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
use store_api::storage::{FileId, SequenceNumber, SequenceRange};

use crate::error::{
    self, ColumnNotFoundSnafu, ComputeArrowSnafu, CreateDefaultSnafu, DataTypeMismatchSnafu,
    EncodeMemtableSnafu, EncodeSnafu, InvalidMetadataSnafu, InvalidRequestSnafu,
    NewRecordBatchSnafu, Result,
};
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
use crate::memtable::time_series::{ValueBuilder, Values};
use crate::memtable::{BoxedRecordBatchIterator, MemScanMetrics, MemtableStats};
use crate::sst::SeriesEstimator;
use crate::sst::index::IndexOutput;
use crate::sst::parquet::file_range::{PreFilterMode, row_group_contains_delete};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder};
use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo};

const INIT_DICT_VALUE_CAPACITY: usize = 8;

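/// A bulk ingestion part holding rows as a single Arrow [`RecordBatch`].
///
/// A minimal WAL round-trip sketch (assumes `part` was built elsewhere, e.g.
/// by a [`BulkPartConverter`]):
///
/// ```ignore
/// let entry = BulkWalEntry::try_from(&part)?; // encodes the batch as Arrow IPC
/// let restored = BulkPart::try_from(entry)?;  // decodes it on WAL replay
/// assert_eq!(restored.num_rows(), part.num_rows());
/// ```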
#[derive(Clone)]
pub struct BulkPart {
    /// Rows of this part, stored as a single Arrow record batch.
    pub batch: RecordBatch,
    /// Max timestamp value among all rows.
    pub max_timestamp: i64,
    /// Min timestamp value among all rows.
    pub min_timestamp: i64,
    /// Sequence number of this part.
    pub sequence: u64,
    /// Index of the time index column in `batch`.
    pub timestamp_index: usize,
    /// Raw Arrow IPC payload, kept so WAL encoding can reuse it.
    pub raw_data: Option<ArrowIpc>,
}

impl TryFrom<BulkWalEntry> for BulkPart {
    type Error = error::Error;

    fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
        match value.body.expect("Entry payload should be present") {
            Body::ArrowIpc(ipc) => {
                let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
                    .context(error::ConvertBulkWalEntrySnafu)?;
                let batch = decoder
                    .try_decode_record_batch(&ipc.data_header, &ipc.payload)
                    .context(error::ConvertBulkWalEntrySnafu)?;
                Ok(Self {
                    batch,
                    max_timestamp: value.max_ts,
                    min_timestamp: value.min_ts,
                    sequence: value.sequence,
                    timestamp_index: value.timestamp_index as usize,
                    raw_data: Some(ipc),
                })
            }
        }
    }
}

impl TryFrom<&BulkPart> for BulkWalEntry {
    type Error = error::Error;

    fn try_from(value: &BulkPart) -> Result<Self> {
        if let Some(ipc) = &value.raw_data {
            Ok(BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_timestamp,
                min_ts: value.min_timestamp,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ipc.clone())),
            })
        } else {
            let mut encoder = FlightEncoder::default();
            let schema_bytes = encoder
                .encode_schema(value.batch.schema().as_ref())
                .data_header;
            let [rb_data] = encoder
                .encode(FlightMessage::RecordBatch(value.batch.clone()))
                .try_into()
                .map_err(|_| {
                    error::UnsupportedOperationSnafu {
                        err_msg: "create BulkWalEntry from RecordBatch with dictionary arrays",
                    }
                    .build()
                })?;
            Ok(BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_timestamp,
                min_ts: value.min_timestamp,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ArrowIpc {
                    schema: schema_bytes,
                    data_header: rb_data.data_header,
                    payload: rb_data.data_body,
                })),
            })
        }
    }
}

impl BulkPart {
    pub(crate) fn estimated_size(&self) -> usize {
        record_batch_estimated_size(&self.batch)
    }

    /// Estimates the number of series by the dictionary value count of the
    /// primary key column. Returns 0 if the column is not a dictionary array.
    pub fn estimated_series_count(&self) -> usize {
        let pk_column_idx = primary_key_column_index(self.batch.num_columns());
        let pk_column = self.batch.column(pk_column_idx);
        if let Some(dict_array) = pk_column.as_any().downcast_ref::<PrimaryKeyArray>() {
            dict_array.values().len()
        } else {
            0
        }
    }

    /// Builds [`MemtableStats`] for this part.
    pub fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
        let ts_type = region_metadata.time_index_type();
        let min_ts = ts_type.create_timestamp(self.min_timestamp);
        let max_ts = ts_type.create_timestamp(self.max_timestamp);

        MemtableStats {
            estimated_bytes: self.estimated_size(),
            time_range: Some((min_ts, max_ts)),
            num_rows: self.num_rows(),
            num_ranges: 1,
            max_sequence: self.sequence,
            series_count: self.estimated_series_count(),
        }
    }

    /// Fills columns that are present in the region metadata but missing from
    /// the batch with their default values.
    pub fn fill_missing_columns(&mut self, region_metadata: &RegionMetadata) -> Result<()> {
        let batch_schema = self.batch.schema();
        let batch_columns: HashSet<_> = batch_schema
            .fields()
            .iter()
            .map(|f| f.name().as_str())
            .collect();

        let mut columns_to_fill = Vec::new();
        for column_meta in &region_metadata.column_metadatas {
            if !batch_columns.contains(column_meta.column_schema.name.as_str()) {
                columns_to_fill.push(column_meta);
            }
        }

        if columns_to_fill.is_empty() {
            return Ok(());
        }

        let num_rows = self.batch.num_rows();

        let mut new_columns = Vec::new();
        let mut new_fields = Vec::new();

        new_fields.extend(batch_schema.fields().iter().cloned());
        new_columns.extend_from_slice(self.batch.columns());

        let region_id = region_metadata.region_id;
        for column_meta in columns_to_fill {
            let default_vector = column_meta
                .column_schema
                .create_default_vector(num_rows)
                .context(CreateDefaultSnafu {
                    region_id,
                    column: &column_meta.column_schema.name,
                })?
                .with_context(|| InvalidRequestSnafu {
                    region_id,
                    reason: format!(
                        "column {} does not have default value",
                        column_meta.column_schema.name
                    ),
                })?;
            let arrow_array = default_vector.to_arrow_array();

            new_fields.push(Arc::new(Field::new(
                column_meta.column_schema.name.clone(),
                column_meta.column_schema.data_type.as_arrow_type(),
                column_meta.column_schema.is_nullable(),
            )));
            new_columns.push(arrow_array);
        }

        let new_schema = Arc::new(Schema::new(new_fields));
        let new_batch =
            RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)?;

        self.batch = new_batch;

        Ok(())
    }

    /// Converts this part into a gRPC [`Mutation`] with `OpType::Put`.
    pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
        let vectors = region_metadata
            .schema
            .column_schemas()
            .iter()
            .map(|col| match self.batch.column_by_name(&col.name) {
                None => Ok(None),
                Some(col) => Helper::try_into_vector(col).map(Some),
            })
            .collect::<datatypes::error::Result<Vec<_>>>()
            .context(error::ComputeVectorSnafu)?;

        let rows = (0..self.num_rows())
            .map(|row_idx| {
                let values = (0..self.batch.num_columns())
                    .map(|col_idx| {
                        if let Some(v) = &vectors[col_idx] {
                            to_grpc_value(v.get(row_idx))
                        } else {
                            api::v1::Value { value_data: None }
                        }
                    })
                    .collect::<Vec<_>>();
                api::v1::Row { values }
            })
            .collect::<Vec<_>>();

        let schema = region_metadata
            .column_metadatas
            .iter()
            .map(|c| {
                let data_type_wrapper =
                    ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
                Ok(api::v1::ColumnSchema {
                    column_name: c.column_schema.name.clone(),
                    datatype: data_type_wrapper.datatype() as i32,
                    semantic_type: c.semantic_type as i32,
                    ..Default::default()
                })
            })
            .collect::<api::error::Result<Vec<_>>>()
            .context(error::ConvertColumnDataTypeSnafu {
                reason: "failed to convert region metadata to column schema",
            })?;

        let rows = api::v1::Rows { schema, rows };

        Ok(Mutation {
            op_type: OpType::Put as i32,
            sequence: self.sequence,
            rows: Some(rows),
            write_hint: None,
        })
    }

    /// Returns the timestamp column of the batch.
    pub fn timestamps(&self) -> &ArrayRef {
        self.batch.column(self.timestamp_index)
    }

    /// Returns the number of rows in the batch.
    pub fn num_rows(&self) -> usize {
        self.batch.num_rows()
    }
}

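/// Buffer for small, unsorted [`BulkPart`]s that are merged lazily.
///
/// A usage sketch; `part` and the flow around it are illustrative:
///
/// ```ignore
/// let mut buffer = UnorderedPart::new();
/// if buffer.should_accept(part.num_rows()) {
///     buffer.push(part);
/// }
/// if buffer.should_compact() {
///     // Concatenates and sorts everything buffered into one part.
///     let merged = buffer.to_bulk_part()?.expect("buffer is not empty");
///     buffer.clear();
/// }
/// ```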
pub struct UnorderedPart {
    /// Parts buffered so far.
    parts: Vec<BulkPart>,
    /// Total number of rows across all buffered parts.
    total_rows: usize,
    /// Min timestamp across all buffered parts.
    min_timestamp: i64,
    /// Max timestamp across all buffered parts.
    max_timestamp: i64,
    /// Max sequence number across all buffered parts.
    max_sequence: u64,
    /// Max number of rows a single part may have to be accepted.
    threshold: usize,
    /// Total row count that triggers compaction of buffered parts.
    compact_threshold: usize,
}

impl Default for UnorderedPart {
    fn default() -> Self {
        Self::new()
    }
}

impl UnorderedPart {
    pub fn new() -> Self {
        Self {
            parts: Vec::new(),
            total_rows: 0,
            min_timestamp: i64::MAX,
            max_timestamp: i64::MIN,
            max_sequence: 0,
            threshold: 1024,
            compact_threshold: 4096,
        }
    }

    pub fn set_threshold(&mut self, threshold: usize) {
        self.threshold = threshold;
    }

    pub fn set_compact_threshold(&mut self, compact_threshold: usize) {
        self.compact_threshold = compact_threshold;
    }

    pub fn threshold(&self) -> usize {
        self.threshold
    }

    pub fn compact_threshold(&self) -> usize {
        self.compact_threshold
    }

    /// Returns true if a part with `num_rows` rows is small enough to buffer.
    pub fn should_accept(&self, num_rows: usize) -> bool {
        num_rows < self.threshold
    }

    /// Returns true if the buffered rows reached the compact threshold.
    pub fn should_compact(&self) -> bool {
        self.total_rows >= self.compact_threshold
    }

    pub fn push(&mut self, part: BulkPart) {
        self.total_rows += part.num_rows();
        self.min_timestamp = self.min_timestamp.min(part.min_timestamp);
        self.max_timestamp = self.max_timestamp.max(part.max_timestamp);
        self.max_sequence = self.max_sequence.max(part.sequence);
        self.parts.push(part);
    }

    pub fn num_rows(&self) -> usize {
        self.total_rows
    }

    pub fn is_empty(&self) -> bool {
        self.parts.is_empty()
    }

    pub fn num_parts(&self) -> usize {
        self.parts.len()
    }

    /// Concatenates all buffered batches and sorts the result by primary key,
    /// timestamp and sequence. Returns `None` if nothing is buffered.
    pub fn concat_and_sort(&self) -> Result<Option<RecordBatch>> {
        if self.parts.is_empty() {
            return Ok(None);
        }

        if self.parts.len() == 1 {
            return Ok(Some(self.parts[0].batch.clone()));
        }

        let schema = self.parts[0].batch.schema();

        let batches: Vec<RecordBatch> = self.parts.iter().map(|p| p.batch.clone()).collect();
        let concatenated =
            arrow::compute::concat_batches(&schema, &batches).context(ComputeArrowSnafu)?;

        let sorted_batch = sort_primary_key_record_batch(&concatenated)?;

        Ok(Some(sorted_batch))
    }

    /// Merges all buffered parts into a single sorted [`BulkPart`].
    pub fn to_bulk_part(&self) -> Result<Option<BulkPart>> {
        let Some(sorted_batch) = self.concat_and_sort()? else {
            return Ok(None);
        };

        let timestamp_index = self.parts[0].timestamp_index;

        Ok(Some(BulkPart {
            batch: sorted_batch,
            max_timestamp: self.max_timestamp,
            min_timestamp: self.min_timestamp,
            sequence: self.max_sequence,
            timestamp_index,
            raw_data: None,
        }))
    }

    /// Resets this buffer to its initial empty state.
    pub fn clear(&mut self) {
        self.parts.clear();
        self.total_rows = 0;
        self.min_timestamp = i64::MAX;
        self.max_timestamp = i64::MIN;
        self.max_sequence = 0;
    }
}

/// Estimates the memory size of `batch` by summing the sliced data size of
/// each column.
pub fn record_batch_estimated_size(batch: &RecordBatch) -> usize {
    batch
        .columns()
        .iter()
        .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
        .sum()
}

/// Builder for an individual primary key column stored alongside the encoded
/// `__primary_key` column.
enum PrimaryKeyColumnBuilder {
    /// Dictionary builder for string primary key columns.
    StringDict(StringDictionaryBuilder<UInt32Type>),
    /// Plain vector builder for other data types.
    Vector(Box<dyn MutableVector>),
}

impl PrimaryKeyColumnBuilder {
    fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
        match self {
            PrimaryKeyColumnBuilder::StringDict(builder) => {
                if let Some(s) = value.try_into_string().context(DataTypeMismatchSnafu)? {
                    builder.append_value(s);
                } else {
                    builder.append_null();
                }
            }
            PrimaryKeyColumnBuilder::Vector(builder) => {
                builder.push_value_ref(&value);
            }
        }
        Ok(())
    }

    fn into_arrow_array(self) -> ArrayRef {
        match self {
            PrimaryKeyColumnBuilder::StringDict(mut builder) => Arc::new(builder.finish()),
            PrimaryKeyColumnBuilder::Vector(mut builder) => builder.to_vector().to_arrow_array(),
        }
    }
}

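/// Converts [`KeyValues`] mutations into a sorted, flat-format [`BulkPart`].
///
/// A sketch mirroring the converter tests at the bottom of this file:
///
/// ```ignore
/// let codec = build_primary_key_codec(&metadata);
/// let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
/// let mut converter = BulkPartConverter::new(&metadata, schema, 64, codec, true);
/// converter.append_key_values(&key_values)?;
/// let part = converter.convert()?;
/// ```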
pub struct BulkPartConverter {
    /// Output arrow schema of the converted batch.
    schema: SchemaRef,
    /// Codec used to encode primary keys.
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
    /// Reused buffer for encoding a primary key.
    key_buf: Vec<u8>,
    /// Builder for the encoded `__primary_key` dictionary column.
    key_array_builder: PrimaryKeyArrayBuilder,
    /// Builder for timestamp, sequence, op type and field columns.
    value_builder: ValueBuilder,
    /// Builders for raw primary key columns; empty if they are not stored.
    primary_key_column_builders: Vec<PrimaryKeyColumnBuilder>,

    /// Max timestamp value appended so far.
    max_ts: i64,
    /// Min timestamp value appended so far.
    min_ts: i64,
    /// Max sequence number appended so far.
    max_sequence: SequenceNumber,
}

impl BulkPartConverter {
    /// Creates a converter.
    ///
    /// If `store_primary_key_columns` is true and the region does not use
    /// sparse primary key encoding, raw primary key columns are stored in the
    /// output batch in addition to the encoded `__primary_key` column.
    pub fn new(
        region_metadata: &RegionMetadataRef,
        schema: SchemaRef,
        capacity: usize,
        primary_key_codec: Arc<dyn PrimaryKeyCodec>,
        store_primary_key_columns: bool,
    ) -> Self {
        debug_assert_eq!(
            region_metadata.primary_key_encoding,
            primary_key_codec.encoding()
        );

        let primary_key_column_builders = if store_primary_key_columns
            && region_metadata.primary_key_encoding != PrimaryKeyEncoding::Sparse
        {
            new_primary_key_column_builders(region_metadata, capacity)
        } else {
            Vec::new()
        };

        Self {
            schema,
            primary_key_codec,
            key_buf: Vec::new(),
            key_array_builder: PrimaryKeyArrayBuilder::new(),
            value_builder: ValueBuilder::new(region_metadata, capacity),
            primary_key_column_builders,
            min_ts: i64::MAX,
            max_ts: i64::MIN,
            max_sequence: SequenceNumber::MIN,
        }
    }

    /// Appends all rows in `key_values` to the converter.
    pub fn append_key_values(&mut self, key_values: &KeyValues) -> Result<()> {
        for kv in key_values.iter() {
            self.append_key_value(&kv)?;
        }

        Ok(())
    }

    fn append_key_value(&mut self, kv: &KeyValue) -> Result<()> {
        // With sparse encoding the primary key arrives already encoded as the
        // first (and only) primary key column.
        if self.primary_key_codec.encoding() == PrimaryKeyEncoding::Sparse {
            let mut primary_keys = kv.primary_keys();
            if let Some(encoded) = primary_keys
                .next()
                .context(ColumnNotFoundSnafu {
                    column: PRIMARY_KEY_COLUMN_NAME,
                })?
                .try_into_binary()
                .context(DataTypeMismatchSnafu)?
            {
                self.key_array_builder
                    .append(encoded)
                    .context(ComputeArrowSnafu)?;
            } else {
                self.key_array_builder
                    .append("")
                    .context(ComputeArrowSnafu)?;
            }
        } else {
            self.key_buf.clear();
            self.primary_key_codec
                .encode_key_value(kv, &mut self.key_buf)
                .context(EncodeSnafu)?;
            self.key_array_builder
                .append(&self.key_buf)
                .context(ComputeArrowSnafu)?;
        };

        if !self.primary_key_column_builders.is_empty() {
            for (builder, pk_value) in self
                .primary_key_column_builders
                .iter_mut()
                .zip(kv.primary_keys())
            {
                builder.push_value_ref(pk_value)?;
            }
        }

        self.value_builder.push(
            kv.timestamp(),
            kv.sequence(),
            kv.op_type() as u8,
            kv.fields(),
        );

        // Safety: the time index column is always a valid, non-null timestamp.
        let ts = kv
            .timestamp()
            .try_into_timestamp()
            .unwrap()
            .unwrap()
            .value();
        self.min_ts = self.min_ts.min(ts);
        self.max_ts = self.max_ts.max(ts);
        self.max_sequence = self.max_sequence.max(kv.sequence());

        Ok(())
    }

    /// Finishes all builders and converts the accumulated rows into a sorted
    /// [`BulkPart`].
    pub fn convert(mut self) -> Result<BulkPart> {
        let values = Values::from(self.value_builder);
        let mut columns =
            Vec::with_capacity(4 + values.fields.len() + self.primary_key_column_builders.len());

        // Column order: raw primary key columns, fields, timestamp,
        // __primary_key, __sequence, __op_type.
        for builder in self.primary_key_column_builders {
            columns.push(builder.into_arrow_array());
        }
        columns.extend(values.fields.iter().map(|field| field.to_arrow_array()));
        let timestamp_index = columns.len();
        columns.push(values.timestamp.to_arrow_array());
        let pk_array = self.key_array_builder.finish();
        columns.push(Arc::new(pk_array));
        columns.push(values.sequence.to_arrow_array());
        columns.push(values.op_type.to_arrow_array());

        let batch = RecordBatch::try_new(self.schema, columns).context(NewRecordBatchSnafu)?;
        let batch = sort_primary_key_record_batch(&batch)?;

        Ok(BulkPart {
            batch,
            max_timestamp: self.max_ts,
            min_timestamp: self.min_ts,
            sequence: self.max_sequence,
            timestamp_index,
            raw_data: None,
        })
    }
}

fn new_primary_key_column_builders(
    metadata: &RegionMetadata,
    capacity: usize,
) -> Vec<PrimaryKeyColumnBuilder> {
    metadata
        .primary_key_columns()
        .map(|col| {
            if col.column_schema.data_type.is_string() {
                PrimaryKeyColumnBuilder::StringDict(StringDictionaryBuilder::with_capacity(
                    capacity,
                    INIT_DICT_VALUE_CAPACITY,
                    capacity,
                ))
            } else {
                PrimaryKeyColumnBuilder::Vector(
                    col.column_schema.data_type.create_mutable_vector(capacity),
                )
            }
        })
        .collect()
}

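/// Sorts a flat-format record batch by `(primary key ASC, time index ASC,
/// sequence DESC)`.
///
/// The `total_columns - N` offsets below rely on the fixed trailing layout of
/// flat batches produced in this module:
///
/// ```text
/// [ ...fields..., ts, __primary_key, __sequence, __op_type ]
///                 -4       -3            -2          -1
/// ```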
pub fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
    let total_columns = batch.num_columns();
    let sort_columns = vec![
        // Primary key column (`__primary_key`), ascending.
        SortColumn {
            values: batch.column(total_columns - 3).clone(),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
        // Time index column, ascending.
        SortColumn {
            values: batch.column(total_columns - 4).clone(),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
        // Sequence column (`__sequence`), descending so newer rows come first.
        SortColumn {
            values: batch.column(total_columns - 2).clone(),
            options: Some(SortOptions {
                descending: true,
                nulls_first: true,
            }),
        },
    ];

    let indices = datatypes::arrow::compute::lexsort_to_indices(&sort_columns, None)
        .context(ComputeArrowSnafu)?;

    datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
}

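/// Converts a [`BulkPart`] whose batch uses the raw user schema into the flat
/// SST layout: primary keys are encoded into a dictionary `__primary_key`
/// column, constant `__sequence` / `__op_type` columns are appended, and the
/// result is sorted.
///
/// A sketch based on the `test_convert_bulk_part_*` cases below:
///
/// ```ignore
/// let codec = build_primary_key_codec(&metadata);
/// let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
/// if let Some(converted) = convert_bulk_part(part, &metadata, codec, schema, true)? {
///     // The time index column always ends up 4th from the end.
///     assert_eq!(converted.timestamp_index, converted.batch.num_columns() - 4);
/// }
/// ```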
pub fn convert_bulk_part(
    part: BulkPart,
    region_metadata: &RegionMetadataRef,
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
    schema: SchemaRef,
    store_primary_key_columns: bool,
) -> Result<Option<BulkPart>> {
    if part.num_rows() == 0 {
        return Ok(None);
    }

    let num_rows = part.num_rows();
    let is_sparse = region_metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse;

    let input_schema = part.batch.schema();
    let column_indices: HashMap<&str, usize> = input_schema
        .fields()
        .iter()
        .enumerate()
        .map(|(idx, field)| (field.name().as_str(), idx))
        .collect();

    let mut output_columns = Vec::new();

    // With sparse encoding the input batch already carries the encoded
    // `__primary_key` column, so there is nothing to encode here.
    let pk_array = if is_sparse {
        None
    } else {
        // Encode the primary key of every row from the raw tag columns.
        let pk_vectors: Result<Vec<_>> = region_metadata
            .primary_key_columns()
            .map(|col_meta| {
                let col_idx = column_indices
                    .get(col_meta.column_schema.name.as_str())
                    .context(ColumnNotFoundSnafu {
                        column: &col_meta.column_schema.name,
                    })?;
                let col = part.batch.column(*col_idx);
                Helper::try_into_vector(col).context(error::ComputeVectorSnafu)
            })
            .collect();
        let pk_vectors = pk_vectors?;

        let mut key_array_builder = PrimaryKeyArrayBuilder::new();
        let mut encode_buf = Vec::new();

        for row_idx in 0..num_rows {
            encode_buf.clear();

            let pk_values_with_ids: Vec<_> = region_metadata
                .primary_key
                .iter()
                .zip(pk_vectors.iter())
                .map(|(col_id, vector)| (*col_id, vector.get_ref(row_idx)))
                .collect();

            primary_key_codec
                .encode_value_refs(&pk_values_with_ids, &mut encode_buf)
                .context(EncodeSnafu)?;

            key_array_builder
                .append(&encode_buf)
                .context(ComputeArrowSnafu)?;
        }

        Some(key_array_builder.finish())
    };

    if store_primary_key_columns && !is_sparse {
        for col_meta in region_metadata.primary_key_columns() {
            let col_idx = column_indices
                .get(col_meta.column_schema.name.as_str())
                .context(ColumnNotFoundSnafu {
                    column: &col_meta.column_schema.name,
                })?;
            let col = part.batch.column(*col_idx);

            // String tag columns are stored dictionary-encoded.
            let col = if col_meta.column_schema.data_type.is_string() {
                let target_type = ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Utf8),
                );
                arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
            } else {
                col.clone()
            };
            output_columns.push(col);
        }
    }

    for col_meta in region_metadata.field_columns() {
        let col_idx = column_indices
            .get(col_meta.column_schema.name.as_str())
            .context(ColumnNotFoundSnafu {
                column: &col_meta.column_schema.name,
            })?;
        output_columns.push(part.batch.column(*col_idx).clone());
    }

    let new_timestamp_index = output_columns.len();
    let ts_col_idx = column_indices
        .get(
            region_metadata
                .time_index_column()
                .column_schema
                .name
                .as_str(),
        )
        .context(ColumnNotFoundSnafu {
            column: &region_metadata.time_index_column().column_schema.name,
        })?;
    output_columns.push(part.batch.column(*ts_col_idx).clone());

    let pk_dictionary = if let Some(pk_dict_array) = pk_array {
        Arc::new(pk_dict_array) as ArrayRef
    } else {
        // Sparse encoding: cast the existing binary `__primary_key` column
        // into a dictionary array.
        let pk_col_idx =
            column_indices
                .get(PRIMARY_KEY_COLUMN_NAME)
                .context(ColumnNotFoundSnafu {
                    column: PRIMARY_KEY_COLUMN_NAME,
                })?;
        let col = part.batch.column(*pk_col_idx);

        let target_type = ArrowDataType::Dictionary(
            Box::new(ArrowDataType::UInt32),
            Box::new(ArrowDataType::Binary),
        );
        arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
    };
    output_columns.push(pk_dictionary);

    // All rows in a bulk part share the same sequence and op type.
    let sequence_array = UInt64Array::from(vec![part.sequence; num_rows]);
    output_columns.push(Arc::new(sequence_array) as ArrayRef);

    let op_type_array = UInt8Array::from(vec![OpType::Put as u8; num_rows]);
    output_columns.push(Arc::new(op_type_array) as ArrayRef);

    let batch = RecordBatch::try_new(schema, output_columns).context(NewRecordBatchSnafu)?;

    let sorted_batch = sort_primary_key_record_batch(&batch)?;

    Ok(Some(BulkPart {
        batch: sorted_batch,
        max_timestamp: part.max_timestamp,
        min_timestamp: part.min_timestamp,
        sequence: part.sequence,
        timestamp_index: new_timestamp_index,
        raw_data: None,
    }))
}

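/// A bulk part encoded as an in-memory Parquet file.
///
/// Reading it back mirrors the tests in this file:
///
/// ```ignore
/// let context = Arc::new(BulkIterContext::new(region_metadata, None, None, false)?);
/// if let Some(iter) = encoded.read(context, None, None)? {
///     for batch in iter {
///         let batch = batch?;
///         // ...
///     }
/// }
/// ```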
#[derive(Debug, Clone)]
pub struct EncodedBulkPart {
    data: Bytes,
    metadata: BulkPartMeta,
}

impl EncodedBulkPart {
    pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
        Self { data, metadata }
    }

    pub fn metadata(&self) -> &BulkPartMeta {
        &self.metadata
    }

    pub(crate) fn size_bytes(&self) -> usize {
        self.data.len()
    }

    pub fn data(&self) -> &Bytes {
        &self.data
    }

    /// Builds [`MemtableStats`] for this encoded part.
    pub fn to_memtable_stats(&self) -> MemtableStats {
        let meta = &self.metadata;
        let ts_type = meta.region_metadata.time_index_type();
        let min_ts = ts_type.create_timestamp(meta.min_timestamp);
        let max_ts = ts_type.create_timestamp(meta.max_timestamp);

        MemtableStats {
            estimated_bytes: self.size_bytes(),
            time_range: Some((min_ts, max_ts)),
            num_rows: meta.num_rows,
            num_ranges: 1,
            max_sequence: meta.max_sequence,
            series_count: meta.num_series as usize,
        }
    }

    /// Builds the [`SstInfo`] describing this part as an SST file with the
    /// given `file_id`.
    pub(crate) fn to_sst_info(&self, file_id: FileId) -> SstInfo {
        let unit = self.metadata.region_metadata.time_index_type().unit();
        let max_row_group_uncompressed_size: u64 = self
            .metadata
            .parquet_metadata
            .row_groups()
            .iter()
            .map(|rg| {
                rg.columns()
                    .iter()
                    .map(|c| c.uncompressed_size() as u64)
                    .sum::<u64>()
            })
            .max()
            .unwrap_or(0);
        SstInfo {
            file_id,
            time_range: (
                Timestamp::new(self.metadata.min_timestamp, unit),
                Timestamp::new(self.metadata.max_timestamp, unit),
            ),
            file_size: self.data.len() as u64,
            max_row_group_uncompressed_size,
            num_rows: self.metadata.num_rows,
            num_row_groups: self.metadata.parquet_metadata.num_row_groups() as u64,
            file_metadata: Some(self.metadata.parquet_metadata.clone()),
            index_metadata: IndexOutput::default(),
            num_series: self.metadata.num_series,
        }
    }

    /// Creates an iterator over the row groups selected by `context`.
    /// Returns `None` if all row groups are pruned.
    pub(crate) fn read(
        &self,
        context: BulkIterContextRef,
        sequence: Option<SequenceRange>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Result<Option<BoxedRecordBatchIterator>> {
        let skip_fields_for_pruning =
            Self::compute_skip_fields(context.pre_filter_mode(), &self.metadata.parquet_metadata);

        let row_groups_to_read =
            context.row_groups_to_read(&self.metadata.parquet_metadata, skip_fields_for_pruning);

        if row_groups_to_read.is_empty() {
            return Ok(None);
        }

        let iter = EncodedBulkPartIter::try_new(
            self,
            context,
            row_groups_to_read,
            sequence,
            mem_scan_metrics,
        )?;
        Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
    }

    /// Decides whether field columns can be skipped while pruning, based on
    /// the pre-filter mode and whether any row group contains deletes.
    fn compute_skip_fields(pre_filter_mode: PreFilterMode, parquet_meta: &ParquetMetaData) -> bool {
        match pre_filter_mode {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => {
                (0..parquet_meta.num_row_groups()).any(|rg_idx| {
                    row_group_contains_delete(parquet_meta, rg_idx, "memtable").unwrap_or(true)
                })
            }
        }
    }
}

/// Metadata of an [`EncodedBulkPart`].
#[derive(Debug, Clone)]
pub struct BulkPartMeta {
    /// Total number of rows in the part.
    pub num_rows: usize,
    /// Max timestamp in the part.
    pub max_timestamp: i64,
    /// Min timestamp in the part.
    pub min_timestamp: i64,
    /// Metadata of the encoded Parquet file.
    pub parquet_metadata: Arc<ParquetMetaData>,
    /// Metadata of the region this part belongs to.
    pub region_metadata: RegionMetadataRef,
    /// Number of series in the part.
    pub num_series: u64,
    /// Max sequence number in the part.
    pub max_sequence: u64,
}

/// Metrics collected while encoding a bulk part.
#[derive(Default, Debug)]
pub struct BulkPartEncodeMetrics {
    /// Time spent pulling batches from the input iterator.
    pub iter_cost: Duration,
    /// Time spent writing and closing the Parquet writer.
    pub write_cost: Duration,
    /// Estimated in-memory size of the input batches.
    pub raw_size: usize,
    /// Size of the encoded Parquet output.
    pub encoded_size: usize,
    /// Number of rows encoded.
    pub num_rows: usize,
}

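/// Encodes bulk parts into in-memory Parquet files with `row_group_size`
/// rows per row group.
///
/// A sketch mirroring the `encode` test helper below:
///
/// ```ignore
/// let encoder = BulkPartEncoder::new(metadata, 1024)?;
/// let encoded = encoder.encode_part(&part)?.expect("part is not empty");
/// assert!(encoded.size_bytes() > 0);
/// ```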
pub struct BulkPartEncoder {
    metadata: RegionMetadataRef,
    writer_props: Option<WriterProperties>,
}

impl BulkPartEncoder {
    pub fn new(metadata: RegionMetadataRef, row_group_size: usize) -> Result<BulkPartEncoder> {
        // Store the region metadata JSON in the Parquet key-value metadata so
        // the encoded file is self-describing.
        let json = metadata.to_json().context(InvalidMetadataSnafu)?;
        let key_value_meta =
            parquet::file::metadata::KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        let writer_props = Some(
            WriterProperties::builder()
                .set_key_value_metadata(Some(vec![key_value_meta]))
                .set_write_batch_size(row_group_size)
                .set_max_row_group_size(row_group_size)
                .set_compression(Compression::ZSTD(ZstdLevel::default()))
                .set_column_index_truncate_length(None)
                .set_statistics_truncate_length(None)
                .build(),
        );

        Ok(Self {
            metadata,
            writer_props,
        })
    }
}

impl BulkPartEncoder {
    /// Encodes all batches from `iter` into a single Parquet-encoded part.
    /// Returns `None` if the iterator yields no rows.
    pub fn encode_record_batch_iter(
        &self,
        iter: BoxedRecordBatchIterator,
        arrow_schema: SchemaRef,
        min_timestamp: i64,
        max_timestamp: i64,
        max_sequence: u64,
        metrics: &mut BulkPartEncodeMetrics,
    ) -> Result<Option<EncodedBulkPart>> {
        let mut buf = Vec::with_capacity(4096);
        let mut writer = ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
            .context(EncodeMemtableSnafu)?;
        let mut total_rows = 0;
        let mut series_estimator = SeriesEstimator::default();

        let mut iter_start = Instant::now();
        for batch_result in iter {
            metrics.iter_cost += iter_start.elapsed();
            let batch = batch_result?;
            if batch.num_rows() == 0 {
                continue;
            }

            series_estimator.update_flat(&batch);
            metrics.raw_size += record_batch_estimated_size(&batch);
            let write_start = Instant::now();
            writer.write(&batch).context(EncodeMemtableSnafu)?;
            metrics.write_cost += write_start.elapsed();
            total_rows += batch.num_rows();
            iter_start = Instant::now();
        }
        metrics.iter_cost += iter_start.elapsed();

        if total_rows == 0 {
            return Ok(None);
        }

        let close_start = Instant::now();
        let file_metadata = writer.close().context(EncodeMemtableSnafu)?;
        metrics.write_cost += close_start.elapsed();
        metrics.encoded_size += buf.len();
        metrics.num_rows += total_rows;

        let buf = Bytes::from(buf);
        let parquet_metadata = Arc::new(file_metadata);
        let num_series = series_estimator.finish();

        Ok(Some(EncodedBulkPart {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: total_rows,
                max_timestamp,
                min_timestamp,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
                num_series,
                max_sequence,
            },
        }))
    }

    /// Encodes a single [`BulkPart`] to Parquet. Returns `None` if the part
    /// is empty.
    pub fn encode_part(&self, part: &BulkPart) -> Result<Option<EncodedBulkPart>> {
        if part.batch.num_rows() == 0 {
            return Ok(None);
        }

        let mut buf = Vec::with_capacity(4096);
        let arrow_schema = part.batch.schema();

        let file_metadata = {
            let mut writer =
                ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
                    .context(EncodeMemtableSnafu)?;
            writer.write(&part.batch).context(EncodeMemtableSnafu)?;
            writer.finish().context(EncodeMemtableSnafu)?
        };

        let buf = Bytes::from(buf);
        let parquet_metadata = Arc::new(file_metadata);

        Ok(Some(EncodedBulkPart {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: part.batch.num_rows(),
                max_timestamp: part.max_timestamp,
                min_timestamp: part.min_timestamp,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
                num_series: part.estimated_series_count() as u64,
                max_sequence: part.sequence,
            },
        }))
    }
}

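/// A logical part composed of one or more record batches sharing aggregated
/// timestamp, sequence and series statistics.
///
/// A minimal sketch (assumes `part` is a converted [`BulkPart`]):
///
/// ```ignore
/// let multi = MultiBulkPart::from_bulk_part(part);
/// assert_eq!(multi.num_batches(), 1);
/// let stats = multi.to_memtable_stats(&region_metadata);
/// ```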
#[derive(Debug, Clone)]
pub struct MultiBulkPart {
    /// Record batches of this part. A small vector avoids heap allocation
    /// for the common case of a few batches.
    batches: SmallVec<[RecordBatch; 4]>,
    /// Total number of rows across all batches.
    total_rows: usize,
    /// Max timestamp across all batches.
    max_timestamp: i64,
    /// Min timestamp across all batches.
    min_timestamp: i64,
    /// Max sequence number across all batches.
    max_sequence: SequenceNumber,
    /// Estimated number of series.
    series_count: usize,
}

impl MultiBulkPart {
    /// Creates a part that wraps a single [`BulkPart`].
    pub fn from_bulk_part(part: BulkPart) -> Self {
        let num_rows = part.num_rows();
        let series_count = part.estimated_series_count();
        let mut batches = SmallVec::new();
        batches.push(part.batch);

        Self {
            batches,
            total_rows: num_rows,
            max_timestamp: part.max_timestamp,
            min_timestamp: part.min_timestamp,
            max_sequence: part.sequence,
            series_count,
        }
    }

    /// Creates a part from `batches`.
    ///
    /// # Panics
    /// Panics if `batches` is empty.
    pub fn new(
        batches: Vec<RecordBatch>,
        min_timestamp: i64,
        max_timestamp: i64,
        max_sequence: SequenceNumber,
        series_count: usize,
    ) -> Self {
        assert!(!batches.is_empty(), "batches must not be empty");

        let total_rows = batches.iter().map(|b| b.num_rows()).sum();

        Self {
            batches: SmallVec::from_vec(batches),
            total_rows,
            max_timestamp,
            min_timestamp,
            max_sequence,
            series_count,
        }
    }

    pub fn num_rows(&self) -> usize {
        self.total_rows
    }

    pub fn min_timestamp(&self) -> i64 {
        self.min_timestamp
    }

    pub fn max_timestamp(&self) -> i64 {
        self.max_timestamp
    }

    pub fn max_sequence(&self) -> SequenceNumber {
        self.max_sequence
    }

    pub fn series_count(&self) -> usize {
        self.series_count
    }

    pub fn num_batches(&self) -> usize {
        self.batches.len()
    }

    pub(crate) fn estimated_size(&self) -> usize {
        self.batches.iter().map(record_batch_estimated_size).sum()
    }

    /// Creates an iterator over all batches under `context`. Returns `None`
    /// if this part is empty.
    pub(crate) fn read(
        &self,
        context: BulkIterContextRef,
        sequence: Option<SequenceRange>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Result<Option<BoxedRecordBatchIterator>> {
        if self.batches.is_empty() {
            return Ok(None);
        }

        let iter = crate::memtable::bulk::part_reader::BulkPartBatchIter::new(
            self.batches.iter().cloned().collect(),
            context,
            sequence,
            self.series_count,
            mem_scan_metrics,
        );
        Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
    }

    /// Builds [`MemtableStats`] for this part.
    pub fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
        let ts_type = region_metadata.time_index_type();
        let min_ts = ts_type.create_timestamp(self.min_timestamp);
        let max_ts = ts_type.create_timestamp(self.max_timestamp);

        MemtableStats {
            estimated_bytes: self.estimated_size(),
            time_range: Some((min_ts, max_ts)),
            num_rows: self.num_rows(),
            num_ranges: 1,
            max_sequence: self.max_sequence,
            series_count: self.series_count,
        }
    }
}

#[cfg(test)]
mod tests {
    use api::v1::{Row, SemanticType, WriteHint};
    use datafusion_common::ScalarValue;
    use datatypes::arrow::array::{
        BinaryArray, DictionaryArray, Float64Array, TimestampMillisecondArray,
    };
    use datatypes::arrow::datatypes::UInt32Type;
    use datatypes::prelude::{ConcreteDataType, Value};
    use datatypes::schema::ColumnSchema;
    use mito_codec::row_converter::build_primary_key_codec;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;
    use store_api::storage::consts::ReservedColumnId;
    use table::predicate::Predicate;

    use super::*;
    use crate::memtable::bulk::context::BulkIterContext;
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};

    struct MutationInput<'a> {
        k0: &'a str,
        k1: u32,
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
        sequence: u64,
    }

    fn encode(input: &[MutationInput]) -> EncodedBulkPart {
        let metadata = metadata_for_test();
        let kvs = input
            .iter()
            .map(|m| {
                build_key_values_with_ts_seq_values(
                    &metadata,
                    m.k0.to_string(),
                    m.k1,
                    m.timestamps.iter().copied(),
                    m.v1.iter().copied(),
                    m.sequence,
                )
            })
            .collect::<Vec<_>>();
        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let primary_key_codec = build_primary_key_codec(&metadata);
        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
        for kv in kvs {
            converter.append_key_values(&kv).unwrap();
        }
        let part = converter.convert().unwrap();
        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
        encoder.encode_part(&part).unwrap().unwrap()
    }

    #[test]
    fn test_write_and_read_part_projection() {
        let part = encode(&[
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.1)],
                sequence: 0,
            },
            MutationInput {
                k0: "b",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.0)],
                sequence: 0,
            },
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[2],
                v1: &[Some(0.2)],
                sequence: 1,
            },
        ]);

        let projection = &[4u32];
        let reader = part
            .read(
                Arc::new(
                    BulkIterContext::new(
                        part.metadata.region_metadata.clone(),
                        Some(projection.as_slice()),
                        None,
                        false,
                    )
                    .unwrap(),
                ),
                None,
                None,
            )
            .unwrap()
            .expect("expect at least one row group");

        let mut total_rows_read = 0;
        let mut field: Vec<f64> = vec![];
        for res in reader {
            let batch = res.unwrap();
            assert_eq!(5, batch.num_columns());
            field.extend_from_slice(
                batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Float64Array>()
                    .unwrap()
                    .values(),
            );
            total_rows_read += batch.num_rows();
        }
        assert_eq!(3, total_rows_read);
        assert_eq!(vec![0.1, 0.2, 0.0], field);
    }

    fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
        let metadata = metadata_for_test();
        let kvs = key_values
            .into_iter()
            .map(|(k0, k1, (start, end), sequence)| {
                let ts = start..end;
                let v1 = (start..end).map(|_| None);
                build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
            })
            .collect::<Vec<_>>();
        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let primary_key_codec = build_primary_key_codec(&metadata);
        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
        for kv in kvs {
            converter.append_key_values(&kv).unwrap();
        }
        let part = converter.convert().unwrap();
        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
        encoder.encode_part(&part).unwrap().unwrap()
    }

    fn check_prune_row_group(
        part: &EncodedBulkPart,
        predicate: Option<Predicate>,
        expected_rows: usize,
    ) {
        let context = Arc::new(
            BulkIterContext::new(
                part.metadata.region_metadata.clone(),
                None,
                predicate,
                false,
            )
            .unwrap(),
        );
        let reader = part
            .read(context, None, None)
            .unwrap()
            .expect("expect at least one row group");
        let mut total_rows_read = 0;
        for res in reader {
            let batch = res.unwrap();
            total_rows_read += batch.num_rows();
        }
        assert_eq!(expected_rows, total_rows_read);
    }

    #[test]
    fn test_prune_row_groups() {
        let part = prepare(vec![
            ("a", 0, (0, 40), 1),
            ("a", 1, (0, 60), 1),
            ("b", 0, (0, 100), 2),
            ("b", 1, (100, 180), 3),
            ("b", 1, (180, 210), 4),
        ]);

        let context = Arc::new(
            BulkIterContext::new(
                part.metadata.region_metadata.clone(),
                None,
                Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
                    datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
                )])),
                false,
            )
            .unwrap(),
        );
        assert!(part.read(context, None, None).unwrap().is_none());

        check_prune_row_group(&part, None, 310);

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            40,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
            ])),
            60,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
            ])),
            100,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            100,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64)),
            ])),
            1,
        );
    }

    #[test]
    fn test_bulk_part_converter_append_and_convert() {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "key1".to_string(),
            1u32,
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "key2".to_string(),
            2u32,
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_timestamp, 1000);
        assert_eq!(bulk_part.max_timestamp, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );
    }

    #[test]
    fn test_bulk_part_converter_sorting() {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "z_key".to_string(),
            3u32,
            vec![3000].into_iter(),
            vec![Some(3.0)].into_iter(),
            3,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "a_key".to_string(),
            1u32,
            vec![1000].into_iter(),
            vec![Some(1.0)].into_iter(),
            1,
        );

        let key_values3 = build_key_values_with_ts_seq_values(
            &metadata,
            "m_key".to_string(),
            2u32,
            vec![2000].into_iter(),
            vec![Some(2.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();
        converter.append_key_values(&key_values3).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);

        let ts_column = bulk_part.batch.column(bulk_part.timestamp_index);
        let seq_column = bulk_part.batch.column(bulk_part.batch.num_columns() - 2);

        let ts_array = ts_column
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        let seq_array = seq_column.as_any().downcast_ref::<UInt64Array>().unwrap();

        assert_eq!(ts_array.values(), &[1000, 2000, 3000]);
        assert_eq!(seq_array.values(), &[1, 2, 3]);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );
    }

    #[test]
    fn test_bulk_part_converter_empty() {
        let metadata = metadata_for_test();
        let capacity = 10;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 0);
        assert_eq!(bulk_part.min_timestamp, i64::MAX);
        assert_eq!(bulk_part.max_timestamp, i64::MIN);
        assert_eq!(bulk_part.sequence, SequenceNumber::MIN);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );
    }

    #[test]
    fn test_bulk_part_converter_without_primary_key_columns() {
        let metadata = metadata_for_test();
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions {
                raw_pk_columns: false,
                string_pk_use_dict: true,
            },
        );

        let capacity = 100;
        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, false);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "key1".to_string(),
            1u32,
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "key2".to_string(),
            2u32,
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_timestamp, 1000);
        assert_eq!(bulk_part.max_timestamp, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );
    }

    #[allow(clippy::too_many_arguments)]
    fn build_key_values_with_sparse_encoding(
        metadata: &RegionMetadataRef,
        primary_key_codec: &Arc<dyn PrimaryKeyCodec>,
        table_id: u32,
        tsid: u64,
        k0: String,
        k1: String,
        timestamps: impl Iterator<Item = i64>,
        values: impl Iterator<Item = Option<f64>>,
        sequence: SequenceNumber,
    ) -> KeyValues {
        let pk_values = vec![
            (ReservedColumnId::table_id(), Value::UInt32(table_id)),
            (ReservedColumnId::tsid(), Value::UInt64(tsid)),
            (0, Value::String(k0.clone().into())),
            (1, Value::String(k1.clone().into())),
        ];
        let mut encoded_key = Vec::new();
        primary_key_codec
            .encode_values(&pk_values, &mut encoded_key)
            .unwrap();
        assert!(!encoded_key.is_empty());

        let column_schema = vec![
            api::v1::ColumnSchema {
                column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::binary_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Tag as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "ts".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::timestamp_millisecond_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Timestamp as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "v0".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::int64_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Field as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "v1".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::float64_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Field as i32,
                ..Default::default()
            },
        ];

        let rows = timestamps
            .zip(values)
            .map(|(ts, v)| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::BinaryValue(
                            encoded_key.clone(),
                        )),
                    },
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(ts)),
                    },
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::I64Value(ts)),
                    },
                    api::v1::Value {
                        value_data: v.map(api::v1::value::ValueData::F64Value),
                    },
                ],
            })
            .collect();

        let mutation = api::v1::Mutation {
            op_type: 1,
            sequence,
            rows: Some(api::v1::Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: Some(WriteHint {
                primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
            }),
        };
        KeyValues::new(metadata.as_ref(), mutation).unwrap()
    }

    #[test]
    fn test_bulk_part_converter_sparse_primary_key_encoding() {
        use api::v1::SemanticType;
        use datatypes::schema::ColumnSchema;
        use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
        use store_api::storage::RegionId;

        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![0, 1])
            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
        let metadata = Arc::new(builder.build().unwrap());

        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse);
        assert_eq!(primary_key_codec.encoding(), PrimaryKeyEncoding::Sparse);

        let capacity = 100;
        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec.clone(), true);

        let key_values1 = build_key_values_with_sparse_encoding(
            &metadata,
            &primary_key_codec,
            2048u32, // table_id
            100u64,  // tsid
            "key11".to_string(),
            "key21".to_string(),
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_sparse_encoding(
            &metadata,
            &primary_key_codec,
            4096u32, // table_id
            200u64,  // tsid
            "key12".to_string(),
            "key22".to_string(),
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_timestamp, 1000);
        assert_eq!(bulk_part.max_timestamp, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );

        let primary_key_column = bulk_part.batch.column_by_name("__primary_key").unwrap();
        let dict_array = primary_key_column
            .as_any()
            .downcast_ref::<DictionaryArray<UInt32Type>>()
            .unwrap();

        assert!(!dict_array.is_empty());
        assert_eq!(dict_array.len(), 3);

        let values = dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        for i in 0..values.len() {
            assert!(
                !values.value(i).is_empty(),
                "Encoded primary key should not be empty"
            );
        }
    }

    #[test]
    fn test_convert_bulk_part_empty() {
        let metadata = metadata_for_test();
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );
        let primary_key_codec = build_primary_key_codec(&metadata);

        let empty_batch = RecordBatch::new_empty(schema.clone());
        let empty_part = BulkPart {
            batch: empty_batch,
            max_timestamp: 0,
            min_timestamp: 0,
            sequence: 0,
            timestamp_index: 0,
            raw_data: None,
        };

        let result =
            convert_bulk_part(empty_part, &metadata, primary_key_codec, schema, true).unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_convert_bulk_part_dense_with_pk_columns() {
        let metadata = metadata_for_test();
        let primary_key_codec = build_primary_key_codec(&metadata);

        let k0_array = Arc::new(arrow::array::StringArray::from(vec![
            "key1", "key2", "key1",
        ]));
        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2, 1]));
        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200, 300]));
        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0, 3.0]));
        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000, 1500]));

        let input_schema = Arc::new(Schema::new(vec![
            Field::new("k0", ArrowDataType::Utf8, false),
            Field::new("k1", ArrowDataType::UInt32, false),
            Field::new("v0", ArrowDataType::Int64, true),
            Field::new("v1", ArrowDataType::Float64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        let input_batch = RecordBatch::try_new(
            input_schema,
            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
        )
        .unwrap();

        let part = BulkPart {
            batch: input_batch,
            max_timestamp: 2000,
            min_timestamp: 1000,
            sequence: 5,
            timestamp_index: 4,
            raw_data: None,
        };

        let output_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let result = convert_bulk_part(
            part,
            &metadata,
            primary_key_codec,
            output_schema,
            true, // store_primary_key_columns
        )
        .unwrap();

        let converted = result.unwrap();

        assert_eq!(converted.num_rows(), 3);
        assert_eq!(converted.max_timestamp, 2000);
        assert_eq!(converted.min_timestamp, 1000);
        assert_eq!(converted.sequence, 5);

        let schema = converted.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );

        let k0_col = converted.batch.column_by_name("k0").unwrap();
        assert!(matches!(
            k0_col.data_type(),
            ArrowDataType::Dictionary(_, _)
        ));

        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
        let dict_array = pk_col
            .as_any()
            .downcast_ref::<DictionaryArray<UInt32Type>>()
            .unwrap();
        let keys = dict_array.keys();

        assert_eq!(keys.len(), 3);
    }

    #[test]
    fn test_convert_bulk_part_dense_without_pk_columns() {
        let metadata = metadata_for_test();
        let primary_key_codec = build_primary_key_codec(&metadata);

        let k0_array = Arc::new(arrow::array::StringArray::from(vec!["key1", "key2"]));
        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2]));
        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));

        let input_schema = Arc::new(Schema::new(vec![
            Field::new("k0", ArrowDataType::Utf8, false),
            Field::new("k1", ArrowDataType::UInt32, false),
            Field::new("v0", ArrowDataType::Int64, true),
            Field::new("v1", ArrowDataType::Float64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        let input_batch = RecordBatch::try_new(
            input_schema,
            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
        )
        .unwrap();

        let part = BulkPart {
            batch: input_batch,
            max_timestamp: 2000,
            min_timestamp: 1000,
            sequence: 3,
            timestamp_index: 4,
            raw_data: None,
        };

        let output_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions {
                raw_pk_columns: false,
                string_pk_use_dict: true,
            },
        );

        let result = convert_bulk_part(
            part,
            &metadata,
            primary_key_codec,
            output_schema,
            false, // store_primary_key_columns
        )
        .unwrap();

        let converted = result.unwrap();

        assert_eq!(converted.num_rows(), 2);
        assert_eq!(converted.max_timestamp, 2000);
        assert_eq!(converted.min_timestamp, 1000);
        assert_eq!(converted.sequence, 3);

        let schema = converted.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );

        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
        assert!(matches!(
            pk_col.data_type(),
            ArrowDataType::Dictionary(_, _)
        ));
    }

    #[test]
    fn test_convert_bulk_part_sparse_encoding() {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![0, 1])
            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
        let metadata = Arc::new(builder.build().unwrap());

        let primary_key_codec = build_primary_key_codec(&metadata);

        let pk_array = Arc::new(arrow::array::BinaryArray::from(vec![
            b"encoded_key_1".as_slice(),
            b"encoded_key_2".as_slice(),
        ]));
        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));

        let input_schema = Arc::new(Schema::new(vec![
            Field::new("__primary_key", ArrowDataType::Binary, false),
            Field::new("v0", ArrowDataType::Int64, true),
            Field::new("v1", ArrowDataType::Float64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        let input_batch =
            RecordBatch::try_new(input_schema, vec![pk_array, v0_array, v1_array, ts_array])
                .unwrap();

        let part = BulkPart {
            batch: input_batch,
            max_timestamp: 2000,
            min_timestamp: 1000,
            sequence: 7,
            timestamp_index: 3,
            raw_data: None,
        };

        let output_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let result = convert_bulk_part(
            part,
            &metadata,
            primary_key_codec,
            output_schema,
            true, // store_primary_key_columns (ignored for sparse encoding)
        )
        .unwrap();

        let converted = result.unwrap();

        assert_eq!(converted.num_rows(), 2);
        assert_eq!(converted.max_timestamp, 2000);
        assert_eq!(converted.min_timestamp, 1000);
        assert_eq!(converted.sequence, 7);

        let schema = converted.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );

        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
        assert!(matches!(
            pk_col.data_type(),
            ArrowDataType::Dictionary(_, _)
        ));
    }

    #[test]
    fn test_convert_bulk_part_sorting_with_multiple_series() {
        let metadata = metadata_for_test();
        let primary_key_codec = build_primary_key_codec(&metadata);

        let k0_array = Arc::new(arrow::array::StringArray::from(vec![
            "series_b", "series_a", "series_b", "series_a",
        ]));
        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![2, 1, 2, 1]));
        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![200, 100, 400, 300]));
        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![2.0, 1.0, 4.0, 3.0]));
        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
            2000, 1000, 4000, 3000,
        ]));

        let input_schema = Arc::new(Schema::new(vec![
            Field::new("k0", ArrowDataType::Utf8, false),
            Field::new("k1", ArrowDataType::UInt32, false),
            Field::new("v0", ArrowDataType::Int64, true),
            Field::new("v1", ArrowDataType::Float64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        let input_batch = RecordBatch::try_new(
            input_schema,
            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
        )
        .unwrap();

        let part = BulkPart {
            batch: input_batch,
            max_timestamp: 4000,
            min_timestamp: 1000,
            sequence: 10,
            timestamp_index: 4,
            raw_data: None,
        };

        let output_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let result =
            convert_bulk_part(part, &metadata, primary_key_codec, output_schema, true).unwrap();

        let converted = result.unwrap();

        assert_eq!(converted.num_rows(), 4);

        let ts_col = converted.batch.column(converted.timestamp_index);
        let ts_array = ts_col
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();

        // Rows are sorted by primary key first, so both series_a rows
        // (ts 1000, 3000) precede the series_b rows (ts 2000, 4000).
        let timestamps: Vec<i64> = ts_array.values().to_vec();
        assert_eq!(timestamps, vec![1000, 3000, 2000, 4000]);
    }
}