mito2/memtable/bulk/
part.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Bulk part encoder/decoder.
16
17use std::collections::{HashMap, HashSet, VecDeque};
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use api::helper::{ColumnDataTypeWrapper, to_grpc_value};
22use api::v1::bulk_wal_entry::Body;
23use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType, bulk_wal_entry};
24use bytes::Bytes;
25use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
26use common_recordbatch::DfRecordBatch as RecordBatch;
27use common_time::Timestamp;
28use common_time::timestamp::TimeUnit;
29use datatypes::arrow;
30use datatypes::arrow::array::{
31    Array, ArrayRef, BinaryBuilder, BinaryDictionaryBuilder, DictionaryArray, StringBuilder,
32    StringDictionaryBuilder, TimestampMicrosecondArray, TimestampMillisecondArray,
33    TimestampNanosecondArray, TimestampSecondArray, UInt8Array, UInt8Builder, UInt32Array,
34    UInt64Array, UInt64Builder,
35};
36use datatypes::arrow::compute::{SortColumn, SortOptions, TakeOptions};
37use datatypes::arrow::datatypes::{
38    DataType as ArrowDataType, Field, Schema, SchemaRef, UInt32Type,
39};
40use datatypes::arrow_array::BinaryArray;
41use datatypes::data_type::DataType;
42use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
43use datatypes::value::{Value, ValueRef};
44use datatypes::vectors::Helper;
45use mito_codec::key_values::{KeyValue, KeyValues, KeyValuesRef};
46use mito_codec::row_converter::{
47    DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, build_primary_key_codec,
48};
49use parquet::arrow::ArrowWriter;
50use parquet::basic::{Compression, ZstdLevel};
51use parquet::data_type::AsBytes;
52use parquet::file::metadata::ParquetMetaData;
53use parquet::file::properties::WriterProperties;
54use snafu::{OptionExt, ResultExt, Snafu};
55use store_api::codec::PrimaryKeyEncoding;
56use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
57use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
58use store_api::storage::{FileId, RegionId, SequenceNumber, SequenceRange};
59use table::predicate::Predicate;
60
61use crate::error::{
62    self, ColumnNotFoundSnafu, ComputeArrowSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu,
63    DataTypeMismatchSnafu, EncodeMemtableSnafu, EncodeSnafu, InvalidMetadataSnafu,
64    InvalidRequestSnafu, NewRecordBatchSnafu, Result, UnexpectedSnafu,
65};
66use crate::memtable::bulk::context::BulkIterContextRef;
67use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
68use crate::memtable::time_series::{ValueBuilder, Values};
69use crate::memtable::{BoxedRecordBatchIterator, MemScanMetrics, MemtableStats};
70use crate::sst::index::IndexOutput;
71use crate::sst::parquet::file_range::{PreFilterMode, row_group_contains_delete};
72use crate::sst::parquet::flat_format::primary_key_column_index;
73use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder, ReadFormat};
74use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo};
75use crate::sst::{SeriesEstimator, to_sst_arrow_schema};
76
77const INIT_DICT_VALUE_CAPACITY: usize = 8;
78
79/// A raw bulk part in the memtable.
80#[derive(Clone)]
81pub struct BulkPart {
82    pub batch: RecordBatch,
83    pub max_timestamp: i64,
84    pub min_timestamp: i64,
85    pub sequence: u64,
86    pub timestamp_index: usize,
87    pub raw_data: Option<ArrowIpc>,
88}
89
90impl TryFrom<BulkWalEntry> for BulkPart {
91    type Error = error::Error;
92
93    fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
94        match value.body.expect("Entry payload should be present") {
95            Body::ArrowIpc(ipc) => {
96                let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
97                    .context(error::ConvertBulkWalEntrySnafu)?;
98                let batch = decoder
99                    .try_decode_record_batch(&ipc.data_header, &ipc.payload)
100                    .context(error::ConvertBulkWalEntrySnafu)?;
101                Ok(Self {
102                    batch,
103                    max_timestamp: value.max_ts,
104                    min_timestamp: value.min_ts,
105                    sequence: value.sequence,
106                    timestamp_index: value.timestamp_index as usize,
107                    raw_data: Some(ipc),
108                })
109            }
110        }
111    }
112}
113
114impl TryFrom<&BulkPart> for BulkWalEntry {
115    type Error = error::Error;
116
117    fn try_from(value: &BulkPart) -> Result<Self> {
118        if let Some(ipc) = &value.raw_data {
119            Ok(BulkWalEntry {
120                sequence: value.sequence,
121                max_ts: value.max_timestamp,
122                min_ts: value.min_timestamp,
123                timestamp_index: value.timestamp_index as u32,
124                body: Some(Body::ArrowIpc(ipc.clone())),
125            })
126        } else {
127            let mut encoder = FlightEncoder::default();
128            let schema_bytes = encoder
129                .encode_schema(value.batch.schema().as_ref())
130                .data_header;
131            let [rb_data] = encoder
132                .encode(FlightMessage::RecordBatch(value.batch.clone()))
133                .try_into()
134                .map_err(|_| {
135                    error::UnsupportedOperationSnafu {
136                        err_msg: "create BulkWalEntry from RecordBatch with dictionary arrays",
137                    }
138                    .build()
139                })?;
140            Ok(BulkWalEntry {
141                sequence: value.sequence,
142                max_ts: value.max_timestamp,
143                min_ts: value.min_timestamp,
144                timestamp_index: value.timestamp_index as u32,
145                body: Some(Body::ArrowIpc(ArrowIpc {
146                    schema: schema_bytes,
147                    data_header: rb_data.data_header,
148                    payload: rb_data.data_body,
149                })),
150            })
151        }
152    }
153}
154
155impl BulkPart {
156    pub(crate) fn estimated_size(&self) -> usize {
157        record_batch_estimated_size(&self.batch)
158    }
159
160    /// Returns the estimated series count in this BulkPart.
161    /// This is calculated from the dictionary values count of the PrimaryKeyArray.
162    pub fn estimated_series_count(&self) -> usize {
163        let pk_column_idx = primary_key_column_index(self.batch.num_columns());
164        let pk_column = self.batch.column(pk_column_idx);
165        if let Some(dict_array) = pk_column.as_any().downcast_ref::<PrimaryKeyArray>() {
166            dict_array.values().len()
167        } else {
168            0
169        }
170    }
171
172    /// Creates MemtableStats from this BulkPart.
173    pub fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
174        let ts_type = region_metadata.time_index_type();
175        let min_ts = ts_type.create_timestamp(self.min_timestamp);
176        let max_ts = ts_type.create_timestamp(self.max_timestamp);
177
178        MemtableStats {
179            estimated_bytes: self.estimated_size(),
180            time_range: Some((min_ts, max_ts)),
181            num_rows: self.num_rows(),
182            num_ranges: 1,
183            max_sequence: self.sequence,
184            series_count: self.estimated_series_count(),
185        }
186    }
187
188    /// Fills missing columns in the BulkPart batch with default values.
189    ///
190    /// This function checks if the batch schema matches the region metadata schema,
191    /// and if there are missing columns, it fills them with default values (or null
192    /// for nullable columns).
193    ///
194    /// # Arguments
195    ///
196    /// * `region_metadata` - The region metadata containing the expected schema
197    pub fn fill_missing_columns(&mut self, region_metadata: &RegionMetadata) -> Result<()> {
198        // Builds a map of existing columns in the batch
199        let batch_schema = self.batch.schema();
200        let batch_columns: HashSet<_> = batch_schema
201            .fields()
202            .iter()
203            .map(|f| f.name().as_str())
204            .collect();
205
206        // Finds columns that need to be filled
207        let mut columns_to_fill = Vec::new();
208        for column_meta in &region_metadata.column_metadatas {
209            // TODO(yingwen): Returns error if it is impure default after we support filling
210            // bulk insert request in the frontend
211            if !batch_columns.contains(column_meta.column_schema.name.as_str()) {
212                columns_to_fill.push(column_meta);
213            }
214        }
215
216        if columns_to_fill.is_empty() {
217            return Ok(());
218        }
219
220        let num_rows = self.batch.num_rows();
221
222        let mut new_columns = Vec::new();
223        let mut new_fields = Vec::new();
224
225        // First, adds all existing columns
226        new_fields.extend(batch_schema.fields().iter().cloned());
227        new_columns.extend_from_slice(self.batch.columns());
228
229        let region_id = region_metadata.region_id;
230        // Then adds the missing columns with default values
231        for column_meta in columns_to_fill {
232            let default_vector = column_meta
233                .column_schema
234                .create_default_vector(num_rows)
235                .context(CreateDefaultSnafu {
236                    region_id,
237                    column: &column_meta.column_schema.name,
238                })?
239                .with_context(|| InvalidRequestSnafu {
240                    region_id,
241                    reason: format!(
242                        "column {} does not have default value",
243                        column_meta.column_schema.name
244                    ),
245                })?;
246            let arrow_array = default_vector.to_arrow_array();
247            column_meta.column_schema.data_type.as_arrow_type();
248
249            new_fields.push(Arc::new(Field::new(
250                column_meta.column_schema.name.clone(),
251                column_meta.column_schema.data_type.as_arrow_type(),
252                column_meta.column_schema.is_nullable(),
253            )));
254            new_columns.push(arrow_array);
255        }
256
257        // Create a new schema and batch with the filled columns
258        let new_schema = Arc::new(Schema::new(new_fields));
259        let new_batch =
260            RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)?;
261
262        // Update the batch
263        self.batch = new_batch;
264
265        Ok(())
266    }
267
268    /// Converts [BulkPart] to [Mutation] for fallback `write_bulk` implementation.
269    pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
270        let vectors = region_metadata
271            .schema
272            .column_schemas()
273            .iter()
274            .map(|col| match self.batch.column_by_name(&col.name) {
275                None => Ok(None),
276                Some(col) => Helper::try_into_vector(col).map(Some),
277            })
278            .collect::<datatypes::error::Result<Vec<_>>>()
279            .context(error::ComputeVectorSnafu)?;
280
281        let rows = (0..self.num_rows())
282            .map(|row_idx| {
283                let values = (0..self.batch.num_columns())
284                    .map(|col_idx| {
285                        if let Some(v) = &vectors[col_idx] {
286                            to_grpc_value(v.get(row_idx))
287                        } else {
288                            api::v1::Value { value_data: None }
289                        }
290                    })
291                    .collect::<Vec<_>>();
292                api::v1::Row { values }
293            })
294            .collect::<Vec<_>>();
295
296        let schema = region_metadata
297            .column_metadatas
298            .iter()
299            .map(|c| {
300                let data_type_wrapper =
301                    ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
302                Ok(api::v1::ColumnSchema {
303                    column_name: c.column_schema.name.clone(),
304                    datatype: data_type_wrapper.datatype() as i32,
305                    semantic_type: c.semantic_type as i32,
306                    ..Default::default()
307                })
308            })
309            .collect::<api::error::Result<Vec<_>>>()
310            .context(error::ConvertColumnDataTypeSnafu {
311                reason: "failed to convert region metadata to column schema",
312            })?;
313
314        let rows = api::v1::Rows { schema, rows };
315
316        Ok(Mutation {
317            op_type: OpType::Put as i32,
318            sequence: self.sequence,
319            rows: Some(rows),
320            write_hint: None,
321        })
322    }
323
324    pub fn timestamps(&self) -> &ArrayRef {
325        self.batch.column(self.timestamp_index)
326    }
327
328    pub fn num_rows(&self) -> usize {
329        self.batch.num_rows()
330    }
331}
332
333/// A collection of small unordered bulk parts.
334/// Used to batch small parts together before merging them into a sorted part.
335pub struct UnorderedPart {
336    /// Small bulk parts that haven't been sorted yet.
337    parts: Vec<BulkPart>,
338    /// Total number of rows across all parts.
339    total_rows: usize,
340    /// Minimum timestamp across all parts.
341    min_timestamp: i64,
342    /// Maximum timestamp across all parts.
343    max_timestamp: i64,
344    /// Maximum sequence number across all parts.
345    max_sequence: u64,
346    /// Row count threshold for accepting parts (default: 1024).
347    threshold: usize,
348    /// Row count threshold for compacting (default: 4096).
349    compact_threshold: usize,
350}
351
352impl Default for UnorderedPart {
353    fn default() -> Self {
354        Self::new()
355    }
356}
357
358impl UnorderedPart {
359    /// Creates a new empty UnorderedPart.
360    pub fn new() -> Self {
361        Self {
362            parts: Vec::new(),
363            total_rows: 0,
364            min_timestamp: i64::MAX,
365            max_timestamp: i64::MIN,
366            max_sequence: 0,
367            threshold: 1024,
368            compact_threshold: 4096,
369        }
370    }
371
372    /// Sets the threshold for accepting parts into unordered_part.
373    pub fn set_threshold(&mut self, threshold: usize) {
374        self.threshold = threshold;
375    }
376
377    /// Sets the threshold for compacting unordered_part.
378    pub fn set_compact_threshold(&mut self, compact_threshold: usize) {
379        self.compact_threshold = compact_threshold;
380    }
381
382    /// Returns the threshold for accepting parts.
383    pub fn threshold(&self) -> usize {
384        self.threshold
385    }
386
387    /// Returns the compact threshold.
388    pub fn compact_threshold(&self) -> usize {
389        self.compact_threshold
390    }
391
392    /// Returns true if this part should accept the given row count.
393    pub fn should_accept(&self, num_rows: usize) -> bool {
394        num_rows < self.threshold
395    }
396
397    /// Returns true if this part should be compacted.
398    pub fn should_compact(&self) -> bool {
399        self.total_rows >= self.compact_threshold
400    }
401
402    /// Adds a BulkPart to this unordered collection.
403    pub fn push(&mut self, part: BulkPart) {
404        self.total_rows += part.num_rows();
405        self.min_timestamp = self.min_timestamp.min(part.min_timestamp);
406        self.max_timestamp = self.max_timestamp.max(part.max_timestamp);
407        self.max_sequence = self.max_sequence.max(part.sequence);
408        self.parts.push(part);
409    }
410
411    /// Returns the total number of rows across all parts.
412    pub fn num_rows(&self) -> usize {
413        self.total_rows
414    }
415
416    /// Returns true if there are no parts.
417    pub fn is_empty(&self) -> bool {
418        self.parts.is_empty()
419    }
420
421    /// Returns the number of parts in this collection.
422    pub fn num_parts(&self) -> usize {
423        self.parts.len()
424    }
425
426    /// Concatenates and sorts all parts into a single RecordBatch.
427    /// Returns None if the collection is empty.
428    pub fn concat_and_sort(&self) -> Result<Option<RecordBatch>> {
429        if self.parts.is_empty() {
430            return Ok(None);
431        }
432
433        if self.parts.len() == 1 {
434            // If there's only one part, return its batch directly
435            return Ok(Some(self.parts[0].batch.clone()));
436        }
437
438        // Get the schema from the first part
439        let schema = self.parts[0].batch.schema();
440
441        // Concatenate all record batches
442        let batches: Vec<RecordBatch> = self.parts.iter().map(|p| p.batch.clone()).collect();
443        let concatenated =
444            arrow::compute::concat_batches(&schema, &batches).context(ComputeArrowSnafu)?;
445
446        // Sort the concatenated batch
447        let sorted_batch = sort_primary_key_record_batch(&concatenated)?;
448
449        Ok(Some(sorted_batch))
450    }
451
452    /// Converts all parts into a single sorted BulkPart.
453    /// Returns None if the collection is empty.
454    pub fn to_bulk_part(&self) -> Result<Option<BulkPart>> {
455        let Some(sorted_batch) = self.concat_and_sort()? else {
456            return Ok(None);
457        };
458
459        let timestamp_index = self.parts[0].timestamp_index;
460
461        Ok(Some(BulkPart {
462            batch: sorted_batch,
463            max_timestamp: self.max_timestamp,
464            min_timestamp: self.min_timestamp,
465            sequence: self.max_sequence,
466            timestamp_index,
467            raw_data: None,
468        }))
469    }
470
471    /// Clears all parts from this collection.
472    pub fn clear(&mut self) {
473        self.parts.clear();
474        self.total_rows = 0;
475        self.min_timestamp = i64::MAX;
476        self.max_timestamp = i64::MIN;
477        self.max_sequence = 0;
478    }
479}
480
481/// More accurate estimation of the size of a record batch.
482pub fn record_batch_estimated_size(batch: &RecordBatch) -> usize {
483    batch
484        .columns()
485        .iter()
486        // If can not get slice memory size, assume 0 here.
487        .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
488        .sum()
489}
490
491/// Primary key column builder for handling strings specially.
492enum PrimaryKeyColumnBuilder {
493    /// String dictionary builder for string types.
494    StringDict(StringDictionaryBuilder<UInt32Type>),
495    /// Generic mutable vector for other types.
496    Vector(Box<dyn MutableVector>),
497}
498
499impl PrimaryKeyColumnBuilder {
500    /// Appends a value to the builder.
501    fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
502        match self {
503            PrimaryKeyColumnBuilder::StringDict(builder) => {
504                if let Some(s) = value.try_into_string().context(DataTypeMismatchSnafu)? {
505                    // We know the value is a string.
506                    builder.append_value(s);
507                } else {
508                    builder.append_null();
509                }
510            }
511            PrimaryKeyColumnBuilder::Vector(builder) => {
512                builder.push_value_ref(&value);
513            }
514        }
515        Ok(())
516    }
517
518    /// Converts the builder to an ArrayRef.
519    fn into_arrow_array(self) -> ArrayRef {
520        match self {
521            PrimaryKeyColumnBuilder::StringDict(mut builder) => Arc::new(builder.finish()),
522            PrimaryKeyColumnBuilder::Vector(mut builder) => builder.to_vector().to_arrow_array(),
523        }
524    }
525}
526
527/// Converter that converts structs into [BulkPart].
528pub struct BulkPartConverter {
529    /// Region metadata.
530    region_metadata: RegionMetadataRef,
531    /// Schema of the converted batch.
532    schema: SchemaRef,
533    /// Primary key codec for encoding keys
534    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
535    /// Buffer for encoding primary key.
536    key_buf: Vec<u8>,
537    /// Primary key array builder.
538    key_array_builder: PrimaryKeyArrayBuilder,
539    /// Builders for non-primary key columns.
540    value_builder: ValueBuilder,
541    /// Builders for individual primary key columns.
542    /// The order of builders is the same as the order of primary key columns in the region metadata.
543    primary_key_column_builders: Vec<PrimaryKeyColumnBuilder>,
544
545    /// Max timestamp value.
546    max_ts: i64,
547    /// Min timestamp value.
548    min_ts: i64,
549    /// Max sequence number.
550    max_sequence: SequenceNumber,
551}
552
553impl BulkPartConverter {
554    /// Creates a new converter.
555    ///
556    /// If `store_primary_key_columns` is true and the encoding is not sparse encoding, it
557    /// stores primary key columns in arrays additionally.
558    pub fn new(
559        region_metadata: &RegionMetadataRef,
560        schema: SchemaRef,
561        capacity: usize,
562        primary_key_codec: Arc<dyn PrimaryKeyCodec>,
563        store_primary_key_columns: bool,
564    ) -> Self {
565        debug_assert_eq!(
566            region_metadata.primary_key_encoding,
567            primary_key_codec.encoding()
568        );
569
570        let primary_key_column_builders = if store_primary_key_columns
571            && region_metadata.primary_key_encoding != PrimaryKeyEncoding::Sparse
572        {
573            new_primary_key_column_builders(region_metadata, capacity)
574        } else {
575            Vec::new()
576        };
577
578        Self {
579            region_metadata: region_metadata.clone(),
580            schema,
581            primary_key_codec,
582            key_buf: Vec::new(),
583            key_array_builder: PrimaryKeyArrayBuilder::new(),
584            value_builder: ValueBuilder::new(region_metadata, capacity),
585            primary_key_column_builders,
586            min_ts: i64::MAX,
587            max_ts: i64::MIN,
588            max_sequence: SequenceNumber::MIN,
589        }
590    }
591
592    /// Appends a [KeyValues] into the converter.
593    pub fn append_key_values(&mut self, key_values: &KeyValues) -> Result<()> {
594        for kv in key_values.iter() {
595            self.append_key_value(&kv)?;
596        }
597
598        Ok(())
599    }
600
601    /// Appends a [KeyValue] to builders.
602    ///
603    /// If the primary key uses sparse encoding, callers must encoded the primary key in the [KeyValue].
604    fn append_key_value(&mut self, kv: &KeyValue) -> Result<()> {
605        // Handles primary key based on encoding type
606        if self.primary_key_codec.encoding() == PrimaryKeyEncoding::Sparse {
607            // For sparse encoding, the primary key is already encoded in the KeyValue
608            // Gets the first (and only) primary key value which contains the encoded key
609            let mut primary_keys = kv.primary_keys();
610            if let Some(encoded) = primary_keys
611                .next()
612                .context(ColumnNotFoundSnafu {
613                    column: PRIMARY_KEY_COLUMN_NAME,
614                })?
615                .try_into_binary()
616                .context(DataTypeMismatchSnafu)?
617            {
618                self.key_array_builder
619                    .append(encoded)
620                    .context(ComputeArrowSnafu)?;
621            } else {
622                self.key_array_builder
623                    .append("")
624                    .context(ComputeArrowSnafu)?;
625            }
626        } else {
627            // For dense encoding, we need to encode the primary key columns
628            self.key_buf.clear();
629            self.primary_key_codec
630                .encode_key_value(kv, &mut self.key_buf)
631                .context(EncodeSnafu)?;
632            self.key_array_builder
633                .append(&self.key_buf)
634                .context(ComputeArrowSnafu)?;
635        };
636
637        // If storing primary key columns, append values to individual builders
638        if !self.primary_key_column_builders.is_empty() {
639            for (builder, pk_value) in self
640                .primary_key_column_builders
641                .iter_mut()
642                .zip(kv.primary_keys())
643            {
644                builder.push_value_ref(pk_value)?;
645            }
646        }
647
648        // Pushes other columns.
649        self.value_builder.push(
650            kv.timestamp(),
651            kv.sequence(),
652            kv.op_type() as u8,
653            kv.fields(),
654        );
655
656        // Updates statistics
657        // Safety: timestamp of kv must be both present and a valid timestamp value.
658        let ts = kv
659            .timestamp()
660            .try_into_timestamp()
661            .unwrap()
662            .unwrap()
663            .value();
664        self.min_ts = self.min_ts.min(ts);
665        self.max_ts = self.max_ts.max(ts);
666        self.max_sequence = self.max_sequence.max(kv.sequence());
667
668        Ok(())
669    }
670
671    /// Converts buffered content into a [BulkPart].
672    ///
673    /// It sorts the record batch by (primary key, timestamp, sequence desc).
674    pub fn convert(mut self) -> Result<BulkPart> {
675        let values = Values::from(self.value_builder);
676        let mut columns =
677            Vec::with_capacity(4 + values.fields.len() + self.primary_key_column_builders.len());
678
679        // Build primary key column arrays if enabled.
680        for builder in self.primary_key_column_builders {
681            columns.push(builder.into_arrow_array());
682        }
683        // Then fields columns.
684        columns.extend(values.fields.iter().map(|field| field.to_arrow_array()));
685        // Time index.
686        let timestamp_index = columns.len();
687        columns.push(values.timestamp.to_arrow_array());
688        // Primary key.
689        let pk_array = self.key_array_builder.finish();
690        columns.push(Arc::new(pk_array));
691        // Sequence and op type.
692        columns.push(values.sequence.to_arrow_array());
693        columns.push(values.op_type.to_arrow_array());
694
695        let batch = RecordBatch::try_new(self.schema, columns).context(NewRecordBatchSnafu)?;
696        // Sorts the record batch.
697        let batch = sort_primary_key_record_batch(&batch)?;
698
699        Ok(BulkPart {
700            batch,
701            max_timestamp: self.max_ts,
702            min_timestamp: self.min_ts,
703            sequence: self.max_sequence,
704            timestamp_index,
705            raw_data: None,
706        })
707    }
708}
709
710fn new_primary_key_column_builders(
711    metadata: &RegionMetadata,
712    capacity: usize,
713) -> Vec<PrimaryKeyColumnBuilder> {
714    metadata
715        .primary_key_columns()
716        .map(|col| {
717            if col.column_schema.data_type.is_string() {
718                PrimaryKeyColumnBuilder::StringDict(StringDictionaryBuilder::with_capacity(
719                    capacity,
720                    INIT_DICT_VALUE_CAPACITY,
721                    capacity,
722                ))
723            } else {
724                PrimaryKeyColumnBuilder::Vector(
725                    col.column_schema.data_type.create_mutable_vector(capacity),
726                )
727            }
728        })
729        .collect()
730}
731
732/// Sorts the record batch with primary key format.
733pub fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
734    let total_columns = batch.num_columns();
735    let sort_columns = vec![
736        // Primary key column (ascending)
737        SortColumn {
738            values: batch.column(total_columns - 3).clone(),
739            options: Some(SortOptions {
740                descending: false,
741                nulls_first: true,
742            }),
743        },
744        // Time index column (ascending)
745        SortColumn {
746            values: batch.column(total_columns - 4).clone(),
747            options: Some(SortOptions {
748                descending: false,
749                nulls_first: true,
750            }),
751        },
752        // Sequence column (descending)
753        SortColumn {
754            values: batch.column(total_columns - 2).clone(),
755            options: Some(SortOptions {
756                descending: true,
757                nulls_first: true,
758            }),
759        },
760    ];
761
762    let indices = datatypes::arrow::compute::lexsort_to_indices(&sort_columns, None)
763        .context(ComputeArrowSnafu)?;
764
765    datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
766}
767
768/// Converts a `BulkPart` that is unordered and without encoded primary keys into a `BulkPart`
769/// with the same format as produced by [BulkPartConverter].
770///
771/// This function takes a `BulkPart` where:
772/// - For dense encoding: Primary key columns may be stored as individual columns
773/// - For sparse encoding: The `__primary_key` column should already be present with encoded keys
774/// - The batch may not be sorted
775///
776/// And produces a `BulkPart` where:
777/// - Primary key columns are optionally stored (depending on `store_primary_key_columns` and encoding)
778/// - An encoded `__primary_key` dictionary column is present
779/// - The batch is sorted by (primary_key, timestamp, sequence desc)
780///
781/// # Arguments
782///
783/// * `part` - The input `BulkPart` to convert
784/// * `region_metadata` - Region metadata containing schema information
785/// * `primary_key_codec` - Codec for encoding primary keys
786/// * `schema` - Target schema for the output batch
787/// * `store_primary_key_columns` - If true and encoding is not sparse, stores individual primary key columns
788///
789/// # Returns
790///
791/// Returns `None` if the input part has no rows, otherwise returns a new `BulkPart` with
792/// encoded primary keys and sorted data.
793pub fn convert_bulk_part(
794    part: BulkPart,
795    region_metadata: &RegionMetadataRef,
796    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
797    schema: SchemaRef,
798    store_primary_key_columns: bool,
799) -> Result<Option<BulkPart>> {
800    if part.num_rows() == 0 {
801        return Ok(None);
802    }
803
804    let num_rows = part.num_rows();
805    let is_sparse = region_metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse;
806
807    // Builds a column name-to-index map for efficient lookups
808    let input_schema = part.batch.schema();
809    let column_indices: HashMap<&str, usize> = input_schema
810        .fields()
811        .iter()
812        .enumerate()
813        .map(|(idx, field)| (field.name().as_str(), idx))
814        .collect();
815
816    // Determines the structure of the input batch by looking up columns by name
817    let mut output_columns = Vec::new();
818
819    // Extracts primary key columns if we need to encode them (dense encoding)
820    let pk_array = if is_sparse {
821        // For sparse encoding, the input should already have the __primary_key column
822        // We need to find it in the input batch
823        None
824    } else {
825        // For dense encoding, extract and encode primary key columns by name
826        let pk_vectors: Result<Vec<_>> = region_metadata
827            .primary_key_columns()
828            .map(|col_meta| {
829                let col_idx = column_indices
830                    .get(col_meta.column_schema.name.as_str())
831                    .context(ColumnNotFoundSnafu {
832                        column: &col_meta.column_schema.name,
833                    })?;
834                let col = part.batch.column(*col_idx);
835                Helper::try_into_vector(col).context(error::ComputeVectorSnafu)
836            })
837            .collect();
838        let pk_vectors = pk_vectors?;
839
840        let mut key_array_builder = PrimaryKeyArrayBuilder::new();
841        let mut encode_buf = Vec::new();
842
843        for row_idx in 0..num_rows {
844            encode_buf.clear();
845
846            // Collects primary key values with column IDs for this row
847            let pk_values_with_ids: Vec<_> = region_metadata
848                .primary_key
849                .iter()
850                .zip(pk_vectors.iter())
851                .map(|(col_id, vector)| (*col_id, vector.get_ref(row_idx)))
852                .collect();
853
854            // Encodes the primary key
855            primary_key_codec
856                .encode_value_refs(&pk_values_with_ids, &mut encode_buf)
857                .context(EncodeSnafu)?;
858
859            key_array_builder
860                .append(&encode_buf)
861                .context(ComputeArrowSnafu)?;
862        }
863
864        Some(key_array_builder.finish())
865    };
866
867    // Adds primary key columns if storing them (only for dense encoding)
868    if store_primary_key_columns && !is_sparse {
869        for col_meta in region_metadata.primary_key_columns() {
870            let col_idx = column_indices
871                .get(col_meta.column_schema.name.as_str())
872                .context(ColumnNotFoundSnafu {
873                    column: &col_meta.column_schema.name,
874                })?;
875            let col = part.batch.column(*col_idx);
876
877            // Converts to dictionary if needed for string types
878            let col = if col_meta.column_schema.data_type.is_string() {
879                let target_type = ArrowDataType::Dictionary(
880                    Box::new(ArrowDataType::UInt32),
881                    Box::new(ArrowDataType::Utf8),
882                );
883                arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
884            } else {
885                col.clone()
886            };
887            output_columns.push(col);
888        }
889    }
890
891    // Adds field columns
892    for col_meta in region_metadata.field_columns() {
893        let col_idx = column_indices
894            .get(col_meta.column_schema.name.as_str())
895            .context(ColumnNotFoundSnafu {
896                column: &col_meta.column_schema.name,
897            })?;
898        output_columns.push(part.batch.column(*col_idx).clone());
899    }
900
901    // Adds timestamp column
902    let new_timestamp_index = output_columns.len();
903    let ts_col_idx = column_indices
904        .get(
905            region_metadata
906                .time_index_column()
907                .column_schema
908                .name
909                .as_str(),
910        )
911        .context(ColumnNotFoundSnafu {
912            column: &region_metadata.time_index_column().column_schema.name,
913        })?;
914    output_columns.push(part.batch.column(*ts_col_idx).clone());
915
916    // Adds encoded primary key dictionary column
917    let pk_dictionary = if let Some(pk_dict_array) = pk_array {
918        Arc::new(pk_dict_array) as ArrayRef
919    } else {
920        let pk_col_idx =
921            column_indices
922                .get(PRIMARY_KEY_COLUMN_NAME)
923                .context(ColumnNotFoundSnafu {
924                    column: PRIMARY_KEY_COLUMN_NAME,
925                })?;
926        let col = part.batch.column(*pk_col_idx);
927
928        // Casts to dictionary type if needed
929        let target_type = ArrowDataType::Dictionary(
930            Box::new(ArrowDataType::UInt32),
931            Box::new(ArrowDataType::Binary),
932        );
933        arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
934    };
935    output_columns.push(pk_dictionary);
936
937    let sequence_array = UInt64Array::from(vec![part.sequence; num_rows]);
938    output_columns.push(Arc::new(sequence_array) as ArrayRef);
939
940    let op_type_array = UInt8Array::from(vec![OpType::Put as u8; num_rows]);
941    output_columns.push(Arc::new(op_type_array) as ArrayRef);
942
943    let batch = RecordBatch::try_new(schema, output_columns).context(NewRecordBatchSnafu)?;
944
945    // Sorts the batch by (primary_key, timestamp, sequence desc)
946    let sorted_batch = sort_primary_key_record_batch(&batch)?;
947
948    Ok(Some(BulkPart {
949        batch: sorted_batch,
950        max_timestamp: part.max_timestamp,
951        min_timestamp: part.min_timestamp,
952        sequence: part.sequence,
953        timestamp_index: new_timestamp_index,
954        raw_data: None,
955    }))
956}
957
958#[derive(Debug, Clone)]
959pub struct EncodedBulkPart {
960    data: Bytes,
961    metadata: BulkPartMeta,
962}
963
964impl EncodedBulkPart {
965    pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
966        Self { data, metadata }
967    }
968
969    pub(crate) fn metadata(&self) -> &BulkPartMeta {
970        &self.metadata
971    }
972
973    /// Returns the size of the encoded data in bytes
974    pub(crate) fn size_bytes(&self) -> usize {
975        self.data.len()
976    }
977
978    /// Returns the encoded data.
979    pub(crate) fn data(&self) -> &Bytes {
980        &self.data
981    }
982
983    /// Creates MemtableStats from this EncodedBulkPart.
984    pub fn to_memtable_stats(&self) -> MemtableStats {
985        let meta = &self.metadata;
986        let ts_type = meta.region_metadata.time_index_type();
987        let min_ts = ts_type.create_timestamp(meta.min_timestamp);
988        let max_ts = ts_type.create_timestamp(meta.max_timestamp);
989
990        MemtableStats {
991            estimated_bytes: self.size_bytes(),
992            time_range: Some((min_ts, max_ts)),
993            num_rows: meta.num_rows,
994            num_ranges: 1,
995            max_sequence: meta.max_sequence,
996            series_count: meta.num_series as usize,
997        }
998    }
999
1000    /// Converts this `EncodedBulkPart` to `SstInfo`.
1001    ///
1002    /// # Arguments
1003    /// * `file_id` - The SST file ID to assign to this part
1004    ///
1005    /// # Returns
1006    /// Returns a `SstInfo` instance with information derived from this bulk part's metadata
1007    pub(crate) fn to_sst_info(&self, file_id: FileId) -> SstInfo {
1008        let unit = self.metadata.region_metadata.time_index_type().unit();
1009        let max_row_group_uncompressed_size: u64 = self
1010            .metadata
1011            .parquet_metadata
1012            .row_groups()
1013            .iter()
1014            .map(|rg| {
1015                rg.columns()
1016                    .iter()
1017                    .map(|c| c.uncompressed_size() as u64)
1018                    .sum::<u64>()
1019            })
1020            .max()
1021            .unwrap_or(0);
1022        SstInfo {
1023            file_id,
1024            time_range: (
1025                Timestamp::new(self.metadata.min_timestamp, unit),
1026                Timestamp::new(self.metadata.max_timestamp, unit),
1027            ),
1028            file_size: self.data.len() as u64,
1029            max_row_group_uncompressed_size,
1030            num_rows: self.metadata.num_rows,
1031            num_row_groups: self.metadata.parquet_metadata.num_row_groups() as u64,
1032            file_metadata: Some(self.metadata.parquet_metadata.clone()),
1033            index_metadata: IndexOutput::default(),
1034            num_series: self.metadata.num_series,
1035        }
1036    }
1037
1038    pub(crate) fn read(
1039        &self,
1040        context: BulkIterContextRef,
1041        sequence: Option<SequenceRange>,
1042        mem_scan_metrics: Option<MemScanMetrics>,
1043    ) -> Result<Option<BoxedRecordBatchIterator>> {
1044        // Compute skip_fields for row group pruning using the same approach as compute_skip_fields in reader.rs.
1045        let skip_fields_for_pruning =
1046            Self::compute_skip_fields(context.pre_filter_mode(), &self.metadata.parquet_metadata);
1047
1048        // use predicate to find row groups to read.
1049        let row_groups_to_read =
1050            context.row_groups_to_read(&self.metadata.parquet_metadata, skip_fields_for_pruning);
1051
1052        if row_groups_to_read.is_empty() {
1053            // All row groups are filtered.
1054            return Ok(None);
1055        }
1056
1057        let iter = EncodedBulkPartIter::try_new(
1058            self,
1059            context,
1060            row_groups_to_read,
1061            sequence,
1062            mem_scan_metrics,
1063        )?;
1064        Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
1065    }
1066
1067    /// Computes whether to skip field columns based on PreFilterMode.
1068    fn compute_skip_fields(pre_filter_mode: PreFilterMode, parquet_meta: &ParquetMetaData) -> bool {
1069        match pre_filter_mode {
1070            PreFilterMode::All => false,
1071            PreFilterMode::SkipFields => true,
1072            PreFilterMode::SkipFieldsOnDelete => {
1073                // Check if any row group contains delete op
1074                (0..parquet_meta.num_row_groups()).any(|rg_idx| {
1075                    row_group_contains_delete(parquet_meta, rg_idx, "memtable").unwrap_or(true)
1076                })
1077            }
1078        }
1079    }
1080}
1081
1082#[derive(Debug, Clone)]
1083pub struct BulkPartMeta {
1084    /// Total rows in part.
1085    pub num_rows: usize,
1086    /// Max timestamp in part.
1087    pub max_timestamp: i64,
1088    /// Min timestamp in part.
1089    pub min_timestamp: i64,
1090    /// Part file metadata.
1091    pub parquet_metadata: Arc<ParquetMetaData>,
1092    /// Part region schema.
1093    pub region_metadata: RegionMetadataRef,
1094    /// Number of series.
1095    pub num_series: u64,
1096    /// Maximum sequence number in part.
1097    pub max_sequence: u64,
1098}
1099
1100/// Metrics for encoding a part.
1101#[derive(Default, Debug)]
1102pub struct BulkPartEncodeMetrics {
1103    /// Cost of iterating over the data.
1104    pub iter_cost: Duration,
1105    /// Cost of writing the data.
1106    pub write_cost: Duration,
1107    /// Size of data before encoding.
1108    pub raw_size: usize,
1109    /// Size of data after encoding.
1110    pub encoded_size: usize,
1111    /// Number of rows in part.
1112    pub num_rows: usize,
1113}
1114
1115pub struct BulkPartEncoder {
1116    metadata: RegionMetadataRef,
1117    row_group_size: usize,
1118    writer_props: Option<WriterProperties>,
1119}
1120
1121impl BulkPartEncoder {
1122    pub(crate) fn new(
1123        metadata: RegionMetadataRef,
1124        row_group_size: usize,
1125    ) -> Result<BulkPartEncoder> {
1126        // TODO(yingwen): Skip arrow schema if needed.
1127        let json = metadata.to_json().context(InvalidMetadataSnafu)?;
1128        let key_value_meta =
1129            parquet::file::metadata::KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
1130
1131        // TODO(yingwen): Do we need compression?
1132        let writer_props = Some(
1133            WriterProperties::builder()
1134                .set_key_value_metadata(Some(vec![key_value_meta]))
1135                .set_write_batch_size(row_group_size)
1136                .set_max_row_group_size(row_group_size)
1137                .set_compression(Compression::ZSTD(ZstdLevel::default()))
1138                .set_column_index_truncate_length(None)
1139                .set_statistics_truncate_length(None)
1140                .build(),
1141        );
1142
1143        Ok(Self {
1144            metadata,
1145            row_group_size,
1146            writer_props,
1147        })
1148    }
1149}
1150
1151impl BulkPartEncoder {
1152    /// Encodes [BoxedRecordBatchIterator] into [EncodedBulkPart] with min/max timestamps.
1153    pub fn encode_record_batch_iter(
1154        &self,
1155        iter: BoxedRecordBatchIterator,
1156        arrow_schema: SchemaRef,
1157        min_timestamp: i64,
1158        max_timestamp: i64,
1159        max_sequence: u64,
1160        metrics: &mut BulkPartEncodeMetrics,
1161    ) -> Result<Option<EncodedBulkPart>> {
1162        let mut buf = Vec::with_capacity(4096);
1163        let mut writer = ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
1164            .context(EncodeMemtableSnafu)?;
1165        let mut total_rows = 0;
1166        let mut series_estimator = SeriesEstimator::default();
1167
1168        // Process each batch from the iterator
1169        let mut iter_start = Instant::now();
1170        for batch_result in iter {
1171            metrics.iter_cost += iter_start.elapsed();
1172            let batch = batch_result?;
1173            if batch.num_rows() == 0 {
1174                continue;
1175            }
1176
1177            series_estimator.update_flat(&batch);
1178            metrics.raw_size += record_batch_estimated_size(&batch);
1179            let write_start = Instant::now();
1180            writer.write(&batch).context(EncodeMemtableSnafu)?;
1181            metrics.write_cost += write_start.elapsed();
1182            total_rows += batch.num_rows();
1183            iter_start = Instant::now();
1184        }
1185        metrics.iter_cost += iter_start.elapsed();
1186        iter_start = Instant::now();
1187
1188        if total_rows == 0 {
1189            return Ok(None);
1190        }
1191
1192        let close_start = Instant::now();
1193        let file_metadata = writer.close().context(EncodeMemtableSnafu)?;
1194        metrics.write_cost += close_start.elapsed();
1195        metrics.encoded_size += buf.len();
1196        metrics.num_rows += total_rows;
1197
1198        let buf = Bytes::from(buf);
1199        let parquet_metadata = Arc::new(file_metadata);
1200        let num_series = series_estimator.finish();
1201
1202        Ok(Some(EncodedBulkPart {
1203            data: buf,
1204            metadata: BulkPartMeta {
1205                num_rows: total_rows,
1206                max_timestamp,
1207                min_timestamp,
1208                parquet_metadata,
1209                region_metadata: self.metadata.clone(),
1210                num_series,
1211                max_sequence,
1212            },
1213        }))
1214    }
1215
1216    /// Encodes bulk part to a [EncodedBulkPart], returns the encoded data.
1217    fn encode_part(&self, part: &BulkPart) -> Result<Option<EncodedBulkPart>> {
1218        if part.batch.num_rows() == 0 {
1219            return Ok(None);
1220        }
1221
1222        let mut buf = Vec::with_capacity(4096);
1223        let arrow_schema = part.batch.schema();
1224
1225        let file_metadata = {
1226            let mut writer =
1227                ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
1228                    .context(EncodeMemtableSnafu)?;
1229            writer.write(&part.batch).context(EncodeMemtableSnafu)?;
1230            writer.finish().context(EncodeMemtableSnafu)?
1231        };
1232
1233        let buf = Bytes::from(buf);
1234        let parquet_metadata = Arc::new(file_metadata);
1235
1236        Ok(Some(EncodedBulkPart {
1237            data: buf,
1238            metadata: BulkPartMeta {
1239                num_rows: part.batch.num_rows(),
1240                max_timestamp: part.max_timestamp,
1241                min_timestamp: part.min_timestamp,
1242                parquet_metadata,
1243                region_metadata: self.metadata.clone(),
1244                num_series: part.estimated_series_count() as u64,
1245                max_sequence: part.sequence,
1246            },
1247        }))
1248    }
1249}
1250
1251#[cfg(test)]
1252mod tests {
1253    use api::v1::{Row, SemanticType, WriteHint};
1254    use datafusion_common::ScalarValue;
1255    use datatypes::arrow::array::Float64Array;
1256    use datatypes::prelude::{ConcreteDataType, Value};
1257    use datatypes::schema::ColumnSchema;
1258    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1259    use store_api::storage::RegionId;
1260    use store_api::storage::consts::ReservedColumnId;
1261
1262    use super::*;
1263    use crate::memtable::bulk::context::BulkIterContext;
1264    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1265    use crate::test_util::memtable_util::{
1266        build_key_values_with_ts_seq_values, metadata_for_test, region_metadata_to_row_schema,
1267    };
1268
1269    struct MutationInput<'a> {
1270        k0: &'a str,
1271        k1: u32,
1272        timestamps: &'a [i64],
1273        v1: &'a [Option<f64>],
1274        sequence: u64,
1275    }
1276
1277    #[derive(Debug, PartialOrd, PartialEq)]
1278    struct BatchOutput<'a> {
1279        pk_values: &'a [Value],
1280        timestamps: &'a [i64],
1281        v1: &'a [Option<f64>],
1282    }
1283
1284    fn encode(input: &[MutationInput]) -> EncodedBulkPart {
1285        let metadata = metadata_for_test();
1286        let kvs = input
1287            .iter()
1288            .map(|m| {
1289                build_key_values_with_ts_seq_values(
1290                    &metadata,
1291                    m.k0.to_string(),
1292                    m.k1,
1293                    m.timestamps.iter().copied(),
1294                    m.v1.iter().copied(),
1295                    m.sequence,
1296                )
1297            })
1298            .collect::<Vec<_>>();
1299        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1300        let primary_key_codec = build_primary_key_codec(&metadata);
1301        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1302        for kv in kvs {
1303            converter.append_key_values(&kv).unwrap();
1304        }
1305        let part = converter.convert().unwrap();
1306        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1307        encoder.encode_part(&part).unwrap().unwrap()
1308    }
1309
1310    #[test]
1311    fn test_write_and_read_part_projection() {
1312        let part = encode(&[
1313            MutationInput {
1314                k0: "a",
1315                k1: 0,
1316                timestamps: &[1],
1317                v1: &[Some(0.1)],
1318                sequence: 0,
1319            },
1320            MutationInput {
1321                k0: "b",
1322                k1: 0,
1323                timestamps: &[1],
1324                v1: &[Some(0.0)],
1325                sequence: 0,
1326            },
1327            MutationInput {
1328                k0: "a",
1329                k1: 0,
1330                timestamps: &[2],
1331                v1: &[Some(0.2)],
1332                sequence: 1,
1333            },
1334        ]);
1335
1336        let projection = &[4u32];
1337        let mut reader = part
1338            .read(
1339                Arc::new(
1340                    BulkIterContext::new(
1341                        part.metadata.region_metadata.clone(),
1342                        Some(projection.as_slice()),
1343                        None,
1344                        false,
1345                    )
1346                    .unwrap(),
1347                ),
1348                None,
1349                None,
1350            )
1351            .unwrap()
1352            .expect("expect at least one row group");
1353
1354        let mut total_rows_read = 0;
1355        let mut field: Vec<f64> = vec![];
1356        for res in reader {
1357            let batch = res.unwrap();
1358            assert_eq!(5, batch.num_columns());
1359            field.extend_from_slice(
1360                batch
1361                    .column(0)
1362                    .as_any()
1363                    .downcast_ref::<Float64Array>()
1364                    .unwrap()
1365                    .values(),
1366            );
1367            total_rows_read += batch.num_rows();
1368        }
1369        assert_eq!(3, total_rows_read);
1370        assert_eq!(vec![0.1, 0.2, 0.0], field);
1371    }
1372
1373    fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
1374        let metadata = metadata_for_test();
1375        let kvs = key_values
1376            .into_iter()
1377            .map(|(k0, k1, (start, end), sequence)| {
1378                let ts = (start..end);
1379                let v1 = (start..end).map(|_| None);
1380                build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
1381            })
1382            .collect::<Vec<_>>();
1383        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1384        let primary_key_codec = build_primary_key_codec(&metadata);
1385        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1386        for kv in kvs {
1387            converter.append_key_values(&kv).unwrap();
1388        }
1389        let part = converter.convert().unwrap();
1390        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1391        encoder.encode_part(&part).unwrap().unwrap()
1392    }
1393
1394    fn check_prune_row_group(
1395        part: &EncodedBulkPart,
1396        predicate: Option<Predicate>,
1397        expected_rows: usize,
1398    ) {
1399        let context = Arc::new(
1400            BulkIterContext::new(
1401                part.metadata.region_metadata.clone(),
1402                None,
1403                predicate,
1404                false,
1405            )
1406            .unwrap(),
1407        );
1408        let mut reader = part
1409            .read(context, None, None)
1410            .unwrap()
1411            .expect("expect at least one row group");
1412        let mut total_rows_read = 0;
1413        for res in reader {
1414            let batch = res.unwrap();
1415            total_rows_read += batch.num_rows();
1416        }
1417        // Should only read row group 1.
1418        assert_eq!(expected_rows, total_rows_read);
1419    }
1420
1421    #[test]
1422    fn test_prune_row_groups() {
1423        let part = prepare(vec![
1424            ("a", 0, (0, 40), 1),
1425            ("a", 1, (0, 60), 1),
1426            ("b", 0, (0, 100), 2),
1427            ("b", 1, (100, 180), 3),
1428            ("b", 1, (180, 210), 4),
1429        ]);
1430
1431        let context = Arc::new(
1432            BulkIterContext::new(
1433                part.metadata.region_metadata.clone(),
1434                None,
1435                Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
1436                    datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
1437                )])),
1438                false,
1439            )
1440            .unwrap(),
1441        );
1442        assert!(part.read(context, None, None).unwrap().is_none());
1443
1444        check_prune_row_group(&part, None, 310);
1445
1446        check_prune_row_group(
1447            &part,
1448            Some(Predicate::new(vec![
1449                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1450                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
1451            ])),
1452            40,
1453        );
1454
1455        check_prune_row_group(
1456            &part,
1457            Some(Predicate::new(vec![
1458                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1459                datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
1460            ])),
1461            60,
1462        );
1463
1464        check_prune_row_group(
1465            &part,
1466            Some(Predicate::new(vec![
1467                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1468            ])),
1469            100,
1470        );
1471
1472        check_prune_row_group(
1473            &part,
1474            Some(Predicate::new(vec![
1475                datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
1476                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
1477            ])),
1478            100,
1479        );
1480
1481        /// Predicates over field column can do precise filtering.
1482        check_prune_row_group(
1483            &part,
1484            Some(Predicate::new(vec![
1485                datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64)),
1486            ])),
1487            1,
1488        );
1489    }
1490
1491    #[test]
1492    fn test_bulk_part_converter_append_and_convert() {
1493        let metadata = metadata_for_test();
1494        let capacity = 100;
1495        let primary_key_codec = build_primary_key_codec(&metadata);
1496        let schema = to_flat_sst_arrow_schema(
1497            &metadata,
1498            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1499        );
1500
1501        let mut converter =
1502            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1503
1504        let key_values1 = build_key_values_with_ts_seq_values(
1505            &metadata,
1506            "key1".to_string(),
1507            1u32,
1508            vec![1000, 2000].into_iter(),
1509            vec![Some(1.0), Some(2.0)].into_iter(),
1510            1,
1511        );
1512
1513        let key_values2 = build_key_values_with_ts_seq_values(
1514            &metadata,
1515            "key2".to_string(),
1516            2u32,
1517            vec![1500].into_iter(),
1518            vec![Some(3.0)].into_iter(),
1519            2,
1520        );
1521
1522        converter.append_key_values(&key_values1).unwrap();
1523        converter.append_key_values(&key_values2).unwrap();
1524
1525        let bulk_part = converter.convert().unwrap();
1526
1527        assert_eq!(bulk_part.num_rows(), 3);
1528        assert_eq!(bulk_part.min_timestamp, 1000);
1529        assert_eq!(bulk_part.max_timestamp, 2000);
1530        assert_eq!(bulk_part.sequence, 2);
1531        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
1532
1533        // Validate primary key columns are stored
1534        // Schema should include primary key columns k0 and k1 at the beginning
1535        let schema = bulk_part.batch.schema();
1536        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1537        assert_eq!(
1538            field_names,
1539            vec![
1540                "k0",
1541                "k1",
1542                "v0",
1543                "v1",
1544                "ts",
1545                "__primary_key",
1546                "__sequence",
1547                "__op_type"
1548            ]
1549        );
1550    }
1551
1552    #[test]
1553    fn test_bulk_part_converter_sorting() {
1554        let metadata = metadata_for_test();
1555        let capacity = 100;
1556        let primary_key_codec = build_primary_key_codec(&metadata);
1557        let schema = to_flat_sst_arrow_schema(
1558            &metadata,
1559            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1560        );
1561
1562        let mut converter =
1563            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1564
1565        let key_values1 = build_key_values_with_ts_seq_values(
1566            &metadata,
1567            "z_key".to_string(),
1568            3u32,
1569            vec![3000].into_iter(),
1570            vec![Some(3.0)].into_iter(),
1571            3,
1572        );
1573
1574        let key_values2 = build_key_values_with_ts_seq_values(
1575            &metadata,
1576            "a_key".to_string(),
1577            1u32,
1578            vec![1000].into_iter(),
1579            vec![Some(1.0)].into_iter(),
1580            1,
1581        );
1582
1583        let key_values3 = build_key_values_with_ts_seq_values(
1584            &metadata,
1585            "m_key".to_string(),
1586            2u32,
1587            vec![2000].into_iter(),
1588            vec![Some(2.0)].into_iter(),
1589            2,
1590        );
1591
1592        converter.append_key_values(&key_values1).unwrap();
1593        converter.append_key_values(&key_values2).unwrap();
1594        converter.append_key_values(&key_values3).unwrap();
1595
1596        let bulk_part = converter.convert().unwrap();
1597
1598        assert_eq!(bulk_part.num_rows(), 3);
1599
1600        let ts_column = bulk_part.batch.column(bulk_part.timestamp_index);
1601        let seq_column = bulk_part.batch.column(bulk_part.batch.num_columns() - 2);
1602
1603        let ts_array = ts_column
1604            .as_any()
1605            .downcast_ref::<TimestampMillisecondArray>()
1606            .unwrap();
1607        let seq_array = seq_column.as_any().downcast_ref::<UInt64Array>().unwrap();
1608
1609        assert_eq!(ts_array.values(), &[1000, 2000, 3000]);
1610        assert_eq!(seq_array.values(), &[1, 2, 3]);
1611
1612        // Validate primary key columns are stored
1613        let schema = bulk_part.batch.schema();
1614        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1615        assert_eq!(
1616            field_names,
1617            vec![
1618                "k0",
1619                "k1",
1620                "v0",
1621                "v1",
1622                "ts",
1623                "__primary_key",
1624                "__sequence",
1625                "__op_type"
1626            ]
1627        );
1628    }
1629
1630    #[test]
1631    fn test_bulk_part_converter_empty() {
1632        let metadata = metadata_for_test();
1633        let capacity = 10;
1634        let primary_key_codec = build_primary_key_codec(&metadata);
1635        let schema = to_flat_sst_arrow_schema(
1636            &metadata,
1637            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1638        );
1639
1640        let converter =
1641            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1642
1643        let bulk_part = converter.convert().unwrap();
1644
1645        assert_eq!(bulk_part.num_rows(), 0);
1646        assert_eq!(bulk_part.min_timestamp, i64::MAX);
1647        assert_eq!(bulk_part.max_timestamp, i64::MIN);
1648        assert_eq!(bulk_part.sequence, SequenceNumber::MIN);
1649
1650        // Validate primary key columns are present in schema even for empty batch
1651        let schema = bulk_part.batch.schema();
1652        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1653        assert_eq!(
1654            field_names,
1655            vec![
1656                "k0",
1657                "k1",
1658                "v0",
1659                "v1",
1660                "ts",
1661                "__primary_key",
1662                "__sequence",
1663                "__op_type"
1664            ]
1665        );
1666    }
1667
1668    #[test]
1669    fn test_bulk_part_converter_without_primary_key_columns() {
1670        let metadata = metadata_for_test();
1671        let primary_key_codec = build_primary_key_codec(&metadata);
1672        let schema = to_flat_sst_arrow_schema(
1673            &metadata,
1674            &FlatSchemaOptions {
1675                raw_pk_columns: false,
1676                string_pk_use_dict: true,
1677            },
1678        );
1679
1680        let capacity = 100;
1681        let mut converter =
1682            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, false);
1683
1684        let key_values1 = build_key_values_with_ts_seq_values(
1685            &metadata,
1686            "key1".to_string(),
1687            1u32,
1688            vec![1000, 2000].into_iter(),
1689            vec![Some(1.0), Some(2.0)].into_iter(),
1690            1,
1691        );
1692
1693        let key_values2 = build_key_values_with_ts_seq_values(
1694            &metadata,
1695            "key2".to_string(),
1696            2u32,
1697            vec![1500].into_iter(),
1698            vec![Some(3.0)].into_iter(),
1699            2,
1700        );
1701
1702        converter.append_key_values(&key_values1).unwrap();
1703        converter.append_key_values(&key_values2).unwrap();
1704
1705        let bulk_part = converter.convert().unwrap();
1706
1707        assert_eq!(bulk_part.num_rows(), 3);
1708        assert_eq!(bulk_part.min_timestamp, 1000);
1709        assert_eq!(bulk_part.max_timestamp, 2000);
1710        assert_eq!(bulk_part.sequence, 2);
1711        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
1712
1713        // Validate primary key columns are NOT stored individually
1714        let schema = bulk_part.batch.schema();
1715        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1716        assert_eq!(
1717            field_names,
1718            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
1719        );
1720    }
1721
1722    #[allow(clippy::too_many_arguments)]
1723    fn build_key_values_with_sparse_encoding(
1724        metadata: &RegionMetadataRef,
1725        primary_key_codec: &Arc<dyn PrimaryKeyCodec>,
1726        table_id: u32,
1727        tsid: u64,
1728        k0: String,
1729        k1: String,
1730        timestamps: impl Iterator<Item = i64>,
1731        values: impl Iterator<Item = Option<f64>>,
1732        sequence: SequenceNumber,
1733    ) -> KeyValues {
1734        // Encode the primary key (__table_id, __tsid, k0, k1) into binary format using the sparse codec
1735        let pk_values = vec![
1736            (ReservedColumnId::table_id(), Value::UInt32(table_id)),
1737            (ReservedColumnId::tsid(), Value::UInt64(tsid)),
1738            (0, Value::String(k0.clone().into())),
1739            (1, Value::String(k1.clone().into())),
1740        ];
1741        let mut encoded_key = Vec::new();
1742        primary_key_codec
1743            .encode_values(&pk_values, &mut encoded_key)
1744            .unwrap();
1745        assert!(!encoded_key.is_empty());
1746
1747        // Create schema for sparse encoding: __primary_key, ts, v0, v1
1748        let column_schema = vec![
1749            api::v1::ColumnSchema {
1750                column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
1751                datatype: api::helper::ColumnDataTypeWrapper::try_from(
1752                    ConcreteDataType::binary_datatype(),
1753                )
1754                .unwrap()
1755                .datatype() as i32,
1756                semantic_type: api::v1::SemanticType::Tag as i32,
1757                ..Default::default()
1758            },
1759            api::v1::ColumnSchema {
1760                column_name: "ts".to_string(),
1761                datatype: api::helper::ColumnDataTypeWrapper::try_from(
1762                    ConcreteDataType::timestamp_millisecond_datatype(),
1763                )
1764                .unwrap()
1765                .datatype() as i32,
1766                semantic_type: api::v1::SemanticType::Timestamp as i32,
1767                ..Default::default()
1768            },
1769            api::v1::ColumnSchema {
1770                column_name: "v0".to_string(),
1771                datatype: api::helper::ColumnDataTypeWrapper::try_from(
1772                    ConcreteDataType::int64_datatype(),
1773                )
1774                .unwrap()
1775                .datatype() as i32,
1776                semantic_type: api::v1::SemanticType::Field as i32,
1777                ..Default::default()
1778            },
1779            api::v1::ColumnSchema {
1780                column_name: "v1".to_string(),
1781                datatype: api::helper::ColumnDataTypeWrapper::try_from(
1782                    ConcreteDataType::float64_datatype(),
1783                )
1784                .unwrap()
1785                .datatype() as i32,
1786                semantic_type: api::v1::SemanticType::Field as i32,
1787                ..Default::default()
1788            },
1789        ];
1790
1791        let rows = timestamps
1792            .zip(values)
1793            .map(|(ts, v)| Row {
1794                values: vec![
1795                    api::v1::Value {
1796                        value_data: Some(api::v1::value::ValueData::BinaryValue(
1797                            encoded_key.clone(),
1798                        )),
1799                    },
1800                    api::v1::Value {
1801                        value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(ts)),
1802                    },
1803                    api::v1::Value {
1804                        value_data: Some(api::v1::value::ValueData::I64Value(ts)),
1805                    },
1806                    api::v1::Value {
1807                        value_data: v.map(api::v1::value::ValueData::F64Value),
1808                    },
1809                ],
1810            })
1811            .collect();
1812
1813        let mutation = api::v1::Mutation {
1814            op_type: 1,
1815            sequence,
1816            rows: Some(api::v1::Rows {
1817                schema: column_schema,
1818                rows,
1819            }),
1820            write_hint: Some(WriteHint {
1821                primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
1822            }),
1823        };
1824        KeyValues::new(metadata.as_ref(), mutation).unwrap()
1825    }
1826
1827    #[test]
1828    fn test_bulk_part_converter_sparse_primary_key_encoding() {
1829        use api::v1::SemanticType;
1830        use datatypes::schema::ColumnSchema;
1831        use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1832        use store_api::storage::RegionId;
1833
1834        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
1835        builder
1836            .push_column_metadata(ColumnMetadata {
1837                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
1838                semantic_type: SemanticType::Tag,
1839                column_id: 0,
1840            })
1841            .push_column_metadata(ColumnMetadata {
1842                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
1843                semantic_type: SemanticType::Tag,
1844                column_id: 1,
1845            })
1846            .push_column_metadata(ColumnMetadata {
1847                column_schema: ColumnSchema::new(
1848                    "ts",
1849                    ConcreteDataType::timestamp_millisecond_datatype(),
1850                    false,
1851                ),
1852                semantic_type: SemanticType::Timestamp,
1853                column_id: 2,
1854            })
1855            .push_column_metadata(ColumnMetadata {
1856                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
1857                semantic_type: SemanticType::Field,
1858                column_id: 3,
1859            })
1860            .push_column_metadata(ColumnMetadata {
1861                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
1862                semantic_type: SemanticType::Field,
1863                column_id: 4,
1864            })
1865            .primary_key(vec![0, 1])
1866            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
1867        let metadata = Arc::new(builder.build().unwrap());
1868
1869        let primary_key_codec = build_primary_key_codec(&metadata);
1870        let schema = to_flat_sst_arrow_schema(
1871            &metadata,
1872            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1873        );
1874
1875        assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse);
1876        assert_eq!(primary_key_codec.encoding(), PrimaryKeyEncoding::Sparse);
1877
1878        let capacity = 100;
1879        let mut converter =
1880            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec.clone(), true);
1881
1882        let key_values1 = build_key_values_with_sparse_encoding(
1883            &metadata,
1884            &primary_key_codec,
1885            2048u32, // table_id
1886            100u64,  // tsid
1887            "key11".to_string(),
1888            "key21".to_string(),
1889            vec![1000, 2000].into_iter(),
1890            vec![Some(1.0), Some(2.0)].into_iter(),
1891            1,
1892        );
1893
1894        let key_values2 = build_key_values_with_sparse_encoding(
1895            &metadata,
1896            &primary_key_codec,
1897            4096u32, // table_id
1898            200u64,  // tsid
1899            "key12".to_string(),
1900            "key22".to_string(),
1901            vec![1500].into_iter(),
1902            vec![Some(3.0)].into_iter(),
1903            2,
1904        );
1905
1906        converter.append_key_values(&key_values1).unwrap();
1907        converter.append_key_values(&key_values2).unwrap();
1908
1909        let bulk_part = converter.convert().unwrap();
1910
1911        assert_eq!(bulk_part.num_rows(), 3);
1912        assert_eq!(bulk_part.min_timestamp, 1000);
1913        assert_eq!(bulk_part.max_timestamp, 2000);
1914        assert_eq!(bulk_part.sequence, 2);
1915        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
1916
1917        // For sparse encoding, primary key columns should NOT be stored individually
1918        // even when store_primary_key_columns is true, because sparse encoding
1919        // stores the encoded primary key in the __primary_key column
1920        let schema = bulk_part.batch.schema();
1921        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1922        assert_eq!(
1923            field_names,
1924            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
1925        );
1926
1927        // Verify the __primary_key column contains encoded sparse keys
1928        let primary_key_column = bulk_part.batch.column_by_name("__primary_key").unwrap();
1929        let dict_array = primary_key_column
1930            .as_any()
1931            .downcast_ref::<DictionaryArray<UInt32Type>>()
1932            .unwrap();
1933
1934        // Should have non-zero entries indicating encoded primary keys
1935        assert!(!dict_array.is_empty());
1936        assert_eq!(dict_array.len(), 3); // 3 rows total
1937
1938        // Verify values are properly encoded binary data (not empty)
1939        let values = dict_array
1940            .values()
1941            .as_any()
1942            .downcast_ref::<BinaryArray>()
1943            .unwrap();
1944        for i in 0..values.len() {
1945            assert!(
1946                !values.value(i).is_empty(),
1947                "Encoded primary key should not be empty"
1948            );
1949        }
1950    }
1951
1952    #[test]
1953    fn test_convert_bulk_part_empty() {
1954        let metadata = metadata_for_test();
1955        let schema = to_flat_sst_arrow_schema(
1956            &metadata,
1957            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1958        );
1959        let primary_key_codec = build_primary_key_codec(&metadata);
1960
1961        // Create empty batch
1962        let empty_batch = RecordBatch::new_empty(schema.clone());
1963        let empty_part = BulkPart {
1964            batch: empty_batch,
1965            max_timestamp: 0,
1966            min_timestamp: 0,
1967            sequence: 0,
1968            timestamp_index: 0,
1969            raw_data: None,
1970        };
1971
1972        let result =
1973            convert_bulk_part(empty_part, &metadata, primary_key_codec, schema, true).unwrap();
1974        assert!(result.is_none());
1975    }
1976
1977    #[test]
1978    fn test_convert_bulk_part_dense_with_pk_columns() {
1979        let metadata = metadata_for_test();
1980        let primary_key_codec = build_primary_key_codec(&metadata);
1981
1982        let k0_array = Arc::new(arrow::array::StringArray::from(vec![
1983            "key1", "key2", "key1",
1984        ]));
1985        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2, 1]));
1986        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200, 300]));
1987        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0, 3.0]));
1988        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000, 1500]));
1989
1990        let input_schema = Arc::new(Schema::new(vec![
1991            Field::new("k0", ArrowDataType::Utf8, false),
1992            Field::new("k1", ArrowDataType::UInt32, false),
1993            Field::new("v0", ArrowDataType::Int64, true),
1994            Field::new("v1", ArrowDataType::Float64, true),
1995            Field::new(
1996                "ts",
1997                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
1998                false,
1999            ),
2000        ]));
2001
2002        let input_batch = RecordBatch::try_new(
2003            input_schema,
2004            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2005        )
2006        .unwrap();
2007
2008        let part = BulkPart {
2009            batch: input_batch,
2010            max_timestamp: 2000,
2011            min_timestamp: 1000,
2012            sequence: 5,
2013            timestamp_index: 4,
2014            raw_data: None,
2015        };
2016
2017        let output_schema = to_flat_sst_arrow_schema(
2018            &metadata,
2019            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2020        );
2021
2022        let result = convert_bulk_part(
2023            part,
2024            &metadata,
2025            primary_key_codec,
2026            output_schema,
2027            true, // store primary key columns
2028        )
2029        .unwrap();
2030
2031        let converted = result.unwrap();
2032
2033        assert_eq!(converted.num_rows(), 3);
2034        assert_eq!(converted.max_timestamp, 2000);
2035        assert_eq!(converted.min_timestamp, 1000);
2036        assert_eq!(converted.sequence, 5);
2037
2038        let schema = converted.batch.schema();
2039        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2040        assert_eq!(
2041            field_names,
2042            vec![
2043                "k0",
2044                "k1",
2045                "v0",
2046                "v1",
2047                "ts",
2048                "__primary_key",
2049                "__sequence",
2050                "__op_type"
2051            ]
2052        );
2053
2054        let k0_col = converted.batch.column_by_name("k0").unwrap();
2055        assert!(matches!(
2056            k0_col.data_type(),
2057            ArrowDataType::Dictionary(_, _)
2058        ));
2059
2060        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2061        let dict_array = pk_col
2062            .as_any()
2063            .downcast_ref::<DictionaryArray<UInt32Type>>()
2064            .unwrap();
2065        let keys = dict_array.keys();
2066
2067        assert_eq!(keys.len(), 3);
2068    }
2069
2070    #[test]
2071    fn test_convert_bulk_part_dense_without_pk_columns() {
2072        let metadata = metadata_for_test();
2073        let primary_key_codec = build_primary_key_codec(&metadata);
2074
2075        // Create input batch with primary key columns (k0, k1)
2076        let k0_array = Arc::new(arrow::array::StringArray::from(vec!["key1", "key2"]));
2077        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2]));
2078        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
2079        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
2080        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
2081
2082        let input_schema = Arc::new(Schema::new(vec![
2083            Field::new("k0", ArrowDataType::Utf8, false),
2084            Field::new("k1", ArrowDataType::UInt32, false),
2085            Field::new("v0", ArrowDataType::Int64, true),
2086            Field::new("v1", ArrowDataType::Float64, true),
2087            Field::new(
2088                "ts",
2089                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2090                false,
2091            ),
2092        ]));
2093
2094        let input_batch = RecordBatch::try_new(
2095            input_schema,
2096            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2097        )
2098        .unwrap();
2099
2100        let part = BulkPart {
2101            batch: input_batch,
2102            max_timestamp: 2000,
2103            min_timestamp: 1000,
2104            sequence: 3,
2105            timestamp_index: 4,
2106            raw_data: None,
2107        };
2108
2109        let output_schema = to_flat_sst_arrow_schema(
2110            &metadata,
2111            &FlatSchemaOptions {
2112                raw_pk_columns: false,
2113                string_pk_use_dict: true,
2114            },
2115        );
2116
2117        let result = convert_bulk_part(
2118            part,
2119            &metadata,
2120            primary_key_codec,
2121            output_schema,
2122            false, // don't store primary key columns
2123        )
2124        .unwrap();
2125
2126        let converted = result.unwrap();
2127
2128        assert_eq!(converted.num_rows(), 2);
2129        assert_eq!(converted.max_timestamp, 2000);
2130        assert_eq!(converted.min_timestamp, 1000);
2131        assert_eq!(converted.sequence, 3);
2132
2133        // Verify schema does NOT include individual primary key columns
2134        let schema = converted.batch.schema();
2135        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2136        assert_eq!(
2137            field_names,
2138            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2139        );
2140
2141        // Verify __primary_key column is present and is a dictionary
2142        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2143        assert!(matches!(
2144            pk_col.data_type(),
2145            ArrowDataType::Dictionary(_, _)
2146        ));
2147    }
2148
2149    #[test]
2150    fn test_convert_bulk_part_sparse_encoding() {
2151        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
2152        builder
2153            .push_column_metadata(ColumnMetadata {
2154                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
2155                semantic_type: SemanticType::Tag,
2156                column_id: 0,
2157            })
2158            .push_column_metadata(ColumnMetadata {
2159                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
2160                semantic_type: SemanticType::Tag,
2161                column_id: 1,
2162            })
2163            .push_column_metadata(ColumnMetadata {
2164                column_schema: ColumnSchema::new(
2165                    "ts",
2166                    ConcreteDataType::timestamp_millisecond_datatype(),
2167                    false,
2168                ),
2169                semantic_type: SemanticType::Timestamp,
2170                column_id: 2,
2171            })
2172            .push_column_metadata(ColumnMetadata {
2173                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
2174                semantic_type: SemanticType::Field,
2175                column_id: 3,
2176            })
2177            .push_column_metadata(ColumnMetadata {
2178                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
2179                semantic_type: SemanticType::Field,
2180                column_id: 4,
2181            })
2182            .primary_key(vec![0, 1])
2183            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
2184        let metadata = Arc::new(builder.build().unwrap());
2185
2186        let primary_key_codec = build_primary_key_codec(&metadata);
2187
2188        // Create input batch with __primary_key column (sparse encoding)
2189        let pk_array = Arc::new(arrow::array::BinaryArray::from(vec![
2190            b"encoded_key_1".as_slice(),
2191            b"encoded_key_2".as_slice(),
2192        ]));
2193        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
2194        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
2195        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
2196
2197        let input_schema = Arc::new(Schema::new(vec![
2198            Field::new("__primary_key", ArrowDataType::Binary, false),
2199            Field::new("v0", ArrowDataType::Int64, true),
2200            Field::new("v1", ArrowDataType::Float64, true),
2201            Field::new(
2202                "ts",
2203                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2204                false,
2205            ),
2206        ]));
2207
2208        let input_batch =
2209            RecordBatch::try_new(input_schema, vec![pk_array, v0_array, v1_array, ts_array])
2210                .unwrap();
2211
2212        let part = BulkPart {
2213            batch: input_batch,
2214            max_timestamp: 2000,
2215            min_timestamp: 1000,
2216            sequence: 7,
2217            timestamp_index: 3,
2218            raw_data: None,
2219        };
2220
2221        let output_schema = to_flat_sst_arrow_schema(
2222            &metadata,
2223            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2224        );
2225
2226        let result = convert_bulk_part(
2227            part,
2228            &metadata,
2229            primary_key_codec,
2230            output_schema,
2231            true, // store_primary_key_columns (ignored for sparse)
2232        )
2233        .unwrap();
2234
2235        let converted = result.unwrap();
2236
2237        assert_eq!(converted.num_rows(), 2);
2238        assert_eq!(converted.max_timestamp, 2000);
2239        assert_eq!(converted.min_timestamp, 1000);
2240        assert_eq!(converted.sequence, 7);
2241
2242        // Verify schema does NOT include individual primary key columns (sparse encoding)
2243        let schema = converted.batch.schema();
2244        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2245        assert_eq!(
2246            field_names,
2247            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2248        );
2249
2250        // Verify __primary_key is dictionary encoded
2251        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2252        assert!(matches!(
2253            pk_col.data_type(),
2254            ArrowDataType::Dictionary(_, _)
2255        ));
2256    }
2257
2258    #[test]
2259    fn test_convert_bulk_part_sorting_with_multiple_series() {
2260        let metadata = metadata_for_test();
2261        let primary_key_codec = build_primary_key_codec(&metadata);
2262
2263        // Create unsorted batch with multiple series and timestamps
2264        let k0_array = Arc::new(arrow::array::StringArray::from(vec![
2265            "series_b", "series_a", "series_b", "series_a",
2266        ]));
2267        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![2, 1, 2, 1]));
2268        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![200, 100, 400, 300]));
2269        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![2.0, 1.0, 4.0, 3.0]));
2270        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
2271            2000, 1000, 4000, 3000,
2272        ]));
2273
2274        let input_schema = Arc::new(Schema::new(vec![
2275            Field::new("k0", ArrowDataType::Utf8, false),
2276            Field::new("k1", ArrowDataType::UInt32, false),
2277            Field::new("v0", ArrowDataType::Int64, true),
2278            Field::new("v1", ArrowDataType::Float64, true),
2279            Field::new(
2280                "ts",
2281                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2282                false,
2283            ),
2284        ]));
2285
2286        let input_batch = RecordBatch::try_new(
2287            input_schema,
2288            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2289        )
2290        .unwrap();
2291
2292        let part = BulkPart {
2293            batch: input_batch,
2294            max_timestamp: 4000,
2295            min_timestamp: 1000,
2296            sequence: 10,
2297            timestamp_index: 4,
2298            raw_data: None,
2299        };
2300
2301        let output_schema = to_flat_sst_arrow_schema(
2302            &metadata,
2303            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2304        );
2305
2306        let result =
2307            convert_bulk_part(part, &metadata, primary_key_codec, output_schema, true).unwrap();
2308
2309        let converted = result.unwrap();
2310
2311        assert_eq!(converted.num_rows(), 4);
2312
2313        // Verify data is sorted by (primary_key, timestamp, sequence desc)
2314        let ts_col = converted.batch.column(converted.timestamp_index);
2315        let ts_array = ts_col
2316            .as_any()
2317            .downcast_ref::<TimestampMillisecondArray>()
2318            .unwrap();
2319
2320        // After sorting by (pk, ts), we should have:
2321        // series_a,1: ts=1000, 3000
2322        // series_b,2: ts=2000, 4000
2323        let timestamps: Vec<i64> = ts_array.values().to_vec();
2324        assert_eq!(timestamps, vec![1000, 3000, 2000, 4000]);
2325    }
2326}