use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::helper::{ColumnDataTypeWrapper, to_grpc_value};
use api::v1::bulk_wal_entry::Body;
use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType, bulk_wal_entry};
use bytes::Bytes;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_recordbatch::DfRecordBatch as RecordBatch;
use common_time::Timestamp;
use common_time::timestamp::TimeUnit;
use datatypes::arrow;
use datatypes::arrow::array::{
    Array, ArrayRef, BinaryBuilder, BinaryDictionaryBuilder, DictionaryArray, StringBuilder,
    StringDictionaryBuilder, TimestampMicrosecondArray, TimestampMillisecondArray,
    TimestampNanosecondArray, TimestampSecondArray, UInt8Array, UInt8Builder, UInt32Array,
    UInt64Array, UInt64Builder,
};
use datatypes::arrow::compute::{SortColumn, SortOptions, TakeOptions};
use datatypes::arrow::datatypes::{
    DataType as ArrowDataType, Field, Schema, SchemaRef, UInt32Type,
};
use datatypes::arrow_array::BinaryArray;
use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::Helper;
use mito_codec::key_values::{KeyValue, KeyValues, KeyValuesRef};
use mito_codec::row_converter::{
    DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, build_primary_key_codec,
};
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, ZstdLevel};
use parquet::data_type::AsBytes;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use snafu::{OptionExt, ResultExt, Snafu};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
use store_api::storage::{FileId, RegionId, SequenceNumber, SequenceRange};
use table::predicate::Predicate;

use crate::error::{
    self, ColumnNotFoundSnafu, ComputeArrowSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu,
    DataTypeMismatchSnafu, EncodeMemtableSnafu, EncodeSnafu, InvalidMetadataSnafu,
    InvalidRequestSnafu, NewRecordBatchSnafu, Result, UnexpectedSnafu,
};
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
use crate::memtable::time_series::{ValueBuilder, Values};
use crate::memtable::{BoxedRecordBatchIterator, MemScanMetrics, MemtableStats};
use crate::sst::index::IndexOutput;
use crate::sst::parquet::file_range::{PreFilterMode, row_group_contains_delete};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo};
use crate::sst::{SeriesEstimator, to_sst_arrow_schema};
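
/// Initial capacity for the distinct values of a string dictionary builder used for
/// primary key columns.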
const INIT_DICT_VALUE_CAPACITY: usize = 8;
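
/// A chunk of bulk-ingested rows stored as a single Arrow [`RecordBatch`], together with its
/// timestamp range, maximum sequence number, timestamp column index and the optional raw
/// Arrow IPC payload it was decoded from.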
#[derive(Clone)]
pub struct BulkPart {
    pub batch: RecordBatch,
    pub max_timestamp: i64,
    pub min_timestamp: i64,
    pub sequence: u64,
    pub timestamp_index: usize,
    pub raw_data: Option<ArrowIpc>,
}

impl TryFrom<BulkWalEntry> for BulkPart {
    type Error = error::Error;

    fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
        match value.body.expect("Entry payload should be present") {
            Body::ArrowIpc(ipc) => {
                let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
                    .context(error::ConvertBulkWalEntrySnafu)?;
                let batch = decoder
                    .try_decode_record_batch(&ipc.data_header, &ipc.payload)
                    .context(error::ConvertBulkWalEntrySnafu)?;
                Ok(Self {
                    batch,
                    max_timestamp: value.max_ts,
                    min_timestamp: value.min_ts,
                    sequence: value.sequence,
                    timestamp_index: value.timestamp_index as usize,
                    raw_data: Some(ipc),
                })
            }
        }
    }
}

impl TryFrom<&BulkPart> for BulkWalEntry {
    type Error = error::Error;

    fn try_from(value: &BulkPart) -> Result<Self> {
        if let Some(ipc) = &value.raw_data {
            Ok(BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_timestamp,
                min_ts: value.min_timestamp,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ipc.clone())),
            })
        } else {
            let mut encoder = FlightEncoder::default();
            let schema_bytes = encoder
                .encode_schema(value.batch.schema().as_ref())
                .data_header;
            let [rb_data] = encoder
                .encode(FlightMessage::RecordBatch(value.batch.clone()))
                .try_into()
                .map_err(|_| {
                    error::UnsupportedOperationSnafu {
                        err_msg: "create BulkWalEntry from RecordBatch with dictionary arrays",
                    }
                    .build()
                })?;
            Ok(BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_timestamp,
                min_ts: value.min_timestamp,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ArrowIpc {
                    schema: schema_bytes,
                    data_header: rb_data.data_header,
                    payload: rb_data.data_body,
                })),
            })
        }
    }
}

impl BulkPart {
    pub(crate) fn estimated_size(&self) -> usize {
        record_batch_estimated_size(&self.batch)
    }
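
    /// Estimates the number of series in this part as the number of distinct values in the
    /// `__primary_key` dictionary column, or 0 if that column is not a dictionary array.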
    pub fn estimated_series_count(&self) -> usize {
        let pk_column_idx = primary_key_column_index(self.batch.num_columns());
        let pk_column = self.batch.column(pk_column_idx);
        if let Some(dict_array) = pk_column.as_any().downcast_ref::<PrimaryKeyArray>() {
            dict_array.values().len()
        } else {
            0
        }
    }
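
    /// Builds [`MemtableStats`] for this part, converting min/max timestamps with the
    /// region's time index type.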
    pub fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
        let ts_type = region_metadata.time_index_type();
        let min_ts = ts_type.create_timestamp(self.min_timestamp);
        let max_ts = ts_type.create_timestamp(self.max_timestamp);

        MemtableStats {
            estimated_bytes: self.estimated_size(),
            time_range: Some((min_ts, max_ts)),
            num_rows: self.num_rows(),
            num_ranges: 1,
            max_sequence: self.sequence,
            series_count: self.estimated_series_count(),
        }
    }
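
    /// Appends columns that exist in `region_metadata` but are missing from the batch,
    /// filling them with their default values. Returns an error if a missing column has
    /// no default value.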
    pub fn fill_missing_columns(&mut self, region_metadata: &RegionMetadata) -> Result<()> {
        let batch_schema = self.batch.schema();
        let batch_columns: HashSet<_> = batch_schema
            .fields()
            .iter()
            .map(|f| f.name().as_str())
            .collect();

        let mut columns_to_fill = Vec::new();
        for column_meta in &region_metadata.column_metadatas {
            if !batch_columns.contains(column_meta.column_schema.name.as_str()) {
                columns_to_fill.push(column_meta);
            }
        }

        if columns_to_fill.is_empty() {
            return Ok(());
        }

        let num_rows = self.batch.num_rows();

        let mut new_columns = Vec::new();
        let mut new_fields = Vec::new();

        new_fields.extend(batch_schema.fields().iter().cloned());
        new_columns.extend_from_slice(self.batch.columns());

        let region_id = region_metadata.region_id;
        for column_meta in columns_to_fill {
            let default_vector = column_meta
                .column_schema
                .create_default_vector(num_rows)
                .context(CreateDefaultSnafu {
                    region_id,
                    column: &column_meta.column_schema.name,
                })?
                .with_context(|| InvalidRequestSnafu {
                    region_id,
                    reason: format!(
                        "column {} does not have default value",
                        column_meta.column_schema.name
                    ),
                })?;
            let arrow_array = default_vector.to_arrow_array();

            new_fields.push(Arc::new(Field::new(
                column_meta.column_schema.name.clone(),
                column_meta.column_schema.data_type.as_arrow_type(),
                column_meta.column_schema.is_nullable(),
            )));
            new_columns.push(arrow_array);
        }

        let new_schema = Arc::new(Schema::new(new_fields));
        let new_batch =
            RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)?;

        self.batch = new_batch;

        Ok(())
    }
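
    /// Converts this part into a gRPC [`Mutation`] with `Put` op type; columns absent from
    /// the batch are emitted as null values.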
    pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
        let vectors = region_metadata
            .schema
            .column_schemas()
            .iter()
            .map(|col| match self.batch.column_by_name(&col.name) {
                None => Ok(None),
                Some(col) => Helper::try_into_vector(col).map(Some),
            })
            .collect::<datatypes::error::Result<Vec<_>>>()
            .context(error::ComputeVectorSnafu)?;

        let rows = (0..self.num_rows())
            .map(|row_idx| {
                let values = (0..self.batch.num_columns())
                    .map(|col_idx| {
                        if let Some(v) = &vectors[col_idx] {
                            to_grpc_value(v.get(row_idx))
                        } else {
                            api::v1::Value { value_data: None }
                        }
                    })
                    .collect::<Vec<_>>();
                api::v1::Row { values }
            })
            .collect::<Vec<_>>();

        let schema = region_metadata
            .column_metadatas
            .iter()
            .map(|c| {
                let data_type_wrapper =
                    ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
                Ok(api::v1::ColumnSchema {
                    column_name: c.column_schema.name.clone(),
                    datatype: data_type_wrapper.datatype() as i32,
                    semantic_type: c.semantic_type as i32,
                    ..Default::default()
                })
            })
            .collect::<api::error::Result<Vec<_>>>()
            .context(error::ConvertColumnDataTypeSnafu {
                reason: "failed to convert region metadata to column schema",
            })?;

        let rows = api::v1::Rows { schema, rows };

        Ok(Mutation {
            op_type: OpType::Put as i32,
            sequence: self.sequence,
            rows: Some(rows),
            write_hint: None,
        })
    }

    pub fn timestamps(&self) -> &ArrayRef {
        self.batch.column(self.timestamp_index)
    }

    pub fn num_rows(&self) -> usize {
        self.batch.num_rows()
    }
}
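
/// Buffers small, possibly unsorted [`BulkPart`]s and merges them into a single sorted part
/// once enough rows have accumulated.
///
/// A minimal usage sketch (assuming `part` is a [`BulkPart`] that shares the schema of the
/// parts already buffered):
///
/// ```ignore
/// let mut buffer = UnorderedPart::new();
/// buffer.push(part);
/// if buffer.should_compact() {
///     // Concatenate and sort all buffered batches into one part.
///     if let Some(merged) = buffer.to_bulk_part().unwrap() {
///         // hand `merged` over to the memtable
///     }
///     buffer.clear();
/// }
/// ```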
pub struct UnorderedPart {
    parts: Vec<BulkPart>,
    total_rows: usize,
    min_timestamp: i64,
    max_timestamp: i64,
    max_sequence: u64,
    threshold: usize,
    compact_threshold: usize,
}

impl Default for UnorderedPart {
    fn default() -> Self {
        Self::new()
    }
}

impl UnorderedPart {
    pub fn new() -> Self {
        Self {
            parts: Vec::new(),
            total_rows: 0,
            min_timestamp: i64::MAX,
            max_timestamp: i64::MIN,
            max_sequence: 0,
            threshold: 1024,
            compact_threshold: 4096,
        }
    }

    pub fn set_threshold(&mut self, threshold: usize) {
        self.threshold = threshold;
    }

    pub fn set_compact_threshold(&mut self, compact_threshold: usize) {
        self.compact_threshold = compact_threshold;
    }

    pub fn threshold(&self) -> usize {
        self.threshold
    }

    pub fn compact_threshold(&self) -> usize {
        self.compact_threshold
    }

    pub fn should_accept(&self, num_rows: usize) -> bool {
        num_rows < self.threshold
    }

    pub fn should_compact(&self) -> bool {
        self.total_rows >= self.compact_threshold
    }

    pub fn push(&mut self, part: BulkPart) {
        self.total_rows += part.num_rows();
        self.min_timestamp = self.min_timestamp.min(part.min_timestamp);
        self.max_timestamp = self.max_timestamp.max(part.max_timestamp);
        self.max_sequence = self.max_sequence.max(part.sequence);
        self.parts.push(part);
    }

    pub fn num_rows(&self) -> usize {
        self.total_rows
    }

    pub fn is_empty(&self) -> bool {
        self.parts.is_empty()
    }

    pub fn num_parts(&self) -> usize {
        self.parts.len()
    }
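
    /// Concatenates all buffered batches and sorts the result by primary key, timestamp and
    /// sequence. Returns `None` when no parts are buffered; a single buffered part is
    /// returned unchanged.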
    pub fn concat_and_sort(&self) -> Result<Option<RecordBatch>> {
        if self.parts.is_empty() {
            return Ok(None);
        }

        if self.parts.len() == 1 {
            return Ok(Some(self.parts[0].batch.clone()));
        }

        let schema = self.parts[0].batch.schema();

        let batches: Vec<RecordBatch> = self.parts.iter().map(|p| p.batch.clone()).collect();
        let concatenated =
            arrow::compute::concat_batches(&schema, &batches).context(ComputeArrowSnafu)?;

        let sorted_batch = sort_primary_key_record_batch(&concatenated)?;

        Ok(Some(sorted_batch))
    }
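
    /// Merges the buffered parts into one sorted [`BulkPart`] carrying the accumulated
    /// min/max timestamps and max sequence, or `None` when the buffer is empty.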
    pub fn to_bulk_part(&self) -> Result<Option<BulkPart>> {
        let Some(sorted_batch) = self.concat_and_sort()? else {
            return Ok(None);
        };

        let timestamp_index = self.parts[0].timestamp_index;

        Ok(Some(BulkPart {
            batch: sorted_batch,
            max_timestamp: self.max_timestamp,
            min_timestamp: self.min_timestamp,
            sequence: self.max_sequence,
            timestamp_index,
            raw_data: None,
        }))
    }

    pub fn clear(&mut self) {
        self.parts.clear();
        self.total_rows = 0;
        self.min_timestamp = i64::MAX;
        self.max_timestamp = i64::MIN;
        self.max_sequence = 0;
    }
}
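
/// Estimates the memory usage of a record batch by summing the sliced buffer sizes of its
/// columns.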
pub fn record_batch_estimated_size(batch: &RecordBatch) -> usize {
    batch
        .columns()
        .iter()
        .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
        .sum()
}
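
/// Builder for a raw primary key column kept alongside the encoded `__primary_key` column:
/// string tags are built as dictionary arrays, other types as plain vectors.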
enum PrimaryKeyColumnBuilder {
    StringDict(StringDictionaryBuilder<UInt32Type>),
    Vector(Box<dyn MutableVector>),
}

impl PrimaryKeyColumnBuilder {
    fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
        match self {
            PrimaryKeyColumnBuilder::StringDict(builder) => {
                if let Some(s) = value.try_into_string().context(DataTypeMismatchSnafu)? {
                    builder.append_value(s);
                } else {
                    builder.append_null();
                }
            }
            PrimaryKeyColumnBuilder::Vector(builder) => {
                builder.push_value_ref(&value);
            }
        }
        Ok(())
    }

    fn into_arrow_array(self) -> ArrayRef {
        match self {
            PrimaryKeyColumnBuilder::StringDict(mut builder) => Arc::new(builder.finish()),
            PrimaryKeyColumnBuilder::Vector(mut builder) => builder.to_vector().to_arrow_array(),
        }
    }
}
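
/// Converts [`KeyValues`] mutations into a sorted [`BulkPart`] with the flat SST schema,
/// encoding primary keys and tracking timestamp/sequence statistics along the way.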
pub struct BulkPartConverter {
    region_metadata: RegionMetadataRef,
    schema: SchemaRef,
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
    key_buf: Vec<u8>,
    key_array_builder: PrimaryKeyArrayBuilder,
    value_builder: ValueBuilder,
    primary_key_column_builders: Vec<PrimaryKeyColumnBuilder>,

    max_ts: i64,
    min_ts: i64,
    max_sequence: SequenceNumber,
}

impl BulkPartConverter {
    pub fn new(
        region_metadata: &RegionMetadataRef,
        schema: SchemaRef,
        capacity: usize,
        primary_key_codec: Arc<dyn PrimaryKeyCodec>,
        store_primary_key_columns: bool,
    ) -> Self {
        debug_assert_eq!(
            region_metadata.primary_key_encoding,
            primary_key_codec.encoding()
        );

        let primary_key_column_builders = if store_primary_key_columns
            && region_metadata.primary_key_encoding != PrimaryKeyEncoding::Sparse
        {
            new_primary_key_column_builders(region_metadata, capacity)
        } else {
            Vec::new()
        };

        Self {
            region_metadata: region_metadata.clone(),
            schema,
            primary_key_codec,
            key_buf: Vec::new(),
            key_array_builder: PrimaryKeyArrayBuilder::new(),
            value_builder: ValueBuilder::new(region_metadata, capacity),
            primary_key_column_builders,
            min_ts: i64::MAX,
            max_ts: i64::MIN,
            max_sequence: SequenceNumber::MIN,
        }
    }
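
    /// Appends all rows of `key_values` to the converter.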
    pub fn append_key_values(&mut self, key_values: &KeyValues) -> Result<()> {
        for kv in key_values.iter() {
            self.append_key_value(&kv)?;
        }

        Ok(())
    }
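
    /// Appends a single [`KeyValue`].
    ///
    /// With sparse encoding the first primary key column already holds the encoded key and
    /// is appended verbatim; with dense encodings the key is encoded via the primary key
    /// codec. Field, timestamp, sequence and op type values are pushed as well, and the
    /// min/max timestamp and max sequence statistics are updated.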
    fn append_key_value(&mut self, kv: &KeyValue) -> Result<()> {
        if self.primary_key_codec.encoding() == PrimaryKeyEncoding::Sparse {
            let mut primary_keys = kv.primary_keys();
            if let Some(encoded) = primary_keys
                .next()
                .context(ColumnNotFoundSnafu {
                    column: PRIMARY_KEY_COLUMN_NAME,
                })?
                .try_into_binary()
                .context(DataTypeMismatchSnafu)?
            {
                self.key_array_builder
                    .append(encoded)
                    .context(ComputeArrowSnafu)?;
            } else {
                self.key_array_builder
                    .append("")
                    .context(ComputeArrowSnafu)?;
            }
        } else {
            self.key_buf.clear();
            self.primary_key_codec
                .encode_key_value(kv, &mut self.key_buf)
                .context(EncodeSnafu)?;
            self.key_array_builder
                .append(&self.key_buf)
                .context(ComputeArrowSnafu)?;
        };

        if !self.primary_key_column_builders.is_empty() {
            for (builder, pk_value) in self
                .primary_key_column_builders
                .iter_mut()
                .zip(kv.primary_keys())
            {
                builder.push_value_ref(pk_value)?;
            }
        }

        self.value_builder.push(
            kv.timestamp(),
            kv.sequence(),
            kv.op_type() as u8,
            kv.fields(),
        );

        let ts = kv
            .timestamp()
            .try_into_timestamp()
            .unwrap()
            .unwrap()
            .value();
        self.min_ts = self.min_ts.min(ts);
        self.max_ts = self.max_ts.max(ts);
        self.max_sequence = self.max_sequence.max(kv.sequence());

        Ok(())
    }
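
    /// Finishes all builders and produces a [`BulkPart`] whose batch is laid out as
    /// primary key columns (if stored), field columns, timestamp, `__primary_key`,
    /// `__sequence` and `__op_type`, sorted by primary key, timestamp and sequence.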
    pub fn convert(mut self) -> Result<BulkPart> {
        let values = Values::from(self.value_builder);
        let mut columns =
            Vec::with_capacity(4 + values.fields.len() + self.primary_key_column_builders.len());

        for builder in self.primary_key_column_builders {
            columns.push(builder.into_arrow_array());
        }
        columns.extend(values.fields.iter().map(|field| field.to_arrow_array()));
        let timestamp_index = columns.len();
        columns.push(values.timestamp.to_arrow_array());
        let pk_array = self.key_array_builder.finish();
        columns.push(Arc::new(pk_array));
        columns.push(values.sequence.to_arrow_array());
        columns.push(values.op_type.to_arrow_array());

        let batch = RecordBatch::try_new(self.schema, columns).context(NewRecordBatchSnafu)?;
        let batch = sort_primary_key_record_batch(&batch)?;

        Ok(BulkPart {
            batch,
            max_timestamp: self.max_ts,
            min_timestamp: self.min_ts,
            sequence: self.max_sequence,
            timestamp_index,
            raw_data: None,
        })
    }
}
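
/// Creates one builder per primary key column; string tags use dictionary builders, other
/// types use plain mutable vectors.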
fn new_primary_key_column_builders(
    metadata: &RegionMetadata,
    capacity: usize,
) -> Vec<PrimaryKeyColumnBuilder> {
    metadata
        .primary_key_columns()
        .map(|col| {
            if col.column_schema.data_type.is_string() {
                PrimaryKeyColumnBuilder::StringDict(StringDictionaryBuilder::with_capacity(
                    capacity,
                    INIT_DICT_VALUE_CAPACITY,
                    capacity,
                ))
            } else {
                PrimaryKeyColumnBuilder::Vector(
                    col.column_schema.data_type.create_mutable_vector(capacity),
                )
            }
        })
        .collect()
}
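
/// Sorts a record batch in the flat SST layout by `__primary_key` (ascending), timestamp
/// (ascending) and `__sequence` (descending). The batch must end with the timestamp,
/// `__primary_key`, `__sequence` and `__op_type` columns.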
pub fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
    let total_columns = batch.num_columns();
    let sort_columns = vec![
        SortColumn {
            values: batch.column(total_columns - 3).clone(),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
        SortColumn {
            values: batch.column(total_columns - 4).clone(),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
        SortColumn {
            values: batch.column(total_columns - 2).clone(),
            options: Some(SortOptions {
                descending: true,
                nulls_first: true,
            }),
        },
    ];

    let indices = datatypes::arrow::compute::lexsort_to_indices(&sort_columns, None)
        .context(ComputeArrowSnafu)?;

    datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
}
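
/// Converts a [`BulkPart`] whose batch uses the write schema (tag, field and timestamp
/// columns) into the flat SST layout described by `schema`.
///
/// For dense primary key encodings the primary key of every row is encoded into a
/// `__primary_key` dictionary column; for sparse encodings the pre-encoded `__primary_key`
/// column of the input batch is reused. When `store_primary_key_columns` is true (and the
/// encoding is dense), the raw primary key columns are kept with string tags cast to
/// dictionary arrays. `__sequence` and `__op_type` columns are appended and the batch is
/// sorted by primary key, timestamp and sequence. Returns `None` for an empty input part.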
pub fn convert_bulk_part(
    part: BulkPart,
    region_metadata: &RegionMetadataRef,
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
    schema: SchemaRef,
    store_primary_key_columns: bool,
) -> Result<Option<BulkPart>> {
    if part.num_rows() == 0 {
        return Ok(None);
    }

    let num_rows = part.num_rows();
    let is_sparse = region_metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse;

    let input_schema = part.batch.schema();
    let column_indices: HashMap<&str, usize> = input_schema
        .fields()
        .iter()
        .enumerate()
        .map(|(idx, field)| (field.name().as_str(), idx))
        .collect();

    let mut output_columns = Vec::new();

    let pk_array = if is_sparse {
        None
    } else {
        let pk_vectors: Result<Vec<_>> = region_metadata
            .primary_key_columns()
            .map(|col_meta| {
                let col_idx = column_indices
                    .get(col_meta.column_schema.name.as_str())
                    .context(ColumnNotFoundSnafu {
                        column: &col_meta.column_schema.name,
                    })?;
                let col = part.batch.column(*col_idx);
                Helper::try_into_vector(col).context(error::ComputeVectorSnafu)
            })
            .collect();
        let pk_vectors = pk_vectors?;

        let mut key_array_builder = PrimaryKeyArrayBuilder::new();
        let mut encode_buf = Vec::new();

        for row_idx in 0..num_rows {
            encode_buf.clear();

            let pk_values_with_ids: Vec<_> = region_metadata
                .primary_key
                .iter()
                .zip(pk_vectors.iter())
                .map(|(col_id, vector)| (*col_id, vector.get_ref(row_idx)))
                .collect();

            primary_key_codec
                .encode_value_refs(&pk_values_with_ids, &mut encode_buf)
                .context(EncodeSnafu)?;

            key_array_builder
                .append(&encode_buf)
                .context(ComputeArrowSnafu)?;
        }

        Some(key_array_builder.finish())
    };

    if store_primary_key_columns && !is_sparse {
        for col_meta in region_metadata.primary_key_columns() {
            let col_idx = column_indices
                .get(col_meta.column_schema.name.as_str())
                .context(ColumnNotFoundSnafu {
                    column: &col_meta.column_schema.name,
                })?;
            let col = part.batch.column(*col_idx);

            let col = if col_meta.column_schema.data_type.is_string() {
                let target_type = ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Utf8),
                );
                arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
            } else {
                col.clone()
            };
            output_columns.push(col);
        }
    }

    for col_meta in region_metadata.field_columns() {
        let col_idx = column_indices
            .get(col_meta.column_schema.name.as_str())
            .context(ColumnNotFoundSnafu {
                column: &col_meta.column_schema.name,
            })?;
        output_columns.push(part.batch.column(*col_idx).clone());
    }

    let new_timestamp_index = output_columns.len();
    let ts_col_idx = column_indices
        .get(
            region_metadata
                .time_index_column()
                .column_schema
                .name
                .as_str(),
        )
        .context(ColumnNotFoundSnafu {
            column: &region_metadata.time_index_column().column_schema.name,
        })?;
    output_columns.push(part.batch.column(*ts_col_idx).clone());

    let pk_dictionary = if let Some(pk_dict_array) = pk_array {
        Arc::new(pk_dict_array) as ArrayRef
    } else {
        let pk_col_idx =
            column_indices
                .get(PRIMARY_KEY_COLUMN_NAME)
                .context(ColumnNotFoundSnafu {
                    column: PRIMARY_KEY_COLUMN_NAME,
                })?;
        let col = part.batch.column(*pk_col_idx);

        let target_type = ArrowDataType::Dictionary(
            Box::new(ArrowDataType::UInt32),
            Box::new(ArrowDataType::Binary),
        );
        arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
    };
    output_columns.push(pk_dictionary);

    let sequence_array = UInt64Array::from(vec![part.sequence; num_rows]);
    output_columns.push(Arc::new(sequence_array) as ArrayRef);

    let op_type_array = UInt8Array::from(vec![OpType::Put as u8; num_rows]);
    output_columns.push(Arc::new(op_type_array) as ArrayRef);

    let batch = RecordBatch::try_new(schema, output_columns).context(NewRecordBatchSnafu)?;

    let sorted_batch = sort_primary_key_record_batch(&batch)?;

    Ok(Some(BulkPart {
        batch: sorted_batch,
        max_timestamp: part.max_timestamp,
        min_timestamp: part.min_timestamp,
        sequence: part.sequence,
        timestamp_index: new_timestamp_index,
        raw_data: None,
    }))
}
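
/// A bulk part encoded as Parquet bytes together with the metadata needed to read it back.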
#[derive(Debug, Clone)]
pub struct EncodedBulkPart {
    data: Bytes,
    metadata: BulkPartMeta,
}

impl EncodedBulkPart {
    pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
        Self { data, metadata }
    }

    pub(crate) fn metadata(&self) -> &BulkPartMeta {
        &self.metadata
    }

    pub(crate) fn size_bytes(&self) -> usize {
        self.data.len()
    }

    pub(crate) fn data(&self) -> &Bytes {
        &self.data
    }

    pub fn to_memtable_stats(&self) -> MemtableStats {
        let meta = &self.metadata;
        let ts_type = meta.region_metadata.time_index_type();
        let min_ts = ts_type.create_timestamp(meta.min_timestamp);
        let max_ts = ts_type.create_timestamp(meta.max_timestamp);

        MemtableStats {
            estimated_bytes: self.size_bytes(),
            time_range: Some((min_ts, max_ts)),
            num_rows: meta.num_rows,
            num_ranges: 1,
            max_sequence: meta.max_sequence,
            series_count: meta.num_series as usize,
        }
    }
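
    /// Describes this encoded part as an [`SstInfo`], including its time range, file size
    /// and the largest uncompressed row group size.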
    pub(crate) fn to_sst_info(&self, file_id: FileId) -> SstInfo {
        let unit = self.metadata.region_metadata.time_index_type().unit();
        let max_row_group_uncompressed_size: u64 = self
            .metadata
            .parquet_metadata
            .row_groups()
            .iter()
            .map(|rg| {
                rg.columns()
                    .iter()
                    .map(|c| c.uncompressed_size() as u64)
                    .sum::<u64>()
            })
            .max()
            .unwrap_or(0);
        SstInfo {
            file_id,
            time_range: (
                Timestamp::new(self.metadata.min_timestamp, unit),
                Timestamp::new(self.metadata.max_timestamp, unit),
            ),
            file_size: self.data.len() as u64,
            max_row_group_uncompressed_size,
            num_rows: self.metadata.num_rows,
            num_row_groups: self.metadata.parquet_metadata.num_row_groups() as u64,
            file_metadata: Some(self.metadata.parquet_metadata.clone()),
            index_metadata: IndexOutput::default(),
            num_series: self.metadata.num_series,
        }
    }

    pub(crate) fn read(
        &self,
        context: BulkIterContextRef,
        sequence: Option<SequenceRange>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Result<Option<BoxedRecordBatchIterator>> {
        let skip_fields_for_pruning =
            Self::compute_skip_fields(context.pre_filter_mode(), &self.metadata.parquet_metadata);

        let row_groups_to_read =
            context.row_groups_to_read(&self.metadata.parquet_metadata, skip_fields_for_pruning);

        if row_groups_to_read.is_empty() {
            return Ok(None);
        }

        let iter = EncodedBulkPartIter::try_new(
            self,
            context,
            row_groups_to_read,
            sequence,
            mem_scan_metrics,
        )?;
        Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
    }

    fn compute_skip_fields(pre_filter_mode: PreFilterMode, parquet_meta: &ParquetMetaData) -> bool {
        match pre_filter_mode {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => {
                (0..parquet_meta.num_row_groups()).any(|rg_idx| {
                    row_group_contains_delete(parquet_meta, rg_idx, "memtable").unwrap_or(true)
                })
            }
        }
    }
}
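
/// Metadata of an [`EncodedBulkPart`].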
#[derive(Debug, Clone)]
pub struct BulkPartMeta {
    pub num_rows: usize,
    pub max_timestamp: i64,
    pub min_timestamp: i64,
    pub parquet_metadata: Arc<ParquetMetaData>,
    pub region_metadata: RegionMetadataRef,
    pub num_series: u64,
    pub max_sequence: u64,
}
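
/// Metrics collected while encoding bulk parts into Parquet.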
#[derive(Default, Debug)]
pub struct BulkPartEncodeMetrics {
    pub iter_cost: Duration,
    pub write_cost: Duration,
    pub raw_size: usize,
    pub encoded_size: usize,
    pub num_rows: usize,
}
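
/// Encodes bulk parts into Parquet, embedding the region metadata JSON under
/// [`PARQUET_METADATA_KEY`] in the writer properties.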
pub struct BulkPartEncoder {
    metadata: RegionMetadataRef,
    row_group_size: usize,
    writer_props: Option<WriterProperties>,
}

impl BulkPartEncoder {
    pub(crate) fn new(
        metadata: RegionMetadataRef,
        row_group_size: usize,
    ) -> Result<BulkPartEncoder> {
        let json = metadata.to_json().context(InvalidMetadataSnafu)?;
        let key_value_meta =
            parquet::file::metadata::KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        let writer_props = Some(
            WriterProperties::builder()
                .set_key_value_metadata(Some(vec![key_value_meta]))
                .set_write_batch_size(row_group_size)
                .set_max_row_group_size(row_group_size)
                .set_compression(Compression::ZSTD(ZstdLevel::default()))
                .set_column_index_truncate_length(None)
                .set_statistics_truncate_length(None)
                .build(),
        );

        Ok(Self {
            metadata,
            row_group_size,
            writer_props,
        })
    }
}

impl BulkPartEncoder {
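    /// Encodes all record batches yielded by `iter` into a single Parquet blob, tracking
    /// iteration and write costs in `metrics`. Returns `None` when the iterator yields no
    /// rows.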
    pub fn encode_record_batch_iter(
        &self,
        iter: BoxedRecordBatchIterator,
        arrow_schema: SchemaRef,
        min_timestamp: i64,
        max_timestamp: i64,
        max_sequence: u64,
        metrics: &mut BulkPartEncodeMetrics,
    ) -> Result<Option<EncodedBulkPart>> {
        let mut buf = Vec::with_capacity(4096);
        let mut writer = ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
            .context(EncodeMemtableSnafu)?;
        let mut total_rows = 0;
        let mut series_estimator = SeriesEstimator::default();

        let mut iter_start = Instant::now();
        for batch_result in iter {
            metrics.iter_cost += iter_start.elapsed();
            let batch = batch_result?;
            if batch.num_rows() == 0 {
                continue;
            }

            series_estimator.update_flat(&batch);
            metrics.raw_size += record_batch_estimated_size(&batch);
            let write_start = Instant::now();
            writer.write(&batch).context(EncodeMemtableSnafu)?;
            metrics.write_cost += write_start.elapsed();
            total_rows += batch.num_rows();
            iter_start = Instant::now();
        }
        metrics.iter_cost += iter_start.elapsed();
        iter_start = Instant::now();

        if total_rows == 0 {
            return Ok(None);
        }

        let close_start = Instant::now();
        let file_metadata = writer.close().context(EncodeMemtableSnafu)?;
        metrics.write_cost += close_start.elapsed();
        metrics.encoded_size += buf.len();
        metrics.num_rows += total_rows;

        let buf = Bytes::from(buf);
        let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);
        let num_series = series_estimator.finish();

        Ok(Some(EncodedBulkPart {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: total_rows,
                max_timestamp,
                min_timestamp,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
                num_series,
                max_sequence,
            },
        }))
    }
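
    /// Encodes a single [`BulkPart`] into an [`EncodedBulkPart`], or `None` if the part is
    /// empty.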
    fn encode_part(&self, part: &BulkPart) -> Result<Option<EncodedBulkPart>> {
        if part.batch.num_rows() == 0 {
            return Ok(None);
        }

        let mut buf = Vec::with_capacity(4096);
        let arrow_schema = part.batch.schema();

        let file_metadata = {
            let mut writer =
                ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
                    .context(EncodeMemtableSnafu)?;
            writer.write(&part.batch).context(EncodeMemtableSnafu)?;
            writer.finish().context(EncodeMemtableSnafu)?
        };

        let buf = Bytes::from(buf);
        let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);

        Ok(Some(EncodedBulkPart {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: part.batch.num_rows(),
                max_timestamp: part.max_timestamp,
                min_timestamp: part.min_timestamp,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
                num_series: part.estimated_series_count() as u64,
                max_sequence: part.sequence,
            },
        }))
    }
}
#[cfg(test)]
mod tests {
    use api::v1::{Row, SemanticType, WriteHint};
    use datafusion_common::ScalarValue;
    use datatypes::arrow::array::Float64Array;
    use datatypes::prelude::{ConcreteDataType, Value};
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;
    use store_api::storage::consts::ReservedColumnId;

    use super::*;
    use crate::memtable::bulk::context::BulkIterContext;
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
    use crate::test_util::memtable_util::{
        build_key_values_with_ts_seq_values, metadata_for_test, region_metadata_to_row_schema,
    };

    struct MutationInput<'a> {
        k0: &'a str,
        k1: u32,
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
        sequence: u64,
    }

    #[derive(Debug, PartialOrd, PartialEq)]
    struct BatchOutput<'a> {
        pk_values: &'a [Value],
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
    }

    fn encode(input: &[MutationInput]) -> EncodedBulkPart {
        let metadata = metadata_for_test();
        let kvs = input
            .iter()
            .map(|m| {
                build_key_values_with_ts_seq_values(
                    &metadata,
                    m.k0.to_string(),
                    m.k1,
                    m.timestamps.iter().copied(),
                    m.v1.iter().copied(),
                    m.sequence,
                )
            })
            .collect::<Vec<_>>();
        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let primary_key_codec = build_primary_key_codec(&metadata);
        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
        for kv in kvs {
            converter.append_key_values(&kv).unwrap();
        }
        let part = converter.convert().unwrap();
        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
        encoder.encode_part(&part).unwrap().unwrap()
    }
1310
1311 #[test]
1312 fn test_write_and_read_part_projection() {
1313 let part = encode(&[
1314 MutationInput {
1315 k0: "a",
1316 k1: 0,
1317 timestamps: &[1],
1318 v1: &[Some(0.1)],
1319 sequence: 0,
1320 },
1321 MutationInput {
1322 k0: "b",
1323 k1: 0,
1324 timestamps: &[1],
1325 v1: &[Some(0.0)],
1326 sequence: 0,
1327 },
1328 MutationInput {
1329 k0: "a",
1330 k1: 0,
1331 timestamps: &[2],
1332 v1: &[Some(0.2)],
1333 sequence: 1,
1334 },
1335 ]);
1336
1337 let projection = &[4u32];
1338 let mut reader = part
1339 .read(
1340 Arc::new(
1341 BulkIterContext::new(
1342 part.metadata.region_metadata.clone(),
1343 Some(projection.as_slice()),
1344 None,
1345 false,
1346 )
1347 .unwrap(),
1348 ),
1349 None,
1350 None,
1351 )
1352 .unwrap()
1353 .expect("expect at least one row group");
1354
1355 let mut total_rows_read = 0;
1356 let mut field: Vec<f64> = vec![];
1357 for res in reader {
1358 let batch = res.unwrap();
1359 assert_eq!(5, batch.num_columns());
1360 field.extend_from_slice(
1361 batch
1362 .column(0)
1363 .as_any()
1364 .downcast_ref::<Float64Array>()
1365 .unwrap()
1366 .values(),
1367 );
1368 total_rows_read += batch.num_rows();
1369 }
1370 assert_eq!(3, total_rows_read);
1371 assert_eq!(vec![0.1, 0.2, 0.0], field);
1372 }
1373
1374 fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
1375 let metadata = metadata_for_test();
1376 let kvs = key_values
1377 .into_iter()
1378 .map(|(k0, k1, (start, end), sequence)| {
1379 let ts = (start..end);
1380 let v1 = (start..end).map(|_| None);
1381 build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
1382 })
1383 .collect::<Vec<_>>();
1384 let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1385 let primary_key_codec = build_primary_key_codec(&metadata);
1386 let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1387 for kv in kvs {
1388 converter.append_key_values(&kv).unwrap();
1389 }
1390 let part = converter.convert().unwrap();
1391 let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1392 encoder.encode_part(&part).unwrap().unwrap()
1393 }
1394
1395 fn check_prune_row_group(
1396 part: &EncodedBulkPart,
1397 predicate: Option<Predicate>,
1398 expected_rows: usize,
1399 ) {
1400 let context = Arc::new(
1401 BulkIterContext::new(
1402 part.metadata.region_metadata.clone(),
1403 None,
1404 predicate,
1405 false,
1406 )
1407 .unwrap(),
1408 );
1409 let mut reader = part
1410 .read(context, None, None)
1411 .unwrap()
1412 .expect("expect at least one row group");
1413 let mut total_rows_read = 0;
1414 for res in reader {
1415 let batch = res.unwrap();
1416 total_rows_read += batch.num_rows();
1417 }
1418 assert_eq!(expected_rows, total_rows_read);
1420 }
1421
1422 #[test]
1423 fn test_prune_row_groups() {
1424 let part = prepare(vec![
1425 ("a", 0, (0, 40), 1),
1426 ("a", 1, (0, 60), 1),
1427 ("b", 0, (0, 100), 2),
1428 ("b", 1, (100, 180), 3),
1429 ("b", 1, (180, 210), 4),
1430 ]);
1431
1432 let context = Arc::new(
1433 BulkIterContext::new(
1434 part.metadata.region_metadata.clone(),
1435 None,
1436 Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
1437 datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
1438 )])),
1439 false,
1440 )
1441 .unwrap(),
1442 );
1443 assert!(part.read(context, None, None).unwrap().is_none());
1444
1445 check_prune_row_group(&part, None, 310);
1446
1447 check_prune_row_group(
1448 &part,
1449 Some(Predicate::new(vec![
1450 datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1451 datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
1452 ])),
1453 40,
1454 );
1455
1456 check_prune_row_group(
1457 &part,
1458 Some(Predicate::new(vec![
1459 datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1460 datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
1461 ])),
1462 60,
1463 );
1464
1465 check_prune_row_group(
1466 &part,
1467 Some(Predicate::new(vec![
1468 datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1469 ])),
1470 100,
1471 );
1472
1473 check_prune_row_group(
1474 &part,
1475 Some(Predicate::new(vec![
1476 datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
1477 datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
1478 ])),
1479 100,
1480 );
1481
1482 check_prune_row_group(
1484 &part,
1485 Some(Predicate::new(vec![
1486 datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64)),
1487 ])),
1488 1,
1489 );
1490 }
1491
1492 #[test]
1493 fn test_bulk_part_converter_append_and_convert() {
1494 let metadata = metadata_for_test();
1495 let capacity = 100;
1496 let primary_key_codec = build_primary_key_codec(&metadata);
1497 let schema = to_flat_sst_arrow_schema(
1498 &metadata,
1499 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1500 );
1501
1502 let mut converter =
1503 BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1504
1505 let key_values1 = build_key_values_with_ts_seq_values(
1506 &metadata,
1507 "key1".to_string(),
1508 1u32,
1509 vec![1000, 2000].into_iter(),
1510 vec![Some(1.0), Some(2.0)].into_iter(),
1511 1,
1512 );
1513
1514 let key_values2 = build_key_values_with_ts_seq_values(
1515 &metadata,
1516 "key2".to_string(),
1517 2u32,
1518 vec![1500].into_iter(),
1519 vec![Some(3.0)].into_iter(),
1520 2,
1521 );
1522
1523 converter.append_key_values(&key_values1).unwrap();
1524 converter.append_key_values(&key_values2).unwrap();
1525
1526 let bulk_part = converter.convert().unwrap();
1527
1528 assert_eq!(bulk_part.num_rows(), 3);
1529 assert_eq!(bulk_part.min_timestamp, 1000);
1530 assert_eq!(bulk_part.max_timestamp, 2000);
1531 assert_eq!(bulk_part.sequence, 2);
1532 assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
1533
1534 let schema = bulk_part.batch.schema();
1537 let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1538 assert_eq!(
1539 field_names,
1540 vec![
1541 "k0",
1542 "k1",
1543 "v0",
1544 "v1",
1545 "ts",
1546 "__primary_key",
1547 "__sequence",
1548 "__op_type"
1549 ]
1550 );
1551 }
1552
1553 #[test]
1554 fn test_bulk_part_converter_sorting() {
1555 let metadata = metadata_for_test();
1556 let capacity = 100;
1557 let primary_key_codec = build_primary_key_codec(&metadata);
1558 let schema = to_flat_sst_arrow_schema(
1559 &metadata,
1560 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1561 );
1562
1563 let mut converter =
1564 BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1565
1566 let key_values1 = build_key_values_with_ts_seq_values(
1567 &metadata,
1568 "z_key".to_string(),
1569 3u32,
1570 vec![3000].into_iter(),
1571 vec![Some(3.0)].into_iter(),
1572 3,
1573 );
1574
1575 let key_values2 = build_key_values_with_ts_seq_values(
1576 &metadata,
1577 "a_key".to_string(),
1578 1u32,
1579 vec![1000].into_iter(),
1580 vec![Some(1.0)].into_iter(),
1581 1,
1582 );
1583
1584 let key_values3 = build_key_values_with_ts_seq_values(
1585 &metadata,
1586 "m_key".to_string(),
1587 2u32,
1588 vec![2000].into_iter(),
1589 vec![Some(2.0)].into_iter(),
1590 2,
1591 );
1592
1593 converter.append_key_values(&key_values1).unwrap();
1594 converter.append_key_values(&key_values2).unwrap();
1595 converter.append_key_values(&key_values3).unwrap();
1596
1597 let bulk_part = converter.convert().unwrap();
1598
1599 assert_eq!(bulk_part.num_rows(), 3);
1600
1601 let ts_column = bulk_part.batch.column(bulk_part.timestamp_index);
1602 let seq_column = bulk_part.batch.column(bulk_part.batch.num_columns() - 2);
1603
1604 let ts_array = ts_column
1605 .as_any()
1606 .downcast_ref::<TimestampMillisecondArray>()
1607 .unwrap();
1608 let seq_array = seq_column.as_any().downcast_ref::<UInt64Array>().unwrap();
1609
1610 assert_eq!(ts_array.values(), &[1000, 2000, 3000]);
1611 assert_eq!(seq_array.values(), &[1, 2, 3]);
1612
1613 let schema = bulk_part.batch.schema();
1615 let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1616 assert_eq!(
1617 field_names,
1618 vec![
1619 "k0",
1620 "k1",
1621 "v0",
1622 "v1",
1623 "ts",
1624 "__primary_key",
1625 "__sequence",
1626 "__op_type"
1627 ]
1628 );
1629 }
1630
1631 #[test]
1632 fn test_bulk_part_converter_empty() {
1633 let metadata = metadata_for_test();
1634 let capacity = 10;
1635 let primary_key_codec = build_primary_key_codec(&metadata);
1636 let schema = to_flat_sst_arrow_schema(
1637 &metadata,
1638 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1639 );
1640
1641 let converter =
1642 BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1643
1644 let bulk_part = converter.convert().unwrap();
1645
1646 assert_eq!(bulk_part.num_rows(), 0);
1647 assert_eq!(bulk_part.min_timestamp, i64::MAX);
1648 assert_eq!(bulk_part.max_timestamp, i64::MIN);
1649 assert_eq!(bulk_part.sequence, SequenceNumber::MIN);
1650
1651 let schema = bulk_part.batch.schema();
1653 let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1654 assert_eq!(
1655 field_names,
1656 vec![
1657 "k0",
1658 "k1",
1659 "v0",
1660 "v1",
1661 "ts",
1662 "__primary_key",
1663 "__sequence",
1664 "__op_type"
1665 ]
1666 );
1667 }
1668
1669 #[test]
1670 fn test_bulk_part_converter_without_primary_key_columns() {
1671 let metadata = metadata_for_test();
1672 let primary_key_codec = build_primary_key_codec(&metadata);
1673 let schema = to_flat_sst_arrow_schema(
1674 &metadata,
1675 &FlatSchemaOptions {
1676 raw_pk_columns: false,
1677 string_pk_use_dict: true,
1678 },
1679 );
1680
1681 let capacity = 100;
1682 let mut converter =
1683 BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, false);
1684
1685 let key_values1 = build_key_values_with_ts_seq_values(
1686 &metadata,
1687 "key1".to_string(),
1688 1u32,
1689 vec![1000, 2000].into_iter(),
1690 vec![Some(1.0), Some(2.0)].into_iter(),
1691 1,
1692 );
1693
1694 let key_values2 = build_key_values_with_ts_seq_values(
1695 &metadata,
1696 "key2".to_string(),
1697 2u32,
1698 vec![1500].into_iter(),
1699 vec![Some(3.0)].into_iter(),
1700 2,
1701 );
1702
1703 converter.append_key_values(&key_values1).unwrap();
1704 converter.append_key_values(&key_values2).unwrap();
1705
1706 let bulk_part = converter.convert().unwrap();
1707
1708 assert_eq!(bulk_part.num_rows(), 3);
1709 assert_eq!(bulk_part.min_timestamp, 1000);
1710 assert_eq!(bulk_part.max_timestamp, 2000);
1711 assert_eq!(bulk_part.sequence, 2);
1712 assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
1713
1714 let schema = bulk_part.batch.schema();
1716 let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1717 assert_eq!(
1718 field_names,
1719 vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
1720 );
1721 }
1722
1723 #[allow(clippy::too_many_arguments)]
1724 fn build_key_values_with_sparse_encoding(
1725 metadata: &RegionMetadataRef,
1726 primary_key_codec: &Arc<dyn PrimaryKeyCodec>,
1727 table_id: u32,
1728 tsid: u64,
1729 k0: String,
1730 k1: String,
1731 timestamps: impl Iterator<Item = i64>,
1732 values: impl Iterator<Item = Option<f64>>,
1733 sequence: SequenceNumber,
1734 ) -> KeyValues {
1735 let pk_values = vec![
1737 (ReservedColumnId::table_id(), Value::UInt32(table_id)),
1738 (ReservedColumnId::tsid(), Value::UInt64(tsid)),
1739 (0, Value::String(k0.clone().into())),
1740 (1, Value::String(k1.clone().into())),
1741 ];
1742 let mut encoded_key = Vec::new();
1743 primary_key_codec
1744 .encode_values(&pk_values, &mut encoded_key)
1745 .unwrap();
1746 assert!(!encoded_key.is_empty());
1747
1748 let column_schema = vec![
1750 api::v1::ColumnSchema {
1751 column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
1752 datatype: api::helper::ColumnDataTypeWrapper::try_from(
1753 ConcreteDataType::binary_datatype(),
1754 )
1755 .unwrap()
1756 .datatype() as i32,
1757 semantic_type: api::v1::SemanticType::Tag as i32,
1758 ..Default::default()
1759 },
1760 api::v1::ColumnSchema {
1761 column_name: "ts".to_string(),
1762 datatype: api::helper::ColumnDataTypeWrapper::try_from(
1763 ConcreteDataType::timestamp_millisecond_datatype(),
1764 )
1765 .unwrap()
1766 .datatype() as i32,
1767 semantic_type: api::v1::SemanticType::Timestamp as i32,
1768 ..Default::default()
1769 },
1770 api::v1::ColumnSchema {
1771 column_name: "v0".to_string(),
1772 datatype: api::helper::ColumnDataTypeWrapper::try_from(
1773 ConcreteDataType::int64_datatype(),
1774 )
1775 .unwrap()
1776 .datatype() as i32,
1777 semantic_type: api::v1::SemanticType::Field as i32,
1778 ..Default::default()
1779 },
1780 api::v1::ColumnSchema {
1781 column_name: "v1".to_string(),
1782 datatype: api::helper::ColumnDataTypeWrapper::try_from(
1783 ConcreteDataType::float64_datatype(),
1784 )
1785 .unwrap()
1786 .datatype() as i32,
1787 semantic_type: api::v1::SemanticType::Field as i32,
1788 ..Default::default()
1789 },
1790 ];
1791
1792 let rows = timestamps
1793 .zip(values)
1794 .map(|(ts, v)| Row {
1795 values: vec![
1796 api::v1::Value {
1797 value_data: Some(api::v1::value::ValueData::BinaryValue(
1798 encoded_key.clone(),
1799 )),
1800 },
1801 api::v1::Value {
1802 value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(ts)),
1803 },
1804 api::v1::Value {
1805 value_data: Some(api::v1::value::ValueData::I64Value(ts)),
1806 },
1807 api::v1::Value {
1808 value_data: v.map(api::v1::value::ValueData::F64Value),
1809 },
1810 ],
1811 })
1812 .collect();
1813
1814 let mutation = api::v1::Mutation {
1815 op_type: 1,
1816 sequence,
1817 rows: Some(api::v1::Rows {
1818 schema: column_schema,
1819 rows,
1820 }),
1821 write_hint: Some(WriteHint {
1822 primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
1823 }),
1824 };
1825 KeyValues::new(metadata.as_ref(), mutation).unwrap()
1826 }
1827
1828 #[test]
1829 fn test_bulk_part_converter_sparse_primary_key_encoding() {
1830 use api::v1::SemanticType;
1831 use datatypes::schema::ColumnSchema;
1832 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1833 use store_api::storage::RegionId;
1834
1835 let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
1836 builder
1837 .push_column_metadata(ColumnMetadata {
1838 column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
1839 semantic_type: SemanticType::Tag,
1840 column_id: 0,
1841 })
1842 .push_column_metadata(ColumnMetadata {
1843 column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
1844 semantic_type: SemanticType::Tag,
1845 column_id: 1,
1846 })
1847 .push_column_metadata(ColumnMetadata {
1848 column_schema: ColumnSchema::new(
1849 "ts",
1850 ConcreteDataType::timestamp_millisecond_datatype(),
1851 false,
1852 ),
1853 semantic_type: SemanticType::Timestamp,
1854 column_id: 2,
1855 })
1856 .push_column_metadata(ColumnMetadata {
1857 column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
1858 semantic_type: SemanticType::Field,
1859 column_id: 3,
1860 })
1861 .push_column_metadata(ColumnMetadata {
1862 column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
1863 semantic_type: SemanticType::Field,
1864 column_id: 4,
1865 })
1866 .primary_key(vec![0, 1])
1867 .primary_key_encoding(PrimaryKeyEncoding::Sparse);
1868 let metadata = Arc::new(builder.build().unwrap());
1869
1870 let primary_key_codec = build_primary_key_codec(&metadata);
1871 let schema = to_flat_sst_arrow_schema(
1872 &metadata,
1873 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1874 );
1875
1876 assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse);
1877 assert_eq!(primary_key_codec.encoding(), PrimaryKeyEncoding::Sparse);
1878
1879 let capacity = 100;
1880 let mut converter =
1881 BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec.clone(), true);
1882
1883 let key_values1 = build_key_values_with_sparse_encoding(
1884 &metadata,
1885 &primary_key_codec,
1886 2048u32, 100u64, "key11".to_string(),
1889 "key21".to_string(),
1890 vec![1000, 2000].into_iter(),
1891 vec![Some(1.0), Some(2.0)].into_iter(),
1892 1,
1893 );
1894
1895 let key_values2 = build_key_values_with_sparse_encoding(
1896 &metadata,
1897 &primary_key_codec,
1898 4096u32, 200u64, "key12".to_string(),
1901 "key22".to_string(),
1902 vec![1500].into_iter(),
1903 vec![Some(3.0)].into_iter(),
1904 2,
1905 );
1906
1907 converter.append_key_values(&key_values1).unwrap();
1908 converter.append_key_values(&key_values2).unwrap();
1909
1910 let bulk_part = converter.convert().unwrap();
1911
1912 assert_eq!(bulk_part.num_rows(), 3);
1913 assert_eq!(bulk_part.min_timestamp, 1000);
1914 assert_eq!(bulk_part.max_timestamp, 2000);
1915 assert_eq!(bulk_part.sequence, 2);
1916 assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
1917
1918 let schema = bulk_part.batch.schema();
1922 let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1923 assert_eq!(
1924 field_names,
1925 vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
1926 );
1927
1928 let primary_key_column = bulk_part.batch.column_by_name("__primary_key").unwrap();
1930 let dict_array = primary_key_column
1931 .as_any()
1932 .downcast_ref::<DictionaryArray<UInt32Type>>()
1933 .unwrap();
1934
1935 assert!(!dict_array.is_empty());
1937 assert_eq!(dict_array.len(), 3); let values = dict_array
1941 .values()
1942 .as_any()
1943 .downcast_ref::<BinaryArray>()
1944 .unwrap();
1945 for i in 0..values.len() {
1946 assert!(
1947 !values.value(i).is_empty(),
1948 "Encoded primary key should not be empty"
1949 );
1950 }
1951 }
1952
1953 #[test]
1954 fn test_convert_bulk_part_empty() {
1955 let metadata = metadata_for_test();
1956 let schema = to_flat_sst_arrow_schema(
1957 &metadata,
1958 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1959 );
1960 let primary_key_codec = build_primary_key_codec(&metadata);
1961
1962 let empty_batch = RecordBatch::new_empty(schema.clone());
1964 let empty_part = BulkPart {
1965 batch: empty_batch,
1966 max_timestamp: 0,
1967 min_timestamp: 0,
1968 sequence: 0,
1969 timestamp_index: 0,
1970 raw_data: None,
1971 };
1972
1973 let result =
1974 convert_bulk_part(empty_part, &metadata, primary_key_codec, schema, true).unwrap();
1975 assert!(result.is_none());
1976 }
1977
1978 #[test]
1979 fn test_convert_bulk_part_dense_with_pk_columns() {
1980 let metadata = metadata_for_test();
1981 let primary_key_codec = build_primary_key_codec(&metadata);
1982
1983 let k0_array = Arc::new(arrow::array::StringArray::from(vec![
1984 "key1", "key2", "key1",
1985 ]));
1986 let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2, 1]));
1987 let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200, 300]));
1988 let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0, 3.0]));
1989 let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000, 1500]));
1990
1991 let input_schema = Arc::new(Schema::new(vec![
1992 Field::new("k0", ArrowDataType::Utf8, false),
1993 Field::new("k1", ArrowDataType::UInt32, false),
1994 Field::new("v0", ArrowDataType::Int64, true),
1995 Field::new("v1", ArrowDataType::Float64, true),
1996 Field::new(
1997 "ts",
1998 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
1999 false,
2000 ),
2001 ]));
2002
2003 let input_batch = RecordBatch::try_new(
2004 input_schema,
2005 vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2006 )
2007 .unwrap();
2008
2009 let part = BulkPart {
2010 batch: input_batch,
2011 max_timestamp: 2000,
2012 min_timestamp: 1000,
2013 sequence: 5,
2014 timestamp_index: 4,
2015 raw_data: None,
2016 };
2017
2018 let output_schema = to_flat_sst_arrow_schema(
2019 &metadata,
2020 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2021 );
2022
2023 let result = convert_bulk_part(
2024 part,
2025 &metadata,
2026 primary_key_codec,
2027 output_schema,
2028 true, )
2030 .unwrap();
2031
2032 let converted = result.unwrap();
2033
2034 assert_eq!(converted.num_rows(), 3);
2035 assert_eq!(converted.max_timestamp, 2000);
2036 assert_eq!(converted.min_timestamp, 1000);
2037 assert_eq!(converted.sequence, 5);
2038
2039 let schema = converted.batch.schema();
2040 let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2041 assert_eq!(
2042 field_names,
2043 vec![
2044 "k0",
2045 "k1",
2046 "v0",
2047 "v1",
2048 "ts",
2049 "__primary_key",
2050 "__sequence",
2051 "__op_type"
2052 ]
2053 );
2054
2055 let k0_col = converted.batch.column_by_name("k0").unwrap();
2056 assert!(matches!(
2057 k0_col.data_type(),
2058 ArrowDataType::Dictionary(_, _)
2059 ));
2060
2061 let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2062 let dict_array = pk_col
2063 .as_any()
2064 .downcast_ref::<DictionaryArray<UInt32Type>>()
2065 .unwrap();
2066 let keys = dict_array.keys();
2067
2068 assert_eq!(keys.len(), 3);
2069 }
2070
2071 #[test]
2072 fn test_convert_bulk_part_dense_without_pk_columns() {
2073 let metadata = metadata_for_test();
2074 let primary_key_codec = build_primary_key_codec(&metadata);
2075
2076 let k0_array = Arc::new(arrow::array::StringArray::from(vec!["key1", "key2"]));
2078 let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2]));
2079 let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
2080 let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
2081 let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
2082
2083 let input_schema = Arc::new(Schema::new(vec![
2084 Field::new("k0", ArrowDataType::Utf8, false),
2085 Field::new("k1", ArrowDataType::UInt32, false),
2086 Field::new("v0", ArrowDataType::Int64, true),
2087 Field::new("v1", ArrowDataType::Float64, true),
2088 Field::new(
2089 "ts",
2090 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2091 false,
2092 ),
2093 ]));
2094
2095 let input_batch = RecordBatch::try_new(
2096 input_schema,
2097 vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2098 )
2099 .unwrap();
2100
2101 let part = BulkPart {
2102 batch: input_batch,
2103 max_timestamp: 2000,
2104 min_timestamp: 1000,
2105 sequence: 3,
2106 timestamp_index: 4,
2107 raw_data: None,
2108 };
2109
2110 let output_schema = to_flat_sst_arrow_schema(
2111 &metadata,
2112 &FlatSchemaOptions {
2113 raw_pk_columns: false,
2114 string_pk_use_dict: true,
2115 },
2116 );
2117
2118 let result = convert_bulk_part(
2119 part,
2120 &metadata,
2121 primary_key_codec,
2122 output_schema,
2123 false, )
2125 .unwrap();
2126
2127 let converted = result.unwrap();
2128
2129 assert_eq!(converted.num_rows(), 2);
2130 assert_eq!(converted.max_timestamp, 2000);
2131 assert_eq!(converted.min_timestamp, 1000);
2132 assert_eq!(converted.sequence, 3);
2133
2134 let schema = converted.batch.schema();
2136 let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2137 assert_eq!(
2138 field_names,
2139 vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2140 );
2141
2142 let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2144 assert!(matches!(
2145 pk_col.data_type(),
2146 ArrowDataType::Dictionary(_, _)
2147 ));
2148 }
2149
2150 #[test]
2151 fn test_convert_bulk_part_sparse_encoding() {
2152 let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
2153 builder
2154 .push_column_metadata(ColumnMetadata {
2155 column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
2156 semantic_type: SemanticType::Tag,
2157 column_id: 0,
2158 })
2159 .push_column_metadata(ColumnMetadata {
2160 column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
2161 semantic_type: SemanticType::Tag,
2162 column_id: 1,
2163 })
2164 .push_column_metadata(ColumnMetadata {
2165 column_schema: ColumnSchema::new(
2166 "ts",
2167 ConcreteDataType::timestamp_millisecond_datatype(),
2168 false,
2169 ),
2170 semantic_type: SemanticType::Timestamp,
2171 column_id: 2,
2172 })
2173 .push_column_metadata(ColumnMetadata {
2174 column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
2175 semantic_type: SemanticType::Field,
2176 column_id: 3,
2177 })
2178 .push_column_metadata(ColumnMetadata {
2179 column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
2180 semantic_type: SemanticType::Field,
2181 column_id: 4,
2182 })
2183 .primary_key(vec![0, 1])
2184 .primary_key_encoding(PrimaryKeyEncoding::Sparse);
2185 let metadata = Arc::new(builder.build().unwrap());
2186
2187 let primary_key_codec = build_primary_key_codec(&metadata);
2188
2189 let pk_array = Arc::new(arrow::array::BinaryArray::from(vec![
2191 b"encoded_key_1".as_slice(),
2192 b"encoded_key_2".as_slice(),
2193 ]));
2194 let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
2195 let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
2196 let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
2197
2198 let input_schema = Arc::new(Schema::new(vec![
2199 Field::new("__primary_key", ArrowDataType::Binary, false),
2200 Field::new("v0", ArrowDataType::Int64, true),
2201 Field::new("v1", ArrowDataType::Float64, true),
2202 Field::new(
2203 "ts",
2204 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2205 false,
2206 ),
2207 ]));
2208
2209 let input_batch =
2210 RecordBatch::try_new(input_schema, vec![pk_array, v0_array, v1_array, ts_array])
2211 .unwrap();
2212
2213 let part = BulkPart {
2214 batch: input_batch,
2215 max_timestamp: 2000,
2216 min_timestamp: 1000,
2217 sequence: 7,
2218 timestamp_index: 3,
2219 raw_data: None,
2220 };
2221
2222 let output_schema = to_flat_sst_arrow_schema(
2223 &metadata,
2224 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2225 );
2226
2227 let result = convert_bulk_part(
2228 part,
2229 &metadata,
2230 primary_key_codec,
2231 output_schema,
2232 true, )
2234 .unwrap();
2235
2236 let converted = result.unwrap();
2237
2238 assert_eq!(converted.num_rows(), 2);
2239 assert_eq!(converted.max_timestamp, 2000);
2240 assert_eq!(converted.min_timestamp, 1000);
2241 assert_eq!(converted.sequence, 7);
2242
2243 let schema = converted.batch.schema();
2245 let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2246 assert_eq!(
2247 field_names,
2248 vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2249 );
2250
2251 let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2253 assert!(matches!(
2254 pk_col.data_type(),
2255 ArrowDataType::Dictionary(_, _)
2256 ));
2257 }
2258
2259 #[test]
2260 fn test_convert_bulk_part_sorting_with_multiple_series() {
2261 let metadata = metadata_for_test();
2262 let primary_key_codec = build_primary_key_codec(&metadata);
2263
2264 let k0_array = Arc::new(arrow::array::StringArray::from(vec![
2266 "series_b", "series_a", "series_b", "series_a",
2267 ]));
2268 let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![2, 1, 2, 1]));
2269 let v0_array = Arc::new(arrow::array::Int64Array::from(vec![200, 100, 400, 300]));
2270 let v1_array = Arc::new(arrow::array::Float64Array::from(vec![2.0, 1.0, 4.0, 3.0]));
2271 let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
2272 2000, 1000, 4000, 3000,
2273 ]));
2274
2275 let input_schema = Arc::new(Schema::new(vec![
2276 Field::new("k0", ArrowDataType::Utf8, false),
2277 Field::new("k1", ArrowDataType::UInt32, false),
2278 Field::new("v0", ArrowDataType::Int64, true),
2279 Field::new("v1", ArrowDataType::Float64, true),
2280 Field::new(
2281 "ts",
2282 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2283 false,
2284 ),
2285 ]));
2286
2287 let input_batch = RecordBatch::try_new(
2288 input_schema,
2289 vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2290 )
2291 .unwrap();
2292
2293 let part = BulkPart {
2294 batch: input_batch,
2295 max_timestamp: 4000,
2296 min_timestamp: 1000,
2297 sequence: 10,
2298 timestamp_index: 4,
2299 raw_data: None,
2300 };
2301
2302 let output_schema = to_flat_sst_arrow_schema(
2303 &metadata,
2304 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2305 );
2306
2307 let result =
2308 convert_bulk_part(part, &metadata, primary_key_codec, output_schema, true).unwrap();
2309
2310 let converted = result.unwrap();
2311
2312 assert_eq!(converted.num_rows(), 4);
2313
2314 let ts_col = converted.batch.column(converted.timestamp_index);
2316 let ts_array = ts_col
2317 .as_any()
2318 .downcast_ref::<TimestampMillisecondArray>()
2319 .unwrap();
2320
2321 let timestamps: Vec<i64> = ts_array.values().to_vec();
2325 assert_eq!(timestamps, vec![1000, 3000, 2000, 4000]);
2326 }
2327}