mito2/memtable/bulk/part.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Bulk part encoder/decoder.
16
17use std::collections::{HashMap, HashSet, VecDeque};
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use api::helper::{ColumnDataTypeWrapper, to_grpc_value};
22use api::v1::bulk_wal_entry::Body;
23use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType, bulk_wal_entry};
24use bytes::Bytes;
25use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
26use common_recordbatch::DfRecordBatch as RecordBatch;
27use common_time::Timestamp;
28use common_time::timestamp::TimeUnit;
29use datatypes::arrow;
30use datatypes::arrow::array::{
31    Array, ArrayRef, BinaryBuilder, BinaryDictionaryBuilder, DictionaryArray, StringBuilder,
32    StringDictionaryBuilder, TimestampMicrosecondArray, TimestampMillisecondArray,
33    TimestampNanosecondArray, TimestampSecondArray, UInt8Array, UInt8Builder, UInt32Array,
34    UInt64Array, UInt64Builder,
35};
36use datatypes::arrow::compute::{SortColumn, SortOptions, TakeOptions};
37use datatypes::arrow::datatypes::{
38    DataType as ArrowDataType, Field, Schema, SchemaRef, UInt32Type,
39};
40use datatypes::arrow_array::BinaryArray;
41use datatypes::data_type::DataType;
42use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
43use datatypes::value::{Value, ValueRef};
44use datatypes::vectors::Helper;
45use mito_codec::key_values::{KeyValue, KeyValues, KeyValuesRef};
46use mito_codec::row_converter::{
47    DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, build_primary_key_codec,
48};
49use parquet::arrow::ArrowWriter;
50use parquet::basic::{Compression, ZstdLevel};
51use parquet::data_type::AsBytes;
52use parquet::file::metadata::ParquetMetaData;
53use parquet::file::properties::WriterProperties;
54use smallvec::SmallVec;
55use snafu::{OptionExt, ResultExt, Snafu};
56use store_api::codec::PrimaryKeyEncoding;
57use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
58use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
59use store_api::storage::{FileId, RegionId, SequenceNumber, SequenceRange};
60use table::predicate::Predicate;
61
62use crate::error::{
63    self, ColumnNotFoundSnafu, ComputeArrowSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu,
64    DataTypeMismatchSnafu, EncodeMemtableSnafu, EncodeSnafu, InvalidMetadataSnafu,
65    InvalidRequestSnafu, NewRecordBatchSnafu, Result, UnexpectedSnafu,
66};
67use crate::memtable::bulk::context::BulkIterContextRef;
68use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
69use crate::memtable::time_series::{ValueBuilder, Values};
70use crate::memtable::{BoxedRecordBatchIterator, MemScanMetrics, MemtableStats};
71use crate::sst::index::IndexOutput;
72use crate::sst::parquet::file_range::{PreFilterMode, row_group_contains_delete};
73use crate::sst::parquet::flat_format::primary_key_column_index;
74use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder, ReadFormat};
75use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo};
76use crate::sst::{SeriesEstimator, to_sst_arrow_schema};
77
78const INIT_DICT_VALUE_CAPACITY: usize = 8;
79
80/// A raw bulk part in the memtable.
81#[derive(Clone)]
82pub struct BulkPart {
83    pub batch: RecordBatch,
84    pub max_timestamp: i64,
85    pub min_timestamp: i64,
86    pub sequence: u64,
87    pub timestamp_index: usize,
88    pub raw_data: Option<ArrowIpc>,
89}
90
91impl TryFrom<BulkWalEntry> for BulkPart {
92    type Error = error::Error;
93
94    fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
95        match value.body.expect("Entry payload should be present") {
96            Body::ArrowIpc(ipc) => {
97                let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
98                    .context(error::ConvertBulkWalEntrySnafu)?;
99                let batch = decoder
100                    .try_decode_record_batch(&ipc.data_header, &ipc.payload)
101                    .context(error::ConvertBulkWalEntrySnafu)?;
102                Ok(Self {
103                    batch,
104                    max_timestamp: value.max_ts,
105                    min_timestamp: value.min_ts,
106                    sequence: value.sequence,
107                    timestamp_index: value.timestamp_index as usize,
108                    raw_data: Some(ipc),
109                })
110            }
111        }
112    }
113}
114
115impl TryFrom<&BulkPart> for BulkWalEntry {
116    type Error = error::Error;
117
118    fn try_from(value: &BulkPart) -> Result<Self> {
119        if let Some(ipc) = &value.raw_data {
120            Ok(BulkWalEntry {
121                sequence: value.sequence,
122                max_ts: value.max_timestamp,
123                min_ts: value.min_timestamp,
124                timestamp_index: value.timestamp_index as u32,
125                body: Some(Body::ArrowIpc(ipc.clone())),
126            })
127        } else {
128            let mut encoder = FlightEncoder::default();
129            let schema_bytes = encoder
130                .encode_schema(value.batch.schema().as_ref())
131                .data_header;
132            let [rb_data] = encoder
133                .encode(FlightMessage::RecordBatch(value.batch.clone()))
134                .try_into()
135                .map_err(|_| {
136                    error::UnsupportedOperationSnafu {
137                        err_msg: "create BulkWalEntry from RecordBatch with dictionary arrays",
138                    }
139                    .build()
140                })?;
141            Ok(BulkWalEntry {
142                sequence: value.sequence,
143                max_ts: value.max_timestamp,
144                min_ts: value.min_timestamp,
145                timestamp_index: value.timestamp_index as u32,
146                body: Some(Body::ArrowIpc(ArrowIpc {
147                    schema: schema_bytes,
148                    data_header: rb_data.data_header,
149                    payload: rb_data.data_body,
150                })),
151            })
152        }
153    }
154}
155
156impl BulkPart {
157    pub(crate) fn estimated_size(&self) -> usize {
158        record_batch_estimated_size(&self.batch)
159    }
160
161    /// Returns the estimated series count in this BulkPart.
162    /// This is calculated from the dictionary values count of the PrimaryKeyArray.
163    pub fn estimated_series_count(&self) -> usize {
164        let pk_column_idx = primary_key_column_index(self.batch.num_columns());
165        let pk_column = self.batch.column(pk_column_idx);
166        if let Some(dict_array) = pk_column.as_any().downcast_ref::<PrimaryKeyArray>() {
167            dict_array.values().len()
168        } else {
169            0
170        }
171    }
172
173    /// Creates MemtableStats from this BulkPart.
174    pub fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
175        let ts_type = region_metadata.time_index_type();
176        let min_ts = ts_type.create_timestamp(self.min_timestamp);
177        let max_ts = ts_type.create_timestamp(self.max_timestamp);
178
179        MemtableStats {
180            estimated_bytes: self.estimated_size(),
181            time_range: Some((min_ts, max_ts)),
182            num_rows: self.num_rows(),
183            num_ranges: 1,
184            max_sequence: self.sequence,
185            series_count: self.estimated_series_count(),
186        }
187    }
188
189    /// Fills missing columns in the BulkPart batch with default values.
190    ///
191    /// This function checks if the batch schema matches the region metadata schema,
192    /// and if there are missing columns, it fills them with default values (or null
193    /// for nullable columns).
194    ///
195    /// # Arguments
196    ///
197    /// * `region_metadata` - The region metadata containing the expected schema
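    ///
    /// # Example
    ///
    /// A minimal sketch (marked `ignore`, so it is not compiled as a doctest); `part` and
    /// `metadata` are assumed to be a [BulkPart] and a [RegionMetadata] owned by the caller:
    ///
    /// ```ignore
    /// // Appends every column defined in `metadata` but absent from `part.batch`,
    /// // filling it with the column's default value.
    /// part.fill_missing_columns(&metadata)?;
    /// ```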
198    pub fn fill_missing_columns(&mut self, region_metadata: &RegionMetadata) -> Result<()> {
199        // Builds a map of existing columns in the batch
200        let batch_schema = self.batch.schema();
201        let batch_columns: HashSet<_> = batch_schema
202            .fields()
203            .iter()
204            .map(|f| f.name().as_str())
205            .collect();
206
207        // Finds columns that need to be filled
208        let mut columns_to_fill = Vec::new();
209        for column_meta in &region_metadata.column_metadatas {
210        // TODO(yingwen): Return an error if the default value is impure, once we support filling
211        // bulk insert requests in the frontend.
212            if !batch_columns.contains(column_meta.column_schema.name.as_str()) {
213                columns_to_fill.push(column_meta);
214            }
215        }
216
217        if columns_to_fill.is_empty() {
218            return Ok(());
219        }
220
221        let num_rows = self.batch.num_rows();
222
223        let mut new_columns = Vec::new();
224        let mut new_fields = Vec::new();
225
226        // First, adds all existing columns
227        new_fields.extend(batch_schema.fields().iter().cloned());
228        new_columns.extend_from_slice(self.batch.columns());
229
230        let region_id = region_metadata.region_id;
231        // Then adds the missing columns with default values
232        for column_meta in columns_to_fill {
233            let default_vector = column_meta
234                .column_schema
235                .create_default_vector(num_rows)
236                .context(CreateDefaultSnafu {
237                    region_id,
238                    column: &column_meta.column_schema.name,
239                })?
240                .with_context(|| InvalidRequestSnafu {
241                    region_id,
242                    reason: format!(
243                        "column {} does not have default value",
244                        column_meta.column_schema.name
245                    ),
246                })?;
247            let arrow_array = default_vector.to_arrow_array();
249
250            new_fields.push(Arc::new(Field::new(
251                column_meta.column_schema.name.clone(),
252                column_meta.column_schema.data_type.as_arrow_type(),
253                column_meta.column_schema.is_nullable(),
254            )));
255            new_columns.push(arrow_array);
256        }
257
258        // Create a new schema and batch with the filled columns
259        let new_schema = Arc::new(Schema::new(new_fields));
260        let new_batch =
261            RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)?;
262
263        // Update the batch
264        self.batch = new_batch;
265
266        Ok(())
267    }
268
269    /// Converts [BulkPart] to [Mutation] for the fallback `write_bulk` implementation.
270    pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
271        let vectors = region_metadata
272            .schema
273            .column_schemas()
274            .iter()
275            .map(|col| match self.batch.column_by_name(&col.name) {
276                None => Ok(None),
277                Some(col) => Helper::try_into_vector(col).map(Some),
278            })
279            .collect::<datatypes::error::Result<Vec<_>>>()
280            .context(error::ComputeVectorSnafu)?;
281
282        let rows = (0..self.num_rows())
283            .map(|row_idx| {
284                let values = (0..self.batch.num_columns())
285                    .map(|col_idx| {
286                        if let Some(v) = &vectors[col_idx] {
287                            to_grpc_value(v.get(row_idx))
288                        } else {
289                            api::v1::Value { value_data: None }
290                        }
291                    })
292                    .collect::<Vec<_>>();
293                api::v1::Row { values }
294            })
295            .collect::<Vec<_>>();
296
297        let schema = region_metadata
298            .column_metadatas
299            .iter()
300            .map(|c| {
301                let data_type_wrapper =
302                    ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
303                Ok(api::v1::ColumnSchema {
304                    column_name: c.column_schema.name.clone(),
305                    datatype: data_type_wrapper.datatype() as i32,
306                    semantic_type: c.semantic_type as i32,
307                    ..Default::default()
308                })
309            })
310            .collect::<api::error::Result<Vec<_>>>()
311            .context(error::ConvertColumnDataTypeSnafu {
312                reason: "failed to convert region metadata to column schema",
313            })?;
314
315        let rows = api::v1::Rows { schema, rows };
316
317        Ok(Mutation {
318            op_type: OpType::Put as i32,
319            sequence: self.sequence,
320            rows: Some(rows),
321            write_hint: None,
322        })
323    }
324
325    pub fn timestamps(&self) -> &ArrayRef {
326        self.batch.column(self.timestamp_index)
327    }
328
329    pub fn num_rows(&self) -> usize {
330        self.batch.num_rows()
331    }
332}
333
334/// A collection of small unordered bulk parts.
335/// Used to batch small parts together before merging them into a sorted part.
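///
/// Illustrative usage (a sketch, marked `ignore` so it is not compiled as a doctest); `small_part`
/// is assumed to be a [BulkPart] produced elsewhere:
///
/// ```ignore
/// let mut unordered = UnorderedPart::new();
/// if unordered.should_accept(small_part.num_rows()) {
///     unordered.push(small_part);
/// }
/// if unordered.should_compact() {
///     // Concatenates and sorts the buffered parts into a single BulkPart, then resets the buffer.
///     let merged = unordered.to_bulk_part()?;
///     unordered.clear();
///     // ... hand `merged` over to the memtable ...
/// }
/// ```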
336pub struct UnorderedPart {
337    /// Small bulk parts that haven't been sorted yet.
338    parts: Vec<BulkPart>,
339    /// Total number of rows across all parts.
340    total_rows: usize,
341    /// Minimum timestamp across all parts.
342    min_timestamp: i64,
343    /// Maximum timestamp across all parts.
344    max_timestamp: i64,
345    /// Maximum sequence number across all parts.
346    max_sequence: u64,
347    /// Row count threshold for accepting parts (default: 1024).
348    threshold: usize,
349    /// Row count threshold for compacting (default: 4096).
350    compact_threshold: usize,
351}
352
353impl Default for UnorderedPart {
354    fn default() -> Self {
355        Self::new()
356    }
357}
358
359impl UnorderedPart {
360    /// Creates a new empty UnorderedPart.
361    pub fn new() -> Self {
362        Self {
363            parts: Vec::new(),
364            total_rows: 0,
365            min_timestamp: i64::MAX,
366            max_timestamp: i64::MIN,
367            max_sequence: 0,
368            threshold: 1024,
369            compact_threshold: 4096,
370        }
371    }
372
373    /// Sets the threshold for accepting parts into unordered_part.
374    pub fn set_threshold(&mut self, threshold: usize) {
375        self.threshold = threshold;
376    }
377
378    /// Sets the threshold for compacting unordered_part.
379    pub fn set_compact_threshold(&mut self, compact_threshold: usize) {
380        self.compact_threshold = compact_threshold;
381    }
382
383    /// Returns the threshold for accepting parts.
384    pub fn threshold(&self) -> usize {
385        self.threshold
386    }
387
388    /// Returns the compact threshold.
389    pub fn compact_threshold(&self) -> usize {
390        self.compact_threshold
391    }
392
393    /// Returns true if this part should accept the given row count.
394    pub fn should_accept(&self, num_rows: usize) -> bool {
395        num_rows < self.threshold
396    }
397
398    /// Returns true if this part should be compacted.
399    pub fn should_compact(&self) -> bool {
400        self.total_rows >= self.compact_threshold
401    }
402
403    /// Adds a BulkPart to this unordered collection.
404    pub fn push(&mut self, part: BulkPart) {
405        self.total_rows += part.num_rows();
406        self.min_timestamp = self.min_timestamp.min(part.min_timestamp);
407        self.max_timestamp = self.max_timestamp.max(part.max_timestamp);
408        self.max_sequence = self.max_sequence.max(part.sequence);
409        self.parts.push(part);
410    }
411
412    /// Returns the total number of rows across all parts.
413    pub fn num_rows(&self) -> usize {
414        self.total_rows
415    }
416
417    /// Returns true if there are no parts.
418    pub fn is_empty(&self) -> bool {
419        self.parts.is_empty()
420    }
421
422    /// Returns the number of parts in this collection.
423    pub fn num_parts(&self) -> usize {
424        self.parts.len()
425    }
426
427    /// Concatenates and sorts all parts into a single RecordBatch.
428    /// Returns None if the collection is empty.
429    pub fn concat_and_sort(&self) -> Result<Option<RecordBatch>> {
430        if self.parts.is_empty() {
431            return Ok(None);
432        }
433
434        if self.parts.len() == 1 {
435            // If there's only one part, return its batch directly
436            return Ok(Some(self.parts[0].batch.clone()));
437        }
438
439        // Get the schema from the first part
440        let schema = self.parts[0].batch.schema();
441
442        // Concatenate all record batches
443        let batches: Vec<RecordBatch> = self.parts.iter().map(|p| p.batch.clone()).collect();
444        let concatenated =
445            arrow::compute::concat_batches(&schema, &batches).context(ComputeArrowSnafu)?;
446
447        // Sort the concatenated batch
448        let sorted_batch = sort_primary_key_record_batch(&concatenated)?;
449
450        Ok(Some(sorted_batch))
451    }
452
453    /// Converts all parts into a single sorted BulkPart.
454    /// Returns None if the collection is empty.
455    pub fn to_bulk_part(&self) -> Result<Option<BulkPart>> {
456        let Some(sorted_batch) = self.concat_and_sort()? else {
457            return Ok(None);
458        };
459
460        let timestamp_index = self.parts[0].timestamp_index;
461
462        Ok(Some(BulkPart {
463            batch: sorted_batch,
464            max_timestamp: self.max_timestamp,
465            min_timestamp: self.min_timestamp,
466            sequence: self.max_sequence,
467            timestamp_index,
468            raw_data: None,
469        }))
470    }
471
472    /// Clears all parts from this collection.
473    pub fn clear(&mut self) {
474        self.parts.clear();
475        self.total_rows = 0;
476        self.min_timestamp = i64::MAX;
477        self.max_timestamp = i64::MIN;
478        self.max_sequence = 0;
479    }
480}
481
482/// Estimates the size of a record batch more accurately, using the memory actually occupied by its array data.
483pub fn record_batch_estimated_size(batch: &RecordBatch) -> usize {
484    batch
485        .columns()
486        .iter()
487        // If can not get slice memory size, assume 0 here.
488        .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
489        .sum()
490}
491
492/// Builder for a single primary key column; string columns are built as dictionary arrays.
493enum PrimaryKeyColumnBuilder {
494    /// String dictionary builder for string types.
495    StringDict(StringDictionaryBuilder<UInt32Type>),
496    /// Generic mutable vector for other types.
497    Vector(Box<dyn MutableVector>),
498}
499
500impl PrimaryKeyColumnBuilder {
501    /// Appends a value to the builder.
502    fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
503        match self {
504            PrimaryKeyColumnBuilder::StringDict(builder) => {
505                if let Some(s) = value.try_into_string().context(DataTypeMismatchSnafu)? {
506                    // We know the value is a string.
507                    builder.append_value(s);
508                } else {
509                    builder.append_null();
510                }
511            }
512            PrimaryKeyColumnBuilder::Vector(builder) => {
513                builder.push_value_ref(&value);
514            }
515        }
516        Ok(())
517    }
518
519    /// Converts the builder to an ArrayRef.
520    fn into_arrow_array(self) -> ArrayRef {
521        match self {
522            PrimaryKeyColumnBuilder::StringDict(mut builder) => Arc::new(builder.finish()),
523            PrimaryKeyColumnBuilder::Vector(mut builder) => builder.to_vector().to_arrow_array(),
524        }
525    }
526}
527
528/// Converter that converts [KeyValues] into a [BulkPart].
529pub struct BulkPartConverter {
530    /// Region metadata.
531    region_metadata: RegionMetadataRef,
532    /// Schema of the converted batch.
533    schema: SchemaRef,
534    /// Primary key codec for encoding keys
535    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
536    /// Buffer for encoding primary key.
537    key_buf: Vec<u8>,
538    /// Primary key array builder.
539    key_array_builder: PrimaryKeyArrayBuilder,
540    /// Builders for non-primary key columns.
541    value_builder: ValueBuilder,
542    /// Builders for individual primary key columns.
543    /// The order of builders is the same as the order of primary key columns in the region metadata.
544    primary_key_column_builders: Vec<PrimaryKeyColumnBuilder>,
545
546    /// Max timestamp value.
547    max_ts: i64,
548    /// Min timestamp value.
549    min_ts: i64,
550    /// Max sequence number.
551    max_sequence: SequenceNumber,
552}
553
554impl BulkPartConverter {
555    /// Creates a new converter.
556    ///
557    /// If `store_primary_key_columns` is true and the primary key encoding is not sparse, the
558    /// converter additionally stores each primary key column as an individual array.
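    ///
    /// # Example
    ///
    /// A sketch of typical usage (marked `ignore`, so it is not compiled as a doctest); `metadata`
    /// is an assumed [RegionMetadataRef], `key_values_list` assumed input, and the schema/codec
    /// helpers are the ones used by this crate's tests:
    ///
    /// ```ignore
    /// let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
    /// let codec = build_primary_key_codec(&metadata);
    /// let mut converter = BulkPartConverter::new(&metadata, schema, 1024, codec, true);
    /// for key_values in &key_values_list {
    ///     converter.append_key_values(key_values)?;
    /// }
    /// // The resulting part is sorted by (primary key, timestamp, sequence desc).
    /// let part = converter.convert()?;
    /// ```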
559    pub fn new(
560        region_metadata: &RegionMetadataRef,
561        schema: SchemaRef,
562        capacity: usize,
563        primary_key_codec: Arc<dyn PrimaryKeyCodec>,
564        store_primary_key_columns: bool,
565    ) -> Self {
566        debug_assert_eq!(
567            region_metadata.primary_key_encoding,
568            primary_key_codec.encoding()
569        );
570
571        let primary_key_column_builders = if store_primary_key_columns
572            && region_metadata.primary_key_encoding != PrimaryKeyEncoding::Sparse
573        {
574            new_primary_key_column_builders(region_metadata, capacity)
575        } else {
576            Vec::new()
577        };
578
579        Self {
580            region_metadata: region_metadata.clone(),
581            schema,
582            primary_key_codec,
583            key_buf: Vec::new(),
584            key_array_builder: PrimaryKeyArrayBuilder::new(),
585            value_builder: ValueBuilder::new(region_metadata, capacity),
586            primary_key_column_builders,
587            min_ts: i64::MAX,
588            max_ts: i64::MIN,
589            max_sequence: SequenceNumber::MIN,
590        }
591    }
592
593    /// Appends a [KeyValues] into the converter.
594    pub fn append_key_values(&mut self, key_values: &KeyValues) -> Result<()> {
595        for kv in key_values.iter() {
596            self.append_key_value(&kv)?;
597        }
598
599        Ok(())
600    }
601
602    /// Appends a [KeyValue] to builders.
603    ///
604    /// If the primary key uses sparse encoding, callers must have already encoded the primary key in the [KeyValue].
605    fn append_key_value(&mut self, kv: &KeyValue) -> Result<()> {
606        // Handles primary key based on encoding type
607        if self.primary_key_codec.encoding() == PrimaryKeyEncoding::Sparse {
608            // For sparse encoding, the primary key is already encoded in the KeyValue
609            // Gets the first (and only) primary key value which contains the encoded key
610            let mut primary_keys = kv.primary_keys();
611            if let Some(encoded) = primary_keys
612                .next()
613                .context(ColumnNotFoundSnafu {
614                    column: PRIMARY_KEY_COLUMN_NAME,
615                })?
616                .try_into_binary()
617                .context(DataTypeMismatchSnafu)?
618            {
619                self.key_array_builder
620                    .append(encoded)
621                    .context(ComputeArrowSnafu)?;
622            } else {
623                self.key_array_builder
624                    .append("")
625                    .context(ComputeArrowSnafu)?;
626            }
627        } else {
628            // For dense encoding, we need to encode the primary key columns
629            self.key_buf.clear();
630            self.primary_key_codec
631                .encode_key_value(kv, &mut self.key_buf)
632                .context(EncodeSnafu)?;
633            self.key_array_builder
634                .append(&self.key_buf)
635                .context(ComputeArrowSnafu)?;
636        };
637
638        // If storing primary key columns, append values to individual builders
639        if !self.primary_key_column_builders.is_empty() {
640            for (builder, pk_value) in self
641                .primary_key_column_builders
642                .iter_mut()
643                .zip(kv.primary_keys())
644            {
645                builder.push_value_ref(pk_value)?;
646            }
647        }
648
649        // Pushes other columns.
650        self.value_builder.push(
651            kv.timestamp(),
652            kv.sequence(),
653            kv.op_type() as u8,
654            kv.fields(),
655        );
656
657        // Updates statistics
658        // Safety: timestamp of kv must be both present and a valid timestamp value.
659        let ts = kv
660            .timestamp()
661            .try_into_timestamp()
662            .unwrap()
663            .unwrap()
664            .value();
665        self.min_ts = self.min_ts.min(ts);
666        self.max_ts = self.max_ts.max(ts);
667        self.max_sequence = self.max_sequence.max(kv.sequence());
668
669        Ok(())
670    }
671
672    /// Converts buffered content into a [BulkPart].
673    ///
674    /// It sorts the record batch by (primary key, timestamp, sequence desc).
675    pub fn convert(mut self) -> Result<BulkPart> {
676        let values = Values::from(self.value_builder);
677        let mut columns =
678            Vec::with_capacity(4 + values.fields.len() + self.primary_key_column_builders.len());
679
680        // Build primary key column arrays if enabled.
681        for builder in self.primary_key_column_builders {
682            columns.push(builder.into_arrow_array());
683        }
684        // Then fields columns.
685        columns.extend(values.fields.iter().map(|field| field.to_arrow_array()));
686        // Time index.
687        let timestamp_index = columns.len();
688        columns.push(values.timestamp.to_arrow_array());
689        // Primary key.
690        let pk_array = self.key_array_builder.finish();
691        columns.push(Arc::new(pk_array));
692        // Sequence and op type.
693        columns.push(values.sequence.to_arrow_array());
694        columns.push(values.op_type.to_arrow_array());
695
696        let batch = RecordBatch::try_new(self.schema, columns).context(NewRecordBatchSnafu)?;
697        // Sorts the record batch.
698        let batch = sort_primary_key_record_batch(&batch)?;
699
700        Ok(BulkPart {
701            batch,
702            max_timestamp: self.max_ts,
703            min_timestamp: self.min_ts,
704            sequence: self.max_sequence,
705            timestamp_index,
706            raw_data: None,
707        })
708    }
709}
710
711fn new_primary_key_column_builders(
712    metadata: &RegionMetadata,
713    capacity: usize,
714) -> Vec<PrimaryKeyColumnBuilder> {
715    metadata
716        .primary_key_columns()
717        .map(|col| {
718            if col.column_schema.data_type.is_string() {
719                PrimaryKeyColumnBuilder::StringDict(StringDictionaryBuilder::with_capacity(
720                    capacity,
721                    INIT_DICT_VALUE_CAPACITY,
722                    capacity,
723                ))
724            } else {
725                PrimaryKeyColumnBuilder::Vector(
726                    col.column_schema.data_type.create_mutable_vector(capacity),
727                )
728            }
729        })
730        .collect()
731}
732
733/// Sorts a record batch in the primary key format by (primary key, time index, sequence desc); the batch's last four columns must be the time index, `__primary_key`, sequence and op type columns.
734pub fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
735    let total_columns = batch.num_columns();
736    let sort_columns = vec![
737        // Primary key column (ascending)
738        SortColumn {
739            values: batch.column(total_columns - 3).clone(),
740            options: Some(SortOptions {
741                descending: false,
742                nulls_first: true,
743            }),
744        },
745        // Time index column (ascending)
746        SortColumn {
747            values: batch.column(total_columns - 4).clone(),
748            options: Some(SortOptions {
749                descending: false,
750                nulls_first: true,
751            }),
752        },
753        // Sequence column (descending)
754        SortColumn {
755            values: batch.column(total_columns - 2).clone(),
756            options: Some(SortOptions {
757                descending: true,
758                nulls_first: true,
759            }),
760        },
761    ];
762
763    let indices = datatypes::arrow::compute::lexsort_to_indices(&sort_columns, None)
764        .context(ComputeArrowSnafu)?;
765
766    datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
767}
768
769/// Converts a `BulkPart` that is unordered and without encoded primary keys into a `BulkPart`
770/// with the same format as produced by [BulkPartConverter].
771///
772/// This function takes a `BulkPart` where:
773/// - For dense encoding: Primary key columns may be stored as individual columns
774/// - For sparse encoding: The `__primary_key` column should already be present with encoded keys
775/// - The batch may not be sorted
776///
777/// And produces a `BulkPart` where:
778/// - Primary key columns are optionally stored (depending on `store_primary_key_columns` and encoding)
779/// - An encoded `__primary_key` dictionary column is present
780/// - The batch is sorted by (primary_key, timestamp, sequence desc)
781///
782/// # Arguments
783///
784/// * `part` - The input `BulkPart` to convert
785/// * `region_metadata` - Region metadata containing schema information
786/// * `primary_key_codec` - Codec for encoding primary keys
787/// * `schema` - Target schema for the output batch
788/// * `store_primary_key_columns` - If true and encoding is not sparse, stores individual primary key columns
789///
790/// # Returns
791///
792/// Returns `None` if the input part has no rows, otherwise returns a new `BulkPart` with
793/// encoded primary keys and sorted data.
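///
/// # Example
///
/// A sketch of typical usage (marked `ignore`, so it is not compiled as a doctest); `part`,
/// `region_metadata` and `schema` are assumed to be provided by the caller:
///
/// ```ignore
/// let codec = build_primary_key_codec(&region_metadata);
/// if let Some(converted) = convert_bulk_part(part, &region_metadata, codec, schema, true)? {
///     // `converted` now carries an encoded `__primary_key` dictionary column and is sorted
///     // by (primary key, timestamp, sequence desc).
/// }
/// ```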
794pub fn convert_bulk_part(
795    part: BulkPart,
796    region_metadata: &RegionMetadataRef,
797    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
798    schema: SchemaRef,
799    store_primary_key_columns: bool,
800) -> Result<Option<BulkPart>> {
801    if part.num_rows() == 0 {
802        return Ok(None);
803    }
804
805    let num_rows = part.num_rows();
806    let is_sparse = region_metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse;
807
808    // Builds a column name-to-index map for efficient lookups
809    let input_schema = part.batch.schema();
810    let column_indices: HashMap<&str, usize> = input_schema
811        .fields()
812        .iter()
813        .enumerate()
814        .map(|(idx, field)| (field.name().as_str(), idx))
815        .collect();
816
817    // Determines the structure of the input batch by looking up columns by name
818    let mut output_columns = Vec::new();
819
820    // Extracts primary key columns if we need to encode them (dense encoding)
821    let pk_array = if is_sparse {
822        // For sparse encoding, the input should already have the __primary_key column
823        // We need to find it in the input batch
824        None
825    } else {
826        // For dense encoding, extract and encode primary key columns by name
827        let pk_vectors: Result<Vec<_>> = region_metadata
828            .primary_key_columns()
829            .map(|col_meta| {
830                let col_idx = column_indices
831                    .get(col_meta.column_schema.name.as_str())
832                    .context(ColumnNotFoundSnafu {
833                        column: &col_meta.column_schema.name,
834                    })?;
835                let col = part.batch.column(*col_idx);
836                Helper::try_into_vector(col).context(error::ComputeVectorSnafu)
837            })
838            .collect();
839        let pk_vectors = pk_vectors?;
840
841        let mut key_array_builder = PrimaryKeyArrayBuilder::new();
842        let mut encode_buf = Vec::new();
843
844        for row_idx in 0..num_rows {
845            encode_buf.clear();
846
847            // Collects primary key values with column IDs for this row
848            let pk_values_with_ids: Vec<_> = region_metadata
849                .primary_key
850                .iter()
851                .zip(pk_vectors.iter())
852                .map(|(col_id, vector)| (*col_id, vector.get_ref(row_idx)))
853                .collect();
854
855            // Encodes the primary key
856            primary_key_codec
857                .encode_value_refs(&pk_values_with_ids, &mut encode_buf)
858                .context(EncodeSnafu)?;
859
860            key_array_builder
861                .append(&encode_buf)
862                .context(ComputeArrowSnafu)?;
863        }
864
865        Some(key_array_builder.finish())
866    };
867
868    // Adds primary key columns if storing them (only for dense encoding)
869    if store_primary_key_columns && !is_sparse {
870        for col_meta in region_metadata.primary_key_columns() {
871            let col_idx = column_indices
872                .get(col_meta.column_schema.name.as_str())
873                .context(ColumnNotFoundSnafu {
874                    column: &col_meta.column_schema.name,
875                })?;
876            let col = part.batch.column(*col_idx);
877
878            // Converts to dictionary if needed for string types
879            let col = if col_meta.column_schema.data_type.is_string() {
880                let target_type = ArrowDataType::Dictionary(
881                    Box::new(ArrowDataType::UInt32),
882                    Box::new(ArrowDataType::Utf8),
883                );
884                arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
885            } else {
886                col.clone()
887            };
888            output_columns.push(col);
889        }
890    }
891
892    // Adds field columns
893    for col_meta in region_metadata.field_columns() {
894        let col_idx = column_indices
895            .get(col_meta.column_schema.name.as_str())
896            .context(ColumnNotFoundSnafu {
897                column: &col_meta.column_schema.name,
898            })?;
899        output_columns.push(part.batch.column(*col_idx).clone());
900    }
901
902    // Adds timestamp column
903    let new_timestamp_index = output_columns.len();
904    let ts_col_idx = column_indices
905        .get(
906            region_metadata
907                .time_index_column()
908                .column_schema
909                .name
910                .as_str(),
911        )
912        .context(ColumnNotFoundSnafu {
913            column: &region_metadata.time_index_column().column_schema.name,
914        })?;
915    output_columns.push(part.batch.column(*ts_col_idx).clone());
916
917    // Adds encoded primary key dictionary column
918    let pk_dictionary = if let Some(pk_dict_array) = pk_array {
919        Arc::new(pk_dict_array) as ArrayRef
920    } else {
921        let pk_col_idx =
922            column_indices
923                .get(PRIMARY_KEY_COLUMN_NAME)
924                .context(ColumnNotFoundSnafu {
925                    column: PRIMARY_KEY_COLUMN_NAME,
926                })?;
927        let col = part.batch.column(*pk_col_idx);
928
929        // Casts to dictionary type if needed
930        let target_type = ArrowDataType::Dictionary(
931            Box::new(ArrowDataType::UInt32),
932            Box::new(ArrowDataType::Binary),
933        );
934        arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
935    };
936    output_columns.push(pk_dictionary);
937
938    let sequence_array = UInt64Array::from(vec![part.sequence; num_rows]);
939    output_columns.push(Arc::new(sequence_array) as ArrayRef);
940
941    let op_type_array = UInt8Array::from(vec![OpType::Put as u8; num_rows]);
942    output_columns.push(Arc::new(op_type_array) as ArrayRef);
943
944    let batch = RecordBatch::try_new(schema, output_columns).context(NewRecordBatchSnafu)?;
945
946    // Sorts the batch by (primary_key, timestamp, sequence desc)
947    let sorted_batch = sort_primary_key_record_batch(&batch)?;
948
949    Ok(Some(BulkPart {
950        batch: sorted_batch,
951        max_timestamp: part.max_timestamp,
952        min_timestamp: part.min_timestamp,
953        sequence: part.sequence,
954        timestamp_index: new_timestamp_index,
955        raw_data: None,
956    }))
957}
958
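/// A bulk part encoded as Parquet bytes, together with the metadata needed to read it back.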
959#[derive(Debug, Clone)]
960pub struct EncodedBulkPart {
961    data: Bytes,
962    metadata: BulkPartMeta,
963}
964
965impl EncodedBulkPart {
966    pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
967        Self { data, metadata }
968    }
969
970    pub(crate) fn metadata(&self) -> &BulkPartMeta {
971        &self.metadata
972    }
973
974    /// Returns the size of the encoded data in bytes
975    pub(crate) fn size_bytes(&self) -> usize {
976        self.data.len()
977    }
978
979    /// Returns the encoded data.
980    pub(crate) fn data(&self) -> &Bytes {
981        &self.data
982    }
983
984    /// Creates MemtableStats from this EncodedBulkPart.
985    pub fn to_memtable_stats(&self) -> MemtableStats {
986        let meta = &self.metadata;
987        let ts_type = meta.region_metadata.time_index_type();
988        let min_ts = ts_type.create_timestamp(meta.min_timestamp);
989        let max_ts = ts_type.create_timestamp(meta.max_timestamp);
990
991        MemtableStats {
992            estimated_bytes: self.size_bytes(),
993            time_range: Some((min_ts, max_ts)),
994            num_rows: meta.num_rows,
995            num_ranges: 1,
996            max_sequence: meta.max_sequence,
997            series_count: meta.num_series as usize,
998        }
999    }
1000
1001    /// Converts this `EncodedBulkPart` to `SstInfo`.
1002    ///
1003    /// # Arguments
1004    /// * `file_id` - The SST file ID to assign to this part
1005    ///
1006    /// # Returns
1007    /// Returns a `SstInfo` instance with information derived from this bulk part's metadata
1008    pub(crate) fn to_sst_info(&self, file_id: FileId) -> SstInfo {
1009        let unit = self.metadata.region_metadata.time_index_type().unit();
1010        let max_row_group_uncompressed_size: u64 = self
1011            .metadata
1012            .parquet_metadata
1013            .row_groups()
1014            .iter()
1015            .map(|rg| {
1016                rg.columns()
1017                    .iter()
1018                    .map(|c| c.uncompressed_size() as u64)
1019                    .sum::<u64>()
1020            })
1021            .max()
1022            .unwrap_or(0);
1023        SstInfo {
1024            file_id,
1025            time_range: (
1026                Timestamp::new(self.metadata.min_timestamp, unit),
1027                Timestamp::new(self.metadata.max_timestamp, unit),
1028            ),
1029            file_size: self.data.len() as u64,
1030            max_row_group_uncompressed_size,
1031            num_rows: self.metadata.num_rows,
1032            num_row_groups: self.metadata.parquet_metadata.num_row_groups() as u64,
1033            file_metadata: Some(self.metadata.parquet_metadata.clone()),
1034            index_metadata: IndexOutput::default(),
1035            num_series: self.metadata.num_series,
1036        }
1037    }
1038
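    /// Reads this part as a record batch iterator, pruning row groups with the predicate in the
    /// given context. Returns `None` if all row groups are filtered out.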
1039    pub(crate) fn read(
1040        &self,
1041        context: BulkIterContextRef,
1042        sequence: Option<SequenceRange>,
1043        mem_scan_metrics: Option<MemScanMetrics>,
1044    ) -> Result<Option<BoxedRecordBatchIterator>> {
1045        // Compute skip_fields for row group pruning using the same approach as compute_skip_fields in reader.rs.
1046        let skip_fields_for_pruning =
1047            Self::compute_skip_fields(context.pre_filter_mode(), &self.metadata.parquet_metadata);
1048
1049        // Uses the predicate to find row groups to read.
1050        let row_groups_to_read =
1051            context.row_groups_to_read(&self.metadata.parquet_metadata, skip_fields_for_pruning);
1052
1053        if row_groups_to_read.is_empty() {
1054            // All row groups are filtered.
1055            return Ok(None);
1056        }
1057
1058        let iter = EncodedBulkPartIter::try_new(
1059            self,
1060            context,
1061            row_groups_to_read,
1062            sequence,
1063            mem_scan_metrics,
1064        )?;
1065        Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
1066    }
1067
1068    /// Computes whether to skip field columns based on PreFilterMode.
1069    fn compute_skip_fields(pre_filter_mode: PreFilterMode, parquet_meta: &ParquetMetaData) -> bool {
1070        match pre_filter_mode {
1071            PreFilterMode::All => false,
1072            PreFilterMode::SkipFields => true,
1073            PreFilterMode::SkipFieldsOnDelete => {
1074                // Check if any row group contains delete op
1075                (0..parquet_meta.num_row_groups()).any(|rg_idx| {
1076                    row_group_contains_delete(parquet_meta, rg_idx, "memtable").unwrap_or(true)
1077                })
1078            }
1079        }
1080    }
1081}
1082
1083// TODO(yingwen): max_sequence
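/// Metadata of an [EncodedBulkPart].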
1084#[derive(Debug, Clone)]
1085pub struct BulkPartMeta {
1086    /// Total rows in part.
1087    pub num_rows: usize,
1088    /// Max timestamp in part.
1089    pub max_timestamp: i64,
1090    /// Min timestamp in part.
1091    pub min_timestamp: i64,
1092    /// Part file metadata.
1093    pub parquet_metadata: Arc<ParquetMetaData>,
1094    /// Part region schema.
1095    pub region_metadata: RegionMetadataRef,
1096    /// Number of series.
1097    pub num_series: u64,
1098    /// Maximum sequence number in part.
1099    pub max_sequence: u64,
1100}
1101
1102/// Metrics for encoding a part.
1103#[derive(Default, Debug)]
1104pub struct BulkPartEncodeMetrics {
1105    /// Cost of iterating over the data.
1106    pub iter_cost: Duration,
1107    /// Cost of writing the data.
1108    pub write_cost: Duration,
1109    /// Size of data before encoding.
1110    pub raw_size: usize,
1111    /// Size of data after encoding.
1112    pub encoded_size: usize,
1113    /// Number of rows in part.
1114    pub num_rows: usize,
1115}
1116
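/// Encoder that writes a bulk part into Parquet bytes as an [EncodedBulkPart].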
1117pub struct BulkPartEncoder {
1118    metadata: RegionMetadataRef,
1119    row_group_size: usize,
1120    writer_props: Option<WriterProperties>,
1121}
1122
1123impl BulkPartEncoder {
1124    pub(crate) fn new(
1125        metadata: RegionMetadataRef,
1126        row_group_size: usize,
1127    ) -> Result<BulkPartEncoder> {
1128        // TODO(yingwen): Skip arrow schema if needed.
1129        let json = metadata.to_json().context(InvalidMetadataSnafu)?;
1130        let key_value_meta =
1131            parquet::file::metadata::KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
1132
1133        // TODO(yingwen): Do we need compression?
1134        let writer_props = Some(
1135            WriterProperties::builder()
1136                .set_key_value_metadata(Some(vec![key_value_meta]))
1137                .set_write_batch_size(row_group_size)
1138                .set_max_row_group_size(row_group_size)
1139                .set_compression(Compression::ZSTD(ZstdLevel::default()))
1140                .set_column_index_truncate_length(None)
1141                .set_statistics_truncate_length(None)
1142                .build(),
1143        );
1144
1145        Ok(Self {
1146            metadata,
1147            row_group_size,
1148            writer_props,
1149        })
1150    }
1151}
1152
1153impl BulkPartEncoder {
1154    /// Encodes [BoxedRecordBatchIterator] into [EncodedBulkPart] with min/max timestamps.
1155    pub fn encode_record_batch_iter(
1156        &self,
1157        iter: BoxedRecordBatchIterator,
1158        arrow_schema: SchemaRef,
1159        min_timestamp: i64,
1160        max_timestamp: i64,
1161        max_sequence: u64,
1162        metrics: &mut BulkPartEncodeMetrics,
1163    ) -> Result<Option<EncodedBulkPart>> {
1164        let mut buf = Vec::with_capacity(4096);
1165        let mut writer = ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
1166            .context(EncodeMemtableSnafu)?;
1167        let mut total_rows = 0;
1168        let mut series_estimator = SeriesEstimator::default();
1169
1170        // Process each batch from the iterator
1171        let mut iter_start = Instant::now();
1172        for batch_result in iter {
1173            metrics.iter_cost += iter_start.elapsed();
1174            let batch = batch_result?;
1175            if batch.num_rows() == 0 {
1176                continue;
1177            }
1178
1179            series_estimator.update_flat(&batch);
1180            metrics.raw_size += record_batch_estimated_size(&batch);
1181            let write_start = Instant::now();
1182            writer.write(&batch).context(EncodeMemtableSnafu)?;
1183            metrics.write_cost += write_start.elapsed();
1184            total_rows += batch.num_rows();
1185            iter_start = Instant::now();
1186        }
1187        metrics.iter_cost += iter_start.elapsed();
1189
1190        if total_rows == 0 {
1191            return Ok(None);
1192        }
1193
1194        let close_start = Instant::now();
1195        let file_metadata = writer.close().context(EncodeMemtableSnafu)?;
1196        metrics.write_cost += close_start.elapsed();
1197        metrics.encoded_size += buf.len();
1198        metrics.num_rows += total_rows;
1199
1200        let buf = Bytes::from(buf);
1201        let parquet_metadata = Arc::new(file_metadata);
1202        let num_series = series_estimator.finish();
1203
1204        Ok(Some(EncodedBulkPart {
1205            data: buf,
1206            metadata: BulkPartMeta {
1207                num_rows: total_rows,
1208                max_timestamp,
1209                min_timestamp,
1210                parquet_metadata,
1211                region_metadata: self.metadata.clone(),
1212                num_series,
1213                max_sequence,
1214            },
1215        }))
1216    }
1217
1218    /// Encodes a bulk part into an [EncodedBulkPart]; returns `None` if the part is empty.
1219    fn encode_part(&self, part: &BulkPart) -> Result<Option<EncodedBulkPart>> {
1220        if part.batch.num_rows() == 0 {
1221            return Ok(None);
1222        }
1223
1224        let mut buf = Vec::with_capacity(4096);
1225        let arrow_schema = part.batch.schema();
1226
1227        let file_metadata = {
1228            let mut writer =
1229                ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
1230                    .context(EncodeMemtableSnafu)?;
1231            writer.write(&part.batch).context(EncodeMemtableSnafu)?;
1232            writer.finish().context(EncodeMemtableSnafu)?
1233        };
1234
1235        let buf = Bytes::from(buf);
1236        let parquet_metadata = Arc::new(file_metadata);
1237
1238        Ok(Some(EncodedBulkPart {
1239            data: buf,
1240            metadata: BulkPartMeta {
1241                num_rows: part.batch.num_rows(),
1242                max_timestamp: part.max_timestamp,
1243                min_timestamp: part.min_timestamp,
1244                parquet_metadata,
1245                region_metadata: self.metadata.clone(),
1246                num_series: part.estimated_series_count() as u64,
1247                max_sequence: part.sequence,
1248            },
1249        }))
1250    }
1251}
1252
1253/// A collection of ordered RecordBatches representing a bulk part without parquet encoding.
1254///
1255/// Similar to `EncodedBulkPart` but stores raw RecordBatches instead of encoded parquet data.
1256/// The RecordBatches must be ordered by (primary key, timestamp, sequence desc).
1257/// Uses SmallVec to optimize for the common case of few batches while avoiding heap allocation.
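///
/// Illustrative usage (a sketch, marked `ignore` so it is not compiled as a doctest); `part` and
/// `region_metadata` are assumed to be available:
///
/// ```ignore
/// let multi = MultiBulkPart::from_bulk_part(part);
/// assert_eq!(1, multi.num_batches());
/// let stats = multi.to_memtable_stats(&region_metadata);
/// ```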
1258#[derive(Debug, Clone)]
1259pub struct MultiBulkPart {
1260    /// Ordered record batches. SmallVec optimized for up to 4 batches inline.
1261    batches: SmallVec<[RecordBatch; 4]>,
1262    /// Total rows across all batches.
1263    total_rows: usize,
1264    /// Max timestamp in part.
1265    max_timestamp: i64,
1266    /// Min timestamp in part.
1267    min_timestamp: i64,
1268    /// Max sequence number in part.
1269    max_sequence: SequenceNumber,
1270    /// Number of series.
1271    series_count: usize,
1272}
1273
1274impl MultiBulkPart {
1275    /// Creates a new MultiBulkPart from a single BulkPart.
1276    pub fn from_bulk_part(part: BulkPart) -> Self {
1277        let num_rows = part.num_rows();
1278        let series_count = part.estimated_series_count();
1279        let mut batches = SmallVec::new();
1280        batches.push(part.batch);
1281
1282        Self {
1283            batches,
1284            total_rows: num_rows,
1285            max_timestamp: part.max_timestamp,
1286            min_timestamp: part.min_timestamp,
1287            max_sequence: part.sequence,
1288            series_count,
1289        }
1290    }
1291
1292    /// Creates a new MultiBulkPart from multiple ordered RecordBatches.
1293    ///
1294    /// # Arguments
1295    /// * `batches` - Ordered record batches
1296    /// * `min_timestamp` - Minimum timestamp across all batches
1297    /// * `max_timestamp` - Maximum timestamp across all batches
1298    /// * `max_sequence` - Maximum sequence number across all batches
1299    /// * `series_count` - Number of series in the batches
1300    ///
1301    /// # Panics
1302    /// Panics if batches is empty.
1303    pub fn new(
1304        batches: Vec<RecordBatch>,
1305        min_timestamp: i64,
1306        max_timestamp: i64,
1307        max_sequence: SequenceNumber,
1308        series_count: usize,
1309    ) -> Self {
1310        assert!(!batches.is_empty(), "batches must not be empty");
1311
1312        let total_rows = batches.iter().map(|b| b.num_rows()).sum();
1313
1314        Self {
1315            batches: SmallVec::from_vec(batches),
1316            total_rows,
1317            max_timestamp,
1318            min_timestamp,
1319            max_sequence,
1320            series_count,
1321        }
1322    }
1323
1324    /// Returns the total number of rows across all batches.
1325    pub fn num_rows(&self) -> usize {
1326        self.total_rows
1327    }
1328
1329    /// Returns the minimum timestamp.
1330    pub fn min_timestamp(&self) -> i64 {
1331        self.min_timestamp
1332    }
1333
1334    /// Returns the maximum timestamp.
1335    pub fn max_timestamp(&self) -> i64 {
1336        self.max_timestamp
1337    }
1338
1339    /// Returns the maximum sequence number.
1340    pub fn max_sequence(&self) -> SequenceNumber {
1341        self.max_sequence
1342    }
1343
1344    /// Returns the number of series.
1345    pub fn series_count(&self) -> usize {
1346        self.series_count
1347    }
1348
1349    /// Returns the number of record batches in this part.
1350    pub fn num_batches(&self) -> usize {
1351        self.batches.len()
1352    }
1353
1354    /// Returns an iterator over the record batches.
1355    pub(crate) fn batches(&self) -> impl Iterator<Item = &RecordBatch> {
1356        self.batches.iter()
1357    }
1358
1359    /// Returns the estimated memory size of all batches.
1360    pub(crate) fn estimated_size(&self) -> usize {
1361        self.batches.iter().map(record_batch_estimated_size).sum()
1362    }
1363
1364    /// Reads data from this part with the given context and filters.
1365    pub(crate) fn read(
1366        &self,
1367        context: BulkIterContextRef,
1368        sequence: Option<SequenceRange>,
1369        mem_scan_metrics: Option<MemScanMetrics>,
1370    ) -> Result<Option<BoxedRecordBatchIterator>> {
1371        if self.batches.is_empty() {
1372            return Ok(None);
1373        }
1374
1375        let iter = crate::memtable::bulk::part_reader::BulkPartBatchIter::new(
1376            self.batches.iter().cloned().collect(),
1377            context,
1378            sequence,
1379            self.series_count,
1380            mem_scan_metrics,
1381        );
1382        Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
1383    }
1384
1385    /// Converts this `MultiBulkPart` to `MemtableStats`.
1386    pub fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
1387        let ts_type = region_metadata.time_index_type();
1388        let min_ts = ts_type.create_timestamp(self.min_timestamp);
1389        let max_ts = ts_type.create_timestamp(self.max_timestamp);
1390
1391        MemtableStats {
1392            estimated_bytes: self.estimated_size(),
1393            time_range: Some((min_ts, max_ts)),
1394            num_rows: self.num_rows(),
1395            num_ranges: 1,
1396            max_sequence: self.max_sequence,
1397            series_count: self.series_count,
1398        }
1399    }
1400}
1401
1402#[cfg(test)]
1403mod tests {
1404    use api::v1::{Row, SemanticType, WriteHint};
1405    use datafusion_common::ScalarValue;
1406    use datatypes::arrow::array::Float64Array;
1407    use datatypes::prelude::{ConcreteDataType, Value};
1408    use datatypes::schema::ColumnSchema;
1409    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1410    use store_api::storage::RegionId;
1411    use store_api::storage::consts::ReservedColumnId;
1412
1413    use super::*;
1414    use crate::memtable::bulk::context::BulkIterContext;
1415    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1416    use crate::test_util::memtable_util::{
1417        build_key_values_with_ts_seq_values, metadata_for_test, region_metadata_to_row_schema,
1418    };
1419
1420    struct MutationInput<'a> {
1421        k0: &'a str,
1422        k1: u32,
1423        timestamps: &'a [i64],
1424        v1: &'a [Option<f64>],
1425        sequence: u64,
1426    }
1427
1428    #[derive(Debug, PartialOrd, PartialEq)]
1429    struct BatchOutput<'a> {
1430        pk_values: &'a [Value],
1431        timestamps: &'a [i64],
1432        v1: &'a [Option<f64>],
1433    }
1434
1435    fn encode(input: &[MutationInput]) -> EncodedBulkPart {
1436        let metadata = metadata_for_test();
1437        let kvs = input
1438            .iter()
1439            .map(|m| {
1440                build_key_values_with_ts_seq_values(
1441                    &metadata,
1442                    m.k0.to_string(),
1443                    m.k1,
1444                    m.timestamps.iter().copied(),
1445                    m.v1.iter().copied(),
1446                    m.sequence,
1447                )
1448            })
1449            .collect::<Vec<_>>();
1450        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1451        let primary_key_codec = build_primary_key_codec(&metadata);
1452        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1453        for kv in kvs {
1454            converter.append_key_values(&kv).unwrap();
1455        }
1456        let part = converter.convert().unwrap();
1457        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1458        encoder.encode_part(&part).unwrap().unwrap()
1459    }
1460
1461    #[test]
1462    fn test_write_and_read_part_projection() {
1463        let part = encode(&[
1464            MutationInput {
1465                k0: "a",
1466                k1: 0,
1467                timestamps: &[1],
1468                v1: &[Some(0.1)],
1469                sequence: 0,
1470            },
1471            MutationInput {
1472                k0: "b",
1473                k1: 0,
1474                timestamps: &[1],
1475                v1: &[Some(0.0)],
1476                sequence: 0,
1477            },
1478            MutationInput {
1479                k0: "a",
1480                k1: 0,
1481                timestamps: &[2],
1482                v1: &[Some(0.2)],
1483                sequence: 1,
1484            },
1485        ]);
1486
1487        let projection = &[4u32];
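        // The projection selects the f64 field `v1` (column 4 in the test metadata); the
        // remaining four columns in each returned batch are the time index and the internal
        // `__primary_key`, `__sequence` and `__op_type` columns, hence 5 columns below.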
        let reader = part
1489            .read(
1490                Arc::new(
1491                    BulkIterContext::new(
1492                        part.metadata.region_metadata.clone(),
1493                        Some(projection.as_slice()),
1494                        None,
1495                        false,
1496                    )
1497                    .unwrap(),
1498                ),
1499                None,
1500                None,
1501            )
1502            .unwrap()
1503            .expect("expect at least one row group");
1504
1505        let mut total_rows_read = 0;
1506        let mut field: Vec<f64> = vec![];
1507        for res in reader {
1508            let batch = res.unwrap();
1509            assert_eq!(5, batch.num_columns());
1510            field.extend_from_slice(
1511                batch
1512                    .column(0)
1513                    .as_any()
1514                    .downcast_ref::<Float64Array>()
1515                    .unwrap()
1516                    .values(),
1517            );
1518            total_rows_read += batch.num_rows();
1519        }
1520        assert_eq!(3, total_rows_read);
1521        assert_eq!(vec![0.1, 0.2, 0.0], field);
1522    }
1523
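    /// Builds an `EncodedBulkPart` from `(k0, k1, (start_ts, end_ts), sequence)` tuples; each
    /// tuple yields one row per timestamp in `start_ts..end_ts` with `v1` left unset.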
1524    fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
1525        let metadata = metadata_for_test();
1526        let kvs = key_values
1527            .into_iter()
1528            .map(|(k0, k1, (start, end), sequence)| {
                let ts = start..end;
1530                let v1 = (start..end).map(|_| None);
1531                build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
1532            })
1533            .collect::<Vec<_>>();
1534        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1535        let primary_key_codec = build_primary_key_codec(&metadata);
1536        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1537        for kv in kvs {
1538            converter.append_key_values(&kv).unwrap();
1539        }
1540        let part = converter.convert().unwrap();
1541        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1542        encoder.encode_part(&part).unwrap().unwrap()
1543    }
1544
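    /// Reads `part` with an optional predicate and asserts how many rows survive row group
    /// pruning and precise filtering.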
1545    fn check_prune_row_group(
1546        part: &EncodedBulkPart,
1547        predicate: Option<Predicate>,
1548        expected_rows: usize,
1549    ) {
1550        let context = Arc::new(
1551            BulkIterContext::new(
1552                part.metadata.region_metadata.clone(),
1553                None,
1554                predicate,
1555                false,
1556            )
1557            .unwrap(),
1558        );
        let reader = part
1560            .read(context, None, None)
1561            .unwrap()
1562            .expect("expect at least one row group");
1563        let mut total_rows_read = 0;
1564        for res in reader {
1565            let batch = res.unwrap();
1566            total_rows_read += batch.num_rows();
1567        }
        // Verify that exactly the expected number of rows survives pruning and filtering.
1569        assert_eq!(expected_rows, total_rows_read);
1570    }
1571
1572    #[test]
1573    fn test_prune_row_groups() {
1574        let part = prepare(vec![
1575            ("a", 0, (0, 40), 1),
1576            ("a", 1, (0, 60), 1),
1577            ("b", 0, (0, 100), 2),
1578            ("b", 1, (100, 180), 3),
1579            ("b", 1, (180, 210), 4),
1580        ]);
1581
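        // `ts == 300` lies outside the written range [0, 210), so every row group is pruned
        // and `read` returns `None`.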
1582        let context = Arc::new(
1583            BulkIterContext::new(
1584                part.metadata.region_metadata.clone(),
1585                None,
1586                Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
1587                    datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
1588                )])),
1589                false,
1590            )
1591            .unwrap(),
1592        );
1593        assert!(part.read(context, None, None).unwrap().is_none());
1594
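        // Without a predicate all rows are read: 40 + 60 + 100 + 80 + 30 = 310.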
1595        check_prune_row_group(&part, None, 310);
1596
1597        check_prune_row_group(
1598            &part,
1599            Some(Predicate::new(vec![
1600                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1601                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
1602            ])),
1603            40,
1604        );
1605
1606        check_prune_row_group(
1607            &part,
1608            Some(Predicate::new(vec![
1609                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1610                datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
1611            ])),
1612            60,
1613        );
1614
1615        check_prune_row_group(
1616            &part,
1617            Some(Predicate::new(vec![
1618                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1619            ])),
1620            100,
1621        );
1622
1623        check_prune_row_group(
1624            &part,
1625            Some(Predicate::new(vec![
1626                datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
1627                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
1628            ])),
1629            100,
1630        );
1631
        // Predicates on a field column are applied precisely: `v0` mirrors the row timestamp
        // in these fixtures, so `v0 == 150` matches exactly one row.
1633        check_prune_row_group(
1634            &part,
1635            Some(Predicate::new(vec![
1636                datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64)),
1637            ])),
1638            1,
1639        );
1640    }
1641
1642    #[test]
1643    fn test_bulk_part_converter_append_and_convert() {
1644        let metadata = metadata_for_test();
1645        let capacity = 100;
1646        let primary_key_codec = build_primary_key_codec(&metadata);
1647        let schema = to_flat_sst_arrow_schema(
1648            &metadata,
1649            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1650        );
1651
1652        let mut converter =
1653            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1654
1655        let key_values1 = build_key_values_with_ts_seq_values(
1656            &metadata,
1657            "key1".to_string(),
1658            1u32,
1659            vec![1000, 2000].into_iter(),
1660            vec![Some(1.0), Some(2.0)].into_iter(),
1661            1,
1662        );
1663
1664        let key_values2 = build_key_values_with_ts_seq_values(
1665            &metadata,
1666            "key2".to_string(),
1667            2u32,
1668            vec![1500].into_iter(),
1669            vec![Some(3.0)].into_iter(),
1670            2,
1671        );
1672
1673        converter.append_key_values(&key_values1).unwrap();
1674        converter.append_key_values(&key_values2).unwrap();
1675
1676        let bulk_part = converter.convert().unwrap();
1677
1678        assert_eq!(bulk_part.num_rows(), 3);
1679        assert_eq!(bulk_part.min_timestamp, 1000);
1680        assert_eq!(bulk_part.max_timestamp, 2000);
1681        assert_eq!(bulk_part.sequence, 2);
1682        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
1683
1684        // Validate primary key columns are stored
1685        // Schema should include primary key columns k0 and k1 at the beginning
1686        let schema = bulk_part.batch.schema();
1687        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1688        assert_eq!(
1689            field_names,
1690            vec![
1691                "k0",
1692                "k1",
1693                "v0",
1694                "v1",
1695                "ts",
1696                "__primary_key",
1697                "__sequence",
1698                "__op_type"
1699            ]
1700        );
1701    }
1702
1703    #[test]
1704    fn test_bulk_part_converter_sorting() {
1705        let metadata = metadata_for_test();
1706        let capacity = 100;
1707        let primary_key_codec = build_primary_key_codec(&metadata);
1708        let schema = to_flat_sst_arrow_schema(
1709            &metadata,
1710            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1711        );
1712
1713        let mut converter =
1714            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1715
1716        let key_values1 = build_key_values_with_ts_seq_values(
1717            &metadata,
1718            "z_key".to_string(),
1719            3u32,
1720            vec![3000].into_iter(),
1721            vec![Some(3.0)].into_iter(),
1722            3,
1723        );
1724
1725        let key_values2 = build_key_values_with_ts_seq_values(
1726            &metadata,
1727            "a_key".to_string(),
1728            1u32,
1729            vec![1000].into_iter(),
1730            vec![Some(1.0)].into_iter(),
1731            1,
1732        );
1733
1734        let key_values3 = build_key_values_with_ts_seq_values(
1735            &metadata,
1736            "m_key".to_string(),
1737            2u32,
1738            vec![2000].into_iter(),
1739            vec![Some(2.0)].into_iter(),
1740            2,
1741        );
1742
1743        converter.append_key_values(&key_values1).unwrap();
1744        converter.append_key_values(&key_values2).unwrap();
1745        converter.append_key_values(&key_values3).unwrap();
1746
1747        let bulk_part = converter.convert().unwrap();
1748
1749        assert_eq!(bulk_part.num_rows(), 3);
1750
1751        let ts_column = bulk_part.batch.column(bulk_part.timestamp_index);
1752        let seq_column = bulk_part.batch.column(bulk_part.batch.num_columns() - 2);
1753
1754        let ts_array = ts_column
1755            .as_any()
1756            .downcast_ref::<TimestampMillisecondArray>()
1757            .unwrap();
1758        let seq_array = seq_column.as_any().downcast_ref::<UInt64Array>().unwrap();
1759
1760        assert_eq!(ts_array.values(), &[1000, 2000, 3000]);
1761        assert_eq!(seq_array.values(), &[1, 2, 3]);
1762
1763        // Validate primary key columns are stored
1764        let schema = bulk_part.batch.schema();
1765        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1766        assert_eq!(
1767            field_names,
1768            vec![
1769                "k0",
1770                "k1",
1771                "v0",
1772                "v1",
1773                "ts",
1774                "__primary_key",
1775                "__sequence",
1776                "__op_type"
1777            ]
1778        );
1779    }
1780
1781    #[test]
1782    fn test_bulk_part_converter_empty() {
1783        let metadata = metadata_for_test();
1784        let capacity = 10;
1785        let primary_key_codec = build_primary_key_codec(&metadata);
1786        let schema = to_flat_sst_arrow_schema(
1787            &metadata,
1788            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1789        );
1790
1791        let converter =
1792            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1793
1794        let bulk_part = converter.convert().unwrap();
1795
1796        assert_eq!(bulk_part.num_rows(), 0);
1797        assert_eq!(bulk_part.min_timestamp, i64::MAX);
1798        assert_eq!(bulk_part.max_timestamp, i64::MIN);
1799        assert_eq!(bulk_part.sequence, SequenceNumber::MIN);
1800
1801        // Validate primary key columns are present in schema even for empty batch
1802        let schema = bulk_part.batch.schema();
1803        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1804        assert_eq!(
1805            field_names,
1806            vec![
1807                "k0",
1808                "k1",
1809                "v0",
1810                "v1",
1811                "ts",
1812                "__primary_key",
1813                "__sequence",
1814                "__op_type"
1815            ]
1816        );
1817    }
1818
1819    #[test]
1820    fn test_bulk_part_converter_without_primary_key_columns() {
1821        let metadata = metadata_for_test();
1822        let primary_key_codec = build_primary_key_codec(&metadata);
1823        let schema = to_flat_sst_arrow_schema(
1824            &metadata,
1825            &FlatSchemaOptions {
1826                raw_pk_columns: false,
1827                string_pk_use_dict: true,
1828            },
1829        );
1830
1831        let capacity = 100;
1832        let mut converter =
1833            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, false);
1834
1835        let key_values1 = build_key_values_with_ts_seq_values(
1836            &metadata,
1837            "key1".to_string(),
1838            1u32,
1839            vec![1000, 2000].into_iter(),
1840            vec![Some(1.0), Some(2.0)].into_iter(),
1841            1,
1842        );
1843
1844        let key_values2 = build_key_values_with_ts_seq_values(
1845            &metadata,
1846            "key2".to_string(),
1847            2u32,
1848            vec![1500].into_iter(),
1849            vec![Some(3.0)].into_iter(),
1850            2,
1851        );
1852
1853        converter.append_key_values(&key_values1).unwrap();
1854        converter.append_key_values(&key_values2).unwrap();
1855
1856        let bulk_part = converter.convert().unwrap();
1857
1858        assert_eq!(bulk_part.num_rows(), 3);
1859        assert_eq!(bulk_part.min_timestamp, 1000);
1860        assert_eq!(bulk_part.max_timestamp, 2000);
1861        assert_eq!(bulk_part.sequence, 2);
1862        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
1863
1864        // Validate primary key columns are NOT stored individually
1865        let schema = bulk_part.batch.schema();
1866        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1867        assert_eq!(
1868            field_names,
1869            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
1870        );
1871    }
1872
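    /// Builds `KeyValues` whose primary key is pre-encoded with the sparse codec: the
    /// (`__table_id`, `__tsid`, `k0`, `k1`) values are packed into a binary key carried in the
    /// `__primary_key` column instead of individual tag columns.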
1873    #[allow(clippy::too_many_arguments)]
1874    fn build_key_values_with_sparse_encoding(
1875        metadata: &RegionMetadataRef,
1876        primary_key_codec: &Arc<dyn PrimaryKeyCodec>,
1877        table_id: u32,
1878        tsid: u64,
1879        k0: String,
1880        k1: String,
1881        timestamps: impl Iterator<Item = i64>,
1882        values: impl Iterator<Item = Option<f64>>,
1883        sequence: SequenceNumber,
1884    ) -> KeyValues {
1885        // Encode the primary key (__table_id, __tsid, k0, k1) into binary format using the sparse codec
1886        let pk_values = vec![
1887            (ReservedColumnId::table_id(), Value::UInt32(table_id)),
1888            (ReservedColumnId::tsid(), Value::UInt64(tsid)),
1889            (0, Value::String(k0.clone().into())),
1890            (1, Value::String(k1.clone().into())),
1891        ];
1892        let mut encoded_key = Vec::new();
1893        primary_key_codec
1894            .encode_values(&pk_values, &mut encoded_key)
1895            .unwrap();
1896        assert!(!encoded_key.is_empty());
1897
1898        // Create schema for sparse encoding: __primary_key, ts, v0, v1
1899        let column_schema = vec![
1900            api::v1::ColumnSchema {
1901                column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
1902                datatype: api::helper::ColumnDataTypeWrapper::try_from(
1903                    ConcreteDataType::binary_datatype(),
1904                )
1905                .unwrap()
1906                .datatype() as i32,
1907                semantic_type: api::v1::SemanticType::Tag as i32,
1908                ..Default::default()
1909            },
1910            api::v1::ColumnSchema {
1911                column_name: "ts".to_string(),
1912                datatype: api::helper::ColumnDataTypeWrapper::try_from(
1913                    ConcreteDataType::timestamp_millisecond_datatype(),
1914                )
1915                .unwrap()
1916                .datatype() as i32,
1917                semantic_type: api::v1::SemanticType::Timestamp as i32,
1918                ..Default::default()
1919            },
1920            api::v1::ColumnSchema {
1921                column_name: "v0".to_string(),
1922                datatype: api::helper::ColumnDataTypeWrapper::try_from(
1923                    ConcreteDataType::int64_datatype(),
1924                )
1925                .unwrap()
1926                .datatype() as i32,
1927                semantic_type: api::v1::SemanticType::Field as i32,
1928                ..Default::default()
1929            },
1930            api::v1::ColumnSchema {
1931                column_name: "v1".to_string(),
1932                datatype: api::helper::ColumnDataTypeWrapper::try_from(
1933                    ConcreteDataType::float64_datatype(),
1934                )
1935                .unwrap()
1936                .datatype() as i32,
1937                semantic_type: api::v1::SemanticType::Field as i32,
1938                ..Default::default()
1939            },
1940        ];
1941
1942        let rows = timestamps
1943            .zip(values)
1944            .map(|(ts, v)| Row {
1945                values: vec![
1946                    api::v1::Value {
1947                        value_data: Some(api::v1::value::ValueData::BinaryValue(
1948                            encoded_key.clone(),
1949                        )),
1950                    },
1951                    api::v1::Value {
1952                        value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(ts)),
1953                    },
1954                    api::v1::Value {
1955                        value_data: Some(api::v1::value::ValueData::I64Value(ts)),
1956                    },
1957                    api::v1::Value {
1958                        value_data: v.map(api::v1::value::ValueData::F64Value),
1959                    },
1960                ],
1961            })
1962            .collect();
1963
1964        let mutation = api::v1::Mutation {
            op_type: OpType::Put as i32,
1966            sequence,
1967            rows: Some(api::v1::Rows {
1968                schema: column_schema,
1969                rows,
1970            }),
1971            write_hint: Some(WriteHint {
1972                primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
1973            }),
1974        };
1975        KeyValues::new(metadata.as_ref(), mutation).unwrap()
1976    }
1977
1978    #[test]
1979    fn test_bulk_part_converter_sparse_primary_key_encoding() {
1985        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
1986        builder
1987            .push_column_metadata(ColumnMetadata {
1988                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
1989                semantic_type: SemanticType::Tag,
1990                column_id: 0,
1991            })
1992            .push_column_metadata(ColumnMetadata {
1993                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
1994                semantic_type: SemanticType::Tag,
1995                column_id: 1,
1996            })
1997            .push_column_metadata(ColumnMetadata {
1998                column_schema: ColumnSchema::new(
1999                    "ts",
2000                    ConcreteDataType::timestamp_millisecond_datatype(),
2001                    false,
2002                ),
2003                semantic_type: SemanticType::Timestamp,
2004                column_id: 2,
2005            })
2006            .push_column_metadata(ColumnMetadata {
2007                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
2008                semantic_type: SemanticType::Field,
2009                column_id: 3,
2010            })
2011            .push_column_metadata(ColumnMetadata {
2012                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
2013                semantic_type: SemanticType::Field,
2014                column_id: 4,
2015            })
2016            .primary_key(vec![0, 1])
2017            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
2018        let metadata = Arc::new(builder.build().unwrap());
2019
2020        let primary_key_codec = build_primary_key_codec(&metadata);
2021        let schema = to_flat_sst_arrow_schema(
2022            &metadata,
2023            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2024        );
2025
2026        assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse);
2027        assert_eq!(primary_key_codec.encoding(), PrimaryKeyEncoding::Sparse);
2028
2029        let capacity = 100;
2030        let mut converter =
2031            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec.clone(), true);
2032
2033        let key_values1 = build_key_values_with_sparse_encoding(
2034            &metadata,
2035            &primary_key_codec,
2036            2048u32, // table_id
2037            100u64,  // tsid
2038            "key11".to_string(),
2039            "key21".to_string(),
2040            vec![1000, 2000].into_iter(),
2041            vec![Some(1.0), Some(2.0)].into_iter(),
2042            1,
2043        );
2044
2045        let key_values2 = build_key_values_with_sparse_encoding(
2046            &metadata,
2047            &primary_key_codec,
2048            4096u32, // table_id
2049            200u64,  // tsid
2050            "key12".to_string(),
2051            "key22".to_string(),
2052            vec![1500].into_iter(),
2053            vec![Some(3.0)].into_iter(),
2054            2,
2055        );
2056
2057        converter.append_key_values(&key_values1).unwrap();
2058        converter.append_key_values(&key_values2).unwrap();
2059
2060        let bulk_part = converter.convert().unwrap();
2061
2062        assert_eq!(bulk_part.num_rows(), 3);
2063        assert_eq!(bulk_part.min_timestamp, 1000);
2064        assert_eq!(bulk_part.max_timestamp, 2000);
2065        assert_eq!(bulk_part.sequence, 2);
2066        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
2067
2068        // For sparse encoding, primary key columns should NOT be stored individually
2069        // even when store_primary_key_columns is true, because sparse encoding
2070        // stores the encoded primary key in the __primary_key column
2071        let schema = bulk_part.batch.schema();
2072        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2073        assert_eq!(
2074            field_names,
2075            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2076        );
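        // The raw tag columns should not appear as standalone columns under sparse encoding.
        assert!(bulk_part.batch.column_by_name("k0").is_none());
        assert!(bulk_part.batch.column_by_name("k1").is_none());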
2077
2078        // Verify the __primary_key column contains encoded sparse keys
2079        let primary_key_column = bulk_part.batch.column_by_name("__primary_key").unwrap();
2080        let dict_array = primary_key_column
2081            .as_any()
2082            .downcast_ref::<DictionaryArray<UInt32Type>>()
2083            .unwrap();
2084
        // The dictionary should contain one entry per row, each pointing at an encoded primary key
2086        assert!(!dict_array.is_empty());
2087        assert_eq!(dict_array.len(), 3); // 3 rows total
2088
2089        // Verify values are properly encoded binary data (not empty)
2090        let values = dict_array
2091            .values()
2092            .as_any()
2093            .downcast_ref::<BinaryArray>()
2094            .unwrap();
2095        for i in 0..values.len() {
2096            assert!(
2097                !values.value(i).is_empty(),
2098                "Encoded primary key should not be empty"
2099            );
2100        }
2101    }
2102
2103    #[test]
2104    fn test_convert_bulk_part_empty() {
2105        let metadata = metadata_for_test();
2106        let schema = to_flat_sst_arrow_schema(
2107            &metadata,
2108            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2109        );
2110        let primary_key_codec = build_primary_key_codec(&metadata);
2111
2112        // Create empty batch
2113        let empty_batch = RecordBatch::new_empty(schema.clone());
2114        let empty_part = BulkPart {
2115            batch: empty_batch,
2116            max_timestamp: 0,
2117            min_timestamp: 0,
2118            sequence: 0,
2119            timestamp_index: 0,
2120            raw_data: None,
2121        };
2122
2123        let result =
2124            convert_bulk_part(empty_part, &metadata, primary_key_codec, schema, true).unwrap();
2125        assert!(result.is_none());
2126    }
2127
2128    #[test]
2129    fn test_convert_bulk_part_dense_with_pk_columns() {
2130        let metadata = metadata_for_test();
2131        let primary_key_codec = build_primary_key_codec(&metadata);
2132
2133        let k0_array = Arc::new(arrow::array::StringArray::from(vec![
2134            "key1", "key2", "key1",
2135        ]));
2136        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2, 1]));
2137        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200, 300]));
2138        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0, 3.0]));
2139        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000, 1500]));
2140
2141        let input_schema = Arc::new(Schema::new(vec![
2142            Field::new("k0", ArrowDataType::Utf8, false),
2143            Field::new("k1", ArrowDataType::UInt32, false),
2144            Field::new("v0", ArrowDataType::Int64, true),
2145            Field::new("v1", ArrowDataType::Float64, true),
2146            Field::new(
2147                "ts",
2148                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2149                false,
2150            ),
2151        ]));
2152
2153        let input_batch = RecordBatch::try_new(
2154            input_schema,
2155            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2156        )
2157        .unwrap();
2158
2159        let part = BulkPart {
2160            batch: input_batch,
2161            max_timestamp: 2000,
2162            min_timestamp: 1000,
2163            sequence: 5,
2164            timestamp_index: 4,
2165            raw_data: None,
2166        };
2167
2168        let output_schema = to_flat_sst_arrow_schema(
2169            &metadata,
2170            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2171        );
2172
2173        let result = convert_bulk_part(
2174            part,
2175            &metadata,
2176            primary_key_codec,
2177            output_schema,
2178            true, // store primary key columns
2179        )
2180        .unwrap();
2181
2182        let converted = result.unwrap();
2183
2184        assert_eq!(converted.num_rows(), 3);
2185        assert_eq!(converted.max_timestamp, 2000);
2186        assert_eq!(converted.min_timestamp, 1000);
2187        assert_eq!(converted.sequence, 5);
2188
2189        let schema = converted.batch.schema();
2190        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2191        assert_eq!(
2192            field_names,
2193            vec![
2194                "k0",
2195                "k1",
2196                "v0",
2197                "v1",
2198                "ts",
2199                "__primary_key",
2200                "__sequence",
2201                "__op_type"
2202            ]
2203        );
2204
2205        let k0_col = converted.batch.column_by_name("k0").unwrap();
2206        assert!(matches!(
2207            k0_col.data_type(),
2208            ArrowDataType::Dictionary(_, _)
2209        ));
2210
2211        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2212        let dict_array = pk_col
2213            .as_any()
2214            .downcast_ref::<DictionaryArray<UInt32Type>>()
2215            .unwrap();
2216        let keys = dict_array.keys();
2217
2218        assert_eq!(keys.len(), 3);
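        // Rows 0 and 2 share the primary key ("key1", 1), so the dictionary needs at most two
        // distinct encoded key values for the three rows.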
2219    }
2220
2221    #[test]
2222    fn test_convert_bulk_part_dense_without_pk_columns() {
2223        let metadata = metadata_for_test();
2224        let primary_key_codec = build_primary_key_codec(&metadata);
2225
2226        // Create input batch with primary key columns (k0, k1)
2227        let k0_array = Arc::new(arrow::array::StringArray::from(vec!["key1", "key2"]));
2228        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2]));
2229        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
2230        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
2231        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
2232
2233        let input_schema = Arc::new(Schema::new(vec![
2234            Field::new("k0", ArrowDataType::Utf8, false),
2235            Field::new("k1", ArrowDataType::UInt32, false),
2236            Field::new("v0", ArrowDataType::Int64, true),
2237            Field::new("v1", ArrowDataType::Float64, true),
2238            Field::new(
2239                "ts",
2240                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2241                false,
2242            ),
2243        ]));
2244
2245        let input_batch = RecordBatch::try_new(
2246            input_schema,
2247            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2248        )
2249        .unwrap();
2250
2251        let part = BulkPart {
2252            batch: input_batch,
2253            max_timestamp: 2000,
2254            min_timestamp: 1000,
2255            sequence: 3,
2256            timestamp_index: 4,
2257            raw_data: None,
2258        };
2259
2260        let output_schema = to_flat_sst_arrow_schema(
2261            &metadata,
2262            &FlatSchemaOptions {
2263                raw_pk_columns: false,
2264                string_pk_use_dict: true,
2265            },
2266        );
2267
2268        let result = convert_bulk_part(
2269            part,
2270            &metadata,
2271            primary_key_codec,
2272            output_schema,
2273            false, // don't store primary key columns
2274        )
2275        .unwrap();
2276
2277        let converted = result.unwrap();
2278
2279        assert_eq!(converted.num_rows(), 2);
2280        assert_eq!(converted.max_timestamp, 2000);
2281        assert_eq!(converted.min_timestamp, 1000);
2282        assert_eq!(converted.sequence, 3);
2283
2284        // Verify schema does NOT include individual primary key columns
2285        let schema = converted.batch.schema();
2286        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2287        assert_eq!(
2288            field_names,
2289            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2290        );
2291
2292        // Verify __primary_key column is present and is a dictionary
2293        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2294        assert!(matches!(
2295            pk_col.data_type(),
2296            ArrowDataType::Dictionary(_, _)
2297        ));
2298    }
2299
2300    #[test]
2301    fn test_convert_bulk_part_sparse_encoding() {
2302        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
2303        builder
2304            .push_column_metadata(ColumnMetadata {
2305                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
2306                semantic_type: SemanticType::Tag,
2307                column_id: 0,
2308            })
2309            .push_column_metadata(ColumnMetadata {
2310                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
2311                semantic_type: SemanticType::Tag,
2312                column_id: 1,
2313            })
2314            .push_column_metadata(ColumnMetadata {
2315                column_schema: ColumnSchema::new(
2316                    "ts",
2317                    ConcreteDataType::timestamp_millisecond_datatype(),
2318                    false,
2319                ),
2320                semantic_type: SemanticType::Timestamp,
2321                column_id: 2,
2322            })
2323            .push_column_metadata(ColumnMetadata {
2324                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
2325                semantic_type: SemanticType::Field,
2326                column_id: 3,
2327            })
2328            .push_column_metadata(ColumnMetadata {
2329                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
2330                semantic_type: SemanticType::Field,
2331                column_id: 4,
2332            })
2333            .primary_key(vec![0, 1])
2334            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
2335        let metadata = Arc::new(builder.build().unwrap());
2336
2337        let primary_key_codec = build_primary_key_codec(&metadata);
2338
2339        // Create input batch with __primary_key column (sparse encoding)
2340        let pk_array = Arc::new(arrow::array::BinaryArray::from(vec![
2341            b"encoded_key_1".as_slice(),
2342            b"encoded_key_2".as_slice(),
2343        ]));
2344        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
2345        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
2346        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
2347
2348        let input_schema = Arc::new(Schema::new(vec![
2349            Field::new("__primary_key", ArrowDataType::Binary, false),
2350            Field::new("v0", ArrowDataType::Int64, true),
2351            Field::new("v1", ArrowDataType::Float64, true),
2352            Field::new(
2353                "ts",
2354                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2355                false,
2356            ),
2357        ]));
2358
2359        let input_batch =
2360            RecordBatch::try_new(input_schema, vec![pk_array, v0_array, v1_array, ts_array])
2361                .unwrap();
2362
2363        let part = BulkPart {
2364            batch: input_batch,
2365            max_timestamp: 2000,
2366            min_timestamp: 1000,
2367            sequence: 7,
2368            timestamp_index: 3,
2369            raw_data: None,
2370        };
2371
2372        let output_schema = to_flat_sst_arrow_schema(
2373            &metadata,
2374            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2375        );
2376
2377        let result = convert_bulk_part(
2378            part,
2379            &metadata,
2380            primary_key_codec,
2381            output_schema,
2382            true, // store_primary_key_columns (ignored for sparse)
2383        )
2384        .unwrap();
2385
2386        let converted = result.unwrap();
2387
2388        assert_eq!(converted.num_rows(), 2);
2389        assert_eq!(converted.max_timestamp, 2000);
2390        assert_eq!(converted.min_timestamp, 1000);
2391        assert_eq!(converted.sequence, 7);
2392
2393        // Verify schema does NOT include individual primary key columns (sparse encoding)
2394        let schema = converted.batch.schema();
2395        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2396        assert_eq!(
2397            field_names,
2398            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2399        );
2400
2401        // Verify __primary_key is dictionary encoded
2402        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2403        assert!(matches!(
2404            pk_col.data_type(),
2405            ArrowDataType::Dictionary(_, _)
2406        ));
2407    }
2408
2409    #[test]
2410    fn test_convert_bulk_part_sorting_with_multiple_series() {
2411        let metadata = metadata_for_test();
2412        let primary_key_codec = build_primary_key_codec(&metadata);
2413
2414        // Create unsorted batch with multiple series and timestamps
2415        let k0_array = Arc::new(arrow::array::StringArray::from(vec![
2416            "series_b", "series_a", "series_b", "series_a",
2417        ]));
2418        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![2, 1, 2, 1]));
2419        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![200, 100, 400, 300]));
2420        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![2.0, 1.0, 4.0, 3.0]));
2421        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
2422            2000, 1000, 4000, 3000,
2423        ]));
2424
2425        let input_schema = Arc::new(Schema::new(vec![
2426            Field::new("k0", ArrowDataType::Utf8, false),
2427            Field::new("k1", ArrowDataType::UInt32, false),
2428            Field::new("v0", ArrowDataType::Int64, true),
2429            Field::new("v1", ArrowDataType::Float64, true),
2430            Field::new(
2431                "ts",
2432                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2433                false,
2434            ),
2435        ]));
2436
2437        let input_batch = RecordBatch::try_new(
2438            input_schema,
2439            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2440        )
2441        .unwrap();
2442
2443        let part = BulkPart {
2444            batch: input_batch,
2445            max_timestamp: 4000,
2446            min_timestamp: 1000,
2447            sequence: 10,
2448            timestamp_index: 4,
2449            raw_data: None,
2450        };
2451
2452        let output_schema = to_flat_sst_arrow_schema(
2453            &metadata,
2454            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2455        );
2456
2457        let result =
2458            convert_bulk_part(part, &metadata, primary_key_codec, output_schema, true).unwrap();
2459
2460        let converted = result.unwrap();
2461
2462        assert_eq!(converted.num_rows(), 4);
2463
2464        // Verify data is sorted by (primary_key, timestamp, sequence desc)
2465        let ts_col = converted.batch.column(converted.timestamp_index);
2466        let ts_array = ts_col
2467            .as_any()
2468            .downcast_ref::<TimestampMillisecondArray>()
2469            .unwrap();
2470
2471        // After sorting by (pk, ts), we should have:
2472        // series_a,1: ts=1000, 3000
2473        // series_b,2: ts=2000, 4000
2474        let timestamps: Vec<i64> = ts_array.values().to_vec();
2475        assert_eq!(timestamps, vec![1000, 3000, 2000, 4000]);
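
        // As with the timestamps above, the field columns follow the same row permutation:
        // `v0` was written as [200, 100, 400, 300], so after sorting it reads [100, 300, 200, 400].
        let v0_array = converted
            .batch
            .column_by_name("v0")
            .unwrap()
            .as_any()
            .downcast_ref::<arrow::array::Int64Array>()
            .unwrap();
        assert_eq!(v0_array.values(), &[100, 300, 200, 400]);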
2476    }
2477}