mito2/sst/parquet/
flat_format.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Format to store in parquet.
16//!
17//! It can store both encoded primary key and raw key columns.
18//!
//! We store three additional internal columns at last:
20//! - `__primary_key`, the encoded primary key of the row (tags). Type: dictionary(uint32, binary)
21//! - `__sequence`, the sequence number of a row. Type: uint64
22//! - `__op_type`, the op type of the row. Type: uint8
23//!
24//! The format is
25//! ```text
//! primary key columns, field columns, time index, encoded primary key, __sequence, __op_type.
//! ```
//!
28//! It stores field columns in the same order as [RegionMetadata::field_columns()](store_api::metadata::RegionMetadata::field_columns())
29//! and stores primary key columns in the same order as [RegionMetadata::primary_key].
30
31use std::borrow::Borrow;
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use api::v1::SemanticType;
36use datatypes::arrow::array::{
37    Array, ArrayRef, BinaryArray, DictionaryArray, UInt32Array, UInt64Array,
38};
39use datatypes::arrow::compute::kernels::take::take;
40use datatypes::arrow::datatypes::{Schema, SchemaRef};
41use datatypes::arrow::record_batch::RecordBatch;
42use datatypes::prelude::{ConcreteDataType, DataType};
43use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec, build_primary_key_codec};
44use parquet::file::metadata::RowGroupMetaData;
45use snafu::{OptionExt, ResultExt, ensure};
46use store_api::codec::PrimaryKeyEncoding;
47use store_api::metadata::{RegionMetadata, RegionMetadataRef};
48use store_api::storage::{ColumnId, SequenceNumber};
49
50use crate::error::{
51    ComputeArrowSnafu, DecodeSnafu, InvalidParquetSnafu, InvalidRecordBatchSnafu,
52    NewRecordBatchSnafu, Result,
53};
54use crate::sst::parquet::format::{
55    FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray, PrimaryKeyReadFormat, ReadFormat,
56    StatValues,
57};
58use crate::sst::{
59    FlatSchemaOptions, flat_sst_arrow_schema_column_num, tag_maybe_to_dictionary_field,
60    to_flat_sst_arrow_schema,
61};
62
/// Helper for writing the SST format.
///
/// It holds the arrow schema of the SST file and an optional sequence number
/// that replaces the sequence column of the batches it converts.
pub(crate) struct FlatWriteFormat {
    /// SST file schema.
    arrow_schema: SchemaRef,
    /// Sequence number to write into the sequence column of converted batches.
    /// Batches are returned unchanged when this is `None`.
    override_sequence: Option<SequenceNumber>,
}
69
70impl FlatWriteFormat {
71    /// Creates a new helper.
72    pub(crate) fn new(metadata: RegionMetadataRef, options: &FlatSchemaOptions) -> FlatWriteFormat {
73        let arrow_schema = to_flat_sst_arrow_schema(&metadata, options);
74        FlatWriteFormat {
75            arrow_schema,
76            override_sequence: None,
77        }
78    }
79
80    /// Set override sequence.
81    pub(crate) fn with_override_sequence(
82        mut self,
83        override_sequence: Option<SequenceNumber>,
84    ) -> Self {
85        self.override_sequence = override_sequence;
86        self
87    }
88
89    /// Gets the arrow schema to store in parquet.
90    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
91        &self.arrow_schema
92    }
93
94    /// Convert `batch` to a arrow record batch to store in parquet.
95    pub(crate) fn convert_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
96        debug_assert_eq!(batch.num_columns(), self.arrow_schema.fields().len());
97
98        let Some(override_sequence) = self.override_sequence else {
99            return Ok(batch.clone());
100        };
101
102        let mut columns = batch.columns().to_vec();
103        let sequence_array = Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
104        columns[sequence_column_index(batch.num_columns())] = sequence_array;
105
106        RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
107    }
108}
109
/// Returns the position of the sequence column.
///
/// The flat layout ends with `time index, encoded primary key, __sequence, __op_type`,
/// so the sequence column is the second column from the end.
///
/// # Panics
///
/// Panics if `num_columns` is less than 2. The subtraction is checked so that an
/// invalid column count fails here in every build profile instead of wrapping
/// around to a huge index in release builds.
pub(crate) fn sequence_column_index(num_columns: usize) -> usize {
    num_columns
        .checked_sub(2)
        .expect("flat format batch must contain the sequence and op type columns")
}
114
/// Returns the position of the time index column.
///
/// The flat layout ends with `time index, encoded primary key, __sequence, __op_type`,
/// so the time index is the fourth column from the end.
///
/// # Panics
///
/// Panics if `num_columns` is less than 4. The subtraction is checked so that an
/// invalid column count fails here in every build profile instead of wrapping
/// around to a huge index in release builds.
pub(crate) fn time_index_column_index(num_columns: usize) -> usize {
    num_columns
        .checked_sub(4)
        .expect("flat format batch must contain the time index and internal columns")
}
119
/// Returns the position of the encoded primary key column.
///
/// The flat layout ends with `time index, encoded primary key, __sequence, __op_type`,
/// so the encoded primary key is the third column from the end.
///
/// # Panics
///
/// Panics if `num_columns` is less than 3. The subtraction is checked so that an
/// invalid column count fails here in every build profile instead of wrapping
/// around to a huge index in release builds.
pub(crate) fn primary_key_column_index(num_columns: usize) -> usize {
    num_columns
        .checked_sub(3)
        .expect("flat format batch must contain the internal columns")
}
124
/// Returns the position of the op type column.
///
/// The op type is the last column of the flat layout.
///
/// # Panics
///
/// Panics if `num_columns` is 0. The subtraction is checked so that an invalid
/// column count fails here in every build profile instead of wrapping around to
/// a huge index in release builds.
pub(crate) fn op_type_column_index(num_columns: usize) -> usize {
    num_columns
        .checked_sub(1)
        .expect("flat format batch must contain the op type column")
}
129
// TODO(yingwen): Add an option to skip reading internal columns.
/// Helper for reading the flat SST format with projection.
///
/// It only supports flat format that stores primary keys additionally.
///
/// SSTs that are already in the flat format are read directly. SSTs in the
/// primary key format are read through an adapter that can convert batches
/// into the flat format.
pub struct FlatReadFormat {
    /// Sequence number to override the sequence read from the SST.
    /// No override is applied when this is `None`.
    override_sequence: Option<SequenceNumber>,
    /// Parquet format adapter. It selects how the SST is read: directly as flat
    /// format or from the primary key format.
    parquet_adapter: ParquetAdapter,
}
140
141impl FlatReadFormat {
142    /// Creates a helper with existing `metadata` and `column_ids` to read.
143    ///
144    /// If `skip_auto_convert` is true, skips auto conversion of format when the encoding is sparse encoding.
145    pub fn new(
146        metadata: RegionMetadataRef,
147        column_ids: impl Iterator<Item = ColumnId>,
148        num_columns: Option<usize>,
149        file_path: &str,
150        skip_auto_convert: bool,
151    ) -> Result<FlatReadFormat> {
152        let is_legacy = match num_columns {
153            Some(num) => Self::is_legacy_format(&metadata, num, file_path)?,
154            None => metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse,
155        };
156
157        let parquet_adapter = if is_legacy {
158            // Safety: is_legacy_format() ensures primary_key is not empty.
159            if metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse {
160                // Only skip auto convert when the primary key encoding is sparse.
161                ParquetAdapter::PrimaryKeyToFlat(ParquetPrimaryKeyToFlat::new(
162                    metadata,
163                    column_ids,
164                    skip_auto_convert,
165                ))
166            } else {
167                ParquetAdapter::PrimaryKeyToFlat(ParquetPrimaryKeyToFlat::new(
168                    metadata, column_ids, false,
169                ))
170            }
171        } else {
172            ParquetAdapter::Flat(ParquetFlat::new(metadata, column_ids))
173        };
174
175        Ok(FlatReadFormat {
176            override_sequence: None,
177            parquet_adapter,
178        })
179    }
180
    /// Sets the sequence number to override.
    ///
    /// Passing `None` clears the override, so [`Self::new_override_sequence_array`]
    /// returns `None` afterwards.
    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
        self.override_sequence = sequence;
    }
185
186    /// Index of a column in the projected batch by its column id.
187    pub fn projected_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
188        self.format_projection()
189            .column_id_to_projected_index
190            .get(&column_id)
191            .copied()
192    }
193
    /// Returns min values of specific column in row groups.
    ///
    /// The lookup is delegated to the flat helper when the SST is read as flat
    /// format, otherwise to the primary key read format.
    pub fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => p.min_values(row_groups, column_id),
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.min_values(row_groups, column_id),
        }
    }
205
    /// Returns max values of specific column in row groups.
    ///
    /// The lookup is delegated to the flat helper when the SST is read as flat
    /// format, otherwise to the primary key read format.
    pub fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => p.max_values(row_groups, column_id),
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.max_values(row_groups, column_id),
        }
    }
217
    /// Returns null counts of specific column in row groups.
    ///
    /// The lookup is delegated to the flat helper when the SST is read as flat
    /// format, otherwise to the primary key read format.
    pub fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => p.null_counts(row_groups, column_id),
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.null_counts(row_groups, column_id),
        }
    }
229
    /// Gets the arrow schema of the SST file.
    ///
    /// This schema is computed from the region metadata but should be the same
    /// as the arrow schema decoded from the file metadata.
    ///
    /// When the SST is read from the primary key format, the schema comes from
    /// the primary key read format rather than from the flat helper.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => &p.arrow_schema,
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.arrow_schema(),
        }
    }
240
    /// Gets the metadata of the SST.
    ///
    /// The metadata is held by the flat helper or by the primary key read format,
    /// depending on how the SST is read.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => &p.metadata,
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.metadata(),
        }
    }
248
    /// Gets sorted projection indices to read from the SST file.
    ///
    /// For an SST read as flat format the indices come from the flat format
    /// projection. For an SST in the primary key format they come from the
    /// primary key read format.
    pub(crate) fn projection_indices(&self) -> &[usize] {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => &p.format_projection.projection_indices,
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.projection_indices(),
        }
    }
256
    /// Gets the projection in the flat format.
    ///
    /// Both adapters keep a projection computed for the flat format, so the
    /// result describes the flat layout even when the SST is in the primary key format.
    pub(crate) fn format_projection(&self) -> &FormatProjection {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => &p.format_projection,
            ParquetAdapter::PrimaryKeyToFlat(p) => &p.format_projection,
        }
    }
264
265    /// Creates a sequence array to override.
266    pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
267        self.override_sequence
268            .map(|seq| Arc::new(UInt64Array::from_value(seq, length)) as ArrayRef)
269    }
270
271    /// Convert a record batch to apply flat format conversion and override sequence array.
272    ///
273    /// Returns a new RecordBatch with flat format conversion applied first (if enabled),
274    /// then the sequence column replaced by the override sequence array.
275    pub(crate) fn convert_batch(
276        &self,
277        record_batch: RecordBatch,
278        override_sequence_array: Option<&ArrayRef>,
279    ) -> Result<RecordBatch> {
280        // First, apply flat format conversion.
281        let batch = match &self.parquet_adapter {
282            ParquetAdapter::Flat(_) => record_batch,
283            ParquetAdapter::PrimaryKeyToFlat(p) => p.convert_batch(record_batch)?,
284        };
285
286        // Then apply sequence override if provided
287        let Some(override_array) = override_sequence_array else {
288            return Ok(batch);
289        };
290
291        let mut columns = batch.columns().to_vec();
292        let sequence_column_idx = sequence_column_index(batch.num_columns());
293
294        // Use the provided override sequence array, slicing if necessary to match batch length
295        let sequence_array = if override_array.len() > batch.num_rows() {
296            override_array.slice(0, batch.num_rows())
297        } else {
298            override_array.clone()
299        };
300
301        columns[sequence_column_idx] = sequence_array;
302
303        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
304    }
305
306    /// Checks whether the batch from the parquet file needs to be converted to match the flat format.
307    ///
308    /// * `metadata` is the region metadata (always assumes flat format).
309    /// * `num_columns` is the number of columns in the parquet file.
310    /// * `file_path` is the path to the parquet file, for error message.
311    pub(crate) fn is_legacy_format(
312        metadata: &RegionMetadata,
313        num_columns: usize,
314        file_path: &str,
315    ) -> Result<bool> {
316        if metadata.primary_key.is_empty() {
317            return Ok(false);
318        }
319
320        // For flat format, compute expected column number:
321        // all columns + internal columns (pk, sequence, op_type)
322        let expected_columns = metadata.column_metadatas.len() + INTERNAL_COLUMN_NUM;
323
324        if expected_columns == num_columns {
325            // Same number of columns, no conversion needed
326            Ok(false)
327        } else {
328            ensure!(
329                expected_columns >= num_columns,
330                InvalidParquetSnafu {
331                    file: file_path,
332                    reason: format!(
333                        "Expected columns {} should be >= actual columns {}",
334                        expected_columns, num_columns
335                    )
336                }
337            );
338
339            // Different number of columns, check if the difference matches primary key count
340            let column_diff = expected_columns - num_columns;
341
342            ensure!(
343                column_diff == metadata.primary_key.len(),
344                InvalidParquetSnafu {
345                    file: file_path,
346                    reason: format!(
347                        "Column number difference {} does not match primary key count {}",
348                        column_diff,
349                        metadata.primary_key.len()
350                    )
351                }
352            );
353
354            Ok(true)
355        }
356    }
357}
358
/// Wraps the parquet helper for different formats.
enum ParquetAdapter {
    /// The SST is read in flat format directly.
    Flat(ParquetFlat),
    /// The SST is read from the primary key format into the flat format.
    PrimaryKeyToFlat(ParquetPrimaryKeyToFlat),
}
364
/// Helper to read the parquet from primary key format into the flat format.
struct ParquetPrimaryKeyToFlat {
    /// The primary key format to read the parquet.
    format: PrimaryKeyReadFormat,
    /// Format converter for handling flat format conversion.
    /// Batches are passed through unchanged when this is `None`.
    convert_format: Option<FlatConvertFormat>,
    /// Projection computed for the flat format.
    format_projection: FormatProjection,
}
374
375impl ParquetPrimaryKeyToFlat {
376    /// Creates a helper with existing `metadata` and `column_ids` to read.
377    fn new(
378        metadata: RegionMetadataRef,
379        column_ids: impl Iterator<Item = ColumnId>,
380        skip_auto_convert: bool,
381    ) -> ParquetPrimaryKeyToFlat {
382        assert!(if skip_auto_convert {
383            metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse
384        } else {
385            true
386        });
387
388        let column_ids: Vec<_> = column_ids.collect();
389
390        // Creates a map to lookup index based on the new format.
391        let id_to_index = sst_column_id_indices(&metadata);
392        let sst_column_num =
393            flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default());
394        // Computes the format projection for the new format.
395        let format_projection = FormatProjection::compute_format_projection(
396            &id_to_index,
397            sst_column_num,
398            column_ids.iter().copied(),
399        );
400        let codec = build_primary_key_codec(&metadata);
401        let convert_format = if skip_auto_convert {
402            None
403        } else {
404            FlatConvertFormat::new(Arc::clone(&metadata), &format_projection, codec)
405        };
406
407        let format = PrimaryKeyReadFormat::new(metadata.clone(), column_ids.iter().copied());
408
409        Self {
410            format,
411            convert_format,
412            format_projection,
413        }
414    }
415
416    fn convert_batch(&self, record_batch: RecordBatch) -> Result<RecordBatch> {
417        if let Some(convert_format) = &self.convert_format {
418            convert_format.convert(record_batch)
419        } else {
420            Ok(record_batch)
421        }
422    }
423}
424
/// Helper to read the parquet in flat format directly.
struct ParquetFlat {
    /// The metadata stored in the SST.
    metadata: RegionMetadataRef,
    /// SST file schema.
    arrow_schema: SchemaRef,
    /// Projection computed for the flat format.
    format_projection: FormatProjection,
    /// Column id to index in SST. Used to locate the column statistics in row groups.
    column_id_to_sst_index: HashMap<ColumnId, usize>,
}
436
437impl ParquetFlat {
438    /// Creates a helper with existing `metadata` and `column_ids` to read.
439    fn new(metadata: RegionMetadataRef, column_ids: impl Iterator<Item = ColumnId>) -> ParquetFlat {
440        // Creates a map to lookup index.
441        let id_to_index = sst_column_id_indices(&metadata);
442        let arrow_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
443        let sst_column_num =
444            flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default());
445        let format_projection =
446            FormatProjection::compute_format_projection(&id_to_index, sst_column_num, column_ids);
447
448        Self {
449            metadata,
450            arrow_schema,
451            format_projection,
452            column_id_to_sst_index: id_to_index,
453        }
454    }
455
456    /// Returns min values of specific column in row groups.
457    fn min_values(
458        &self,
459        row_groups: &[impl Borrow<RowGroupMetaData>],
460        column_id: ColumnId,
461    ) -> StatValues {
462        self.get_stat_values(row_groups, column_id, true)
463    }
464
465    /// Returns max values of specific column in row groups.
466    fn max_values(
467        &self,
468        row_groups: &[impl Borrow<RowGroupMetaData>],
469        column_id: ColumnId,
470    ) -> StatValues {
471        self.get_stat_values(row_groups, column_id, false)
472    }
473
474    /// Returns null counts of specific column in row groups.
475    fn null_counts(
476        &self,
477        row_groups: &[impl Borrow<RowGroupMetaData>],
478        column_id: ColumnId,
479    ) -> StatValues {
480        let Some(index) = self.column_id_to_sst_index.get(&column_id) else {
481            // No such column in the SST.
482            return StatValues::NoColumn;
483        };
484
485        let stats = ReadFormat::column_null_counts(row_groups, *index);
486        StatValues::from_stats_opt(stats)
487    }
488
489    fn get_stat_values(
490        &self,
491        row_groups: &[impl Borrow<RowGroupMetaData>],
492        column_id: ColumnId,
493        is_min: bool,
494    ) -> StatValues {
495        let Some(column) = self.metadata.column_by_id(column_id) else {
496            // No such column in the SST.
497            return StatValues::NoColumn;
498        };
499        // Safety: `column_id_to_sst_index` is built from `metadata`.
500        let index = self.column_id_to_sst_index.get(&column_id).unwrap();
501
502        let stats = ReadFormat::column_values(row_groups, column, *index, is_min);
503        StatValues::from_stats_opt(stats)
504    }
505}
506
507/// Returns a map that the key is the column id and the value is the column position
508/// in the SST.
509/// It only supports SSTs with raw primary key columns.
510pub(crate) fn sst_column_id_indices(metadata: &RegionMetadata) -> HashMap<ColumnId, usize> {
511    let mut id_to_index = HashMap::with_capacity(metadata.column_metadatas.len());
512    let mut column_index = 0;
513    // keys
514    for pk_id in &metadata.primary_key {
515        id_to_index.insert(*pk_id, column_index);
516        column_index += 1;
517    }
518    // fields
519    for column in &metadata.column_metadatas {
520        if column.semantic_type == SemanticType::Field {
521            id_to_index.insert(column.column_id, column_index);
522            column_index += 1;
523        }
524    }
525    // time index
526    id_to_index.insert(metadata.time_index_column().column_id, column_index);
527
528    id_to_index
529}
530
/// Converts a batch that doesn't have decoded primary key columns into a batch that has decoded
/// primary key columns in flat format.
pub(crate) struct FlatConvertFormat {
    /// Metadata of the region.
    metadata: RegionMetadataRef,
    /// Primary key codec to decode primary keys.
    codec: Arc<dyn PrimaryKeyCodec>,
    /// Projected primary key column information: (column_id, pk_index, column_index in metadata).
    /// Entries keep the order of the primary key in the region metadata.
    projected_primary_keys: Vec<(ColumnId, usize, usize)>,
}
541
542impl FlatConvertFormat {
543    /// Creates a new `FlatConvertFormat`.
544    ///
545    /// The `format_projection` is the projection computed in the [FlatReadFormat] with the `metadata`.
546    /// The `codec` is the primary key codec of the `metadata`.
547    ///
548    /// Returns `None` if there is no primary key.
549    pub(crate) fn new(
550        metadata: RegionMetadataRef,
551        format_projection: &FormatProjection,
552        codec: Arc<dyn PrimaryKeyCodec>,
553    ) -> Option<Self> {
554        if metadata.primary_key.is_empty() {
555            return None;
556        }
557
558        // Builds projected primary keys list maintaining the order of RegionMetadata::primary_key
559        let mut projected_primary_keys = Vec::new();
560        for (pk_index, &column_id) in metadata.primary_key.iter().enumerate() {
561            if format_projection
562                .column_id_to_projected_index
563                .contains_key(&column_id)
564            {
565                // We expect the format_projection is built from the metadata.
566                let column_index = metadata.column_index_by_id(column_id).unwrap();
567                projected_primary_keys.push((column_id, pk_index, column_index));
568            }
569        }
570
571        Some(Self {
572            metadata,
573            codec,
574            projected_primary_keys,
575        })
576    }
577
578    /// Converts a batch to have decoded primary key columns in flat format.
579    ///
580    /// The primary key array in the batch is a dictionary array. We decode each value which is a
581    /// primary key and reuse the keys array to build a dictionary array for each tag column.
582    /// The decoded columns are inserted in front of other columns.
583    pub(crate) fn convert(&self, batch: RecordBatch) -> Result<RecordBatch> {
584        if self.projected_primary_keys.is_empty() {
585            return Ok(batch);
586        }
587
588        let primary_key_index = primary_key_column_index(batch.num_columns());
589        let pk_dict_array = batch
590            .column(primary_key_index)
591            .as_any()
592            .downcast_ref::<PrimaryKeyArray>()
593            .with_context(|| InvalidRecordBatchSnafu {
594                reason: "Primary key column is not a dictionary array".to_string(),
595            })?;
596
597        let pk_values_array = pk_dict_array
598            .values()
599            .as_any()
600            .downcast_ref::<BinaryArray>()
601            .with_context(|| InvalidRecordBatchSnafu {
602                reason: "Primary key values are not binary array".to_string(),
603            })?;
604
605        // Decodes all primary key values
606        let mut decoded_pk_values = Vec::with_capacity(pk_values_array.len());
607        for i in 0..pk_values_array.len() {
608            if pk_values_array.is_null(i) {
609                decoded_pk_values.push(None);
610            } else {
611                let pk_bytes = pk_values_array.value(i);
612                let decoded = self.codec.decode(pk_bytes).context(DecodeSnafu)?;
613                decoded_pk_values.push(Some(decoded));
614            }
615        }
616
617        // Builds decoded tag column arrays.
618        let mut decoded_columns = Vec::new();
619        for (column_id, pk_index, column_index) in &self.projected_primary_keys {
620            let column_metadata = &self.metadata.column_metadatas[*column_index];
621            let tag_column = self.build_primary_key_column(
622                *column_id,
623                *pk_index,
624                &column_metadata.column_schema.data_type,
625                pk_dict_array.keys(),
626                &decoded_pk_values,
627            )?;
628            decoded_columns.push(tag_column);
629        }
630
631        // Builds new columns: decoded tag columns first, then original columns
632        let mut new_columns = Vec::with_capacity(batch.num_columns() + decoded_columns.len());
633        new_columns.extend(decoded_columns);
634        new_columns.extend_from_slice(batch.columns());
635
636        // Builds new schema
637        let mut new_fields =
638            Vec::with_capacity(batch.schema().fields().len() + self.projected_primary_keys.len());
639        for (_, _, column_index) in &self.projected_primary_keys {
640            let column_metadata = &self.metadata.column_metadatas[*column_index];
641            let old_field = &self.metadata.schema.arrow_schema().fields()[*column_index];
642            let field =
643                tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, old_field);
644            new_fields.push(field);
645        }
646        new_fields.extend(batch.schema().fields().iter().cloned());
647
648        let new_schema = Arc::new(Schema::new(new_fields));
649        RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)
650    }
651
652    /// Builds an array for a specific tag column.
653    ///
654    /// It may build a dictionary array if the type is string. Note that the dictionary
655    /// array may have null values, although keys are not null.
656    fn build_primary_key_column(
657        &self,
658        column_id: ColumnId,
659        pk_index: usize,
660        column_type: &ConcreteDataType,
661        keys: &UInt32Array,
662        decoded_pk_values: &[Option<CompositeValues>],
663    ) -> Result<ArrayRef> {
664        // Gets values from the primary key.
665        let mut builder = column_type.create_mutable_vector(decoded_pk_values.len());
666        for decoded_opt in decoded_pk_values {
667            match decoded_opt {
668                Some(decoded) => {
669                    match decoded {
670                        CompositeValues::Dense(dense) => {
671                            if pk_index < dense.len() {
672                                builder.push_value_ref(&dense[pk_index].1.as_value_ref());
673                            } else {
674                                builder.push_null();
675                            }
676                        }
677                        CompositeValues::Sparse(sparse) => {
678                            let value = sparse.get_or_null(column_id);
679                            builder.push_value_ref(&value.as_value_ref());
680                        }
681                    };
682                }
683                None => builder.push_null(),
684            }
685        }
686
687        let values_vector = builder.to_vector();
688        let values_array = values_vector.to_arrow_array();
689
690        // Only creates dictionary array for string types, otherwise take values by keys
691        if matches!(column_type, ConcreteDataType::String(_)) {
692            // Creates dictionary array using the same keys for string types
693            // Note that the dictionary values may have nulls.
694            let dict_array = DictionaryArray::new(keys.clone(), values_array);
695            Ok(Arc::new(dict_array))
696        } else {
697            // For non-string types, takes values by keys indices to create a regular array
698            let taken_array = take(&values_array, keys, None).context(ComputeArrowSnafu)?;
699            Ok(taken_array)
700        }
701    }
702}
703
#[cfg(test)]
impl FlatReadFormat {
    /// Creates a helper with existing `metadata` that projects all columns.
    ///
    /// The number of columns in the file is not provided and auto conversion is not skipped.
    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> FlatReadFormat {
        let all_column_ids = metadata
            .column_metadatas
            .iter()
            .map(|column| column.column_id);
        let read_format = Self::new(Arc::clone(&metadata), all_column_ids, None, "test", false);
        read_format.unwrap()
    }
}
717}