use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::helper::{ColumnDataTypeWrapper, value_to_grpc_value};
use api::v1::bulk_wal_entry::Body;
use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType, bulk_wal_entry};
use bytes::Bytes;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_recordbatch::DfRecordBatch as RecordBatch;
use common_time::Timestamp;
use common_time::timestamp::TimeUnit;
use datatypes::arrow;
use datatypes::arrow::array::{
    Array, ArrayRef, BinaryBuilder, BinaryDictionaryBuilder, DictionaryArray, StringBuilder,
    StringDictionaryBuilder, TimestampMicrosecondArray, TimestampMillisecondArray,
    TimestampNanosecondArray, TimestampSecondArray, UInt8Array, UInt8Builder, UInt32Array,
    UInt64Array, UInt64Builder,
};
use datatypes::arrow::compute::{SortColumn, SortOptions, TakeOptions};
use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
use datatypes::arrow_array::BinaryArray;
use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::Helper;
use mito_codec::key_values::{KeyValue, KeyValues, KeyValuesRef};
use mito_codec::row_converter::{
    DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, build_primary_key_codec,
};
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, ZstdLevel};
use parquet::data_type::AsBytes;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use snafu::{OptionExt, ResultExt, Snafu};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
use store_api::storage::{FileId, SequenceNumber, SequenceRange};
use table::predicate::Predicate;

use crate::error::{
    self, ColumnNotFoundSnafu, ComputeArrowSnafu, DataTypeMismatchSnafu, EncodeMemtableSnafu,
    EncodeSnafu, InvalidMetadataSnafu, NewRecordBatchSnafu, Result,
};
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
use crate::memtable::time_series::{ValueBuilder, Values};
use crate::memtable::{BoxedRecordBatchIterator, MemScanMetrics};
use crate::sst::index::IndexOutput;
use crate::sst::parquet::file_range::{PreFilterMode, row_group_contains_delete};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo};
use crate::sst::{SeriesEstimator, to_sst_arrow_schema};

const INIT_DICT_VALUE_CAPACITY: usize = 8;

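/// A bulk write buffered in memory: an unencoded Arrow [`RecordBatch`] plus its timestamp
/// range, maximum sequence number and, when available, the raw Arrow IPC payload it was
/// decoded from.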
#[derive(Clone)]
pub struct BulkPart {
    pub batch: RecordBatch,
    pub max_timestamp: i64,
    pub min_timestamp: i64,
    pub sequence: u64,
    pub timestamp_index: usize,
    pub raw_data: Option<ArrowIpc>,
}

impl TryFrom<BulkWalEntry> for BulkPart {
    type Error = error::Error;

    fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
        match value.body.expect("Entry payload should be present") {
            Body::ArrowIpc(ipc) => {
                let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
                    .context(error::ConvertBulkWalEntrySnafu)?;
                let batch = decoder
                    .try_decode_record_batch(&ipc.data_header, &ipc.payload)
                    .context(error::ConvertBulkWalEntrySnafu)?;
                Ok(Self {
                    batch,
                    max_timestamp: value.max_ts,
                    min_timestamp: value.min_ts,
                    sequence: value.sequence,
                    timestamp_index: value.timestamp_index as usize,
                    raw_data: Some(ipc),
                })
            }
        }
    }
}

impl TryFrom<&BulkPart> for BulkWalEntry {
    type Error = error::Error;

    fn try_from(value: &BulkPart) -> Result<Self> {
        if let Some(ipc) = &value.raw_data {
            Ok(BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_timestamp,
                min_ts: value.min_timestamp,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ipc.clone())),
            })
        } else {
            let mut encoder = FlightEncoder::default();
            let schema_bytes = encoder
                .encode_schema(value.batch.schema().as_ref())
                .data_header;
            let [rb_data] = encoder
                .encode(FlightMessage::RecordBatch(value.batch.clone()))
                .try_into()
                .map_err(|_| {
                    error::UnsupportedOperationSnafu {
                        err_msg: "create BulkWalEntry from RecordBatch with dictionary arrays",
                    }
                    .build()
                })?;
            Ok(BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_timestamp,
                min_ts: value.min_timestamp,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ArrowIpc {
                    schema: schema_bytes,
                    data_header: rb_data.data_header,
                    payload: rb_data.data_body,
                })),
            })
        }
    }
}

impl BulkPart {
    pub(crate) fn estimated_size(&self) -> usize {
        record_batch_estimated_size(&self.batch)
    }

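    /// Estimates the number of series in this part from the number of dictionary values in the
    /// primary key column; returns 0 if that column is not a dictionary array.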
    pub fn estimated_series_count(&self) -> usize {
        let pk_column_idx = primary_key_column_index(self.batch.num_columns());
        let pk_column = self.batch.column(pk_column_idx);
        if let Some(dict_array) = pk_column.as_any().downcast_ref::<PrimaryKeyArray>() {
            dict_array.values().len()
        } else {
            0
        }
    }

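    /// Converts this part back into a gRPC [`Mutation`] by rebuilding rows from the batch
    /// columns according to the region schema; columns absent from the batch become null values.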
    pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
        let vectors = region_metadata
            .schema
            .column_schemas()
            .iter()
            .map(|col| match self.batch.column_by_name(&col.name) {
                None => Ok(None),
                Some(col) => Helper::try_into_vector(col).map(Some),
            })
            .collect::<datatypes::error::Result<Vec<_>>>()
            .context(error::ComputeVectorSnafu)?;

        let rows = (0..self.num_rows())
            .map(|row_idx| {
                let values = (0..self.batch.num_columns())
                    .map(|col_idx| {
                        if let Some(v) = &vectors[col_idx] {
                            value_to_grpc_value(v.get(row_idx))
                        } else {
                            api::v1::Value { value_data: None }
                        }
                    })
                    .collect::<Vec<_>>();
                api::v1::Row { values }
            })
            .collect::<Vec<_>>();

        let schema = region_metadata
            .column_metadatas
            .iter()
            .map(|c| {
                let data_type_wrapper =
                    ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
                Ok(api::v1::ColumnSchema {
                    column_name: c.column_schema.name.clone(),
                    datatype: data_type_wrapper.datatype() as i32,
                    semantic_type: c.semantic_type as i32,
                    ..Default::default()
                })
            })
            .collect::<api::error::Result<Vec<_>>>()
            .context(error::ConvertColumnDataTypeSnafu {
                reason: "failed to convert region metadata to column schema",
            })?;

        let rows = api::v1::Rows { schema, rows };

        Ok(Mutation {
            op_type: OpType::Put as i32,
            sequence: self.sequence,
            rows: Some(rows),
            write_hint: None,
        })
    }

    pub fn timestamps(&self) -> &ArrayRef {
        self.batch.column(self.timestamp_index)
    }

    pub fn num_rows(&self) -> usize {
        self.batch.num_rows()
    }
}

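/// A buffer of unsorted [`BulkPart`]s that tracks the total row count, timestamp range and
/// maximum sequence of everything pushed into it, and can merge its parts into a single sorted
/// part on demand.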
pub struct UnorderedPart {
    parts: Vec<BulkPart>,
    total_rows: usize,
    min_timestamp: i64,
    max_timestamp: i64,
    max_sequence: u64,
    threshold: usize,
    compact_threshold: usize,
}

impl Default for UnorderedPart {
    fn default() -> Self {
        Self::new()
    }
}

impl UnorderedPart {
    pub fn new() -> Self {
        Self {
            parts: Vec::new(),
            total_rows: 0,
            min_timestamp: i64::MAX,
            max_timestamp: i64::MIN,
            max_sequence: 0,
            threshold: 1024,
            compact_threshold: 4096,
        }
    }

    pub fn set_threshold(&mut self, threshold: usize) {
        self.threshold = threshold;
    }

    pub fn set_compact_threshold(&mut self, compact_threshold: usize) {
        self.compact_threshold = compact_threshold;
    }

    pub fn threshold(&self) -> usize {
        self.threshold
    }

    pub fn compact_threshold(&self) -> usize {
        self.compact_threshold
    }

    pub fn should_accept(&self, num_rows: usize) -> bool {
        num_rows < self.threshold
    }

    pub fn should_compact(&self) -> bool {
        self.total_rows >= self.compact_threshold
    }

    pub fn push(&mut self, part: BulkPart) {
        self.total_rows += part.num_rows();
        self.min_timestamp = self.min_timestamp.min(part.min_timestamp);
        self.max_timestamp = self.max_timestamp.max(part.max_timestamp);
        self.max_sequence = self.max_sequence.max(part.sequence);
        self.parts.push(part);
    }

    pub fn num_rows(&self) -> usize {
        self.total_rows
    }

    pub fn is_empty(&self) -> bool {
        self.parts.is_empty()
    }

    pub fn num_parts(&self) -> usize {
        self.parts.len()
    }

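    /// Concatenates all buffered parts into one [`RecordBatch`] sorted by primary key,
    /// timestamp and sequence. Returns `None` when the buffer is empty; a single buffered part
    /// is returned as-is without re-sorting.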
    pub fn concat_and_sort(&self) -> Result<Option<RecordBatch>> {
        if self.parts.is_empty() {
            return Ok(None);
        }

        if self.parts.len() == 1 {
            return Ok(Some(self.parts[0].batch.clone()));
        }

        let schema = self.parts[0].batch.schema();

        let batches: Vec<RecordBatch> = self.parts.iter().map(|p| p.batch.clone()).collect();
        let concatenated =
            arrow::compute::concat_batches(&schema, &batches).context(ComputeArrowSnafu)?;

        let sorted_batch = sort_primary_key_record_batch(&concatenated)?;

        Ok(Some(sorted_batch))
    }

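    /// Merges the buffered parts into one sorted [`BulkPart`] carrying the accumulated
    /// timestamp range and the maximum sequence number.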
    pub fn to_bulk_part(&self) -> Result<Option<BulkPart>> {
        let Some(sorted_batch) = self.concat_and_sort()? else {
            return Ok(None);
        };

        let timestamp_index = self.parts[0].timestamp_index;

        Ok(Some(BulkPart {
            batch: sorted_batch,
            max_timestamp: self.max_timestamp,
            min_timestamp: self.min_timestamp,
            sequence: self.max_sequence,
            timestamp_index,
            raw_data: None,
        }))
    }

    pub fn clear(&mut self) {
        self.parts.clear();
        self.total_rows = 0;
        self.min_timestamp = i64::MAX;
        self.max_timestamp = i64::MIN;
        self.max_sequence = 0;
    }
}

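/// Estimates the in-memory size of a [`RecordBatch`] by summing the sliced buffer sizes of its
/// columns.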
pub(crate) fn record_batch_estimated_size(batch: &RecordBatch) -> usize {
    batch
        .columns()
        .iter()
        .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
        .sum()
}

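/// Builder for a raw primary key column: string tags use a dictionary builder, other data types
/// fall back to a generic mutable vector.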
enum PrimaryKeyColumnBuilder {
    StringDict(StringDictionaryBuilder<UInt32Type>),
    Vector(Box<dyn MutableVector>),
}

impl PrimaryKeyColumnBuilder {
    fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
        match self {
            PrimaryKeyColumnBuilder::StringDict(builder) => {
                if let Some(s) = value.try_into_string().context(DataTypeMismatchSnafu)? {
                    builder.append_value(s);
                } else {
                    builder.append_null();
                }
            }
            PrimaryKeyColumnBuilder::Vector(builder) => {
                builder.push_value_ref(&value);
            }
        }
        Ok(())
    }

    fn into_arrow_array(self) -> ArrayRef {
        match self {
            PrimaryKeyColumnBuilder::StringDict(mut builder) => Arc::new(builder.finish()),
            PrimaryKeyColumnBuilder::Vector(mut builder) => builder.to_vector().to_arrow_array(),
        }
    }
}

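/// Converts [`KeyValues`] mutations into a [`BulkPart`] whose record batch follows the flat SST
/// layout: optional raw primary key columns, field columns, the time index, then the encoded
/// `__primary_key`, `__sequence` and `__op_type` columns.
///
/// Rough usage sketch (mirrors the tests below; building `metadata`, `schema` and `key_values`
/// is elided):
///
/// ```ignore
/// let codec = build_primary_key_codec(&metadata);
/// let mut converter = BulkPartConverter::new(&metadata, schema, 1024, codec, true);
/// converter.append_key_values(&key_values)?;
/// let part: BulkPart = converter.convert()?;
/// ```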
pub struct BulkPartConverter {
    region_metadata: RegionMetadataRef,
    schema: SchemaRef,
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
    key_buf: Vec<u8>,
    key_array_builder: PrimaryKeyArrayBuilder,
    value_builder: ValueBuilder,
    primary_key_column_builders: Vec<PrimaryKeyColumnBuilder>,

    max_ts: i64,
    min_ts: i64,
    max_sequence: SequenceNumber,
}

impl BulkPartConverter {
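    /// Creates a new converter targeting the given arrow `schema`.
    ///
    /// When `store_primary_key_columns` is true and the region does not use sparse primary key
    /// encoding, the individual primary key columns are also materialized in the output batch.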
    pub fn new(
        region_metadata: &RegionMetadataRef,
        schema: SchemaRef,
        capacity: usize,
        primary_key_codec: Arc<dyn PrimaryKeyCodec>,
        store_primary_key_columns: bool,
    ) -> Self {
        debug_assert_eq!(
            region_metadata.primary_key_encoding,
            primary_key_codec.encoding()
        );

        let primary_key_column_builders = if store_primary_key_columns
            && region_metadata.primary_key_encoding != PrimaryKeyEncoding::Sparse
        {
            new_primary_key_column_builders(region_metadata, capacity)
        } else {
            Vec::new()
        };

        Self {
            region_metadata: region_metadata.clone(),
            schema,
            primary_key_codec,
            key_buf: Vec::new(),
            key_array_builder: PrimaryKeyArrayBuilder::new(),
            value_builder: ValueBuilder::new(region_metadata, capacity),
            primary_key_column_builders,
            min_ts: i64::MAX,
            max_ts: i64::MIN,
            max_sequence: SequenceNumber::MIN,
        }
    }

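    /// Appends all rows in `key_values` to the converter.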
    pub fn append_key_values(&mut self, key_values: &KeyValues) -> Result<()> {
        for kv in key_values.iter() {
            self.append_key_value(&kv)?;
        }

        Ok(())
    }

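    /// Appends a single row: encodes (or, for sparse encoding, reuses) the primary key, pushes
    /// the raw primary key values when builders are present, pushes timestamp, sequence, op type
    /// and fields, and updates the tracked timestamp range and maximum sequence.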
    fn append_key_value(&mut self, kv: &KeyValue) -> Result<()> {
        if self.primary_key_codec.encoding() == PrimaryKeyEncoding::Sparse {
            let mut primary_keys = kv.primary_keys();
            if let Some(encoded) = primary_keys
                .next()
                .context(ColumnNotFoundSnafu {
                    column: PRIMARY_KEY_COLUMN_NAME,
                })?
                .try_into_binary()
                .context(DataTypeMismatchSnafu)?
            {
                self.key_array_builder
                    .append(encoded)
                    .context(ComputeArrowSnafu)?;
            } else {
                self.key_array_builder
                    .append("")
                    .context(ComputeArrowSnafu)?;
            }
        } else {
            self.key_buf.clear();
            self.primary_key_codec
                .encode_key_value(kv, &mut self.key_buf)
                .context(EncodeSnafu)?;
            self.key_array_builder
                .append(&self.key_buf)
                .context(ComputeArrowSnafu)?;
        };

        if !self.primary_key_column_builders.is_empty() {
            for (builder, pk_value) in self
                .primary_key_column_builders
                .iter_mut()
                .zip(kv.primary_keys())
            {
                builder.push_value_ref(pk_value)?;
            }
        }

        self.value_builder.push(
            kv.timestamp(),
            kv.sequence(),
            kv.op_type() as u8,
            kv.fields(),
        );

        let ts = kv
            .timestamp()
            .try_into_timestamp()
            .unwrap()
            .unwrap()
            .value();
        self.min_ts = self.min_ts.min(ts);
        self.max_ts = self.max_ts.max(ts);
        self.max_sequence = self.max_sequence.max(kv.sequence());

        Ok(())
    }

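    /// Finishes all builders and produces a [`BulkPart`] whose record batch is sorted by
    /// primary key, timestamp and descending sequence.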
    pub fn convert(mut self) -> Result<BulkPart> {
        let values = Values::from(self.value_builder);
        let mut columns =
            Vec::with_capacity(4 + values.fields.len() + self.primary_key_column_builders.len());

        for builder in self.primary_key_column_builders {
            columns.push(builder.into_arrow_array());
        }
        columns.extend(values.fields.iter().map(|field| field.to_arrow_array()));
        let timestamp_index = columns.len();
        columns.push(values.timestamp.to_arrow_array());
        let pk_array = self.key_array_builder.finish();
        columns.push(Arc::new(pk_array));
        columns.push(values.sequence.to_arrow_array());
        columns.push(values.op_type.to_arrow_array());

        let batch = RecordBatch::try_new(self.schema, columns).context(NewRecordBatchSnafu)?;
        let batch = sort_primary_key_record_batch(&batch)?;

        Ok(BulkPart {
            batch,
            max_timestamp: self.max_ts,
            min_timestamp: self.min_ts,
            sequence: self.max_sequence,
            timestamp_index,
            raw_data: None,
        })
    }
}

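/// Creates one [`PrimaryKeyColumnBuilder`] per primary key column, using dictionary builders
/// for string columns and plain mutable vectors otherwise.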
fn new_primary_key_column_builders(
    metadata: &RegionMetadata,
    capacity: usize,
) -> Vec<PrimaryKeyColumnBuilder> {
    metadata
        .primary_key_columns()
        .map(|col| {
            if col.column_schema.data_type.is_string() {
                PrimaryKeyColumnBuilder::StringDict(StringDictionaryBuilder::with_capacity(
                    capacity,
                    INIT_DICT_VALUE_CAPACITY,
                    capacity,
                ))
            } else {
                PrimaryKeyColumnBuilder::Vector(
                    col.column_schema.data_type.create_mutable_vector(capacity),
                )
            }
        })
        .collect()
}

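/// Sorts a flat-format record batch by `__primary_key` (ascending), time index (ascending) and
/// `__sequence` (descending), assuming the last four columns are the time index,
/// `__primary_key`, `__sequence` and `__op_type`.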
fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
    let total_columns = batch.num_columns();
    let sort_columns = vec![
        SortColumn {
            values: batch.column(total_columns - 3).clone(),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
        SortColumn {
            values: batch.column(total_columns - 4).clone(),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
        SortColumn {
            values: batch.column(total_columns - 2).clone(),
            options: Some(SortOptions {
                descending: true,
                nulls_first: true,
            }),
        },
    ];

    let indices = datatypes::arrow::compute::lexsort_to_indices(&sort_columns, None)
        .context(ComputeArrowSnafu)?;

    datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
}

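/// A bulk part encoded as Parquet bytes together with its [`BulkPartMeta`].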
#[derive(Debug, Clone)]
pub struct EncodedBulkPart {
    data: Bytes,
    metadata: BulkPartMeta,
}

impl EncodedBulkPart {
    pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
        Self { data, metadata }
    }

    pub(crate) fn metadata(&self) -> &BulkPartMeta {
        &self.metadata
    }

    pub(crate) fn size_bytes(&self) -> usize {
        self.data.len()
    }

    pub(crate) fn data(&self) -> &Bytes {
        &self.data
    }

    pub(crate) fn to_sst_info(&self, file_id: FileId) -> SstInfo {
        let unit = self.metadata.region_metadata.time_index_type().unit();
        SstInfo {
            file_id,
            time_range: (
                Timestamp::new(self.metadata.min_timestamp, unit),
                Timestamp::new(self.metadata.max_timestamp, unit),
            ),
            file_size: self.data.len() as u64,
            num_rows: self.metadata.num_rows,
            num_row_groups: self.metadata.parquet_metadata.num_row_groups() as u64,
            file_metadata: Some(self.metadata.parquet_metadata.clone()),
            index_metadata: IndexOutput::default(),
            num_series: self.metadata.num_series,
        }
    }

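    /// Creates an iterator over the encoded data, first pruning row groups with the context's
    /// predicates. Returns `Ok(None)` when every row group is pruned out.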
    pub(crate) fn read(
        &self,
        context: BulkIterContextRef,
        sequence: Option<SequenceRange>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Result<Option<BoxedRecordBatchIterator>> {
        let skip_fields_for_pruning =
            Self::compute_skip_fields(context.pre_filter_mode(), &self.metadata.parquet_metadata);

        let row_groups_to_read =
            context.row_groups_to_read(&self.metadata.parquet_metadata, skip_fields_for_pruning);

        if row_groups_to_read.is_empty() {
            return Ok(None);
        }

        let iter = EncodedBulkPartIter::try_new(
            self,
            context,
            row_groups_to_read,
            sequence,
            mem_scan_metrics,
        )?;
        Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
    }

    fn compute_skip_fields(pre_filter_mode: PreFilterMode, parquet_meta: &ParquetMetaData) -> bool {
        match pre_filter_mode {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => {
                (0..parquet_meta.num_row_groups()).any(|rg_idx| {
                    row_group_contains_delete(parquet_meta, rg_idx, "memtable").unwrap_or(true)
                })
            }
        }
    }
}

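/// Metadata of an [`EncodedBulkPart`]: row count, timestamp range, parsed Parquet metadata,
/// region metadata and the estimated number of series.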
#[derive(Debug, Clone)]
pub struct BulkPartMeta {
    pub num_rows: usize,
    pub max_timestamp: i64,
    pub min_timestamp: i64,
    pub parquet_metadata: Arc<ParquetMetaData>,
    pub region_metadata: RegionMetadataRef,
    pub num_series: u64,
}

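/// Metrics collected while encoding bulk parts into Parquet.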
#[derive(Default, Debug)]
pub struct BulkPartEncodeMetrics {
    pub iter_cost: Duration,
    pub write_cost: Duration,
    pub raw_size: usize,
    pub encoded_size: usize,
    pub num_rows: usize,
}

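/// Encodes [`BulkPart`]s and record batch iterators into Parquet-backed [`EncodedBulkPart`]s,
/// embedding the region metadata JSON under [`PARQUET_METADATA_KEY`].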
pub struct BulkPartEncoder {
    metadata: RegionMetadataRef,
    row_group_size: usize,
    writer_props: Option<WriterProperties>,
}

impl BulkPartEncoder {
    pub(crate) fn new(
        metadata: RegionMetadataRef,
        row_group_size: usize,
    ) -> Result<BulkPartEncoder> {
        let json = metadata.to_json().context(InvalidMetadataSnafu)?;
        let key_value_meta =
            parquet::file::metadata::KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        let writer_props = Some(
            WriterProperties::builder()
                .set_key_value_metadata(Some(vec![key_value_meta]))
                .set_write_batch_size(row_group_size)
                .set_max_row_group_size(row_group_size)
                .set_compression(Compression::ZSTD(ZstdLevel::default()))
                .set_column_index_truncate_length(None)
                .set_statistics_truncate_length(None)
                .build(),
        );

        Ok(Self {
            metadata,
            row_group_size,
            writer_props,
        })
    }
}

impl BulkPartEncoder {
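    /// Streams record batches from `iter` into an in-memory Parquet buffer, updating `metrics`
    /// along the way. Returns `Ok(None)` when the iterator yields no rows.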
    pub fn encode_record_batch_iter(
        &self,
        iter: BoxedRecordBatchIterator,
        arrow_schema: SchemaRef,
        min_timestamp: i64,
        max_timestamp: i64,
        metrics: &mut BulkPartEncodeMetrics,
    ) -> Result<Option<EncodedBulkPart>> {
        let mut buf = Vec::with_capacity(4096);
        let mut writer = ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
            .context(EncodeMemtableSnafu)?;
        let mut total_rows = 0;
        let mut series_estimator = SeriesEstimator::default();

        let mut iter_start = Instant::now();
        for batch_result in iter {
            metrics.iter_cost += iter_start.elapsed();
            let batch = batch_result?;
            if batch.num_rows() == 0 {
                continue;
            }

            series_estimator.update_flat(&batch);
            metrics.raw_size += record_batch_estimated_size(&batch);
            let write_start = Instant::now();
            writer.write(&batch).context(EncodeMemtableSnafu)?;
            metrics.write_cost += write_start.elapsed();
            total_rows += batch.num_rows();
            iter_start = Instant::now();
        }
        metrics.iter_cost += iter_start.elapsed();
        iter_start = Instant::now();

        if total_rows == 0 {
            return Ok(None);
        }

        let close_start = Instant::now();
        let file_metadata = writer.close().context(EncodeMemtableSnafu)?;
        metrics.write_cost += close_start.elapsed();
        metrics.encoded_size += buf.len();
        metrics.num_rows += total_rows;

        let buf = Bytes::from(buf);
        let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);
        let num_series = series_estimator.finish();

        Ok(Some(EncodedBulkPart {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: total_rows,
                max_timestamp,
                min_timestamp,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
                num_series,
            },
        }))
    }

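    /// Encodes a single [`BulkPart`] into Parquet bytes; returns `Ok(None)` for an empty part.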
    fn encode_part(&self, part: &BulkPart) -> Result<Option<EncodedBulkPart>> {
        if part.batch.num_rows() == 0 {
            return Ok(None);
        }

        let mut buf = Vec::with_capacity(4096);
        let arrow_schema = part.batch.schema();

        let file_metadata = {
            let mut writer =
                ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
                    .context(EncodeMemtableSnafu)?;
            writer.write(&part.batch).context(EncodeMemtableSnafu)?;
            writer.finish().context(EncodeMemtableSnafu)?
        };

        let buf = Bytes::from(buf);
        let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);

        Ok(Some(EncodedBulkPart {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: part.batch.num_rows(),
                max_timestamp: part.max_timestamp,
                min_timestamp: part.min_timestamp,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
                num_series: part.estimated_series_count() as u64,
            },
        }))
    }
}

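/// Converts [`Mutation`]s into a record batch in the primary key format, sorted by encoded
/// primary key, timestamp and descending sequence, and returns it with the min/max timestamps.
/// Rows with duplicate (primary key, timestamp) pairs are removed when `dedup` is true.
/// Returns `None` when the mutations carry no rows.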
fn mutations_to_record_batch(
    mutations: &[Mutation],
    metadata: &RegionMetadataRef,
    pk_encoder: &DensePrimaryKeyCodec,
    dedup: bool,
) -> Result<Option<(RecordBatch, i64, i64)>> {
    let total_rows: usize = mutations
        .iter()
        .map(|m| m.rows.as_ref().map(|r| r.rows.len()).unwrap_or(0))
        .sum();

    if total_rows == 0 {
        return Ok(None);
    }

    let mut pk_builder = BinaryBuilder::with_capacity(total_rows, 0);

    let mut ts_vector: Box<dyn MutableVector> = metadata
        .time_index_column()
        .column_schema
        .data_type
        .create_mutable_vector(total_rows);
    let mut sequence_builder = UInt64Builder::with_capacity(total_rows);
    let mut op_type_builder = UInt8Builder::with_capacity(total_rows);

    let mut field_builders: Vec<Box<dyn MutableVector>> = metadata
        .field_columns()
        .map(|f| f.column_schema.data_type.create_mutable_vector(total_rows))
        .collect();

    let mut pk_buffer = vec![];
    for m in mutations {
        let Some(key_values) = KeyValuesRef::new(metadata, m) else {
            continue;
        };

        for row in key_values.iter() {
            pk_buffer.clear();
            pk_encoder
                .encode_to_vec(row.primary_keys(), &mut pk_buffer)
                .context(EncodeSnafu)?;
            pk_builder.append_value(pk_buffer.as_bytes());
            ts_vector.push_value_ref(&row.timestamp());
            sequence_builder.append_value(row.sequence());
            op_type_builder.append_value(row.op_type() as u8);
            for (builder, field) in field_builders.iter_mut().zip(row.fields()) {
                builder.push_value_ref(&field);
            }
        }
    }

    let arrow_schema = to_sst_arrow_schema(metadata);
    let timestamp_unit = metadata
        .time_index_column()
        .column_schema
        .data_type
        .as_timestamp()
        .unwrap()
        .unit();
    let sorter = ArraysSorter {
        encoded_primary_keys: pk_builder.finish(),
        timestamp_unit,
        timestamp: ts_vector.to_vector().to_arrow_array(),
        sequence: sequence_builder.finish(),
        op_type: op_type_builder.finish(),
        fields: field_builders
            .iter_mut()
            .map(|f| f.to_vector().to_arrow_array()),
        dedup,
        arrow_schema,
    };

    sorter.sort().map(Some)
}

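/// Helper that sorts the column arrays produced by [`mutations_to_record_batch`] by
/// (primary key, timestamp, descending sequence), optionally deduplicating rows with the same
/// primary key and timestamp.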
struct ArraysSorter<I> {
    encoded_primary_keys: BinaryArray,
    timestamp_unit: TimeUnit,
    timestamp: ArrayRef,
    sequence: UInt64Array,
    op_type: UInt8Array,
    fields: I,
    dedup: bool,
    arrow_schema: SchemaRef,
}

impl<I> ArraysSorter<I>
where
    I: Iterator<Item = ArrayRef>,
{
    fn sort(self) -> Result<(RecordBatch, i64, i64)> {
        debug_assert!(!self.timestamp.is_empty());
        debug_assert!(self.timestamp.len() == self.sequence.len());
        debug_assert!(self.timestamp.len() == self.op_type.len());
        debug_assert!(self.timestamp.len() == self.encoded_primary_keys.len());
        let timestamp_iter = timestamp_array_to_iter(self.timestamp_unit, &self.timestamp);
        let (mut min_timestamp, mut max_timestamp) = (i64::MAX, i64::MIN);
        let mut to_sort = self
            .encoded_primary_keys
            .iter()
            .zip(timestamp_iter)
            .zip(self.sequence.iter())
            .map(|((pk, timestamp), sequence)| {
                max_timestamp = max_timestamp.max(*timestamp);
                min_timestamp = min_timestamp.min(*timestamp);
                (pk, timestamp, sequence)
            })
            .enumerate()
            .collect::<Vec<_>>();

        to_sort.sort_unstable_by(|(_, (l_pk, l_ts, l_seq)), (_, (r_pk, r_ts, r_seq))| {
            l_pk.cmp(r_pk)
                .then(l_ts.cmp(r_ts))
                .then(l_seq.cmp(r_seq).reverse())
        });

        if self.dedup {
            to_sort.dedup_by(|(_, (l_pk, l_ts, _)), (_, (r_pk, r_ts, _))| {
                l_pk == r_pk && l_ts == r_ts
            });
        }

        let indices = UInt32Array::from_iter_values(to_sort.iter().map(|v| v.0 as u32));

        let pk_dictionary = Arc::new(binary_array_to_dictionary(
            arrow::compute::take(
                &self.encoded_primary_keys,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap(),
        )?) as ArrayRef;

        let mut arrays = Vec::with_capacity(self.arrow_schema.fields.len());
        for arr in self.fields {
            arrays.push(
                arrow::compute::take(
                    &arr,
                    &indices,
                    Some(TakeOptions {
                        check_bounds: false,
                    }),
                )
                .context(ComputeArrowSnafu)?,
            );
        }

        let timestamp = arrow::compute::take(
            &self.timestamp,
            &indices,
            Some(TakeOptions {
                check_bounds: false,
            }),
        )
        .context(ComputeArrowSnafu)?;

        arrays.push(timestamp);
        arrays.push(pk_dictionary);
        arrays.push(
            arrow::compute::take(
                &self.sequence,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?,
        );

        arrays.push(
            arrow::compute::take(
                &self.op_type,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?,
        );

        let batch = RecordBatch::try_new(self.arrow_schema, arrays).context(NewRecordBatchSnafu)?;
        Ok((batch, min_timestamp, max_timestamp))
    }
}

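/// Returns an iterator over the raw `i64` values of a timestamp array with the given time unit.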
fn timestamp_array_to_iter(
    timestamp_unit: TimeUnit,
    timestamp: &ArrayRef,
) -> impl Iterator<Item = &i64> {
    match timestamp_unit {
        TimeUnit::Second => timestamp
            .as_any()
            .downcast_ref::<TimestampSecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Millisecond => timestamp
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Microsecond => timestamp
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Nanosecond => timestamp
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap()
            .values()
            .iter(),
    }
}

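/// Converts a sorted [`BinaryArray`] of encoded primary keys into a dictionary-encoded
/// [`PrimaryKeyArray`]; the input is expected to be sorted so that equal keys are adjacent.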
fn binary_array_to_dictionary(input: &BinaryArray) -> Result<PrimaryKeyArray> {
    if input.is_empty() {
        return Ok(DictionaryArray::new(
            UInt32Array::from(Vec::<u32>::new()),
            Arc::new(BinaryArray::from_vec(vec![])) as ArrayRef,
        ));
    }
    let mut keys = Vec::with_capacity(16);
    let mut values = BinaryBuilder::new();
    let mut prev: usize = 0;
    keys.push(prev as u32);
    values.append_value(input.value(prev));

    for current_bytes in input.iter().skip(1) {
        let current_bytes = current_bytes.unwrap();
        let prev_bytes = input.value(prev);
        if current_bytes != prev_bytes {
            values.append_value(current_bytes);
            prev += 1;
        }
        keys.push(prev as u32);
    }

    Ok(DictionaryArray::new(
        UInt32Array::from(keys),
        Arc::new(values.finish()) as ArrayRef,
    ))
}

#[cfg(test)]
mod tests {
    use std::collections::VecDeque;

    use api::v1::{Row, WriteHint};
    use datafusion_common::ScalarValue;
    use datatypes::arrow::array::Float64Array;
    use datatypes::prelude::{ConcreteDataType, ScalarVector, Value};
    use datatypes::vectors::{Float64Vector, TimestampMillisecondVector};
    use store_api::storage::consts::ReservedColumnId;

    use super::*;
    use crate::memtable::bulk::context::BulkIterContext;
    use crate::sst::parquet::format::{PrimaryKeyReadFormat, ReadFormat};
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
    use crate::test_util::memtable_util::{
        build_key_values_with_ts_seq_values, metadata_for_test, region_metadata_to_row_schema,
    };

    fn check_binary_array_to_dictionary(
        input: &[&[u8]],
        expected_keys: &[u32],
        expected_values: &[&[u8]],
    ) {
        let input = BinaryArray::from_iter_values(input.iter());
        let array = binary_array_to_dictionary(&input).unwrap();
        assert_eq!(
            &expected_keys,
            &array.keys().iter().map(|v| v.unwrap()).collect::<Vec<_>>()
        );
        assert_eq!(
            expected_values,
            &array
                .values()
                .as_any()
                .downcast_ref::<BinaryArray>()
                .unwrap()
                .iter()
                .map(|v| v.unwrap())
                .collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_binary_array_to_dictionary() {
        check_binary_array_to_dictionary(&[], &[], &[]);

        check_binary_array_to_dictionary(&["a".as_bytes()], &[0], &["a".as_bytes()]);

        check_binary_array_to_dictionary(
            &["a".as_bytes(), "a".as_bytes()],
            &[0, 0],
            &["a".as_bytes()],
        );

        check_binary_array_to_dictionary(
            &["a".as_bytes(), "a".as_bytes(), "b".as_bytes()],
            &[0, 0, 1],
            &["a".as_bytes(), "b".as_bytes()],
        );

        check_binary_array_to_dictionary(
            &[
                "a".as_bytes(),
                "a".as_bytes(),
                "b".as_bytes(),
                "c".as_bytes(),
            ],
            &[0, 0, 1, 2],
            &["a".as_bytes(), "b".as_bytes(), "c".as_bytes()],
        );
    }

    struct MutationInput<'a> {
        k0: &'a str,
        k1: u32,
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
        sequence: u64,
    }

    #[derive(Debug, PartialOrd, PartialEq)]
    struct BatchOutput<'a> {
        pk_values: &'a [Value],
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
    }

    fn check_mutations_to_record_batches(
        input: &[MutationInput],
        expected: &[BatchOutput],
        expected_timestamp: (i64, i64),
        dedup: bool,
    ) {
        let metadata = metadata_for_test();
        let mutations = input
            .iter()
            .map(|m| {
                build_key_values_with_ts_seq_values(
                    &metadata,
                    m.k0.to_string(),
                    m.k1,
                    m.timestamps.iter().copied(),
                    m.v1.iter().copied(),
                    m.sequence,
                )
                .mutation
            })
            .collect::<Vec<_>>();
        let total_rows: usize = mutations
            .iter()
            .flat_map(|m| m.rows.iter())
            .map(|r| r.rows.len())
            .sum();

        let pk_encoder = DensePrimaryKeyCodec::new(&metadata);

        let (batch, _, _) = mutations_to_record_batch(&mutations, &metadata, &pk_encoder, dedup)
            .unwrap()
            .unwrap();
        let read_format = PrimaryKeyReadFormat::new_with_all_columns(metadata.clone());
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&batch, None, &mut batches)
            .unwrap();
        if !dedup {
            assert_eq!(
                total_rows,
                batches.iter().map(|b| { b.num_rows() }).sum::<usize>()
            );
        }
        let batch_values = batches
            .into_iter()
            .map(|b| {
                let pk_values = pk_encoder.decode(b.primary_key()).unwrap().into_dense();
                let timestamps = b
                    .timestamps()
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap().0.value())
                    .collect::<Vec<_>>();
                let float_values = b.fields()[1]
                    .data
                    .as_any()
                    .downcast_ref::<Float64Vector>()
                    .unwrap()
                    .iter_data()
                    .collect::<Vec<_>>();

                (pk_values, timestamps, float_values)
            })
            .collect::<Vec<_>>();
        assert_eq!(expected.len(), batch_values.len());

        for idx in 0..expected.len() {
            assert_eq!(expected[idx].pk_values, &batch_values[idx].0);
            assert_eq!(expected[idx].timestamps, &batch_values[idx].1);
            assert_eq!(expected[idx].v1, &batch_values[idx].2);
        }
    }

    #[test]
    fn test_mutations_to_record_batch() {
        check_mutations_to_record_batches(
            &[MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[0],
                v1: &[Some(0.1)],
                sequence: 0,
            }],
            &[BatchOutput {
                pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                timestamps: &[0],
                v1: &[Some(0.1)],
            }],
            (0, 0),
            true,
        );

        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[1],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
                MutationInput {
                    k0: "a",
                    k1: 1,
                    timestamps: &[1],
                    v1: &[Some(0.3)],
                    sequence: 2,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0, 1],
                    v1: &[Some(0.1), Some(0.2)],
                },
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(1)],
                    timestamps: &[1],
                    v1: &[Some(0.3)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 1),
            true,
        );

        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 0),
            true,
        );
        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0, 0],
                    v1: &[Some(0.2), Some(0.1)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 0),
            false,
        );
    }

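    /// Builds a [`BulkPart`] from the given mutations via [`BulkPartConverter`] and encodes it
    /// with [`BulkPartEncoder`].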
    fn encode(input: &[MutationInput]) -> EncodedBulkPart {
        let metadata = metadata_for_test();
        let kvs = input
            .iter()
            .map(|m| {
                build_key_values_with_ts_seq_values(
                    &metadata,
                    m.k0.to_string(),
                    m.k1,
                    m.timestamps.iter().copied(),
                    m.v1.iter().copied(),
                    m.sequence,
                )
            })
            .collect::<Vec<_>>();
        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let primary_key_codec = build_primary_key_codec(&metadata);
        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
        for kv in kvs {
            converter.append_key_values(&kv).unwrap();
        }
        let part = converter.convert().unwrap();
        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
        encoder.encode_part(&part).unwrap().unwrap()
    }

    #[test]
    fn test_write_and_read_part_projection() {
        let part = encode(&[
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.1)],
                sequence: 0,
            },
            MutationInput {
                k0: "b",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.0)],
                sequence: 0,
            },
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[2],
                v1: &[Some(0.2)],
                sequence: 1,
            },
        ]);

        let projection = &[4u32];
        let mut reader = part
            .read(
                Arc::new(
                    BulkIterContext::new(
                        part.metadata.region_metadata.clone(),
                        Some(projection.as_slice()),
                        None,
                        false,
                    )
                    .unwrap(),
                ),
                None,
                None,
            )
            .unwrap()
            .expect("expect at least one row group");

        let mut total_rows_read = 0;
        let mut field: Vec<f64> = vec![];
        for res in reader {
            let batch = res.unwrap();
            assert_eq!(5, batch.num_columns());
            field.extend_from_slice(
                batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Float64Array>()
                    .unwrap()
                    .values(),
            );
            total_rows_read += batch.num_rows();
        }
        assert_eq!(3, total_rows_read);
        assert_eq!(vec![0.1, 0.2, 0.0], field);
    }

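    /// Builds and encodes a part where each entry is `(k0, k1, timestamp range, sequence)`.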
    fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
        let metadata = metadata_for_test();
        let kvs = key_values
            .into_iter()
            .map(|(k0, k1, (start, end), sequence)| {
                let ts = (start..end);
                let v1 = (start..end).map(|_| None);
                build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
            })
            .collect::<Vec<_>>();
        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let primary_key_codec = build_primary_key_codec(&metadata);
        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
        for kv in kvs {
            converter.append_key_values(&kv).unwrap();
        }
        let part = converter.convert().unwrap();
        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
        encoder.encode_part(&part).unwrap().unwrap()
    }

    fn check_prune_row_group(
        part: &EncodedBulkPart,
        predicate: Option<Predicate>,
        expected_rows: usize,
    ) {
        let context = Arc::new(
            BulkIterContext::new(
                part.metadata.region_metadata.clone(),
                None,
                predicate,
                false,
            )
            .unwrap(),
        );
        let mut reader = part
            .read(context, None, None)
            .unwrap()
            .expect("expect at least one row group");
        let mut total_rows_read = 0;
        for res in reader {
            let batch = res.unwrap();
            total_rows_read += batch.num_rows();
        }
        assert_eq!(expected_rows, total_rows_read);
    }

    #[test]
    fn test_prune_row_groups() {
        let part = prepare(vec![
            ("a", 0, (0, 40), 1),
            ("a", 1, (0, 60), 1),
            ("b", 0, (0, 100), 2),
            ("b", 1, (100, 180), 3),
            ("b", 1, (180, 210), 4),
        ]);

        let context = Arc::new(
            BulkIterContext::new(
                part.metadata.region_metadata.clone(),
                None,
                Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
                    datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
                )])),
                false,
            )
            .unwrap(),
        );
        assert!(part.read(context, None, None).unwrap().is_none());

        check_prune_row_group(&part, None, 310);

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            40,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
            ])),
            60,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
            ])),
            100,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            100,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64)),
            ])),
            1,
        );
    }

    #[test]
    fn test_bulk_part_converter_append_and_convert() {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "key1".to_string(),
            1u32,
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "key2".to_string(),
            2u32,
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_timestamp, 1000);
        assert_eq!(bulk_part.max_timestamp, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );
    }

    #[test]
    fn test_bulk_part_converter_sorting() {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "z_key".to_string(),
            3u32,
            vec![3000].into_iter(),
            vec![Some(3.0)].into_iter(),
            3,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "a_key".to_string(),
            1u32,
            vec![1000].into_iter(),
            vec![Some(1.0)].into_iter(),
            1,
        );

        let key_values3 = build_key_values_with_ts_seq_values(
            &metadata,
            "m_key".to_string(),
            2u32,
            vec![2000].into_iter(),
            vec![Some(2.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();
        converter.append_key_values(&key_values3).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);

        let ts_column = bulk_part.batch.column(bulk_part.timestamp_index);
        let seq_column = bulk_part.batch.column(bulk_part.batch.num_columns() - 2);

        let ts_array = ts_column
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        let seq_array = seq_column.as_any().downcast_ref::<UInt64Array>().unwrap();

        assert_eq!(ts_array.values(), &[1000, 2000, 3000]);
        assert_eq!(seq_array.values(), &[1, 2, 3]);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );
    }

    #[test]
    fn test_bulk_part_converter_empty() {
        let metadata = metadata_for_test();
        let capacity = 10;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 0);
        assert_eq!(bulk_part.min_timestamp, i64::MAX);
        assert_eq!(bulk_part.max_timestamp, i64::MIN);
        assert_eq!(bulk_part.sequence, SequenceNumber::MIN);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );
    }

    #[test]
    fn test_bulk_part_converter_without_primary_key_columns() {
        let metadata = metadata_for_test();
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions {
                raw_pk_columns: false,
                string_pk_use_dict: true,
            },
        );

        let capacity = 100;
        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, false);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "key1".to_string(),
            1u32,
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "key2".to_string(),
            2u32,
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_timestamp, 1000);
        assert_eq!(bulk_part.max_timestamp, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );
    }

    #[allow(clippy::too_many_arguments)]
    fn build_key_values_with_sparse_encoding(
        metadata: &RegionMetadataRef,
        primary_key_codec: &Arc<dyn PrimaryKeyCodec>,
        table_id: u32,
        tsid: u64,
        k0: String,
        k1: String,
        timestamps: impl Iterator<Item = i64>,
        values: impl Iterator<Item = Option<f64>>,
        sequence: SequenceNumber,
    ) -> KeyValues {
        let pk_values = vec![
            (ReservedColumnId::table_id(), Value::UInt32(table_id)),
            (ReservedColumnId::tsid(), Value::UInt64(tsid)),
            (0, Value::String(k0.clone().into())),
            (1, Value::String(k1.clone().into())),
        ];
        let mut encoded_key = Vec::new();
        primary_key_codec
            .encode_values(&pk_values, &mut encoded_key)
            .unwrap();
        assert!(!encoded_key.is_empty());

        let column_schema = vec![
            api::v1::ColumnSchema {
                column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::binary_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Tag as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "ts".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::timestamp_millisecond_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Timestamp as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "v0".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::int64_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Field as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "v1".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::float64_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Field as i32,
                ..Default::default()
            },
        ];

        let rows = timestamps
            .zip(values)
            .map(|(ts, v)| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::BinaryValue(
                            encoded_key.clone(),
                        )),
                    },
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(ts)),
                    },
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::I64Value(ts)),
                    },
                    api::v1::Value {
                        value_data: v.map(api::v1::value::ValueData::F64Value),
                    },
                ],
            })
            .collect();

        let mutation = api::v1::Mutation {
            op_type: 1,
            sequence,
            rows: Some(api::v1::Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: Some(WriteHint {
                primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
            }),
        };
        KeyValues::new(metadata.as_ref(), mutation).unwrap()
    }

    #[test]
    fn test_bulk_part_converter_sparse_primary_key_encoding() {
        use api::v1::SemanticType;
        use datatypes::schema::ColumnSchema;
        use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
        use store_api::storage::RegionId;

        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![0, 1])
            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
        let metadata = Arc::new(builder.build().unwrap());

        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse);
        assert_eq!(primary_key_codec.encoding(), PrimaryKeyEncoding::Sparse);

        let capacity = 100;
        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec.clone(), true);

        let key_values1 = build_key_values_with_sparse_encoding(
            &metadata,
            &primary_key_codec,
            2048u32,
            100u64,
            "key11".to_string(),
            "key21".to_string(),
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_sparse_encoding(
            &metadata,
            &primary_key_codec,
            4096u32,
            200u64,
            "key12".to_string(),
            "key22".to_string(),
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_timestamp, 1000);
        assert_eq!(bulk_part.max_timestamp, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );

        let primary_key_column = bulk_part.batch.column_by_name("__primary_key").unwrap();
        let dict_array = primary_key_column
            .as_any()
            .downcast_ref::<DictionaryArray<UInt32Type>>()
            .unwrap();

        assert!(!dict_array.is_empty());
        assert_eq!(dict_array.len(), 3);

        let values = dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        for i in 0..values.len() {
            assert!(
                !values.value(i).is_empty(),
                "Encoded primary key should not be empty"
            );
        }
    }
}