mito2/memtable/bulk/part.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Bulk part encoder/decoder.
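//!
//! This module provides:
//!
//! - [BulkPart]: an in-memory bulk write part backed by an Arrow record batch.
//! - [BulkPartConverter]: builds a sorted [BulkPart] from [KeyValues].
//! - [BulkPartEncoder] and [EncodedBulkPart]: encode a part into Parquet bytes and read it back.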

use std::collections::VecDeque;
use std::sync::Arc;

use api::helper::{value_to_grpc_value, ColumnDataTypeWrapper};
use api::v1::bulk_wal_entry::Body;
use api::v1::{bulk_wal_entry, ArrowIpc, BulkWalEntry, Mutation, OpType};
use bytes::Bytes;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_recordbatch::DfRecordBatch as RecordBatch;
use common_time::timestamp::TimeUnit;
use datatypes::arrow;
use datatypes::arrow::array::{
    Array, ArrayRef, BinaryBuilder, BinaryDictionaryBuilder, DictionaryArray, StringBuilder,
    StringDictionaryBuilder, TimestampMicrosecondArray, TimestampMillisecondArray,
    TimestampNanosecondArray, TimestampSecondArray, UInt32Array, UInt64Array, UInt64Builder,
    UInt8Array, UInt8Builder,
};
use datatypes::arrow::compute::{SortColumn, SortOptions, TakeOptions};
use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
use datatypes::arrow_array::BinaryArray;
use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::Helper;
use mito_codec::key_values::{KeyValue, KeyValues, KeyValuesRef};
use mito_codec::row_converter::{
    build_primary_key_codec, DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt,
};
use parquet::arrow::ArrowWriter;
use parquet::data_type::AsBytes;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use snafu::{OptionExt, ResultExt, Snafu};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
use store_api::storage::SequenceNumber;
use table::predicate::Predicate;

use crate::error::{
    self, ColumnNotFoundSnafu, ComputeArrowSnafu, DataTypeMismatchSnafu, EncodeMemtableSnafu,
    EncodeSnafu, NewRecordBatchSnafu, Result,
};
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
use crate::memtable::time_series::{ValueBuilder, Values};
use crate::memtable::BoxedRecordBatchIterator;
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::to_sst_arrow_schema;

const INIT_DICT_VALUE_CAPACITY: usize = 8;

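/// An in-memory bulk write part backed by an Arrow [RecordBatch], along with its
/// timestamp range, max sequence number and the index of its time index column.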
#[derive(Clone)]
pub struct BulkPart {
    pub batch: RecordBatch,
    pub max_ts: i64,
    pub min_ts: i64,
    pub sequence: u64,
    pub timestamp_index: usize,
    pub raw_data: Option<ArrowIpc>,
}

impl TryFrom<BulkWalEntry> for BulkPart {
    type Error = error::Error;

    fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
        match value.body.expect("Entry payload should be present") {
            Body::ArrowIpc(ipc) => {
                let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
                    .context(error::ConvertBulkWalEntrySnafu)?;
                let batch = decoder
                    .try_decode_record_batch(&ipc.data_header, &ipc.payload)
                    .context(error::ConvertBulkWalEntrySnafu)?;
                Ok(Self {
                    batch,
                    max_ts: value.max_ts,
                    min_ts: value.min_ts,
                    sequence: value.sequence,
                    timestamp_index: value.timestamp_index as usize,
                    raw_data: Some(ipc),
                })
            }
        }
    }
}

impl TryFrom<&BulkPart> for BulkWalEntry {
    type Error = error::Error;

    fn try_from(value: &BulkPart) -> Result<Self> {
        if let Some(ipc) = &value.raw_data {
            Ok(BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_ts,
                min_ts: value.min_ts,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ipc.clone())),
            })
        } else {
            let mut encoder = FlightEncoder::default();
            let schema_bytes = encoder
                .encode_schema(value.batch.schema().as_ref())
                .data_header;
            let [rb_data] = encoder
                .encode(FlightMessage::RecordBatch(value.batch.clone()))
                .try_into()
                .map_err(|_| {
                    error::UnsupportedOperationSnafu {
                        err_msg: "create BulkWalEntry from RecordBatch with dictionary arrays",
                    }
                    .build()
                })?;
            Ok(BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_ts,
                min_ts: value.min_ts,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ArrowIpc {
                    schema: schema_bytes,
                    data_header: rb_data.data_header,
                    payload: rb_data.data_body,
                })),
            })
        }
    }
}

impl BulkPart {
    pub(crate) fn estimated_size(&self) -> usize {
        self.batch
            .columns()
            .iter()
            // If the slice memory size cannot be obtained, assume 0 here.
            .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
            .sum()
    }

    /// Returns the estimated series count in this BulkPart.
    /// This is calculated from the dictionary values count of the PrimaryKeyArray.
    pub fn estimated_series_count(&self) -> usize {
        let pk_column_idx = primary_key_column_index(self.batch.num_columns());
        let pk_column = self.batch.column(pk_column_idx);
        if let Some(dict_array) = pk_column.as_any().downcast_ref::<PrimaryKeyArray>() {
            dict_array.values().len()
        } else {
            0
        }
    }

    /// Converts [BulkPart] to [Mutation] for the fallback `write_bulk` implementation.
    pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
        let vectors = region_metadata
            .schema
            .column_schemas()
            .iter()
            .map(|col| match self.batch.column_by_name(&col.name) {
                None => Ok(None),
                Some(col) => Helper::try_into_vector(col).map(Some),
            })
            .collect::<datatypes::error::Result<Vec<_>>>()
            .context(error::ComputeVectorSnafu)?;

        let rows = (0..self.num_rows())
            .map(|row_idx| {
                let values = (0..self.batch.num_columns())
                    .map(|col_idx| {
                        if let Some(v) = &vectors[col_idx] {
                            value_to_grpc_value(v.get(row_idx))
                        } else {
                            api::v1::Value { value_data: None }
                        }
                    })
                    .collect::<Vec<_>>();
                api::v1::Row { values }
            })
            .collect::<Vec<_>>();

        let schema = region_metadata
            .column_metadatas
            .iter()
            .map(|c| {
                let data_type_wrapper =
                    ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
                Ok(api::v1::ColumnSchema {
                    column_name: c.column_schema.name.clone(),
                    datatype: data_type_wrapper.datatype() as i32,
                    semantic_type: c.semantic_type as i32,
                    ..Default::default()
                })
            })
            .collect::<api::error::Result<Vec<_>>>()
            .context(error::ConvertColumnDataTypeSnafu {
                reason: "failed to convert region metadata to column schema",
            })?;

        let rows = api::v1::Rows { schema, rows };

        Ok(Mutation {
            op_type: OpType::Put as i32,
            sequence: self.sequence,
            rows: Some(rows),
            write_hint: None,
        })
    }

    pub fn timestamps(&self) -> &ArrayRef {
        self.batch.column(self.timestamp_index)
    }

    pub fn num_rows(&self) -> usize {
        self.batch.num_rows()
    }
}

/// Primary key column builder for handling strings specially.
enum PrimaryKeyColumnBuilder {
    /// String dictionary builder for string types.
    StringDict(StringDictionaryBuilder<UInt32Type>),
    /// Generic mutable vector for other types.
    Vector(Box<dyn MutableVector>),
}

impl PrimaryKeyColumnBuilder {
    /// Appends a value to the builder.
    fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
        match self {
            PrimaryKeyColumnBuilder::StringDict(builder) => {
                if let Some(s) = value.as_string().context(DataTypeMismatchSnafu)? {
                    // We know the value is a string.
                    builder.append_value(s);
                } else {
                    builder.append_null();
                }
            }
            PrimaryKeyColumnBuilder::Vector(builder) => {
                builder.push_value_ref(value);
            }
        }
        Ok(())
    }

    /// Converts the builder to an ArrayRef.
    fn into_arrow_array(self) -> ArrayRef {
        match self {
            PrimaryKeyColumnBuilder::StringDict(mut builder) => Arc::new(builder.finish()),
            PrimaryKeyColumnBuilder::Vector(mut builder) => builder.to_vector().to_arrow_array(),
        }
    }
}

/// Converter that builds a sorted [BulkPart] from [KeyValues].
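///
/// A rough usage sketch (illustrative only; `metadata`, `schema` and `key_values` are
/// assumed to be prepared by the caller):
///
/// ```ignore
/// let codec = build_primary_key_codec(&metadata);
/// let mut converter = BulkPartConverter::new(&metadata, schema, 1024, codec, true);
/// converter.append_key_values(&key_values)?;
/// let part = converter.convert()?;
/// ```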
pub struct BulkPartConverter {
    /// Region metadata.
    region_metadata: RegionMetadataRef,
    /// Schema of the converted batch.
    schema: SchemaRef,
    /// Primary key codec for encoding keys
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
    /// Buffer for encoding primary key.
    key_buf: Vec<u8>,
    /// Primary key array builder.
    key_array_builder: PrimaryKeyArrayBuilder,
    /// Builders for non-primary key columns.
    value_builder: ValueBuilder,
    /// Builders for individual primary key columns.
    /// The order of builders is the same as the order of primary key columns in the region metadata.
    primary_key_column_builders: Vec<PrimaryKeyColumnBuilder>,

    /// Max timestamp value.
    max_ts: i64,
    /// Min timestamp value.
    min_ts: i64,
    /// Max sequence number.
    max_sequence: SequenceNumber,
}

impl BulkPartConverter {
    /// Creates a new converter.
    ///
    /// If `store_primary_key_columns` is true and the encoding is not sparse, the converter
    /// additionally stores each primary key column as an individual array.
    pub fn new(
        region_metadata: &RegionMetadataRef,
        schema: SchemaRef,
        capacity: usize,
        primary_key_codec: Arc<dyn PrimaryKeyCodec>,
        store_primary_key_columns: bool,
    ) -> Self {
        debug_assert_eq!(
            region_metadata.primary_key_encoding,
            primary_key_codec.encoding()
        );

        let primary_key_column_builders = if store_primary_key_columns
            && region_metadata.primary_key_encoding != PrimaryKeyEncoding::Sparse
        {
            new_primary_key_column_builders(region_metadata, capacity)
        } else {
            Vec::new()
        };

        Self {
            region_metadata: region_metadata.clone(),
            schema,
            primary_key_codec,
            key_buf: Vec::new(),
            key_array_builder: PrimaryKeyArrayBuilder::new(),
            value_builder: ValueBuilder::new(region_metadata, capacity),
            primary_key_column_builders,
            min_ts: i64::MAX,
            max_ts: i64::MIN,
            max_sequence: SequenceNumber::MIN,
        }
    }

    /// Appends a [KeyValues] into the converter.
    pub fn append_key_values(&mut self, key_values: &KeyValues) -> Result<()> {
        for kv in key_values.iter() {
            self.append_key_value(&kv)?;
        }

        Ok(())
    }

    /// Appends a [KeyValue] to builders.
    ///
    /// If the primary key uses sparse encoding, callers must encode the primary key in the [KeyValue].
    fn append_key_value(&mut self, kv: &KeyValue) -> Result<()> {
        // Handles primary key based on encoding type
        if self.primary_key_codec.encoding() == PrimaryKeyEncoding::Sparse {
            // For sparse encoding, the primary key is already encoded in the KeyValue
            // Gets the first (and only) primary key value which contains the encoded key
            let mut primary_keys = kv.primary_keys();
            if let Some(encoded) = primary_keys
                .next()
                .context(ColumnNotFoundSnafu {
                    column: PRIMARY_KEY_COLUMN_NAME,
                })?
                .as_binary()
                .context(DataTypeMismatchSnafu)?
            {
                self.key_array_builder
                    .append(encoded)
                    .context(ComputeArrowSnafu)?;
            } else {
                self.key_array_builder
                    .append("")
                    .context(ComputeArrowSnafu)?;
            }
        } else {
            // For dense encoding, we need to encode the primary key columns
            self.key_buf.clear();
            self.primary_key_codec
                .encode_key_value(kv, &mut self.key_buf)
                .context(EncodeSnafu)?;
            self.key_array_builder
                .append(&self.key_buf)
                .context(ComputeArrowSnafu)?;
        };

        // If storing primary key columns, append values to individual builders
        if !self.primary_key_column_builders.is_empty() {
            for (builder, pk_value) in self
                .primary_key_column_builders
                .iter_mut()
                .zip(kv.primary_keys())
            {
                builder.push_value_ref(pk_value)?;
            }
        }

        // Pushes other columns.
        self.value_builder.push(
            kv.timestamp(),
            kv.sequence(),
            kv.op_type() as u8,
            kv.fields(),
        );

        // Updates statistics
        // Safety: timestamp of kv must be both present and a valid timestamp value.
        let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
        self.min_ts = self.min_ts.min(ts);
        self.max_ts = self.max_ts.max(ts);
        self.max_sequence = self.max_sequence.max(kv.sequence());

        Ok(())
    }

    /// Converts buffered content into a [BulkPart].
    ///
    /// It sorts the record batch by (primary key, timestamp, sequence desc).
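    ///
    /// The output batch lays columns out as: stored primary key columns (if any),
    /// field columns, the time index, then `__primary_key`, `__sequence` and `__op_type`.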
    pub fn convert(mut self) -> Result<BulkPart> {
        let values = Values::from(self.value_builder);
        let mut columns =
            Vec::with_capacity(4 + values.fields.len() + self.primary_key_column_builders.len());

        // Build primary key column arrays if enabled.
        for builder in self.primary_key_column_builders {
            columns.push(builder.into_arrow_array());
        }
        // Then fields columns.
        columns.extend(values.fields.iter().map(|field| field.to_arrow_array()));
        // Time index.
        let timestamp_index = columns.len();
        columns.push(values.timestamp.to_arrow_array());
        // Primary key.
        let pk_array = self.key_array_builder.finish();
        columns.push(Arc::new(pk_array));
        // Sequence and op type.
        columns.push(values.sequence.to_arrow_array());
        columns.push(values.op_type.to_arrow_array());

        let batch = RecordBatch::try_new(self.schema, columns).context(NewRecordBatchSnafu)?;
        // Sorts the record batch.
        let batch = sort_primary_key_record_batch(&batch)?;

        Ok(BulkPart {
            batch,
            max_ts: self.max_ts,
            min_ts: self.min_ts,
            sequence: self.max_sequence,
            timestamp_index,
            raw_data: None,
        })
    }
}

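/// Creates one builder per primary key column, using a string dictionary builder for
/// string columns and a generic mutable vector for all other types.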
fn new_primary_key_column_builders(
    metadata: &RegionMetadata,
    capacity: usize,
) -> Vec<PrimaryKeyColumnBuilder> {
    metadata
        .primary_key_columns()
        .map(|col| {
            if col.column_schema.data_type.is_string() {
                PrimaryKeyColumnBuilder::StringDict(StringDictionaryBuilder::with_capacity(
                    capacity,
                    INIT_DICT_VALUE_CAPACITY,
                    capacity,
                ))
            } else {
                PrimaryKeyColumnBuilder::Vector(
                    col.column_schema.data_type.create_mutable_vector(capacity),
                )
            }
        })
        .collect()
}

/// Sorts the record batch with primary key format.
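///
/// Assumes the layout produced by [BulkPartConverter::convert], where the last four
/// columns are the time index, `__primary_key`, `__sequence` and `__op_type`, and sorts
/// rows by (primary key ASC, time index ASC, sequence DESC).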
fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
    let total_columns = batch.num_columns();
    let sort_columns = vec![
        // Primary key column (ascending)
        SortColumn {
            values: batch.column(total_columns - 3).clone(),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
        // Time index column (ascending)
        SortColumn {
            values: batch.column(total_columns - 4).clone(),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
        // Sequence column (descending)
        SortColumn {
            values: batch.column(total_columns - 2).clone(),
            options: Some(SortOptions {
                descending: true,
                nulls_first: true,
            }),
        },
    ];

    let indices = datatypes::arrow::compute::lexsort_to_indices(&sort_columns, None)
        .context(ComputeArrowSnafu)?;

    datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
}

#[derive(Debug, Clone)]
pub struct EncodedBulkPart {
    data: Bytes,
    metadata: BulkPartMeta,
}

impl EncodedBulkPart {
    pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
        Self { data, metadata }
    }

    pub(crate) fn metadata(&self) -> &BulkPartMeta {
        &self.metadata
    }

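    /// Reads the encoded part, using the context's predicate to prune row groups first.
    ///
    /// Returns `Ok(None)` when all row groups are filtered out.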
    pub(crate) fn read(
        &self,
        context: BulkIterContextRef,
        sequence: Option<SequenceNumber>,
    ) -> Result<Option<BoxedRecordBatchIterator>> {
        // use predicate to find row groups to read.
        let row_groups_to_read = context.row_groups_to_read(&self.metadata.parquet_metadata);

        if row_groups_to_read.is_empty() {
            // All row groups are filtered.
            return Ok(None);
        }

        let iter = EncodedBulkPartIter::try_new(
            context,
            row_groups_to_read,
            self.metadata.parquet_metadata.clone(),
            self.data.clone(),
            sequence,
        )?;
        Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
    }
}

#[derive(Debug, Clone)]
pub struct BulkPartMeta {
    /// Total rows in part.
    pub num_rows: usize,
    /// Max timestamp in part.
    pub max_timestamp: i64,
    /// Min timestamp in part.
    pub min_timestamp: i64,
    /// Part file metadata.
    pub parquet_metadata: Arc<ParquetMetaData>,
    /// Part region schema.
    pub region_metadata: RegionMetadataRef,
}

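/// Encoder that writes a [BulkPart] into Parquet-encoded bytes as an [EncodedBulkPart].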
pub struct BulkPartEncoder {
    metadata: RegionMetadataRef,
    row_group_size: usize,
    writer_props: Option<WriterProperties>,
}

impl BulkPartEncoder {
    pub(crate) fn new(metadata: RegionMetadataRef, row_group_size: usize) -> BulkPartEncoder {
        let writer_props = Some(
            WriterProperties::builder()
                .set_write_batch_size(row_group_size)
                .set_max_row_group_size(row_group_size)
                .build(),
        );
        Self {
            metadata,
            row_group_size,
            writer_props,
        }
    }
}

impl BulkPartEncoder {
    /// Encodes a bulk part into an [EncodedBulkPart], returning `None` when the part is empty.
    fn encode_part(&self, part: &BulkPart) -> Result<Option<EncodedBulkPart>> {
        if part.batch.num_rows() == 0 {
            return Ok(None);
        }

        let mut buf = Vec::with_capacity(4096);
        let arrow_schema = part.batch.schema();

        let file_metadata = {
            let mut writer =
                ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
                    .context(EncodeMemtableSnafu)?;
            writer.write(&part.batch).context(EncodeMemtableSnafu)?;
            writer.finish().context(EncodeMemtableSnafu)?
        };

        let buf = Bytes::from(buf);
        let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);

        Ok(Some(EncodedBulkPart {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: part.batch.num_rows(),
                max_timestamp: part.max_ts,
                min_timestamp: part.min_ts,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
            },
        }))
    }
}

/// Converts mutations to record batches.
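///
/// Returns the sorted record batch together with its min and max timestamps, or `None`
/// when the mutations contain no rows. When `dedup` is true, rows with the same primary
/// key and timestamp are deduplicated, keeping the one with the largest sequence.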
fn mutations_to_record_batch(
    mutations: &[Mutation],
    metadata: &RegionMetadataRef,
    pk_encoder: &DensePrimaryKeyCodec,
    dedup: bool,
) -> Result<Option<(RecordBatch, i64, i64)>> {
    let total_rows: usize = mutations
        .iter()
        .map(|m| m.rows.as_ref().map(|r| r.rows.len()).unwrap_or(0))
        .sum();

    if total_rows == 0 {
        return Ok(None);
    }

    let mut pk_builder = BinaryBuilder::with_capacity(total_rows, 0);

    let mut ts_vector: Box<dyn MutableVector> = metadata
        .time_index_column()
        .column_schema
        .data_type
        .create_mutable_vector(total_rows);
    let mut sequence_builder = UInt64Builder::with_capacity(total_rows);
    let mut op_type_builder = UInt8Builder::with_capacity(total_rows);

    let mut field_builders: Vec<Box<dyn MutableVector>> = metadata
        .field_columns()
        .map(|f| f.column_schema.data_type.create_mutable_vector(total_rows))
        .collect();

    let mut pk_buffer = vec![];
    for m in mutations {
        let Some(key_values) = KeyValuesRef::new(metadata, m) else {
            continue;
        };

        for row in key_values.iter() {
            pk_buffer.clear();
            pk_encoder
                .encode_to_vec(row.primary_keys(), &mut pk_buffer)
                .context(EncodeSnafu)?;
            pk_builder.append_value(pk_buffer.as_bytes());
            ts_vector.push_value_ref(row.timestamp());
            sequence_builder.append_value(row.sequence());
            op_type_builder.append_value(row.op_type() as u8);
            for (builder, field) in field_builders.iter_mut().zip(row.fields()) {
                builder.push_value_ref(field);
            }
        }
    }

    let arrow_schema = to_sst_arrow_schema(metadata);
    // safety: timestamp column must be valid, and values must not be None.
    let timestamp_unit = metadata
        .time_index_column()
        .column_schema
        .data_type
        .as_timestamp()
        .unwrap()
        .unit();
    let sorter = ArraysSorter {
        encoded_primary_keys: pk_builder.finish(),
        timestamp_unit,
        timestamp: ts_vector.to_vector().to_arrow_array(),
        sequence: sequence_builder.finish(),
        op_type: op_type_builder.finish(),
        fields: field_builders
            .iter_mut()
            .map(|f| f.to_vector().to_arrow_array()),
        dedup,
        arrow_schema,
    };

    sorter.sort().map(Some)
}

struct ArraysSorter<I> {
    encoded_primary_keys: BinaryArray,
    timestamp_unit: TimeUnit,
    timestamp: ArrayRef,
    sequence: UInt64Array,
    op_type: UInt8Array,
    fields: I,
    dedup: bool,
    arrow_schema: SchemaRef,
}

impl<I> ArraysSorter<I>
where
    I: Iterator<Item = ArrayRef>,
{
    /// Converts arrays to record batch.
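    ///
    /// Rows are ordered by (primary key ASC, timestamp ASC, sequence DESC) and the encoded
    /// primary keys are converted into a dictionary array.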
    fn sort(self) -> Result<(RecordBatch, i64, i64)> {
        debug_assert!(!self.timestamp.is_empty());
        debug_assert!(self.timestamp.len() == self.sequence.len());
        debug_assert!(self.timestamp.len() == self.op_type.len());
        debug_assert!(self.timestamp.len() == self.encoded_primary_keys.len());

        let timestamp_iter = timestamp_array_to_iter(self.timestamp_unit, &self.timestamp);
        let (mut min_timestamp, mut max_timestamp) = (i64::MAX, i64::MIN);
        let mut to_sort = self
            .encoded_primary_keys
            .iter()
            .zip(timestamp_iter)
            .zip(self.sequence.iter())
            .map(|((pk, timestamp), sequence)| {
                max_timestamp = max_timestamp.max(*timestamp);
                min_timestamp = min_timestamp.min(*timestamp);
                (pk, timestamp, sequence)
            })
            .enumerate()
            .collect::<Vec<_>>();

        to_sort.sort_unstable_by(|(_, (l_pk, l_ts, l_seq)), (_, (r_pk, r_ts, r_seq))| {
            l_pk.cmp(r_pk)
                .then(l_ts.cmp(r_ts))
                .then(l_seq.cmp(r_seq).reverse())
        });

        if self.dedup {
            // Dedup by primary key and timestamp, ignoring the sequence.
            to_sort.dedup_by(|(_, (l_pk, l_ts, _)), (_, (r_pk, r_ts, _))| {
                l_pk == r_pk && l_ts == r_ts
            });
        }

        let indices = UInt32Array::from_iter_values(to_sort.iter().map(|v| v.0 as u32));

        let pk_dictionary = Arc::new(binary_array_to_dictionary(
            // safety: pk must be BinaryArray
            arrow::compute::take(
                &self.encoded_primary_keys,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap(),
        )?) as ArrayRef;

        let mut arrays = Vec::with_capacity(self.arrow_schema.fields.len());
        for arr in self.fields {
            arrays.push(
                arrow::compute::take(
                    &arr,
                    &indices,
                    Some(TakeOptions {
                        check_bounds: false,
                    }),
                )
                .context(ComputeArrowSnafu)?,
            );
        }

        let timestamp = arrow::compute::take(
            &self.timestamp,
            &indices,
            Some(TakeOptions {
                check_bounds: false,
            }),
        )
        .context(ComputeArrowSnafu)?;

        arrays.push(timestamp);
        arrays.push(pk_dictionary);
        arrays.push(
            arrow::compute::take(
                &self.sequence,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?,
        );

        arrays.push(
            arrow::compute::take(
                &self.op_type,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?,
        );

        let batch = RecordBatch::try_new(self.arrow_schema, arrays).context(NewRecordBatchSnafu)?;
        Ok((batch, min_timestamp, max_timestamp))
    }
}

/// Converts timestamp array to an iter of i64 values.
fn timestamp_array_to_iter(
    timestamp_unit: TimeUnit,
    timestamp: &ArrayRef,
) -> impl Iterator<Item = &i64> {
    match timestamp_unit {
        // safety: timestamp column must be valid.
        TimeUnit::Second => timestamp
            .as_any()
            .downcast_ref::<TimestampSecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Millisecond => timestamp
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Microsecond => timestamp
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Nanosecond => timestamp
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap()
            .values()
            .iter(),
    }
}

/// Converts a **sorted** [BinaryArray] to [DictionaryArray].
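///
/// For example (mirroring the unit tests below), a sorted input of `["a", "a", "b"]`
/// produces keys `[0, 0, 1]` with dictionary values `["a", "b"]`.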
fn binary_array_to_dictionary(input: &BinaryArray) -> Result<PrimaryKeyArray> {
    if input.is_empty() {
        return Ok(DictionaryArray::new(
            UInt32Array::from(Vec::<u32>::new()),
            Arc::new(BinaryArray::from_vec(vec![])) as ArrayRef,
        ));
    }
    let mut keys = Vec::with_capacity(16);
    let mut values = BinaryBuilder::new();
    let mut prev: usize = 0;
    keys.push(prev as u32);
    values.append_value(input.value(prev));

    for current_bytes in input.iter().skip(1) {
        // safety: encoded pk must be present.
        let current_bytes = current_bytes.unwrap();
        let prev_bytes = input.value(prev);
        if current_bytes != prev_bytes {
            values.append_value(current_bytes);
            prev += 1;
        }
        keys.push(prev as u32);
    }

    Ok(DictionaryArray::new(
        UInt32Array::from(keys),
        Arc::new(values.finish()) as ArrayRef,
    ))
}

#[cfg(test)]
mod tests {
    use std::collections::VecDeque;

    use api::v1::{Row, WriteHint};
    use datafusion_common::ScalarValue;
    use datatypes::arrow::array::Float64Array;
    use datatypes::prelude::{ConcreteDataType, ScalarVector, Value};
    use datatypes::vectors::{Float64Vector, TimestampMillisecondVector};
    use store_api::storage::consts::ReservedColumnId;

    use super::*;
    use crate::memtable::bulk::context::BulkIterContext;
    use crate::sst::parquet::format::{PrimaryKeyReadFormat, ReadFormat};
    use crate::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions};
    use crate::test_util::memtable_util::{
        build_key_values_with_ts_seq_values, metadata_for_test, region_metadata_to_row_schema,
    };

    fn check_binary_array_to_dictionary(
        input: &[&[u8]],
        expected_keys: &[u32],
        expected_values: &[&[u8]],
    ) {
        let input = BinaryArray::from_iter_values(input.iter());
        let array = binary_array_to_dictionary(&input).unwrap();
        assert_eq!(
            &expected_keys,
            &array.keys().iter().map(|v| v.unwrap()).collect::<Vec<_>>()
        );
        assert_eq!(
            expected_values,
            &array
                .values()
                .as_any()
                .downcast_ref::<BinaryArray>()
                .unwrap()
                .iter()
                .map(|v| v.unwrap())
                .collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_binary_array_to_dictionary() {
        check_binary_array_to_dictionary(&[], &[], &[]);

        check_binary_array_to_dictionary(&["a".as_bytes()], &[0], &["a".as_bytes()]);

        check_binary_array_to_dictionary(
            &["a".as_bytes(), "a".as_bytes()],
            &[0, 0],
            &["a".as_bytes()],
        );

        check_binary_array_to_dictionary(
            &["a".as_bytes(), "a".as_bytes(), "b".as_bytes()],
            &[0, 0, 1],
            &["a".as_bytes(), "b".as_bytes()],
        );

        check_binary_array_to_dictionary(
            &[
                "a".as_bytes(),
                "a".as_bytes(),
                "b".as_bytes(),
                "c".as_bytes(),
            ],
            &[0, 0, 1, 2],
            &["a".as_bytes(), "b".as_bytes(), "c".as_bytes()],
        );
    }

    struct MutationInput<'a> {
        k0: &'a str,
        k1: u32,
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
        sequence: u64,
    }

    #[derive(Debug, PartialOrd, PartialEq)]
    struct BatchOutput<'a> {
        pk_values: &'a [Value],
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
    }

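    /// Converts `input` mutations to a record batch, then checks the decoded batches
    /// against `expected` and the returned (min, max) timestamps against `expected_timestamp`.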
    fn check_mutations_to_record_batches(
        input: &[MutationInput],
        expected: &[BatchOutput],
        expected_timestamp: (i64, i64),
        dedup: bool,
    ) {
        let metadata = metadata_for_test();
        let mutations = input
            .iter()
            .map(|m| {
                build_key_values_with_ts_seq_values(
                    &metadata,
                    m.k0.to_string(),
                    m.k1,
                    m.timestamps.iter().copied(),
                    m.v1.iter().copied(),
                    m.sequence,
                )
                .mutation
            })
            .collect::<Vec<_>>();
        let total_rows: usize = mutations
            .iter()
            .flat_map(|m| m.rows.iter())
            .map(|r| r.rows.len())
            .sum();

        let pk_encoder = DensePrimaryKeyCodec::new(&metadata);

        let (batch, min_ts, max_ts) =
            mutations_to_record_batch(&mutations, &metadata, &pk_encoder, dedup)
                .unwrap()
                .unwrap();
        assert_eq!(expected_timestamp, (min_ts, max_ts));
        let read_format = PrimaryKeyReadFormat::new_with_all_columns(metadata.clone());
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&batch, None, &mut batches)
            .unwrap();
        if !dedup {
            assert_eq!(
                total_rows,
                batches.iter().map(|b| { b.num_rows() }).sum::<usize>()
            );
        }
        let batch_values = batches
            .into_iter()
            .map(|b| {
                let pk_values = pk_encoder.decode(b.primary_key()).unwrap().into_dense();
                let timestamps = b
                    .timestamps()
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap().0.value())
                    .collect::<Vec<_>>();
                let float_values = b.fields()[1]
                    .data
                    .as_any()
                    .downcast_ref::<Float64Vector>()
                    .unwrap()
                    .iter_data()
                    .collect::<Vec<_>>();

                (pk_values, timestamps, float_values)
            })
            .collect::<Vec<_>>();
        assert_eq!(expected.len(), batch_values.len());

        for idx in 0..expected.len() {
            assert_eq!(expected[idx].pk_values, &batch_values[idx].0);
            assert_eq!(expected[idx].timestamps, &batch_values[idx].1);
            assert_eq!(expected[idx].v1, &batch_values[idx].2);
        }
    }

    #[test]
    fn test_mutations_to_record_batch() {
        check_mutations_to_record_batches(
            &[MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[0],
                v1: &[Some(0.1)],
                sequence: 0,
            }],
            &[BatchOutput {
                pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                timestamps: &[0],
                v1: &[Some(0.1)],
            }],
            (0, 0),
            true,
        );

        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[1],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
                MutationInput {
                    k0: "a",
                    k1: 1,
                    timestamps: &[1],
                    v1: &[Some(0.3)],
                    sequence: 2,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0, 1],
                    v1: &[Some(0.1), Some(0.2)],
                },
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(1)],
                    timestamps: &[1],
                    v1: &[Some(0.3)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 1),
            true,
        );

        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 0),
            true,
        );
        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0, 0],
                    v1: &[Some(0.2), Some(0.1)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 0),
            false,
        );
    }

    fn encode(input: &[MutationInput]) -> EncodedBulkPart {
        let metadata = metadata_for_test();
        let kvs = input
            .iter()
            .map(|m| {
                build_key_values_with_ts_seq_values(
                    &metadata,
                    m.k0.to_string(),
                    m.k1,
                    m.timestamps.iter().copied(),
                    m.v1.iter().copied(),
                    m.sequence,
                )
            })
            .collect::<Vec<_>>();
        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let primary_key_codec = build_primary_key_codec(&metadata);
        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
        for kv in kvs {
            converter.append_key_values(&kv).unwrap();
        }
        let part = converter.convert().unwrap();
        let encoder = BulkPartEncoder::new(metadata, 1024);
        encoder.encode_part(&part).unwrap().unwrap()
    }

    #[test]
    fn test_write_and_read_part_projection() {
        let part = encode(&[
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.1)],
                sequence: 0,
            },
            MutationInput {
                k0: "b",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.0)],
                sequence: 0,
            },
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[2],
                v1: &[Some(0.2)],
                sequence: 1,
            },
        ]);

        let projection = &[4u32];
        let reader = part
            .read(
                Arc::new(BulkIterContext::new(
                    part.metadata.region_metadata.clone(),
                    &Some(projection.as_slice()),
                    None,
                )),
                None,
            )
            .unwrap()
            .expect("expect at least one row group");

        let mut total_rows_read = 0;
        let mut field: Vec<f64> = vec![];
        for res in reader {
            let batch = res.unwrap();
            assert_eq!(5, batch.num_columns());
            field.extend_from_slice(
                batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Float64Array>()
                    .unwrap()
                    .values(),
            );
            total_rows_read += batch.num_rows();
        }
        assert_eq!(3, total_rows_read);
        assert_eq!(vec![0.1, 0.2, 0.0], field);
    }

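    /// Builds an [EncodedBulkPart] where each `(k0, k1)` key covers the given half-open
    /// timestamp range `(start, end)` with the given sequence.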
    fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
        let metadata = metadata_for_test();
        let kvs = key_values
            .into_iter()
            .map(|(k0, k1, (start, end), sequence)| {
                let ts = (start..end);
                let v1 = (start..end).map(|_| None);
                build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
            })
            .collect::<Vec<_>>();
        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let primary_key_codec = build_primary_key_codec(&metadata);
        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
        for kv in kvs {
            converter.append_key_values(&kv).unwrap();
        }
        let part = converter.convert().unwrap();
        let encoder = BulkPartEncoder::new(metadata, 1024);
        encoder.encode_part(&part).unwrap().unwrap()
    }

    fn check_prune_row_group(
        part: &EncodedBulkPart,
        predicate: Option<Predicate>,
        expected_rows: usize,
    ) {
        let context = Arc::new(BulkIterContext::new(
            part.metadata.region_metadata.clone(),
            &None,
            predicate,
        ));
        let reader = part
            .read(context, None)
            .unwrap()
            .expect("expect at least one row group");
        let mut total_rows_read = 0;
        for res in reader {
            let batch = res.unwrap();
            total_rows_read += batch.num_rows();
        }
        // Check the number of rows that were actually read after pruning.
        assert_eq!(expected_rows, total_rows_read);
    }

    #[test]
    fn test_prune_row_groups() {
        let part = prepare(vec![
            ("a", 0, (0, 40), 1),
            ("a", 1, (0, 60), 1),
            ("b", 0, (0, 100), 2),
            ("b", 1, (100, 180), 3),
            ("b", 1, (180, 210), 4),
        ]);

        let context = Arc::new(BulkIterContext::new(
            part.metadata.region_metadata.clone(),
            &None,
            Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
                datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
            )])),
        ));
        assert!(part.read(context, None).unwrap().is_none());

        check_prune_row_group(&part, None, 310);

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            40,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
            ])),
            60,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a"))
            ])),
            100,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            100,
        );

        // Predicates on a field column can filter rows precisely.
1374        check_prune_row_group(
1375            &part,
1376            Some(Predicate::new(vec![
1377                datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64))
1378            ])),
1379            1,
1380        );
1381    }
1382
1383    #[test]
1384    fn test_bulk_part_converter_append_and_convert() {
1385        let metadata = metadata_for_test();
1386        let capacity = 100;
1387        let primary_key_codec = build_primary_key_codec(&metadata);
1388        let schema = to_flat_sst_arrow_schema(
1389            &metadata,
1390            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1391        );
1392
1393        let mut converter =
1394            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1395
1396        let key_values1 = build_key_values_with_ts_seq_values(
1397            &metadata,
1398            "key1".to_string(),
1399            1u32,
1400            vec![1000, 2000].into_iter(),
1401            vec![Some(1.0), Some(2.0)].into_iter(),
1402            1,
1403        );
1404
1405        let key_values2 = build_key_values_with_ts_seq_values(
1406            &metadata,
1407            "key2".to_string(),
1408            2u32,
1409            vec![1500].into_iter(),
1410            vec![Some(3.0)].into_iter(),
1411            2,
1412        );
1413
1414        converter.append_key_values(&key_values1).unwrap();
1415        converter.append_key_values(&key_values2).unwrap();
1416
1417        let bulk_part = converter.convert().unwrap();
1418
1419        assert_eq!(bulk_part.num_rows(), 3);
1420        assert_eq!(bulk_part.min_ts, 1000);
1421        assert_eq!(bulk_part.max_ts, 2000);
1422        assert_eq!(bulk_part.sequence, 2);
1423        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
1424
1425        // Validate primary key columns are stored
1426        // Schema should include primary key columns k0 and k1 at the beginning
1427        let schema = bulk_part.batch.schema();
1428        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1429        assert_eq!(
1430            field_names,
1431            vec![
1432                "k0",
1433                "k1",
1434                "v0",
1435                "v1",
1436                "ts",
1437                "__primary_key",
1438                "__sequence",
1439                "__op_type"
1440            ]
1441        );
1442    }
1443
1444    #[test]
1445    fn test_bulk_part_converter_sorting() {
1446        let metadata = metadata_for_test();
1447        let capacity = 100;
1448        let primary_key_codec = build_primary_key_codec(&metadata);
1449        let schema = to_flat_sst_arrow_schema(
1450            &metadata,
1451            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1452        );
1453
1454        let mut converter =
1455            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1456
1457        let key_values1 = build_key_values_with_ts_seq_values(
1458            &metadata,
1459            "z_key".to_string(),
1460            3u32,
1461            vec![3000].into_iter(),
1462            vec![Some(3.0)].into_iter(),
1463            3,
1464        );
1465
1466        let key_values2 = build_key_values_with_ts_seq_values(
1467            &metadata,
1468            "a_key".to_string(),
1469            1u32,
1470            vec![1000].into_iter(),
1471            vec![Some(1.0)].into_iter(),
1472            1,
1473        );
1474
1475        let key_values3 = build_key_values_with_ts_seq_values(
1476            &metadata,
1477            "m_key".to_string(),
1478            2u32,
1479            vec![2000].into_iter(),
1480            vec![Some(2.0)].into_iter(),
1481            2,
1482        );
1483
1484        converter.append_key_values(&key_values1).unwrap();
1485        converter.append_key_values(&key_values2).unwrap();
1486        converter.append_key_values(&key_values3).unwrap();
1487
1488        let bulk_part = converter.convert().unwrap();
1489
1490        assert_eq!(bulk_part.num_rows(), 3);
1491
1492        let ts_column = bulk_part.batch.column(bulk_part.timestamp_index);
1493        let seq_column = bulk_part.batch.column(bulk_part.batch.num_columns() - 2);
1494
1495        let ts_array = ts_column
1496            .as_any()
1497            .downcast_ref::<TimestampMillisecondArray>()
1498            .unwrap();
1499        let seq_array = seq_column.as_any().downcast_ref::<UInt64Array>().unwrap();
1500
1501        assert_eq!(ts_array.values(), &[1000, 2000, 3000]);
1502        assert_eq!(seq_array.values(), &[1, 2, 3]);
1503
1504        // Validate primary key columns are stored
1505        let schema = bulk_part.batch.schema();
1506        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1507        assert_eq!(
1508            field_names,
1509            vec![
1510                "k0",
1511                "k1",
1512                "v0",
1513                "v1",
1514                "ts",
1515                "__primary_key",
1516                "__sequence",
1517                "__op_type"
1518            ]
1519        );
1520    }
1521
1522    #[test]
1523    fn test_bulk_part_converter_empty() {
1524        let metadata = metadata_for_test();
1525        let capacity = 10;
1526        let primary_key_codec = build_primary_key_codec(&metadata);
1527        let schema = to_flat_sst_arrow_schema(
1528            &metadata,
1529            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1530        );
1531
1532        let converter =
1533            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1534
1535        let bulk_part = converter.convert().unwrap();
1536
1537        assert_eq!(bulk_part.num_rows(), 0);
1538        assert_eq!(bulk_part.min_ts, i64::MAX);
1539        assert_eq!(bulk_part.max_ts, i64::MIN);
1540        assert_eq!(bulk_part.sequence, SequenceNumber::MIN);
1541
1542        // Validate primary key columns are present in schema even for empty batch
1543        let schema = bulk_part.batch.schema();
1544        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1545        assert_eq!(
1546            field_names,
1547            vec![
1548                "k0",
1549                "k1",
1550                "v0",
1551                "v1",
1552                "ts",
1553                "__primary_key",
1554                "__sequence",
1555                "__op_type"
1556            ]
1557        );
1558    }
1559
    #[test]
    fn test_bulk_part_converter_without_primary_key_columns() {
        let metadata = metadata_for_test();
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions {
                raw_pk_columns: false,
                string_pk_use_dict: true,
            },
        );

        let capacity = 100;
        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, false);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "key1".to_string(),
            1u32,
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "key2".to_string(),
            2u32,
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_ts, 1000);
        assert_eq!(bulk_part.max_ts, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        // Validate primary key columns are NOT stored individually
        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );
    }

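    /// Builds `KeyValues` whose rows carry a pre-encoded sparse primary key
    /// (`__table_id`, `__tsid`, `k0`, `k1`) in the `__primary_key` column.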
    #[allow(clippy::too_many_arguments)]
    fn build_key_values_with_sparse_encoding(
        metadata: &RegionMetadataRef,
        primary_key_codec: &Arc<dyn PrimaryKeyCodec>,
        table_id: u32,
        tsid: u64,
        k0: String,
        k1: String,
        timestamps: impl Iterator<Item = i64>,
        values: impl Iterator<Item = Option<f64>>,
        sequence: SequenceNumber,
    ) -> KeyValues {
        // Encode the primary key (__table_id, __tsid, k0, k1) into binary format using the sparse codec
        let pk_values = vec![
            (ReservedColumnId::table_id(), Value::UInt32(table_id)),
            (ReservedColumnId::tsid(), Value::UInt64(tsid)),
            (0, Value::String(k0.clone().into())),
            (1, Value::String(k1.clone().into())),
        ];
        let mut encoded_key = Vec::new();
        primary_key_codec
            .encode_values(&pk_values, &mut encoded_key)
            .unwrap();
        assert!(!encoded_key.is_empty());

        // Create schema for sparse encoding: __primary_key, ts, v0, v1
        let column_schema = vec![
            api::v1::ColumnSchema {
                column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::binary_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Tag as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "ts".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::timestamp_millisecond_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Timestamp as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "v0".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::int64_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Field as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "v1".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::float64_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Field as i32,
                ..Default::default()
            },
        ];

        let rows = timestamps
            .zip(values)
            .map(|(ts, v)| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::BinaryValue(
                            encoded_key.clone(),
                        )),
                    },
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(ts)),
                    },
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::I64Value(ts)),
                    },
                    api::v1::Value {
                        value_data: v.map(api::v1::value::ValueData::F64Value),
                    },
                ],
            })
            .collect();

        let mutation = api::v1::Mutation {
            op_type: 1,
            sequence,
            rows: Some(api::v1::Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: Some(WriteHint {
                primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
            }),
        };
        KeyValues::new(metadata.as_ref(), mutation).unwrap()
    }

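    /// With sparse primary key encoding the converter keeps only the encoded
    /// `__primary_key` column, even when `store_primary_key_columns` is true, and each
    /// dictionary value must hold a non-empty encoded key.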
    #[test]
    fn test_bulk_part_converter_sparse_primary_key_encoding() {
        use api::v1::SemanticType;
        use datatypes::schema::ColumnSchema;
        use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
        use store_api::storage::RegionId;

        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![0, 1])
            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
        let metadata = Arc::new(builder.build().unwrap());

        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse);
        assert_eq!(primary_key_codec.encoding(), PrimaryKeyEncoding::Sparse);

        let capacity = 100;
        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec.clone(), true);

        let key_values1 = build_key_values_with_sparse_encoding(
            &metadata,
            &primary_key_codec,
            2048u32, // table_id
            100u64,  // tsid
            "key11".to_string(),
            "key21".to_string(),
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_sparse_encoding(
            &metadata,
            &primary_key_codec,
            4096u32, // table_id
            200u64,  // tsid
            "key12".to_string(),
            "key22".to_string(),
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_ts, 1000);
        assert_eq!(bulk_part.max_ts, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        // For sparse encoding, primary key columns should NOT be stored individually
        // even when store_primary_key_columns is true, because sparse encoding
        // stores the encoded primary key in the __primary_key column
        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );

        // Verify the __primary_key column contains encoded sparse keys
        let primary_key_column = bulk_part.batch.column_by_name("__primary_key").unwrap();
        let dict_array = primary_key_column
            .as_any()
            .downcast_ref::<DictionaryArray<UInt32Type>>()
            .unwrap();

        // Should have non-zero entries indicating encoded primary keys
        assert!(!dict_array.is_empty());
        assert_eq!(dict_array.len(), 3); // 3 rows total

        // Verify values are properly encoded binary data (not empty)
        let values = dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        for i in 0..values.len() {
            assert!(
                !values.value(i).is_empty(),
                "Encoded primary key should not be empty"
            );
        }
    }
}