mito2/memtable/bulk/part.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Bulk part encoder/decoder.

use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::helper::{ColumnDataTypeWrapper, to_grpc_value};
use api::v1::bulk_wal_entry::Body;
use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType, bulk_wal_entry};
use bytes::Bytes;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_recordbatch::DfRecordBatch as RecordBatch;
use common_time::Timestamp;
use common_time::timestamp::TimeUnit;
use datatypes::arrow;
use datatypes::arrow::array::{
    Array, ArrayRef, BinaryBuilder, BinaryDictionaryBuilder, DictionaryArray, StringBuilder,
    StringDictionaryBuilder, TimestampMicrosecondArray, TimestampMillisecondArray,
    TimestampNanosecondArray, TimestampSecondArray, UInt8Array, UInt8Builder, UInt32Array,
    UInt64Array, UInt64Builder,
};
use datatypes::arrow::compute::{SortColumn, SortOptions, TakeOptions};
use datatypes::arrow::datatypes::{
    DataType as ArrowDataType, Field, Schema, SchemaRef, UInt32Type,
};
use datatypes::arrow_array::BinaryArray;
use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::Helper;
use mito_codec::key_values::{KeyValue, KeyValues, KeyValuesRef};
use mito_codec::row_converter::{
    DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, build_primary_key_codec,
};
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, ZstdLevel};
use parquet::data_type::AsBytes;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use snafu::{OptionExt, ResultExt, Snafu};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
use store_api::storage::{FileId, RegionId, SequenceNumber, SequenceRange};
use table::predicate::Predicate;

use crate::error::{
    self, ColumnNotFoundSnafu, ComputeArrowSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu,
    DataTypeMismatchSnafu, EncodeMemtableSnafu, EncodeSnafu, InvalidMetadataSnafu,
    InvalidRequestSnafu, NewRecordBatchSnafu, Result, UnexpectedSnafu,
};
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
use crate::memtable::time_series::{ValueBuilder, Values};
use crate::memtable::{BoxedRecordBatchIterator, MemScanMetrics, MemtableStats};
use crate::sst::index::IndexOutput;
use crate::sst::parquet::file_range::{PreFilterMode, row_group_contains_delete};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo};
use crate::sst::{SeriesEstimator, to_sst_arrow_schema};

const INIT_DICT_VALUE_CAPACITY: usize = 8;

/// A raw bulk part in the memtable.
#[derive(Clone)]
pub struct BulkPart {
    pub batch: RecordBatch,
    pub max_timestamp: i64,
    pub min_timestamp: i64,
    pub sequence: u64,
    pub timestamp_index: usize,
    pub raw_data: Option<ArrowIpc>,
}

impl TryFrom<BulkWalEntry> for BulkPart {
    type Error = error::Error;

    fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
        match value.body.expect("Entry payload should be present") {
            Body::ArrowIpc(ipc) => {
                let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
                    .context(error::ConvertBulkWalEntrySnafu)?;
                let batch = decoder
                    .try_decode_record_batch(&ipc.data_header, &ipc.payload)
                    .context(error::ConvertBulkWalEntrySnafu)?;
                Ok(Self {
                    batch,
                    max_timestamp: value.max_ts,
                    min_timestamp: value.min_ts,
                    sequence: value.sequence,
                    timestamp_index: value.timestamp_index as usize,
                    raw_data: Some(ipc),
                })
            }
        }
    }
}

impl TryFrom<&BulkPart> for BulkWalEntry {
    type Error = error::Error;

    fn try_from(value: &BulkPart) -> Result<Self> {
        if let Some(ipc) = &value.raw_data {
            Ok(BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_timestamp,
                min_ts: value.min_timestamp,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ipc.clone())),
            })
        } else {
            let mut encoder = FlightEncoder::default();
            let schema_bytes = encoder
                .encode_schema(value.batch.schema().as_ref())
                .data_header;
            let [rb_data] = encoder
                .encode(FlightMessage::RecordBatch(value.batch.clone()))
                .try_into()
                .map_err(|_| {
                    error::UnsupportedOperationSnafu {
                        err_msg: "create BulkWalEntry from RecordBatch with dictionary arrays",
                    }
                    .build()
                })?;
            Ok(BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_timestamp,
                min_ts: value.min_timestamp,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ArrowIpc {
                    schema: schema_bytes,
                    data_header: rb_data.data_header,
                    payload: rb_data.data_body,
                })),
            })
        }
    }
}

impl BulkPart {
    pub(crate) fn estimated_size(&self) -> usize {
        record_batch_estimated_size(&self.batch)
    }

    /// Returns the estimated series count in this BulkPart.
    /// This is calculated from the dictionary values count of the PrimaryKeyArray.
    pub fn estimated_series_count(&self) -> usize {
        let pk_column_idx = primary_key_column_index(self.batch.num_columns());
        let pk_column = self.batch.column(pk_column_idx);
        if let Some(dict_array) = pk_column.as_any().downcast_ref::<PrimaryKeyArray>() {
            dict_array.values().len()
        } else {
            0
        }
    }

    /// Creates MemtableStats from this BulkPart.
    pub fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
        let ts_type = region_metadata.time_index_type();
        let min_ts = ts_type.create_timestamp(self.min_timestamp);
        let max_ts = ts_type.create_timestamp(self.max_timestamp);

        MemtableStats {
            estimated_bytes: self.estimated_size(),
            time_range: Some((min_ts, max_ts)),
            num_rows: self.num_rows(),
            num_ranges: 1,
            max_sequence: self.sequence,
            series_count: self.estimated_series_count(),
        }
    }

    /// Fills missing columns in the BulkPart batch with default values.
    ///
    /// This function checks if the batch schema matches the region metadata schema,
    /// and if there are missing columns, it fills them with default values (or null
    /// for nullable columns).
    ///
    /// # Arguments
    ///
    /// * `region_metadata` - The region metadata containing the expected schema
    pub fn fill_missing_columns(&mut self, region_metadata: &RegionMetadata) -> Result<()> {
        // Builds a map of existing columns in the batch
        let batch_schema = self.batch.schema();
        let batch_columns: HashSet<_> = batch_schema
            .fields()
            .iter()
            .map(|f| f.name().as_str())
            .collect();

        // Finds columns that need to be filled
        let mut columns_to_fill = Vec::new();
        for column_meta in &region_metadata.column_metadatas {
            // TODO(yingwen): Returns error if it is impure default after we support filling
            // bulk insert request in the frontend
            if !batch_columns.contains(column_meta.column_schema.name.as_str()) {
                columns_to_fill.push(column_meta);
            }
        }

        if columns_to_fill.is_empty() {
            return Ok(());
        }

        let num_rows = self.batch.num_rows();

        let mut new_columns = Vec::new();
        let mut new_fields = Vec::new();

        // First, adds all existing columns
        new_fields.extend(batch_schema.fields().iter().cloned());
        new_columns.extend_from_slice(self.batch.columns());

        let region_id = region_metadata.region_id;
        // Then adds the missing columns with default values
        for column_meta in columns_to_fill {
            let default_vector = column_meta
                .column_schema
                .create_default_vector(num_rows)
                .context(CreateDefaultSnafu {
                    region_id,
                    column: &column_meta.column_schema.name,
                })?
                .with_context(|| InvalidRequestSnafu {
                    region_id,
                    reason: format!(
                        "column {} does not have default value",
                        column_meta.column_schema.name
                    ),
                })?;
            let arrow_array = default_vector.to_arrow_array();

            new_fields.push(Arc::new(Field::new(
                column_meta.column_schema.name.clone(),
                column_meta.column_schema.data_type.as_arrow_type(),
                column_meta.column_schema.is_nullable(),
            )));
            new_columns.push(arrow_array);
        }

        // Create a new schema and batch with the filled columns
        let new_schema = Arc::new(Schema::new(new_fields));
        let new_batch =
            RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)?;

        // Update the batch
        self.batch = new_batch;

        Ok(())
    }

    /// Converts [BulkPart] to [Mutation] for the fallback `write_bulk` implementation.
    pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
        let vectors = region_metadata
            .schema
            .column_schemas()
            .iter()
            .map(|col| match self.batch.column_by_name(&col.name) {
                None => Ok(None),
                Some(col) => Helper::try_into_vector(col).map(Some),
            })
            .collect::<datatypes::error::Result<Vec<_>>>()
            .context(error::ComputeVectorSnafu)?;

        let rows = (0..self.num_rows())
            .map(|row_idx| {
                let values = (0..self.batch.num_columns())
                    .map(|col_idx| {
                        if let Some(v) = &vectors[col_idx] {
                            to_grpc_value(v.get(row_idx))
                        } else {
                            api::v1::Value { value_data: None }
                        }
                    })
                    .collect::<Vec<_>>();
                api::v1::Row { values }
            })
            .collect::<Vec<_>>();

        let schema = region_metadata
            .column_metadatas
            .iter()
            .map(|c| {
                let data_type_wrapper =
                    ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
                Ok(api::v1::ColumnSchema {
                    column_name: c.column_schema.name.clone(),
                    datatype: data_type_wrapper.datatype() as i32,
                    semantic_type: c.semantic_type as i32,
                    ..Default::default()
                })
            })
            .collect::<api::error::Result<Vec<_>>>()
            .context(error::ConvertColumnDataTypeSnafu {
                reason: "failed to convert region metadata to column schema",
            })?;

        let rows = api::v1::Rows { schema, rows };

        Ok(Mutation {
            op_type: OpType::Put as i32,
            sequence: self.sequence,
            rows: Some(rows),
            write_hint: None,
        })
    }

    pub fn timestamps(&self) -> &ArrayRef {
        self.batch.column(self.timestamp_index)
    }

    pub fn num_rows(&self) -> usize {
        self.batch.num_rows()
    }
}

/// A collection of small unordered bulk parts.
/// Used to batch small parts together before merging them into a sorted part.
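///
/// A minimal usage sketch (not taken from the original sources); `part1` and `part2`
/// are assumed to be small, unsorted [BulkPart]s produced elsewhere:
///
/// ```ignore
/// let mut unordered = UnorderedPart::new();
/// if unordered.should_accept(part1.num_rows()) {
///     unordered.push(part1);
/// }
/// unordered.push(part2);
/// if unordered.should_compact() {
///     // Merges all buffered parts into a single sorted part.
///     let merged = unordered.to_bulk_part()?;
///     unordered.clear();
/// }
/// ```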
pub struct UnorderedPart {
    /// Small bulk parts that haven't been sorted yet.
    parts: Vec<BulkPart>,
    /// Total number of rows across all parts.
    total_rows: usize,
    /// Minimum timestamp across all parts.
    min_timestamp: i64,
    /// Maximum timestamp across all parts.
    max_timestamp: i64,
    /// Maximum sequence number across all parts.
    max_sequence: u64,
    /// Row count threshold for accepting parts (default: 1024).
    threshold: usize,
    /// Row count threshold for compacting (default: 4096).
    compact_threshold: usize,
}

impl Default for UnorderedPart {
    fn default() -> Self {
        Self::new()
    }
}

impl UnorderedPart {
    /// Creates a new empty UnorderedPart.
    pub fn new() -> Self {
        Self {
            parts: Vec::new(),
            total_rows: 0,
            min_timestamp: i64::MAX,
            max_timestamp: i64::MIN,
            max_sequence: 0,
            threshold: 1024,
            compact_threshold: 4096,
        }
    }

    /// Sets the threshold for accepting parts into unordered_part.
    pub fn set_threshold(&mut self, threshold: usize) {
        self.threshold = threshold;
    }

    /// Sets the threshold for compacting unordered_part.
    pub fn set_compact_threshold(&mut self, compact_threshold: usize) {
        self.compact_threshold = compact_threshold;
    }

    /// Returns the threshold for accepting parts.
    pub fn threshold(&self) -> usize {
        self.threshold
    }

    /// Returns the compact threshold.
    pub fn compact_threshold(&self) -> usize {
        self.compact_threshold
    }

    /// Returns true if this part should accept the given row count.
    pub fn should_accept(&self, num_rows: usize) -> bool {
        num_rows < self.threshold
    }

    /// Returns true if this part should be compacted.
    pub fn should_compact(&self) -> bool {
        self.total_rows >= self.compact_threshold
    }

    /// Adds a BulkPart to this unordered collection.
    pub fn push(&mut self, part: BulkPart) {
        self.total_rows += part.num_rows();
        self.min_timestamp = self.min_timestamp.min(part.min_timestamp);
        self.max_timestamp = self.max_timestamp.max(part.max_timestamp);
        self.max_sequence = self.max_sequence.max(part.sequence);
        self.parts.push(part);
    }

    /// Returns the total number of rows across all parts.
    pub fn num_rows(&self) -> usize {
        self.total_rows
    }

    /// Returns true if there are no parts.
    pub fn is_empty(&self) -> bool {
        self.parts.is_empty()
    }

    /// Returns the number of parts in this collection.
    pub fn num_parts(&self) -> usize {
        self.parts.len()
    }

    /// Concatenates and sorts all parts into a single RecordBatch.
    /// Returns None if the collection is empty.
    pub fn concat_and_sort(&self) -> Result<Option<RecordBatch>> {
        if self.parts.is_empty() {
            return Ok(None);
        }

        if self.parts.len() == 1 {
            // If there's only one part, return its batch directly
            return Ok(Some(self.parts[0].batch.clone()));
        }

        // Get the schema from the first part
        let schema = self.parts[0].batch.schema();

        // Concatenate all record batches
        let batches: Vec<RecordBatch> = self.parts.iter().map(|p| p.batch.clone()).collect();
        let concatenated =
            arrow::compute::concat_batches(&schema, &batches).context(ComputeArrowSnafu)?;

        // Sort the concatenated batch
        let sorted_batch = sort_primary_key_record_batch(&concatenated)?;

        Ok(Some(sorted_batch))
    }

    /// Converts all parts into a single sorted BulkPart.
    /// Returns None if the collection is empty.
    pub fn to_bulk_part(&self) -> Result<Option<BulkPart>> {
        let Some(sorted_batch) = self.concat_and_sort()? else {
            return Ok(None);
        };

        let timestamp_index = self.parts[0].timestamp_index;

        Ok(Some(BulkPart {
            batch: sorted_batch,
            max_timestamp: self.max_timestamp,
            min_timestamp: self.min_timestamp,
            sequence: self.max_sequence,
            timestamp_index,
            raw_data: None,
        }))
    }

    /// Clears all parts from this collection.
    pub fn clear(&mut self) {
        self.parts.clear();
        self.total_rows = 0;
        self.min_timestamp = i64::MAX;
        self.max_timestamp = i64::MIN;
        self.max_sequence = 0;
    }
}

/// More accurate estimation of the size of a record batch.
pub fn record_batch_estimated_size(batch: &RecordBatch) -> usize {
    batch
        .columns()
        .iter()
        // If the slice memory size cannot be obtained, assume 0 here.
        .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
        .sum()
}

/// Primary key column builder for handling strings specially.
enum PrimaryKeyColumnBuilder {
    /// String dictionary builder for string types.
    StringDict(StringDictionaryBuilder<UInt32Type>),
    /// Generic mutable vector for other types.
    Vector(Box<dyn MutableVector>),
}

impl PrimaryKeyColumnBuilder {
    /// Appends a value to the builder.
    fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
        match self {
            PrimaryKeyColumnBuilder::StringDict(builder) => {
                if let Some(s) = value.try_into_string().context(DataTypeMismatchSnafu)? {
                    // We know the value is a string.
                    builder.append_value(s);
                } else {
                    builder.append_null();
                }
            }
            PrimaryKeyColumnBuilder::Vector(builder) => {
                builder.push_value_ref(&value);
            }
        }
        Ok(())
    }

    /// Converts the builder to an ArrayRef.
    fn into_arrow_array(self) -> ArrayRef {
        match self {
            PrimaryKeyColumnBuilder::StringDict(mut builder) => Arc::new(builder.finish()),
            PrimaryKeyColumnBuilder::Vector(mut builder) => builder.to_vector().to_arrow_array(),
        }
    }
}

/// Converter that converts [KeyValues] into a [BulkPart].
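///
/// A hedged usage sketch mirroring the tests at the bottom of this file; `metadata`
/// (a [RegionMetadataRef]) and `key_values` (a [KeyValues]) are assumed to exist:
///
/// ```ignore
/// let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
/// let codec = build_primary_key_codec(&metadata);
/// let mut converter = BulkPartConverter::new(&metadata, schema, 1024, codec, true);
/// converter.append_key_values(&key_values)?;
/// let part = converter.convert()?;
/// ```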
pub struct BulkPartConverter {
    /// Region metadata.
    region_metadata: RegionMetadataRef,
    /// Schema of the converted batch.
    schema: SchemaRef,
    /// Primary key codec for encoding keys
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
    /// Buffer for encoding primary key.
    key_buf: Vec<u8>,
    /// Primary key array builder.
    key_array_builder: PrimaryKeyArrayBuilder,
    /// Builders for non-primary key columns.
    value_builder: ValueBuilder,
    /// Builders for individual primary key columns.
    /// The order of builders is the same as the order of primary key columns in the region metadata.
    primary_key_column_builders: Vec<PrimaryKeyColumnBuilder>,

    /// Max timestamp value.
    max_ts: i64,
    /// Min timestamp value.
    min_ts: i64,
    /// Max sequence number.
    max_sequence: SequenceNumber,
}

impl BulkPartConverter {
    /// Creates a new converter.
    ///
    /// If `store_primary_key_columns` is true and the encoding is not sparse, it
    /// additionally stores the primary key columns as individual arrays.
    pub fn new(
        region_metadata: &RegionMetadataRef,
        schema: SchemaRef,
        capacity: usize,
        primary_key_codec: Arc<dyn PrimaryKeyCodec>,
        store_primary_key_columns: bool,
    ) -> Self {
        debug_assert_eq!(
            region_metadata.primary_key_encoding,
            primary_key_codec.encoding()
        );

        let primary_key_column_builders = if store_primary_key_columns
            && region_metadata.primary_key_encoding != PrimaryKeyEncoding::Sparse
        {
            new_primary_key_column_builders(region_metadata, capacity)
        } else {
            Vec::new()
        };

        Self {
            region_metadata: region_metadata.clone(),
            schema,
            primary_key_codec,
            key_buf: Vec::new(),
            key_array_builder: PrimaryKeyArrayBuilder::new(),
            value_builder: ValueBuilder::new(region_metadata, capacity),
            primary_key_column_builders,
            min_ts: i64::MAX,
            max_ts: i64::MIN,
            max_sequence: SequenceNumber::MIN,
        }
    }

    /// Appends a [KeyValues] into the converter.
    pub fn append_key_values(&mut self, key_values: &KeyValues) -> Result<()> {
        for kv in key_values.iter() {
            self.append_key_value(&kv)?;
        }

        Ok(())
    }

    /// Appends a [KeyValue] to builders.
    ///
    /// If the primary key uses sparse encoding, the primary key in the [KeyValue] must already be encoded.
    fn append_key_value(&mut self, kv: &KeyValue) -> Result<()> {
        // Handles primary key based on encoding type
        if self.primary_key_codec.encoding() == PrimaryKeyEncoding::Sparse {
            // For sparse encoding, the primary key is already encoded in the KeyValue
            // Gets the first (and only) primary key value which contains the encoded key
            let mut primary_keys = kv.primary_keys();
            if let Some(encoded) = primary_keys
                .next()
                .context(ColumnNotFoundSnafu {
                    column: PRIMARY_KEY_COLUMN_NAME,
                })?
                .try_into_binary()
                .context(DataTypeMismatchSnafu)?
            {
                self.key_array_builder
                    .append(encoded)
                    .context(ComputeArrowSnafu)?;
            } else {
                self.key_array_builder
                    .append("")
                    .context(ComputeArrowSnafu)?;
            }
        } else {
            // For dense encoding, we need to encode the primary key columns
            self.key_buf.clear();
            self.primary_key_codec
                .encode_key_value(kv, &mut self.key_buf)
                .context(EncodeSnafu)?;
            self.key_array_builder
                .append(&self.key_buf)
                .context(ComputeArrowSnafu)?;
        };

        // If storing primary key columns, append values to individual builders
        if !self.primary_key_column_builders.is_empty() {
            for (builder, pk_value) in self
                .primary_key_column_builders
                .iter_mut()
                .zip(kv.primary_keys())
            {
                builder.push_value_ref(pk_value)?;
            }
        }

        // Pushes other columns.
        self.value_builder.push(
            kv.timestamp(),
            kv.sequence(),
            kv.op_type() as u8,
            kv.fields(),
        );

        // Updates statistics
        // Safety: timestamp of kv must be both present and a valid timestamp value.
        let ts = kv
            .timestamp()
            .try_into_timestamp()
            .unwrap()
            .unwrap()
            .value();
        self.min_ts = self.min_ts.min(ts);
        self.max_ts = self.max_ts.max(ts);
        self.max_sequence = self.max_sequence.max(kv.sequence());

        Ok(())
    }

    /// Converts buffered content into a [BulkPart].
    ///
    /// It sorts the record batch by (primary key, timestamp, sequence desc).
    pub fn convert(mut self) -> Result<BulkPart> {
        let values = Values::from(self.value_builder);
        let mut columns =
            Vec::with_capacity(4 + values.fields.len() + self.primary_key_column_builders.len());

        // Build primary key column arrays if enabled.
        for builder in self.primary_key_column_builders {
            columns.push(builder.into_arrow_array());
        }
        // Then fields columns.
        columns.extend(values.fields.iter().map(|field| field.to_arrow_array()));
        // Time index.
        let timestamp_index = columns.len();
        columns.push(values.timestamp.to_arrow_array());
        // Primary key.
        let pk_array = self.key_array_builder.finish();
        columns.push(Arc::new(pk_array));
        // Sequence and op type.
        columns.push(values.sequence.to_arrow_array());
        columns.push(values.op_type.to_arrow_array());

        let batch = RecordBatch::try_new(self.schema, columns).context(NewRecordBatchSnafu)?;
        // Sorts the record batch.
        let batch = sort_primary_key_record_batch(&batch)?;

        Ok(BulkPart {
            batch,
            max_timestamp: self.max_ts,
            min_timestamp: self.min_ts,
            sequence: self.max_sequence,
            timestamp_index,
            raw_data: None,
        })
    }
}

fn new_primary_key_column_builders(
    metadata: &RegionMetadata,
    capacity: usize,
) -> Vec<PrimaryKeyColumnBuilder> {
    metadata
        .primary_key_columns()
        .map(|col| {
            if col.column_schema.data_type.is_string() {
                PrimaryKeyColumnBuilder::StringDict(StringDictionaryBuilder::with_capacity(
                    capacity,
                    INIT_DICT_VALUE_CAPACITY,
                    capacity,
                ))
            } else {
                PrimaryKeyColumnBuilder::Vector(
                    col.column_schema.data_type.create_mutable_vector(capacity),
                )
            }
        })
        .collect()
}

/// Sorts the record batch with primary key format.
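///
/// Assumes the flat SST column layout used elsewhere in this file: the time index at
/// `num_columns - 4`, the `__primary_key` dictionary at `num_columns - 3`, the sequence
/// at `num_columns - 2` and the op type at `num_columns - 1`. Rows are ordered by
/// (primary key asc, time index asc, sequence desc).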
pub fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
    let total_columns = batch.num_columns();
    let sort_columns = vec![
        // Primary key column (ascending)
        SortColumn {
            values: batch.column(total_columns - 3).clone(),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
        // Time index column (ascending)
        SortColumn {
            values: batch.column(total_columns - 4).clone(),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
        // Sequence column (descending)
        SortColumn {
            values: batch.column(total_columns - 2).clone(),
            options: Some(SortOptions {
                descending: true,
                nulls_first: true,
            }),
        },
    ];

    let indices = datatypes::arrow::compute::lexsort_to_indices(&sort_columns, None)
        .context(ComputeArrowSnafu)?;

    datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
}

/// Converts a `BulkPart` that is unordered and without encoded primary keys into a `BulkPart`
/// with the same format as produced by [BulkPartConverter].
///
/// This function takes a `BulkPart` where:
/// - For dense encoding: Primary key columns may be stored as individual columns
/// - For sparse encoding: The `__primary_key` column should already be present with encoded keys
/// - The batch may not be sorted
///
/// And produces a `BulkPart` where:
/// - Primary key columns are optionally stored (depending on `store_primary_key_columns` and encoding)
/// - An encoded `__primary_key` dictionary column is present
/// - The batch is sorted by (primary_key, timestamp, sequence desc)
///
/// # Arguments
///
/// * `part` - The input `BulkPart` to convert
/// * `region_metadata` - Region metadata containing schema information
/// * `primary_key_codec` - Codec for encoding primary keys
/// * `schema` - Target schema for the output batch
/// * `store_primary_key_columns` - If true and encoding is not sparse, stores individual primary key columns
///
/// # Returns
///
/// Returns `None` if the input part has no rows, otherwise returns a new `BulkPart` with
/// encoded primary keys and sorted data.
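///
/// A hedged invocation sketch; `part`, `region_metadata` and `schema` are assumed to be
/// prepared by the caller:
///
/// ```ignore
/// let codec = build_primary_key_codec(&region_metadata);
/// if let Some(converted) = convert_bulk_part(part, &region_metadata, codec, schema, true)? {
///     // `converted.batch` is sorted and contains the encoded `__primary_key` column.
/// }
/// ```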
pub fn convert_bulk_part(
    part: BulkPart,
    region_metadata: &RegionMetadataRef,
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
    schema: SchemaRef,
    store_primary_key_columns: bool,
) -> Result<Option<BulkPart>> {
    if part.num_rows() == 0 {
        return Ok(None);
    }

    let num_rows = part.num_rows();
    let is_sparse = region_metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse;

    // Builds a column name-to-index map for efficient lookups
    let input_schema = part.batch.schema();
    let column_indices: HashMap<&str, usize> = input_schema
        .fields()
        .iter()
        .enumerate()
        .map(|(idx, field)| (field.name().as_str(), idx))
        .collect();

    // Determines the structure of the input batch by looking up columns by name
    let mut output_columns = Vec::new();

    // Extracts primary key columns if we need to encode them (dense encoding)
    let pk_array = if is_sparse {
        // For sparse encoding, the input should already have the __primary_key column
        // We need to find it in the input batch
        None
    } else {
        // For dense encoding, extract and encode primary key columns by name
        let pk_vectors: Result<Vec<_>> = region_metadata
            .primary_key_columns()
            .map(|col_meta| {
                let col_idx = column_indices
                    .get(col_meta.column_schema.name.as_str())
                    .context(ColumnNotFoundSnafu {
                        column: &col_meta.column_schema.name,
                    })?;
                let col = part.batch.column(*col_idx);
                Helper::try_into_vector(col).context(error::ComputeVectorSnafu)
            })
            .collect();
        let pk_vectors = pk_vectors?;

        let mut key_array_builder = PrimaryKeyArrayBuilder::new();
        let mut encode_buf = Vec::new();

        for row_idx in 0..num_rows {
            encode_buf.clear();

            // Collects primary key values with column IDs for this row
            let pk_values_with_ids: Vec<_> = region_metadata
                .primary_key
                .iter()
                .zip(pk_vectors.iter())
                .map(|(col_id, vector)| (*col_id, vector.get_ref(row_idx)))
                .collect();

            // Encodes the primary key
            primary_key_codec
                .encode_value_refs(&pk_values_with_ids, &mut encode_buf)
                .context(EncodeSnafu)?;

            key_array_builder
                .append(&encode_buf)
                .context(ComputeArrowSnafu)?;
        }

        Some(key_array_builder.finish())
    };

    // Adds primary key columns if storing them (only for dense encoding)
    if store_primary_key_columns && !is_sparse {
        for col_meta in region_metadata.primary_key_columns() {
            let col_idx = column_indices
                .get(col_meta.column_schema.name.as_str())
                .context(ColumnNotFoundSnafu {
                    column: &col_meta.column_schema.name,
                })?;
            let col = part.batch.column(*col_idx);

            // Converts to dictionary if needed for string types
            let col = if col_meta.column_schema.data_type.is_string() {
                let target_type = ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Utf8),
                );
                arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
            } else {
                col.clone()
            };
            output_columns.push(col);
        }
    }

    // Adds field columns
    for col_meta in region_metadata.field_columns() {
        let col_idx = column_indices
            .get(col_meta.column_schema.name.as_str())
            .context(ColumnNotFoundSnafu {
                column: &col_meta.column_schema.name,
            })?;
        output_columns.push(part.batch.column(*col_idx).clone());
    }

    // Adds timestamp column
    let new_timestamp_index = output_columns.len();
    let ts_col_idx = column_indices
        .get(
            region_metadata
                .time_index_column()
                .column_schema
                .name
                .as_str(),
        )
        .context(ColumnNotFoundSnafu {
            column: &region_metadata.time_index_column().column_schema.name,
        })?;
    output_columns.push(part.batch.column(*ts_col_idx).clone());

    // Adds encoded primary key dictionary column
    let pk_dictionary = if let Some(pk_dict_array) = pk_array {
        Arc::new(pk_dict_array) as ArrayRef
    } else {
        let pk_col_idx =
            column_indices
                .get(PRIMARY_KEY_COLUMN_NAME)
                .context(ColumnNotFoundSnafu {
                    column: PRIMARY_KEY_COLUMN_NAME,
                })?;
        let col = part.batch.column(*pk_col_idx);

        // Casts to dictionary type if needed
        let target_type = ArrowDataType::Dictionary(
            Box::new(ArrowDataType::UInt32),
            Box::new(ArrowDataType::Binary),
        );
        arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
    };
    output_columns.push(pk_dictionary);

    let sequence_array = UInt64Array::from(vec![part.sequence; num_rows]);
    output_columns.push(Arc::new(sequence_array) as ArrayRef);

    let op_type_array = UInt8Array::from(vec![OpType::Put as u8; num_rows]);
    output_columns.push(Arc::new(op_type_array) as ArrayRef);

    let batch = RecordBatch::try_new(schema, output_columns).context(NewRecordBatchSnafu)?;

    // Sorts the batch by (primary_key, timestamp, sequence desc)
    let sorted_batch = sort_primary_key_record_batch(&batch)?;

    Ok(Some(BulkPart {
        batch: sorted_batch,
        max_timestamp: part.max_timestamp,
        min_timestamp: part.min_timestamp,
        sequence: part.sequence,
        timestamp_index: new_timestamp_index,
        raw_data: None,
    }))
}

#[derive(Debug, Clone)]
pub struct EncodedBulkPart {
    data: Bytes,
    metadata: BulkPartMeta,
}

impl EncodedBulkPart {
    pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
        Self { data, metadata }
    }

    pub(crate) fn metadata(&self) -> &BulkPartMeta {
        &self.metadata
    }

    /// Returns the size of the encoded data in bytes
    pub(crate) fn size_bytes(&self) -> usize {
        self.data.len()
    }

    /// Returns the encoded data.
    pub(crate) fn data(&self) -> &Bytes {
        &self.data
    }

    /// Creates MemtableStats from this EncodedBulkPart.
    pub fn to_memtable_stats(&self) -> MemtableStats {
        let meta = &self.metadata;
        let ts_type = meta.region_metadata.time_index_type();
        let min_ts = ts_type.create_timestamp(meta.min_timestamp);
        let max_ts = ts_type.create_timestamp(meta.max_timestamp);

        MemtableStats {
            estimated_bytes: self.size_bytes(),
            time_range: Some((min_ts, max_ts)),
            num_rows: meta.num_rows,
            num_ranges: 1,
            max_sequence: meta.max_sequence,
            series_count: meta.num_series as usize,
        }
    }

    /// Converts this `EncodedBulkPart` to `SstInfo`.
    ///
    /// # Arguments
    /// * `file_id` - The SST file ID to assign to this part
    ///
    /// # Returns
    /// Returns a `SstInfo` instance with information derived from this bulk part's metadata
    pub(crate) fn to_sst_info(&self, file_id: FileId) -> SstInfo {
        let unit = self.metadata.region_metadata.time_index_type().unit();
        let max_row_group_uncompressed_size: u64 = self
            .metadata
            .parquet_metadata
            .row_groups()
            .iter()
            .map(|rg| {
                rg.columns()
                    .iter()
                    .map(|c| c.uncompressed_size() as u64)
                    .sum::<u64>()
            })
            .max()
            .unwrap_or(0);
        SstInfo {
            file_id,
            time_range: (
                Timestamp::new(self.metadata.min_timestamp, unit),
                Timestamp::new(self.metadata.max_timestamp, unit),
            ),
            file_size: self.data.len() as u64,
            max_row_group_uncompressed_size,
            num_rows: self.metadata.num_rows,
            num_row_groups: self.metadata.parquet_metadata.num_row_groups() as u64,
            file_metadata: Some(self.metadata.parquet_metadata.clone()),
            index_metadata: IndexOutput::default(),
            num_series: self.metadata.num_series,
        }
    }

    pub(crate) fn read(
        &self,
        context: BulkIterContextRef,
        sequence: Option<SequenceRange>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Result<Option<BoxedRecordBatchIterator>> {
        // Compute skip_fields for row group pruning using the same approach as compute_skip_fields in reader.rs.
        let skip_fields_for_pruning =
            Self::compute_skip_fields(context.pre_filter_mode(), &self.metadata.parquet_metadata);

        // use predicate to find row groups to read.
        let row_groups_to_read =
            context.row_groups_to_read(&self.metadata.parquet_metadata, skip_fields_for_pruning);

        if row_groups_to_read.is_empty() {
            // All row groups are filtered.
            return Ok(None);
        }

        let iter = EncodedBulkPartIter::try_new(
            self,
            context,
            row_groups_to_read,
            sequence,
            mem_scan_metrics,
        )?;
        Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
    }

    /// Computes whether to skip field columns based on PreFilterMode.
    fn compute_skip_fields(pre_filter_mode: PreFilterMode, parquet_meta: &ParquetMetaData) -> bool {
        match pre_filter_mode {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => {
                // Check if any row group contains delete op
                (0..parquet_meta.num_row_groups()).any(|rg_idx| {
                    row_group_contains_delete(parquet_meta, rg_idx, "memtable").unwrap_or(true)
                })
            }
        }
    }
}

#[derive(Debug, Clone)]
pub struct BulkPartMeta {
    /// Total rows in part.
    pub num_rows: usize,
    /// Max timestamp in part.
    pub max_timestamp: i64,
    /// Min timestamp in part.
    pub min_timestamp: i64,
    /// Part file metadata.
    pub parquet_metadata: Arc<ParquetMetaData>,
    /// Part region schema.
    pub region_metadata: RegionMetadataRef,
    /// Number of series.
    pub num_series: u64,
    /// Maximum sequence number in part.
    pub max_sequence: u64,
}

/// Metrics for encoding a part.
#[derive(Default, Debug)]
pub struct BulkPartEncodeMetrics {
    /// Cost of iterating over the data.
    pub iter_cost: Duration,
    /// Cost of writing the data.
    pub write_cost: Duration,
    /// Size of data before encoding.
    pub raw_size: usize,
    /// Size of data after encoding.
    pub encoded_size: usize,
    /// Number of rows in part.
    pub num_rows: usize,
}

pub struct BulkPartEncoder {
    metadata: RegionMetadataRef,
    row_group_size: usize,
    writer_props: Option<WriterProperties>,
}

impl BulkPartEncoder {
    pub(crate) fn new(
        metadata: RegionMetadataRef,
        row_group_size: usize,
    ) -> Result<BulkPartEncoder> {
        // TODO(yingwen): Skip arrow schema if needed.
        let json = metadata.to_json().context(InvalidMetadataSnafu)?;
        let key_value_meta =
            parquet::file::metadata::KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        // TODO(yingwen): Do we need compression?
        let writer_props = Some(
            WriterProperties::builder()
                .set_key_value_metadata(Some(vec![key_value_meta]))
                .set_write_batch_size(row_group_size)
                .set_max_row_group_size(row_group_size)
                .set_compression(Compression::ZSTD(ZstdLevel::default()))
                .set_column_index_truncate_length(None)
                .set_statistics_truncate_length(None)
                .build(),
        );

        Ok(Self {
            metadata,
            row_group_size,
            writer_props,
        })
    }
}

impl BulkPartEncoder {
    /// Encodes [BoxedRecordBatchIterator] into [EncodedBulkPart] with min/max timestamps.
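    ///
    /// A hedged sketch of how this is expected to be driven; `metadata`, `arrow_schema`,
    /// the `iter` of record batches and the timestamp/sequence bounds are assumed to be
    /// supplied by the caller:
    ///
    /// ```ignore
    /// let encoder = BulkPartEncoder::new(metadata, 1024)?;
    /// let mut metrics = BulkPartEncodeMetrics::default();
    /// let encoded = encoder
    ///     .encode_record_batch_iter(iter, arrow_schema, min_ts, max_ts, max_seq, &mut metrics)?;
    /// ```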
    pub fn encode_record_batch_iter(
        &self,
        iter: BoxedRecordBatchIterator,
        arrow_schema: SchemaRef,
        min_timestamp: i64,
        max_timestamp: i64,
        max_sequence: u64,
        metrics: &mut BulkPartEncodeMetrics,
    ) -> Result<Option<EncodedBulkPart>> {
        let mut buf = Vec::with_capacity(4096);
        let mut writer = ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
            .context(EncodeMemtableSnafu)?;
        let mut total_rows = 0;
        let mut series_estimator = SeriesEstimator::default();

        // Process each batch from the iterator
        let mut iter_start = Instant::now();
        for batch_result in iter {
            metrics.iter_cost += iter_start.elapsed();
            let batch = batch_result?;
            if batch.num_rows() == 0 {
                continue;
            }

            series_estimator.update_flat(&batch);
            metrics.raw_size += record_batch_estimated_size(&batch);
            let write_start = Instant::now();
            writer.write(&batch).context(EncodeMemtableSnafu)?;
            metrics.write_cost += write_start.elapsed();
            total_rows += batch.num_rows();
            iter_start = Instant::now();
        }
        metrics.iter_cost += iter_start.elapsed();

        if total_rows == 0 {
            return Ok(None);
        }

        let close_start = Instant::now();
        let file_metadata = writer.close().context(EncodeMemtableSnafu)?;
        metrics.write_cost += close_start.elapsed();
        metrics.encoded_size += buf.len();
        metrics.num_rows += total_rows;

        let buf = Bytes::from(buf);
        let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);
        let num_series = series_estimator.finish();

        Ok(Some(EncodedBulkPart {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: total_rows,
                max_timestamp,
                min_timestamp,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
                num_series,
                max_sequence,
            },
        }))
    }

    /// Encodes bulk part to a [EncodedBulkPart], returns the encoded data.
    fn encode_part(&self, part: &BulkPart) -> Result<Option<EncodedBulkPart>> {
        if part.batch.num_rows() == 0 {
            return Ok(None);
        }

        let mut buf = Vec::with_capacity(4096);
        let arrow_schema = part.batch.schema();

        let file_metadata = {
            let mut writer =
                ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
                    .context(EncodeMemtableSnafu)?;
            writer.write(&part.batch).context(EncodeMemtableSnafu)?;
            writer.finish().context(EncodeMemtableSnafu)?
        };

        let buf = Bytes::from(buf);
        let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);

        Ok(Some(EncodedBulkPart {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: part.batch.num_rows(),
                max_timestamp: part.max_timestamp,
                min_timestamp: part.min_timestamp,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
                num_series: part.estimated_series_count() as u64,
                max_sequence: part.sequence,
            },
        }))
    }
}

#[cfg(test)]
mod tests {
    use api::v1::{Row, SemanticType, WriteHint};
    use datafusion_common::ScalarValue;
    use datatypes::arrow::array::Float64Array;
    use datatypes::prelude::{ConcreteDataType, Value};
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;
    use store_api::storage::consts::ReservedColumnId;

    use super::*;
    use crate::memtable::bulk::context::BulkIterContext;
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
    use crate::test_util::memtable_util::{
        build_key_values_with_ts_seq_values, metadata_for_test, region_metadata_to_row_schema,
    };

    struct MutationInput<'a> {
        k0: &'a str,
        k1: u32,
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
        sequence: u64,
    }

    #[derive(Debug, PartialOrd, PartialEq)]
    struct BatchOutput<'a> {
        pk_values: &'a [Value],
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
    }

    fn encode(input: &[MutationInput]) -> EncodedBulkPart {
        let metadata = metadata_for_test();
        let kvs = input
            .iter()
            .map(|m| {
                build_key_values_with_ts_seq_values(
                    &metadata,
                    m.k0.to_string(),
                    m.k1,
                    m.timestamps.iter().copied(),
                    m.v1.iter().copied(),
                    m.sequence,
                )
            })
            .collect::<Vec<_>>();
        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let primary_key_codec = build_primary_key_codec(&metadata);
        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
        for kv in kvs {
            converter.append_key_values(&kv).unwrap();
        }
        let part = converter.convert().unwrap();
        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
        encoder.encode_part(&part).unwrap().unwrap()
    }

    #[test]
    fn test_write_and_read_part_projection() {
        let part = encode(&[
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.1)],
                sequence: 0,
            },
            MutationInput {
                k0: "b",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.0)],
                sequence: 0,
            },
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[2],
                v1: &[Some(0.2)],
                sequence: 1,
            },
        ]);

        let projection = &[4u32];
        let reader = part
            .read(
                Arc::new(
                    BulkIterContext::new(
                        part.metadata.region_metadata.clone(),
                        Some(projection.as_slice()),
                        None,
                        false,
                    )
                    .unwrap(),
                ),
                None,
                None,
            )
            .unwrap()
            .expect("expect at least one row group");

        let mut total_rows_read = 0;
        let mut field: Vec<f64> = vec![];
        for res in reader {
            let batch = res.unwrap();
            assert_eq!(5, batch.num_columns());
            field.extend_from_slice(
                batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Float64Array>()
                    .unwrap()
                    .values(),
            );
            total_rows_read += batch.num_rows();
        }
        assert_eq!(3, total_rows_read);
        assert_eq!(vec![0.1, 0.2, 0.0], field);
    }

    fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
        let metadata = metadata_for_test();
        let kvs = key_values
            .into_iter()
            .map(|(k0, k1, (start, end), sequence)| {
                let ts = start..end;
                let v1 = (start..end).map(|_| None);
                build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
            })
            .collect::<Vec<_>>();
        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let primary_key_codec = build_primary_key_codec(&metadata);
        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
        for kv in kvs {
            converter.append_key_values(&kv).unwrap();
        }
        let part = converter.convert().unwrap();
        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
        encoder.encode_part(&part).unwrap().unwrap()
    }

    fn check_prune_row_group(
        part: &EncodedBulkPart,
        predicate: Option<Predicate>,
        expected_rows: usize,
    ) {
        let context = Arc::new(
            BulkIterContext::new(
                part.metadata.region_metadata.clone(),
                None,
                predicate,
                false,
            )
            .unwrap(),
        );
        let reader = part
            .read(context, None, None)
            .unwrap()
            .expect("expect at least one row group");
        let mut total_rows_read = 0;
        for res in reader {
            let batch = res.unwrap();
            total_rows_read += batch.num_rows();
        }
        // Checks the number of rows actually read after pruning.
        assert_eq!(expected_rows, total_rows_read);
    }

    #[test]
    fn test_prune_row_groups() {
        let part = prepare(vec![
            ("a", 0, (0, 40), 1),
            ("a", 1, (0, 60), 1),
            ("b", 0, (0, 100), 2),
            ("b", 1, (100, 180), 3),
            ("b", 1, (180, 210), 4),
        ]);

        let context = Arc::new(
            BulkIterContext::new(
                part.metadata.region_metadata.clone(),
                None,
                Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
                    datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
                )])),
                false,
            )
            .unwrap(),
        );
        assert!(part.read(context, None, None).unwrap().is_none());

        check_prune_row_group(&part, None, 310);

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            40,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
            ])),
            60,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
            ])),
            100,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            100,
        );

1482        // Predicates on a field column can filter precisely, down to individual rows.
1483        check_prune_row_group(
1484            &part,
1485            Some(Predicate::new(vec![
1486                datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64)),
1487            ])),
1488            1,
1489        );
1490    }
1491
1492    #[test]
1493    fn test_bulk_part_converter_append_and_convert() {
1494        let metadata = metadata_for_test();
1495        let capacity = 100;
1496        let primary_key_codec = build_primary_key_codec(&metadata);
1497        let schema = to_flat_sst_arrow_schema(
1498            &metadata,
1499            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1500        );
1501
1502        let mut converter =
1503            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1504
1505        let key_values1 = build_key_values_with_ts_seq_values(
1506            &metadata,
1507            "key1".to_string(),
1508            1u32,
1509            vec![1000, 2000].into_iter(),
1510            vec![Some(1.0), Some(2.0)].into_iter(),
1511            1,
1512        );
1513
1514        let key_values2 = build_key_values_with_ts_seq_values(
1515            &metadata,
1516            "key2".to_string(),
1517            2u32,
1518            vec![1500].into_iter(),
1519            vec![Some(3.0)].into_iter(),
1520            2,
1521        );
1522
1523        converter.append_key_values(&key_values1).unwrap();
1524        converter.append_key_values(&key_values2).unwrap();
1525
1526        let bulk_part = converter.convert().unwrap();
1527
1528        assert_eq!(bulk_part.num_rows(), 3);
1529        assert_eq!(bulk_part.min_timestamp, 1000);
1530        assert_eq!(bulk_part.max_timestamp, 2000);
1531        assert_eq!(bulk_part.sequence, 2);
1532        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
1533
1534        // Validate primary key columns are stored
1535        // Schema should include primary key columns k0 and k1 at the beginning
1536        let schema = bulk_part.batch.schema();
1537        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1538        assert_eq!(
1539            field_names,
1540            vec![
1541                "k0",
1542                "k1",
1543                "v0",
1544                "v1",
1545                "ts",
1546                "__primary_key",
1547                "__sequence",
1548                "__op_type"
1549            ]
1550        );
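        // Illustrative extra check (a sketch, assuming the converter
        // dictionary-encodes `__primary_key` as the tests below do): the
        // encoded key column should have one entry per row.
        let pk_col = bulk_part.batch.column_by_name("__primary_key").unwrap();
        let pk_dict = pk_col
            .as_any()
            .downcast_ref::<DictionaryArray<UInt32Type>>()
            .unwrap();
        assert_eq!(pk_dict.len(), 3);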
1551    }
1552
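    /// Rows appended out of primary key order must come out sorted by key;
    /// here timestamps and sequences happen to follow the same order.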
1553    #[test]
1554    fn test_bulk_part_converter_sorting() {
1555        let metadata = metadata_for_test();
1556        let capacity = 100;
1557        let primary_key_codec = build_primary_key_codec(&metadata);
1558        let schema = to_flat_sst_arrow_schema(
1559            &metadata,
1560            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1561        );
1562
1563        let mut converter =
1564            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1565
1566        let key_values1 = build_key_values_with_ts_seq_values(
1567            &metadata,
1568            "z_key".to_string(),
1569            3u32,
1570            vec![3000].into_iter(),
1571            vec![Some(3.0)].into_iter(),
1572            3,
1573        );
1574
1575        let key_values2 = build_key_values_with_ts_seq_values(
1576            &metadata,
1577            "a_key".to_string(),
1578            1u32,
1579            vec![1000].into_iter(),
1580            vec![Some(1.0)].into_iter(),
1581            1,
1582        );
1583
1584        let key_values3 = build_key_values_with_ts_seq_values(
1585            &metadata,
1586            "m_key".to_string(),
1587            2u32,
1588            vec![2000].into_iter(),
1589            vec![Some(2.0)].into_iter(),
1590            2,
1591        );
1592
1593        converter.append_key_values(&key_values1).unwrap();
1594        converter.append_key_values(&key_values2).unwrap();
1595        converter.append_key_values(&key_values3).unwrap();
1596
1597        let bulk_part = converter.convert().unwrap();
1598
1599        assert_eq!(bulk_part.num_rows(), 3);
1600
1601        let ts_column = bulk_part.batch.column(bulk_part.timestamp_index);
1602        let seq_column = bulk_part.batch.column(bulk_part.batch.num_columns() - 2);
1603
1604        let ts_array = ts_column
1605            .as_any()
1606            .downcast_ref::<TimestampMillisecondArray>()
1607            .unwrap();
1608        let seq_array = seq_column.as_any().downcast_ref::<UInt64Array>().unwrap();
1609
1610        assert_eq!(ts_array.values(), &[1000, 2000, 3000]);
1611        assert_eq!(seq_array.values(), &[1, 2, 3]);
1612
1613        // Validate primary key columns are stored
1614        let schema = bulk_part.batch.schema();
1615        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1616        assert_eq!(
1617            field_names,
1618            vec![
1619                "k0",
1620                "k1",
1621                "v0",
1622                "v1",
1623                "ts",
1624                "__primary_key",
1625                "__sequence",
1626                "__op_type"
1627            ]
1628        );
1629    }
1630
1631    #[test]
1632    fn test_bulk_part_converter_empty() {
1633        let metadata = metadata_for_test();
1634        let capacity = 10;
1635        let primary_key_codec = build_primary_key_codec(&metadata);
1636        let schema = to_flat_sst_arrow_schema(
1637            &metadata,
1638            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1639        );
1640
1641        let converter =
1642            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1643
1644        let bulk_part = converter.convert().unwrap();
1645
1646        assert_eq!(bulk_part.num_rows(), 0);
1647        assert_eq!(bulk_part.min_timestamp, i64::MAX);
1648        assert_eq!(bulk_part.max_timestamp, i64::MIN);
1649        assert_eq!(bulk_part.sequence, SequenceNumber::MIN);
1650
1651        // Validate primary key columns are present in schema even for empty batch
1652        let schema = bulk_part.batch.schema();
1653        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1654        assert_eq!(
1655            field_names,
1656            vec![
1657                "k0",
1658                "k1",
1659                "v0",
1660                "v1",
1661                "ts",
1662                "__primary_key",
1663                "__sequence",
1664                "__op_type"
1665            ]
1666        );
1667    }
1668
1669    #[test]
1670    fn test_bulk_part_converter_without_primary_key_columns() {
1671        let metadata = metadata_for_test();
1672        let primary_key_codec = build_primary_key_codec(&metadata);
1673        let schema = to_flat_sst_arrow_schema(
1674            &metadata,
1675            &FlatSchemaOptions {
1676                raw_pk_columns: false,
1677                string_pk_use_dict: true,
1678            },
1679        );
1680
1681        let capacity = 100;
1682        let mut converter =
1683            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, false);
1684
1685        let key_values1 = build_key_values_with_ts_seq_values(
1686            &metadata,
1687            "key1".to_string(),
1688            1u32,
1689            vec![1000, 2000].into_iter(),
1690            vec![Some(1.0), Some(2.0)].into_iter(),
1691            1,
1692        );
1693
1694        let key_values2 = build_key_values_with_ts_seq_values(
1695            &metadata,
1696            "key2".to_string(),
1697            2u32,
1698            vec![1500].into_iter(),
1699            vec![Some(3.0)].into_iter(),
1700            2,
1701        );
1702
1703        converter.append_key_values(&key_values1).unwrap();
1704        converter.append_key_values(&key_values2).unwrap();
1705
1706        let bulk_part = converter.convert().unwrap();
1707
1708        assert_eq!(bulk_part.num_rows(), 3);
1709        assert_eq!(bulk_part.min_timestamp, 1000);
1710        assert_eq!(bulk_part.max_timestamp, 2000);
1711        assert_eq!(bulk_part.sequence, 2);
1712        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
1713
1714        // Validate primary key columns are NOT stored individually
1715        let schema = bulk_part.batch.schema();
1716        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1717        assert_eq!(
1718            field_names,
1719            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
1720        );
1721    }
1722
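    /// Builds [KeyValues] whose rows carry a sparse-encoded primary key
    /// (`__table_id`, `__tsid`, `k0`, `k1`) in the `__primary_key` column.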
1723    #[allow(clippy::too_many_arguments)]
1724    fn build_key_values_with_sparse_encoding(
1725        metadata: &RegionMetadataRef,
1726        primary_key_codec: &Arc<dyn PrimaryKeyCodec>,
1727        table_id: u32,
1728        tsid: u64,
1729        k0: String,
1730        k1: String,
1731        timestamps: impl Iterator<Item = i64>,
1732        values: impl Iterator<Item = Option<f64>>,
1733        sequence: SequenceNumber,
1734    ) -> KeyValues {
1735        // Encode the primary key (__table_id, __tsid, k0, k1) into binary format using the sparse codec
1736        let pk_values = vec![
1737            (ReservedColumnId::table_id(), Value::UInt32(table_id)),
1738            (ReservedColumnId::tsid(), Value::UInt64(tsid)),
1739            (0, Value::String(k0.clone().into())),
1740            (1, Value::String(k1.clone().into())),
1741        ];
1742        let mut encoded_key = Vec::new();
1743        primary_key_codec
1744            .encode_values(&pk_values, &mut encoded_key)
1745            .unwrap();
1746        assert!(!encoded_key.is_empty());
1747
1748        // Create schema for sparse encoding: __primary_key, ts, v0, v1
1749        let column_schema = vec![
1750            api::v1::ColumnSchema {
1751                column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
1752                datatype: api::helper::ColumnDataTypeWrapper::try_from(
1753                    ConcreteDataType::binary_datatype(),
1754                )
1755                .unwrap()
1756                .datatype() as i32,
1757                semantic_type: api::v1::SemanticType::Tag as i32,
1758                ..Default::default()
1759            },
1760            api::v1::ColumnSchema {
1761                column_name: "ts".to_string(),
1762                datatype: api::helper::ColumnDataTypeWrapper::try_from(
1763                    ConcreteDataType::timestamp_millisecond_datatype(),
1764                )
1765                .unwrap()
1766                .datatype() as i32,
1767                semantic_type: api::v1::SemanticType::Timestamp as i32,
1768                ..Default::default()
1769            },
1770            api::v1::ColumnSchema {
1771                column_name: "v0".to_string(),
1772                datatype: api::helper::ColumnDataTypeWrapper::try_from(
1773                    ConcreteDataType::int64_datatype(),
1774                )
1775                .unwrap()
1776                .datatype() as i32,
1777                semantic_type: api::v1::SemanticType::Field as i32,
1778                ..Default::default()
1779            },
1780            api::v1::ColumnSchema {
1781                column_name: "v1".to_string(),
1782                datatype: api::helper::ColumnDataTypeWrapper::try_from(
1783                    ConcreteDataType::float64_datatype(),
1784                )
1785                .unwrap()
1786                .datatype() as i32,
1787                semantic_type: api::v1::SemanticType::Field as i32,
1788                ..Default::default()
1789            },
1790        ];
1791
1792        let rows = timestamps
1793            .zip(values)
1794            .map(|(ts, v)| Row {
1795                values: vec![
1796                    api::v1::Value {
1797                        value_data: Some(api::v1::value::ValueData::BinaryValue(
1798                            encoded_key.clone(),
1799                        )),
1800                    },
1801                    api::v1::Value {
1802                        value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(ts)),
1803                    },
1804                    api::v1::Value {
1805                        value_data: Some(api::v1::value::ValueData::I64Value(ts)),
1806                    },
1807                    api::v1::Value {
1808                        value_data: v.map(api::v1::value::ValueData::F64Value),
1809                    },
1810                ],
1811            })
1812            .collect();
1813
1814        let mutation = api::v1::Mutation {
1815            op_type: api::v1::OpType::Put as i32,
1816            sequence,
1817            rows: Some(api::v1::Rows {
1818                schema: column_schema,
1819                rows,
1820            }),
1821            write_hint: Some(WriteHint {
1822                primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
1823            }),
1824        };
1825        KeyValues::new(metadata.as_ref(), mutation).unwrap()
1826    }
1827
1828    #[test]
1829    fn test_bulk_part_converter_sparse_primary_key_encoding() {
1830        use api::v1::SemanticType;
1831        use datatypes::schema::ColumnSchema;
1832        use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1833        use store_api::storage::RegionId;
1834
1835        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
1836        builder
1837            .push_column_metadata(ColumnMetadata {
1838                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
1839                semantic_type: SemanticType::Tag,
1840                column_id: 0,
1841            })
1842            .push_column_metadata(ColumnMetadata {
1843                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
1844                semantic_type: SemanticType::Tag,
1845                column_id: 1,
1846            })
1847            .push_column_metadata(ColumnMetadata {
1848                column_schema: ColumnSchema::new(
1849                    "ts",
1850                    ConcreteDataType::timestamp_millisecond_datatype(),
1851                    false,
1852                ),
1853                semantic_type: SemanticType::Timestamp,
1854                column_id: 2,
1855            })
1856            .push_column_metadata(ColumnMetadata {
1857                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
1858                semantic_type: SemanticType::Field,
1859                column_id: 3,
1860            })
1861            .push_column_metadata(ColumnMetadata {
1862                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
1863                semantic_type: SemanticType::Field,
1864                column_id: 4,
1865            })
1866            .primary_key(vec![0, 1])
1867            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
1868        let metadata = Arc::new(builder.build().unwrap());
1869
1870        let primary_key_codec = build_primary_key_codec(&metadata);
1871        let schema = to_flat_sst_arrow_schema(
1872            &metadata,
1873            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1874        );
1875
1876        assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse);
1877        assert_eq!(primary_key_codec.encoding(), PrimaryKeyEncoding::Sparse);
1878
1879        let capacity = 100;
1880        let mut converter =
1881            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec.clone(), true);
1882
1883        let key_values1 = build_key_values_with_sparse_encoding(
1884            &metadata,
1885            &primary_key_codec,
1886            2048u32, // table_id
1887            100u64,  // tsid
1888            "key11".to_string(),
1889            "key21".to_string(),
1890            vec![1000, 2000].into_iter(),
1891            vec![Some(1.0), Some(2.0)].into_iter(),
1892            1,
1893        );
1894
1895        let key_values2 = build_key_values_with_sparse_encoding(
1896            &metadata,
1897            &primary_key_codec,
1898            4096u32, // table_id
1899            200u64,  // tsid
1900            "key12".to_string(),
1901            "key22".to_string(),
1902            vec![1500].into_iter(),
1903            vec![Some(3.0)].into_iter(),
1904            2,
1905        );
1906
1907        converter.append_key_values(&key_values1).unwrap();
1908        converter.append_key_values(&key_values2).unwrap();
1909
1910        let bulk_part = converter.convert().unwrap();
1911
1912        assert_eq!(bulk_part.num_rows(), 3);
1913        assert_eq!(bulk_part.min_timestamp, 1000);
1914        assert_eq!(bulk_part.max_timestamp, 2000);
1915        assert_eq!(bulk_part.sequence, 2);
1916        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
1917
1918        // For sparse encoding, primary key columns should NOT be stored individually
1919        // even when store_primary_key_columns is true, because sparse encoding
1920        // stores the encoded primary key in the __primary_key column
1921        let schema = bulk_part.batch.schema();
1922        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1923        assert_eq!(
1924            field_names,
1925            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
1926        );
1927
1928        // Verify the __primary_key column contains encoded sparse keys
1929        let primary_key_column = bulk_part.batch.column_by_name("__primary_key").unwrap();
1930        let dict_array = primary_key_column
1931            .as_any()
1932            .downcast_ref::<DictionaryArray<UInt32Type>>()
1933            .unwrap();
1934
1935        // The dictionary should be non-empty, with one entry per row.
1936        assert!(!dict_array.is_empty());
1937        assert_eq!(dict_array.len(), 3); // 3 rows total
1938
1939        // Verify values are properly encoded binary data (not empty)
1940        let values = dict_array
1941            .values()
1942            .as_any()
1943            .downcast_ref::<BinaryArray>()
1944            .unwrap();
1945        for i in 0..values.len() {
1946            assert!(
1947                !values.value(i).is_empty(),
1948                "Encoded primary key should not be empty"
1949            );
1950        }
1951    }
1952
1953    #[test]
1954    fn test_convert_bulk_part_empty() {
1955        let metadata = metadata_for_test();
1956        let schema = to_flat_sst_arrow_schema(
1957            &metadata,
1958            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1959        );
1960        let primary_key_codec = build_primary_key_codec(&metadata);
1961
1962        // Create empty batch
1963        let empty_batch = RecordBatch::new_empty(schema.clone());
1964        let empty_part = BulkPart {
1965            batch: empty_batch,
1966            max_timestamp: 0,
1967            min_timestamp: 0,
1968            sequence: 0,
1969            timestamp_index: 0,
1970            raw_data: None,
1971        };
1972
1973        let result =
1974            convert_bulk_part(empty_part, &metadata, primary_key_codec, schema, true).unwrap();
1975        assert!(result.is_none());
1976    }
1977
1978    #[test]
1979    fn test_convert_bulk_part_dense_with_pk_columns() {
1980        let metadata = metadata_for_test();
1981        let primary_key_codec = build_primary_key_codec(&metadata);
1982
1983        let k0_array = Arc::new(arrow::array::StringArray::from(vec![
1984            "key1", "key2", "key1",
1985        ]));
1986        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2, 1]));
1987        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200, 300]));
1988        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0, 3.0]));
1989        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000, 1500]));
1990
1991        let input_schema = Arc::new(Schema::new(vec![
1992            Field::new("k0", ArrowDataType::Utf8, false),
1993            Field::new("k1", ArrowDataType::UInt32, false),
1994            Field::new("v0", ArrowDataType::Int64, true),
1995            Field::new("v1", ArrowDataType::Float64, true),
1996            Field::new(
1997                "ts",
1998                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
1999                false,
2000            ),
2001        ]));
2002
2003        let input_batch = RecordBatch::try_new(
2004            input_schema,
2005            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2006        )
2007        .unwrap();
2008
2009        let part = BulkPart {
2010            batch: input_batch,
2011            max_timestamp: 2000,
2012            min_timestamp: 1000,
2013            sequence: 5,
2014            timestamp_index: 4,
2015            raw_data: None,
2016        };
2017
2018        let output_schema = to_flat_sst_arrow_schema(
2019            &metadata,
2020            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2021        );
2022
2023        let result = convert_bulk_part(
2024            part,
2025            &metadata,
2026            primary_key_codec,
2027            output_schema,
2028            true, // store primary key columns
2029        )
2030        .unwrap();
2031
2032        let converted = result.unwrap();
2033
2034        assert_eq!(converted.num_rows(), 3);
2035        assert_eq!(converted.max_timestamp, 2000);
2036        assert_eq!(converted.min_timestamp, 1000);
2037        assert_eq!(converted.sequence, 5);
2038
2039        let schema = converted.batch.schema();
2040        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2041        assert_eq!(
2042            field_names,
2043            vec![
2044                "k0",
2045                "k1",
2046                "v0",
2047                "v1",
2048                "ts",
2049                "__primary_key",
2050                "__sequence",
2051                "__op_type"
2052            ]
2053        );
2054
2055        let k0_col = converted.batch.column_by_name("k0").unwrap();
2056        assert!(matches!(
2057            k0_col.data_type(),
2058            ArrowDataType::Dictionary(_, _)
2059        ));
2060
2061        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2062        let dict_array = pk_col
2063            .as_any()
2064            .downcast_ref::<DictionaryArray<UInt32Type>>()
2065            .unwrap();
2066        let keys = dict_array.keys();
2067
2068        assert_eq!(keys.len(), 3);
2069    }
2070
2071    #[test]
2072    fn test_convert_bulk_part_dense_without_pk_columns() {
2073        let metadata = metadata_for_test();
2074        let primary_key_codec = build_primary_key_codec(&metadata);
2075
2076        // Create input batch with primary key columns (k0, k1)
2077        let k0_array = Arc::new(arrow::array::StringArray::from(vec!["key1", "key2"]));
2078        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2]));
2079        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
2080        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
2081        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
2082
2083        let input_schema = Arc::new(Schema::new(vec![
2084            Field::new("k0", ArrowDataType::Utf8, false),
2085            Field::new("k1", ArrowDataType::UInt32, false),
2086            Field::new("v0", ArrowDataType::Int64, true),
2087            Field::new("v1", ArrowDataType::Float64, true),
2088            Field::new(
2089                "ts",
2090                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2091                false,
2092            ),
2093        ]));
2094
2095        let input_batch = RecordBatch::try_new(
2096            input_schema,
2097            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2098        )
2099        .unwrap();
2100
2101        let part = BulkPart {
2102            batch: input_batch,
2103            max_timestamp: 2000,
2104            min_timestamp: 1000,
2105            sequence: 3,
2106            timestamp_index: 4,
2107            raw_data: None,
2108        };
2109
2110        let output_schema = to_flat_sst_arrow_schema(
2111            &metadata,
2112            &FlatSchemaOptions {
2113                raw_pk_columns: false,
2114                string_pk_use_dict: true,
2115            },
2116        );
2117
2118        let result = convert_bulk_part(
2119            part,
2120            &metadata,
2121            primary_key_codec,
2122            output_schema,
2123            false, // don't store primary key columns
2124        )
2125        .unwrap();
2126
2127        let converted = result.unwrap();
2128
2129        assert_eq!(converted.num_rows(), 2);
2130        assert_eq!(converted.max_timestamp, 2000);
2131        assert_eq!(converted.min_timestamp, 1000);
2132        assert_eq!(converted.sequence, 3);
2133
2134        // Verify schema does NOT include individual primary key columns
2135        let schema = converted.batch.schema();
2136        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2137        assert_eq!(
2138            field_names,
2139            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2140        );
2141
2142        // Verify __primary_key column is present and is a dictionary
2143        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2144        assert!(matches!(
2145            pk_col.data_type(),
2146            ArrowDataType::Dictionary(_, _)
2147        ));
2148    }
2149
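    /// Sparse-encoded input already provides `__primary_key`, so conversion
    /// should dictionary-encode that column and never materialize individual
    /// tag columns, even when asked to store them.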
2150    #[test]
2151    fn test_convert_bulk_part_sparse_encoding() {
2152        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
2153        builder
2154            .push_column_metadata(ColumnMetadata {
2155                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
2156                semantic_type: SemanticType::Tag,
2157                column_id: 0,
2158            })
2159            .push_column_metadata(ColumnMetadata {
2160                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
2161                semantic_type: SemanticType::Tag,
2162                column_id: 1,
2163            })
2164            .push_column_metadata(ColumnMetadata {
2165                column_schema: ColumnSchema::new(
2166                    "ts",
2167                    ConcreteDataType::timestamp_millisecond_datatype(),
2168                    false,
2169                ),
2170                semantic_type: SemanticType::Timestamp,
2171                column_id: 2,
2172            })
2173            .push_column_metadata(ColumnMetadata {
2174                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
2175                semantic_type: SemanticType::Field,
2176                column_id: 3,
2177            })
2178            .push_column_metadata(ColumnMetadata {
2179                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
2180                semantic_type: SemanticType::Field,
2181                column_id: 4,
2182            })
2183            .primary_key(vec![0, 1])
2184            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
2185        let metadata = Arc::new(builder.build().unwrap());
2186
2187        let primary_key_codec = build_primary_key_codec(&metadata);
2188
2189        // Create input batch with __primary_key column (sparse encoding)
2190        let pk_array = Arc::new(arrow::array::BinaryArray::from(vec![
2191            b"encoded_key_1".as_slice(),
2192            b"encoded_key_2".as_slice(),
2193        ]));
2194        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
2195        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
2196        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
2197
2198        let input_schema = Arc::new(Schema::new(vec![
2199            Field::new("__primary_key", ArrowDataType::Binary, false),
2200            Field::new("v0", ArrowDataType::Int64, true),
2201            Field::new("v1", ArrowDataType::Float64, true),
2202            Field::new(
2203                "ts",
2204                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2205                false,
2206            ),
2207        ]));
2208
2209        let input_batch =
2210            RecordBatch::try_new(input_schema, vec![pk_array, v0_array, v1_array, ts_array])
2211                .unwrap();
2212
2213        let part = BulkPart {
2214            batch: input_batch,
2215            max_timestamp: 2000,
2216            min_timestamp: 1000,
2217            sequence: 7,
2218            timestamp_index: 3,
2219            raw_data: None,
2220        };
2221
2222        let output_schema = to_flat_sst_arrow_schema(
2223            &metadata,
2224            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2225        );
2226
2227        let result = convert_bulk_part(
2228            part,
2229            &metadata,
2230            primary_key_codec,
2231            output_schema,
2232            true, // store_primary_key_columns (ignored for sparse)
2233        )
2234        .unwrap();
2235
2236        let converted = result.unwrap();
2237
2238        assert_eq!(converted.num_rows(), 2);
2239        assert_eq!(converted.max_timestamp, 2000);
2240        assert_eq!(converted.min_timestamp, 1000);
2241        assert_eq!(converted.sequence, 7);
2242
2243        // Verify schema does NOT include individual primary key columns (sparse encoding)
2244        let schema = converted.batch.schema();
2245        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2246        assert_eq!(
2247            field_names,
2248            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2249        );
2250
2251        // Verify __primary_key is dictionary encoded
2252        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2253        assert!(matches!(
2254            pk_col.data_type(),
2255            ArrowDataType::Dictionary(_, _)
2256        ));
2257    }
2258
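    /// Unsorted input spanning two series must be re-sorted by
    /// (primary key, timestamp) during conversion.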
2259    #[test]
2260    fn test_convert_bulk_part_sorting_with_multiple_series() {
2261        let metadata = metadata_for_test();
2262        let primary_key_codec = build_primary_key_codec(&metadata);
2263
2264        // Create unsorted batch with multiple series and timestamps
2265        let k0_array = Arc::new(arrow::array::StringArray::from(vec![
2266            "series_b", "series_a", "series_b", "series_a",
2267        ]));
2268        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![2, 1, 2, 1]));
2269        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![200, 100, 400, 300]));
2270        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![2.0, 1.0, 4.0, 3.0]));
2271        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
2272            2000, 1000, 4000, 3000,
2273        ]));
2274
2275        let input_schema = Arc::new(Schema::new(vec![
2276            Field::new("k0", ArrowDataType::Utf8, false),
2277            Field::new("k1", ArrowDataType::UInt32, false),
2278            Field::new("v0", ArrowDataType::Int64, true),
2279            Field::new("v1", ArrowDataType::Float64, true),
2280            Field::new(
2281                "ts",
2282                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2283                false,
2284            ),
2285        ]));
2286
2287        let input_batch = RecordBatch::try_new(
2288            input_schema,
2289            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2290        )
2291        .unwrap();
2292
2293        let part = BulkPart {
2294            batch: input_batch,
2295            max_timestamp: 4000,
2296            min_timestamp: 1000,
2297            sequence: 10,
2298            timestamp_index: 4,
2299            raw_data: None,
2300        };
2301
2302        let output_schema = to_flat_sst_arrow_schema(
2303            &metadata,
2304            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2305        );
2306
2307        let result =
2308            convert_bulk_part(part, &metadata, primary_key_codec, output_schema, true).unwrap();
2309
2310        let converted = result.unwrap();
2311
2312        assert_eq!(converted.num_rows(), 4);
2313
2314        // Verify data is sorted by (primary_key, timestamp, sequence desc)
2315        let ts_col = converted.batch.column(converted.timestamp_index);
2316        let ts_array = ts_col
2317            .as_any()
2318            .downcast_ref::<TimestampMillisecondArray>()
2319            .unwrap();
2320
2321        // After sorting by (pk, ts), we should have:
2322        // series_a,1: ts=1000, 3000
2323        // series_b,2: ts=2000, 4000
2324        let timestamps: Vec<i64> = ts_array.values().to_vec();
2325        assert_eq!(timestamps, vec![1000, 3000, 2000, 4000]);
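        // Illustrative follow-up check (a sketch, assuming `v0` stays an
        // Int64Array after conversion): the field column should follow the
        // same permutation as the timestamps.
        let v0_col = converted.batch.column_by_name("v0").unwrap();
        let v0_array = v0_col
            .as_any()
            .downcast_ref::<arrow::array::Int64Array>()
            .unwrap();
        assert_eq!(v0_array.values(), &[100, 300, 200, 400]);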
2326    }
2327}