mito2/memtable/bulk/part.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Bulk part encoder/decoder.
16
17use std::collections::VecDeque;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use api::helper::{ColumnDataTypeWrapper, value_to_grpc_value};
22use api::v1::bulk_wal_entry::Body;
23use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType, bulk_wal_entry};
24use bytes::Bytes;
25use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
26use common_recordbatch::DfRecordBatch as RecordBatch;
27use common_time::Timestamp;
28use common_time::timestamp::TimeUnit;
29use datatypes::arrow;
30use datatypes::arrow::array::{
31    Array, ArrayRef, BinaryBuilder, BinaryDictionaryBuilder, DictionaryArray, StringBuilder,
32    StringDictionaryBuilder, TimestampMicrosecondArray, TimestampMillisecondArray,
33    TimestampNanosecondArray, TimestampSecondArray, UInt8Array, UInt8Builder, UInt32Array,
34    UInt64Array, UInt64Builder,
35};
36use datatypes::arrow::compute::{SortColumn, SortOptions, TakeOptions};
37use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
38use datatypes::arrow_array::BinaryArray;
39use datatypes::data_type::DataType;
40use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
41use datatypes::value::{Value, ValueRef};
42use datatypes::vectors::Helper;
43use mito_codec::key_values::{KeyValue, KeyValues, KeyValuesRef};
44use mito_codec::row_converter::{
45    DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, build_primary_key_codec,
46};
47use parquet::arrow::ArrowWriter;
48use parquet::basic::{Compression, ZstdLevel};
49use parquet::data_type::AsBytes;
50use parquet::file::metadata::ParquetMetaData;
51use parquet::file::properties::WriterProperties;
52use snafu::{OptionExt, ResultExt, Snafu};
53use store_api::codec::PrimaryKeyEncoding;
54use store_api::metadata::{RegionMetadata, RegionMetadataRef};
55use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
56use store_api::storage::{FileId, SequenceNumber};
57use table::predicate::Predicate;
58
59use crate::error::{
60    self, ColumnNotFoundSnafu, ComputeArrowSnafu, DataTypeMismatchSnafu, EncodeMemtableSnafu,
61    EncodeSnafu, InvalidMetadataSnafu, NewRecordBatchSnafu, Result,
62};
63use crate::memtable::BoxedRecordBatchIterator;
64use crate::memtable::bulk::context::BulkIterContextRef;
65use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
66use crate::memtable::time_series::{ValueBuilder, Values};
67use crate::sst::index::IndexOutput;
68use crate::sst::parquet::flat_format::primary_key_column_index;
69use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder, ReadFormat};
70use crate::sst::parquet::helper::parse_parquet_metadata;
71use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo};
72use crate::sst::to_sst_arrow_schema;
73
74const INIT_DICT_VALUE_CAPACITY: usize = 8;
75
76/// A raw bulk part in the memtable.
77#[derive(Clone)]
78pub struct BulkPart {
79    pub batch: RecordBatch,
80    pub max_timestamp: i64,
81    pub min_timestamp: i64,
82    pub sequence: u64,
83    pub timestamp_index: usize,
84    pub raw_data: Option<ArrowIpc>,
85}
86
87impl TryFrom<BulkWalEntry> for BulkPart {
88    type Error = error::Error;
89
90    fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
91        match value.body.expect("Entry payload should be present") {
92            Body::ArrowIpc(ipc) => {
93                let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
94                    .context(error::ConvertBulkWalEntrySnafu)?;
95                let batch = decoder
96                    .try_decode_record_batch(&ipc.data_header, &ipc.payload)
97                    .context(error::ConvertBulkWalEntrySnafu)?;
98                Ok(Self {
99                    batch,
100                    max_timestamp: value.max_ts,
101                    min_timestamp: value.min_ts,
102                    sequence: value.sequence,
103                    timestamp_index: value.timestamp_index as usize,
104                    raw_data: Some(ipc),
105                })
106            }
107        }
108    }
109}
110
111impl TryFrom<&BulkPart> for BulkWalEntry {
112    type Error = error::Error;
113
114    fn try_from(value: &BulkPart) -> Result<Self> {
115        if let Some(ipc) = &value.raw_data {
116            Ok(BulkWalEntry {
117                sequence: value.sequence,
118                max_ts: value.max_timestamp,
119                min_ts: value.min_timestamp,
120                timestamp_index: value.timestamp_index as u32,
121                body: Some(Body::ArrowIpc(ipc.clone())),
122            })
123        } else {
124            let mut encoder = FlightEncoder::default();
125            let schema_bytes = encoder
126                .encode_schema(value.batch.schema().as_ref())
127                .data_header;
128            let [rb_data] = encoder
129                .encode(FlightMessage::RecordBatch(value.batch.clone()))
130                .try_into()
131                .map_err(|_| {
132                    error::UnsupportedOperationSnafu {
133                        err_msg: "create BulkWalEntry from RecordBatch with dictionary arrays",
134                    }
135                    .build()
136                })?;
137            Ok(BulkWalEntry {
138                sequence: value.sequence,
139                max_ts: value.max_timestamp,
140                min_ts: value.min_timestamp,
141                timestamp_index: value.timestamp_index as u32,
142                body: Some(Body::ArrowIpc(ArrowIpc {
143                    schema: schema_bytes,
144                    data_header: rb_data.data_header,
145                    payload: rb_data.data_body,
146                })),
147            })
148        }
149    }
150}
151
152impl BulkPart {
153    pub(crate) fn estimated_size(&self) -> usize {
154        record_batch_estimated_size(&self.batch)
155    }
156
157    /// Returns the estimated series count in this BulkPart.
158    /// This is calculated from the dictionary values count of the PrimaryKeyArray.
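    /// Returns 0 when the primary key column cannot be downcast to a [PrimaryKeyArray].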
159    pub fn estimated_series_count(&self) -> usize {
160        let pk_column_idx = primary_key_column_index(self.batch.num_columns());
161        let pk_column = self.batch.column(pk_column_idx);
162        if let Some(dict_array) = pk_column.as_any().downcast_ref::<PrimaryKeyArray>() {
163            dict_array.values().len()
164        } else {
165            0
166        }
167    }
168
169    /// Converts [BulkPart] to [Mutation] for fallback `write_bulk` implementation.
170    pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
171        let vectors = region_metadata
172            .schema
173            .column_schemas()
174            .iter()
175            .map(|col| match self.batch.column_by_name(&col.name) {
176                None => Ok(None),
177                Some(col) => Helper::try_into_vector(col).map(Some),
178            })
179            .collect::<datatypes::error::Result<Vec<_>>>()
180            .context(error::ComputeVectorSnafu)?;
181
182        let rows = (0..self.num_rows())
183            .map(|row_idx| {
184                let values = (0..self.batch.num_columns())
185                    .map(|col_idx| {
186                        if let Some(v) = &vectors[col_idx] {
187                            value_to_grpc_value(v.get(row_idx))
188                        } else {
189                            api::v1::Value { value_data: None }
190                        }
191                    })
192                    .collect::<Vec<_>>();
193                api::v1::Row { values }
194            })
195            .collect::<Vec<_>>();
196
197        let schema = region_metadata
198            .column_metadatas
199            .iter()
200            .map(|c| {
201                let data_type_wrapper =
202                    ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
203                Ok(api::v1::ColumnSchema {
204                    column_name: c.column_schema.name.clone(),
205                    datatype: data_type_wrapper.datatype() as i32,
206                    semantic_type: c.semantic_type as i32,
207                    ..Default::default()
208                })
209            })
210            .collect::<api::error::Result<Vec<_>>>()
211            .context(error::ConvertColumnDataTypeSnafu {
212                reason: "failed to convert region metadata to column schema",
213            })?;
214
215        let rows = api::v1::Rows { schema, rows };
216
217        Ok(Mutation {
218            op_type: OpType::Put as i32,
219            sequence: self.sequence,
220            rows: Some(rows),
221            write_hint: None,
222        })
223    }
224
225    pub fn timestamps(&self) -> &ArrayRef {
226        self.batch.column(self.timestamp_index)
227    }
228
229    pub fn num_rows(&self) -> usize {
230        self.batch.num_rows()
231    }
232}
233
/// A more accurate estimation of the size of a record batch, based on the sliced memory size of its columns.
235pub(crate) fn record_batch_estimated_size(batch: &RecordBatch) -> usize {
236    batch
237        .columns()
238        .iter()
        // If we cannot get the slice memory size, assume 0 here.
240        .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
241        .sum()
242}
243
244/// Primary key column builder for handling strings specially.
245enum PrimaryKeyColumnBuilder {
246    /// String dictionary builder for string types.
247    StringDict(StringDictionaryBuilder<UInt32Type>),
248    /// Generic mutable vector for other types.
249    Vector(Box<dyn MutableVector>),
250}
251
252impl PrimaryKeyColumnBuilder {
253    /// Appends a value to the builder.
254    fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
255        match self {
256            PrimaryKeyColumnBuilder::StringDict(builder) => {
257                if let Some(s) = value.as_string().context(DataTypeMismatchSnafu)? {
258                    // We know the value is a string.
259                    builder.append_value(s);
260                } else {
261                    builder.append_null();
262                }
263            }
264            PrimaryKeyColumnBuilder::Vector(builder) => {
265                builder.push_value_ref(&value);
266            }
267        }
268        Ok(())
269    }
270
271    /// Converts the builder to an ArrayRef.
272    fn into_arrow_array(self) -> ArrayRef {
273        match self {
274            PrimaryKeyColumnBuilder::StringDict(mut builder) => Arc::new(builder.finish()),
275            PrimaryKeyColumnBuilder::Vector(mut builder) => builder.to_vector().to_arrow_array(),
276        }
277    }
278}
279
/// Converter that converts [KeyValues] into a [BulkPart].
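///
/// A minimal usage sketch, assuming `metadata`, `schema` and `codec` are built elsewhere
/// (this mirrors the converter tests at the bottom of this file):
/// ```ignore
/// let mut converter = BulkPartConverter::new(&metadata, schema, 1024, codec, true);
/// converter.append_key_values(&key_values)?;
/// let part: BulkPart = converter.convert()?;
/// ```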
281pub struct BulkPartConverter {
282    /// Region metadata.
283    region_metadata: RegionMetadataRef,
284    /// Schema of the converted batch.
285    schema: SchemaRef,
286    /// Primary key codec for encoding keys
287    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
288    /// Buffer for encoding primary key.
289    key_buf: Vec<u8>,
290    /// Primary key array builder.
291    key_array_builder: PrimaryKeyArrayBuilder,
292    /// Builders for non-primary key columns.
293    value_builder: ValueBuilder,
294    /// Builders for individual primary key columns.
295    /// The order of builders is the same as the order of primary key columns in the region metadata.
296    primary_key_column_builders: Vec<PrimaryKeyColumnBuilder>,
297
298    /// Max timestamp value.
299    max_ts: i64,
300    /// Min timestamp value.
301    min_ts: i64,
302    /// Max sequence number.
303    max_sequence: SequenceNumber,
304}
305
306impl BulkPartConverter {
307    /// Creates a new converter.
308    ///
    /// If `store_primary_key_columns` is true and the primary key encoding is not sparse, the
    /// converter additionally stores each primary key column as an individual array.
311    pub fn new(
312        region_metadata: &RegionMetadataRef,
313        schema: SchemaRef,
314        capacity: usize,
315        primary_key_codec: Arc<dyn PrimaryKeyCodec>,
316        store_primary_key_columns: bool,
317    ) -> Self {
318        debug_assert_eq!(
319            region_metadata.primary_key_encoding,
320            primary_key_codec.encoding()
321        );
322
323        let primary_key_column_builders = if store_primary_key_columns
324            && region_metadata.primary_key_encoding != PrimaryKeyEncoding::Sparse
325        {
326            new_primary_key_column_builders(region_metadata, capacity)
327        } else {
328            Vec::new()
329        };
330
331        Self {
332            region_metadata: region_metadata.clone(),
333            schema,
334            primary_key_codec,
335            key_buf: Vec::new(),
336            key_array_builder: PrimaryKeyArrayBuilder::new(),
337            value_builder: ValueBuilder::new(region_metadata, capacity),
338            primary_key_column_builders,
339            min_ts: i64::MAX,
340            max_ts: i64::MIN,
341            max_sequence: SequenceNumber::MIN,
342        }
343    }
344
345    /// Appends a [KeyValues] into the converter.
346    pub fn append_key_values(&mut self, key_values: &KeyValues) -> Result<()> {
347        for kv in key_values.iter() {
348            self.append_key_value(&kv)?;
349        }
350
351        Ok(())
352    }
353
354    /// Appends a [KeyValue] to builders.
355    ///
    /// If the primary key uses sparse encoding, callers must have already encoded the primary key in the [KeyValue].
357    fn append_key_value(&mut self, kv: &KeyValue) -> Result<()> {
358        // Handles primary key based on encoding type
359        if self.primary_key_codec.encoding() == PrimaryKeyEncoding::Sparse {
360            // For sparse encoding, the primary key is already encoded in the KeyValue
361            // Gets the first (and only) primary key value which contains the encoded key
362            let mut primary_keys = kv.primary_keys();
363            if let Some(encoded) = primary_keys
364                .next()
365                .context(ColumnNotFoundSnafu {
366                    column: PRIMARY_KEY_COLUMN_NAME,
367                })?
368                .as_binary()
369                .context(DataTypeMismatchSnafu)?
370            {
371                self.key_array_builder
372                    .append(encoded)
373                    .context(ComputeArrowSnafu)?;
374            } else {
375                self.key_array_builder
376                    .append("")
377                    .context(ComputeArrowSnafu)?;
378            }
379        } else {
380            // For dense encoding, we need to encode the primary key columns
381            self.key_buf.clear();
382            self.primary_key_codec
383                .encode_key_value(kv, &mut self.key_buf)
384                .context(EncodeSnafu)?;
385            self.key_array_builder
386                .append(&self.key_buf)
387                .context(ComputeArrowSnafu)?;
388        };
389
390        // If storing primary key columns, append values to individual builders
391        if !self.primary_key_column_builders.is_empty() {
392            for (builder, pk_value) in self
393                .primary_key_column_builders
394                .iter_mut()
395                .zip(kv.primary_keys())
396            {
397                builder.push_value_ref(pk_value)?;
398            }
399        }
400
401        // Pushes other columns.
402        self.value_builder.push(
403            kv.timestamp(),
404            kv.sequence(),
405            kv.op_type() as u8,
406            kv.fields(),
407        );
408
409        // Updates statistics
410        // Safety: timestamp of kv must be both present and a valid timestamp value.
411        let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
412        self.min_ts = self.min_ts.min(ts);
413        self.max_ts = self.max_ts.max(ts);
414        self.max_sequence = self.max_sequence.max(kv.sequence());
415
416        Ok(())
417    }
418
419    /// Converts buffered content into a [BulkPart].
420    ///
421    /// It sorts the record batch by (primary key, timestamp, sequence desc).
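    ///
    /// The output columns are laid out as: stored primary key columns (if enabled), field
    /// columns, the time index, then `__primary_key`, `__sequence` and `__op_type`.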
422    pub fn convert(mut self) -> Result<BulkPart> {
423        let values = Values::from(self.value_builder);
424        let mut columns =
425            Vec::with_capacity(4 + values.fields.len() + self.primary_key_column_builders.len());
426
427        // Build primary key column arrays if enabled.
428        for builder in self.primary_key_column_builders {
429            columns.push(builder.into_arrow_array());
430        }
431        // Then fields columns.
432        columns.extend(values.fields.iter().map(|field| field.to_arrow_array()));
433        // Time index.
434        let timestamp_index = columns.len();
435        columns.push(values.timestamp.to_arrow_array());
436        // Primary key.
437        let pk_array = self.key_array_builder.finish();
438        columns.push(Arc::new(pk_array));
439        // Sequence and op type.
440        columns.push(values.sequence.to_arrow_array());
441        columns.push(values.op_type.to_arrow_array());
442
443        let batch = RecordBatch::try_new(self.schema, columns).context(NewRecordBatchSnafu)?;
444        // Sorts the record batch.
445        let batch = sort_primary_key_record_batch(&batch)?;
446
447        Ok(BulkPart {
448            batch,
449            max_timestamp: self.max_ts,
450            min_timestamp: self.min_ts,
451            sequence: self.max_sequence,
452            timestamp_index,
453            raw_data: None,
454        })
455    }
456}
457
458fn new_primary_key_column_builders(
459    metadata: &RegionMetadata,
460    capacity: usize,
461) -> Vec<PrimaryKeyColumnBuilder> {
462    metadata
463        .primary_key_columns()
464        .map(|col| {
465            if col.column_schema.data_type.is_string() {
466                PrimaryKeyColumnBuilder::StringDict(StringDictionaryBuilder::with_capacity(
467                    capacity,
468                    INIT_DICT_VALUE_CAPACITY,
469                    capacity,
470                ))
471            } else {
472                PrimaryKeyColumnBuilder::Vector(
473                    col.column_schema.data_type.create_mutable_vector(capacity),
474                )
475            }
476        })
477        .collect()
478}
479
/// Sorts a record batch in the primary key format by (primary key asc, time index asc, sequence desc).
481fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
482    let total_columns = batch.num_columns();
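    // The flat layout built by `BulkPartConverter::convert` ends with the time index,
    // `__primary_key`, `__sequence` and `__op_type` columns, so the sort columns below are
    // addressed relative to the end of the batch.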
483    let sort_columns = vec![
484        // Primary key column (ascending)
485        SortColumn {
486            values: batch.column(total_columns - 3).clone(),
487            options: Some(SortOptions {
488                descending: false,
489                nulls_first: true,
490            }),
491        },
492        // Time index column (ascending)
493        SortColumn {
494            values: batch.column(total_columns - 4).clone(),
495            options: Some(SortOptions {
496                descending: false,
497                nulls_first: true,
498            }),
499        },
500        // Sequence column (descending)
501        SortColumn {
502            values: batch.column(total_columns - 2).clone(),
503            options: Some(SortOptions {
504                descending: true,
505                nulls_first: true,
506            }),
507        },
508    ];
509
510    let indices = datatypes::arrow::compute::lexsort_to_indices(&sort_columns, None)
511        .context(ComputeArrowSnafu)?;
512
513    datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
514}
515
516#[derive(Debug, Clone)]
517pub struct EncodedBulkPart {
518    data: Bytes,
519    metadata: BulkPartMeta,
520}
521
522impl EncodedBulkPart {
523    pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
524        Self { data, metadata }
525    }
526
527    pub(crate) fn metadata(&self) -> &BulkPartMeta {
528        &self.metadata
529    }
530
531    /// Returns the size of the encoded data in bytes
532    pub(crate) fn size_bytes(&self) -> usize {
533        self.data.len()
534    }
535
536    /// Returns the encoded data.
537    pub(crate) fn data(&self) -> &Bytes {
538        &self.data
539    }
540
541    /// Converts this `EncodedBulkPart` to `SstInfo`.
542    ///
543    /// # Arguments
544    /// * `file_id` - The SST file ID to assign to this part
545    ///
546    /// # Returns
547    /// Returns a `SstInfo` instance with information derived from this bulk part's metadata
548    pub(crate) fn to_sst_info(&self, file_id: FileId) -> SstInfo {
549        let unit = self.metadata.region_metadata.time_index_type().unit();
550        SstInfo {
551            file_id,
552            time_range: (
553                Timestamp::new(self.metadata.min_timestamp, unit),
554                Timestamp::new(self.metadata.max_timestamp, unit),
555            ),
556            file_size: self.data.len() as u64,
557            num_rows: self.metadata.num_rows,
558            num_row_groups: self.metadata.parquet_metadata.num_row_groups() as u64,
559            file_metadata: Some(self.metadata.parquet_metadata.clone()),
560            index_metadata: IndexOutput::default(),
561        }
562    }
563
564    pub(crate) fn read(
565        &self,
566        context: BulkIterContextRef,
567        sequence: Option<SequenceNumber>,
568    ) -> Result<Option<BoxedRecordBatchIterator>> {
        // Use the predicate to find row groups to read.
570        let row_groups_to_read = context.row_groups_to_read(&self.metadata.parquet_metadata);
571
572        if row_groups_to_read.is_empty() {
573            // All row groups are filtered.
574            return Ok(None);
575        }
576
577        let iter = EncodedBulkPartIter::try_new(
578            context,
579            row_groups_to_read,
580            self.metadata.parquet_metadata.clone(),
581            self.data.clone(),
582            sequence,
583        )?;
584        Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
585    }
586}
587
588#[derive(Debug, Clone)]
589pub struct BulkPartMeta {
590    /// Total rows in part.
591    pub num_rows: usize,
592    /// Max timestamp in part.
593    pub max_timestamp: i64,
594    /// Min timestamp in part.
595    pub min_timestamp: i64,
596    /// Part file metadata.
597    pub parquet_metadata: Arc<ParquetMetaData>,
598    /// Part region schema.
599    pub region_metadata: RegionMetadataRef,
600}
601
602/// Metrics for encoding a part.
603#[derive(Default, Debug)]
604pub struct BulkPartEncodeMetrics {
605    /// Cost of iterating over the data.
606    pub iter_cost: Duration,
607    /// Cost of writing the data.
608    pub write_cost: Duration,
609    /// Size of data before encoding.
610    pub raw_size: usize,
611    /// Size of data after encoding.
612    pub encoded_size: usize,
613    /// Number of rows in part.
614    pub num_rows: usize,
615}
616
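/// Encoder that writes bulk parts into in-memory parquet buffers.
///
/// A minimal sketch, assuming `metadata` is a `RegionMetadataRef` and `part` is a converted
/// [BulkPart] (this mirrors the `encode` helper in the tests below):
/// ```ignore
/// let encoder = BulkPartEncoder::new(metadata, 1024)?;
/// // `encode_part` returns `None` when the part has no rows.
/// let encoded: Option<EncodedBulkPart> = encoder.encode_part(&part)?;
/// ```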
617pub struct BulkPartEncoder {
618    metadata: RegionMetadataRef,
619    row_group_size: usize,
620    writer_props: Option<WriterProperties>,
621}
622
623impl BulkPartEncoder {
624    pub(crate) fn new(
625        metadata: RegionMetadataRef,
626        row_group_size: usize,
627    ) -> Result<BulkPartEncoder> {
628        // TODO(yingwen): Skip arrow schema if needed.
629        let json = metadata.to_json().context(InvalidMetadataSnafu)?;
630        let key_value_meta =
631            parquet::file::metadata::KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
632
633        // TODO(yingwen): Do we need compression?
634        let writer_props = Some(
635            WriterProperties::builder()
636                .set_key_value_metadata(Some(vec![key_value_meta]))
637                .set_write_batch_size(row_group_size)
638                .set_max_row_group_size(row_group_size)
639                .set_compression(Compression::ZSTD(ZstdLevel::default()))
640                .set_column_index_truncate_length(None)
641                .set_statistics_truncate_length(None)
642                .build(),
643        );
644
645        Ok(Self {
646            metadata,
647            row_group_size,
648            writer_props,
649        })
650    }
651}
652
653impl BulkPartEncoder {
654    /// Encodes [BoxedRecordBatchIterator] into [EncodedBulkPart] with min/max timestamps.
655    pub fn encode_record_batch_iter(
656        &self,
657        iter: BoxedRecordBatchIterator,
658        arrow_schema: SchemaRef,
659        min_timestamp: i64,
660        max_timestamp: i64,
661        metrics: &mut BulkPartEncodeMetrics,
662    ) -> Result<Option<EncodedBulkPart>> {
663        let mut buf = Vec::with_capacity(4096);
664        let mut writer = ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
665            .context(EncodeMemtableSnafu)?;
666        let mut total_rows = 0;
667
668        // Process each batch from the iterator
669        let mut iter_start = Instant::now();
670        for batch_result in iter {
671            metrics.iter_cost += iter_start.elapsed();
672            let batch = batch_result?;
673            if batch.num_rows() == 0 {
674                continue;
675            }
676
677            metrics.raw_size += record_batch_estimated_size(&batch);
678            let write_start = Instant::now();
679            writer.write(&batch).context(EncodeMemtableSnafu)?;
680            metrics.write_cost += write_start.elapsed();
681            total_rows += batch.num_rows();
682            iter_start = Instant::now();
683        }
684        metrics.iter_cost += iter_start.elapsed();
686
687        if total_rows == 0 {
688            return Ok(None);
689        }
690
691        let close_start = Instant::now();
692        let file_metadata = writer.close().context(EncodeMemtableSnafu)?;
693        metrics.write_cost += close_start.elapsed();
694        metrics.encoded_size += buf.len();
695        metrics.num_rows += total_rows;
696
697        let buf = Bytes::from(buf);
698        let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);
699
700        Ok(Some(EncodedBulkPart {
701            data: buf,
702            metadata: BulkPartMeta {
703                num_rows: total_rows,
704                max_timestamp,
705                min_timestamp,
706                parquet_metadata,
707                region_metadata: self.metadata.clone(),
708            },
709        }))
710    }
711
    /// Encodes a bulk part into an [EncodedBulkPart]. Returns `None` if the part contains no rows.
713    fn encode_part(&self, part: &BulkPart) -> Result<Option<EncodedBulkPart>> {
714        if part.batch.num_rows() == 0 {
715            return Ok(None);
716        }
717
718        let mut buf = Vec::with_capacity(4096);
719        let arrow_schema = part.batch.schema();
720
721        let file_metadata = {
722            let mut writer =
723                ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
724                    .context(EncodeMemtableSnafu)?;
725            writer.write(&part.batch).context(EncodeMemtableSnafu)?;
726            writer.finish().context(EncodeMemtableSnafu)?
727        };
728
729        let buf = Bytes::from(buf);
730        let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);
731
732        Ok(Some(EncodedBulkPart {
733            data: buf,
734            metadata: BulkPartMeta {
735                num_rows: part.batch.num_rows(),
736                max_timestamp: part.max_timestamp,
737                min_timestamp: part.min_timestamp,
738                parquet_metadata,
739                region_metadata: self.metadata.clone(),
740            },
741        }))
742    }
743}
744
745/// Converts mutations to record batches.
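///
/// Returns `None` when the mutations contain no rows; otherwise returns the sorted record
/// batch together with its min and max timestamps.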
746fn mutations_to_record_batch(
747    mutations: &[Mutation],
748    metadata: &RegionMetadataRef,
749    pk_encoder: &DensePrimaryKeyCodec,
750    dedup: bool,
751) -> Result<Option<(RecordBatch, i64, i64)>> {
752    let total_rows: usize = mutations
753        .iter()
754        .map(|m| m.rows.as_ref().map(|r| r.rows.len()).unwrap_or(0))
755        .sum();
756
757    if total_rows == 0 {
758        return Ok(None);
759    }
760
761    let mut pk_builder = BinaryBuilder::with_capacity(total_rows, 0);
762
763    let mut ts_vector: Box<dyn MutableVector> = metadata
764        .time_index_column()
765        .column_schema
766        .data_type
767        .create_mutable_vector(total_rows);
768    let mut sequence_builder = UInt64Builder::with_capacity(total_rows);
769    let mut op_type_builder = UInt8Builder::with_capacity(total_rows);
770
771    let mut field_builders: Vec<Box<dyn MutableVector>> = metadata
772        .field_columns()
773        .map(|f| f.column_schema.data_type.create_mutable_vector(total_rows))
774        .collect();
775
776    let mut pk_buffer = vec![];
777    for m in mutations {
778        let Some(key_values) = KeyValuesRef::new(metadata, m) else {
779            continue;
780        };
781
782        for row in key_values.iter() {
783            pk_buffer.clear();
784            pk_encoder
785                .encode_to_vec(row.primary_keys(), &mut pk_buffer)
786                .context(EncodeSnafu)?;
787            pk_builder.append_value(pk_buffer.as_bytes());
788            ts_vector.push_value_ref(&row.timestamp());
789            sequence_builder.append_value(row.sequence());
790            op_type_builder.append_value(row.op_type() as u8);
791            for (builder, field) in field_builders.iter_mut().zip(row.fields()) {
792                builder.push_value_ref(&field);
793            }
794        }
795    }
796
797    let arrow_schema = to_sst_arrow_schema(metadata);
798    // safety: timestamp column must be valid, and values must not be None.
799    let timestamp_unit = metadata
800        .time_index_column()
801        .column_schema
802        .data_type
803        .as_timestamp()
804        .unwrap()
805        .unit();
806    let sorter = ArraysSorter {
807        encoded_primary_keys: pk_builder.finish(),
808        timestamp_unit,
809        timestamp: ts_vector.to_vector().to_arrow_array(),
810        sequence: sequence_builder.finish(),
811        op_type: op_type_builder.finish(),
812        fields: field_builders
813            .iter_mut()
814            .map(|f| f.to_vector().to_arrow_array()),
815        dedup,
816        arrow_schema,
817    };
818
819    sorter.sort().map(Some)
820}
821
822struct ArraysSorter<I> {
823    encoded_primary_keys: BinaryArray,
824    timestamp_unit: TimeUnit,
825    timestamp: ArrayRef,
826    sequence: UInt64Array,
827    op_type: UInt8Array,
828    fields: I,
829    dedup: bool,
830    arrow_schema: SchemaRef,
831}
832
833impl<I> ArraysSorter<I>
834where
835    I: Iterator<Item = ArrayRef>,
836{
837    /// Converts arrays to record batch.
838    fn sort(self) -> Result<(RecordBatch, i64, i64)> {
839        debug_assert!(!self.timestamp.is_empty());
840        debug_assert!(self.timestamp.len() == self.sequence.len());
841        debug_assert!(self.timestamp.len() == self.op_type.len());
842        debug_assert!(self.timestamp.len() == self.encoded_primary_keys.len());
843
844        let timestamp_iter = timestamp_array_to_iter(self.timestamp_unit, &self.timestamp);
845        let (mut min_timestamp, mut max_timestamp) = (i64::MAX, i64::MIN);
846        let mut to_sort = self
847            .encoded_primary_keys
848            .iter()
849            .zip(timestamp_iter)
850            .zip(self.sequence.iter())
851            .map(|((pk, timestamp), sequence)| {
852                max_timestamp = max_timestamp.max(*timestamp);
853                min_timestamp = min_timestamp.min(*timestamp);
854                (pk, timestamp, sequence)
855            })
856            .enumerate()
857            .collect::<Vec<_>>();
858
859        to_sort.sort_unstable_by(|(_, (l_pk, l_ts, l_seq)), (_, (r_pk, r_ts, r_seq))| {
860            l_pk.cmp(r_pk)
861                .then(l_ts.cmp(r_ts))
862                .then(l_seq.cmp(r_seq).reverse())
863        });
864
865        if self.dedup {
            // Dedup rows with the same primary key and timestamp, ignoring sequence.
867            to_sort.dedup_by(|(_, (l_pk, l_ts, _)), (_, (r_pk, r_ts, _))| {
868                l_pk == r_pk && l_ts == r_ts
869            });
870        }
871
872        let indices = UInt32Array::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
873
874        let pk_dictionary = Arc::new(binary_array_to_dictionary(
875            // safety: pk must be BinaryArray
876            arrow::compute::take(
877                &self.encoded_primary_keys,
878                &indices,
879                Some(TakeOptions {
880                    check_bounds: false,
881                }),
882            )
883            .context(ComputeArrowSnafu)?
884            .as_any()
885            .downcast_ref::<BinaryArray>()
886            .unwrap(),
887        )?) as ArrayRef;
888
889        let mut arrays = Vec::with_capacity(self.arrow_schema.fields.len());
890        for arr in self.fields {
891            arrays.push(
892                arrow::compute::take(
893                    &arr,
894                    &indices,
895                    Some(TakeOptions {
896                        check_bounds: false,
897                    }),
898                )
899                .context(ComputeArrowSnafu)?,
900            );
901        }
902
903        let timestamp = arrow::compute::take(
904            &self.timestamp,
905            &indices,
906            Some(TakeOptions {
907                check_bounds: false,
908            }),
909        )
910        .context(ComputeArrowSnafu)?;
911
912        arrays.push(timestamp);
913        arrays.push(pk_dictionary);
914        arrays.push(
915            arrow::compute::take(
916                &self.sequence,
917                &indices,
918                Some(TakeOptions {
919                    check_bounds: false,
920                }),
921            )
922            .context(ComputeArrowSnafu)?,
923        );
924
925        arrays.push(
926            arrow::compute::take(
927                &self.op_type,
928                &indices,
929                Some(TakeOptions {
930                    check_bounds: false,
931                }),
932            )
933            .context(ComputeArrowSnafu)?,
934        );
935
936        let batch = RecordBatch::try_new(self.arrow_schema, arrays).context(NewRecordBatchSnafu)?;
937        Ok((batch, min_timestamp, max_timestamp))
938    }
939}
940
941/// Converts timestamp array to an iter of i64 values.
942fn timestamp_array_to_iter(
943    timestamp_unit: TimeUnit,
944    timestamp: &ArrayRef,
945) -> impl Iterator<Item = &i64> {
946    match timestamp_unit {
947        // safety: timestamp column must be valid.
948        TimeUnit::Second => timestamp
949            .as_any()
950            .downcast_ref::<TimestampSecondArray>()
951            .unwrap()
952            .values()
953            .iter(),
954        TimeUnit::Millisecond => timestamp
955            .as_any()
956            .downcast_ref::<TimestampMillisecondArray>()
957            .unwrap()
958            .values()
959            .iter(),
960        TimeUnit::Microsecond => timestamp
961            .as_any()
962            .downcast_ref::<TimestampMicrosecondArray>()
963            .unwrap()
964            .values()
965            .iter(),
966        TimeUnit::Nanosecond => timestamp
967            .as_any()
968            .downcast_ref::<TimestampNanosecondArray>()
969            .unwrap()
970            .values()
971            .iter(),
972    }
973}
974
975/// Converts a **sorted** [BinaryArray] to [DictionaryArray].
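///
/// For example, a sorted input `["a", "a", "b"]` produces keys `[0, 0, 1]` and values
/// `["a", "b"]` (see `test_binary_array_to_dictionary` below).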
976fn binary_array_to_dictionary(input: &BinaryArray) -> Result<PrimaryKeyArray> {
977    if input.is_empty() {
978        return Ok(DictionaryArray::new(
979            UInt32Array::from(Vec::<u32>::new()),
980            Arc::new(BinaryArray::from_vec(vec![])) as ArrayRef,
981        ));
982    }
983    let mut keys = Vec::with_capacity(16);
984    let mut values = BinaryBuilder::new();
985    let mut prev: usize = 0;
986    keys.push(prev as u32);
987    values.append_value(input.value(prev));
988
989    for current_bytes in input.iter().skip(1) {
        // safety: encoded pk must be present.
991        let current_bytes = current_bytes.unwrap();
992        let prev_bytes = input.value(prev);
993        if current_bytes != prev_bytes {
994            values.append_value(current_bytes);
995            prev += 1;
996        }
997        keys.push(prev as u32);
998    }
999
1000    Ok(DictionaryArray::new(
1001        UInt32Array::from(keys),
1002        Arc::new(values.finish()) as ArrayRef,
1003    ))
1004}
1005
1006#[cfg(test)]
1007mod tests {
1008    use std::collections::VecDeque;
1009
1010    use api::v1::{Row, WriteHint};
1011    use datafusion_common::ScalarValue;
1012    use datatypes::arrow::array::Float64Array;
1013    use datatypes::prelude::{ConcreteDataType, ScalarVector, Value};
1014    use datatypes::vectors::{Float64Vector, TimestampMillisecondVector};
1015    use store_api::storage::consts::ReservedColumnId;
1016
1017    use super::*;
1018    use crate::memtable::bulk::context::BulkIterContext;
1019    use crate::sst::parquet::format::{PrimaryKeyReadFormat, ReadFormat};
1020    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1021    use crate::test_util::memtable_util::{
1022        build_key_values_with_ts_seq_values, metadata_for_test, region_metadata_to_row_schema,
1023    };
1024
1025    fn check_binary_array_to_dictionary(
1026        input: &[&[u8]],
1027        expected_keys: &[u32],
1028        expected_values: &[&[u8]],
1029    ) {
1030        let input = BinaryArray::from_iter_values(input.iter());
1031        let array = binary_array_to_dictionary(&input).unwrap();
1032        assert_eq!(
1033            &expected_keys,
1034            &array.keys().iter().map(|v| v.unwrap()).collect::<Vec<_>>()
1035        );
1036        assert_eq!(
1037            expected_values,
1038            &array
1039                .values()
1040                .as_any()
1041                .downcast_ref::<BinaryArray>()
1042                .unwrap()
1043                .iter()
1044                .map(|v| v.unwrap())
1045                .collect::<Vec<_>>()
1046        );
1047    }
1048
1049    #[test]
1050    fn test_binary_array_to_dictionary() {
1051        check_binary_array_to_dictionary(&[], &[], &[]);
1052
1053        check_binary_array_to_dictionary(&["a".as_bytes()], &[0], &["a".as_bytes()]);
1054
1055        check_binary_array_to_dictionary(
1056            &["a".as_bytes(), "a".as_bytes()],
1057            &[0, 0],
1058            &["a".as_bytes()],
1059        );
1060
1061        check_binary_array_to_dictionary(
1062            &["a".as_bytes(), "a".as_bytes(), "b".as_bytes()],
1063            &[0, 0, 1],
1064            &["a".as_bytes(), "b".as_bytes()],
1065        );
1066
1067        check_binary_array_to_dictionary(
1068            &[
1069                "a".as_bytes(),
1070                "a".as_bytes(),
1071                "b".as_bytes(),
1072                "c".as_bytes(),
1073            ],
1074            &[0, 0, 1, 2],
1075            &["a".as_bytes(), "b".as_bytes(), "c".as_bytes()],
1076        );
1077    }
1078
1079    struct MutationInput<'a> {
1080        k0: &'a str,
1081        k1: u32,
1082        timestamps: &'a [i64],
1083        v1: &'a [Option<f64>],
1084        sequence: u64,
1085    }
1086
1087    #[derive(Debug, PartialOrd, PartialEq)]
1088    struct BatchOutput<'a> {
1089        pk_values: &'a [Value],
1090        timestamps: &'a [i64],
1091        v1: &'a [Option<f64>],
1092    }
1093
1094    fn check_mutations_to_record_batches(
1095        input: &[MutationInput],
1096        expected: &[BatchOutput],
1097        expected_timestamp: (i64, i64),
1098        dedup: bool,
1099    ) {
1100        let metadata = metadata_for_test();
1101        let mutations = input
1102            .iter()
1103            .map(|m| {
1104                build_key_values_with_ts_seq_values(
1105                    &metadata,
1106                    m.k0.to_string(),
1107                    m.k1,
1108                    m.timestamps.iter().copied(),
1109                    m.v1.iter().copied(),
1110                    m.sequence,
1111                )
1112                .mutation
1113            })
1114            .collect::<Vec<_>>();
1115        let total_rows: usize = mutations
1116            .iter()
1117            .flat_map(|m| m.rows.iter())
1118            .map(|r| r.rows.len())
1119            .sum();
1120
1121        let pk_encoder = DensePrimaryKeyCodec::new(&metadata);
1122
        let (batch, min_ts, max_ts) =
            mutations_to_record_batch(&mutations, &metadata, &pk_encoder, dedup)
                .unwrap()
                .unwrap();
        assert_eq!(expected_timestamp, (min_ts, max_ts));
1126        let read_format = PrimaryKeyReadFormat::new_with_all_columns(metadata.clone());
1127        let mut batches = VecDeque::new();
1128        read_format
1129            .convert_record_batch(&batch, None, &mut batches)
1130            .unwrap();
1131        if !dedup {
1132            assert_eq!(
1133                total_rows,
1134                batches.iter().map(|b| { b.num_rows() }).sum::<usize>()
1135            );
1136        }
1137        let batch_values = batches
1138            .into_iter()
1139            .map(|b| {
1140                let pk_values = pk_encoder.decode(b.primary_key()).unwrap().into_dense();
1141                let timestamps = b
1142                    .timestamps()
1143                    .as_any()
1144                    .downcast_ref::<TimestampMillisecondVector>()
1145                    .unwrap()
1146                    .iter_data()
1147                    .map(|v| v.unwrap().0.value())
1148                    .collect::<Vec<_>>();
1149                let float_values = b.fields()[1]
1150                    .data
1151                    .as_any()
1152                    .downcast_ref::<Float64Vector>()
1153                    .unwrap()
1154                    .iter_data()
1155                    .collect::<Vec<_>>();
1156
1157                (pk_values, timestamps, float_values)
1158            })
1159            .collect::<Vec<_>>();
1160        assert_eq!(expected.len(), batch_values.len());
1161
1162        for idx in 0..expected.len() {
1163            assert_eq!(expected[idx].pk_values, &batch_values[idx].0);
1164            assert_eq!(expected[idx].timestamps, &batch_values[idx].1);
1165            assert_eq!(expected[idx].v1, &batch_values[idx].2);
1166        }
1167    }
1168
1169    #[test]
1170    fn test_mutations_to_record_batch() {
1171        check_mutations_to_record_batches(
1172            &[MutationInput {
1173                k0: "a",
1174                k1: 0,
1175                timestamps: &[0],
1176                v1: &[Some(0.1)],
1177                sequence: 0,
1178            }],
1179            &[BatchOutput {
1180                pk_values: &[Value::String("a".into()), Value::UInt32(0)],
1181                timestamps: &[0],
1182                v1: &[Some(0.1)],
1183            }],
1184            (0, 0),
1185            true,
1186        );
1187
1188        check_mutations_to_record_batches(
1189            &[
1190                MutationInput {
1191                    k0: "a",
1192                    k1: 0,
1193                    timestamps: &[0],
1194                    v1: &[Some(0.1)],
1195                    sequence: 0,
1196                },
1197                MutationInput {
1198                    k0: "b",
1199                    k1: 0,
1200                    timestamps: &[0],
1201                    v1: &[Some(0.0)],
1202                    sequence: 0,
1203                },
1204                MutationInput {
1205                    k0: "a",
1206                    k1: 0,
1207                    timestamps: &[1],
1208                    v1: &[Some(0.2)],
1209                    sequence: 1,
1210                },
1211                MutationInput {
1212                    k0: "a",
1213                    k1: 1,
1214                    timestamps: &[1],
1215                    v1: &[Some(0.3)],
1216                    sequence: 2,
1217                },
1218            ],
1219            &[
1220                BatchOutput {
1221                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
1222                    timestamps: &[0, 1],
1223                    v1: &[Some(0.1), Some(0.2)],
1224                },
1225                BatchOutput {
1226                    pk_values: &[Value::String("a".into()), Value::UInt32(1)],
1227                    timestamps: &[1],
1228                    v1: &[Some(0.3)],
1229                },
1230                BatchOutput {
1231                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
1232                    timestamps: &[0],
1233                    v1: &[Some(0.0)],
1234                },
1235            ],
1236            (0, 1),
1237            true,
1238        );
1239
1240        check_mutations_to_record_batches(
1241            &[
1242                MutationInput {
1243                    k0: "a",
1244                    k1: 0,
1245                    timestamps: &[0],
1246                    v1: &[Some(0.1)],
1247                    sequence: 0,
1248                },
1249                MutationInput {
1250                    k0: "b",
1251                    k1: 0,
1252                    timestamps: &[0],
1253                    v1: &[Some(0.0)],
1254                    sequence: 0,
1255                },
1256                MutationInput {
1257                    k0: "a",
1258                    k1: 0,
1259                    timestamps: &[0],
1260                    v1: &[Some(0.2)],
1261                    sequence: 1,
1262                },
1263            ],
1264            &[
1265                BatchOutput {
1266                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
1267                    timestamps: &[0],
1268                    v1: &[Some(0.2)],
1269                },
1270                BatchOutput {
1271                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
1272                    timestamps: &[0],
1273                    v1: &[Some(0.0)],
1274                },
1275            ],
1276            (0, 0),
1277            true,
1278        );
1279        check_mutations_to_record_batches(
1280            &[
1281                MutationInput {
1282                    k0: "a",
1283                    k1: 0,
1284                    timestamps: &[0],
1285                    v1: &[Some(0.1)],
1286                    sequence: 0,
1287                },
1288                MutationInput {
1289                    k0: "b",
1290                    k1: 0,
1291                    timestamps: &[0],
1292                    v1: &[Some(0.0)],
1293                    sequence: 0,
1294                },
1295                MutationInput {
1296                    k0: "a",
1297                    k1: 0,
1298                    timestamps: &[0],
1299                    v1: &[Some(0.2)],
1300                    sequence: 1,
1301                },
1302            ],
1303            &[
1304                BatchOutput {
1305                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
1306                    timestamps: &[0, 0],
1307                    v1: &[Some(0.2), Some(0.1)],
1308                },
1309                BatchOutput {
1310                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
1311                    timestamps: &[0],
1312                    v1: &[Some(0.0)],
1313                },
1314            ],
1315            (0, 0),
1316            false,
1317        );
1318    }
1319
1320    fn encode(input: &[MutationInput]) -> EncodedBulkPart {
1321        let metadata = metadata_for_test();
1322        let kvs = input
1323            .iter()
1324            .map(|m| {
1325                build_key_values_with_ts_seq_values(
1326                    &metadata,
1327                    m.k0.to_string(),
1328                    m.k1,
1329                    m.timestamps.iter().copied(),
1330                    m.v1.iter().copied(),
1331                    m.sequence,
1332                )
1333            })
1334            .collect::<Vec<_>>();
1335        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1336        let primary_key_codec = build_primary_key_codec(&metadata);
1337        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1338        for kv in kvs {
1339            converter.append_key_values(&kv).unwrap();
1340        }
1341        let part = converter.convert().unwrap();
1342        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1343        encoder.encode_part(&part).unwrap().unwrap()
1344    }
1345
1346    #[test]
1347    fn test_write_and_read_part_projection() {
1348        let part = encode(&[
1349            MutationInput {
1350                k0: "a",
1351                k1: 0,
1352                timestamps: &[1],
1353                v1: &[Some(0.1)],
1354                sequence: 0,
1355            },
1356            MutationInput {
1357                k0: "b",
1358                k1: 0,
1359                timestamps: &[1],
1360                v1: &[Some(0.0)],
1361                sequence: 0,
1362            },
1363            MutationInput {
1364                k0: "a",
1365                k1: 0,
1366                timestamps: &[2],
1367                v1: &[Some(0.2)],
1368                sequence: 1,
1369            },
1370        ]);
1371
1372        let projection = &[4u32];
        let reader = part
1374            .read(
1375                Arc::new(
1376                    BulkIterContext::new(
1377                        part.metadata.region_metadata.clone(),
1378                        Some(projection.as_slice()),
1379                        None,
1380                        false,
1381                    )
1382                    .unwrap(),
1383                ),
1384                None,
1385            )
1386            .unwrap()
1387            .expect("expect at least one row group");
1388
1389        let mut total_rows_read = 0;
1390        let mut field: Vec<f64> = vec![];
1391        for res in reader {
1392            let batch = res.unwrap();
1393            assert_eq!(5, batch.num_columns());
1394            field.extend_from_slice(
1395                batch
1396                    .column(0)
1397                    .as_any()
1398                    .downcast_ref::<Float64Array>()
1399                    .unwrap()
1400                    .values(),
1401            );
1402            total_rows_read += batch.num_rows();
1403        }
1404        assert_eq!(3, total_rows_read);
1405        assert_eq!(vec![0.1, 0.2, 0.0], field);
1406    }
1407
1408    fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
1409        let metadata = metadata_for_test();
1410        let kvs = key_values
1411            .into_iter()
1412            .map(|(k0, k1, (start, end), sequence)| {
                let ts = start..end;
1414                let v1 = (start..end).map(|_| None);
1415                build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
1416            })
1417            .collect::<Vec<_>>();
1418        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1419        let primary_key_codec = build_primary_key_codec(&metadata);
1420        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1421        for kv in kvs {
1422            converter.append_key_values(&kv).unwrap();
1423        }
1424        let part = converter.convert().unwrap();
1425        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1426        encoder.encode_part(&part).unwrap().unwrap()
1427    }
1428
1429    fn check_prune_row_group(
1430        part: &EncodedBulkPart,
1431        predicate: Option<Predicate>,
1432        expected_rows: usize,
1433    ) {
1434        let context = Arc::new(
1435            BulkIterContext::new(
1436                part.metadata.region_metadata.clone(),
1437                None,
1438                predicate,
1439                false,
1440            )
1441            .unwrap(),
1442        );
        let reader = part
1444            .read(context, None)
1445            .unwrap()
1446            .expect("expect at least one row group");
1447        let mut total_rows_read = 0;
1448        for res in reader {
1449            let batch = res.unwrap();
1450            total_rows_read += batch.num_rows();
1451        }
        // Check the number of rows read after pruning.
1453        assert_eq!(expected_rows, total_rows_read);
1454    }
1455
1456    #[test]
1457    fn test_prune_row_groups() {
1458        let part = prepare(vec![
1459            ("a", 0, (0, 40), 1),
1460            ("a", 1, (0, 60), 1),
1461            ("b", 0, (0, 100), 2),
1462            ("b", 1, (100, 180), 3),
1463            ("b", 1, (180, 210), 4),
1464        ]);
1465
1466        let context = Arc::new(
1467            BulkIterContext::new(
1468                part.metadata.region_metadata.clone(),
1469                None,
1470                Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
1471                    datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
1472                )])),
1473                false,
1474            )
1475            .unwrap(),
1476        );
1477        assert!(part.read(context, None).unwrap().is_none());
1478
1479        check_prune_row_group(&part, None, 310);
1480
1481        check_prune_row_group(
1482            &part,
1483            Some(Predicate::new(vec![
1484                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1485                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
1486            ])),
1487            40,
1488        );
1489
1490        check_prune_row_group(
1491            &part,
1492            Some(Predicate::new(vec![
1493                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1494                datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
1495            ])),
1496            60,
1497        );
1498
1499        check_prune_row_group(
1500            &part,
1501            Some(Predicate::new(vec![
1502                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1503            ])),
1504            100,
1505        );
1506
1507        check_prune_row_group(
1508            &part,
1509            Some(Predicate::new(vec![
1510                datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
1511                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
1512            ])),
1513            100,
1514        );
1515
        // Predicates on a field column can do precise filtering.
1517        check_prune_row_group(
1518            &part,
1519            Some(Predicate::new(vec![
1520                datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64)),
1521            ])),
1522            1,
1523        );
1524    }
1525
    #[test]
    fn test_bulk_part_converter_append_and_convert() {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "key1".to_string(),
            1u32,
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "key2".to_string(),
            2u32,
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_timestamp, 1000);
        assert_eq!(bulk_part.max_timestamp, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        // Validate primary key columns are stored
        // Schema should include primary key columns k0 and k1 at the beginning
        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );
    }

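    /// Appends key values with out-of-order primary keys and checks that the
    /// converted batch is sorted, with timestamps and sequences following key order.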
    #[test]
    fn test_bulk_part_converter_sorting() {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "z_key".to_string(),
            3u32,
            vec![3000].into_iter(),
            vec![Some(3.0)].into_iter(),
            3,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "a_key".to_string(),
            1u32,
            vec![1000].into_iter(),
            vec![Some(1.0)].into_iter(),
            1,
        );

        let key_values3 = build_key_values_with_ts_seq_values(
            &metadata,
            "m_key".to_string(),
            2u32,
            vec![2000].into_iter(),
            vec![Some(2.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();
        converter.append_key_values(&key_values3).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);

        let ts_column = bulk_part.batch.column(bulk_part.timestamp_index);
        let seq_column = bulk_part.batch.column(bulk_part.batch.num_columns() - 2);

        let ts_array = ts_column
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        let seq_array = seq_column.as_any().downcast_ref::<UInt64Array>().unwrap();

        assert_eq!(ts_array.values(), &[1000, 2000, 3000]);
        assert_eq!(seq_array.values(), &[1, 2, 3]);

        // Validate primary key columns are stored
        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );
    }

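    /// Converts without appending any rows and checks the default statistics and
    /// that the output schema still contains all columns.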
    #[test]
    fn test_bulk_part_converter_empty() {
        let metadata = metadata_for_test();
        let capacity = 10;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 0);
        assert_eq!(bulk_part.min_timestamp, i64::MAX);
        assert_eq!(bulk_part.max_timestamp, i64::MIN);
        assert_eq!(bulk_part.sequence, SequenceNumber::MIN);

        // Validate primary key columns are present in schema even for empty batch
        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );
    }

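    /// Converts with raw primary key columns disabled and checks that `k0`/`k1`
    /// are absent from the output schema.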
    #[test]
    fn test_bulk_part_converter_without_primary_key_columns() {
        let metadata = metadata_for_test();
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions {
                raw_pk_columns: false,
                string_pk_use_dict: true,
            },
        );

        let capacity = 100;
        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, false);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "key1".to_string(),
            1u32,
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "key2".to_string(),
            2u32,
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_timestamp, 1000);
        assert_eq!(bulk_part.max_timestamp, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        // Validate primary key columns are NOT stored individually
        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );
    }

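    /// Builds `KeyValues` whose rows carry a sparse-encoded primary key
    /// (`__table_id`, `__tsid`, `k0`, `k1`) in the `__primary_key` column.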
    #[allow(clippy::too_many_arguments)]
    fn build_key_values_with_sparse_encoding(
        metadata: &RegionMetadataRef,
        primary_key_codec: &Arc<dyn PrimaryKeyCodec>,
        table_id: u32,
        tsid: u64,
        k0: String,
        k1: String,
        timestamps: impl Iterator<Item = i64>,
        values: impl Iterator<Item = Option<f64>>,
        sequence: SequenceNumber,
    ) -> KeyValues {
        // Encode the primary key (__table_id, __tsid, k0, k1) into binary format using the sparse codec
        let pk_values = vec![
            (ReservedColumnId::table_id(), Value::UInt32(table_id)),
            (ReservedColumnId::tsid(), Value::UInt64(tsid)),
            (0, Value::String(k0.clone().into())),
            (1, Value::String(k1.clone().into())),
        ];
        let mut encoded_key = Vec::new();
        primary_key_codec
            .encode_values(&pk_values, &mut encoded_key)
            .unwrap();
        assert!(!encoded_key.is_empty());

        // Create schema for sparse encoding: __primary_key, ts, v0, v1
        let column_schema = vec![
            api::v1::ColumnSchema {
                column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::binary_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Tag as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "ts".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::timestamp_millisecond_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Timestamp as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "v0".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::int64_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Field as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "v1".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::float64_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Field as i32,
                ..Default::default()
            },
        ];

        let rows = timestamps
            .zip(values)
            .map(|(ts, v)| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::BinaryValue(
                            encoded_key.clone(),
                        )),
                    },
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(ts)),
                    },
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::I64Value(ts)),
                    },
                    api::v1::Value {
                        value_data: v.map(api::v1::value::ValueData::F64Value),
                    },
                ],
            })
            .collect();

        let mutation = api::v1::Mutation {
            op_type: 1,
            sequence,
            rows: Some(api::v1::Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: Some(WriteHint {
                primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
            }),
        };
        KeyValues::new(metadata.as_ref(), mutation).unwrap()
    }

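    /// Converts sparse-encoded key values and checks that only the encoded
    /// `__primary_key` column is produced, with non-empty encoded keys.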
    #[test]
    fn test_bulk_part_converter_sparse_primary_key_encoding() {
        use api::v1::SemanticType;
        use datatypes::schema::ColumnSchema;
        use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
        use store_api::storage::RegionId;

        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![0, 1])
            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
        let metadata = Arc::new(builder.build().unwrap());

        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse);
        assert_eq!(primary_key_codec.encoding(), PrimaryKeyEncoding::Sparse);

        let capacity = 100;
        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec.clone(), true);

        let key_values1 = build_key_values_with_sparse_encoding(
            &metadata,
            &primary_key_codec,
            2048u32, // table_id
            100u64,  // tsid
            "key11".to_string(),
            "key21".to_string(),
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_sparse_encoding(
            &metadata,
            &primary_key_codec,
            4096u32, // table_id
            200u64,  // tsid
            "key12".to_string(),
            "key22".to_string(),
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_timestamp, 1000);
        assert_eq!(bulk_part.max_timestamp, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        // For sparse encoding, primary key columns should NOT be stored individually
        // even when store_primary_key_columns is true, because sparse encoding
        // stores the encoded primary key in the __primary_key column
        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );

        // Verify the __primary_key column contains encoded sparse keys
        let primary_key_column = bulk_part.batch.column_by_name("__primary_key").unwrap();
        let dict_array = primary_key_column
            .as_any()
            .downcast_ref::<DictionaryArray<UInt32Type>>()
            .unwrap();

        // Should have non-zero entries indicating encoded primary keys
        assert!(!dict_array.is_empty());
        assert_eq!(dict_array.len(), 3); // 3 rows total

        // Verify values are properly encoded binary data (not empty)
        let values = dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        for i in 0..values.len() {
            assert!(
                !values.value(i).is_empty(),
                "Encoded primary key should not be empty"
            );
        }
    }
}