use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::helper::{ColumnDataTypeWrapper, to_grpc_value};
use api::v1::bulk_wal_entry::Body;
use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType, bulk_wal_entry};
use bytes::Bytes;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_recordbatch::DfRecordBatch as RecordBatch;
use common_time::Timestamp;
use common_time::timestamp::TimeUnit;
use datatypes::arrow;
use datatypes::arrow::array::{
    Array, ArrayRef, BinaryBuilder, BinaryDictionaryBuilder, DictionaryArray, StringBuilder,
    StringDictionaryBuilder, TimestampMicrosecondArray, TimestampMillisecondArray,
    TimestampNanosecondArray, TimestampSecondArray, UInt8Array, UInt8Builder, UInt32Array,
    UInt64Array, UInt64Builder,
};
use datatypes::arrow::compute::{SortColumn, SortOptions, TakeOptions};
use datatypes::arrow::datatypes::{
    DataType as ArrowDataType, Field, Schema, SchemaRef, UInt32Type,
};
use datatypes::arrow_array::BinaryArray;
use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::Helper;
use mito_codec::key_values::{KeyValue, KeyValues, KeyValuesRef};
use mito_codec::row_converter::{
    DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, build_primary_key_codec,
};
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, ZstdLevel};
use parquet::data_type::AsBytes;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use snafu::{OptionExt, ResultExt, Snafu};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
use store_api::storage::{FileId, RegionId, SequenceNumber, SequenceRange};
use table::predicate::Predicate;

use crate::error::{
    self, ColumnNotFoundSnafu, ComputeArrowSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu,
    DataTypeMismatchSnafu, EncodeMemtableSnafu, EncodeSnafu, InvalidMetadataSnafu,
    InvalidRequestSnafu, NewRecordBatchSnafu, Result, UnexpectedSnafu,
};
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
use crate::memtable::time_series::{ValueBuilder, Values};
use crate::memtable::{BoxedRecordBatchIterator, MemScanMetrics, MemtableStats};
use crate::sst::index::IndexOutput;
use crate::sst::parquet::file_range::{PreFilterMode, row_group_contains_delete};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder, ReadFormat};
use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo};
use crate::sst::{SeriesEstimator, to_sst_arrow_schema};

const INIT_DICT_VALUE_CAPACITY: usize = 8;

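/// A part of bulk-ingested data: an Arrow [RecordBatch] along with its
/// timestamp range, sequence number, and optional raw Arrow IPC payload.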
#[derive(Clone)]
pub struct BulkPart {
    pub batch: RecordBatch,
    pub max_timestamp: i64,
    pub min_timestamp: i64,
    pub sequence: u64,
    pub timestamp_index: usize,
    pub raw_data: Option<ArrowIpc>,
}

impl TryFrom<BulkWalEntry> for BulkPart {
    type Error = error::Error;

    fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
        match value.body.expect("Entry payload should be present") {
            Body::ArrowIpc(ipc) => {
                let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
                    .context(error::ConvertBulkWalEntrySnafu)?;
                let batch = decoder
                    .try_decode_record_batch(&ipc.data_header, &ipc.payload)
                    .context(error::ConvertBulkWalEntrySnafu)?;
                Ok(Self {
                    batch,
                    max_timestamp: value.max_ts,
                    min_timestamp: value.min_ts,
                    sequence: value.sequence,
                    timestamp_index: value.timestamp_index as usize,
                    raw_data: Some(ipc),
                })
            }
        }
    }
}

impl TryFrom<&BulkPart> for BulkWalEntry {
    type Error = error::Error;

    fn try_from(value: &BulkPart) -> Result<Self> {
        if let Some(ipc) = &value.raw_data {
            Ok(BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_timestamp,
                min_ts: value.min_timestamp,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ipc.clone())),
            })
        } else {
            let mut encoder = FlightEncoder::default();
            let schema_bytes = encoder
                .encode_schema(value.batch.schema().as_ref())
                .data_header;
            let [rb_data] = encoder
                .encode(FlightMessage::RecordBatch(value.batch.clone()))
                .try_into()
                .map_err(|_| {
                    error::UnsupportedOperationSnafu {
                        err_msg: "create BulkWalEntry from RecordBatch with dictionary arrays",
                    }
                    .build()
                })?;
            Ok(BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_timestamp,
                min_ts: value.min_timestamp,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ArrowIpc {
                    schema: schema_bytes,
                    data_header: rb_data.data_header,
                    payload: rb_data.data_body,
                })),
            })
        }
    }
}

impl BulkPart {
    pub(crate) fn estimated_size(&self) -> usize {
        record_batch_estimated_size(&self.batch)
    }

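    /// Estimates the number of series from the dictionary values of the
    /// `__primary_key` column; returns 0 if the column is not a dictionary
    /// array.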
    pub fn estimated_series_count(&self) -> usize {
        let pk_column_idx = primary_key_column_index(self.batch.num_columns());
        let pk_column = self.batch.column(pk_column_idx);
        if let Some(dict_array) = pk_column.as_any().downcast_ref::<PrimaryKeyArray>() {
            dict_array.values().len()
        } else {
            0
        }
    }

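    /// Builds [MemtableStats] from this part's metadata.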
    pub fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
        let ts_type = region_metadata.time_index_type();
        let min_ts = ts_type.create_timestamp(self.min_timestamp);
        let max_ts = ts_type.create_timestamp(self.max_timestamp);

        MemtableStats {
            estimated_bytes: self.estimated_size(),
            time_range: Some((min_ts, max_ts)),
            num_rows: self.num_rows(),
            num_ranges: 1,
            max_sequence: self.sequence,
            series_count: self.estimated_series_count(),
        }
    }

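    /// Fills columns that are present in `region_metadata` but missing from
    /// the batch with their default values. Returns an error if a missing
    /// column has no default value.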
    pub fn fill_missing_columns(&mut self, region_metadata: &RegionMetadata) -> Result<()> {
        let batch_schema = self.batch.schema();
        let batch_columns: HashSet<_> = batch_schema
            .fields()
            .iter()
            .map(|f| f.name().as_str())
            .collect();

        let mut columns_to_fill = Vec::new();
        for column_meta in &region_metadata.column_metadatas {
            if !batch_columns.contains(column_meta.column_schema.name.as_str()) {
                columns_to_fill.push(column_meta);
            }
        }

        if columns_to_fill.is_empty() {
            return Ok(());
        }

        let num_rows = self.batch.num_rows();

        let mut new_columns = Vec::new();
        let mut new_fields = Vec::new();

        new_fields.extend(batch_schema.fields().iter().cloned());
        new_columns.extend_from_slice(self.batch.columns());

        let region_id = region_metadata.region_id;
        for column_meta in columns_to_fill {
            let default_vector = column_meta
                .column_schema
                .create_default_vector(num_rows)
                .context(CreateDefaultSnafu {
                    region_id,
                    column: &column_meta.column_schema.name,
                })?
                .with_context(|| InvalidRequestSnafu {
                    region_id,
                    reason: format!(
                        "column {} does not have default value",
                        column_meta.column_schema.name
                    ),
                })?;
            let arrow_array = default_vector.to_arrow_array();

            new_fields.push(Arc::new(Field::new(
                column_meta.column_schema.name.clone(),
                column_meta.column_schema.data_type.as_arrow_type(),
                column_meta.column_schema.is_nullable(),
            )));
            new_columns.push(arrow_array);
        }

        let new_schema = Arc::new(Schema::new(new_fields));
        let new_batch =
            RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)?;

        self.batch = new_batch;

        Ok(())
    }

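    /// Converts this part into a gRPC [Mutation] with `Put` op type.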
    pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
        let vectors = region_metadata
            .schema
            .column_schemas()
            .iter()
            .map(|col| match self.batch.column_by_name(&col.name) {
                None => Ok(None),
                Some(col) => Helper::try_into_vector(col).map(Some),
            })
            .collect::<datatypes::error::Result<Vec<_>>>()
            .context(error::ComputeVectorSnafu)?;

        let rows = (0..self.num_rows())
            .map(|row_idx| {
                let values = (0..self.batch.num_columns())
                    .map(|col_idx| {
                        if let Some(v) = &vectors[col_idx] {
                            to_grpc_value(v.get(row_idx))
                        } else {
                            api::v1::Value { value_data: None }
                        }
                    })
                    .collect::<Vec<_>>();
                api::v1::Row { values }
            })
            .collect::<Vec<_>>();

        let schema = region_metadata
            .column_metadatas
            .iter()
            .map(|c| {
                let data_type_wrapper =
                    ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
                Ok(api::v1::ColumnSchema {
                    column_name: c.column_schema.name.clone(),
                    datatype: data_type_wrapper.datatype() as i32,
                    semantic_type: c.semantic_type as i32,
                    ..Default::default()
                })
            })
            .collect::<api::error::Result<Vec<_>>>()
            .context(error::ConvertColumnDataTypeSnafu {
                reason: "failed to convert region metadata to column schema",
            })?;

        let rows = api::v1::Rows { schema, rows };

        Ok(Mutation {
            op_type: OpType::Put as i32,
            sequence: self.sequence,
            rows: Some(rows),
            write_hint: None,
        })
    }

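    /// Returns the timestamp column of the batch.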
    pub fn timestamps(&self) -> &ArrayRef {
        self.batch.column(self.timestamp_index)
    }

    pub fn num_rows(&self) -> usize {
        self.batch.num_rows()
    }
}

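/// Buffers small [BulkPart]s until enough rows accumulate, then merges them
/// into a single sorted part.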
pub struct UnorderedPart {
    /// Buffered parts in arrival order.
    parts: Vec<BulkPart>,
    /// Total number of rows across buffered parts.
    total_rows: usize,
    /// Minimum timestamp across buffered parts.
    min_timestamp: i64,
    /// Maximum timestamp across buffered parts.
    max_timestamp: i64,
    /// Maximum sequence number across buffered parts.
    max_sequence: u64,
    /// Parts with fewer rows than this threshold are buffered here.
    threshold: usize,
    /// Total number of buffered rows that triggers compaction.
    compact_threshold: usize,
}

impl Default for UnorderedPart {
    fn default() -> Self {
        Self::new()
    }
}

impl UnorderedPart {
    pub fn new() -> Self {
        Self {
            parts: Vec::new(),
            total_rows: 0,
            min_timestamp: i64::MAX,
            max_timestamp: i64::MIN,
            max_sequence: 0,
            threshold: 1024,
            compact_threshold: 4096,
        }
    }

    /// Sets the row-count threshold for accepting parts.
    pub fn set_threshold(&mut self, threshold: usize) {
        self.threshold = threshold;
    }

    /// Sets the row-count threshold for compacting buffered parts.
    pub fn set_compact_threshold(&mut self, compact_threshold: usize) {
        self.compact_threshold = compact_threshold;
    }

    /// Returns the accept threshold.
    pub fn threshold(&self) -> usize {
        self.threshold
    }

    /// Returns the compact threshold.
    pub fn compact_threshold(&self) -> usize {
        self.compact_threshold
    }

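    /// Returns true if a part with `num_rows` rows is small enough to buffer.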
    pub fn should_accept(&self, num_rows: usize) -> bool {
        num_rows < self.threshold
    }

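    /// Returns true if enough rows are buffered to trigger a compaction.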
    pub fn should_compact(&self) -> bool {
        self.total_rows >= self.compact_threshold
    }

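    /// Buffers `part` and updates the aggregated statistics.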
    pub fn push(&mut self, part: BulkPart) {
        self.total_rows += part.num_rows();
        self.min_timestamp = self.min_timestamp.min(part.min_timestamp);
        self.max_timestamp = self.max_timestamp.max(part.max_timestamp);
        self.max_sequence = self.max_sequence.max(part.sequence);
        self.parts.push(part);
    }

    pub fn num_rows(&self) -> usize {
        self.total_rows
    }

    pub fn is_empty(&self) -> bool {
        self.parts.is_empty()
    }

    pub fn num_parts(&self) -> usize {
        self.parts.len()
    }

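    /// Concatenates all buffered parts into one [RecordBatch] and sorts it by
    /// primary key, timestamp, and sequence. Returns `None` when empty.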
    pub fn concat_and_sort(&self) -> Result<Option<RecordBatch>> {
        if self.parts.is_empty() {
            return Ok(None);
        }

        if self.parts.len() == 1 {
            return Ok(Some(self.parts[0].batch.clone()));
        }

        let schema = self.parts[0].batch.schema();

        let batches: Vec<RecordBatch> = self.parts.iter().map(|p| p.batch.clone()).collect();
        let concatenated =
            arrow::compute::concat_batches(&schema, &batches).context(ComputeArrowSnafu)?;

        let sorted_batch = sort_primary_key_record_batch(&concatenated)?;

        Ok(Some(sorted_batch))
    }

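    /// Merges all buffered parts into a single [BulkPart]. Returns `None`
    /// when nothing is buffered.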
    pub fn to_bulk_part(&self) -> Result<Option<BulkPart>> {
        let Some(sorted_batch) = self.concat_and_sort()? else {
            return Ok(None);
        };

        let timestamp_index = self.parts[0].timestamp_index;

        Ok(Some(BulkPart {
            batch: sorted_batch,
            max_timestamp: self.max_timestamp,
            min_timestamp: self.min_timestamp,
            sequence: self.max_sequence,
            timestamp_index,
            raw_data: None,
        }))
    }

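    /// Removes all buffered parts and resets the statistics.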
    pub fn clear(&mut self) {
        self.parts.clear();
        self.total_rows = 0;
        self.min_timestamp = i64::MAX;
        self.max_timestamp = i64::MIN;
        self.max_sequence = 0;
    }
}

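/// Estimates the in-memory size of `batch` by summing the memory occupied by
/// each column's data.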
pub fn record_batch_estimated_size(batch: &RecordBatch) -> usize {
    batch
        .columns()
        .iter()
        .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
        .sum()
}

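/// Builder for a raw primary key column. String columns are built as
/// dictionary arrays; other types go through a generic [MutableVector].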
enum PrimaryKeyColumnBuilder {
    /// Dictionary builder for string primary key columns.
    StringDict(StringDictionaryBuilder<UInt32Type>),
    /// Generic mutable vector builder for other data types.
    Vector(Box<dyn MutableVector>),
}

impl PrimaryKeyColumnBuilder {
    /// Appends a value to the builder.
    fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
        match self {
            PrimaryKeyColumnBuilder::StringDict(builder) => {
                if let Some(s) = value.try_into_string().context(DataTypeMismatchSnafu)? {
                    builder.append_value(s);
                } else {
                    builder.append_null();
                }
            }
            PrimaryKeyColumnBuilder::Vector(builder) => {
                builder.push_value_ref(&value);
            }
        }
        Ok(())
    }

    /// Finishes the builder and converts it into an Arrow array.
    fn into_arrow_array(self) -> ArrayRef {
        match self {
            PrimaryKeyColumnBuilder::StringDict(mut builder) => Arc::new(builder.finish()),
            PrimaryKeyColumnBuilder::Vector(mut builder) => builder.to_vector().to_arrow_array(),
        }
    }
}

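/// Converts [KeyValues] into a [BulkPart] in the flat SST layout, encoding
/// primary keys and tracking timestamp and sequence bounds.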
pub struct BulkPartConverter {
    /// Metadata of the destination region.
    region_metadata: RegionMetadataRef,
    /// Arrow schema of the output batch.
    schema: SchemaRef,
    /// Codec for encoding primary keys.
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
    /// Reusable buffer for encoded primary keys.
    key_buf: Vec<u8>,
    /// Builder for the encoded `__primary_key` dictionary column.
    key_array_builder: PrimaryKeyArrayBuilder,
    /// Builder for timestamp, sequence, op type, and field columns.
    value_builder: ValueBuilder,
    /// Builders for raw primary key columns; empty when they are not stored.
    primary_key_column_builders: Vec<PrimaryKeyColumnBuilder>,

    /// Maximum timestamp seen so far.
    max_ts: i64,
    /// Minimum timestamp seen so far.
    min_ts: i64,
    /// Maximum sequence number seen so far.
    max_sequence: SequenceNumber,
}

impl BulkPartConverter {
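    /// Creates a new [BulkPartConverter].
    ///
    /// When `store_primary_key_columns` is true and the primary key encoding
    /// is not sparse, raw primary key columns are stored in the output batch
    /// in addition to the encoded `__primary_key` column.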
    pub fn new(
        region_metadata: &RegionMetadataRef,
        schema: SchemaRef,
        capacity: usize,
        primary_key_codec: Arc<dyn PrimaryKeyCodec>,
        store_primary_key_columns: bool,
    ) -> Self {
        debug_assert_eq!(
            region_metadata.primary_key_encoding,
            primary_key_codec.encoding()
        );

        let primary_key_column_builders = if store_primary_key_columns
            && region_metadata.primary_key_encoding != PrimaryKeyEncoding::Sparse
        {
            new_primary_key_column_builders(region_metadata, capacity)
        } else {
            Vec::new()
        };

        Self {
            region_metadata: region_metadata.clone(),
            schema,
            primary_key_codec,
            key_buf: Vec::new(),
            key_array_builder: PrimaryKeyArrayBuilder::new(),
            value_builder: ValueBuilder::new(region_metadata, capacity),
            primary_key_column_builders,
            min_ts: i64::MAX,
            max_ts: i64::MIN,
            max_sequence: SequenceNumber::MIN,
        }
    }

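    /// Appends all rows in `key_values` to the converter.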
    pub fn append_key_values(&mut self, key_values: &KeyValues) -> Result<()> {
        for kv in key_values.iter() {
            self.append_key_value(&kv)?;
        }

        Ok(())
    }

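    /// Appends a single [KeyValue].
    ///
    /// With sparse encoding, the already-encoded primary key is taken from the
    /// first primary key column; otherwise the key is encoded from the tag
    /// columns.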
    fn append_key_value(&mut self, kv: &KeyValue) -> Result<()> {
        if self.primary_key_codec.encoding() == PrimaryKeyEncoding::Sparse {
            let mut primary_keys = kv.primary_keys();
            if let Some(encoded) = primary_keys
                .next()
                .context(ColumnNotFoundSnafu {
                    column: PRIMARY_KEY_COLUMN_NAME,
                })?
                .try_into_binary()
                .context(DataTypeMismatchSnafu)?
            {
                self.key_array_builder
                    .append(encoded)
                    .context(ComputeArrowSnafu)?;
            } else {
                self.key_array_builder
                    .append("")
                    .context(ComputeArrowSnafu)?;
            }
        } else {
            self.key_buf.clear();
            self.primary_key_codec
                .encode_key_value(kv, &mut self.key_buf)
                .context(EncodeSnafu)?;
            self.key_array_builder
                .append(&self.key_buf)
                .context(ComputeArrowSnafu)?;
        };

        if !self.primary_key_column_builders.is_empty() {
            for (builder, pk_value) in self
                .primary_key_column_builders
                .iter_mut()
                .zip(kv.primary_keys())
            {
                builder.push_value_ref(pk_value)?;
            }
        }

        self.value_builder.push(
            kv.timestamp(),
            kv.sequence(),
            kv.op_type() as u8,
            kv.fields(),
        );

        // The time index column always holds a valid, non-null timestamp.
        let ts = kv
            .timestamp()
            .try_into_timestamp()
            .unwrap()
            .unwrap()
            .value();
        self.min_ts = self.min_ts.min(ts);
        self.max_ts = self.max_ts.max(ts);
        self.max_sequence = self.max_sequence.max(kv.sequence());

        Ok(())
    }

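    /// Finishes the builders and converts the accumulated rows into a
    /// [BulkPart] whose batch is sorted by primary key, timestamp, and
    /// sequence.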
    pub fn convert(mut self) -> Result<BulkPart> {
        let values = Values::from(self.value_builder);
        let mut columns =
            Vec::with_capacity(4 + values.fields.len() + self.primary_key_column_builders.len());

        for builder in self.primary_key_column_builders {
            columns.push(builder.into_arrow_array());
        }
        columns.extend(values.fields.iter().map(|field| field.to_arrow_array()));
        let timestamp_index = columns.len();
        columns.push(values.timestamp.to_arrow_array());
        let pk_array = self.key_array_builder.finish();
        columns.push(Arc::new(pk_array));
        columns.push(values.sequence.to_arrow_array());
        columns.push(values.op_type.to_arrow_array());

        let batch = RecordBatch::try_new(self.schema, columns).context(NewRecordBatchSnafu)?;
        let batch = sort_primary_key_record_batch(&batch)?;

        Ok(BulkPart {
            batch,
            max_timestamp: self.max_ts,
            min_timestamp: self.min_ts,
            sequence: self.max_sequence,
            timestamp_index,
            raw_data: None,
        })
    }
}

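/// Creates one [PrimaryKeyColumnBuilder] per primary key column, using
/// dictionary builders for string columns.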
fn new_primary_key_column_builders(
    metadata: &RegionMetadata,
    capacity: usize,
) -> Vec<PrimaryKeyColumnBuilder> {
    metadata
        .primary_key_columns()
        .map(|col| {
            if col.column_schema.data_type.is_string() {
                PrimaryKeyColumnBuilder::StringDict(StringDictionaryBuilder::with_capacity(
                    capacity,
                    INIT_DICT_VALUE_CAPACITY,
                    capacity,
                ))
            } else {
                PrimaryKeyColumnBuilder::Vector(
                    col.column_schema.data_type.create_mutable_vector(capacity),
                )
            }
        })
        .collect()
}

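/// Sorts `batch` by `(__primary_key ASC, timestamp ASC, __sequence DESC)`.
///
/// Assumes the batch ends with the `timestamp, __primary_key, __sequence,
/// __op_type` columns, as produced by the flat SST format.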
pub fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
    let total_columns = batch.num_columns();
    let sort_columns = vec![
        // `__primary_key`, ascending.
        SortColumn {
            values: batch.column(total_columns - 3).clone(),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
        // Timestamp, ascending.
        SortColumn {
            values: batch.column(total_columns - 4).clone(),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
        // `__sequence`, descending.
        SortColumn {
            values: batch.column(total_columns - 2).clone(),
            options: Some(SortOptions {
                descending: true,
                nulls_first: true,
            }),
        },
    ];

    let indices = datatypes::arrow::compute::lexsort_to_indices(&sort_columns, None)
        .context(ComputeArrowSnafu)?;

    datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
}

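/// Converts an ingested [BulkPart] into the flat SST layout:
/// `(raw primary key columns, fields, timestamp, __primary_key, __sequence,
/// __op_type)`.
///
/// With dense encodings the primary key is encoded from the tag columns; with
/// sparse encoding the existing `__primary_key` column is reused. Raw primary
/// key columns are kept only when `store_primary_key_columns` is true and the
/// encoding is not sparse. Returns `None` for empty input.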
pub fn convert_bulk_part(
    part: BulkPart,
    region_metadata: &RegionMetadataRef,
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
    schema: SchemaRef,
    store_primary_key_columns: bool,
) -> Result<Option<BulkPart>> {
    if part.num_rows() == 0 {
        return Ok(None);
    }

    let num_rows = part.num_rows();
    let is_sparse = region_metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse;

    // Maps column names in the input batch to their indices.
    let input_schema = part.batch.schema();
    let column_indices: HashMap<&str, usize> = input_schema
        .fields()
        .iter()
        .enumerate()
        .map(|(idx, field)| (field.name().as_str(), idx))
        .collect();

    let mut output_columns = Vec::new();

    // For dense encodings, encode the primary key from the tag columns; sparse
    // batches already carry an encoded `__primary_key` column.
    let pk_array = if is_sparse {
        None
    } else {
        let pk_vectors: Result<Vec<_>> = region_metadata
            .primary_key_columns()
            .map(|col_meta| {
                let col_idx = column_indices
                    .get(col_meta.column_schema.name.as_str())
                    .context(ColumnNotFoundSnafu {
                        column: &col_meta.column_schema.name,
                    })?;
                let col = part.batch.column(*col_idx);
                Helper::try_into_vector(col).context(error::ComputeVectorSnafu)
            })
            .collect();
        let pk_vectors = pk_vectors?;

        let mut key_array_builder = PrimaryKeyArrayBuilder::new();
        let mut encode_buf = Vec::new();

        for row_idx in 0..num_rows {
            encode_buf.clear();

            let pk_values_with_ids: Vec<_> = region_metadata
                .primary_key
                .iter()
                .zip(pk_vectors.iter())
                .map(|(col_id, vector)| (*col_id, vector.get_ref(row_idx)))
                .collect();

            primary_key_codec
                .encode_value_refs(&pk_values_with_ids, &mut encode_buf)
                .context(EncodeSnafu)?;

            key_array_builder
                .append(&encode_buf)
                .context(ComputeArrowSnafu)?;
        }

        Some(key_array_builder.finish())
    };

    // Optionally keep the raw primary key columns, casting string columns to
    // dictionary arrays.
    if store_primary_key_columns && !is_sparse {
        for col_meta in region_metadata.primary_key_columns() {
            let col_idx = column_indices
                .get(col_meta.column_schema.name.as_str())
                .context(ColumnNotFoundSnafu {
                    column: &col_meta.column_schema.name,
                })?;
            let col = part.batch.column(*col_idx);

            let col = if col_meta.column_schema.data_type.is_string() {
                let target_type = ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Utf8),
                );
                arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
            } else {
                col.clone()
            };
            output_columns.push(col);
        }
    }

    // Field columns follow the primary key columns.
    for col_meta in region_metadata.field_columns() {
        let col_idx = column_indices
            .get(col_meta.column_schema.name.as_str())
            .context(ColumnNotFoundSnafu {
                column: &col_meta.column_schema.name,
            })?;
        output_columns.push(part.batch.column(*col_idx).clone());
    }

    // Time index column.
    let new_timestamp_index = output_columns.len();
    let ts_col_idx = column_indices
        .get(
            region_metadata
                .time_index_column()
                .column_schema
                .name
                .as_str(),
        )
        .context(ColumnNotFoundSnafu {
            column: &region_metadata.time_index_column().column_schema.name,
        })?;
    output_columns.push(part.batch.column(*ts_col_idx).clone());

    // Encoded primary key as a dictionary column.
    let pk_dictionary = if let Some(pk_dict_array) = pk_array {
        Arc::new(pk_dict_array) as ArrayRef
    } else {
        let pk_col_idx =
            column_indices
                .get(PRIMARY_KEY_COLUMN_NAME)
                .context(ColumnNotFoundSnafu {
                    column: PRIMARY_KEY_COLUMN_NAME,
                })?;
        let col = part.batch.column(*pk_col_idx);

        let target_type = ArrowDataType::Dictionary(
            Box::new(ArrowDataType::UInt32),
            Box::new(ArrowDataType::Binary),
        );
        arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
    };
    output_columns.push(pk_dictionary);

    // All rows share the part's sequence number and `Put` op type.
    let sequence_array = UInt64Array::from(vec![part.sequence; num_rows]);
    output_columns.push(Arc::new(sequence_array) as ArrayRef);

    let op_type_array = UInt8Array::from(vec![OpType::Put as u8; num_rows]);
    output_columns.push(Arc::new(op_type_array) as ArrayRef);

    let batch = RecordBatch::try_new(schema, output_columns).context(NewRecordBatchSnafu)?;

    let sorted_batch = sort_primary_key_record_batch(&batch)?;

    Ok(Some(BulkPart {
        batch: sorted_batch,
        max_timestamp: part.max_timestamp,
        min_timestamp: part.min_timestamp,
        sequence: part.sequence,
        timestamp_index: new_timestamp_index,
        raw_data: None,
    }))
}

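/// A bulk part encoded as Parquet bytes, along with its metadata.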
#[derive(Debug, Clone)]
pub struct EncodedBulkPart {
    data: Bytes,
    metadata: BulkPartMeta,
}

impl EncodedBulkPart {
    pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
        Self { data, metadata }
    }

    pub(crate) fn metadata(&self) -> &BulkPartMeta {
        &self.metadata
    }

    pub(crate) fn size_bytes(&self) -> usize {
        self.data.len()
    }

    pub(crate) fn data(&self) -> &Bytes {
        &self.data
    }

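    /// Builds [MemtableStats] from this encoded part's metadata.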
    pub fn to_memtable_stats(&self) -> MemtableStats {
        let meta = &self.metadata;
        let ts_type = meta.region_metadata.time_index_type();
        let min_ts = ts_type.create_timestamp(meta.min_timestamp);
        let max_ts = ts_type.create_timestamp(meta.max_timestamp);

        MemtableStats {
            estimated_bytes: self.size_bytes(),
            time_range: Some((min_ts, max_ts)),
            num_rows: meta.num_rows,
            num_ranges: 1,
            max_sequence: meta.max_sequence,
            series_count: meta.num_series as usize,
        }
    }

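    /// Builds the [SstInfo] describing this part when flushed as file
    /// `file_id`, including the largest uncompressed row group size.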
    pub(crate) fn to_sst_info(&self, file_id: FileId) -> SstInfo {
        let unit = self.metadata.region_metadata.time_index_type().unit();
        let max_row_group_uncompressed_size: u64 = self
            .metadata
            .parquet_metadata
            .row_groups()
            .iter()
            .map(|rg| {
                rg.columns()
                    .iter()
                    .map(|c| c.uncompressed_size() as u64)
                    .sum::<u64>()
            })
            .max()
            .unwrap_or(0);
        SstInfo {
            file_id,
            time_range: (
                Timestamp::new(self.metadata.min_timestamp, unit),
                Timestamp::new(self.metadata.max_timestamp, unit),
            ),
            file_size: self.data.len() as u64,
            max_row_group_uncompressed_size,
            num_rows: self.metadata.num_rows,
            num_row_groups: self.metadata.parquet_metadata.num_row_groups() as u64,
            file_metadata: Some(self.metadata.parquet_metadata.clone()),
            index_metadata: IndexOutput::default(),
            num_series: self.metadata.num_series,
        }
    }

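    /// Creates an iterator over the part that reads only the row groups
    /// passing pruning. Returns `None` when all row groups are pruned out.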
    pub(crate) fn read(
        &self,
        context: BulkIterContextRef,
        sequence: Option<SequenceRange>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Result<Option<BoxedRecordBatchIterator>> {
        let skip_fields_for_pruning =
            Self::compute_skip_fields(context.pre_filter_mode(), &self.metadata.parquet_metadata);

        let row_groups_to_read =
            context.row_groups_to_read(&self.metadata.parquet_metadata, skip_fields_for_pruning);

        if row_groups_to_read.is_empty() {
            // All row groups are pruned out.
            return Ok(None);
        }

        let iter = EncodedBulkPartIter::try_new(
            self,
            context,
            row_groups_to_read,
            sequence,
            mem_scan_metrics,
        )?;
        Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
    }

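    /// Decides whether field columns can be skipped when pruning, based on the
    /// pre-filter mode and whether any row group may contain deletes.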
    fn compute_skip_fields(pre_filter_mode: PreFilterMode, parquet_meta: &ParquetMetaData) -> bool {
        match pre_filter_mode {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => {
                // Skip fields only when some row group may contain deletes.
                (0..parquet_meta.num_row_groups()).any(|rg_idx| {
                    row_group_contains_delete(parquet_meta, rg_idx, "memtable").unwrap_or(true)
                })
            }
        }
    }
}

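/// Metadata of an [EncodedBulkPart].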
#[derive(Debug, Clone)]
pub struct BulkPartMeta {
    /// Total number of rows.
    pub num_rows: usize,
    /// Maximum timestamp.
    pub max_timestamp: i64,
    /// Minimum timestamp.
    pub min_timestamp: i64,
    /// Metadata of the encoded Parquet file.
    pub parquet_metadata: Arc<ParquetMetaData>,
    /// Metadata of the region the part belongs to.
    pub region_metadata: RegionMetadataRef,
    /// Estimated number of series.
    pub num_series: u64,
    /// Maximum sequence number.
    pub max_sequence: u64,
}

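/// Metrics collected while encoding a bulk part.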
#[derive(Default, Debug)]
pub struct BulkPartEncodeMetrics {
    /// Time spent pulling batches from the input iterator.
    pub iter_cost: Duration,
    /// Time spent writing and closing the Parquet writer.
    pub write_cost: Duration,
    /// Estimated in-memory size of the input batches.
    pub raw_size: usize,
    /// Size of the encoded Parquet buffer.
    pub encoded_size: usize,
    /// Number of rows encoded.
    pub num_rows: usize,
}

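/// Encodes [BulkPart]s into Parquet-backed [EncodedBulkPart]s.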
pub struct BulkPartEncoder {
    metadata: RegionMetadataRef,
    row_group_size: usize,
    writer_props: Option<WriterProperties>,
}

impl BulkPartEncoder {
    pub(crate) fn new(
        metadata: RegionMetadataRef,
        row_group_size: usize,
    ) -> Result<BulkPartEncoder> {
        // Embed the region metadata JSON into the Parquet key-value metadata.
        let json = metadata.to_json().context(InvalidMetadataSnafu)?;
        let key_value_meta =
            parquet::file::metadata::KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        let writer_props = Some(
            WriterProperties::builder()
                .set_key_value_metadata(Some(vec![key_value_meta]))
                .set_write_batch_size(row_group_size)
                .set_max_row_group_size(row_group_size)
                .set_compression(Compression::ZSTD(ZstdLevel::default()))
                .set_column_index_truncate_length(None)
                .set_statistics_truncate_length(None)
                .build(),
        );

        Ok(Self {
            metadata,
            row_group_size,
            writer_props,
        })
    }
}

impl BulkPartEncoder {
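    /// Encodes record batches from `iter` into an in-memory Parquet buffer,
    /// updating `metrics`. Returns `None` when the iterator yields no rows.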
    pub fn encode_record_batch_iter(
        &self,
        iter: BoxedRecordBatchIterator,
        arrow_schema: SchemaRef,
        min_timestamp: i64,
        max_timestamp: i64,
        max_sequence: u64,
        metrics: &mut BulkPartEncodeMetrics,
    ) -> Result<Option<EncodedBulkPart>> {
        let mut buf = Vec::with_capacity(4096);
        let mut writer = ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
            .context(EncodeMemtableSnafu)?;
        let mut total_rows = 0;
        let mut series_estimator = SeriesEstimator::default();

        let mut iter_start = Instant::now();
        for batch_result in iter {
            metrics.iter_cost += iter_start.elapsed();
            let batch = batch_result?;
            if batch.num_rows() == 0 {
                continue;
            }

            series_estimator.update_flat(&batch);
            metrics.raw_size += record_batch_estimated_size(&batch);
            let write_start = Instant::now();
            writer.write(&batch).context(EncodeMemtableSnafu)?;
            metrics.write_cost += write_start.elapsed();
            total_rows += batch.num_rows();
            iter_start = Instant::now();
        }
        metrics.iter_cost += iter_start.elapsed();

        if total_rows == 0 {
            return Ok(None);
        }

        let close_start = Instant::now();
        let file_metadata = writer.close().context(EncodeMemtableSnafu)?;
        metrics.write_cost += close_start.elapsed();
        metrics.encoded_size += buf.len();
        metrics.num_rows += total_rows;

        let buf = Bytes::from(buf);
        let parquet_metadata = Arc::new(file_metadata);
        let num_series = series_estimator.finish();

        Ok(Some(EncodedBulkPart {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: total_rows,
                max_timestamp,
                min_timestamp,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
                num_series,
                max_sequence,
            },
        }))
    }

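    /// Encodes a [BulkPart] into an [EncodedBulkPart]; returns `None` for
    /// empty parts.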
    fn encode_part(&self, part: &BulkPart) -> Result<Option<EncodedBulkPart>> {
        if part.batch.num_rows() == 0 {
            return Ok(None);
        }

        let mut buf = Vec::with_capacity(4096);
        let arrow_schema = part.batch.schema();

        let file_metadata = {
            let mut writer =
                ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
                    .context(EncodeMemtableSnafu)?;
            writer.write(&part.batch).context(EncodeMemtableSnafu)?;
            writer.finish().context(EncodeMemtableSnafu)?
        };

        let buf = Bytes::from(buf);
        let parquet_metadata = Arc::new(file_metadata);

        Ok(Some(EncodedBulkPart {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: part.batch.num_rows(),
                max_timestamp: part.max_timestamp,
                min_timestamp: part.min_timestamp,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
                num_series: part.estimated_series_count() as u64,
                max_sequence: part.sequence,
            },
        }))
    }
}

#[cfg(test)]
mod tests {
    use api::v1::{Row, SemanticType, WriteHint};
    use datafusion_common::ScalarValue;
    use datatypes::arrow::array::Float64Array;
    use datatypes::prelude::{ConcreteDataType, Value};
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;
    use store_api::storage::consts::ReservedColumnId;

    use super::*;
    use crate::memtable::bulk::context::BulkIterContext;
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
    use crate::test_util::memtable_util::{
        build_key_values_with_ts_seq_values, metadata_for_test, region_metadata_to_row_schema,
    };

    struct MutationInput<'a> {
        k0: &'a str,
        k1: u32,
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
        sequence: u64,
    }

    #[derive(Debug, PartialOrd, PartialEq)]
    struct BatchOutput<'a> {
        pk_values: &'a [Value],
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
    }

    fn encode(input: &[MutationInput]) -> EncodedBulkPart {
        let metadata = metadata_for_test();
        let kvs = input
            .iter()
            .map(|m| {
                build_key_values_with_ts_seq_values(
                    &metadata,
                    m.k0.to_string(),
                    m.k1,
                    m.timestamps.iter().copied(),
                    m.v1.iter().copied(),
                    m.sequence,
                )
            })
            .collect::<Vec<_>>();
        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let primary_key_codec = build_primary_key_codec(&metadata);
        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
        for kv in kvs {
            converter.append_key_values(&kv).unwrap();
        }
        let part = converter.convert().unwrap();
        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
        encoder.encode_part(&part).unwrap().unwrap()
    }

    #[test]
    fn test_write_and_read_part_projection() {
        let part = encode(&[
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.1)],
                sequence: 0,
            },
            MutationInput {
                k0: "b",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.0)],
                sequence: 0,
            },
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[2],
                v1: &[Some(0.2)],
                sequence: 1,
            },
        ]);

        let projection = &[4u32];
        let reader = part
            .read(
                Arc::new(
                    BulkIterContext::new(
                        part.metadata.region_metadata.clone(),
                        Some(projection.as_slice()),
                        None,
                        false,
                    )
                    .unwrap(),
                ),
                None,
                None,
            )
            .unwrap()
            .expect("expect at least one row group");

        let mut total_rows_read = 0;
        let mut field: Vec<f64> = vec![];
        for res in reader {
            let batch = res.unwrap();
            assert_eq!(5, batch.num_columns());
            field.extend_from_slice(
                batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Float64Array>()
                    .unwrap()
                    .values(),
            );
            total_rows_read += batch.num_rows();
        }
        assert_eq!(3, total_rows_read);
        assert_eq!(vec![0.1, 0.2, 0.0], field);
    }

    fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
        let metadata = metadata_for_test();
        let kvs = key_values
            .into_iter()
            .map(|(k0, k1, (start, end), sequence)| {
                let ts = start..end;
                let v1 = (start..end).map(|_| None);
                build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
            })
            .collect::<Vec<_>>();
        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let primary_key_codec = build_primary_key_codec(&metadata);
        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
        for kv in kvs {
            converter.append_key_values(&kv).unwrap();
        }
        let part = converter.convert().unwrap();
        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
        encoder.encode_part(&part).unwrap().unwrap()
    }

    fn check_prune_row_group(
        part: &EncodedBulkPart,
        predicate: Option<Predicate>,
        expected_rows: usize,
    ) {
        let context = Arc::new(
            BulkIterContext::new(
                part.metadata.region_metadata.clone(),
                None,
                predicate,
                false,
            )
            .unwrap(),
        );
        let reader = part
            .read(context, None, None)
            .unwrap()
            .expect("expect at least one row group");
        let mut total_rows_read = 0;
        for res in reader {
            let batch = res.unwrap();
            total_rows_read += batch.num_rows();
        }
        assert_eq!(expected_rows, total_rows_read);
    }

    #[test]
    fn test_prune_row_groups() {
        let part = prepare(vec![
            ("a", 0, (0, 40), 1),
            ("a", 1, (0, 60), 1),
            ("b", 0, (0, 100), 2),
            ("b", 1, (100, 180), 3),
            ("b", 1, (180, 210), 4),
        ]);

        let context = Arc::new(
            BulkIterContext::new(
                part.metadata.region_metadata.clone(),
                None,
                Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
                    datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
                )])),
                false,
            )
            .unwrap(),
        );
        assert!(part.read(context, None, None).unwrap().is_none());

        check_prune_row_group(&part, None, 310);

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            40,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
            ])),
            60,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
            ])),
            100,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            100,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64)),
            ])),
            1,
        );
    }

    #[test]
    fn test_bulk_part_converter_append_and_convert() {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "key1".to_string(),
            1u32,
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "key2".to_string(),
            2u32,
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_timestamp, 1000);
        assert_eq!(bulk_part.max_timestamp, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );
    }

    #[test]
    fn test_bulk_part_converter_sorting() {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "z_key".to_string(),
            3u32,
            vec![3000].into_iter(),
            vec![Some(3.0)].into_iter(),
            3,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "a_key".to_string(),
            1u32,
            vec![1000].into_iter(),
            vec![Some(1.0)].into_iter(),
            1,
        );

        let key_values3 = build_key_values_with_ts_seq_values(
            &metadata,
            "m_key".to_string(),
            2u32,
            vec![2000].into_iter(),
            vec![Some(2.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();
        converter.append_key_values(&key_values3).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);

        let ts_column = bulk_part.batch.column(bulk_part.timestamp_index);
        let seq_column = bulk_part.batch.column(bulk_part.batch.num_columns() - 2);

        let ts_array = ts_column
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        let seq_array = seq_column.as_any().downcast_ref::<UInt64Array>().unwrap();

        assert_eq!(ts_array.values(), &[1000, 2000, 3000]);
        assert_eq!(seq_array.values(), &[1, 2, 3]);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );
    }

    #[test]
    fn test_bulk_part_converter_empty() {
        let metadata = metadata_for_test();
        let capacity = 10;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 0);
        assert_eq!(bulk_part.min_timestamp, i64::MAX);
        assert_eq!(bulk_part.max_timestamp, i64::MIN);
        assert_eq!(bulk_part.sequence, SequenceNumber::MIN);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );
    }

    #[test]
    fn test_bulk_part_converter_without_primary_key_columns() {
        let metadata = metadata_for_test();
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions {
                raw_pk_columns: false,
                string_pk_use_dict: true,
            },
        );

        let capacity = 100;
        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, false);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "key1".to_string(),
            1u32,
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "key2".to_string(),
            2u32,
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_timestamp, 1000);
        assert_eq!(bulk_part.max_timestamp, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );
    }

    #[allow(clippy::too_many_arguments)]
    fn build_key_values_with_sparse_encoding(
        metadata: &RegionMetadataRef,
        primary_key_codec: &Arc<dyn PrimaryKeyCodec>,
        table_id: u32,
        tsid: u64,
        k0: String,
        k1: String,
        timestamps: impl Iterator<Item = i64>,
        values: impl Iterator<Item = Option<f64>>,
        sequence: SequenceNumber,
    ) -> KeyValues {
        let pk_values = vec![
            (ReservedColumnId::table_id(), Value::UInt32(table_id)),
            (ReservedColumnId::tsid(), Value::UInt64(tsid)),
            (0, Value::String(k0.clone().into())),
            (1, Value::String(k1.clone().into())),
        ];
        let mut encoded_key = Vec::new();
        primary_key_codec
            .encode_values(&pk_values, &mut encoded_key)
            .unwrap();
        assert!(!encoded_key.is_empty());

        let column_schema = vec![
            api::v1::ColumnSchema {
                column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::binary_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Tag as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "ts".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::timestamp_millisecond_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Timestamp as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "v0".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::int64_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Field as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "v1".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::float64_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Field as i32,
                ..Default::default()
            },
        ];

        let rows = timestamps
            .zip(values)
            .map(|(ts, v)| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::BinaryValue(
                            encoded_key.clone(),
                        )),
                    },
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(ts)),
                    },
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::I64Value(ts)),
                    },
                    api::v1::Value {
                        value_data: v.map(api::v1::value::ValueData::F64Value),
                    },
                ],
            })
            .collect();

        let mutation = api::v1::Mutation {
            op_type: 1,
            sequence,
            rows: Some(api::v1::Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: Some(WriteHint {
                primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
            }),
        };
        KeyValues::new(metadata.as_ref(), mutation).unwrap()
    }

    #[test]
    fn test_bulk_part_converter_sparse_primary_key_encoding() {
        use api::v1::SemanticType;
        use datatypes::schema::ColumnSchema;
        use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
        use store_api::storage::RegionId;

        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![0, 1])
            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
        let metadata = Arc::new(builder.build().unwrap());

        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse);
        assert_eq!(primary_key_codec.encoding(), PrimaryKeyEncoding::Sparse);

        let capacity = 100;
        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec.clone(), true);

        let key_values1 = build_key_values_with_sparse_encoding(
            &metadata,
            &primary_key_codec,
            2048u32,
            100u64,
            "key11".to_string(),
            "key21".to_string(),
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_sparse_encoding(
            &metadata,
            &primary_key_codec,
            4096u32,
            200u64,
            "key12".to_string(),
            "key22".to_string(),
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_timestamp, 1000);
        assert_eq!(bulk_part.max_timestamp, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );

        let primary_key_column = bulk_part.batch.column_by_name("__primary_key").unwrap();
        let dict_array = primary_key_column
            .as_any()
            .downcast_ref::<DictionaryArray<UInt32Type>>()
            .unwrap();

        assert!(!dict_array.is_empty());
        assert_eq!(dict_array.len(), 3);

        let values = dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        for i in 0..values.len() {
            assert!(
                !values.value(i).is_empty(),
                "Encoded primary key should not be empty"
            );
        }
    }

    #[test]
    fn test_convert_bulk_part_empty() {
        let metadata = metadata_for_test();
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );
        let primary_key_codec = build_primary_key_codec(&metadata);

        let empty_batch = RecordBatch::new_empty(schema.clone());
        let empty_part = BulkPart {
            batch: empty_batch,
            max_timestamp: 0,
            min_timestamp: 0,
            sequence: 0,
            timestamp_index: 0,
            raw_data: None,
        };

        let result =
            convert_bulk_part(empty_part, &metadata, primary_key_codec, schema, true).unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_convert_bulk_part_dense_with_pk_columns() {
        let metadata = metadata_for_test();
        let primary_key_codec = build_primary_key_codec(&metadata);

        let k0_array = Arc::new(arrow::array::StringArray::from(vec![
            "key1", "key2", "key1",
        ]));
        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2, 1]));
        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200, 300]));
        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0, 3.0]));
        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000, 1500]));

        let input_schema = Arc::new(Schema::new(vec![
            Field::new("k0", ArrowDataType::Utf8, false),
            Field::new("k1", ArrowDataType::UInt32, false),
            Field::new("v0", ArrowDataType::Int64, true),
            Field::new("v1", ArrowDataType::Float64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        let input_batch = RecordBatch::try_new(
            input_schema,
            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
        )
        .unwrap();

        let part = BulkPart {
            batch: input_batch,
            max_timestamp: 2000,
            min_timestamp: 1000,
            sequence: 5,
            timestamp_index: 4,
            raw_data: None,
        };

        let output_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let result = convert_bulk_part(
            part,
            &metadata,
            primary_key_codec,
            output_schema,
            true, // store_primary_key_columns
        )
        .unwrap();

        let converted = result.unwrap();

        assert_eq!(converted.num_rows(), 3);
        assert_eq!(converted.max_timestamp, 2000);
        assert_eq!(converted.min_timestamp, 1000);
        assert_eq!(converted.sequence, 5);

        let schema = converted.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );

        let k0_col = converted.batch.column_by_name("k0").unwrap();
        assert!(matches!(
            k0_col.data_type(),
            ArrowDataType::Dictionary(_, _)
        ));

        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
        let dict_array = pk_col
            .as_any()
            .downcast_ref::<DictionaryArray<UInt32Type>>()
            .unwrap();
        let keys = dict_array.keys();

        assert_eq!(keys.len(), 3);
    }

    #[test]
    fn test_convert_bulk_part_dense_without_pk_columns() {
        let metadata = metadata_for_test();
        let primary_key_codec = build_primary_key_codec(&metadata);

        let k0_array = Arc::new(arrow::array::StringArray::from(vec!["key1", "key2"]));
        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2]));
        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));

        let input_schema = Arc::new(Schema::new(vec![
            Field::new("k0", ArrowDataType::Utf8, false),
            Field::new("k1", ArrowDataType::UInt32, false),
            Field::new("v0", ArrowDataType::Int64, true),
            Field::new("v1", ArrowDataType::Float64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        let input_batch = RecordBatch::try_new(
            input_schema,
            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
        )
        .unwrap();

        let part = BulkPart {
            batch: input_batch,
            max_timestamp: 2000,
            min_timestamp: 1000,
            sequence: 3,
            timestamp_index: 4,
            raw_data: None,
        };

        let output_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions {
                raw_pk_columns: false,
                string_pk_use_dict: true,
            },
        );

        let result = convert_bulk_part(
            part,
            &metadata,
            primary_key_codec,
            output_schema,
            false, // store_primary_key_columns
        )
        .unwrap();

        let converted = result.unwrap();

        assert_eq!(converted.num_rows(), 2);
        assert_eq!(converted.max_timestamp, 2000);
        assert_eq!(converted.min_timestamp, 1000);
        assert_eq!(converted.sequence, 3);

        let schema = converted.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );

        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
        assert!(matches!(
            pk_col.data_type(),
            ArrowDataType::Dictionary(_, _)
        ));
    }

    #[test]
    fn test_convert_bulk_part_sparse_encoding() {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![0, 1])
            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
        let metadata = Arc::new(builder.build().unwrap());

        let primary_key_codec = build_primary_key_codec(&metadata);

        let pk_array = Arc::new(arrow::array::BinaryArray::from(vec![
            b"encoded_key_1".as_slice(),
            b"encoded_key_2".as_slice(),
        ]));
        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));

        let input_schema = Arc::new(Schema::new(vec![
            Field::new("__primary_key", ArrowDataType::Binary, false),
            Field::new("v0", ArrowDataType::Int64, true),
            Field::new("v1", ArrowDataType::Float64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        let input_batch =
            RecordBatch::try_new(input_schema, vec![pk_array, v0_array, v1_array, ts_array])
                .unwrap();

        let part = BulkPart {
            batch: input_batch,
            max_timestamp: 2000,
            min_timestamp: 1000,
            sequence: 7,
            timestamp_index: 3,
            raw_data: None,
        };

        let output_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let result = convert_bulk_part(
            part,
            &metadata,
            primary_key_codec,
            output_schema,
            true, // store_primary_key_columns
        )
        .unwrap();

        let converted = result.unwrap();

        assert_eq!(converted.num_rows(), 2);
        assert_eq!(converted.max_timestamp, 2000);
        assert_eq!(converted.min_timestamp, 1000);
        assert_eq!(converted.sequence, 7);

        let schema = converted.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );

        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
        assert!(matches!(
            pk_col.data_type(),
            ArrowDataType::Dictionary(_, _)
        ));
    }

    #[test]
    fn test_convert_bulk_part_sorting_with_multiple_series() {
        let metadata = metadata_for_test();
        let primary_key_codec = build_primary_key_codec(&metadata);

        let k0_array = Arc::new(arrow::array::StringArray::from(vec![
            "series_b", "series_a", "series_b", "series_a",
        ]));
        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![2, 1, 2, 1]));
        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![200, 100, 400, 300]));
        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![2.0, 1.0, 4.0, 3.0]));
        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
            2000, 1000, 4000, 3000,
        ]));

        let input_schema = Arc::new(Schema::new(vec![
            Field::new("k0", ArrowDataType::Utf8, false),
            Field::new("k1", ArrowDataType::UInt32, false),
            Field::new("v0", ArrowDataType::Int64, true),
            Field::new("v1", ArrowDataType::Float64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        let input_batch = RecordBatch::try_new(
            input_schema,
            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
        )
        .unwrap();

        let part = BulkPart {
            batch: input_batch,
            max_timestamp: 4000,
            min_timestamp: 1000,
            sequence: 10,
            timestamp_index: 4,
            raw_data: None,
        };

        let output_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let result =
            convert_bulk_part(part, &metadata, primary_key_codec, output_schema, true).unwrap();

        let converted = result.unwrap();

        assert_eq!(converted.num_rows(), 4);

        let ts_col = converted.batch.column(converted.timestamp_index);
        let ts_array = ts_col
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();

        // Rows are sorted by primary key first, then timestamp: series_a rows
        // (1000, 3000) precede series_b rows (2000, 4000).
        let timestamps: Vec<i64> = ts_array.values().to_vec();
        assert_eq!(timestamps, vec![1000, 3000, 2000, 4000]);
    }
}