mito2/memtable/bulk/part.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Bulk part encoder/decoder.

use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::helper::{ColumnDataTypeWrapper, value_to_grpc_value};
use api::v1::bulk_wal_entry::Body;
use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType, bulk_wal_entry};
use bytes::Bytes;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_recordbatch::DfRecordBatch as RecordBatch;
use common_time::Timestamp;
use common_time::timestamp::TimeUnit;
use datatypes::arrow;
use datatypes::arrow::array::{
    Array, ArrayRef, BinaryBuilder, BinaryDictionaryBuilder, DictionaryArray, StringBuilder,
    StringDictionaryBuilder, TimestampMicrosecondArray, TimestampMillisecondArray,
    TimestampNanosecondArray, TimestampSecondArray, UInt8Array, UInt8Builder, UInt32Array,
    UInt64Array, UInt64Builder,
};
use datatypes::arrow::compute::{SortColumn, SortOptions, TakeOptions};
use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
use datatypes::arrow_array::BinaryArray;
use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::Helper;
use mito_codec::key_values::{KeyValue, KeyValues, KeyValuesRef};
use mito_codec::row_converter::{
    DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, build_primary_key_codec,
};
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, ZstdLevel};
use parquet::data_type::AsBytes;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use snafu::{OptionExt, ResultExt, Snafu};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
use store_api::storage::{FileId, SequenceNumber, SequenceRange};
use table::predicate::Predicate;

use crate::error::{
    self, ColumnNotFoundSnafu, ComputeArrowSnafu, DataTypeMismatchSnafu, EncodeMemtableSnafu,
    EncodeSnafu, InvalidMetadataSnafu, NewRecordBatchSnafu, Result,
};
use crate::memtable::BoxedRecordBatchIterator;
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
use crate::memtable::time_series::{ValueBuilder, Values};
use crate::sst::index::IndexOutput;
use crate::sst::parquet::file_range::{PreFilterMode, row_group_contains_delete};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo};
use crate::sst::{SeriesEstimator, to_sst_arrow_schema};

const INIT_DICT_VALUE_CAPACITY: usize = 8;
/// A raw bulk part in the memtable.
#[derive(Clone)]
pub struct BulkPart {
    /// Record batch that holds the rows of this part.
    pub batch: RecordBatch,
    /// Max timestamp value in the batch.
    pub max_timestamp: i64,
    /// Min timestamp value in the batch.
    pub min_timestamp: i64,
    /// Sequence number of this part.
    pub sequence: u64,
    /// Index of the time index column in the batch.
    pub timestamp_index: usize,
    /// Raw Arrow IPC data of the batch, if available, so a WAL entry can reuse it
    /// without re-encoding the batch.
    pub raw_data: Option<ArrowIpc>,
}

impl TryFrom<BulkWalEntry> for BulkPart {
    type Error = error::Error;

    fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
        match value.body.expect("Entry payload should be present") {
            Body::ArrowIpc(ipc) => {
                let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
                    .context(error::ConvertBulkWalEntrySnafu)?;
                let batch = decoder
                    .try_decode_record_batch(&ipc.data_header, &ipc.payload)
                    .context(error::ConvertBulkWalEntrySnafu)?;
                Ok(Self {
                    batch,
                    max_timestamp: value.max_ts,
                    min_timestamp: value.min_ts,
                    sequence: value.sequence,
                    timestamp_index: value.timestamp_index as usize,
                    raw_data: Some(ipc),
                })
            }
        }
    }
}

impl TryFrom<&BulkPart> for BulkWalEntry {
    type Error = error::Error;

    fn try_from(value: &BulkPart) -> Result<Self> {
        if let Some(ipc) = &value.raw_data {
            Ok(BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_timestamp,
                min_ts: value.min_timestamp,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ipc.clone())),
            })
        } else {
            let mut encoder = FlightEncoder::default();
            let schema_bytes = encoder
                .encode_schema(value.batch.schema().as_ref())
                .data_header;
            let [rb_data] = encoder
                .encode(FlightMessage::RecordBatch(value.batch.clone()))
                .try_into()
                .map_err(|_| {
                    error::UnsupportedOperationSnafu {
                        err_msg: "create BulkWalEntry from RecordBatch with dictionary arrays",
                    }
                    .build()
                })?;
            Ok(BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_timestamp,
                min_ts: value.min_timestamp,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ArrowIpc {
                    schema: schema_bytes,
                    data_header: rb_data.data_header,
                    payload: rb_data.data_body,
                })),
            })
        }
    }
}

impl BulkPart {
    pub(crate) fn estimated_size(&self) -> usize {
        record_batch_estimated_size(&self.batch)
    }

    /// Returns the estimated series count in this BulkPart.
    /// This is calculated from the dictionary values count of the PrimaryKeyArray.
    pub fn estimated_series_count(&self) -> usize {
        let pk_column_idx = primary_key_column_index(self.batch.num_columns());
        let pk_column = self.batch.column(pk_column_idx);
        if let Some(dict_array) = pk_column.as_any().downcast_ref::<PrimaryKeyArray>() {
            dict_array.values().len()
        } else {
            0
        }
    }

    /// Converts [BulkPart] to [Mutation] for fallback `write_bulk` implementation.
    pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
        let vectors = region_metadata
            .schema
            .column_schemas()
            .iter()
            .map(|col| match self.batch.column_by_name(&col.name) {
                None => Ok(None),
                Some(col) => Helper::try_into_vector(col).map(Some),
            })
            .collect::<datatypes::error::Result<Vec<_>>>()
            .context(error::ComputeVectorSnafu)?;

        let rows = (0..self.num_rows())
            .map(|row_idx| {
                let values = (0..self.batch.num_columns())
                    .map(|col_idx| {
                        if let Some(v) = &vectors[col_idx] {
                            value_to_grpc_value(v.get(row_idx))
                        } else {
                            api::v1::Value { value_data: None }
                        }
                    })
                    .collect::<Vec<_>>();
                api::v1::Row { values }
            })
            .collect::<Vec<_>>();

        let schema = region_metadata
            .column_metadatas
            .iter()
            .map(|c| {
                let data_type_wrapper =
                    ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
                Ok(api::v1::ColumnSchema {
                    column_name: c.column_schema.name.clone(),
                    datatype: data_type_wrapper.datatype() as i32,
                    semantic_type: c.semantic_type as i32,
                    ..Default::default()
                })
            })
            .collect::<api::error::Result<Vec<_>>>()
            .context(error::ConvertColumnDataTypeSnafu {
                reason: "failed to convert region metadata to column schema",
            })?;

        let rows = api::v1::Rows { schema, rows };

        Ok(Mutation {
            op_type: OpType::Put as i32,
            sequence: self.sequence,
            rows: Some(rows),
            write_hint: None,
        })
    }

    pub fn timestamps(&self) -> &ArrayRef {
        self.batch.column(self.timestamp_index)
    }

    pub fn num_rows(&self) -> usize {
        self.batch.num_rows()
    }
}

/// More accurate estimation of the size of a record batch.
pub(crate) fn record_batch_estimated_size(batch: &RecordBatch) -> usize {
    batch
        .columns()
        .iter()
        // If the slice memory size is unavailable, assume 0 here.
        .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
        .sum()
}

/// Primary key column builder for handling strings specially.
enum PrimaryKeyColumnBuilder {
    /// String dictionary builder for string types.
    StringDict(StringDictionaryBuilder<UInt32Type>),
    /// Generic mutable vector for other types.
    Vector(Box<dyn MutableVector>),
}

impl PrimaryKeyColumnBuilder {
    /// Appends a value to the builder.
    fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
        match self {
            PrimaryKeyColumnBuilder::StringDict(builder) => {
                if let Some(s) = value.try_into_string().context(DataTypeMismatchSnafu)? {
                    // We know the value is a string.
                    builder.append_value(s);
                } else {
                    builder.append_null();
                }
            }
            PrimaryKeyColumnBuilder::Vector(builder) => {
                builder.push_value_ref(&value);
            }
        }
        Ok(())
    }

    /// Converts the builder to an ArrayRef.
    fn into_arrow_array(self) -> ArrayRef {
        match self {
            PrimaryKeyColumnBuilder::StringDict(mut builder) => Arc::new(builder.finish()),
            PrimaryKeyColumnBuilder::Vector(mut builder) => builder.to_vector().to_arrow_array(),
        }
    }
}

/// Converter that converts structs into [BulkPart].
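///
/// A minimal usage sketch (not compiled; `metadata`, `schema` and `key_values` are
/// placeholders for values prepared by the caller, e.g. as in the tests below):
///
/// ```ignore
/// let codec = build_primary_key_codec(&metadata);
/// let mut converter = BulkPartConverter::new(&metadata, schema, 1024, codec, true);
/// converter.append_key_values(&key_values)?;
/// // The resulting batch is sorted by (primary key, timestamp, sequence desc).
/// let part: BulkPart = converter.convert()?;
/// ```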
pub struct BulkPartConverter {
    /// Region metadata.
    region_metadata: RegionMetadataRef,
    /// Schema of the converted batch.
    schema: SchemaRef,
    /// Primary key codec for encoding keys
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
    /// Buffer for encoding primary key.
    key_buf: Vec<u8>,
    /// Primary key array builder.
    key_array_builder: PrimaryKeyArrayBuilder,
    /// Builders for non-primary key columns.
    value_builder: ValueBuilder,
    /// Builders for individual primary key columns.
    /// The order of builders is the same as the order of primary key columns in the region metadata.
    primary_key_column_builders: Vec<PrimaryKeyColumnBuilder>,

    /// Max timestamp value.
    max_ts: i64,
    /// Min timestamp value.
    min_ts: i64,
    /// Max sequence number.
    max_sequence: SequenceNumber,
}

impl BulkPartConverter {
    /// Creates a new converter.
    ///
    /// If `store_primary_key_columns` is true and the encoding is not sparse encoding, it
    /// stores primary key columns in arrays additionally.
    pub fn new(
        region_metadata: &RegionMetadataRef,
        schema: SchemaRef,
        capacity: usize,
        primary_key_codec: Arc<dyn PrimaryKeyCodec>,
        store_primary_key_columns: bool,
    ) -> Self {
        debug_assert_eq!(
            region_metadata.primary_key_encoding,
            primary_key_codec.encoding()
        );

        let primary_key_column_builders = if store_primary_key_columns
            && region_metadata.primary_key_encoding != PrimaryKeyEncoding::Sparse
        {
            new_primary_key_column_builders(region_metadata, capacity)
        } else {
            Vec::new()
        };

        Self {
            region_metadata: region_metadata.clone(),
            schema,
            primary_key_codec,
            key_buf: Vec::new(),
            key_array_builder: PrimaryKeyArrayBuilder::new(),
            value_builder: ValueBuilder::new(region_metadata, capacity),
            primary_key_column_builders,
            min_ts: i64::MAX,
            max_ts: i64::MIN,
            max_sequence: SequenceNumber::MIN,
        }
    }

    /// Appends a [KeyValues] into the converter.
    pub fn append_key_values(&mut self, key_values: &KeyValues) -> Result<()> {
        for kv in key_values.iter() {
            self.append_key_value(&kv)?;
        }

        Ok(())
    }

    /// Appends a [KeyValue] to builders.
    ///
    /// If the primary key uses sparse encoding, callers must encode the primary key in the [KeyValue].
    fn append_key_value(&mut self, kv: &KeyValue) -> Result<()> {
        // Handles primary key based on encoding type
        if self.primary_key_codec.encoding() == PrimaryKeyEncoding::Sparse {
            // For sparse encoding, the primary key is already encoded in the KeyValue
            // Gets the first (and only) primary key value which contains the encoded key
            let mut primary_keys = kv.primary_keys();
            if let Some(encoded) = primary_keys
                .next()
                .context(ColumnNotFoundSnafu {
                    column: PRIMARY_KEY_COLUMN_NAME,
                })?
                .try_into_binary()
                .context(DataTypeMismatchSnafu)?
            {
                self.key_array_builder
                    .append(encoded)
                    .context(ComputeArrowSnafu)?;
            } else {
                self.key_array_builder
                    .append("")
                    .context(ComputeArrowSnafu)?;
            }
        } else {
            // For dense encoding, we need to encode the primary key columns
            self.key_buf.clear();
            self.primary_key_codec
                .encode_key_value(kv, &mut self.key_buf)
                .context(EncodeSnafu)?;
            self.key_array_builder
                .append(&self.key_buf)
                .context(ComputeArrowSnafu)?;
        };

        // If storing primary key columns, append values to individual builders
        if !self.primary_key_column_builders.is_empty() {
            for (builder, pk_value) in self
                .primary_key_column_builders
                .iter_mut()
                .zip(kv.primary_keys())
            {
                builder.push_value_ref(pk_value)?;
            }
        }

        // Pushes other columns.
        self.value_builder.push(
            kv.timestamp(),
            kv.sequence(),
            kv.op_type() as u8,
            kv.fields(),
        );

        // Updates statistics
        // Safety: timestamp of kv must be both present and a valid timestamp value.
        let ts = kv
            .timestamp()
            .try_into_timestamp()
            .unwrap()
            .unwrap()
            .value();
        self.min_ts = self.min_ts.min(ts);
        self.max_ts = self.max_ts.max(ts);
        self.max_sequence = self.max_sequence.max(kv.sequence());

        Ok(())
    }

    /// Converts buffered content into a [BulkPart].
    ///
    /// It sorts the record batch by (primary key, timestamp, sequence desc).
    pub fn convert(mut self) -> Result<BulkPart> {
        let values = Values::from(self.value_builder);
        let mut columns =
            Vec::with_capacity(4 + values.fields.len() + self.primary_key_column_builders.len());

        // Build primary key column arrays if enabled.
        for builder in self.primary_key_column_builders {
            columns.push(builder.into_arrow_array());
        }
        // Then fields columns.
        columns.extend(values.fields.iter().map(|field| field.to_arrow_array()));
        // Time index.
        let timestamp_index = columns.len();
        columns.push(values.timestamp.to_arrow_array());
        // Primary key.
        let pk_array = self.key_array_builder.finish();
        columns.push(Arc::new(pk_array));
        // Sequence and op type.
        columns.push(values.sequence.to_arrow_array());
        columns.push(values.op_type.to_arrow_array());

        let batch = RecordBatch::try_new(self.schema, columns).context(NewRecordBatchSnafu)?;
        // Sorts the record batch.
        let batch = sort_primary_key_record_batch(&batch)?;

        Ok(BulkPart {
            batch,
            max_timestamp: self.max_ts,
            min_timestamp: self.min_ts,
            sequence: self.max_sequence,
            timestamp_index,
            raw_data: None,
        })
    }
}

fn new_primary_key_column_builders(
    metadata: &RegionMetadata,
    capacity: usize,
) -> Vec<PrimaryKeyColumnBuilder> {
    metadata
        .primary_key_columns()
        .map(|col| {
            if col.column_schema.data_type.is_string() {
                PrimaryKeyColumnBuilder::StringDict(StringDictionaryBuilder::with_capacity(
                    capacity,
                    INIT_DICT_VALUE_CAPACITY,
                    capacity,
                ))
            } else {
                PrimaryKeyColumnBuilder::Vector(
                    col.column_schema.data_type.create_mutable_vector(capacity),
                )
            }
        })
        .collect()
}

/// Sorts the record batch with primary key format.
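///
/// Assumes the batch layout produced by [BulkPartConverter::convert], where the last four
/// columns are the time index, `__primary_key`, `__sequence` and `__op_type` (in that order),
/// and sorts rows by (primary key asc, time index asc, sequence desc).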
fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
    let total_columns = batch.num_columns();
    let sort_columns = vec![
        // Primary key column (ascending)
        SortColumn {
            values: batch.column(total_columns - 3).clone(),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
        // Time index column (ascending)
        SortColumn {
            values: batch.column(total_columns - 4).clone(),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
        // Sequence column (descending)
        SortColumn {
            values: batch.column(total_columns - 2).clone(),
            options: Some(SortOptions {
                descending: true,
                nulls_first: true,
            }),
        },
    ];

    let indices = datatypes::arrow::compute::lexsort_to_indices(&sort_columns, None)
        .context(ComputeArrowSnafu)?;

    datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
}

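/// A bulk part that has been encoded into an in-memory Parquet buffer.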
#[derive(Debug, Clone)]
pub struct EncodedBulkPart {
    data: Bytes,
    metadata: BulkPartMeta,
}

impl EncodedBulkPart {
    pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
        Self { data, metadata }
    }

    pub(crate) fn metadata(&self) -> &BulkPartMeta {
        &self.metadata
    }

    /// Returns the size of the encoded data in bytes
    pub(crate) fn size_bytes(&self) -> usize {
        self.data.len()
    }

    /// Returns the encoded data.
    pub(crate) fn data(&self) -> &Bytes {
        &self.data
    }

    /// Converts this `EncodedBulkPart` to `SstInfo`.
    ///
    /// # Arguments
    /// * `file_id` - The SST file ID to assign to this part
    ///
    /// # Returns
    /// Returns a `SstInfo` instance with information derived from this bulk part's metadata
    pub(crate) fn to_sst_info(&self, file_id: FileId) -> SstInfo {
        let unit = self.metadata.region_metadata.time_index_type().unit();
        SstInfo {
            file_id,
            time_range: (
                Timestamp::new(self.metadata.min_timestamp, unit),
                Timestamp::new(self.metadata.max_timestamp, unit),
            ),
            file_size: self.data.len() as u64,
            num_rows: self.metadata.num_rows,
            num_row_groups: self.metadata.parquet_metadata.num_row_groups() as u64,
            file_metadata: Some(self.metadata.parquet_metadata.clone()),
            index_metadata: IndexOutput::default(),
            num_series: self.metadata.num_series,
        }
    }

    pub(crate) fn read(
        &self,
        context: BulkIterContextRef,
        sequence: Option<SequenceRange>,
    ) -> Result<Option<BoxedRecordBatchIterator>> {
        // Compute skip_fields for row group pruning using the same approach as compute_skip_fields in reader.rs.
        let skip_fields_for_pruning =
            Self::compute_skip_fields(context.pre_filter_mode(), &self.metadata.parquet_metadata);

        // use predicate to find row groups to read.
        let row_groups_to_read =
            context.row_groups_to_read(&self.metadata.parquet_metadata, skip_fields_for_pruning);

        if row_groups_to_read.is_empty() {
            // All row groups are filtered.
            return Ok(None);
        }

        let iter = EncodedBulkPartIter::try_new(
            context,
            row_groups_to_read,
            self.metadata.parquet_metadata.clone(),
            self.data.clone(),
            sequence,
        )?;
        Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
    }

    /// Computes whether to skip field columns based on PreFilterMode.
    fn compute_skip_fields(pre_filter_mode: PreFilterMode, parquet_meta: &ParquetMetaData) -> bool {
        match pre_filter_mode {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => {
                // Check if any row group contains delete op
                (0..parquet_meta.num_row_groups()).any(|rg_idx| {
                    row_group_contains_delete(parquet_meta, rg_idx, "memtable").unwrap_or(true)
                })
            }
        }
    }
}

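/// Metadata of an [EncodedBulkPart].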
#[derive(Debug, Clone)]
pub struct BulkPartMeta {
    /// Total rows in part.
    pub num_rows: usize,
    /// Max timestamp in part.
    pub max_timestamp: i64,
    /// Min timestamp in part.
    pub min_timestamp: i64,
    /// Part file metadata.
    pub parquet_metadata: Arc<ParquetMetaData>,
    /// Part region schema.
    pub region_metadata: RegionMetadataRef,
    /// Number of series.
    pub num_series: u64,
}

/// Metrics for encoding a part.
#[derive(Default, Debug)]
pub struct BulkPartEncodeMetrics {
    /// Cost of iterating over the data.
    pub iter_cost: Duration,
    /// Cost of writing the data.
    pub write_cost: Duration,
    /// Size of data before encoding.
    pub raw_size: usize,
    /// Size of data after encoding.
    pub encoded_size: usize,
    /// Number of rows in part.
    pub num_rows: usize,
}

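/// Encoder that writes bulk data into in-memory Parquet buffers ([EncodedBulkPart]).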
pub struct BulkPartEncoder {
    metadata: RegionMetadataRef,
    row_group_size: usize,
    writer_props: Option<WriterProperties>,
}

impl BulkPartEncoder {
    pub(crate) fn new(
        metadata: RegionMetadataRef,
        row_group_size: usize,
    ) -> Result<BulkPartEncoder> {
        // TODO(yingwen): Skip arrow schema if needed.
        let json = metadata.to_json().context(InvalidMetadataSnafu)?;
        let key_value_meta =
            parquet::file::metadata::KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        // TODO(yingwen): Do we need compression?
        let writer_props = Some(
            WriterProperties::builder()
                .set_key_value_metadata(Some(vec![key_value_meta]))
                .set_write_batch_size(row_group_size)
                .set_max_row_group_size(row_group_size)
                .set_compression(Compression::ZSTD(ZstdLevel::default()))
                .set_column_index_truncate_length(None)
                .set_statistics_truncate_length(None)
                .build(),
        );

        Ok(Self {
            metadata,
            row_group_size,
            writer_props,
        })
    }
}

impl BulkPartEncoder {
    /// Encodes [BoxedRecordBatchIterator] into [EncodedBulkPart] with min/max timestamps.
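    ///
    /// Returns `Ok(None)` if the iterator yields no rows.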
    pub fn encode_record_batch_iter(
        &self,
        iter: BoxedRecordBatchIterator,
        arrow_schema: SchemaRef,
        min_timestamp: i64,
        max_timestamp: i64,
        metrics: &mut BulkPartEncodeMetrics,
    ) -> Result<Option<EncodedBulkPart>> {
        let mut buf = Vec::with_capacity(4096);
        let mut writer = ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
            .context(EncodeMemtableSnafu)?;
        let mut total_rows = 0;
        let mut series_estimator = SeriesEstimator::default();

        // Process each batch from the iterator
        let mut iter_start = Instant::now();
        for batch_result in iter {
            metrics.iter_cost += iter_start.elapsed();
            let batch = batch_result?;
            if batch.num_rows() == 0 {
                continue;
            }

            series_estimator.update_flat(&batch);
            metrics.raw_size += record_batch_estimated_size(&batch);
            let write_start = Instant::now();
            writer.write(&batch).context(EncodeMemtableSnafu)?;
            metrics.write_cost += write_start.elapsed();
            total_rows += batch.num_rows();
            iter_start = Instant::now();
        }
        metrics.iter_cost += iter_start.elapsed();

        if total_rows == 0 {
            return Ok(None);
        }

        let close_start = Instant::now();
        let file_metadata = writer.close().context(EncodeMemtableSnafu)?;
        metrics.write_cost += close_start.elapsed();
        metrics.encoded_size += buf.len();
        metrics.num_rows += total_rows;

        let buf = Bytes::from(buf);
        let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);
        let num_series = series_estimator.finish();

        Ok(Some(EncodedBulkPart {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: total_rows,
                max_timestamp,
                min_timestamp,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
                num_series,
            },
        }))
    }

    /// Encodes a bulk part into an [EncodedBulkPart] and returns the encoded data.
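    ///
    /// Returns `Ok(None)` if the part contains no rows.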
    fn encode_part(&self, part: &BulkPart) -> Result<Option<EncodedBulkPart>> {
        if part.batch.num_rows() == 0 {
            return Ok(None);
        }

        let mut buf = Vec::with_capacity(4096);
        let arrow_schema = part.batch.schema();

        let file_metadata = {
            let mut writer =
                ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
                    .context(EncodeMemtableSnafu)?;
            writer.write(&part.batch).context(EncodeMemtableSnafu)?;
            writer.finish().context(EncodeMemtableSnafu)?
        };

        let buf = Bytes::from(buf);
        let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);

        Ok(Some(EncodedBulkPart {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: part.batch.num_rows(),
                max_timestamp: part.max_timestamp,
                min_timestamp: part.min_timestamp,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
                num_series: part.estimated_series_count() as u64,
            },
        }))
    }
}

/// Converts mutations to record batches.
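///
/// Returns `None` if the mutations contain no rows; otherwise returns the record batch
/// sorted by (primary key, timestamp, sequence desc) together with its min and max timestamps.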
fn mutations_to_record_batch(
    mutations: &[Mutation],
    metadata: &RegionMetadataRef,
    pk_encoder: &DensePrimaryKeyCodec,
    dedup: bool,
) -> Result<Option<(RecordBatch, i64, i64)>> {
    let total_rows: usize = mutations
        .iter()
        .map(|m| m.rows.as_ref().map(|r| r.rows.len()).unwrap_or(0))
        .sum();

    if total_rows == 0 {
        return Ok(None);
    }

    let mut pk_builder = BinaryBuilder::with_capacity(total_rows, 0);

    let mut ts_vector: Box<dyn MutableVector> = metadata
        .time_index_column()
        .column_schema
        .data_type
        .create_mutable_vector(total_rows);
    let mut sequence_builder = UInt64Builder::with_capacity(total_rows);
    let mut op_type_builder = UInt8Builder::with_capacity(total_rows);

    let mut field_builders: Vec<Box<dyn MutableVector>> = metadata
        .field_columns()
        .map(|f| f.column_schema.data_type.create_mutable_vector(total_rows))
        .collect();

    let mut pk_buffer = vec![];
    for m in mutations {
        let Some(key_values) = KeyValuesRef::new(metadata, m) else {
            continue;
        };

        for row in key_values.iter() {
            pk_buffer.clear();
            pk_encoder
                .encode_to_vec(row.primary_keys(), &mut pk_buffer)
                .context(EncodeSnafu)?;
            pk_builder.append_value(pk_buffer.as_bytes());
            ts_vector.push_value_ref(&row.timestamp());
            sequence_builder.append_value(row.sequence());
            op_type_builder.append_value(row.op_type() as u8);
            for (builder, field) in field_builders.iter_mut().zip(row.fields()) {
                builder.push_value_ref(&field);
            }
        }
    }

    let arrow_schema = to_sst_arrow_schema(metadata);
    // safety: timestamp column must be valid, and values must not be None.
    let timestamp_unit = metadata
        .time_index_column()
        .column_schema
        .data_type
        .as_timestamp()
        .unwrap()
        .unit();
    let sorter = ArraysSorter {
        encoded_primary_keys: pk_builder.finish(),
        timestamp_unit,
        timestamp: ts_vector.to_vector().to_arrow_array(),
        sequence: sequence_builder.finish(),
        op_type: op_type_builder.finish(),
        fields: field_builders
            .iter_mut()
            .map(|f| f.to_vector().to_arrow_array()),
        dedup,
        arrow_schema,
    };

    sorter.sort().map(Some)
}

struct ArraysSorter<I> {
    encoded_primary_keys: BinaryArray,
    timestamp_unit: TimeUnit,
    timestamp: ArrayRef,
    sequence: UInt64Array,
    op_type: UInt8Array,
    fields: I,
    dedup: bool,
    arrow_schema: SchemaRef,
}

impl<I> ArraysSorter<I>
where
    I: Iterator<Item = ArrayRef>,
{
    /// Converts arrays to record batch.
    fn sort(self) -> Result<(RecordBatch, i64, i64)> {
        debug_assert!(!self.timestamp.is_empty());
        debug_assert!(self.timestamp.len() == self.sequence.len());
        debug_assert!(self.timestamp.len() == self.op_type.len());
        debug_assert!(self.timestamp.len() == self.encoded_primary_keys.len());

        let timestamp_iter = timestamp_array_to_iter(self.timestamp_unit, &self.timestamp);
        let (mut min_timestamp, mut max_timestamp) = (i64::MAX, i64::MIN);
        let mut to_sort = self
            .encoded_primary_keys
            .iter()
            .zip(timestamp_iter)
            .zip(self.sequence.iter())
            .map(|((pk, timestamp), sequence)| {
                max_timestamp = max_timestamp.max(*timestamp);
                min_timestamp = min_timestamp.min(*timestamp);
                (pk, timestamp, sequence)
            })
            .enumerate()
            .collect::<Vec<_>>();

        to_sort.sort_unstable_by(|(_, (l_pk, l_ts, l_seq)), (_, (r_pk, r_ts, r_seq))| {
            l_pk.cmp(r_pk)
                .then(l_ts.cmp(r_ts))
                .then(l_seq.cmp(r_seq).reverse())
        });

        if self.dedup {
            // Dedup by timestamps while ignoring sequences.
            to_sort.dedup_by(|(_, (l_pk, l_ts, _)), (_, (r_pk, r_ts, _))| {
                l_pk == r_pk && l_ts == r_ts
            });
        }

        let indices = UInt32Array::from_iter_values(to_sort.iter().map(|v| v.0 as u32));

        let pk_dictionary = Arc::new(binary_array_to_dictionary(
            // safety: pk must be BinaryArray
            arrow::compute::take(
                &self.encoded_primary_keys,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap(),
        )?) as ArrayRef;

        let mut arrays = Vec::with_capacity(self.arrow_schema.fields.len());
        for arr in self.fields {
            arrays.push(
                arrow::compute::take(
                    &arr,
                    &indices,
                    Some(TakeOptions {
                        check_bounds: false,
                    }),
                )
                .context(ComputeArrowSnafu)?,
            );
        }

        let timestamp = arrow::compute::take(
            &self.timestamp,
            &indices,
            Some(TakeOptions {
                check_bounds: false,
            }),
        )
        .context(ComputeArrowSnafu)?;

        arrays.push(timestamp);
        arrays.push(pk_dictionary);
        arrays.push(
            arrow::compute::take(
                &self.sequence,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?,
        );

        arrays.push(
            arrow::compute::take(
                &self.op_type,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?,
        );

        let batch = RecordBatch::try_new(self.arrow_schema, arrays).context(NewRecordBatchSnafu)?;
        Ok((batch, min_timestamp, max_timestamp))
    }
}

/// Converts timestamp array to an iter of i64 values.
fn timestamp_array_to_iter(
    timestamp_unit: TimeUnit,
    timestamp: &ArrayRef,
) -> impl Iterator<Item = &i64> {
    match timestamp_unit {
        // safety: timestamp column must be valid.
        TimeUnit::Second => timestamp
            .as_any()
            .downcast_ref::<TimestampSecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Millisecond => timestamp
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Microsecond => timestamp
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Nanosecond => timestamp
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap()
            .values()
            .iter(),
    }
}

/// Converts a **sorted** [BinaryArray] to [DictionaryArray].
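///
/// For example, the sorted input `[b"a", b"a", b"b"]` yields keys `[0, 0, 1]` and
/// values `[b"a", b"b"]` (see `test_binary_array_to_dictionary` below).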
fn binary_array_to_dictionary(input: &BinaryArray) -> Result<PrimaryKeyArray> {
    if input.is_empty() {
        return Ok(DictionaryArray::new(
            UInt32Array::from(Vec::<u32>::new()),
            Arc::new(BinaryArray::from_vec(vec![])) as ArrayRef,
        ));
    }
    let mut keys = Vec::with_capacity(16);
    let mut values = BinaryBuilder::new();
    let mut prev: usize = 0;
    keys.push(prev as u32);
    values.append_value(input.value(prev));

    for current_bytes in input.iter().skip(1) {
        // safety: the encoded pk must be present.
        let current_bytes = current_bytes.unwrap();
        let prev_bytes = input.value(prev);
        if current_bytes != prev_bytes {
            values.append_value(current_bytes);
            prev += 1;
        }
        keys.push(prev as u32);
    }

    Ok(DictionaryArray::new(
        UInt32Array::from(keys),
        Arc::new(values.finish()) as ArrayRef,
    ))
}

#[cfg(test)]
mod tests {
    use std::collections::VecDeque;

    use api::v1::{Row, WriteHint};
    use datafusion_common::ScalarValue;
    use datatypes::arrow::array::Float64Array;
    use datatypes::prelude::{ConcreteDataType, ScalarVector, Value};
    use datatypes::vectors::{Float64Vector, TimestampMillisecondVector};
    use store_api::storage::consts::ReservedColumnId;

    use super::*;
    use crate::memtable::bulk::context::BulkIterContext;
    use crate::sst::parquet::format::{PrimaryKeyReadFormat, ReadFormat};
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
    use crate::test_util::memtable_util::{
        build_key_values_with_ts_seq_values, metadata_for_test, region_metadata_to_row_schema,
    };

    fn check_binary_array_to_dictionary(
        input: &[&[u8]],
        expected_keys: &[u32],
        expected_values: &[&[u8]],
    ) {
        let input = BinaryArray::from_iter_values(input.iter());
        let array = binary_array_to_dictionary(&input).unwrap();
        assert_eq!(
            &expected_keys,
            &array.keys().iter().map(|v| v.unwrap()).collect::<Vec<_>>()
        );
        assert_eq!(
            expected_values,
            &array
                .values()
                .as_any()
                .downcast_ref::<BinaryArray>()
                .unwrap()
                .iter()
                .map(|v| v.unwrap())
                .collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_binary_array_to_dictionary() {
        check_binary_array_to_dictionary(&[], &[], &[]);

        check_binary_array_to_dictionary(&["a".as_bytes()], &[0], &["a".as_bytes()]);

        check_binary_array_to_dictionary(
            &["a".as_bytes(), "a".as_bytes()],
            &[0, 0],
            &["a".as_bytes()],
        );

        check_binary_array_to_dictionary(
            &["a".as_bytes(), "a".as_bytes(), "b".as_bytes()],
            &[0, 0, 1],
            &["a".as_bytes(), "b".as_bytes()],
        );

        check_binary_array_to_dictionary(
            &[
                "a".as_bytes(),
                "a".as_bytes(),
                "b".as_bytes(),
                "c".as_bytes(),
            ],
            &[0, 0, 1, 2],
            &["a".as_bytes(), "b".as_bytes(), "c".as_bytes()],
        );
    }

    struct MutationInput<'a> {
        k0: &'a str,
        k1: u32,
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
        sequence: u64,
    }

    #[derive(Debug, PartialOrd, PartialEq)]
    struct BatchOutput<'a> {
        pk_values: &'a [Value],
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
    }

    fn check_mutations_to_record_batches(
        input: &[MutationInput],
        expected: &[BatchOutput],
        expected_timestamp: (i64, i64),
        dedup: bool,
    ) {
        let metadata = metadata_for_test();
        let mutations = input
            .iter()
            .map(|m| {
                build_key_values_with_ts_seq_values(
                    &metadata,
                    m.k0.to_string(),
                    m.k1,
                    m.timestamps.iter().copied(),
                    m.v1.iter().copied(),
                    m.sequence,
                )
                .mutation
            })
            .collect::<Vec<_>>();
        let total_rows: usize = mutations
            .iter()
            .flat_map(|m| m.rows.iter())
            .map(|r| r.rows.len())
            .sum();

        let pk_encoder = DensePrimaryKeyCodec::new(&metadata);

        let (batch, _, _) = mutations_to_record_batch(&mutations, &metadata, &pk_encoder, dedup)
            .unwrap()
            .unwrap();
        let read_format = PrimaryKeyReadFormat::new_with_all_columns(metadata.clone());
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&batch, None, &mut batches)
            .unwrap();
        if !dedup {
            assert_eq!(
                total_rows,
                batches.iter().map(|b| { b.num_rows() }).sum::<usize>()
            );
        }
        let batch_values = batches
            .into_iter()
            .map(|b| {
                let pk_values = pk_encoder.decode(b.primary_key()).unwrap().into_dense();
                let timestamps = b
                    .timestamps()
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap().0.value())
                    .collect::<Vec<_>>();
                let float_values = b.fields()[1]
                    .data
                    .as_any()
                    .downcast_ref::<Float64Vector>()
                    .unwrap()
                    .iter_data()
                    .collect::<Vec<_>>();

                (pk_values, timestamps, float_values)
            })
            .collect::<Vec<_>>();
        assert_eq!(expected.len(), batch_values.len());

        for idx in 0..expected.len() {
            assert_eq!(expected[idx].pk_values, &batch_values[idx].0);
            assert_eq!(expected[idx].timestamps, &batch_values[idx].1);
            assert_eq!(expected[idx].v1, &batch_values[idx].2);
        }
    }

    #[test]
    fn test_mutations_to_record_batch() {
        check_mutations_to_record_batches(
            &[MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[0],
                v1: &[Some(0.1)],
                sequence: 0,
            }],
            &[BatchOutput {
                pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                timestamps: &[0],
                v1: &[Some(0.1)],
            }],
            (0, 0),
            true,
        );

        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[1],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
                MutationInput {
                    k0: "a",
                    k1: 1,
                    timestamps: &[1],
                    v1: &[Some(0.3)],
                    sequence: 2,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0, 1],
                    v1: &[Some(0.1), Some(0.2)],
                },
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(1)],
                    timestamps: &[1],
                    v1: &[Some(0.3)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 1),
            true,
        );

        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 0),
            true,
        );
        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0, 0],
                    v1: &[Some(0.2), Some(0.1)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 0),
            false,
        );
    }

    fn encode(input: &[MutationInput]) -> EncodedBulkPart {
        let metadata = metadata_for_test();
        let kvs = input
            .iter()
            .map(|m| {
                build_key_values_with_ts_seq_values(
                    &metadata,
                    m.k0.to_string(),
                    m.k1,
                    m.timestamps.iter().copied(),
                    m.v1.iter().copied(),
                    m.sequence,
                )
            })
            .collect::<Vec<_>>();
        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let primary_key_codec = build_primary_key_codec(&metadata);
        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
        for kv in kvs {
            converter.append_key_values(&kv).unwrap();
        }
        let part = converter.convert().unwrap();
        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
        encoder.encode_part(&part).unwrap().unwrap()
    }

    #[test]
    fn test_write_and_read_part_projection() {
        let part = encode(&[
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.1)],
                sequence: 0,
            },
            MutationInput {
                k0: "b",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.0)],
                sequence: 0,
            },
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[2],
                v1: &[Some(0.2)],
                sequence: 1,
            },
        ]);

        let projection = &[4u32];
        let mut reader = part
            .read(
                Arc::new(
                    BulkIterContext::new(
                        part.metadata.region_metadata.clone(),
                        Some(projection.as_slice()),
                        None,
                        false,
                    )
                    .unwrap(),
                ),
                None,
            )
            .unwrap()
            .expect("expect at least one row group");

        let mut total_rows_read = 0;
        let mut field: Vec<f64> = vec![];
        for res in reader {
            let batch = res.unwrap();
            assert_eq!(5, batch.num_columns());
            field.extend_from_slice(
                batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Float64Array>()
                    .unwrap()
                    .values(),
            );
            total_rows_read += batch.num_rows();
        }
        assert_eq!(3, total_rows_read);
        assert_eq!(vec![0.1, 0.2, 0.0], field);
    }

    fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
        let metadata = metadata_for_test();
        let kvs = key_values
            .into_iter()
            .map(|(k0, k1, (start, end), sequence)| {
                let ts = start..end;
                let v1 = (start..end).map(|_| None);
                build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
            })
            .collect::<Vec<_>>();
        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let primary_key_codec = build_primary_key_codec(&metadata);
        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
        for kv in kvs {
            converter.append_key_values(&kv).unwrap();
        }
        let part = converter.convert().unwrap();
        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
        encoder.encode_part(&part).unwrap().unwrap()
    }

    fn check_prune_row_group(
        part: &EncodedBulkPart,
        predicate: Option<Predicate>,
        expected_rows: usize,
    ) {
        let context = Arc::new(
            BulkIterContext::new(
                part.metadata.region_metadata.clone(),
                None,
                predicate,
                false,
            )
            .unwrap(),
        );
        let mut reader = part
            .read(context, None)
            .unwrap()
            .expect("expect at least one row group");
        let mut total_rows_read = 0;
        for res in reader {
            let batch = res.unwrap();
            total_rows_read += batch.num_rows();
        }
        // Check the number of rows read after row group pruning.
        assert_eq!(expected_rows, total_rows_read);
    }

    #[test]
    fn test_prune_row_groups() {
        let part = prepare(vec![
            ("a", 0, (0, 40), 1),
            ("a", 1, (0, 60), 1),
            ("b", 0, (0, 100), 2),
            ("b", 1, (100, 180), 3),
            ("b", 1, (180, 210), 4),
        ]);

        let context = Arc::new(
            BulkIterContext::new(
                part.metadata.region_metadata.clone(),
                None,
                Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
                    datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
                )])),
                false,
            )
            .unwrap(),
        );
        assert!(part.read(context, None).unwrap().is_none());

        check_prune_row_group(&part, None, 310);

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            40,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
            ])),
            60,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
            ])),
            100,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            100,
        );

1549        // Predicates on field columns are evaluated precisely, so only matching rows are returned.
1550        check_prune_row_group(
1551            &part,
1552            Some(Predicate::new(vec![
1553                datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64)),
1554            ])),
1555            1,
1556        );
1557    }
1558
1559    #[test]
1560    fn test_bulk_part_converter_append_and_convert() {
1561        let metadata = metadata_for_test();
1562        let capacity = 100;
1563        let primary_key_codec = build_primary_key_codec(&metadata);
1564        let schema = to_flat_sst_arrow_schema(
1565            &metadata,
1566            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1567        );
1568
1569        let mut converter =
1570            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1571
1572        let key_values1 = build_key_values_with_ts_seq_values(
1573            &metadata,
1574            "key1".to_string(),
1575            1u32,
1576            vec![1000, 2000].into_iter(),
1577            vec![Some(1.0), Some(2.0)].into_iter(),
1578            1,
1579        );
1580
1581        let key_values2 = build_key_values_with_ts_seq_values(
1582            &metadata,
1583            "key2".to_string(),
1584            2u32,
1585            vec![1500].into_iter(),
1586            vec![Some(3.0)].into_iter(),
1587            2,
1588        );
1589
1590        converter.append_key_values(&key_values1).unwrap();
1591        converter.append_key_values(&key_values2).unwrap();
1592
1593        let bulk_part = converter.convert().unwrap();
1594
1595        assert_eq!(bulk_part.num_rows(), 3);
1596        assert_eq!(bulk_part.min_timestamp, 1000);
1597        assert_eq!(bulk_part.max_timestamp, 2000);
1598        assert_eq!(bulk_part.sequence, 2);
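        // The timestamp column is followed by __primary_key, __sequence and __op_type, hence `num_columns() - 4`.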
1599        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
1600
1601        // Validate primary key columns are stored
1602        // Schema should include primary key columns k0 and k1 at the beginning
1603        let schema = bulk_part.batch.schema();
1604        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1605        assert_eq!(
1606            field_names,
1607            vec![
1608                "k0",
1609                "k1",
1610                "v0",
1611                "v1",
1612                "ts",
1613                "__primary_key",
1614                "__sequence",
1615                "__op_type"
1616            ]
1617        );
1618    }
1619
1620    #[test]
1621    fn test_bulk_part_converter_sorting() {
1622        let metadata = metadata_for_test();
1623        let capacity = 100;
1624        let primary_key_codec = build_primary_key_codec(&metadata);
1625        let schema = to_flat_sst_arrow_schema(
1626            &metadata,
1627            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1628        );
1629
1630        let mut converter =
1631            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1632
1633        let key_values1 = build_key_values_with_ts_seq_values(
1634            &metadata,
1635            "z_key".to_string(),
1636            3u32,
1637            vec![3000].into_iter(),
1638            vec![Some(3.0)].into_iter(),
1639            3,
1640        );
1641
1642        let key_values2 = build_key_values_with_ts_seq_values(
1643            &metadata,
1644            "a_key".to_string(),
1645            1u32,
1646            vec![1000].into_iter(),
1647            vec![Some(1.0)].into_iter(),
1648            1,
1649        );
1650
1651        let key_values3 = build_key_values_with_ts_seq_values(
1652            &metadata,
1653            "m_key".to_string(),
1654            2u32,
1655            vec![2000].into_iter(),
1656            vec![Some(2.0)].into_iter(),
1657            2,
1658        );
1659
1660        converter.append_key_values(&key_values1).unwrap();
1661        converter.append_key_values(&key_values2).unwrap();
1662        converter.append_key_values(&key_values3).unwrap();
1663
1664        let bulk_part = converter.convert().unwrap();
1665
1666        assert_eq!(bulk_part.num_rows(), 3);
1667
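        // Rows were appended in z_key, a_key, m_key order; conversion sorts them by primary key.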
1668        let ts_column = bulk_part.batch.column(bulk_part.timestamp_index);
1669        let seq_column = bulk_part.batch.column(bulk_part.batch.num_columns() - 2);
1670
1671        let ts_array = ts_column
1672            .as_any()
1673            .downcast_ref::<TimestampMillisecondArray>()
1674            .unwrap();
1675        let seq_array = seq_column.as_any().downcast_ref::<UInt64Array>().unwrap();
1676
1677        assert_eq!(ts_array.values(), &[1000, 2000, 3000]);
1678        assert_eq!(seq_array.values(), &[1, 2, 3]);
1679
1680        // Validate primary key columns are stored
1681        let schema = bulk_part.batch.schema();
1682        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1683        assert_eq!(
1684            field_names,
1685            vec![
1686                "k0",
1687                "k1",
1688                "v0",
1689                "v1",
1690                "ts",
1691                "__primary_key",
1692                "__sequence",
1693                "__op_type"
1694            ]
1695        );
1696    }
1697
1698    #[test]
1699    fn test_bulk_part_converter_empty() {
1700        let metadata = metadata_for_test();
1701        let capacity = 10;
1702        let primary_key_codec = build_primary_key_codec(&metadata);
1703        let schema = to_flat_sst_arrow_schema(
1704            &metadata,
1705            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1706        );
1707
1708        let converter =
1709            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1710
1711        let bulk_part = converter.convert().unwrap();
1712
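        // With no rows appended, the timestamp range and sequence keep their sentinel values.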
1713        assert_eq!(bulk_part.num_rows(), 0);
1714        assert_eq!(bulk_part.min_timestamp, i64::MAX);
1715        assert_eq!(bulk_part.max_timestamp, i64::MIN);
1716        assert_eq!(bulk_part.sequence, SequenceNumber::MIN);
1717
1718        // Validate primary key columns are present in schema even for empty batch
1719        let schema = bulk_part.batch.schema();
1720        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1721        assert_eq!(
1722            field_names,
1723            vec![
1724                "k0",
1725                "k1",
1726                "v0",
1727                "v1",
1728                "ts",
1729                "__primary_key",
1730                "__sequence",
1731                "__op_type"
1732            ]
1733        );
1734    }
1735
1736    #[test]
1737    fn test_bulk_part_converter_without_primary_key_columns() {
1738        let metadata = metadata_for_test();
1739        let primary_key_codec = build_primary_key_codec(&metadata);
1740        let schema = to_flat_sst_arrow_schema(
1741            &metadata,
1742            &FlatSchemaOptions {
1743                raw_pk_columns: false,
1744                string_pk_use_dict: true,
1745            },
1746        );
1747
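        // With `raw_pk_columns` disabled, k0 and k1 are only available through the encoded __primary_key column.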
1748        let capacity = 100;
1749        let mut converter =
1750            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, false);
1751
1752        let key_values1 = build_key_values_with_ts_seq_values(
1753            &metadata,
1754            "key1".to_string(),
1755            1u32,
1756            vec![1000, 2000].into_iter(),
1757            vec![Some(1.0), Some(2.0)].into_iter(),
1758            1,
1759        );
1760
1761        let key_values2 = build_key_values_with_ts_seq_values(
1762            &metadata,
1763            "key2".to_string(),
1764            2u32,
1765            vec![1500].into_iter(),
1766            vec![Some(3.0)].into_iter(),
1767            2,
1768        );
1769
1770        converter.append_key_values(&key_values1).unwrap();
1771        converter.append_key_values(&key_values2).unwrap();
1772
1773        let bulk_part = converter.convert().unwrap();
1774
1775        assert_eq!(bulk_part.num_rows(), 3);
1776        assert_eq!(bulk_part.min_timestamp, 1000);
1777        assert_eq!(bulk_part.max_timestamp, 2000);
1778        assert_eq!(bulk_part.sequence, 2);
1779        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
1780
1781        // Validate primary key columns are NOT stored individually
1782        let schema = bulk_part.batch.schema();
1783        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1784        assert_eq!(
1785            field_names,
1786            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
1787        );
1788    }
1789
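    /// Builds [`KeyValues`] whose rows carry a sparse-encoded primary key (__table_id, __tsid, k0, k1) in the __primary_key column.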
1790    #[allow(clippy::too_many_arguments)]
1791    fn build_key_values_with_sparse_encoding(
1792        metadata: &RegionMetadataRef,
1793        primary_key_codec: &Arc<dyn PrimaryKeyCodec>,
1794        table_id: u32,
1795        tsid: u64,
1796        k0: String,
1797        k1: String,
1798        timestamps: impl Iterator<Item = i64>,
1799        values: impl Iterator<Item = Option<f64>>,
1800        sequence: SequenceNumber,
1801    ) -> KeyValues {
1802        // Encode the primary key (__table_id, __tsid, k0, k1) into binary format using the sparse codec
1803        let pk_values = vec![
1804            (ReservedColumnId::table_id(), Value::UInt32(table_id)),
1805            (ReservedColumnId::tsid(), Value::UInt64(tsid)),
1806            (0, Value::String(k0.clone().into())),
1807            (1, Value::String(k1.clone().into())),
1808        ];
1809        let mut encoded_key = Vec::new();
1810        primary_key_codec
1811            .encode_values(&pk_values, &mut encoded_key)
1812            .unwrap();
1813        assert!(!encoded_key.is_empty());
1814
1815        // Create schema for sparse encoding: __primary_key, ts, v0, v1
1816        let column_schema = vec![
1817            api::v1::ColumnSchema {
1818                column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
1819                datatype: api::helper::ColumnDataTypeWrapper::try_from(
1820                    ConcreteDataType::binary_datatype(),
1821                )
1822                .unwrap()
1823                .datatype() as i32,
1824                semantic_type: api::v1::SemanticType::Tag as i32,
1825                ..Default::default()
1826            },
1827            api::v1::ColumnSchema {
1828                column_name: "ts".to_string(),
1829                datatype: api::helper::ColumnDataTypeWrapper::try_from(
1830                    ConcreteDataType::timestamp_millisecond_datatype(),
1831                )
1832                .unwrap()
1833                .datatype() as i32,
1834                semantic_type: api::v1::SemanticType::Timestamp as i32,
1835                ..Default::default()
1836            },
1837            api::v1::ColumnSchema {
1838                column_name: "v0".to_string(),
1839                datatype: api::helper::ColumnDataTypeWrapper::try_from(
1840                    ConcreteDataType::int64_datatype(),
1841                )
1842                .unwrap()
1843                .datatype() as i32,
1844                semantic_type: api::v1::SemanticType::Field as i32,
1845                ..Default::default()
1846            },
1847            api::v1::ColumnSchema {
1848                column_name: "v1".to_string(),
1849                datatype: api::helper::ColumnDataTypeWrapper::try_from(
1850                    ConcreteDataType::float64_datatype(),
1851                )
1852                .unwrap()
1853                .datatype() as i32,
1854                semantic_type: api::v1::SemanticType::Field as i32,
1855                ..Default::default()
1856            },
1857        ];
1858
1859        let rows = timestamps
1860            .zip(values)
1861            .map(|(ts, v)| Row {
1862                values: vec![
1863                    api::v1::Value {
1864                        value_data: Some(api::v1::value::ValueData::BinaryValue(
1865                            encoded_key.clone(),
1866                        )),
1867                    },
1868                    api::v1::Value {
1869                        value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(ts)),
1870                    },
1871                    api::v1::Value {
1872                        value_data: Some(api::v1::value::ValueData::I64Value(ts)),
1873                    },
1874                    api::v1::Value {
1875                        value_data: v.map(api::v1::value::ValueData::F64Value),
1876                    },
1877                ],
1878            })
1879            .collect();
1880
1881        let mutation = api::v1::Mutation {
1882            op_type: 1, // OpType::Put
1883            sequence,
1884            rows: Some(api::v1::Rows {
1885                schema: column_schema,
1886                rows,
1887            }),
1888            write_hint: Some(WriteHint {
1889                primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
1890            }),
1891        };
1892        KeyValues::new(metadata.as_ref(), mutation).unwrap()
1893    }
1894
1895    #[test]
1896    fn test_bulk_part_converter_sparse_primary_key_encoding() {
1897        use api::v1::SemanticType;
1898        use datatypes::schema::ColumnSchema;
1899        use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1900        use store_api::storage::RegionId;
1901
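        // Build a region whose primary key (k0, k1) uses sparse encoding.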
1902        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
1903        builder
1904            .push_column_metadata(ColumnMetadata {
1905                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
1906                semantic_type: SemanticType::Tag,
1907                column_id: 0,
1908            })
1909            .push_column_metadata(ColumnMetadata {
1910                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
1911                semantic_type: SemanticType::Tag,
1912                column_id: 1,
1913            })
1914            .push_column_metadata(ColumnMetadata {
1915                column_schema: ColumnSchema::new(
1916                    "ts",
1917                    ConcreteDataType::timestamp_millisecond_datatype(),
1918                    false,
1919                ),
1920                semantic_type: SemanticType::Timestamp,
1921                column_id: 2,
1922            })
1923            .push_column_metadata(ColumnMetadata {
1924                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
1925                semantic_type: SemanticType::Field,
1926                column_id: 3,
1927            })
1928            .push_column_metadata(ColumnMetadata {
1929                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
1930                semantic_type: SemanticType::Field,
1931                column_id: 4,
1932            })
1933            .primary_key(vec![0, 1])
1934            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
1935        let metadata = Arc::new(builder.build().unwrap());
1936
1937        let primary_key_codec = build_primary_key_codec(&metadata);
1938        let schema = to_flat_sst_arrow_schema(
1939            &metadata,
1940            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1941        );
1942
1943        assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse);
1944        assert_eq!(primary_key_codec.encoding(), PrimaryKeyEncoding::Sparse);
1945
1946        let capacity = 100;
1947        let mut converter =
1948            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec.clone(), true);
1949
1950        let key_values1 = build_key_values_with_sparse_encoding(
1951            &metadata,
1952            &primary_key_codec,
1953            2048u32, // table_id
1954            100u64,  // tsid
1955            "key11".to_string(),
1956            "key21".to_string(),
1957            vec![1000, 2000].into_iter(),
1958            vec![Some(1.0), Some(2.0)].into_iter(),
1959            1,
1960        );
1961
1962        let key_values2 = build_key_values_with_sparse_encoding(
1963            &metadata,
1964            &primary_key_codec,
1965            4096u32, // table_id
1966            200u64,  // tsid
1967            "key12".to_string(),
1968            "key22".to_string(),
1969            vec![1500].into_iter(),
1970            vec![Some(3.0)].into_iter(),
1971            2,
1972        );
1973
1974        converter.append_key_values(&key_values1).unwrap();
1975        converter.append_key_values(&key_values2).unwrap();
1976
1977        let bulk_part = converter.convert().unwrap();
1978
1979        assert_eq!(bulk_part.num_rows(), 3);
1980        assert_eq!(bulk_part.min_timestamp, 1000);
1981        assert_eq!(bulk_part.max_timestamp, 2000);
1982        assert_eq!(bulk_part.sequence, 2);
1983        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
1984
1985        // For sparse encoding, primary key columns should NOT be stored individually
1986        // even when store_primary_key_columns is true, because sparse encoding
1987        // stores the encoded primary key in the __primary_key column
1988        let schema = bulk_part.batch.schema();
1989        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1990        assert_eq!(
1991            field_names,
1992            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
1993        );
1994
1995        // Verify the __primary_key column contains encoded sparse keys
1996        let primary_key_column = bulk_part.batch.column_by_name("__primary_key").unwrap();
1997        let dict_array = primary_key_column
1998            .as_any()
1999            .downcast_ref::<DictionaryArray<UInt32Type>>()
2000            .unwrap();
2001
2002        // The dictionary array should be non-empty and cover every row.
2003        assert!(!dict_array.is_empty());
2004        assert_eq!(dict_array.len(), 3); // 3 rows total
2005
2006        // Verify values are properly encoded binary data (not empty)
2007        let values = dict_array
2008            .values()
2009            .as_any()
2010            .downcast_ref::<BinaryArray>()
2011            .unwrap();
2012        for i in 0..values.len() {
2013            assert!(
2014                !values.value(i).is_empty(),
2015                "Encoded primary key should not be empty"
2016            );
2017        }
2018    }
2019}