mito2/memtable/bulk/part.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Bulk part encoder/decoder.
16
17use std::collections::{HashMap, HashSet, VecDeque};
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use api::helper::{ColumnDataTypeWrapper, to_grpc_value};
22use api::v1::bulk_wal_entry::Body;
23use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType, bulk_wal_entry};
24use bytes::Bytes;
25use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
26use common_recordbatch::DfRecordBatch as RecordBatch;
27use common_time::Timestamp;
28use common_time::timestamp::TimeUnit;
29use datatypes::arrow;
30use datatypes::arrow::array::{
31    Array, ArrayRef, BinaryBuilder, BinaryDictionaryBuilder, DictionaryArray, StringBuilder,
32    StringDictionaryBuilder, TimestampMicrosecondArray, TimestampMillisecondArray,
33    TimestampNanosecondArray, TimestampSecondArray, UInt8Array, UInt8Builder, UInt32Array,
34    UInt64Array, UInt64Builder,
35};
36use datatypes::arrow::compute::{SortColumn, SortOptions, TakeOptions};
37use datatypes::arrow::datatypes::{
38    DataType as ArrowDataType, Field, Schema, SchemaRef, UInt32Type,
39};
40use datatypes::arrow_array::BinaryArray;
41use datatypes::data_type::DataType;
42use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
43use datatypes::value::{Value, ValueRef};
44use datatypes::vectors::Helper;
45use mito_codec::key_values::{KeyValue, KeyValues, KeyValuesRef};
46use mito_codec::row_converter::{
47    DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, build_primary_key_codec,
48};
49use parquet::arrow::ArrowWriter;
50use parquet::basic::{Compression, ZstdLevel};
51use parquet::data_type::AsBytes;
52use parquet::file::metadata::ParquetMetaData;
53use parquet::file::properties::WriterProperties;
54use snafu::{OptionExt, ResultExt, Snafu};
55use store_api::codec::PrimaryKeyEncoding;
56use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
57use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
58use store_api::storage::{FileId, RegionId, SequenceNumber, SequenceRange};
59use table::predicate::Predicate;
60
61use crate::error::{
62    self, ColumnNotFoundSnafu, ComputeArrowSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu,
63    DataTypeMismatchSnafu, EncodeMemtableSnafu, EncodeSnafu, InvalidMetadataSnafu,
64    InvalidRequestSnafu, NewRecordBatchSnafu, Result, UnexpectedSnafu,
65};
66use crate::memtable::bulk::context::BulkIterContextRef;
67use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
68use crate::memtable::time_series::{ValueBuilder, Values};
69use crate::memtable::{BoxedRecordBatchIterator, MemScanMetrics};
70use crate::sst::index::IndexOutput;
71use crate::sst::parquet::file_range::{PreFilterMode, row_group_contains_delete};
72use crate::sst::parquet::flat_format::primary_key_column_index;
73use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder, ReadFormat};
74use crate::sst::parquet::helper::parse_parquet_metadata;
75use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo};
76use crate::sst::{SeriesEstimator, to_sst_arrow_schema};
77
78const INIT_DICT_VALUE_CAPACITY: usize = 8;
79
80/// A raw bulk part in the memtable.
81#[derive(Clone)]
82pub struct BulkPart {
83    pub batch: RecordBatch,
84    pub max_timestamp: i64,
85    pub min_timestamp: i64,
86    pub sequence: u64,
87    pub timestamp_index: usize,
88    pub raw_data: Option<ArrowIpc>,
89}
90
91impl TryFrom<BulkWalEntry> for BulkPart {
92    type Error = error::Error;
93
94    fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
95        match value.body.expect("Entry payload should be present") {
96            Body::ArrowIpc(ipc) => {
97                let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
98                    .context(error::ConvertBulkWalEntrySnafu)?;
99                let batch = decoder
100                    .try_decode_record_batch(&ipc.data_header, &ipc.payload)
101                    .context(error::ConvertBulkWalEntrySnafu)?;
102                Ok(Self {
103                    batch,
104                    max_timestamp: value.max_ts,
105                    min_timestamp: value.min_ts,
106                    sequence: value.sequence,
107                    timestamp_index: value.timestamp_index as usize,
108                    raw_data: Some(ipc),
109                })
110            }
111        }
112    }
113}
114
115impl TryFrom<&BulkPart> for BulkWalEntry {
116    type Error = error::Error;
117
118    fn try_from(value: &BulkPart) -> Result<Self> {
119        if let Some(ipc) = &value.raw_data {
120            Ok(BulkWalEntry {
121                sequence: value.sequence,
122                max_ts: value.max_timestamp,
123                min_ts: value.min_timestamp,
124                timestamp_index: value.timestamp_index as u32,
125                body: Some(Body::ArrowIpc(ipc.clone())),
126            })
127        } else {
128            let mut encoder = FlightEncoder::default();
129            let schema_bytes = encoder
130                .encode_schema(value.batch.schema().as_ref())
131                .data_header;
132            let [rb_data] = encoder
133                .encode(FlightMessage::RecordBatch(value.batch.clone()))
134                .try_into()
135                .map_err(|_| {
136                    error::UnsupportedOperationSnafu {
137                        err_msg: "create BulkWalEntry from RecordBatch with dictionary arrays",
138                    }
139                    .build()
140                })?;
141            Ok(BulkWalEntry {
142                sequence: value.sequence,
143                max_ts: value.max_timestamp,
144                min_ts: value.min_timestamp,
145                timestamp_index: value.timestamp_index as u32,
146                body: Some(Body::ArrowIpc(ArrowIpc {
147                    schema: schema_bytes,
148                    data_header: rb_data.data_header,
149                    payload: rb_data.data_body,
150                })),
151            })
152        }
153    }
154}
155
156impl BulkPart {
157    pub(crate) fn estimated_size(&self) -> usize {
158        record_batch_estimated_size(&self.batch)
159    }
160
161    /// Returns the estimated series count in this BulkPart.
162    /// This is derived from the number of dictionary values in the PrimaryKeyArray.
163    pub fn estimated_series_count(&self) -> usize {
164        let pk_column_idx = primary_key_column_index(self.batch.num_columns());
165        let pk_column = self.batch.column(pk_column_idx);
166        if let Some(dict_array) = pk_column.as_any().downcast_ref::<PrimaryKeyArray>() {
167            dict_array.values().len()
168        } else {
169            0
170        }
171    }
172
173    /// Fills missing columns in the BulkPart batch with default values.
174    ///
175    /// This function compares the batch schema with the region metadata schema and
176    /// fills any missing columns with their default values (or null for nullable
177    /// columns).
178    ///
179    /// # Arguments
180    ///
181    /// * `region_metadata` - The region metadata containing the expected schema
182    pub fn fill_missing_columns(&mut self, region_metadata: &RegionMetadata) -> Result<()> {
183        // Builds a map of existing columns in the batch
184        let batch_schema = self.batch.schema();
185        let batch_columns: HashSet<_> = batch_schema
186            .fields()
187            .iter()
188            .map(|f| f.name().as_str())
189            .collect();
190
191        // Finds columns that need to be filled
192        let mut columns_to_fill = Vec::new();
193        for column_meta in &region_metadata.column_metadatas {
194            // TODO(yingwen): Return an error if the default is impure after we support
195            // filling bulk insert requests in the frontend.
196            if !batch_columns.contains(column_meta.column_schema.name.as_str()) {
197                columns_to_fill.push(column_meta);
198            }
199        }
200
201        if columns_to_fill.is_empty() {
202            return Ok(());
203        }
204
205        let num_rows = self.batch.num_rows();
206
207        let mut new_columns = Vec::new();
208        let mut new_fields = Vec::new();
209
210        // First, adds all existing columns
211        new_fields.extend(batch_schema.fields().iter().cloned());
212        new_columns.extend_from_slice(self.batch.columns());
213
214        let region_id = region_metadata.region_id;
215        // Then adds the missing columns with default values
216        for column_meta in columns_to_fill {
217            let default_vector = column_meta
218                .column_schema
219                .create_default_vector(num_rows)
220                .context(CreateDefaultSnafu {
221                    region_id,
222                    column: &column_meta.column_schema.name,
223                })?
224                .with_context(|| InvalidRequestSnafu {
225                    region_id,
226                    reason: format!(
227                        "column {} does not have default value",
228                        column_meta.column_schema.name
229                    ),
230                })?;
231            let arrow_array = default_vector.to_arrow_array();
233
234            new_fields.push(Arc::new(Field::new(
235                column_meta.column_schema.name.clone(),
236                column_meta.column_schema.data_type.as_arrow_type(),
237                column_meta.column_schema.is_nullable(),
238            )));
239            new_columns.push(arrow_array);
240        }
241
242        // Create a new schema and batch with the filled columns
243        let new_schema = Arc::new(Schema::new(new_fields));
244        let new_batch =
245            RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)?;
246
247        // Update the batch
248        self.batch = new_batch;
249
250        Ok(())
251    }
252
253    /// Converts [BulkPart] to [Mutation] for fallback `write_bulk` implementation.
254    pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
255        let vectors = region_metadata
256            .schema
257            .column_schemas()
258            .iter()
259            .map(|col| match self.batch.column_by_name(&col.name) {
260                None => Ok(None),
261                Some(col) => Helper::try_into_vector(col).map(Some),
262            })
263            .collect::<datatypes::error::Result<Vec<_>>>()
264            .context(error::ComputeVectorSnafu)?;
265
266        let rows = (0..self.num_rows())
267            .map(|row_idx| {
268                let values = (0..self.batch.num_columns())
269                    .map(|col_idx| {
270                        if let Some(v) = &vectors[col_idx] {
271                            to_grpc_value(v.get(row_idx))
272                        } else {
273                            api::v1::Value { value_data: None }
274                        }
275                    })
276                    .collect::<Vec<_>>();
277                api::v1::Row { values }
278            })
279            .collect::<Vec<_>>();
280
281        let schema = region_metadata
282            .column_metadatas
283            .iter()
284            .map(|c| {
285                let data_type_wrapper =
286                    ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
287                Ok(api::v1::ColumnSchema {
288                    column_name: c.column_schema.name.clone(),
289                    datatype: data_type_wrapper.datatype() as i32,
290                    semantic_type: c.semantic_type as i32,
291                    ..Default::default()
292                })
293            })
294            .collect::<api::error::Result<Vec<_>>>()
295            .context(error::ConvertColumnDataTypeSnafu {
296                reason: "failed to convert region metadata to column schema",
297            })?;
298
299        let rows = api::v1::Rows { schema, rows };
300
301        Ok(Mutation {
302            op_type: OpType::Put as i32,
303            sequence: self.sequence,
304            rows: Some(rows),
305            write_hint: None,
306        })
307    }
308
309    pub fn timestamps(&self) -> &ArrayRef {
310        self.batch.column(self.timestamp_index)
311    }
312
313    pub fn num_rows(&self) -> usize {
314        self.batch.num_rows()
315    }
316}
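
// Illustrative sketch (not part of the original module): how a caller might prepare a
// `BulkPart` for the fallback `write_bulk` path. The calling context is an assumption;
// only the two methods used here are defined above.
#[allow(dead_code)]
fn prepare_bulk_part_for_fallback(
    part: &mut BulkPart,
    region_metadata: &RegionMetadataRef,
) -> Result<Mutation> {
    // Fills columns absent from the batch with their default values.
    part.fill_missing_columns(region_metadata)?;
    // Converts the now schema-complete batch into a gRPC `Mutation`.
    part.to_mutation(region_metadata)
}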
317
318/// A collection of small unordered bulk parts.
319/// Used to batch small parts together before merging them into a sorted part.
320pub struct UnorderedPart {
321    /// Small bulk parts that haven't been sorted yet.
322    parts: Vec<BulkPart>,
323    /// Total number of rows across all parts.
324    total_rows: usize,
325    /// Minimum timestamp across all parts.
326    min_timestamp: i64,
327    /// Maximum timestamp across all parts.
328    max_timestamp: i64,
329    /// Maximum sequence number across all parts.
330    max_sequence: u64,
331    /// Row count threshold for accepting parts (default: 1024).
332    threshold: usize,
333    /// Row count threshold for compacting (default: 4096).
334    compact_threshold: usize,
335}
336
337impl Default for UnorderedPart {
338    fn default() -> Self {
339        Self::new()
340    }
341}
342
343impl UnorderedPart {
344    /// Creates a new empty UnorderedPart.
345    pub fn new() -> Self {
346        Self {
347            parts: Vec::new(),
348            total_rows: 0,
349            min_timestamp: i64::MAX,
350            max_timestamp: i64::MIN,
351            max_sequence: 0,
352            threshold: 1024,
353            compact_threshold: 4096,
354        }
355    }
356
357    /// Sets the threshold for accepting parts into unordered_part.
358    pub fn set_threshold(&mut self, threshold: usize) {
359        self.threshold = threshold;
360    }
361
362    /// Sets the threshold for compacting unordered_part.
363    pub fn set_compact_threshold(&mut self, compact_threshold: usize) {
364        self.compact_threshold = compact_threshold;
365    }
366
367    /// Returns the threshold for accepting parts.
368    pub fn threshold(&self) -> usize {
369        self.threshold
370    }
371
372    /// Returns the compact threshold.
373    pub fn compact_threshold(&self) -> usize {
374        self.compact_threshold
375    }
376
377    /// Returns true if this part should accept the given row count.
378    pub fn should_accept(&self, num_rows: usize) -> bool {
379        num_rows < self.threshold
380    }
381
382    /// Returns true if this part should be compacted.
383    pub fn should_compact(&self) -> bool {
384        self.total_rows >= self.compact_threshold
385    }
386
387    /// Adds a BulkPart to this unordered collection.
388    pub fn push(&mut self, part: BulkPart) {
389        self.total_rows += part.num_rows();
390        self.min_timestamp = self.min_timestamp.min(part.min_timestamp);
391        self.max_timestamp = self.max_timestamp.max(part.max_timestamp);
392        self.max_sequence = self.max_sequence.max(part.sequence);
393        self.parts.push(part);
394    }
395
396    /// Returns the total number of rows across all parts.
397    pub fn num_rows(&self) -> usize {
398        self.total_rows
399    }
400
401    /// Returns true if there are no parts.
402    pub fn is_empty(&self) -> bool {
403        self.parts.is_empty()
404    }
405
406    /// Returns the number of parts in this collection.
407    pub fn num_parts(&self) -> usize {
408        self.parts.len()
409    }
410
411    /// Concatenates and sorts all parts into a single RecordBatch.
412    /// Returns None if the collection is empty.
413    pub fn concat_and_sort(&self) -> Result<Option<RecordBatch>> {
414        if self.parts.is_empty() {
415            return Ok(None);
416        }
417
418        if self.parts.len() == 1 {
419            // If there's only one part, return its batch directly
420            return Ok(Some(self.parts[0].batch.clone()));
421        }
422
423        // Get the schema from the first part
424        let schema = self.parts[0].batch.schema();
425
426        // Concatenate all record batches
427        let batches: Vec<RecordBatch> = self.parts.iter().map(|p| p.batch.clone()).collect();
428        let concatenated =
429            arrow::compute::concat_batches(&schema, &batches).context(ComputeArrowSnafu)?;
430
431        // Sort the concatenated batch
432        let sorted_batch = sort_primary_key_record_batch(&concatenated)?;
433
434        Ok(Some(sorted_batch))
435    }
436
437    /// Converts all parts into a single sorted BulkPart.
438    /// Returns None if the collection is empty.
439    pub fn to_bulk_part(&self) -> Result<Option<BulkPart>> {
440        let Some(sorted_batch) = self.concat_and_sort()? else {
441            return Ok(None);
442        };
443
444        let timestamp_index = self.parts[0].timestamp_index;
445
446        Ok(Some(BulkPart {
447            batch: sorted_batch,
448            max_timestamp: self.max_timestamp,
449            min_timestamp: self.min_timestamp,
450            sequence: self.max_sequence,
451            timestamp_index,
452            raw_data: None,
453        }))
454    }
455
456    /// Clears all parts from this collection.
457    pub fn clear(&mut self) {
458        self.parts.clear();
459        self.total_rows = 0;
460        self.min_timestamp = i64::MAX;
461        self.max_timestamp = i64::MIN;
462        self.max_sequence = 0;
463    }
464}
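
// Illustrative sketch (assumptions: all incoming parts share the same schema and the
// caller owns the memtable write path): buffer small parts in an `UnorderedPart` and
// merge them into a single sorted `BulkPart` once the compact threshold is reached.
#[allow(dead_code)]
fn buffer_small_parts(
    unordered: &mut UnorderedPart,
    incoming: Vec<BulkPart>,
) -> Result<Vec<BulkPart>> {
    let mut ready = Vec::new();
    for part in incoming {
        if unordered.should_accept(part.num_rows()) {
            unordered.push(part);
        } else {
            // Large parts bypass the unordered buffer.
            ready.push(part);
        }
        if unordered.should_compact() {
            if let Some(merged) = unordered.to_bulk_part()? {
                ready.push(merged);
            }
            unordered.clear();
        }
    }
    Ok(ready)
}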
465
466/// More accurate estimation of the size of a record batch.
467pub(crate) fn record_batch_estimated_size(batch: &RecordBatch) -> usize {
468    batch
469        .columns()
470        .iter()
471        // If the slice memory size can not be computed, assume 0 here.
472        .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
473        .sum()
474}
475
476/// Primary key column builder for handling strings specially.
477enum PrimaryKeyColumnBuilder {
478    /// String dictionary builder for string types.
479    StringDict(StringDictionaryBuilder<UInt32Type>),
480    /// Generic mutable vector for other types.
481    Vector(Box<dyn MutableVector>),
482}
483
484impl PrimaryKeyColumnBuilder {
485    /// Appends a value to the builder.
486    fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
487        match self {
488            PrimaryKeyColumnBuilder::StringDict(builder) => {
489                if let Some(s) = value.try_into_string().context(DataTypeMismatchSnafu)? {
490                    // We know the value is a string.
491                    builder.append_value(s);
492                } else {
493                    builder.append_null();
494                }
495            }
496            PrimaryKeyColumnBuilder::Vector(builder) => {
497                builder.push_value_ref(&value);
498            }
499        }
500        Ok(())
501    }
502
503    /// Converts the builder to an ArrayRef.
504    fn into_arrow_array(self) -> ArrayRef {
505        match self {
506            PrimaryKeyColumnBuilder::StringDict(mut builder) => Arc::new(builder.finish()),
507            PrimaryKeyColumnBuilder::Vector(mut builder) => builder.to_vector().to_arrow_array(),
508        }
509    }
510}
511
512/// Converter that converts [KeyValues] into a [BulkPart].
513pub struct BulkPartConverter {
514    /// Region metadata.
515    region_metadata: RegionMetadataRef,
516    /// Schema of the converted batch.
517    schema: SchemaRef,
518    /// Primary key codec for encoding keys
519    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
520    /// Buffer for encoding primary key.
521    key_buf: Vec<u8>,
522    /// Primary key array builder.
523    key_array_builder: PrimaryKeyArrayBuilder,
524    /// Builders for non-primary key columns.
525    value_builder: ValueBuilder,
526    /// Builders for individual primary key columns.
527    /// The order of builders is the same as the order of primary key columns in the region metadata.
528    primary_key_column_builders: Vec<PrimaryKeyColumnBuilder>,
529
530    /// Max timestamp value.
531    max_ts: i64,
532    /// Min timestamp value.
533    min_ts: i64,
534    /// Max sequence number.
535    max_sequence: SequenceNumber,
536}
537
538impl BulkPartConverter {
539    /// Creates a new converter.
540    ///
541    /// If `store_primary_key_columns` is true and the encoding is not sparse, it
542    /// additionally stores the primary key columns as individual arrays.
543    pub fn new(
544        region_metadata: &RegionMetadataRef,
545        schema: SchemaRef,
546        capacity: usize,
547        primary_key_codec: Arc<dyn PrimaryKeyCodec>,
548        store_primary_key_columns: bool,
549    ) -> Self {
550        debug_assert_eq!(
551            region_metadata.primary_key_encoding,
552            primary_key_codec.encoding()
553        );
554
555        let primary_key_column_builders = if store_primary_key_columns
556            && region_metadata.primary_key_encoding != PrimaryKeyEncoding::Sparse
557        {
558            new_primary_key_column_builders(region_metadata, capacity)
559        } else {
560            Vec::new()
561        };
562
563        Self {
564            region_metadata: region_metadata.clone(),
565            schema,
566            primary_key_codec,
567            key_buf: Vec::new(),
568            key_array_builder: PrimaryKeyArrayBuilder::new(),
569            value_builder: ValueBuilder::new(region_metadata, capacity),
570            primary_key_column_builders,
571            min_ts: i64::MAX,
572            max_ts: i64::MIN,
573            max_sequence: SequenceNumber::MIN,
574        }
575    }
576
577    /// Appends a [KeyValues] into the converter.
578    pub fn append_key_values(&mut self, key_values: &KeyValues) -> Result<()> {
579        for kv in key_values.iter() {
580            self.append_key_value(&kv)?;
581        }
582
583        Ok(())
584    }
585
586    /// Appends a [KeyValue] to builders.
587    ///
588    /// If the primary key uses sparse encoding, callers must have already encoded the primary key in the [KeyValue].
589    fn append_key_value(&mut self, kv: &KeyValue) -> Result<()> {
590        // Handles primary key based on encoding type
591        if self.primary_key_codec.encoding() == PrimaryKeyEncoding::Sparse {
592            // For sparse encoding, the primary key is already encoded in the KeyValue
593            // Gets the first (and only) primary key value which contains the encoded key
594            let mut primary_keys = kv.primary_keys();
595            if let Some(encoded) = primary_keys
596                .next()
597                .context(ColumnNotFoundSnafu {
598                    column: PRIMARY_KEY_COLUMN_NAME,
599                })?
600                .try_into_binary()
601                .context(DataTypeMismatchSnafu)?
602            {
603                self.key_array_builder
604                    .append(encoded)
605                    .context(ComputeArrowSnafu)?;
606            } else {
607                self.key_array_builder
608                    .append("")
609                    .context(ComputeArrowSnafu)?;
610            }
611        } else {
612            // For dense encoding, we need to encode the primary key columns
613            self.key_buf.clear();
614            self.primary_key_codec
615                .encode_key_value(kv, &mut self.key_buf)
616                .context(EncodeSnafu)?;
617            self.key_array_builder
618                .append(&self.key_buf)
619                .context(ComputeArrowSnafu)?;
620        };
621
622        // If storing primary key columns, append values to individual builders
623        if !self.primary_key_column_builders.is_empty() {
624            for (builder, pk_value) in self
625                .primary_key_column_builders
626                .iter_mut()
627                .zip(kv.primary_keys())
628            {
629                builder.push_value_ref(pk_value)?;
630            }
631        }
632
633        // Pushes other columns.
634        self.value_builder.push(
635            kv.timestamp(),
636            kv.sequence(),
637            kv.op_type() as u8,
638            kv.fields(),
639        );
640
641        // Updates statistics
642        // Safety: timestamp of kv must be both present and a valid timestamp value.
643        let ts = kv
644            .timestamp()
645            .try_into_timestamp()
646            .unwrap()
647            .unwrap()
648            .value();
649        self.min_ts = self.min_ts.min(ts);
650        self.max_ts = self.max_ts.max(ts);
651        self.max_sequence = self.max_sequence.max(kv.sequence());
652
653        Ok(())
654    }
655
656    /// Converts buffered content into a [BulkPart].
657    ///
658    /// It sorts the record batch by (primary key, timestamp, sequence desc).
659    pub fn convert(mut self) -> Result<BulkPart> {
660        let values = Values::from(self.value_builder);
661        let mut columns =
662            Vec::with_capacity(4 + values.fields.len() + self.primary_key_column_builders.len());
663
664        // Build primary key column arrays if enabled.
665        for builder in self.primary_key_column_builders {
666            columns.push(builder.into_arrow_array());
667        }
668        // Then fields columns.
669        columns.extend(values.fields.iter().map(|field| field.to_arrow_array()));
670        // Time index.
671        let timestamp_index = columns.len();
672        columns.push(values.timestamp.to_arrow_array());
673        // Primary key.
674        let pk_array = self.key_array_builder.finish();
675        columns.push(Arc::new(pk_array));
676        // Sequence and op type.
677        columns.push(values.sequence.to_arrow_array());
678        columns.push(values.op_type.to_arrow_array());
679
680        let batch = RecordBatch::try_new(self.schema, columns).context(NewRecordBatchSnafu)?;
681        // Sorts the record batch.
682        let batch = sort_primary_key_record_batch(&batch)?;
683
684        Ok(BulkPart {
685            batch,
686            max_timestamp: self.max_ts,
687            min_timestamp: self.min_ts,
688            sequence: self.max_sequence,
689            timestamp_index,
690            raw_data: None,
691        })
692    }
693}
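
// Illustrative sketch (the `KeyValues` batches, capacity, and schema are assumed to come
// from the write path): feed key values into a `BulkPartConverter` and obtain a sorted
// `BulkPart`.
#[allow(dead_code)]
fn convert_key_values_to_part(
    region_metadata: &RegionMetadataRef,
    schema: SchemaRef,
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
    key_values_list: &[KeyValues],
) -> Result<BulkPart> {
    let mut converter =
        BulkPartConverter::new(region_metadata, schema, 1024, primary_key_codec, true);
    for key_values in key_values_list {
        converter.append_key_values(key_values)?;
    }
    // Sorts by (primary key, timestamp, sequence desc) and builds the batch.
    converter.convert()
}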
694
695fn new_primary_key_column_builders(
696    metadata: &RegionMetadata,
697    capacity: usize,
698) -> Vec<PrimaryKeyColumnBuilder> {
699    metadata
700        .primary_key_columns()
701        .map(|col| {
702            if col.column_schema.data_type.is_string() {
703                PrimaryKeyColumnBuilder::StringDict(StringDictionaryBuilder::with_capacity(
704                    capacity,
705                    INIT_DICT_VALUE_CAPACITY,
706                    capacity,
707                ))
708            } else {
709                PrimaryKeyColumnBuilder::Vector(
710                    col.column_schema.data_type.create_mutable_vector(capacity),
711                )
712            }
713        })
714        .collect()
715}
716
717/// Sorts a record batch in the primary key format by (primary key asc, timestamp asc, sequence desc).
718fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
719    let total_columns = batch.num_columns();
720    let sort_columns = vec![
721        // Primary key column (ascending)
722        SortColumn {
723            values: batch.column(total_columns - 3).clone(),
724            options: Some(SortOptions {
725                descending: false,
726                nulls_first: true,
727            }),
728        },
729        // Time index column (ascending)
730        SortColumn {
731            values: batch.column(total_columns - 4).clone(),
732            options: Some(SortOptions {
733                descending: false,
734                nulls_first: true,
735            }),
736        },
737        // Sequence column (descending)
738        SortColumn {
739            values: batch.column(total_columns - 2).clone(),
740            options: Some(SortOptions {
741                descending: true,
742                nulls_first: true,
743            }),
744        },
745    ];
746
747    let indices = datatypes::arrow::compute::lexsort_to_indices(&sort_columns, None)
748        .context(ComputeArrowSnafu)?;
749
750    datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
751}
752
753/// Converts a `BulkPart` that is unordered and without encoded primary keys into a `BulkPart`
754/// with the same format as produced by [BulkPartConverter].
755///
756/// This function takes a `BulkPart` where:
757/// - For dense encoding: Primary key columns may be stored as individual columns
758/// - For sparse encoding: The `__primary_key` column should already be present with encoded keys
759/// - The batch may not be sorted
760///
761/// And produces a `BulkPart` where:
762/// - Primary key columns are optionally stored (depending on `store_primary_key_columns` and encoding)
763/// - An encoded `__primary_key` dictionary column is present
764/// - The batch is sorted by (primary_key, timestamp, sequence desc)
765///
766/// # Arguments
767///
768/// * `part` - The input `BulkPart` to convert
769/// * `region_metadata` - Region metadata containing schema information
770/// * `primary_key_codec` - Codec for encoding primary keys
771/// * `schema` - Target schema for the output batch
772/// * `store_primary_key_columns` - If true and encoding is not sparse, stores individual primary key columns
773///
774/// # Returns
775///
776/// Returns `None` if the input part has no rows, otherwise returns a new `BulkPart` with
777/// encoded primary keys and sorted data.
778pub fn convert_bulk_part(
779    part: BulkPart,
780    region_metadata: &RegionMetadataRef,
781    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
782    schema: SchemaRef,
783    store_primary_key_columns: bool,
784) -> Result<Option<BulkPart>> {
785    if part.num_rows() == 0 {
786        return Ok(None);
787    }
788
789    let num_rows = part.num_rows();
790    let is_sparse = region_metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse;
791
792    // Builds a column name-to-index map for efficient lookups
793    let input_schema = part.batch.schema();
794    let column_indices: HashMap<&str, usize> = input_schema
795        .fields()
796        .iter()
797        .enumerate()
798        .map(|(idx, field)| (field.name().as_str(), idx))
799        .collect();
800
801    // Determines the structure of the input batch by looking up columns by name
802    let mut output_columns = Vec::new();
803
804    // Extracts primary key columns if we need to encode them (dense encoding)
805    let pk_array = if is_sparse {
806        // For sparse encoding, the input should already have the __primary_key column
807        // We need to find it in the input batch
808        None
809    } else {
810        // For dense encoding, extract and encode primary key columns by name
811        let pk_vectors: Result<Vec<_>> = region_metadata
812            .primary_key_columns()
813            .map(|col_meta| {
814                let col_idx = column_indices
815                    .get(col_meta.column_schema.name.as_str())
816                    .context(ColumnNotFoundSnafu {
817                        column: &col_meta.column_schema.name,
818                    })?;
819                let col = part.batch.column(*col_idx);
820                Helper::try_into_vector(col).context(error::ComputeVectorSnafu)
821            })
822            .collect();
823        let pk_vectors = pk_vectors?;
824
825        let mut key_array_builder = PrimaryKeyArrayBuilder::new();
826        let mut encode_buf = Vec::new();
827
828        for row_idx in 0..num_rows {
829            encode_buf.clear();
830
831            // Collects primary key values with column IDs for this row
832            let pk_values_with_ids: Vec<_> = region_metadata
833                .primary_key
834                .iter()
835                .zip(pk_vectors.iter())
836                .map(|(col_id, vector)| (*col_id, vector.get_ref(row_idx)))
837                .collect();
838
839            // Encodes the primary key
840            primary_key_codec
841                .encode_value_refs(&pk_values_with_ids, &mut encode_buf)
842                .context(EncodeSnafu)?;
843
844            key_array_builder
845                .append(&encode_buf)
846                .context(ComputeArrowSnafu)?;
847        }
848
849        Some(key_array_builder.finish())
850    };
851
852    // Adds primary key columns if storing them (only for dense encoding)
853    if store_primary_key_columns && !is_sparse {
854        for col_meta in region_metadata.primary_key_columns() {
855            let col_idx = column_indices
856                .get(col_meta.column_schema.name.as_str())
857                .context(ColumnNotFoundSnafu {
858                    column: &col_meta.column_schema.name,
859                })?;
860            let col = part.batch.column(*col_idx);
861
862            // Converts to dictionary if needed for string types
863            let col = if col_meta.column_schema.data_type.is_string() {
864                let target_type = ArrowDataType::Dictionary(
865                    Box::new(ArrowDataType::UInt32),
866                    Box::new(ArrowDataType::Utf8),
867                );
868                arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
869            } else {
870                col.clone()
871            };
872            output_columns.push(col);
873        }
874    }
875
876    // Adds field columns
877    for col_meta in region_metadata.field_columns() {
878        let col_idx = column_indices
879            .get(col_meta.column_schema.name.as_str())
880            .context(ColumnNotFoundSnafu {
881                column: &col_meta.column_schema.name,
882            })?;
883        output_columns.push(part.batch.column(*col_idx).clone());
884    }
885
886    // Adds timestamp column
887    let new_timestamp_index = output_columns.len();
888    let ts_col_idx = column_indices
889        .get(
890            region_metadata
891                .time_index_column()
892                .column_schema
893                .name
894                .as_str(),
895        )
896        .context(ColumnNotFoundSnafu {
897            column: &region_metadata.time_index_column().column_schema.name,
898        })?;
899    output_columns.push(part.batch.column(*ts_col_idx).clone());
900
901    // Adds encoded primary key dictionary column
902    let pk_dictionary = if let Some(pk_dict_array) = pk_array {
903        Arc::new(pk_dict_array) as ArrayRef
904    } else {
905        let pk_col_idx =
906            column_indices
907                .get(PRIMARY_KEY_COLUMN_NAME)
908                .context(ColumnNotFoundSnafu {
909                    column: PRIMARY_KEY_COLUMN_NAME,
910                })?;
911        let col = part.batch.column(*pk_col_idx);
912
913        // Casts to dictionary type if needed
914        let target_type = ArrowDataType::Dictionary(
915            Box::new(ArrowDataType::UInt32),
916            Box::new(ArrowDataType::Binary),
917        );
918        arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
919    };
920    output_columns.push(pk_dictionary);
921
922    let sequence_array = UInt64Array::from(vec![part.sequence; num_rows]);
923    output_columns.push(Arc::new(sequence_array) as ArrayRef);
924
925    let op_type_array = UInt8Array::from(vec![OpType::Put as u8; num_rows]);
926    output_columns.push(Arc::new(op_type_array) as ArrayRef);
927
928    let batch = RecordBatch::try_new(schema, output_columns).context(NewRecordBatchSnafu)?;
929
930    // Sorts the batch by (primary_key, timestamp, sequence desc)
931    let sorted_batch = sort_primary_key_record_batch(&batch)?;
932
933    Ok(Some(BulkPart {
934        batch: sorted_batch,
935        max_timestamp: part.max_timestamp,
936        min_timestamp: part.min_timestamp,
937        sequence: part.sequence,
938        timestamp_index: new_timestamp_index,
939        raw_data: None,
940    }))
941}
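
// Illustrative sketch (the raw part, target schema, and codec are assumed to be supplied
// by the bulk insert handler): normalize an unordered part into the memtable format.
#[allow(dead_code)]
fn normalize_raw_part(
    raw: BulkPart,
    region_metadata: &RegionMetadataRef,
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
    target_schema: SchemaRef,
) -> Result<Option<BulkPart>> {
    // Encodes the `__primary_key` dictionary column, keeps individual key columns for
    // dense encoding, and sorts rows by (primary key, timestamp, sequence desc).
    convert_bulk_part(raw, region_metadata, primary_key_codec, target_schema, true)
}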
942
943#[derive(Debug, Clone)]
944pub struct EncodedBulkPart {
945    data: Bytes,
946    metadata: BulkPartMeta,
947}
948
949impl EncodedBulkPart {
950    pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
951        Self { data, metadata }
952    }
953
954    pub(crate) fn metadata(&self) -> &BulkPartMeta {
955        &self.metadata
956    }
957
958    /// Returns the size of the encoded data in bytes
959    pub(crate) fn size_bytes(&self) -> usize {
960        self.data.len()
961    }
962
963    /// Returns the encoded data.
964    pub(crate) fn data(&self) -> &Bytes {
965        &self.data
966    }
967
968    /// Converts this `EncodedBulkPart` to `SstInfo`.
969    ///
970    /// # Arguments
971    /// * `file_id` - The SST file ID to assign to this part
972    ///
973    /// # Returns
974    /// Returns a `SstInfo` instance with information derived from this bulk part's metadata
975    pub(crate) fn to_sst_info(&self, file_id: FileId) -> SstInfo {
976        let unit = self.metadata.region_metadata.time_index_type().unit();
977        SstInfo {
978            file_id,
979            time_range: (
980                Timestamp::new(self.metadata.min_timestamp, unit),
981                Timestamp::new(self.metadata.max_timestamp, unit),
982            ),
983            file_size: self.data.len() as u64,
984            num_rows: self.metadata.num_rows,
985            num_row_groups: self.metadata.parquet_metadata.num_row_groups() as u64,
986            file_metadata: Some(self.metadata.parquet_metadata.clone()),
987            index_metadata: IndexOutput::default(),
988            num_series: self.metadata.num_series,
989        }
990    }
991
992    pub(crate) fn read(
993        &self,
994        context: BulkIterContextRef,
995        sequence: Option<SequenceRange>,
996        mem_scan_metrics: Option<MemScanMetrics>,
997    ) -> Result<Option<BoxedRecordBatchIterator>> {
998        // Compute skip_fields for row group pruning using the same approach as compute_skip_fields in reader.rs.
999        let skip_fields_for_pruning =
1000            Self::compute_skip_fields(context.pre_filter_mode(), &self.metadata.parquet_metadata);
1001
1002        // Uses the predicate to find row groups to read.
1003        let row_groups_to_read =
1004            context.row_groups_to_read(&self.metadata.parquet_metadata, skip_fields_for_pruning);
1005
1006        if row_groups_to_read.is_empty() {
1007            // All row groups are filtered.
1008            return Ok(None);
1009        }
1010
1011        let iter = EncodedBulkPartIter::try_new(
1012            self,
1013            context,
1014            row_groups_to_read,
1015            sequence,
1016            mem_scan_metrics,
1017        )?;
1018        Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
1019    }
1020
1021    /// Computes whether to skip field columns based on PreFilterMode.
1022    fn compute_skip_fields(pre_filter_mode: PreFilterMode, parquet_meta: &ParquetMetaData) -> bool {
1023        match pre_filter_mode {
1024            PreFilterMode::All => false,
1025            PreFilterMode::SkipFields => true,
1026            PreFilterMode::SkipFieldsOnDelete => {
1027                // Check if any row group contains delete op
1028                (0..parquet_meta.num_row_groups()).any(|rg_idx| {
1029                    row_group_contains_delete(parquet_meta, rg_idx, "memtable").unwrap_or(true)
1030                })
1031            }
1032        }
1033    }
1034}
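
// Illustrative sketch (the iteration context, sequence filter, and file id are assumed
// to be provided by the scan/flush code): prune row groups with the predicate and read
// the encoded part, or describe it in the same shape as a flushed SST.
#[allow(dead_code)]
fn read_or_describe_encoded_part(
    part: &EncodedBulkPart,
    context: BulkIterContextRef,
    sequence: Option<SequenceRange>,
    file_id: FileId,
) -> Result<(Option<BoxedRecordBatchIterator>, SstInfo)> {
    // `read` returns `None` when every row group is filtered out by the predicate.
    let iter = part.read(context, sequence, None)?;
    let sst_info = part.to_sst_info(file_id);
    Ok((iter, sst_info))
}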
1035
1036#[derive(Debug, Clone)]
1037pub struct BulkPartMeta {
1038    /// Total rows in part.
1039    pub num_rows: usize,
1040    /// Max timestamp in part.
1041    pub max_timestamp: i64,
1042    /// Min timestamp in part.
1043    pub min_timestamp: i64,
1044    /// Part file metadata.
1045    pub parquet_metadata: Arc<ParquetMetaData>,
1046    /// Part region schema.
1047    pub region_metadata: RegionMetadataRef,
1048    /// Number of series.
1049    pub num_series: u64,
1050}
1051
1052/// Metrics for encoding a part.
1053#[derive(Default, Debug)]
1054pub struct BulkPartEncodeMetrics {
1055    /// Cost of iterating over the data.
1056    pub iter_cost: Duration,
1057    /// Cost of writing the data.
1058    pub write_cost: Duration,
1059    /// Size of data before encoding.
1060    pub raw_size: usize,
1061    /// Size of data after encoding.
1062    pub encoded_size: usize,
1063    /// Number of rows in part.
1064    pub num_rows: usize,
1065}
1066
1067pub struct BulkPartEncoder {
1068    metadata: RegionMetadataRef,
1069    row_group_size: usize,
1070    writer_props: Option<WriterProperties>,
1071}
1072
1073impl BulkPartEncoder {
1074    pub(crate) fn new(
1075        metadata: RegionMetadataRef,
1076        row_group_size: usize,
1077    ) -> Result<BulkPartEncoder> {
1078        // TODO(yingwen): Skip arrow schema if needed.
1079        let json = metadata.to_json().context(InvalidMetadataSnafu)?;
1080        let key_value_meta =
1081            parquet::file::metadata::KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
1082
1083        // TODO(yingwen): Do we need compression?
1084        let writer_props = Some(
1085            WriterProperties::builder()
1086                .set_key_value_metadata(Some(vec![key_value_meta]))
1087                .set_write_batch_size(row_group_size)
1088                .set_max_row_group_size(row_group_size)
1089                .set_compression(Compression::ZSTD(ZstdLevel::default()))
1090                .set_column_index_truncate_length(None)
1091                .set_statistics_truncate_length(None)
1092                .build(),
1093        );
1094
1095        Ok(Self {
1096            metadata,
1097            row_group_size,
1098            writer_props,
1099        })
1100    }
1101}
1102
1103impl BulkPartEncoder {
1104    /// Encodes [BoxedRecordBatchIterator] into [EncodedBulkPart] with min/max timestamps.
1105    pub fn encode_record_batch_iter(
1106        &self,
1107        iter: BoxedRecordBatchIterator,
1108        arrow_schema: SchemaRef,
1109        min_timestamp: i64,
1110        max_timestamp: i64,
1111        metrics: &mut BulkPartEncodeMetrics,
1112    ) -> Result<Option<EncodedBulkPart>> {
1113        let mut buf = Vec::with_capacity(4096);
1114        let mut writer = ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
1115            .context(EncodeMemtableSnafu)?;
1116        let mut total_rows = 0;
1117        let mut series_estimator = SeriesEstimator::default();
1118
1119        // Process each batch from the iterator
1120        let mut iter_start = Instant::now();
1121        for batch_result in iter {
1122            metrics.iter_cost += iter_start.elapsed();
1123            let batch = batch_result?;
1124            if batch.num_rows() == 0 {
1125                continue;
1126            }
1127
1128            series_estimator.update_flat(&batch);
1129            metrics.raw_size += record_batch_estimated_size(&batch);
1130            let write_start = Instant::now();
1131            writer.write(&batch).context(EncodeMemtableSnafu)?;
1132            metrics.write_cost += write_start.elapsed();
1133            total_rows += batch.num_rows();
1134            iter_start = Instant::now();
1135        }
1136        metrics.iter_cost += iter_start.elapsed();
1138
1139        if total_rows == 0 {
1140            return Ok(None);
1141        }
1142
1143        let close_start = Instant::now();
1144        let file_metadata = writer.close().context(EncodeMemtableSnafu)?;
1145        metrics.write_cost += close_start.elapsed();
1146        metrics.encoded_size += buf.len();
1147        metrics.num_rows += total_rows;
1148
1149        let buf = Bytes::from(buf);
1150        let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);
1151        let num_series = series_estimator.finish();
1152
1153        Ok(Some(EncodedBulkPart {
1154            data: buf,
1155            metadata: BulkPartMeta {
1156                num_rows: total_rows,
1157                max_timestamp,
1158                min_timestamp,
1159                parquet_metadata,
1160                region_metadata: self.metadata.clone(),
1161                num_series,
1162            },
1163        }))
1164    }
1165
1166    /// Encodes a bulk part into an [EncodedBulkPart], returning the encoded data.
1167    fn encode_part(&self, part: &BulkPart) -> Result<Option<EncodedBulkPart>> {
1168        if part.batch.num_rows() == 0 {
1169            return Ok(None);
1170        }
1171
1172        let mut buf = Vec::with_capacity(4096);
1173        let arrow_schema = part.batch.schema();
1174
1175        let file_metadata = {
1176            let mut writer =
1177                ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
1178                    .context(EncodeMemtableSnafu)?;
1179            writer.write(&part.batch).context(EncodeMemtableSnafu)?;
1180            writer.finish().context(EncodeMemtableSnafu)?
1181        };
1182
1183        let buf = Bytes::from(buf);
1184        let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);
1185
1186        Ok(Some(EncodedBulkPart {
1187            data: buf,
1188            metadata: BulkPartMeta {
1189                num_rows: part.batch.num_rows(),
1190                max_timestamp: part.max_timestamp,
1191                min_timestamp: part.min_timestamp,
1192                parquet_metadata,
1193                region_metadata: self.metadata.clone(),
1194                num_series: part.estimated_series_count() as u64,
1195            },
1196        }))
1197    }
1198}
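
// Illustrative sketch (the row group size of 4096 is an assumption): encode an
// in-memory sorted `BulkPart` into parquet bytes kept in memory.
#[allow(dead_code)]
fn encode_sorted_part(
    metadata: RegionMetadataRef,
    part: &BulkPart,
) -> Result<Option<EncodedBulkPart>> {
    let encoder = BulkPartEncoder::new(metadata, 4096)?;
    // Returns `None` when the part has no rows.
    encoder.encode_part(part)
}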
1199
1200/// Converts mutations to record batches.
1201fn mutations_to_record_batch(
1202    mutations: &[Mutation],
1203    metadata: &RegionMetadataRef,
1204    pk_encoder: &DensePrimaryKeyCodec,
1205    dedup: bool,
1206) -> Result<Option<(RecordBatch, i64, i64)>> {
1207    let total_rows: usize = mutations
1208        .iter()
1209        .map(|m| m.rows.as_ref().map(|r| r.rows.len()).unwrap_or(0))
1210        .sum();
1211
1212    if total_rows == 0 {
1213        return Ok(None);
1214    }
1215
1216    let mut pk_builder = BinaryBuilder::with_capacity(total_rows, 0);
1217
1218    let mut ts_vector: Box<dyn MutableVector> = metadata
1219        .time_index_column()
1220        .column_schema
1221        .data_type
1222        .create_mutable_vector(total_rows);
1223    let mut sequence_builder = UInt64Builder::with_capacity(total_rows);
1224    let mut op_type_builder = UInt8Builder::with_capacity(total_rows);
1225
1226    let mut field_builders: Vec<Box<dyn MutableVector>> = metadata
1227        .field_columns()
1228        .map(|f| f.column_schema.data_type.create_mutable_vector(total_rows))
1229        .collect();
1230
1231    let mut pk_buffer = vec![];
1232    for m in mutations {
1233        let Some(key_values) = KeyValuesRef::new(metadata, m) else {
1234            continue;
1235        };
1236
1237        for row in key_values.iter() {
1238            pk_buffer.clear();
1239            pk_encoder
1240                .encode_to_vec(row.primary_keys(), &mut pk_buffer)
1241                .context(EncodeSnafu)?;
1242            pk_builder.append_value(pk_buffer.as_bytes());
1243            ts_vector.push_value_ref(&row.timestamp());
1244            sequence_builder.append_value(row.sequence());
1245            op_type_builder.append_value(row.op_type() as u8);
1246            for (builder, field) in field_builders.iter_mut().zip(row.fields()) {
1247                builder.push_value_ref(&field);
1248            }
1249        }
1250    }
1251
1252    let arrow_schema = to_sst_arrow_schema(metadata);
1253    // safety: timestamp column must be valid, and values must not be None.
1254    let timestamp_unit = metadata
1255        .time_index_column()
1256        .column_schema
1257        .data_type
1258        .as_timestamp()
1259        .unwrap()
1260        .unit();
1261    let sorter = ArraysSorter {
1262        encoded_primary_keys: pk_builder.finish(),
1263        timestamp_unit,
1264        timestamp: ts_vector.to_vector().to_arrow_array(),
1265        sequence: sequence_builder.finish(),
1266        op_type: op_type_builder.finish(),
1267        fields: field_builders
1268            .iter_mut()
1269            .map(|f| f.to_vector().to_arrow_array()),
1270        dedup,
1271        arrow_schema,
1272    };
1273
1274    sorter.sort().map(Some)
1275}
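
// Illustrative sketch (the mutations are assumed to come from a write batch or WAL
// replay): build a dense primary key codec and convert mutations into a sorted record
// batch together with its min/max timestamps.
#[allow(dead_code)]
fn mutations_to_sorted_batch(
    mutations: &[Mutation],
    metadata: &RegionMetadataRef,
    dedup: bool,
) -> Result<Option<(RecordBatch, i64, i64)>> {
    let codec = DensePrimaryKeyCodec::new(metadata);
    mutations_to_record_batch(mutations, metadata, &codec, dedup)
}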
1276
1277struct ArraysSorter<I> {
1278    encoded_primary_keys: BinaryArray,
1279    timestamp_unit: TimeUnit,
1280    timestamp: ArrayRef,
1281    sequence: UInt64Array,
1282    op_type: UInt8Array,
1283    fields: I,
1284    dedup: bool,
1285    arrow_schema: SchemaRef,
1286}
1287
1288impl<I> ArraysSorter<I>
1289where
1290    I: Iterator<Item = ArrayRef>,
1291{
1292    /// Converts arrays to record batch.
1293    fn sort(self) -> Result<(RecordBatch, i64, i64)> {
1294        debug_assert!(!self.timestamp.is_empty());
1295        debug_assert!(self.timestamp.len() == self.sequence.len());
1296        debug_assert!(self.timestamp.len() == self.op_type.len());
1297        debug_assert!(self.timestamp.len() == self.encoded_primary_keys.len());
1298
1299        let timestamp_iter = timestamp_array_to_iter(self.timestamp_unit, &self.timestamp);
1300        let (mut min_timestamp, mut max_timestamp) = (i64::MAX, i64::MIN);
1301        let mut to_sort = self
1302            .encoded_primary_keys
1303            .iter()
1304            .zip(timestamp_iter)
1305            .zip(self.sequence.iter())
1306            .map(|((pk, timestamp), sequence)| {
1307                max_timestamp = max_timestamp.max(*timestamp);
1308                min_timestamp = min_timestamp.min(*timestamp);
1309                (pk, timestamp, sequence)
1310            })
1311            .enumerate()
1312            .collect::<Vec<_>>();
1313
1314        to_sort.sort_unstable_by(|(_, (l_pk, l_ts, l_seq)), (_, (r_pk, r_ts, r_seq))| {
1315            l_pk.cmp(r_pk)
1316                .then(l_ts.cmp(r_ts))
1317                .then(l_seq.cmp(r_seq).reverse())
1318        });
1319
1320        if self.dedup {
1321            // Dedups by primary key and timestamp, ignoring the sequence.
1322            to_sort.dedup_by(|(_, (l_pk, l_ts, _)), (_, (r_pk, r_ts, _))| {
1323                l_pk == r_pk && l_ts == r_ts
1324            });
1325        }
1326
1327        let indices = UInt32Array::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
1328
1329        let pk_dictionary = Arc::new(binary_array_to_dictionary(
1330            // safety: pk must be BinaryArray
1331            arrow::compute::take(
1332                &self.encoded_primary_keys,
1333                &indices,
1334                Some(TakeOptions {
1335                    check_bounds: false,
1336                }),
1337            )
1338            .context(ComputeArrowSnafu)?
1339            .as_any()
1340            .downcast_ref::<BinaryArray>()
1341            .unwrap(),
1342        )?) as ArrayRef;
1343
1344        let mut arrays = Vec::with_capacity(self.arrow_schema.fields.len());
1345        for arr in self.fields {
1346            arrays.push(
1347                arrow::compute::take(
1348                    &arr,
1349                    &indices,
1350                    Some(TakeOptions {
1351                        check_bounds: false,
1352                    }),
1353                )
1354                .context(ComputeArrowSnafu)?,
1355            );
1356        }
1357
1358        let timestamp = arrow::compute::take(
1359            &self.timestamp,
1360            &indices,
1361            Some(TakeOptions {
1362                check_bounds: false,
1363            }),
1364        )
1365        .context(ComputeArrowSnafu)?;
1366
1367        arrays.push(timestamp);
1368        arrays.push(pk_dictionary);
1369        arrays.push(
1370            arrow::compute::take(
1371                &self.sequence,
1372                &indices,
1373                Some(TakeOptions {
1374                    check_bounds: false,
1375                }),
1376            )
1377            .context(ComputeArrowSnafu)?,
1378        );
1379
1380        arrays.push(
1381            arrow::compute::take(
1382                &self.op_type,
1383                &indices,
1384                Some(TakeOptions {
1385                    check_bounds: false,
1386                }),
1387            )
1388            .context(ComputeArrowSnafu)?,
1389        );
1390
1391        let batch = RecordBatch::try_new(self.arrow_schema, arrays).context(NewRecordBatchSnafu)?;
1392        Ok((batch, min_timestamp, max_timestamp))
1393    }
1394}
1395
1396/// Converts a timestamp array to an iterator over its i64 values.
1397fn timestamp_array_to_iter(
1398    timestamp_unit: TimeUnit,
1399    timestamp: &ArrayRef,
1400) -> impl Iterator<Item = &i64> {
1401    match timestamp_unit {
1402        // safety: timestamp column must be valid.
1403        TimeUnit::Second => timestamp
1404            .as_any()
1405            .downcast_ref::<TimestampSecondArray>()
1406            .unwrap()
1407            .values()
1408            .iter(),
1409        TimeUnit::Millisecond => timestamp
1410            .as_any()
1411            .downcast_ref::<TimestampMillisecondArray>()
1412            .unwrap()
1413            .values()
1414            .iter(),
1415        TimeUnit::Microsecond => timestamp
1416            .as_any()
1417            .downcast_ref::<TimestampMicrosecondArray>()
1418            .unwrap()
1419            .values()
1420            .iter(),
1421        TimeUnit::Nanosecond => timestamp
1422            .as_any()
1423            .downcast_ref::<TimestampNanosecondArray>()
1424            .unwrap()
1425            .values()
1426            .iter(),
1427    }
1428}
1429
1430/// Converts a **sorted** [BinaryArray] to [DictionaryArray].
1431fn binary_array_to_dictionary(input: &BinaryArray) -> Result<PrimaryKeyArray> {
1432    if input.is_empty() {
1433        return Ok(DictionaryArray::new(
1434            UInt32Array::from(Vec::<u32>::new()),
1435            Arc::new(BinaryArray::from_vec(vec![])) as ArrayRef,
1436        ));
1437    }
1438    let mut keys = Vec::with_capacity(16);
1439    let mut values = BinaryBuilder::new();
1440    let mut prev: usize = 0;
1441    keys.push(prev as u32);
1442    values.append_value(input.value(prev));
1443
1444    for current_bytes in input.iter().skip(1) {
1445        // safety: encoded pk must be present.
1446        let current_bytes = current_bytes.unwrap();
1447        let prev_bytes = input.value(prev);
1448        if current_bytes != prev_bytes {
1449            values.append_value(current_bytes);
1450            prev += 1;
1451        }
1452        keys.push(prev as u32);
1453    }
1454
1455    Ok(DictionaryArray::new(
1456        UInt32Array::from(keys),
1457        Arc::new(values.finish()) as ArrayRef,
1458    ))
1459}
1460
1461#[cfg(test)]
1462mod tests {
1463    use std::collections::VecDeque;
1464
1465    use api::v1::{Row, SemanticType, WriteHint};
1466    use datafusion_common::ScalarValue;
1467    use datatypes::arrow::array::Float64Array;
1468    use datatypes::prelude::{ConcreteDataType, ScalarVector, Value};
1469    use datatypes::schema::ColumnSchema;
1470    use datatypes::vectors::{Float64Vector, TimestampMillisecondVector};
1471    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1472    use store_api::storage::RegionId;
1473    use store_api::storage::consts::ReservedColumnId;
1474
1475    use super::*;
1476    use crate::memtable::bulk::context::BulkIterContext;
1477    use crate::sst::parquet::format::{PrimaryKeyReadFormat, ReadFormat};
1478    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1479    use crate::test_util::memtable_util::{
1480        build_key_values_with_ts_seq_values, metadata_for_test, region_metadata_to_row_schema,
1481    };
1482
1483    fn check_binary_array_to_dictionary(
1484        input: &[&[u8]],
1485        expected_keys: &[u32],
1486        expected_values: &[&[u8]],
1487    ) {
1488        let input = BinaryArray::from_iter_values(input.iter());
1489        let array = binary_array_to_dictionary(&input).unwrap();
1490        assert_eq!(
1491            &expected_keys,
1492            &array.keys().iter().map(|v| v.unwrap()).collect::<Vec<_>>()
1493        );
1494        assert_eq!(
1495            expected_values,
1496            &array
1497                .values()
1498                .as_any()
1499                .downcast_ref::<BinaryArray>()
1500                .unwrap()
1501                .iter()
1502                .map(|v| v.unwrap())
1503                .collect::<Vec<_>>()
1504        );
1505    }
1506
1507    #[test]
1508    fn test_binary_array_to_dictionary() {
1509        check_binary_array_to_dictionary(&[], &[], &[]);
1510
1511        check_binary_array_to_dictionary(&["a".as_bytes()], &[0], &["a".as_bytes()]);
1512
1513        check_binary_array_to_dictionary(
1514            &["a".as_bytes(), "a".as_bytes()],
1515            &[0, 0],
1516            &["a".as_bytes()],
1517        );
1518
1519        check_binary_array_to_dictionary(
1520            &["a".as_bytes(), "a".as_bytes(), "b".as_bytes()],
1521            &[0, 0, 1],
1522            &["a".as_bytes(), "b".as_bytes()],
1523        );
1524
1525        check_binary_array_to_dictionary(
1526            &[
1527                "a".as_bytes(),
1528                "a".as_bytes(),
1529                "b".as_bytes(),
1530                "c".as_bytes(),
1531            ],
1532            &[0, 0, 1, 2],
1533            &["a".as_bytes(), "b".as_bytes(), "c".as_bytes()],
1534        );
1535    }
1536
1537    struct MutationInput<'a> {
1538        k0: &'a str,
1539        k1: u32,
1540        timestamps: &'a [i64],
1541        v1: &'a [Option<f64>],
1542        sequence: u64,
1543    }
1544
1545    #[derive(Debug, PartialOrd, PartialEq)]
1546    struct BatchOutput<'a> {
1547        pk_values: &'a [Value],
1548        timestamps: &'a [i64],
1549        v1: &'a [Option<f64>],
1550    }
1551
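    /// Converts `input` mutations into a single sorted record batch, reads it back
    /// through [PrimaryKeyReadFormat], and checks the decoded batches as well as the
    /// `(min, max)` timestamp range against `expected`/`expected_timestamp`.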
1552    fn check_mutations_to_record_batches(
1553        input: &[MutationInput],
1554        expected: &[BatchOutput],
1555        expected_timestamp: (i64, i64),
1556        dedup: bool,
1557    ) {
1558        let metadata = metadata_for_test();
1559        let mutations = input
1560            .iter()
1561            .map(|m| {
1562                build_key_values_with_ts_seq_values(
1563                    &metadata,
1564                    m.k0.to_string(),
1565                    m.k1,
1566                    m.timestamps.iter().copied(),
1567                    m.v1.iter().copied(),
1568                    m.sequence,
1569                )
1570                .mutation
1571            })
1572            .collect::<Vec<_>>();
1573        let total_rows: usize = mutations
1574            .iter()
1575            .flat_map(|m| m.rows.iter())
1576            .map(|r| r.rows.len())
1577            .sum();
1578
1579        let pk_encoder = DensePrimaryKeyCodec::new(&metadata);
1580
        let (batch, min_ts, max_ts) =
            mutations_to_record_batch(&mutations, &metadata, &pk_encoder, dedup)
                .unwrap()
                .unwrap();
        assert_eq!(expected_timestamp, (min_ts, max_ts));
1584        let read_format = PrimaryKeyReadFormat::new_with_all_columns(metadata.clone());
1585        let mut batches = VecDeque::new();
1586        read_format
1587            .convert_record_batch(&batch, None, &mut batches)
1588            .unwrap();
1589        if !dedup {
1590            assert_eq!(
1591                total_rows,
1592                batches.iter().map(|b| { b.num_rows() }).sum::<usize>()
1593            );
1594        }
1595        let batch_values = batches
1596            .into_iter()
1597            .map(|b| {
1598                let pk_values = pk_encoder.decode(b.primary_key()).unwrap().into_dense();
1599                let timestamps = b
1600                    .timestamps()
1601                    .as_any()
1602                    .downcast_ref::<TimestampMillisecondVector>()
1603                    .unwrap()
1604                    .iter_data()
1605                    .map(|v| v.unwrap().0.value())
1606                    .collect::<Vec<_>>();
1607                let float_values = b.fields()[1]
1608                    .data
1609                    .as_any()
1610                    .downcast_ref::<Float64Vector>()
1611                    .unwrap()
1612                    .iter_data()
1613                    .collect::<Vec<_>>();
1614
1615                (pk_values, timestamps, float_values)
1616            })
1617            .collect::<Vec<_>>();
1618        assert_eq!(expected.len(), batch_values.len());
1619
1620        for idx in 0..expected.len() {
1621            assert_eq!(expected[idx].pk_values, &batch_values[idx].0);
1622            assert_eq!(expected[idx].timestamps, &batch_values[idx].1);
1623            assert_eq!(expected[idx].v1, &batch_values[idx].2);
1624        }
1625    }
1626
1627    #[test]
1628    fn test_mutations_to_record_batch() {
1629        check_mutations_to_record_batches(
1630            &[MutationInput {
1631                k0: "a",
1632                k1: 0,
1633                timestamps: &[0],
1634                v1: &[Some(0.1)],
1635                sequence: 0,
1636            }],
1637            &[BatchOutput {
1638                pk_values: &[Value::String("a".into()), Value::UInt32(0)],
1639                timestamps: &[0],
1640                v1: &[Some(0.1)],
1641            }],
1642            (0, 0),
1643            true,
1644        );
1645
1646        check_mutations_to_record_batches(
1647            &[
1648                MutationInput {
1649                    k0: "a",
1650                    k1: 0,
1651                    timestamps: &[0],
1652                    v1: &[Some(0.1)],
1653                    sequence: 0,
1654                },
1655                MutationInput {
1656                    k0: "b",
1657                    k1: 0,
1658                    timestamps: &[0],
1659                    v1: &[Some(0.0)],
1660                    sequence: 0,
1661                },
1662                MutationInput {
1663                    k0: "a",
1664                    k1: 0,
1665                    timestamps: &[1],
1666                    v1: &[Some(0.2)],
1667                    sequence: 1,
1668                },
1669                MutationInput {
1670                    k0: "a",
1671                    k1: 1,
1672                    timestamps: &[1],
1673                    v1: &[Some(0.3)],
1674                    sequence: 2,
1675                },
1676            ],
1677            &[
1678                BatchOutput {
1679                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
1680                    timestamps: &[0, 1],
1681                    v1: &[Some(0.1), Some(0.2)],
1682                },
1683                BatchOutput {
1684                    pk_values: &[Value::String("a".into()), Value::UInt32(1)],
1685                    timestamps: &[1],
1686                    v1: &[Some(0.3)],
1687                },
1688                BatchOutput {
1689                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
1690                    timestamps: &[0],
1691                    v1: &[Some(0.0)],
1692                },
1693            ],
1694            (0, 1),
1695            true,
1696        );
1697
1698        check_mutations_to_record_batches(
1699            &[
1700                MutationInput {
1701                    k0: "a",
1702                    k1: 0,
1703                    timestamps: &[0],
1704                    v1: &[Some(0.1)],
1705                    sequence: 0,
1706                },
1707                MutationInput {
1708                    k0: "b",
1709                    k1: 0,
1710                    timestamps: &[0],
1711                    v1: &[Some(0.0)],
1712                    sequence: 0,
1713                },
1714                MutationInput {
1715                    k0: "a",
1716                    k1: 0,
1717                    timestamps: &[0],
1718                    v1: &[Some(0.2)],
1719                    sequence: 1,
1720                },
1721            ],
1722            &[
1723                BatchOutput {
1724                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
1725                    timestamps: &[0],
1726                    v1: &[Some(0.2)],
1727                },
1728                BatchOutput {
1729                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
1730                    timestamps: &[0],
1731                    v1: &[Some(0.0)],
1732                },
1733            ],
1734            (0, 0),
1735            true,
1736        );
1737        check_mutations_to_record_batches(
1738            &[
1739                MutationInput {
1740                    k0: "a",
1741                    k1: 0,
1742                    timestamps: &[0],
1743                    v1: &[Some(0.1)],
1744                    sequence: 0,
1745                },
1746                MutationInput {
1747                    k0: "b",
1748                    k1: 0,
1749                    timestamps: &[0],
1750                    v1: &[Some(0.0)],
1751                    sequence: 0,
1752                },
1753                MutationInput {
1754                    k0: "a",
1755                    k1: 0,
1756                    timestamps: &[0],
1757                    v1: &[Some(0.2)],
1758                    sequence: 1,
1759                },
1760            ],
1761            &[
1762                BatchOutput {
1763                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
1764                    timestamps: &[0, 0],
1765                    v1: &[Some(0.2), Some(0.1)],
1766                },
1767                BatchOutput {
1768                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
1769                    timestamps: &[0],
1770                    v1: &[Some(0.0)],
1771                },
1772            ],
1773            (0, 0),
1774            false,
1775        );
1776    }
1777
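    /// Appends the input mutations through a [BulkPartConverter] and encodes the
    /// resulting part with a [BulkPartEncoder].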
1778    fn encode(input: &[MutationInput]) -> EncodedBulkPart {
1779        let metadata = metadata_for_test();
1780        let kvs = input
1781            .iter()
1782            .map(|m| {
1783                build_key_values_with_ts_seq_values(
1784                    &metadata,
1785                    m.k0.to_string(),
1786                    m.k1,
1787                    m.timestamps.iter().copied(),
1788                    m.v1.iter().copied(),
1789                    m.sequence,
1790                )
1791            })
1792            .collect::<Vec<_>>();
1793        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1794        let primary_key_codec = build_primary_key_codec(&metadata);
1795        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1796        for kv in kvs {
1797            converter.append_key_values(&kv).unwrap();
1798        }
1799        let part = converter.convert().unwrap();
1800        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1801        encoder.encode_part(&part).unwrap().unwrap()
1802    }
1803
1804    #[test]
1805    fn test_write_and_read_part_projection() {
1806        let part = encode(&[
1807            MutationInput {
1808                k0: "a",
1809                k1: 0,
1810                timestamps: &[1],
1811                v1: &[Some(0.1)],
1812                sequence: 0,
1813            },
1814            MutationInput {
1815                k0: "b",
1816                k1: 0,
1817                timestamps: &[1],
1818                v1: &[Some(0.0)],
1819                sequence: 0,
1820            },
1821            MutationInput {
1822                k0: "a",
1823                k1: 0,
1824                timestamps: &[2],
1825                v1: &[Some(0.2)],
1826                sequence: 1,
1827            },
1828        ]);
1829
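        // Project only the float field `v1` (column id 4 in the test metadata).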
1830        let projection = &[4u32];
        let reader = part
1832            .read(
1833                Arc::new(
1834                    BulkIterContext::new(
1835                        part.metadata.region_metadata.clone(),
1836                        Some(projection.as_slice()),
1837                        None,
1838                        false,
1839                    )
1840                    .unwrap(),
1841                ),
1842                None,
1843                None,
1844            )
1845            .unwrap()
1846            .expect("expect at least one row group");
1847
1848        let mut total_rows_read = 0;
1849        let mut field: Vec<f64> = vec![];
1850        for res in reader {
1851            let batch = res.unwrap();
1852            assert_eq!(5, batch.num_columns());
1853            field.extend_from_slice(
1854                batch
1855                    .column(0)
1856                    .as_any()
1857                    .downcast_ref::<Float64Array>()
1858                    .unwrap()
1859                    .values(),
1860            );
1861            total_rows_read += batch.num_rows();
1862        }
1863        assert_eq!(3, total_rows_read);
1864        assert_eq!(vec![0.1, 0.2, 0.0], field);
1865    }
1866
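    /// Builds an [EncodedBulkPart] from `(k0, k1, (start_ts, end_ts), sequence)` tuples;
    /// each tuple expands to one row per timestamp in `start_ts..end_ts` with a null `v1`.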
1867    fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
1868        let metadata = metadata_for_test();
1869        let kvs = key_values
1870            .into_iter()
1871            .map(|(k0, k1, (start, end), sequence)| {
                let ts = start..end;
1873                let v1 = (start..end).map(|_| None);
1874                build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
1875            })
1876            .collect::<Vec<_>>();
1877        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1878        let primary_key_codec = build_primary_key_codec(&metadata);
1879        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1880        for kv in kvs {
1881            converter.append_key_values(&kv).unwrap();
1882        }
1883        let part = converter.convert().unwrap();
1884        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1885        encoder.encode_part(&part).unwrap().unwrap()
1886    }
1887
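    /// Reads `part` under the given predicate and asserts the number of rows that
    /// survive row group pruning and precise filtering.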
1888    fn check_prune_row_group(
1889        part: &EncodedBulkPart,
1890        predicate: Option<Predicate>,
1891        expected_rows: usize,
1892    ) {
1893        let context = Arc::new(
1894            BulkIterContext::new(
1895                part.metadata.region_metadata.clone(),
1896                None,
1897                predicate,
1898                false,
1899            )
1900            .unwrap(),
1901        );
        let reader = part
1903            .read(context, None, None)
1904            .unwrap()
1905            .expect("expect at least one row group");
1906        let mut total_rows_read = 0;
1907        for res in reader {
1908            let batch = res.unwrap();
1909            total_rows_read += batch.num_rows();
1910        }
        // Compare against the expected number of surviving rows.
1912        assert_eq!(expected_rows, total_rows_read);
1913    }
1914
1915    #[test]
1916    fn test_prune_row_groups() {
1917        let part = prepare(vec![
1918            ("a", 0, (0, 40), 1),
1919            ("a", 1, (0, 60), 1),
1920            ("b", 0, (0, 100), 2),
1921            ("b", 1, (100, 180), 3),
1922            ("b", 1, (180, 210), 4),
1923        ]);
1924
1925        let context = Arc::new(
1926            BulkIterContext::new(
1927                part.metadata.region_metadata.clone(),
1928                None,
1929                Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
1930                    datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
1931                )])),
1932                false,
1933            )
1934            .unwrap(),
1935        );
1936        assert!(part.read(context, None, None).unwrap().is_none());
1937
1938        check_prune_row_group(&part, None, 310);
1939
1940        check_prune_row_group(
1941            &part,
1942            Some(Predicate::new(vec![
1943                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1944                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
1945            ])),
1946            40,
1947        );
1948
1949        check_prune_row_group(
1950            &part,
1951            Some(Predicate::new(vec![
1952                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1953                datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
1954            ])),
1955            60,
1956        );
1957
1958        check_prune_row_group(
1959            &part,
1960            Some(Predicate::new(vec![
1961                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1962            ])),
1963            100,
1964        );
1965
1966        check_prune_row_group(
1967            &part,
1968            Some(Predicate::new(vec![
1969                datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
1970                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
1971            ])),
1972            100,
1973        );
1974
        // Predicates on a field column allow precise, row-level filtering.
1976        check_prune_row_group(
1977            &part,
1978            Some(Predicate::new(vec![
1979                datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64)),
1980            ])),
1981            1,
1982        );
1983    }
1984
1985    #[test]
1986    fn test_bulk_part_converter_append_and_convert() {
1987        let metadata = metadata_for_test();
1988        let capacity = 100;
1989        let primary_key_codec = build_primary_key_codec(&metadata);
1990        let schema = to_flat_sst_arrow_schema(
1991            &metadata,
1992            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1993        );
1994
1995        let mut converter =
1996            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1997
1998        let key_values1 = build_key_values_with_ts_seq_values(
1999            &metadata,
2000            "key1".to_string(),
2001            1u32,
2002            vec![1000, 2000].into_iter(),
2003            vec![Some(1.0), Some(2.0)].into_iter(),
2004            1,
2005        );
2006
2007        let key_values2 = build_key_values_with_ts_seq_values(
2008            &metadata,
2009            "key2".to_string(),
2010            2u32,
2011            vec![1500].into_iter(),
2012            vec![Some(3.0)].into_iter(),
2013            2,
2014        );
2015
2016        converter.append_key_values(&key_values1).unwrap();
2017        converter.append_key_values(&key_values2).unwrap();
2018
2019        let bulk_part = converter.convert().unwrap();
2020
2021        assert_eq!(bulk_part.num_rows(), 3);
2022        assert_eq!(bulk_part.min_timestamp, 1000);
2023        assert_eq!(bulk_part.max_timestamp, 2000);
2024        assert_eq!(bulk_part.sequence, 2);
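        // `ts` is the last user-visible column; the three internal columns
        // (__primary_key, __sequence, __op_type) follow it.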
2025        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
2026
2027        // Validate primary key columns are stored
2028        // Schema should include primary key columns k0 and k1 at the beginning
2029        let schema = bulk_part.batch.schema();
2030        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2031        assert_eq!(
2032            field_names,
2033            vec![
2034                "k0",
2035                "k1",
2036                "v0",
2037                "v1",
2038                "ts",
2039                "__primary_key",
2040                "__sequence",
2041                "__op_type"
2042            ]
2043        );
2044    }
2045
2046    #[test]
2047    fn test_bulk_part_converter_sorting() {
2048        let metadata = metadata_for_test();
2049        let capacity = 100;
2050        let primary_key_codec = build_primary_key_codec(&metadata);
2051        let schema = to_flat_sst_arrow_schema(
2052            &metadata,
2053            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2054        );
2055
2056        let mut converter =
2057            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
2058
2059        let key_values1 = build_key_values_with_ts_seq_values(
2060            &metadata,
2061            "z_key".to_string(),
2062            3u32,
2063            vec![3000].into_iter(),
2064            vec![Some(3.0)].into_iter(),
2065            3,
2066        );
2067
2068        let key_values2 = build_key_values_with_ts_seq_values(
2069            &metadata,
2070            "a_key".to_string(),
2071            1u32,
2072            vec![1000].into_iter(),
2073            vec![Some(1.0)].into_iter(),
2074            1,
2075        );
2076
2077        let key_values3 = build_key_values_with_ts_seq_values(
2078            &metadata,
2079            "m_key".to_string(),
2080            2u32,
2081            vec![2000].into_iter(),
2082            vec![Some(2.0)].into_iter(),
2083            2,
2084        );
2085
2086        converter.append_key_values(&key_values1).unwrap();
2087        converter.append_key_values(&key_values2).unwrap();
2088        converter.append_key_values(&key_values3).unwrap();
2089
2090        let bulk_part = converter.convert().unwrap();
2091
2092        assert_eq!(bulk_part.num_rows(), 3);
2093
2094        let ts_column = bulk_part.batch.column(bulk_part.timestamp_index);
2095        let seq_column = bulk_part.batch.column(bulk_part.batch.num_columns() - 2);
2096
2097        let ts_array = ts_column
2098            .as_any()
2099            .downcast_ref::<TimestampMillisecondArray>()
2100            .unwrap();
2101        let seq_array = seq_column.as_any().downcast_ref::<UInt64Array>().unwrap();
2102
2103        assert_eq!(ts_array.values(), &[1000, 2000, 3000]);
2104        assert_eq!(seq_array.values(), &[1, 2, 3]);
2105
2106        // Validate primary key columns are stored
2107        let schema = bulk_part.batch.schema();
2108        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2109        assert_eq!(
2110            field_names,
2111            vec![
2112                "k0",
2113                "k1",
2114                "v0",
2115                "v1",
2116                "ts",
2117                "__primary_key",
2118                "__sequence",
2119                "__op_type"
2120            ]
2121        );
2122    }
2123
2124    #[test]
2125    fn test_bulk_part_converter_empty() {
2126        let metadata = metadata_for_test();
2127        let capacity = 10;
2128        let primary_key_codec = build_primary_key_codec(&metadata);
2129        let schema = to_flat_sst_arrow_schema(
2130            &metadata,
2131            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2132        );
2133
2134        let converter =
2135            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
2136
2137        let bulk_part = converter.convert().unwrap();
2138
2139        assert_eq!(bulk_part.num_rows(), 0);
2140        assert_eq!(bulk_part.min_timestamp, i64::MAX);
2141        assert_eq!(bulk_part.max_timestamp, i64::MIN);
2142        assert_eq!(bulk_part.sequence, SequenceNumber::MIN);
2143
2144        // Validate primary key columns are present in schema even for empty batch
2145        let schema = bulk_part.batch.schema();
2146        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2147        assert_eq!(
2148            field_names,
2149            vec![
2150                "k0",
2151                "k1",
2152                "v0",
2153                "v1",
2154                "ts",
2155                "__primary_key",
2156                "__sequence",
2157                "__op_type"
2158            ]
2159        );
2160    }
2161
2162    #[test]
2163    fn test_bulk_part_converter_without_primary_key_columns() {
2164        let metadata = metadata_for_test();
2165        let primary_key_codec = build_primary_key_codec(&metadata);
2166        let schema = to_flat_sst_arrow_schema(
2167            &metadata,
2168            &FlatSchemaOptions {
2169                raw_pk_columns: false,
2170                string_pk_use_dict: true,
2171            },
2172        );
2173
2174        let capacity = 100;
2175        let mut converter =
2176            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, false);
2177
2178        let key_values1 = build_key_values_with_ts_seq_values(
2179            &metadata,
2180            "key1".to_string(),
2181            1u32,
2182            vec![1000, 2000].into_iter(),
2183            vec![Some(1.0), Some(2.0)].into_iter(),
2184            1,
2185        );
2186
2187        let key_values2 = build_key_values_with_ts_seq_values(
2188            &metadata,
2189            "key2".to_string(),
2190            2u32,
2191            vec![1500].into_iter(),
2192            vec![Some(3.0)].into_iter(),
2193            2,
2194        );
2195
2196        converter.append_key_values(&key_values1).unwrap();
2197        converter.append_key_values(&key_values2).unwrap();
2198
2199        let bulk_part = converter.convert().unwrap();
2200
2201        assert_eq!(bulk_part.num_rows(), 3);
2202        assert_eq!(bulk_part.min_timestamp, 1000);
2203        assert_eq!(bulk_part.max_timestamp, 2000);
2204        assert_eq!(bulk_part.sequence, 2);
2205        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
2206
2207        // Validate primary key columns are NOT stored individually
2208        let schema = bulk_part.batch.schema();
2209        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2210        assert_eq!(
2211            field_names,
2212            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2213        );
2214    }
2215
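    /// Builds [KeyValues] whose rows carry a pre-encoded sparse primary key in the
    /// `__primary_key` column instead of individual tag columns.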
2216    #[allow(clippy::too_many_arguments)]
2217    fn build_key_values_with_sparse_encoding(
2218        metadata: &RegionMetadataRef,
2219        primary_key_codec: &Arc<dyn PrimaryKeyCodec>,
2220        table_id: u32,
2221        tsid: u64,
2222        k0: String,
2223        k1: String,
2224        timestamps: impl Iterator<Item = i64>,
2225        values: impl Iterator<Item = Option<f64>>,
2226        sequence: SequenceNumber,
2227    ) -> KeyValues {
2228        // Encode the primary key (__table_id, __tsid, k0, k1) into binary format using the sparse codec
2229        let pk_values = vec![
2230            (ReservedColumnId::table_id(), Value::UInt32(table_id)),
2231            (ReservedColumnId::tsid(), Value::UInt64(tsid)),
2232            (0, Value::String(k0.clone().into())),
2233            (1, Value::String(k1.clone().into())),
2234        ];
2235        let mut encoded_key = Vec::new();
2236        primary_key_codec
2237            .encode_values(&pk_values, &mut encoded_key)
2238            .unwrap();
2239        assert!(!encoded_key.is_empty());
2240
2241        // Create schema for sparse encoding: __primary_key, ts, v0, v1
2242        let column_schema = vec![
2243            api::v1::ColumnSchema {
2244                column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
2245                datatype: api::helper::ColumnDataTypeWrapper::try_from(
2246                    ConcreteDataType::binary_datatype(),
2247                )
2248                .unwrap()
2249                .datatype() as i32,
2250                semantic_type: api::v1::SemanticType::Tag as i32,
2251                ..Default::default()
2252            },
2253            api::v1::ColumnSchema {
2254                column_name: "ts".to_string(),
2255                datatype: api::helper::ColumnDataTypeWrapper::try_from(
2256                    ConcreteDataType::timestamp_millisecond_datatype(),
2257                )
2258                .unwrap()
2259                .datatype() as i32,
2260                semantic_type: api::v1::SemanticType::Timestamp as i32,
2261                ..Default::default()
2262            },
2263            api::v1::ColumnSchema {
2264                column_name: "v0".to_string(),
2265                datatype: api::helper::ColumnDataTypeWrapper::try_from(
2266                    ConcreteDataType::int64_datatype(),
2267                )
2268                .unwrap()
2269                .datatype() as i32,
2270                semantic_type: api::v1::SemanticType::Field as i32,
2271                ..Default::default()
2272            },
2273            api::v1::ColumnSchema {
2274                column_name: "v1".to_string(),
2275                datatype: api::helper::ColumnDataTypeWrapper::try_from(
2276                    ConcreteDataType::float64_datatype(),
2277                )
2278                .unwrap()
2279                .datatype() as i32,
2280                semantic_type: api::v1::SemanticType::Field as i32,
2281                ..Default::default()
2282            },
2283        ];
2284
2285        let rows = timestamps
2286            .zip(values)
2287            .map(|(ts, v)| Row {
2288                values: vec![
2289                    api::v1::Value {
2290                        value_data: Some(api::v1::value::ValueData::BinaryValue(
2291                            encoded_key.clone(),
2292                        )),
2293                    },
2294                    api::v1::Value {
2295                        value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(ts)),
2296                    },
2297                    api::v1::Value {
2298                        value_data: Some(api::v1::value::ValueData::I64Value(ts)),
2299                    },
2300                    api::v1::Value {
2301                        value_data: v.map(api::v1::value::ValueData::F64Value),
2302                    },
2303                ],
2304            })
2305            .collect();
2306
2307        let mutation = api::v1::Mutation {
            op_type: OpType::Put as i32,
2309            sequence,
2310            rows: Some(api::v1::Rows {
2311                schema: column_schema,
2312                rows,
2313            }),
2314            write_hint: Some(WriteHint {
2315                primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
2316            }),
2317        };
2318        KeyValues::new(metadata.as_ref(), mutation).unwrap()
2319    }
2320
2321    #[test]
2322    fn test_bulk_part_converter_sparse_primary_key_encoding() {
2323        use api::v1::SemanticType;
2324        use datatypes::schema::ColumnSchema;
2325        use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
2326        use store_api::storage::RegionId;
2327
2328        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
2329        builder
2330            .push_column_metadata(ColumnMetadata {
2331                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
2332                semantic_type: SemanticType::Tag,
2333                column_id: 0,
2334            })
2335            .push_column_metadata(ColumnMetadata {
2336                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
2337                semantic_type: SemanticType::Tag,
2338                column_id: 1,
2339            })
2340            .push_column_metadata(ColumnMetadata {
2341                column_schema: ColumnSchema::new(
2342                    "ts",
2343                    ConcreteDataType::timestamp_millisecond_datatype(),
2344                    false,
2345                ),
2346                semantic_type: SemanticType::Timestamp,
2347                column_id: 2,
2348            })
2349            .push_column_metadata(ColumnMetadata {
2350                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
2351                semantic_type: SemanticType::Field,
2352                column_id: 3,
2353            })
2354            .push_column_metadata(ColumnMetadata {
2355                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
2356                semantic_type: SemanticType::Field,
2357                column_id: 4,
2358            })
2359            .primary_key(vec![0, 1])
2360            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
2361        let metadata = Arc::new(builder.build().unwrap());
2362
2363        let primary_key_codec = build_primary_key_codec(&metadata);
2364        let schema = to_flat_sst_arrow_schema(
2365            &metadata,
2366            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2367        );
2368
2369        assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse);
2370        assert_eq!(primary_key_codec.encoding(), PrimaryKeyEncoding::Sparse);
2371
2372        let capacity = 100;
2373        let mut converter =
2374            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec.clone(), true);
2375
2376        let key_values1 = build_key_values_with_sparse_encoding(
2377            &metadata,
2378            &primary_key_codec,
2379            2048u32, // table_id
2380            100u64,  // tsid
2381            "key11".to_string(),
2382            "key21".to_string(),
2383            vec![1000, 2000].into_iter(),
2384            vec![Some(1.0), Some(2.0)].into_iter(),
2385            1,
2386        );
2387
2388        let key_values2 = build_key_values_with_sparse_encoding(
2389            &metadata,
2390            &primary_key_codec,
2391            4096u32, // table_id
2392            200u64,  // tsid
2393            "key12".to_string(),
2394            "key22".to_string(),
2395            vec![1500].into_iter(),
2396            vec![Some(3.0)].into_iter(),
2397            2,
2398        );
2399
2400        converter.append_key_values(&key_values1).unwrap();
2401        converter.append_key_values(&key_values2).unwrap();
2402
2403        let bulk_part = converter.convert().unwrap();
2404
2405        assert_eq!(bulk_part.num_rows(), 3);
2406        assert_eq!(bulk_part.min_timestamp, 1000);
2407        assert_eq!(bulk_part.max_timestamp, 2000);
2408        assert_eq!(bulk_part.sequence, 2);
2409        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
2410
2411        // For sparse encoding, primary key columns should NOT be stored individually
2412        // even when store_primary_key_columns is true, because sparse encoding
2413        // stores the encoded primary key in the __primary_key column
2414        let schema = bulk_part.batch.schema();
2415        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2416        assert_eq!(
2417            field_names,
2418            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2419        );
2420
2421        // Verify the __primary_key column contains encoded sparse keys
2422        let primary_key_column = bulk_part.batch.column_by_name("__primary_key").unwrap();
2423        let dict_array = primary_key_column
2424            .as_any()
2425            .downcast_ref::<DictionaryArray<UInt32Type>>()
2426            .unwrap();
2427
2428        // Should have non-zero entries indicating encoded primary keys
2429        assert!(!dict_array.is_empty());
2430        assert_eq!(dict_array.len(), 3); // 3 rows total
2431
2432        // Verify values are properly encoded binary data (not empty)
2433        let values = dict_array
2434            .values()
2435            .as_any()
2436            .downcast_ref::<BinaryArray>()
2437            .unwrap();
2438        for i in 0..values.len() {
2439            assert!(
2440                !values.value(i).is_empty(),
2441                "Encoded primary key should not be empty"
2442            );
2443        }
2444    }
2445
2446    #[test]
2447    fn test_convert_bulk_part_empty() {
2448        let metadata = metadata_for_test();
2449        let schema = to_flat_sst_arrow_schema(
2450            &metadata,
2451            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2452        );
2453        let primary_key_codec = build_primary_key_codec(&metadata);
2454
2455        // Create empty batch
2456        let empty_batch = RecordBatch::new_empty(schema.clone());
2457        let empty_part = BulkPart {
2458            batch: empty_batch,
2459            max_timestamp: 0,
2460            min_timestamp: 0,
2461            sequence: 0,
2462            timestamp_index: 0,
2463            raw_data: None,
2464        };
2465
2466        let result =
2467            convert_bulk_part(empty_part, &metadata, primary_key_codec, schema, true).unwrap();
2468        assert!(result.is_none());
2469    }
2470
2471    #[test]
2472    fn test_convert_bulk_part_dense_with_pk_columns() {
2473        let metadata = metadata_for_test();
2474        let primary_key_codec = build_primary_key_codec(&metadata);
2475
2476        let k0_array = Arc::new(arrow::array::StringArray::from(vec![
2477            "key1", "key2", "key1",
2478        ]));
2479        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2, 1]));
2480        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200, 300]));
2481        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0, 3.0]));
2482        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000, 1500]));
2483
2484        let input_schema = Arc::new(Schema::new(vec![
2485            Field::new("k0", ArrowDataType::Utf8, false),
2486            Field::new("k1", ArrowDataType::UInt32, false),
2487            Field::new("v0", ArrowDataType::Int64, true),
2488            Field::new("v1", ArrowDataType::Float64, true),
2489            Field::new(
2490                "ts",
2491                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2492                false,
2493            ),
2494        ]));
2495
2496        let input_batch = RecordBatch::try_new(
2497            input_schema,
2498            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2499        )
2500        .unwrap();
2501
2502        let part = BulkPart {
2503            batch: input_batch,
2504            max_timestamp: 2000,
2505            min_timestamp: 1000,
2506            sequence: 5,
2507            timestamp_index: 4,
2508            raw_data: None,
2509        };
2510
2511        let output_schema = to_flat_sst_arrow_schema(
2512            &metadata,
2513            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2514        );
2515
2516        let result = convert_bulk_part(
2517            part,
2518            &metadata,
2519            primary_key_codec,
2520            output_schema,
2521            true, // store primary key columns
2522        )
2523        .unwrap();
2524
2525        let converted = result.unwrap();
2526
2527        assert_eq!(converted.num_rows(), 3);
2528        assert_eq!(converted.max_timestamp, 2000);
2529        assert_eq!(converted.min_timestamp, 1000);
2530        assert_eq!(converted.sequence, 5);
2531
2532        let schema = converted.batch.schema();
2533        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2534        assert_eq!(
2535            field_names,
2536            vec![
2537                "k0",
2538                "k1",
2539                "v0",
2540                "v1",
2541                "ts",
2542                "__primary_key",
2543                "__sequence",
2544                "__op_type"
2545            ]
2546        );
2547
2548        let k0_col = converted.batch.column_by_name("k0").unwrap();
2549        assert!(matches!(
2550            k0_col.data_type(),
2551            ArrowDataType::Dictionary(_, _)
2552        ));
2553
2554        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2555        let dict_array = pk_col
2556            .as_any()
2557            .downcast_ref::<DictionaryArray<UInt32Type>>()
2558            .unwrap();
2559        let keys = dict_array.keys();
2560
2561        assert_eq!(keys.len(), 3);
2562    }
2563
2564    #[test]
2565    fn test_convert_bulk_part_dense_without_pk_columns() {
2566        let metadata = metadata_for_test();
2567        let primary_key_codec = build_primary_key_codec(&metadata);
2568
2569        // Create input batch with primary key columns (k0, k1)
2570        let k0_array = Arc::new(arrow::array::StringArray::from(vec!["key1", "key2"]));
2571        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2]));
2572        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
2573        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
2574        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
2575
2576        let input_schema = Arc::new(Schema::new(vec![
2577            Field::new("k0", ArrowDataType::Utf8, false),
2578            Field::new("k1", ArrowDataType::UInt32, false),
2579            Field::new("v0", ArrowDataType::Int64, true),
2580            Field::new("v1", ArrowDataType::Float64, true),
2581            Field::new(
2582                "ts",
2583                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2584                false,
2585            ),
2586        ]));
2587
2588        let input_batch = RecordBatch::try_new(
2589            input_schema,
2590            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2591        )
2592        .unwrap();
2593
2594        let part = BulkPart {
2595            batch: input_batch,
2596            max_timestamp: 2000,
2597            min_timestamp: 1000,
2598            sequence: 3,
2599            timestamp_index: 4,
2600            raw_data: None,
2601        };
2602
2603        let output_schema = to_flat_sst_arrow_schema(
2604            &metadata,
2605            &FlatSchemaOptions {
2606                raw_pk_columns: false,
2607                string_pk_use_dict: true,
2608            },
2609        );
2610
2611        let result = convert_bulk_part(
2612            part,
2613            &metadata,
2614            primary_key_codec,
2615            output_schema,
2616            false, // don't store primary key columns
2617        )
2618        .unwrap();
2619
2620        let converted = result.unwrap();
2621
2622        assert_eq!(converted.num_rows(), 2);
2623        assert_eq!(converted.max_timestamp, 2000);
2624        assert_eq!(converted.min_timestamp, 1000);
2625        assert_eq!(converted.sequence, 3);
2626
2627        // Verify schema does NOT include individual primary key columns
2628        let schema = converted.batch.schema();
2629        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2630        assert_eq!(
2631            field_names,
2632            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2633        );
2634
2635        // Verify __primary_key column is present and is a dictionary
2636        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2637        assert!(matches!(
2638            pk_col.data_type(),
2639            ArrowDataType::Dictionary(_, _)
2640        ));
2641    }
2642
2643    #[test]
2644    fn test_convert_bulk_part_sparse_encoding() {
2645        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
2646        builder
2647            .push_column_metadata(ColumnMetadata {
2648                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
2649                semantic_type: SemanticType::Tag,
2650                column_id: 0,
2651            })
2652            .push_column_metadata(ColumnMetadata {
2653                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
2654                semantic_type: SemanticType::Tag,
2655                column_id: 1,
2656            })
2657            .push_column_metadata(ColumnMetadata {
2658                column_schema: ColumnSchema::new(
2659                    "ts",
2660                    ConcreteDataType::timestamp_millisecond_datatype(),
2661                    false,
2662                ),
2663                semantic_type: SemanticType::Timestamp,
2664                column_id: 2,
2665            })
2666            .push_column_metadata(ColumnMetadata {
2667                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
2668                semantic_type: SemanticType::Field,
2669                column_id: 3,
2670            })
2671            .push_column_metadata(ColumnMetadata {
2672                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
2673                semantic_type: SemanticType::Field,
2674                column_id: 4,
2675            })
2676            .primary_key(vec![0, 1])
2677            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
2678        let metadata = Arc::new(builder.build().unwrap());
2679
2680        let primary_key_codec = build_primary_key_codec(&metadata);
2681
2682        // Create input batch with __primary_key column (sparse encoding)
2683        let pk_array = Arc::new(arrow::array::BinaryArray::from(vec![
2684            b"encoded_key_1".as_slice(),
2685            b"encoded_key_2".as_slice(),
2686        ]));
2687        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
2688        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
2689        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
2690
2691        let input_schema = Arc::new(Schema::new(vec![
2692            Field::new("__primary_key", ArrowDataType::Binary, false),
2693            Field::new("v0", ArrowDataType::Int64, true),
2694            Field::new("v1", ArrowDataType::Float64, true),
2695            Field::new(
2696                "ts",
2697                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2698                false,
2699            ),
2700        ]));
2701
2702        let input_batch =
2703            RecordBatch::try_new(input_schema, vec![pk_array, v0_array, v1_array, ts_array])
2704                .unwrap();
2705
2706        let part = BulkPart {
2707            batch: input_batch,
2708            max_timestamp: 2000,
2709            min_timestamp: 1000,
2710            sequence: 7,
2711            timestamp_index: 3,
2712            raw_data: None,
2713        };
2714
2715        let output_schema = to_flat_sst_arrow_schema(
2716            &metadata,
2717            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2718        );
2719
2720        let result = convert_bulk_part(
2721            part,
2722            &metadata,
2723            primary_key_codec,
2724            output_schema,
2725            true, // store_primary_key_columns (ignored for sparse)
2726        )
2727        .unwrap();
2728
2729        let converted = result.unwrap();
2730
2731        assert_eq!(converted.num_rows(), 2);
2732        assert_eq!(converted.max_timestamp, 2000);
2733        assert_eq!(converted.min_timestamp, 1000);
2734        assert_eq!(converted.sequence, 7);
2735
2736        // Verify schema does NOT include individual primary key columns (sparse encoding)
2737        let schema = converted.batch.schema();
2738        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2739        assert_eq!(
2740            field_names,
2741            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2742        );
2743
2744        // Verify __primary_key is dictionary encoded
2745        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2746        assert!(matches!(
2747            pk_col.data_type(),
2748            ArrowDataType::Dictionary(_, _)
2749        ));
2750    }
2751
2752    #[test]
2753    fn test_convert_bulk_part_sorting_with_multiple_series() {
2754        let metadata = metadata_for_test();
2755        let primary_key_codec = build_primary_key_codec(&metadata);
2756
2757        // Create unsorted batch with multiple series and timestamps
2758        let k0_array = Arc::new(arrow::array::StringArray::from(vec![
2759            "series_b", "series_a", "series_b", "series_a",
2760        ]));
2761        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![2, 1, 2, 1]));
2762        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![200, 100, 400, 300]));
2763        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![2.0, 1.0, 4.0, 3.0]));
2764        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
2765            2000, 1000, 4000, 3000,
2766        ]));
2767
2768        let input_schema = Arc::new(Schema::new(vec![
2769            Field::new("k0", ArrowDataType::Utf8, false),
2770            Field::new("k1", ArrowDataType::UInt32, false),
2771            Field::new("v0", ArrowDataType::Int64, true),
2772            Field::new("v1", ArrowDataType::Float64, true),
2773            Field::new(
2774                "ts",
2775                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2776                false,
2777            ),
2778        ]));
2779
2780        let input_batch = RecordBatch::try_new(
2781            input_schema,
2782            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2783        )
2784        .unwrap();
2785
2786        let part = BulkPart {
2787            batch: input_batch,
2788            max_timestamp: 4000,
2789            min_timestamp: 1000,
2790            sequence: 10,
2791            timestamp_index: 4,
2792            raw_data: None,
2793        };
2794
2795        let output_schema = to_flat_sst_arrow_schema(
2796            &metadata,
2797            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2798        );
2799
2800        let result =
2801            convert_bulk_part(part, &metadata, primary_key_codec, output_schema, true).unwrap();
2802
2803        let converted = result.unwrap();
2804
2805        assert_eq!(converted.num_rows(), 4);
2806
2807        // Verify data is sorted by (primary_key, timestamp, sequence desc)
2808        let ts_col = converted.batch.column(converted.timestamp_index);
2809        let ts_array = ts_col
2810            .as_any()
2811            .downcast_ref::<TimestampMillisecondArray>()
2812            .unwrap();
2813
2814        // After sorting by (pk, ts), we should have:
2815        // series_a,1: ts=1000, 3000
2816        // series_b,2: ts=2000, 4000
2817        let timestamps: Vec<i64> = ts_array.values().to_vec();
2818        assert_eq!(timestamps, vec![1000, 3000, 2000, 4000]);
2819    }
2820}