Skip to main content

mito2/memtable/bulk/
part.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Bulk part encoder/decoder.
16
17use std::collections::{HashMap, HashSet};
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use api::helper::{ColumnDataTypeWrapper, to_grpc_value};
22use api::v1::bulk_wal_entry::Body;
23use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType};
24use bytes::Bytes;
25use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
26use common_recordbatch::DfRecordBatch as RecordBatch;
27use common_time::Timestamp;
28use datafusion_common::Column;
29use datafusion_common::pruning::PruningStatistics;
30use datafusion_expr::utils::expr_to_columns;
31use datatypes::arrow;
32use datatypes::arrow::array::{
33    Array, ArrayRef, BinaryArray, BooleanArray, StringDictionaryBuilder, UInt8Array, UInt64Array,
34};
35use datatypes::arrow::compute::{SortColumn, SortOptions, concat_batches};
36use datatypes::arrow::datatypes::{
37    DataType as ArrowDataType, Field, Schema, SchemaRef, UInt32Type,
38};
39use datatypes::data_type::DataType;
40use datatypes::extension::json::is_structured_json_field;
41use datatypes::prelude::{MutableVector, Vector};
42use datatypes::value::ValueRef;
43use datatypes::vectors::Helper;
44use mito_codec::key_values::{KeyValue, KeyValues};
45use mito_codec::row_converter::{PrimaryKeyCodec, SortField, build_primary_key_codec_with_fields};
46use parquet::arrow::ArrowWriter;
47use parquet::basic::{Compression, ZstdLevel};
48use parquet::file::metadata::ParquetMetaData;
49use parquet::file::properties::WriterProperties;
50use smallvec::SmallVec;
51use snafu::{OptionExt, ResultExt};
52use store_api::codec::PrimaryKeyEncoding;
53use store_api::metadata::{RegionMetadata, RegionMetadataRef};
54use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
55use store_api::storage::{ColumnId, FileId, SequenceNumber, SequenceRange};
56
57use crate::error::{
58    self, ColumnNotFoundSnafu, ComputeArrowSnafu, CreateDefaultSnafu, DataTypeMismatchSnafu,
59    EncodeMemtableSnafu, EncodeSnafu, InvalidMetadataSnafu, InvalidRequestSnafu,
60    NewRecordBatchSnafu, Result,
61};
62use crate::memtable::bulk::context::{BulkIterContext, BulkIterContextRef};
63use crate::memtable::bulk::json_align::Json2Aligner;
64use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
65use crate::memtable::time_series::{ValueBuilder, Values};
66use crate::memtable::{BoxedRecordBatchIterator, MemScanMetrics, MemtableStats};
67use crate::sst::SeriesEstimator;
68use crate::sst::index::IndexOutput;
69use crate::sst::parquet::flat_format::primary_key_column_index;
70use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder};
71use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo};
72
/// Initial dictionary value capacity passed to the string dictionary builders
/// created for string primary key columns.
const INIT_DICT_VALUE_CAPACITY: usize = 8;
74
/// A raw bulk part in the memtable.
#[derive(Clone)]
pub struct BulkPart {
    /// Record batch that holds the rows of this part.
    pub batch: RecordBatch,
    /// Maximum timestamp value of the rows, as a raw `i64`.
    pub max_timestamp: i64,
    /// Minimum timestamp value of the rows, as a raw `i64`.
    pub min_timestamp: i64,
    /// Sequence number of this part. Parts built from several rows or parts
    /// carry the maximum sequence they have seen.
    pub sequence: u64,
    /// Index of the time index column inside `batch`.
    pub timestamp_index: usize,
    /// Arrow IPC message this part was decoded from, if any. It is reused as-is
    /// when the part is converted back into a [BulkWalEntry].
    pub raw_data: Option<ArrowIpc>,
}
85
86impl TryFrom<BulkWalEntry> for BulkPart {
87    type Error = error::Error;
88
89    fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
90        match value.body.expect("Entry payload should be present") {
91            Body::ArrowIpc(ipc) => {
92                let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
93                    .context(error::ConvertBulkWalEntrySnafu)?;
94                let batch = decoder
95                    .try_decode_record_batch(&ipc.data_header, &ipc.payload)
96                    .context(error::ConvertBulkWalEntrySnafu)?;
97                Ok(Self {
98                    batch,
99                    max_timestamp: value.max_ts,
100                    min_timestamp: value.min_ts,
101                    sequence: value.sequence,
102                    timestamp_index: value.timestamp_index as usize,
103                    raw_data: Some(ipc),
104                })
105            }
106        }
107    }
108}
109
110impl TryFrom<&BulkPart> for BulkWalEntry {
111    type Error = error::Error;
112
113    fn try_from(value: &BulkPart) -> Result<Self> {
114        if let Some(ipc) = &value.raw_data {
115            Ok(BulkWalEntry {
116                sequence: value.sequence,
117                max_ts: value.max_timestamp,
118                min_ts: value.min_timestamp,
119                timestamp_index: value.timestamp_index as u32,
120                body: Some(Body::ArrowIpc(ipc.clone())),
121            })
122        } else {
123            let mut encoder = FlightEncoder::default();
124            let schema_bytes = encoder
125                .encode_schema(value.batch.schema().as_ref())
126                .data_header;
127            let [rb_data] = encoder
128                .encode(FlightMessage::RecordBatch(value.batch.clone()))
129                .try_into()
130                .map_err(|_| {
131                    error::UnsupportedOperationSnafu {
132                        err_msg: "create BulkWalEntry from RecordBatch with dictionary arrays",
133                    }
134                    .build()
135                })?;
136            Ok(BulkWalEntry {
137                sequence: value.sequence,
138                max_ts: value.max_timestamp,
139                min_ts: value.min_timestamp,
140                timestamp_index: value.timestamp_index as u32,
141                body: Some(Body::ArrowIpc(ArrowIpc {
142                    schema: schema_bytes,
143                    data_header: rb_data.data_header,
144                    payload: rb_data.data_body,
145                })),
146            })
147        }
148    }
149}
150
151impl BulkPart {
    /// Returns the schema of the underlying record batch.
    pub(crate) fn schema(&self) -> SchemaRef {
        self.batch.schema()
    }
155
    /// Returns the estimated memory size of the underlying record batch,
    /// computed by [record_batch_estimated_size].
    pub(crate) fn estimated_size(&self) -> usize {
        record_batch_estimated_size(&self.batch)
    }
159
160    /// Returns the estimated series count in this BulkPart.
161    /// This is calculated from the dictionary values count of the PrimaryKeyArray.
162    pub fn estimated_series_count(&self) -> usize {
163        let pk_column_idx = primary_key_column_index(self.batch.num_columns());
164        let pk_column = self.batch.column(pk_column_idx);
165        if let Some(dict_array) = pk_column.as_any().downcast_ref::<PrimaryKeyArray>() {
166            dict_array.values().len()
167        } else {
168            0
169        }
170    }
171
172    /// Creates MemtableStats from this BulkPart.
173    pub fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
174        let ts_type = region_metadata.time_index_type();
175        let min_ts = ts_type.create_timestamp(self.min_timestamp);
176        let max_ts = ts_type.create_timestamp(self.max_timestamp);
177
178        MemtableStats {
179            estimated_bytes: self.estimated_size(),
180            time_range: Some((min_ts, max_ts)),
181            num_rows: self.num_rows(),
182            num_ranges: 1,
183            max_sequence: self.sequence,
184            series_count: self.estimated_series_count(),
185        }
186    }
187
188    /// Fills missing columns in the BulkPart batch with default values.
189    ///
190    /// This function checks if the batch schema matches the region metadata schema,
191    /// and if there are missing columns, it fills them with default values (or null
192    /// for nullable columns).
193    ///
194    /// # Arguments
195    ///
196    /// * `region_metadata` - The region metadata containing the expected schema
197    pub fn fill_missing_columns(&mut self, region_metadata: &RegionMetadata) -> Result<()> {
198        // Builds a map of existing columns in the batch
199        let batch_schema = self.batch.schema();
200        let batch_columns: HashSet<_> = batch_schema
201            .fields()
202            .iter()
203            .map(|f| f.name().as_str())
204            .collect();
205
206        // Finds columns that need to be filled
207        let mut columns_to_fill = Vec::new();
208        for column_meta in &region_metadata.column_metadatas {
209            // TODO(yingwen): Returns error if it is impure default after we support filling
210            // bulk insert request in the frontend
211            if !batch_columns.contains(column_meta.column_schema.name.as_str()) {
212                columns_to_fill.push(column_meta);
213            }
214        }
215
216        if columns_to_fill.is_empty() {
217            return Ok(());
218        }
219
220        let num_rows = self.batch.num_rows();
221
222        let mut new_columns = Vec::new();
223        let mut new_fields = Vec::new();
224
225        // First, adds all existing columns
226        new_fields.extend(batch_schema.fields().iter().cloned());
227        new_columns.extend_from_slice(self.batch.columns());
228
229        let region_id = region_metadata.region_id;
230        // Then adds the missing columns with default values
231        for column_meta in columns_to_fill {
232            let default_vector = column_meta
233                .column_schema
234                .create_default_vector(num_rows)
235                .context(CreateDefaultSnafu {
236                    region_id,
237                    column: &column_meta.column_schema.name,
238                })?
239                .with_context(|| InvalidRequestSnafu {
240                    region_id,
241                    reason: format!(
242                        "column {} does not have default value",
243                        column_meta.column_schema.name
244                    ),
245                })?;
246            let arrow_array = default_vector.to_arrow_array();
247            column_meta.column_schema.data_type.as_arrow_type();
248
249            new_fields.push(Arc::new(Field::new(
250                column_meta.column_schema.name.clone(),
251                column_meta.column_schema.data_type.as_arrow_type(),
252                column_meta.column_schema.is_nullable(),
253            )));
254            new_columns.push(arrow_array);
255        }
256
257        // Create a new schema and batch with the filled columns
258        let new_schema = Arc::new(Schema::new(new_fields));
259        let new_batch =
260            RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)?;
261
262        // Update the batch
263        self.batch = new_batch;
264
265        Ok(())
266    }
267
268    /// Converts [BulkPart] to [Mutation] for fallback `write_bulk` implementation.
269    pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
270        let vectors = region_metadata
271            .schema
272            .column_schemas()
273            .iter()
274            .map(|col| match self.batch.column_by_name(&col.name) {
275                None => Ok(None),
276                Some(col) => Helper::try_into_vector(col).map(Some),
277            })
278            .collect::<datatypes::error::Result<Vec<_>>>()
279            .context(error::ComputeVectorSnafu)?;
280
281        let rows = (0..self.num_rows())
282            .map(|row_idx| {
283                let values = (0..self.batch.num_columns())
284                    .map(|col_idx| {
285                        if let Some(v) = &vectors[col_idx] {
286                            to_grpc_value(v.get(row_idx))
287                        } else {
288                            api::v1::Value { value_data: None }
289                        }
290                    })
291                    .collect::<Vec<_>>();
292                api::v1::Row { values }
293            })
294            .collect::<Vec<_>>();
295
296        let schema = region_metadata
297            .column_metadatas
298            .iter()
299            .map(|c| {
300                let data_type_wrapper =
301                    ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
302                Ok(api::v1::ColumnSchema {
303                    column_name: c.column_schema.name.clone(),
304                    datatype: data_type_wrapper.datatype() as i32,
305                    semantic_type: c.semantic_type as i32,
306                    ..Default::default()
307                })
308            })
309            .collect::<api::error::Result<Vec<_>>>()
310            .context(error::ConvertColumnDataTypeSnafu {
311                reason: "failed to convert region metadata to column schema",
312            })?;
313
314        let rows = api::v1::Rows { schema, rows };
315
316        Ok(Mutation {
317            op_type: OpType::Put as i32,
318            sequence: self.sequence,
319            rows: Some(rows),
320            write_hint: None,
321        })
322    }
323
    /// Returns the column at `timestamp_index` of the record batch.
    pub fn timestamps(&self) -> &ArrayRef {
        self.batch.column(self.timestamp_index)
    }
327
    /// Returns the number of rows in the record batch.
    pub fn num_rows(&self) -> usize {
        self.batch.num_rows()
    }
331}
332
/// A collection of small unordered bulk parts.
/// Used to batch small parts together before merging them into a sorted part.
pub struct UnorderedPart {
    /// Small bulk parts that haven't been sorted yet.
    parts: Vec<BulkPart>,
    /// Total number of rows across all parts.
    total_rows: usize,
    /// Minimum timestamp across all parts (`i64::MAX` while the collection is empty).
    min_timestamp: i64,
    /// Maximum timestamp across all parts (`i64::MIN` while the collection is empty).
    max_timestamp: i64,
    /// Maximum sequence number across all parts (0 while the collection is empty).
    max_sequence: u64,
    /// Row count threshold for accepting parts (default: 1024).
    /// A part is accepted only if it has fewer rows than this threshold.
    threshold: usize,
    /// Row count threshold for compacting (default: 4096).
    /// The collection should be compacted once its total rows reach this threshold.
    compact_threshold: usize,
}
351
impl Default for UnorderedPart {
    /// Creates an empty collection, same as [UnorderedPart::new].
    fn default() -> Self {
        Self::new()
    }
}
357
358impl UnorderedPart {
359    /// Creates a new empty UnorderedPart.
360    pub fn new() -> Self {
361        Self {
362            parts: Vec::new(),
363            total_rows: 0,
364            min_timestamp: i64::MAX,
365            max_timestamp: i64::MIN,
366            max_sequence: 0,
367            threshold: 1024,
368            compact_threshold: 4096,
369        }
370    }
371
372    /// Sets the threshold for accepting parts into unordered_part.
373    pub fn set_threshold(&mut self, threshold: usize) {
374        self.threshold = threshold;
375    }
376
377    /// Sets the threshold for compacting unordered_part.
378    pub fn set_compact_threshold(&mut self, compact_threshold: usize) {
379        self.compact_threshold = compact_threshold;
380    }
381
382    /// Returns the threshold for accepting parts.
383    pub fn threshold(&self) -> usize {
384        self.threshold
385    }
386
387    /// Returns the compact threshold.
388    pub fn compact_threshold(&self) -> usize {
389        self.compact_threshold
390    }
391
392    /// Returns true if this part should accept the given row count.
393    pub fn should_accept(&self, num_rows: usize) -> bool {
394        num_rows < self.threshold
395    }
396
397    /// Returns true if this part should be compacted.
398    pub fn should_compact(&self) -> bool {
399        self.total_rows >= self.compact_threshold
400    }
401
402    /// Adds a BulkPart to this unordered collection.
403    pub fn push(&mut self, part: BulkPart) {
404        self.total_rows += part.num_rows();
405        self.min_timestamp = self.min_timestamp.min(part.min_timestamp);
406        self.max_timestamp = self.max_timestamp.max(part.max_timestamp);
407        self.max_sequence = self.max_sequence.max(part.sequence);
408        self.parts.push(part);
409    }
410
411    /// Returns the total number of rows across all parts.
412    pub fn num_rows(&self) -> usize {
413        self.total_rows
414    }
415
416    /// Returns true if there are no parts.
417    pub fn is_empty(&self) -> bool {
418        self.parts.is_empty()
419    }
420
421    /// Returns the number of parts in this collection.
422    pub fn num_parts(&self) -> usize {
423        self.parts.len()
424    }
425
426    /// Concatenates and sorts all parts into a single RecordBatch.
427    /// Returns None if the collection is empty.
428    pub fn concat_and_sort(&self) -> Result<Option<RecordBatch>> {
429        if self.parts.is_empty() {
430            return Ok(None);
431        }
432
433        if self.parts.len() == 1 {
434            // If there's only one part, return its batch directly
435            return Ok(Some(self.parts[0].batch.clone()));
436        }
437
438        // Get the schema from the first part
439        let schema = self.parts[0].batch.schema();
440        let concatenated = if schema.fields().iter().any(is_structured_json_field) {
441            let aligner = Json2Aligner::try_new(self.parts.iter().map(|part| part.batch.schema()))?;
442            let aligned_batches =
443                aligner.align_batches(self.parts.iter().map(|part| part.batch.clone()))?;
444            concat_batches(aligner.schema(), &aligned_batches).context(ComputeArrowSnafu)?
445        } else {
446            concat_batches(&schema, self.parts.iter().map(|x| &x.batch))
447                .context(ComputeArrowSnafu)?
448        };
449
450        // Sort the concatenated batch
451        let sorted_batch = sort_primary_key_record_batch(&concatenated)?;
452
453        Ok(Some(sorted_batch))
454    }
455
456    /// Converts all parts into a single sorted BulkPart.
457    /// Returns None if the collection is empty.
458    pub fn to_bulk_part(&self) -> Result<Option<BulkPart>> {
459        let Some(sorted_batch) = self.concat_and_sort()? else {
460            return Ok(None);
461        };
462
463        let timestamp_index = self.parts[0].timestamp_index;
464
465        Ok(Some(BulkPart {
466            batch: sorted_batch,
467            max_timestamp: self.max_timestamp,
468            min_timestamp: self.min_timestamp,
469            sequence: self.max_sequence,
470            timestamp_index,
471            raw_data: None,
472        }))
473    }
474
475    /// Clears all parts from this collection.
476    pub fn clear(&mut self) {
477        self.parts.clear();
478        self.total_rows = 0;
479        self.min_timestamp = i64::MAX;
480        self.max_timestamp = i64::MIN;
481        self.max_sequence = 0;
482    }
483}
484
485/// More accurate estimation of the size of a record batch.
486pub fn record_batch_estimated_size(batch: &RecordBatch) -> usize {
487    batch
488        .columns()
489        .iter()
490        // If can not get slice memory size, assume 0 here.
491        .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
492        .sum()
493}
494
/// Primary key column builder for handling strings specially.
///
/// String columns are built as dictionary arrays, other columns fall back to a
/// generic mutable vector.
enum PrimaryKeyColumnBuilder {
    /// String dictionary builder for string types.
    StringDict(StringDictionaryBuilder<UInt32Type>),
    /// Generic mutable vector for other types.
    Vector(Box<dyn MutableVector>),
}
502
503impl PrimaryKeyColumnBuilder {
504    /// Appends a value to the builder.
505    fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
506        match self {
507            PrimaryKeyColumnBuilder::StringDict(builder) => {
508                if let Some(s) = value.try_into_string().context(DataTypeMismatchSnafu)? {
509                    // We know the value is a string.
510                    builder.append_value(s);
511                } else {
512                    builder.append_null();
513                }
514            }
515            PrimaryKeyColumnBuilder::Vector(builder) => {
516                builder.push_value_ref(&value);
517            }
518        }
519        Ok(())
520    }
521
522    /// Converts the builder to an ArrayRef.
523    fn into_arrow_array(self) -> ArrayRef {
524        match self {
525            PrimaryKeyColumnBuilder::StringDict(mut builder) => Arc::new(builder.finish()),
526            PrimaryKeyColumnBuilder::Vector(mut builder) => builder.to_vector().to_arrow_array(),
527        }
528    }
529}
530
/// Converter that converts structs into [BulkPart].
///
/// Rows are buffered through `append_key_values()` and turned into a sorted
/// [BulkPart] by `convert()`.
pub struct BulkPartConverter {
    /// Schema of the converted batch.
    schema: SchemaRef,
    /// Primary key codec for encoding keys
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
    /// Buffer for encoding primary key. It is cleared and reused for every row.
    key_buf: Vec<u8>,
    /// Primary key array builder.
    key_array_builder: PrimaryKeyArrayBuilder,
    /// Builders for non-primary key columns.
    value_builder: ValueBuilder,
    /// Builders for individual primary key columns.
    /// The order of builders is the same as the order of primary key columns in the region metadata.
    /// Empty if the converter doesn't store primary key columns.
    primary_key_column_builders: Vec<PrimaryKeyColumnBuilder>,

    /// Max timestamp value (`i64::MIN` until a row is appended).
    max_ts: i64,
    /// Min timestamp value (`i64::MAX` until a row is appended).
    min_ts: i64,
    /// Max sequence number.
    max_sequence: SequenceNumber,
}
554
555impl BulkPartConverter {
556    /// Creates a new converter.
557    ///
558    /// If `store_primary_key_columns` is true and the encoding is not sparse encoding, it
559    /// stores primary key columns in arrays additionally.
560    pub fn new(
561        region_metadata: &RegionMetadataRef,
562        schema: SchemaRef,
563        capacity: usize,
564        primary_key_codec: Arc<dyn PrimaryKeyCodec>,
565        store_primary_key_columns: bool,
566    ) -> Self {
567        debug_assert_eq!(
568            region_metadata.primary_key_encoding,
569            primary_key_codec.encoding()
570        );
571
572        let primary_key_column_builders = if store_primary_key_columns
573            && region_metadata.primary_key_encoding != PrimaryKeyEncoding::Sparse
574        {
575            new_primary_key_column_builders(region_metadata, capacity)
576        } else {
577            Vec::new()
578        };
579
580        Self {
581            schema,
582            primary_key_codec,
583            key_buf: Vec::new(),
584            key_array_builder: PrimaryKeyArrayBuilder::new(),
585            value_builder: ValueBuilder::new(region_metadata, capacity),
586            primary_key_column_builders,
587            min_ts: i64::MAX,
588            max_ts: i64::MIN,
589            max_sequence: SequenceNumber::MIN,
590        }
591    }
592
593    /// Appends a [KeyValues] into the converter.
594    pub fn append_key_values(&mut self, key_values: &KeyValues) -> Result<()> {
595        for kv in key_values.iter() {
596            self.append_key_value(&kv)?;
597        }
598
599        Ok(())
600    }
601
602    /// Appends a [KeyValue] to builders.
603    ///
604    /// If the primary key uses sparse encoding, callers must encoded the primary key in the [KeyValue].
605    fn append_key_value(&mut self, kv: &KeyValue) -> Result<()> {
606        // Handles primary key based on encoding type
607        if self.primary_key_codec.encoding() == PrimaryKeyEncoding::Sparse {
608            // For sparse encoding, the primary key is already encoded in the KeyValue
609            // Gets the first (and only) primary key value which contains the encoded key
610            let mut primary_keys = kv.primary_keys();
611            if let Some(encoded) = primary_keys
612                .next()
613                .context(ColumnNotFoundSnafu {
614                    column: PRIMARY_KEY_COLUMN_NAME,
615                })?
616                .try_into_binary()
617                .context(DataTypeMismatchSnafu)?
618            {
619                self.key_array_builder
620                    .append(encoded)
621                    .context(ComputeArrowSnafu)?;
622            } else {
623                self.key_array_builder
624                    .append("")
625                    .context(ComputeArrowSnafu)?;
626            }
627        } else {
628            // For dense encoding, we need to encode the primary key columns
629            self.key_buf.clear();
630            self.primary_key_codec
631                .encode_key_value(kv, &mut self.key_buf)
632                .context(EncodeSnafu)?;
633            self.key_array_builder
634                .append(&self.key_buf)
635                .context(ComputeArrowSnafu)?;
636        };
637
638        // If storing primary key columns, append values to individual builders
639        if !self.primary_key_column_builders.is_empty() {
640            for (builder, pk_value) in self
641                .primary_key_column_builders
642                .iter_mut()
643                .zip(kv.primary_keys())
644            {
645                builder.push_value_ref(pk_value)?;
646            }
647        }
648
649        // Pushes other columns.
650        self.value_builder.push(
651            kv.timestamp(),
652            kv.sequence(),
653            kv.op_type() as u8,
654            kv.fields(),
655        );
656
657        // Updates statistics
658        // Safety: timestamp of kv must be both present and a valid timestamp value.
659        let ts = kv
660            .timestamp()
661            .try_into_timestamp()
662            .unwrap()
663            .unwrap()
664            .value();
665        self.min_ts = self.min_ts.min(ts);
666        self.max_ts = self.max_ts.max(ts);
667        self.max_sequence = self.max_sequence.max(kv.sequence());
668
669        Ok(())
670    }
671
672    /// Converts buffered content into a [BulkPart].
673    ///
674    /// It sorts the record batch by (primary key, timestamp, sequence desc).
675    pub fn convert(mut self) -> Result<BulkPart> {
676        let values = Values::from(self.value_builder);
677        let mut columns =
678            Vec::with_capacity(4 + values.fields.len() + self.primary_key_column_builders.len());
679
680        // Build primary key column arrays if enabled.
681        for builder in self.primary_key_column_builders {
682            columns.push(builder.into_arrow_array());
683        }
684        // Then fields columns.
685        columns.extend(values.fields.iter().map(|field| field.to_arrow_array()));
686        // Time index.
687        let timestamp_index = columns.len();
688        columns.push(values.timestamp.to_arrow_array());
689        // Primary key.
690        let pk_array = self.key_array_builder.finish();
691        columns.push(Arc::new(pk_array));
692        // Sequence and op type.
693        columns.push(values.sequence.to_arrow_array());
694        columns.push(values.op_type.to_arrow_array());
695
696        // The actual datatype of JSON array is data oriented, not to be derived from the Region
697        // metadata, which is static. So here we have to align the schema.
698        let schema = align_schema_with_json_array(self.schema, &columns);
699        let batch = RecordBatch::try_new(schema, columns).context(NewRecordBatchSnafu)?;
700        // Sorts the record batch.
701        let batch = sort_primary_key_record_batch(&batch)?;
702
703        Ok(BulkPart {
704            batch,
705            max_timestamp: self.max_ts,
706            min_timestamp: self.min_ts,
707            sequence: self.max_sequence,
708            timestamp_index,
709            raw_data: None,
710        })
711    }
712}
713
714fn align_schema_with_json_array(schema: SchemaRef, columns: &[ArrayRef]) -> SchemaRef {
715    if schema.fields().iter().all(|f| !is_structured_json_field(f)) {
716        return schema;
717    }
718
719    let mut fields = Vec::with_capacity(schema.fields().len());
720    for (field, array) in schema.fields().iter().zip(columns) {
721        if !is_structured_json_field(field) {
722            fields.push(field.clone());
723            continue;
724        }
725
726        let mut field = field.as_ref().clone();
727        field.set_data_type(array.data_type().clone());
728        fields.push(Arc::new(field));
729    }
730
731    Arc::new(Schema::new_with_metadata(fields, schema.metadata().clone()))
732}
733
734fn new_primary_key_column_builders(
735    metadata: &RegionMetadata,
736    capacity: usize,
737) -> Vec<PrimaryKeyColumnBuilder> {
738    metadata
739        .primary_key_columns()
740        .map(|col| {
741            if col.column_schema.data_type.is_string() {
742                PrimaryKeyColumnBuilder::StringDict(StringDictionaryBuilder::with_capacity(
743                    capacity,
744                    INIT_DICT_VALUE_CAPACITY,
745                    capacity,
746                ))
747            } else {
748                PrimaryKeyColumnBuilder::Vector(
749                    col.column_schema.data_type.create_mutable_vector(capacity),
750                )
751            }
752        })
753        .collect()
754}
755
756/// Sorts the record batch with primary key format.
757pub fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
758    let total_columns = batch.num_columns();
759    let sort_columns = vec![
760        // Primary key column (ascending)
761        SortColumn {
762            values: batch.column(total_columns - 3).clone(),
763            options: Some(SortOptions {
764                descending: false,
765                nulls_first: true,
766            }),
767        },
768        // Time index column (ascending)
769        SortColumn {
770            values: batch.column(total_columns - 4).clone(),
771            options: Some(SortOptions {
772                descending: false,
773                nulls_first: true,
774            }),
775        },
776        // Sequence column (descending)
777        SortColumn {
778            values: batch.column(total_columns - 2).clone(),
779            options: Some(SortOptions {
780                descending: true,
781                nulls_first: true,
782            }),
783        },
784    ];
785
786    let indices = datatypes::arrow::compute::lexsort_to_indices(&sort_columns, None)
787        .context(ComputeArrowSnafu)?;
788
789    datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
790}
791
792/// Converts a `BulkPart` that is unordered and without encoded primary keys into a `BulkPart`
793/// with the same format as produced by [BulkPartConverter].
794///
795/// This function takes a `BulkPart` where:
796/// - For dense encoding: Primary key columns may be stored as individual columns
797/// - For sparse encoding: The `__primary_key` column should already be present with encoded keys
798/// - The batch may not be sorted
799///
800/// And produces a `BulkPart` where:
801/// - Primary key columns are optionally stored (depending on `store_primary_key_columns` and encoding)
802/// - An encoded `__primary_key` dictionary column is present
803/// - The batch is sorted by (primary_key, timestamp, sequence desc)
804///
805/// # Arguments
806///
807/// * `part` - The input `BulkPart` to convert
808/// * `region_metadata` - Region metadata containing schema information
809/// * `primary_key_codec` - Codec for encoding primary keys
810/// * `schema` - Target schema for the output batch
811/// * `store_primary_key_columns` - If true and encoding is not sparse, stores individual primary key columns
812///
813/// # Returns
814///
815/// Returns `None` if the input part has no rows, otherwise returns a new `BulkPart` with
816/// encoded primary keys and sorted data.
817pub fn convert_bulk_part(
818    part: BulkPart,
819    region_metadata: &RegionMetadataRef,
820    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
821    schema: SchemaRef,
822    store_primary_key_columns: bool,
823) -> Result<Option<BulkPart>> {
824    if part.num_rows() == 0 {
825        return Ok(None);
826    }
827
828    let num_rows = part.num_rows();
829    let is_sparse = region_metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse;
830
831    // Builds a column name-to-index map for efficient lookups
832    let input_schema = part.batch.schema();
833    let column_indices: HashMap<&str, usize> = input_schema
834        .fields()
835        .iter()
836        .enumerate()
837        .map(|(idx, field)| (field.name().as_str(), idx))
838        .collect();
839
840    // Determines the structure of the input batch by looking up columns by name
841    let mut output_columns = Vec::new();
842
843    // Extracts primary key columns if we need to encode them (dense encoding)
844    let pk_array = if is_sparse {
845        // For sparse encoding, the input should already have the __primary_key column
846        // We need to find it in the input batch
847        None
848    } else {
849        // For dense encoding, extract and encode primary key columns by name
850        let pk_vectors: Result<Vec<_>> = region_metadata
851            .primary_key_columns()
852            .map(|col_meta| {
853                let col_idx = column_indices
854                    .get(col_meta.column_schema.name.as_str())
855                    .context(ColumnNotFoundSnafu {
856                        column: &col_meta.column_schema.name,
857                    })?;
858                let col = part.batch.column(*col_idx);
859                Helper::try_into_vector(col).context(error::ComputeVectorSnafu)
860            })
861            .collect();
862        let pk_vectors = pk_vectors?;
863
864        let mut key_array_builder = PrimaryKeyArrayBuilder::new();
865        let mut encode_buf = Vec::new();
866
867        for row_idx in 0..num_rows {
868            encode_buf.clear();
869
870            // Collects primary key values with column IDs for this row
871            let pk_values_with_ids: Vec<_> = region_metadata
872                .primary_key
873                .iter()
874                .zip(pk_vectors.iter())
875                .map(|(col_id, vector)| (*col_id, vector.get_ref(row_idx)))
876                .collect();
877
878            // Encodes the primary key
879            primary_key_codec
880                .encode_value_refs(&pk_values_with_ids, &mut encode_buf)
881                .context(EncodeSnafu)?;
882
883            key_array_builder
884                .append(&encode_buf)
885                .context(ComputeArrowSnafu)?;
886        }
887
888        Some(key_array_builder.finish())
889    };
890
891    // Adds primary key columns if storing them (only for dense encoding)
892    if store_primary_key_columns && !is_sparse {
893        for col_meta in region_metadata.primary_key_columns() {
894            let col_idx = column_indices
895                .get(col_meta.column_schema.name.as_str())
896                .context(ColumnNotFoundSnafu {
897                    column: &col_meta.column_schema.name,
898                })?;
899            let col = part.batch.column(*col_idx);
900
901            // Converts to dictionary if needed for string types
902            let col = if col_meta.column_schema.data_type.is_string() {
903                let target_type = ArrowDataType::Dictionary(
904                    Box::new(ArrowDataType::UInt32),
905                    Box::new(ArrowDataType::Utf8),
906                );
907                arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
908            } else {
909                col.clone()
910            };
911            output_columns.push(col);
912        }
913    }
914
915    // Adds field columns
916    for col_meta in region_metadata.field_columns() {
917        let col_idx = column_indices
918            .get(col_meta.column_schema.name.as_str())
919            .context(ColumnNotFoundSnafu {
920                column: &col_meta.column_schema.name,
921            })?;
922        output_columns.push(part.batch.column(*col_idx).clone());
923    }
924
925    // Adds timestamp column
926    let new_timestamp_index = output_columns.len();
927    let ts_col_idx = column_indices
928        .get(
929            region_metadata
930                .time_index_column()
931                .column_schema
932                .name
933                .as_str(),
934        )
935        .context(ColumnNotFoundSnafu {
936            column: &region_metadata.time_index_column().column_schema.name,
937        })?;
938    output_columns.push(part.batch.column(*ts_col_idx).clone());
939
940    // Adds encoded primary key dictionary column
941    let pk_dictionary = if let Some(pk_dict_array) = pk_array {
942        Arc::new(pk_dict_array) as ArrayRef
943    } else {
944        let pk_col_idx =
945            column_indices
946                .get(PRIMARY_KEY_COLUMN_NAME)
947                .context(ColumnNotFoundSnafu {
948                    column: PRIMARY_KEY_COLUMN_NAME,
949                })?;
950        let col = part.batch.column(*pk_col_idx);
951
952        // Casts to dictionary type if needed
953        let target_type = ArrowDataType::Dictionary(
954            Box::new(ArrowDataType::UInt32),
955            Box::new(ArrowDataType::Binary),
956        );
957        arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
958    };
959    output_columns.push(pk_dictionary);
960
961    let sequence_array = UInt64Array::from(vec![part.sequence; num_rows]);
962    output_columns.push(Arc::new(sequence_array) as ArrayRef);
963
964    let op_type_array = UInt8Array::from(vec![OpType::Put as u8; num_rows]);
965    output_columns.push(Arc::new(op_type_array) as ArrayRef);
966
967    let batch = RecordBatch::try_new(schema, output_columns).context(NewRecordBatchSnafu)?;
968
969    // Sorts the batch by (primary_key, timestamp, sequence desc)
970    let sorted_batch = sort_primary_key_record_batch(&batch)?;
971
972    Ok(Some(BulkPart {
973        batch: sorted_batch,
974        max_timestamp: part.max_timestamp,
975        min_timestamp: part.min_timestamp,
976        sequence: part.sequence,
977        timestamp_index: new_timestamp_index,
978        raw_data: None,
979    }))
980}
981
982#[derive(Debug, Clone)]
983pub struct EncodedBulkPart {
984    data: Bytes,
985    metadata: BulkPartMeta,
986    /// Cached Arrow schema to avoid rebuilding it from parquet metadata.
987    schema: SchemaRef,
988}
989
990impl EncodedBulkPart {
991    pub fn new(data: Bytes, metadata: BulkPartMeta, schema: SchemaRef) -> Self {
992        Self {
993            data,
994            metadata,
995            schema,
996        }
997    }
998
999    pub fn metadata(&self) -> &BulkPartMeta {
1000        &self.metadata
1001    }
1002
1003    pub(crate) fn schema(&self) -> SchemaRef {
1004        self.schema.clone()
1005    }
1006
1007    /// Returns the size of the encoded data in bytes
1008    pub(crate) fn size_bytes(&self) -> usize {
1009        self.data.len()
1010    }
1011
1012    /// Returns the encoded data.
1013    pub fn data(&self) -> &Bytes {
1014        &self.data
1015    }
1016
1017    /// Creates MemtableStats from this EncodedBulkPart.
1018    pub fn to_memtable_stats(&self) -> MemtableStats {
1019        let meta = &self.metadata;
1020        let ts_type = meta.region_metadata.time_index_type();
1021        let min_ts = ts_type.create_timestamp(meta.min_timestamp);
1022        let max_ts = ts_type.create_timestamp(meta.max_timestamp);
1023
1024        MemtableStats {
1025            estimated_bytes: self.size_bytes(),
1026            time_range: Some((min_ts, max_ts)),
1027            num_rows: meta.num_rows,
1028            num_ranges: 1,
1029            max_sequence: meta.max_sequence,
1030            series_count: meta.num_series as usize,
1031        }
1032    }
1033
1034    /// Converts this `EncodedBulkPart` to `SstInfo`.
1035    ///
1036    /// # Arguments
1037    /// * `file_id` - The SST file ID to assign to this part
1038    ///
1039    /// # Returns
1040    /// Returns a `SstInfo` instance with information derived from this bulk part's metadata
1041    pub(crate) fn to_sst_info(&self, file_id: FileId) -> SstInfo {
1042        let unit = self.metadata.region_metadata.time_index_type().unit();
1043        let max_row_group_uncompressed_size: u64 = self
1044            .metadata
1045            .parquet_metadata
1046            .row_groups()
1047            .iter()
1048            .map(|rg| {
1049                rg.columns()
1050                    .iter()
1051                    .map(|c| c.uncompressed_size() as u64)
1052                    .sum::<u64>()
1053            })
1054            .max()
1055            .unwrap_or(0);
1056        SstInfo {
1057            file_id,
1058            time_range: (
1059                Timestamp::new(self.metadata.min_timestamp, unit),
1060                Timestamp::new(self.metadata.max_timestamp, unit),
1061            ),
1062            file_size: self.data.len() as u64,
1063            max_row_group_uncompressed_size,
1064            num_rows: self.metadata.num_rows,
1065            num_row_groups: self.metadata.parquet_metadata.num_row_groups() as u64,
1066            file_metadata: Some(self.metadata.parquet_metadata.clone()),
1067            index_metadata: IndexOutput::default(),
1068            num_series: self.metadata.num_series,
1069        }
1070    }
1071
1072    pub(crate) fn read(
1073        &self,
1074        context: BulkIterContextRef,
1075        sequence: Option<SequenceRange>,
1076        mem_scan_metrics: Option<MemScanMetrics>,
1077    ) -> Result<Option<BoxedRecordBatchIterator>> {
1078        // Compute skip_fields for row group pruning from the configured pre-filter mode.
1079        let skip_fields_for_pruning = context.pre_filter_mode().skip_fields();
1080
1081        // use predicate to find row groups to read.
1082        let row_groups_to_read =
1083            context.row_groups_to_read(&self.metadata.parquet_metadata, skip_fields_for_pruning);
1084
1085        if row_groups_to_read.is_empty() {
1086            // All row groups are filtered.
1087            return Ok(None);
1088        }
1089
1090        let iter = EncodedBulkPartIter::try_new(
1091            self,
1092            context,
1093            row_groups_to_read,
1094            sequence,
1095            mem_scan_metrics,
1096        )?;
1097        Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
1098    }
1099}
1100
1101// TODO(yingwen): max_sequence
1102#[derive(Debug, Clone)]
1103pub struct BulkPartMeta {
1104    /// Total rows in part.
1105    pub num_rows: usize,
1106    /// Max timestamp in part.
1107    pub max_timestamp: i64,
1108    /// Min timestamp in part.
1109    pub min_timestamp: i64,
1110    /// Part file metadata.
1111    pub parquet_metadata: Arc<ParquetMetaData>,
1112    /// Part region schema.
1113    pub region_metadata: RegionMetadataRef,
1114    /// Number of series.
1115    pub num_series: u64,
1116    /// Maximum sequence number in part.
1117    pub max_sequence: u64,
1118}
1119
/// Metrics collected while encoding a part.
#[derive(Debug, Default)]
pub struct BulkPartEncodeMetrics {
    /// Time spent iterating over the input data.
    pub iter_cost: Duration,
    /// Time spent writing the data.
    pub write_cost: Duration,
    /// Size of the data before encoding.
    pub raw_size: usize,
    /// Size of the data after encoding.
    pub encoded_size: usize,
    /// Number of rows in the part.
    pub num_rows: usize,
}
1134
1135pub struct BulkPartEncoder {
1136    metadata: RegionMetadataRef,
1137    writer_props: Option<WriterProperties>,
1138}
1139
1140impl BulkPartEncoder {
1141    pub fn new(metadata: RegionMetadataRef, row_group_size: usize) -> Result<BulkPartEncoder> {
1142        // TODO(yingwen): Skip arrow schema if needed.
1143        let json = metadata.to_json().context(InvalidMetadataSnafu)?;
1144        let key_value_meta =
1145            parquet::file::metadata::KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
1146
1147        // TODO(yingwen): Do we need compression?
1148        let writer_props = Some(
1149            WriterProperties::builder()
1150                .set_key_value_metadata(Some(vec![key_value_meta]))
1151                .set_write_batch_size(row_group_size)
1152                .set_max_row_group_row_count(Some(row_group_size))
1153                .set_compression(Compression::ZSTD(ZstdLevel::default()))
1154                .set_column_index_truncate_length(None)
1155                .set_statistics_truncate_length(None)
1156                .build(),
1157        );
1158
1159        Ok(Self {
1160            metadata,
1161            writer_props,
1162        })
1163    }
1164}
1165
1166impl BulkPartEncoder {
1167    /// Encodes [BoxedRecordBatchIterator] into [EncodedBulkPart] with min/max timestamps.
1168    pub fn encode_record_batch_iter(
1169        &self,
1170        iter: BoxedRecordBatchIterator,
1171        arrow_schema: SchemaRef,
1172        min_timestamp: i64,
1173        max_timestamp: i64,
1174        max_sequence: u64,
1175        metrics: &mut BulkPartEncodeMetrics,
1176    ) -> Result<Option<EncodedBulkPart>> {
1177        let mut buf = Vec::with_capacity(4096);
1178        let mut writer =
1179            ArrowWriter::try_new(&mut buf, arrow_schema.clone(), self.writer_props.clone())
1180                .context(EncodeMemtableSnafu)?;
1181        let mut total_rows = 0;
1182        let mut series_estimator = SeriesEstimator::default();
1183
1184        // Process each batch from the iterator
1185        let mut iter_start = Instant::now();
1186        for batch_result in iter {
1187            metrics.iter_cost += iter_start.elapsed();
1188            let batch = batch_result?;
1189            if batch.num_rows() == 0 {
1190                continue;
1191            }
1192
1193            series_estimator.update_flat(&batch);
1194            metrics.raw_size += record_batch_estimated_size(&batch);
1195            let write_start = Instant::now();
1196            writer.write(&batch).context(EncodeMemtableSnafu)?;
1197            metrics.write_cost += write_start.elapsed();
1198            total_rows += batch.num_rows();
1199            iter_start = Instant::now();
1200        }
1201        metrics.iter_cost += iter_start.elapsed();
1202
1203        if total_rows == 0 {
1204            return Ok(None);
1205        }
1206
1207        let close_start = Instant::now();
1208        let file_metadata = writer.close().context(EncodeMemtableSnafu)?;
1209        metrics.write_cost += close_start.elapsed();
1210        metrics.encoded_size += buf.len();
1211        metrics.num_rows += total_rows;
1212
1213        let buf = Bytes::from(buf);
1214        let parquet_metadata = Arc::new(file_metadata);
1215        let num_series = series_estimator.finish();
1216
1217        Ok(Some(EncodedBulkPart {
1218            data: buf,
1219            metadata: BulkPartMeta {
1220                num_rows: total_rows,
1221                max_timestamp,
1222                min_timestamp,
1223                parquet_metadata,
1224                region_metadata: self.metadata.clone(),
1225                num_series,
1226                max_sequence,
1227            },
1228            schema: arrow_schema,
1229        }))
1230    }
1231
1232    /// Encodes bulk part to a [EncodedBulkPart], returns the encoded data.
1233    pub fn encode_part(&self, part: &BulkPart) -> Result<Option<EncodedBulkPart>> {
1234        if part.batch.num_rows() == 0 {
1235            return Ok(None);
1236        }
1237
1238        let mut buf = Vec::with_capacity(4096);
1239        let arrow_schema = part.batch.schema();
1240
1241        let file_metadata = {
1242            let mut writer =
1243                ArrowWriter::try_new(&mut buf, arrow_schema.clone(), self.writer_props.clone())
1244                    .context(EncodeMemtableSnafu)?;
1245            writer.write(&part.batch).context(EncodeMemtableSnafu)?;
1246            writer.finish().context(EncodeMemtableSnafu)?
1247        };
1248
1249        let buf = Bytes::from(buf);
1250        let parquet_metadata = Arc::new(file_metadata);
1251
1252        Ok(Some(EncodedBulkPart {
1253            data: buf,
1254            metadata: BulkPartMeta {
1255                num_rows: part.batch.num_rows(),
1256                max_timestamp: part.max_timestamp,
1257                min_timestamp: part.min_timestamp,
1258                parquet_metadata,
1259                region_metadata: self.metadata.clone(),
1260                num_series: part.estimated_series_count() as u64,
1261                max_sequence: part.sequence,
1262            },
1263            schema: arrow_schema,
1264        }))
1265    }
1266}
1267
1268/// Per-batch min/max statistics for the first tag column in a `MultiBulkPart`.
1269///
1270/// Since batches are sorted by primary key, we can extract the min/max of the first tag
1271/// from the first/last row's encoded primary key in each batch. These statistics enable
1272/// batch-level pruning using predicates, analogous to row-group pruning in parquet.
1273#[derive(Debug, Clone)]
1274struct BatchStats {
1275    /// Number of batches.
1276    num_batches: usize,
1277    /// Column id of the first tag.
1278    first_tag_id: ColumnId,
1279    /// Min values of the first tag, one element per batch.
1280    min_values: ArrayRef,
1281    /// Max values of the first tag, one element per batch.
1282    max_values: ArrayRef,
1283}
1284
1285impl BatchStats {
1286    /// Computes batch statistics from a slice of record batches.
1287    ///
1288    /// Returns `None` if there is no primary key (no first tag to collect stats for)
1289    /// or if extracting statistics fails.
1290    fn compute(batches: &[RecordBatch], metadata: &RegionMetadata) -> Option<Self> {
1291        // `primary_key.first()` is correct for both dense and sparse encodings.
1292        // For dense, values follow the order of `metadata.primary_key`.
1293        // For sparse, `decode_leftmost` decodes the first value which also
1294        // corresponds to `primary_key.first()`. See `SparsePrimaryKeyCodec` for format details.
1295        let first_tag_id = *metadata.primary_key.first()?;
1296        let first_tag_column = metadata.column_by_id(first_tag_id)?;
1297        let data_type = &first_tag_column.column_schema.data_type;
1298
1299        let converter = build_primary_key_codec_with_fields(
1300            metadata.primary_key_encoding,
1301            [(first_tag_id, SortField::new(data_type.clone()))].into_iter(),
1302        );
1303        let pk_index = primary_key_column_index(batches.first()?.num_columns());
1304
1305        let mut min_builder = data_type.create_mutable_vector(batches.len());
1306        let mut max_builder = data_type.create_mutable_vector(batches.len());
1307
1308        for batch in batches {
1309            match Self::extract_first_tag_bounds(batch, pk_index, &*converter) {
1310                Some((min_val, max_val)) => {
1311                    min_builder.push_value_ref(&min_val.as_value_ref());
1312                    max_builder.push_value_ref(&max_val.as_value_ref());
1313                }
1314                None => {
1315                    min_builder.push_null();
1316                    max_builder.push_null();
1317                }
1318            }
1319        }
1320
1321        Some(Self {
1322            num_batches: batches.len(),
1323            first_tag_id,
1324            min_values: min_builder.to_vector().to_arrow_array(),
1325            max_values: max_builder.to_vector().to_arrow_array(),
1326        })
1327    }
1328
1329    /// Extracts the first tag value from the first and last rows of a batch.
1330    fn extract_first_tag_bounds(
1331        batch: &RecordBatch,
1332        pk_index: usize,
1333        converter: &dyn PrimaryKeyCodec,
1334    ) -> Option<(datatypes::value::Value, datatypes::value::Value)> {
1335        if batch.num_rows() == 0 {
1336            return None;
1337        }
1338
1339        let pk_dict = batch
1340            .column(pk_index)
1341            .as_any()
1342            .downcast_ref::<PrimaryKeyArray>()?;
1343        let pk_values = pk_dict.values().as_any().downcast_ref::<BinaryArray>()?;
1344
1345        let keys = pk_dict.keys();
1346        let min_key = keys.value(0);
1347        let max_key = keys.value(batch.num_rows() - 1);
1348        let min_bytes = pk_values.value(min_key as usize);
1349        let max_bytes = pk_values.value(max_key as usize);
1350
1351        Some((
1352            converter.decode_leftmost(min_bytes).ok()??,
1353            converter.decode_leftmost(max_bytes).ok()??,
1354        ))
1355    }
1356}
1357
1358/// Adapter implementing `PruningStatistics` for `BatchStats`.
1359///
1360/// Used with `Predicate::prune_with_stats()` to skip batches whose first-tag
1361/// min/max range does not match the query predicate.
1362struct BatchPruningStats<'a> {
1363    stats: &'a BatchStats,
1364    metadata: &'a RegionMetadataRef,
1365}
1366
1367impl PruningStatistics for BatchPruningStats<'_> {
1368    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
1369        let col = self.metadata.column_by_name(&column.name)?;
1370        if col.column_id == self.stats.first_tag_id {
1371            Some(self.stats.min_values.clone())
1372        } else {
1373            None
1374        }
1375    }
1376
1377    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
1378        let col = self.metadata.column_by_name(&column.name)?;
1379        if col.column_id == self.stats.first_tag_id {
1380            Some(self.stats.max_values.clone())
1381        } else {
1382            None
1383        }
1384    }
1385
1386    fn num_containers(&self) -> usize {
1387        self.stats.num_batches
1388    }
1389
1390    fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
1391        None
1392    }
1393
1394    fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
1395        None
1396    }
1397
1398    fn contained(
1399        &self,
1400        _column: &Column,
1401        _values: &std::collections::HashSet<datafusion_common::ScalarValue>,
1402    ) -> Option<BooleanArray> {
1403        None
1404    }
1405}
1406
1407/// Returns true if the predicate references the given column name.
1408fn predicate_references_column(predicate: &table::predicate::Predicate, column_name: &str) -> bool {
1409    let mut columns = HashSet::new();
1410    for expr in predicate.exprs() {
1411        let _ = expr_to_columns(expr, &mut columns);
1412    }
1413    columns.iter().any(|col| col.name == column_name)
1414}
1415
1416/// Returns true if the batch should be pruned (skipped) based on the first-tag min/max
1417/// statistics and the predicate in the context. Returns false if no pruning is possible
1418/// (no primary key, no predicate, or the batch matches the predicate).
1419pub(crate) fn should_prune_bulk_part(
1420    batch: &RecordBatch,
1421    context: &BulkIterContext,
1422    metadata: &RegionMetadata,
1423) -> bool {
1424    let predicate = match &context.predicate {
1425        Some(p) => p,
1426        None => return false,
1427    };
1428    // Check if the predicate references the first tag column to avoid computing
1429    // expensive batch statistics when they won't help with pruning.
1430    let first_tag_id = match metadata.primary_key.first() {
1431        Some(id) => *id,
1432        None => return false,
1433    };
1434    // Safety: `first_tag_id` comes from `metadata.primary_key` so the column always exists.
1435    let first_tag_name = &metadata
1436        .column_by_id(first_tag_id)
1437        .unwrap()
1438        .column_schema
1439        .name;
1440    if !predicate_references_column(predicate, first_tag_name) {
1441        return false;
1442    }
1443    let stats = match BatchStats::compute(std::slice::from_ref(batch), metadata) {
1444        Some(s) => s,
1445        None => return false,
1446    };
1447    let region_meta = context.read_format().metadata();
1448    let pruning_stats = BatchPruningStats {
1449        stats: &stats,
1450        metadata: region_meta,
1451    };
1452    let mask = predicate.prune_with_stats(&pruning_stats, region_meta.schema.arrow_schema());
1453    !mask.first().copied().unwrap_or(true)
1454}
1455
1456/// A collection of ordered RecordBatches representing a bulk part without parquet encoding.
1457///
1458/// Similar to `EncodedBulkPart` but stores raw RecordBatches instead of encoded parquet data.
1459/// The RecordBatches must be ordered by (primary key, timestamp, sequence desc).
1460/// Uses SmallVec to optimize for the common case of few batches while avoiding heap allocation.
1461#[derive(Debug, Clone)]
1462pub struct MultiBulkPart {
1463    /// Ordered record batches. SmallVec optimized for up to 4 batches inline.
1464    batches: SmallVec<[RecordBatch; 4]>,
1465    /// Total rows across all batches.
1466    total_rows: usize,
1467    /// Max timestamp in part.
1468    max_timestamp: i64,
1469    /// Min timestamp in part.
1470    min_timestamp: i64,
1471    /// Max sequence number in part.
1472    max_sequence: SequenceNumber,
1473    /// Number of series.
1474    series_count: usize,
1475    /// Pre-computed per-batch statistics for the first tag column.
1476    /// `None` if there is no primary key.
1477    batch_stats: Option<BatchStats>,
1478}
1479
1480impl MultiBulkPart {
1481    /// Creates a new MultiBulkPart from a single BulkPart.
1482    pub fn from_bulk_part(part: BulkPart, metadata: &RegionMetadata) -> Self {
1483        let num_rows = part.num_rows();
1484        let series_count = part.estimated_series_count();
1485        let batch_stats = BatchStats::compute(std::slice::from_ref(&part.batch), metadata);
1486        let mut batches = SmallVec::new();
1487        batches.push(part.batch);
1488
1489        Self {
1490            batches,
1491            total_rows: num_rows,
1492            max_timestamp: part.max_timestamp,
1493            min_timestamp: part.min_timestamp,
1494            max_sequence: part.sequence,
1495            series_count,
1496            batch_stats,
1497        }
1498    }
1499
1500    /// Creates a new MultiBulkPart from multiple ordered RecordBatches.
1501    ///
1502    /// # Arguments
1503    /// * `batches` - Ordered record batches
1504    /// * `min_timestamp` - Minimum timestamp across all batches
1505    /// * `max_timestamp` - Maximum timestamp across all batches
1506    /// * `max_sequence` - Maximum sequence number across all batches
1507    /// * `series_count` - Number of series in the batches
1508    /// * `metadata` - Region metadata for computing batch statistics
1509    ///
1510    /// # Panics
1511    /// Panics if batches is empty.
1512    pub fn new(
1513        batches: Vec<RecordBatch>,
1514        min_timestamp: i64,
1515        max_timestamp: i64,
1516        max_sequence: SequenceNumber,
1517        series_count: usize,
1518        metadata: &RegionMetadata,
1519    ) -> Self {
1520        assert!(!batches.is_empty(), "batches must not be empty");
1521
1522        let total_rows = batches.iter().map(|b| b.num_rows()).sum();
1523        let batch_stats = BatchStats::compute(&batches, metadata);
1524
1525        Self {
1526            batches: SmallVec::from_vec(batches),
1527            total_rows,
1528            max_timestamp,
1529            min_timestamp,
1530            max_sequence,
1531            series_count,
1532            batch_stats,
1533        }
1534    }
1535
1536    /// Returns the total number of rows across all batches.
1537    pub fn num_rows(&self) -> usize {
1538        self.total_rows
1539    }
1540
1541    pub(crate) fn schemas(&self) -> impl Iterator<Item = SchemaRef> + '_ {
1542        self.batches.iter().map(|batch| batch.schema())
1543    }
1544
1545    /// Returns the minimum timestamp.
1546    pub fn min_timestamp(&self) -> i64 {
1547        self.min_timestamp
1548    }
1549
1550    /// Returns the maximum timestamp.
1551    pub fn max_timestamp(&self) -> i64 {
1552        self.max_timestamp
1553    }
1554
1555    /// Returns the maximum sequence number.
1556    pub fn max_sequence(&self) -> SequenceNumber {
1557        self.max_sequence
1558    }
1559
1560    /// Returns the number of series.
1561    pub fn series_count(&self) -> usize {
1562        self.series_count
1563    }
1564
1565    /// Returns the number of record batches in this part.
1566    pub fn num_batches(&self) -> usize {
1567        self.batches.len()
1568    }
1569
1570    /// Returns the estimated memory size of all batches.
1571    pub(crate) fn estimated_size(&self) -> usize {
1572        self.batches.iter().map(record_batch_estimated_size).sum()
1573    }
1574
1575    /// Reads data from this part with the given context and filters.
1576    ///
1577    /// If batch-level statistics are available and a predicate is set, prunes
1578    /// batches whose first-tag min/max range doesn't match the predicate before
1579    /// creating the iterator.
1580    pub(crate) fn read(
1581        &self,
1582        context: BulkIterContextRef,
1583        sequence: Option<SequenceRange>,
1584        mem_scan_metrics: Option<MemScanMetrics>,
1585    ) -> Result<Option<BoxedRecordBatchIterator>> {
1586        if self.batches.is_empty() {
1587            return Ok(None);
1588        }
1589
1590        let batches_to_read = self.prune_batches(&context);
1591
1592        if batches_to_read.is_empty() {
1593            return Ok(None);
1594        }
1595
1596        let iter = crate::memtable::bulk::part_reader::BulkPartBatchIter::new(
1597            batches_to_read,
1598            context,
1599            sequence,
1600            self.series_count,
1601            mem_scan_metrics,
1602        );
1603        Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
1604    }
1605
1606    /// Prunes batches using the first-tag min/max statistics and the predicate.
1607    /// Returns all batches if no stats or no predicate is available.
1608    fn prune_batches(&self, context: &BulkIterContextRef) -> Vec<RecordBatch> {
1609        if let Some(stats) = &self.batch_stats
1610            && let Some(predicate) = &context.predicate
1611        {
1612            let region_meta = context.read_format().metadata();
1613            let pruning_stats = BatchPruningStats {
1614                stats,
1615                metadata: region_meta,
1616            };
1617            let mask =
1618                predicate.prune_with_stats(&pruning_stats, region_meta.schema.arrow_schema());
1619            self.batches
1620                .iter()
1621                .zip(mask.iter())
1622                .filter_map(
1623                    |(batch, &selected)| {
1624                        if selected { Some(batch.clone()) } else { None }
1625                    },
1626                )
1627                .collect()
1628        } else {
1629            self.batches.iter().cloned().collect()
1630        }
1631    }
1632
1633    /// Converts this `MultiBulkPart` to `MemtableStats`.
1634    pub fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
1635        let ts_type = region_metadata.time_index_type();
1636        let min_ts = ts_type.create_timestamp(self.min_timestamp);
1637        let max_ts = ts_type.create_timestamp(self.max_timestamp);
1638
1639        MemtableStats {
1640            estimated_bytes: self.estimated_size(),
1641            time_range: Some((min_ts, max_ts)),
1642            num_rows: self.num_rows(),
1643            num_ranges: 1,
1644            max_sequence: self.max_sequence,
1645            series_count: self.series_count,
1646        }
1647    }
1648}
1649
1650#[cfg(test)]
1651mod tests {
1652    use api::v1::{Row, SemanticType, WriteHint};
1653    use datafusion_common::ScalarValue;
1654    use datatypes::arrow::array::{
1655        BinaryArray, DictionaryArray, Float64Array, TimestampMillisecondArray,
1656    };
1657    use datatypes::arrow::datatypes::UInt32Type;
1658    use datatypes::prelude::{ConcreteDataType, Value};
1659    use datatypes::schema::ColumnSchema;
1660    use mito_codec::row_converter::build_primary_key_codec;
1661    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1662    use store_api::storage::RegionId;
1663    use store_api::storage::consts::ReservedColumnId;
1664    use table::predicate::Predicate;
1665
1666    use super::*;
1667    use crate::memtable::bulk::context::BulkIterContext;
1668    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1669    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
1670
/// Describes one mutation that `encode` turns into key values.
struct MutationInput<'a> {
    /// `k0` argument handed to the key-value builder (converted to a `String`).
    k0: &'a str,
    /// `k1` argument handed to the key-value builder.
    k1: u32,
    /// Timestamps of the rows to write.
    timestamps: &'a [i64],
    /// Optional `v1` values, supplied alongside `timestamps`.
    v1: &'a [Option<f64>],
    /// Sequence number passed to the key-value builder for this mutation.
    sequence: u64,
}
1678
1679    fn encode(input: &[MutationInput]) -> EncodedBulkPart {
1680        let metadata = metadata_for_test();
1681        let kvs = input
1682            .iter()
1683            .map(|m| {
1684                build_key_values_with_ts_seq_values(
1685                    &metadata,
1686                    m.k0.to_string(),
1687                    m.k1,
1688                    m.timestamps.iter().copied(),
1689                    m.v1.iter().copied(),
1690                    m.sequence,
1691                )
1692            })
1693            .collect::<Vec<_>>();
1694        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1695        let primary_key_codec = build_primary_key_codec(&metadata);
1696        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1697        for kv in kvs {
1698            converter.append_key_values(&kv).unwrap();
1699        }
1700        let part = converter.convert().unwrap();
1701        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1702        encoder.encode_part(&part).unwrap().unwrap()
1703    }
1704
/// Reads an encoded part back with a single-entry projection and checks the row
/// count, the column count and the values of the first output column.
#[test]
fn test_write_and_read_part_projection() {
    let part = encode(&[
        MutationInput {
            k0: "a",
            k1: 0,
            timestamps: &[1],
            v1: &[Some(0.1)],
            sequence: 0,
        },
        MutationInput {
            k0: "b",
            k1: 0,
            timestamps: &[1],
            v1: &[Some(0.0)],
            sequence: 0,
        },
        MutationInput {
            k0: "a",
            k1: 0,
            timestamps: &[2],
            v1: &[Some(0.2)],
            sequence: 1,
        },
    ]);

    let projection = &[4u32];
    let context = Arc::new(
        BulkIterContext::new(
            part.metadata.region_metadata.clone(),
            Some(projection.as_slice()),
            None,
            false,
        )
        .unwrap(),
    );
    let reader = part
        .read(context, None, None)
        .unwrap()
        .expect("expect at least one row group");

    let mut total_rows_read = 0;
    let mut field: Vec<f64> = vec![];
    for res in reader {
        let batch = res.unwrap();
        assert_eq!(5, batch.num_columns());

        // The first output column is expected to be a float64 array.
        let values = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap()
            .values();
        field.extend(values.iter().copied());
        total_rows_read += batch.num_rows();
    }

    assert_eq!(3, total_rows_read);
    assert_eq!(vec![0.1, 0.2, 0.0], field);
}
1767
1768    fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
1769        let metadata = metadata_for_test();
1770        let kvs = key_values
1771            .into_iter()
1772            .map(|(k0, k1, (start, end), sequence)| {
1773                let ts = start..end;
1774                let v1 = (start..end).map(|_| None);
1775                build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
1776            })
1777            .collect::<Vec<_>>();
1778        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1779        let primary_key_codec = build_primary_key_codec(&metadata);
1780        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1781        for kv in kvs {
1782            converter.append_key_values(&kv).unwrap();
1783        }
1784        let part = converter.convert().unwrap();
1785        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1786        encoder.encode_part(&part).unwrap().unwrap()
1787    }
1788
1789    fn check_prune_row_group(
1790        part: &EncodedBulkPart,
1791        predicate: Option<Predicate>,
1792        expected_rows: usize,
1793    ) {
1794        let context = Arc::new(
1795            BulkIterContext::new(
1796                part.metadata.region_metadata.clone(),
1797                None,
1798                predicate,
1799                false,
1800            )
1801            .unwrap(),
1802        );
1803        let reader = part
1804            .read(context, None, None)
1805            .unwrap()
1806            .expect("expect at least one row group");
1807        let mut total_rows_read = 0;
1808        for res in reader {
1809            let batch = res.unwrap();
1810            total_rows_read += batch.num_rows();
1811        }
1812        // Should only read row group 1.
1813        assert_eq!(expected_rows, total_rows_read);
1814    }
1815
/// Checks how many rows are read back from an encoded part under various predicates.
#[test]
fn test_prune_row_groups() {
    use datafusion_expr::{col, lit};

    let part = prepare(vec![
        ("a", 0, (0, 40), 1),
        ("a", 1, (0, 60), 1),
        ("b", 0, (0, 100), 2),
        ("b", 1, (100, 180), 3),
        ("b", 1, (180, 210), 4),
    ]);

    // No written timestamp equals 300, so the read is expected to return no reader.
    let ts_predicate = Predicate::new(vec![
        col("ts").eq(lit(ScalarValue::TimestampMillisecond(Some(300), None))),
    ]);
    let context = Arc::new(
        BulkIterContext::new(
            part.metadata.region_metadata.clone(),
            None,
            Some(ts_predicate),
            false,
        )
        .unwrap(),
    );
    assert!(part.read(context, None, None).unwrap().is_none());

    // Without a predicate every row is read.
    check_prune_row_group(&part, None, 310);

    let check = |exprs: Vec<datafusion_expr::Expr>, expected_rows: usize| {
        check_prune_row_group(&part, Some(Predicate::new(exprs)), expected_rows);
    };

    check(vec![col("k0").eq(lit("a")), col("k1").eq(lit(0u32))], 40);
    check(vec![col("k0").eq(lit("a")), col("k1").eq(lit(1u32))], 60);
    check(vec![col("k0").eq(lit("a"))], 100);
    check(vec![col("k0").eq(lit("b")), col("k1").eq(lit(0u32))], 100);

    // Predicates over field column can do precise filtering.
    check(vec![col("v0").eq(lit(150i64))], 1);
}
1885
/// Appends two sets of key values and checks the summary fields and the schema of
/// the converted part.
#[test]
fn test_bulk_part_converter_append_and_convert() {
    let metadata = metadata_for_test();
    let options = FlatSchemaOptions::from_encoding(metadata.primary_key_encoding);
    let schema = to_flat_sst_arrow_schema(&metadata, &options);
    let primary_key_codec = build_primary_key_codec(&metadata);
    let mut converter = BulkPartConverter::new(&metadata, schema, 100, primary_key_codec, true);

    let first = build_key_values_with_ts_seq_values(
        &metadata,
        "key1".to_string(),
        1u32,
        [1000, 2000].into_iter(),
        [Some(1.0), Some(2.0)].into_iter(),
        1,
    );
    let second = build_key_values_with_ts_seq_values(
        &metadata,
        "key2".to_string(),
        2u32,
        [1500].into_iter(),
        [Some(3.0)].into_iter(),
        2,
    );
    converter.append_key_values(&first).unwrap();
    converter.append_key_values(&second).unwrap();

    let bulk_part = converter.convert().unwrap();

    assert_eq!(bulk_part.num_rows(), 3);
    assert_eq!(bulk_part.min_timestamp, 1000);
    assert_eq!(bulk_part.max_timestamp, 2000);
    assert_eq!(bulk_part.sequence, 2);
    assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

    // Primary key columns k0 and k1 are stored at the beginning of the schema.
    let schema = bulk_part.batch.schema();
    let field_names: Vec<&str> = schema
        .fields()
        .iter()
        .map(|field| field.name().as_str())
        .collect();
    assert_eq!(
        field_names,
        [
            "k0",
            "k1",
            "v0",
            "v1",
            "ts",
            "__primary_key",
            "__sequence",
            "__op_type",
        ]
    );
}
1946
/// Appends key values in an unsorted order and checks the order of the timestamp
/// and sequence columns in the converted part.
#[test]
fn test_bulk_part_converter_sorting() {
    let metadata = metadata_for_test();
    let options = FlatSchemaOptions::from_encoding(metadata.primary_key_encoding);
    let schema = to_flat_sst_arrow_schema(&metadata, &options);
    let primary_key_codec = build_primary_key_codec(&metadata);
    let mut converter = BulkPartConverter::new(&metadata, schema, 100, primary_key_codec, true);

    // (k0, k1, timestamp, v1, sequence), deliberately not in key order.
    for (k0, k1, ts, value, sequence) in [
        ("z_key", 3u32, 3000, 3.0, 3),
        ("a_key", 1u32, 1000, 1.0, 1),
        ("m_key", 2u32, 2000, 2.0, 2),
    ] {
        let key_values = build_key_values_with_ts_seq_values(
            &metadata,
            k0.to_string(),
            k1,
            [ts].into_iter(),
            [Some(value)].into_iter(),
            sequence,
        );
        converter.append_key_values(&key_values).unwrap();
    }

    let bulk_part = converter.convert().unwrap();
    assert_eq!(bulk_part.num_rows(), 3);

    let batch = &bulk_part.batch;
    let ts_array = batch
        .column(bulk_part.timestamp_index)
        .as_any()
        .downcast_ref::<TimestampMillisecondArray>()
        .unwrap();
    // The sequence column is the second to last column of the batch.
    let seq_array = batch
        .column(batch.num_columns() - 2)
        .as_any()
        .downcast_ref::<UInt64Array>()
        .unwrap();

    assert_eq!(ts_array.values(), &[1000, 2000, 3000]);
    assert_eq!(seq_array.values(), &[1, 2, 3]);

    // Primary key columns are stored as individual columns.
    let schema = batch.schema();
    let field_names: Vec<&str> = schema
        .fields()
        .iter()
        .map(|field| field.name().as_str())
        .collect();
    assert_eq!(
        field_names,
        [
            "k0",
            "k1",
            "v0",
            "v1",
            "ts",
            "__primary_key",
            "__sequence",
            "__op_type",
        ]
    );
}
2024
/// Converts without appending anything and checks the summary fields and schema of
/// the resulting empty part.
#[test]
fn test_bulk_part_converter_empty() {
    let metadata = metadata_for_test();
    let options = FlatSchemaOptions::from_encoding(metadata.primary_key_encoding);
    let schema = to_flat_sst_arrow_schema(&metadata, &options);
    let primary_key_codec = build_primary_key_codec(&metadata);

    let bulk_part = BulkPartConverter::new(&metadata, schema, 10, primary_key_codec, true)
        .convert()
        .unwrap();

    assert_eq!(bulk_part.num_rows(), 0);
    assert_eq!(bulk_part.min_timestamp, i64::MAX);
    assert_eq!(bulk_part.max_timestamp, i64::MIN);
    assert_eq!(bulk_part.sequence, SequenceNumber::MIN);

    // The primary key columns are present in the schema even for an empty batch.
    let schema = bulk_part.batch.schema();
    let field_names: Vec<&str> = schema
        .fields()
        .iter()
        .map(|field| field.name().as_str())
        .collect();
    assert_eq!(
        field_names,
        [
            "k0",
            "k1",
            "v0",
            "v1",
            "ts",
            "__primary_key",
            "__sequence",
            "__op_type",
        ]
    );
}
2062
/// Converts with primary key column storage disabled and checks that the output
/// schema has no individual primary key columns.
#[test]
fn test_bulk_part_converter_without_primary_key_columns() {
    let metadata = metadata_for_test();
    let primary_key_codec = build_primary_key_codec(&metadata);
    let options = FlatSchemaOptions {
        raw_pk_columns: false,
        string_pk_use_dict: true,
        ..Default::default()
    };
    let schema = to_flat_sst_arrow_schema(&metadata, &options);
    let mut converter = BulkPartConverter::new(&metadata, schema, 100, primary_key_codec, false);

    let first = build_key_values_with_ts_seq_values(
        &metadata,
        "key1".to_string(),
        1u32,
        [1000, 2000].into_iter(),
        [Some(1.0), Some(2.0)].into_iter(),
        1,
    );
    let second = build_key_values_with_ts_seq_values(
        &metadata,
        "key2".to_string(),
        2u32,
        [1500].into_iter(),
        [Some(3.0)].into_iter(),
        2,
    );
    converter.append_key_values(&first).unwrap();
    converter.append_key_values(&second).unwrap();

    let bulk_part = converter.convert().unwrap();

    assert_eq!(bulk_part.num_rows(), 3);
    assert_eq!(bulk_part.min_timestamp, 1000);
    assert_eq!(bulk_part.max_timestamp, 2000);
    assert_eq!(bulk_part.sequence, 2);
    assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

    // Primary key columns are NOT stored individually.
    let schema = bulk_part.batch.schema();
    let field_names: Vec<&str> = schema
        .fields()
        .iter()
        .map(|field| field.name().as_str())
        .collect();
    assert_eq!(
        field_names,
        ["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
    );
}
2117
2118    #[allow(clippy::too_many_arguments)]
2119    fn build_key_values_with_sparse_encoding(
2120        metadata: &RegionMetadataRef,
2121        primary_key_codec: &Arc<dyn PrimaryKeyCodec>,
2122        table_id: u32,
2123        tsid: u64,
2124        k0: String,
2125        k1: String,
2126        timestamps: impl Iterator<Item = i64>,
2127        values: impl Iterator<Item = Option<f64>>,
2128        sequence: SequenceNumber,
2129    ) -> KeyValues {
2130        // Encode the primary key (__table_id, __tsid, k0, k1) into binary format using the sparse codec
2131        let pk_values = vec![
2132            (ReservedColumnId::table_id(), Value::UInt32(table_id)),
2133            (ReservedColumnId::tsid(), Value::UInt64(tsid)),
2134            (0, Value::String(k0.clone().into())),
2135            (1, Value::String(k1.clone().into())),
2136        ];
2137        let mut encoded_key = Vec::new();
2138        primary_key_codec
2139            .encode_values(&pk_values, &mut encoded_key)
2140            .unwrap();
2141        assert!(!encoded_key.is_empty());
2142
2143        // Create schema for sparse encoding: __primary_key, ts, v0, v1
2144        let column_schema = vec![
2145            api::v1::ColumnSchema {
2146                column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
2147                datatype: api::helper::ColumnDataTypeWrapper::try_from(
2148                    ConcreteDataType::binary_datatype(),
2149                )
2150                .unwrap()
2151                .datatype() as i32,
2152                semantic_type: api::v1::SemanticType::Tag as i32,
2153                ..Default::default()
2154            },
2155            api::v1::ColumnSchema {
2156                column_name: "ts".to_string(),
2157                datatype: api::helper::ColumnDataTypeWrapper::try_from(
2158                    ConcreteDataType::timestamp_millisecond_datatype(),
2159                )
2160                .unwrap()
2161                .datatype() as i32,
2162                semantic_type: api::v1::SemanticType::Timestamp as i32,
2163                ..Default::default()
2164            },
2165            api::v1::ColumnSchema {
2166                column_name: "v0".to_string(),
2167                datatype: api::helper::ColumnDataTypeWrapper::try_from(
2168                    ConcreteDataType::int64_datatype(),
2169                )
2170                .unwrap()
2171                .datatype() as i32,
2172                semantic_type: api::v1::SemanticType::Field as i32,
2173                ..Default::default()
2174            },
2175            api::v1::ColumnSchema {
2176                column_name: "v1".to_string(),
2177                datatype: api::helper::ColumnDataTypeWrapper::try_from(
2178                    ConcreteDataType::float64_datatype(),
2179                )
2180                .unwrap()
2181                .datatype() as i32,
2182                semantic_type: api::v1::SemanticType::Field as i32,
2183                ..Default::default()
2184            },
2185        ];
2186
2187        let rows = timestamps
2188            .zip(values)
2189            .map(|(ts, v)| Row {
2190                values: vec![
2191                    api::v1::Value {
2192                        value_data: Some(api::v1::value::ValueData::BinaryValue(
2193                            encoded_key.clone(),
2194                        )),
2195                    },
2196                    api::v1::Value {
2197                        value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(ts)),
2198                    },
2199                    api::v1::Value {
2200                        value_data: Some(api::v1::value::ValueData::I64Value(ts)),
2201                    },
2202                    api::v1::Value {
2203                        value_data: v.map(api::v1::value::ValueData::F64Value),
2204                    },
2205                ],
2206            })
2207            .collect();
2208
2209        let mutation = api::v1::Mutation {
2210            op_type: 1,
2211            sequence,
2212            rows: Some(api::v1::Rows {
2213                schema: column_schema,
2214                rows,
2215            }),
2216            write_hint: Some(WriteHint {
2217                primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
2218            }),
2219        };
2220        KeyValues::new(metadata.as_ref(), mutation).unwrap()
2221    }
2222
/// Converts key values of a region that uses sparse primary key encoding and checks
/// the summary fields, the schema and the contents of the `__primary_key` column.
#[test]
fn test_bulk_part_converter_sparse_primary_key_encoding() {
    // Shorthand for one column of the region metadata.
    let column = |name: &str,
                  data_type: ConcreteDataType,
                  nullable: bool,
                  semantic_type: SemanticType,
                  column_id| ColumnMetadata {
        column_schema: ColumnSchema::new(name, data_type, nullable),
        semantic_type,
        column_id,
    };

    let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
    builder
        .push_column_metadata(column(
            "k0",
            ConcreteDataType::string_datatype(),
            false,
            SemanticType::Tag,
            0,
        ))
        .push_column_metadata(column(
            "k1",
            ConcreteDataType::string_datatype(),
            false,
            SemanticType::Tag,
            1,
        ))
        .push_column_metadata(column(
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
            SemanticType::Timestamp,
            2,
        ))
        .push_column_metadata(column(
            "v0",
            ConcreteDataType::int64_datatype(),
            true,
            SemanticType::Field,
            3,
        ))
        .push_column_metadata(column(
            "v1",
            ConcreteDataType::float64_datatype(),
            true,
            SemanticType::Field,
            4,
        ))
        .primary_key(vec![0, 1])
        .primary_key_encoding(PrimaryKeyEncoding::Sparse);
    let metadata = Arc::new(builder.build().unwrap());

    let primary_key_codec = build_primary_key_codec(&metadata);
    let options = FlatSchemaOptions::from_encoding(metadata.primary_key_encoding);
    let schema = to_flat_sst_arrow_schema(&metadata, &options);

    assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse);
    assert_eq!(primary_key_codec.encoding(), PrimaryKeyEncoding::Sparse);

    let mut converter =
        BulkPartConverter::new(&metadata, schema, 100, primary_key_codec.clone(), true);

    let first = build_key_values_with_sparse_encoding(
        &metadata,
        &primary_key_codec,
        2048u32, // table_id
        100u64,  // tsid
        "key11".to_string(),
        "key21".to_string(),
        [1000, 2000].into_iter(),
        [Some(1.0), Some(2.0)].into_iter(),
        1,
    );
    let second = build_key_values_with_sparse_encoding(
        &metadata,
        &primary_key_codec,
        4096u32, // table_id
        200u64,  // tsid
        "key12".to_string(),
        "key22".to_string(),
        [1500].into_iter(),
        [Some(3.0)].into_iter(),
        2,
    );
    converter.append_key_values(&first).unwrap();
    converter.append_key_values(&second).unwrap();

    let bulk_part = converter.convert().unwrap();

    assert_eq!(bulk_part.num_rows(), 3);
    assert_eq!(bulk_part.min_timestamp, 1000);
    assert_eq!(bulk_part.max_timestamp, 2000);
    assert_eq!(bulk_part.sequence, 2);
    assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

    // With sparse encoding the primary key columns are NOT stored individually,
    // even though storing them was requested: the encoded key lives in the
    // __primary_key column.
    let schema = bulk_part.batch.schema();
    let field_names: Vec<&str> = schema
        .fields()
        .iter()
        .map(|field| field.name().as_str())
        .collect();
    assert_eq!(
        field_names,
        ["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
    );

    // The __primary_key column is a dictionary with one entry per row.
    let dict_array = bulk_part
        .batch
        .column_by_name("__primary_key")
        .unwrap()
        .as_any()
        .downcast_ref::<DictionaryArray<UInt32Type>>()
        .unwrap();
    assert!(!dict_array.is_empty());
    assert_eq!(dict_array.len(), 3); // 3 rows total

    // Every dictionary value must be a non-empty encoded key.
    let encoded_keys = dict_array
        .values()
        .as_any()
        .downcast_ref::<BinaryArray>()
        .unwrap();
    for idx in 0..encoded_keys.len() {
        assert!(
            !encoded_keys.value(idx).is_empty(),
            "Encoded primary key should not be empty"
        );
    }
}
2347
/// Converting a part that wraps an empty batch is expected to yield `None`.
#[test]
fn test_convert_bulk_part_empty() {
    let metadata = metadata_for_test();
    let options = FlatSchemaOptions::from_encoding(metadata.primary_key_encoding);
    let schema = to_flat_sst_arrow_schema(&metadata, &options);
    let primary_key_codec = build_primary_key_codec(&metadata);

    // A part with zero rows.
    let empty_part = BulkPart {
        batch: RecordBatch::new_empty(schema.clone()),
        max_timestamp: 0,
        min_timestamp: 0,
        sequence: 0,
        timestamp_index: 0,
        raw_data: None,
    };

    let converted =
        convert_bulk_part(empty_part, &metadata, primary_key_codec, schema, true).unwrap();
    assert!(converted.is_none());
}
2372
/// Converts a raw five-column part while keeping the primary key columns and checks
/// the summary fields, the output schema and the column types.
#[test]
fn test_convert_bulk_part_dense_with_pk_columns() {
    let metadata = metadata_for_test();
    let primary_key_codec = build_primary_key_codec(&metadata);

    // Raw input schema: k0, k1, v0, v1, ts.
    let ts_type = ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None);
    let input_schema = Arc::new(Schema::new(vec![
        Field::new("k0", ArrowDataType::Utf8, false),
        Field::new("k1", ArrowDataType::UInt32, false),
        Field::new("v0", ArrowDataType::Int64, true),
        Field::new("v1", ArrowDataType::Float64, true),
        Field::new("ts", ts_type, false),
    ]));
    let input_columns: Vec<ArrayRef> = vec![
        Arc::new(arrow::array::StringArray::from(vec![
            "key1", "key2", "key1",
        ])),
        Arc::new(arrow::array::UInt32Array::from(vec![1, 2, 1])),
        Arc::new(arrow::array::Int64Array::from(vec![100, 200, 300])),
        Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0, 3.0])),
        Arc::new(TimestampMillisecondArray::from(vec![1000, 2000, 1500])),
    ];

    let part = BulkPart {
        batch: RecordBatch::try_new(input_schema, input_columns).unwrap(),
        max_timestamp: 2000,
        min_timestamp: 1000,
        sequence: 5,
        timestamp_index: 4,
        raw_data: None,
    };

    let options = FlatSchemaOptions::from_encoding(metadata.primary_key_encoding);
    let output_schema = to_flat_sst_arrow_schema(&metadata, &options);

    // The last argument asks the conversion to store the primary key columns.
    let converted = convert_bulk_part(part, &metadata, primary_key_codec, output_schema, true)
        .unwrap()
        .unwrap();

    assert_eq!(converted.num_rows(), 3);
    assert_eq!(converted.max_timestamp, 2000);
    assert_eq!(converted.min_timestamp, 1000);
    assert_eq!(converted.sequence, 5);

    let schema = converted.batch.schema();
    let field_names: Vec<&str> = schema
        .fields()
        .iter()
        .map(|field| field.name().as_str())
        .collect();
    assert_eq!(
        field_names,
        [
            "k0",
            "k1",
            "v0",
            "v1",
            "ts",
            "__primary_key",
            "__sequence",
            "__op_type",
        ]
    );

    // The string tag column comes out dictionary-encoded.
    let k0_col = converted.batch.column_by_name("k0").unwrap();
    assert!(matches!(
        k0_col.data_type(),
        ArrowDataType::Dictionary(_, _)
    ));

    // The encoded primary key column has one dictionary key per row.
    let dict_array = converted
        .batch
        .column_by_name("__primary_key")
        .unwrap()
        .as_any()
        .downcast_ref::<DictionaryArray<UInt32Type>>()
        .unwrap();
    assert_eq!(dict_array.keys().len(), 3);
}
2465
2466    #[test]
2467    fn test_convert_bulk_part_dense_without_pk_columns() {
2468        let metadata = metadata_for_test();
2469        let primary_key_codec = build_primary_key_codec(&metadata);
2470
2471        // Create input batch with primary key columns (k0, k1)
2472        let k0_array = Arc::new(arrow::array::StringArray::from(vec!["key1", "key2"]));
2473        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2]));
2474        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
2475        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
2476        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
2477
2478        let input_schema = Arc::new(Schema::new(vec![
2479            Field::new("k0", ArrowDataType::Utf8, false),
2480            Field::new("k1", ArrowDataType::UInt32, false),
2481            Field::new("v0", ArrowDataType::Int64, true),
2482            Field::new("v1", ArrowDataType::Float64, true),
2483            Field::new(
2484                "ts",
2485                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2486                false,
2487            ),
2488        ]));
2489
2490        let input_batch = RecordBatch::try_new(
2491            input_schema,
2492            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2493        )
2494        .unwrap();
2495
2496        let part = BulkPart {
2497            batch: input_batch,
2498            max_timestamp: 2000,
2499            min_timestamp: 1000,
2500            sequence: 3,
2501            timestamp_index: 4,
2502            raw_data: None,
2503        };
2504
2505        let output_schema = to_flat_sst_arrow_schema(
2506            &metadata,
2507            &FlatSchemaOptions {
2508                raw_pk_columns: false,
2509                string_pk_use_dict: true,
2510                ..Default::default()
2511            },
2512        );
2513
2514        let result = convert_bulk_part(
2515            part,
2516            &metadata,
2517            primary_key_codec,
2518            output_schema,
2519            false, // don't store primary key columns
2520        )
2521        .unwrap();
2522
2523        let converted = result.unwrap();
2524
2525        assert_eq!(converted.num_rows(), 2);
2526        assert_eq!(converted.max_timestamp, 2000);
2527        assert_eq!(converted.min_timestamp, 1000);
2528        assert_eq!(converted.sequence, 3);
2529
2530        // Verify schema does NOT include individual primary key columns
2531        let schema = converted.batch.schema();
2532        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2533        assert_eq!(
2534            field_names,
2535            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2536        );
2537
2538        // Verify __primary_key column is present and is a dictionary
2539        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2540        assert!(matches!(
2541            pk_col.data_type(),
2542            ArrowDataType::Dictionary(_, _)
2543        ));
2544    }
2545
2546    #[test]
2547    fn test_convert_bulk_part_sparse_encoding() {
2548        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
2549        builder
2550            .push_column_metadata(ColumnMetadata {
2551                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
2552                semantic_type: SemanticType::Tag,
2553                column_id: 0,
2554            })
2555            .push_column_metadata(ColumnMetadata {
2556                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
2557                semantic_type: SemanticType::Tag,
2558                column_id: 1,
2559            })
2560            .push_column_metadata(ColumnMetadata {
2561                column_schema: ColumnSchema::new(
2562                    "ts",
2563                    ConcreteDataType::timestamp_millisecond_datatype(),
2564                    false,
2565                ),
2566                semantic_type: SemanticType::Timestamp,
2567                column_id: 2,
2568            })
2569            .push_column_metadata(ColumnMetadata {
2570                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
2571                semantic_type: SemanticType::Field,
2572                column_id: 3,
2573            })
2574            .push_column_metadata(ColumnMetadata {
2575                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
2576                semantic_type: SemanticType::Field,
2577                column_id: 4,
2578            })
2579            .primary_key(vec![0, 1])
2580            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
2581        let metadata = Arc::new(builder.build().unwrap());
2582
2583        let primary_key_codec = build_primary_key_codec(&metadata);
2584
2585        // Create input batch with __primary_key column (sparse encoding)
2586        let pk_array = Arc::new(arrow::array::BinaryArray::from(vec![
2587            b"encoded_key_1".as_slice(),
2588            b"encoded_key_2".as_slice(),
2589        ]));
2590        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
2591        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
2592        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
2593
2594        let input_schema = Arc::new(Schema::new(vec![
2595            Field::new("__primary_key", ArrowDataType::Binary, false),
2596            Field::new("v0", ArrowDataType::Int64, true),
2597            Field::new("v1", ArrowDataType::Float64, true),
2598            Field::new(
2599                "ts",
2600                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2601                false,
2602            ),
2603        ]));
2604
2605        let input_batch =
2606            RecordBatch::try_new(input_schema, vec![pk_array, v0_array, v1_array, ts_array])
2607                .unwrap();
2608
2609        let part = BulkPart {
2610            batch: input_batch,
2611            max_timestamp: 2000,
2612            min_timestamp: 1000,
2613            sequence: 7,
2614            timestamp_index: 3,
2615            raw_data: None,
2616        };
2617
2618        let output_schema = to_flat_sst_arrow_schema(
2619            &metadata,
2620            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2621        );
2622
2623        let result = convert_bulk_part(
2624            part,
2625            &metadata,
2626            primary_key_codec,
2627            output_schema,
2628            true, // store_primary_key_columns (ignored for sparse)
2629        )
2630        .unwrap();
2631
2632        let converted = result.unwrap();
2633
2634        assert_eq!(converted.num_rows(), 2);
2635        assert_eq!(converted.max_timestamp, 2000);
2636        assert_eq!(converted.min_timestamp, 1000);
2637        assert_eq!(converted.sequence, 7);
2638
2639        // Verify schema does NOT include individual primary key columns (sparse encoding)
2640        let schema = converted.batch.schema();
2641        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2642        assert_eq!(
2643            field_names,
2644            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2645        );
2646
2647        // Verify __primary_key is dictionary encoded
2648        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2649        assert!(matches!(
2650            pk_col.data_type(),
2651            ArrowDataType::Dictionary(_, _)
2652        ));
2653    }
2654
2655    #[test]
2656    fn test_convert_bulk_part_sorting_with_multiple_series() {
2657        let metadata = metadata_for_test();
2658        let primary_key_codec = build_primary_key_codec(&metadata);
2659
2660        // Create unsorted batch with multiple series and timestamps
2661        let k0_array = Arc::new(arrow::array::StringArray::from(vec![
2662            "series_b", "series_a", "series_b", "series_a",
2663        ]));
2664        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![2, 1, 2, 1]));
2665        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![200, 100, 400, 300]));
2666        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![2.0, 1.0, 4.0, 3.0]));
2667        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
2668            2000, 1000, 4000, 3000,
2669        ]));
2670
2671        let input_schema = Arc::new(Schema::new(vec![
2672            Field::new("k0", ArrowDataType::Utf8, false),
2673            Field::new("k1", ArrowDataType::UInt32, false),
2674            Field::new("v0", ArrowDataType::Int64, true),
2675            Field::new("v1", ArrowDataType::Float64, true),
2676            Field::new(
2677                "ts",
2678                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2679                false,
2680            ),
2681        ]));
2682
2683        let input_batch = RecordBatch::try_new(
2684            input_schema,
2685            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2686        )
2687        .unwrap();
2688
2689        let part = BulkPart {
2690            batch: input_batch,
2691            max_timestamp: 4000,
2692            min_timestamp: 1000,
2693            sequence: 10,
2694            timestamp_index: 4,
2695            raw_data: None,
2696        };
2697
2698        let output_schema = to_flat_sst_arrow_schema(
2699            &metadata,
2700            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2701        );
2702
2703        let result =
2704            convert_bulk_part(part, &metadata, primary_key_codec, output_schema, true).unwrap();
2705
2706        let converted = result.unwrap();
2707
2708        assert_eq!(converted.num_rows(), 4);
2709
2710        // Verify data is sorted by (primary_key, timestamp, sequence desc)
2711        let ts_col = converted.batch.column(converted.timestamp_index);
2712        let ts_array = ts_col
2713            .as_any()
2714            .downcast_ref::<TimestampMillisecondArray>()
2715            .unwrap();
2716
2717        // After sorting by (pk, ts), we should have:
2718        // series_a,1: ts=1000, 3000
2719        // series_b,2: ts=2000, 4000
2720        let timestamps: Vec<i64> = ts_array.values().to_vec();
2721        assert_eq!(timestamps, vec![1000, 3000, 2000, 4000]);
2722    }
2723
2724    /// Helper to create a converted BulkPart (with __primary_key column) from MutationInputs.
2725    fn build_converted_bulk_part(inputs: &[MutationInput]) -> BulkPart {
2726        let metadata = metadata_for_test();
2727        let kvs = inputs
2728            .iter()
2729            .map(|m| {
2730                build_key_values_with_ts_seq_values(
2731                    &metadata,
2732                    m.k0.to_string(),
2733                    m.k1,
2734                    m.timestamps.iter().copied(),
2735                    m.v1.iter().copied(),
2736                    m.sequence,
2737                )
2738            })
2739            .collect::<Vec<_>>();
2740        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
2741        let primary_key_codec = build_primary_key_codec(&metadata);
2742        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
2743        for kv in kvs {
2744            converter.append_key_values(&kv).unwrap();
2745        }
2746        converter.convert().unwrap()
2747    }
2748
2749    /// Helper to create a MultiBulkPart where each group becomes a separate batch.
2750    fn build_multi_bulk_part(groups: &[&[MutationInput]]) -> (MultiBulkPart, RegionMetadataRef) {
2751        let metadata = metadata_for_test();
2752        let mut all_batches = Vec::new();
2753        let mut min_ts = i64::MAX;
2754        let mut max_ts = i64::MIN;
2755        let mut max_seq = 0u64;
2756
2757        for inputs in groups {
2758            let part = build_converted_bulk_part(inputs);
2759            min_ts = min_ts.min(part.min_timestamp);
2760            max_ts = max_ts.max(part.max_timestamp);
2761            max_seq = max_seq.max(part.sequence);
2762            all_batches.push(part.batch);
2763        }
2764
2765        let multi = MultiBulkPart::new(
2766            all_batches,
2767            min_ts,
2768            max_ts,
2769            max_seq,
2770            groups.len(),
2771            &metadata,
2772        );
2773        (multi, metadata)
2774    }
2775
2776    #[test]
2777    fn test_multi_bulk_part_prune_batches() {
2778        // Three batches with distinct k0 ranges: ["a"], ["m"], ["z"].
2779        let (multi, metadata) = build_multi_bulk_part(&[
2780            &[MutationInput {
2781                k0: "a",
2782                k1: 0,
2783                timestamps: &[1, 2],
2784                v1: &[Some(1.0), Some(2.0)],
2785                sequence: 0,
2786            }],
2787            &[MutationInput {
2788                k0: "m",
2789                k1: 0,
2790                timestamps: &[3, 4],
2791                v1: &[Some(3.0), Some(4.0)],
2792                sequence: 1,
2793            }],
2794            &[MutationInput {
2795                k0: "z",
2796                k1: 0,
2797                timestamps: &[5, 6],
2798                v1: &[Some(5.0), Some(6.0)],
2799                sequence: 2,
2800            }],
2801        ]);
2802        assert_eq!(multi.num_rows(), 6);
2803        assert_eq!(multi.num_batches(), 3);
2804
2805        // k0 = "m" => only middle batch (2 rows).
2806        let context = Arc::new(
2807            BulkIterContext::new(
2808                metadata.clone(),
2809                None,
2810                Some(Predicate::new(vec![
2811                    datafusion_expr::col("k0").eq(datafusion_expr::lit("m")),
2812                ])),
2813                false,
2814            )
2815            .unwrap(),
2816        );
2817        let reader = multi
2818            .read(context, None, None)
2819            .unwrap()
2820            .expect("should have results");
2821        let total_rows: usize = reader.map(|r| r.unwrap().num_rows()).sum();
2822        assert_eq!(total_rows, 2);
2823
2824        // k0 = "nonexistent" => all pruned, returns None.
2825        let context = Arc::new(
2826            BulkIterContext::new(
2827                metadata.clone(),
2828                None,
2829                Some(Predicate::new(vec![
2830                    datafusion_expr::col("k0").eq(datafusion_expr::lit("nonexistent")),
2831                ])),
2832                false,
2833            )
2834            .unwrap(),
2835        );
2836        assert!(multi.read(context, None, None).unwrap().is_none());
2837
2838        // No predicate => all 6 rows.
2839        let context = Arc::new(BulkIterContext::new(metadata.clone(), None, None, false).unwrap());
2840        let reader = multi
2841            .read(context, None, None)
2842            .unwrap()
2843            .expect("should have results");
2844        let total_rows: usize = reader.map(|r| r.unwrap().num_rows()).sum();
2845        assert_eq!(total_rows, 6);
2846    }
2847}