mito2/sst/parquet/flat_format.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Format to store in parquet.
//!
//! It can store both the encoded primary key and the raw primary key columns.
//!
//! We store three additional internal columns at the end:
//! - `__primary_key`, the encoded primary key of the row (tags). Type: dictionary(uint32, binary)
//! - `__sequence`, the sequence number of a row. Type: uint64
//! - `__op_type`, the op type of the row. Type: uint8
//!
//! The format is
//! ```text
//! primary key columns, field columns, time index, __primary_key, __sequence, __op_type
//! ```
//!
//! It stores field columns in the same order as [RegionMetadata::field_columns()](store_api::metadata::RegionMetadata::field_columns())
//! and stores primary key columns in the same order as [RegionMetadata::primary_key].
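//!
//! For example (a sketch, assuming a region with tag `host`, field `cpu` and
//! time index `ts`), a file stores the columns:
//! ```text
//! host, cpu, ts, __primary_key, __sequence, __op_type
//! ```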

use std::borrow::Borrow;
use std::collections::HashMap;
use std::sync::Arc;

use api::v1::SemanticType;
use datatypes::arrow::array::{
    Array, ArrayRef, BinaryArray, DictionaryArray, UInt32Array, UInt64Array,
};
use datatypes::arrow::compute::kernels::take::take;
use datatypes::arrow::datatypes::{Schema, SchemaRef};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::{ConcreteDataType, DataType};
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec, build_primary_key_codec};
use parquet::file::metadata::RowGroupMetaData;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::{ColumnId, SequenceNumber};

use crate::error::{
    ComputeArrowSnafu, DecodeSnafu, InvalidParquetSnafu, InvalidRecordBatchSnafu,
    NewRecordBatchSnafu, Result,
};
use crate::sst::parquet::format::{
    FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray, PrimaryKeyReadFormat, ReadFormat,
    StatValues,
};
use crate::sst::{
    FlatSchemaOptions, flat_sst_arrow_schema_column_num, tag_maybe_to_dictionary_field,
    to_flat_sst_arrow_schema,
};

/// Helper for writing the flat SST format.
pub(crate) struct FlatWriteFormat {
    /// SST file schema.
    arrow_schema: SchemaRef,
    override_sequence: Option<SequenceNumber>,
}

impl FlatWriteFormat {
    /// Creates a new helper.
    pub(crate) fn new(metadata: RegionMetadataRef, options: &FlatSchemaOptions) -> FlatWriteFormat {
        let arrow_schema = to_flat_sst_arrow_schema(&metadata, options);
        FlatWriteFormat {
            arrow_schema,
            override_sequence: None,
        }
    }

    /// Sets the sequence number to override.
    pub(crate) fn with_override_sequence(
        mut self,
        override_sequence: Option<SequenceNumber>,
    ) -> Self {
        self.override_sequence = override_sequence;
        self
    }

    /// Gets the arrow schema to store in parquet.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }

    /// Converts `batch` to an arrow record batch to store in parquet.
    pub(crate) fn convert_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
        debug_assert_eq!(batch.num_columns(), self.arrow_schema.fields().len());

        let Some(override_sequence) = self.override_sequence else {
            return Ok(batch.clone());
        };

        let mut columns = batch.columns().to_vec();
        let sequence_array = Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
        columns[sequence_column_index(batch.num_columns())] = sequence_array;

        RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
    }
}

/// Returns the position of the sequence column.
pub(crate) fn sequence_column_index(num_columns: usize) -> usize {
    num_columns - 2
}

/// Returns the position of the time index column.
pub(crate) fn time_index_column_index(num_columns: usize) -> usize {
    num_columns - 4
}

/// Returns the position of the primary key column.
pub(crate) fn primary_key_column_index(num_columns: usize) -> usize {
    num_columns - 3
}

/// Returns the position of the op type column.
pub(crate) fn op_type_column_index(num_columns: usize) -> usize {
    num_columns - 1
}
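
// A minimal sketch of the layout the index helpers above assume: for a
// hypothetical region with one tag and two fields, the file stores
// `[tag, field_0, field_1, ts, __primary_key, __sequence, __op_type]`,
// i.e. 7 columns.
#[cfg(test)]
#[test]
fn internal_column_indices_example() {
    let num_columns = 7;
    assert_eq!(3, time_index_column_index(num_columns));
    assert_eq!(4, primary_key_column_index(num_columns));
    assert_eq!(5, sequence_column_index(num_columns));
    assert_eq!(6, op_type_column_index(num_columns));
}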

// TODO(yingwen): Add an option to skip reading internal columns if the region is
// append only and doesn't use sparse encoding (We need to check the table id under
// sparse encoding).
/// Helper for reading the flat SST format with projection.
///
/// It only supports the flat format that stores primary keys additionally.
pub struct FlatReadFormat {
    /// Sequence number to override the sequence read from the SST.
    override_sequence: Option<SequenceNumber>,
    /// Parquet format adapter.
    parquet_adapter: ParquetAdapter,
}

impl FlatReadFormat {
    /// Creates a helper with existing `metadata` and `column_ids` to read.
    ///
    /// If `skip_auto_convert` is true, skips auto conversion of the format when the encoding is sparse.
    pub fn new(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
        num_columns: Option<usize>,
        file_path: &str,
        skip_auto_convert: bool,
    ) -> Result<FlatReadFormat> {
        let is_legacy = match num_columns {
            Some(num) => Self::is_legacy_format(&metadata, num, file_path)?,
            None => metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse,
        };

        let parquet_adapter = if is_legacy {
            // Safety: is_legacy_format() ensures primary_key is not empty.
            if metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse {
                // Only skip auto convert when the primary key encoding is sparse.
                ParquetAdapter::PrimaryKeyToFlat(ParquetPrimaryKeyToFlat::new(
                    metadata,
                    column_ids,
                    skip_auto_convert,
                ))
            } else {
                ParquetAdapter::PrimaryKeyToFlat(ParquetPrimaryKeyToFlat::new(
                    metadata, column_ids, false,
                ))
            }
        } else {
            ParquetAdapter::Flat(ParquetFlat::new(metadata, column_ids))
        };

        Ok(FlatReadFormat {
            override_sequence: None,
            parquet_adapter,
        })
    }

    /// Sets the sequence number to override.
    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
        self.override_sequence = sequence;
    }

    /// Index of a column in the projected batch by its column id.
    pub fn projected_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
        self.format_projection()
            .column_id_to_projected_index
            .get(&column_id)
            .copied()
    }

    /// Returns min values of a specific column in row groups.
    pub fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => p.min_values(row_groups, column_id),
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.min_values(row_groups, column_id),
        }
    }

    /// Returns max values of a specific column in row groups.
    pub fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => p.max_values(row_groups, column_id),
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.max_values(row_groups, column_id),
        }
    }

    /// Returns null counts of a specific column in row groups.
    pub fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => p.null_counts(row_groups, column_id),
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.null_counts(row_groups, column_id),
        }
    }

    /// Gets the arrow schema of the SST file.
    ///
    /// This schema is computed from the region metadata but should be the same
    /// as the arrow schema decoded from the file metadata.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => &p.arrow_schema,
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.arrow_schema(),
        }
    }

    /// Gets the metadata of the SST.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => &p.metadata,
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.metadata(),
        }
    }

    /// Gets sorted projection indices to read from the SST file.
    pub(crate) fn projection_indices(&self) -> &[usize] {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => &p.format_projection.projection_indices,
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.projection_indices(),
        }
    }

    /// Gets the projection in the flat format.
    pub(crate) fn format_projection(&self) -> &FormatProjection {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => &p.format_projection,
            ParquetAdapter::PrimaryKeyToFlat(p) => &p.format_projection,
        }
    }

    /// Creates a sequence array to override the sequence column.
    pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
        self.override_sequence
            .map(|seq| Arc::new(UInt64Array::from_value(seq, length)) as ArrayRef)
    }

    /// Converts a record batch, applying flat format conversion and the override sequence array.
    ///
    /// Returns a new RecordBatch with flat format conversion applied first (if enabled),
    /// then the sequence column replaced by the override sequence array.
    pub(crate) fn convert_batch(
        &self,
        record_batch: RecordBatch,
        override_sequence_array: Option<&ArrayRef>,
    ) -> Result<RecordBatch> {
        // First, apply flat format conversion.
        let batch = match &self.parquet_adapter {
            ParquetAdapter::Flat(_) => record_batch,
            ParquetAdapter::PrimaryKeyToFlat(p) => p.convert_batch(record_batch)?,
        };

        // Then apply the sequence override if provided.
        let Some(override_array) = override_sequence_array else {
            return Ok(batch);
        };

        let mut columns = batch.columns().to_vec();
        let sequence_column_idx = sequence_column_index(batch.num_columns());

        // Use the provided override sequence array, slicing if necessary to match the batch length.
        let sequence_array = if override_array.len() > batch.num_rows() {
            override_array.slice(0, batch.num_rows())
        } else {
            override_array.clone()
        };

        columns[sequence_column_idx] = sequence_array;

        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
    }

    /// Checks whether the batch from the parquet file needs to be converted to match the flat format.
    ///
    /// * `metadata` is the region metadata (always assumes the flat format).
    /// * `num_columns` is the number of columns in the parquet file.
    /// * `file_path` is the path to the parquet file, for error messages.
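    ///
    /// For example (a sketch, assuming 5 columns in the metadata and 2 primary
    /// key columns): a flat-format file stores 5 + 3 = 8 columns, while a
    /// legacy file stores only 6 (the 3 non-tag columns plus 3 internal
    /// columns); the difference of 2 equals the primary key count, so the file
    /// is legacy.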
    pub(crate) fn is_legacy_format(
        metadata: &RegionMetadata,
        num_columns: usize,
        file_path: &str,
    ) -> Result<bool> {
        if metadata.primary_key.is_empty() {
            return Ok(false);
        }

        // For the flat format, computes the expected column number:
        // all columns + internal columns (pk, sequence, op_type).
        let expected_columns = metadata.column_metadatas.len() + INTERNAL_COLUMN_NUM;

        if expected_columns == num_columns {
            // Same number of columns, no conversion needed.
            Ok(false)
        } else {
            ensure!(
                expected_columns >= num_columns,
                InvalidParquetSnafu {
                    file: file_path,
                    reason: format!(
                        "Expected columns {} should be >= actual columns {}",
                        expected_columns, num_columns
                    )
                }
            );

            // Different number of columns, checks if the difference matches the primary key count.
            let column_diff = expected_columns - num_columns;

            ensure!(
                column_diff == metadata.primary_key.len(),
                InvalidParquetSnafu {
                    file: file_path,
                    reason: format!(
                        "Column number difference {} does not match primary key count {}",
                        column_diff,
                        metadata.primary_key.len()
                    )
                }
            );

            Ok(true)
        }
    }
}

/// Wraps the parquet helper for different formats.
enum ParquetAdapter {
    Flat(ParquetFlat),
    PrimaryKeyToFlat(ParquetPrimaryKeyToFlat),
}

/// Helper that reads a parquet file in the primary key format into the flat format.
struct ParquetPrimaryKeyToFlat {
    /// The primary key format to read the parquet.
    format: PrimaryKeyReadFormat,
    /// Format converter for handling flat format conversion.
    convert_format: Option<FlatConvertFormat>,
    /// Projection computed for the flat format.
    format_projection: FormatProjection,
}

impl ParquetPrimaryKeyToFlat {
    /// Creates a helper with existing `metadata` and `column_ids` to read.
    fn new(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
        skip_auto_convert: bool,
    ) -> ParquetPrimaryKeyToFlat {
        assert!(if skip_auto_convert {
            metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse
        } else {
            true
        });

        let column_ids: Vec<_> = column_ids.collect();

        // Creates a map to look up the index based on the new format.
        let id_to_index = sst_column_id_indices(&metadata);
        let sst_column_num =
            flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default());
        // Computes the format projection for the new format.
        let format_projection = FormatProjection::compute_format_projection(
            &id_to_index,
            sst_column_num,
            column_ids.iter().copied(),
        );
        let codec = build_primary_key_codec(&metadata);
        let convert_format = if skip_auto_convert {
            None
        } else {
            FlatConvertFormat::new(Arc::clone(&metadata), &format_projection, codec)
        };

        let format = PrimaryKeyReadFormat::new(metadata.clone(), column_ids.iter().copied());

        Self {
            format,
            convert_format,
            format_projection,
        }
    }

    fn convert_batch(&self, record_batch: RecordBatch) -> Result<RecordBatch> {
        if let Some(convert_format) = &self.convert_format {
            convert_format.convert(record_batch)
        } else {
            Ok(record_batch)
        }
    }
}

/// Helper that reads a parquet file in the flat format directly.
struct ParquetFlat {
    /// The metadata stored in the SST.
    metadata: RegionMetadataRef,
    /// SST file schema.
    arrow_schema: SchemaRef,
    /// Projection computed for the flat format.
    format_projection: FormatProjection,
    /// Column id to index in the SST.
    column_id_to_sst_index: HashMap<ColumnId, usize>,
}

impl ParquetFlat {
    /// Creates a helper with existing `metadata` and `column_ids` to read.
    fn new(metadata: RegionMetadataRef, column_ids: impl Iterator<Item = ColumnId>) -> ParquetFlat {
        // Creates a map to look up the index.
        let id_to_index = sst_column_id_indices(&metadata);
        let arrow_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let sst_column_num =
            flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default());
        let format_projection =
            FormatProjection::compute_format_projection(&id_to_index, sst_column_num, column_ids);

        Self {
            metadata,
            arrow_schema,
            format_projection,
            column_id_to_sst_index: id_to_index,
        }
    }

    /// Returns min values of a specific column in row groups.
    fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        self.get_stat_values(row_groups, column_id, true)
    }

    /// Returns max values of a specific column in row groups.
    fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        self.get_stat_values(row_groups, column_id, false)
    }

    /// Returns null counts of a specific column in row groups.
    fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(index) = self.column_id_to_sst_index.get(&column_id) else {
            // No such column in the SST.
            return StatValues::NoColumn;
        };

        let stats = ReadFormat::column_null_counts(row_groups, *index);
        StatValues::from_stats_opt(stats)
    }

    fn get_stat_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
        is_min: bool,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            // No such column in the SST.
            return StatValues::NoColumn;
        };
        // Safety: `column_id_to_sst_index` is built from `metadata`.
        let index = self.column_id_to_sst_index.get(&column_id).unwrap();

        let stats = ReadFormat::column_values(row_groups, column, *index, is_min);
        StatValues::from_stats_opt(stats)
    }
}

/// Returns a map from column id to the column's position in the SST.
///
/// It only supports SSTs that store raw primary key columns.
pub(crate) fn sst_column_id_indices(metadata: &RegionMetadata) -> HashMap<ColumnId, usize> {
    let mut id_to_index = HashMap::with_capacity(metadata.column_metadatas.len());
    let mut column_index = 0;
    // Primary key columns.
    for pk_id in &metadata.primary_key {
        id_to_index.insert(*pk_id, column_index);
        column_index += 1;
    }
    // Field columns.
    for column in &metadata.column_metadatas {
        if column.semantic_type == SemanticType::Field {
            id_to_index.insert(column.column_id, column_index);
            column_index += 1;
        }
    }
    // Time index.
    id_to_index.insert(metadata.time_index_column().column_id, column_index);

    id_to_index
}
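
// For example (a sketch, assuming primary key `[a, b]`, one field `f` and time
// index `ts`), `sst_column_id_indices` maps the column ids to
// `{a: 0, b: 1, f: 2, ts: 3}`, matching the flat layout before the internal
// columns.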

/// Decodes primary keys from a batch and returns the decoded primary key information.
///
/// The batch must contain a primary key column at the expected index.
pub(crate) fn decode_primary_keys(
    codec: &dyn PrimaryKeyCodec,
    batch: &RecordBatch,
) -> Result<DecodedPrimaryKeys> {
    let primary_key_index = primary_key_column_index(batch.num_columns());
    let pk_dict_array = batch
        .column(primary_key_index)
        .as_any()
        .downcast_ref::<PrimaryKeyArray>()
        .with_context(|| InvalidRecordBatchSnafu {
            reason: "Primary key column is not a dictionary array".to_string(),
        })?;
    let pk_values_array = pk_dict_array
        .values()
        .as_any()
        .downcast_ref::<BinaryArray>()
        .with_context(|| InvalidRecordBatchSnafu {
            reason: "Primary key values are not binary array".to_string(),
        })?;

    let keys = pk_dict_array.keys();

    // Decodes primary key values by iterating through the keys, reusing decoded values for
    // consecutive duplicate keys.
    // Maps original key index -> new decoded value index.
    let mut key_to_decoded_index = Vec::with_capacity(keys.len());
    let mut decoded_pk_values = Vec::new();
    let mut prev_key: Option<u32> = None;

    // The parquet reader may read the whole dictionary page into the dictionary values, so
    // we may decode many primary keys not in this batch if we decode the values array directly.
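    //
    // For example, keys `[0, 0, 1, 1, 0]` become decoded indices
    // `[0, 0, 1, 1, 2]`: only consecutive duplicates reuse a decoded value, so
    // the trailing key 0 is decoded a second time.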
    for i in 0..keys.len() {
        let current_key = keys.value(i);

        // Checks if the current key is the same as the previous key.
        if let Some(prev) = prev_key
            && prev == current_key
        {
            // Reuses the last decoded index.
            key_to_decoded_index.push((decoded_pk_values.len() - 1) as u32);
            continue;
        }

        // New key, decodes the value.
        let pk_bytes = pk_values_array.value(current_key as usize);
        let decoded_value = codec.decode(pk_bytes).context(DecodeSnafu)?;

        decoded_pk_values.push(decoded_value);
        key_to_decoded_index.push((decoded_pk_values.len() - 1) as u32);
        prev_key = Some(current_key);
    }

    // Creates the keys array from `key_to_decoded_index`.
    let keys_array = UInt32Array::from(key_to_decoded_index);

    Ok(DecodedPrimaryKeys {
        decoded_pk_values,
        keys_array,
    })
}

/// Holds decoded primary key values and their indices.
pub(crate) struct DecodedPrimaryKeys {
    /// Decoded primary key values for unique keys in the dictionary.
    decoded_pk_values: Vec<CompositeValues>,
    /// Prebuilt keys array for creating dictionary arrays.
    keys_array: UInt32Array,
}

impl DecodedPrimaryKeys {
    /// Gets a tag column array by column id and data type.
    ///
    /// For sparse encoding, uses `column_id` to look up values.
    /// For dense encoding, uses `pk_index` to get values.
    pub(crate) fn get_tag_column(
        &self,
        column_id: ColumnId,
        pk_index: Option<usize>,
        column_type: &ConcreteDataType,
    ) -> Result<ArrayRef> {
        // Gets values from the primary key.
        let mut builder = column_type.create_mutable_vector(self.decoded_pk_values.len());
        for decoded in &self.decoded_pk_values {
            match decoded {
                CompositeValues::Dense(dense) => {
                    let pk_idx = pk_index.expect("pk_index required for dense encoding");
                    if pk_idx < dense.len() {
                        builder.push_value_ref(&dense[pk_idx].1.as_value_ref());
                    } else {
                        builder.push_null();
                    }
                }
                CompositeValues::Sparse(sparse) => {
                    let value = sparse.get_or_null(column_id);
                    builder.push_value_ref(&value.as_value_ref());
                }
            };
        }

        let values_vector = builder.to_vector();
        let values_array = values_vector.to_arrow_array();

        // Only creates a dictionary array for string types, otherwise takes values by keys.
        if column_type.is_string() {
            // Creates a dictionary array using the same keys for string types.
            // Note that the dictionary values may have nulls.
            let dict_array = DictionaryArray::new(self.keys_array.clone(), values_array);
            Ok(Arc::new(dict_array))
        } else {
            // For non-string types, takes values by the keys indices to create a regular array.
            let taken_array =
                take(&values_array, &self.keys_array, None).context(ComputeArrowSnafu)?;
            Ok(taken_array)
        }
    }
}

/// Converts a batch that doesn't have decoded primary key columns into a batch that has decoded
/// primary key columns in the flat format.
pub(crate) struct FlatConvertFormat {
    /// Metadata of the region.
    metadata: RegionMetadataRef,
    /// Primary key codec to decode primary keys.
    codec: Arc<dyn PrimaryKeyCodec>,
    /// Projected primary key column information: (column_id, pk_index, column_index in metadata).
    projected_primary_keys: Vec<(ColumnId, usize, usize)>,
}

impl FlatConvertFormat {
    /// Creates a new `FlatConvertFormat`.
    ///
    /// The `format_projection` is the projection computed in the [FlatReadFormat] with the `metadata`.
    /// The `codec` is the primary key codec of the `metadata`.
    ///
    /// Returns `None` if there is no primary key.
    pub(crate) fn new(
        metadata: RegionMetadataRef,
        format_projection: &FormatProjection,
        codec: Arc<dyn PrimaryKeyCodec>,
    ) -> Option<Self> {
        if metadata.primary_key.is_empty() {
            return None;
        }

        // Builds the projected primary keys list, maintaining the order of RegionMetadata::primary_key.
        let mut projected_primary_keys = Vec::new();
        for (pk_index, &column_id) in metadata.primary_key.iter().enumerate() {
            if format_projection
                .column_id_to_projected_index
                .contains_key(&column_id)
            {
                // We expect the format_projection is built from the metadata.
                let column_index = metadata.column_index_by_id(column_id).unwrap();
                projected_primary_keys.push((column_id, pk_index, column_index));
            }
        }

        Some(Self {
            metadata,
            codec,
            projected_primary_keys,
        })
    }

    /// Converts a batch to have decoded primary key columns in the flat format.
    ///
    /// The primary key array in the batch is a dictionary array.
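    ///
    /// For example (a sketch, assuming one projected tag `host`), a legacy batch
    /// `[field, ts, __primary_key, __sequence, __op_type]` becomes
    /// `[host, field, ts, __primary_key, __sequence, __op_type]`.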
    pub(crate) fn convert(&self, batch: RecordBatch) -> Result<RecordBatch> {
        if self.projected_primary_keys.is_empty() {
            return Ok(batch);
        }

        let decoded_pks = decode_primary_keys(self.codec.as_ref(), &batch)?;

        // Builds decoded tag column arrays.
        let mut decoded_columns = Vec::new();
        for (column_id, pk_index, column_index) in &self.projected_primary_keys {
            let column_metadata = &self.metadata.column_metadatas[*column_index];
            let tag_column = decoded_pks.get_tag_column(
                *column_id,
                Some(*pk_index),
                &column_metadata.column_schema.data_type,
            )?;
            decoded_columns.push(tag_column);
        }

        // Builds new columns: decoded tag columns first, then original columns.
        let mut new_columns = Vec::with_capacity(batch.num_columns() + decoded_columns.len());
        new_columns.extend(decoded_columns);
        new_columns.extend_from_slice(batch.columns());

        // Builds the new schema.
        let mut new_fields =
            Vec::with_capacity(batch.schema().fields().len() + self.projected_primary_keys.len());
        for (_, _, column_index) in &self.projected_primary_keys {
            let column_metadata = &self.metadata.column_metadatas[*column_index];
            let old_field = &self.metadata.schema.arrow_schema().fields()[*column_index];
            let field =
                tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, old_field);
            new_fields.push(field);
        }
        new_fields.extend(batch.schema().fields().iter().cloned());

        let new_schema = Arc::new(Schema::new(new_fields));
        RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)
    }
}

#[cfg(test)]
impl FlatReadFormat {
    /// Creates a helper with existing `metadata` and all columns.
    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> FlatReadFormat {
        Self::new(
            Arc::clone(&metadata),
            metadata.column_metadatas.iter().map(|c| c.column_id),
            None,
            "test",
            false,
        )
        .unwrap()
    }
}