mito2/memtable/bulk/part.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Bulk part encoder/decoder.
16
17use std::collections::VecDeque;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use api::helper::{ColumnDataTypeWrapper, value_to_grpc_value};
22use api::v1::bulk_wal_entry::Body;
23use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType, bulk_wal_entry};
24use bytes::Bytes;
25use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
26use common_recordbatch::DfRecordBatch as RecordBatch;
27use common_time::Timestamp;
28use common_time::timestamp::TimeUnit;
29use datatypes::arrow;
30use datatypes::arrow::array::{
31    Array, ArrayRef, BinaryBuilder, BinaryDictionaryBuilder, DictionaryArray, StringBuilder,
32    StringDictionaryBuilder, TimestampMicrosecondArray, TimestampMillisecondArray,
33    TimestampNanosecondArray, TimestampSecondArray, UInt8Array, UInt8Builder, UInt32Array,
34    UInt64Array, UInt64Builder,
35};
36use datatypes::arrow::compute::{SortColumn, SortOptions, TakeOptions};
37use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
38use datatypes::arrow_array::BinaryArray;
39use datatypes::data_type::DataType;
40use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
41use datatypes::value::{Value, ValueRef};
42use datatypes::vectors::Helper;
43use mito_codec::key_values::{KeyValue, KeyValues, KeyValuesRef};
44use mito_codec::row_converter::{
45    DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, build_primary_key_codec,
46};
47use parquet::arrow::ArrowWriter;
48use parquet::basic::{Compression, ZstdLevel};
49use parquet::data_type::AsBytes;
50use parquet::file::metadata::ParquetMetaData;
51use parquet::file::properties::WriterProperties;
52use snafu::{OptionExt, ResultExt, Snafu};
53use store_api::codec::PrimaryKeyEncoding;
54use store_api::metadata::{RegionMetadata, RegionMetadataRef};
55use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
56use store_api::storage::{FileId, SequenceNumber, SequenceRange};
57use table::predicate::Predicate;
58
59use crate::error::{
60    self, ColumnNotFoundSnafu, ComputeArrowSnafu, DataTypeMismatchSnafu, EncodeMemtableSnafu,
61    EncodeSnafu, InvalidMetadataSnafu, NewRecordBatchSnafu, Result,
62};
63use crate::memtable::bulk::context::BulkIterContextRef;
64use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
65use crate::memtable::time_series::{ValueBuilder, Values};
66use crate::memtable::{BoxedRecordBatchIterator, MemScanMetrics};
67use crate::sst::index::IndexOutput;
68use crate::sst::parquet::file_range::{PreFilterMode, row_group_contains_delete};
69use crate::sst::parquet::flat_format::primary_key_column_index;
70use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder, ReadFormat};
71use crate::sst::parquet::helper::parse_parquet_metadata;
72use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo};
73use crate::sst::{SeriesEstimator, to_sst_arrow_schema};
74
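/// Initial capacity for distinct dictionary values in primary key column builders.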
75const INIT_DICT_VALUE_CAPACITY: usize = 8;
76
77/// A raw bulk part in the memtable.
78#[derive(Clone)]
79pub struct BulkPart {
    /// Record batch that holds the rows of this part.
    pub batch: RecordBatch,
    /// Maximum timestamp value in the batch.
    pub max_timestamp: i64,
    /// Minimum timestamp value in the batch.
    pub min_timestamp: i64,
    /// Sequence number of this part.
    pub sequence: u64,
    /// Index of the time index column in the batch.
    pub timestamp_index: usize,
    /// Raw Arrow IPC data, if this part was decoded from a WAL entry.
    pub raw_data: Option<ArrowIpc>,
86}
87
88impl TryFrom<BulkWalEntry> for BulkPart {
89    type Error = error::Error;
90
91    fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
92        match value.body.expect("Entry payload should be present") {
93            Body::ArrowIpc(ipc) => {
94                let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
95                    .context(error::ConvertBulkWalEntrySnafu)?;
96                let batch = decoder
97                    .try_decode_record_batch(&ipc.data_header, &ipc.payload)
98                    .context(error::ConvertBulkWalEntrySnafu)?;
99                Ok(Self {
100                    batch,
101                    max_timestamp: value.max_ts,
102                    min_timestamp: value.min_ts,
103                    sequence: value.sequence,
104                    timestamp_index: value.timestamp_index as usize,
105                    raw_data: Some(ipc),
106                })
107            }
108        }
109    }
110}
111
112impl TryFrom<&BulkPart> for BulkWalEntry {
113    type Error = error::Error;
114
115    fn try_from(value: &BulkPart) -> Result<Self> {
116        if let Some(ipc) = &value.raw_data {
117            Ok(BulkWalEntry {
118                sequence: value.sequence,
119                max_ts: value.max_timestamp,
120                min_ts: value.min_timestamp,
121                timestamp_index: value.timestamp_index as u32,
122                body: Some(Body::ArrowIpc(ipc.clone())),
123            })
124        } else {
125            let mut encoder = FlightEncoder::default();
126            let schema_bytes = encoder
127                .encode_schema(value.batch.schema().as_ref())
128                .data_header;
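            // The flight encoder emits multiple messages for batches that contain
            // dictionary arrays; only a single-message encoding is supported here,
            // so expect exactly one encoded message.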
129            let [rb_data] = encoder
130                .encode(FlightMessage::RecordBatch(value.batch.clone()))
131                .try_into()
132                .map_err(|_| {
133                    error::UnsupportedOperationSnafu {
134                        err_msg: "create BulkWalEntry from RecordBatch with dictionary arrays",
135                    }
136                    .build()
137                })?;
138            Ok(BulkWalEntry {
139                sequence: value.sequence,
140                max_ts: value.max_timestamp,
141                min_ts: value.min_timestamp,
142                timestamp_index: value.timestamp_index as u32,
143                body: Some(Body::ArrowIpc(ArrowIpc {
144                    schema: schema_bytes,
145                    data_header: rb_data.data_header,
146                    payload: rb_data.data_body,
147                })),
148            })
149        }
150    }
151}
152
153impl BulkPart {
154    pub(crate) fn estimated_size(&self) -> usize {
155        record_batch_estimated_size(&self.batch)
156    }
157
158    /// Returns the estimated series count in this BulkPart.
    /// This is calculated from the number of dictionary values in the PrimaryKeyArray.
160    pub fn estimated_series_count(&self) -> usize {
161        let pk_column_idx = primary_key_column_index(self.batch.num_columns());
162        let pk_column = self.batch.column(pk_column_idx);
163        if let Some(dict_array) = pk_column.as_any().downcast_ref::<PrimaryKeyArray>() {
164            dict_array.values().len()
165        } else {
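            // The primary key column is not a dictionary array, so the series count
            // cannot be estimated.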
166            0
167        }
168    }
169
    /// Converts [BulkPart] to [Mutation] for the fallback `write_bulk` implementation.
171    pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
172        let vectors = region_metadata
173            .schema
174            .column_schemas()
175            .iter()
176            .map(|col| match self.batch.column_by_name(&col.name) {
177                None => Ok(None),
178                Some(col) => Helper::try_into_vector(col).map(Some),
179            })
180            .collect::<datatypes::error::Result<Vec<_>>>()
181            .context(error::ComputeVectorSnafu)?;
182
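        // Builds rows column by column; columns missing from the batch are filled
        // with null values.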
183        let rows = (0..self.num_rows())
184            .map(|row_idx| {
185                let values = (0..self.batch.num_columns())
186                    .map(|col_idx| {
187                        if let Some(v) = &vectors[col_idx] {
188                            value_to_grpc_value(v.get(row_idx))
189                        } else {
190                            api::v1::Value { value_data: None }
191                        }
192                    })
193                    .collect::<Vec<_>>();
194                api::v1::Row { values }
195            })
196            .collect::<Vec<_>>();
197
198        let schema = region_metadata
199            .column_metadatas
200            .iter()
201            .map(|c| {
202                let data_type_wrapper =
203                    ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
204                Ok(api::v1::ColumnSchema {
205                    column_name: c.column_schema.name.clone(),
206                    datatype: data_type_wrapper.datatype() as i32,
207                    semantic_type: c.semantic_type as i32,
208                    ..Default::default()
209                })
210            })
211            .collect::<api::error::Result<Vec<_>>>()
212            .context(error::ConvertColumnDataTypeSnafu {
213                reason: "failed to convert region metadata to column schema",
214            })?;
215
216        let rows = api::v1::Rows { schema, rows };
217
218        Ok(Mutation {
219            op_type: OpType::Put as i32,
220            sequence: self.sequence,
221            rows: Some(rows),
222            write_hint: None,
223        })
224    }
225
226    pub fn timestamps(&self) -> &ArrayRef {
227        self.batch.column(self.timestamp_index)
228    }
229
230    pub fn num_rows(&self) -> usize {
231        self.batch.num_rows()
232    }
233}
234
235/// A collection of small unordered bulk parts.
236/// Used to batch small parts together before merging them into a sorted part.
237pub struct UnorderedPart {
238    /// Small bulk parts that haven't been sorted yet.
239    parts: Vec<BulkPart>,
240    /// Total number of rows across all parts.
241    total_rows: usize,
242    /// Minimum timestamp across all parts.
243    min_timestamp: i64,
244    /// Maximum timestamp across all parts.
245    max_timestamp: i64,
246    /// Maximum sequence number across all parts.
247    max_sequence: u64,
248    /// Row count threshold for accepting parts (default: 1024).
249    threshold: usize,
250    /// Row count threshold for compacting (default: 4096).
251    compact_threshold: usize,
252}
253
254impl Default for UnorderedPart {
255    fn default() -> Self {
256        Self::new()
257    }
258}
259
260impl UnorderedPart {
261    /// Creates a new empty UnorderedPart.
262    pub fn new() -> Self {
263        Self {
264            parts: Vec::new(),
265            total_rows: 0,
266            min_timestamp: i64::MAX,
267            max_timestamp: i64::MIN,
268            max_sequence: 0,
269            threshold: 1024,
270            compact_threshold: 4096,
271        }
272    }
273
274    /// Sets the threshold for accepting parts into unordered_part.
275    pub fn set_threshold(&mut self, threshold: usize) {
276        self.threshold = threshold;
277    }
278
279    /// Sets the threshold for compacting unordered_part.
280    pub fn set_compact_threshold(&mut self, compact_threshold: usize) {
281        self.compact_threshold = compact_threshold;
282    }
283
284    /// Returns the threshold for accepting parts.
285    pub fn threshold(&self) -> usize {
286        self.threshold
287    }
288
289    /// Returns the compact threshold.
290    pub fn compact_threshold(&self) -> usize {
291        self.compact_threshold
292    }
293
294    /// Returns true if this part should accept the given row count.
295    pub fn should_accept(&self, num_rows: usize) -> bool {
296        num_rows < self.threshold
297    }
298
299    /// Returns true if this part should be compacted.
300    pub fn should_compact(&self) -> bool {
301        self.total_rows >= self.compact_threshold
302    }
303
304    /// Adds a BulkPart to this unordered collection.
305    pub fn push(&mut self, part: BulkPart) {
306        self.total_rows += part.num_rows();
307        self.min_timestamp = self.min_timestamp.min(part.min_timestamp);
308        self.max_timestamp = self.max_timestamp.max(part.max_timestamp);
309        self.max_sequence = self.max_sequence.max(part.sequence);
310        self.parts.push(part);
311    }
312
313    /// Returns the total number of rows across all parts.
314    pub fn num_rows(&self) -> usize {
315        self.total_rows
316    }
317
318    /// Returns true if there are no parts.
319    pub fn is_empty(&self) -> bool {
320        self.parts.is_empty()
321    }
322
323    /// Returns the number of parts in this collection.
324    pub fn num_parts(&self) -> usize {
325        self.parts.len()
326    }
327
328    /// Concatenates and sorts all parts into a single RecordBatch.
329    /// Returns None if the collection is empty.
330    pub fn concat_and_sort(&self) -> Result<Option<RecordBatch>> {
331        if self.parts.is_empty() {
332            return Ok(None);
333        }
334
335        if self.parts.len() == 1 {
336            // If there's only one part, return its batch directly
337            return Ok(Some(self.parts[0].batch.clone()));
338        }
339
340        // Get the schema from the first part
341        let schema = self.parts[0].batch.schema();
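        // `concat_batches` below requires every part to share this schema.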
342
343        // Concatenate all record batches
344        let batches: Vec<RecordBatch> = self.parts.iter().map(|p| p.batch.clone()).collect();
345        let concatenated =
346            arrow::compute::concat_batches(&schema, &batches).context(ComputeArrowSnafu)?;
347
348        // Sort the concatenated batch
349        let sorted_batch = sort_primary_key_record_batch(&concatenated)?;
350
351        Ok(Some(sorted_batch))
352    }
353
354    /// Converts all parts into a single sorted BulkPart.
355    /// Returns None if the collection is empty.
356    pub fn to_bulk_part(&self) -> Result<Option<BulkPart>> {
357        let Some(sorted_batch) = self.concat_and_sort()? else {
358            return Ok(None);
359        };
360
361        let timestamp_index = self.parts[0].timestamp_index;
362
363        Ok(Some(BulkPart {
364            batch: sorted_batch,
365            max_timestamp: self.max_timestamp,
366            min_timestamp: self.min_timestamp,
367            sequence: self.max_sequence,
368            timestamp_index,
369            raw_data: None,
370        }))
371    }
372
373    /// Clears all parts from this collection.
374    pub fn clear(&mut self) {
375        self.parts.clear();
376        self.total_rows = 0;
377        self.min_timestamp = i64::MAX;
378        self.max_timestamp = i64::MIN;
379        self.max_sequence = 0;
380    }
381}
382
383/// More accurate estimation of the size of a record batch.
384pub(crate) fn record_batch_estimated_size(batch: &RecordBatch) -> usize {
385    batch
386        .columns()
387        .iter()
        // If the slice memory size cannot be determined, assume 0 here.
389        .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
390        .sum()
391}
392
393/// Primary key column builder for handling strings specially.
394enum PrimaryKeyColumnBuilder {
395    /// String dictionary builder for string types.
396    StringDict(StringDictionaryBuilder<UInt32Type>),
397    /// Generic mutable vector for other types.
398    Vector(Box<dyn MutableVector>),
399}
400
401impl PrimaryKeyColumnBuilder {
402    /// Appends a value to the builder.
403    fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
404        match self {
405            PrimaryKeyColumnBuilder::StringDict(builder) => {
406                if let Some(s) = value.try_into_string().context(DataTypeMismatchSnafu)? {
407                    // We know the value is a string.
408                    builder.append_value(s);
409                } else {
410                    builder.append_null();
411                }
412            }
413            PrimaryKeyColumnBuilder::Vector(builder) => {
414                builder.push_value_ref(&value);
415            }
416        }
417        Ok(())
418    }
419
420    /// Converts the builder to an ArrayRef.
421    fn into_arrow_array(self) -> ArrayRef {
422        match self {
423            PrimaryKeyColumnBuilder::StringDict(mut builder) => Arc::new(builder.finish()),
424            PrimaryKeyColumnBuilder::Vector(mut builder) => builder.to_vector().to_arrow_array(),
425        }
426    }
427}
428
/// Converter that converts [KeyValues] into a [BulkPart].
430pub struct BulkPartConverter {
431    /// Region metadata.
432    region_metadata: RegionMetadataRef,
433    /// Schema of the converted batch.
434    schema: SchemaRef,
435    /// Primary key codec for encoding keys
436    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
437    /// Buffer for encoding primary key.
438    key_buf: Vec<u8>,
439    /// Primary key array builder.
440    key_array_builder: PrimaryKeyArrayBuilder,
441    /// Builders for non-primary key columns.
442    value_builder: ValueBuilder,
443    /// Builders for individual primary key columns.
444    /// The order of builders is the same as the order of primary key columns in the region metadata.
445    primary_key_column_builders: Vec<PrimaryKeyColumnBuilder>,
446
447    /// Max timestamp value.
448    max_ts: i64,
449    /// Min timestamp value.
450    min_ts: i64,
451    /// Max sequence number.
452    max_sequence: SequenceNumber,
453}
454
455impl BulkPartConverter {
456    /// Creates a new converter.
457    ///
    /// If `store_primary_key_columns` is true and the encoding is not sparse, the converter
    /// additionally stores the primary key columns as individual arrays.
460    pub fn new(
461        region_metadata: &RegionMetadataRef,
462        schema: SchemaRef,
463        capacity: usize,
464        primary_key_codec: Arc<dyn PrimaryKeyCodec>,
465        store_primary_key_columns: bool,
466    ) -> Self {
467        debug_assert_eq!(
468            region_metadata.primary_key_encoding,
469            primary_key_codec.encoding()
470        );
471
472        let primary_key_column_builders = if store_primary_key_columns
473            && region_metadata.primary_key_encoding != PrimaryKeyEncoding::Sparse
474        {
475            new_primary_key_column_builders(region_metadata, capacity)
476        } else {
477            Vec::new()
478        };
479
480        Self {
481            region_metadata: region_metadata.clone(),
482            schema,
483            primary_key_codec,
484            key_buf: Vec::new(),
485            key_array_builder: PrimaryKeyArrayBuilder::new(),
486            value_builder: ValueBuilder::new(region_metadata, capacity),
487            primary_key_column_builders,
488            min_ts: i64::MAX,
489            max_ts: i64::MIN,
490            max_sequence: SequenceNumber::MIN,
491        }
492    }
493
494    /// Appends a [KeyValues] into the converter.
495    pub fn append_key_values(&mut self, key_values: &KeyValues) -> Result<()> {
496        for kv in key_values.iter() {
497            self.append_key_value(&kv)?;
498        }
499
500        Ok(())
501    }
502
503    /// Appends a [KeyValue] to builders.
504    ///
    /// If the primary key uses sparse encoding, callers must have already encoded the primary key in the [KeyValue].
506    fn append_key_value(&mut self, kv: &KeyValue) -> Result<()> {
507        // Handles primary key based on encoding type
508        if self.primary_key_codec.encoding() == PrimaryKeyEncoding::Sparse {
509            // For sparse encoding, the primary key is already encoded in the KeyValue
510            // Gets the first (and only) primary key value which contains the encoded key
511            let mut primary_keys = kv.primary_keys();
512            if let Some(encoded) = primary_keys
513                .next()
514                .context(ColumnNotFoundSnafu {
515                    column: PRIMARY_KEY_COLUMN_NAME,
516                })?
517                .try_into_binary()
518                .context(DataTypeMismatchSnafu)?
519            {
520                self.key_array_builder
521                    .append(encoded)
522                    .context(ComputeArrowSnafu)?;
523            } else {
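                // The encoded key is null, append an empty key instead.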
524                self.key_array_builder
525                    .append("")
526                    .context(ComputeArrowSnafu)?;
527            }
528        } else {
529            // For dense encoding, we need to encode the primary key columns
530            self.key_buf.clear();
531            self.primary_key_codec
532                .encode_key_value(kv, &mut self.key_buf)
533                .context(EncodeSnafu)?;
534            self.key_array_builder
535                .append(&self.key_buf)
536                .context(ComputeArrowSnafu)?;
537        };
538
539        // If storing primary key columns, append values to individual builders
540        if !self.primary_key_column_builders.is_empty() {
541            for (builder, pk_value) in self
542                .primary_key_column_builders
543                .iter_mut()
544                .zip(kv.primary_keys())
545            {
546                builder.push_value_ref(pk_value)?;
547            }
548        }
549
550        // Pushes other columns.
551        self.value_builder.push(
552            kv.timestamp(),
553            kv.sequence(),
554            kv.op_type() as u8,
555            kv.fields(),
556        );
557
558        // Updates statistics
559        // Safety: timestamp of kv must be both present and a valid timestamp value.
560        let ts = kv
561            .timestamp()
562            .try_into_timestamp()
563            .unwrap()
564            .unwrap()
565            .value();
566        self.min_ts = self.min_ts.min(ts);
567        self.max_ts = self.max_ts.max(ts);
568        self.max_sequence = self.max_sequence.max(kv.sequence());
569
570        Ok(())
571    }
572
573    /// Converts buffered content into a [BulkPart].
574    ///
575    /// It sorts the record batch by (primary key, timestamp, sequence desc).
576    pub fn convert(mut self) -> Result<BulkPart> {
577        let values = Values::from(self.value_builder);
578        let mut columns =
579            Vec::with_capacity(4 + values.fields.len() + self.primary_key_column_builders.len());
580
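        // Columns are assembled in the flat format order: primary key columns (when
        // stored), field columns, time index, encoded primary key, sequence and op type.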
581        // Build primary key column arrays if enabled.
582        for builder in self.primary_key_column_builders {
583            columns.push(builder.into_arrow_array());
584        }
585        // Then fields columns.
586        columns.extend(values.fields.iter().map(|field| field.to_arrow_array()));
587        // Time index.
588        let timestamp_index = columns.len();
589        columns.push(values.timestamp.to_arrow_array());
590        // Primary key.
591        let pk_array = self.key_array_builder.finish();
592        columns.push(Arc::new(pk_array));
593        // Sequence and op type.
594        columns.push(values.sequence.to_arrow_array());
595        columns.push(values.op_type.to_arrow_array());
596
597        let batch = RecordBatch::try_new(self.schema, columns).context(NewRecordBatchSnafu)?;
598        // Sorts the record batch.
599        let batch = sort_primary_key_record_batch(&batch)?;
600
601        Ok(BulkPart {
602            batch,
603            max_timestamp: self.max_ts,
604            min_timestamp: self.min_ts,
605            sequence: self.max_sequence,
606            timestamp_index,
607            raw_data: None,
608        })
609    }
610}
611
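/// Creates one builder per primary key column, using a string dictionary builder for
/// string columns and a generic mutable vector for other types.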
612fn new_primary_key_column_builders(
613    metadata: &RegionMetadata,
614    capacity: usize,
615) -> Vec<PrimaryKeyColumnBuilder> {
616    metadata
617        .primary_key_columns()
618        .map(|col| {
619            if col.column_schema.data_type.is_string() {
620                PrimaryKeyColumnBuilder::StringDict(StringDictionaryBuilder::with_capacity(
621                    capacity,
622                    INIT_DICT_VALUE_CAPACITY,
623                    capacity,
624                ))
625            } else {
626                PrimaryKeyColumnBuilder::Vector(
627                    col.column_schema.data_type.create_mutable_vector(capacity),
628                )
629            }
630        })
631        .collect()
632}
633
634/// Sorts the record batch with primary key format.
635fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
636    let total_columns = batch.num_columns();
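    // In the flat format, the last four columns are: time index, encoded primary key,
    // sequence and op type.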
637    let sort_columns = vec![
638        // Primary key column (ascending)
639        SortColumn {
640            values: batch.column(total_columns - 3).clone(),
641            options: Some(SortOptions {
642                descending: false,
643                nulls_first: true,
644            }),
645        },
646        // Time index column (ascending)
647        SortColumn {
648            values: batch.column(total_columns - 4).clone(),
649            options: Some(SortOptions {
650                descending: false,
651                nulls_first: true,
652            }),
653        },
654        // Sequence column (descending)
655        SortColumn {
656            values: batch.column(total_columns - 2).clone(),
657            options: Some(SortOptions {
658                descending: true,
659                nulls_first: true,
660            }),
661        },
662    ];
663
664    let indices = datatypes::arrow::compute::lexsort_to_indices(&sort_columns, None)
665        .context(ComputeArrowSnafu)?;
666
667    datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
668}
669
670#[derive(Debug, Clone)]
671pub struct EncodedBulkPart {
672    data: Bytes,
673    metadata: BulkPartMeta,
674}
675
676impl EncodedBulkPart {
677    pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
678        Self { data, metadata }
679    }
680
681    pub(crate) fn metadata(&self) -> &BulkPartMeta {
682        &self.metadata
683    }
684
685    /// Returns the size of the encoded data in bytes
686    pub(crate) fn size_bytes(&self) -> usize {
687        self.data.len()
688    }
689
690    /// Returns the encoded data.
691    pub(crate) fn data(&self) -> &Bytes {
692        &self.data
693    }
694
695    /// Converts this `EncodedBulkPart` to `SstInfo`.
696    ///
697    /// # Arguments
698    /// * `file_id` - The SST file ID to assign to this part
699    ///
700    /// # Returns
701    /// Returns a `SstInfo` instance with information derived from this bulk part's metadata
702    pub(crate) fn to_sst_info(&self, file_id: FileId) -> SstInfo {
703        let unit = self.metadata.region_metadata.time_index_type().unit();
704        SstInfo {
705            file_id,
706            time_range: (
707                Timestamp::new(self.metadata.min_timestamp, unit),
708                Timestamp::new(self.metadata.max_timestamp, unit),
709            ),
710            file_size: self.data.len() as u64,
711            num_rows: self.metadata.num_rows,
712            num_row_groups: self.metadata.parquet_metadata.num_row_groups() as u64,
713            file_metadata: Some(self.metadata.parquet_metadata.clone()),
714            index_metadata: IndexOutput::default(),
715            num_series: self.metadata.num_series,
716        }
717    }
718
719    pub(crate) fn read(
720        &self,
721        context: BulkIterContextRef,
722        sequence: Option<SequenceRange>,
723        mem_scan_metrics: Option<MemScanMetrics>,
724    ) -> Result<Option<BoxedRecordBatchIterator>> {
725        // Compute skip_fields for row group pruning using the same approach as compute_skip_fields in reader.rs.
726        let skip_fields_for_pruning =
727            Self::compute_skip_fields(context.pre_filter_mode(), &self.metadata.parquet_metadata);
728
        // Uses the predicate to find row groups to read.
730        let row_groups_to_read =
731            context.row_groups_to_read(&self.metadata.parquet_metadata, skip_fields_for_pruning);
732
733        if row_groups_to_read.is_empty() {
734            // All row groups are filtered.
735            return Ok(None);
736        }
737
738        let iter = EncodedBulkPartIter::try_new(
739            self,
740            context,
741            row_groups_to_read,
742            sequence,
743            mem_scan_metrics,
744        )?;
745        Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
746    }
747
748    /// Computes whether to skip field columns based on PreFilterMode.
749    fn compute_skip_fields(pre_filter_mode: PreFilterMode, parquet_meta: &ParquetMetaData) -> bool {
750        match pre_filter_mode {
751            PreFilterMode::All => false,
752            PreFilterMode::SkipFields => true,
753            PreFilterMode::SkipFieldsOnDelete => {
754                // Check if any row group contains delete op
755                (0..parquet_meta.num_row_groups()).any(|rg_idx| {
756                    row_group_contains_delete(parquet_meta, rg_idx, "memtable").unwrap_or(true)
757                })
758            }
759        }
760    }
761}
762
763#[derive(Debug, Clone)]
764pub struct BulkPartMeta {
765    /// Total rows in part.
766    pub num_rows: usize,
767    /// Max timestamp in part.
768    pub max_timestamp: i64,
769    /// Min timestamp in part.
770    pub min_timestamp: i64,
771    /// Part file metadata.
772    pub parquet_metadata: Arc<ParquetMetaData>,
773    /// Part region schema.
774    pub region_metadata: RegionMetadataRef,
775    /// Number of series.
776    pub num_series: u64,
777}
778
779/// Metrics for encoding a part.
780#[derive(Default, Debug)]
781pub struct BulkPartEncodeMetrics {
782    /// Cost of iterating over the data.
783    pub iter_cost: Duration,
784    /// Cost of writing the data.
785    pub write_cost: Duration,
786    /// Size of data before encoding.
787    pub raw_size: usize,
788    /// Size of data after encoding.
789    pub encoded_size: usize,
790    /// Number of rows in part.
791    pub num_rows: usize,
792}
793
794pub struct BulkPartEncoder {
795    metadata: RegionMetadataRef,
796    row_group_size: usize,
797    writer_props: Option<WriterProperties>,
798}
799
800impl BulkPartEncoder {
801    pub(crate) fn new(
802        metadata: RegionMetadataRef,
803        row_group_size: usize,
804    ) -> Result<BulkPartEncoder> {
805        // TODO(yingwen): Skip arrow schema if needed.
806        let json = metadata.to_json().context(InvalidMetadataSnafu)?;
807        let key_value_meta =
808            parquet::file::metadata::KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
809
810        // TODO(yingwen): Do we need compression?
811        let writer_props = Some(
812            WriterProperties::builder()
813                .set_key_value_metadata(Some(vec![key_value_meta]))
814                .set_write_batch_size(row_group_size)
815                .set_max_row_group_size(row_group_size)
816                .set_compression(Compression::ZSTD(ZstdLevel::default()))
817                .set_column_index_truncate_length(None)
818                .set_statistics_truncate_length(None)
819                .build(),
820        );
821
822        Ok(Self {
823            metadata,
824            row_group_size,
825            writer_props,
826        })
827    }
828}
829
830impl BulkPartEncoder {
    /// Encodes a [BoxedRecordBatchIterator] into an [EncodedBulkPart] with the given min/max timestamps.
832    pub fn encode_record_batch_iter(
833        &self,
834        iter: BoxedRecordBatchIterator,
835        arrow_schema: SchemaRef,
836        min_timestamp: i64,
837        max_timestamp: i64,
838        metrics: &mut BulkPartEncodeMetrics,
839    ) -> Result<Option<EncodedBulkPart>> {
840        let mut buf = Vec::with_capacity(4096);
841        let mut writer = ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
842            .context(EncodeMemtableSnafu)?;
843        let mut total_rows = 0;
844        let mut series_estimator = SeriesEstimator::default();
845
846        // Process each batch from the iterator
847        let mut iter_start = Instant::now();
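        // `iter_cost` accumulates time spent pulling batches from the iterator, while
        // `write_cost` accumulates time spent writing to and closing the parquet writer.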
848        for batch_result in iter {
849            metrics.iter_cost += iter_start.elapsed();
850            let batch = batch_result?;
851            if batch.num_rows() == 0 {
852                continue;
853            }
854
855            series_estimator.update_flat(&batch);
856            metrics.raw_size += record_batch_estimated_size(&batch);
857            let write_start = Instant::now();
858            writer.write(&batch).context(EncodeMemtableSnafu)?;
859            metrics.write_cost += write_start.elapsed();
860            total_rows += batch.num_rows();
861            iter_start = Instant::now();
862        }
863        metrics.iter_cost += iter_start.elapsed();
865
866        if total_rows == 0 {
867            return Ok(None);
868        }
869
870        let close_start = Instant::now();
871        let file_metadata = writer.close().context(EncodeMemtableSnafu)?;
872        metrics.write_cost += close_start.elapsed();
873        metrics.encoded_size += buf.len();
874        metrics.num_rows += total_rows;
875
876        let buf = Bytes::from(buf);
877        let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);
878        let num_series = series_estimator.finish();
879
880        Ok(Some(EncodedBulkPart {
881            data: buf,
882            metadata: BulkPartMeta {
883                num_rows: total_rows,
884                max_timestamp,
885                min_timestamp,
886                parquet_metadata,
887                region_metadata: self.metadata.clone(),
888                num_series,
889            },
890        }))
891    }
892
    /// Encodes a bulk part into an [EncodedBulkPart] and returns the encoded data.
894    fn encode_part(&self, part: &BulkPart) -> Result<Option<EncodedBulkPart>> {
895        if part.batch.num_rows() == 0 {
896            return Ok(None);
897        }
898
899        let mut buf = Vec::with_capacity(4096);
900        let arrow_schema = part.batch.schema();
901
902        let file_metadata = {
903            let mut writer =
904                ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
905                    .context(EncodeMemtableSnafu)?;
906            writer.write(&part.batch).context(EncodeMemtableSnafu)?;
907            writer.finish().context(EncodeMemtableSnafu)?
908        };
909
910        let buf = Bytes::from(buf);
911        let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);
912
913        Ok(Some(EncodedBulkPart {
914            data: buf,
915            metadata: BulkPartMeta {
916                num_rows: part.batch.num_rows(),
917                max_timestamp: part.max_timestamp,
918                min_timestamp: part.min_timestamp,
919                parquet_metadata,
920                region_metadata: self.metadata.clone(),
921                num_series: part.estimated_series_count() as u64,
922            },
923        }))
924    }
925}
926
/// Converts mutations to a sorted record batch, returning the batch together with its min and max timestamps.
928fn mutations_to_record_batch(
929    mutations: &[Mutation],
930    metadata: &RegionMetadataRef,
931    pk_encoder: &DensePrimaryKeyCodec,
932    dedup: bool,
933) -> Result<Option<(RecordBatch, i64, i64)>> {
934    let total_rows: usize = mutations
935        .iter()
936        .map(|m| m.rows.as_ref().map(|r| r.rows.len()).unwrap_or(0))
937        .sum();
938
939    if total_rows == 0 {
940        return Ok(None);
941    }
942
943    let mut pk_builder = BinaryBuilder::with_capacity(total_rows, 0);
944
945    let mut ts_vector: Box<dyn MutableVector> = metadata
946        .time_index_column()
947        .column_schema
948        .data_type
949        .create_mutable_vector(total_rows);
950    let mut sequence_builder = UInt64Builder::with_capacity(total_rows);
951    let mut op_type_builder = UInt8Builder::with_capacity(total_rows);
952
953    let mut field_builders: Vec<Box<dyn MutableVector>> = metadata
954        .field_columns()
955        .map(|f| f.column_schema.data_type.create_mutable_vector(total_rows))
956        .collect();
957
958    let mut pk_buffer = vec![];
959    for m in mutations {
960        let Some(key_values) = KeyValuesRef::new(metadata, m) else {
961            continue;
962        };
963
964        for row in key_values.iter() {
965            pk_buffer.clear();
966            pk_encoder
967                .encode_to_vec(row.primary_keys(), &mut pk_buffer)
968                .context(EncodeSnafu)?;
969            pk_builder.append_value(pk_buffer.as_bytes());
970            ts_vector.push_value_ref(&row.timestamp());
971            sequence_builder.append_value(row.sequence());
972            op_type_builder.append_value(row.op_type() as u8);
973            for (builder, field) in field_builders.iter_mut().zip(row.fields()) {
974                builder.push_value_ref(&field);
975            }
976        }
977    }
978
979    let arrow_schema = to_sst_arrow_schema(metadata);
980    // safety: timestamp column must be valid, and values must not be None.
981    let timestamp_unit = metadata
982        .time_index_column()
983        .column_schema
984        .data_type
985        .as_timestamp()
986        .unwrap()
987        .unit();
988    let sorter = ArraysSorter {
989        encoded_primary_keys: pk_builder.finish(),
990        timestamp_unit,
991        timestamp: ts_vector.to_vector().to_arrow_array(),
992        sequence: sequence_builder.finish(),
993        op_type: op_type_builder.finish(),
994        fields: field_builders
995            .iter_mut()
996            .map(|f| f.to_vector().to_arrow_array()),
997        dedup,
998        arrow_schema,
999    };
1000
1001    sorter.sort().map(Some)
1002}
1003
1004struct ArraysSorter<I> {
1005    encoded_primary_keys: BinaryArray,
1006    timestamp_unit: TimeUnit,
1007    timestamp: ArrayRef,
1008    sequence: UInt64Array,
1009    op_type: UInt8Array,
1010    fields: I,
1011    dedup: bool,
1012    arrow_schema: SchemaRef,
1013}
1014
1015impl<I> ArraysSorter<I>
1016where
1017    I: Iterator<Item = ArrayRef>,
1018{
    /// Sorts the arrays and converts them into a record batch, returning the min and max timestamps.
1020    fn sort(self) -> Result<(RecordBatch, i64, i64)> {
1021        debug_assert!(!self.timestamp.is_empty());
1022        debug_assert!(self.timestamp.len() == self.sequence.len());
1023        debug_assert!(self.timestamp.len() == self.op_type.len());
1024        debug_assert!(self.timestamp.len() == self.encoded_primary_keys.len());
1025
1026        let timestamp_iter = timestamp_array_to_iter(self.timestamp_unit, &self.timestamp);
1027        let (mut min_timestamp, mut max_timestamp) = (i64::MAX, i64::MIN);
1028        let mut to_sort = self
1029            .encoded_primary_keys
1030            .iter()
1031            .zip(timestamp_iter)
1032            .zip(self.sequence.iter())
1033            .map(|((pk, timestamp), sequence)| {
1034                max_timestamp = max_timestamp.max(*timestamp);
1035                min_timestamp = min_timestamp.min(*timestamp);
1036                (pk, timestamp, sequence)
1037            })
1038            .enumerate()
1039            .collect::<Vec<_>>();
1040
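        // Sorts row indices by (primary key asc, timestamp asc, sequence desc).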
1041        to_sort.sort_unstable_by(|(_, (l_pk, l_ts, l_seq)), (_, (r_pk, r_ts, r_seq))| {
1042            l_pk.cmp(r_pk)
1043                .then(l_ts.cmp(r_ts))
1044                .then(l_seq.cmp(r_seq).reverse())
1045        });
1046
1047        if self.dedup {
            // Dedups by primary key and timestamp, keeping the row with the largest sequence.
1049            to_sort.dedup_by(|(_, (l_pk, l_ts, _)), (_, (r_pk, r_ts, _))| {
1050                l_pk == r_pk && l_ts == r_ts
1051            });
1052        }
1053
1054        let indices = UInt32Array::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
1055
1056        let pk_dictionary = Arc::new(binary_array_to_dictionary(
1057            // safety: pk must be BinaryArray
1058            arrow::compute::take(
1059                &self.encoded_primary_keys,
1060                &indices,
1061                Some(TakeOptions {
1062                    check_bounds: false,
1063                }),
1064            )
1065            .context(ComputeArrowSnafu)?
1066            .as_any()
1067            .downcast_ref::<BinaryArray>()
1068            .unwrap(),
1069        )?) as ArrayRef;
1070
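        // Assembles columns in the SST order: fields, time index, primary key dictionary,
        // sequence and op type.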
1071        let mut arrays = Vec::with_capacity(self.arrow_schema.fields.len());
1072        for arr in self.fields {
1073            arrays.push(
1074                arrow::compute::take(
1075                    &arr,
1076                    &indices,
1077                    Some(TakeOptions {
1078                        check_bounds: false,
1079                    }),
1080                )
1081                .context(ComputeArrowSnafu)?,
1082            );
1083        }
1084
1085        let timestamp = arrow::compute::take(
1086            &self.timestamp,
1087            &indices,
1088            Some(TakeOptions {
1089                check_bounds: false,
1090            }),
1091        )
1092        .context(ComputeArrowSnafu)?;
1093
1094        arrays.push(timestamp);
1095        arrays.push(pk_dictionary);
1096        arrays.push(
1097            arrow::compute::take(
1098                &self.sequence,
1099                &indices,
1100                Some(TakeOptions {
1101                    check_bounds: false,
1102                }),
1103            )
1104            .context(ComputeArrowSnafu)?,
1105        );
1106
1107        arrays.push(
1108            arrow::compute::take(
1109                &self.op_type,
1110                &indices,
1111                Some(TakeOptions {
1112                    check_bounds: false,
1113                }),
1114            )
1115            .context(ComputeArrowSnafu)?,
1116        );
1117
1118        let batch = RecordBatch::try_new(self.arrow_schema, arrays).context(NewRecordBatchSnafu)?;
1119        Ok((batch, min_timestamp, max_timestamp))
1120    }
1121}
1122
/// Converts a timestamp array to an iterator of i64 values.
1124fn timestamp_array_to_iter(
1125    timestamp_unit: TimeUnit,
1126    timestamp: &ArrayRef,
1127) -> impl Iterator<Item = &i64> {
1128    match timestamp_unit {
1129        // safety: timestamp column must be valid.
1130        TimeUnit::Second => timestamp
1131            .as_any()
1132            .downcast_ref::<TimestampSecondArray>()
1133            .unwrap()
1134            .values()
1135            .iter(),
1136        TimeUnit::Millisecond => timestamp
1137            .as_any()
1138            .downcast_ref::<TimestampMillisecondArray>()
1139            .unwrap()
1140            .values()
1141            .iter(),
1142        TimeUnit::Microsecond => timestamp
1143            .as_any()
1144            .downcast_ref::<TimestampMicrosecondArray>()
1145            .unwrap()
1146            .values()
1147            .iter(),
1148        TimeUnit::Nanosecond => timestamp
1149            .as_any()
1150            .downcast_ref::<TimestampNanosecondArray>()
1151            .unwrap()
1152            .values()
1153            .iter(),
1154    }
1155}
1156
1157/// Converts a **sorted** [BinaryArray] to [DictionaryArray].
1158fn binary_array_to_dictionary(input: &BinaryArray) -> Result<PrimaryKeyArray> {
1159    if input.is_empty() {
1160        return Ok(DictionaryArray::new(
1161            UInt32Array::from(Vec::<u32>::new()),
1162            Arc::new(BinaryArray::from_vec(vec![])) as ArrayRef,
1163        ));
1164    }
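    // The input is sorted, so equal keys are adjacent: scan once and append a new
    // dictionary value only when the bytes change.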
    let mut keys = Vec::with_capacity(16);
    let mut values = BinaryBuilder::new();
    // Index in `input` where the current run of equal keys starts.
    let mut run_start: usize = 0;
    // Dictionary key assigned to the current run.
    let mut key: u32 = 0;
    keys.push(key);
    values.append_value(input.value(0));

    for (idx, current_bytes) in input.iter().enumerate().skip(1) {
        // Safety: the encoded primary key must be present.
        let current_bytes = current_bytes.unwrap();
        if current_bytes != input.value(run_start) {
            // A new key starts here; append it as a new dictionary value.
            values.append_value(current_bytes);
            key += 1;
            run_start = idx;
        }
        keys.push(key);
    }
1181
1182    Ok(DictionaryArray::new(
1183        UInt32Array::from(keys),
1184        Arc::new(values.finish()) as ArrayRef,
1185    ))
1186}
1187
1188#[cfg(test)]
1189mod tests {
1190    use std::collections::VecDeque;
1191
1192    use api::v1::{Row, WriteHint};
1193    use datafusion_common::ScalarValue;
1194    use datatypes::arrow::array::Float64Array;
1195    use datatypes::prelude::{ConcreteDataType, ScalarVector, Value};
1196    use datatypes::vectors::{Float64Vector, TimestampMillisecondVector};
1197    use store_api::storage::consts::ReservedColumnId;
1198
1199    use super::*;
1200    use crate::memtable::bulk::context::BulkIterContext;
1201    use crate::sst::parquet::format::{PrimaryKeyReadFormat, ReadFormat};
1202    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1203    use crate::test_util::memtable_util::{
1204        build_key_values_with_ts_seq_values, metadata_for_test, region_metadata_to_row_schema,
1205    };
1206
1207    fn check_binary_array_to_dictionary(
1208        input: &[&[u8]],
1209        expected_keys: &[u32],
1210        expected_values: &[&[u8]],
1211    ) {
1212        let input = BinaryArray::from_iter_values(input.iter());
1213        let array = binary_array_to_dictionary(&input).unwrap();
1214        assert_eq!(
1215            &expected_keys,
1216            &array.keys().iter().map(|v| v.unwrap()).collect::<Vec<_>>()
1217        );
1218        assert_eq!(
1219            expected_values,
1220            &array
1221                .values()
1222                .as_any()
1223                .downcast_ref::<BinaryArray>()
1224                .unwrap()
1225                .iter()
1226                .map(|v| v.unwrap())
1227                .collect::<Vec<_>>()
1228        );
1229    }
1230
1231    #[test]
1232    fn test_binary_array_to_dictionary() {
1233        check_binary_array_to_dictionary(&[], &[], &[]);
1234
1235        check_binary_array_to_dictionary(&["a".as_bytes()], &[0], &["a".as_bytes()]);
1236
1237        check_binary_array_to_dictionary(
1238            &["a".as_bytes(), "a".as_bytes()],
1239            &[0, 0],
1240            &["a".as_bytes()],
1241        );
1242
1243        check_binary_array_to_dictionary(
1244            &["a".as_bytes(), "a".as_bytes(), "b".as_bytes()],
1245            &[0, 0, 1],
1246            &["a".as_bytes(), "b".as_bytes()],
1247        );
1248
1249        check_binary_array_to_dictionary(
1250            &[
1251                "a".as_bytes(),
1252                "a".as_bytes(),
1253                "b".as_bytes(),
1254                "c".as_bytes(),
1255            ],
1256            &[0, 0, 1, 2],
1257            &["a".as_bytes(), "b".as_bytes(), "c".as_bytes()],
1258        );
1259    }
1260
1261    struct MutationInput<'a> {
1262        k0: &'a str,
1263        k1: u32,
1264        timestamps: &'a [i64],
1265        v1: &'a [Option<f64>],
1266        sequence: u64,
1267    }
1268
1269    #[derive(Debug, PartialOrd, PartialEq)]
1270    struct BatchOutput<'a> {
1271        pk_values: &'a [Value],
1272        timestamps: &'a [i64],
1273        v1: &'a [Option<f64>],
1274    }
1275
1276    fn check_mutations_to_record_batches(
1277        input: &[MutationInput],
1278        expected: &[BatchOutput],
1279        expected_timestamp: (i64, i64),
1280        dedup: bool,
1281    ) {
1282        let metadata = metadata_for_test();
1283        let mutations = input
1284            .iter()
1285            .map(|m| {
1286                build_key_values_with_ts_seq_values(
1287                    &metadata,
1288                    m.k0.to_string(),
1289                    m.k1,
1290                    m.timestamps.iter().copied(),
1291                    m.v1.iter().copied(),
1292                    m.sequence,
1293                )
1294                .mutation
1295            })
1296            .collect::<Vec<_>>();
1297        let total_rows: usize = mutations
1298            .iter()
1299            .flat_map(|m| m.rows.iter())
1300            .map(|r| r.rows.len())
1301            .sum();
1302
1303        let pk_encoder = DensePrimaryKeyCodec::new(&metadata);
1304
1305        let (batch, _, _) = mutations_to_record_batch(&mutations, &metadata, &pk_encoder, dedup)
1306            .unwrap()
1307            .unwrap();
1308        let read_format = PrimaryKeyReadFormat::new_with_all_columns(metadata.clone());
1309        let mut batches = VecDeque::new();
1310        read_format
1311            .convert_record_batch(&batch, None, &mut batches)
1312            .unwrap();
1313        if !dedup {
1314            assert_eq!(
1315                total_rows,
1316                batches.iter().map(|b| { b.num_rows() }).sum::<usize>()
1317            );
1318        }
1319        let batch_values = batches
1320            .into_iter()
1321            .map(|b| {
1322                let pk_values = pk_encoder.decode(b.primary_key()).unwrap().into_dense();
1323                let timestamps = b
1324                    .timestamps()
1325                    .as_any()
1326                    .downcast_ref::<TimestampMillisecondVector>()
1327                    .unwrap()
1328                    .iter_data()
1329                    .map(|v| v.unwrap().0.value())
1330                    .collect::<Vec<_>>();
1331                let float_values = b.fields()[1]
1332                    .data
1333                    .as_any()
1334                    .downcast_ref::<Float64Vector>()
1335                    .unwrap()
1336                    .iter_data()
1337                    .collect::<Vec<_>>();
1338
1339                (pk_values, timestamps, float_values)
1340            })
1341            .collect::<Vec<_>>();
1342        assert_eq!(expected.len(), batch_values.len());
1343
1344        for idx in 0..expected.len() {
1345            assert_eq!(expected[idx].pk_values, &batch_values[idx].0);
1346            assert_eq!(expected[idx].timestamps, &batch_values[idx].1);
1347            assert_eq!(expected[idx].v1, &batch_values[idx].2);
1348        }
1349    }
1350
1351    #[test]
1352    fn test_mutations_to_record_batch() {
1353        check_mutations_to_record_batches(
1354            &[MutationInput {
1355                k0: "a",
1356                k1: 0,
1357                timestamps: &[0],
1358                v1: &[Some(0.1)],
1359                sequence: 0,
1360            }],
1361            &[BatchOutput {
1362                pk_values: &[Value::String("a".into()), Value::UInt32(0)],
1363                timestamps: &[0],
1364                v1: &[Some(0.1)],
1365            }],
1366            (0, 0),
1367            true,
1368        );
1369
1370        check_mutations_to_record_batches(
1371            &[
1372                MutationInput {
1373                    k0: "a",
1374                    k1: 0,
1375                    timestamps: &[0],
1376                    v1: &[Some(0.1)],
1377                    sequence: 0,
1378                },
1379                MutationInput {
1380                    k0: "b",
1381                    k1: 0,
1382                    timestamps: &[0],
1383                    v1: &[Some(0.0)],
1384                    sequence: 0,
1385                },
1386                MutationInput {
1387                    k0: "a",
1388                    k1: 0,
1389                    timestamps: &[1],
1390                    v1: &[Some(0.2)],
1391                    sequence: 1,
1392                },
1393                MutationInput {
1394                    k0: "a",
1395                    k1: 1,
1396                    timestamps: &[1],
1397                    v1: &[Some(0.3)],
1398                    sequence: 2,
1399                },
1400            ],
1401            &[
1402                BatchOutput {
1403                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
1404                    timestamps: &[0, 1],
1405                    v1: &[Some(0.1), Some(0.2)],
1406                },
1407                BatchOutput {
1408                    pk_values: &[Value::String("a".into()), Value::UInt32(1)],
1409                    timestamps: &[1],
1410                    v1: &[Some(0.3)],
1411                },
1412                BatchOutput {
1413                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
1414                    timestamps: &[0],
1415                    v1: &[Some(0.0)],
1416                },
1417            ],
1418            (0, 1),
1419            true,
1420        );
1421
1422        check_mutations_to_record_batches(
1423            &[
1424                MutationInput {
1425                    k0: "a",
1426                    k1: 0,
1427                    timestamps: &[0],
1428                    v1: &[Some(0.1)],
1429                    sequence: 0,
1430                },
1431                MutationInput {
1432                    k0: "b",
1433                    k1: 0,
1434                    timestamps: &[0],
1435                    v1: &[Some(0.0)],
1436                    sequence: 0,
1437                },
1438                MutationInput {
1439                    k0: "a",
1440                    k1: 0,
1441                    timestamps: &[0],
1442                    v1: &[Some(0.2)],
1443                    sequence: 1,
1444                },
1445            ],
1446            &[
1447                BatchOutput {
1448                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
1449                    timestamps: &[0],
1450                    v1: &[Some(0.2)],
1451                },
1452                BatchOutput {
1453                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
1454                    timestamps: &[0],
1455                    v1: &[Some(0.0)],
1456                },
1457            ],
1458            (0, 0),
1459            true,
1460        );
1461        check_mutations_to_record_batches(
1462            &[
1463                MutationInput {
1464                    k0: "a",
1465                    k1: 0,
1466                    timestamps: &[0],
1467                    v1: &[Some(0.1)],
1468                    sequence: 0,
1469                },
1470                MutationInput {
1471                    k0: "b",
1472                    k1: 0,
1473                    timestamps: &[0],
1474                    v1: &[Some(0.0)],
1475                    sequence: 0,
1476                },
1477                MutationInput {
1478                    k0: "a",
1479                    k1: 0,
1480                    timestamps: &[0],
1481                    v1: &[Some(0.2)],
1482                    sequence: 1,
1483                },
1484            ],
1485            &[
1486                BatchOutput {
1487                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
1488                    timestamps: &[0, 0],
1489                    v1: &[Some(0.2), Some(0.1)],
1490                },
1491                BatchOutput {
1492                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
1493                    timestamps: &[0],
1494                    v1: &[Some(0.0)],
1495                },
1496            ],
1497            (0, 0),
1498            false,
1499        );
1500    }
1501
1502    fn encode(input: &[MutationInput]) -> EncodedBulkPart {
1503        let metadata = metadata_for_test();
1504        let kvs = input
1505            .iter()
1506            .map(|m| {
1507                build_key_values_with_ts_seq_values(
1508                    &metadata,
1509                    m.k0.to_string(),
1510                    m.k1,
1511                    m.timestamps.iter().copied(),
1512                    m.v1.iter().copied(),
1513                    m.sequence,
1514                )
1515            })
1516            .collect::<Vec<_>>();
1517        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1518        let primary_key_codec = build_primary_key_codec(&metadata);
1519        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1520        for kv in kvs {
1521            converter.append_key_values(&kv).unwrap();
1522        }
1523        let part = converter.convert().unwrap();
1524        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1525        encoder.encode_part(&part).unwrap().unwrap()
1526    }
1527
1528    #[test]
1529    fn test_write_and_read_part_projection() {
1530        let part = encode(&[
1531            MutationInput {
1532                k0: "a",
1533                k1: 0,
1534                timestamps: &[1],
1535                v1: &[Some(0.1)],
1536                sequence: 0,
1537            },
1538            MutationInput {
1539                k0: "b",
1540                k1: 0,
1541                timestamps: &[1],
1542                v1: &[Some(0.0)],
1543                sequence: 0,
1544            },
1545            MutationInput {
1546                k0: "a",
1547                k1: 0,
1548                timestamps: &[2],
1549                v1: &[Some(0.2)],
1550                sequence: 1,
1551            },
1552        ]);
1553
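            // Project a single field column; the values read below match the v1 inputs.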
1554        let projection = &[4u32];
1555        let reader = part
1556            .read(
1557                Arc::new(
1558                    BulkIterContext::new(
1559                        part.metadata.region_metadata.clone(),
1560                        Some(projection.as_slice()),
1561                        None,
1562                        false,
1563                    )
1564                    .unwrap(),
1565                ),
1566                None,
1567                None,
1568            )
1569            .unwrap()
1570            .expect("expect at least one row group");
1571
1572        let mut total_rows_read = 0;
1573        let mut field: Vec<f64> = vec![];
1574        for res in reader {
1575            let batch = res.unwrap();
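                // Each projected batch carries 5 columns; the selected field values are read from column 0.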
1576            assert_eq!(5, batch.num_columns());
1577            field.extend_from_slice(
1578                batch
1579                    .column(0)
1580                    .as_any()
1581                    .downcast_ref::<Float64Array>()
1582                    .unwrap()
1583                    .values(),
1584            );
1585            total_rows_read += batch.num_rows();
1586        }
1587        assert_eq!(3, total_rows_read);
1588        assert_eq!(vec![0.1, 0.2, 0.0], field);
1589    }
1590
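        // Builds an encoded part where each (k0, k1, (start, end), sequence) tuple expands to
        // rows at timestamps start..end with null v1 values.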
1591    fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
1592        let metadata = metadata_for_test();
1593        let kvs = key_values
1594            .into_iter()
1595            .map(|(k0, k1, (start, end), sequence)| {
1596                let ts = start..end;
1597                let v1 = (start..end).map(|_| None);
1598                build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
1599            })
1600            .collect::<Vec<_>>();
1601        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1602        let primary_key_codec = build_primary_key_codec(&metadata);
1603        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1604        for kv in kvs {
1605            converter.append_key_values(&kv).unwrap();
1606        }
1607        let part = converter.convert().unwrap();
1608        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1609        encoder.encode_part(&part).unwrap().unwrap()
1610    }
1611
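        // Reads the part under the given predicate and asserts how many rows survive row group pruning.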
1612    fn check_prune_row_group(
1613        part: &EncodedBulkPart,
1614        predicate: Option<Predicate>,
1615        expected_rows: usize,
1616    ) {
1617        let context = Arc::new(
1618            BulkIterContext::new(
1619                part.metadata.region_metadata.clone(),
1620                None,
1621                predicate,
1622                false,
1623            )
1624            .unwrap(),
1625        );
1626        let reader = part
1627            .read(context, None, None)
1628            .unwrap()
1629            .expect("expect at least one row group");
1630        let mut total_rows_read = 0;
1631        for res in reader {
1632            let batch = res.unwrap();
1633            total_rows_read += batch.num_rows();
1634        }
1635        // Verify the number of rows that survive pruning matches the expectation.
1636        assert_eq!(expected_rows, total_rows_read);
1637    }
1638
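        // Prunes row groups by timestamp, tag, and field predicates over data spanning timestamps 0..210.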
1639    #[test]
1640    fn test_prune_row_groups() {
1641        let part = prepare(vec![
1642            ("a", 0, (0, 40), 1),
1643            ("a", 1, (0, 60), 1),
1644            ("b", 0, (0, 100), 2),
1645            ("b", 1, (100, 180), 3),
1646            ("b", 1, (180, 210), 4),
1647        ]);
1648
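            // ts = 300 lies outside the written range (0..210), so every row group is pruned and
            // read() yields None.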
1649        let context = Arc::new(
1650            BulkIterContext::new(
1651                part.metadata.region_metadata.clone(),
1652                None,
1653                Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
1654                    datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
1655                )])),
1656                false,
1657            )
1658            .unwrap(),
1659        );
1660        assert!(part.read(context, None, None).unwrap().is_none());
1661
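            // Without a predicate all rows are read: 40 + 60 + 100 + 80 + 30 = 310.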
1662        check_prune_row_group(&part, None, 310);
1663
1664        check_prune_row_group(
1665            &part,
1666            Some(Predicate::new(vec![
1667                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1668                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
1669            ])),
1670            40,
1671        );
1672
1673        check_prune_row_group(
1674            &part,
1675            Some(Predicate::new(vec![
1676                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1677                datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
1678            ])),
1679            60,
1680        );
1681
1682        check_prune_row_group(
1683            &part,
1684            Some(Predicate::new(vec![
1685                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1686            ])),
1687            100,
1688        );
1689
1690        check_prune_row_group(
1691            &part,
1692            Some(Predicate::new(vec![
1693                datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
1694                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
1695            ])),
1696            100,
1697        );
1698
1699        // Predicates on field columns can do precise filtering.
1700        check_prune_row_group(
1701            &part,
1702            Some(Predicate::new(vec![
1703                datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64)),
1704            ])),
1705            1,
1706        );
1707    }
1708
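        // Appends two key-value batches and verifies the row count, timestamp bounds, sequence,
        // and that the raw primary key columns (k0, k1) are kept in the flat schema.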
1709    #[test]
1710    fn test_bulk_part_converter_append_and_convert() {
1711        let metadata = metadata_for_test();
1712        let capacity = 100;
1713        let primary_key_codec = build_primary_key_codec(&metadata);
1714        let schema = to_flat_sst_arrow_schema(
1715            &metadata,
1716            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1717        );
1718
1719        let mut converter =
1720            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1721
1722        let key_values1 = build_key_values_with_ts_seq_values(
1723            &metadata,
1724            "key1".to_string(),
1725            1u32,
1726            vec![1000, 2000].into_iter(),
1727            vec![Some(1.0), Some(2.0)].into_iter(),
1728            1,
1729        );
1730
1731        let key_values2 = build_key_values_with_ts_seq_values(
1732            &metadata,
1733            "key2".to_string(),
1734            2u32,
1735            vec![1500].into_iter(),
1736            vec![Some(3.0)].into_iter(),
1737            2,
1738        );
1739
1740        converter.append_key_values(&key_values1).unwrap();
1741        converter.append_key_values(&key_values2).unwrap();
1742
1743        let bulk_part = converter.convert().unwrap();
1744
1745        assert_eq!(bulk_part.num_rows(), 3);
1746        assert_eq!(bulk_part.min_timestamp, 1000);
1747        assert_eq!(bulk_part.max_timestamp, 2000);
1748        assert_eq!(bulk_part.sequence, 2);
1749        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
1750
1751        // Validate primary key columns are stored
1752        // Schema should include primary key columns k0 and k1 at the beginning
1753        let schema = bulk_part.batch.schema();
1754        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1755        assert_eq!(
1756            field_names,
1757            vec![
1758                "k0",
1759                "k1",
1760                "v0",
1761                "v1",
1762                "ts",
1763                "__primary_key",
1764                "__sequence",
1765                "__op_type"
1766            ]
1767        );
1768    }
1769
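        // Rows appended out of order come back sorted after convert(); here the primary key,
        // timestamp, and sequence orders coincide.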
1770    #[test]
1771    fn test_bulk_part_converter_sorting() {
1772        let metadata = metadata_for_test();
1773        let capacity = 100;
1774        let primary_key_codec = build_primary_key_codec(&metadata);
1775        let schema = to_flat_sst_arrow_schema(
1776            &metadata,
1777            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1778        );
1779
1780        let mut converter =
1781            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1782
1783        let key_values1 = build_key_values_with_ts_seq_values(
1784            &metadata,
1785            "z_key".to_string(),
1786            3u32,
1787            vec![3000].into_iter(),
1788            vec![Some(3.0)].into_iter(),
1789            3,
1790        );
1791
1792        let key_values2 = build_key_values_with_ts_seq_values(
1793            &metadata,
1794            "a_key".to_string(),
1795            1u32,
1796            vec![1000].into_iter(),
1797            vec![Some(1.0)].into_iter(),
1798            1,
1799        );
1800
1801        let key_values3 = build_key_values_with_ts_seq_values(
1802            &metadata,
1803            "m_key".to_string(),
1804            2u32,
1805            vec![2000].into_iter(),
1806            vec![Some(2.0)].into_iter(),
1807            2,
1808        );
1809
1810        converter.append_key_values(&key_values1).unwrap();
1811        converter.append_key_values(&key_values2).unwrap();
1812        converter.append_key_values(&key_values3).unwrap();
1813
1814        let bulk_part = converter.convert().unwrap();
1815
1816        assert_eq!(bulk_part.num_rows(), 3);
1817
1818        let ts_column = bulk_part.batch.column(bulk_part.timestamp_index);
1819        let seq_column = bulk_part.batch.column(bulk_part.batch.num_columns() - 2);
1820
1821        let ts_array = ts_column
1822            .as_any()
1823            .downcast_ref::<TimestampMillisecondArray>()
1824            .unwrap();
1825        let seq_array = seq_column.as_any().downcast_ref::<UInt64Array>().unwrap();
1826
1827        assert_eq!(ts_array.values(), &[1000, 2000, 3000]);
1828        assert_eq!(seq_array.values(), &[1, 2, 3]);
1829
1830        // Validate primary key columns are stored
1831        let schema = bulk_part.batch.schema();
1832        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1833        assert_eq!(
1834            field_names,
1835            vec![
1836                "k0",
1837                "k1",
1838                "v0",
1839                "v1",
1840                "ts",
1841                "__primary_key",
1842                "__sequence",
1843                "__op_type"
1844            ]
1845        );
1846    }
1847
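        // Converting with no appended rows yields an empty batch with sentinel timestamp bounds
        // but the full flat schema.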
1848    #[test]
1849    fn test_bulk_part_converter_empty() {
1850        let metadata = metadata_for_test();
1851        let capacity = 10;
1852        let primary_key_codec = build_primary_key_codec(&metadata);
1853        let schema = to_flat_sst_arrow_schema(
1854            &metadata,
1855            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1856        );
1857
1858        let converter =
1859            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1860
1861        let bulk_part = converter.convert().unwrap();
1862
1863        assert_eq!(bulk_part.num_rows(), 0);
1864        assert_eq!(bulk_part.min_timestamp, i64::MAX);
1865        assert_eq!(bulk_part.max_timestamp, i64::MIN);
1866        assert_eq!(bulk_part.sequence, SequenceNumber::MIN);
1867
1868        // Validate primary key columns are present in schema even for empty batch
1869        let schema = bulk_part.batch.schema();
1870        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1871        assert_eq!(
1872            field_names,
1873            vec![
1874                "k0",
1875                "k1",
1876                "v0",
1877                "v1",
1878                "ts",
1879                "__primary_key",
1880                "__sequence",
1881                "__op_type"
1882            ]
1883        );
1884    }
1885
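        // With raw primary key columns disabled, the converted batch only carries the encoded
        // __primary_key column instead of k0/k1.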
1886    #[test]
1887    fn test_bulk_part_converter_without_primary_key_columns() {
1888        let metadata = metadata_for_test();
1889        let primary_key_codec = build_primary_key_codec(&metadata);
1890        let schema = to_flat_sst_arrow_schema(
1891            &metadata,
1892            &FlatSchemaOptions {
1893                raw_pk_columns: false,
1894                string_pk_use_dict: true,
1895            },
1896        );
1897
1898        let capacity = 100;
1899        let mut converter =
1900            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, false);
1901
1902        let key_values1 = build_key_values_with_ts_seq_values(
1903            &metadata,
1904            "key1".to_string(),
1905            1u32,
1906            vec![1000, 2000].into_iter(),
1907            vec![Some(1.0), Some(2.0)].into_iter(),
1908            1,
1909        );
1910
1911        let key_values2 = build_key_values_with_ts_seq_values(
1912            &metadata,
1913            "key2".to_string(),
1914            2u32,
1915            vec![1500].into_iter(),
1916            vec![Some(3.0)].into_iter(),
1917            2,
1918        );
1919
1920        converter.append_key_values(&key_values1).unwrap();
1921        converter.append_key_values(&key_values2).unwrap();
1922
1923        let bulk_part = converter.convert().unwrap();
1924
1925        assert_eq!(bulk_part.num_rows(), 3);
1926        assert_eq!(bulk_part.min_timestamp, 1000);
1927        assert_eq!(bulk_part.max_timestamp, 2000);
1928        assert_eq!(bulk_part.sequence, 2);
1929        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
1930
1931        // Validate primary key columns are NOT stored individually
1932        let schema = bulk_part.batch.schema();
1933        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1934        assert_eq!(
1935            field_names,
1936            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
1937        );
1938    }
1939
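        // Builds a mutation whose rows carry the sparse-encoded primary key as a binary value,
        // with a WriteHint marking sparse encoding.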
1940    #[allow(clippy::too_many_arguments)]
1941    fn build_key_values_with_sparse_encoding(
1942        metadata: &RegionMetadataRef,
1943        primary_key_codec: &Arc<dyn PrimaryKeyCodec>,
1944        table_id: u32,
1945        tsid: u64,
1946        k0: String,
1947        k1: String,
1948        timestamps: impl Iterator<Item = i64>,
1949        values: impl Iterator<Item = Option<f64>>,
1950        sequence: SequenceNumber,
1951    ) -> KeyValues {
1952        // Encode the primary key (__table_id, __tsid, k0, k1) into binary format using the sparse codec
1953        let pk_values = vec![
1954            (ReservedColumnId::table_id(), Value::UInt32(table_id)),
1955            (ReservedColumnId::tsid(), Value::UInt64(tsid)),
1956            (0, Value::String(k0.clone().into())),
1957            (1, Value::String(k1.clone().into())),
1958        ];
1959        let mut encoded_key = Vec::new();
1960        primary_key_codec
1961            .encode_values(&pk_values, &mut encoded_key)
1962            .unwrap();
1963        assert!(!encoded_key.is_empty());
1964
1965        // Create schema for sparse encoding: __primary_key, ts, v0, v1
1966        let column_schema = vec![
1967            api::v1::ColumnSchema {
1968                column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
1969                datatype: api::helper::ColumnDataTypeWrapper::try_from(
1970                    ConcreteDataType::binary_datatype(),
1971                )
1972                .unwrap()
1973                .datatype() as i32,
1974                semantic_type: api::v1::SemanticType::Tag as i32,
1975                ..Default::default()
1976            },
1977            api::v1::ColumnSchema {
1978                column_name: "ts".to_string(),
1979                datatype: api::helper::ColumnDataTypeWrapper::try_from(
1980                    ConcreteDataType::timestamp_millisecond_datatype(),
1981                )
1982                .unwrap()
1983                .datatype() as i32,
1984                semantic_type: api::v1::SemanticType::Timestamp as i32,
1985                ..Default::default()
1986            },
1987            api::v1::ColumnSchema {
1988                column_name: "v0".to_string(),
1989                datatype: api::helper::ColumnDataTypeWrapper::try_from(
1990                    ConcreteDataType::int64_datatype(),
1991                )
1992                .unwrap()
1993                .datatype() as i32,
1994                semantic_type: api::v1::SemanticType::Field as i32,
1995                ..Default::default()
1996            },
1997            api::v1::ColumnSchema {
1998                column_name: "v1".to_string(),
1999                datatype: api::helper::ColumnDataTypeWrapper::try_from(
2000                    ConcreteDataType::float64_datatype(),
2001                )
2002                .unwrap()
2003                .datatype() as i32,
2004                semantic_type: api::v1::SemanticType::Field as i32,
2005                ..Default::default()
2006            },
2007        ];
2008
2009        let rows = timestamps
2010            .zip(values)
2011            .map(|(ts, v)| Row {
2012                values: vec![
2013                    api::v1::Value {
2014                        value_data: Some(api::v1::value::ValueData::BinaryValue(
2015                            encoded_key.clone(),
2016                        )),
2017                    },
2018                    api::v1::Value {
2019                        value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(ts)),
2020                    },
2021                    api::v1::Value {
2022                        value_data: Some(api::v1::value::ValueData::I64Value(ts)),
2023                    },
2024                    api::v1::Value {
2025                        value_data: v.map(api::v1::value::ValueData::F64Value),
2026                    },
2027                ],
2028            })
2029            .collect();
2030
2031        let mutation = api::v1::Mutation {
2032            op_type: 1, // OpType::Put
2033            sequence,
2034            rows: Some(api::v1::Rows {
2035                schema: column_schema,
2036                rows,
2037            }),
2038            write_hint: Some(WriteHint {
2039                primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
2040            }),
2041        };
2042        KeyValues::new(metadata.as_ref(), mutation).unwrap()
2043    }
2044
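        // End-to-end check for sparse primary key encoding: tags live only inside the encoded
        // __primary_key column and every encoded key is non-empty.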
2045    #[test]
2046    fn test_bulk_part_converter_sparse_primary_key_encoding() {
2047        use api::v1::SemanticType;
2048        use datatypes::schema::ColumnSchema;
2049        use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
2050        use store_api::storage::RegionId;
2051
2052        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
2053        builder
2054            .push_column_metadata(ColumnMetadata {
2055                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
2056                semantic_type: SemanticType::Tag,
2057                column_id: 0,
2058            })
2059            .push_column_metadata(ColumnMetadata {
2060                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
2061                semantic_type: SemanticType::Tag,
2062                column_id: 1,
2063            })
2064            .push_column_metadata(ColumnMetadata {
2065                column_schema: ColumnSchema::new(
2066                    "ts",
2067                    ConcreteDataType::timestamp_millisecond_datatype(),
2068                    false,
2069                ),
2070                semantic_type: SemanticType::Timestamp,
2071                column_id: 2,
2072            })
2073            .push_column_metadata(ColumnMetadata {
2074                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
2075                semantic_type: SemanticType::Field,
2076                column_id: 3,
2077            })
2078            .push_column_metadata(ColumnMetadata {
2079                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
2080                semantic_type: SemanticType::Field,
2081                column_id: 4,
2082            })
2083            .primary_key(vec![0, 1])
2084            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
2085        let metadata = Arc::new(builder.build().unwrap());
2086
2087        let primary_key_codec = build_primary_key_codec(&metadata);
2088        let schema = to_flat_sst_arrow_schema(
2089            &metadata,
2090            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2091        );
2092
2093        assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse);
2094        assert_eq!(primary_key_codec.encoding(), PrimaryKeyEncoding::Sparse);
2095
2096        let capacity = 100;
2097        let mut converter =
2098            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec.clone(), true);
2099
2100        let key_values1 = build_key_values_with_sparse_encoding(
2101            &metadata,
2102            &primary_key_codec,
2103            2048u32, // table_id
2104            100u64,  // tsid
2105            "key11".to_string(),
2106            "key21".to_string(),
2107            vec![1000, 2000].into_iter(),
2108            vec![Some(1.0), Some(2.0)].into_iter(),
2109            1,
2110        );
2111
2112        let key_values2 = build_key_values_with_sparse_encoding(
2113            &metadata,
2114            &primary_key_codec,
2115            4096u32, // table_id
2116            200u64,  // tsid
2117            "key12".to_string(),
2118            "key22".to_string(),
2119            vec![1500].into_iter(),
2120            vec![Some(3.0)].into_iter(),
2121            2,
2122        );
2123
2124        converter.append_key_values(&key_values1).unwrap();
2125        converter.append_key_values(&key_values2).unwrap();
2126
2127        let bulk_part = converter.convert().unwrap();
2128
2129        assert_eq!(bulk_part.num_rows(), 3);
2130        assert_eq!(bulk_part.min_timestamp, 1000);
2131        assert_eq!(bulk_part.max_timestamp, 2000);
2132        assert_eq!(bulk_part.sequence, 2);
2133        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
2134
2135        // For sparse encoding, primary key columns should NOT be stored individually
2136        // even when store_primary_key_columns is true, because sparse encoding
2137        // stores the encoded primary key in the __primary_key column
2138        let schema = bulk_part.batch.schema();
2139        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2140        assert_eq!(
2141            field_names,
2142            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2143        );
2144
2145        // Verify the __primary_key column contains encoded sparse keys
2146        let primary_key_column = bulk_part.batch.column_by_name("__primary_key").unwrap();
2147        let dict_array = primary_key_column
2148            .as_any()
2149            .downcast_ref::<DictionaryArray<UInt32Type>>()
2150            .unwrap();
2151
2152        // The dictionary array should not be empty and should have one encoded key entry per row
2153        assert!(!dict_array.is_empty());
2154        assert_eq!(dict_array.len(), 3); // 3 rows total
2155
2156        // Verify values are properly encoded binary data (not empty)
2157        let values = dict_array
2158            .values()
2159            .as_any()
2160            .downcast_ref::<BinaryArray>()
2161            .unwrap();
2162        for i in 0..values.len() {
2163            assert!(
2164                !values.value(i).is_empty(),
2165                "Encoded primary key should not be empty"
2166            );
2167        }
2168    }
2169}