mito2/memtable/bulk/part.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Bulk part encoder/decoder.

use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::helper::{ColumnDataTypeWrapper, to_grpc_value};
use api::v1::bulk_wal_entry::Body;
use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType, bulk_wal_entry};
use bytes::Bytes;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_recordbatch::DfRecordBatch as RecordBatch;
use common_time::Timestamp;
use common_time::timestamp::TimeUnit;
use datatypes::arrow;
use datatypes::arrow::array::{
    Array, ArrayRef, BinaryBuilder, BinaryDictionaryBuilder, DictionaryArray, StringBuilder,
    StringDictionaryBuilder, TimestampMicrosecondArray, TimestampMillisecondArray,
    TimestampNanosecondArray, TimestampSecondArray, UInt8Array, UInt8Builder, UInt32Array,
    UInt64Array, UInt64Builder,
};
use datatypes::arrow::compute::{SortColumn, SortOptions, TakeOptions};
use datatypes::arrow::datatypes::{
    DataType as ArrowDataType, Field, Schema, SchemaRef, UInt32Type,
};
use datatypes::arrow_array::BinaryArray;
use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::Helper;
use mito_codec::key_values::{KeyValue, KeyValues, KeyValuesRef};
use mito_codec::row_converter::{
    DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, build_primary_key_codec,
};
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, ZstdLevel};
use parquet::data_type::AsBytes;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use snafu::{OptionExt, ResultExt, Snafu};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
use store_api::storage::{FileId, RegionId, SequenceNumber, SequenceRange};
use table::predicate::Predicate;

use crate::error::{
    self, ColumnNotFoundSnafu, ComputeArrowSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu,
    DataTypeMismatchSnafu, EncodeMemtableSnafu, EncodeSnafu, InvalidMetadataSnafu,
    InvalidRequestSnafu, NewRecordBatchSnafu, Result, UnexpectedSnafu,
};
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
use crate::memtable::time_series::{ValueBuilder, Values};
use crate::memtable::{BoxedRecordBatchIterator, MemScanMetrics};
use crate::sst::index::IndexOutput;
use crate::sst::parquet::file_range::{PreFilterMode, row_group_contains_delete};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo};
use crate::sst::{SeriesEstimator, to_sst_arrow_schema};

const INIT_DICT_VALUE_CAPACITY: usize = 8;

/// A raw bulk part in the memtable.
#[derive(Clone)]
pub struct BulkPart {
    pub batch: RecordBatch,
    pub max_timestamp: i64,
    pub min_timestamp: i64,
    pub sequence: u64,
    pub timestamp_index: usize,
    pub raw_data: Option<ArrowIpc>,
}

impl TryFrom<BulkWalEntry> for BulkPart {
    type Error = error::Error;

    fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
        match value.body.expect("Entry payload should be present") {
            Body::ArrowIpc(ipc) => {
                let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
                    .context(error::ConvertBulkWalEntrySnafu)?;
                let batch = decoder
                    .try_decode_record_batch(&ipc.data_header, &ipc.payload)
                    .context(error::ConvertBulkWalEntrySnafu)?;
                Ok(Self {
                    batch,
                    max_timestamp: value.max_ts,
                    min_timestamp: value.min_ts,
                    sequence: value.sequence,
                    timestamp_index: value.timestamp_index as usize,
                    raw_data: Some(ipc),
                })
            }
        }
    }
}

impl TryFrom<&BulkPart> for BulkWalEntry {
    type Error = error::Error;

    fn try_from(value: &BulkPart) -> Result<Self> {
        if let Some(ipc) = &value.raw_data {
            Ok(BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_timestamp,
                min_ts: value.min_timestamp,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ipc.clone())),
            })
        } else {
            let mut encoder = FlightEncoder::default();
            let schema_bytes = encoder
                .encode_schema(value.batch.schema().as_ref())
                .data_header;
            let [rb_data] = encoder
                .encode(FlightMessage::RecordBatch(value.batch.clone()))
                .try_into()
                .map_err(|_| {
                    error::UnsupportedOperationSnafu {
                        err_msg: "create BulkWalEntry from RecordBatch with dictionary arrays",
                    }
                    .build()
                })?;
            Ok(BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_timestamp,
                min_ts: value.min_timestamp,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ArrowIpc {
                    schema: schema_bytes,
                    data_header: rb_data.data_header,
                    payload: rb_data.data_body,
                })),
            })
        }
    }
}

impl BulkPart {
    pub(crate) fn estimated_size(&self) -> usize {
        record_batch_estimated_size(&self.batch)
    }

    /// Returns the estimated series count in this BulkPart.
    /// This is calculated from the dictionary values count of the PrimaryKeyArray.
    pub fn estimated_series_count(&self) -> usize {
        let pk_column_idx = primary_key_column_index(self.batch.num_columns());
        let pk_column = self.batch.column(pk_column_idx);
        if let Some(dict_array) = pk_column.as_any().downcast_ref::<PrimaryKeyArray>() {
            dict_array.values().len()
        } else {
            0
        }
    }

    /// Fills missing columns in the BulkPart batch with default values.
    ///
    /// This function checks if the batch schema matches the region metadata schema,
    /// and if there are missing columns, it fills them with default values (or null
    /// for nullable columns).
    ///
    /// # Arguments
    ///
    /// * `region_metadata` - The region metadata containing the expected schema
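    ///
    /// # Example
    ///
    /// A minimal usage sketch (not compiled as a doctest); `part` is assumed to be an
    /// existing [BulkPart] and `region_metadata` the region's [RegionMetadata]:
    ///
    /// ```ignore
    /// // Aligns the batch with the region schema before writing it to the memtable.
    /// part.fill_missing_columns(&region_metadata)?;
    /// ```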
    pub fn fill_missing_columns(&mut self, region_metadata: &RegionMetadata) -> Result<()> {
        // Builds a map of existing columns in the batch
        let batch_schema = self.batch.schema();
        let batch_columns: HashSet<_> = batch_schema
            .fields()
            .iter()
            .map(|f| f.name().as_str())
            .collect();

        // Finds columns that need to be filled
        let mut columns_to_fill = Vec::new();
        for column_meta in &region_metadata.column_metadatas {
            // TODO(yingwen): Returns error if it is impure default after we support filling
            // bulk insert request in the frontend
            if !batch_columns.contains(column_meta.column_schema.name.as_str()) {
                columns_to_fill.push(column_meta);
            }
        }

        if columns_to_fill.is_empty() {
            return Ok(());
        }

        let num_rows = self.batch.num_rows();

        let mut new_columns = Vec::new();
        let mut new_fields = Vec::new();

        // First, adds all existing columns
        new_fields.extend(batch_schema.fields().iter().cloned());
        new_columns.extend_from_slice(self.batch.columns());

        let region_id = region_metadata.region_id;
        // Then adds the missing columns with default values
        for column_meta in columns_to_fill {
            let default_vector = column_meta
                .column_schema
                .create_default_vector(num_rows)
                .context(CreateDefaultSnafu {
                    region_id,
                    column: &column_meta.column_schema.name,
                })?
                .with_context(|| InvalidRequestSnafu {
                    region_id,
                    reason: format!(
                        "column {} does not have default value",
                        column_meta.column_schema.name
                    ),
                })?;
            let arrow_array = default_vector.to_arrow_array();

            new_fields.push(Arc::new(Field::new(
                column_meta.column_schema.name.clone(),
                column_meta.column_schema.data_type.as_arrow_type(),
                column_meta.column_schema.is_nullable(),
            )));
            new_columns.push(arrow_array);
        }

        // Create a new schema and batch with the filled columns
        let new_schema = Arc::new(Schema::new(new_fields));
        let new_batch =
            RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)?;

        // Update the batch
        self.batch = new_batch;

        Ok(())
    }

    /// Converts [BulkPart] to [Mutation] for fallback `write_bulk` implementation.
    pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
        let vectors = region_metadata
            .schema
            .column_schemas()
            .iter()
            .map(|col| match self.batch.column_by_name(&col.name) {
                None => Ok(None),
                Some(col) => Helper::try_into_vector(col).map(Some),
            })
            .collect::<datatypes::error::Result<Vec<_>>>()
            .context(error::ComputeVectorSnafu)?;

        let rows = (0..self.num_rows())
            .map(|row_idx| {
                let values = (0..self.batch.num_columns())
                    .map(|col_idx| {
                        if let Some(v) = &vectors[col_idx] {
                            to_grpc_value(v.get(row_idx))
                        } else {
                            api::v1::Value { value_data: None }
                        }
                    })
                    .collect::<Vec<_>>();
                api::v1::Row { values }
            })
            .collect::<Vec<_>>();

        let schema = region_metadata
            .column_metadatas
            .iter()
            .map(|c| {
                let data_type_wrapper =
                    ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
                Ok(api::v1::ColumnSchema {
                    column_name: c.column_schema.name.clone(),
                    datatype: data_type_wrapper.datatype() as i32,
                    semantic_type: c.semantic_type as i32,
                    ..Default::default()
                })
            })
            .collect::<api::error::Result<Vec<_>>>()
            .context(error::ConvertColumnDataTypeSnafu {
                reason: "failed to convert region metadata to column schema",
            })?;

        let rows = api::v1::Rows { schema, rows };

        Ok(Mutation {
            op_type: OpType::Put as i32,
            sequence: self.sequence,
            rows: Some(rows),
            write_hint: None,
        })
    }

    pub fn timestamps(&self) -> &ArrayRef {
        self.batch.column(self.timestamp_index)
    }

    pub fn num_rows(&self) -> usize {
        self.batch.num_rows()
    }
}

/// A collection of small unordered bulk parts.
/// Used to batch small parts together before merging them into a sorted part.
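///
/// # Example
///
/// A minimal sketch of the intended flow (not compiled as a doctest); `small_part` is
/// assumed to be a [BulkPart] below the accept threshold:
///
/// ```ignore
/// let mut unordered = UnorderedPart::new();
/// if unordered.should_accept(small_part.num_rows()) {
///     unordered.push(small_part);
/// }
/// if unordered.should_compact() {
///     // Merges buffered parts into one sorted part, then resets the buffer.
///     if let Some(merged) = unordered.to_bulk_part()? {
///         // hand `merged` to the memtable
///     }
///     unordered.clear();
/// }
/// ```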
pub struct UnorderedPart {
    /// Small bulk parts that haven't been sorted yet.
    parts: Vec<BulkPart>,
    /// Total number of rows across all parts.
    total_rows: usize,
    /// Minimum timestamp across all parts.
    min_timestamp: i64,
    /// Maximum timestamp across all parts.
    max_timestamp: i64,
    /// Maximum sequence number across all parts.
    max_sequence: u64,
    /// Row count threshold for accepting parts (default: 1024).
    threshold: usize,
    /// Row count threshold for compacting (default: 4096).
    compact_threshold: usize,
}

impl Default for UnorderedPart {
    fn default() -> Self {
        Self::new()
    }
}

impl UnorderedPart {
    /// Creates a new empty UnorderedPart.
    pub fn new() -> Self {
        Self {
            parts: Vec::new(),
            total_rows: 0,
            min_timestamp: i64::MAX,
            max_timestamp: i64::MIN,
            max_sequence: 0,
            threshold: 1024,
            compact_threshold: 4096,
        }
    }

    /// Sets the threshold for accepting parts into unordered_part.
    pub fn set_threshold(&mut self, threshold: usize) {
        self.threshold = threshold;
    }

    /// Sets the threshold for compacting unordered_part.
    pub fn set_compact_threshold(&mut self, compact_threshold: usize) {
        self.compact_threshold = compact_threshold;
    }

    /// Returns the threshold for accepting parts.
    pub fn threshold(&self) -> usize {
        self.threshold
    }

    /// Returns the compact threshold.
    pub fn compact_threshold(&self) -> usize {
        self.compact_threshold
    }

    /// Returns true if this part should accept the given row count.
    pub fn should_accept(&self, num_rows: usize) -> bool {
        num_rows < self.threshold
    }

    /// Returns true if this part should be compacted.
    pub fn should_compact(&self) -> bool {
        self.total_rows >= self.compact_threshold
    }

    /// Adds a BulkPart to this unordered collection.
    pub fn push(&mut self, part: BulkPart) {
        self.total_rows += part.num_rows();
        self.min_timestamp = self.min_timestamp.min(part.min_timestamp);
        self.max_timestamp = self.max_timestamp.max(part.max_timestamp);
        self.max_sequence = self.max_sequence.max(part.sequence);
        self.parts.push(part);
    }

    /// Returns the total number of rows across all parts.
    pub fn num_rows(&self) -> usize {
        self.total_rows
    }

    /// Returns true if there are no parts.
    pub fn is_empty(&self) -> bool {
        self.parts.is_empty()
    }

    /// Returns the number of parts in this collection.
    pub fn num_parts(&self) -> usize {
        self.parts.len()
    }

    /// Concatenates and sorts all parts into a single RecordBatch.
    /// Returns None if the collection is empty.
    pub fn concat_and_sort(&self) -> Result<Option<RecordBatch>> {
        if self.parts.is_empty() {
            return Ok(None);
        }

        if self.parts.len() == 1 {
            // If there's only one part, return its batch directly
            return Ok(Some(self.parts[0].batch.clone()));
        }

        // Get the schema from the first part
        let schema = self.parts[0].batch.schema();

        // Concatenate all record batches
        let batches: Vec<RecordBatch> = self.parts.iter().map(|p| p.batch.clone()).collect();
        let concatenated =
            arrow::compute::concat_batches(&schema, &batches).context(ComputeArrowSnafu)?;

        // Sort the concatenated batch
        let sorted_batch = sort_primary_key_record_batch(&concatenated)?;

        Ok(Some(sorted_batch))
    }

    /// Converts all parts into a single sorted BulkPart.
    /// Returns None if the collection is empty.
    pub fn to_bulk_part(&self) -> Result<Option<BulkPart>> {
        let Some(sorted_batch) = self.concat_and_sort()? else {
            return Ok(None);
        };

        let timestamp_index = self.parts[0].timestamp_index;

        Ok(Some(BulkPart {
            batch: sorted_batch,
            max_timestamp: self.max_timestamp,
            min_timestamp: self.min_timestamp,
            sequence: self.max_sequence,
            timestamp_index,
            raw_data: None,
        }))
    }

    /// Clears all parts from this collection.
    pub fn clear(&mut self) {
        self.parts.clear();
        self.total_rows = 0;
        self.min_timestamp = i64::MAX;
        self.max_timestamp = i64::MIN;
        self.max_sequence = 0;
    }
}

/// More accurate estimation of the size of a record batch.
pub fn record_batch_estimated_size(batch: &RecordBatch) -> usize {
    batch
        .columns()
        .iter()
        // If can not get slice memory size, assume 0 here.
        .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
        .sum()
}

/// Primary key column builder for handling strings specially.
enum PrimaryKeyColumnBuilder {
    /// String dictionary builder for string types.
    StringDict(StringDictionaryBuilder<UInt32Type>),
    /// Generic mutable vector for other types.
    Vector(Box<dyn MutableVector>),
}

impl PrimaryKeyColumnBuilder {
    /// Appends a value to the builder.
    fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
        match self {
            PrimaryKeyColumnBuilder::StringDict(builder) => {
                if let Some(s) = value.try_into_string().context(DataTypeMismatchSnafu)? {
                    // We know the value is a string.
                    builder.append_value(s);
                } else {
                    builder.append_null();
                }
            }
            PrimaryKeyColumnBuilder::Vector(builder) => {
                builder.push_value_ref(&value);
            }
        }
        Ok(())
    }

    /// Converts the builder to an ArrayRef.
    fn into_arrow_array(self) -> ArrayRef {
        match self {
            PrimaryKeyColumnBuilder::StringDict(mut builder) => Arc::new(builder.finish()),
            PrimaryKeyColumnBuilder::Vector(mut builder) => builder.to_vector().to_arrow_array(),
        }
    }
}
/// Converter that converts [KeyValues] into a [BulkPart].
pub struct BulkPartConverter {
    /// Region metadata.
    region_metadata: RegionMetadataRef,
    /// Schema of the converted batch.
    schema: SchemaRef,
    /// Primary key codec for encoding keys
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
    /// Buffer for encoding primary key.
    key_buf: Vec<u8>,
    /// Primary key array builder.
    key_array_builder: PrimaryKeyArrayBuilder,
    /// Builders for non-primary key columns.
    value_builder: ValueBuilder,
    /// Builders for individual primary key columns.
    /// The order of builders is the same as the order of primary key columns in the region metadata.
    primary_key_column_builders: Vec<PrimaryKeyColumnBuilder>,

    /// Max timestamp value.
    max_ts: i64,
    /// Min timestamp value.
    min_ts: i64,
    /// Max sequence number.
    max_sequence: SequenceNumber,
}

impl BulkPartConverter {
    /// Creates a new converter.
    ///
    /// If `store_primary_key_columns` is true and the primary key encoding is not sparse,
    /// it additionally stores the individual primary key columns as arrays.
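    ///
    /// # Example
    ///
    /// A usage sketch (not compiled as a doctest) mirroring the tests below; `metadata`
    /// is assumed to be a [RegionMetadataRef] and `key_values` a [KeyValues] batch:
    ///
    /// ```ignore
    /// let codec = build_primary_key_codec(&metadata);
    /// let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
    /// let mut converter = BulkPartConverter::new(&metadata, schema, 1024, codec, true);
    /// converter.append_key_values(&key_values)?;
    /// let part: BulkPart = converter.convert()?;
    /// ```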
    pub fn new(
        region_metadata: &RegionMetadataRef,
        schema: SchemaRef,
        capacity: usize,
        primary_key_codec: Arc<dyn PrimaryKeyCodec>,
        store_primary_key_columns: bool,
    ) -> Self {
        debug_assert_eq!(
            region_metadata.primary_key_encoding,
            primary_key_codec.encoding()
        );

        let primary_key_column_builders = if store_primary_key_columns
            && region_metadata.primary_key_encoding != PrimaryKeyEncoding::Sparse
        {
            new_primary_key_column_builders(region_metadata, capacity)
        } else {
            Vec::new()
        };

        Self {
            region_metadata: region_metadata.clone(),
            schema,
            primary_key_codec,
            key_buf: Vec::new(),
            key_array_builder: PrimaryKeyArrayBuilder::new(),
            value_builder: ValueBuilder::new(region_metadata, capacity),
            primary_key_column_builders,
            min_ts: i64::MAX,
            max_ts: i64::MIN,
            max_sequence: SequenceNumber::MIN,
        }
    }

    /// Appends a [KeyValues] into the converter.
    pub fn append_key_values(&mut self, key_values: &KeyValues) -> Result<()> {
        for kv in key_values.iter() {
            self.append_key_value(&kv)?;
        }

        Ok(())
    }

    /// Appends a [KeyValue] to builders.
    ///
    /// If the primary key uses sparse encoding, callers must have already encoded the primary key in the [KeyValue].
    fn append_key_value(&mut self, kv: &KeyValue) -> Result<()> {
        // Handles primary key based on encoding type
        if self.primary_key_codec.encoding() == PrimaryKeyEncoding::Sparse {
            // For sparse encoding, the primary key is already encoded in the KeyValue
            // Gets the first (and only) primary key value which contains the encoded key
            let mut primary_keys = kv.primary_keys();
            if let Some(encoded) = primary_keys
                .next()
                .context(ColumnNotFoundSnafu {
                    column: PRIMARY_KEY_COLUMN_NAME,
                })?
                .try_into_binary()
                .context(DataTypeMismatchSnafu)?
            {
                self.key_array_builder
                    .append(encoded)
                    .context(ComputeArrowSnafu)?;
            } else {
                self.key_array_builder
                    .append("")
                    .context(ComputeArrowSnafu)?;
            }
        } else {
            // For dense encoding, we need to encode the primary key columns
            self.key_buf.clear();
            self.primary_key_codec
                .encode_key_value(kv, &mut self.key_buf)
                .context(EncodeSnafu)?;
            self.key_array_builder
                .append(&self.key_buf)
                .context(ComputeArrowSnafu)?;
        };

        // If storing primary key columns, append values to individual builders
        if !self.primary_key_column_builders.is_empty() {
            for (builder, pk_value) in self
                .primary_key_column_builders
                .iter_mut()
                .zip(kv.primary_keys())
            {
                builder.push_value_ref(pk_value)?;
            }
        }

        // Pushes other columns.
        self.value_builder.push(
            kv.timestamp(),
            kv.sequence(),
            kv.op_type() as u8,
            kv.fields(),
        );

        // Updates statistics
        // Safety: timestamp of kv must be both present and a valid timestamp value.
        let ts = kv
            .timestamp()
            .try_into_timestamp()
            .unwrap()
            .unwrap()
            .value();
        self.min_ts = self.min_ts.min(ts);
        self.max_ts = self.max_ts.max(ts);
        self.max_sequence = self.max_sequence.max(kv.sequence());

        Ok(())
    }

    /// Converts buffered content into a [BulkPart].
    ///
    /// It sorts the record batch by (primary key, timestamp, sequence desc).
    pub fn convert(mut self) -> Result<BulkPart> {
        let values = Values::from(self.value_builder);
        let mut columns =
            Vec::with_capacity(4 + values.fields.len() + self.primary_key_column_builders.len());

        // Build primary key column arrays if enabled.
        for builder in self.primary_key_column_builders {
            columns.push(builder.into_arrow_array());
        }
        // Then fields columns.
        columns.extend(values.fields.iter().map(|field| field.to_arrow_array()));
        // Time index.
        let timestamp_index = columns.len();
        columns.push(values.timestamp.to_arrow_array());
        // Primary key.
        let pk_array = self.key_array_builder.finish();
        columns.push(Arc::new(pk_array));
        // Sequence and op type.
        columns.push(values.sequence.to_arrow_array());
        columns.push(values.op_type.to_arrow_array());

        let batch = RecordBatch::try_new(self.schema, columns).context(NewRecordBatchSnafu)?;
        // Sorts the record batch.
        let batch = sort_primary_key_record_batch(&batch)?;

        Ok(BulkPart {
            batch,
            max_timestamp: self.max_ts,
            min_timestamp: self.min_ts,
            sequence: self.max_sequence,
            timestamp_index,
            raw_data: None,
        })
    }
}

fn new_primary_key_column_builders(
    metadata: &RegionMetadata,
    capacity: usize,
) -> Vec<PrimaryKeyColumnBuilder> {
    metadata
        .primary_key_columns()
        .map(|col| {
            if col.column_schema.data_type.is_string() {
                PrimaryKeyColumnBuilder::StringDict(StringDictionaryBuilder::with_capacity(
                    capacity,
                    INIT_DICT_VALUE_CAPACITY,
                    capacity,
                ))
            } else {
                PrimaryKeyColumnBuilder::Vector(
                    col.column_schema.data_type.create_mutable_vector(capacity),
                )
            }
        })
        .collect()
}

/// Sorts the record batch with primary key format.
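///
/// The batch is expected to use the flat SST layout where the last four columns are the
/// time index, `__primary_key`, `__sequence` and `__op_type`. Rows are ordered by
/// (primary key asc, time index asc, sequence desc) so that duplicate keys keep the
/// latest sequence first.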
pub fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
    let total_columns = batch.num_columns();
    let sort_columns = vec![
        // Primary key column (ascending)
        SortColumn {
            values: batch.column(total_columns - 3).clone(),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
        // Time index column (ascending)
        SortColumn {
            values: batch.column(total_columns - 4).clone(),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
        // Sequence column (descending)
        SortColumn {
            values: batch.column(total_columns - 2).clone(),
            options: Some(SortOptions {
                descending: true,
                nulls_first: true,
            }),
        },
    ];

    let indices = datatypes::arrow::compute::lexsort_to_indices(&sort_columns, None)
        .context(ComputeArrowSnafu)?;

    datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
}

/// Converts a `BulkPart` that is unordered and without encoded primary keys into a `BulkPart`
/// with the same format as produced by [BulkPartConverter].
///
/// This function takes a `BulkPart` where:
/// - For dense encoding: Primary key columns may be stored as individual columns
/// - For sparse encoding: The `__primary_key` column should already be present with encoded keys
/// - The batch may not be sorted
///
/// And produces a `BulkPart` where:
/// - Primary key columns are optionally stored (depending on `store_primary_key_columns` and encoding)
/// - An encoded `__primary_key` dictionary column is present
/// - The batch is sorted by (primary_key, timestamp, sequence desc)
///
/// # Arguments
///
/// * `part` - The input `BulkPart` to convert
/// * `region_metadata` - Region metadata containing schema information
/// * `primary_key_codec` - Codec for encoding primary keys
/// * `schema` - Target schema for the output batch
/// * `store_primary_key_columns` - If true and encoding is not sparse, stores individual primary key columns
///
/// # Returns
///
/// Returns `None` if the input part has no rows, otherwise returns a new `BulkPart` with
/// encoded primary keys and sorted data.
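///
/// # Example
///
/// A minimal sketch of the intended call (not compiled as a doctest); `part`,
/// `region_metadata` and `schema` are assumed to exist in the caller:
///
/// ```ignore
/// let codec = build_primary_key_codec(&region_metadata);
/// if let Some(sorted) = convert_bulk_part(part, &region_metadata, codec, schema, true)? {
///     // `sorted.batch` now contains the encoded `__primary_key` column and is sorted
///     // by (primary key, timestamp, sequence desc).
/// }
/// ```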
pub fn convert_bulk_part(
    part: BulkPart,
    region_metadata: &RegionMetadataRef,
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
    schema: SchemaRef,
    store_primary_key_columns: bool,
) -> Result<Option<BulkPart>> {
    if part.num_rows() == 0 {
        return Ok(None);
    }

    let num_rows = part.num_rows();
    let is_sparse = region_metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse;

    // Builds a column name-to-index map for efficient lookups
    let input_schema = part.batch.schema();
    let column_indices: HashMap<&str, usize> = input_schema
        .fields()
        .iter()
        .enumerate()
        .map(|(idx, field)| (field.name().as_str(), idx))
        .collect();

    // Determines the structure of the input batch by looking up columns by name
    let mut output_columns = Vec::new();

    // Extracts primary key columns if we need to encode them (dense encoding)
    let pk_array = if is_sparse {
        // For sparse encoding, the input should already have the __primary_key column
        // We need to find it in the input batch
        None
    } else {
        // For dense encoding, extract and encode primary key columns by name
        let pk_vectors: Result<Vec<_>> = region_metadata
            .primary_key_columns()
            .map(|col_meta| {
                let col_idx = column_indices
                    .get(col_meta.column_schema.name.as_str())
                    .context(ColumnNotFoundSnafu {
                        column: &col_meta.column_schema.name,
                    })?;
                let col = part.batch.column(*col_idx);
                Helper::try_into_vector(col).context(error::ComputeVectorSnafu)
            })
            .collect();
        let pk_vectors = pk_vectors?;

        let mut key_array_builder = PrimaryKeyArrayBuilder::new();
        let mut encode_buf = Vec::new();

        for row_idx in 0..num_rows {
            encode_buf.clear();

            // Collects primary key values with column IDs for this row
            let pk_values_with_ids: Vec<_> = region_metadata
                .primary_key
                .iter()
                .zip(pk_vectors.iter())
                .map(|(col_id, vector)| (*col_id, vector.get_ref(row_idx)))
                .collect();

            // Encodes the primary key
            primary_key_codec
                .encode_value_refs(&pk_values_with_ids, &mut encode_buf)
                .context(EncodeSnafu)?;

            key_array_builder
                .append(&encode_buf)
                .context(ComputeArrowSnafu)?;
        }

        Some(key_array_builder.finish())
    };

    // Adds primary key columns if storing them (only for dense encoding)
    if store_primary_key_columns && !is_sparse {
        for col_meta in region_metadata.primary_key_columns() {
            let col_idx = column_indices
                .get(col_meta.column_schema.name.as_str())
                .context(ColumnNotFoundSnafu {
                    column: &col_meta.column_schema.name,
                })?;
            let col = part.batch.column(*col_idx);

            // Converts to dictionary if needed for string types
            let col = if col_meta.column_schema.data_type.is_string() {
                let target_type = ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Utf8),
                );
                arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
            } else {
                col.clone()
            };
            output_columns.push(col);
        }
    }

    // Adds field columns
    for col_meta in region_metadata.field_columns() {
        let col_idx = column_indices
            .get(col_meta.column_schema.name.as_str())
            .context(ColumnNotFoundSnafu {
                column: &col_meta.column_schema.name,
            })?;
        output_columns.push(part.batch.column(*col_idx).clone());
    }

    // Adds timestamp column
    let new_timestamp_index = output_columns.len();
    let ts_col_idx = column_indices
        .get(
            region_metadata
                .time_index_column()
                .column_schema
                .name
                .as_str(),
        )
        .context(ColumnNotFoundSnafu {
            column: &region_metadata.time_index_column().column_schema.name,
        })?;
    output_columns.push(part.batch.column(*ts_col_idx).clone());

    // Adds encoded primary key dictionary column
    let pk_dictionary = if let Some(pk_dict_array) = pk_array {
        Arc::new(pk_dict_array) as ArrayRef
    } else {
        let pk_col_idx =
            column_indices
                .get(PRIMARY_KEY_COLUMN_NAME)
                .context(ColumnNotFoundSnafu {
                    column: PRIMARY_KEY_COLUMN_NAME,
                })?;
        let col = part.batch.column(*pk_col_idx);

        // Casts to dictionary type if needed
        let target_type = ArrowDataType::Dictionary(
            Box::new(ArrowDataType::UInt32),
            Box::new(ArrowDataType::Binary),
        );
        arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
    };
    output_columns.push(pk_dictionary);

    let sequence_array = UInt64Array::from(vec![part.sequence; num_rows]);
    output_columns.push(Arc::new(sequence_array) as ArrayRef);

    let op_type_array = UInt8Array::from(vec![OpType::Put as u8; num_rows]);
    output_columns.push(Arc::new(op_type_array) as ArrayRef);

    let batch = RecordBatch::try_new(schema, output_columns).context(NewRecordBatchSnafu)?;

    // Sorts the batch by (primary_key, timestamp, sequence desc)
    let sorted_batch = sort_primary_key_record_batch(&batch)?;

    Ok(Some(BulkPart {
        batch: sorted_batch,
        max_timestamp: part.max_timestamp,
        min_timestamp: part.min_timestamp,
        sequence: part.sequence,
        timestamp_index: new_timestamp_index,
        raw_data: None,
    }))
}

#[derive(Debug, Clone)]
pub struct EncodedBulkPart {
    data: Bytes,
    metadata: BulkPartMeta,
}

impl EncodedBulkPart {
    pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
        Self { data, metadata }
    }

    pub(crate) fn metadata(&self) -> &BulkPartMeta {
        &self.metadata
    }

    /// Returns the size of the encoded data in bytes
    pub(crate) fn size_bytes(&self) -> usize {
        self.data.len()
    }

    /// Returns the encoded data.
    pub(crate) fn data(&self) -> &Bytes {
        &self.data
    }

    /// Converts this `EncodedBulkPart` to `SstInfo`.
    ///
    /// # Arguments
    /// * `file_id` - The SST file ID to assign to this part
    ///
    /// # Returns
    /// Returns a `SstInfo` instance with information derived from this bulk part's metadata
    pub(crate) fn to_sst_info(&self, file_id: FileId) -> SstInfo {
        let unit = self.metadata.region_metadata.time_index_type().unit();
        let max_row_group_uncompressed_size: u64 = self
            .metadata
            .parquet_metadata
            .row_groups()
            .iter()
            .map(|rg| {
                rg.columns()
                    .iter()
                    .map(|c| c.uncompressed_size() as u64)
                    .sum::<u64>()
            })
            .max()
            .unwrap_or(0);
        SstInfo {
            file_id,
            time_range: (
                Timestamp::new(self.metadata.min_timestamp, unit),
                Timestamp::new(self.metadata.max_timestamp, unit),
            ),
            file_size: self.data.len() as u64,
            max_row_group_uncompressed_size,
            num_rows: self.metadata.num_rows,
            num_row_groups: self.metadata.parquet_metadata.num_row_groups() as u64,
            file_metadata: Some(self.metadata.parquet_metadata.clone()),
            index_metadata: IndexOutput::default(),
            num_series: self.metadata.num_series,
        }
    }

    pub(crate) fn read(
        &self,
        context: BulkIterContextRef,
        sequence: Option<SequenceRange>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Result<Option<BoxedRecordBatchIterator>> {
        // Compute skip_fields for row group pruning using the same approach as compute_skip_fields in reader.rs.
        let skip_fields_for_pruning =
            Self::compute_skip_fields(context.pre_filter_mode(), &self.metadata.parquet_metadata);

        // use predicate to find row groups to read.
        let row_groups_to_read =
            context.row_groups_to_read(&self.metadata.parquet_metadata, skip_fields_for_pruning);

        if row_groups_to_read.is_empty() {
            // All row groups are filtered.
            return Ok(None);
        }

        let iter = EncodedBulkPartIter::try_new(
            self,
            context,
            row_groups_to_read,
            sequence,
            mem_scan_metrics,
        )?;
        Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
    }

    /// Computes whether to skip field columns based on PreFilterMode.
    fn compute_skip_fields(pre_filter_mode: PreFilterMode, parquet_meta: &ParquetMetaData) -> bool {
        match pre_filter_mode {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => {
                // Check if any row group contains delete op
                (0..parquet_meta.num_row_groups()).any(|rg_idx| {
                    row_group_contains_delete(parquet_meta, rg_idx, "memtable").unwrap_or(true)
                })
            }
        }
    }
}

#[derive(Debug, Clone)]
pub struct BulkPartMeta {
    /// Total rows in part.
    pub num_rows: usize,
    /// Max timestamp in part.
    pub max_timestamp: i64,
    /// Min timestamp in part.
    pub min_timestamp: i64,
    /// Part file metadata.
    pub parquet_metadata: Arc<ParquetMetaData>,
    /// Part region schema.
    pub region_metadata: RegionMetadataRef,
    /// Number of series.
    pub num_series: u64,
}

/// Metrics for encoding a part.
#[derive(Default, Debug)]
pub struct BulkPartEncodeMetrics {
    /// Cost of iterating over the data.
    pub iter_cost: Duration,
    /// Cost of writing the data.
    pub write_cost: Duration,
    /// Size of data before encoding.
    pub raw_size: usize,
    /// Size of data after encoding.
    pub encoded_size: usize,
    /// Number of rows in part.
    pub num_rows: usize,
}

pub struct BulkPartEncoder {
    metadata: RegionMetadataRef,
    row_group_size: usize,
    writer_props: Option<WriterProperties>,
}

impl BulkPartEncoder {
    pub(crate) fn new(
        metadata: RegionMetadataRef,
        row_group_size: usize,
    ) -> Result<BulkPartEncoder> {
        // TODO(yingwen): Skip arrow schema if needed.
        let json = metadata.to_json().context(InvalidMetadataSnafu)?;
        let key_value_meta =
            parquet::file::metadata::KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        // TODO(yingwen): Do we need compression?
        let writer_props = Some(
            WriterProperties::builder()
                .set_key_value_metadata(Some(vec![key_value_meta]))
                .set_write_batch_size(row_group_size)
                .set_max_row_group_size(row_group_size)
                .set_compression(Compression::ZSTD(ZstdLevel::default()))
                .set_column_index_truncate_length(None)
                .set_statistics_truncate_length(None)
                .build(),
        );

        Ok(Self {
            metadata,
            row_group_size,
            writer_props,
        })
    }
}

impl BulkPartEncoder {
    /// Encodes [BoxedRecordBatchIterator] into [EncodedBulkPart] with min/max timestamps.
    pub fn encode_record_batch_iter(
        &self,
        iter: BoxedRecordBatchIterator,
        arrow_schema: SchemaRef,
        min_timestamp: i64,
        max_timestamp: i64,
        metrics: &mut BulkPartEncodeMetrics,
    ) -> Result<Option<EncodedBulkPart>> {
        let mut buf = Vec::with_capacity(4096);
        let mut writer = ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
            .context(EncodeMemtableSnafu)?;
        let mut total_rows = 0;
        let mut series_estimator = SeriesEstimator::default();

        // Process each batch from the iterator
        let mut iter_start = Instant::now();
        for batch_result in iter {
            metrics.iter_cost += iter_start.elapsed();
            let batch = batch_result?;
            if batch.num_rows() == 0 {
                continue;
            }

            series_estimator.update_flat(&batch);
            metrics.raw_size += record_batch_estimated_size(&batch);
            let write_start = Instant::now();
            writer.write(&batch).context(EncodeMemtableSnafu)?;
            metrics.write_cost += write_start.elapsed();
            total_rows += batch.num_rows();
            iter_start = Instant::now();
        }
        metrics.iter_cost += iter_start.elapsed();

        if total_rows == 0 {
            return Ok(None);
        }

        let close_start = Instant::now();
        let file_metadata = writer.close().context(EncodeMemtableSnafu)?;
        metrics.write_cost += close_start.elapsed();
        metrics.encoded_size += buf.len();
        metrics.num_rows += total_rows;

        let buf = Bytes::from(buf);
        let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);
        let num_series = series_estimator.finish();

        Ok(Some(EncodedBulkPart {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: total_rows,
                max_timestamp,
                min_timestamp,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
                num_series,
            },
        }))
    }

    /// Encodes bulk part to a [EncodedBulkPart], returns the encoded data.
    fn encode_part(&self, part: &BulkPart) -> Result<Option<EncodedBulkPart>> {
        if part.batch.num_rows() == 0 {
            return Ok(None);
        }

        let mut buf = Vec::with_capacity(4096);
        let arrow_schema = part.batch.schema();

        let file_metadata = {
            let mut writer =
                ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
                    .context(EncodeMemtableSnafu)?;
            writer.write(&part.batch).context(EncodeMemtableSnafu)?;
            writer.finish().context(EncodeMemtableSnafu)?
        };

        let buf = Bytes::from(buf);
        let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);

        Ok(Some(EncodedBulkPart {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: part.batch.num_rows(),
                max_timestamp: part.max_timestamp,
                min_timestamp: part.min_timestamp,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
                num_series: part.estimated_series_count() as u64,
            },
        }))
    }
}

#[cfg(test)]
mod tests {
    use api::v1::{Row, SemanticType, WriteHint};
    use datafusion_common::ScalarValue;
    use datatypes::arrow::array::Float64Array;
    use datatypes::prelude::{ConcreteDataType, Value};
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;
    use store_api::storage::consts::ReservedColumnId;

    use super::*;
    use crate::memtable::bulk::context::BulkIterContext;
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
    use crate::test_util::memtable_util::{
        build_key_values_with_ts_seq_values, metadata_for_test, region_metadata_to_row_schema,
    };

    struct MutationInput<'a> {
        k0: &'a str,
        k1: u32,
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
        sequence: u64,
    }

    #[derive(Debug, PartialOrd, PartialEq)]
    struct BatchOutput<'a> {
        pk_values: &'a [Value],
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
    }

    fn encode(input: &[MutationInput]) -> EncodedBulkPart {
        let metadata = metadata_for_test();
        let kvs = input
            .iter()
            .map(|m| {
                build_key_values_with_ts_seq_values(
                    &metadata,
                    m.k0.to_string(),
                    m.k1,
                    m.timestamps.iter().copied(),
                    m.v1.iter().copied(),
                    m.sequence,
                )
            })
            .collect::<Vec<_>>();
        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let primary_key_codec = build_primary_key_codec(&metadata);
        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
        for kv in kvs {
            converter.append_key_values(&kv).unwrap();
        }
        let part = converter.convert().unwrap();
        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
        encoder.encode_part(&part).unwrap().unwrap()
    }

    #[test]
    fn test_write_and_read_part_projection() {
        let part = encode(&[
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.1)],
                sequence: 0,
            },
            MutationInput {
                k0: "b",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.0)],
                sequence: 0,
            },
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[2],
                v1: &[Some(0.2)],
                sequence: 1,
            },
        ]);

        let projection = &[4u32];
        let mut reader = part
            .read(
                Arc::new(
                    BulkIterContext::new(
                        part.metadata.region_metadata.clone(),
                        Some(projection.as_slice()),
                        None,
                        false,
                    )
                    .unwrap(),
                ),
                None,
                None,
            )
            .unwrap()
            .expect("expect at least one row group");

        let mut total_rows_read = 0;
        let mut field: Vec<f64> = vec![];
        for res in reader {
            let batch = res.unwrap();
            assert_eq!(5, batch.num_columns());
            field.extend_from_slice(
                batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Float64Array>()
                    .unwrap()
                    .values(),
            );
            total_rows_read += batch.num_rows();
        }
        assert_eq!(3, total_rows_read);
        assert_eq!(vec![0.1, 0.2, 0.0], field);
    }

    fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
        let metadata = metadata_for_test();
        let kvs = key_values
            .into_iter()
            .map(|(k0, k1, (start, end), sequence)| {
                let ts = (start..end);
                let v1 = (start..end).map(|_| None);
                build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
            })
            .collect::<Vec<_>>();
        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let primary_key_codec = build_primary_key_codec(&metadata);
        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
        for kv in kvs {
            converter.append_key_values(&kv).unwrap();
        }
        let part = converter.convert().unwrap();
        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
        encoder.encode_part(&part).unwrap().unwrap()
    }

    fn check_prune_row_group(
        part: &EncodedBulkPart,
        predicate: Option<Predicate>,
        expected_rows: usize,
    ) {
        let context = Arc::new(
            BulkIterContext::new(
                part.metadata.region_metadata.clone(),
                None,
                predicate,
                false,
            )
            .unwrap(),
        );
        let mut reader = part
            .read(context, None, None)
            .unwrap()
            .expect("expect at least one row group");
        let mut total_rows_read = 0;
        for res in reader {
            let batch = res.unwrap();
            total_rows_read += batch.num_rows();
        }
        // Checks the number of rows read from the row groups left after pruning.
        assert_eq!(expected_rows, total_rows_read);
    }

    #[test]
    fn test_prune_row_groups() {
        let part = prepare(vec![
            ("a", 0, (0, 40), 1),
            ("a", 1, (0, 60), 1),
            ("b", 0, (0, 100), 2),
            ("b", 1, (100, 180), 3),
            ("b", 1, (180, 210), 4),
        ]);

        let context = Arc::new(
            BulkIterContext::new(
                part.metadata.region_metadata.clone(),
                None,
                Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
                    datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
                )])),
                false,
            )
            .unwrap(),
        );
        assert!(part.read(context, None, None).unwrap().is_none());

        check_prune_row_group(&part, None, 310);

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            40,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
            ])),
            60,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
            ])),
            100,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            100,
        );

        // Predicates over field columns can do precise filtering.
1445        check_prune_row_group(
1446            &part,
1447            Some(Predicate::new(vec![
1448                datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64)),
1449            ])),
1450            1,
1451        );
1452    }
1453
1454    #[test]
1455    fn test_bulk_part_converter_append_and_convert() {
1456        let metadata = metadata_for_test();
1457        let capacity = 100;
1458        let primary_key_codec = build_primary_key_codec(&metadata);
1459        let schema = to_flat_sst_arrow_schema(
1460            &metadata,
1461            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1462        );
1463
1464        let mut converter =
1465            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1466
1467        let key_values1 = build_key_values_with_ts_seq_values(
1468            &metadata,
1469            "key1".to_string(),
1470            1u32,
1471            vec![1000, 2000].into_iter(),
1472            vec![Some(1.0), Some(2.0)].into_iter(),
1473            1,
1474        );
1475
1476        let key_values2 = build_key_values_with_ts_seq_values(
1477            &metadata,
1478            "key2".to_string(),
1479            2u32,
1480            vec![1500].into_iter(),
1481            vec![Some(3.0)].into_iter(),
1482            2,
1483        );
1484
1485        converter.append_key_values(&key_values1).unwrap();
1486        converter.append_key_values(&key_values2).unwrap();
1487
1488        let bulk_part = converter.convert().unwrap();
1489
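        // Two rows from key1 plus one row from key2; timestamps span 1000..=2000 and the newest sequence is 2.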
1490        assert_eq!(bulk_part.num_rows(), 3);
1491        assert_eq!(bulk_part.min_timestamp, 1000);
1492        assert_eq!(bulk_part.max_timestamp, 2000);
1493        assert_eq!(bulk_part.sequence, 2);
1494        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
1495
1496        // Validate primary key columns are stored
1497        // Schema should include primary key columns k0 and k1 at the beginning
1498        let schema = bulk_part.batch.schema();
1499        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1500        assert_eq!(
1501            field_names,
1502            vec![
1503                "k0",
1504                "k1",
1505                "v0",
1506                "v1",
1507                "ts",
1508                "__primary_key",
1509                "__sequence",
1510                "__op_type"
1511            ]
1512        );
1513    }
1514
1515    #[test]
1516    fn test_bulk_part_converter_sorting() {
1517        let metadata = metadata_for_test();
1518        let capacity = 100;
1519        let primary_key_codec = build_primary_key_codec(&metadata);
1520        let schema = to_flat_sst_arrow_schema(
1521            &metadata,
1522            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1523        );
1524
1525        let mut converter =
1526            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1527
1528        let key_values1 = build_key_values_with_ts_seq_values(
1529            &metadata,
1530            "z_key".to_string(),
1531            3u32,
1532            vec![3000].into_iter(),
1533            vec![Some(3.0)].into_iter(),
1534            3,
1535        );
1536
1537        let key_values2 = build_key_values_with_ts_seq_values(
1538            &metadata,
1539            "a_key".to_string(),
1540            1u32,
1541            vec![1000].into_iter(),
1542            vec![Some(1.0)].into_iter(),
1543            1,
1544        );
1545
1546        let key_values3 = build_key_values_with_ts_seq_values(
1547            &metadata,
1548            "m_key".to_string(),
1549            2u32,
1550            vec![2000].into_iter(),
1551            vec![Some(2.0)].into_iter(),
1552            2,
1553        );
1554
1555        converter.append_key_values(&key_values1).unwrap();
1556        converter.append_key_values(&key_values2).unwrap();
1557        converter.append_key_values(&key_values3).unwrap();
1558
1559        let bulk_part = converter.convert().unwrap();
1560
1561        assert_eq!(bulk_part.num_rows(), 3);
1562
1563        let ts_column = bulk_part.batch.column(bulk_part.timestamp_index);
1564        let seq_column = bulk_part.batch.column(bulk_part.batch.num_columns() - 2);
1565
1566        let ts_array = ts_column
1567            .as_any()
1568            .downcast_ref::<TimestampMillisecondArray>()
1569            .unwrap();
1570        let seq_array = seq_column.as_any().downcast_ref::<UInt64Array>().unwrap();
1571
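        // Rows were appended as z_key, a_key, m_key; after conversion they come out ordered by
        // primary key, so timestamps and sequences are ascending.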
1572        assert_eq!(ts_array.values(), &[1000, 2000, 3000]);
1573        assert_eq!(seq_array.values(), &[1, 2, 3]);
1574
1575        // Validate primary key columns are stored
1576        let schema = bulk_part.batch.schema();
1577        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1578        assert_eq!(
1579            field_names,
1580            vec![
1581                "k0",
1582                "k1",
1583                "v0",
1584                "v1",
1585                "ts",
1586                "__primary_key",
1587                "__sequence",
1588                "__op_type"
1589            ]
1590        );
1591    }
1592
1593    #[test]
1594    fn test_bulk_part_converter_empty() {
1595        let metadata = metadata_for_test();
1596        let capacity = 10;
1597        let primary_key_codec = build_primary_key_codec(&metadata);
1598        let schema = to_flat_sst_arrow_schema(
1599            &metadata,
1600            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1601        );
1602
1603        let converter =
1604            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1605
1606        let bulk_part = converter.convert().unwrap();
1607
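        // An empty converter still yields a part with the full flat schema and sentinel min/max timestamps.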
1608        assert_eq!(bulk_part.num_rows(), 0);
1609        assert_eq!(bulk_part.min_timestamp, i64::MAX);
1610        assert_eq!(bulk_part.max_timestamp, i64::MIN);
1611        assert_eq!(bulk_part.sequence, SequenceNumber::MIN);
1612
1613        // Validate primary key columns are present in the schema even for an empty batch
1614        let schema = bulk_part.batch.schema();
1615        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1616        assert_eq!(
1617            field_names,
1618            vec![
1619                "k0",
1620                "k1",
1621                "v0",
1622                "v1",
1623                "ts",
1624                "__primary_key",
1625                "__sequence",
1626                "__op_type"
1627            ]
1628        );
1629    }
1630
1631    #[test]
1632    fn test_bulk_part_converter_without_primary_key_columns() {
1633        let metadata = metadata_for_test();
1634        let primary_key_codec = build_primary_key_codec(&metadata);
1635        let schema = to_flat_sst_arrow_schema(
1636            &metadata,
1637            &FlatSchemaOptions {
1638                raw_pk_columns: false,
1639                string_pk_use_dict: true,
1640            },
1641        );
1642
1643        let capacity = 100;
1644        let mut converter =
1645            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, false);
1646
1647        let key_values1 = build_key_values_with_ts_seq_values(
1648            &metadata,
1649            "key1".to_string(),
1650            1u32,
1651            vec![1000, 2000].into_iter(),
1652            vec![Some(1.0), Some(2.0)].into_iter(),
1653            1,
1654        );
1655
1656        let key_values2 = build_key_values_with_ts_seq_values(
1657            &metadata,
1658            "key2".to_string(),
1659            2u32,
1660            vec![1500].into_iter(),
1661            vec![Some(3.0)].into_iter(),
1662            2,
1663        );
1664
1665        converter.append_key_values(&key_values1).unwrap();
1666        converter.append_key_values(&key_values2).unwrap();
1667
1668        let bulk_part = converter.convert().unwrap();
1669
1670        assert_eq!(bulk_part.num_rows(), 3);
1671        assert_eq!(bulk_part.min_timestamp, 1000);
1672        assert_eq!(bulk_part.max_timestamp, 2000);
1673        assert_eq!(bulk_part.sequence, 2);
1674        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
1675
1676        // Validate primary key columns are NOT stored individually
1677        let schema = bulk_part.batch.schema();
1678        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1679        assert_eq!(
1680            field_names,
1681            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
1682        );
1683    }
1684
1685    #[allow(clippy::too_many_arguments)]
1686    fn build_key_values_with_sparse_encoding(
1687        metadata: &RegionMetadataRef,
1688        primary_key_codec: &Arc<dyn PrimaryKeyCodec>,
1689        table_id: u32,
1690        tsid: u64,
1691        k0: String,
1692        k1: String,
1693        timestamps: impl Iterator<Item = i64>,
1694        values: impl Iterator<Item = Option<f64>>,
1695        sequence: SequenceNumber,
1696    ) -> KeyValues {
1697        // Encode the primary key (__table_id, __tsid, k0, k1) into binary format using the sparse codec
1698        let pk_values = vec![
1699            (ReservedColumnId::table_id(), Value::UInt32(table_id)),
1700            (ReservedColumnId::tsid(), Value::UInt64(tsid)),
1701            (0, Value::String(k0.clone().into())),
1702            (1, Value::String(k1.clone().into())),
1703        ];
1704        let mut encoded_key = Vec::new();
1705        primary_key_codec
1706            .encode_values(&pk_values, &mut encoded_key)
1707            .unwrap();
1708        assert!(!encoded_key.is_empty());
1709
1710        // Create schema for sparse encoding: __primary_key, ts, v0, v1
1711        let column_schema = vec![
1712            api::v1::ColumnSchema {
1713                column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
1714                datatype: api::helper::ColumnDataTypeWrapper::try_from(
1715                    ConcreteDataType::binary_datatype(),
1716                )
1717                .unwrap()
1718                .datatype() as i32,
1719                semantic_type: api::v1::SemanticType::Tag as i32,
1720                ..Default::default()
1721            },
1722            api::v1::ColumnSchema {
1723                column_name: "ts".to_string(),
1724                datatype: api::helper::ColumnDataTypeWrapper::try_from(
1725                    ConcreteDataType::timestamp_millisecond_datatype(),
1726                )
1727                .unwrap()
1728                .datatype() as i32,
1729                semantic_type: api::v1::SemanticType::Timestamp as i32,
1730                ..Default::default()
1731            },
1732            api::v1::ColumnSchema {
1733                column_name: "v0".to_string(),
1734                datatype: api::helper::ColumnDataTypeWrapper::try_from(
1735                    ConcreteDataType::int64_datatype(),
1736                )
1737                .unwrap()
1738                .datatype() as i32,
1739                semantic_type: api::v1::SemanticType::Field as i32,
1740                ..Default::default()
1741            },
1742            api::v1::ColumnSchema {
1743                column_name: "v1".to_string(),
1744                datatype: api::helper::ColumnDataTypeWrapper::try_from(
1745                    ConcreteDataType::float64_datatype(),
1746                )
1747                .unwrap()
1748                .datatype() as i32,
1749                semantic_type: api::v1::SemanticType::Field as i32,
1750                ..Default::default()
1751            },
1752        ];
1753
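        // Build rows: the pre-encoded primary key, the timestamp, v0 mirroring the timestamp as i64,
        // and the optional v1 float.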
1754        let rows = timestamps
1755            .zip(values)
1756            .map(|(ts, v)| Row {
1757                values: vec![
1758                    api::v1::Value {
1759                        value_data: Some(api::v1::value::ValueData::BinaryValue(
1760                            encoded_key.clone(),
1761                        )),
1762                    },
1763                    api::v1::Value {
1764                        value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(ts)),
1765                    },
1766                    api::v1::Value {
1767                        value_data: Some(api::v1::value::ValueData::I64Value(ts)),
1768                    },
1769                    api::v1::Value {
1770                        value_data: v.map(api::v1::value::ValueData::F64Value),
1771                    },
1772                ],
1773            })
1774            .collect();
1775
1776        let mutation = api::v1::Mutation {
1777            op_type: 1,
1778            sequence,
1779            rows: Some(api::v1::Rows {
1780                schema: column_schema,
1781                rows,
1782            }),
1783            write_hint: Some(WriteHint {
1784                primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
1785            }),
1786        };
1787        KeyValues::new(metadata.as_ref(), mutation).unwrap()
1788    }
1789
1790    #[test]
1791    fn test_bulk_part_converter_sparse_primary_key_encoding() {
1792        use api::v1::SemanticType;
1793        use datatypes::schema::ColumnSchema;
1794        use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1795        use store_api::storage::RegionId;
1796
1797        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
1798        builder
1799            .push_column_metadata(ColumnMetadata {
1800                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
1801                semantic_type: SemanticType::Tag,
1802                column_id: 0,
1803            })
1804            .push_column_metadata(ColumnMetadata {
1805                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
1806                semantic_type: SemanticType::Tag,
1807                column_id: 1,
1808            })
1809            .push_column_metadata(ColumnMetadata {
1810                column_schema: ColumnSchema::new(
1811                    "ts",
1812                    ConcreteDataType::timestamp_millisecond_datatype(),
1813                    false,
1814                ),
1815                semantic_type: SemanticType::Timestamp,
1816                column_id: 2,
1817            })
1818            .push_column_metadata(ColumnMetadata {
1819                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
1820                semantic_type: SemanticType::Field,
1821                column_id: 3,
1822            })
1823            .push_column_metadata(ColumnMetadata {
1824                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
1825                semantic_type: SemanticType::Field,
1826                column_id: 4,
1827            })
1828            .primary_key(vec![0, 1])
1829            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
1830        let metadata = Arc::new(builder.build().unwrap());
1831
1832        let primary_key_codec = build_primary_key_codec(&metadata);
1833        let schema = to_flat_sst_arrow_schema(
1834            &metadata,
1835            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1836        );
1837
1838        assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse);
1839        assert_eq!(primary_key_codec.encoding(), PrimaryKeyEncoding::Sparse);
1840
1841        let capacity = 100;
1842        let mut converter =
1843            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec.clone(), true);
1844
1845        let key_values1 = build_key_values_with_sparse_encoding(
1846            &metadata,
1847            &primary_key_codec,
1848            2048u32, // table_id
1849            100u64,  // tsid
1850            "key11".to_string(),
1851            "key21".to_string(),
1852            vec![1000, 2000].into_iter(),
1853            vec![Some(1.0), Some(2.0)].into_iter(),
1854            1,
1855        );
1856
1857        let key_values2 = build_key_values_with_sparse_encoding(
1858            &metadata,
1859            &primary_key_codec,
1860            4096u32, // table_id
1861            200u64,  // tsid
1862            "key12".to_string(),
1863            "key22".to_string(),
1864            vec![1500].into_iter(),
1865            vec![Some(3.0)].into_iter(),
1866            2,
1867        );
1868
1869        converter.append_key_values(&key_values1).unwrap();
1870        converter.append_key_values(&key_values2).unwrap();
1871
1872        let bulk_part = converter.convert().unwrap();
1873
1874        assert_eq!(bulk_part.num_rows(), 3);
1875        assert_eq!(bulk_part.min_timestamp, 1000);
1876        assert_eq!(bulk_part.max_timestamp, 2000);
1877        assert_eq!(bulk_part.sequence, 2);
1878        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
1879
1880        // For sparse encoding, primary key columns should NOT be stored individually
1881        // even when store_primary_key_columns is true, because sparse encoding
1882        // stores the encoded primary key in the __primary_key column
1883        let schema = bulk_part.batch.schema();
1884        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1885        assert_eq!(
1886            field_names,
1887            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
1888        );
1889
1890        // Verify the __primary_key column contains encoded sparse keys
1891        let primary_key_column = bulk_part.batch.column_by_name("__primary_key").unwrap();
1892        let dict_array = primary_key_column
1893            .as_any()
1894            .downcast_ref::<DictionaryArray<UInt32Type>>()
1895            .unwrap();
1896
1897        // The dictionary should be non-empty, with one encoded primary key entry per row
1898        assert!(!dict_array.is_empty());
1899        assert_eq!(dict_array.len(), 3); // 3 rows total
1900
1901        // Verify values are properly encoded binary data (not empty)
1902        let values = dict_array
1903            .values()
1904            .as_any()
1905            .downcast_ref::<BinaryArray>()
1906            .unwrap();
1907        for i in 0..values.len() {
1908            assert!(
1909                !values.value(i).is_empty(),
1910                "Encoded primary key should not be empty"
1911            );
1912        }
1913    }
1914
1915    #[test]
1916    fn test_convert_bulk_part_empty() {
1917        let metadata = metadata_for_test();
1918        let schema = to_flat_sst_arrow_schema(
1919            &metadata,
1920            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1921        );
1922        let primary_key_codec = build_primary_key_codec(&metadata);
1923
1924        // Create empty batch
1925        let empty_batch = RecordBatch::new_empty(schema.clone());
1926        let empty_part = BulkPart {
1927            batch: empty_batch,
1928            max_timestamp: 0,
1929            min_timestamp: 0,
1930            sequence: 0,
1931            timestamp_index: 0,
1932            raw_data: None,
1933        };
1934
1935        let result =
1936            convert_bulk_part(empty_part, &metadata, primary_key_codec, schema, true).unwrap();
1937        assert!(result.is_none());
1938    }
1939
1940    #[test]
1941    fn test_convert_bulk_part_dense_with_pk_columns() {
1942        let metadata = metadata_for_test();
1943        let primary_key_codec = build_primary_key_codec(&metadata);
1944
1945        let k0_array = Arc::new(arrow::array::StringArray::from(vec![
1946            "key1", "key2", "key1",
1947        ]));
1948        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2, 1]));
1949        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200, 300]));
1950        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0, 3.0]));
1951        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000, 1500]));
1952
1953        let input_schema = Arc::new(Schema::new(vec![
1954            Field::new("k0", ArrowDataType::Utf8, false),
1955            Field::new("k1", ArrowDataType::UInt32, false),
1956            Field::new("v0", ArrowDataType::Int64, true),
1957            Field::new("v1", ArrowDataType::Float64, true),
1958            Field::new(
1959                "ts",
1960                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
1961                false,
1962            ),
1963        ]));
1964
1965        let input_batch = RecordBatch::try_new(
1966            input_schema,
1967            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
1968        )
1969        .unwrap();
1970
1971        let part = BulkPart {
1972            batch: input_batch,
1973            max_timestamp: 2000,
1974            min_timestamp: 1000,
1975            sequence: 5,
1976            timestamp_index: 4,
1977            raw_data: None,
1978        };
1979
1980        let output_schema = to_flat_sst_arrow_schema(
1981            &metadata,
1982            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1983        );
1984
1985        let result = convert_bulk_part(
1986            part,
1987            &metadata,
1988            primary_key_codec,
1989            output_schema,
1990            true, // store primary key columns
1991        )
1992        .unwrap();
1993
1994        let converted = result.unwrap();
1995
1996        assert_eq!(converted.num_rows(), 3);
1997        assert_eq!(converted.max_timestamp, 2000);
1998        assert_eq!(converted.min_timestamp, 1000);
1999        assert_eq!(converted.sequence, 5);
2000
2001        let schema = converted.batch.schema();
2002        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2003        assert_eq!(
2004            field_names,
2005            vec![
2006                "k0",
2007                "k1",
2008                "v0",
2009                "v1",
2010                "ts",
2011                "__primary_key",
2012                "__sequence",
2013                "__op_type"
2014            ]
2015        );
2016
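        // String tag columns become dictionary arrays in the converted batch.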
2017        let k0_col = converted.batch.column_by_name("k0").unwrap();
2018        assert!(matches!(
2019            k0_col.data_type(),
2020            ArrowDataType::Dictionary(_, _)
2021        ));
2022
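        // __primary_key carries the codec-encoded key for each row, also as a dictionary array.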
2023        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2024        let dict_array = pk_col
2025            .as_any()
2026            .downcast_ref::<DictionaryArray<UInt32Type>>()
2027            .unwrap();
2028        let keys = dict_array.keys();
2029
2030        assert_eq!(keys.len(), 3);
2031    }
2032
2033    #[test]
2034    fn test_convert_bulk_part_dense_without_pk_columns() {
2035        let metadata = metadata_for_test();
2036        let primary_key_codec = build_primary_key_codec(&metadata);
2037
2038        // Create input batch with primary key columns (k0, k1)
2039        let k0_array = Arc::new(arrow::array::StringArray::from(vec!["key1", "key2"]));
2040        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2]));
2041        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
2042        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
2043        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
2044
2045        let input_schema = Arc::new(Schema::new(vec![
2046            Field::new("k0", ArrowDataType::Utf8, false),
2047            Field::new("k1", ArrowDataType::UInt32, false),
2048            Field::new("v0", ArrowDataType::Int64, true),
2049            Field::new("v1", ArrowDataType::Float64, true),
2050            Field::new(
2051                "ts",
2052                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2053                false,
2054            ),
2055        ]));
2056
2057        let input_batch = RecordBatch::try_new(
2058            input_schema,
2059            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2060        )
2061        .unwrap();
2062
2063        let part = BulkPart {
2064            batch: input_batch,
2065            max_timestamp: 2000,
2066            min_timestamp: 1000,
2067            sequence: 3,
2068            timestamp_index: 4,
2069            raw_data: None,
2070        };
2071
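        // raw_pk_columns = false: the output schema keeps only the encoded __primary_key column
        // instead of individual k0/k1 columns.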
2072        let output_schema = to_flat_sst_arrow_schema(
2073            &metadata,
2074            &FlatSchemaOptions {
2075                raw_pk_columns: false,
2076                string_pk_use_dict: true,
2077            },
2078        );
2079
2080        let result = convert_bulk_part(
2081            part,
2082            &metadata,
2083            primary_key_codec,
2084            output_schema,
2085            false, // don't store primary key columns
2086        )
2087        .unwrap();
2088
2089        let converted = result.unwrap();
2090
2091        assert_eq!(converted.num_rows(), 2);
2092        assert_eq!(converted.max_timestamp, 2000);
2093        assert_eq!(converted.min_timestamp, 1000);
2094        assert_eq!(converted.sequence, 3);
2095
2096        // Verify schema does NOT include individual primary key columns
2097        let schema = converted.batch.schema();
2098        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2099        assert_eq!(
2100            field_names,
2101            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2102        );
2103
2104        // Verify __primary_key column is present and is a dictionary
2105        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2106        assert!(matches!(
2107            pk_col.data_type(),
2108            ArrowDataType::Dictionary(_, _)
2109        ));
2110    }
2111
2112    #[test]
2113    fn test_convert_bulk_part_sparse_encoding() {
2114        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
2115        builder
2116            .push_column_metadata(ColumnMetadata {
2117                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
2118                semantic_type: SemanticType::Tag,
2119                column_id: 0,
2120            })
2121            .push_column_metadata(ColumnMetadata {
2122                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
2123                semantic_type: SemanticType::Tag,
2124                column_id: 1,
2125            })
2126            .push_column_metadata(ColumnMetadata {
2127                column_schema: ColumnSchema::new(
2128                    "ts",
2129                    ConcreteDataType::timestamp_millisecond_datatype(),
2130                    false,
2131                ),
2132                semantic_type: SemanticType::Timestamp,
2133                column_id: 2,
2134            })
2135            .push_column_metadata(ColumnMetadata {
2136                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
2137                semantic_type: SemanticType::Field,
2138                column_id: 3,
2139            })
2140            .push_column_metadata(ColumnMetadata {
2141                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
2142                semantic_type: SemanticType::Field,
2143                column_id: 4,
2144            })
2145            .primary_key(vec![0, 1])
2146            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
2147        let metadata = Arc::new(builder.build().unwrap());
2148
2149        let primary_key_codec = build_primary_key_codec(&metadata);
2150
2151        // Create input batch with __primary_key column (sparse encoding)
2152        let pk_array = Arc::new(arrow::array::BinaryArray::from(vec![
2153            b"encoded_key_1".as_slice(),
2154            b"encoded_key_2".as_slice(),
2155        ]));
2156        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
2157        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
2158        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
2159
2160        let input_schema = Arc::new(Schema::new(vec![
2161            Field::new("__primary_key", ArrowDataType::Binary, false),
2162            Field::new("v0", ArrowDataType::Int64, true),
2163            Field::new("v1", ArrowDataType::Float64, true),
2164            Field::new(
2165                "ts",
2166                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2167                false,
2168            ),
2169        ]));
2170
2171        let input_batch =
2172            RecordBatch::try_new(input_schema, vec![pk_array, v0_array, v1_array, ts_array])
2173                .unwrap();
2174
2175        let part = BulkPart {
2176            batch: input_batch,
2177            max_timestamp: 2000,
2178            min_timestamp: 1000,
2179            sequence: 7,
2180            timestamp_index: 3,
2181            raw_data: None,
2182        };
2183
2184        let output_schema = to_flat_sst_arrow_schema(
2185            &metadata,
2186            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2187        );
2188
2189        let result = convert_bulk_part(
2190            part,
2191            &metadata,
2192            primary_key_codec,
2193            output_schema,
2194            true, // store_primary_key_columns (ignored for sparse)
2195        )
2196        .unwrap();
2197
2198        let converted = result.unwrap();
2199
2200        assert_eq!(converted.num_rows(), 2);
2201        assert_eq!(converted.max_timestamp, 2000);
2202        assert_eq!(converted.min_timestamp, 1000);
2203        assert_eq!(converted.sequence, 7);
2204
2205        // Verify schema does NOT include individual primary key columns (sparse encoding)
2206        let schema = converted.batch.schema();
2207        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2208        assert_eq!(
2209            field_names,
2210            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2211        );
2212
2213        // Verify __primary_key is dictionary encoded
2214        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2215        assert!(matches!(
2216            pk_col.data_type(),
2217            ArrowDataType::Dictionary(_, _)
2218        ));
2219    }
2220
2221    #[test]
2222    fn test_convert_bulk_part_sorting_with_multiple_series() {
2223        let metadata = metadata_for_test();
2224        let primary_key_codec = build_primary_key_codec(&metadata);
2225
2226        // Create unsorted batch with multiple series and timestamps
2227        let k0_array = Arc::new(arrow::array::StringArray::from(vec![
2228            "series_b", "series_a", "series_b", "series_a",
2229        ]));
2230        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![2, 1, 2, 1]));
2231        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![200, 100, 400, 300]));
2232        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![2.0, 1.0, 4.0, 3.0]));
2233        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
2234            2000, 1000, 4000, 3000,
2235        ]));
2236
2237        let input_schema = Arc::new(Schema::new(vec![
2238            Field::new("k0", ArrowDataType::Utf8, false),
2239            Field::new("k1", ArrowDataType::UInt32, false),
2240            Field::new("v0", ArrowDataType::Int64, true),
2241            Field::new("v1", ArrowDataType::Float64, true),
2242            Field::new(
2243                "ts",
2244                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2245                false,
2246            ),
2247        ]));
2248
2249        let input_batch = RecordBatch::try_new(
2250            input_schema,
2251            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2252        )
2253        .unwrap();
2254
2255        let part = BulkPart {
2256            batch: input_batch,
2257            max_timestamp: 4000,
2258            min_timestamp: 1000,
2259            sequence: 10,
2260            timestamp_index: 4,
2261            raw_data: None,
2262        };
2263
2264        let output_schema = to_flat_sst_arrow_schema(
2265            &metadata,
2266            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2267        );
2268
2269        let result =
2270            convert_bulk_part(part, &metadata, primary_key_codec, output_schema, true).unwrap();
2271
2272        let converted = result.unwrap();
2273
2274        assert_eq!(converted.num_rows(), 4);
2275
2276        // Verify data is sorted by (primary_key, timestamp, sequence desc)
2277        let ts_col = converted.batch.column(converted.timestamp_index);
2278        let ts_array = ts_col
2279            .as_any()
2280            .downcast_ref::<TimestampMillisecondArray>()
2281            .unwrap();
2282
2283        // After sorting by (pk, ts), we should have:
2284        // series_a,1: ts=1000, 3000
2285        // series_b,2: ts=2000, 4000
2286        let timestamps: Vec<i64> = ts_array.values().to_vec();
2287        assert_eq!(timestamps, vec![1000, 3000, 2000, 4000]);
2288    }
2289}