Skip to main content

mito2/memtable/bulk/
part.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Bulk part encoder/decoder.
16
17use std::collections::{HashMap, HashSet};
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use api::helper::{ColumnDataTypeWrapper, to_grpc_value};
22use api::v1::bulk_wal_entry::Body;
23use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType};
24use bytes::Bytes;
25use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
26use common_recordbatch::DfRecordBatch as RecordBatch;
27use common_time::Timestamp;
28use datafusion_common::Column;
29use datafusion_common::pruning::PruningStatistics;
30use datafusion_expr::utils::expr_to_columns;
31use datatypes::arrow;
32use datatypes::arrow::array::{
33    Array, ArrayRef, BinaryArray, BooleanArray, StringDictionaryBuilder, UInt8Array, UInt64Array,
34};
35use datatypes::arrow::compute::{SortColumn, SortOptions};
36use datatypes::arrow::datatypes::{
37    DataType as ArrowDataType, Field, Schema, SchemaRef, UInt32Type,
38};
39use datatypes::data_type::DataType;
40use datatypes::extension::json::is_structured_json_field;
41use datatypes::prelude::{MutableVector, Vector};
42use datatypes::types::JsonType;
43use datatypes::value::ValueRef;
44use datatypes::vectors::Helper;
45use datatypes::vectors::json::array::JsonArray;
46use mito_codec::key_values::{KeyValue, KeyValues};
47use mito_codec::row_converter::{PrimaryKeyCodec, SortField, build_primary_key_codec_with_fields};
48use parquet::arrow::ArrowWriter;
49use parquet::basic::{Compression, ZstdLevel};
50use parquet::file::metadata::ParquetMetaData;
51use parquet::file::properties::WriterProperties;
52use smallvec::SmallVec;
53use snafu::{OptionExt, ResultExt};
54use store_api::codec::PrimaryKeyEncoding;
55use store_api::metadata::{RegionMetadata, RegionMetadataRef};
56use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
57use store_api::storage::{ColumnId, FileId, SequenceNumber, SequenceRange};
58
59use crate::error::{
60    self, ColumnNotFoundSnafu, ComputeArrowSnafu, ConvertValueSnafu, CreateDefaultSnafu,
61    DataTypeMismatchSnafu, EncodeMemtableSnafu, EncodeSnafu, InvalidMetadataSnafu,
62    InvalidRequestSnafu, NewRecordBatchSnafu, Result,
63};
64use crate::memtable::bulk::context::{BulkIterContext, BulkIterContextRef};
65use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
66use crate::memtable::time_series::{ValueBuilder, Values};
67use crate::memtable::{BoxedRecordBatchIterator, MemScanMetrics, MemtableStats};
68use crate::sst::SeriesEstimator;
69use crate::sst::index::IndexOutput;
70use crate::sst::parquet::flat_format::primary_key_column_index;
71use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder};
72use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo};
73
/// Initial number of distinct dictionary values reserved by the string dictionary builders
/// created for primary key columns (the `value_capacity` argument of
/// `StringDictionaryBuilder::with_capacity`).
const INIT_DICT_VALUE_CAPACITY: usize = 8;
75
76/// A raw bulk part in the memtable.
77#[derive(Clone)]
78pub struct BulkPart {
79    pub batch: RecordBatch,
80    pub max_timestamp: i64,
81    pub min_timestamp: i64,
82    pub sequence: u64,
83    pub timestamp_index: usize,
84    pub raw_data: Option<ArrowIpc>,
85}
86
87impl TryFrom<BulkWalEntry> for BulkPart {
88    type Error = error::Error;
89
90    fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
91        match value.body.expect("Entry payload should be present") {
92            Body::ArrowIpc(ipc) => {
93                let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
94                    .context(error::ConvertBulkWalEntrySnafu)?;
95                let batch = decoder
96                    .try_decode_record_batch(&ipc.data_header, &ipc.payload)
97                    .context(error::ConvertBulkWalEntrySnafu)?;
98                Ok(Self {
99                    batch,
100                    max_timestamp: value.max_ts,
101                    min_timestamp: value.min_ts,
102                    sequence: value.sequence,
103                    timestamp_index: value.timestamp_index as usize,
104                    raw_data: Some(ipc),
105                })
106            }
107        }
108    }
109}
110
111impl TryFrom<&BulkPart> for BulkWalEntry {
112    type Error = error::Error;
113
114    fn try_from(value: &BulkPart) -> Result<Self> {
115        if let Some(ipc) = &value.raw_data {
116            Ok(BulkWalEntry {
117                sequence: value.sequence,
118                max_ts: value.max_timestamp,
119                min_ts: value.min_timestamp,
120                timestamp_index: value.timestamp_index as u32,
121                body: Some(Body::ArrowIpc(ipc.clone())),
122            })
123        } else {
124            let mut encoder = FlightEncoder::default();
125            let schema_bytes = encoder
126                .encode_schema(value.batch.schema().as_ref())
127                .data_header;
128            let [rb_data] = encoder
129                .encode(FlightMessage::RecordBatch(value.batch.clone()))
130                .try_into()
131                .map_err(|_| {
132                    error::UnsupportedOperationSnafu {
133                        err_msg: "create BulkWalEntry from RecordBatch with dictionary arrays",
134                    }
135                    .build()
136                })?;
137            Ok(BulkWalEntry {
138                sequence: value.sequence,
139                max_ts: value.max_timestamp,
140                min_ts: value.min_timestamp,
141                timestamp_index: value.timestamp_index as u32,
142                body: Some(Body::ArrowIpc(ArrowIpc {
143                    schema: schema_bytes,
144                    data_header: rb_data.data_header,
145                    payload: rb_data.data_body,
146                })),
147            })
148        }
149    }
150}
151
152impl BulkPart {
153    pub(crate) fn estimated_size(&self) -> usize {
154        record_batch_estimated_size(&self.batch)
155    }
156
157    /// Returns the estimated series count in this BulkPart.
158    /// This is calculated from the dictionary values count of the PrimaryKeyArray.
159    pub fn estimated_series_count(&self) -> usize {
160        let pk_column_idx = primary_key_column_index(self.batch.num_columns());
161        let pk_column = self.batch.column(pk_column_idx);
162        if let Some(dict_array) = pk_column.as_any().downcast_ref::<PrimaryKeyArray>() {
163            dict_array.values().len()
164        } else {
165            0
166        }
167    }
168
169    /// Creates MemtableStats from this BulkPart.
170    pub fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
171        let ts_type = region_metadata.time_index_type();
172        let min_ts = ts_type.create_timestamp(self.min_timestamp);
173        let max_ts = ts_type.create_timestamp(self.max_timestamp);
174
175        MemtableStats {
176            estimated_bytes: self.estimated_size(),
177            time_range: Some((min_ts, max_ts)),
178            num_rows: self.num_rows(),
179            num_ranges: 1,
180            max_sequence: self.sequence,
181            series_count: self.estimated_series_count(),
182        }
183    }
184
185    /// Fills missing columns in the BulkPart batch with default values.
186    ///
187    /// This function checks if the batch schema matches the region metadata schema,
188    /// and if there are missing columns, it fills them with default values (or null
189    /// for nullable columns).
190    ///
191    /// # Arguments
192    ///
193    /// * `region_metadata` - The region metadata containing the expected schema
194    pub fn fill_missing_columns(&mut self, region_metadata: &RegionMetadata) -> Result<()> {
195        // Builds a map of existing columns in the batch
196        let batch_schema = self.batch.schema();
197        let batch_columns: HashSet<_> = batch_schema
198            .fields()
199            .iter()
200            .map(|f| f.name().as_str())
201            .collect();
202
203        // Finds columns that need to be filled
204        let mut columns_to_fill = Vec::new();
205        for column_meta in &region_metadata.column_metadatas {
206            // TODO(yingwen): Returns error if it is impure default after we support filling
207            // bulk insert request in the frontend
208            if !batch_columns.contains(column_meta.column_schema.name.as_str()) {
209                columns_to_fill.push(column_meta);
210            }
211        }
212
213        if columns_to_fill.is_empty() {
214            return Ok(());
215        }
216
217        let num_rows = self.batch.num_rows();
218
219        let mut new_columns = Vec::new();
220        let mut new_fields = Vec::new();
221
222        // First, adds all existing columns
223        new_fields.extend(batch_schema.fields().iter().cloned());
224        new_columns.extend_from_slice(self.batch.columns());
225
226        let region_id = region_metadata.region_id;
227        // Then adds the missing columns with default values
228        for column_meta in columns_to_fill {
229            let default_vector = column_meta
230                .column_schema
231                .create_default_vector(num_rows)
232                .context(CreateDefaultSnafu {
233                    region_id,
234                    column: &column_meta.column_schema.name,
235                })?
236                .with_context(|| InvalidRequestSnafu {
237                    region_id,
238                    reason: format!(
239                        "column {} does not have default value",
240                        column_meta.column_schema.name
241                    ),
242                })?;
243            let arrow_array = default_vector.to_arrow_array();
244            column_meta.column_schema.data_type.as_arrow_type();
245
246            new_fields.push(Arc::new(Field::new(
247                column_meta.column_schema.name.clone(),
248                column_meta.column_schema.data_type.as_arrow_type(),
249                column_meta.column_schema.is_nullable(),
250            )));
251            new_columns.push(arrow_array);
252        }
253
254        // Create a new schema and batch with the filled columns
255        let new_schema = Arc::new(Schema::new(new_fields));
256        let new_batch =
257            RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)?;
258
259        // Update the batch
260        self.batch = new_batch;
261
262        Ok(())
263    }
264
265    /// Converts [BulkPart] to [Mutation] for fallback `write_bulk` implementation.
266    pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
267        let vectors = region_metadata
268            .schema
269            .column_schemas()
270            .iter()
271            .map(|col| match self.batch.column_by_name(&col.name) {
272                None => Ok(None),
273                Some(col) => Helper::try_into_vector(col).map(Some),
274            })
275            .collect::<datatypes::error::Result<Vec<_>>>()
276            .context(error::ComputeVectorSnafu)?;
277
278        let rows = (0..self.num_rows())
279            .map(|row_idx| {
280                let values = (0..self.batch.num_columns())
281                    .map(|col_idx| {
282                        if let Some(v) = &vectors[col_idx] {
283                            to_grpc_value(v.get(row_idx))
284                        } else {
285                            api::v1::Value { value_data: None }
286                        }
287                    })
288                    .collect::<Vec<_>>();
289                api::v1::Row { values }
290            })
291            .collect::<Vec<_>>();
292
293        let schema = region_metadata
294            .column_metadatas
295            .iter()
296            .map(|c| {
297                let data_type_wrapper =
298                    ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
299                Ok(api::v1::ColumnSchema {
300                    column_name: c.column_schema.name.clone(),
301                    datatype: data_type_wrapper.datatype() as i32,
302                    semantic_type: c.semantic_type as i32,
303                    ..Default::default()
304                })
305            })
306            .collect::<api::error::Result<Vec<_>>>()
307            .context(error::ConvertColumnDataTypeSnafu {
308                reason: "failed to convert region metadata to column schema",
309            })?;
310
311        let rows = api::v1::Rows { schema, rows };
312
313        Ok(Mutation {
314            op_type: OpType::Put as i32,
315            sequence: self.sequence,
316            rows: Some(rows),
317            write_hint: None,
318        })
319    }
320
321    pub fn timestamps(&self) -> &ArrayRef {
322        self.batch.column(self.timestamp_index)
323    }
324
325    pub fn num_rows(&self) -> usize {
326        self.batch.num_rows()
327    }
328}
329
330/// A collection of small unordered bulk parts.
331/// Used to batch small parts together before merging them into a sorted part.
332pub struct UnorderedPart {
333    /// Small bulk parts that haven't been sorted yet.
334    parts: Vec<BulkPart>,
335    /// Total number of rows across all parts.
336    total_rows: usize,
337    /// Minimum timestamp across all parts.
338    min_timestamp: i64,
339    /// Maximum timestamp across all parts.
340    max_timestamp: i64,
341    /// Maximum sequence number across all parts.
342    max_sequence: u64,
343    /// Row count threshold for accepting parts (default: 1024).
344    threshold: usize,
345    /// Row count threshold for compacting (default: 4096).
346    compact_threshold: usize,
347}
348
349impl Default for UnorderedPart {
350    fn default() -> Self {
351        Self::new()
352    }
353}
354
355impl UnorderedPart {
356    /// Creates a new empty UnorderedPart.
357    pub fn new() -> Self {
358        Self {
359            parts: Vec::new(),
360            total_rows: 0,
361            min_timestamp: i64::MAX,
362            max_timestamp: i64::MIN,
363            max_sequence: 0,
364            threshold: 1024,
365            compact_threshold: 4096,
366        }
367    }
368
369    /// Sets the threshold for accepting parts into unordered_part.
370    pub fn set_threshold(&mut self, threshold: usize) {
371        self.threshold = threshold;
372    }
373
374    /// Sets the threshold for compacting unordered_part.
375    pub fn set_compact_threshold(&mut self, compact_threshold: usize) {
376        self.compact_threshold = compact_threshold;
377    }
378
379    /// Returns the threshold for accepting parts.
380    pub fn threshold(&self) -> usize {
381        self.threshold
382    }
383
384    /// Returns the compact threshold.
385    pub fn compact_threshold(&self) -> usize {
386        self.compact_threshold
387    }
388
389    /// Returns true if this part should accept the given row count.
390    pub fn should_accept(&self, num_rows: usize) -> bool {
391        num_rows < self.threshold
392    }
393
394    /// Returns true if this part should be compacted.
395    pub fn should_compact(&self) -> bool {
396        self.total_rows >= self.compact_threshold
397    }
398
399    /// Adds a BulkPart to this unordered collection.
400    pub fn push(&mut self, part: BulkPart) {
401        self.total_rows += part.num_rows();
402        self.min_timestamp = self.min_timestamp.min(part.min_timestamp);
403        self.max_timestamp = self.max_timestamp.max(part.max_timestamp);
404        self.max_sequence = self.max_sequence.max(part.sequence);
405        self.parts.push(part);
406    }
407
408    /// Returns the total number of rows across all parts.
409    pub fn num_rows(&self) -> usize {
410        self.total_rows
411    }
412
413    /// Returns true if there are no parts.
414    pub fn is_empty(&self) -> bool {
415        self.parts.is_empty()
416    }
417
418    /// Returns the number of parts in this collection.
419    pub fn num_parts(&self) -> usize {
420        self.parts.len()
421    }
422
423    /// Concatenates and sorts all parts into a single RecordBatch.
424    /// Returns None if the collection is empty.
425    pub fn concat_and_sort(&self) -> Result<Option<RecordBatch>> {
426        if self.parts.is_empty() {
427            return Ok(None);
428        }
429
430        if self.parts.len() == 1 {
431            // If there's only one part, return its batch directly
432            return Ok(Some(self.parts[0].batch.clone()));
433        }
434
435        // Get the schema from the first part
436        let schema = self.parts[0].batch.schema();
437        let concatenated = if schema.fields().iter().any(is_structured_json_field) {
438            let (schema, batches) = align_parts(&self.parts)?;
439            arrow::compute::concat_batches(&schema, &batches).context(ComputeArrowSnafu)?
440        } else {
441            arrow::compute::concat_batches(&schema, self.parts.iter().map(|x| &x.batch))
442                .context(ComputeArrowSnafu)?
443        };
444
445        // Sort the concatenated batch
446        let sorted_batch = sort_primary_key_record_batch(&concatenated)?;
447
448        Ok(Some(sorted_batch))
449    }
450
451    /// Converts all parts into a single sorted BulkPart.
452    /// Returns None if the collection is empty.
453    pub fn to_bulk_part(&self) -> Result<Option<BulkPart>> {
454        let Some(sorted_batch) = self.concat_and_sort()? else {
455            return Ok(None);
456        };
457
458        let timestamp_index = self.parts[0].timestamp_index;
459
460        Ok(Some(BulkPart {
461            batch: sorted_batch,
462            max_timestamp: self.max_timestamp,
463            min_timestamp: self.min_timestamp,
464            sequence: self.max_sequence,
465            timestamp_index,
466            raw_data: None,
467        }))
468    }
469
470    /// Clears all parts from this collection.
471    pub fn clear(&mut self) {
472        self.parts.clear();
473        self.total_rows = 0;
474        self.min_timestamp = i64::MAX;
475        self.max_timestamp = i64::MIN;
476        self.max_sequence = 0;
477    }
478}
479
480/// Align the JSON columns in [BulkPart]s, to unified Arrow arrays. So that we can compute (concat,
481/// sort, etc.) on them.
482fn align_parts(parts: &[BulkPart]) -> Result<(SchemaRef, Vec<RecordBatch>)> {
483    debug_assert!(
484        !parts.is_empty()
485            && parts
486                .windows(2)
487                .all(|w| w[0].batch.schema_ref().fields().len()
488                    == w[1].batch.schema_ref().fields().len())
489    );
490
491    let first = &parts[0];
492    let base_schema = first.batch.schema_ref();
493    let rest = &parts[1..];
494
495    let mut merged_types = HashMap::new();
496    let mut aligned_fields = Vec::with_capacity(base_schema.fields().len());
497    for (i, field) in base_schema.fields().iter().enumerate() {
498        if is_structured_json_field(field) {
499            let mut merged = JsonType::from(field.data_type());
500            rest.iter()
501                .try_fold(&mut merged, |acc, x| {
502                    acc.merge(&JsonType::from(x.batch.schema_ref().field(i).data_type()))?;
503                    Ok(acc)
504                })
505                .context(DataTypeMismatchSnafu)?;
506            merged_types.insert(i, merged.as_arrow_type());
507
508            aligned_fields.push(Arc::new(
509                Field::new(
510                    field.name().clone(),
511                    merged.as_arrow_type(),
512                    field.is_nullable(),
513                )
514                .with_metadata(field.metadata().clone()),
515            ));
516        } else {
517            aligned_fields.push(field.clone())
518        };
519    }
520    let aligned_schema = Arc::new(Schema::new_with_metadata(
521        aligned_fields,
522        base_schema.metadata().clone(),
523    ));
524
525    let mut aligned_batches = Vec::with_capacity(parts.len());
526    for part in parts {
527        let mut columns = Vec::with_capacity(part.batch.num_columns());
528        for (i, column) in part.batch.columns().iter().enumerate() {
529            if let Some(expect) = merged_types.get(&i) {
530                columns.push(
531                    JsonArray::from(column)
532                        .try_align(expect)
533                        .context(ConvertValueSnafu)?,
534                );
535            } else {
536                columns.push(column.clone());
537            }
538        }
539        aligned_batches.push(
540            RecordBatch::try_new(aligned_schema.clone(), columns).context(NewRecordBatchSnafu)?,
541        );
542    }
543
544    Ok((aligned_schema, aligned_batches))
545}
546
547/// More accurate estimation of the size of a record batch.
548pub fn record_batch_estimated_size(batch: &RecordBatch) -> usize {
549    batch
550        .columns()
551        .iter()
552        // If can not get slice memory size, assume 0 here.
553        .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
554        .sum()
555}
556
557/// Primary key column builder for handling strings specially.
558enum PrimaryKeyColumnBuilder {
559    /// String dictionary builder for string types.
560    StringDict(StringDictionaryBuilder<UInt32Type>),
561    /// Generic mutable vector for other types.
562    Vector(Box<dyn MutableVector>),
563}
564
565impl PrimaryKeyColumnBuilder {
566    /// Appends a value to the builder.
567    fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
568        match self {
569            PrimaryKeyColumnBuilder::StringDict(builder) => {
570                if let Some(s) = value.try_into_string().context(DataTypeMismatchSnafu)? {
571                    // We know the value is a string.
572                    builder.append_value(s);
573                } else {
574                    builder.append_null();
575                }
576            }
577            PrimaryKeyColumnBuilder::Vector(builder) => {
578                builder.push_value_ref(&value);
579            }
580        }
581        Ok(())
582    }
583
584    /// Converts the builder to an ArrayRef.
585    fn into_arrow_array(self) -> ArrayRef {
586        match self {
587            PrimaryKeyColumnBuilder::StringDict(mut builder) => Arc::new(builder.finish()),
588            PrimaryKeyColumnBuilder::Vector(mut builder) => builder.to_vector().to_arrow_array(),
589        }
590    }
591}
592
593/// Converter that converts structs into [BulkPart].
594pub struct BulkPartConverter {
595    /// Schema of the converted batch.
596    schema: SchemaRef,
597    /// Primary key codec for encoding keys
598    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
599    /// Buffer for encoding primary key.
600    key_buf: Vec<u8>,
601    /// Primary key array builder.
602    key_array_builder: PrimaryKeyArrayBuilder,
603    /// Builders for non-primary key columns.
604    value_builder: ValueBuilder,
605    /// Builders for individual primary key columns.
606    /// The order of builders is the same as the order of primary key columns in the region metadata.
607    primary_key_column_builders: Vec<PrimaryKeyColumnBuilder>,
608
609    /// Max timestamp value.
610    max_ts: i64,
611    /// Min timestamp value.
612    min_ts: i64,
613    /// Max sequence number.
614    max_sequence: SequenceNumber,
615}
616
617impl BulkPartConverter {
618    /// Creates a new converter.
619    ///
620    /// If `store_primary_key_columns` is true and the encoding is not sparse encoding, it
621    /// stores primary key columns in arrays additionally.
622    pub fn new(
623        region_metadata: &RegionMetadataRef,
624        schema: SchemaRef,
625        capacity: usize,
626        primary_key_codec: Arc<dyn PrimaryKeyCodec>,
627        store_primary_key_columns: bool,
628    ) -> Self {
629        debug_assert_eq!(
630            region_metadata.primary_key_encoding,
631            primary_key_codec.encoding()
632        );
633
634        let primary_key_column_builders = if store_primary_key_columns
635            && region_metadata.primary_key_encoding != PrimaryKeyEncoding::Sparse
636        {
637            new_primary_key_column_builders(region_metadata, capacity)
638        } else {
639            Vec::new()
640        };
641
642        Self {
643            schema,
644            primary_key_codec,
645            key_buf: Vec::new(),
646            key_array_builder: PrimaryKeyArrayBuilder::new(),
647            value_builder: ValueBuilder::new(region_metadata, capacity),
648            primary_key_column_builders,
649            min_ts: i64::MAX,
650            max_ts: i64::MIN,
651            max_sequence: SequenceNumber::MIN,
652        }
653    }
654
655    /// Appends a [KeyValues] into the converter.
656    pub fn append_key_values(&mut self, key_values: &KeyValues) -> Result<()> {
657        for kv in key_values.iter() {
658            self.append_key_value(&kv)?;
659        }
660
661        Ok(())
662    }
663
664    /// Appends a [KeyValue] to builders.
665    ///
666    /// If the primary key uses sparse encoding, callers must encoded the primary key in the [KeyValue].
667    fn append_key_value(&mut self, kv: &KeyValue) -> Result<()> {
668        // Handles primary key based on encoding type
669        if self.primary_key_codec.encoding() == PrimaryKeyEncoding::Sparse {
670            // For sparse encoding, the primary key is already encoded in the KeyValue
671            // Gets the first (and only) primary key value which contains the encoded key
672            let mut primary_keys = kv.primary_keys();
673            if let Some(encoded) = primary_keys
674                .next()
675                .context(ColumnNotFoundSnafu {
676                    column: PRIMARY_KEY_COLUMN_NAME,
677                })?
678                .try_into_binary()
679                .context(DataTypeMismatchSnafu)?
680            {
681                self.key_array_builder
682                    .append(encoded)
683                    .context(ComputeArrowSnafu)?;
684            } else {
685                self.key_array_builder
686                    .append("")
687                    .context(ComputeArrowSnafu)?;
688            }
689        } else {
690            // For dense encoding, we need to encode the primary key columns
691            self.key_buf.clear();
692            self.primary_key_codec
693                .encode_key_value(kv, &mut self.key_buf)
694                .context(EncodeSnafu)?;
695            self.key_array_builder
696                .append(&self.key_buf)
697                .context(ComputeArrowSnafu)?;
698        };
699
700        // If storing primary key columns, append values to individual builders
701        if !self.primary_key_column_builders.is_empty() {
702            for (builder, pk_value) in self
703                .primary_key_column_builders
704                .iter_mut()
705                .zip(kv.primary_keys())
706            {
707                builder.push_value_ref(pk_value)?;
708            }
709        }
710
711        // Pushes other columns.
712        self.value_builder.push(
713            kv.timestamp(),
714            kv.sequence(),
715            kv.op_type() as u8,
716            kv.fields(),
717        );
718
719        // Updates statistics
720        // Safety: timestamp of kv must be both present and a valid timestamp value.
721        let ts = kv
722            .timestamp()
723            .try_into_timestamp()
724            .unwrap()
725            .unwrap()
726            .value();
727        self.min_ts = self.min_ts.min(ts);
728        self.max_ts = self.max_ts.max(ts);
729        self.max_sequence = self.max_sequence.max(kv.sequence());
730
731        Ok(())
732    }
733
734    /// Converts buffered content into a [BulkPart].
735    ///
736    /// It sorts the record batch by (primary key, timestamp, sequence desc).
737    pub fn convert(mut self) -> Result<BulkPart> {
738        let values = Values::from(self.value_builder);
739        let mut columns =
740            Vec::with_capacity(4 + values.fields.len() + self.primary_key_column_builders.len());
741
742        // Build primary key column arrays if enabled.
743        for builder in self.primary_key_column_builders {
744            columns.push(builder.into_arrow_array());
745        }
746        // Then fields columns.
747        columns.extend(values.fields.iter().map(|field| field.to_arrow_array()));
748        // Time index.
749        let timestamp_index = columns.len();
750        columns.push(values.timestamp.to_arrow_array());
751        // Primary key.
752        let pk_array = self.key_array_builder.finish();
753        columns.push(Arc::new(pk_array));
754        // Sequence and op type.
755        columns.push(values.sequence.to_arrow_array());
756        columns.push(values.op_type.to_arrow_array());
757
758        // The actual datatype of JSON array is data oriented, not to be derived from the Region
759        // metadata, which is static. So here we have to align the schema.
760        let schema = align_schema_with_json_array(self.schema, &columns);
761        let batch = RecordBatch::try_new(schema, columns).context(NewRecordBatchSnafu)?;
762        // Sorts the record batch.
763        let batch = sort_primary_key_record_batch(&batch)?;
764
765        Ok(BulkPart {
766            batch,
767            max_timestamp: self.max_ts,
768            min_timestamp: self.min_ts,
769            sequence: self.max_sequence,
770            timestamp_index,
771            raw_data: None,
772        })
773    }
774}
775
776fn align_schema_with_json_array(schema: SchemaRef, columns: &[ArrayRef]) -> SchemaRef {
777    if schema.fields().iter().all(|f| !is_structured_json_field(f)) {
778        return schema;
779    }
780
781    let mut fields = Vec::with_capacity(schema.fields().len());
782    for (field, array) in schema.fields().iter().zip(columns) {
783        if !is_structured_json_field(field) {
784            fields.push(field.clone());
785            continue;
786        }
787
788        let mut field = field.as_ref().clone();
789        field.set_data_type(array.data_type().clone());
790        fields.push(Arc::new(field));
791    }
792
793    Arc::new(Schema::new_with_metadata(fields, schema.metadata().clone()))
794}
795
796fn new_primary_key_column_builders(
797    metadata: &RegionMetadata,
798    capacity: usize,
799) -> Vec<PrimaryKeyColumnBuilder> {
800    metadata
801        .primary_key_columns()
802        .map(|col| {
803            if col.column_schema.data_type.is_string() {
804                PrimaryKeyColumnBuilder::StringDict(StringDictionaryBuilder::with_capacity(
805                    capacity,
806                    INIT_DICT_VALUE_CAPACITY,
807                    capacity,
808                ))
809            } else {
810                PrimaryKeyColumnBuilder::Vector(
811                    col.column_schema.data_type.create_mutable_vector(capacity),
812                )
813            }
814        })
815        .collect()
816}
817
818/// Sorts the record batch with primary key format.
819pub fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
820    let total_columns = batch.num_columns();
821    let sort_columns = vec![
822        // Primary key column (ascending)
823        SortColumn {
824            values: batch.column(total_columns - 3).clone(),
825            options: Some(SortOptions {
826                descending: false,
827                nulls_first: true,
828            }),
829        },
830        // Time index column (ascending)
831        SortColumn {
832            values: batch.column(total_columns - 4).clone(),
833            options: Some(SortOptions {
834                descending: false,
835                nulls_first: true,
836            }),
837        },
838        // Sequence column (descending)
839        SortColumn {
840            values: batch.column(total_columns - 2).clone(),
841            options: Some(SortOptions {
842                descending: true,
843                nulls_first: true,
844            }),
845        },
846    ];
847
848    let indices = datatypes::arrow::compute::lexsort_to_indices(&sort_columns, None)
849        .context(ComputeArrowSnafu)?;
850
851    datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
852}
853
/// Converts a `BulkPart` that is unordered and without encoded primary keys into a `BulkPart`
/// with the same format as produced by [BulkPartConverter].
///
/// This function takes a `BulkPart` where:
/// - For dense encoding: Primary key columns may be stored as individual columns
/// - For sparse encoding: The `__primary_key` column should already be present with encoded keys
/// - The batch may not be sorted
///
/// And produces a `BulkPart` where:
/// - Primary key columns are optionally stored (depending on `store_primary_key_columns` and encoding)
/// - An encoded `__primary_key` dictionary column is present
/// - The batch is sorted by (primary_key, timestamp, sequence desc)
///
/// Input columns are looked up by name, so their order in `part.batch` does not matter, and
/// input columns not named by the region metadata are not copied to the output. The output
/// columns are assembled in this order and must match `schema`:
/// `[primary key columns (dense + store_primary_key_columns only)] [field columns]
/// [time index] [__primary_key] [sequence] [op_type]`.
///
/// Every output row gets `part.sequence` as its sequence and `OpType::Put` as its op type.
/// NOTE(review): any op types carried by the input rows are not preserved — confirm that
/// callers only pass puts.
///
/// # Arguments
///
/// * `part` - The input `BulkPart` to convert
/// * `region_metadata` - Region metadata containing schema information
/// * `primary_key_codec` - Codec for encoding primary keys (only used for dense encoding)
/// * `schema` - Target schema for the output batch
/// * `store_primary_key_columns` - If true and encoding is not sparse, stores individual primary key columns
///
/// # Returns
///
/// Returns `None` if the input part has no rows, otherwise returns a new `BulkPart` with
/// encoded primary keys and sorted data. The min/max timestamps and sequence are copied from
/// `part`; the timestamp index is updated to the new column position.
///
/// # Errors
///
/// Returns an error if a required column (primary key, field, time index, or `__primary_key`
/// for sparse encoding) is missing from the input, if primary key encoding fails, if an Arrow
/// cast or sort fails, or if the assembled columns do not match `schema`.
pub fn convert_bulk_part(
    part: BulkPart,
    region_metadata: &RegionMetadataRef,
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
    schema: SchemaRef,
    store_primary_key_columns: bool,
) -> Result<Option<BulkPart>> {
    if part.num_rows() == 0 {
        return Ok(None);
    }

    let num_rows = part.num_rows();
    let is_sparse = region_metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse;

    // Builds a column name-to-index map for efficient lookups
    let input_schema = part.batch.schema();
    let column_indices: HashMap<&str, usize> = input_schema
        .fields()
        .iter()
        .enumerate()
        .map(|(idx, field)| (field.name().as_str(), idx))
        .collect();

    // Output columns, pushed in the order documented above
    let mut output_columns = Vec::new();

    // Extracts primary key columns if we need to encode them (dense encoding)
    let pk_array = if is_sparse {
        // For sparse encoding, the input should already have the __primary_key column
        // We need to find it in the input batch
        None
    } else {
        // For dense encoding, extract and encode primary key columns by name
        let pk_vectors: Result<Vec<_>> = region_metadata
            .primary_key_columns()
            .map(|col_meta| {
                let col_idx = column_indices
                    .get(col_meta.column_schema.name.as_str())
                    .context(ColumnNotFoundSnafu {
                        column: &col_meta.column_schema.name,
                    })?;
                let col = part.batch.column(*col_idx);
                Helper::try_into_vector(col).context(error::ComputeVectorSnafu)
            })
            .collect();
        let pk_vectors = pk_vectors?;

        let mut key_array_builder = PrimaryKeyArrayBuilder::new();
        // Reused across rows to avoid allocating a new buffer per encoded key
        let mut encode_buf = Vec::new();

        for row_idx in 0..num_rows {
            encode_buf.clear();

            // Collects primary key values with column IDs for this row
            // (`pk_vectors` was built from `primary_key_columns()`, pairing it with `primary_key` ids)
            let pk_values_with_ids: Vec<_> = region_metadata
                .primary_key
                .iter()
                .zip(pk_vectors.iter())
                .map(|(col_id, vector)| (*col_id, vector.get_ref(row_idx)))
                .collect();

            // Encodes the primary key
            primary_key_codec
                .encode_value_refs(&pk_values_with_ids, &mut encode_buf)
                .context(EncodeSnafu)?;

            key_array_builder
                .append(&encode_buf)
                .context(ComputeArrowSnafu)?;
        }

        Some(key_array_builder.finish())
    };

    // Adds primary key columns if storing them (only for dense encoding)
    if store_primary_key_columns && !is_sparse {
        for col_meta in region_metadata.primary_key_columns() {
            let col_idx = column_indices
                .get(col_meta.column_schema.name.as_str())
                .context(ColumnNotFoundSnafu {
                    column: &col_meta.column_schema.name,
                })?;
            let col = part.batch.column(*col_idx);

            // Converts to dictionary if needed for string types
            let col = if col_meta.column_schema.data_type.is_string() {
                let target_type = ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Utf8),
                );
                arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
            } else {
                col.clone()
            };
            output_columns.push(col);
        }
    }

    // Adds field columns
    for col_meta in region_metadata.field_columns() {
        let col_idx = column_indices
            .get(col_meta.column_schema.name.as_str())
            .context(ColumnNotFoundSnafu {
                column: &col_meta.column_schema.name,
            })?;
        output_columns.push(part.batch.column(*col_idx).clone());
    }

    // Adds timestamp column (its position becomes the output part's `timestamp_index`)
    let new_timestamp_index = output_columns.len();
    let ts_col_idx = column_indices
        .get(
            region_metadata
                .time_index_column()
                .column_schema
                .name
                .as_str(),
        )
        .context(ColumnNotFoundSnafu {
            column: &region_metadata.time_index_column().column_schema.name,
        })?;
    output_columns.push(part.batch.column(*ts_col_idx).clone());

    // Adds encoded primary key dictionary column: freshly encoded for dense encoding,
    // taken from the input `__primary_key` column for sparse encoding
    let pk_dictionary = if let Some(pk_dict_array) = pk_array {
        Arc::new(pk_dict_array) as ArrayRef
    } else {
        let pk_col_idx =
            column_indices
                .get(PRIMARY_KEY_COLUMN_NAME)
                .context(ColumnNotFoundSnafu {
                    column: PRIMARY_KEY_COLUMN_NAME,
                })?;
        let col = part.batch.column(*pk_col_idx);

        // Casts to dictionary type if needed
        let target_type = ArrowDataType::Dictionary(
            Box::new(ArrowDataType::UInt32),
            Box::new(ArrowDataType::Binary),
        );
        arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
    };
    output_columns.push(pk_dictionary);

    // Every row gets the part's sequence number
    let sequence_array = UInt64Array::from(vec![part.sequence; num_rows]);
    output_columns.push(Arc::new(sequence_array) as ArrayRef);

    // Every row is marked as a put
    let op_type_array = UInt8Array::from(vec![OpType::Put as u8; num_rows]);
    output_columns.push(Arc::new(op_type_array) as ArrayRef);

    let batch = RecordBatch::try_new(schema, output_columns).context(NewRecordBatchSnafu)?;

    // Sorts the batch by (primary_key, timestamp, sequence desc)
    let sorted_batch = sort_primary_key_record_batch(&batch)?;

    Ok(Some(BulkPart {
        batch: sorted_batch,
        max_timestamp: part.max_timestamp,
        min_timestamp: part.min_timestamp,
        sequence: part.sequence,
        timestamp_index: new_timestamp_index,
        raw_data: None,
    }))
}
1043
/// A bulk part encoded as an in-memory parquet file, together with metadata describing it.
///
/// Built by [BulkPartEncoder] or directly via [EncodedBulkPart::new].
#[derive(Debug, Clone)]
pub struct EncodedBulkPart {
    /// Encoded parquet bytes.
    data: Bytes,
    /// Row count, time range, parquet metadata and other information about `data`.
    metadata: BulkPartMeta,
}
1049
1050impl EncodedBulkPart {
1051    pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
1052        Self { data, metadata }
1053    }
1054
1055    pub fn metadata(&self) -> &BulkPartMeta {
1056        &self.metadata
1057    }
1058
1059    /// Returns the size of the encoded data in bytes
1060    pub(crate) fn size_bytes(&self) -> usize {
1061        self.data.len()
1062    }
1063
1064    /// Returns the encoded data.
1065    pub fn data(&self) -> &Bytes {
1066        &self.data
1067    }
1068
1069    /// Creates MemtableStats from this EncodedBulkPart.
1070    pub fn to_memtable_stats(&self) -> MemtableStats {
1071        let meta = &self.metadata;
1072        let ts_type = meta.region_metadata.time_index_type();
1073        let min_ts = ts_type.create_timestamp(meta.min_timestamp);
1074        let max_ts = ts_type.create_timestamp(meta.max_timestamp);
1075
1076        MemtableStats {
1077            estimated_bytes: self.size_bytes(),
1078            time_range: Some((min_ts, max_ts)),
1079            num_rows: meta.num_rows,
1080            num_ranges: 1,
1081            max_sequence: meta.max_sequence,
1082            series_count: meta.num_series as usize,
1083        }
1084    }
1085
1086    /// Converts this `EncodedBulkPart` to `SstInfo`.
1087    ///
1088    /// # Arguments
1089    /// * `file_id` - The SST file ID to assign to this part
1090    ///
1091    /// # Returns
1092    /// Returns a `SstInfo` instance with information derived from this bulk part's metadata
1093    pub(crate) fn to_sst_info(&self, file_id: FileId) -> SstInfo {
1094        let unit = self.metadata.region_metadata.time_index_type().unit();
1095        let max_row_group_uncompressed_size: u64 = self
1096            .metadata
1097            .parquet_metadata
1098            .row_groups()
1099            .iter()
1100            .map(|rg| {
1101                rg.columns()
1102                    .iter()
1103                    .map(|c| c.uncompressed_size() as u64)
1104                    .sum::<u64>()
1105            })
1106            .max()
1107            .unwrap_or(0);
1108        SstInfo {
1109            file_id,
1110            time_range: (
1111                Timestamp::new(self.metadata.min_timestamp, unit),
1112                Timestamp::new(self.metadata.max_timestamp, unit),
1113            ),
1114            file_size: self.data.len() as u64,
1115            max_row_group_uncompressed_size,
1116            num_rows: self.metadata.num_rows,
1117            num_row_groups: self.metadata.parquet_metadata.num_row_groups() as u64,
1118            file_metadata: Some(self.metadata.parquet_metadata.clone()),
1119            index_metadata: IndexOutput::default(),
1120            num_series: self.metadata.num_series,
1121        }
1122    }
1123
1124    pub(crate) fn read(
1125        &self,
1126        context: BulkIterContextRef,
1127        sequence: Option<SequenceRange>,
1128        mem_scan_metrics: Option<MemScanMetrics>,
1129    ) -> Result<Option<BoxedRecordBatchIterator>> {
1130        // Compute skip_fields for row group pruning from the configured pre-filter mode.
1131        let skip_fields_for_pruning = context.pre_filter_mode().skip_fields();
1132
1133        // use predicate to find row groups to read.
1134        let row_groups_to_read =
1135            context.row_groups_to_read(&self.metadata.parquet_metadata, skip_fields_for_pruning);
1136
1137        if row_groups_to_read.is_empty() {
1138            // All row groups are filtered.
1139            return Ok(None);
1140        }
1141
1142        let iter = EncodedBulkPartIter::try_new(
1143            self,
1144            context,
1145            row_groups_to_read,
1146            sequence,
1147            mem_scan_metrics,
1148        )?;
1149        Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
1150    }
1151}
1152
// TODO(yingwen): max_sequence
// NOTE(review): `max_sequence` is already a field below; this TODO may be stale — confirm and remove.
/// Metadata describing the contents of an [EncodedBulkPart].
#[derive(Debug, Clone)]
pub struct BulkPartMeta {
    /// Total rows in part.
    pub num_rows: usize,
    /// Max timestamp in part, as a raw value in the unit of the region's time index type.
    pub max_timestamp: i64,
    /// Min timestamp in part, as a raw value in the unit of the region's time index type.
    pub min_timestamp: i64,
    /// Parquet metadata of the encoded part (row groups and their column chunks).
    pub parquet_metadata: Arc<ParquetMetaData>,
    /// Metadata of the region the part belongs to.
    pub region_metadata: RegionMetadataRef,
    /// Number of series (estimated by the encoder).
    pub num_series: u64,
    /// Maximum sequence number in part.
    pub max_sequence: u64,
}
1171
/// Metrics for encoding a part.
///
/// Values are accumulated (added to, never reset) by
/// [BulkPartEncoder::encode_record_batch_iter], so one instance can aggregate several encodes.
#[derive(Default, Debug)]
pub struct BulkPartEncodeMetrics {
    /// Time spent waiting on the input iterator for batches.
    pub iter_cost: Duration,
    /// Time spent writing batches to, and closing, the parquet writer.
    pub write_cost: Duration,
    /// Estimated in-memory size of the non-empty input batches before encoding.
    pub raw_size: usize,
    /// Size in bytes of the encoded parquet output.
    pub encoded_size: usize,
    /// Number of rows encoded.
    pub num_rows: usize,
}
1186
/// Encodes bulk parts and record batch streams into parquet-backed [EncodedBulkPart]s.
pub struct BulkPartEncoder {
    /// Region metadata attached to every encoded part.
    metadata: RegionMetadataRef,
    /// Parquet writer properties, built once in [BulkPartEncoder::new].
    writer_props: Option<WriterProperties>,
}
1191
1192impl BulkPartEncoder {
1193    pub fn new(metadata: RegionMetadataRef, row_group_size: usize) -> Result<BulkPartEncoder> {
1194        // TODO(yingwen): Skip arrow schema if needed.
1195        let json = metadata.to_json().context(InvalidMetadataSnafu)?;
1196        let key_value_meta =
1197            parquet::file::metadata::KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
1198
1199        // TODO(yingwen): Do we need compression?
1200        let writer_props = Some(
1201            WriterProperties::builder()
1202                .set_key_value_metadata(Some(vec![key_value_meta]))
1203                .set_write_batch_size(row_group_size)
1204                .set_max_row_group_row_count(Some(row_group_size))
1205                .set_compression(Compression::ZSTD(ZstdLevel::default()))
1206                .set_column_index_truncate_length(None)
1207                .set_statistics_truncate_length(None)
1208                .build(),
1209        );
1210
1211        Ok(Self {
1212            metadata,
1213            writer_props,
1214        })
1215    }
1216}
1217
1218impl BulkPartEncoder {
1219    /// Encodes [BoxedRecordBatchIterator] into [EncodedBulkPart] with min/max timestamps.
1220    pub fn encode_record_batch_iter(
1221        &self,
1222        iter: BoxedRecordBatchIterator,
1223        arrow_schema: SchemaRef,
1224        min_timestamp: i64,
1225        max_timestamp: i64,
1226        max_sequence: u64,
1227        metrics: &mut BulkPartEncodeMetrics,
1228    ) -> Result<Option<EncodedBulkPart>> {
1229        let mut buf = Vec::with_capacity(4096);
1230        let mut writer = ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
1231            .context(EncodeMemtableSnafu)?;
1232        let mut total_rows = 0;
1233        let mut series_estimator = SeriesEstimator::default();
1234
1235        // Process each batch from the iterator
1236        let mut iter_start = Instant::now();
1237        for batch_result in iter {
1238            metrics.iter_cost += iter_start.elapsed();
1239            let batch = batch_result?;
1240            if batch.num_rows() == 0 {
1241                continue;
1242            }
1243
1244            series_estimator.update_flat(&batch);
1245            metrics.raw_size += record_batch_estimated_size(&batch);
1246            let write_start = Instant::now();
1247            writer.write(&batch).context(EncodeMemtableSnafu)?;
1248            metrics.write_cost += write_start.elapsed();
1249            total_rows += batch.num_rows();
1250            iter_start = Instant::now();
1251        }
1252        metrics.iter_cost += iter_start.elapsed();
1253
1254        if total_rows == 0 {
1255            return Ok(None);
1256        }
1257
1258        let close_start = Instant::now();
1259        let file_metadata = writer.close().context(EncodeMemtableSnafu)?;
1260        metrics.write_cost += close_start.elapsed();
1261        metrics.encoded_size += buf.len();
1262        metrics.num_rows += total_rows;
1263
1264        let buf = Bytes::from(buf);
1265        let parquet_metadata = Arc::new(file_metadata);
1266        let num_series = series_estimator.finish();
1267
1268        Ok(Some(EncodedBulkPart {
1269            data: buf,
1270            metadata: BulkPartMeta {
1271                num_rows: total_rows,
1272                max_timestamp,
1273                min_timestamp,
1274                parquet_metadata,
1275                region_metadata: self.metadata.clone(),
1276                num_series,
1277                max_sequence,
1278            },
1279        }))
1280    }
1281
1282    /// Encodes bulk part to a [EncodedBulkPart], returns the encoded data.
1283    pub fn encode_part(&self, part: &BulkPart) -> Result<Option<EncodedBulkPart>> {
1284        if part.batch.num_rows() == 0 {
1285            return Ok(None);
1286        }
1287
1288        let mut buf = Vec::with_capacity(4096);
1289        let arrow_schema = part.batch.schema();
1290
1291        let file_metadata = {
1292            let mut writer =
1293                ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
1294                    .context(EncodeMemtableSnafu)?;
1295            writer.write(&part.batch).context(EncodeMemtableSnafu)?;
1296            writer.finish().context(EncodeMemtableSnafu)?
1297        };
1298
1299        let buf = Bytes::from(buf);
1300        let parquet_metadata = Arc::new(file_metadata);
1301
1302        Ok(Some(EncodedBulkPart {
1303            data: buf,
1304            metadata: BulkPartMeta {
1305                num_rows: part.batch.num_rows(),
1306                max_timestamp: part.max_timestamp,
1307                min_timestamp: part.min_timestamp,
1308                parquet_metadata,
1309                region_metadata: self.metadata.clone(),
1310                num_series: part.estimated_series_count() as u64,
1311                max_sequence: part.sequence,
1312            },
1313        }))
1314    }
1315}
1316
/// Per-batch min/max statistics for the first tag column in a `MultiBulkPart`.
///
/// Since batches are sorted by primary key, we can extract the min/max of the first tag
/// from the first/last row's encoded primary key in each batch. These statistics enable
/// batch-level pruning using predicates, analogous to row-group pruning in parquet.
///
/// Batches whose bounds cannot be extracted (empty batch, unexpected column type, or a key
/// that does not decode to a value) get null min/max entries. NOTE(review): the pruning
/// predicate presumably treats null bounds as unknown and keeps such batches — confirm.
#[derive(Debug, Clone)]
struct BatchStats {
    /// Number of batches.
    num_batches: usize,
    /// Column id of the first tag.
    first_tag_id: ColumnId,
    /// Min values of the first tag, one element per batch.
    min_values: ArrayRef,
    /// Max values of the first tag, one element per batch.
    max_values: ArrayRef,
}
1333
1334impl BatchStats {
1335    /// Computes batch statistics from a slice of record batches.
1336    ///
1337    /// Returns `None` if there is no primary key (no first tag to collect stats for)
1338    /// or if extracting statistics fails.
1339    fn compute(batches: &[RecordBatch], metadata: &RegionMetadata) -> Option<Self> {
1340        // `primary_key.first()` is correct for both dense and sparse encodings.
1341        // For dense, values follow the order of `metadata.primary_key`.
1342        // For sparse, `decode_leftmost` decodes the first value which also
1343        // corresponds to `primary_key.first()`. See `SparsePrimaryKeyCodec` for format details.
1344        let first_tag_id = *metadata.primary_key.first()?;
1345        let first_tag_column = metadata.column_by_id(first_tag_id)?;
1346        let data_type = &first_tag_column.column_schema.data_type;
1347
1348        let converter = build_primary_key_codec_with_fields(
1349            metadata.primary_key_encoding,
1350            [(first_tag_id, SortField::new(data_type.clone()))].into_iter(),
1351        );
1352        let pk_index = primary_key_column_index(batches.first()?.num_columns());
1353
1354        let mut min_builder = data_type.create_mutable_vector(batches.len());
1355        let mut max_builder = data_type.create_mutable_vector(batches.len());
1356
1357        for batch in batches {
1358            match Self::extract_first_tag_bounds(batch, pk_index, &*converter) {
1359                Some((min_val, max_val)) => {
1360                    min_builder.push_value_ref(&min_val.as_value_ref());
1361                    max_builder.push_value_ref(&max_val.as_value_ref());
1362                }
1363                None => {
1364                    min_builder.push_null();
1365                    max_builder.push_null();
1366                }
1367            }
1368        }
1369
1370        Some(Self {
1371            num_batches: batches.len(),
1372            first_tag_id,
1373            min_values: min_builder.to_vector().to_arrow_array(),
1374            max_values: max_builder.to_vector().to_arrow_array(),
1375        })
1376    }
1377
1378    /// Extracts the first tag value from the first and last rows of a batch.
1379    fn extract_first_tag_bounds(
1380        batch: &RecordBatch,
1381        pk_index: usize,
1382        converter: &dyn PrimaryKeyCodec,
1383    ) -> Option<(datatypes::value::Value, datatypes::value::Value)> {
1384        if batch.num_rows() == 0 {
1385            return None;
1386        }
1387
1388        let pk_dict = batch
1389            .column(pk_index)
1390            .as_any()
1391            .downcast_ref::<PrimaryKeyArray>()?;
1392        let pk_values = pk_dict.values().as_any().downcast_ref::<BinaryArray>()?;
1393
1394        let keys = pk_dict.keys();
1395        let min_key = keys.value(0);
1396        let max_key = keys.value(batch.num_rows() - 1);
1397        let min_bytes = pk_values.value(min_key as usize);
1398        let max_bytes = pk_values.value(max_key as usize);
1399
1400        Some((
1401            converter.decode_leftmost(min_bytes).ok()??,
1402            converter.decode_leftmost(max_bytes).ok()??,
1403        ))
1404    }
1405}
1406
/// Adapter implementing `PruningStatistics` for `BatchStats`.
///
/// Used with `Predicate::prune_with_stats()` to skip batches whose first-tag
/// min/max range does not match the query predicate. Only the first tag column has
/// statistics; every other column reports none.
struct BatchPruningStats<'a> {
    /// Per-batch first-tag statistics; each batch is one pruning container.
    stats: &'a BatchStats,
    /// Region metadata used to resolve predicate column names to column ids.
    metadata: &'a RegionMetadataRef,
}
1415
1416impl PruningStatistics for BatchPruningStats<'_> {
1417    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
1418        let col = self.metadata.column_by_name(&column.name)?;
1419        if col.column_id == self.stats.first_tag_id {
1420            Some(self.stats.min_values.clone())
1421        } else {
1422            None
1423        }
1424    }
1425
1426    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
1427        let col = self.metadata.column_by_name(&column.name)?;
1428        if col.column_id == self.stats.first_tag_id {
1429            Some(self.stats.max_values.clone())
1430        } else {
1431            None
1432        }
1433    }
1434
1435    fn num_containers(&self) -> usize {
1436        self.stats.num_batches
1437    }
1438
1439    fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
1440        None
1441    }
1442
1443    fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
1444        None
1445    }
1446
1447    fn contained(
1448        &self,
1449        _column: &Column,
1450        _values: &std::collections::HashSet<datafusion_common::ScalarValue>,
1451    ) -> Option<BooleanArray> {
1452        None
1453    }
1454}
1455
1456/// Returns true if the predicate references the given column name.
1457fn predicate_references_column(predicate: &table::predicate::Predicate, column_name: &str) -> bool {
1458    let mut columns = HashSet::new();
1459    for expr in predicate.exprs() {
1460        let _ = expr_to_columns(expr, &mut columns);
1461    }
1462    columns.iter().any(|col| col.name == column_name)
1463}
1464
1465/// Returns true if the batch should be pruned (skipped) based on the first-tag min/max
1466/// statistics and the predicate in the context. Returns false if no pruning is possible
1467/// (no primary key, no predicate, or the batch matches the predicate).
1468pub(crate) fn should_prune_bulk_part(
1469    batch: &RecordBatch,
1470    context: &BulkIterContext,
1471    metadata: &RegionMetadata,
1472) -> bool {
1473    let predicate = match &context.predicate {
1474        Some(p) => p,
1475        None => return false,
1476    };
1477    // Check if the predicate references the first tag column to avoid computing
1478    // expensive batch statistics when they won't help with pruning.
1479    let first_tag_id = match metadata.primary_key.first() {
1480        Some(id) => *id,
1481        None => return false,
1482    };
1483    // Safety: `first_tag_id` comes from `metadata.primary_key` so the column always exists.
1484    let first_tag_name = &metadata
1485        .column_by_id(first_tag_id)
1486        .unwrap()
1487        .column_schema
1488        .name;
1489    if !predicate_references_column(predicate, first_tag_name) {
1490        return false;
1491    }
1492    let stats = match BatchStats::compute(std::slice::from_ref(batch), metadata) {
1493        Some(s) => s,
1494        None => return false,
1495    };
1496    let region_meta = context.read_format().metadata();
1497    let pruning_stats = BatchPruningStats {
1498        stats: &stats,
1499        metadata: region_meta,
1500    };
1501    let mask = predicate.prune_with_stats(&pruning_stats, region_meta.schema.arrow_schema());
1502    !mask.first().copied().unwrap_or(true)
1503}
1504
/// A collection of ordered RecordBatches representing a bulk part without parquet encoding.
///
/// Similar to `EncodedBulkPart` but stores raw RecordBatches instead of encoded parquet data.
/// The RecordBatches must be ordered by (primary key, timestamp, sequence desc).
/// Uses SmallVec to optimize for the common case of few batches while avoiding heap allocation.
#[derive(Debug, Clone)]
pub struct MultiBulkPart {
    /// Ordered record batches. SmallVec optimized for up to 4 batches inline.
    /// Never empty when built through the constructors.
    batches: SmallVec<[RecordBatch; 4]>,
    /// Total rows across all batches.
    total_rows: usize,
    /// Max timestamp in part.
    max_timestamp: i64,
    /// Min timestamp in part.
    min_timestamp: i64,
    /// Max sequence number in part.
    max_sequence: SequenceNumber,
    /// Number of series.
    series_count: usize,
    /// Pre-computed per-batch statistics for the first tag column, one entry per batch.
    /// `None` if there is no primary key (or the statistics could not be computed).
    batch_stats: Option<BatchStats>,
}
1528
1529impl MultiBulkPart {
1530    /// Creates a new MultiBulkPart from a single BulkPart.
1531    pub fn from_bulk_part(part: BulkPart, metadata: &RegionMetadata) -> Self {
1532        let num_rows = part.num_rows();
1533        let series_count = part.estimated_series_count();
1534        let batch_stats = BatchStats::compute(std::slice::from_ref(&part.batch), metadata);
1535        let mut batches = SmallVec::new();
1536        batches.push(part.batch);
1537
1538        Self {
1539            batches,
1540            total_rows: num_rows,
1541            max_timestamp: part.max_timestamp,
1542            min_timestamp: part.min_timestamp,
1543            max_sequence: part.sequence,
1544            series_count,
1545            batch_stats,
1546        }
1547    }
1548
1549    /// Creates a new MultiBulkPart from multiple ordered RecordBatches.
1550    ///
1551    /// # Arguments
1552    /// * `batches` - Ordered record batches
1553    /// * `min_timestamp` - Minimum timestamp across all batches
1554    /// * `max_timestamp` - Maximum timestamp across all batches
1555    /// * `max_sequence` - Maximum sequence number across all batches
1556    /// * `series_count` - Number of series in the batches
1557    /// * `metadata` - Region metadata for computing batch statistics
1558    ///
1559    /// # Panics
1560    /// Panics if batches is empty.
1561    pub fn new(
1562        batches: Vec<RecordBatch>,
1563        min_timestamp: i64,
1564        max_timestamp: i64,
1565        max_sequence: SequenceNumber,
1566        series_count: usize,
1567        metadata: &RegionMetadata,
1568    ) -> Self {
1569        assert!(!batches.is_empty(), "batches must not be empty");
1570
1571        let total_rows = batches.iter().map(|b| b.num_rows()).sum();
1572        let batch_stats = BatchStats::compute(&batches, metadata);
1573
1574        Self {
1575            batches: SmallVec::from_vec(batches),
1576            total_rows,
1577            max_timestamp,
1578            min_timestamp,
1579            max_sequence,
1580            series_count,
1581            batch_stats,
1582        }
1583    }
1584
1585    /// Returns the total number of rows across all batches.
1586    pub fn num_rows(&self) -> usize {
1587        self.total_rows
1588    }
1589
1590    /// Returns the minimum timestamp.
1591    pub fn min_timestamp(&self) -> i64 {
1592        self.min_timestamp
1593    }
1594
1595    /// Returns the maximum timestamp.
1596    pub fn max_timestamp(&self) -> i64 {
1597        self.max_timestamp
1598    }
1599
1600    /// Returns the maximum sequence number.
1601    pub fn max_sequence(&self) -> SequenceNumber {
1602        self.max_sequence
1603    }
1604
1605    /// Returns the number of series.
1606    pub fn series_count(&self) -> usize {
1607        self.series_count
1608    }
1609
1610    /// Returns the number of record batches in this part.
1611    pub fn num_batches(&self) -> usize {
1612        self.batches.len()
1613    }
1614
1615    /// Returns the estimated memory size of all batches.
1616    pub(crate) fn estimated_size(&self) -> usize {
1617        self.batches.iter().map(record_batch_estimated_size).sum()
1618    }
1619
1620    /// Reads data from this part with the given context and filters.
1621    ///
1622    /// If batch-level statistics are available and a predicate is set, prunes
1623    /// batches whose first-tag min/max range doesn't match the predicate before
1624    /// creating the iterator.
1625    pub(crate) fn read(
1626        &self,
1627        context: BulkIterContextRef,
1628        sequence: Option<SequenceRange>,
1629        mem_scan_metrics: Option<MemScanMetrics>,
1630    ) -> Result<Option<BoxedRecordBatchIterator>> {
1631        if self.batches.is_empty() {
1632            return Ok(None);
1633        }
1634
1635        let batches_to_read = self.prune_batches(&context);
1636
1637        if batches_to_read.is_empty() {
1638            return Ok(None);
1639        }
1640
1641        let iter = crate::memtable::bulk::part_reader::BulkPartBatchIter::new(
1642            batches_to_read,
1643            context,
1644            sequence,
1645            self.series_count,
1646            mem_scan_metrics,
1647        );
1648        Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
1649    }
1650
1651    /// Prunes batches using the first-tag min/max statistics and the predicate.
1652    /// Returns all batches if no stats or no predicate is available.
1653    fn prune_batches(&self, context: &BulkIterContextRef) -> Vec<RecordBatch> {
1654        if let Some(stats) = &self.batch_stats
1655            && let Some(predicate) = &context.predicate
1656        {
1657            let region_meta = context.read_format().metadata();
1658            let pruning_stats = BatchPruningStats {
1659                stats,
1660                metadata: region_meta,
1661            };
1662            let mask =
1663                predicate.prune_with_stats(&pruning_stats, region_meta.schema.arrow_schema());
1664            self.batches
1665                .iter()
1666                .zip(mask.iter())
1667                .filter_map(
1668                    |(batch, &selected)| {
1669                        if selected { Some(batch.clone()) } else { None }
1670                    },
1671                )
1672                .collect()
1673        } else {
1674            self.batches.iter().cloned().collect()
1675        }
1676    }
1677
1678    /// Converts this `MultiBulkPart` to `MemtableStats`.
1679    pub fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
1680        let ts_type = region_metadata.time_index_type();
1681        let min_ts = ts_type.create_timestamp(self.min_timestamp);
1682        let max_ts = ts_type.create_timestamp(self.max_timestamp);
1683
1684        MemtableStats {
1685            estimated_bytes: self.estimated_size(),
1686            time_range: Some((min_ts, max_ts)),
1687            num_rows: self.num_rows(),
1688            num_ranges: 1,
1689            max_sequence: self.max_sequence,
1690            series_count: self.series_count,
1691        }
1692    }
1693}
1694
1695#[cfg(test)]
1696mod tests {
1697    use api::v1::{Row, SemanticType, WriteHint};
1698    use datafusion_common::ScalarValue;
1699    use datatypes::arrow::array::{
1700        BinaryArray, DictionaryArray, Float64Array, TimestampMillisecondArray,
1701    };
1702    use datatypes::arrow::datatypes::UInt32Type;
1703    use datatypes::prelude::{ConcreteDataType, Value};
1704    use datatypes::schema::ColumnSchema;
1705    use mito_codec::row_converter::build_primary_key_codec;
1706    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1707    use store_api::storage::RegionId;
1708    use store_api::storage::consts::ReservedColumnId;
1709    use table::predicate::Predicate;
1710
1711    use super::*;
1712    use crate::memtable::bulk::context::BulkIterContext;
1713    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1714    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
1715
1716    struct MutationInput<'a> {
1717        k0: &'a str,
1718        k1: u32,
1719        timestamps: &'a [i64],
1720        v1: &'a [Option<f64>],
1721        sequence: u64,
1722    }
1723
1724    fn encode(input: &[MutationInput]) -> EncodedBulkPart {
1725        let metadata = metadata_for_test();
1726        let kvs = input
1727            .iter()
1728            .map(|m| {
1729                build_key_values_with_ts_seq_values(
1730                    &metadata,
1731                    m.k0.to_string(),
1732                    m.k1,
1733                    m.timestamps.iter().copied(),
1734                    m.v1.iter().copied(),
1735                    m.sequence,
1736                )
1737            })
1738            .collect::<Vec<_>>();
1739        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1740        let primary_key_codec = build_primary_key_codec(&metadata);
1741        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1742        for kv in kvs {
1743            converter.append_key_values(&kv).unwrap();
1744        }
1745        let part = converter.convert().unwrap();
1746        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1747        encoder.encode_part(&part).unwrap().unwrap()
1748    }
1749
1750    #[test]
1751    fn test_write_and_read_part_projection() {
1752        let part = encode(&[
1753            MutationInput {
1754                k0: "a",
1755                k1: 0,
1756                timestamps: &[1],
1757                v1: &[Some(0.1)],
1758                sequence: 0,
1759            },
1760            MutationInput {
1761                k0: "b",
1762                k1: 0,
1763                timestamps: &[1],
1764                v1: &[Some(0.0)],
1765                sequence: 0,
1766            },
1767            MutationInput {
1768                k0: "a",
1769                k1: 0,
1770                timestamps: &[2],
1771                v1: &[Some(0.2)],
1772                sequence: 1,
1773            },
1774        ]);
1775
1776        let projection = &[4u32];
1777        let reader = part
1778            .read(
1779                Arc::new(
1780                    BulkIterContext::new(
1781                        part.metadata.region_metadata.clone(),
1782                        Some(projection.as_slice()),
1783                        None,
1784                        false,
1785                    )
1786                    .unwrap(),
1787                ),
1788                None,
1789                None,
1790            )
1791            .unwrap()
1792            .expect("expect at least one row group");
1793
1794        let mut total_rows_read = 0;
1795        let mut field: Vec<f64> = vec![];
1796        for res in reader {
1797            let batch = res.unwrap();
1798            assert_eq!(5, batch.num_columns());
1799            field.extend_from_slice(
1800                batch
1801                    .column(0)
1802                    .as_any()
1803                    .downcast_ref::<Float64Array>()
1804                    .unwrap()
1805                    .values(),
1806            );
1807            total_rows_read += batch.num_rows();
1808        }
1809        assert_eq!(3, total_rows_read);
1810        assert_eq!(vec![0.1, 0.2, 0.0], field);
1811    }
1812
1813    fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
1814        let metadata = metadata_for_test();
1815        let kvs = key_values
1816            .into_iter()
1817            .map(|(k0, k1, (start, end), sequence)| {
1818                let ts = start..end;
1819                let v1 = (start..end).map(|_| None);
1820                build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
1821            })
1822            .collect::<Vec<_>>();
1823        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1824        let primary_key_codec = build_primary_key_codec(&metadata);
1825        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1826        for kv in kvs {
1827            converter.append_key_values(&kv).unwrap();
1828        }
1829        let part = converter.convert().unwrap();
1830        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1831        encoder.encode_part(&part).unwrap().unwrap()
1832    }
1833
1834    fn check_prune_row_group(
1835        part: &EncodedBulkPart,
1836        predicate: Option<Predicate>,
1837        expected_rows: usize,
1838    ) {
1839        let context = Arc::new(
1840            BulkIterContext::new(
1841                part.metadata.region_metadata.clone(),
1842                None,
1843                predicate,
1844                false,
1845            )
1846            .unwrap(),
1847        );
1848        let reader = part
1849            .read(context, None, None)
1850            .unwrap()
1851            .expect("expect at least one row group");
1852        let mut total_rows_read = 0;
1853        for res in reader {
1854            let batch = res.unwrap();
1855            total_rows_read += batch.num_rows();
1856        }
1857        // Should only read row group 1.
1858        assert_eq!(expected_rows, total_rows_read);
1859    }
1860
1861    #[test]
1862    fn test_prune_row_groups() {
1863        let part = prepare(vec![
1864            ("a", 0, (0, 40), 1),
1865            ("a", 1, (0, 60), 1),
1866            ("b", 0, (0, 100), 2),
1867            ("b", 1, (100, 180), 3),
1868            ("b", 1, (180, 210), 4),
1869        ]);
1870
1871        let context = Arc::new(
1872            BulkIterContext::new(
1873                part.metadata.region_metadata.clone(),
1874                None,
1875                Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
1876                    datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
1877                )])),
1878                false,
1879            )
1880            .unwrap(),
1881        );
1882        assert!(part.read(context, None, None).unwrap().is_none());
1883
1884        check_prune_row_group(&part, None, 310);
1885
1886        check_prune_row_group(
1887            &part,
1888            Some(Predicate::new(vec![
1889                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1890                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
1891            ])),
1892            40,
1893        );
1894
1895        check_prune_row_group(
1896            &part,
1897            Some(Predicate::new(vec![
1898                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1899                datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
1900            ])),
1901            60,
1902        );
1903
1904        check_prune_row_group(
1905            &part,
1906            Some(Predicate::new(vec![
1907                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1908            ])),
1909            100,
1910        );
1911
1912        check_prune_row_group(
1913            &part,
1914            Some(Predicate::new(vec![
1915                datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
1916                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
1917            ])),
1918            100,
1919        );
1920
1921        // Predicates over field column can do precise filtering.
1922        check_prune_row_group(
1923            &part,
1924            Some(Predicate::new(vec![
1925                datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64)),
1926            ])),
1927            1,
1928        );
1929    }
1930
1931    #[test]
1932    fn test_bulk_part_converter_append_and_convert() {
1933        let metadata = metadata_for_test();
1934        let capacity = 100;
1935        let primary_key_codec = build_primary_key_codec(&metadata);
1936        let schema = to_flat_sst_arrow_schema(
1937            &metadata,
1938            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1939        );
1940
1941        let mut converter =
1942            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1943
1944        let key_values1 = build_key_values_with_ts_seq_values(
1945            &metadata,
1946            "key1".to_string(),
1947            1u32,
1948            vec![1000, 2000].into_iter(),
1949            vec![Some(1.0), Some(2.0)].into_iter(),
1950            1,
1951        );
1952
1953        let key_values2 = build_key_values_with_ts_seq_values(
1954            &metadata,
1955            "key2".to_string(),
1956            2u32,
1957            vec![1500].into_iter(),
1958            vec![Some(3.0)].into_iter(),
1959            2,
1960        );
1961
1962        converter.append_key_values(&key_values1).unwrap();
1963        converter.append_key_values(&key_values2).unwrap();
1964
1965        let bulk_part = converter.convert().unwrap();
1966
1967        assert_eq!(bulk_part.num_rows(), 3);
1968        assert_eq!(bulk_part.min_timestamp, 1000);
1969        assert_eq!(bulk_part.max_timestamp, 2000);
1970        assert_eq!(bulk_part.sequence, 2);
1971        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
1972
1973        // Validate primary key columns are stored
1974        // Schema should include primary key columns k0 and k1 at the beginning
1975        let schema = bulk_part.batch.schema();
1976        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1977        assert_eq!(
1978            field_names,
1979            vec![
1980                "k0",
1981                "k1",
1982                "v0",
1983                "v1",
1984                "ts",
1985                "__primary_key",
1986                "__sequence",
1987                "__op_type"
1988            ]
1989        );
1990    }
1991
1992    #[test]
1993    fn test_bulk_part_converter_sorting() {
1994        let metadata = metadata_for_test();
1995        let capacity = 100;
1996        let primary_key_codec = build_primary_key_codec(&metadata);
1997        let schema = to_flat_sst_arrow_schema(
1998            &metadata,
1999            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2000        );
2001
2002        let mut converter =
2003            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
2004
2005        let key_values1 = build_key_values_with_ts_seq_values(
2006            &metadata,
2007            "z_key".to_string(),
2008            3u32,
2009            vec![3000].into_iter(),
2010            vec![Some(3.0)].into_iter(),
2011            3,
2012        );
2013
2014        let key_values2 = build_key_values_with_ts_seq_values(
2015            &metadata,
2016            "a_key".to_string(),
2017            1u32,
2018            vec![1000].into_iter(),
2019            vec![Some(1.0)].into_iter(),
2020            1,
2021        );
2022
2023        let key_values3 = build_key_values_with_ts_seq_values(
2024            &metadata,
2025            "m_key".to_string(),
2026            2u32,
2027            vec![2000].into_iter(),
2028            vec![Some(2.0)].into_iter(),
2029            2,
2030        );
2031
2032        converter.append_key_values(&key_values1).unwrap();
2033        converter.append_key_values(&key_values2).unwrap();
2034        converter.append_key_values(&key_values3).unwrap();
2035
2036        let bulk_part = converter.convert().unwrap();
2037
2038        assert_eq!(bulk_part.num_rows(), 3);
2039
2040        let ts_column = bulk_part.batch.column(bulk_part.timestamp_index);
2041        let seq_column = bulk_part.batch.column(bulk_part.batch.num_columns() - 2);
2042
2043        let ts_array = ts_column
2044            .as_any()
2045            .downcast_ref::<TimestampMillisecondArray>()
2046            .unwrap();
2047        let seq_array = seq_column.as_any().downcast_ref::<UInt64Array>().unwrap();
2048
2049        assert_eq!(ts_array.values(), &[1000, 2000, 3000]);
2050        assert_eq!(seq_array.values(), &[1, 2, 3]);
2051
2052        // Validate primary key columns are stored
2053        let schema = bulk_part.batch.schema();
2054        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2055        assert_eq!(
2056            field_names,
2057            vec![
2058                "k0",
2059                "k1",
2060                "v0",
2061                "v1",
2062                "ts",
2063                "__primary_key",
2064                "__sequence",
2065                "__op_type"
2066            ]
2067        );
2068    }
2069
2070    #[test]
2071    fn test_bulk_part_converter_empty() {
2072        let metadata = metadata_for_test();
2073        let capacity = 10;
2074        let primary_key_codec = build_primary_key_codec(&metadata);
2075        let schema = to_flat_sst_arrow_schema(
2076            &metadata,
2077            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2078        );
2079
2080        let converter =
2081            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
2082
2083        let bulk_part = converter.convert().unwrap();
2084
2085        assert_eq!(bulk_part.num_rows(), 0);
2086        assert_eq!(bulk_part.min_timestamp, i64::MAX);
2087        assert_eq!(bulk_part.max_timestamp, i64::MIN);
2088        assert_eq!(bulk_part.sequence, SequenceNumber::MIN);
2089
2090        // Validate primary key columns are present in schema even for empty batch
2091        let schema = bulk_part.batch.schema();
2092        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2093        assert_eq!(
2094            field_names,
2095            vec![
2096                "k0",
2097                "k1",
2098                "v0",
2099                "v1",
2100                "ts",
2101                "__primary_key",
2102                "__sequence",
2103                "__op_type"
2104            ]
2105        );
2106    }
2107
2108    #[test]
2109    fn test_bulk_part_converter_without_primary_key_columns() {
2110        let metadata = metadata_for_test();
2111        let primary_key_codec = build_primary_key_codec(&metadata);
2112        let schema = to_flat_sst_arrow_schema(
2113            &metadata,
2114            &FlatSchemaOptions {
2115                raw_pk_columns: false,
2116                string_pk_use_dict: true,
2117                ..Default::default()
2118            },
2119        );
2120
2121        let capacity = 100;
2122        let mut converter =
2123            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, false);
2124
2125        let key_values1 = build_key_values_with_ts_seq_values(
2126            &metadata,
2127            "key1".to_string(),
2128            1u32,
2129            vec![1000, 2000].into_iter(),
2130            vec![Some(1.0), Some(2.0)].into_iter(),
2131            1,
2132        );
2133
2134        let key_values2 = build_key_values_with_ts_seq_values(
2135            &metadata,
2136            "key2".to_string(),
2137            2u32,
2138            vec![1500].into_iter(),
2139            vec![Some(3.0)].into_iter(),
2140            2,
2141        );
2142
2143        converter.append_key_values(&key_values1).unwrap();
2144        converter.append_key_values(&key_values2).unwrap();
2145
2146        let bulk_part = converter.convert().unwrap();
2147
2148        assert_eq!(bulk_part.num_rows(), 3);
2149        assert_eq!(bulk_part.min_timestamp, 1000);
2150        assert_eq!(bulk_part.max_timestamp, 2000);
2151        assert_eq!(bulk_part.sequence, 2);
2152        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
2153
2154        // Validate primary key columns are NOT stored individually
2155        let schema = bulk_part.batch.schema();
2156        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2157        assert_eq!(
2158            field_names,
2159            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2160        );
2161    }
2162
2163    #[allow(clippy::too_many_arguments)]
2164    fn build_key_values_with_sparse_encoding(
2165        metadata: &RegionMetadataRef,
2166        primary_key_codec: &Arc<dyn PrimaryKeyCodec>,
2167        table_id: u32,
2168        tsid: u64,
2169        k0: String,
2170        k1: String,
2171        timestamps: impl Iterator<Item = i64>,
2172        values: impl Iterator<Item = Option<f64>>,
2173        sequence: SequenceNumber,
2174    ) -> KeyValues {
2175        // Encode the primary key (__table_id, __tsid, k0, k1) into binary format using the sparse codec
2176        let pk_values = vec![
2177            (ReservedColumnId::table_id(), Value::UInt32(table_id)),
2178            (ReservedColumnId::tsid(), Value::UInt64(tsid)),
2179            (0, Value::String(k0.clone().into())),
2180            (1, Value::String(k1.clone().into())),
2181        ];
2182        let mut encoded_key = Vec::new();
2183        primary_key_codec
2184            .encode_values(&pk_values, &mut encoded_key)
2185            .unwrap();
2186        assert!(!encoded_key.is_empty());
2187
2188        // Create schema for sparse encoding: __primary_key, ts, v0, v1
2189        let column_schema = vec![
2190            api::v1::ColumnSchema {
2191                column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
2192                datatype: api::helper::ColumnDataTypeWrapper::try_from(
2193                    ConcreteDataType::binary_datatype(),
2194                )
2195                .unwrap()
2196                .datatype() as i32,
2197                semantic_type: api::v1::SemanticType::Tag as i32,
2198                ..Default::default()
2199            },
2200            api::v1::ColumnSchema {
2201                column_name: "ts".to_string(),
2202                datatype: api::helper::ColumnDataTypeWrapper::try_from(
2203                    ConcreteDataType::timestamp_millisecond_datatype(),
2204                )
2205                .unwrap()
2206                .datatype() as i32,
2207                semantic_type: api::v1::SemanticType::Timestamp as i32,
2208                ..Default::default()
2209            },
2210            api::v1::ColumnSchema {
2211                column_name: "v0".to_string(),
2212                datatype: api::helper::ColumnDataTypeWrapper::try_from(
2213                    ConcreteDataType::int64_datatype(),
2214                )
2215                .unwrap()
2216                .datatype() as i32,
2217                semantic_type: api::v1::SemanticType::Field as i32,
2218                ..Default::default()
2219            },
2220            api::v1::ColumnSchema {
2221                column_name: "v1".to_string(),
2222                datatype: api::helper::ColumnDataTypeWrapper::try_from(
2223                    ConcreteDataType::float64_datatype(),
2224                )
2225                .unwrap()
2226                .datatype() as i32,
2227                semantic_type: api::v1::SemanticType::Field as i32,
2228                ..Default::default()
2229            },
2230        ];
2231
2232        let rows = timestamps
2233            .zip(values)
2234            .map(|(ts, v)| Row {
2235                values: vec![
2236                    api::v1::Value {
2237                        value_data: Some(api::v1::value::ValueData::BinaryValue(
2238                            encoded_key.clone(),
2239                        )),
2240                    },
2241                    api::v1::Value {
2242                        value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(ts)),
2243                    },
2244                    api::v1::Value {
2245                        value_data: Some(api::v1::value::ValueData::I64Value(ts)),
2246                    },
2247                    api::v1::Value {
2248                        value_data: v.map(api::v1::value::ValueData::F64Value),
2249                    },
2250                ],
2251            })
2252            .collect();
2253
2254        let mutation = api::v1::Mutation {
2255            op_type: 1,
2256            sequence,
2257            rows: Some(api::v1::Rows {
2258                schema: column_schema,
2259                rows,
2260            }),
2261            write_hint: Some(WriteHint {
2262                primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
2263            }),
2264        };
2265        KeyValues::new(metadata.as_ref(), mutation).unwrap()
2266    }
2267
2268    #[test]
2269    fn test_bulk_part_converter_sparse_primary_key_encoding() {
2270        use api::v1::SemanticType;
2271        use datatypes::schema::ColumnSchema;
2272        use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
2273        use store_api::storage::RegionId;
2274
2275        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
2276        builder
2277            .push_column_metadata(ColumnMetadata {
2278                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
2279                semantic_type: SemanticType::Tag,
2280                column_id: 0,
2281            })
2282            .push_column_metadata(ColumnMetadata {
2283                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
2284                semantic_type: SemanticType::Tag,
2285                column_id: 1,
2286            })
2287            .push_column_metadata(ColumnMetadata {
2288                column_schema: ColumnSchema::new(
2289                    "ts",
2290                    ConcreteDataType::timestamp_millisecond_datatype(),
2291                    false,
2292                ),
2293                semantic_type: SemanticType::Timestamp,
2294                column_id: 2,
2295            })
2296            .push_column_metadata(ColumnMetadata {
2297                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
2298                semantic_type: SemanticType::Field,
2299                column_id: 3,
2300            })
2301            .push_column_metadata(ColumnMetadata {
2302                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
2303                semantic_type: SemanticType::Field,
2304                column_id: 4,
2305            })
2306            .primary_key(vec![0, 1])
2307            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
2308        let metadata = Arc::new(builder.build().unwrap());
2309
2310        let primary_key_codec = build_primary_key_codec(&metadata);
2311        let schema = to_flat_sst_arrow_schema(
2312            &metadata,
2313            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2314        );
2315
2316        assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse);
2317        assert_eq!(primary_key_codec.encoding(), PrimaryKeyEncoding::Sparse);
2318
2319        let capacity = 100;
2320        let mut converter =
2321            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec.clone(), true);
2322
2323        let key_values1 = build_key_values_with_sparse_encoding(
2324            &metadata,
2325            &primary_key_codec,
2326            2048u32, // table_id
2327            100u64,  // tsid
2328            "key11".to_string(),
2329            "key21".to_string(),
2330            vec![1000, 2000].into_iter(),
2331            vec![Some(1.0), Some(2.0)].into_iter(),
2332            1,
2333        );
2334
2335        let key_values2 = build_key_values_with_sparse_encoding(
2336            &metadata,
2337            &primary_key_codec,
2338            4096u32, // table_id
2339            200u64,  // tsid
2340            "key12".to_string(),
2341            "key22".to_string(),
2342            vec![1500].into_iter(),
2343            vec![Some(3.0)].into_iter(),
2344            2,
2345        );
2346
2347        converter.append_key_values(&key_values1).unwrap();
2348        converter.append_key_values(&key_values2).unwrap();
2349
2350        let bulk_part = converter.convert().unwrap();
2351
2352        assert_eq!(bulk_part.num_rows(), 3);
2353        assert_eq!(bulk_part.min_timestamp, 1000);
2354        assert_eq!(bulk_part.max_timestamp, 2000);
2355        assert_eq!(bulk_part.sequence, 2);
2356        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
2357
2358        // For sparse encoding, primary key columns should NOT be stored individually
2359        // even when store_primary_key_columns is true, because sparse encoding
2360        // stores the encoded primary key in the __primary_key column
2361        let schema = bulk_part.batch.schema();
2362        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2363        assert_eq!(
2364            field_names,
2365            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2366        );
2367
2368        // Verify the __primary_key column contains encoded sparse keys
2369        let primary_key_column = bulk_part.batch.column_by_name("__primary_key").unwrap();
2370        let dict_array = primary_key_column
2371            .as_any()
2372            .downcast_ref::<DictionaryArray<UInt32Type>>()
2373            .unwrap();
2374
2375        // Should have non-zero entries indicating encoded primary keys
2376        assert!(!dict_array.is_empty());
2377        assert_eq!(dict_array.len(), 3); // 3 rows total
2378
2379        // Verify values are properly encoded binary data (not empty)
2380        let values = dict_array
2381            .values()
2382            .as_any()
2383            .downcast_ref::<BinaryArray>()
2384            .unwrap();
2385        for i in 0..values.len() {
2386            assert!(
2387                !values.value(i).is_empty(),
2388                "Encoded primary key should not be empty"
2389            );
2390        }
2391    }
2392
2393    #[test]
2394    fn test_convert_bulk_part_empty() {
2395        let metadata = metadata_for_test();
2396        let schema = to_flat_sst_arrow_schema(
2397            &metadata,
2398            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2399        );
2400        let primary_key_codec = build_primary_key_codec(&metadata);
2401
2402        // Create empty batch
2403        let empty_batch = RecordBatch::new_empty(schema.clone());
2404        let empty_part = BulkPart {
2405            batch: empty_batch,
2406            max_timestamp: 0,
2407            min_timestamp: 0,
2408            sequence: 0,
2409            timestamp_index: 0,
2410            raw_data: None,
2411        };
2412
2413        let result =
2414            convert_bulk_part(empty_part, &metadata, primary_key_codec, schema, true).unwrap();
2415        assert!(result.is_none());
2416    }
2417
2418    #[test]
2419    fn test_convert_bulk_part_dense_with_pk_columns() {
2420        let metadata = metadata_for_test();
2421        let primary_key_codec = build_primary_key_codec(&metadata);
2422
2423        let k0_array = Arc::new(arrow::array::StringArray::from(vec![
2424            "key1", "key2", "key1",
2425        ]));
2426        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2, 1]));
2427        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200, 300]));
2428        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0, 3.0]));
2429        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000, 1500]));
2430
2431        let input_schema = Arc::new(Schema::new(vec![
2432            Field::new("k0", ArrowDataType::Utf8, false),
2433            Field::new("k1", ArrowDataType::UInt32, false),
2434            Field::new("v0", ArrowDataType::Int64, true),
2435            Field::new("v1", ArrowDataType::Float64, true),
2436            Field::new(
2437                "ts",
2438                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2439                false,
2440            ),
2441        ]));
2442
2443        let input_batch = RecordBatch::try_new(
2444            input_schema,
2445            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2446        )
2447        .unwrap();
2448
2449        let part = BulkPart {
2450            batch: input_batch,
2451            max_timestamp: 2000,
2452            min_timestamp: 1000,
2453            sequence: 5,
2454            timestamp_index: 4,
2455            raw_data: None,
2456        };
2457
2458        let output_schema = to_flat_sst_arrow_schema(
2459            &metadata,
2460            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2461        );
2462
2463        let result = convert_bulk_part(
2464            part,
2465            &metadata,
2466            primary_key_codec,
2467            output_schema,
2468            true, // store primary key columns
2469        )
2470        .unwrap();
2471
2472        let converted = result.unwrap();
2473
2474        assert_eq!(converted.num_rows(), 3);
2475        assert_eq!(converted.max_timestamp, 2000);
2476        assert_eq!(converted.min_timestamp, 1000);
2477        assert_eq!(converted.sequence, 5);
2478
2479        let schema = converted.batch.schema();
2480        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2481        assert_eq!(
2482            field_names,
2483            vec![
2484                "k0",
2485                "k1",
2486                "v0",
2487                "v1",
2488                "ts",
2489                "__primary_key",
2490                "__sequence",
2491                "__op_type"
2492            ]
2493        );
2494
2495        let k0_col = converted.batch.column_by_name("k0").unwrap();
2496        assert!(matches!(
2497            k0_col.data_type(),
2498            ArrowDataType::Dictionary(_, _)
2499        ));
2500
2501        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2502        let dict_array = pk_col
2503            .as_any()
2504            .downcast_ref::<DictionaryArray<UInt32Type>>()
2505            .unwrap();
2506        let keys = dict_array.keys();
2507
2508        assert_eq!(keys.len(), 3);
2509    }
2510
2511    #[test]
2512    fn test_convert_bulk_part_dense_without_pk_columns() {
2513        let metadata = metadata_for_test();
2514        let primary_key_codec = build_primary_key_codec(&metadata);
2515
2516        // Create input batch with primary key columns (k0, k1)
2517        let k0_array = Arc::new(arrow::array::StringArray::from(vec!["key1", "key2"]));
2518        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2]));
2519        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
2520        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
2521        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
2522
2523        let input_schema = Arc::new(Schema::new(vec![
2524            Field::new("k0", ArrowDataType::Utf8, false),
2525            Field::new("k1", ArrowDataType::UInt32, false),
2526            Field::new("v0", ArrowDataType::Int64, true),
2527            Field::new("v1", ArrowDataType::Float64, true),
2528            Field::new(
2529                "ts",
2530                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2531                false,
2532            ),
2533        ]));
2534
2535        let input_batch = RecordBatch::try_new(
2536            input_schema,
2537            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2538        )
2539        .unwrap();
2540
2541        let part = BulkPart {
2542            batch: input_batch,
2543            max_timestamp: 2000,
2544            min_timestamp: 1000,
2545            sequence: 3,
2546            timestamp_index: 4,
2547            raw_data: None,
2548        };
2549
2550        let output_schema = to_flat_sst_arrow_schema(
2551            &metadata,
2552            &FlatSchemaOptions {
2553                raw_pk_columns: false,
2554                string_pk_use_dict: true,
2555                ..Default::default()
2556            },
2557        );
2558
2559        let result = convert_bulk_part(
2560            part,
2561            &metadata,
2562            primary_key_codec,
2563            output_schema,
2564            false, // don't store primary key columns
2565        )
2566        .unwrap();
2567
2568        let converted = result.unwrap();
2569
2570        assert_eq!(converted.num_rows(), 2);
2571        assert_eq!(converted.max_timestamp, 2000);
2572        assert_eq!(converted.min_timestamp, 1000);
2573        assert_eq!(converted.sequence, 3);
2574
2575        // Verify schema does NOT include individual primary key columns
2576        let schema = converted.batch.schema();
2577        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2578        assert_eq!(
2579            field_names,
2580            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2581        );
2582
2583        // Verify __primary_key column is present and is a dictionary
2584        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2585        assert!(matches!(
2586            pk_col.data_type(),
2587            ArrowDataType::Dictionary(_, _)
2588        ));
2589    }
2590
2591    #[test]
2592    fn test_convert_bulk_part_sparse_encoding() {
2593        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
2594        builder
2595            .push_column_metadata(ColumnMetadata {
2596                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
2597                semantic_type: SemanticType::Tag,
2598                column_id: 0,
2599            })
2600            .push_column_metadata(ColumnMetadata {
2601                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
2602                semantic_type: SemanticType::Tag,
2603                column_id: 1,
2604            })
2605            .push_column_metadata(ColumnMetadata {
2606                column_schema: ColumnSchema::new(
2607                    "ts",
2608                    ConcreteDataType::timestamp_millisecond_datatype(),
2609                    false,
2610                ),
2611                semantic_type: SemanticType::Timestamp,
2612                column_id: 2,
2613            })
2614            .push_column_metadata(ColumnMetadata {
2615                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
2616                semantic_type: SemanticType::Field,
2617                column_id: 3,
2618            })
2619            .push_column_metadata(ColumnMetadata {
2620                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
2621                semantic_type: SemanticType::Field,
2622                column_id: 4,
2623            })
2624            .primary_key(vec![0, 1])
2625            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
2626        let metadata = Arc::new(builder.build().unwrap());
2627
2628        let primary_key_codec = build_primary_key_codec(&metadata);
2629
2630        // Create input batch with __primary_key column (sparse encoding)
2631        let pk_array = Arc::new(arrow::array::BinaryArray::from(vec![
2632            b"encoded_key_1".as_slice(),
2633            b"encoded_key_2".as_slice(),
2634        ]));
2635        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
2636        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
2637        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
2638
2639        let input_schema = Arc::new(Schema::new(vec![
2640            Field::new("__primary_key", ArrowDataType::Binary, false),
2641            Field::new("v0", ArrowDataType::Int64, true),
2642            Field::new("v1", ArrowDataType::Float64, true),
2643            Field::new(
2644                "ts",
2645                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2646                false,
2647            ),
2648        ]));
2649
2650        let input_batch =
2651            RecordBatch::try_new(input_schema, vec![pk_array, v0_array, v1_array, ts_array])
2652                .unwrap();
2653
2654        let part = BulkPart {
2655            batch: input_batch,
2656            max_timestamp: 2000,
2657            min_timestamp: 1000,
2658            sequence: 7,
2659            timestamp_index: 3,
2660            raw_data: None,
2661        };
2662
2663        let output_schema = to_flat_sst_arrow_schema(
2664            &metadata,
2665            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2666        );
2667
2668        let result = convert_bulk_part(
2669            part,
2670            &metadata,
2671            primary_key_codec,
2672            output_schema,
2673            true, // store_primary_key_columns (ignored for sparse)
2674        )
2675        .unwrap();
2676
2677        let converted = result.unwrap();
2678
2679        assert_eq!(converted.num_rows(), 2);
2680        assert_eq!(converted.max_timestamp, 2000);
2681        assert_eq!(converted.min_timestamp, 1000);
2682        assert_eq!(converted.sequence, 7);
2683
2684        // Verify schema does NOT include individual primary key columns (sparse encoding)
2685        let schema = converted.batch.schema();
2686        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2687        assert_eq!(
2688            field_names,
2689            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2690        );
2691
2692        // Verify __primary_key is dictionary encoded
2693        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2694        assert!(matches!(
2695            pk_col.data_type(),
2696            ArrowDataType::Dictionary(_, _)
2697        ));
2698    }
2699
2700    #[test]
2701    fn test_convert_bulk_part_sorting_with_multiple_series() {
2702        let metadata = metadata_for_test();
2703        let primary_key_codec = build_primary_key_codec(&metadata);
2704
2705        // Create unsorted batch with multiple series and timestamps
2706        let k0_array = Arc::new(arrow::array::StringArray::from(vec![
2707            "series_b", "series_a", "series_b", "series_a",
2708        ]));
2709        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![2, 1, 2, 1]));
2710        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![200, 100, 400, 300]));
2711        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![2.0, 1.0, 4.0, 3.0]));
2712        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
2713            2000, 1000, 4000, 3000,
2714        ]));
2715
2716        let input_schema = Arc::new(Schema::new(vec![
2717            Field::new("k0", ArrowDataType::Utf8, false),
2718            Field::new("k1", ArrowDataType::UInt32, false),
2719            Field::new("v0", ArrowDataType::Int64, true),
2720            Field::new("v1", ArrowDataType::Float64, true),
2721            Field::new(
2722                "ts",
2723                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2724                false,
2725            ),
2726        ]));
2727
2728        let input_batch = RecordBatch::try_new(
2729            input_schema,
2730            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2731        )
2732        .unwrap();
2733
2734        let part = BulkPart {
2735            batch: input_batch,
2736            max_timestamp: 4000,
2737            min_timestamp: 1000,
2738            sequence: 10,
2739            timestamp_index: 4,
2740            raw_data: None,
2741        };
2742
2743        let output_schema = to_flat_sst_arrow_schema(
2744            &metadata,
2745            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2746        );
2747
2748        let result =
2749            convert_bulk_part(part, &metadata, primary_key_codec, output_schema, true).unwrap();
2750
2751        let converted = result.unwrap();
2752
2753        assert_eq!(converted.num_rows(), 4);
2754
2755        // Verify data is sorted by (primary_key, timestamp, sequence desc)
2756        let ts_col = converted.batch.column(converted.timestamp_index);
2757        let ts_array = ts_col
2758            .as_any()
2759            .downcast_ref::<TimestampMillisecondArray>()
2760            .unwrap();
2761
2762        // After sorting by (pk, ts), we should have:
2763        // series_a,1: ts=1000, 3000
2764        // series_b,2: ts=2000, 4000
2765        let timestamps: Vec<i64> = ts_array.values().to_vec();
2766        assert_eq!(timestamps, vec![1000, 3000, 2000, 4000]);
2767    }
2768
2769    /// Helper to create a converted BulkPart (with __primary_key column) from MutationInputs.
2770    fn build_converted_bulk_part(inputs: &[MutationInput]) -> BulkPart {
2771        let metadata = metadata_for_test();
2772        let kvs = inputs
2773            .iter()
2774            .map(|m| {
2775                build_key_values_with_ts_seq_values(
2776                    &metadata,
2777                    m.k0.to_string(),
2778                    m.k1,
2779                    m.timestamps.iter().copied(),
2780                    m.v1.iter().copied(),
2781                    m.sequence,
2782                )
2783            })
2784            .collect::<Vec<_>>();
2785        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
2786        let primary_key_codec = build_primary_key_codec(&metadata);
2787        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
2788        for kv in kvs {
2789            converter.append_key_values(&kv).unwrap();
2790        }
2791        converter.convert().unwrap()
2792    }
2793
2794    /// Helper to create a MultiBulkPart where each group becomes a separate batch.
2795    fn build_multi_bulk_part(groups: &[&[MutationInput]]) -> (MultiBulkPart, RegionMetadataRef) {
2796        let metadata = metadata_for_test();
2797        let mut all_batches = Vec::new();
2798        let mut min_ts = i64::MAX;
2799        let mut max_ts = i64::MIN;
2800        let mut max_seq = 0u64;
2801
2802        for inputs in groups {
2803            let part = build_converted_bulk_part(inputs);
2804            min_ts = min_ts.min(part.min_timestamp);
2805            max_ts = max_ts.max(part.max_timestamp);
2806            max_seq = max_seq.max(part.sequence);
2807            all_batches.push(part.batch);
2808        }
2809
2810        let multi = MultiBulkPart::new(
2811            all_batches,
2812            min_ts,
2813            max_ts,
2814            max_seq,
2815            groups.len(),
2816            &metadata,
2817        );
2818        (multi, metadata)
2819    }
2820
2821    #[test]
2822    fn test_multi_bulk_part_prune_batches() {
2823        // Three batches with distinct k0 ranges: ["a"], ["m"], ["z"].
2824        let (multi, metadata) = build_multi_bulk_part(&[
2825            &[MutationInput {
2826                k0: "a",
2827                k1: 0,
2828                timestamps: &[1, 2],
2829                v1: &[Some(1.0), Some(2.0)],
2830                sequence: 0,
2831            }],
2832            &[MutationInput {
2833                k0: "m",
2834                k1: 0,
2835                timestamps: &[3, 4],
2836                v1: &[Some(3.0), Some(4.0)],
2837                sequence: 1,
2838            }],
2839            &[MutationInput {
2840                k0: "z",
2841                k1: 0,
2842                timestamps: &[5, 6],
2843                v1: &[Some(5.0), Some(6.0)],
2844                sequence: 2,
2845            }],
2846        ]);
2847        assert_eq!(multi.num_rows(), 6);
2848        assert_eq!(multi.num_batches(), 3);
2849
2850        // k0 = "m" => only middle batch (2 rows).
2851        let context = Arc::new(
2852            BulkIterContext::new(
2853                metadata.clone(),
2854                None,
2855                Some(Predicate::new(vec![
2856                    datafusion_expr::col("k0").eq(datafusion_expr::lit("m")),
2857                ])),
2858                false,
2859            )
2860            .unwrap(),
2861        );
2862        let reader = multi
2863            .read(context, None, None)
2864            .unwrap()
2865            .expect("should have results");
2866        let total_rows: usize = reader.map(|r| r.unwrap().num_rows()).sum();
2867        assert_eq!(total_rows, 2);
2868
2869        // k0 = "nonexistent" => all pruned, returns None.
2870        let context = Arc::new(
2871            BulkIterContext::new(
2872                metadata.clone(),
2873                None,
2874                Some(Predicate::new(vec![
2875                    datafusion_expr::col("k0").eq(datafusion_expr::lit("nonexistent")),
2876                ])),
2877                false,
2878            )
2879            .unwrap(),
2880        );
2881        assert!(multi.read(context, None, None).unwrap().is_none());
2882
2883        // No predicate => all 6 rows.
2884        let context = Arc::new(BulkIterContext::new(metadata.clone(), None, None, false).unwrap());
2885        let reader = multi
2886            .read(context, None, None)
2887            .unwrap()
2888            .expect("should have results");
2889        let total_rows: usize = reader.map(|r| r.unwrap().num_rows()).sum();
2890        assert_eq!(total_rows, 6);
2891    }
2892}