
mito2/memtable/bulk/part.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Bulk part encoder/decoder.

use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::helper::{ColumnDataTypeWrapper, to_grpc_value};
use api::v1::bulk_wal_entry::Body;
use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType};
use bytes::Bytes;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_recordbatch::DfRecordBatch as RecordBatch;
use common_time::Timestamp;
use datatypes::arrow;
use datatypes::arrow::array::{Array, ArrayRef, StringDictionaryBuilder, UInt8Array, UInt64Array};
use datatypes::arrow::compute::{SortColumn, SortOptions};
use datatypes::arrow::datatypes::{
    DataType as ArrowDataType, Field, Schema, SchemaRef, UInt32Type,
};
use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, Vector};
use datatypes::value::ValueRef;
use datatypes::vectors::Helper;
use mito_codec::key_values::{KeyValue, KeyValues};
use mito_codec::row_converter::PrimaryKeyCodec;
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, ZstdLevel};
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use smallvec::SmallVec;
use snafu::{OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
use store_api::storage::{FileId, SequenceNumber, SequenceRange};

use crate::error::{
    self, ColumnNotFoundSnafu, ComputeArrowSnafu, CreateDefaultSnafu, DataTypeMismatchSnafu,
    EncodeMemtableSnafu, EncodeSnafu, InvalidMetadataSnafu, InvalidRequestSnafu,
    NewRecordBatchSnafu, Result,
};
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
use crate::memtable::time_series::{ValueBuilder, Values};
use crate::memtable::{BoxedRecordBatchIterator, MemScanMetrics, MemtableStats};
use crate::sst::SeriesEstimator;
use crate::sst::index::IndexOutput;
use crate::sst::parquet::file_range::{PreFilterMode, row_group_contains_delete};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder};
use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo};

const INIT_DICT_VALUE_CAPACITY: usize = 8;

/// A raw bulk part in the memtable.
#[derive(Clone)]
pub struct BulkPart {
    pub batch: RecordBatch,
    pub max_timestamp: i64,
    pub min_timestamp: i64,
    pub sequence: u64,
    pub timestamp_index: usize,
    pub raw_data: Option<ArrowIpc>,
}

impl TryFrom<BulkWalEntry> for BulkPart {
    type Error = error::Error;

    fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
        match value.body.expect("Entry payload should be present") {
            Body::ArrowIpc(ipc) => {
                let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
                    .context(error::ConvertBulkWalEntrySnafu)?;
                let batch = decoder
                    .try_decode_record_batch(&ipc.data_header, &ipc.payload)
                    .context(error::ConvertBulkWalEntrySnafu)?;
                Ok(Self {
                    batch,
                    max_timestamp: value.max_ts,
                    min_timestamp: value.min_ts,
                    sequence: value.sequence,
                    timestamp_index: value.timestamp_index as usize,
                    raw_data: Some(ipc),
                })
            }
        }
    }
}

impl TryFrom<&BulkPart> for BulkWalEntry {
    type Error = error::Error;

    fn try_from(value: &BulkPart) -> Result<Self> {
        if let Some(ipc) = &value.raw_data {
            Ok(BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_timestamp,
                min_ts: value.min_timestamp,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ipc.clone())),
            })
        } else {
            let mut encoder = FlightEncoder::default();
            let schema_bytes = encoder
                .encode_schema(value.batch.schema().as_ref())
                .data_header;
            let [rb_data] = encoder
                .encode(FlightMessage::RecordBatch(value.batch.clone()))
                .try_into()
                .map_err(|_| {
                    error::UnsupportedOperationSnafu {
                        err_msg: "create BulkWalEntry from RecordBatch with dictionary arrays",
                    }
                    .build()
                })?;
            Ok(BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_timestamp,
                min_ts: value.min_timestamp,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ArrowIpc {
                    schema: schema_bytes,
                    data_header: rb_data.data_header,
                    payload: rb_data.data_body,
                })),
            })
        }
    }
}

impl BulkPart {
    pub(crate) fn estimated_size(&self) -> usize {
        record_batch_estimated_size(&self.batch)
    }

    /// Returns the estimated series count in this BulkPart.
    /// This is calculated from the dictionary values count of the PrimaryKeyArray.
    pub fn estimated_series_count(&self) -> usize {
        let pk_column_idx = primary_key_column_index(self.batch.num_columns());
        let pk_column = self.batch.column(pk_column_idx);
        if let Some(dict_array) = pk_column.as_any().downcast_ref::<PrimaryKeyArray>() {
            dict_array.values().len()
        } else {
            0
        }
    }

    /// Creates MemtableStats from this BulkPart.
    pub fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
        let ts_type = region_metadata.time_index_type();
        let min_ts = ts_type.create_timestamp(self.min_timestamp);
        let max_ts = ts_type.create_timestamp(self.max_timestamp);

        MemtableStats {
            estimated_bytes: self.estimated_size(),
            time_range: Some((min_ts, max_ts)),
            num_rows: self.num_rows(),
            num_ranges: 1,
            max_sequence: self.sequence,
            series_count: self.estimated_series_count(),
        }
    }

    /// Fills missing columns in the BulkPart batch with default values.
    ///
    /// This function checks if the batch schema matches the region metadata schema,
    /// and if there are missing columns, it fills them with default values (or null
    /// for nullable columns).
    ///
    /// # Arguments
    ///
    /// * `region_metadata` - The region metadata containing the expected schema
    pub fn fill_missing_columns(&mut self, region_metadata: &RegionMetadata) -> Result<()> {
        // Builds a map of existing columns in the batch
        let batch_schema = self.batch.schema();
        let batch_columns: HashSet<_> = batch_schema
            .fields()
            .iter()
            .map(|f| f.name().as_str())
            .collect();

        // Finds columns that need to be filled
        let mut columns_to_fill = Vec::new();
        for column_meta in &region_metadata.column_metadatas {
            // TODO(yingwen): Returns error if it is impure default after we support filling
            // bulk insert request in the frontend
            if !batch_columns.contains(column_meta.column_schema.name.as_str()) {
                columns_to_fill.push(column_meta);
            }
        }

        if columns_to_fill.is_empty() {
            return Ok(());
        }

        let num_rows = self.batch.num_rows();

        let mut new_columns = Vec::new();
        let mut new_fields = Vec::new();

        // First, adds all existing columns
        new_fields.extend(batch_schema.fields().iter().cloned());
        new_columns.extend_from_slice(self.batch.columns());

        let region_id = region_metadata.region_id;
        // Then adds the missing columns with default values
        for column_meta in columns_to_fill {
            let default_vector = column_meta
                .column_schema
                .create_default_vector(num_rows)
                .context(CreateDefaultSnafu {
                    region_id,
                    column: &column_meta.column_schema.name,
                })?
                .with_context(|| InvalidRequestSnafu {
                    region_id,
                    reason: format!(
                        "column {} does not have default value",
                        column_meta.column_schema.name
                    ),
                })?;
            let arrow_array = default_vector.to_arrow_array();

            new_fields.push(Arc::new(Field::new(
                column_meta.column_schema.name.clone(),
                column_meta.column_schema.data_type.as_arrow_type(),
                column_meta.column_schema.is_nullable(),
            )));
            new_columns.push(arrow_array);
        }

        // Create a new schema and batch with the filled columns
        let new_schema = Arc::new(Schema::new(new_fields));
        let new_batch =
            RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)?;

        // Update the batch
        self.batch = new_batch;

        Ok(())
    }

    /// Converts [BulkPart] to [Mutation] for fallback `write_bulk` implementation.
    pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
        let vectors = region_metadata
            .schema
            .column_schemas()
            .iter()
            .map(|col| match self.batch.column_by_name(&col.name) {
                None => Ok(None),
                Some(col) => Helper::try_into_vector(col).map(Some),
            })
            .collect::<datatypes::error::Result<Vec<_>>>()
            .context(error::ComputeVectorSnafu)?;

        let rows = (0..self.num_rows())
            .map(|row_idx| {
                let values = (0..self.batch.num_columns())
                    .map(|col_idx| {
                        if let Some(v) = &vectors[col_idx] {
                            to_grpc_value(v.get(row_idx))
                        } else {
                            api::v1::Value { value_data: None }
                        }
                    })
                    .collect::<Vec<_>>();
                api::v1::Row { values }
            })
            .collect::<Vec<_>>();

        let schema = region_metadata
            .column_metadatas
            .iter()
            .map(|c| {
                let data_type_wrapper =
                    ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
                Ok(api::v1::ColumnSchema {
                    column_name: c.column_schema.name.clone(),
                    datatype: data_type_wrapper.datatype() as i32,
                    semantic_type: c.semantic_type as i32,
                    ..Default::default()
                })
            })
            .collect::<api::error::Result<Vec<_>>>()
            .context(error::ConvertColumnDataTypeSnafu {
                reason: "failed to convert region metadata to column schema",
            })?;

        let rows = api::v1::Rows { schema, rows };

        Ok(Mutation {
            op_type: OpType::Put as i32,
            sequence: self.sequence,
            rows: Some(rows),
            write_hint: None,
        })
    }

    pub fn timestamps(&self) -> &ArrayRef {
        self.batch.column(self.timestamp_index)
    }

    pub fn num_rows(&self) -> usize {
        self.batch.num_rows()
    }
}
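
// Illustrative sketch (not part of the original module): how a crate-internal caller
// might patch an incoming part against the region schema before falling back to the
// mutation-based write path. `incoming_part` and `region_metadata` are assumed to be
// available at the call site.
//
//     let mut part: BulkPart = incoming_part;
//     part.fill_missing_columns(&region_metadata)?;
//     let mutation = part.to_mutation(&region_metadata)?;
//     debug_assert_eq!(mutation.sequence, part.sequence);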

/// A collection of small unordered bulk parts.
/// Used to batch small parts together before merging them into a sorted part.
pub struct UnorderedPart {
    /// Small bulk parts that haven't been sorted yet.
    parts: Vec<BulkPart>,
    /// Total number of rows across all parts.
    total_rows: usize,
    /// Minimum timestamp across all parts.
    min_timestamp: i64,
    /// Maximum timestamp across all parts.
    max_timestamp: i64,
    /// Maximum sequence number across all parts.
    max_sequence: u64,
    /// Row count threshold for accepting parts (default: 1024).
    threshold: usize,
    /// Row count threshold for compacting (default: 4096).
    compact_threshold: usize,
}

impl Default for UnorderedPart {
    fn default() -> Self {
        Self::new()
    }
}

impl UnorderedPart {
    /// Creates a new empty UnorderedPart.
    pub fn new() -> Self {
        Self {
            parts: Vec::new(),
            total_rows: 0,
            min_timestamp: i64::MAX,
            max_timestamp: i64::MIN,
            max_sequence: 0,
            threshold: 1024,
            compact_threshold: 4096,
        }
    }

    /// Sets the threshold for accepting parts into unordered_part.
    pub fn set_threshold(&mut self, threshold: usize) {
        self.threshold = threshold;
    }

    /// Sets the threshold for compacting unordered_part.
    pub fn set_compact_threshold(&mut self, compact_threshold: usize) {
        self.compact_threshold = compact_threshold;
    }

    /// Returns the threshold for accepting parts.
    pub fn threshold(&self) -> usize {
        self.threshold
    }

    /// Returns the compact threshold.
    pub fn compact_threshold(&self) -> usize {
        self.compact_threshold
    }

    /// Returns true if this part should accept the given row count.
    pub fn should_accept(&self, num_rows: usize) -> bool {
        num_rows < self.threshold
    }

    /// Returns true if this part should be compacted.
    pub fn should_compact(&self) -> bool {
        self.total_rows >= self.compact_threshold
    }

    /// Adds a BulkPart to this unordered collection.
    pub fn push(&mut self, part: BulkPart) {
        self.total_rows += part.num_rows();
        self.min_timestamp = self.min_timestamp.min(part.min_timestamp);
        self.max_timestamp = self.max_timestamp.max(part.max_timestamp);
        self.max_sequence = self.max_sequence.max(part.sequence);
        self.parts.push(part);
    }

    /// Returns the total number of rows across all parts.
    pub fn num_rows(&self) -> usize {
        self.total_rows
    }

    /// Returns true if there are no parts.
    pub fn is_empty(&self) -> bool {
        self.parts.is_empty()
    }

    /// Returns the number of parts in this collection.
    pub fn num_parts(&self) -> usize {
        self.parts.len()
    }

    /// Concatenates and sorts all parts into a single RecordBatch.
    /// Returns None if the collection is empty.
    pub fn concat_and_sort(&self) -> Result<Option<RecordBatch>> {
        if self.parts.is_empty() {
            return Ok(None);
        }

        if self.parts.len() == 1 {
            // If there's only one part, return its batch directly
            return Ok(Some(self.parts[0].batch.clone()));
        }

        // Get the schema from the first part
        let schema = self.parts[0].batch.schema();

        // Concatenate all record batches
        let batches: Vec<RecordBatch> = self.parts.iter().map(|p| p.batch.clone()).collect();
        let concatenated =
            arrow::compute::concat_batches(&schema, &batches).context(ComputeArrowSnafu)?;

        // Sort the concatenated batch
        let sorted_batch = sort_primary_key_record_batch(&concatenated)?;

        Ok(Some(sorted_batch))
    }

    /// Converts all parts into a single sorted BulkPart.
    /// Returns None if the collection is empty.
    pub fn to_bulk_part(&self) -> Result<Option<BulkPart>> {
        let Some(sorted_batch) = self.concat_and_sort()? else {
            return Ok(None);
        };

        let timestamp_index = self.parts[0].timestamp_index;

        Ok(Some(BulkPart {
            batch: sorted_batch,
            max_timestamp: self.max_timestamp,
            min_timestamp: self.min_timestamp,
            sequence: self.max_sequence,
            timestamp_index,
            raw_data: None,
        }))
    }

    /// Clears all parts from this collection.
    pub fn clear(&mut self) {
        self.parts.clear();
        self.total_rows = 0;
        self.min_timestamp = i64::MAX;
        self.max_timestamp = i64::MIN;
        self.max_sequence = 0;
    }
}
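
// Illustrative sketch (assumed usage, not from the original source): batching small
// parts in an `UnorderedPart` and flushing them into one sorted `BulkPart` once the
// compact threshold is reached. `small_parts` is a hypothetical iterator of `BulkPart`s.
//
//     let mut unordered = UnorderedPart::new();
//     for part in small_parts {
//         if unordered.should_accept(part.num_rows()) {
//             unordered.push(part);
//         }
//         if unordered.should_compact() {
//             if let Some(sorted) = unordered.to_bulk_part()? {
//                 // hand the sorted part over to the memtable / encoder here
//             }
//             unordered.clear();
//         }
//     }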

/// More accurate estimation of the size of a record batch.
pub fn record_batch_estimated_size(batch: &RecordBatch) -> usize {
    batch
        .columns()
        .iter()
        // If the slice memory size cannot be obtained, assume 0 here.
        .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
        .sum()
}

/// Primary key column builder for handling strings specially.
enum PrimaryKeyColumnBuilder {
    /// String dictionary builder for string types.
    StringDict(StringDictionaryBuilder<UInt32Type>),
    /// Generic mutable vector for other types.
    Vector(Box<dyn MutableVector>),
}

impl PrimaryKeyColumnBuilder {
    /// Appends a value to the builder.
    fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
        match self {
            PrimaryKeyColumnBuilder::StringDict(builder) => {
                if let Some(s) = value.try_into_string().context(DataTypeMismatchSnafu)? {
                    // We know the value is a string.
                    builder.append_value(s);
                } else {
                    builder.append_null();
                }
            }
            PrimaryKeyColumnBuilder::Vector(builder) => {
                builder.push_value_ref(&value);
            }
        }
        Ok(())
    }

    /// Converts the builder to an ArrayRef.
    fn into_arrow_array(self) -> ArrayRef {
        match self {
            PrimaryKeyColumnBuilder::StringDict(mut builder) => Arc::new(builder.finish()),
            PrimaryKeyColumnBuilder::Vector(mut builder) => builder.to_vector().to_arrow_array(),
        }
    }
}

/// Converter that converts structs into [BulkPart].
pub struct BulkPartConverter {
    /// Schema of the converted batch.
    schema: SchemaRef,
    /// Primary key codec for encoding keys
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
    /// Buffer for encoding primary key.
    key_buf: Vec<u8>,
    /// Primary key array builder.
    key_array_builder: PrimaryKeyArrayBuilder,
    /// Builders for non-primary key columns.
    value_builder: ValueBuilder,
    /// Builders for individual primary key columns.
    /// The order of builders is the same as the order of primary key columns in the region metadata.
    primary_key_column_builders: Vec<PrimaryKeyColumnBuilder>,

    /// Max timestamp value.
    max_ts: i64,
    /// Min timestamp value.
    min_ts: i64,
    /// Max sequence number.
    max_sequence: SequenceNumber,
}

impl BulkPartConverter {
    /// Creates a new converter.
    ///
    /// If `store_primary_key_columns` is true and the encoding is not sparse encoding, it
    /// stores primary key columns in arrays additionally.
    pub fn new(
        region_metadata: &RegionMetadataRef,
        schema: SchemaRef,
        capacity: usize,
        primary_key_codec: Arc<dyn PrimaryKeyCodec>,
        store_primary_key_columns: bool,
    ) -> Self {
        debug_assert_eq!(
            region_metadata.primary_key_encoding,
            primary_key_codec.encoding()
        );

        let primary_key_column_builders = if store_primary_key_columns
            && region_metadata.primary_key_encoding != PrimaryKeyEncoding::Sparse
        {
            new_primary_key_column_builders(region_metadata, capacity)
        } else {
            Vec::new()
        };

        Self {
            schema,
            primary_key_codec,
            key_buf: Vec::new(),
            key_array_builder: PrimaryKeyArrayBuilder::new(),
            value_builder: ValueBuilder::new(region_metadata, capacity),
            primary_key_column_builders,
            min_ts: i64::MAX,
            max_ts: i64::MIN,
            max_sequence: SequenceNumber::MIN,
        }
    }

    /// Appends a [KeyValues] into the converter.
    pub fn append_key_values(&mut self, key_values: &KeyValues) -> Result<()> {
        for kv in key_values.iter() {
            self.append_key_value(&kv)?;
        }

        Ok(())
    }

    /// Appends a [KeyValue] to builders.
    ///
    /// If the primary key uses sparse encoding, callers must encode the primary key in the [KeyValue].
    fn append_key_value(&mut self, kv: &KeyValue) -> Result<()> {
        // Handles primary key based on encoding type
        if self.primary_key_codec.encoding() == PrimaryKeyEncoding::Sparse {
            // For sparse encoding, the primary key is already encoded in the KeyValue
            // Gets the first (and only) primary key value which contains the encoded key
            let mut primary_keys = kv.primary_keys();
            if let Some(encoded) = primary_keys
                .next()
                .context(ColumnNotFoundSnafu {
                    column: PRIMARY_KEY_COLUMN_NAME,
                })?
                .try_into_binary()
                .context(DataTypeMismatchSnafu)?
            {
                self.key_array_builder
                    .append(encoded)
                    .context(ComputeArrowSnafu)?;
            } else {
                self.key_array_builder
                    .append("")
                    .context(ComputeArrowSnafu)?;
            }
        } else {
            // For dense encoding, we need to encode the primary key columns
            self.key_buf.clear();
            self.primary_key_codec
                .encode_key_value(kv, &mut self.key_buf)
                .context(EncodeSnafu)?;
            self.key_array_builder
                .append(&self.key_buf)
                .context(ComputeArrowSnafu)?;
        };

        // If storing primary key columns, append values to individual builders
        if !self.primary_key_column_builders.is_empty() {
            for (builder, pk_value) in self
                .primary_key_column_builders
                .iter_mut()
                .zip(kv.primary_keys())
            {
                builder.push_value_ref(pk_value)?;
            }
        }

        // Pushes other columns.
        self.value_builder.push(
            kv.timestamp(),
            kv.sequence(),
            kv.op_type() as u8,
            kv.fields(),
        );

        // Updates statistics
        // Safety: timestamp of kv must be both present and a valid timestamp value.
        let ts = kv
            .timestamp()
            .try_into_timestamp()
            .unwrap()
            .unwrap()
            .value();
        self.min_ts = self.min_ts.min(ts);
        self.max_ts = self.max_ts.max(ts);
        self.max_sequence = self.max_sequence.max(kv.sequence());

        Ok(())
    }

    /// Converts buffered content into a [BulkPart].
    ///
    /// It sorts the record batch by (primary key, timestamp, sequence desc).
    pub fn convert(mut self) -> Result<BulkPart> {
        let values = Values::from(self.value_builder);
        let mut columns =
            Vec::with_capacity(4 + values.fields.len() + self.primary_key_column_builders.len());

        // Build primary key column arrays if enabled.
        for builder in self.primary_key_column_builders {
            columns.push(builder.into_arrow_array());
        }
        // Then fields columns.
        columns.extend(values.fields.iter().map(|field| field.to_arrow_array()));
        // Time index.
        let timestamp_index = columns.len();
        columns.push(values.timestamp.to_arrow_array());
        // Primary key.
        let pk_array = self.key_array_builder.finish();
        columns.push(Arc::new(pk_array));
        // Sequence and op type.
        columns.push(values.sequence.to_arrow_array());
        columns.push(values.op_type.to_arrow_array());

        let batch = RecordBatch::try_new(self.schema, columns).context(NewRecordBatchSnafu)?;
        // Sorts the record batch.
        let batch = sort_primary_key_record_batch(&batch)?;

        Ok(BulkPart {
            batch,
            max_timestamp: self.max_ts,
            min_timestamp: self.min_ts,
            sequence: self.max_sequence,
            timestamp_index,
            raw_data: None,
        })
    }
}
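
// Illustrative sketch (mirrors the test helpers at the bottom of this file): converting
// `KeyValues` into a sorted `BulkPart` with a `BulkPartConverter`. `metadata` and
// `key_values` are assumed to be available at the call site.
//
//     let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
//     let codec = build_primary_key_codec(&metadata);
//     let mut converter = BulkPartConverter::new(&metadata, schema, 64, codec, true);
//     converter.append_key_values(&key_values)?;
//     let part: BulkPart = converter.convert()?;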

fn new_primary_key_column_builders(
    metadata: &RegionMetadata,
    capacity: usize,
) -> Vec<PrimaryKeyColumnBuilder> {
    metadata
        .primary_key_columns()
        .map(|col| {
            if col.column_schema.data_type.is_string() {
                PrimaryKeyColumnBuilder::StringDict(StringDictionaryBuilder::with_capacity(
                    capacity,
                    INIT_DICT_VALUE_CAPACITY,
                    capacity,
                ))
            } else {
                PrimaryKeyColumnBuilder::Vector(
                    col.column_schema.data_type.create_mutable_vector(capacity),
                )
            }
        })
        .collect()
}

/// Sorts the record batch with primary key format.
pub fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
    let total_columns = batch.num_columns();
    let sort_columns = vec![
        // Primary key column (ascending)
        SortColumn {
            values: batch.column(total_columns - 3).clone(),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
        // Time index column (ascending)
        SortColumn {
            values: batch.column(total_columns - 4).clone(),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
        // Sequence column (descending)
        SortColumn {
            values: batch.column(total_columns - 2).clone(),
            options: Some(SortOptions {
                descending: true,
                nulls_first: true,
            }),
        },
    ];

    let indices = datatypes::arrow::compute::lexsort_to_indices(&sort_columns, None)
        .context(ComputeArrowSnafu)?;

    datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
}
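
// Column layout assumed by `sort_primary_key_record_batch` (matching the order produced
// by `BulkPartConverter::convert`): the last four columns of the batch are, in order,
//
//     [.., time index, __primary_key, sequence, op_type]
//          ^ len - 4   ^ len - 3      ^ len - 2  ^ len - 1
//
// which is why the sort keys above are taken at `total_columns - 3` (primary key),
// `total_columns - 4` (time index) and `total_columns - 2` (sequence, descending).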

/// Converts a `BulkPart` that is unordered and without encoded primary keys into a `BulkPart`
/// with the same format as produced by [BulkPartConverter].
///
/// This function takes a `BulkPart` where:
/// - For dense encoding: Primary key columns may be stored as individual columns
/// - For sparse encoding: The `__primary_key` column should already be present with encoded keys
/// - The batch may not be sorted
///
/// And produces a `BulkPart` where:
/// - Primary key columns are optionally stored (depending on `store_primary_key_columns` and encoding)
/// - An encoded `__primary_key` dictionary column is present
/// - The batch is sorted by (primary_key, timestamp, sequence desc)
///
/// # Arguments
///
/// * `part` - The input `BulkPart` to convert
/// * `region_metadata` - Region metadata containing schema information
/// * `primary_key_codec` - Codec for encoding primary keys
/// * `schema` - Target schema for the output batch
/// * `store_primary_key_columns` - If true and encoding is not sparse, stores individual primary key columns
///
/// # Returns
///
/// Returns `None` if the input part has no rows, otherwise returns a new `BulkPart` with
/// encoded primary keys and sorted data.
pub fn convert_bulk_part(
    part: BulkPart,
    region_metadata: &RegionMetadataRef,
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
    schema: SchemaRef,
    store_primary_key_columns: bool,
) -> Result<Option<BulkPart>> {
    if part.num_rows() == 0 {
        return Ok(None);
    }

    let num_rows = part.num_rows();
    let is_sparse = region_metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse;

    // Builds a column name-to-index map for efficient lookups
    let input_schema = part.batch.schema();
    let column_indices: HashMap<&str, usize> = input_schema
        .fields()
        .iter()
        .enumerate()
        .map(|(idx, field)| (field.name().as_str(), idx))
        .collect();

    // Determines the structure of the input batch by looking up columns by name
    let mut output_columns = Vec::new();

    // Extracts primary key columns if we need to encode them (dense encoding)
    let pk_array = if is_sparse {
        // For sparse encoding, the input should already have the __primary_key column
        // We need to find it in the input batch
        None
    } else {
        // For dense encoding, extract and encode primary key columns by name
        let pk_vectors: Result<Vec<_>> = region_metadata
            .primary_key_columns()
            .map(|col_meta| {
                let col_idx = column_indices
                    .get(col_meta.column_schema.name.as_str())
                    .context(ColumnNotFoundSnafu {
                        column: &col_meta.column_schema.name,
                    })?;
                let col = part.batch.column(*col_idx);
                Helper::try_into_vector(col).context(error::ComputeVectorSnafu)
            })
            .collect();
        let pk_vectors = pk_vectors?;

        let mut key_array_builder = PrimaryKeyArrayBuilder::new();
        let mut encode_buf = Vec::new();

        for row_idx in 0..num_rows {
            encode_buf.clear();

            // Collects primary key values with column IDs for this row
            let pk_values_with_ids: Vec<_> = region_metadata
                .primary_key
                .iter()
                .zip(pk_vectors.iter())
                .map(|(col_id, vector)| (*col_id, vector.get_ref(row_idx)))
                .collect();

            // Encodes the primary key
            primary_key_codec
                .encode_value_refs(&pk_values_with_ids, &mut encode_buf)
                .context(EncodeSnafu)?;

            key_array_builder
                .append(&encode_buf)
                .context(ComputeArrowSnafu)?;
        }

        Some(key_array_builder.finish())
    };

    // Adds primary key columns if storing them (only for dense encoding)
    if store_primary_key_columns && !is_sparse {
        for col_meta in region_metadata.primary_key_columns() {
            let col_idx = column_indices
                .get(col_meta.column_schema.name.as_str())
                .context(ColumnNotFoundSnafu {
                    column: &col_meta.column_schema.name,
                })?;
            let col = part.batch.column(*col_idx);

            // Converts to dictionary if needed for string types
            let col = if col_meta.column_schema.data_type.is_string() {
                let target_type = ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Utf8),
                );
                arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
            } else {
                col.clone()
            };
            output_columns.push(col);
        }
    }

    // Adds field columns
    for col_meta in region_metadata.field_columns() {
        let col_idx = column_indices
            .get(col_meta.column_schema.name.as_str())
            .context(ColumnNotFoundSnafu {
                column: &col_meta.column_schema.name,
            })?;
        output_columns.push(part.batch.column(*col_idx).clone());
    }

    // Adds timestamp column
    let new_timestamp_index = output_columns.len();
    let ts_col_idx = column_indices
        .get(
            region_metadata
                .time_index_column()
                .column_schema
                .name
                .as_str(),
        )
        .context(ColumnNotFoundSnafu {
            column: &region_metadata.time_index_column().column_schema.name,
        })?;
    output_columns.push(part.batch.column(*ts_col_idx).clone());

    // Adds encoded primary key dictionary column
    let pk_dictionary = if let Some(pk_dict_array) = pk_array {
        Arc::new(pk_dict_array) as ArrayRef
    } else {
        let pk_col_idx =
            column_indices
                .get(PRIMARY_KEY_COLUMN_NAME)
                .context(ColumnNotFoundSnafu {
                    column: PRIMARY_KEY_COLUMN_NAME,
                })?;
        let col = part.batch.column(*pk_col_idx);

        // Casts to dictionary type if needed
        let target_type = ArrowDataType::Dictionary(
            Box::new(ArrowDataType::UInt32),
            Box::new(ArrowDataType::Binary),
        );
        arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
    };
    output_columns.push(pk_dictionary);

    let sequence_array = UInt64Array::from(vec![part.sequence; num_rows]);
    output_columns.push(Arc::new(sequence_array) as ArrayRef);

    let op_type_array = UInt8Array::from(vec![OpType::Put as u8; num_rows]);
    output_columns.push(Arc::new(op_type_array) as ArrayRef);

    let batch = RecordBatch::try_new(schema, output_columns).context(NewRecordBatchSnafu)?;

    // Sorts the batch by (primary_key, timestamp, sequence desc)
    let sorted_batch = sort_primary_key_record_batch(&batch)?;

    Ok(Some(BulkPart {
        batch: sorted_batch,
        max_timestamp: part.max_timestamp,
        min_timestamp: part.min_timestamp,
        sequence: part.sequence,
        timestamp_index: new_timestamp_index,
        raw_data: None,
    }))
}
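
// Illustrative sketch (assumed call site, not from the original source): re-encoding a
// raw, unsorted `BulkPart` received from a bulk insert into the flat, sorted layout.
// `raw_part`, `region_metadata`, `codec` and `flat_schema` are assumed to exist at the
// call site.
//
//     if let Some(sorted) = convert_bulk_part(
//         raw_part,
//         &region_metadata,
//         codec.clone(),
//         flat_schema.clone(),
//         true,
//     )? {
//         // `sorted.batch` now ends with (__primary_key, sequence, op_type) and is
//         // ordered by (primary key, timestamp, sequence desc).
//     }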

#[derive(Debug, Clone)]
pub struct EncodedBulkPart {
    data: Bytes,
    metadata: BulkPartMeta,
}

impl EncodedBulkPart {
    pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
        Self { data, metadata }
    }

    pub fn metadata(&self) -> &BulkPartMeta {
        &self.metadata
    }

    /// Returns the size of the encoded data in bytes
    pub(crate) fn size_bytes(&self) -> usize {
        self.data.len()
    }

    /// Returns the encoded data.
    pub fn data(&self) -> &Bytes {
        &self.data
    }

    /// Creates MemtableStats from this EncodedBulkPart.
    pub fn to_memtable_stats(&self) -> MemtableStats {
        let meta = &self.metadata;
        let ts_type = meta.region_metadata.time_index_type();
        let min_ts = ts_type.create_timestamp(meta.min_timestamp);
        let max_ts = ts_type.create_timestamp(meta.max_timestamp);

        MemtableStats {
            estimated_bytes: self.size_bytes(),
            time_range: Some((min_ts, max_ts)),
            num_rows: meta.num_rows,
            num_ranges: 1,
            max_sequence: meta.max_sequence,
            series_count: meta.num_series as usize,
        }
    }

    /// Converts this `EncodedBulkPart` to `SstInfo`.
    ///
    /// # Arguments
    /// * `file_id` - The SST file ID to assign to this part
    ///
    /// # Returns
    /// Returns a `SstInfo` instance with information derived from this bulk part's metadata
    pub(crate) fn to_sst_info(&self, file_id: FileId) -> SstInfo {
        let unit = self.metadata.region_metadata.time_index_type().unit();
        let max_row_group_uncompressed_size: u64 = self
            .metadata
            .parquet_metadata
            .row_groups()
            .iter()
            .map(|rg| {
                rg.columns()
                    .iter()
                    .map(|c| c.uncompressed_size() as u64)
                    .sum::<u64>()
            })
            .max()
            .unwrap_or(0);
        SstInfo {
            file_id,
            time_range: (
                Timestamp::new(self.metadata.min_timestamp, unit),
                Timestamp::new(self.metadata.max_timestamp, unit),
            ),
            file_size: self.data.len() as u64,
            max_row_group_uncompressed_size,
            num_rows: self.metadata.num_rows,
            num_row_groups: self.metadata.parquet_metadata.num_row_groups() as u64,
            file_metadata: Some(self.metadata.parquet_metadata.clone()),
            index_metadata: IndexOutput::default(),
            num_series: self.metadata.num_series,
        }
    }

    pub(crate) fn read(
        &self,
        context: BulkIterContextRef,
        sequence: Option<SequenceRange>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Result<Option<BoxedRecordBatchIterator>> {
        // Compute skip_fields for row group pruning using the same approach as compute_skip_fields in reader.rs.
        let skip_fields_for_pruning =
            Self::compute_skip_fields(context.pre_filter_mode(), &self.metadata.parquet_metadata);

        // use predicate to find row groups to read.
        let row_groups_to_read =
            context.row_groups_to_read(&self.metadata.parquet_metadata, skip_fields_for_pruning);

        if row_groups_to_read.is_empty() {
            // All row groups are filtered.
            return Ok(None);
        }

        let iter = EncodedBulkPartIter::try_new(
            self,
            context,
            row_groups_to_read,
            sequence,
            mem_scan_metrics,
        )?;
        Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
    }

    /// Computes whether to skip field columns based on PreFilterMode.
    fn compute_skip_fields(pre_filter_mode: PreFilterMode, parquet_meta: &ParquetMetaData) -> bool {
        match pre_filter_mode {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => {
                // Check if any row group contains delete op
                (0..parquet_meta.num_row_groups()).any(|rg_idx| {
                    row_group_contains_delete(parquet_meta, rg_idx, "memtable").unwrap_or(true)
                })
            }
        }
    }
}

// TODO(yingwen): max_sequence
#[derive(Debug, Clone)]
pub struct BulkPartMeta {
    /// Total rows in part.
    pub num_rows: usize,
    /// Max timestamp in part.
    pub max_timestamp: i64,
    /// Min timestamp in part.
    pub min_timestamp: i64,
    /// Part file metadata.
    pub parquet_metadata: Arc<ParquetMetaData>,
    /// Part region schema.
    pub region_metadata: RegionMetadataRef,
    /// Number of series.
    pub num_series: u64,
    /// Maximum sequence number in part.
    pub max_sequence: u64,
}

/// Metrics for encoding a part.
#[derive(Default, Debug)]
pub struct BulkPartEncodeMetrics {
    /// Cost of iterating over the data.
    pub iter_cost: Duration,
    /// Cost of writing the data.
    pub write_cost: Duration,
    /// Size of data before encoding.
    pub raw_size: usize,
    /// Size of data after encoding.
    pub encoded_size: usize,
    /// Number of rows in part.
    pub num_rows: usize,
}

pub struct BulkPartEncoder {
    metadata: RegionMetadataRef,
    writer_props: Option<WriterProperties>,
}

impl BulkPartEncoder {
    pub fn new(metadata: RegionMetadataRef, row_group_size: usize) -> Result<BulkPartEncoder> {
        // TODO(yingwen): Skip arrow schema if needed.
        let json = metadata.to_json().context(InvalidMetadataSnafu)?;
        let key_value_meta =
            parquet::file::metadata::KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        // TODO(yingwen): Do we need compression?
        let writer_props = Some(
            WriterProperties::builder()
                .set_key_value_metadata(Some(vec![key_value_meta]))
                .set_write_batch_size(row_group_size)
                .set_max_row_group_size(row_group_size)
                .set_compression(Compression::ZSTD(ZstdLevel::default()))
                .set_column_index_truncate_length(None)
                .set_statistics_truncate_length(None)
                .build(),
        );

        Ok(Self {
            metadata,
            writer_props,
        })
    }
}

impl BulkPartEncoder {
    /// Encodes [BoxedRecordBatchIterator] into [EncodedBulkPart] with min/max timestamps.
    pub fn encode_record_batch_iter(
        &self,
        iter: BoxedRecordBatchIterator,
        arrow_schema: SchemaRef,
        min_timestamp: i64,
        max_timestamp: i64,
        max_sequence: u64,
        metrics: &mut BulkPartEncodeMetrics,
    ) -> Result<Option<EncodedBulkPart>> {
        let mut buf = Vec::with_capacity(4096);
        let mut writer = ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
            .context(EncodeMemtableSnafu)?;
        let mut total_rows = 0;
        let mut series_estimator = SeriesEstimator::default();

        // Process each batch from the iterator
        let mut iter_start = Instant::now();
        for batch_result in iter {
            metrics.iter_cost += iter_start.elapsed();
            let batch = batch_result?;
            if batch.num_rows() == 0 {
                continue;
            }

            series_estimator.update_flat(&batch);
            metrics.raw_size += record_batch_estimated_size(&batch);
            let write_start = Instant::now();
            writer.write(&batch).context(EncodeMemtableSnafu)?;
            metrics.write_cost += write_start.elapsed();
            total_rows += batch.num_rows();
            iter_start = Instant::now();
        }
        metrics.iter_cost += iter_start.elapsed();

        if total_rows == 0 {
            return Ok(None);
        }

        let close_start = Instant::now();
        let file_metadata = writer.close().context(EncodeMemtableSnafu)?;
        metrics.write_cost += close_start.elapsed();
        metrics.encoded_size += buf.len();
        metrics.num_rows += total_rows;

        let buf = Bytes::from(buf);
        let parquet_metadata = Arc::new(file_metadata);
        let num_series = series_estimator.finish();

        Ok(Some(EncodedBulkPart {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: total_rows,
                max_timestamp,
                min_timestamp,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
                num_series,
                max_sequence,
            },
        }))
    }

    /// Encodes the bulk part into an [EncodedBulkPart] and returns the encoded data.
    pub fn encode_part(&self, part: &BulkPart) -> Result<Option<EncodedBulkPart>> {
        if part.batch.num_rows() == 0 {
            return Ok(None);
        }

        let mut buf = Vec::with_capacity(4096);
        let arrow_schema = part.batch.schema();

        let file_metadata = {
            let mut writer =
                ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
                    .context(EncodeMemtableSnafu)?;
            writer.write(&part.batch).context(EncodeMemtableSnafu)?;
            writer.finish().context(EncodeMemtableSnafu)?
        };

        let buf = Bytes::from(buf);
        let parquet_metadata = Arc::new(file_metadata);

        Ok(Some(EncodedBulkPart {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: part.batch.num_rows(),
                max_timestamp: part.max_timestamp,
                min_timestamp: part.min_timestamp,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
                num_series: part.estimated_series_count() as u64,
                max_sequence: part.sequence,
            },
        }))
    }
}
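
// Illustrative sketch (mirrors the `encode` test helper below): encoding a converted
// `BulkPart` into parquet bytes. `metadata` is an assumed `RegionMetadataRef` and
// `part` a `BulkPart` produced by `BulkPartConverter`.
//
//     let encoder = BulkPartEncoder::new(metadata, 1024)?;
//     if let Some(encoded) = encoder.encode_part(&part)? {
//         let stats = encoded.to_memtable_stats();
//         let bytes: &Bytes = encoded.data();
//     }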

/// A collection of ordered RecordBatches representing a bulk part without parquet encoding.
///
/// Similar to `EncodedBulkPart` but stores raw RecordBatches instead of encoded parquet data.
/// The RecordBatches must be ordered by (primary key, timestamp, sequence desc).
/// Uses SmallVec to optimize for the common case of few batches while avoiding heap allocation.
#[derive(Debug, Clone)]
pub struct MultiBulkPart {
    /// Ordered record batches. SmallVec optimized for up to 4 batches inline.
    batches: SmallVec<[RecordBatch; 4]>,
    /// Total rows across all batches.
    total_rows: usize,
    /// Max timestamp in part.
    max_timestamp: i64,
    /// Min timestamp in part.
    min_timestamp: i64,
    /// Max sequence number in part.
    max_sequence: SequenceNumber,
    /// Number of series.
    series_count: usize,
}

impl MultiBulkPart {
    /// Creates a new MultiBulkPart from a single BulkPart.
    pub fn from_bulk_part(part: BulkPart) -> Self {
        let num_rows = part.num_rows();
        let series_count = part.estimated_series_count();
        let mut batches = SmallVec::new();
        batches.push(part.batch);

        Self {
            batches,
            total_rows: num_rows,
            max_timestamp: part.max_timestamp,
            min_timestamp: part.min_timestamp,
            max_sequence: part.sequence,
            series_count,
        }
    }

    /// Creates a new MultiBulkPart from multiple ordered RecordBatches.
    ///
    /// # Arguments
    /// * `batches` - Ordered record batches
    /// * `min_timestamp` - Minimum timestamp across all batches
    /// * `max_timestamp` - Maximum timestamp across all batches
    /// * `max_sequence` - Maximum sequence number across all batches
    /// * `series_count` - Number of series in the batches
    ///
    /// # Panics
    /// Panics if batches is empty.
    pub fn new(
        batches: Vec<RecordBatch>,
        min_timestamp: i64,
        max_timestamp: i64,
        max_sequence: SequenceNumber,
        series_count: usize,
    ) -> Self {
        assert!(!batches.is_empty(), "batches must not be empty");

        let total_rows = batches.iter().map(|b| b.num_rows()).sum();

        Self {
            batches: SmallVec::from_vec(batches),
            total_rows,
            max_timestamp,
            min_timestamp,
            max_sequence,
            series_count,
        }
    }

    /// Returns the total number of rows across all batches.
    pub fn num_rows(&self) -> usize {
        self.total_rows
    }

    /// Returns the minimum timestamp.
    pub fn min_timestamp(&self) -> i64 {
        self.min_timestamp
    }

    /// Returns the maximum timestamp.
    pub fn max_timestamp(&self) -> i64 {
        self.max_timestamp
    }

    /// Returns the maximum sequence number.
    pub fn max_sequence(&self) -> SequenceNumber {
        self.max_sequence
    }

    /// Returns the number of series.
    pub fn series_count(&self) -> usize {
        self.series_count
    }

    /// Returns the number of record batches in this part.
    pub fn num_batches(&self) -> usize {
        self.batches.len()
    }

    /// Returns the estimated memory size of all batches.
    pub(crate) fn estimated_size(&self) -> usize {
        self.batches.iter().map(record_batch_estimated_size).sum()
    }

    /// Reads data from this part with the given context and filters.
    pub(crate) fn read(
        &self,
        context: BulkIterContextRef,
        sequence: Option<SequenceRange>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Result<Option<BoxedRecordBatchIterator>> {
        if self.batches.is_empty() {
            return Ok(None);
        }

        let iter = crate::memtable::bulk::part_reader::BulkPartBatchIter::new(
            self.batches.iter().cloned().collect(),
            context,
            sequence,
            self.series_count,
            mem_scan_metrics,
        );
        Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
    }

    /// Converts this `MultiBulkPart` to `MemtableStats`.
    pub fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
        let ts_type = region_metadata.time_index_type();
        let min_ts = ts_type.create_timestamp(self.min_timestamp);
        let max_ts = ts_type.create_timestamp(self.max_timestamp);

        MemtableStats {
            estimated_bytes: self.estimated_size(),
            time_range: Some((min_ts, max_ts)),
            num_rows: self.num_rows(),
            num_ranges: 1,
            max_sequence: self.max_sequence,
            series_count: self.series_count,
        }
    }
}
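
// Illustrative sketch (assumed usage): wrapping a sorted `BulkPart` in a `MultiBulkPart`
// so it can be scanned without parquet encoding. `sorted_part` and `context` (a
// `BulkIterContextRef` built the same way as in the tests below) are assumed to exist.
//
//     let multi = MultiBulkPart::from_bulk_part(sorted_part);
//     if let Some(iter) = multi.read(context, None, None)? {
//         for batch in iter {
//             let batch = batch?;
//             // consume the ordered record batches here
//         }
//     }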
1376
1377#[cfg(test)]
1378mod tests {
1379    use api::v1::{Row, SemanticType, WriteHint};
1380    use datafusion_common::ScalarValue;
1381    use datatypes::arrow::array::{
1382        BinaryArray, DictionaryArray, Float64Array, TimestampMillisecondArray,
1383    };
1384    use datatypes::arrow::datatypes::UInt32Type;
1385    use datatypes::prelude::{ConcreteDataType, Value};
1386    use datatypes::schema::ColumnSchema;
1387    use mito_codec::row_converter::build_primary_key_codec;
1388    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1389    use store_api::storage::RegionId;
1390    use store_api::storage::consts::ReservedColumnId;
1391    use table::predicate::Predicate;
1392
1393    use super::*;
1394    use crate::memtable::bulk::context::BulkIterContext;
1395    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1396    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
1397
1398    struct MutationInput<'a> {
1399        k0: &'a str,
1400        k1: u32,
1401        timestamps: &'a [i64],
1402        v1: &'a [Option<f64>],
1403        sequence: u64,
1404    }
1405
1406    fn encode(input: &[MutationInput]) -> EncodedBulkPart {
1407        let metadata = metadata_for_test();
1408        let kvs = input
1409            .iter()
1410            .map(|m| {
1411                build_key_values_with_ts_seq_values(
1412                    &metadata,
1413                    m.k0.to_string(),
1414                    m.k1,
1415                    m.timestamps.iter().copied(),
1416                    m.v1.iter().copied(),
1417                    m.sequence,
1418                )
1419            })
1420            .collect::<Vec<_>>();
1421        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1422        let primary_key_codec = build_primary_key_codec(&metadata);
1423        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1424        for kv in kvs {
1425            converter.append_key_values(&kv).unwrap();
1426        }
1427        let part = converter.convert().unwrap();
1428        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1429        encoder.encode_part(&part).unwrap().unwrap()
1430    }
1431
1432    #[test]
1433    fn test_write_and_read_part_projection() {
1434        let part = encode(&[
1435            MutationInput {
1436                k0: "a",
1437                k1: 0,
1438                timestamps: &[1],
1439                v1: &[Some(0.1)],
1440                sequence: 0,
1441            },
1442            MutationInput {
1443                k0: "b",
1444                k1: 0,
1445                timestamps: &[1],
1446                v1: &[Some(0.0)],
1447                sequence: 0,
1448            },
1449            MutationInput {
1450                k0: "a",
1451                k1: 0,
1452                timestamps: &[2],
1453                v1: &[Some(0.2)],
1454                sequence: 1,
1455            },
1456        ]);
1457
1458        let projection = &[4u32];
1459        let reader = part
1460            .read(
1461                Arc::new(
1462                    BulkIterContext::new(
1463                        part.metadata.region_metadata.clone(),
1464                        Some(projection.as_slice()),
1465                        None,
1466                        false,
1467                    )
1468                    .unwrap(),
1469                ),
1470                None,
1471                None,
1472            )
1473            .unwrap()
1474            .expect("expect at least one row group");
1475
1476        let mut total_rows_read = 0;
1477        let mut field: Vec<f64> = vec![];
1478        for res in reader {
1479            let batch = res.unwrap();
1480            assert_eq!(5, batch.num_columns());
1481            field.extend_from_slice(
1482                batch
1483                    .column(0)
1484                    .as_any()
1485                    .downcast_ref::<Float64Array>()
1486                    .unwrap()
1487                    .values(),
1488            );
1489            total_rows_read += batch.num_rows();
1490        }
1491        assert_eq!(3, total_rows_read);
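        // Rows come back sorted by primary key, so both rows of series ("a", 0) precede
        // the row of series ("b", 0), yielding v1 values [0.1, 0.2, 0.0].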
1492        assert_eq!(vec![0.1, 0.2, 0.0], field);
1493    }
1494
1495    fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
1496        let metadata = metadata_for_test();
1497        let kvs = key_values
1498            .into_iter()
1499            .map(|(k0, k1, (start, end), sequence)| {
1500                let ts = start..end;
1501                let v1 = (start..end).map(|_| None);
1502                build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
1503            })
1504            .collect::<Vec<_>>();
1505        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1506        let primary_key_codec = build_primary_key_codec(&metadata);
1507        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1508        for kv in kvs {
1509            converter.append_key_values(&kv).unwrap();
1510        }
1511        let part = converter.convert().unwrap();
1512        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1513        encoder.encode_part(&part).unwrap().unwrap()
1514    }
1515
1516    fn check_prune_row_group(
1517        part: &EncodedBulkPart,
1518        predicate: Option<Predicate>,
1519        expected_rows: usize,
1520    ) {
1521        let context = Arc::new(
1522            BulkIterContext::new(
1523                part.metadata.region_metadata.clone(),
1524                None,
1525                predicate,
1526                false,
1527            )
1528            .unwrap(),
1529        );
1530        let reader = part
1531            .read(context, None, None)
1532            .unwrap()
1533            .expect("expect at least one row group");
1534        let mut total_rows_read = 0;
1535        for res in reader {
1536            let batch = res.unwrap();
1537            total_rows_read += batch.num_rows();
1538        }
1539        // Verify the number of rows that remain after pruning and filtering.
1540        assert_eq!(expected_rows, total_rows_read);
1541    }
1542
1543    #[test]
1544    fn test_prune_row_groups() {
1545        let part = prepare(vec![
1546            ("a", 0, (0, 40), 1),
1547            ("a", 1, (0, 60), 1),
1548            ("b", 0, (0, 100), 2),
1549            ("b", 1, (100, 180), 3),
1550            ("b", 1, (180, 210), 4),
1551        ]);
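        // Per-series row counts: ("a", 0) -> 40, ("a", 1) -> 60, ("b", 0) -> 100,
        // ("b", 1) -> 80 + 30 = 130; 310 rows in total.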
1552
1553        let context = Arc::new(
1554            BulkIterContext::new(
1555                part.metadata.region_metadata.clone(),
1556                None,
1557                Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
1558                    datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
1559                )])),
1560                false,
1561            )
1562            .unwrap(),
1563        );
1564        assert!(part.read(context, None, None).unwrap().is_none());
1565
1566        check_prune_row_group(&part, None, 310);
1567
1568        check_prune_row_group(
1569            &part,
1570            Some(Predicate::new(vec![
1571                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1572                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
1573            ])),
1574            40,
1575        );
1576
1577        check_prune_row_group(
1578            &part,
1579            Some(Predicate::new(vec![
1580                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1581                datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
1582            ])),
1583            60,
1584        );
1585
1586        check_prune_row_group(
1587            &part,
1588            Some(Predicate::new(vec![
1589                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1590            ])),
1591            100,
1592        );
1593
1594        check_prune_row_group(
1595            &part,
1596            Some(Predicate::new(vec![
1597                datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
1598                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
1599            ])),
1600            100,
1601        );
1602
1603        // Predicates over field columns can do precise (row-level) filtering.
1604        check_prune_row_group(
1605            &part,
1606            Some(Predicate::new(vec![
1607                datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64)),
1608            ])),
1609            1,
1610        );
1611    }
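
    // A minimal additional sketch that reuses `prepare` and `check_prune_row_group`,
    // assuming the same precise filtering as the `k0 = "a"` case above: selecting only
    // `k0 = "b"` should read the 100 + 80 + 30 = 210 rows written for that tag value.
    #[test]
    fn test_prune_row_groups_single_tag() {
        let part = prepare(vec![
            ("a", 0, (0, 40), 1),
            ("a", 1, (0, 60), 1),
            ("b", 0, (0, 100), 2),
            ("b", 1, (100, 180), 3),
            ("b", 1, (180, 210), 4),
        ]);

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
            ])),
            210,
        );
    }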
1612
1613    #[test]
1614    fn test_bulk_part_converter_append_and_convert() {
1615        let metadata = metadata_for_test();
1616        let capacity = 100;
1617        let primary_key_codec = build_primary_key_codec(&metadata);
1618        let schema = to_flat_sst_arrow_schema(
1619            &metadata,
1620            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1621        );
1622
1623        let mut converter =
1624            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1625
1626        let key_values1 = build_key_values_with_ts_seq_values(
1627            &metadata,
1628            "key1".to_string(),
1629            1u32,
1630            vec![1000, 2000].into_iter(),
1631            vec![Some(1.0), Some(2.0)].into_iter(),
1632            1,
1633        );
1634
1635        let key_values2 = build_key_values_with_ts_seq_values(
1636            &metadata,
1637            "key2".to_string(),
1638            2u32,
1639            vec![1500].into_iter(),
1640            vec![Some(3.0)].into_iter(),
1641            2,
1642        );
1643
1644        converter.append_key_values(&key_values1).unwrap();
1645        converter.append_key_values(&key_values2).unwrap();
1646
1647        let bulk_part = converter.convert().unwrap();
1648
1649        assert_eq!(bulk_part.num_rows(), 3);
1650        assert_eq!(bulk_part.min_timestamp, 1000);
1651        assert_eq!(bulk_part.max_timestamp, 2000);
1652        assert_eq!(bulk_part.sequence, 2);
1653        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
1654
1655        // Validate primary key columns are stored
1656        // Schema should include primary key columns k0 and k1 at the beginning
1657        let schema = bulk_part.batch.schema();
1658        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1659        assert_eq!(
1660            field_names,
1661            vec![
1662                "k0",
1663                "k1",
1664                "v0",
1665                "v1",
1666                "ts",
1667                "__primary_key",
1668                "__sequence",
1669                "__op_type"
1670            ]
1671        );
1672    }
1673
1674    #[test]
1675    fn test_bulk_part_converter_sorting() {
1676        let metadata = metadata_for_test();
1677        let capacity = 100;
1678        let primary_key_codec = build_primary_key_codec(&metadata);
1679        let schema = to_flat_sst_arrow_schema(
1680            &metadata,
1681            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1682        );
1683
1684        let mut converter =
1685            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1686
1687        let key_values1 = build_key_values_with_ts_seq_values(
1688            &metadata,
1689            "z_key".to_string(),
1690            3u32,
1691            vec![3000].into_iter(),
1692            vec![Some(3.0)].into_iter(),
1693            3,
1694        );
1695
1696        let key_values2 = build_key_values_with_ts_seq_values(
1697            &metadata,
1698            "a_key".to_string(),
1699            1u32,
1700            vec![1000].into_iter(),
1701            vec![Some(1.0)].into_iter(),
1702            1,
1703        );
1704
1705        let key_values3 = build_key_values_with_ts_seq_values(
1706            &metadata,
1707            "m_key".to_string(),
1708            2u32,
1709            vec![2000].into_iter(),
1710            vec![Some(2.0)].into_iter(),
1711            2,
1712        );
1713
1714        converter.append_key_values(&key_values1).unwrap();
1715        converter.append_key_values(&key_values2).unwrap();
1716        converter.append_key_values(&key_values3).unwrap();
1717
1718        let bulk_part = converter.convert().unwrap();
1719
1720        assert_eq!(bulk_part.num_rows(), 3);
1721
1722        let ts_column = bulk_part.batch.column(bulk_part.timestamp_index);
1723        let seq_column = bulk_part.batch.column(bulk_part.batch.num_columns() - 2);
1724
1725        let ts_array = ts_column
1726            .as_any()
1727            .downcast_ref::<TimestampMillisecondArray>()
1728            .unwrap();
1729        let seq_array = seq_column.as_any().downcast_ref::<UInt64Array>().unwrap();
1730
1731        assert_eq!(ts_array.values(), &[1000, 2000, 3000]);
1732        assert_eq!(seq_array.values(), &[1, 2, 3]);
1733
1734        // Validate primary key columns are stored
1735        let schema = bulk_part.batch.schema();
1736        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1737        assert_eq!(
1738            field_names,
1739            vec![
1740                "k0",
1741                "k1",
1742                "v0",
1743                "v1",
1744                "ts",
1745                "__primary_key",
1746                "__sequence",
1747                "__op_type"
1748            ]
1749        );
1750    }
1751
1752    #[test]
1753    fn test_bulk_part_converter_empty() {
1754        let metadata = metadata_for_test();
1755        let capacity = 10;
1756        let primary_key_codec = build_primary_key_codec(&metadata);
1757        let schema = to_flat_sst_arrow_schema(
1758            &metadata,
1759            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1760        );
1761
1762        let converter =
1763            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1764
1765        let bulk_part = converter.convert().unwrap();
1766
1767        assert_eq!(bulk_part.num_rows(), 0);
1768        assert_eq!(bulk_part.min_timestamp, i64::MAX);
1769        assert_eq!(bulk_part.max_timestamp, i64::MIN);
1770        assert_eq!(bulk_part.sequence, SequenceNumber::MIN);
1771
1772        // Validate primary key columns are present in schema even for empty batch
1773        let schema = bulk_part.batch.schema();
1774        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1775        assert_eq!(
1776            field_names,
1777            vec![
1778                "k0",
1779                "k1",
1780                "v0",
1781                "v1",
1782                "ts",
1783                "__primary_key",
1784                "__sequence",
1785                "__op_type"
1786            ]
1787        );
1788    }
1789
1790    #[test]
1791    fn test_bulk_part_converter_without_primary_key_columns() {
1792        let metadata = metadata_for_test();
1793        let primary_key_codec = build_primary_key_codec(&metadata);
1794        let schema = to_flat_sst_arrow_schema(
1795            &metadata,
1796            &FlatSchemaOptions {
1797                raw_pk_columns: false,
1798                string_pk_use_dict: true,
1799            },
1800        );
1801
1802        let capacity = 100;
1803        let mut converter =
1804            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, false);
1805
1806        let key_values1 = build_key_values_with_ts_seq_values(
1807            &metadata,
1808            "key1".to_string(),
1809            1u32,
1810            vec![1000, 2000].into_iter(),
1811            vec![Some(1.0), Some(2.0)].into_iter(),
1812            1,
1813        );
1814
1815        let key_values2 = build_key_values_with_ts_seq_values(
1816            &metadata,
1817            "key2".to_string(),
1818            2u32,
1819            vec![1500].into_iter(),
1820            vec![Some(3.0)].into_iter(),
1821            2,
1822        );
1823
1824        converter.append_key_values(&key_values1).unwrap();
1825        converter.append_key_values(&key_values2).unwrap();
1826
1827        let bulk_part = converter.convert().unwrap();
1828
1829        assert_eq!(bulk_part.num_rows(), 3);
1830        assert_eq!(bulk_part.min_timestamp, 1000);
1831        assert_eq!(bulk_part.max_timestamp, 2000);
1832        assert_eq!(bulk_part.sequence, 2);
1833        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
1834
1835        // Validate primary key columns are NOT stored individually
1836        let schema = bulk_part.batch.schema();
1837        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1838        assert_eq!(
1839            field_names,
1840            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
1841        );
1842    }
1843
1844    #[allow(clippy::too_many_arguments)]
1845    fn build_key_values_with_sparse_encoding(
1846        metadata: &RegionMetadataRef,
1847        primary_key_codec: &Arc<dyn PrimaryKeyCodec>,
1848        table_id: u32,
1849        tsid: u64,
1850        k0: String,
1851        k1: String,
1852        timestamps: impl Iterator<Item = i64>,
1853        values: impl Iterator<Item = Option<f64>>,
1854        sequence: SequenceNumber,
1855    ) -> KeyValues {
1856        // Encode the primary key (__table_id, __tsid, k0, k1) into binary format using the sparse codec
1857        let pk_values = vec![
1858            (ReservedColumnId::table_id(), Value::UInt32(table_id)),
1859            (ReservedColumnId::tsid(), Value::UInt64(tsid)),
1860            (0, Value::String(k0.clone().into())),
1861            (1, Value::String(k1.clone().into())),
1862        ];
1863        let mut encoded_key = Vec::new();
1864        primary_key_codec
1865            .encode_values(&pk_values, &mut encoded_key)
1866            .unwrap();
1867        assert!(!encoded_key.is_empty());
1868
1869        // Create schema for sparse encoding: __primary_key, ts, v0, v1
1870        let column_schema = vec![
1871            api::v1::ColumnSchema {
1872                column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
1873                datatype: api::helper::ColumnDataTypeWrapper::try_from(
1874                    ConcreteDataType::binary_datatype(),
1875                )
1876                .unwrap()
1877                .datatype() as i32,
1878                semantic_type: api::v1::SemanticType::Tag as i32,
1879                ..Default::default()
1880            },
1881            api::v1::ColumnSchema {
1882                column_name: "ts".to_string(),
1883                datatype: api::helper::ColumnDataTypeWrapper::try_from(
1884                    ConcreteDataType::timestamp_millisecond_datatype(),
1885                )
1886                .unwrap()
1887                .datatype() as i32,
1888                semantic_type: api::v1::SemanticType::Timestamp as i32,
1889                ..Default::default()
1890            },
1891            api::v1::ColumnSchema {
1892                column_name: "v0".to_string(),
1893                datatype: api::helper::ColumnDataTypeWrapper::try_from(
1894                    ConcreteDataType::int64_datatype(),
1895                )
1896                .unwrap()
1897                .datatype() as i32,
1898                semantic_type: api::v1::SemanticType::Field as i32,
1899                ..Default::default()
1900            },
1901            api::v1::ColumnSchema {
1902                column_name: "v1".to_string(),
1903                datatype: api::helper::ColumnDataTypeWrapper::try_from(
1904                    ConcreteDataType::float64_datatype(),
1905                )
1906                .unwrap()
1907                .datatype() as i32,
1908                semantic_type: api::v1::SemanticType::Field as i32,
1909                ..Default::default()
1910            },
1911        ];
1912
1913        let rows = timestamps
1914            .zip(values)
1915            .map(|(ts, v)| Row {
1916                values: vec![
1917                    api::v1::Value {
1918                        value_data: Some(api::v1::value::ValueData::BinaryValue(
1919                            encoded_key.clone(),
1920                        )),
1921                    },
1922                    api::v1::Value {
1923                        value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(ts)),
1924                    },
1925                    api::v1::Value {
1926                        value_data: Some(api::v1::value::ValueData::I64Value(ts)),
1927                    },
1928                    api::v1::Value {
1929                        value_data: v.map(api::v1::value::ValueData::F64Value),
1930                    },
1931                ],
1932            })
1933            .collect();
1934
1935        let mutation = api::v1::Mutation {
1936            op_type: 1,
1937            sequence,
1938            rows: Some(api::v1::Rows {
1939                schema: column_schema,
1940                rows,
1941            }),
1942            write_hint: Some(WriteHint {
1943                primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
1944            }),
1945        };
1946        KeyValues::new(metadata.as_ref(), mutation).unwrap()
1947    }
1948
1949    #[test]
1950    fn test_bulk_part_converter_sparse_primary_key_encoding() {
1956        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
1957        builder
1958            .push_column_metadata(ColumnMetadata {
1959                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
1960                semantic_type: SemanticType::Tag,
1961                column_id: 0,
1962            })
1963            .push_column_metadata(ColumnMetadata {
1964                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
1965                semantic_type: SemanticType::Tag,
1966                column_id: 1,
1967            })
1968            .push_column_metadata(ColumnMetadata {
1969                column_schema: ColumnSchema::new(
1970                    "ts",
1971                    ConcreteDataType::timestamp_millisecond_datatype(),
1972                    false,
1973                ),
1974                semantic_type: SemanticType::Timestamp,
1975                column_id: 2,
1976            })
1977            .push_column_metadata(ColumnMetadata {
1978                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
1979                semantic_type: SemanticType::Field,
1980                column_id: 3,
1981            })
1982            .push_column_metadata(ColumnMetadata {
1983                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
1984                semantic_type: SemanticType::Field,
1985                column_id: 4,
1986            })
1987            .primary_key(vec![0, 1])
1988            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
1989        let metadata = Arc::new(builder.build().unwrap());
1990
1991        let primary_key_codec = build_primary_key_codec(&metadata);
1992        let schema = to_flat_sst_arrow_schema(
1993            &metadata,
1994            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1995        );
1996
1997        assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse);
1998        assert_eq!(primary_key_codec.encoding(), PrimaryKeyEncoding::Sparse);
1999
2000        let capacity = 100;
2001        let mut converter =
2002            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec.clone(), true);
2003
2004        let key_values1 = build_key_values_with_sparse_encoding(
2005            &metadata,
2006            &primary_key_codec,
2007            2048u32, // table_id
2008            100u64,  // tsid
2009            "key11".to_string(),
2010            "key21".to_string(),
2011            vec![1000, 2000].into_iter(),
2012            vec![Some(1.0), Some(2.0)].into_iter(),
2013            1,
2014        );
2015
2016        let key_values2 = build_key_values_with_sparse_encoding(
2017            &metadata,
2018            &primary_key_codec,
2019            4096u32, // table_id
2020            200u64,  // tsid
2021            "key12".to_string(),
2022            "key22".to_string(),
2023            vec![1500].into_iter(),
2024            vec![Some(3.0)].into_iter(),
2025            2,
2026        );
2027
2028        converter.append_key_values(&key_values1).unwrap();
2029        converter.append_key_values(&key_values2).unwrap();
2030
2031        let bulk_part = converter.convert().unwrap();
2032
2033        assert_eq!(bulk_part.num_rows(), 3);
2034        assert_eq!(bulk_part.min_timestamp, 1000);
2035        assert_eq!(bulk_part.max_timestamp, 2000);
2036        assert_eq!(bulk_part.sequence, 2);
2037        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
2038
2039        // For sparse encoding, primary key columns should NOT be stored individually
2040        // even when store_primary_key_columns is true, because sparse encoding
2041        // stores the encoded primary key in the __primary_key column
2042        let schema = bulk_part.batch.schema();
2043        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2044        assert_eq!(
2045            field_names,
2046            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2047        );
2048
2049        // Verify the __primary_key column contains encoded sparse keys
2050        let primary_key_column = bulk_part.batch.column_by_name("__primary_key").unwrap();
2051        let dict_array = primary_key_column
2052            .as_any()
2053            .downcast_ref::<DictionaryArray<UInt32Type>>()
2054            .unwrap();
2055
2056        // The dictionary array should not be empty and should have one entry per row
2057        assert!(!dict_array.is_empty());
2058        assert_eq!(dict_array.len(), 3); // 3 rows total
2059
2060        // Verify values are properly encoded binary data (not empty)
2061        let values = dict_array
2062            .values()
2063            .as_any()
2064            .downcast_ref::<BinaryArray>()
2065            .unwrap();
2066        for i in 0..values.len() {
2067            assert!(
2068                !values.value(i).is_empty(),
2069                "Encoded primary key should not be empty"
2070            );
2071        }
2072    }
2073
2074    #[test]
2075    fn test_convert_bulk_part_empty() {
2076        let metadata = metadata_for_test();
2077        let schema = to_flat_sst_arrow_schema(
2078            &metadata,
2079            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2080        );
2081        let primary_key_codec = build_primary_key_codec(&metadata);
2082
2083        // Create empty batch
2084        let empty_batch = RecordBatch::new_empty(schema.clone());
2085        let empty_part = BulkPart {
2086            batch: empty_batch,
2087            max_timestamp: 0,
2088            min_timestamp: 0,
2089            sequence: 0,
2090            timestamp_index: 0,
2091            raw_data: None,
2092        };
2093
2094        let result =
2095            convert_bulk_part(empty_part, &metadata, primary_key_codec, schema, true).unwrap();
2096        assert!(result.is_none());
2097    }
2098
2099    #[test]
2100    fn test_convert_bulk_part_dense_with_pk_columns() {
2101        let metadata = metadata_for_test();
2102        let primary_key_codec = build_primary_key_codec(&metadata);
2103
2104        let k0_array = Arc::new(arrow::array::StringArray::from(vec![
2105            "key1", "key2", "key1",
2106        ]));
2107        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2, 1]));
2108        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200, 300]));
2109        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0, 3.0]));
2110        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000, 1500]));
2111
2112        let input_schema = Arc::new(Schema::new(vec![
2113            Field::new("k0", ArrowDataType::Utf8, false),
2114            Field::new("k1", ArrowDataType::UInt32, false),
2115            Field::new("v0", ArrowDataType::Int64, true),
2116            Field::new("v1", ArrowDataType::Float64, true),
2117            Field::new(
2118                "ts",
2119                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2120                false,
2121            ),
2122        ]));
2123
2124        let input_batch = RecordBatch::try_new(
2125            input_schema,
2126            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2127        )
2128        .unwrap();
2129
2130        let part = BulkPart {
2131            batch: input_batch,
2132            max_timestamp: 2000,
2133            min_timestamp: 1000,
2134            sequence: 5,
2135            timestamp_index: 4,
2136            raw_data: None,
2137        };
2138
2139        let output_schema = to_flat_sst_arrow_schema(
2140            &metadata,
2141            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2142        );
2143
2144        let result = convert_bulk_part(
2145            part,
2146            &metadata,
2147            primary_key_codec,
2148            output_schema,
2149            true, // store primary key columns
2150        )
2151        .unwrap();
2152
2153        let converted = result.unwrap();
2154
2155        assert_eq!(converted.num_rows(), 3);
2156        assert_eq!(converted.max_timestamp, 2000);
2157        assert_eq!(converted.min_timestamp, 1000);
2158        assert_eq!(converted.sequence, 5);
2159
2160        let schema = converted.batch.schema();
2161        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2162        assert_eq!(
2163            field_names,
2164            vec![
2165                "k0",
2166                "k1",
2167                "v0",
2168                "v1",
2169                "ts",
2170                "__primary_key",
2171                "__sequence",
2172                "__op_type"
2173            ]
2174        );
2175
2176        let k0_col = converted.batch.column_by_name("k0").unwrap();
2177        assert!(matches!(
2178            k0_col.data_type(),
2179            ArrowDataType::Dictionary(_, _)
2180        ));
2181
2182        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2183        let dict_array = pk_col
2184            .as_any()
2185            .downcast_ref::<DictionaryArray<UInt32Type>>()
2186            .unwrap();
2187        let keys = dict_array.keys();
2188
2189        assert_eq!(keys.len(), 3);
2190    }
2191
2192    #[test]
2193    fn test_convert_bulk_part_dense_without_pk_columns() {
2194        let metadata = metadata_for_test();
2195        let primary_key_codec = build_primary_key_codec(&metadata);
2196
2197        // Create input batch with primary key columns (k0, k1)
2198        let k0_array = Arc::new(arrow::array::StringArray::from(vec!["key1", "key2"]));
2199        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2]));
2200        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
2201        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
2202        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
2203
2204        let input_schema = Arc::new(Schema::new(vec![
2205            Field::new("k0", ArrowDataType::Utf8, false),
2206            Field::new("k1", ArrowDataType::UInt32, false),
2207            Field::new("v0", ArrowDataType::Int64, true),
2208            Field::new("v1", ArrowDataType::Float64, true),
2209            Field::new(
2210                "ts",
2211                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2212                false,
2213            ),
2214        ]));
2215
2216        let input_batch = RecordBatch::try_new(
2217            input_schema,
2218            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2219        )
2220        .unwrap();
2221
2222        let part = BulkPart {
2223            batch: input_batch,
2224            max_timestamp: 2000,
2225            min_timestamp: 1000,
2226            sequence: 3,
2227            timestamp_index: 4,
2228            raw_data: None,
2229        };
2230
2231        let output_schema = to_flat_sst_arrow_schema(
2232            &metadata,
2233            &FlatSchemaOptions {
2234                raw_pk_columns: false,
2235                string_pk_use_dict: true,
2236            },
2237        );
2238
2239        let result = convert_bulk_part(
2240            part,
2241            &metadata,
2242            primary_key_codec,
2243            output_schema,
2244            false, // don't store primary key columns
2245        )
2246        .unwrap();
2247
2248        let converted = result.unwrap();
2249
2250        assert_eq!(converted.num_rows(), 2);
2251        assert_eq!(converted.max_timestamp, 2000);
2252        assert_eq!(converted.min_timestamp, 1000);
2253        assert_eq!(converted.sequence, 3);
2254
2255        // Verify schema does NOT include individual primary key columns
2256        let schema = converted.batch.schema();
2257        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2258        assert_eq!(
2259            field_names,
2260            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2261        );
2262
2263        // Verify __primary_key column is present and is a dictionary
2264        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2265        assert!(matches!(
2266            pk_col.data_type(),
2267            ArrowDataType::Dictionary(_, _)
2268        ));
2269    }
2270
2271    #[test]
2272    fn test_convert_bulk_part_sparse_encoding() {
2273        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
2274        builder
2275            .push_column_metadata(ColumnMetadata {
2276                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
2277                semantic_type: SemanticType::Tag,
2278                column_id: 0,
2279            })
2280            .push_column_metadata(ColumnMetadata {
2281                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
2282                semantic_type: SemanticType::Tag,
2283                column_id: 1,
2284            })
2285            .push_column_metadata(ColumnMetadata {
2286                column_schema: ColumnSchema::new(
2287                    "ts",
2288                    ConcreteDataType::timestamp_millisecond_datatype(),
2289                    false,
2290                ),
2291                semantic_type: SemanticType::Timestamp,
2292                column_id: 2,
2293            })
2294            .push_column_metadata(ColumnMetadata {
2295                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
2296                semantic_type: SemanticType::Field,
2297                column_id: 3,
2298            })
2299            .push_column_metadata(ColumnMetadata {
2300                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
2301                semantic_type: SemanticType::Field,
2302                column_id: 4,
2303            })
2304            .primary_key(vec![0, 1])
2305            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
2306        let metadata = Arc::new(builder.build().unwrap());
2307
2308        let primary_key_codec = build_primary_key_codec(&metadata);
2309
2310        // Create input batch with a pre-encoded __primary_key column (the binary values are opaque placeholders for sparse-encoded keys)
2311        let pk_array = Arc::new(arrow::array::BinaryArray::from(vec![
2312            b"encoded_key_1".as_slice(),
2313            b"encoded_key_2".as_slice(),
2314        ]));
2315        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
2316        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
2317        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
2318
2319        let input_schema = Arc::new(Schema::new(vec![
2320            Field::new("__primary_key", ArrowDataType::Binary, false),
2321            Field::new("v0", ArrowDataType::Int64, true),
2322            Field::new("v1", ArrowDataType::Float64, true),
2323            Field::new(
2324                "ts",
2325                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2326                false,
2327            ),
2328        ]));
2329
2330        let input_batch =
2331            RecordBatch::try_new(input_schema, vec![pk_array, v0_array, v1_array, ts_array])
2332                .unwrap();
2333
2334        let part = BulkPart {
2335            batch: input_batch,
2336            max_timestamp: 2000,
2337            min_timestamp: 1000,
2338            sequence: 7,
2339            timestamp_index: 3,
2340            raw_data: None,
2341        };
2342
2343        let output_schema = to_flat_sst_arrow_schema(
2344            &metadata,
2345            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2346        );
2347
2348        let result = convert_bulk_part(
2349            part,
2350            &metadata,
2351            primary_key_codec,
2352            output_schema,
2353            true, // store_primary_key_columns (ignored for sparse)
2354        )
2355        .unwrap();
2356
2357        let converted = result.unwrap();
2358
2359        assert_eq!(converted.num_rows(), 2);
2360        assert_eq!(converted.max_timestamp, 2000);
2361        assert_eq!(converted.min_timestamp, 1000);
2362        assert_eq!(converted.sequence, 7);
2363
2364        // Verify schema does NOT include individual primary key columns (sparse encoding)
2365        let schema = converted.batch.schema();
2366        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2367        assert_eq!(
2368            field_names,
2369            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2370        );
2371
2372        // Verify __primary_key is dictionary encoded
2373        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2374        assert!(matches!(
2375            pk_col.data_type(),
2376            ArrowDataType::Dictionary(_, _)
2377        ));
2378    }
2379
2380    #[test]
2381    fn test_convert_bulk_part_sorting_with_multiple_series() {
2382        let metadata = metadata_for_test();
2383        let primary_key_codec = build_primary_key_codec(&metadata);
2384
2385        // Create unsorted batch with multiple series and timestamps
2386        let k0_array = Arc::new(arrow::array::StringArray::from(vec![
2387            "series_b", "series_a", "series_b", "series_a",
2388        ]));
2389        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![2, 1, 2, 1]));
2390        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![200, 100, 400, 300]));
2391        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![2.0, 1.0, 4.0, 3.0]));
2392        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
2393            2000, 1000, 4000, 3000,
2394        ]));
2395
2396        let input_schema = Arc::new(Schema::new(vec![
2397            Field::new("k0", ArrowDataType::Utf8, false),
2398            Field::new("k1", ArrowDataType::UInt32, false),
2399            Field::new("v0", ArrowDataType::Int64, true),
2400            Field::new("v1", ArrowDataType::Float64, true),
2401            Field::new(
2402                "ts",
2403                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2404                false,
2405            ),
2406        ]));
2407
2408        let input_batch = RecordBatch::try_new(
2409            input_schema,
2410            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2411        )
2412        .unwrap();
2413
2414        let part = BulkPart {
2415            batch: input_batch,
2416            max_timestamp: 4000,
2417            min_timestamp: 1000,
2418            sequence: 10,
2419            timestamp_index: 4,
2420            raw_data: None,
2421        };
2422
2423        let output_schema = to_flat_sst_arrow_schema(
2424            &metadata,
2425            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2426        );
2427
2428        let result =
2429            convert_bulk_part(part, &metadata, primary_key_codec, output_schema, true).unwrap();
2430
2431        let converted = result.unwrap();
2432
2433        assert_eq!(converted.num_rows(), 4);
2434
2435        // Verify data is sorted by (primary_key, timestamp, sequence desc)
2436        let ts_col = converted.batch.column(converted.timestamp_index);
2437        let ts_array = ts_col
2438            .as_any()
2439            .downcast_ref::<TimestampMillisecondArray>()
2440            .unwrap();
2441
2442        // After sorting by (pk, ts), we should have:
2443        // series_a,1: ts=1000, 3000
2444        // series_b,2: ts=2000, 4000
2445        let timestamps: Vec<i64> = ts_array.values().to_vec();
2446        assert_eq!(timestamps, vec![1000, 3000, 2000, 4000]);
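
        // Illustrative follow-up check, assuming field values follow their rows under the
        // same sort permutation as the timestamps above: v0 should be reordered to
        // [100, 300, 200, 400].
        let v0_array = converted
            .batch
            .column_by_name("v0")
            .unwrap()
            .as_any()
            .downcast_ref::<arrow::array::Int64Array>()
            .unwrap();
        assert_eq!(v0_array.values(), &[100, 300, 200, 400]);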
2447    }
2448}