mito2/memtable/bulk/part.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Bulk part encoder/decoder.
16
17use std::collections::VecDeque;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use api::helper::{ColumnDataTypeWrapper, value_to_grpc_value};
22use api::v1::bulk_wal_entry::Body;
23use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType, bulk_wal_entry};
24use bytes::Bytes;
25use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
26use common_recordbatch::DfRecordBatch as RecordBatch;
27use common_time::Timestamp;
28use common_time::timestamp::TimeUnit;
29use datatypes::arrow;
30use datatypes::arrow::array::{
31    Array, ArrayRef, BinaryBuilder, BinaryDictionaryBuilder, DictionaryArray, StringBuilder,
32    StringDictionaryBuilder, TimestampMicrosecondArray, TimestampMillisecondArray,
33    TimestampNanosecondArray, TimestampSecondArray, UInt8Array, UInt8Builder, UInt32Array,
34    UInt64Array, UInt64Builder,
35};
36use datatypes::arrow::compute::{SortColumn, SortOptions, TakeOptions};
37use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
38use datatypes::arrow_array::BinaryArray;
39use datatypes::data_type::DataType;
40use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
41use datatypes::value::{Value, ValueRef};
42use datatypes::vectors::Helper;
43use mito_codec::key_values::{KeyValue, KeyValues, KeyValuesRef};
44use mito_codec::row_converter::{
45    DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, build_primary_key_codec,
46};
47use parquet::arrow::ArrowWriter;
48use parquet::data_type::AsBytes;
49use parquet::file::metadata::ParquetMetaData;
50use parquet::file::properties::WriterProperties;
51use snafu::{OptionExt, ResultExt, Snafu};
52use store_api::codec::PrimaryKeyEncoding;
53use store_api::metadata::{RegionMetadata, RegionMetadataRef};
54use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
55use store_api::storage::{FileId, SequenceNumber};
56use table::predicate::Predicate;
57
58use crate::error::{
59    self, ColumnNotFoundSnafu, ComputeArrowSnafu, DataTypeMismatchSnafu, EncodeMemtableSnafu,
60    EncodeSnafu, InvalidMetadataSnafu, NewRecordBatchSnafu, Result,
61};
62use crate::memtable::BoxedRecordBatchIterator;
63use crate::memtable::bulk::context::BulkIterContextRef;
64use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
65use crate::memtable::time_series::{ValueBuilder, Values};
66use crate::sst::index::IndexOutput;
67use crate::sst::parquet::flat_format::primary_key_column_index;
68use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder, ReadFormat};
69use crate::sst::parquet::helper::parse_parquet_metadata;
70use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo};
71use crate::sst::to_sst_arrow_schema;
72
73const INIT_DICT_VALUE_CAPACITY: usize = 8;
74
75/// A raw bulk part in the memtable.
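///
/// `batch` holds the rows of this part, `min_timestamp`/`max_timestamp` are the
/// timestamp bounds of those rows, `sequence` is the sequence number associated with
/// the part, `timestamp_index` is the index of the time index column in `batch`, and
/// `raw_data` caches the original Arrow IPC payload when the part was decoded from a
/// [BulkWalEntry].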
76#[derive(Clone)]
77pub struct BulkPart {
78    pub batch: RecordBatch,
79    pub max_timestamp: i64,
80    pub min_timestamp: i64,
81    pub sequence: u64,
82    pub timestamp_index: usize,
83    pub raw_data: Option<ArrowIpc>,
84}
85
86impl TryFrom<BulkWalEntry> for BulkPart {
87    type Error = error::Error;
88
89    fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
90        match value.body.expect("Entry payload should be present") {
91            Body::ArrowIpc(ipc) => {
92                let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
93                    .context(error::ConvertBulkWalEntrySnafu)?;
94                let batch = decoder
95                    .try_decode_record_batch(&ipc.data_header, &ipc.payload)
96                    .context(error::ConvertBulkWalEntrySnafu)?;
97                Ok(Self {
98                    batch,
99                    max_timestamp: value.max_ts,
100                    min_timestamp: value.min_ts,
101                    sequence: value.sequence,
102                    timestamp_index: value.timestamp_index as usize,
103                    raw_data: Some(ipc),
104                })
105            }
106        }
107    }
108}
109
110impl TryFrom<&BulkPart> for BulkWalEntry {
111    type Error = error::Error;
112
113    fn try_from(value: &BulkPart) -> Result<Self> {
114        if let Some(ipc) = &value.raw_data {
115            Ok(BulkWalEntry {
116                sequence: value.sequence,
117                max_ts: value.max_timestamp,
118                min_ts: value.min_timestamp,
119                timestamp_index: value.timestamp_index as u32,
120                body: Some(Body::ArrowIpc(ipc.clone())),
121            })
122        } else {
123            let mut encoder = FlightEncoder::default();
124            let schema_bytes = encoder
125                .encode_schema(value.batch.schema().as_ref())
126                .data_header;
127            let [rb_data] = encoder
128                .encode(FlightMessage::RecordBatch(value.batch.clone()))
129                .try_into()
130                .map_err(|_| {
131                    error::UnsupportedOperationSnafu {
132                        err_msg: "create BulkWalEntry from RecordBatch with dictionary arrays",
133                    }
134                    .build()
135                })?;
136            Ok(BulkWalEntry {
137                sequence: value.sequence,
138                max_ts: value.max_timestamp,
139                min_ts: value.min_timestamp,
140                timestamp_index: value.timestamp_index as u32,
141                body: Some(Body::ArrowIpc(ArrowIpc {
142                    schema: schema_bytes,
143                    data_header: rb_data.data_header,
144                    payload: rb_data.data_body,
145                })),
146            })
147        }
148    }
149}
150
151impl BulkPart {
152    pub(crate) fn estimated_size(&self) -> usize {
153        record_batch_estimated_size(&self.batch)
154    }
155
156    /// Returns the estimated number of series in this [BulkPart].
157    /// This is derived from the number of dictionary values in the `PrimaryKeyArray`.
158    pub fn estimated_series_count(&self) -> usize {
159        let pk_column_idx = primary_key_column_index(self.batch.num_columns());
160        let pk_column = self.batch.column(pk_column_idx);
161        if let Some(dict_array) = pk_column.as_any().downcast_ref::<PrimaryKeyArray>() {
162            dict_array.values().len()
163        } else {
164            0
165        }
166    }
167
168    /// Converts this [BulkPart] to a [Mutation] for the fallback `write_bulk` implementation.
169    pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
170        let vectors = region_metadata
171            .schema
172            .column_schemas()
173            .iter()
174            .map(|col| match self.batch.column_by_name(&col.name) {
175                None => Ok(None),
176                Some(col) => Helper::try_into_vector(col).map(Some),
177            })
178            .collect::<datatypes::error::Result<Vec<_>>>()
179            .context(error::ComputeVectorSnafu)?;
180
181        let rows = (0..self.num_rows())
182            .map(|row_idx| {
183                let values = (0..self.batch.num_columns())
184                    .map(|col_idx| {
185                        if let Some(v) = &vectors[col_idx] {
186                            value_to_grpc_value(v.get(row_idx))
187                        } else {
188                            api::v1::Value { value_data: None }
189                        }
190                    })
191                    .collect::<Vec<_>>();
192                api::v1::Row { values }
193            })
194            .collect::<Vec<_>>();
195
196        let schema = region_metadata
197            .column_metadatas
198            .iter()
199            .map(|c| {
200                let data_type_wrapper =
201                    ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
202                Ok(api::v1::ColumnSchema {
203                    column_name: c.column_schema.name.clone(),
204                    datatype: data_type_wrapper.datatype() as i32,
205                    semantic_type: c.semantic_type as i32,
206                    ..Default::default()
207                })
208            })
209            .collect::<api::error::Result<Vec<_>>>()
210            .context(error::ConvertColumnDataTypeSnafu {
211                reason: "failed to convert region metadata to column schema",
212            })?;
213
214        let rows = api::v1::Rows { schema, rows };
215
216        Ok(Mutation {
217            op_type: OpType::Put as i32,
218            sequence: self.sequence,
219            rows: Some(rows),
220            write_hint: None,
221        })
222    }
223
224    pub fn timestamps(&self) -> &ArrayRef {
225        self.batch.column(self.timestamp_index)
226    }
227
228    pub fn num_rows(&self) -> usize {
229        self.batch.num_rows()
230    }
231}
232
233/// Returns a more accurate estimation of the size of a record batch, based on the slice memory size of each column.
234pub(crate) fn record_batch_estimated_size(batch: &RecordBatch) -> usize {
235    batch
236        .columns()
237        .iter()
238        // If can not get slice memory size, assume 0 here.
239        .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
240        .sum()
241}
242
243/// Primary key column builder for handling strings specially.
244enum PrimaryKeyColumnBuilder {
245    /// String dictionary builder for string types.
246    StringDict(StringDictionaryBuilder<UInt32Type>),
247    /// Generic mutable vector for other types.
248    Vector(Box<dyn MutableVector>),
249}
250
251impl PrimaryKeyColumnBuilder {
252    /// Appends a value to the builder.
253    fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
254        match self {
255            PrimaryKeyColumnBuilder::StringDict(builder) => {
256                if let Some(s) = value.as_string().context(DataTypeMismatchSnafu)? {
257                    // We know the value is a string.
258                    builder.append_value(s);
259                } else {
260                    builder.append_null();
261                }
262            }
263            PrimaryKeyColumnBuilder::Vector(builder) => {
264                builder.push_value_ref(value);
265            }
266        }
267        Ok(())
268    }
269
270    /// Converts the builder to an ArrayRef.
271    fn into_arrow_array(self) -> ArrayRef {
272        match self {
273            PrimaryKeyColumnBuilder::StringDict(mut builder) => Arc::new(builder.finish()),
274            PrimaryKeyColumnBuilder::Vector(mut builder) => builder.to_vector().to_arrow_array(),
275        }
276    }
277}
278
279/// Converter that converts [KeyValues] into a [BulkPart].
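///
/// A minimal usage sketch (assuming a `RegionMetadataRef` named `metadata` and a
/// [KeyValues] batch named `key_values`; see the tests below for a concrete setup):
///
/// ```ignore
/// let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
/// let codec = build_primary_key_codec(&metadata);
/// let mut converter = BulkPartConverter::new(&metadata, schema, 64, codec, true);
/// converter.append_key_values(&key_values)?;
/// let part: BulkPart = converter.convert()?;
/// ```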
280pub struct BulkPartConverter {
281    /// Region metadata.
282    region_metadata: RegionMetadataRef,
283    /// Schema of the converted batch.
284    schema: SchemaRef,
285    /// Primary key codec for encoding keys
286    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
287    /// Buffer for encoding primary key.
288    key_buf: Vec<u8>,
289    /// Primary key array builder.
290    key_array_builder: PrimaryKeyArrayBuilder,
291    /// Builders for non-primary key columns.
292    value_builder: ValueBuilder,
293    /// Builders for individual primary key columns.
294    /// The order of builders is the same as the order of primary key columns in the region metadata.
295    primary_key_column_builders: Vec<PrimaryKeyColumnBuilder>,
296
297    /// Max timestamp value.
298    max_ts: i64,
299    /// Min timestamp value.
300    min_ts: i64,
301    /// Max sequence number.
302    max_sequence: SequenceNumber,
303}
304
305impl BulkPartConverter {
306    /// Creates a new converter.
307    ///
308    /// If `store_primary_key_columns` is true and the encoding is not sparse, it
309    /// additionally stores the primary key columns as individual arrays.
310    pub fn new(
311        region_metadata: &RegionMetadataRef,
312        schema: SchemaRef,
313        capacity: usize,
314        primary_key_codec: Arc<dyn PrimaryKeyCodec>,
315        store_primary_key_columns: bool,
316    ) -> Self {
317        debug_assert_eq!(
318            region_metadata.primary_key_encoding,
319            primary_key_codec.encoding()
320        );
321
322        let primary_key_column_builders = if store_primary_key_columns
323            && region_metadata.primary_key_encoding != PrimaryKeyEncoding::Sparse
324        {
325            new_primary_key_column_builders(region_metadata, capacity)
326        } else {
327            Vec::new()
328        };
329
330        Self {
331            region_metadata: region_metadata.clone(),
332            schema,
333            primary_key_codec,
334            key_buf: Vec::new(),
335            key_array_builder: PrimaryKeyArrayBuilder::new(),
336            value_builder: ValueBuilder::new(region_metadata, capacity),
337            primary_key_column_builders,
338            min_ts: i64::MAX,
339            max_ts: i64::MIN,
340            max_sequence: SequenceNumber::MIN,
341        }
342    }
343
344    /// Appends a [KeyValues] into the converter.
345    pub fn append_key_values(&mut self, key_values: &KeyValues) -> Result<()> {
346        for kv in key_values.iter() {
347            self.append_key_value(&kv)?;
348        }
349
350        Ok(())
351    }
352
353    /// Appends a [KeyValue] to builders.
354    ///
355    /// If the primary key uses sparse encoding, callers must have already encoded the primary key in the [KeyValue].
356    fn append_key_value(&mut self, kv: &KeyValue) -> Result<()> {
357        // Handles primary key based on encoding type
358        if self.primary_key_codec.encoding() == PrimaryKeyEncoding::Sparse {
359            // For sparse encoding, the primary key is already encoded in the KeyValue
360            // Gets the first (and only) primary key value which contains the encoded key
361            let mut primary_keys = kv.primary_keys();
362            if let Some(encoded) = primary_keys
363                .next()
364                .context(ColumnNotFoundSnafu {
365                    column: PRIMARY_KEY_COLUMN_NAME,
366                })?
367                .as_binary()
368                .context(DataTypeMismatchSnafu)?
369            {
370                self.key_array_builder
371                    .append(encoded)
372                    .context(ComputeArrowSnafu)?;
373            } else {
374                self.key_array_builder
375                    .append("")
376                    .context(ComputeArrowSnafu)?;
377            }
378        } else {
379            // For dense encoding, we need to encode the primary key columns
380            self.key_buf.clear();
381            self.primary_key_codec
382                .encode_key_value(kv, &mut self.key_buf)
383                .context(EncodeSnafu)?;
384            self.key_array_builder
385                .append(&self.key_buf)
386                .context(ComputeArrowSnafu)?;
387        };
388
389        // If storing primary key columns, append values to individual builders
390        if !self.primary_key_column_builders.is_empty() {
391            for (builder, pk_value) in self
392                .primary_key_column_builders
393                .iter_mut()
394                .zip(kv.primary_keys())
395            {
396                builder.push_value_ref(pk_value)?;
397            }
398        }
399
400        // Pushes other columns.
401        self.value_builder.push(
402            kv.timestamp(),
403            kv.sequence(),
404            kv.op_type() as u8,
405            kv.fields(),
406        );
407
408        // Updates statistics
409        // Safety: timestamp of kv must be both present and a valid timestamp value.
410        let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
411        self.min_ts = self.min_ts.min(ts);
412        self.max_ts = self.max_ts.max(ts);
413        self.max_sequence = self.max_sequence.max(kv.sequence());
414
415        Ok(())
416    }
417
418    /// Converts buffered content into a [BulkPart].
419    ///
420    /// It sorts the record batch by (primary key, timestamp, sequence desc).
421    pub fn convert(mut self) -> Result<BulkPart> {
422        let values = Values::from(self.value_builder);
423        let mut columns =
424            Vec::with_capacity(4 + values.fields.len() + self.primary_key_column_builders.len());
425
426        // Build primary key column arrays if enabled.
427        for builder in self.primary_key_column_builders {
428            columns.push(builder.into_arrow_array());
429        }
430        // Then fields columns.
431        columns.extend(values.fields.iter().map(|field| field.to_arrow_array()));
432        // Time index.
433        let timestamp_index = columns.len();
434        columns.push(values.timestamp.to_arrow_array());
435        // Primary key.
436        let pk_array = self.key_array_builder.finish();
437        columns.push(Arc::new(pk_array));
438        // Sequence and op type.
439        columns.push(values.sequence.to_arrow_array());
440        columns.push(values.op_type.to_arrow_array());
441
442        let batch = RecordBatch::try_new(self.schema, columns).context(NewRecordBatchSnafu)?;
443        // Sorts the record batch.
444        let batch = sort_primary_key_record_batch(&batch)?;
445
446        Ok(BulkPart {
447            batch,
448            max_timestamp: self.max_ts,
449            min_timestamp: self.min_ts,
450            sequence: self.max_sequence,
451            timestamp_index,
452            raw_data: None,
453        })
454    }
455}
456
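/// Creates builders for the primary key columns: string columns use dictionary
/// builders, other types use generic mutable vectors.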
457fn new_primary_key_column_builders(
458    metadata: &RegionMetadata,
459    capacity: usize,
460) -> Vec<PrimaryKeyColumnBuilder> {
461    metadata
462        .primary_key_columns()
463        .map(|col| {
464            if col.column_schema.data_type.is_string() {
465                PrimaryKeyColumnBuilder::StringDict(StringDictionaryBuilder::with_capacity(
466                    capacity,
467                    INIT_DICT_VALUE_CAPACITY,
468                    capacity,
469                ))
470            } else {
471                PrimaryKeyColumnBuilder::Vector(
472                    col.column_schema.data_type.create_mutable_vector(capacity),
473                )
474            }
475        })
476        .collect()
477}
478
479/// Sorts a record batch in primary key format by (primary key, time index, sequence desc).
480fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
481    let total_columns = batch.num_columns();
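    // The batch built by `BulkPartConverter::convert` is laid out as
    // [primary key columns..., field columns..., time index, __primary_key, __sequence, __op_type],
    // so the sort keys below are addressed by offset from the end.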
482    let sort_columns = vec![
483        // Primary key column (ascending)
484        SortColumn {
485            values: batch.column(total_columns - 3).clone(),
486            options: Some(SortOptions {
487                descending: false,
488                nulls_first: true,
489            }),
490        },
491        // Time index column (ascending)
492        SortColumn {
493            values: batch.column(total_columns - 4).clone(),
494            options: Some(SortOptions {
495                descending: false,
496                nulls_first: true,
497            }),
498        },
499        // Sequence column (descending)
500        SortColumn {
501            values: batch.column(total_columns - 2).clone(),
502            options: Some(SortOptions {
503                descending: true,
504                nulls_first: true,
505            }),
506        },
507    ];
508
509    let indices = datatypes::arrow::compute::lexsort_to_indices(&sort_columns, None)
510        .context(ComputeArrowSnafu)?;
511
512    datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
513}
514
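/// A bulk part whose data has been encoded as parquet bytes.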
515#[derive(Debug, Clone)]
516pub struct EncodedBulkPart {
517    data: Bytes,
518    metadata: BulkPartMeta,
519}
520
521impl EncodedBulkPart {
522    pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
523        Self { data, metadata }
524    }
525
526    pub(crate) fn metadata(&self) -> &BulkPartMeta {
527        &self.metadata
528    }
529
530    /// Returns the size of the encoded data in bytes
531    pub(crate) fn size_bytes(&self) -> usize {
532        self.data.len()
533    }
534
535    /// Returns the encoded data.
536    pub(crate) fn data(&self) -> &Bytes {
537        &self.data
538    }
539
540    /// Converts this `EncodedBulkPart` to `SstInfo`.
541    ///
542    /// # Arguments
543    /// * `file_id` - The SST file ID to assign to this part
544    ///
545    /// # Returns
546    /// Returns a `SstInfo` instance with information derived from this bulk part's metadata
547    pub(crate) fn to_sst_info(&self, file_id: FileId) -> SstInfo {
548        let unit = self.metadata.region_metadata.time_index_type().unit();
549        SstInfo {
550            file_id,
551            time_range: (
552                Timestamp::new(self.metadata.min_timestamp, unit),
553                Timestamp::new(self.metadata.max_timestamp, unit),
554            ),
555            file_size: self.data.len() as u64,
556            num_rows: self.metadata.num_rows,
557            num_row_groups: self.metadata.parquet_metadata.num_row_groups() as u64,
558            file_metadata: Some(self.metadata.parquet_metadata.clone()),
559            index_metadata: IndexOutput::default(),
560        }
561    }
562
563    pub(crate) fn read(
564        &self,
565        context: BulkIterContextRef,
566        sequence: Option<SequenceNumber>,
567    ) -> Result<Option<BoxedRecordBatchIterator>> {
568        // use predicate to find row groups to read.
569        let row_groups_to_read = context.row_groups_to_read(&self.metadata.parquet_metadata);
570
571        if row_groups_to_read.is_empty() {
572            // All row groups are filtered.
573            return Ok(None);
574        }
575
576        let iter = EncodedBulkPartIter::try_new(
577            context,
578            row_groups_to_read,
579            self.metadata.parquet_metadata.clone(),
580            self.data.clone(),
581            sequence,
582        )?;
583        Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
584    }
585}
586
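/// Metadata of an [EncodedBulkPart].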
587#[derive(Debug, Clone)]
588pub struct BulkPartMeta {
589    /// Total rows in part.
590    pub num_rows: usize,
591    /// Max timestamp in part.
592    pub max_timestamp: i64,
593    /// Min timestamp in part.
594    pub min_timestamp: i64,
595    /// Part file metadata.
596    pub parquet_metadata: Arc<ParquetMetaData>,
597    /// Part region schema.
598    pub region_metadata: RegionMetadataRef,
599}
600
601/// Metrics for encoding a part.
602#[derive(Default, Debug)]
603pub struct BulkPartEncodeMetrics {
604    /// Cost of iterating over the data.
605    pub iter_cost: Duration,
606    /// Cost of writing the data.
607    pub write_cost: Duration,
608    /// Size of data before encoding.
609    pub raw_size: usize,
610    /// Size of data after encoding.
611    pub encoded_size: usize,
612    /// Number of rows in part.
613    pub num_rows: usize,
614}
615
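/// Encoder that writes bulk data as parquet and produces [EncodedBulkPart]s,
/// embedding the region metadata in the parquet key-value metadata.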
616pub struct BulkPartEncoder {
617    metadata: RegionMetadataRef,
618    row_group_size: usize,
619    writer_props: Option<WriterProperties>,
620}
621
622impl BulkPartEncoder {
623    pub(crate) fn new(
624        metadata: RegionMetadataRef,
625        row_group_size: usize,
626    ) -> Result<BulkPartEncoder> {
627        // TODO(yingwen): Skip arrow schema if needed.
628        let json = metadata.to_json().context(InvalidMetadataSnafu)?;
629        let key_value_meta =
630            parquet::file::metadata::KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
631
632        // TODO(yingwen): Do we need compression?
633        let writer_props = Some(
634            WriterProperties::builder()
635                .set_key_value_metadata(Some(vec![key_value_meta]))
636                .set_write_batch_size(row_group_size)
637                .set_max_row_group_size(row_group_size)
638                .set_column_index_truncate_length(None)
639                .set_statistics_truncate_length(None)
640                .build(),
641        );
642
643        Ok(Self {
644            metadata,
645            row_group_size,
646            writer_props,
647        })
648    }
649}
650
651impl BulkPartEncoder {
652    /// Encodes a [BoxedRecordBatchIterator] into an [EncodedBulkPart] with the given min/max timestamps.
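    ///
    /// Returns `Ok(None)` if the iterator yields no rows.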
653    pub fn encode_record_batch_iter(
654        &self,
655        iter: BoxedRecordBatchIterator,
656        arrow_schema: SchemaRef,
657        min_timestamp: i64,
658        max_timestamp: i64,
659        metrics: &mut BulkPartEncodeMetrics,
660    ) -> Result<Option<EncodedBulkPart>> {
661        let mut buf = Vec::with_capacity(4096);
662        let mut writer = ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
663            .context(EncodeMemtableSnafu)?;
664        let mut total_rows = 0;
665
666        // Process each batch from the iterator
667        let mut iter_start = Instant::now();
668        for batch_result in iter {
669            metrics.iter_cost += iter_start.elapsed();
670            let batch = batch_result?;
671            if batch.num_rows() == 0 {
672                continue;
673            }
674
675            metrics.raw_size += record_batch_estimated_size(&batch);
676            let write_start = Instant::now();
677            writer.write(&batch).context(EncodeMemtableSnafu)?;
678            metrics.write_cost += write_start.elapsed();
679            total_rows += batch.num_rows();
680            iter_start = Instant::now();
681        }
682        metrics.iter_cost += iter_start.elapsed();
684
685        if total_rows == 0 {
686            return Ok(None);
687        }
688
689        let close_start = Instant::now();
690        let file_metadata = writer.close().context(EncodeMemtableSnafu)?;
691        metrics.write_cost += close_start.elapsed();
692        metrics.encoded_size += buf.len();
693        metrics.num_rows += total_rows;
694
695        let buf = Bytes::from(buf);
696        let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);
697
698        Ok(Some(EncodedBulkPart {
699            data: buf,
700            metadata: BulkPartMeta {
701                num_rows: total_rows,
702                max_timestamp,
703                min_timestamp,
704                parquet_metadata,
705                region_metadata: self.metadata.clone(),
706            },
707        }))
708    }
709
710    /// Encodes a [BulkPart] into an [EncodedBulkPart] and returns the encoded data.
711    fn encode_part(&self, part: &BulkPart) -> Result<Option<EncodedBulkPart>> {
712        if part.batch.num_rows() == 0 {
713            return Ok(None);
714        }
715
716        let mut buf = Vec::with_capacity(4096);
717        let arrow_schema = part.batch.schema();
718
719        let file_metadata = {
720            let mut writer =
721                ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
722                    .context(EncodeMemtableSnafu)?;
723            writer.write(&part.batch).context(EncodeMemtableSnafu)?;
724            writer.finish().context(EncodeMemtableSnafu)?
725        };
726
727        let buf = Bytes::from(buf);
728        let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);
729
730        Ok(Some(EncodedBulkPart {
731            data: buf,
732            metadata: BulkPartMeta {
733                num_rows: part.batch.num_rows(),
734                max_timestamp: part.max_timestamp,
735                min_timestamp: part.min_timestamp,
736                parquet_metadata,
737                region_metadata: self.metadata.clone(),
738            },
739        }))
740    }
741}
742
743/// Converts mutations to a single sorted record batch.
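///
/// Returns `None` if the mutations contain no rows; otherwise returns the sorted
/// batch together with its min and max timestamps.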
744fn mutations_to_record_batch(
745    mutations: &[Mutation],
746    metadata: &RegionMetadataRef,
747    pk_encoder: &DensePrimaryKeyCodec,
748    dedup: bool,
749) -> Result<Option<(RecordBatch, i64, i64)>> {
750    let total_rows: usize = mutations
751        .iter()
752        .map(|m| m.rows.as_ref().map(|r| r.rows.len()).unwrap_or(0))
753        .sum();
754
755    if total_rows == 0 {
756        return Ok(None);
757    }
758
759    let mut pk_builder = BinaryBuilder::with_capacity(total_rows, 0);
760
761    let mut ts_vector: Box<dyn MutableVector> = metadata
762        .time_index_column()
763        .column_schema
764        .data_type
765        .create_mutable_vector(total_rows);
766    let mut sequence_builder = UInt64Builder::with_capacity(total_rows);
767    let mut op_type_builder = UInt8Builder::with_capacity(total_rows);
768
769    let mut field_builders: Vec<Box<dyn MutableVector>> = metadata
770        .field_columns()
771        .map(|f| f.column_schema.data_type.create_mutable_vector(total_rows))
772        .collect();
773
774    let mut pk_buffer = vec![];
775    for m in mutations {
776        let Some(key_values) = KeyValuesRef::new(metadata, m) else {
777            continue;
778        };
779
780        for row in key_values.iter() {
781            pk_buffer.clear();
782            pk_encoder
783                .encode_to_vec(row.primary_keys(), &mut pk_buffer)
784                .context(EncodeSnafu)?;
785            pk_builder.append_value(pk_buffer.as_bytes());
786            ts_vector.push_value_ref(row.timestamp());
787            sequence_builder.append_value(row.sequence());
788            op_type_builder.append_value(row.op_type() as u8);
789            for (builder, field) in field_builders.iter_mut().zip(row.fields()) {
790                builder.push_value_ref(field);
791            }
792        }
793    }
794
795    let arrow_schema = to_sst_arrow_schema(metadata);
796    // safety: timestamp column must be valid, and values must not be None.
797    let timestamp_unit = metadata
798        .time_index_column()
799        .column_schema
800        .data_type
801        .as_timestamp()
802        .unwrap()
803        .unit();
804    let sorter = ArraysSorter {
805        encoded_primary_keys: pk_builder.finish(),
806        timestamp_unit,
807        timestamp: ts_vector.to_vector().to_arrow_array(),
808        sequence: sequence_builder.finish(),
809        op_type: op_type_builder.finish(),
810        fields: field_builders
811            .iter_mut()
812            .map(|f| f.to_vector().to_arrow_array()),
813        dedup,
814        arrow_schema,
815    };
816
817    sorter.sort().map(Some)
818}
819
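/// Sorts column arrays by (primary key, timestamp, sequence desc), optionally dedups
/// rows with identical (primary key, timestamp), and assembles them into a record batch.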
820struct ArraysSorter<I> {
821    encoded_primary_keys: BinaryArray,
822    timestamp_unit: TimeUnit,
823    timestamp: ArrayRef,
824    sequence: UInt64Array,
825    op_type: UInt8Array,
826    fields: I,
827    dedup: bool,
828    arrow_schema: SchemaRef,
829}
830
831impl<I> ArraysSorter<I>
832where
833    I: Iterator<Item = ArrayRef>,
834{
835    /// Sorts the arrays and converts them into a record batch, returning the min/max timestamps.
836    fn sort(self) -> Result<(RecordBatch, i64, i64)> {
837        debug_assert!(!self.timestamp.is_empty());
838        debug_assert!(self.timestamp.len() == self.sequence.len());
839        debug_assert!(self.timestamp.len() == self.op_type.len());
840        debug_assert!(self.timestamp.len() == self.encoded_primary_keys.len());
841
842        let timestamp_iter = timestamp_array_to_iter(self.timestamp_unit, &self.timestamp);
843        let (mut min_timestamp, mut max_timestamp) = (i64::MAX, i64::MIN);
844        let mut to_sort = self
845            .encoded_primary_keys
846            .iter()
847            .zip(timestamp_iter)
848            .zip(self.sequence.iter())
849            .map(|((pk, timestamp), sequence)| {
850                max_timestamp = max_timestamp.max(*timestamp);
851                min_timestamp = min_timestamp.min(*timestamp);
852                (pk, timestamp, sequence)
853            })
854            .enumerate()
855            .collect::<Vec<_>>();
856
857        to_sort.sort_unstable_by(|(_, (l_pk, l_ts, l_seq)), (_, (r_pk, r_ts, r_seq))| {
858            l_pk.cmp(r_pk)
859                .then(l_ts.cmp(r_ts))
860                .then(l_seq.cmp(r_seq).reverse())
861        });
862
863        if self.dedup {
864            // Dedup by (primary key, timestamp) while ignoring sequence.
865            to_sort.dedup_by(|(_, (l_pk, l_ts, _)), (_, (r_pk, r_ts, _))| {
866                l_pk == r_pk && l_ts == r_ts
867            });
868        }
869
870        let indices = UInt32Array::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
871
872        let pk_dictionary = Arc::new(binary_array_to_dictionary(
873            // safety: pk must be BinaryArray
874            arrow::compute::take(
875                &self.encoded_primary_keys,
876                &indices,
877                Some(TakeOptions {
878                    check_bounds: false,
879                }),
880            )
881            .context(ComputeArrowSnafu)?
882            .as_any()
883            .downcast_ref::<BinaryArray>()
884            .unwrap(),
885        )?) as ArrayRef;
886
887        let mut arrays = Vec::with_capacity(self.arrow_schema.fields.len());
888        for arr in self.fields {
889            arrays.push(
890                arrow::compute::take(
891                    &arr,
892                    &indices,
893                    Some(TakeOptions {
894                        check_bounds: false,
895                    }),
896                )
897                .context(ComputeArrowSnafu)?,
898            );
899        }
900
901        let timestamp = arrow::compute::take(
902            &self.timestamp,
903            &indices,
904            Some(TakeOptions {
905                check_bounds: false,
906            }),
907        )
908        .context(ComputeArrowSnafu)?;
909
910        arrays.push(timestamp);
911        arrays.push(pk_dictionary);
912        arrays.push(
913            arrow::compute::take(
914                &self.sequence,
915                &indices,
916                Some(TakeOptions {
917                    check_bounds: false,
918                }),
919            )
920            .context(ComputeArrowSnafu)?,
921        );
922
923        arrays.push(
924            arrow::compute::take(
925                &self.op_type,
926                &indices,
927                Some(TakeOptions {
928                    check_bounds: false,
929                }),
930            )
931            .context(ComputeArrowSnafu)?,
932        );
933
934        let batch = RecordBatch::try_new(self.arrow_schema, arrays).context(NewRecordBatchSnafu)?;
935        Ok((batch, min_timestamp, max_timestamp))
936    }
937}
938
939/// Converts timestamp array to an iter of i64 values.
940fn timestamp_array_to_iter(
941    timestamp_unit: TimeUnit,
942    timestamp: &ArrayRef,
943) -> impl Iterator<Item = &i64> {
944    match timestamp_unit {
945        // safety: timestamp column must be valid.
946        TimeUnit::Second => timestamp
947            .as_any()
948            .downcast_ref::<TimestampSecondArray>()
949            .unwrap()
950            .values()
951            .iter(),
952        TimeUnit::Millisecond => timestamp
953            .as_any()
954            .downcast_ref::<TimestampMillisecondArray>()
955            .unwrap()
956            .values()
957            .iter(),
958        TimeUnit::Microsecond => timestamp
959            .as_any()
960            .downcast_ref::<TimestampMicrosecondArray>()
961            .unwrap()
962            .values()
963            .iter(),
964        TimeUnit::Nanosecond => timestamp
965            .as_any()
966            .downcast_ref::<TimestampNanosecondArray>()
967            .unwrap()
968            .values()
969            .iter(),
970    }
971}
972
973/// Converts a **sorted** [BinaryArray] to [DictionaryArray].
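///
/// For example (mirroring the tests below), a sorted input of `[b"a", b"a", b"b"]`
/// produces keys `[0, 0, 1]` and dictionary values `[b"a", b"b"]`.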
974fn binary_array_to_dictionary(input: &BinaryArray) -> Result<PrimaryKeyArray> {
975    if input.is_empty() {
976        return Ok(DictionaryArray::new(
977            UInt32Array::from(Vec::<u32>::new()),
978            Arc::new(BinaryArray::from_vec(vec![])) as ArrayRef,
979        ));
980    }
981    let mut keys = Vec::with_capacity(16);
982    let mut values = BinaryBuilder::new();
983    let mut prev: usize = 0;
984    keys.push(prev as u32);
985    values.append_value(input.value(prev));
986
987    for current_bytes in input.iter().skip(1) {
988        // safety: encoded pk must be present.
989        let current_bytes = current_bytes.unwrap();
990        let prev_bytes = input.value(prev);
991        if current_bytes != prev_bytes {
992            values.append_value(current_bytes);
993            prev += 1;
994        }
995        keys.push(prev as u32);
996    }
997
998    Ok(DictionaryArray::new(
999        UInt32Array::from(keys),
1000        Arc::new(values.finish()) as ArrayRef,
1001    ))
1002}
1003
1004#[cfg(test)]
1005mod tests {
1006    use std::collections::VecDeque;
1007
1008    use api::v1::{Row, WriteHint};
1009    use datafusion_common::ScalarValue;
1010    use datatypes::arrow::array::Float64Array;
1011    use datatypes::prelude::{ConcreteDataType, ScalarVector, Value};
1012    use datatypes::vectors::{Float64Vector, TimestampMillisecondVector};
1013    use store_api::storage::consts::ReservedColumnId;
1014
1015    use super::*;
1016    use crate::memtable::bulk::context::BulkIterContext;
1017    use crate::sst::parquet::format::{PrimaryKeyReadFormat, ReadFormat};
1018    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1019    use crate::test_util::memtable_util::{
1020        build_key_values_with_ts_seq_values, metadata_for_test, region_metadata_to_row_schema,
1021    };
1022
1023    fn check_binary_array_to_dictionary(
1024        input: &[&[u8]],
1025        expected_keys: &[u32],
1026        expected_values: &[&[u8]],
1027    ) {
1028        let input = BinaryArray::from_iter_values(input.iter());
1029        let array = binary_array_to_dictionary(&input).unwrap();
1030        assert_eq!(
1031            &expected_keys,
1032            &array.keys().iter().map(|v| v.unwrap()).collect::<Vec<_>>()
1033        );
1034        assert_eq!(
1035            expected_values,
1036            &array
1037                .values()
1038                .as_any()
1039                .downcast_ref::<BinaryArray>()
1040                .unwrap()
1041                .iter()
1042                .map(|v| v.unwrap())
1043                .collect::<Vec<_>>()
1044        );
1045    }
1046
1047    #[test]
1048    fn test_binary_array_to_dictionary() {
1049        check_binary_array_to_dictionary(&[], &[], &[]);
1050
1051        check_binary_array_to_dictionary(&["a".as_bytes()], &[0], &["a".as_bytes()]);
1052
1053        check_binary_array_to_dictionary(
1054            &["a".as_bytes(), "a".as_bytes()],
1055            &[0, 0],
1056            &["a".as_bytes()],
1057        );
1058
1059        check_binary_array_to_dictionary(
1060            &["a".as_bytes(), "a".as_bytes(), "b".as_bytes()],
1061            &[0, 0, 1],
1062            &["a".as_bytes(), "b".as_bytes()],
1063        );
1064
1065        check_binary_array_to_dictionary(
1066            &[
1067                "a".as_bytes(),
1068                "a".as_bytes(),
1069                "b".as_bytes(),
1070                "c".as_bytes(),
1071            ],
1072            &[0, 0, 1, 2],
1073            &["a".as_bytes(), "b".as_bytes(), "c".as_bytes()],
1074        );
1075    }
1076
1077    struct MutationInput<'a> {
1078        k0: &'a str,
1079        k1: u32,
1080        timestamps: &'a [i64],
1081        v1: &'a [Option<f64>],
1082        sequence: u64,
1083    }
1084
1085    #[derive(Debug, PartialOrd, PartialEq)]
1086    struct BatchOutput<'a> {
1087        pk_values: &'a [Value],
1088        timestamps: &'a [i64],
1089        v1: &'a [Option<f64>],
1090    }
1091
1092    fn check_mutations_to_record_batches(
1093        input: &[MutationInput],
1094        expected: &[BatchOutput],
1095        expected_timestamp: (i64, i64),
1096        dedup: bool,
1097    ) {
1098        let metadata = metadata_for_test();
1099        let mutations = input
1100            .iter()
1101            .map(|m| {
1102                build_key_values_with_ts_seq_values(
1103                    &metadata,
1104                    m.k0.to_string(),
1105                    m.k1,
1106                    m.timestamps.iter().copied(),
1107                    m.v1.iter().copied(),
1108                    m.sequence,
1109                )
1110                .mutation
1111            })
1112            .collect::<Vec<_>>();
1113        let total_rows: usize = mutations
1114            .iter()
1115            .flat_map(|m| m.rows.iter())
1116            .map(|r| r.rows.len())
1117            .sum();
1118
1119        let pk_encoder = DensePrimaryKeyCodec::new(&metadata);
1120
1121        let (batch, _, _) = mutations_to_record_batch(&mutations, &metadata, &pk_encoder, dedup)
1122            .unwrap()
1123            .unwrap();
1124        let read_format = PrimaryKeyReadFormat::new_with_all_columns(metadata.clone());
1125        let mut batches = VecDeque::new();
1126        read_format
1127            .convert_record_batch(&batch, None, &mut batches)
1128            .unwrap();
1129        if !dedup {
1130            assert_eq!(
1131                total_rows,
1132                batches.iter().map(|b| { b.num_rows() }).sum::<usize>()
1133            );
1134        }
1135        let batch_values = batches
1136            .into_iter()
1137            .map(|b| {
1138                let pk_values = pk_encoder.decode(b.primary_key()).unwrap().into_dense();
1139                let timestamps = b
1140                    .timestamps()
1141                    .as_any()
1142                    .downcast_ref::<TimestampMillisecondVector>()
1143                    .unwrap()
1144                    .iter_data()
1145                    .map(|v| v.unwrap().0.value())
1146                    .collect::<Vec<_>>();
1147                let float_values = b.fields()[1]
1148                    .data
1149                    .as_any()
1150                    .downcast_ref::<Float64Vector>()
1151                    .unwrap()
1152                    .iter_data()
1153                    .collect::<Vec<_>>();
1154
1155                (pk_values, timestamps, float_values)
1156            })
1157            .collect::<Vec<_>>();
1158        assert_eq!(expected.len(), batch_values.len());
1159
1160        for idx in 0..expected.len() {
1161            assert_eq!(expected[idx].pk_values, &batch_values[idx].0);
1162            assert_eq!(expected[idx].timestamps, &batch_values[idx].1);
1163            assert_eq!(expected[idx].v1, &batch_values[idx].2);
1164        }
1165    }
1166
1167    #[test]
1168    fn test_mutations_to_record_batch() {
1169        check_mutations_to_record_batches(
1170            &[MutationInput {
1171                k0: "a",
1172                k1: 0,
1173                timestamps: &[0],
1174                v1: &[Some(0.1)],
1175                sequence: 0,
1176            }],
1177            &[BatchOutput {
1178                pk_values: &[Value::String("a".into()), Value::UInt32(0)],
1179                timestamps: &[0],
1180                v1: &[Some(0.1)],
1181            }],
1182            (0, 0),
1183            true,
1184        );
1185
1186        check_mutations_to_record_batches(
1187            &[
1188                MutationInput {
1189                    k0: "a",
1190                    k1: 0,
1191                    timestamps: &[0],
1192                    v1: &[Some(0.1)],
1193                    sequence: 0,
1194                },
1195                MutationInput {
1196                    k0: "b",
1197                    k1: 0,
1198                    timestamps: &[0],
1199                    v1: &[Some(0.0)],
1200                    sequence: 0,
1201                },
1202                MutationInput {
1203                    k0: "a",
1204                    k1: 0,
1205                    timestamps: &[1],
1206                    v1: &[Some(0.2)],
1207                    sequence: 1,
1208                },
1209                MutationInput {
1210                    k0: "a",
1211                    k1: 1,
1212                    timestamps: &[1],
1213                    v1: &[Some(0.3)],
1214                    sequence: 2,
1215                },
1216            ],
1217            &[
1218                BatchOutput {
1219                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
1220                    timestamps: &[0, 1],
1221                    v1: &[Some(0.1), Some(0.2)],
1222                },
1223                BatchOutput {
1224                    pk_values: &[Value::String("a".into()), Value::UInt32(1)],
1225                    timestamps: &[1],
1226                    v1: &[Some(0.3)],
1227                },
1228                BatchOutput {
1229                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
1230                    timestamps: &[0],
1231                    v1: &[Some(0.0)],
1232                },
1233            ],
1234            (0, 1),
1235            true,
1236        );
1237
1238        check_mutations_to_record_batches(
1239            &[
1240                MutationInput {
1241                    k0: "a",
1242                    k1: 0,
1243                    timestamps: &[0],
1244                    v1: &[Some(0.1)],
1245                    sequence: 0,
1246                },
1247                MutationInput {
1248                    k0: "b",
1249                    k1: 0,
1250                    timestamps: &[0],
1251                    v1: &[Some(0.0)],
1252                    sequence: 0,
1253                },
1254                MutationInput {
1255                    k0: "a",
1256                    k1: 0,
1257                    timestamps: &[0],
1258                    v1: &[Some(0.2)],
1259                    sequence: 1,
1260                },
1261            ],
1262            &[
1263                BatchOutput {
1264                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
1265                    timestamps: &[0],
1266                    v1: &[Some(0.2)],
1267                },
1268                BatchOutput {
1269                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
1270                    timestamps: &[0],
1271                    v1: &[Some(0.0)],
1272                },
1273            ],
1274            (0, 0),
1275            true,
1276        );
1277        check_mutations_to_record_batches(
1278            &[
1279                MutationInput {
1280                    k0: "a",
1281                    k1: 0,
1282                    timestamps: &[0],
1283                    v1: &[Some(0.1)],
1284                    sequence: 0,
1285                },
1286                MutationInput {
1287                    k0: "b",
1288                    k1: 0,
1289                    timestamps: &[0],
1290                    v1: &[Some(0.0)],
1291                    sequence: 0,
1292                },
1293                MutationInput {
1294                    k0: "a",
1295                    k1: 0,
1296                    timestamps: &[0],
1297                    v1: &[Some(0.2)],
1298                    sequence: 1,
1299                },
1300            ],
1301            &[
1302                BatchOutput {
1303                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
1304                    timestamps: &[0, 0],
1305                    v1: &[Some(0.2), Some(0.1)],
1306                },
1307                BatchOutput {
1308                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
1309                    timestamps: &[0],
1310                    v1: &[Some(0.0)],
1311                },
1312            ],
1313            (0, 0),
1314            false,
1315        );
1316    }
1317
1318    fn encode(input: &[MutationInput]) -> EncodedBulkPart {
1319        let metadata = metadata_for_test();
1320        let kvs = input
1321            .iter()
1322            .map(|m| {
1323                build_key_values_with_ts_seq_values(
1324                    &metadata,
1325                    m.k0.to_string(),
1326                    m.k1,
1327                    m.timestamps.iter().copied(),
1328                    m.v1.iter().copied(),
1329                    m.sequence,
1330                )
1331            })
1332            .collect::<Vec<_>>();
1333        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1334        let primary_key_codec = build_primary_key_codec(&metadata);
1335        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1336        for kv in kvs {
1337            converter.append_key_values(&kv).unwrap();
1338        }
1339        let part = converter.convert().unwrap();
1340        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1341        encoder.encode_part(&part).unwrap().unwrap()
1342    }
1343
1344    #[test]
1345    fn test_write_and_read_part_projection() {
1346        let part = encode(&[
1347            MutationInput {
1348                k0: "a",
1349                k1: 0,
1350                timestamps: &[1],
1351                v1: &[Some(0.1)],
1352                sequence: 0,
1353            },
1354            MutationInput {
1355                k0: "b",
1356                k1: 0,
1357                timestamps: &[1],
1358                v1: &[Some(0.0)],
1359                sequence: 0,
1360            },
1361            MutationInput {
1362                k0: "a",
1363                k1: 0,
1364                timestamps: &[2],
1365                v1: &[Some(0.2)],
1366                sequence: 1,
1367            },
1368        ]);
1369
1370        let projection = &[4u32];
1371        let reader = part
1372            .read(
1373                Arc::new(BulkIterContext::new(
1374                    part.metadata.region_metadata.clone(),
1375                    &Some(projection.as_slice()),
1376                    None,
1377                )),
1378                None,
1379            )
1380            .unwrap()
1381            .expect("expect at least one row group");
1382
1383        let mut total_rows_read = 0;
1384        let mut field: Vec<f64> = vec![];
1385        for res in reader {
1386            let batch = res.unwrap();
1387            assert_eq!(5, batch.num_columns());
1388            field.extend_from_slice(
1389                batch
1390                    .column(0)
1391                    .as_any()
1392                    .downcast_ref::<Float64Array>()
1393                    .unwrap()
1394                    .values(),
1395            );
1396            total_rows_read += batch.num_rows();
1397        }
1398        assert_eq!(3, total_rows_read);
1399        assert_eq!(vec![0.1, 0.2, 0.0], field);
1400    }
1401
1402    fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
1403        let metadata = metadata_for_test();
1404        let kvs = key_values
1405            .into_iter()
1406            .map(|(k0, k1, (start, end), sequence)| {
1407                let ts = start..end;
1408                let v1 = (start..end).map(|_| None);
1409                build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
1410            })
1411            .collect::<Vec<_>>();
1412        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1413        let primary_key_codec = build_primary_key_codec(&metadata);
1414        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1415        for kv in kvs {
1416            converter.append_key_values(&kv).unwrap();
1417        }
1418        let part = converter.convert().unwrap();
1419        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1420        encoder.encode_part(&part).unwrap().unwrap()
1421    }
1422
1423    fn check_prune_row_group(
1424        part: &EncodedBulkPart,
1425        predicate: Option<Predicate>,
1426        expected_rows: usize,
1427    ) {
1428        let context = Arc::new(BulkIterContext::new(
1429            part.metadata.region_metadata.clone(),
1430            &None,
1431            predicate,
1432        ));
1433        let reader = part
1434            .read(context, None)
1435            .unwrap()
1436            .expect("expect at least one row group");
1437        let mut total_rows_read = 0;
1438        for res in reader {
1439            let batch = res.unwrap();
1440            total_rows_read += batch.num_rows();
1441        }
1442        // Checks the number of rows read after pruning.
1443        assert_eq!(expected_rows, total_rows_read);
1444    }
1445
1446    #[test]
1447    fn test_prune_row_groups() {
1448        let part = prepare(vec![
1449            ("a", 0, (0, 40), 1),
1450            ("a", 1, (0, 60), 1),
1451            ("b", 0, (0, 100), 2),
1452            ("b", 1, (100, 180), 3),
1453            ("b", 1, (180, 210), 4),
1454        ]);
1455
1456        let context = Arc::new(BulkIterContext::new(
1457            part.metadata.region_metadata.clone(),
1458            &None,
1459            Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
1460                datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
1461            )])),
1462        ));
1463        assert!(part.read(context, None).unwrap().is_none());
1464
1465        check_prune_row_group(&part, None, 310);
1466
1467        check_prune_row_group(
1468            &part,
1469            Some(Predicate::new(vec![
1470                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1471                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
1472            ])),
1473            40,
1474        );
1475
1476        check_prune_row_group(
1477            &part,
1478            Some(Predicate::new(vec![
1479                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1480                datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
1481            ])),
1482            60,
1483        );
1484
1485        check_prune_row_group(
1486            &part,
1487            Some(Predicate::new(vec![
1488                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1489            ])),
1490            100,
1491        );
1492
1493        check_prune_row_group(
1494            &part,
1495            Some(Predicate::new(vec![
1496                datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
1497                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
1498            ])),
1499            100,
1500        );
1501
1502        // Predicates over a field column can do precise filtering.
        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64)),
            ])),
            1,
        );
    }

    #[test]
    fn test_bulk_part_converter_append_and_convert() {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "key1".to_string(),
            1u32,
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "key2".to_string(),
            2u32,
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

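        // key1 contributes two rows and key2 one, so the converted part holds 3 rows
        // spanning timestamps 1000..=2000 with a maximum sequence of 2.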
        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_timestamp, 1000);
        assert_eq!(bulk_part.max_timestamp, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        // Validate primary key columns are stored
        // Schema should include primary key columns k0 and k1 at the beginning
        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );
    }

    #[test]
    fn test_bulk_part_converter_sorting() {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "z_key".to_string(),
            3u32,
            vec![3000].into_iter(),
            vec![Some(3.0)].into_iter(),
            3,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "a_key".to_string(),
            1u32,
            vec![1000].into_iter(),
            vec![Some(1.0)].into_iter(),
            1,
        );

        let key_values3 = build_key_values_with_ts_seq_values(
            &metadata,
            "m_key".to_string(),
            2u32,
            vec![2000].into_iter(),
            vec![Some(2.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();
        converter.append_key_values(&key_values3).unwrap();

        let bulk_part = converter.convert().unwrap();

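        // Rows were appended out of order (z_key, a_key, m_key); after conversion the
        // timestamps and sequences come out in ascending order.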
        assert_eq!(bulk_part.num_rows(), 3);

        let ts_column = bulk_part.batch.column(bulk_part.timestamp_index);
        let seq_column = bulk_part.batch.column(bulk_part.batch.num_columns() - 2);

        let ts_array = ts_column
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        let seq_array = seq_column.as_any().downcast_ref::<UInt64Array>().unwrap();

        assert_eq!(ts_array.values(), &[1000, 2000, 3000]);
        assert_eq!(seq_array.values(), &[1, 2, 3]);

        // Validate primary key columns are stored
        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );
    }

    #[test]
    fn test_bulk_part_converter_empty() {
        let metadata = metadata_for_test();
        let capacity = 10;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 0);
        assert_eq!(bulk_part.min_timestamp, i64::MAX);
        assert_eq!(bulk_part.max_timestamp, i64::MIN);
        assert_eq!(bulk_part.sequence, SequenceNumber::MIN);

        // Validate primary key columns are present in schema even for empty batch
        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );
    }

    #[test]
    fn test_bulk_part_converter_without_primary_key_columns() {
        let metadata = metadata_for_test();
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions {
                raw_pk_columns: false,
                string_pk_use_dict: true,
            },
        );

        let capacity = 100;
        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, false);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "key1".to_string(),
            1u32,
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "key2".to_string(),
            2u32,
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_timestamp, 1000);
        assert_eq!(bulk_part.max_timestamp, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        // Validate primary key columns are NOT stored individually
        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );
    }

    #[allow(clippy::too_many_arguments)]
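    /// Builds [KeyValues] whose rows carry a sparse-encoded primary key: the key
    /// (__table_id, __tsid, k0, k1) is encoded into a single binary value that is
    /// written to the __primary_key column instead of individual tag columns.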
    fn build_key_values_with_sparse_encoding(
        metadata: &RegionMetadataRef,
        primary_key_codec: &Arc<dyn PrimaryKeyCodec>,
        table_id: u32,
        tsid: u64,
        k0: String,
        k1: String,
        timestamps: impl Iterator<Item = i64>,
        values: impl Iterator<Item = Option<f64>>,
        sequence: SequenceNumber,
    ) -> KeyValues {
        // Encode the primary key (__table_id, __tsid, k0, k1) into binary format using the sparse codec
        let pk_values = vec![
            (ReservedColumnId::table_id(), Value::UInt32(table_id)),
            (ReservedColumnId::tsid(), Value::UInt64(tsid)),
            (0, Value::String(k0.clone().into())),
            (1, Value::String(k1.clone().into())),
        ];
        let mut encoded_key = Vec::new();
        primary_key_codec
            .encode_values(&pk_values, &mut encoded_key)
            .unwrap();
        assert!(!encoded_key.is_empty());

        // Create schema for sparse encoding: __primary_key, ts, v0, v1
        let column_schema = vec![
            api::v1::ColumnSchema {
                column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::binary_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Tag as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "ts".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::timestamp_millisecond_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Timestamp as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "v0".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::int64_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Field as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "v1".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::float64_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Field as i32,
                ..Default::default()
            },
        ];

        let rows = timestamps
            .zip(values)
            .map(|(ts, v)| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::BinaryValue(
                            encoded_key.clone(),
                        )),
                    },
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(ts)),
                    },
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::I64Value(ts)),
                    },
                    api::v1::Value {
                        value_data: v.map(api::v1::value::ValueData::F64Value),
                    },
                ],
            })
            .collect();

        let mutation = api::v1::Mutation {
            op_type: 1,
            sequence,
            rows: Some(api::v1::Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: Some(WriteHint {
                primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
            }),
        };
        KeyValues::new(metadata.as_ref(), mutation).unwrap()
    }

    #[test]
    fn test_bulk_part_converter_sparse_primary_key_encoding() {
        use api::v1::SemanticType;
        use datatypes::schema::ColumnSchema;
        use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
        use store_api::storage::RegionId;

        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![0, 1])
            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
        let metadata = Arc::new(builder.build().unwrap());

        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse);
        assert_eq!(primary_key_codec.encoding(), PrimaryKeyEncoding::Sparse);

        let capacity = 100;
        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec.clone(), true);

        let key_values1 = build_key_values_with_sparse_encoding(
            &metadata,
            &primary_key_codec,
            2048u32, // table_id
            100u64,  // tsid
            "key11".to_string(),
            "key21".to_string(),
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_sparse_encoding(
            &metadata,
            &primary_key_codec,
            4096u32, // table_id
            200u64,  // tsid
            "key12".to_string(),
            "key22".to_string(),
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

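        // The two sparse-encoded series contribute 2 + 1 rows, spanning timestamps
        // 1000..=2000 with a maximum sequence of 2.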
        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_timestamp, 1000);
        assert_eq!(bulk_part.max_timestamp, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        // For sparse encoding, primary key columns should NOT be stored individually
        // even when store_primary_key_columns is true, because sparse encoding
        // stores the encoded primary key in the __primary_key column
        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );

        // Verify the __primary_key column contains encoded sparse keys
        let primary_key_column = bulk_part.batch.column_by_name("__primary_key").unwrap();
        let dict_array = primary_key_column
            .as_any()
            .downcast_ref::<DictionaryArray<UInt32Type>>()
            .unwrap();

        // The dictionary array should be non-empty, with one entry per appended row
        assert!(!dict_array.is_empty());
        assert_eq!(dict_array.len(), 3); // 3 rows total

        // Verify values are properly encoded binary data (not empty)
        let values = dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        for i in 0..values.len() {
            assert!(
                !values.value(i).is_empty(),
                "Encoded primary key should not be empty"
            );
        }
    }
}