mito2/memtable/bulk/part.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Bulk part encoder/decoder.
16
17use std::collections::VecDeque;
18use std::sync::Arc;
19
20use api::helper::{value_to_grpc_value, ColumnDataTypeWrapper};
21use api::v1::bulk_wal_entry::Body;
22use api::v1::{bulk_wal_entry, ArrowIpc, BulkWalEntry, Mutation, OpType};
23use bytes::Bytes;
24use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
25use common_recordbatch::DfRecordBatch as RecordBatch;
26use common_time::timestamp::TimeUnit;
27use datatypes::arrow;
28use datatypes::arrow::array::{
29    Array, ArrayRef, BinaryBuilder, BinaryDictionaryBuilder, DictionaryArray, StringBuilder,
30    StringDictionaryBuilder, TimestampMicrosecondArray, TimestampMillisecondArray,
31    TimestampNanosecondArray, TimestampSecondArray, UInt32Array, UInt64Array, UInt64Builder,
32    UInt8Array, UInt8Builder,
33};
34use datatypes::arrow::compute::{SortColumn, SortOptions, TakeOptions};
35use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
36use datatypes::arrow_array::BinaryArray;
37use datatypes::data_type::DataType;
38use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
39use datatypes::value::{Value, ValueRef};
40use datatypes::vectors::Helper;
41use mito_codec::key_values::{KeyValue, KeyValues, KeyValuesRef};
42use mito_codec::row_converter::{
43    build_primary_key_codec, DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt,
44};
45use parquet::arrow::ArrowWriter;
46use parquet::data_type::AsBytes;
47use parquet::file::metadata::ParquetMetaData;
48use parquet::file::properties::WriterProperties;
49use snafu::{OptionExt, ResultExt, Snafu};
50use store_api::codec::PrimaryKeyEncoding;
51use store_api::metadata::{RegionMetadata, RegionMetadataRef};
52use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
53use store_api::storage::SequenceNumber;
54use table::predicate::Predicate;
55
56use crate::error::{
57    self, ColumnNotFoundSnafu, ComputeArrowSnafu, DataTypeMismatchSnafu, EncodeMemtableSnafu,
58    EncodeSnafu, NewRecordBatchSnafu, Result,
59};
60use crate::memtable::bulk::context::BulkIterContextRef;
61use crate::memtable::bulk::part_reader::BulkPartIter;
62use crate::memtable::time_series::{ValueBuilder, Values};
63use crate::memtable::BoxedBatchIterator;
64use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
65use crate::sst::parquet::helper::parse_parquet_metadata;
66use crate::sst::to_sst_arrow_schema;
67
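/// Initial number of distinct values reserved in dictionary builders for string primary key columns (an explanatory comment; the value below is unchanged).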
68const INIT_DICT_VALUE_CAPACITY: usize = 8;
69
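/// A bulk write part that holds its rows as a single Arrow [RecordBatch], plus timestamp and sequence statistics.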
70#[derive(Clone)]
71pub struct BulkPart {
72    pub batch: RecordBatch,
73    pub max_ts: i64,
74    pub min_ts: i64,
75    pub sequence: u64,
76    pub timestamp_index: usize,
77    pub raw_data: Option<ArrowIpc>,
78}
79
80impl TryFrom<BulkWalEntry> for BulkPart {
81    type Error = error::Error;
82
83    fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
84        match value.body.expect("Entry payload should be present") {
85            Body::ArrowIpc(ipc) => {
86                let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
87                    .context(error::ConvertBulkWalEntrySnafu)?;
88                let batch = decoder
89                    .try_decode_record_batch(&ipc.data_header, &ipc.payload)
90                    .context(error::ConvertBulkWalEntrySnafu)?;
91                Ok(Self {
92                    batch,
93                    max_ts: value.max_ts,
94                    min_ts: value.min_ts,
95                    sequence: value.sequence,
96                    timestamp_index: value.timestamp_index as usize,
97                    raw_data: Some(ipc),
98                })
99            }
100        }
101    }
102}
103
104impl TryFrom<&BulkPart> for BulkWalEntry {
105    type Error = error::Error;
106
107    fn try_from(value: &BulkPart) -> Result<Self> {
108        if let Some(ipc) = &value.raw_data {
109            Ok(BulkWalEntry {
110                sequence: value.sequence,
111                max_ts: value.max_ts,
112                min_ts: value.min_ts,
113                timestamp_index: value.timestamp_index as u32,
114                body: Some(Body::ArrowIpc(ipc.clone())),
115            })
116        } else {
117            let mut encoder = FlightEncoder::default();
118            let schema_bytes = encoder
119                .encode_schema(value.batch.schema().as_ref())
120                .data_header;
121            let [rb_data] = encoder
122                .encode(FlightMessage::RecordBatch(value.batch.clone()))
123                .try_into()
124                .map_err(|_| {
125                    error::UnsupportedOperationSnafu {
126                        err_msg: "create BulkWalEntry from RecordBatch with dictionary arrays",
127                    }
128                    .build()
129                })?;
130            Ok(BulkWalEntry {
131                sequence: value.sequence,
132                max_ts: value.max_ts,
133                min_ts: value.min_ts,
134                timestamp_index: value.timestamp_index as u32,
135                body: Some(Body::ArrowIpc(ArrowIpc {
136                    schema: schema_bytes,
137                    data_header: rb_data.data_header,
138                    payload: rb_data.data_body,
139                })),
140            })
141        }
142    }
143}
144
145impl BulkPart {
146    pub(crate) fn estimated_size(&self) -> usize {
147        self.batch
148            .columns()
149            .iter()
150            // If the slice memory size cannot be obtained, assume 0 here.
151            .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
152            .sum()
153    }
154
155    /// Converts [BulkPart] to [Mutation] for the fallback `write_bulk` implementation.
156    pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
157        let vectors = region_metadata
158            .schema
159            .column_schemas()
160            .iter()
161            .map(|col| match self.batch.column_by_name(&col.name) {
162                None => Ok(None),
163                Some(col) => Helper::try_into_vector(col).map(Some),
164            })
165            .collect::<datatypes::error::Result<Vec<_>>>()
166            .context(error::ComputeVectorSnafu)?;
167
168        let rows = (0..self.num_rows())
169            .map(|row_idx| {
170                let values = (0..self.batch.num_columns())
171                    .map(|col_idx| {
172                        if let Some(v) = &vectors[col_idx] {
173                            value_to_grpc_value(v.get(row_idx))
174                        } else {
175                            api::v1::Value { value_data: None }
176                        }
177                    })
178                    .collect::<Vec<_>>();
179                api::v1::Row { values }
180            })
181            .collect::<Vec<_>>();
182
183        let schema = region_metadata
184            .column_metadatas
185            .iter()
186            .map(|c| {
187                let data_type_wrapper =
188                    ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
189                Ok(api::v1::ColumnSchema {
190                    column_name: c.column_schema.name.clone(),
191                    datatype: data_type_wrapper.datatype() as i32,
192                    semantic_type: c.semantic_type as i32,
193                    ..Default::default()
194                })
195            })
196            .collect::<api::error::Result<Vec<_>>>()
197            .context(error::ConvertColumnDataTypeSnafu {
198                reason: "failed to convert region metadata to column schema",
199            })?;
200
201        let rows = api::v1::Rows { schema, rows };
202
203        Ok(Mutation {
204            op_type: OpType::Put as i32,
205            sequence: self.sequence,
206            rows: Some(rows),
207            write_hint: None,
208        })
209    }
210
211    pub fn timestamps(&self) -> &ArrayRef {
212        self.batch.column(self.timestamp_index)
213    }
214
215    pub fn num_rows(&self) -> usize {
216        self.batch.num_rows()
217    }
218}
219
220/// Builder type for primary key dictionary array.
221type PrimaryKeyArrayBuilder = BinaryDictionaryBuilder<UInt32Type>;
222
223/// Primary key column builder for handling strings specially.
224enum PrimaryKeyColumnBuilder {
225    /// String dictionary builder for string types.
226    StringDict(StringDictionaryBuilder<UInt32Type>),
227    /// Generic mutable vector for other types.
228    Vector(Box<dyn MutableVector>),
229}
230
231impl PrimaryKeyColumnBuilder {
232    /// Appends a value to the builder.
233    fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
234        match self {
235            PrimaryKeyColumnBuilder::StringDict(builder) => {
236                if let Some(s) = value.as_string().context(DataTypeMismatchSnafu)? {
237                    // We know the value is a string.
238                    builder.append_value(s);
239                } else {
240                    builder.append_null();
241                }
242            }
243            PrimaryKeyColumnBuilder::Vector(builder) => {
244                builder.push_value_ref(value);
245            }
246        }
247        Ok(())
248    }
249
250    /// Converts the builder to an ArrayRef.
251    fn into_arrow_array(self) -> ArrayRef {
252        match self {
253            PrimaryKeyColumnBuilder::StringDict(mut builder) => Arc::new(builder.finish()),
254            PrimaryKeyColumnBuilder::Vector(mut builder) => builder.to_vector().to_arrow_array(),
255        }
256    }
257}
258
259/// Converter that converts appended [KeyValues] into a [BulkPart].
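///
/// A minimal usage sketch based on the tests in this module; `metadata` and
/// `key_values` are placeholders for a `RegionMetadataRef` and a [KeyValues]:
///
/// ```ignore
/// let codec = build_primary_key_codec(&metadata);
/// let schema = to_flat_sst_arrow_schema(
///     &metadata,
///     &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
/// );
/// let mut converter = BulkPartConverter::new(&metadata, schema, 1024, codec, true);
/// converter.append_key_values(&key_values)?;
/// let part = converter.convert()?;
/// ```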
260pub struct BulkPartConverter {
261    /// Region metadata.
262    region_metadata: RegionMetadataRef,
263    /// Schema of the converted batch.
264    schema: SchemaRef,
265    /// Primary key codec for encoding keys.
266    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
267    /// Buffer for encoding primary key.
268    key_buf: Vec<u8>,
269    /// Primary key array builder.
270    key_array_builder: PrimaryKeyArrayBuilder,
271    /// Builders for non-primary key columns.
272    value_builder: ValueBuilder,
273    /// Builders for individual primary key columns.
274    /// The order of builders is the same as the order of primary key columns in the region metadata.
275    primary_key_column_builders: Vec<PrimaryKeyColumnBuilder>,
276
277    /// Max timestamp value.
278    max_ts: i64,
279    /// Min timestamp value.
280    min_ts: i64,
281    /// Max sequence number.
282    max_sequence: SequenceNumber,
283}
284
285impl BulkPartConverter {
286    /// Creates a new converter.
287    ///
288    /// If `store_primary_key_columns` is true and the primary key encoding is not sparse,
289    /// the converter additionally stores each primary key column in its own array.
290    pub fn new(
291        region_metadata: &RegionMetadataRef,
292        schema: SchemaRef,
293        capacity: usize,
294        primary_key_codec: Arc<dyn PrimaryKeyCodec>,
295        store_primary_key_columns: bool,
296    ) -> Self {
297        debug_assert_eq!(
298            region_metadata.primary_key_encoding,
299            primary_key_codec.encoding()
300        );
301
302        let primary_key_column_builders = if store_primary_key_columns
303            && region_metadata.primary_key_encoding != PrimaryKeyEncoding::Sparse
304        {
305            new_primary_key_column_builders(region_metadata, capacity)
306        } else {
307            Vec::new()
308        };
309
310        Self {
311            region_metadata: region_metadata.clone(),
312            schema,
313            primary_key_codec,
314            key_buf: Vec::new(),
315            key_array_builder: PrimaryKeyArrayBuilder::new(),
316            value_builder: ValueBuilder::new(region_metadata, capacity),
317            primary_key_column_builders,
318            min_ts: i64::MAX,
319            max_ts: i64::MIN,
320            max_sequence: SequenceNumber::MIN,
321        }
322    }
323
324    /// Appends a [KeyValues] into the converter.
325    pub fn append_key_values(&mut self, key_values: &KeyValues) -> Result<()> {
326        for kv in key_values.iter() {
327            self.append_key_value(&kv)?;
328        }
329
330        Ok(())
331    }
332
333    /// Appends a [KeyValue] to builders.
334    ///
335    /// If the primary key uses sparse encoding, callers must have already encoded the primary key in the [KeyValue].
336    fn append_key_value(&mut self, kv: &KeyValue) -> Result<()> {
337        // Handles primary key based on encoding type
338        if self.primary_key_codec.encoding() == PrimaryKeyEncoding::Sparse {
339            // For sparse encoding, the primary key is already encoded in the KeyValue
340            // Gets the first (and only) primary key value which contains the encoded key
341            let mut primary_keys = kv.primary_keys();
342            if let Some(encoded) = primary_keys
343                .next()
344                .context(ColumnNotFoundSnafu {
345                    column: PRIMARY_KEY_COLUMN_NAME,
346                })?
347                .as_binary()
348                .context(DataTypeMismatchSnafu)?
349            {
350                self.key_array_builder
351                    .append(encoded)
352                    .context(ComputeArrowSnafu)?;
353            } else {
354                self.key_array_builder
355                    .append("")
356                    .context(ComputeArrowSnafu)?;
357            }
358        } else {
359            // For dense encoding, we need to encode the primary key columns
360            self.key_buf.clear();
361            self.primary_key_codec
362                .encode_key_value(kv, &mut self.key_buf)
363                .context(EncodeSnafu)?;
364            self.key_array_builder
365                .append(&self.key_buf)
366                .context(ComputeArrowSnafu)?;
367        };
368
369        // If storing primary key columns, append values to individual builders
370        if !self.primary_key_column_builders.is_empty() {
371            for (builder, pk_value) in self
372                .primary_key_column_builders
373                .iter_mut()
374                .zip(kv.primary_keys())
375            {
376                builder.push_value_ref(pk_value)?;
377            }
378        }
379
380        // Pushes other columns.
381        self.value_builder.push(
382            kv.timestamp(),
383            kv.sequence(),
384            kv.op_type() as u8,
385            kv.fields(),
386        );
387
388        // Updates statistics
389        // Safety: timestamp of kv must be both present and a valid timestamp value.
390        let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
391        self.min_ts = self.min_ts.min(ts);
392        self.max_ts = self.max_ts.max(ts);
393        self.max_sequence = self.max_sequence.max(kv.sequence());
394
395        Ok(())
396    }
397
398    /// Converts buffered content into a [BulkPart].
399    ///
400    /// It sorts the record batch by (primary key, timestamp, sequence desc).
401    pub fn convert(mut self) -> Result<BulkPart> {
402        let values = Values::from(self.value_builder);
403        let mut columns =
404            Vec::with_capacity(4 + values.fields.len() + self.primary_key_column_builders.len());
405
406        // Build primary key column arrays if enabled.
407        for builder in self.primary_key_column_builders {
408            columns.push(builder.into_arrow_array());
409        }
410        // Then fields columns.
411        columns.extend(values.fields.iter().map(|field| field.to_arrow_array()));
412        // Time index.
413        let timestamp_index = columns.len();
414        columns.push(values.timestamp.to_arrow_array());
415        // Primary key.
416        let pk_array = self.key_array_builder.finish();
417        columns.push(Arc::new(pk_array));
418        // Sequence and op type.
419        columns.push(values.sequence.to_arrow_array());
420        columns.push(values.op_type.to_arrow_array());
421
422        let batch = RecordBatch::try_new(self.schema, columns).context(NewRecordBatchSnafu)?;
423        // Sorts the record batch.
424        let batch = sort_primary_key_record_batch(&batch)?;
425
426        Ok(BulkPart {
427            batch,
428            max_ts: self.max_ts,
429            min_ts: self.min_ts,
430            sequence: self.max_sequence,
431            timestamp_index,
432            raw_data: None,
433        })
434    }
435}
436
437fn new_primary_key_column_builders(
438    metadata: &RegionMetadata,
439    capacity: usize,
440) -> Vec<PrimaryKeyColumnBuilder> {
441    metadata
442        .primary_key_columns()
443        .map(|col| {
444            if col.column_schema.data_type.is_string() {
445                PrimaryKeyColumnBuilder::StringDict(StringDictionaryBuilder::with_capacity(
446                    capacity,
447                    INIT_DICT_VALUE_CAPACITY,
448                    capacity,
449                ))
450            } else {
451                PrimaryKeyColumnBuilder::Vector(
452                    col.column_schema.data_type.create_mutable_vector(capacity),
453                )
454            }
455        })
456        .collect()
457}
458
459/// Sorts a record batch in the primary key format by (primary key, time index, sequence desc).
460fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
461    let total_columns = batch.num_columns();
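    // The batch built by `BulkPartConverter::convert` ends with the time index,
    // `__primary_key`, `__sequence` and `__op_type` columns (in that order), so
    // the sort columns are addressed from the end of the schema.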
462    let sort_columns = vec![
463        // Primary key column (ascending)
464        SortColumn {
465            values: batch.column(total_columns - 3).clone(),
466            options: Some(SortOptions {
467                descending: false,
468                nulls_first: true,
469            }),
470        },
471        // Time index column (ascending)
472        SortColumn {
473            values: batch.column(total_columns - 4).clone(),
474            options: Some(SortOptions {
475                descending: false,
476                nulls_first: true,
477            }),
478        },
479        // Sequence column (descending)
480        SortColumn {
481            values: batch.column(total_columns - 2).clone(),
482            options: Some(SortOptions {
483                descending: true,
484                nulls_first: true,
485            }),
486        },
487    ];
488
489    let indices = datatypes::arrow::compute::lexsort_to_indices(&sort_columns, None)
490        .context(ComputeArrowSnafu)?;
491
492    datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
493}
494
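/// A bulk part encoded as Parquet bytes together with its [BulkPartMeta].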
495#[derive(Debug)]
496pub struct EncodedBulkPart {
497    data: Bytes,
498    metadata: BulkPartMeta,
499}
500
501impl EncodedBulkPart {
502    pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
503        Self { data, metadata }
504    }
505
506    pub(crate) fn metadata(&self) -> &BulkPartMeta {
507        &self.metadata
508    }
509
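    /// Creates an iterator over the encoded part, using the predicate in
    /// `context` to prune row groups. Returns `Ok(None)` when all row groups
    /// are filtered out.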
510    pub(crate) fn read(
511        &self,
512        context: BulkIterContextRef,
513        sequence: Option<SequenceNumber>,
514    ) -> Result<Option<BoxedBatchIterator>> {
515        // Use the predicate to find row groups to read.
516        let row_groups_to_read = context.row_groups_to_read(&self.metadata.parquet_metadata);
517
518        if row_groups_to_read.is_empty() {
519            // All row groups are filtered.
520            return Ok(None);
521        }
522
523        let iter = BulkPartIter::try_new(
524            context,
525            row_groups_to_read,
526            self.metadata.parquet_metadata.clone(),
527            self.data.clone(),
528            sequence,
529        )?;
530        Ok(Some(Box::new(iter) as BoxedBatchIterator))
531    }
532}
533
534#[derive(Debug)]
535pub struct BulkPartMeta {
536    /// Total rows in part.
537    pub num_rows: usize,
538    /// Max timestamp in part.
539    pub max_timestamp: i64,
540    /// Min timestamp in part.
541    pub min_timestamp: i64,
542    /// Part file metadata.
543    pub parquet_metadata: Arc<ParquetMetaData>,
544    /// Part region schema.
545    pub region_metadata: RegionMetadataRef,
546}
547
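/// Encoder that converts [Mutation]s into an [EncodedBulkPart] backed by Parquet bytes.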
548pub struct BulkPartEncoder {
549    metadata: RegionMetadataRef,
550    pk_encoder: DensePrimaryKeyCodec,
551    row_group_size: usize,
552    dedup: bool,
553    writer_props: Option<WriterProperties>,
554}
555
556impl BulkPartEncoder {
557    pub(crate) fn new(
558        metadata: RegionMetadataRef,
559        dedup: bool,
560        row_group_size: usize,
561    ) -> BulkPartEncoder {
562        let codec = DensePrimaryKeyCodec::new(&metadata);
563        let writer_props = Some(
564            WriterProperties::builder()
565                .set_write_batch_size(row_group_size)
566                .set_max_row_group_size(row_group_size)
567                .build(),
568        );
569        Self {
570            metadata,
571            pk_encoder: codec,
572            row_group_size,
573            dedup,
574            writer_props,
575        }
576    }
577}
578
579impl BulkPartEncoder {
580    /// Encodes mutations into an [EncodedBulkPart]. Returns `Ok(None)` if the mutations contain no rows.
581    fn encode_mutations(&self, mutations: &[Mutation]) -> Result<Option<EncodedBulkPart>> {
582        let Some((arrow_record_batch, min_ts, max_ts)) =
583            mutations_to_record_batch(mutations, &self.metadata, &self.pk_encoder, self.dedup)?
584        else {
585            return Ok(None);
586        };
587
588        let mut buf = Vec::with_capacity(4096);
589        let arrow_schema = arrow_record_batch.schema();
590
591        let file_metadata = {
592            let mut writer =
593                ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
594                    .context(EncodeMemtableSnafu)?;
595            writer
596                .write(&arrow_record_batch)
597                .context(EncodeMemtableSnafu)?;
598            writer.finish().context(EncodeMemtableSnafu)?
599        };
600
601        let buf = Bytes::from(buf);
602        let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);
603
604        Ok(Some(EncodedBulkPart {
605            data: buf,
606            metadata: BulkPartMeta {
607                num_rows: arrow_record_batch.num_rows(),
608                max_timestamp: max_ts,
609                min_timestamp: min_ts,
610                parquet_metadata,
611                region_metadata: self.metadata.clone(),
612            },
613        }))
614    }
615}
616
617/// Converts mutations into a sorted record batch, returning it with the min and max timestamps, or `None` if there are no rows.
618fn mutations_to_record_batch(
619    mutations: &[Mutation],
620    metadata: &RegionMetadataRef,
621    pk_encoder: &DensePrimaryKeyCodec,
622    dedup: bool,
623) -> Result<Option<(RecordBatch, i64, i64)>> {
624    let total_rows: usize = mutations
625        .iter()
626        .map(|m| m.rows.as_ref().map(|r| r.rows.len()).unwrap_or(0))
627        .sum();
628
629    if total_rows == 0 {
630        return Ok(None);
631    }
632
633    let mut pk_builder = BinaryBuilder::with_capacity(total_rows, 0);
634
635    let mut ts_vector: Box<dyn MutableVector> = metadata
636        .time_index_column()
637        .column_schema
638        .data_type
639        .create_mutable_vector(total_rows);
640    let mut sequence_builder = UInt64Builder::with_capacity(total_rows);
641    let mut op_type_builder = UInt8Builder::with_capacity(total_rows);
642
643    let mut field_builders: Vec<Box<dyn MutableVector>> = metadata
644        .field_columns()
645        .map(|f| f.column_schema.data_type.create_mutable_vector(total_rows))
646        .collect();
647
648    let mut pk_buffer = vec![];
649    for m in mutations {
650        let Some(key_values) = KeyValuesRef::new(metadata, m) else {
651            continue;
652        };
653
654        for row in key_values.iter() {
655            pk_buffer.clear();
656            pk_encoder
657                .encode_to_vec(row.primary_keys(), &mut pk_buffer)
658                .context(EncodeSnafu)?;
659            pk_builder.append_value(pk_buffer.as_bytes());
660            ts_vector.push_value_ref(row.timestamp());
661            sequence_builder.append_value(row.sequence());
662            op_type_builder.append_value(row.op_type() as u8);
663            for (builder, field) in field_builders.iter_mut().zip(row.fields()) {
664                builder.push_value_ref(field);
665            }
666        }
667    }
668
669    let arrow_schema = to_sst_arrow_schema(metadata);
670    // safety: timestamp column must be valid, and values must not be None.
671    let timestamp_unit = metadata
672        .time_index_column()
673        .column_schema
674        .data_type
675        .as_timestamp()
676        .unwrap()
677        .unit();
678    let sorter = ArraysSorter {
679        encoded_primary_keys: pk_builder.finish(),
680        timestamp_unit,
681        timestamp: ts_vector.to_vector().to_arrow_array(),
682        sequence: sequence_builder.finish(),
683        op_type: op_type_builder.finish(),
684        fields: field_builders
685            .iter_mut()
686            .map(|f| f.to_vector().to_arrow_array()),
687        dedup,
688        arrow_schema,
689    };
690
691    sorter.sort().map(Some)
692}
693
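/// Helper that sorts column arrays by (primary key, timestamp, sequence desc),
/// optionally deduplicates rows sharing the same primary key and timestamp, and
/// assembles the result into a record batch.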
694struct ArraysSorter<I> {
695    encoded_primary_keys: BinaryArray,
696    timestamp_unit: TimeUnit,
697    timestamp: ArrayRef,
698    sequence: UInt64Array,
699    op_type: UInt8Array,
700    fields: I,
701    dedup: bool,
702    arrow_schema: SchemaRef,
703}
704
705impl<I> ArraysSorter<I>
706where
707    I: Iterator<Item = ArrayRef>,
708{
709    /// Converts arrays to record batch.
710    fn sort(self) -> Result<(RecordBatch, i64, i64)> {
711        debug_assert!(!self.timestamp.is_empty());
712        debug_assert!(self.timestamp.len() == self.sequence.len());
713        debug_assert!(self.timestamp.len() == self.op_type.len());
714        debug_assert!(self.timestamp.len() == self.encoded_primary_keys.len());
715
716        let timestamp_iter = timestamp_array_to_iter(self.timestamp_unit, &self.timestamp);
717        let (mut min_timestamp, mut max_timestamp) = (i64::MAX, i64::MIN);
718        let mut to_sort = self
719            .encoded_primary_keys
720            .iter()
721            .zip(timestamp_iter)
722            .zip(self.sequence.iter())
723            .map(|((pk, timestamp), sequence)| {
724                max_timestamp = max_timestamp.max(*timestamp);
725                min_timestamp = min_timestamp.min(*timestamp);
726                (pk, timestamp, sequence)
727            })
728            .enumerate()
729            .collect::<Vec<_>>();
730
731        to_sort.sort_unstable_by(|(_, (l_pk, l_ts, l_seq)), (_, (r_pk, r_ts, r_seq))| {
732            l_pk.cmp(r_pk)
733                .then(l_ts.cmp(r_ts))
734                .then(l_seq.cmp(r_seq).reverse())
735        });
736
737        if self.dedup {
738            // Dedup by (primary key, timestamp), ignoring sequence.
739            to_sort.dedup_by(|(_, (l_pk, l_ts, _)), (_, (r_pk, r_ts, _))| {
740                l_pk == r_pk && l_ts == r_ts
741            });
742        }
743
744        let indices = UInt32Array::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
745
746        let pk_dictionary = Arc::new(binary_array_to_dictionary(
747            // safety: pk must be BinaryArray
748            arrow::compute::take(
749                &self.encoded_primary_keys,
750                &indices,
751                Some(TakeOptions {
752                    check_bounds: false,
753                }),
754            )
755            .context(ComputeArrowSnafu)?
756            .as_any()
757            .downcast_ref::<BinaryArray>()
758            .unwrap(),
759        )?) as ArrayRef;
760
761        let mut arrays = Vec::with_capacity(self.arrow_schema.fields.len());
762        for arr in self.fields {
763            arrays.push(
764                arrow::compute::take(
765                    &arr,
766                    &indices,
767                    Some(TakeOptions {
768                        check_bounds: false,
769                    }),
770                )
771                .context(ComputeArrowSnafu)?,
772            );
773        }
774
775        let timestamp = arrow::compute::take(
776            &self.timestamp,
777            &indices,
778            Some(TakeOptions {
779                check_bounds: false,
780            }),
781        )
782        .context(ComputeArrowSnafu)?;
783
784        arrays.push(timestamp);
785        arrays.push(pk_dictionary);
786        arrays.push(
787            arrow::compute::take(
788                &self.sequence,
789                &indices,
790                Some(TakeOptions {
791                    check_bounds: false,
792                }),
793            )
794            .context(ComputeArrowSnafu)?,
795        );
796
797        arrays.push(
798            arrow::compute::take(
799                &self.op_type,
800                &indices,
801                Some(TakeOptions {
802                    check_bounds: false,
803                }),
804            )
805            .context(ComputeArrowSnafu)?,
806        );
807
808        let batch = RecordBatch::try_new(self.arrow_schema, arrays).context(NewRecordBatchSnafu)?;
809        Ok((batch, min_timestamp, max_timestamp))
810    }
811}
812
813/// Converts a timestamp array into an iterator of i64 values.
814fn timestamp_array_to_iter(
815    timestamp_unit: TimeUnit,
816    timestamp: &ArrayRef,
817) -> impl Iterator<Item = &i64> {
818    match timestamp_unit {
819        // safety: timestamp column must be valid.
820        TimeUnit::Second => timestamp
821            .as_any()
822            .downcast_ref::<TimestampSecondArray>()
823            .unwrap()
824            .values()
825            .iter(),
826        TimeUnit::Millisecond => timestamp
827            .as_any()
828            .downcast_ref::<TimestampMillisecondArray>()
829            .unwrap()
830            .values()
831            .iter(),
832        TimeUnit::Microsecond => timestamp
833            .as_any()
834            .downcast_ref::<TimestampMicrosecondArray>()
835            .unwrap()
836            .values()
837            .iter(),
838        TimeUnit::Nanosecond => timestamp
839            .as_any()
840            .downcast_ref::<TimestampNanosecondArray>()
841            .unwrap()
842            .values()
843            .iter(),
844    }
845}
846
847/// Converts a **sorted** [BinaryArray] to [DictionaryArray].
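///
/// For example (mirroring the tests below), the sorted input `[b"a", b"a", b"b"]`
/// maps to keys `[0, 0, 1]` with dictionary values `[b"a", b"b"]`.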
848fn binary_array_to_dictionary(input: &BinaryArray) -> Result<PrimaryKeyArray> {
849    if input.is_empty() {
850        return Ok(DictionaryArray::new(
851            UInt32Array::from(Vec::<u32>::new()),
852            Arc::new(BinaryArray::from_vec(vec![])) as ArrayRef,
853        ));
854    }
855    let mut keys = Vec::with_capacity(16);
856    let mut values = BinaryBuilder::new();
857    let mut prev: usize = 0;
858    keys.push(prev as u32);
859    values.append_value(input.value(prev));
860
861    for current_bytes in input.iter().skip(1) {
862        // safety: encoded pk must be present.
863        let current_bytes = current_bytes.unwrap();
864        let prev_bytes = input.value(prev);
865        if current_bytes != prev_bytes {
866            values.append_value(current_bytes);
867            prev += 1;
868        }
869        keys.push(prev as u32);
870    }
871
872    Ok(DictionaryArray::new(
873        UInt32Array::from(keys),
874        Arc::new(values.finish()) as ArrayRef,
875    ))
876}
877
878#[cfg(test)]
879mod tests {
880    use std::collections::VecDeque;
881
882    use api::v1::{Row, WriteHint};
883    use datafusion_common::ScalarValue;
884    use datatypes::prelude::{ConcreteDataType, ScalarVector, Value};
885    use datatypes::vectors::{Float64Vector, TimestampMillisecondVector};
886    use store_api::storage::consts::ReservedColumnId;
887
888    use super::*;
889    use crate::memtable::bulk::context::BulkIterContext;
890    use crate::sst::parquet::format::ReadFormat;
891    use crate::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions};
892    use crate::test_util::memtable_util::{
893        build_key_values_with_ts_seq_values, metadata_for_test, metadata_with_primary_key,
894        region_metadata_to_row_schema,
895    };
896
897    fn check_binary_array_to_dictionary(
898        input: &[&[u8]],
899        expected_keys: &[u32],
900        expected_values: &[&[u8]],
901    ) {
902        let input = BinaryArray::from_iter_values(input.iter());
903        let array = binary_array_to_dictionary(&input).unwrap();
904        assert_eq!(
905            &expected_keys,
906            &array.keys().iter().map(|v| v.unwrap()).collect::<Vec<_>>()
907        );
908        assert_eq!(
909            expected_values,
910            &array
911                .values()
912                .as_any()
913                .downcast_ref::<BinaryArray>()
914                .unwrap()
915                .iter()
916                .map(|v| v.unwrap())
917                .collect::<Vec<_>>()
918        );
919    }
920
921    #[test]
922    fn test_binary_array_to_dictionary() {
923        check_binary_array_to_dictionary(&[], &[], &[]);
924
925        check_binary_array_to_dictionary(&["a".as_bytes()], &[0], &["a".as_bytes()]);
926
927        check_binary_array_to_dictionary(
928            &["a".as_bytes(), "a".as_bytes()],
929            &[0, 0],
930            &["a".as_bytes()],
931        );
932
933        check_binary_array_to_dictionary(
934            &["a".as_bytes(), "a".as_bytes(), "b".as_bytes()],
935            &[0, 0, 1],
936            &["a".as_bytes(), "b".as_bytes()],
937        );
938
939        check_binary_array_to_dictionary(
940            &[
941                "a".as_bytes(),
942                "a".as_bytes(),
943                "b".as_bytes(),
944                "c".as_bytes(),
945            ],
946            &[0, 0, 1, 2],
947            &["a".as_bytes(), "b".as_bytes(), "c".as_bytes()],
948        );
949    }
950
951    struct MutationInput<'a> {
952        k0: &'a str,
953        k1: u32,
954        timestamps: &'a [i64],
955        v1: &'a [Option<f64>],
956        sequence: u64,
957    }
958
959    #[derive(Debug, PartialOrd, PartialEq)]
960    struct BatchOutput<'a> {
961        pk_values: &'a [Value],
962        timestamps: &'a [i64],
963        v1: &'a [Option<f64>],
964    }
965
966    fn check_mutations_to_record_batches(
967        input: &[MutationInput],
968        expected: &[BatchOutput],
969        expected_timestamp: (i64, i64),
970        dedup: bool,
971    ) {
972        let metadata = metadata_for_test();
973        let mutations = input
974            .iter()
975            .map(|m| {
976                build_key_values_with_ts_seq_values(
977                    &metadata,
978                    m.k0.to_string(),
979                    m.k1,
980                    m.timestamps.iter().copied(),
981                    m.v1.iter().copied(),
982                    m.sequence,
983                )
984                .mutation
985            })
986            .collect::<Vec<_>>();
987        let total_rows: usize = mutations
988            .iter()
989            .flat_map(|m| m.rows.iter())
990            .map(|r| r.rows.len())
991            .sum();
992
993        let pk_encoder = DensePrimaryKeyCodec::new(&metadata);
994
995        let (batch, _, _) = mutations_to_record_batch(&mutations, &metadata, &pk_encoder, dedup)
996            .unwrap()
997            .unwrap();
998        let read_format = ReadFormat::new_with_all_columns(metadata.clone());
999        let mut batches = VecDeque::new();
1000        read_format
1001            .convert_record_batch(&batch, None, &mut batches)
1002            .unwrap();
1003        if !dedup {
1004            assert_eq!(
1005                total_rows,
1006                batches.iter().map(|b| { b.num_rows() }).sum::<usize>()
1007            );
1008        }
1009        let batch_values = batches
1010            .into_iter()
1011            .map(|b| {
1012                let pk_values = pk_encoder.decode(b.primary_key()).unwrap().into_dense();
1013                let timestamps = b
1014                    .timestamps()
1015                    .as_any()
1016                    .downcast_ref::<TimestampMillisecondVector>()
1017                    .unwrap()
1018                    .iter_data()
1019                    .map(|v| v.unwrap().0.value())
1020                    .collect::<Vec<_>>();
1021                let float_values = b.fields()[1]
1022                    .data
1023                    .as_any()
1024                    .downcast_ref::<Float64Vector>()
1025                    .unwrap()
1026                    .iter_data()
1027                    .collect::<Vec<_>>();
1028
1029                (pk_values, timestamps, float_values)
1030            })
1031            .collect::<Vec<_>>();
1032        assert_eq!(expected.len(), batch_values.len());
1033
1034        for idx in 0..expected.len() {
1035            assert_eq!(expected[idx].pk_values, &batch_values[idx].0);
1036            assert_eq!(expected[idx].timestamps, &batch_values[idx].1);
1037            assert_eq!(expected[idx].v1, &batch_values[idx].2);
1038        }
1039    }
1040
1041    #[test]
1042    fn test_mutations_to_record_batch() {
1043        check_mutations_to_record_batches(
1044            &[MutationInput {
1045                k0: "a",
1046                k1: 0,
1047                timestamps: &[0],
1048                v1: &[Some(0.1)],
1049                sequence: 0,
1050            }],
1051            &[BatchOutput {
1052                pk_values: &[Value::String("a".into()), Value::UInt32(0)],
1053                timestamps: &[0],
1054                v1: &[Some(0.1)],
1055            }],
1056            (0, 0),
1057            true,
1058        );
1059
1060        check_mutations_to_record_batches(
1061            &[
1062                MutationInput {
1063                    k0: "a",
1064                    k1: 0,
1065                    timestamps: &[0],
1066                    v1: &[Some(0.1)],
1067                    sequence: 0,
1068                },
1069                MutationInput {
1070                    k0: "b",
1071                    k1: 0,
1072                    timestamps: &[0],
1073                    v1: &[Some(0.0)],
1074                    sequence: 0,
1075                },
1076                MutationInput {
1077                    k0: "a",
1078                    k1: 0,
1079                    timestamps: &[1],
1080                    v1: &[Some(0.2)],
1081                    sequence: 1,
1082                },
1083                MutationInput {
1084                    k0: "a",
1085                    k1: 1,
1086                    timestamps: &[1],
1087                    v1: &[Some(0.3)],
1088                    sequence: 2,
1089                },
1090            ],
1091            &[
1092                BatchOutput {
1093                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
1094                    timestamps: &[0, 1],
1095                    v1: &[Some(0.1), Some(0.2)],
1096                },
1097                BatchOutput {
1098                    pk_values: &[Value::String("a".into()), Value::UInt32(1)],
1099                    timestamps: &[1],
1100                    v1: &[Some(0.3)],
1101                },
1102                BatchOutput {
1103                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
1104                    timestamps: &[0],
1105                    v1: &[Some(0.0)],
1106                },
1107            ],
1108            (0, 1),
1109            true,
1110        );
1111
1112        check_mutations_to_record_batches(
1113            &[
1114                MutationInput {
1115                    k0: "a",
1116                    k1: 0,
1117                    timestamps: &[0],
1118                    v1: &[Some(0.1)],
1119                    sequence: 0,
1120                },
1121                MutationInput {
1122                    k0: "b",
1123                    k1: 0,
1124                    timestamps: &[0],
1125                    v1: &[Some(0.0)],
1126                    sequence: 0,
1127                },
1128                MutationInput {
1129                    k0: "a",
1130                    k1: 0,
1131                    timestamps: &[0],
1132                    v1: &[Some(0.2)],
1133                    sequence: 1,
1134                },
1135            ],
1136            &[
1137                BatchOutput {
1138                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
1139                    timestamps: &[0],
1140                    v1: &[Some(0.2)],
1141                },
1142                BatchOutput {
1143                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
1144                    timestamps: &[0],
1145                    v1: &[Some(0.0)],
1146                },
1147            ],
1148            (0, 0),
1149            true,
1150        );
1151        check_mutations_to_record_batches(
1152            &[
1153                MutationInput {
1154                    k0: "a",
1155                    k1: 0,
1156                    timestamps: &[0],
1157                    v1: &[Some(0.1)],
1158                    sequence: 0,
1159                },
1160                MutationInput {
1161                    k0: "b",
1162                    k1: 0,
1163                    timestamps: &[0],
1164                    v1: &[Some(0.0)],
1165                    sequence: 0,
1166                },
1167                MutationInput {
1168                    k0: "a",
1169                    k1: 0,
1170                    timestamps: &[0],
1171                    v1: &[Some(0.2)],
1172                    sequence: 1,
1173                },
1174            ],
1175            &[
1176                BatchOutput {
1177                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
1178                    timestamps: &[0, 0],
1179                    v1: &[Some(0.2), Some(0.1)],
1180                },
1181                BatchOutput {
1182                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
1183                    timestamps: &[0],
1184                    v1: &[Some(0.0)],
1185                },
1186            ],
1187            (0, 0),
1188            false,
1189        );
1190    }
1191
1192    fn encode(input: &[MutationInput]) -> EncodedBulkPart {
1193        let metadata = metadata_for_test();
1194        let mutations = input
1195            .iter()
1196            .map(|m| {
1197                build_key_values_with_ts_seq_values(
1198                    &metadata,
1199                    m.k0.to_string(),
1200                    m.k1,
1201                    m.timestamps.iter().copied(),
1202                    m.v1.iter().copied(),
1203                    m.sequence,
1204                )
1205                .mutation
1206            })
1207            .collect::<Vec<_>>();
1208        let encoder = BulkPartEncoder::new(metadata, true, 1024);
1209        encoder.encode_mutations(&mutations).unwrap().unwrap()
1210    }
1211
1212    #[test]
1213    fn test_write_and_read_part_projection() {
1214        let part = encode(&[
1215            MutationInput {
1216                k0: "a",
1217                k1: 0,
1218                timestamps: &[1],
1219                v1: &[Some(0.1)],
1220                sequence: 0,
1221            },
1222            MutationInput {
1223                k0: "b",
1224                k1: 0,
1225                timestamps: &[1],
1226                v1: &[Some(0.0)],
1227                sequence: 0,
1228            },
1229            MutationInput {
1230                k0: "a",
1231                k1: 0,
1232                timestamps: &[2],
1233                v1: &[Some(0.2)],
1234                sequence: 1,
1235            },
1236        ]);
1237
1238        let projection = &[4u32];
1239
1240        let reader = part
1241            .read(
1242                Arc::new(BulkIterContext::new(
1243                    part.metadata.region_metadata.clone(),
1244                    &Some(projection.as_slice()),
1245                    None,
1246                )),
1247                None,
1248            )
1249            .unwrap()
1250            .expect("expect at least one row group");
1251
1252        let mut total_rows_read = 0;
1253        let mut field = vec![];
1254        for res in reader {
1255            let batch = res.unwrap();
1256            assert_eq!(1, batch.fields().len());
1257            assert_eq!(4, batch.fields()[0].column_id);
1258            field.extend(
1259                batch.fields()[0]
1260                    .data
1261                    .as_any()
1262                    .downcast_ref::<Float64Vector>()
1263                    .unwrap()
1264                    .iter_data()
1265                    .map(|v| v.unwrap()),
1266            );
1267            total_rows_read += batch.num_rows();
1268        }
1269        assert_eq!(3, total_rows_read);
1270        assert_eq!(vec![0.1, 0.2, 0.0], field);
1271    }
1272
1273    fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
1274        let metadata = metadata_for_test();
1275        let mutations = key_values
1276            .into_iter()
1277            .map(|(k0, k1, (start, end), sequence)| {
1278                let ts = start..end;
1279                let v1 = (start..end).map(|_| None);
1280                build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
1281                    .mutation
1282            })
1283            .collect::<Vec<_>>();
1284        let encoder = BulkPartEncoder::new(metadata, true, 100);
1285        encoder.encode_mutations(&mutations).unwrap().unwrap()
1286    }
1287
1288    fn check_prune_row_group(
1289        part: &EncodedBulkPart,
1290        predicate: Option<Predicate>,
1291        expected_rows: usize,
1292    ) {
1293        let context = Arc::new(BulkIterContext::new(
1294            part.metadata.region_metadata.clone(),
1295            &None,
1296            predicate,
1297        ));
1298        let reader = part
1299            .read(context, None)
1300            .unwrap()
1301            .expect("expect at least one row group");
1302        let mut total_rows_read = 0;
1303        for res in reader {
1304            let batch = res.unwrap();
1305            total_rows_read += batch.num_rows();
1306        }
1307        // Check that only the expected number of rows are read after pruning.
1308        assert_eq!(expected_rows, total_rows_read);
1309    }
1310
1311    #[test]
1312    fn test_prune_row_groups() {
1313        let part = prepare(vec![
1314            ("a", 0, (0, 40), 1),
1315            ("a", 1, (0, 60), 1),
1316            ("b", 0, (0, 100), 2),
1317            ("b", 1, (100, 180), 3),
1318            ("b", 1, (180, 210), 4),
1319        ]);
1320
1321        let context = Arc::new(BulkIterContext::new(
1322            part.metadata.region_metadata.clone(),
1323            &None,
1324            Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
1325                datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
1326            )])),
1327        ));
1328        assert!(part.read(context, None).unwrap().is_none());
1329
1330        check_prune_row_group(&part, None, 310);
1331
1332        check_prune_row_group(
1333            &part,
1334            Some(Predicate::new(vec![
1335                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1336                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
1337            ])),
1338            40,
1339        );
1340
1341        check_prune_row_group(
1342            &part,
1343            Some(Predicate::new(vec![
1344                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1345                datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
1346            ])),
1347            60,
1348        );
1349
1350        check_prune_row_group(
1351            &part,
1352            Some(Predicate::new(vec![
1353                datafusion_expr::col("k0").eq(datafusion_expr::lit("a"))
1354            ])),
1355            100,
1356        );
1357
1358        check_prune_row_group(
1359            &part,
1360            Some(Predicate::new(vec![
1361                datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
1362                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
1363            ])),
1364            100,
1365        );
1366
1367        // Predicates on a field column can do precise (row-level) filtering.
1368        check_prune_row_group(
1369            &part,
1370            Some(Predicate::new(vec![
1371                datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64))
1372            ])),
1373            1,
1374        );
1375    }
1376
1377    #[test]
1378    fn test_bulk_part_converter_append_and_convert() {
1379        let metadata = metadata_for_test();
1380        let capacity = 100;
1381        let primary_key_codec = build_primary_key_codec(&metadata);
1382        let schema = to_flat_sst_arrow_schema(
1383            &metadata,
1384            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1385        );
1386
1387        let mut converter =
1388            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1389
1390        let key_values1 = build_key_values_with_ts_seq_values(
1391            &metadata,
1392            "key1".to_string(),
1393            1u32,
1394            vec![1000, 2000].into_iter(),
1395            vec![Some(1.0), Some(2.0)].into_iter(),
1396            1,
1397        );
1398
1399        let key_values2 = build_key_values_with_ts_seq_values(
1400            &metadata,
1401            "key2".to_string(),
1402            2u32,
1403            vec![1500].into_iter(),
1404            vec![Some(3.0)].into_iter(),
1405            2,
1406        );
1407
1408        converter.append_key_values(&key_values1).unwrap();
1409        converter.append_key_values(&key_values2).unwrap();
1410
1411        let bulk_part = converter.convert().unwrap();
1412
1413        assert_eq!(bulk_part.num_rows(), 3);
1414        assert_eq!(bulk_part.min_ts, 1000);
1415        assert_eq!(bulk_part.max_ts, 2000);
1416        assert_eq!(bulk_part.sequence, 2);
1417        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
1418
1419        // Validate primary key columns are stored
1420        // Schema should include primary key columns k0 and k1 at the beginning
1421        let schema = bulk_part.batch.schema();
1422        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1423        assert_eq!(
1424            field_names,
1425            vec![
1426                "k0",
1427                "k1",
1428                "v0",
1429                "v1",
1430                "ts",
1431                "__primary_key",
1432                "__sequence",
1433                "__op_type"
1434            ]
1435        );
1436    }
1437
1438    #[test]
1439    fn test_bulk_part_converter_sorting() {
1440        let metadata = metadata_for_test();
1441        let capacity = 100;
1442        let primary_key_codec = build_primary_key_codec(&metadata);
1443        let schema = to_flat_sst_arrow_schema(
1444            &metadata,
1445            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1446        );
1447
1448        let mut converter =
1449            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1450
1451        let key_values1 = build_key_values_with_ts_seq_values(
1452            &metadata,
1453            "z_key".to_string(),
1454            3u32,
1455            vec![3000].into_iter(),
1456            vec![Some(3.0)].into_iter(),
1457            3,
1458        );
1459
1460        let key_values2 = build_key_values_with_ts_seq_values(
1461            &metadata,
1462            "a_key".to_string(),
1463            1u32,
1464            vec![1000].into_iter(),
1465            vec![Some(1.0)].into_iter(),
1466            1,
1467        );
1468
1469        let key_values3 = build_key_values_with_ts_seq_values(
1470            &metadata,
1471            "m_key".to_string(),
1472            2u32,
1473            vec![2000].into_iter(),
1474            vec![Some(2.0)].into_iter(),
1475            2,
1476        );
1477
1478        converter.append_key_values(&key_values1).unwrap();
1479        converter.append_key_values(&key_values2).unwrap();
1480        converter.append_key_values(&key_values3).unwrap();
1481
1482        let bulk_part = converter.convert().unwrap();
1483
1484        assert_eq!(bulk_part.num_rows(), 3);
1485
1486        let ts_column = bulk_part.batch.column(bulk_part.timestamp_index);
1487        let seq_column = bulk_part.batch.column(bulk_part.batch.num_columns() - 2);
1488
1489        let ts_array = ts_column
1490            .as_any()
1491            .downcast_ref::<TimestampMillisecondArray>()
1492            .unwrap();
1493        let seq_array = seq_column.as_any().downcast_ref::<UInt64Array>().unwrap();
1494
1495        assert_eq!(ts_array.values(), &[1000, 2000, 3000]);
1496        assert_eq!(seq_array.values(), &[1, 2, 3]);
1497
1498        // Validate primary key columns are stored
1499        let schema = bulk_part.batch.schema();
1500        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1501        assert_eq!(
1502            field_names,
1503            vec![
1504                "k0",
1505                "k1",
1506                "v0",
1507                "v1",
1508                "ts",
1509                "__primary_key",
1510                "__sequence",
1511                "__op_type"
1512            ]
1513        );
1514    }
1515
1516    #[test]
1517    fn test_bulk_part_converter_empty() {
1518        let metadata = metadata_for_test();
1519        let capacity = 10;
1520        let primary_key_codec = build_primary_key_codec(&metadata);
1521        let schema = to_flat_sst_arrow_schema(
1522            &metadata,
1523            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1524        );
1525
1526        let converter =
1527            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1528
1529        let bulk_part = converter.convert().unwrap();
1530
1531        assert_eq!(bulk_part.num_rows(), 0);
1532        assert_eq!(bulk_part.min_ts, i64::MAX);
1533        assert_eq!(bulk_part.max_ts, i64::MIN);
1534        assert_eq!(bulk_part.sequence, SequenceNumber::MIN);
1535
1536        // Validate primary key columns are present in schema even for empty batch
1537        let schema = bulk_part.batch.schema();
1538        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1539        assert_eq!(
1540            field_names,
1541            vec![
1542                "k0",
1543                "k1",
1544                "v0",
1545                "v1",
1546                "ts",
1547                "__primary_key",
1548                "__sequence",
1549                "__op_type"
1550            ]
1551        );
1552    }
1553
    #[test]
    fn test_bulk_part_converter_without_primary_key_columns() {
        let metadata = metadata_for_test();
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions {
                raw_pk_columns: false,
                string_pk_use_dict: true,
            },
        );

        let capacity = 100;
        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, false);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "key1".to_string(),
            1u32,
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "key2".to_string(),
            2u32,
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_ts, 1000);
        assert_eq!(bulk_part.max_ts, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        // Validate primary key columns are NOT stored individually
        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );
    }

    #[allow(clippy::too_many_arguments)]
    fn build_key_values_with_sparse_encoding(
        metadata: &RegionMetadataRef,
        primary_key_codec: &Arc<dyn PrimaryKeyCodec>,
        table_id: u32,
        tsid: u64,
        k0: String,
        k1: String,
        timestamps: impl Iterator<Item = i64>,
        values: impl Iterator<Item = Option<f64>>,
        sequence: SequenceNumber,
    ) -> KeyValues {
        // Encode the primary key (__table_id, __tsid, k0, k1) into binary format using the sparse codec
        let pk_values = vec![
            (ReservedColumnId::table_id(), Value::UInt32(table_id)),
            (ReservedColumnId::tsid(), Value::UInt64(tsid)),
            (0, Value::String(k0.clone().into())),
            (1, Value::String(k1.clone().into())),
        ];
        let mut encoded_key = Vec::new();
        primary_key_codec
            .encode_values(&pk_values, &mut encoded_key)
            .unwrap();
        assert!(!encoded_key.is_empty());

        // Create schema for sparse encoding: __primary_key, ts, v0, v1
        let column_schema = vec![
            api::v1::ColumnSchema {
                column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::binary_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Tag as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "ts".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::timestamp_millisecond_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Timestamp as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "v0".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::int64_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Field as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "v1".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::float64_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Field as i32,
                ..Default::default()
            },
        ];

        let rows = timestamps
            .zip(values)
            .map(|(ts, v)| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::BinaryValue(
                            encoded_key.clone(),
                        )),
                    },
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(ts)),
                    },
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::I64Value(ts)),
                    },
                    api::v1::Value {
                        value_data: v.map(api::v1::value::ValueData::F64Value),
                    },
                ],
            })
            .collect();

        let mutation = api::v1::Mutation {
            op_type: 1, // OpType::Put
            sequence,
            rows: Some(api::v1::Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: Some(WriteHint {
                primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
            }),
        };
        KeyValues::new(metadata.as_ref(), mutation).unwrap()
    }

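    /// With sparse primary key encoding, tag columns are never materialized as
    /// individual columns: the encoded key is carried only by `__primary_key`,
    /// even when the converter is asked to store primary key columns.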
    #[test]
    fn test_bulk_part_converter_sparse_primary_key_encoding() {
        use api::v1::SemanticType;
        use datatypes::schema::ColumnSchema;
        use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
        use store_api::storage::RegionId;

        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![0, 1])
            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
        let metadata = Arc::new(builder.build().unwrap());

        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse);
        assert_eq!(primary_key_codec.encoding(), PrimaryKeyEncoding::Sparse);

        let capacity = 100;
        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec.clone(), true);

        let key_values1 = build_key_values_with_sparse_encoding(
            &metadata,
            &primary_key_codec,
            2048u32, // table_id
            100u64,  // tsid
            "key11".to_string(),
            "key21".to_string(),
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_sparse_encoding(
            &metadata,
            &primary_key_codec,
            4096u32, // table_id
            200u64,  // tsid
            "key12".to_string(),
            "key22".to_string(),
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_ts, 1000);
        assert_eq!(bulk_part.max_ts, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        // For sparse encoding, primary key columns should NOT be stored individually
        // even when store_primary_key_columns is true, because sparse encoding
        // stores the encoded primary key in the __primary_key column.
        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );

        // Verify the __primary_key column contains encoded sparse keys
        let primary_key_column = bulk_part.batch.column_by_name("__primary_key").unwrap();
        let dict_array = primary_key_column
            .as_any()
            .downcast_ref::<DictionaryArray<UInt32Type>>()
            .unwrap();

        // The dictionary array should cover every appended row
        assert!(!dict_array.is_empty());
        assert_eq!(dict_array.len(), 3); // 3 rows total

        // Verify values are properly encoded binary data (not empty)
        let values = dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        for i in 0..values.len() {
            assert!(
                !values.value(i).is_empty(),
                "Encoded primary key should not be empty"
            );
        }
    }
}