use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::helper::{ColumnDataTypeWrapper, to_grpc_value};
use api::v1::bulk_wal_entry::Body;
use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType, bulk_wal_entry};
use bytes::Bytes;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_recordbatch::DfRecordBatch as RecordBatch;
use common_time::Timestamp;
use common_time::timestamp::TimeUnit;
use datatypes::arrow;
use datatypes::arrow::array::{
    Array, ArrayRef, BinaryBuilder, BinaryDictionaryBuilder, DictionaryArray, StringBuilder,
    StringDictionaryBuilder, TimestampMicrosecondArray, TimestampMillisecondArray,
    TimestampNanosecondArray, TimestampSecondArray, UInt8Array, UInt8Builder, UInt32Array,
    UInt64Array, UInt64Builder,
};
use datatypes::arrow::compute::{SortColumn, SortOptions, TakeOptions};
use datatypes::arrow::datatypes::{
    DataType as ArrowDataType, Field, Schema, SchemaRef, UInt32Type,
};
use datatypes::arrow_array::BinaryArray;
use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::Helper;
use mito_codec::key_values::{KeyValue, KeyValues, KeyValuesRef};
use mito_codec::row_converter::{
    DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, build_primary_key_codec,
};
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, ZstdLevel};
use parquet::data_type::AsBytes;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use snafu::{OptionExt, ResultExt, Snafu};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
use store_api::storage::{FileId, RegionId, SequenceNumber, SequenceRange};
use table::predicate::Predicate;

use crate::error::{
    self, ColumnNotFoundSnafu, ComputeArrowSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu,
    DataTypeMismatchSnafu, EncodeMemtableSnafu, EncodeSnafu, InvalidMetadataSnafu,
    InvalidRequestSnafu, NewRecordBatchSnafu, Result, UnexpectedSnafu,
};
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
use crate::memtable::time_series::{ValueBuilder, Values};
use crate::memtable::{BoxedRecordBatchIterator, MemScanMetrics};
use crate::sst::index::IndexOutput;
use crate::sst::parquet::file_range::{PreFilterMode, row_group_contains_delete};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo};
use crate::sst::{SeriesEstimator, to_sst_arrow_schema};

const INIT_DICT_VALUE_CAPACITY: usize = 8;

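/// A chunk of data written to a region in bulk, stored as a single Arrow [`RecordBatch`]
/// together with its timestamp range, maximum sequence number and the index of the time
/// index column. `raw_data` keeps the original Arrow IPC payload when the part was decoded
/// from a WAL entry so it can be re-encoded without another conversion.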
#[derive(Clone)]
pub struct BulkPart {
    pub batch: RecordBatch,
    pub max_timestamp: i64,
    pub min_timestamp: i64,
    pub sequence: u64,
    pub timestamp_index: usize,
    pub raw_data: Option<ArrowIpc>,
}
90
91impl TryFrom<BulkWalEntry> for BulkPart {
92 type Error = error::Error;
93
94 fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
95 match value.body.expect("Entry payload should be present") {
96 Body::ArrowIpc(ipc) => {
97 let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
98 .context(error::ConvertBulkWalEntrySnafu)?;
99 let batch = decoder
100 .try_decode_record_batch(&ipc.data_header, &ipc.payload)
101 .context(error::ConvertBulkWalEntrySnafu)?;
102 Ok(Self {
103 batch,
104 max_timestamp: value.max_ts,
105 min_timestamp: value.min_ts,
106 sequence: value.sequence,
107 timestamp_index: value.timestamp_index as usize,
108 raw_data: Some(ipc),
109 })
110 }
111 }
112 }
113}
114
115impl TryFrom<&BulkPart> for BulkWalEntry {
116 type Error = error::Error;
117
118 fn try_from(value: &BulkPart) -> Result<Self> {
119 if let Some(ipc) = &value.raw_data {
120 Ok(BulkWalEntry {
121 sequence: value.sequence,
122 max_ts: value.max_timestamp,
123 min_ts: value.min_timestamp,
124 timestamp_index: value.timestamp_index as u32,
125 body: Some(Body::ArrowIpc(ipc.clone())),
126 })
127 } else {
128 let mut encoder = FlightEncoder::default();
129 let schema_bytes = encoder
130 .encode_schema(value.batch.schema().as_ref())
131 .data_header;
132 let [rb_data] = encoder
133 .encode(FlightMessage::RecordBatch(value.batch.clone()))
134 .try_into()
135 .map_err(|_| {
136 error::UnsupportedOperationSnafu {
137 err_msg: "create BulkWalEntry from RecordBatch with dictionary arrays",
138 }
139 .build()
140 })?;
141 Ok(BulkWalEntry {
142 sequence: value.sequence,
143 max_ts: value.max_timestamp,
144 min_ts: value.min_timestamp,
145 timestamp_index: value.timestamp_index as u32,
146 body: Some(Body::ArrowIpc(ArrowIpc {
147 schema: schema_bytes,
148 data_header: rb_data.data_header,
149 payload: rb_data.data_body,
150 })),
151 })
152 }
153 }
154}
155
156impl BulkPart {
157 pub(crate) fn estimated_size(&self) -> usize {
158 record_batch_estimated_size(&self.batch)
159 }
160
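    /// Estimates the number of series in this part from the dictionary size of the
    /// primary key column. Returns 0 if the primary key column is not dictionary encoded.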
161 pub fn estimated_series_count(&self) -> usize {
164 let pk_column_idx = primary_key_column_index(self.batch.num_columns());
165 let pk_column = self.batch.column(pk_column_idx);
166 if let Some(dict_array) = pk_column.as_any().downcast_ref::<PrimaryKeyArray>() {
167 dict_array.values().len()
168 } else {
169 0
170 }
171 }
172
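    /// Fills columns that exist in `region_metadata` but are missing from the batch with
    /// their default values, appending the filled columns to the end of the record batch.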
173 pub fn fill_missing_columns(&mut self, region_metadata: &RegionMetadata) -> Result<()> {
183 let batch_schema = self.batch.schema();
185 let batch_columns: HashSet<_> = batch_schema
186 .fields()
187 .iter()
188 .map(|f| f.name().as_str())
189 .collect();
190
191 let mut columns_to_fill = Vec::new();
        for column_meta in &region_metadata.column_metadatas {
194 if !batch_columns.contains(column_meta.column_schema.name.as_str()) {
197 columns_to_fill.push(column_meta);
198 }
199 }
200
201 if columns_to_fill.is_empty() {
202 return Ok(());
203 }
204
205 let num_rows = self.batch.num_rows();
206
207 let mut new_columns = Vec::new();
208 let mut new_fields = Vec::new();
209
210 new_fields.extend(batch_schema.fields().iter().cloned());
212 new_columns.extend_from_slice(self.batch.columns());
213
214 let region_id = region_metadata.region_id;
215 for column_meta in columns_to_fill {
217 let default_vector = column_meta
218 .column_schema
219 .create_default_vector(num_rows)
220 .context(CreateDefaultSnafu {
221 region_id,
222 column: &column_meta.column_schema.name,
223 })?
224 .with_context(|| InvalidRequestSnafu {
225 region_id,
226 reason: format!(
227 "column {} does not have default value",
228 column_meta.column_schema.name
229 ),
230 })?;
            let arrow_array = default_vector.to_arrow_array();
233
234 new_fields.push(Arc::new(Field::new(
235 column_meta.column_schema.name.clone(),
236 column_meta.column_schema.data_type.as_arrow_type(),
237 column_meta.column_schema.is_nullable(),
238 )));
239 new_columns.push(arrow_array);
240 }
241
242 let new_schema = Arc::new(Schema::new(new_fields));
244 let new_batch =
245 RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)?;
246
247 self.batch = new_batch;
249
250 Ok(())
251 }
252
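    /// Converts this part into a gRPC [`Mutation`] with `OpType::Put` so that it can be
    /// replayed like a regular row-based write.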
253 pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
255 let vectors = region_metadata
256 .schema
257 .column_schemas()
258 .iter()
259 .map(|col| match self.batch.column_by_name(&col.name) {
260 None => Ok(None),
261 Some(col) => Helper::try_into_vector(col).map(Some),
262 })
263 .collect::<datatypes::error::Result<Vec<_>>>()
264 .context(error::ComputeVectorSnafu)?;
265
266 let rows = (0..self.num_rows())
267 .map(|row_idx| {
268 let values = (0..self.batch.num_columns())
269 .map(|col_idx| {
270 if let Some(v) = &vectors[col_idx] {
271 to_grpc_value(v.get(row_idx))
272 } else {
273 api::v1::Value { value_data: None }
274 }
275 })
276 .collect::<Vec<_>>();
277 api::v1::Row { values }
278 })
279 .collect::<Vec<_>>();
280
281 let schema = region_metadata
282 .column_metadatas
283 .iter()
284 .map(|c| {
285 let data_type_wrapper =
286 ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
287 Ok(api::v1::ColumnSchema {
288 column_name: c.column_schema.name.clone(),
289 datatype: data_type_wrapper.datatype() as i32,
290 semantic_type: c.semantic_type as i32,
291 ..Default::default()
292 })
293 })
294 .collect::<api::error::Result<Vec<_>>>()
295 .context(error::ConvertColumnDataTypeSnafu {
296 reason: "failed to convert region metadata to column schema",
297 })?;
298
299 let rows = api::v1::Rows { schema, rows };
300
301 Ok(Mutation {
302 op_type: OpType::Put as i32,
303 sequence: self.sequence,
304 rows: Some(rows),
305 write_hint: None,
306 })
307 }
308
309 pub fn timestamps(&self) -> &ArrayRef {
310 self.batch.column(self.timestamp_index)
311 }
312
313 pub fn num_rows(&self) -> usize {
314 self.batch.num_rows()
315 }
316}
317
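/// A buffer of small [`BulkPart`]s kept in insertion order. Small writes are accumulated
/// here and later concatenated, sorted and compacted into a single part.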
pub struct UnorderedPart {
    /// Buffered parts, kept in insertion order.
    parts: Vec<BulkPart>,
    /// Total number of rows across all buffered parts.
    total_rows: usize,
    /// Minimum timestamp across all buffered parts.
    min_timestamp: i64,
    /// Maximum timestamp across all buffered parts.
    max_timestamp: i64,
    /// Maximum sequence number across all buffered parts.
    max_sequence: u64,
    /// Parts with fewer rows than this threshold are accepted into the buffer.
    threshold: usize,
    /// Total buffered rows at which the buffer should be compacted.
    compact_threshold: usize,
}
336
337impl Default for UnorderedPart {
338 fn default() -> Self {
339 Self::new()
340 }
341}
342
343impl UnorderedPart {
344 pub fn new() -> Self {
346 Self {
347 parts: Vec::new(),
348 total_rows: 0,
349 min_timestamp: i64::MAX,
350 max_timestamp: i64::MIN,
351 max_sequence: 0,
352 threshold: 1024,
353 compact_threshold: 4096,
354 }
355 }
356
357 pub fn set_threshold(&mut self, threshold: usize) {
359 self.threshold = threshold;
360 }
361
362 pub fn set_compact_threshold(&mut self, compact_threshold: usize) {
364 self.compact_threshold = compact_threshold;
365 }
366
367 pub fn threshold(&self) -> usize {
369 self.threshold
370 }
371
372 pub fn compact_threshold(&self) -> usize {
374 self.compact_threshold
375 }
376
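    /// Returns true if a part with `num_rows` rows is small enough to be buffered here.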
377 pub fn should_accept(&self, num_rows: usize) -> bool {
379 num_rows < self.threshold
380 }
381
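    /// Returns true if the buffered rows reach the compaction threshold.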
382 pub fn should_compact(&self) -> bool {
384 self.total_rows >= self.compact_threshold
385 }
386
387 pub fn push(&mut self, part: BulkPart) {
389 self.total_rows += part.num_rows();
390 self.min_timestamp = self.min_timestamp.min(part.min_timestamp);
391 self.max_timestamp = self.max_timestamp.max(part.max_timestamp);
392 self.max_sequence = self.max_sequence.max(part.sequence);
393 self.parts.push(part);
394 }
395
396 pub fn num_rows(&self) -> usize {
398 self.total_rows
399 }
400
401 pub fn is_empty(&self) -> bool {
403 self.parts.is_empty()
404 }
405
406 pub fn num_parts(&self) -> usize {
408 self.parts.len()
409 }
410
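    /// Concatenates all buffered parts (which share the same schema) into one record batch
    /// and sorts it by primary key, timestamp and sequence. Returns `None` when the buffer
    /// is empty and returns the single batch directly when it holds only one part.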
411 pub fn concat_and_sort(&self) -> Result<Option<RecordBatch>> {
414 if self.parts.is_empty() {
415 return Ok(None);
416 }
417
418 if self.parts.len() == 1 {
419 return Ok(Some(self.parts[0].batch.clone()));
421 }
422
423 let schema = self.parts[0].batch.schema();
425
426 let batches: Vec<RecordBatch> = self.parts.iter().map(|p| p.batch.clone()).collect();
428 let concatenated =
429 arrow::compute::concat_batches(&schema, &batches).context(ComputeArrowSnafu)?;
430
431 let sorted_batch = sort_primary_key_record_batch(&concatenated)?;
433
434 Ok(Some(sorted_batch))
435 }
436
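    /// Merges all buffered parts into a single [`BulkPart`] covering the full timestamp
    /// range and the maximum sequence number, or returns `None` if the buffer is empty.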
437 pub fn to_bulk_part(&self) -> Result<Option<BulkPart>> {
440 let Some(sorted_batch) = self.concat_and_sort()? else {
441 return Ok(None);
442 };
443
444 let timestamp_index = self.parts[0].timestamp_index;
445
446 Ok(Some(BulkPart {
447 batch: sorted_batch,
448 max_timestamp: self.max_timestamp,
449 min_timestamp: self.min_timestamp,
450 sequence: self.max_sequence,
451 timestamp_index,
452 raw_data: None,
453 }))
454 }
455
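    /// Clears all buffered parts and resets the statistics.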
456 pub fn clear(&mut self) {
458 self.parts.clear();
459 self.total_rows = 0;
460 self.min_timestamp = i64::MAX;
461 self.max_timestamp = i64::MIN;
462 self.max_sequence = 0;
463 }
464}
465
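/// Estimates the memory size of a record batch by summing the sliced memory size of each
/// column's underlying buffers.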
466pub fn record_batch_estimated_size(batch: &RecordBatch) -> usize {
468 batch
469 .columns()
470 .iter()
471 .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
473 .sum()
474}
475
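/// Builder for a single raw primary key column when raw primary key columns are stored
/// alongside the encoded `__primary_key` column.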
enum PrimaryKeyColumnBuilder {
    /// Builds a dictionary encoded array for string primary key columns.
    StringDict(StringDictionaryBuilder<UInt32Type>),
    /// Builds a plain array for non-string primary key columns.
    Vector(Box<dyn MutableVector>),
}
483
484impl PrimaryKeyColumnBuilder {
485 fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
487 match self {
488 PrimaryKeyColumnBuilder::StringDict(builder) => {
489 if let Some(s) = value.try_into_string().context(DataTypeMismatchSnafu)? {
490 builder.append_value(s);
492 } else {
493 builder.append_null();
494 }
495 }
496 PrimaryKeyColumnBuilder::Vector(builder) => {
497 builder.push_value_ref(&value);
498 }
499 }
500 Ok(())
501 }
502
503 fn into_arrow_array(self) -> ArrayRef {
505 match self {
506 PrimaryKeyColumnBuilder::StringDict(mut builder) => Arc::new(builder.finish()),
507 PrimaryKeyColumnBuilder::Vector(mut builder) => builder.to_vector().to_arrow_array(),
508 }
509 }
510}
511
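/// Converts [`KeyValues`] (row based mutations) into a [`BulkPart`] whose record batch
/// follows the flat SST layout: optional raw primary key columns, field columns, the time
/// index, then the `__primary_key`, `__sequence` and `__op_type` columns.
///
/// A minimal usage sketch (mirrors the unit tests in this module):
///
/// ```ignore
/// let codec = build_primary_key_codec(&metadata);
/// let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
/// let mut converter = BulkPartConverter::new(&metadata, schema, 1024, codec, true);
/// converter.append_key_values(&key_values)?;
/// let part = converter.convert()?;
/// ```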
pub struct BulkPartConverter {
    /// Metadata of the region to convert data for.
    region_metadata: RegionMetadataRef,
    /// Arrow schema of the output record batch.
    schema: SchemaRef,
    /// Codec that encodes primary key values into the `__primary_key` column.
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
    /// Reusable buffer for encoding a single primary key.
    key_buf: Vec<u8>,
    /// Builder for the dictionary encoded `__primary_key` column.
    key_array_builder: PrimaryKeyArrayBuilder,
    /// Builder for the time index, sequence, op type and field columns.
    value_builder: ValueBuilder,
    /// Builders for raw primary key columns; empty when raw key columns are not stored.
    primary_key_column_builders: Vec<PrimaryKeyColumnBuilder>,

    /// Maximum timestamp seen so far.
    max_ts: i64,
    /// Minimum timestamp seen so far.
    min_ts: i64,
    /// Maximum sequence number seen so far.
    max_sequence: SequenceNumber,
}
537
538impl BulkPartConverter {
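    /// Creates a new converter that writes batches with the given `schema`. When
    /// `store_primary_key_columns` is true and the region uses dense primary key encoding,
    /// individual primary key columns are materialized in addition to the encoded
    /// `__primary_key` column.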
539 pub fn new(
544 region_metadata: &RegionMetadataRef,
545 schema: SchemaRef,
546 capacity: usize,
547 primary_key_codec: Arc<dyn PrimaryKeyCodec>,
548 store_primary_key_columns: bool,
549 ) -> Self {
550 debug_assert_eq!(
551 region_metadata.primary_key_encoding,
552 primary_key_codec.encoding()
553 );
554
555 let primary_key_column_builders = if store_primary_key_columns
556 && region_metadata.primary_key_encoding != PrimaryKeyEncoding::Sparse
557 {
558 new_primary_key_column_builders(region_metadata, capacity)
559 } else {
560 Vec::new()
561 };
562
563 Self {
564 region_metadata: region_metadata.clone(),
565 schema,
566 primary_key_codec,
567 key_buf: Vec::new(),
568 key_array_builder: PrimaryKeyArrayBuilder::new(),
569 value_builder: ValueBuilder::new(region_metadata, capacity),
570 primary_key_column_builders,
571 min_ts: i64::MAX,
572 max_ts: i64::MIN,
573 max_sequence: SequenceNumber::MIN,
574 }
575 }
576
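    /// Appends all rows in `key_values` to the converter.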
577 pub fn append_key_values(&mut self, key_values: &KeyValues) -> Result<()> {
579 for kv in key_values.iter() {
580 self.append_key_value(&kv)?;
581 }
582
583 Ok(())
584 }
585
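    /// Appends a single row. For sparse encoding the already encoded primary key is taken
    /// from the first primary key column; otherwise the key is encoded with the codec.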
586 fn append_key_value(&mut self, kv: &KeyValue) -> Result<()> {
590 if self.primary_key_codec.encoding() == PrimaryKeyEncoding::Sparse {
592 let mut primary_keys = kv.primary_keys();
595 if let Some(encoded) = primary_keys
596 .next()
597 .context(ColumnNotFoundSnafu {
598 column: PRIMARY_KEY_COLUMN_NAME,
599 })?
600 .try_into_binary()
601 .context(DataTypeMismatchSnafu)?
602 {
603 self.key_array_builder
604 .append(encoded)
605 .context(ComputeArrowSnafu)?;
606 } else {
607 self.key_array_builder
608 .append("")
609 .context(ComputeArrowSnafu)?;
610 }
611 } else {
612 self.key_buf.clear();
614 self.primary_key_codec
615 .encode_key_value(kv, &mut self.key_buf)
616 .context(EncodeSnafu)?;
617 self.key_array_builder
618 .append(&self.key_buf)
619 .context(ComputeArrowSnafu)?;
620 };
621
622 if !self.primary_key_column_builders.is_empty() {
624 for (builder, pk_value) in self
625 .primary_key_column_builders
626 .iter_mut()
627 .zip(kv.primary_keys())
628 {
629 builder.push_value_ref(pk_value)?;
630 }
631 }
632
633 self.value_builder.push(
635 kv.timestamp(),
636 kv.sequence(),
637 kv.op_type() as u8,
638 kv.fields(),
639 );
640
641 let ts = kv
644 .timestamp()
645 .try_into_timestamp()
646 .unwrap()
647 .unwrap()
648 .value();
649 self.min_ts = self.min_ts.min(ts);
650 self.max_ts = self.max_ts.max(ts);
651 self.max_sequence = self.max_sequence.max(kv.sequence());
652
653 Ok(())
654 }
655
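    /// Finishes the builders and produces a [`BulkPart`] sorted by primary key, timestamp
    /// and sequence.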
656 pub fn convert(mut self) -> Result<BulkPart> {
660 let values = Values::from(self.value_builder);
661 let mut columns =
662 Vec::with_capacity(4 + values.fields.len() + self.primary_key_column_builders.len());
663
664 for builder in self.primary_key_column_builders {
666 columns.push(builder.into_arrow_array());
667 }
668 columns.extend(values.fields.iter().map(|field| field.to_arrow_array()));
670 let timestamp_index = columns.len();
672 columns.push(values.timestamp.to_arrow_array());
673 let pk_array = self.key_array_builder.finish();
675 columns.push(Arc::new(pk_array));
676 columns.push(values.sequence.to_arrow_array());
678 columns.push(values.op_type.to_arrow_array());
679
680 let batch = RecordBatch::try_new(self.schema, columns).context(NewRecordBatchSnafu)?;
681 let batch = sort_primary_key_record_batch(&batch)?;
683
684 Ok(BulkPart {
685 batch,
686 max_timestamp: self.max_ts,
687 min_timestamp: self.min_ts,
688 sequence: self.max_sequence,
689 timestamp_index,
690 raw_data: None,
691 })
692 }
693}
694
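/// Creates one column builder per primary key column: dictionary builders for string
/// columns and generic mutable vectors for the rest.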
695fn new_primary_key_column_builders(
696 metadata: &RegionMetadata,
697 capacity: usize,
698) -> Vec<PrimaryKeyColumnBuilder> {
699 metadata
700 .primary_key_columns()
701 .map(|col| {
702 if col.column_schema.data_type.is_string() {
703 PrimaryKeyColumnBuilder::StringDict(StringDictionaryBuilder::with_capacity(
704 capacity,
705 INIT_DICT_VALUE_CAPACITY,
706 capacity,
707 ))
708 } else {
709 PrimaryKeyColumnBuilder::Vector(
710 col.column_schema.data_type.create_mutable_vector(capacity),
711 )
712 }
713 })
714 .collect()
715}
716
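/// Sorts a record batch in the flat SST layout by `(__primary_key ASC, time index ASC,
/// __sequence DESC)`, relying on the last four columns being the time index,
/// `__primary_key`, `__sequence` and `__op_type`.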
717pub fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
719 let total_columns = batch.num_columns();
720 let sort_columns = vec![
721 SortColumn {
723 values: batch.column(total_columns - 3).clone(),
724 options: Some(SortOptions {
725 descending: false,
726 nulls_first: true,
727 }),
728 },
729 SortColumn {
731 values: batch.column(total_columns - 4).clone(),
732 options: Some(SortOptions {
733 descending: false,
734 nulls_first: true,
735 }),
736 },
737 SortColumn {
739 values: batch.column(total_columns - 2).clone(),
740 options: Some(SortOptions {
741 descending: true,
742 nulls_first: true,
743 }),
744 },
745 ];
746
747 let indices = datatypes::arrow::compute::lexsort_to_indices(&sort_columns, None)
748 .context(ComputeArrowSnafu)?;
749
750 datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
751}
752
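/// Converts a raw [`BulkPart`] (whose batch contains user columns in arbitrary order) into
/// the flat SST layout expected by the bulk memtable: primary key columns are encoded into
/// a dictionary `__primary_key` column (unless the region already uses sparse encoding, in
/// which case the existing `__primary_key` column is dictionary-encoded), field and time
/// index columns are reordered, and `__sequence` / `__op_type` columns are appended. The
/// resulting batch is sorted by primary key, timestamp and sequence. Returns `None` for an
/// empty input part.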
753pub fn convert_bulk_part(
779 part: BulkPart,
780 region_metadata: &RegionMetadataRef,
781 primary_key_codec: Arc<dyn PrimaryKeyCodec>,
782 schema: SchemaRef,
783 store_primary_key_columns: bool,
784) -> Result<Option<BulkPart>> {
785 if part.num_rows() == 0 {
786 return Ok(None);
787 }
788
789 let num_rows = part.num_rows();
790 let is_sparse = region_metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse;
791
792 let input_schema = part.batch.schema();
794 let column_indices: HashMap<&str, usize> = input_schema
795 .fields()
796 .iter()
797 .enumerate()
798 .map(|(idx, field)| (field.name().as_str(), idx))
799 .collect();
800
801 let mut output_columns = Vec::new();
803
804 let pk_array = if is_sparse {
806 None
809 } else {
810 let pk_vectors: Result<Vec<_>> = region_metadata
812 .primary_key_columns()
813 .map(|col_meta| {
814 let col_idx = column_indices
815 .get(col_meta.column_schema.name.as_str())
816 .context(ColumnNotFoundSnafu {
817 column: &col_meta.column_schema.name,
818 })?;
819 let col = part.batch.column(*col_idx);
820 Helper::try_into_vector(col).context(error::ComputeVectorSnafu)
821 })
822 .collect();
823 let pk_vectors = pk_vectors?;
824
825 let mut key_array_builder = PrimaryKeyArrayBuilder::new();
826 let mut encode_buf = Vec::new();
827
828 for row_idx in 0..num_rows {
829 encode_buf.clear();
830
831 let pk_values_with_ids: Vec<_> = region_metadata
833 .primary_key
834 .iter()
835 .zip(pk_vectors.iter())
836 .map(|(col_id, vector)| (*col_id, vector.get_ref(row_idx)))
837 .collect();
838
839 primary_key_codec
841 .encode_value_refs(&pk_values_with_ids, &mut encode_buf)
842 .context(EncodeSnafu)?;
843
844 key_array_builder
845 .append(&encode_buf)
846 .context(ComputeArrowSnafu)?;
847 }
848
849 Some(key_array_builder.finish())
850 };
851
852 if store_primary_key_columns && !is_sparse {
854 for col_meta in region_metadata.primary_key_columns() {
855 let col_idx = column_indices
856 .get(col_meta.column_schema.name.as_str())
857 .context(ColumnNotFoundSnafu {
858 column: &col_meta.column_schema.name,
859 })?;
860 let col = part.batch.column(*col_idx);
861
862 let col = if col_meta.column_schema.data_type.is_string() {
864 let target_type = ArrowDataType::Dictionary(
865 Box::new(ArrowDataType::UInt32),
866 Box::new(ArrowDataType::Utf8),
867 );
868 arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
869 } else {
870 col.clone()
871 };
872 output_columns.push(col);
873 }
874 }
875
876 for col_meta in region_metadata.field_columns() {
878 let col_idx = column_indices
879 .get(col_meta.column_schema.name.as_str())
880 .context(ColumnNotFoundSnafu {
881 column: &col_meta.column_schema.name,
882 })?;
883 output_columns.push(part.batch.column(*col_idx).clone());
884 }
885
886 let new_timestamp_index = output_columns.len();
888 let ts_col_idx = column_indices
889 .get(
890 region_metadata
891 .time_index_column()
892 .column_schema
893 .name
894 .as_str(),
895 )
896 .context(ColumnNotFoundSnafu {
            column: &region_metadata.time_index_column().column_schema.name,
898 })?;
899 output_columns.push(part.batch.column(*ts_col_idx).clone());
900
901 let pk_dictionary = if let Some(pk_dict_array) = pk_array {
903 Arc::new(pk_dict_array) as ArrayRef
904 } else {
905 let pk_col_idx =
906 column_indices
907 .get(PRIMARY_KEY_COLUMN_NAME)
908 .context(ColumnNotFoundSnafu {
909 column: PRIMARY_KEY_COLUMN_NAME,
910 })?;
911 let col = part.batch.column(*pk_col_idx);
912
913 let target_type = ArrowDataType::Dictionary(
915 Box::new(ArrowDataType::UInt32),
916 Box::new(ArrowDataType::Binary),
917 );
918 arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
919 };
920 output_columns.push(pk_dictionary);
921
922 let sequence_array = UInt64Array::from(vec![part.sequence; num_rows]);
923 output_columns.push(Arc::new(sequence_array) as ArrayRef);
924
925 let op_type_array = UInt8Array::from(vec![OpType::Put as u8; num_rows]);
926 output_columns.push(Arc::new(op_type_array) as ArrayRef);
927
928 let batch = RecordBatch::try_new(schema, output_columns).context(NewRecordBatchSnafu)?;
929
930 let sorted_batch = sort_primary_key_record_batch(&batch)?;
932
933 Ok(Some(BulkPart {
934 batch: sorted_batch,
935 max_timestamp: part.max_timestamp,
936 min_timestamp: part.min_timestamp,
937 sequence: part.sequence,
938 timestamp_index: new_timestamp_index,
939 raw_data: None,
940 }))
941}
942
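/// A bulk part encoded as Parquet bytes, ready to be read back or uploaded as an SST.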
943#[derive(Debug, Clone)]
944pub struct EncodedBulkPart {
945 data: Bytes,
946 metadata: BulkPartMeta,
947}
948
949impl EncodedBulkPart {
950 pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
951 Self { data, metadata }
952 }
953
954 pub(crate) fn metadata(&self) -> &BulkPartMeta {
955 &self.metadata
956 }
957
958 pub(crate) fn size_bytes(&self) -> usize {
960 self.data.len()
961 }
962
963 pub(crate) fn data(&self) -> &Bytes {
965 &self.data
966 }
967
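    /// Builds an [`SstInfo`] describing this part as if it were an SST file with `file_id`,
    /// including its time range, size, row counts and parquet metadata.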
968 pub(crate) fn to_sst_info(&self, file_id: FileId) -> SstInfo {
976 let unit = self.metadata.region_metadata.time_index_type().unit();
977 let max_row_group_uncompressed_size: u64 = self
978 .metadata
979 .parquet_metadata
980 .row_groups()
981 .iter()
982 .map(|rg| {
983 rg.columns()
984 .iter()
985 .map(|c| c.uncompressed_size() as u64)
986 .sum::<u64>()
987 })
988 .max()
989 .unwrap_or(0);
990 SstInfo {
991 file_id,
992 time_range: (
993 Timestamp::new(self.metadata.min_timestamp, unit),
994 Timestamp::new(self.metadata.max_timestamp, unit),
995 ),
996 file_size: self.data.len() as u64,
997 max_row_group_uncompressed_size,
998 num_rows: self.metadata.num_rows,
999 num_row_groups: self.metadata.parquet_metadata.num_row_groups() as u64,
1000 file_metadata: Some(self.metadata.parquet_metadata.clone()),
1001 index_metadata: IndexOutput::default(),
1002 num_series: self.metadata.num_series,
1003 }
1004 }
1005
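    /// Creates an iterator over the part, pruning row groups with the context's predicate
    /// and optionally filtering by `sequence`. Returns `None` when all row groups are pruned.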
1006 pub(crate) fn read(
1007 &self,
1008 context: BulkIterContextRef,
1009 sequence: Option<SequenceRange>,
1010 mem_scan_metrics: Option<MemScanMetrics>,
1011 ) -> Result<Option<BoxedRecordBatchIterator>> {
1012 let skip_fields_for_pruning =
1014 Self::compute_skip_fields(context.pre_filter_mode(), &self.metadata.parquet_metadata);
1015
1016 let row_groups_to_read =
1018 context.row_groups_to_read(&self.metadata.parquet_metadata, skip_fields_for_pruning);
1019
1020 if row_groups_to_read.is_empty() {
1021 return Ok(None);
1023 }
1024
1025 let iter = EncodedBulkPartIter::try_new(
1026 self,
1027 context,
1028 row_groups_to_read,
1029 sequence,
1030 mem_scan_metrics,
1031 )?;
1032 Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
1033 }
1034
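    /// Decides whether field columns should be skipped during row group pruning, depending
    /// on the pre-filter mode and whether any row group may contain delete op types.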
1035 fn compute_skip_fields(pre_filter_mode: PreFilterMode, parquet_meta: &ParquetMetaData) -> bool {
1037 match pre_filter_mode {
1038 PreFilterMode::All => false,
1039 PreFilterMode::SkipFields => true,
1040 PreFilterMode::SkipFieldsOnDelete => {
1041 (0..parquet_meta.num_row_groups()).any(|rg_idx| {
1043 row_group_contains_delete(parquet_meta, rg_idx, "memtable").unwrap_or(true)
1044 })
1045 }
1046 }
1047 }
1048}
1049
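/// Metadata of an [`EncodedBulkPart`].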
#[derive(Debug, Clone)]
pub struct BulkPartMeta {
    /// Total number of rows in the part.
    pub num_rows: usize,
    /// Maximum timestamp in the part.
    pub max_timestamp: i64,
    /// Minimum timestamp in the part.
    pub min_timestamp: i64,
    /// Parquet metadata of the encoded data.
    pub parquet_metadata: Arc<ParquetMetaData>,
    /// Metadata of the region the part belongs to.
    pub region_metadata: RegionMetadataRef,
    /// Estimated number of series in the part.
    pub num_series: u64,
}
1065
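/// Metrics collected while encoding a bulk part into Parquet.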
#[derive(Default, Debug)]
pub struct BulkPartEncodeMetrics {
    /// Time spent iterating input record batches.
    pub iter_cost: Duration,
    /// Time spent writing and closing the Parquet writer.
    pub write_cost: Duration,
    /// Estimated size of the input record batches in bytes.
    pub raw_size: usize,
    /// Size of the encoded Parquet buffer in bytes.
    pub encoded_size: usize,
    /// Total number of rows encoded.
    pub num_rows: usize,
}
1080
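/// Encodes [`BulkPart`]s or record batch iterators into Parquet-encoded [`EncodedBulkPart`]s,
/// embedding the region metadata JSON under the `PARQUET_METADATA_KEY` key-value entry.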
1081pub struct BulkPartEncoder {
1082 metadata: RegionMetadataRef,
1083 row_group_size: usize,
1084 writer_props: Option<WriterProperties>,
1085}
1086
1087impl BulkPartEncoder {
1088 pub(crate) fn new(
1089 metadata: RegionMetadataRef,
1090 row_group_size: usize,
1091 ) -> Result<BulkPartEncoder> {
1092 let json = metadata.to_json().context(InvalidMetadataSnafu)?;
1094 let key_value_meta =
1095 parquet::file::metadata::KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
1096
1097 let writer_props = Some(
1099 WriterProperties::builder()
1100 .set_key_value_metadata(Some(vec![key_value_meta]))
1101 .set_write_batch_size(row_group_size)
1102 .set_max_row_group_size(row_group_size)
1103 .set_compression(Compression::ZSTD(ZstdLevel::default()))
1104 .set_column_index_truncate_length(None)
1105 .set_statistics_truncate_length(None)
1106 .build(),
1107 );
1108
1109 Ok(Self {
1110 metadata,
1111 row_group_size,
1112 writer_props,
1113 })
1114 }
1115}
1116
1117impl BulkPartEncoder {
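    /// Encodes all record batches yielded by `iter` into a single Parquet-encoded part,
    /// collecting timing and size metrics. Returns `None` when the iterator yields no rows.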
1118 pub fn encode_record_batch_iter(
1120 &self,
1121 iter: BoxedRecordBatchIterator,
1122 arrow_schema: SchemaRef,
1123 min_timestamp: i64,
1124 max_timestamp: i64,
1125 metrics: &mut BulkPartEncodeMetrics,
1126 ) -> Result<Option<EncodedBulkPart>> {
1127 let mut buf = Vec::with_capacity(4096);
1128 let mut writer = ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
1129 .context(EncodeMemtableSnafu)?;
1130 let mut total_rows = 0;
1131 let mut series_estimator = SeriesEstimator::default();
1132
1133 let mut iter_start = Instant::now();
1135 for batch_result in iter {
1136 metrics.iter_cost += iter_start.elapsed();
1137 let batch = batch_result?;
1138 if batch.num_rows() == 0 {
1139 continue;
1140 }
1141
1142 series_estimator.update_flat(&batch);
1143 metrics.raw_size += record_batch_estimated_size(&batch);
1144 let write_start = Instant::now();
1145 writer.write(&batch).context(EncodeMemtableSnafu)?;
1146 metrics.write_cost += write_start.elapsed();
1147 total_rows += batch.num_rows();
1148 iter_start = Instant::now();
1149 }
1150 metrics.iter_cost += iter_start.elapsed();
1151 iter_start = Instant::now();
1152
1153 if total_rows == 0 {
1154 return Ok(None);
1155 }
1156
1157 let close_start = Instant::now();
1158 let file_metadata = writer.close().context(EncodeMemtableSnafu)?;
1159 metrics.write_cost += close_start.elapsed();
1160 metrics.encoded_size += buf.len();
1161 metrics.num_rows += total_rows;
1162
1163 let buf = Bytes::from(buf);
1164 let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);
1165 let num_series = series_estimator.finish();
1166
1167 Ok(Some(EncodedBulkPart {
1168 data: buf,
1169 metadata: BulkPartMeta {
1170 num_rows: total_rows,
1171 max_timestamp,
1172 min_timestamp,
1173 parquet_metadata,
1174 region_metadata: self.metadata.clone(),
1175 num_series,
1176 },
1177 }))
1178 }
1179
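    /// Encodes a single [`BulkPart`] into Parquet. Returns `None` when the part is empty.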
1180 fn encode_part(&self, part: &BulkPart) -> Result<Option<EncodedBulkPart>> {
1182 if part.batch.num_rows() == 0 {
1183 return Ok(None);
1184 }
1185
1186 let mut buf = Vec::with_capacity(4096);
1187 let arrow_schema = part.batch.schema();
1188
1189 let file_metadata = {
1190 let mut writer =
1191 ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
1192 .context(EncodeMemtableSnafu)?;
1193 writer.write(&part.batch).context(EncodeMemtableSnafu)?;
1194 writer.finish().context(EncodeMemtableSnafu)?
1195 };
1196
1197 let buf = Bytes::from(buf);
1198 let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);
1199
1200 Ok(Some(EncodedBulkPart {
1201 data: buf,
1202 metadata: BulkPartMeta {
1203 num_rows: part.batch.num_rows(),
1204 max_timestamp: part.max_timestamp,
1205 min_timestamp: part.min_timestamp,
1206 parquet_metadata,
1207 region_metadata: self.metadata.clone(),
1208 num_series: part.estimated_series_count() as u64,
1209 },
1210 }))
1211 }
1212}
1213
1214#[cfg(test)]
1215mod tests {
1216 use api::v1::{Row, SemanticType, WriteHint};
1217 use datafusion_common::ScalarValue;
1218 use datatypes::arrow::array::Float64Array;
1219 use datatypes::prelude::{ConcreteDataType, Value};
1220 use datatypes::schema::ColumnSchema;
1221 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1222 use store_api::storage::RegionId;
1223 use store_api::storage::consts::ReservedColumnId;
1224
1225 use super::*;
1226 use crate::memtable::bulk::context::BulkIterContext;
1227 use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1228 use crate::test_util::memtable_util::{
1229 build_key_values_with_ts_seq_values, metadata_for_test, region_metadata_to_row_schema,
1230 };
1231
1232 struct MutationInput<'a> {
1233 k0: &'a str,
1234 k1: u32,
1235 timestamps: &'a [i64],
1236 v1: &'a [Option<f64>],
1237 sequence: u64,
1238 }
1239
1240 #[derive(Debug, PartialOrd, PartialEq)]
1241 struct BatchOutput<'a> {
1242 pk_values: &'a [Value],
1243 timestamps: &'a [i64],
1244 v1: &'a [Option<f64>],
1245 }
1246
1247 fn encode(input: &[MutationInput]) -> EncodedBulkPart {
1248 let metadata = metadata_for_test();
1249 let kvs = input
1250 .iter()
1251 .map(|m| {
1252 build_key_values_with_ts_seq_values(
1253 &metadata,
1254 m.k0.to_string(),
1255 m.k1,
1256 m.timestamps.iter().copied(),
1257 m.v1.iter().copied(),
1258 m.sequence,
1259 )
1260 })
1261 .collect::<Vec<_>>();
1262 let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1263 let primary_key_codec = build_primary_key_codec(&metadata);
1264 let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1265 for kv in kvs {
1266 converter.append_key_values(&kv).unwrap();
1267 }
1268 let part = converter.convert().unwrap();
1269 let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1270 encoder.encode_part(&part).unwrap().unwrap()
1271 }
1272
1273 #[test]
1274 fn test_write_and_read_part_projection() {
1275 let part = encode(&[
1276 MutationInput {
1277 k0: "a",
1278 k1: 0,
1279 timestamps: &[1],
1280 v1: &[Some(0.1)],
1281 sequence: 0,
1282 },
1283 MutationInput {
1284 k0: "b",
1285 k1: 0,
1286 timestamps: &[1],
1287 v1: &[Some(0.0)],
1288 sequence: 0,
1289 },
1290 MutationInput {
1291 k0: "a",
1292 k1: 0,
1293 timestamps: &[2],
1294 v1: &[Some(0.2)],
1295 sequence: 1,
1296 },
1297 ]);
1298
1299 let projection = &[4u32];
1300 let mut reader = part
1301 .read(
1302 Arc::new(
1303 BulkIterContext::new(
1304 part.metadata.region_metadata.clone(),
1305 Some(projection.as_slice()),
1306 None,
1307 false,
1308 )
1309 .unwrap(),
1310 ),
1311 None,
1312 None,
1313 )
1314 .unwrap()
1315 .expect("expect at least one row group");
1316
1317 let mut total_rows_read = 0;
1318 let mut field: Vec<f64> = vec![];
1319 for res in reader {
1320 let batch = res.unwrap();
1321 assert_eq!(5, batch.num_columns());
1322 field.extend_from_slice(
1323 batch
1324 .column(0)
1325 .as_any()
1326 .downcast_ref::<Float64Array>()
1327 .unwrap()
1328 .values(),
1329 );
1330 total_rows_read += batch.num_rows();
1331 }
1332 assert_eq!(3, total_rows_read);
1333 assert_eq!(vec![0.1, 0.2, 0.0], field);
1334 }
1335
1336 fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
1337 let metadata = metadata_for_test();
1338 let kvs = key_values
1339 .into_iter()
1340 .map(|(k0, k1, (start, end), sequence)| {
1341 let ts = (start..end);
1342 let v1 = (start..end).map(|_| None);
1343 build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
1344 })
1345 .collect::<Vec<_>>();
1346 let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1347 let primary_key_codec = build_primary_key_codec(&metadata);
1348 let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1349 for kv in kvs {
1350 converter.append_key_values(&kv).unwrap();
1351 }
1352 let part = converter.convert().unwrap();
1353 let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1354 encoder.encode_part(&part).unwrap().unwrap()
1355 }
1356
1357 fn check_prune_row_group(
1358 part: &EncodedBulkPart,
1359 predicate: Option<Predicate>,
1360 expected_rows: usize,
1361 ) {
1362 let context = Arc::new(
1363 BulkIterContext::new(
1364 part.metadata.region_metadata.clone(),
1365 None,
1366 predicate,
1367 false,
1368 )
1369 .unwrap(),
1370 );
1371 let mut reader = part
1372 .read(context, None, None)
1373 .unwrap()
1374 .expect("expect at least one row group");
1375 let mut total_rows_read = 0;
1376 for res in reader {
1377 let batch = res.unwrap();
1378 total_rows_read += batch.num_rows();
1379 }
1380 assert_eq!(expected_rows, total_rows_read);
1382 }
1383
1384 #[test]
1385 fn test_prune_row_groups() {
1386 let part = prepare(vec![
1387 ("a", 0, (0, 40), 1),
1388 ("a", 1, (0, 60), 1),
1389 ("b", 0, (0, 100), 2),
1390 ("b", 1, (100, 180), 3),
1391 ("b", 1, (180, 210), 4),
1392 ]);
1393
1394 let context = Arc::new(
1395 BulkIterContext::new(
1396 part.metadata.region_metadata.clone(),
1397 None,
1398 Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
1399 datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
1400 )])),
1401 false,
1402 )
1403 .unwrap(),
1404 );
1405 assert!(part.read(context, None, None).unwrap().is_none());
1406
1407 check_prune_row_group(&part, None, 310);
1408
1409 check_prune_row_group(
1410 &part,
1411 Some(Predicate::new(vec![
1412 datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1413 datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
1414 ])),
1415 40,
1416 );
1417
1418 check_prune_row_group(
1419 &part,
1420 Some(Predicate::new(vec![
1421 datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1422 datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
1423 ])),
1424 60,
1425 );
1426
1427 check_prune_row_group(
1428 &part,
1429 Some(Predicate::new(vec![
1430 datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1431 ])),
1432 100,
1433 );
1434
1435 check_prune_row_group(
1436 &part,
1437 Some(Predicate::new(vec![
1438 datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
1439 datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
1440 ])),
1441 100,
1442 );
1443
1444 check_prune_row_group(
1446 &part,
1447 Some(Predicate::new(vec![
1448 datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64)),
1449 ])),
1450 1,
1451 );
1452 }
1453
1454 #[test]
1455 fn test_bulk_part_converter_append_and_convert() {
1456 let metadata = metadata_for_test();
1457 let capacity = 100;
1458 let primary_key_codec = build_primary_key_codec(&metadata);
1459 let schema = to_flat_sst_arrow_schema(
1460 &metadata,
1461 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1462 );
1463
1464 let mut converter =
1465 BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1466
1467 let key_values1 = build_key_values_with_ts_seq_values(
1468 &metadata,
1469 "key1".to_string(),
1470 1u32,
1471 vec![1000, 2000].into_iter(),
1472 vec![Some(1.0), Some(2.0)].into_iter(),
1473 1,
1474 );
1475
1476 let key_values2 = build_key_values_with_ts_seq_values(
1477 &metadata,
1478 "key2".to_string(),
1479 2u32,
1480 vec![1500].into_iter(),
1481 vec![Some(3.0)].into_iter(),
1482 2,
1483 );
1484
1485 converter.append_key_values(&key_values1).unwrap();
1486 converter.append_key_values(&key_values2).unwrap();
1487
1488 let bulk_part = converter.convert().unwrap();
1489
1490 assert_eq!(bulk_part.num_rows(), 3);
1491 assert_eq!(bulk_part.min_timestamp, 1000);
1492 assert_eq!(bulk_part.max_timestamp, 2000);
1493 assert_eq!(bulk_part.sequence, 2);
1494 assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
1495
1496 let schema = bulk_part.batch.schema();
1499 let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1500 assert_eq!(
1501 field_names,
1502 vec![
1503 "k0",
1504 "k1",
1505 "v0",
1506 "v1",
1507 "ts",
1508 "__primary_key",
1509 "__sequence",
1510 "__op_type"
1511 ]
1512 );
1513 }
1514
1515 #[test]
1516 fn test_bulk_part_converter_sorting() {
1517 let metadata = metadata_for_test();
1518 let capacity = 100;
1519 let primary_key_codec = build_primary_key_codec(&metadata);
1520 let schema = to_flat_sst_arrow_schema(
1521 &metadata,
1522 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1523 );
1524
1525 let mut converter =
1526 BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1527
1528 let key_values1 = build_key_values_with_ts_seq_values(
1529 &metadata,
1530 "z_key".to_string(),
1531 3u32,
1532 vec![3000].into_iter(),
1533 vec![Some(3.0)].into_iter(),
1534 3,
1535 );
1536
1537 let key_values2 = build_key_values_with_ts_seq_values(
1538 &metadata,
1539 "a_key".to_string(),
1540 1u32,
1541 vec![1000].into_iter(),
1542 vec![Some(1.0)].into_iter(),
1543 1,
1544 );
1545
1546 let key_values3 = build_key_values_with_ts_seq_values(
1547 &metadata,
1548 "m_key".to_string(),
1549 2u32,
1550 vec![2000].into_iter(),
1551 vec![Some(2.0)].into_iter(),
1552 2,
1553 );
1554
1555 converter.append_key_values(&key_values1).unwrap();
1556 converter.append_key_values(&key_values2).unwrap();
1557 converter.append_key_values(&key_values3).unwrap();
1558
1559 let bulk_part = converter.convert().unwrap();
1560
1561 assert_eq!(bulk_part.num_rows(), 3);
1562
1563 let ts_column = bulk_part.batch.column(bulk_part.timestamp_index);
1564 let seq_column = bulk_part.batch.column(bulk_part.batch.num_columns() - 2);
1565
1566 let ts_array = ts_column
1567 .as_any()
1568 .downcast_ref::<TimestampMillisecondArray>()
1569 .unwrap();
1570 let seq_array = seq_column.as_any().downcast_ref::<UInt64Array>().unwrap();
1571
1572 assert_eq!(ts_array.values(), &[1000, 2000, 3000]);
1573 assert_eq!(seq_array.values(), &[1, 2, 3]);
1574
1575 let schema = bulk_part.batch.schema();
1577 let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1578 assert_eq!(
1579 field_names,
1580 vec![
1581 "k0",
1582 "k1",
1583 "v0",
1584 "v1",
1585 "ts",
1586 "__primary_key",
1587 "__sequence",
1588 "__op_type"
1589 ]
1590 );
1591 }
1592
1593 #[test]
1594 fn test_bulk_part_converter_empty() {
1595 let metadata = metadata_for_test();
1596 let capacity = 10;
1597 let primary_key_codec = build_primary_key_codec(&metadata);
1598 let schema = to_flat_sst_arrow_schema(
1599 &metadata,
1600 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1601 );
1602
1603 let converter =
1604 BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1605
1606 let bulk_part = converter.convert().unwrap();
1607
1608 assert_eq!(bulk_part.num_rows(), 0);
1609 assert_eq!(bulk_part.min_timestamp, i64::MAX);
1610 assert_eq!(bulk_part.max_timestamp, i64::MIN);
1611 assert_eq!(bulk_part.sequence, SequenceNumber::MIN);
1612
1613 let schema = bulk_part.batch.schema();
1615 let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1616 assert_eq!(
1617 field_names,
1618 vec![
1619 "k0",
1620 "k1",
1621 "v0",
1622 "v1",
1623 "ts",
1624 "__primary_key",
1625 "__sequence",
1626 "__op_type"
1627 ]
1628 );
1629 }
1630
1631 #[test]
1632 fn test_bulk_part_converter_without_primary_key_columns() {
1633 let metadata = metadata_for_test();
1634 let primary_key_codec = build_primary_key_codec(&metadata);
1635 let schema = to_flat_sst_arrow_schema(
1636 &metadata,
1637 &FlatSchemaOptions {
1638 raw_pk_columns: false,
1639 string_pk_use_dict: true,
1640 },
1641 );
1642
1643 let capacity = 100;
1644 let mut converter =
1645 BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, false);
1646
1647 let key_values1 = build_key_values_with_ts_seq_values(
1648 &metadata,
1649 "key1".to_string(),
1650 1u32,
1651 vec![1000, 2000].into_iter(),
1652 vec![Some(1.0), Some(2.0)].into_iter(),
1653 1,
1654 );
1655
1656 let key_values2 = build_key_values_with_ts_seq_values(
1657 &metadata,
1658 "key2".to_string(),
1659 2u32,
1660 vec![1500].into_iter(),
1661 vec![Some(3.0)].into_iter(),
1662 2,
1663 );
1664
1665 converter.append_key_values(&key_values1).unwrap();
1666 converter.append_key_values(&key_values2).unwrap();
1667
1668 let bulk_part = converter.convert().unwrap();
1669
1670 assert_eq!(bulk_part.num_rows(), 3);
1671 assert_eq!(bulk_part.min_timestamp, 1000);
1672 assert_eq!(bulk_part.max_timestamp, 2000);
1673 assert_eq!(bulk_part.sequence, 2);
1674 assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
1675
1676 let schema = bulk_part.batch.schema();
1678 let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1679 assert_eq!(
1680 field_names,
1681 vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
1682 );
1683 }
1684
1685 #[allow(clippy::too_many_arguments)]
1686 fn build_key_values_with_sparse_encoding(
1687 metadata: &RegionMetadataRef,
1688 primary_key_codec: &Arc<dyn PrimaryKeyCodec>,
1689 table_id: u32,
1690 tsid: u64,
1691 k0: String,
1692 k1: String,
1693 timestamps: impl Iterator<Item = i64>,
1694 values: impl Iterator<Item = Option<f64>>,
1695 sequence: SequenceNumber,
1696 ) -> KeyValues {
1697 let pk_values = vec![
1699 (ReservedColumnId::table_id(), Value::UInt32(table_id)),
1700 (ReservedColumnId::tsid(), Value::UInt64(tsid)),
1701 (0, Value::String(k0.clone().into())),
1702 (1, Value::String(k1.clone().into())),
1703 ];
1704 let mut encoded_key = Vec::new();
1705 primary_key_codec
1706 .encode_values(&pk_values, &mut encoded_key)
1707 .unwrap();
1708 assert!(!encoded_key.is_empty());
1709
1710 let column_schema = vec![
1712 api::v1::ColumnSchema {
1713 column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
1714 datatype: api::helper::ColumnDataTypeWrapper::try_from(
1715 ConcreteDataType::binary_datatype(),
1716 )
1717 .unwrap()
1718 .datatype() as i32,
1719 semantic_type: api::v1::SemanticType::Tag as i32,
1720 ..Default::default()
1721 },
1722 api::v1::ColumnSchema {
1723 column_name: "ts".to_string(),
1724 datatype: api::helper::ColumnDataTypeWrapper::try_from(
1725 ConcreteDataType::timestamp_millisecond_datatype(),
1726 )
1727 .unwrap()
1728 .datatype() as i32,
1729 semantic_type: api::v1::SemanticType::Timestamp as i32,
1730 ..Default::default()
1731 },
1732 api::v1::ColumnSchema {
1733 column_name: "v0".to_string(),
1734 datatype: api::helper::ColumnDataTypeWrapper::try_from(
1735 ConcreteDataType::int64_datatype(),
1736 )
1737 .unwrap()
1738 .datatype() as i32,
1739 semantic_type: api::v1::SemanticType::Field as i32,
1740 ..Default::default()
1741 },
1742 api::v1::ColumnSchema {
1743 column_name: "v1".to_string(),
1744 datatype: api::helper::ColumnDataTypeWrapper::try_from(
1745 ConcreteDataType::float64_datatype(),
1746 )
1747 .unwrap()
1748 .datatype() as i32,
1749 semantic_type: api::v1::SemanticType::Field as i32,
1750 ..Default::default()
1751 },
1752 ];
1753
1754 let rows = timestamps
1755 .zip(values)
1756 .map(|(ts, v)| Row {
1757 values: vec![
1758 api::v1::Value {
1759 value_data: Some(api::v1::value::ValueData::BinaryValue(
1760 encoded_key.clone(),
1761 )),
1762 },
1763 api::v1::Value {
1764 value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(ts)),
1765 },
1766 api::v1::Value {
1767 value_data: Some(api::v1::value::ValueData::I64Value(ts)),
1768 },
1769 api::v1::Value {
1770 value_data: v.map(api::v1::value::ValueData::F64Value),
1771 },
1772 ],
1773 })
1774 .collect();
1775
1776 let mutation = api::v1::Mutation {
1777 op_type: 1,
1778 sequence,
1779 rows: Some(api::v1::Rows {
1780 schema: column_schema,
1781 rows,
1782 }),
1783 write_hint: Some(WriteHint {
1784 primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
1785 }),
1786 };
1787 KeyValues::new(metadata.as_ref(), mutation).unwrap()
1788 }
1789
1790 #[test]
1791 fn test_bulk_part_converter_sparse_primary_key_encoding() {
1792 use api::v1::SemanticType;
1793 use datatypes::schema::ColumnSchema;
1794 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1795 use store_api::storage::RegionId;
1796
1797 let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
1798 builder
1799 .push_column_metadata(ColumnMetadata {
1800 column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
1801 semantic_type: SemanticType::Tag,
1802 column_id: 0,
1803 })
1804 .push_column_metadata(ColumnMetadata {
1805 column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
1806 semantic_type: SemanticType::Tag,
1807 column_id: 1,
1808 })
1809 .push_column_metadata(ColumnMetadata {
1810 column_schema: ColumnSchema::new(
1811 "ts",
1812 ConcreteDataType::timestamp_millisecond_datatype(),
1813 false,
1814 ),
1815 semantic_type: SemanticType::Timestamp,
1816 column_id: 2,
1817 })
1818 .push_column_metadata(ColumnMetadata {
1819 column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
1820 semantic_type: SemanticType::Field,
1821 column_id: 3,
1822 })
1823 .push_column_metadata(ColumnMetadata {
1824 column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
1825 semantic_type: SemanticType::Field,
1826 column_id: 4,
1827 })
1828 .primary_key(vec![0, 1])
1829 .primary_key_encoding(PrimaryKeyEncoding::Sparse);
1830 let metadata = Arc::new(builder.build().unwrap());
1831
1832 let primary_key_codec = build_primary_key_codec(&metadata);
1833 let schema = to_flat_sst_arrow_schema(
1834 &metadata,
1835 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1836 );
1837
1838 assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse);
1839 assert_eq!(primary_key_codec.encoding(), PrimaryKeyEncoding::Sparse);
1840
1841 let capacity = 100;
1842 let mut converter =
1843 BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec.clone(), true);
1844
1845 let key_values1 = build_key_values_with_sparse_encoding(
1846 &metadata,
1847 &primary_key_codec,
            2048u32, // table_id
            100u64,  // tsid
            "key11".to_string(),
1851 "key21".to_string(),
1852 vec![1000, 2000].into_iter(),
1853 vec![Some(1.0), Some(2.0)].into_iter(),
1854 1,
1855 );
1856
1857 let key_values2 = build_key_values_with_sparse_encoding(
1858 &metadata,
1859 &primary_key_codec,
            4096u32, // table_id
            200u64,  // tsid
            "key12".to_string(),
1863 "key22".to_string(),
1864 vec![1500].into_iter(),
1865 vec![Some(3.0)].into_iter(),
1866 2,
1867 );
1868
1869 converter.append_key_values(&key_values1).unwrap();
1870 converter.append_key_values(&key_values2).unwrap();
1871
1872 let bulk_part = converter.convert().unwrap();
1873
1874 assert_eq!(bulk_part.num_rows(), 3);
1875 assert_eq!(bulk_part.min_timestamp, 1000);
1876 assert_eq!(bulk_part.max_timestamp, 2000);
1877 assert_eq!(bulk_part.sequence, 2);
1878 assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
1879
1880 let schema = bulk_part.batch.schema();
1884 let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1885 assert_eq!(
1886 field_names,
1887 vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
1888 );
1889
1890 let primary_key_column = bulk_part.batch.column_by_name("__primary_key").unwrap();
1892 let dict_array = primary_key_column
1893 .as_any()
1894 .downcast_ref::<DictionaryArray<UInt32Type>>()
1895 .unwrap();
1896
1897 assert!(!dict_array.is_empty());
        assert_eq!(dict_array.len(), 3);

        let values = dict_array
1903 .values()
1904 .as_any()
1905 .downcast_ref::<BinaryArray>()
1906 .unwrap();
1907 for i in 0..values.len() {
1908 assert!(
1909 !values.value(i).is_empty(),
1910 "Encoded primary key should not be empty"
1911 );
1912 }
1913 }
1914
1915 #[test]
1916 fn test_convert_bulk_part_empty() {
1917 let metadata = metadata_for_test();
1918 let schema = to_flat_sst_arrow_schema(
1919 &metadata,
1920 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1921 );
1922 let primary_key_codec = build_primary_key_codec(&metadata);
1923
1924 let empty_batch = RecordBatch::new_empty(schema.clone());
1926 let empty_part = BulkPart {
1927 batch: empty_batch,
1928 max_timestamp: 0,
1929 min_timestamp: 0,
1930 sequence: 0,
1931 timestamp_index: 0,
1932 raw_data: None,
1933 };
1934
1935 let result =
1936 convert_bulk_part(empty_part, &metadata, primary_key_codec, schema, true).unwrap();
1937 assert!(result.is_none());
1938 }
1939
1940 #[test]
1941 fn test_convert_bulk_part_dense_with_pk_columns() {
1942 let metadata = metadata_for_test();
1943 let primary_key_codec = build_primary_key_codec(&metadata);
1944
1945 let k0_array = Arc::new(arrow::array::StringArray::from(vec![
1946 "key1", "key2", "key1",
1947 ]));
1948 let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2, 1]));
1949 let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200, 300]));
1950 let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0, 3.0]));
1951 let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000, 1500]));
1952
1953 let input_schema = Arc::new(Schema::new(vec![
1954 Field::new("k0", ArrowDataType::Utf8, false),
1955 Field::new("k1", ArrowDataType::UInt32, false),
1956 Field::new("v0", ArrowDataType::Int64, true),
1957 Field::new("v1", ArrowDataType::Float64, true),
1958 Field::new(
1959 "ts",
1960 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
1961 false,
1962 ),
1963 ]));
1964
1965 let input_batch = RecordBatch::try_new(
1966 input_schema,
1967 vec![k0_array, k1_array, v0_array, v1_array, ts_array],
1968 )
1969 .unwrap();
1970
1971 let part = BulkPart {
1972 batch: input_batch,
1973 max_timestamp: 2000,
1974 min_timestamp: 1000,
1975 sequence: 5,
1976 timestamp_index: 4,
1977 raw_data: None,
1978 };
1979
1980 let output_schema = to_flat_sst_arrow_schema(
1981 &metadata,
1982 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1983 );
1984
1985 let result = convert_bulk_part(
1986 part,
1987 &metadata,
1988 primary_key_codec,
1989 output_schema,
            true, // store_primary_key_columns
        )
1992 .unwrap();
1993
1994 let converted = result.unwrap();
1995
1996 assert_eq!(converted.num_rows(), 3);
1997 assert_eq!(converted.max_timestamp, 2000);
1998 assert_eq!(converted.min_timestamp, 1000);
1999 assert_eq!(converted.sequence, 5);
2000
2001 let schema = converted.batch.schema();
2002 let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2003 assert_eq!(
2004 field_names,
2005 vec![
2006 "k0",
2007 "k1",
2008 "v0",
2009 "v1",
2010 "ts",
2011 "__primary_key",
2012 "__sequence",
2013 "__op_type"
2014 ]
2015 );
2016
2017 let k0_col = converted.batch.column_by_name("k0").unwrap();
2018 assert!(matches!(
2019 k0_col.data_type(),
2020 ArrowDataType::Dictionary(_, _)
2021 ));
2022
2023 let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2024 let dict_array = pk_col
2025 .as_any()
2026 .downcast_ref::<DictionaryArray<UInt32Type>>()
2027 .unwrap();
2028 let keys = dict_array.keys();
2029
2030 assert_eq!(keys.len(), 3);
2031 }
2032
2033 #[test]
2034 fn test_convert_bulk_part_dense_without_pk_columns() {
2035 let metadata = metadata_for_test();
2036 let primary_key_codec = build_primary_key_codec(&metadata);
2037
2038 let k0_array = Arc::new(arrow::array::StringArray::from(vec!["key1", "key2"]));
2040 let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2]));
2041 let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
2042 let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
2043 let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
2044
2045 let input_schema = Arc::new(Schema::new(vec![
2046 Field::new("k0", ArrowDataType::Utf8, false),
2047 Field::new("k1", ArrowDataType::UInt32, false),
2048 Field::new("v0", ArrowDataType::Int64, true),
2049 Field::new("v1", ArrowDataType::Float64, true),
2050 Field::new(
2051 "ts",
2052 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2053 false,
2054 ),
2055 ]));
2056
2057 let input_batch = RecordBatch::try_new(
2058 input_schema,
2059 vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2060 )
2061 .unwrap();
2062
2063 let part = BulkPart {
2064 batch: input_batch,
2065 max_timestamp: 2000,
2066 min_timestamp: 1000,
2067 sequence: 3,
2068 timestamp_index: 4,
2069 raw_data: None,
2070 };
2071
2072 let output_schema = to_flat_sst_arrow_schema(
2073 &metadata,
2074 &FlatSchemaOptions {
2075 raw_pk_columns: false,
2076 string_pk_use_dict: true,
2077 },
2078 );
2079
2080 let result = convert_bulk_part(
2081 part,
2082 &metadata,
2083 primary_key_codec,
2084 output_schema,
            false, // store_primary_key_columns
        )
2087 .unwrap();
2088
2089 let converted = result.unwrap();
2090
2091 assert_eq!(converted.num_rows(), 2);
2092 assert_eq!(converted.max_timestamp, 2000);
2093 assert_eq!(converted.min_timestamp, 1000);
2094 assert_eq!(converted.sequence, 3);
2095
2096 let schema = converted.batch.schema();
2098 let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2099 assert_eq!(
2100 field_names,
2101 vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2102 );
2103
2104 let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2106 assert!(matches!(
2107 pk_col.data_type(),
2108 ArrowDataType::Dictionary(_, _)
2109 ));
2110 }
2111
2112 #[test]
2113 fn test_convert_bulk_part_sparse_encoding() {
2114 let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
2115 builder
2116 .push_column_metadata(ColumnMetadata {
2117 column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
2118 semantic_type: SemanticType::Tag,
2119 column_id: 0,
2120 })
2121 .push_column_metadata(ColumnMetadata {
2122 column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
2123 semantic_type: SemanticType::Tag,
2124 column_id: 1,
2125 })
2126 .push_column_metadata(ColumnMetadata {
2127 column_schema: ColumnSchema::new(
2128 "ts",
2129 ConcreteDataType::timestamp_millisecond_datatype(),
2130 false,
2131 ),
2132 semantic_type: SemanticType::Timestamp,
2133 column_id: 2,
2134 })
2135 .push_column_metadata(ColumnMetadata {
2136 column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
2137 semantic_type: SemanticType::Field,
2138 column_id: 3,
2139 })
2140 .push_column_metadata(ColumnMetadata {
2141 column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
2142 semantic_type: SemanticType::Field,
2143 column_id: 4,
2144 })
2145 .primary_key(vec![0, 1])
2146 .primary_key_encoding(PrimaryKeyEncoding::Sparse);
2147 let metadata = Arc::new(builder.build().unwrap());
2148
2149 let primary_key_codec = build_primary_key_codec(&metadata);
2150
2151 let pk_array = Arc::new(arrow::array::BinaryArray::from(vec![
2153 b"encoded_key_1".as_slice(),
2154 b"encoded_key_2".as_slice(),
2155 ]));
2156 let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
2157 let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
2158 let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
2159
2160 let input_schema = Arc::new(Schema::new(vec![
2161 Field::new("__primary_key", ArrowDataType::Binary, false),
2162 Field::new("v0", ArrowDataType::Int64, true),
2163 Field::new("v1", ArrowDataType::Float64, true),
2164 Field::new(
2165 "ts",
2166 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2167 false,
2168 ),
2169 ]));
2170
2171 let input_batch =
2172 RecordBatch::try_new(input_schema, vec![pk_array, v0_array, v1_array, ts_array])
2173 .unwrap();
2174
2175 let part = BulkPart {
2176 batch: input_batch,
2177 max_timestamp: 2000,
2178 min_timestamp: 1000,
2179 sequence: 7,
2180 timestamp_index: 3,
2181 raw_data: None,
2182 };
2183
2184 let output_schema = to_flat_sst_arrow_schema(
2185 &metadata,
2186 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2187 );
2188
2189 let result = convert_bulk_part(
2190 part,
2191 &metadata,
2192 primary_key_codec,
2193 output_schema,
            true, // store_primary_key_columns
        )
2196 .unwrap();
2197
2198 let converted = result.unwrap();
2199
2200 assert_eq!(converted.num_rows(), 2);
2201 assert_eq!(converted.max_timestamp, 2000);
2202 assert_eq!(converted.min_timestamp, 1000);
2203 assert_eq!(converted.sequence, 7);
2204
2205 let schema = converted.batch.schema();
2207 let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2208 assert_eq!(
2209 field_names,
2210 vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2211 );
2212
2213 let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2215 assert!(matches!(
2216 pk_col.data_type(),
2217 ArrowDataType::Dictionary(_, _)
2218 ));
2219 }
2220
2221 #[test]
2222 fn test_convert_bulk_part_sorting_with_multiple_series() {
2223 let metadata = metadata_for_test();
2224 let primary_key_codec = build_primary_key_codec(&metadata);
2225
2226 let k0_array = Arc::new(arrow::array::StringArray::from(vec![
2228 "series_b", "series_a", "series_b", "series_a",
2229 ]));
2230 let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![2, 1, 2, 1]));
2231 let v0_array = Arc::new(arrow::array::Int64Array::from(vec![200, 100, 400, 300]));
2232 let v1_array = Arc::new(arrow::array::Float64Array::from(vec![2.0, 1.0, 4.0, 3.0]));
2233 let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
2234 2000, 1000, 4000, 3000,
2235 ]));
2236
2237 let input_schema = Arc::new(Schema::new(vec![
2238 Field::new("k0", ArrowDataType::Utf8, false),
2239 Field::new("k1", ArrowDataType::UInt32, false),
2240 Field::new("v0", ArrowDataType::Int64, true),
2241 Field::new("v1", ArrowDataType::Float64, true),
2242 Field::new(
2243 "ts",
2244 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2245 false,
2246 ),
2247 ]));
2248
2249 let input_batch = RecordBatch::try_new(
2250 input_schema,
2251 vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2252 )
2253 .unwrap();
2254
2255 let part = BulkPart {
2256 batch: input_batch,
2257 max_timestamp: 4000,
2258 min_timestamp: 1000,
2259 sequence: 10,
2260 timestamp_index: 4,
2261 raw_data: None,
2262 };
2263
2264 let output_schema = to_flat_sst_arrow_schema(
2265 &metadata,
2266 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2267 );
2268
2269 let result =
2270 convert_bulk_part(part, &metadata, primary_key_codec, output_schema, true).unwrap();
2271
2272 let converted = result.unwrap();
2273
2274 assert_eq!(converted.num_rows(), 4);
2275
2276 let ts_col = converted.batch.column(converted.timestamp_index);
2278 let ts_array = ts_col
2279 .as_any()
2280 .downcast_ref::<TimestampMillisecondArray>()
2281 .unwrap();
2282
2283 let timestamps: Vec<i64> = ts_array.values().to_vec();
2287 assert_eq!(timestamps, vec![1000, 3000, 2000, 4000]);
2288 }
2289}