mito2/memtable/bulk/part.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Bulk part encoder/decoder.
16
17use std::collections::{HashMap, HashSet};
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use api::helper::{ColumnDataTypeWrapper, to_grpc_value};
22use api::v1::bulk_wal_entry::Body;
23use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType};
24use bytes::Bytes;
25use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
26use common_recordbatch::DfRecordBatch as RecordBatch;
27use common_time::Timestamp;
28use datafusion_common::Column;
29use datafusion_common::pruning::PruningStatistics;
30use datafusion_expr::utils::expr_to_columns;
31use datatypes::arrow;
32use datatypes::arrow::array::{
33    Array, ArrayRef, BinaryArray, BooleanArray, StringDictionaryBuilder, UInt8Array, UInt64Array,
34};
35use datatypes::arrow::compute::{SortColumn, SortOptions};
36use datatypes::arrow::datatypes::{
37    DataType as ArrowDataType, Field, Schema, SchemaRef, UInt32Type,
38};
39use datatypes::data_type::DataType;
40use datatypes::extension::json::is_json_extension_type;
41use datatypes::prelude::{MutableVector, Vector};
42use datatypes::schema::ext::ArrowSchemaExt;
43use datatypes::types::JsonType;
44use datatypes::value::ValueRef;
45use datatypes::vectors::Helper;
46use datatypes::vectors::json::array::JsonArray;
47use mito_codec::key_values::{KeyValue, KeyValues};
48use mito_codec::row_converter::{PrimaryKeyCodec, SortField, build_primary_key_codec_with_fields};
49use parquet::arrow::ArrowWriter;
50use parquet::basic::{Compression, ZstdLevel};
51use parquet::file::metadata::ParquetMetaData;
52use parquet::file::properties::WriterProperties;
53use smallvec::SmallVec;
54use snafu::{OptionExt, ResultExt};
55use store_api::codec::PrimaryKeyEncoding;
56use store_api::metadata::{RegionMetadata, RegionMetadataRef};
57use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
58use store_api::storage::{ColumnId, FileId, SequenceNumber, SequenceRange};
59
60use crate::error::{
61    self, ColumnNotFoundSnafu, ComputeArrowSnafu, ConvertValueSnafu, CreateDefaultSnafu,
62    DataTypeMismatchSnafu, EncodeMemtableSnafu, EncodeSnafu, InvalidMetadataSnafu,
63    InvalidRequestSnafu, NewRecordBatchSnafu, Result,
64};
65use crate::memtable::bulk::context::{BulkIterContext, BulkIterContextRef};
66use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
67use crate::memtable::time_series::{ValueBuilder, Values};
68use crate::memtable::{BoxedRecordBatchIterator, MemScanMetrics, MemtableStats};
69use crate::sst::SeriesEstimator;
70use crate::sst::index::IndexOutput;
71use crate::sst::parquet::flat_format::primary_key_column_index;
72use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder};
73use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo};
74
75const INIT_DICT_VALUE_CAPACITY: usize = 8;
76
77/// A raw bulk part in the memtable.
78#[derive(Clone)]
79pub struct BulkPart {
80    pub batch: RecordBatch,
81    pub max_timestamp: i64,
82    pub min_timestamp: i64,
83    pub sequence: u64,
84    pub timestamp_index: usize,
85    pub raw_data: Option<ArrowIpc>,
86}
87
88impl TryFrom<BulkWalEntry> for BulkPart {
89    type Error = error::Error;
90
91    fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
92        match value.body.expect("Entry payload should be present") {
93            Body::ArrowIpc(ipc) => {
94                let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
95                    .context(error::ConvertBulkWalEntrySnafu)?;
96                let batch = decoder
97                    .try_decode_record_batch(&ipc.data_header, &ipc.payload)
98                    .context(error::ConvertBulkWalEntrySnafu)?;
99                Ok(Self {
100                    batch,
101                    max_timestamp: value.max_ts,
102                    min_timestamp: value.min_ts,
103                    sequence: value.sequence,
104                    timestamp_index: value.timestamp_index as usize,
105                    raw_data: Some(ipc),
106                })
107            }
108        }
109    }
110}
111
112impl TryFrom<&BulkPart> for BulkWalEntry {
113    type Error = error::Error;
114
115    fn try_from(value: &BulkPart) -> Result<Self> {
116        if let Some(ipc) = &value.raw_data {
117            Ok(BulkWalEntry {
118                sequence: value.sequence,
119                max_ts: value.max_timestamp,
120                min_ts: value.min_timestamp,
121                timestamp_index: value.timestamp_index as u32,
122                body: Some(Body::ArrowIpc(ipc.clone())),
123            })
124        } else {
125            let mut encoder = FlightEncoder::default();
126            let schema_bytes = encoder
127                .encode_schema(value.batch.schema().as_ref())
128                .data_header;
129            let [rb_data] = encoder
130                .encode(FlightMessage::RecordBatch(value.batch.clone()))
131                .try_into()
132                .map_err(|_| {
133                    error::UnsupportedOperationSnafu {
134                        err_msg: "create BulkWalEntry from RecordBatch with dictionary arrays",
135                    }
136                    .build()
137                })?;
138            Ok(BulkWalEntry {
139                sequence: value.sequence,
140                max_ts: value.max_timestamp,
141                min_ts: value.min_timestamp,
142                timestamp_index: value.timestamp_index as u32,
143                body: Some(Body::ArrowIpc(ArrowIpc {
144                    schema: schema_bytes,
145                    data_header: rb_data.data_header,
146                    payload: rb_data.data_body,
147                })),
148            })
149        }
150    }
151}
152
153impl BulkPart {
154    pub(crate) fn estimated_size(&self) -> usize {
155        record_batch_estimated_size(&self.batch)
156    }
157
158    /// Returns the estimated series count in this BulkPart.
159    /// This is derived from the number of dictionary values in the PrimaryKeyArray.
160    pub fn estimated_series_count(&self) -> usize {
161        let pk_column_idx = primary_key_column_index(self.batch.num_columns());
162        let pk_column = self.batch.column(pk_column_idx);
163        if let Some(dict_array) = pk_column.as_any().downcast_ref::<PrimaryKeyArray>() {
164            dict_array.values().len()
165        } else {
166            0
167        }
168    }
169
170    /// Creates MemtableStats from this BulkPart.
171    pub fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
172        let ts_type = region_metadata.time_index_type();
173        let min_ts = ts_type.create_timestamp(self.min_timestamp);
174        let max_ts = ts_type.create_timestamp(self.max_timestamp);
175
176        MemtableStats {
177            estimated_bytes: self.estimated_size(),
178            time_range: Some((min_ts, max_ts)),
179            num_rows: self.num_rows(),
180            num_ranges: 1,
181            max_sequence: self.sequence,
182            series_count: self.estimated_series_count(),
183        }
184    }
185
186    /// Fills missing columns in the BulkPart batch with default values.
187    ///
188    /// This function checks if the batch schema matches the region metadata schema,
189    /// and if there are missing columns, it fills them with default values (or null
190    /// for nullable columns).
191    ///
192    /// # Arguments
193    ///
194    /// * `region_metadata` - The region metadata containing the expected schema
195    pub fn fill_missing_columns(&mut self, region_metadata: &RegionMetadata) -> Result<()> {
196        // Builds a map of existing columns in the batch
197        let batch_schema = self.batch.schema();
198        let batch_columns: HashSet<_> = batch_schema
199            .fields()
200            .iter()
201            .map(|f| f.name().as_str())
202            .collect();
203
204        // Finds columns that need to be filled
205        let mut columns_to_fill = Vec::new();
206        for column_meta in &region_metadata.column_metadatas {
207            // TODO(yingwen): Return an error if the default is impure after we support filling
208            // bulk insert requests in the frontend
209            if !batch_columns.contains(column_meta.column_schema.name.as_str()) {
210                columns_to_fill.push(column_meta);
211            }
212        }
213
214        if columns_to_fill.is_empty() {
215            return Ok(());
216        }
217
218        let num_rows = self.batch.num_rows();
219
220        let mut new_columns = Vec::new();
221        let mut new_fields = Vec::new();
222
223        // First, adds all existing columns
224        new_fields.extend(batch_schema.fields().iter().cloned());
225        new_columns.extend_from_slice(self.batch.columns());
226
227        let region_id = region_metadata.region_id;
228        // Then adds the missing columns with default values
229        for column_meta in columns_to_fill {
230            let default_vector = column_meta
231                .column_schema
232                .create_default_vector(num_rows)
233                .context(CreateDefaultSnafu {
234                    region_id,
235                    column: &column_meta.column_schema.name,
236                })?
237                .with_context(|| InvalidRequestSnafu {
238                    region_id,
239                    reason: format!(
240                        "column {} does not have default value",
241                        column_meta.column_schema.name
242                    ),
243                })?;
244            let arrow_array = default_vector.to_arrow_array();
246
247            new_fields.push(Arc::new(Field::new(
248                column_meta.column_schema.name.clone(),
249                column_meta.column_schema.data_type.as_arrow_type(),
250                column_meta.column_schema.is_nullable(),
251            )));
252            new_columns.push(arrow_array);
253        }
254
255        // Create a new schema and batch with the filled columns
256        let new_schema = Arc::new(Schema::new(new_fields));
257        let new_batch =
258            RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)?;
259
260        // Update the batch
261        self.batch = new_batch;
262
263        Ok(())
264    }
265
266    /// Converts [BulkPart] to [Mutation] for fallback `write_bulk` implementation.
267    pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
268        let vectors = region_metadata
269            .schema
270            .column_schemas()
271            .iter()
272            .map(|col| match self.batch.column_by_name(&col.name) {
273                None => Ok(None),
274                Some(col) => Helper::try_into_vector(col).map(Some),
275            })
276            .collect::<datatypes::error::Result<Vec<_>>>()
277            .context(error::ComputeVectorSnafu)?;
278
279        let rows = (0..self.num_rows())
280            .map(|row_idx| {
281                let values = (0..self.batch.num_columns())
282                    .map(|col_idx| {
283                        if let Some(v) = &vectors[col_idx] {
284                            to_grpc_value(v.get(row_idx))
285                        } else {
286                            api::v1::Value { value_data: None }
287                        }
288                    })
289                    .collect::<Vec<_>>();
290                api::v1::Row { values }
291            })
292            .collect::<Vec<_>>();
293
294        let schema = region_metadata
295            .column_metadatas
296            .iter()
297            .map(|c| {
298                let data_type_wrapper =
299                    ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
300                Ok(api::v1::ColumnSchema {
301                    column_name: c.column_schema.name.clone(),
302                    datatype: data_type_wrapper.datatype() as i32,
303                    semantic_type: c.semantic_type as i32,
304                    ..Default::default()
305                })
306            })
307            .collect::<api::error::Result<Vec<_>>>()
308            .context(error::ConvertColumnDataTypeSnafu {
309                reason: "failed to convert region metadata to column schema",
310            })?;
311
312        let rows = api::v1::Rows { schema, rows };
313
314        Ok(Mutation {
315            op_type: OpType::Put as i32,
316            sequence: self.sequence,
317            rows: Some(rows),
318            write_hint: None,
319        })
320    }
321
322    pub fn timestamps(&self) -> &ArrayRef {
323        self.batch.column(self.timestamp_index)
324    }
325
326    pub fn num_rows(&self) -> usize {
327        self.batch.num_rows()
328    }
329}
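
// A minimal usage sketch (not part of the original file) of the fallback `write_bulk` path:
// fill default values for columns missing from the batch, then convert the part into a
// row-based mutation. `region_metadata` is assumed to be the target region's metadata.
//
// fn bulk_to_mutation(mut part: BulkPart, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
//     // Adds absent columns with their default values (or nulls for nullable columns).
//     part.fill_missing_columns(region_metadata)?;
//     // Converts the batch into gRPC rows so it can go through the regular mutation path.
//     part.to_mutation(region_metadata)
// }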
330
331/// A collection of small unordered bulk parts.
332/// Used to batch small parts together before merging them into a sorted part.
333pub struct UnorderedPart {
334    /// Small bulk parts that haven't been sorted yet.
335    parts: Vec<BulkPart>,
336    /// Total number of rows across all parts.
337    total_rows: usize,
338    /// Minimum timestamp across all parts.
339    min_timestamp: i64,
340    /// Maximum timestamp across all parts.
341    max_timestamp: i64,
342    /// Maximum sequence number across all parts.
343    max_sequence: u64,
344    /// Row count threshold for accepting parts (default: 1024).
345    threshold: usize,
346    /// Row count threshold for compacting (default: 4096).
347    compact_threshold: usize,
348}
349
350impl Default for UnorderedPart {
351    fn default() -> Self {
352        Self::new()
353    }
354}
355
356impl UnorderedPart {
357    /// Creates a new empty UnorderedPart.
358    pub fn new() -> Self {
359        Self {
360            parts: Vec::new(),
361            total_rows: 0,
362            min_timestamp: i64::MAX,
363            max_timestamp: i64::MIN,
364            max_sequence: 0,
365            threshold: 1024,
366            compact_threshold: 4096,
367        }
368    }
369
370    /// Sets the threshold for accepting parts into unordered_part.
371    pub fn set_threshold(&mut self, threshold: usize) {
372        self.threshold = threshold;
373    }
374
375    /// Sets the threshold for compacting unordered_part.
376    pub fn set_compact_threshold(&mut self, compact_threshold: usize) {
377        self.compact_threshold = compact_threshold;
378    }
379
380    /// Returns the threshold for accepting parts.
381    pub fn threshold(&self) -> usize {
382        self.threshold
383    }
384
385    /// Returns the compact threshold.
386    pub fn compact_threshold(&self) -> usize {
387        self.compact_threshold
388    }
389
390    /// Returns true if this part should accept the given row count.
391    pub fn should_accept(&self, num_rows: usize) -> bool {
392        num_rows < self.threshold
393    }
394
395    /// Returns true if this part should be compacted.
396    pub fn should_compact(&self) -> bool {
397        self.total_rows >= self.compact_threshold
398    }
399
400    /// Adds a BulkPart to this unordered collection.
401    pub fn push(&mut self, part: BulkPart) {
402        self.total_rows += part.num_rows();
403        self.min_timestamp = self.min_timestamp.min(part.min_timestamp);
404        self.max_timestamp = self.max_timestamp.max(part.max_timestamp);
405        self.max_sequence = self.max_sequence.max(part.sequence);
406        self.parts.push(part);
407    }
408
409    /// Returns the total number of rows across all parts.
410    pub fn num_rows(&self) -> usize {
411        self.total_rows
412    }
413
414    /// Returns true if there are no parts.
415    pub fn is_empty(&self) -> bool {
416        self.parts.is_empty()
417    }
418
419    /// Returns the number of parts in this collection.
420    pub fn num_parts(&self) -> usize {
421        self.parts.len()
422    }
423
424    /// Concatenates and sorts all parts into a single RecordBatch.
425    /// Returns None if the collection is empty.
426    pub fn concat_and_sort(&self) -> Result<Option<RecordBatch>> {
427        if self.parts.is_empty() {
428            return Ok(None);
429        }
430
431        if self.parts.len() == 1 {
432            // If there's only one part, return its batch directly
433            return Ok(Some(self.parts[0].batch.clone()));
434        }
435
436        // Get the schema from the first part
437        let schema = self.parts[0].batch.schema();
438        let concatenated = if schema.has_json_extension_field() {
439            let (schema, batches) = align_parts(&self.parts)?;
440            arrow::compute::concat_batches(&schema, &batches).context(ComputeArrowSnafu)?
441        } else {
442            arrow::compute::concat_batches(&schema, self.parts.iter().map(|x| &x.batch))
443                .context(ComputeArrowSnafu)?
444        };
445
446        // Sort the concatenated batch
447        let sorted_batch = sort_primary_key_record_batch(&concatenated)?;
448
449        Ok(Some(sorted_batch))
450    }
451
452    /// Converts all parts into a single sorted BulkPart.
453    /// Returns None if the collection is empty.
454    pub fn to_bulk_part(&self) -> Result<Option<BulkPart>> {
455        let Some(sorted_batch) = self.concat_and_sort()? else {
456            return Ok(None);
457        };
458
459        let timestamp_index = self.parts[0].timestamp_index;
460
461        Ok(Some(BulkPart {
462            batch: sorted_batch,
463            max_timestamp: self.max_timestamp,
464            min_timestamp: self.min_timestamp,
465            sequence: self.max_sequence,
466            timestamp_index,
467            raw_data: None,
468        }))
469    }
470
471    /// Clears all parts from this collection.
472    pub fn clear(&mut self) {
473        self.parts.clear();
474        self.total_rows = 0;
475        self.min_timestamp = i64::MAX;
476        self.max_timestamp = i64::MIN;
477        self.max_sequence = 0;
478    }
479}
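
// A minimal usage sketch (not part of the original file): buffering small bulk writes and
// merging them into one sorted part once the compact threshold is reached.
//
// fn buffer_or_merge(unordered: &mut UnorderedPart, part: BulkPart) -> Result<Option<BulkPart>> {
//     if !unordered.should_accept(part.num_rows()) {
//         // Larger parts bypass the buffer and are returned to the caller as-is.
//         return Ok(Some(part));
//     }
//     unordered.push(part);
//     if unordered.should_compact() {
//         // Concatenates all buffered batches, sorts them by (primary key, timestamp,
//         // sequence desc) and resets the buffer.
//         let merged = unordered.to_bulk_part()?;
//         unordered.clear();
//         return Ok(merged);
//     }
//     Ok(None)
// }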
480
481/// Aligns the JSON columns in [BulkPart]s to unified Arrow arrays so that we can compute
482/// (concat, sort, etc.) on them.
483fn align_parts(parts: &[BulkPart]) -> Result<(SchemaRef, Vec<RecordBatch>)> {
484    debug_assert!(
485        !parts.is_empty()
486            && parts
487                .windows(2)
488                .all(|w| w[0].batch.schema_ref().fields().len()
489                    == w[1].batch.schema_ref().fields().len())
490    );
491
492    let first = &parts[0];
493    let base_schema = first.batch.schema_ref();
494    let rest = &parts[1..];
495
496    let mut merged_types = HashMap::new();
497    let mut aligned_fields = Vec::with_capacity(base_schema.fields().len());
498    for (i, field) in base_schema.fields().iter().enumerate() {
499        if is_json_extension_type(field) {
500            let mut merged = JsonType::from(field.data_type());
501            rest.iter()
502                .try_fold(&mut merged, |acc, x| {
503                    acc.merge(&JsonType::from(x.batch.schema_ref().field(i).data_type()))?;
504                    Ok(acc)
505                })
506                .context(DataTypeMismatchSnafu)?;
507            merged_types.insert(i, merged.as_arrow_type());
508
509            aligned_fields.push(Arc::new(
510                Field::new(
511                    field.name().clone(),
512                    merged.as_arrow_type(),
513                    field.is_nullable(),
514                )
515                .with_metadata(field.metadata().clone()),
516            ));
517        } else {
518            aligned_fields.push(field.clone())
519        };
520    }
521    let aligned_schema = Arc::new(Schema::new_with_metadata(
522        aligned_fields,
523        base_schema.metadata().clone(),
524    ));
525
526    let mut aligned_batches = Vec::with_capacity(parts.len());
527    for part in parts {
528        let mut columns = Vec::with_capacity(part.batch.num_columns());
529        for (i, column) in part.batch.columns().iter().enumerate() {
530            if let Some(expect) = merged_types.get(&i) {
531                columns.push(
532                    JsonArray::from(column)
533                        .try_align(expect)
534                        .context(ConvertValueSnafu)?,
535                );
536            } else {
537                columns.push(column.clone());
538            }
539        }
540        aligned_batches.push(
541            RecordBatch::try_new(aligned_schema.clone(), columns).context(NewRecordBatchSnafu)?,
542        );
543    }
544
545    Ok((aligned_schema, aligned_batches))
546}
547
548/// A more accurate estimation of a record batch's size, based on the memory actually occupied by each column's data.
549pub fn record_batch_estimated_size(batch: &RecordBatch) -> usize {
550    batch
551        .columns()
552        .iter()
553        // If the slice memory size cannot be determined, assume 0 here.
554        .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
555        .sum()
556}
557
558/// Primary key column builder for handling strings specially.
559enum PrimaryKeyColumnBuilder {
560    /// String dictionary builder for string types.
561    StringDict(StringDictionaryBuilder<UInt32Type>),
562    /// Generic mutable vector for other types.
563    Vector(Box<dyn MutableVector>),
564}
565
566impl PrimaryKeyColumnBuilder {
567    /// Appends a value to the builder.
568    fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
569        match self {
570            PrimaryKeyColumnBuilder::StringDict(builder) => {
571                if let Some(s) = value.try_into_string().context(DataTypeMismatchSnafu)? {
572                    // We know the value is a string.
573                    builder.append_value(s);
574                } else {
575                    builder.append_null();
576                }
577            }
578            PrimaryKeyColumnBuilder::Vector(builder) => {
579                builder.push_value_ref(&value);
580            }
581        }
582        Ok(())
583    }
584
585    /// Converts the builder to an ArrayRef.
586    fn into_arrow_array(self) -> ArrayRef {
587        match self {
588            PrimaryKeyColumnBuilder::StringDict(mut builder) => Arc::new(builder.finish()),
589            PrimaryKeyColumnBuilder::Vector(mut builder) => builder.to_vector().to_arrow_array(),
590        }
591    }
592}
593
594/// Converter that converts [KeyValues] into a [BulkPart].
595pub struct BulkPartConverter {
596    /// Schema of the converted batch.
597    schema: SchemaRef,
598    /// Primary key codec for encoding keys
599    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
600    /// Buffer for encoding primary key.
601    key_buf: Vec<u8>,
602    /// Primary key array builder.
603    key_array_builder: PrimaryKeyArrayBuilder,
604    /// Builders for non-primary key columns.
605    value_builder: ValueBuilder,
606    /// Builders for individual primary key columns.
607    /// The order of builders is the same as the order of primary key columns in the region metadata.
608    primary_key_column_builders: Vec<PrimaryKeyColumnBuilder>,
609
610    /// Max timestamp value.
611    max_ts: i64,
612    /// Min timestamp value.
613    min_ts: i64,
614    /// Max sequence number.
615    max_sequence: SequenceNumber,
616}
617
618impl BulkPartConverter {
619    /// Creates a new converter.
620    ///
621    /// If `store_primary_key_columns` is true and the encoding is not sparse, it additionally
622    /// stores the primary key columns as individual arrays.
623    pub fn new(
624        region_metadata: &RegionMetadataRef,
625        schema: SchemaRef,
626        capacity: usize,
627        primary_key_codec: Arc<dyn PrimaryKeyCodec>,
628        store_primary_key_columns: bool,
629    ) -> Self {
630        debug_assert_eq!(
631            region_metadata.primary_key_encoding,
632            primary_key_codec.encoding()
633        );
634
635        let primary_key_column_builders = if store_primary_key_columns
636            && region_metadata.primary_key_encoding != PrimaryKeyEncoding::Sparse
637        {
638            new_primary_key_column_builders(region_metadata, capacity)
639        } else {
640            Vec::new()
641        };
642
643        Self {
644            schema,
645            primary_key_codec,
646            key_buf: Vec::new(),
647            key_array_builder: PrimaryKeyArrayBuilder::new(),
648            value_builder: ValueBuilder::new(region_metadata, capacity),
649            primary_key_column_builders,
650            min_ts: i64::MAX,
651            max_ts: i64::MIN,
652            max_sequence: SequenceNumber::MIN,
653        }
654    }
655
656    /// Appends a [KeyValues] into the converter.
657    pub fn append_key_values(&mut self, key_values: &KeyValues) -> Result<()> {
658        for kv in key_values.iter() {
659            self.append_key_value(&kv)?;
660        }
661
662        Ok(())
663    }
664
665    /// Appends a [KeyValue] to builders.
666    ///
667    /// If the primary key uses sparse encoding, callers must encode the primary key in the [KeyValue].
668    fn append_key_value(&mut self, kv: &KeyValue) -> Result<()> {
669        // Handles primary key based on encoding type
670        if self.primary_key_codec.encoding() == PrimaryKeyEncoding::Sparse {
671            // For sparse encoding, the primary key is already encoded in the KeyValue
672            // Gets the first (and only) primary key value which contains the encoded key
673            let mut primary_keys = kv.primary_keys();
674            if let Some(encoded) = primary_keys
675                .next()
676                .context(ColumnNotFoundSnafu {
677                    column: PRIMARY_KEY_COLUMN_NAME,
678                })?
679                .try_into_binary()
680                .context(DataTypeMismatchSnafu)?
681            {
682                self.key_array_builder
683                    .append(encoded)
684                    .context(ComputeArrowSnafu)?;
685            } else {
686                self.key_array_builder
687                    .append("")
688                    .context(ComputeArrowSnafu)?;
689            }
690        } else {
691            // For dense encoding, we need to encode the primary key columns
692            self.key_buf.clear();
693            self.primary_key_codec
694                .encode_key_value(kv, &mut self.key_buf)
695                .context(EncodeSnafu)?;
696            self.key_array_builder
697                .append(&self.key_buf)
698                .context(ComputeArrowSnafu)?;
699        };
700
701        // If storing primary key columns, append values to individual builders
702        if !self.primary_key_column_builders.is_empty() {
703            for (builder, pk_value) in self
704                .primary_key_column_builders
705                .iter_mut()
706                .zip(kv.primary_keys())
707            {
708                builder.push_value_ref(pk_value)?;
709            }
710        }
711
712        // Pushes other columns.
713        self.value_builder.push(
714            kv.timestamp(),
715            kv.sequence(),
716            kv.op_type() as u8,
717            kv.fields(),
718        );
719
720        // Updates statistics
721        // Safety: timestamp of kv must be both present and a valid timestamp value.
722        let ts = kv
723            .timestamp()
724            .try_into_timestamp()
725            .unwrap()
726            .unwrap()
727            .value();
728        self.min_ts = self.min_ts.min(ts);
729        self.max_ts = self.max_ts.max(ts);
730        self.max_sequence = self.max_sequence.max(kv.sequence());
731
732        Ok(())
733    }
734
735    /// Converts buffered content into a [BulkPart].
736    ///
737    /// It sorts the record batch by (primary key, timestamp, sequence desc).
738    pub fn convert(mut self) -> Result<BulkPart> {
739        let values = Values::from(self.value_builder);
740        let mut columns =
741            Vec::with_capacity(4 + values.fields.len() + self.primary_key_column_builders.len());
742
743        // Build primary key column arrays if enabled.
744        for builder in self.primary_key_column_builders {
745            columns.push(builder.into_arrow_array());
746        }
747        // Then fields columns.
748        columns.extend(values.fields.iter().map(|field| field.to_arrow_array()));
749        // Time index.
750        let timestamp_index = columns.len();
751        columns.push(values.timestamp.to_arrow_array());
752        // Primary key.
753        let pk_array = self.key_array_builder.finish();
754        columns.push(Arc::new(pk_array));
755        // Sequence and op type.
756        columns.push(values.sequence.to_arrow_array());
757        columns.push(values.op_type.to_arrow_array());
758
759        // The actual datatype of a JSON array depends on the data itself and cannot be derived
760        // from the Region metadata, which is static. So we have to align the schema here.
761        let schema = align_schema_with_json_array(self.schema, &columns);
762        let batch = RecordBatch::try_new(schema, columns).context(NewRecordBatchSnafu)?;
763        // Sorts the record batch.
764        let batch = sort_primary_key_record_batch(&batch)?;
765
766        Ok(BulkPart {
767            batch,
768            max_timestamp: self.max_ts,
769            min_timestamp: self.min_ts,
770            sequence: self.max_sequence,
771            timestamp_index,
772            raw_data: None,
773        })
774    }
775}
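
// A minimal usage sketch (not part of the original file): converting write requests into a
// sorted [BulkPart]. `region_metadata`, `schema` and `codec` are assumed to be prepared by
// the caller, and the codec must match the region's primary key encoding.
//
// fn key_values_to_part(
//     region_metadata: &RegionMetadataRef,
//     schema: SchemaRef,
//     codec: Arc<dyn PrimaryKeyCodec>,
//     key_values: &KeyValues,
// ) -> Result<BulkPart> {
//     let mut converter = BulkPartConverter::new(region_metadata, schema, 1024, codec, true);
//     converter.append_key_values(key_values)?;
//     // Produces a record batch sorted by (primary key, timestamp, sequence desc).
//     converter.convert()
// }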
776
777fn align_schema_with_json_array(schema: SchemaRef, columns: &[ArrayRef]) -> SchemaRef {
778    if schema.fields().iter().all(|f| !is_json_extension_type(f)) {
779        return schema;
780    }
781
782    let mut fields = Vec::with_capacity(schema.fields().len());
783    for (field, array) in schema.fields().iter().zip(columns) {
784        if !is_json_extension_type(field) {
785            fields.push(field.clone());
786            continue;
787        }
788
789        let mut field = field.as_ref().clone();
790        field.set_data_type(array.data_type().clone());
791        fields.push(Arc::new(field));
792    }
793
794    Arc::new(Schema::new_with_metadata(fields, schema.metadata().clone()))
795}
796
797fn new_primary_key_column_builders(
798    metadata: &RegionMetadata,
799    capacity: usize,
800) -> Vec<PrimaryKeyColumnBuilder> {
801    metadata
802        .primary_key_columns()
803        .map(|col| {
804            if col.column_schema.data_type.is_string() {
805                PrimaryKeyColumnBuilder::StringDict(StringDictionaryBuilder::with_capacity(
806                    capacity,
807                    INIT_DICT_VALUE_CAPACITY,
808                    capacity,
809                ))
810            } else {
811                PrimaryKeyColumnBuilder::Vector(
812                    col.column_schema.data_type.create_mutable_vector(capacity),
813                )
814            }
815        })
816        .collect()
817}
818
819/// Sorts the record batch with primary key format.
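/// Assumes the last four columns of the batch are the time index, the encoded primary key
/// (a dictionary array), the sequence and the op type, matching the layout produced by
/// [BulkPartConverter::convert] and [convert_bulk_part].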
820pub fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
821    let total_columns = batch.num_columns();
822    let sort_columns = vec![
823        // Primary key column (ascending)
824        SortColumn {
825            values: batch.column(total_columns - 3).clone(),
826            options: Some(SortOptions {
827                descending: false,
828                nulls_first: true,
829            }),
830        },
831        // Time index column (ascending)
832        SortColumn {
833            values: batch.column(total_columns - 4).clone(),
834            options: Some(SortOptions {
835                descending: false,
836                nulls_first: true,
837            }),
838        },
839        // Sequence column (descending)
840        SortColumn {
841            values: batch.column(total_columns - 2).clone(),
842            options: Some(SortOptions {
843                descending: true,
844                nulls_first: true,
845            }),
846        },
847    ];
848
849    let indices = datatypes::arrow::compute::lexsort_to_indices(&sort_columns, None)
850        .context(ComputeArrowSnafu)?;
851
852    datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
853}
854
855/// Converts a `BulkPart` that is unordered and without encoded primary keys into a `BulkPart`
856/// with the same format as produced by [BulkPartConverter].
857///
858/// This function takes a `BulkPart` where:
859/// - For dense encoding: Primary key columns may be stored as individual columns
860/// - For sparse encoding: The `__primary_key` column should already be present with encoded keys
861/// - The batch may not be sorted
862///
863/// And produces a `BulkPart` where:
864/// - Primary key columns are optionally stored (depending on `store_primary_key_columns` and encoding)
865/// - An encoded `__primary_key` dictionary column is present
866/// - The batch is sorted by (primary_key, timestamp, sequence desc)
867///
868/// # Arguments
869///
870/// * `part` - The input `BulkPart` to convert
871/// * `region_metadata` - Region metadata containing schema information
872/// * `primary_key_codec` - Codec for encoding primary keys
873/// * `schema` - Target schema for the output batch
874/// * `store_primary_key_columns` - If true and encoding is not sparse, stores individual primary key columns
875///
876/// # Returns
877///
878/// Returns `None` if the input part has no rows, otherwise returns a new `BulkPart` with
879/// encoded primary keys and sorted data.
880pub fn convert_bulk_part(
881    part: BulkPart,
882    region_metadata: &RegionMetadataRef,
883    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
884    schema: SchemaRef,
885    store_primary_key_columns: bool,
886) -> Result<Option<BulkPart>> {
887    if part.num_rows() == 0 {
888        return Ok(None);
889    }
890
891    let num_rows = part.num_rows();
892    let is_sparse = region_metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse;
893
894    // Builds a column name-to-index map for efficient lookups
895    let input_schema = part.batch.schema();
896    let column_indices: HashMap<&str, usize> = input_schema
897        .fields()
898        .iter()
899        .enumerate()
900        .map(|(idx, field)| (field.name().as_str(), idx))
901        .collect();
902
903    // Determines the structure of the input batch by looking up columns by name
904    let mut output_columns = Vec::new();
905
906    // Extracts primary key columns if we need to encode them (dense encoding)
907    let pk_array = if is_sparse {
908        // For sparse encoding, the input should already have the __primary_key column
909        // We need to find it in the input batch
910        None
911    } else {
912        // For dense encoding, extract and encode primary key columns by name
913        let pk_vectors: Result<Vec<_>> = region_metadata
914            .primary_key_columns()
915            .map(|col_meta| {
916                let col_idx = column_indices
917                    .get(col_meta.column_schema.name.as_str())
918                    .context(ColumnNotFoundSnafu {
919                        column: &col_meta.column_schema.name,
920                    })?;
921                let col = part.batch.column(*col_idx);
922                Helper::try_into_vector(col).context(error::ComputeVectorSnafu)
923            })
924            .collect();
925        let pk_vectors = pk_vectors?;
926
927        let mut key_array_builder = PrimaryKeyArrayBuilder::new();
928        let mut encode_buf = Vec::new();
929
930        for row_idx in 0..num_rows {
931            encode_buf.clear();
932
933            // Collects primary key values with column IDs for this row
934            let pk_values_with_ids: Vec<_> = region_metadata
935                .primary_key
936                .iter()
937                .zip(pk_vectors.iter())
938                .map(|(col_id, vector)| (*col_id, vector.get_ref(row_idx)))
939                .collect();
940
941            // Encodes the primary key
942            primary_key_codec
943                .encode_value_refs(&pk_values_with_ids, &mut encode_buf)
944                .context(EncodeSnafu)?;
945
946            key_array_builder
947                .append(&encode_buf)
948                .context(ComputeArrowSnafu)?;
949        }
950
951        Some(key_array_builder.finish())
952    };
953
954    // Adds primary key columns if storing them (only for dense encoding)
955    if store_primary_key_columns && !is_sparse {
956        for col_meta in region_metadata.primary_key_columns() {
957            let col_idx = column_indices
958                .get(col_meta.column_schema.name.as_str())
959                .context(ColumnNotFoundSnafu {
960                    column: &col_meta.column_schema.name,
961                })?;
962            let col = part.batch.column(*col_idx);
963
964            // Converts to dictionary if needed for string types
965            let col = if col_meta.column_schema.data_type.is_string() {
966                let target_type = ArrowDataType::Dictionary(
967                    Box::new(ArrowDataType::UInt32),
968                    Box::new(ArrowDataType::Utf8),
969                );
970                arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
971            } else {
972                col.clone()
973            };
974            output_columns.push(col);
975        }
976    }
977
978    // Adds field columns
979    for col_meta in region_metadata.field_columns() {
980        let col_idx = column_indices
981            .get(col_meta.column_schema.name.as_str())
982            .context(ColumnNotFoundSnafu {
983                column: &col_meta.column_schema.name,
984            })?;
985        output_columns.push(part.batch.column(*col_idx).clone());
986    }
987
988    // Adds timestamp column
989    let new_timestamp_index = output_columns.len();
990    let ts_col_idx = column_indices
991        .get(
992            region_metadata
993                .time_index_column()
994                .column_schema
995                .name
996                .as_str(),
997        )
998        .context(ColumnNotFoundSnafu {
999            column: &region_metadata.time_index_column().column_schema.name,
1000        })?;
1001    output_columns.push(part.batch.column(*ts_col_idx).clone());
1002
1003    // Adds encoded primary key dictionary column
1004    let pk_dictionary = if let Some(pk_dict_array) = pk_array {
1005        Arc::new(pk_dict_array) as ArrayRef
1006    } else {
1007        let pk_col_idx =
1008            column_indices
1009                .get(PRIMARY_KEY_COLUMN_NAME)
1010                .context(ColumnNotFoundSnafu {
1011                    column: PRIMARY_KEY_COLUMN_NAME,
1012                })?;
1013        let col = part.batch.column(*pk_col_idx);
1014
1015        // Casts to dictionary type if needed
1016        let target_type = ArrowDataType::Dictionary(
1017            Box::new(ArrowDataType::UInt32),
1018            Box::new(ArrowDataType::Binary),
1019        );
1020        arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
1021    };
1022    output_columns.push(pk_dictionary);
1023
1024    let sequence_array = UInt64Array::from(vec![part.sequence; num_rows]);
1025    output_columns.push(Arc::new(sequence_array) as ArrayRef);
1026
1027    let op_type_array = UInt8Array::from(vec![OpType::Put as u8; num_rows]);
1028    output_columns.push(Arc::new(op_type_array) as ArrayRef);
1029
1030    let batch = RecordBatch::try_new(schema, output_columns).context(NewRecordBatchSnafu)?;
1031
1032    // Sorts the batch by (primary_key, timestamp, sequence desc)
1033    let sorted_batch = sort_primary_key_record_batch(&batch)?;
1034
1035    Ok(Some(BulkPart {
1036        batch: sorted_batch,
1037        max_timestamp: part.max_timestamp,
1038        min_timestamp: part.min_timestamp,
1039        sequence: part.sequence,
1040        timestamp_index: new_timestamp_index,
1041        raw_data: None,
1042    }))
1043}
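
// A minimal usage sketch (not part of the original file): normalizing an incoming bulk write
// before buffering it in the memtable. `metadata`, `codec` and `output_schema` are assumed to
// be prepared by the caller and to describe the flat output layout.
//
// fn prepare_part(
//     part: BulkPart,
//     metadata: &RegionMetadataRef,
//     codec: Arc<dyn PrimaryKeyCodec>,
//     output_schema: SchemaRef,
// ) -> Result<Option<BulkPart>> {
//     // Encodes the `__primary_key` dictionary column, appends sequence/op type columns and
//     // sorts rows by (primary key, timestamp, sequence desc). Returns `None` for empty parts.
//     convert_bulk_part(part, metadata, codec, output_schema, true)
// }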
1044
1045#[derive(Debug, Clone)]
1046pub struct EncodedBulkPart {
1047    data: Bytes,
1048    metadata: BulkPartMeta,
1049}
1050
1051impl EncodedBulkPart {
1052    pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
1053        Self { data, metadata }
1054    }
1055
1056    pub fn metadata(&self) -> &BulkPartMeta {
1057        &self.metadata
1058    }
1059
1060    /// Returns the size of the encoded data in bytes
1061    pub(crate) fn size_bytes(&self) -> usize {
1062        self.data.len()
1063    }
1064
1065    /// Returns the encoded data.
1066    pub fn data(&self) -> &Bytes {
1067        &self.data
1068    }
1069
1070    /// Creates MemtableStats from this EncodedBulkPart.
1071    pub fn to_memtable_stats(&self) -> MemtableStats {
1072        let meta = &self.metadata;
1073        let ts_type = meta.region_metadata.time_index_type();
1074        let min_ts = ts_type.create_timestamp(meta.min_timestamp);
1075        let max_ts = ts_type.create_timestamp(meta.max_timestamp);
1076
1077        MemtableStats {
1078            estimated_bytes: self.size_bytes(),
1079            time_range: Some((min_ts, max_ts)),
1080            num_rows: meta.num_rows,
1081            num_ranges: 1,
1082            max_sequence: meta.max_sequence,
1083            series_count: meta.num_series as usize,
1084        }
1085    }
1086
1087    /// Converts this `EncodedBulkPart` to `SstInfo`.
1088    ///
1089    /// # Arguments
1090    /// * `file_id` - The SST file ID to assign to this part
1091    ///
1092    /// # Returns
1093    /// Returns a `SstInfo` instance with information derived from this bulk part's metadata
1094    pub(crate) fn to_sst_info(&self, file_id: FileId) -> SstInfo {
1095        let unit = self.metadata.region_metadata.time_index_type().unit();
1096        let max_row_group_uncompressed_size: u64 = self
1097            .metadata
1098            .parquet_metadata
1099            .row_groups()
1100            .iter()
1101            .map(|rg| {
1102                rg.columns()
1103                    .iter()
1104                    .map(|c| c.uncompressed_size() as u64)
1105                    .sum::<u64>()
1106            })
1107            .max()
1108            .unwrap_or(0);
1109        SstInfo {
1110            file_id,
1111            time_range: (
1112                Timestamp::new(self.metadata.min_timestamp, unit),
1113                Timestamp::new(self.metadata.max_timestamp, unit),
1114            ),
1115            file_size: self.data.len() as u64,
1116            max_row_group_uncompressed_size,
1117            num_rows: self.metadata.num_rows,
1118            num_row_groups: self.metadata.parquet_metadata.num_row_groups() as u64,
1119            file_metadata: Some(self.metadata.parquet_metadata.clone()),
1120            index_metadata: IndexOutput::default(),
1121            num_series: self.metadata.num_series,
1122        }
1123    }
1124
1125    pub(crate) fn read(
1126        &self,
1127        context: BulkIterContextRef,
1128        sequence: Option<SequenceRange>,
1129        mem_scan_metrics: Option<MemScanMetrics>,
1130    ) -> Result<Option<BoxedRecordBatchIterator>> {
1131        // Compute skip_fields for row group pruning from the configured pre-filter mode.
1132        let skip_fields_for_pruning = context.pre_filter_mode().skip_fields();
1133
1134        // Use the predicate to find row groups to read.
1135        let row_groups_to_read =
1136            context.row_groups_to_read(&self.metadata.parquet_metadata, skip_fields_for_pruning);
1137
1138        if row_groups_to_read.is_empty() {
1139            // All row groups are filtered.
1140            return Ok(None);
1141        }
1142
1143        let iter = EncodedBulkPartIter::try_new(
1144            self,
1145            context,
1146            row_groups_to_read,
1147            sequence,
1148            mem_scan_metrics,
1149        )?;
1150        Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
1151    }
1152}
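
// A minimal usage sketch (not part of the original file): scanning an encoded part. `context`
// is assumed to be a prepared [BulkIterContextRef] carrying the projection and predicate.
//
// fn count_rows(part: &EncodedBulkPart, context: BulkIterContextRef) -> Result<usize> {
//     let mut rows = 0;
//     // `read` returns `None` when the predicate prunes all row groups.
//     if let Some(iter) = part.read(context, None, None)? {
//         for batch in iter {
//             rows += batch?.num_rows();
//         }
//     }
//     Ok(rows)
// }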
1153
1154// TODO(yingwen): max_sequence
1155#[derive(Debug, Clone)]
1156pub struct BulkPartMeta {
1157    /// Total rows in part.
1158    pub num_rows: usize,
1159    /// Max timestamp in part.
1160    pub max_timestamp: i64,
1161    /// Min timestamp in part.
1162    pub min_timestamp: i64,
1163    /// Part file metadata.
1164    pub parquet_metadata: Arc<ParquetMetaData>,
1165    /// Part region schema.
1166    pub region_metadata: RegionMetadataRef,
1167    /// Number of series.
1168    pub num_series: u64,
1169    /// Maximum sequence number in part.
1170    pub max_sequence: u64,
1171}
1172
1173/// Metrics for encoding a part.
1174#[derive(Default, Debug)]
1175pub struct BulkPartEncodeMetrics {
1176    /// Cost of iterating over the data.
1177    pub iter_cost: Duration,
1178    /// Cost of writing the data.
1179    pub write_cost: Duration,
1180    /// Size of data before encoding.
1181    pub raw_size: usize,
1182    /// Size of data after encoding.
1183    pub encoded_size: usize,
1184    /// Number of rows in part.
1185    pub num_rows: usize,
1186}
1187
1188pub struct BulkPartEncoder {
1189    metadata: RegionMetadataRef,
1190    writer_props: Option<WriterProperties>,
1191}
1192
1193impl BulkPartEncoder {
1194    pub fn new(metadata: RegionMetadataRef, row_group_size: usize) -> Result<BulkPartEncoder> {
1195        // TODO(yingwen): Skip arrow schema if needed.
1196        let json = metadata.to_json().context(InvalidMetadataSnafu)?;
1197        let key_value_meta =
1198            parquet::file::metadata::KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
1199
1200        // TODO(yingwen): Do we need compression?
1201        let writer_props = Some(
1202            WriterProperties::builder()
1203                .set_key_value_metadata(Some(vec![key_value_meta]))
1204                .set_write_batch_size(row_group_size)
1205                .set_max_row_group_size(row_group_size)
1206                .set_compression(Compression::ZSTD(ZstdLevel::default()))
1207                .set_column_index_truncate_length(None)
1208                .set_statistics_truncate_length(None)
1209                .build(),
1210        );
1211
1212        Ok(Self {
1213            metadata,
1214            writer_props,
1215        })
1216    }
1217}
1218
1219impl BulkPartEncoder {
1220    /// Encodes [BoxedRecordBatchIterator] into [EncodedBulkPart] with min/max timestamps.
1221    pub fn encode_record_batch_iter(
1222        &self,
1223        iter: BoxedRecordBatchIterator,
1224        arrow_schema: SchemaRef,
1225        min_timestamp: i64,
1226        max_timestamp: i64,
1227        max_sequence: u64,
1228        metrics: &mut BulkPartEncodeMetrics,
1229    ) -> Result<Option<EncodedBulkPart>> {
1230        let mut buf = Vec::with_capacity(4096);
1231        let mut writer = ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
1232            .context(EncodeMemtableSnafu)?;
1233        let mut total_rows = 0;
1234        let mut series_estimator = SeriesEstimator::default();
1235
1236        // Process each batch from the iterator
1237        let mut iter_start = Instant::now();
1238        for batch_result in iter {
1239            metrics.iter_cost += iter_start.elapsed();
1240            let batch = batch_result?;
1241            if batch.num_rows() == 0 {
1242                continue;
1243            }
1244
1245            series_estimator.update_flat(&batch);
1246            metrics.raw_size += record_batch_estimated_size(&batch);
1247            let write_start = Instant::now();
1248            writer.write(&batch).context(EncodeMemtableSnafu)?;
1249            metrics.write_cost += write_start.elapsed();
1250            total_rows += batch.num_rows();
1251            iter_start = Instant::now();
1252        }
1253        metrics.iter_cost += iter_start.elapsed();
1254
1255        if total_rows == 0 {
1256            return Ok(None);
1257        }
1258
1259        let close_start = Instant::now();
1260        let file_metadata = writer.close().context(EncodeMemtableSnafu)?;
1261        metrics.write_cost += close_start.elapsed();
1262        metrics.encoded_size += buf.len();
1263        metrics.num_rows += total_rows;
1264
1265        let buf = Bytes::from(buf);
1266        let parquet_metadata = Arc::new(file_metadata);
1267        let num_series = series_estimator.finish();
1268
1269        Ok(Some(EncodedBulkPart {
1270            data: buf,
1271            metadata: BulkPartMeta {
1272                num_rows: total_rows,
1273                max_timestamp,
1274                min_timestamp,
1275                parquet_metadata,
1276                region_metadata: self.metadata.clone(),
1277                num_series,
1278                max_sequence,
1279            },
1280        }))
1281    }
1282
1283    /// Encodes a bulk part into an [EncodedBulkPart], returning the encoded data.
1284    pub fn encode_part(&self, part: &BulkPart) -> Result<Option<EncodedBulkPart>> {
1285        if part.batch.num_rows() == 0 {
1286            return Ok(None);
1287        }
1288
1289        let mut buf = Vec::with_capacity(4096);
1290        let arrow_schema = part.batch.schema();
1291
1292        let file_metadata = {
1293            let mut writer =
1294                ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
1295                    .context(EncodeMemtableSnafu)?;
1296            writer.write(&part.batch).context(EncodeMemtableSnafu)?;
1297            writer.finish().context(EncodeMemtableSnafu)?
1298        };
1299
1300        let buf = Bytes::from(buf);
1301        let parquet_metadata = Arc::new(file_metadata);
1302
1303        Ok(Some(EncodedBulkPart {
1304            data: buf,
1305            metadata: BulkPartMeta {
1306                num_rows: part.batch.num_rows(),
1307                max_timestamp: part.max_timestamp,
1308                min_timestamp: part.min_timestamp,
1309                parquet_metadata,
1310                region_metadata: self.metadata.clone(),
1311                num_series: part.estimated_series_count() as u64,
1312                max_sequence: part.sequence,
1313            },
1314        }))
1315    }
1316}
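
// A minimal usage sketch (not part of the original file): encoding a sorted [BulkPart] into an
// in-memory parquet buffer. The row group size of 1024 is an arbitrary example value.
//
// fn encode_to_parquet(
//     metadata: RegionMetadataRef,
//     part: &BulkPart,
// ) -> Result<Option<EncodedBulkPart>> {
//     let encoder = BulkPartEncoder::new(metadata, 1024)?;
//     // Returns `None` if the part is empty; otherwise parquet bytes plus part metadata.
//     encoder.encode_part(part)
// }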
1317
1318/// Per-batch min/max statistics for the first tag column in a `MultiBulkPart`.
1319///
1320/// Since batches are sorted by primary key, we can extract the min/max of the first tag
1321/// from the first/last row's encoded primary key in each batch. These statistics enable
1322/// batch-level pruning using predicates, analogous to row-group pruning in parquet.
1323#[derive(Debug, Clone)]
1324struct BatchStats {
1325    /// Number of batches.
1326    num_batches: usize,
1327    /// Column id of the first tag.
1328    first_tag_id: ColumnId,
1329    /// Min values of the first tag, one element per batch.
1330    min_values: ArrayRef,
1331    /// Max values of the first tag, one element per batch.
1332    max_values: ArrayRef,
1333}
1334
1335impl BatchStats {
1336    /// Computes batch statistics from a slice of record batches.
1337    ///
1338    /// Returns `None` if there is no primary key (no first tag to collect stats for)
1339    /// or if extracting statistics fails.
1340    fn compute(batches: &[RecordBatch], metadata: &RegionMetadata) -> Option<Self> {
1341        // `primary_key.first()` is correct for both dense and sparse encodings.
1342        // For dense, values follow the order of `metadata.primary_key`.
1343        // For sparse, `decode_leftmost` decodes the first value which also
1344        // corresponds to `primary_key.first()`. See `SparsePrimaryKeyCodec` for format details.
1345        let first_tag_id = *metadata.primary_key.first()?;
1346        let first_tag_column = metadata.column_by_id(first_tag_id)?;
1347        let data_type = &first_tag_column.column_schema.data_type;
1348
1349        let converter = build_primary_key_codec_with_fields(
1350            metadata.primary_key_encoding,
1351            [(first_tag_id, SortField::new(data_type.clone()))].into_iter(),
1352        );
1353        let pk_index = primary_key_column_index(batches.first()?.num_columns());
1354
1355        let mut min_builder = data_type.create_mutable_vector(batches.len());
1356        let mut max_builder = data_type.create_mutable_vector(batches.len());
1357
1358        for batch in batches {
1359            match Self::extract_first_tag_bounds(batch, pk_index, &*converter) {
1360                Some((min_val, max_val)) => {
1361                    min_builder.push_value_ref(&min_val.as_value_ref());
1362                    max_builder.push_value_ref(&max_val.as_value_ref());
1363                }
1364                None => {
1365                    min_builder.push_null();
1366                    max_builder.push_null();
1367                }
1368            }
1369        }
1370
1371        Some(Self {
1372            num_batches: batches.len(),
1373            first_tag_id,
1374            min_values: min_builder.to_vector().to_arrow_array(),
1375            max_values: max_builder.to_vector().to_arrow_array(),
1376        })
1377    }
1378
1379    /// Extracts the first tag value from the first and last rows of a batch.
1380    fn extract_first_tag_bounds(
1381        batch: &RecordBatch,
1382        pk_index: usize,
1383        converter: &dyn PrimaryKeyCodec,
1384    ) -> Option<(datatypes::value::Value, datatypes::value::Value)> {
1385        if batch.num_rows() == 0 {
1386            return None;
1387        }
1388
1389        let pk_dict = batch
1390            .column(pk_index)
1391            .as_any()
1392            .downcast_ref::<PrimaryKeyArray>()?;
1393        let pk_values = pk_dict.values().as_any().downcast_ref::<BinaryArray>()?;
1394
1395        let keys = pk_dict.keys();
1396        let min_key = keys.value(0);
1397        let max_key = keys.value(batch.num_rows() - 1);
1398        let min_bytes = pk_values.value(min_key as usize);
1399        let max_bytes = pk_values.value(max_key as usize);
1400
1401        Some((
1402            converter.decode_leftmost(min_bytes).ok()??,
1403            converter.decode_leftmost(max_bytes).ok()??,
1404        ))
1405    }
1406}
1407
1408/// Adapter implementing `PruningStatistics` for `BatchStats`.
1409///
1410/// Used with `Predicate::prune_with_stats()` to skip batches whose first-tag
1411/// min/max range does not match the query predicate.
1412struct BatchPruningStats<'a> {
1413    stats: &'a BatchStats,
1414    metadata: &'a RegionMetadataRef,
1415}
1416
1417impl PruningStatistics for BatchPruningStats<'_> {
1418    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
1419        let col = self.metadata.column_by_name(&column.name)?;
1420        if col.column_id == self.stats.first_tag_id {
1421            Some(self.stats.min_values.clone())
1422        } else {
1423            None
1424        }
1425    }
1426
1427    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
1428        let col = self.metadata.column_by_name(&column.name)?;
1429        if col.column_id == self.stats.first_tag_id {
1430            Some(self.stats.max_values.clone())
1431        } else {
1432            None
1433        }
1434    }
1435
1436    fn num_containers(&self) -> usize {
1437        self.stats.num_batches
1438    }
1439
1440    fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
1441        None
1442    }
1443
1444    fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
1445        None
1446    }
1447
1448    fn contained(
1449        &self,
1450        _column: &Column,
1451        _values: &std::collections::HashSet<datafusion_common::ScalarValue>,
1452    ) -> Option<BooleanArray> {
1453        None
1454    }
1455}
1456
1457/// Returns true if the predicate references the given column name.
1458fn predicate_references_column(predicate: &table::predicate::Predicate, column_name: &str) -> bool {
1459    let mut columns = HashSet::new();
1460    for expr in predicate.exprs() {
1461        let _ = expr_to_columns(expr, &mut columns);
1462    }
1463    columns.iter().any(|col| col.name == column_name)
1464}
1465
1466/// Returns true if the batch should be pruned (skipped) based on the first-tag min/max
1467/// statistics and the predicate in the context. Returns false if no pruning is possible
1468/// (no primary key, no predicate, or the batch matches the predicate).
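///
/// Usage sketch (not compiled as a doc test); `part`, `context`, and `metadata`
/// are assumed to be provided by the caller:
///
/// ```ignore
/// if should_prune_bulk_part(&part.batch, &context, &metadata) {
///     // The batch cannot match the predicate; skip reading it.
///     return Ok(None);
/// }
/// ```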
1469pub(crate) fn should_prune_bulk_part(
1470    batch: &RecordBatch,
1471    context: &BulkIterContext,
1472    metadata: &RegionMetadata,
1473) -> bool {
1474    let predicate = match &context.predicate {
1475        Some(p) => p,
1476        None => return false,
1477    };
1478    // Check if the predicate references the first tag column to avoid computing
1479    // expensive batch statistics when they won't help with pruning.
1480    let first_tag_id = match metadata.primary_key.first() {
1481        Some(id) => *id,
1482        None => return false,
1483    };
1484    // Safety: `first_tag_id` comes from `metadata.primary_key` so the column always exists.
1485    let first_tag_name = &metadata
1486        .column_by_id(first_tag_id)
1487        .unwrap()
1488        .column_schema
1489        .name;
1490    if !predicate_references_column(predicate, first_tag_name) {
1491        return false;
1492    }
1493    let stats = match BatchStats::compute(std::slice::from_ref(batch), metadata) {
1494        Some(s) => s,
1495        None => return false,
1496    };
1497    let region_meta = context.read_format().metadata();
1498    let pruning_stats = BatchPruningStats {
1499        stats: &stats,
1500        metadata: region_meta,
1501    };
1502    let mask = predicate.prune_with_stats(&pruning_stats, region_meta.schema.arrow_schema());
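    // The statistics describe a single batch, so the mask has one entry; prune only when it is `false`.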
1503    !mask.first().copied().unwrap_or(true)
1504}
1505
1506/// A collection of ordered RecordBatches representing a bulk part without parquet encoding.
1507///
1508/// Similar to `EncodedBulkPart` but stores raw RecordBatches instead of encoded parquet data.
1509/// The RecordBatches must be ordered by (primary key, timestamp, sequence desc).
1510/// Uses a SmallVec so the common case of only a few batches avoids heap allocation.
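///
/// Construction sketch (not compiled as a doc test); `bulk_part` is assumed to be
/// a converted `BulkPart` and `metadata` the region metadata:
///
/// ```ignore
/// let part = MultiBulkPart::from_bulk_part(bulk_part, &metadata);
/// assert_eq!(part.num_batches(), 1);
/// ```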
1511#[derive(Debug, Clone)]
1512pub struct MultiBulkPart {
1513    /// Ordered record batches. SmallVec optimized for up to 4 batches inline.
1514    batches: SmallVec<[RecordBatch; 4]>,
1515    /// Total rows across all batches.
1516    total_rows: usize,
1517    /// Max timestamp in part.
1518    max_timestamp: i64,
1519    /// Min timestamp in part.
1520    min_timestamp: i64,
1521    /// Max sequence number in part.
1522    max_sequence: SequenceNumber,
1523    /// Number of series.
1524    series_count: usize,
1525    /// Pre-computed per-batch statistics for the first tag column.
1526    /// `None` if there is no primary key.
1527    batch_stats: Option<BatchStats>,
1528}
1529
1530impl MultiBulkPart {
1531    /// Creates a new MultiBulkPart from a single BulkPart.
1532    pub fn from_bulk_part(part: BulkPart, metadata: &RegionMetadata) -> Self {
1533        let num_rows = part.num_rows();
1534        let series_count = part.estimated_series_count();
1535        let batch_stats = BatchStats::compute(std::slice::from_ref(&part.batch), metadata);
1536        let mut batches = SmallVec::new();
1537        batches.push(part.batch);
1538
1539        Self {
1540            batches,
1541            total_rows: num_rows,
1542            max_timestamp: part.max_timestamp,
1543            min_timestamp: part.min_timestamp,
1544            max_sequence: part.sequence,
1545            series_count,
1546            batch_stats,
1547        }
1548    }
1549
1550    /// Creates a new MultiBulkPart from multiple ordered RecordBatches.
1551    ///
1552    /// # Arguments
1553    /// * `batches` - Ordered record batches
1554    /// * `min_timestamp` - Minimum timestamp across all batches
1555    /// * `max_timestamp` - Maximum timestamp across all batches
1556    /// * `max_sequence` - Maximum sequence number across all batches
1557    /// * `series_count` - Number of series in the batches
1558    /// * `metadata` - Region metadata for computing batch statistics
1559    ///
1560    /// # Panics
1561    /// Panics if batches is empty.
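    ///
    /// # Example
    ///
    /// Sketch only (not compiled as a doc test); `batches` and the bounds are
    /// assumed to be produced upstream, e.g. by a `BulkPartConverter`:
    ///
    /// ```ignore
    /// let part = MultiBulkPart::new(batches, min_ts, max_ts, max_seq, series_count, &metadata);
    /// // `num_rows` is the sum of the row counts of all input batches.
    /// println!("{} rows in {} batches", part.num_rows(), part.num_batches());
    /// ```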
1562    pub fn new(
1563        batches: Vec<RecordBatch>,
1564        min_timestamp: i64,
1565        max_timestamp: i64,
1566        max_sequence: SequenceNumber,
1567        series_count: usize,
1568        metadata: &RegionMetadata,
1569    ) -> Self {
1570        assert!(!batches.is_empty(), "batches must not be empty");
1571
1572        let total_rows = batches.iter().map(|b| b.num_rows()).sum();
1573        let batch_stats = BatchStats::compute(&batches, metadata);
1574
1575        Self {
1576            batches: SmallVec::from_vec(batches),
1577            total_rows,
1578            max_timestamp,
1579            min_timestamp,
1580            max_sequence,
1581            series_count,
1582            batch_stats,
1583        }
1584    }
1585
1586    /// Returns the total number of rows across all batches.
1587    pub fn num_rows(&self) -> usize {
1588        self.total_rows
1589    }
1590
1591    /// Returns the minimum timestamp.
1592    pub fn min_timestamp(&self) -> i64 {
1593        self.min_timestamp
1594    }
1595
1596    /// Returns the maximum timestamp.
1597    pub fn max_timestamp(&self) -> i64 {
1598        self.max_timestamp
1599    }
1600
1601    /// Returns the maximum sequence number.
1602    pub fn max_sequence(&self) -> SequenceNumber {
1603        self.max_sequence
1604    }
1605
1606    /// Returns the number of series.
1607    pub fn series_count(&self) -> usize {
1608        self.series_count
1609    }
1610
1611    /// Returns the number of record batches in this part.
1612    pub fn num_batches(&self) -> usize {
1613        self.batches.len()
1614    }
1615
1616    /// Returns the estimated memory size of all batches.
1617    pub(crate) fn estimated_size(&self) -> usize {
1618        self.batches.iter().map(record_batch_estimated_size).sum()
1619    }
1620
1621    /// Reads data from this part with the given context and filters.
1622    ///
1623    /// If batch-level statistics are available and a predicate is set, prunes
1624    /// batches whose first-tag min/max range doesn't match the predicate before
1625    /// creating the iterator.
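    ///
    /// Usage sketch (not compiled as a doc test); `context` is an
    /// `Arc<BulkIterContext>` built by the caller:
    ///
    /// ```ignore
    /// if let Some(iter) = part.read(context, None, None)? {
    ///     for batch in iter {
    ///         let batch = batch?;
    ///         // Consume the pruned, ordered record batches.
    ///     }
    /// }
    /// ```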
1626    pub(crate) fn read(
1627        &self,
1628        context: BulkIterContextRef,
1629        sequence: Option<SequenceRange>,
1630        mem_scan_metrics: Option<MemScanMetrics>,
1631    ) -> Result<Option<BoxedRecordBatchIterator>> {
1632        if self.batches.is_empty() {
1633            return Ok(None);
1634        }
1635
1636        let batches_to_read = self.prune_batches(&context);
1637
1638        if batches_to_read.is_empty() {
1639            return Ok(None);
1640        }
1641
1642        let iter = crate::memtable::bulk::part_reader::BulkPartBatchIter::new(
1643            batches_to_read,
1644            context,
1645            sequence,
1646            self.series_count,
1647            mem_scan_metrics,
1648        );
1649        Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
1650    }
1651
1652    /// Prunes batches using the first-tag min/max statistics and the predicate.
1653    /// Returns all batches if no stats or no predicate is available.
1654    fn prune_batches(&self, context: &BulkIterContextRef) -> Vec<RecordBatch> {
1655        if let Some(stats) = &self.batch_stats
1656            && let Some(predicate) = &context.predicate
1657        {
1658            let region_meta = context.read_format().metadata();
1659            let pruning_stats = BatchPruningStats {
1660                stats,
1661                metadata: region_meta,
1662            };
1663            let mask =
1664                predicate.prune_with_stats(&pruning_stats, region_meta.schema.arrow_schema());
1665            self.batches
1666                .iter()
1667                .zip(mask.iter())
1668                .filter_map(
1669                    |(batch, &selected)| {
1670                        if selected { Some(batch.clone()) } else { None }
1671                    },
1672                )
1673                .collect()
1674        } else {
1675            self.batches.iter().cloned().collect()
1676        }
1677    }
1678
1679    /// Converts this `MultiBulkPart` to `MemtableStats`.
1680    pub fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
1681        let ts_type = region_metadata.time_index_type();
1682        let min_ts = ts_type.create_timestamp(self.min_timestamp);
1683        let max_ts = ts_type.create_timestamp(self.max_timestamp);
1684
1685        MemtableStats {
1686            estimated_bytes: self.estimated_size(),
1687            time_range: Some((min_ts, max_ts)),
1688            num_rows: self.num_rows(),
1689            num_ranges: 1,
1690            max_sequence: self.max_sequence,
1691            series_count: self.series_count,
1692        }
1693    }
1694}
1695
1696#[cfg(test)]
1697mod tests {
1698    use api::v1::{Row, SemanticType, WriteHint};
1699    use datafusion_common::ScalarValue;
1700    use datatypes::arrow::array::{
1701        BinaryArray, DictionaryArray, Float64Array, TimestampMillisecondArray,
1702    };
1703    use datatypes::arrow::datatypes::UInt32Type;
1704    use datatypes::prelude::{ConcreteDataType, Value};
1705    use datatypes::schema::ColumnSchema;
1706    use mito_codec::row_converter::build_primary_key_codec;
1707    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1708    use store_api::storage::RegionId;
1709    use store_api::storage::consts::ReservedColumnId;
1710    use table::predicate::Predicate;
1711
1712    use super::*;
1713    use crate::memtable::bulk::context::BulkIterContext;
1714    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1715    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
1716
1717    struct MutationInput<'a> {
1718        k0: &'a str,
1719        k1: u32,
1720        timestamps: &'a [i64],
1721        v1: &'a [Option<f64>],
1722        sequence: u64,
1723    }
1724
1725    fn encode(input: &[MutationInput]) -> EncodedBulkPart {
1726        let metadata = metadata_for_test();
1727        let kvs = input
1728            .iter()
1729            .map(|m| {
1730                build_key_values_with_ts_seq_values(
1731                    &metadata,
1732                    m.k0.to_string(),
1733                    m.k1,
1734                    m.timestamps.iter().copied(),
1735                    m.v1.iter().copied(),
1736                    m.sequence,
1737                )
1738            })
1739            .collect::<Vec<_>>();
1740        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1741        let primary_key_codec = build_primary_key_codec(&metadata);
1742        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1743        for kv in kvs {
1744            converter.append_key_values(&kv).unwrap();
1745        }
1746        let part = converter.convert().unwrap();
1747        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1748        encoder.encode_part(&part).unwrap().unwrap()
1749    }
1750
1751    #[test]
1752    fn test_write_and_read_part_projection() {
1753        let part = encode(&[
1754            MutationInput {
1755                k0: "a",
1756                k1: 0,
1757                timestamps: &[1],
1758                v1: &[Some(0.1)],
1759                sequence: 0,
1760            },
1761            MutationInput {
1762                k0: "b",
1763                k1: 0,
1764                timestamps: &[1],
1765                v1: &[Some(0.0)],
1766                sequence: 0,
1767            },
1768            MutationInput {
1769                k0: "a",
1770                k1: 0,
1771                timestamps: &[2],
1772                v1: &[Some(0.2)],
1773                sequence: 1,
1774            },
1775        ]);
1776
1777        let projection = &[4u32];
1778        let reader = part
1779            .read(
1780                Arc::new(
1781                    BulkIterContext::new(
1782                        part.metadata.region_metadata.clone(),
1783                        Some(projection.as_slice()),
1784                        None,
1785                        false,
1786                    )
1787                    .unwrap(),
1788                ),
1789                None,
1790                None,
1791            )
1792            .unwrap()
1793            .expect("expect at least one row group");
1794
1795        let mut total_rows_read = 0;
1796        let mut field: Vec<f64> = vec![];
1797        for res in reader {
1798            let batch = res.unwrap();
1799            assert_eq!(5, batch.num_columns());
1800            field.extend_from_slice(
1801                batch
1802                    .column(0)
1803                    .as_any()
1804                    .downcast_ref::<Float64Array>()
1805                    .unwrap()
1806                    .values(),
1807            );
1808            total_rows_read += batch.num_rows();
1809        }
1810        assert_eq!(3, total_rows_read);
1811        assert_eq!(vec![0.1, 0.2, 0.0], field);
1812    }
1813
1814    fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
1815        let metadata = metadata_for_test();
1816        let kvs = key_values
1817            .into_iter()
1818            .map(|(k0, k1, (start, end), sequence)| {
1819                let ts = start..end;
1820                let v1 = (start..end).map(|_| None);
1821                build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
1822            })
1823            .collect::<Vec<_>>();
1824        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1825        let primary_key_codec = build_primary_key_codec(&metadata);
1826        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1827        for kv in kvs {
1828            converter.append_key_values(&kv).unwrap();
1829        }
1830        let part = converter.convert().unwrap();
1831        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1832        encoder.encode_part(&part).unwrap().unwrap()
1833    }
1834
1835    fn check_prune_row_group(
1836        part: &EncodedBulkPart,
1837        predicate: Option<Predicate>,
1838        expected_rows: usize,
1839    ) {
1840        let context = Arc::new(
1841            BulkIterContext::new(
1842                part.metadata.region_metadata.clone(),
1843                None,
1844                predicate,
1845                false,
1846            )
1847            .unwrap(),
1848        );
1849        let reader = part
1850            .read(context, None, None)
1851            .unwrap()
1852            .expect("expect at least one row group");
1853        let mut total_rows_read = 0;
1854        for res in reader {
1855            let batch = res.unwrap();
1856            total_rows_read += batch.num_rows();
1857        }
1858        // Verify the number of rows read after row-group pruning.
1859        assert_eq!(expected_rows, total_rows_read);
1860    }
1861
1862    #[test]
1863    fn test_prune_row_groups() {
1864        let part = prepare(vec![
1865            ("a", 0, (0, 40), 1),
1866            ("a", 1, (0, 60), 1),
1867            ("b", 0, (0, 100), 2),
1868            ("b", 1, (100, 180), 3),
1869            ("b", 1, (180, 210), 4),
1870        ]);
1871
1872        let context = Arc::new(
1873            BulkIterContext::new(
1874                part.metadata.region_metadata.clone(),
1875                None,
1876                Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
1877                    datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
1878                )])),
1879                false,
1880            )
1881            .unwrap(),
1882        );
1883        assert!(part.read(context, None, None).unwrap().is_none());
1884
1885        check_prune_row_group(&part, None, 310);
1886
1887        check_prune_row_group(
1888            &part,
1889            Some(Predicate::new(vec![
1890                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1891                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
1892            ])),
1893            40,
1894        );
1895
1896        check_prune_row_group(
1897            &part,
1898            Some(Predicate::new(vec![
1899                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1900                datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
1901            ])),
1902            60,
1903        );
1904
1905        check_prune_row_group(
1906            &part,
1907            Some(Predicate::new(vec![
1908                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1909            ])),
1910            100,
1911        );
1912
1913        check_prune_row_group(
1914            &part,
1915            Some(Predicate::new(vec![
1916                datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
1917                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
1918            ])),
1919            100,
1920        );
1921
1922        // Predicates on field columns can filter rows precisely.
1923        check_prune_row_group(
1924            &part,
1925            Some(Predicate::new(vec![
1926                datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64)),
1927            ])),
1928            1,
1929        );
1930    }
1931
1932    #[test]
1933    fn test_bulk_part_converter_append_and_convert() {
1934        let metadata = metadata_for_test();
1935        let capacity = 100;
1936        let primary_key_codec = build_primary_key_codec(&metadata);
1937        let schema = to_flat_sst_arrow_schema(
1938            &metadata,
1939            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1940        );
1941
1942        let mut converter =
1943            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1944
1945        let key_values1 = build_key_values_with_ts_seq_values(
1946            &metadata,
1947            "key1".to_string(),
1948            1u32,
1949            vec![1000, 2000].into_iter(),
1950            vec![Some(1.0), Some(2.0)].into_iter(),
1951            1,
1952        );
1953
1954        let key_values2 = build_key_values_with_ts_seq_values(
1955            &metadata,
1956            "key2".to_string(),
1957            2u32,
1958            vec![1500].into_iter(),
1959            vec![Some(3.0)].into_iter(),
1960            2,
1961        );
1962
1963        converter.append_key_values(&key_values1).unwrap();
1964        converter.append_key_values(&key_values2).unwrap();
1965
1966        let bulk_part = converter.convert().unwrap();
1967
1968        assert_eq!(bulk_part.num_rows(), 3);
1969        assert_eq!(bulk_part.min_timestamp, 1000);
1970        assert_eq!(bulk_part.max_timestamp, 2000);
1971        assert_eq!(bulk_part.sequence, 2);
1972        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
1973
1974        // Validate primary key columns are stored
1975        // Schema should include primary key columns k0 and k1 at the beginning
1976        let schema = bulk_part.batch.schema();
1977        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1978        assert_eq!(
1979            field_names,
1980            vec![
1981                "k0",
1982                "k1",
1983                "v0",
1984                "v1",
1985                "ts",
1986                "__primary_key",
1987                "__sequence",
1988                "__op_type"
1989            ]
1990        );
1991    }
1992
1993    #[test]
1994    fn test_bulk_part_converter_sorting() {
1995        let metadata = metadata_for_test();
1996        let capacity = 100;
1997        let primary_key_codec = build_primary_key_codec(&metadata);
1998        let schema = to_flat_sst_arrow_schema(
1999            &metadata,
2000            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2001        );
2002
2003        let mut converter =
2004            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
2005
2006        let key_values1 = build_key_values_with_ts_seq_values(
2007            &metadata,
2008            "z_key".to_string(),
2009            3u32,
2010            vec![3000].into_iter(),
2011            vec![Some(3.0)].into_iter(),
2012            3,
2013        );
2014
2015        let key_values2 = build_key_values_with_ts_seq_values(
2016            &metadata,
2017            "a_key".to_string(),
2018            1u32,
2019            vec![1000].into_iter(),
2020            vec![Some(1.0)].into_iter(),
2021            1,
2022        );
2023
2024        let key_values3 = build_key_values_with_ts_seq_values(
2025            &metadata,
2026            "m_key".to_string(),
2027            2u32,
2028            vec![2000].into_iter(),
2029            vec![Some(2.0)].into_iter(),
2030            2,
2031        );
2032
2033        converter.append_key_values(&key_values1).unwrap();
2034        converter.append_key_values(&key_values2).unwrap();
2035        converter.append_key_values(&key_values3).unwrap();
2036
2037        let bulk_part = converter.convert().unwrap();
2038
2039        assert_eq!(bulk_part.num_rows(), 3);
2040
2041        let ts_column = bulk_part.batch.column(bulk_part.timestamp_index);
2042        let seq_column = bulk_part.batch.column(bulk_part.batch.num_columns() - 2);
2043
2044        let ts_array = ts_column
2045            .as_any()
2046            .downcast_ref::<TimestampMillisecondArray>()
2047            .unwrap();
2048        let seq_array = seq_column.as_any().downcast_ref::<UInt64Array>().unwrap();
2049
2050        assert_eq!(ts_array.values(), &[1000, 2000, 3000]);
2051        assert_eq!(seq_array.values(), &[1, 2, 3]);
2052
2053        // Validate primary key columns are stored
2054        let schema = bulk_part.batch.schema();
2055        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2056        assert_eq!(
2057            field_names,
2058            vec![
2059                "k0",
2060                "k1",
2061                "v0",
2062                "v1",
2063                "ts",
2064                "__primary_key",
2065                "__sequence",
2066                "__op_type"
2067            ]
2068        );
2069    }
2070
2071    #[test]
2072    fn test_bulk_part_converter_empty() {
2073        let metadata = metadata_for_test();
2074        let capacity = 10;
2075        let primary_key_codec = build_primary_key_codec(&metadata);
2076        let schema = to_flat_sst_arrow_schema(
2077            &metadata,
2078            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2079        );
2080
2081        let converter =
2082            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
2083
2084        let bulk_part = converter.convert().unwrap();
2085
2086        assert_eq!(bulk_part.num_rows(), 0);
2087        assert_eq!(bulk_part.min_timestamp, i64::MAX);
2088        assert_eq!(bulk_part.max_timestamp, i64::MIN);
2089        assert_eq!(bulk_part.sequence, SequenceNumber::MIN);
2090
2091        // Validate primary key columns are present in schema even for empty batch
2092        let schema = bulk_part.batch.schema();
2093        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2094        assert_eq!(
2095            field_names,
2096            vec![
2097                "k0",
2098                "k1",
2099                "v0",
2100                "v1",
2101                "ts",
2102                "__primary_key",
2103                "__sequence",
2104                "__op_type"
2105            ]
2106        );
2107    }
2108
2109    #[test]
2110    fn test_bulk_part_converter_without_primary_key_columns() {
2111        let metadata = metadata_for_test();
2112        let primary_key_codec = build_primary_key_codec(&metadata);
2113        let schema = to_flat_sst_arrow_schema(
2114            &metadata,
2115            &FlatSchemaOptions {
2116                raw_pk_columns: false,
2117                string_pk_use_dict: true,
2118            },
2119        );
2120
2121        let capacity = 100;
2122        let mut converter =
2123            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, false);
2124
2125        let key_values1 = build_key_values_with_ts_seq_values(
2126            &metadata,
2127            "key1".to_string(),
2128            1u32,
2129            vec![1000, 2000].into_iter(),
2130            vec![Some(1.0), Some(2.0)].into_iter(),
2131            1,
2132        );
2133
2134        let key_values2 = build_key_values_with_ts_seq_values(
2135            &metadata,
2136            "key2".to_string(),
2137            2u32,
2138            vec![1500].into_iter(),
2139            vec![Some(3.0)].into_iter(),
2140            2,
2141        );
2142
2143        converter.append_key_values(&key_values1).unwrap();
2144        converter.append_key_values(&key_values2).unwrap();
2145
2146        let bulk_part = converter.convert().unwrap();
2147
2148        assert_eq!(bulk_part.num_rows(), 3);
2149        assert_eq!(bulk_part.min_timestamp, 1000);
2150        assert_eq!(bulk_part.max_timestamp, 2000);
2151        assert_eq!(bulk_part.sequence, 2);
2152        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
2153
2154        // Validate primary key columns are NOT stored individually
2155        let schema = bulk_part.batch.schema();
2156        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2157        assert_eq!(
2158            field_names,
2159            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2160        );
2161    }
2162
2163    #[allow(clippy::too_many_arguments)]
2164    fn build_key_values_with_sparse_encoding(
2165        metadata: &RegionMetadataRef,
2166        primary_key_codec: &Arc<dyn PrimaryKeyCodec>,
2167        table_id: u32,
2168        tsid: u64,
2169        k0: String,
2170        k1: String,
2171        timestamps: impl Iterator<Item = i64>,
2172        values: impl Iterator<Item = Option<f64>>,
2173        sequence: SequenceNumber,
2174    ) -> KeyValues {
2175        // Encode the primary key (__table_id, __tsid, k0, k1) into binary format using the sparse codec
2176        let pk_values = vec![
2177            (ReservedColumnId::table_id(), Value::UInt32(table_id)),
2178            (ReservedColumnId::tsid(), Value::UInt64(tsid)),
2179            (0, Value::String(k0.clone().into())),
2180            (1, Value::String(k1.clone().into())),
2181        ];
2182        let mut encoded_key = Vec::new();
2183        primary_key_codec
2184            .encode_values(&pk_values, &mut encoded_key)
2185            .unwrap();
2186        assert!(!encoded_key.is_empty());
2187
2188        // Create schema for sparse encoding: __primary_key, ts, v0, v1
2189        let column_schema = vec![
2190            api::v1::ColumnSchema {
2191                column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
2192                datatype: api::helper::ColumnDataTypeWrapper::try_from(
2193                    ConcreteDataType::binary_datatype(),
2194                )
2195                .unwrap()
2196                .datatype() as i32,
2197                semantic_type: api::v1::SemanticType::Tag as i32,
2198                ..Default::default()
2199            },
2200            api::v1::ColumnSchema {
2201                column_name: "ts".to_string(),
2202                datatype: api::helper::ColumnDataTypeWrapper::try_from(
2203                    ConcreteDataType::timestamp_millisecond_datatype(),
2204                )
2205                .unwrap()
2206                .datatype() as i32,
2207                semantic_type: api::v1::SemanticType::Timestamp as i32,
2208                ..Default::default()
2209            },
2210            api::v1::ColumnSchema {
2211                column_name: "v0".to_string(),
2212                datatype: api::helper::ColumnDataTypeWrapper::try_from(
2213                    ConcreteDataType::int64_datatype(),
2214                )
2215                .unwrap()
2216                .datatype() as i32,
2217                semantic_type: api::v1::SemanticType::Field as i32,
2218                ..Default::default()
2219            },
2220            api::v1::ColumnSchema {
2221                column_name: "v1".to_string(),
2222                datatype: api::helper::ColumnDataTypeWrapper::try_from(
2223                    ConcreteDataType::float64_datatype(),
2224                )
2225                .unwrap()
2226                .datatype() as i32,
2227                semantic_type: api::v1::SemanticType::Field as i32,
2228                ..Default::default()
2229            },
2230        ];
2231
2232        let rows = timestamps
2233            .zip(values)
2234            .map(|(ts, v)| Row {
2235                values: vec![
2236                    api::v1::Value {
2237                        value_data: Some(api::v1::value::ValueData::BinaryValue(
2238                            encoded_key.clone(),
2239                        )),
2240                    },
2241                    api::v1::Value {
2242                        value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(ts)),
2243                    },
2244                    api::v1::Value {
2245                        value_data: Some(api::v1::value::ValueData::I64Value(ts)),
2246                    },
2247                    api::v1::Value {
2248                        value_data: v.map(api::v1::value::ValueData::F64Value),
2249                    },
2250                ],
2251            })
2252            .collect();
2253
2254        let mutation = api::v1::Mutation {
2255            op_type: 1,
2256            sequence,
2257            rows: Some(api::v1::Rows {
2258                schema: column_schema,
2259                rows,
2260            }),
2261            write_hint: Some(WriteHint {
2262                primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
2263            }),
2264        };
2265        KeyValues::new(metadata.as_ref(), mutation).unwrap()
2266    }
2267
2268    #[test]
2269    fn test_bulk_part_converter_sparse_primary_key_encoding() {
2270        use api::v1::SemanticType;
2271        use datatypes::schema::ColumnSchema;
2272        use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
2273        use store_api::storage::RegionId;
2274
2275        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
2276        builder
2277            .push_column_metadata(ColumnMetadata {
2278                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
2279                semantic_type: SemanticType::Tag,
2280                column_id: 0,
2281            })
2282            .push_column_metadata(ColumnMetadata {
2283                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
2284                semantic_type: SemanticType::Tag,
2285                column_id: 1,
2286            })
2287            .push_column_metadata(ColumnMetadata {
2288                column_schema: ColumnSchema::new(
2289                    "ts",
2290                    ConcreteDataType::timestamp_millisecond_datatype(),
2291                    false,
2292                ),
2293                semantic_type: SemanticType::Timestamp,
2294                column_id: 2,
2295            })
2296            .push_column_metadata(ColumnMetadata {
2297                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
2298                semantic_type: SemanticType::Field,
2299                column_id: 3,
2300            })
2301            .push_column_metadata(ColumnMetadata {
2302                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
2303                semantic_type: SemanticType::Field,
2304                column_id: 4,
2305            })
2306            .primary_key(vec![0, 1])
2307            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
2308        let metadata = Arc::new(builder.build().unwrap());
2309
2310        let primary_key_codec = build_primary_key_codec(&metadata);
2311        let schema = to_flat_sst_arrow_schema(
2312            &metadata,
2313            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2314        );
2315
2316        assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse);
2317        assert_eq!(primary_key_codec.encoding(), PrimaryKeyEncoding::Sparse);
2318
2319        let capacity = 100;
2320        let mut converter =
2321            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec.clone(), true);
2322
2323        let key_values1 = build_key_values_with_sparse_encoding(
2324            &metadata,
2325            &primary_key_codec,
2326            2048u32, // table_id
2327            100u64,  // tsid
2328            "key11".to_string(),
2329            "key21".to_string(),
2330            vec![1000, 2000].into_iter(),
2331            vec![Some(1.0), Some(2.0)].into_iter(),
2332            1,
2333        );
2334
2335        let key_values2 = build_key_values_with_sparse_encoding(
2336            &metadata,
2337            &primary_key_codec,
2338            4096u32, // table_id
2339            200u64,  // tsid
2340            "key12".to_string(),
2341            "key22".to_string(),
2342            vec![1500].into_iter(),
2343            vec![Some(3.0)].into_iter(),
2344            2,
2345        );
2346
2347        converter.append_key_values(&key_values1).unwrap();
2348        converter.append_key_values(&key_values2).unwrap();
2349
2350        let bulk_part = converter.convert().unwrap();
2351
2352        assert_eq!(bulk_part.num_rows(), 3);
2353        assert_eq!(bulk_part.min_timestamp, 1000);
2354        assert_eq!(bulk_part.max_timestamp, 2000);
2355        assert_eq!(bulk_part.sequence, 2);
2356        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
2357
2358        // For sparse encoding, primary key columns should NOT be stored individually
2359        // even when store_primary_key_columns is true, because sparse encoding
2360        // stores the encoded primary key in the __primary_key column
2361        let schema = bulk_part.batch.schema();
2362        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2363        assert_eq!(
2364            field_names,
2365            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2366        );
2367
2368        // Verify the __primary_key column contains encoded sparse keys
2369        let primary_key_column = bulk_part.batch.column_by_name("__primary_key").unwrap();
2370        let dict_array = primary_key_column
2371            .as_any()
2372            .downcast_ref::<DictionaryArray<UInt32Type>>()
2373            .unwrap();
2374
2375        // Should have non-zero entries indicating encoded primary keys
2376        assert!(!dict_array.is_empty());
2377        assert_eq!(dict_array.len(), 3); // 3 rows total
2378
2379        // Verify values are properly encoded binary data (not empty)
2380        let values = dict_array
2381            .values()
2382            .as_any()
2383            .downcast_ref::<BinaryArray>()
2384            .unwrap();
2385        for i in 0..values.len() {
2386            assert!(
2387                !values.value(i).is_empty(),
2388                "Encoded primary key should not be empty"
2389            );
2390        }
2391    }
2392
2393    #[test]
2394    fn test_convert_bulk_part_empty() {
2395        let metadata = metadata_for_test();
2396        let schema = to_flat_sst_arrow_schema(
2397            &metadata,
2398            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2399        );
2400        let primary_key_codec = build_primary_key_codec(&metadata);
2401
2402        // Create empty batch
2403        let empty_batch = RecordBatch::new_empty(schema.clone());
2404        let empty_part = BulkPart {
2405            batch: empty_batch,
2406            max_timestamp: 0,
2407            min_timestamp: 0,
2408            sequence: 0,
2409            timestamp_index: 0,
2410            raw_data: None,
2411        };
2412
2413        let result =
2414            convert_bulk_part(empty_part, &metadata, primary_key_codec, schema, true).unwrap();
2415        assert!(result.is_none());
2416    }
2417
2418    #[test]
2419    fn test_convert_bulk_part_dense_with_pk_columns() {
2420        let metadata = metadata_for_test();
2421        let primary_key_codec = build_primary_key_codec(&metadata);
2422
2423        let k0_array = Arc::new(arrow::array::StringArray::from(vec![
2424            "key1", "key2", "key1",
2425        ]));
2426        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2, 1]));
2427        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200, 300]));
2428        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0, 3.0]));
2429        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000, 1500]));
2430
2431        let input_schema = Arc::new(Schema::new(vec![
2432            Field::new("k0", ArrowDataType::Utf8, false),
2433            Field::new("k1", ArrowDataType::UInt32, false),
2434            Field::new("v0", ArrowDataType::Int64, true),
2435            Field::new("v1", ArrowDataType::Float64, true),
2436            Field::new(
2437                "ts",
2438                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2439                false,
2440            ),
2441        ]));
2442
2443        let input_batch = RecordBatch::try_new(
2444            input_schema,
2445            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2446        )
2447        .unwrap();
2448
2449        let part = BulkPart {
2450            batch: input_batch,
2451            max_timestamp: 2000,
2452            min_timestamp: 1000,
2453            sequence: 5,
2454            timestamp_index: 4,
2455            raw_data: None,
2456        };
2457
2458        let output_schema = to_flat_sst_arrow_schema(
2459            &metadata,
2460            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2461        );
2462
2463        let result = convert_bulk_part(
2464            part,
2465            &metadata,
2466            primary_key_codec,
2467            output_schema,
2468            true, // store primary key columns
2469        )
2470        .unwrap();
2471
2472        let converted = result.unwrap();
2473
2474        assert_eq!(converted.num_rows(), 3);
2475        assert_eq!(converted.max_timestamp, 2000);
2476        assert_eq!(converted.min_timestamp, 1000);
2477        assert_eq!(converted.sequence, 5);
2478
2479        let schema = converted.batch.schema();
2480        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2481        assert_eq!(
2482            field_names,
2483            vec![
2484                "k0",
2485                "k1",
2486                "v0",
2487                "v1",
2488                "ts",
2489                "__primary_key",
2490                "__sequence",
2491                "__op_type"
2492            ]
2493        );
2494
2495        let k0_col = converted.batch.column_by_name("k0").unwrap();
2496        assert!(matches!(
2497            k0_col.data_type(),
2498            ArrowDataType::Dictionary(_, _)
2499        ));
2500
2501        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2502        let dict_array = pk_col
2503            .as_any()
2504            .downcast_ref::<DictionaryArray<UInt32Type>>()
2505            .unwrap();
2506        let keys = dict_array.keys();
2507
2508        assert_eq!(keys.len(), 3);
2509    }
2510
2511    #[test]
2512    fn test_convert_bulk_part_dense_without_pk_columns() {
2513        let metadata = metadata_for_test();
2514        let primary_key_codec = build_primary_key_codec(&metadata);
2515
2516        // Create input batch with primary key columns (k0, k1)
2517        let k0_array = Arc::new(arrow::array::StringArray::from(vec!["key1", "key2"]));
2518        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2]));
2519        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
2520        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
2521        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
2522
2523        let input_schema = Arc::new(Schema::new(vec![
2524            Field::new("k0", ArrowDataType::Utf8, false),
2525            Field::new("k1", ArrowDataType::UInt32, false),
2526            Field::new("v0", ArrowDataType::Int64, true),
2527            Field::new("v1", ArrowDataType::Float64, true),
2528            Field::new(
2529                "ts",
2530                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2531                false,
2532            ),
2533        ]));
2534
2535        let input_batch = RecordBatch::try_new(
2536            input_schema,
2537            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2538        )
2539        .unwrap();
2540
2541        let part = BulkPart {
2542            batch: input_batch,
2543            max_timestamp: 2000,
2544            min_timestamp: 1000,
2545            sequence: 3,
2546            timestamp_index: 4,
2547            raw_data: None,
2548        };
2549
2550        let output_schema = to_flat_sst_arrow_schema(
2551            &metadata,
2552            &FlatSchemaOptions {
2553                raw_pk_columns: false,
2554                string_pk_use_dict: true,
2555            },
2556        );
2557
2558        let result = convert_bulk_part(
2559            part,
2560            &metadata,
2561            primary_key_codec,
2562            output_schema,
2563            false, // don't store primary key columns
2564        )
2565        .unwrap();
2566
2567        let converted = result.unwrap();
2568
2569        assert_eq!(converted.num_rows(), 2);
2570        assert_eq!(converted.max_timestamp, 2000);
2571        assert_eq!(converted.min_timestamp, 1000);
2572        assert_eq!(converted.sequence, 3);
2573
2574        // Verify schema does NOT include individual primary key columns
2575        let schema = converted.batch.schema();
2576        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2577        assert_eq!(
2578            field_names,
2579            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2580        );
2581
2582        // Verify __primary_key column is present and is a dictionary
2583        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2584        assert!(matches!(
2585            pk_col.data_type(),
2586            ArrowDataType::Dictionary(_, _)
2587        ));
2588    }
2589
2590    #[test]
2591    fn test_convert_bulk_part_sparse_encoding() {
2592        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
2593        builder
2594            .push_column_metadata(ColumnMetadata {
2595                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
2596                semantic_type: SemanticType::Tag,
2597                column_id: 0,
2598            })
2599            .push_column_metadata(ColumnMetadata {
2600                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
2601                semantic_type: SemanticType::Tag,
2602                column_id: 1,
2603            })
2604            .push_column_metadata(ColumnMetadata {
2605                column_schema: ColumnSchema::new(
2606                    "ts",
2607                    ConcreteDataType::timestamp_millisecond_datatype(),
2608                    false,
2609                ),
2610                semantic_type: SemanticType::Timestamp,
2611                column_id: 2,
2612            })
2613            .push_column_metadata(ColumnMetadata {
2614                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
2615                semantic_type: SemanticType::Field,
2616                column_id: 3,
2617            })
2618            .push_column_metadata(ColumnMetadata {
2619                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
2620                semantic_type: SemanticType::Field,
2621                column_id: 4,
2622            })
2623            .primary_key(vec![0, 1])
2624            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
2625        let metadata = Arc::new(builder.build().unwrap());
2626
2627        let primary_key_codec = build_primary_key_codec(&metadata);
2628
2629        // Create input batch with __primary_key column (sparse encoding)
2630        let pk_array = Arc::new(arrow::array::BinaryArray::from(vec![
2631            b"encoded_key_1".as_slice(),
2632            b"encoded_key_2".as_slice(),
2633        ]));
2634        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
2635        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
2636        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
2637
2638        let input_schema = Arc::new(Schema::new(vec![
2639            Field::new("__primary_key", ArrowDataType::Binary, false),
2640            Field::new("v0", ArrowDataType::Int64, true),
2641            Field::new("v1", ArrowDataType::Float64, true),
2642            Field::new(
2643                "ts",
2644                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2645                false,
2646            ),
2647        ]));
2648
2649        let input_batch =
2650            RecordBatch::try_new(input_schema, vec![pk_array, v0_array, v1_array, ts_array])
2651                .unwrap();
2652
2653        let part = BulkPart {
2654            batch: input_batch,
2655            max_timestamp: 2000,
2656            min_timestamp: 1000,
2657            sequence: 7,
2658            timestamp_index: 3,
2659            raw_data: None,
2660        };
2661
2662        let output_schema = to_flat_sst_arrow_schema(
2663            &metadata,
2664            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2665        );
2666
2667        let result = convert_bulk_part(
2668            part,
2669            &metadata,
2670            primary_key_codec,
2671            output_schema,
2672            true, // store_primary_key_columns (ignored for sparse)
2673        )
2674        .unwrap();
2675
2676        let converted = result.unwrap();
2677
2678        assert_eq!(converted.num_rows(), 2);
2679        assert_eq!(converted.max_timestamp, 2000);
2680        assert_eq!(converted.min_timestamp, 1000);
2681        assert_eq!(converted.sequence, 7);
2682
2683        // Verify schema does NOT include individual primary key columns (sparse encoding)
2684        let schema = converted.batch.schema();
2685        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2686        assert_eq!(
2687            field_names,
2688            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2689        );
2690
2691        // Verify __primary_key is dictionary encoded
2692        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2693        assert!(matches!(
2694            pk_col.data_type(),
2695            ArrowDataType::Dictionary(_, _)
2696        ));
2697    }
2698
2699    #[test]
2700    fn test_convert_bulk_part_sorting_with_multiple_series() {
2701        let metadata = metadata_for_test();
2702        let primary_key_codec = build_primary_key_codec(&metadata);
2703
2704        // Create unsorted batch with multiple series and timestamps
2705        let k0_array = Arc::new(arrow::array::StringArray::from(vec![
2706            "series_b", "series_a", "series_b", "series_a",
2707        ]));
2708        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![2, 1, 2, 1]));
2709        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![200, 100, 400, 300]));
2710        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![2.0, 1.0, 4.0, 3.0]));
2711        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
2712            2000, 1000, 4000, 3000,
2713        ]));
2714
2715        let input_schema = Arc::new(Schema::new(vec![
2716            Field::new("k0", ArrowDataType::Utf8, false),
2717            Field::new("k1", ArrowDataType::UInt32, false),
2718            Field::new("v0", ArrowDataType::Int64, true),
2719            Field::new("v1", ArrowDataType::Float64, true),
2720            Field::new(
2721                "ts",
2722                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2723                false,
2724            ),
2725        ]));
2726
2727        let input_batch = RecordBatch::try_new(
2728            input_schema,
2729            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2730        )
2731        .unwrap();
2732
2733        let part = BulkPart {
2734            batch: input_batch,
2735            max_timestamp: 4000,
2736            min_timestamp: 1000,
2737            sequence: 10,
2738            timestamp_index: 4,
2739            raw_data: None,
2740        };
2741
2742        let output_schema = to_flat_sst_arrow_schema(
2743            &metadata,
2744            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2745        );
2746
2747        let result =
2748            convert_bulk_part(part, &metadata, primary_key_codec, output_schema, true).unwrap();
2749
2750        let converted = result.unwrap();
2751
2752        assert_eq!(converted.num_rows(), 4);
2753
2754        // Verify data is sorted by (primary_key, timestamp, sequence desc)
2755        let ts_col = converted.batch.column(converted.timestamp_index);
2756        let ts_array = ts_col
2757            .as_any()
2758            .downcast_ref::<TimestampMillisecondArray>()
2759            .unwrap();
2760
2761        // After sorting by (pk, ts), we should have:
2762        // series_a,1: ts=1000, 3000
2763        // series_b,2: ts=2000, 4000
2764        let timestamps: Vec<i64> = ts_array.values().to_vec();
2765        assert_eq!(timestamps, vec![1000, 3000, 2000, 4000]);
2766    }
2767
2768    /// Helper to create a converted BulkPart (with __primary_key column) from MutationInputs.
2769    fn build_converted_bulk_part(inputs: &[MutationInput]) -> BulkPart {
2770        let metadata = metadata_for_test();
2771        let kvs = inputs
2772            .iter()
2773            .map(|m| {
2774                build_key_values_with_ts_seq_values(
2775                    &metadata,
2776                    m.k0.to_string(),
2777                    m.k1,
2778                    m.timestamps.iter().copied(),
2779                    m.v1.iter().copied(),
2780                    m.sequence,
2781                )
2782            })
2783            .collect::<Vec<_>>();
2784        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
2785        let primary_key_codec = build_primary_key_codec(&metadata);
2786        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
2787        for kv in kvs {
2788            converter.append_key_values(&kv).unwrap();
2789        }
2790        converter.convert().unwrap()
2791    }
2792
2793    /// Helper to create a MultiBulkPart where each group becomes a separate batch.
2794    fn build_multi_bulk_part(groups: &[&[MutationInput]]) -> (MultiBulkPart, RegionMetadataRef) {
2795        let metadata = metadata_for_test();
2796        let mut all_batches = Vec::new();
2797        let mut min_ts = i64::MAX;
2798        let mut max_ts = i64::MIN;
2799        let mut max_seq = 0u64;
2800
2801        for inputs in groups {
2802            let part = build_converted_bulk_part(inputs);
2803            min_ts = min_ts.min(part.min_timestamp);
2804            max_ts = max_ts.max(part.max_timestamp);
2805            max_seq = max_seq.max(part.sequence);
2806            all_batches.push(part.batch);
2807        }
2808
2809        let multi = MultiBulkPart::new(
2810            all_batches,
2811            min_ts,
2812            max_ts,
2813            max_seq,
2814            groups.len(),
2815            &metadata,
2816        );
2817        (multi, metadata)
2818    }
2819
2820    #[test]
2821    fn test_multi_bulk_part_prune_batches() {
2822        // Three batches with distinct k0 ranges: ["a"], ["m"], ["z"].
2823        let (multi, metadata) = build_multi_bulk_part(&[
2824            &[MutationInput {
2825                k0: "a",
2826                k1: 0,
2827                timestamps: &[1, 2],
2828                v1: &[Some(1.0), Some(2.0)],
2829                sequence: 0,
2830            }],
2831            &[MutationInput {
2832                k0: "m",
2833                k1: 0,
2834                timestamps: &[3, 4],
2835                v1: &[Some(3.0), Some(4.0)],
2836                sequence: 1,
2837            }],
2838            &[MutationInput {
2839                k0: "z",
2840                k1: 0,
2841                timestamps: &[5, 6],
2842                v1: &[Some(5.0), Some(6.0)],
2843                sequence: 2,
2844            }],
2845        ]);
2846        assert_eq!(multi.num_rows(), 6);
2847        assert_eq!(multi.num_batches(), 3);
2848
2849        // k0 = "m" => only middle batch (2 rows).
2850        let context = Arc::new(
2851            BulkIterContext::new(
2852                metadata.clone(),
2853                None,
2854                Some(Predicate::new(vec![
2855                    datafusion_expr::col("k0").eq(datafusion_expr::lit("m")),
2856                ])),
2857                false,
2858            )
2859            .unwrap(),
2860        );
2861        let reader = multi
2862            .read(context, None, None)
2863            .unwrap()
2864            .expect("should have results");
2865        let total_rows: usize = reader.map(|r| r.unwrap().num_rows()).sum();
2866        assert_eq!(total_rows, 2);
2867
2868        // k0 = "nonexistent" => all pruned, returns None.
2869        let context = Arc::new(
2870            BulkIterContext::new(
2871                metadata.clone(),
2872                None,
2873                Some(Predicate::new(vec![
2874                    datafusion_expr::col("k0").eq(datafusion_expr::lit("nonexistent")),
2875                ])),
2876                false,
2877            )
2878            .unwrap(),
2879        );
2880        assert!(multi.read(context, None, None).unwrap().is_none());
2881
2882        // No predicate => all 6 rows.
2883        let context = Arc::new(BulkIterContext::new(metadata.clone(), None, None, false).unwrap());
2884        let reader = multi
2885            .read(context, None, None)
2886            .unwrap()
2887            .expect("should have results");
2888        let total_rows: usize = reader.map(|r| r.unwrap().num_rows()).sum();
2889        assert_eq!(total_rows, 6);
2890    }
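
    #[test]
    fn test_should_prune_bulk_part_without_applicable_predicate() {
        // Minimal sketch exercising only the guaranteed "no pruning" paths of
        // `should_prune_bulk_part`: no predicate at all, and a predicate that
        // does not reference the first tag column.
        let metadata = metadata_for_test();
        let part = build_converted_bulk_part(&[MutationInput {
            k0: "a",
            k1: 0,
            timestamps: &[1, 2],
            v1: &[Some(1.0), Some(2.0)],
            sequence: 0,
        }]);

        // Without a predicate there is nothing to prune against.
        let context = BulkIterContext::new(metadata.clone(), None, None, false).unwrap();
        assert!(!should_prune_bulk_part(&part.batch, &context, &metadata));

        // A predicate on a field column does not reference the first tag column,
        // so the batch cannot be pruned either.
        let context = BulkIterContext::new(
            metadata.clone(),
            None,
            Some(Predicate::new(vec![
                datafusion_expr::col("v1").eq(datafusion_expr::lit(1.0f64)),
            ])),
            false,
        )
        .unwrap();
        assert!(!should_prune_bulk_part(&part.batch, &context, &metadata));
    }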
2891}