mito2/sst/parquet/flat_format.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Format to store in parquet.
//!
//! It can store both the encoded primary key and the raw key columns.
//!
//! We store three additional internal columns at the end:
//! - `__primary_key`, the encoded primary key of the row (tags). Type: dictionary(uint32, binary)
//! - `__sequence`, the sequence number of a row. Type: uint64
//! - `__op_type`, the op type of the row. Type: uint8
//!
//! The format is
//! ```text
//! primary key columns, field columns, time index, encoded primary key, __sequence, __op_type.
//! ```
//!
//! It stores field columns in the same order as [RegionMetadata::field_columns()](store_api::metadata::RegionMetadata::field_columns())
//! and stores primary key columns in the same order as [RegionMetadata::primary_key].
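//!
//! For example, for a hypothetical region with tag columns `k0` and `k1`, field `f0`,
//! and time index `ts`, the stored column order is:
//! ```text
//! k0, k1, f0, ts, __primary_key, __sequence, __op_type
//! ```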

use std::borrow::Borrow;
use std::collections::HashMap;
use std::sync::Arc;

use api::v1::SemanticType;
use datatypes::arrow::array::{
    Array, ArrayRef, BinaryArray, DictionaryArray, UInt32Array, UInt64Array,
};
use datatypes::arrow::compute::kernels::take::take;
use datatypes::arrow::datatypes::{Schema, SchemaRef};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::{ConcreteDataType, DataType};
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec, build_primary_key_codec};
use parquet::file::metadata::RowGroupMetaData;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::{ColumnId, SequenceNumber};

use crate::error::{
    ComputeArrowSnafu, DecodeSnafu, InvalidParquetSnafu, InvalidRecordBatchSnafu,
    NewRecordBatchSnafu, Result,
};
use crate::sst::parquet::format::{
    FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray, PrimaryKeyReadFormat, ReadFormat,
    StatValues,
};
use crate::sst::{
    FlatSchemaOptions, flat_sst_arrow_schema_column_num, tag_maybe_to_dictionary_field,
    to_flat_sst_arrow_schema,
};

/// Helper for writing the SST format.
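///
/// A minimal usage sketch (the `metadata` and `batch` values are hypothetical):
/// ```ignore
/// let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default())
///     .with_override_sequence(Some(42));
/// let converted = format.convert_batch(&batch)?;
/// ```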
pub(crate) struct FlatWriteFormat {
    /// SST file schema.
    arrow_schema: SchemaRef,
    override_sequence: Option<SequenceNumber>,
}

impl FlatWriteFormat {
    /// Creates a new helper.
    pub(crate) fn new(metadata: RegionMetadataRef, options: &FlatSchemaOptions) -> FlatWriteFormat {
        let arrow_schema = to_flat_sst_arrow_schema(&metadata, options);
        FlatWriteFormat {
            arrow_schema,
            override_sequence: None,
        }
    }

    /// Sets the override sequence.
    pub(crate) fn with_override_sequence(
        mut self,
        override_sequence: Option<SequenceNumber>,
    ) -> Self {
        self.override_sequence = override_sequence;
        self
    }

    /// Gets the arrow schema to store in parquet.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }

    /// Converts `batch` to an arrow record batch to store in parquet.
    pub(crate) fn convert_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
        debug_assert_eq!(batch.num_columns(), self.arrow_schema.fields().len());

        let Some(override_sequence) = self.override_sequence else {
            return Ok(batch.clone());
        };

        let mut columns = batch.columns().to_vec();
        let sequence_array = Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
        columns[sequence_column_index(batch.num_columns())] = sequence_array;

        RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
    }
}

/// Returns the position of the sequence column.
pub(crate) fn sequence_column_index(num_columns: usize) -> usize {
    num_columns - 2
}

/// Returns the position of the time index column.
pub(crate) fn time_index_column_index(num_columns: usize) -> usize {
    num_columns - 4
}

/// Returns the position of the primary key column.
pub(crate) fn primary_key_column_index(num_columns: usize) -> usize {
    num_columns - 3
}

/// Returns the position of the op type column.
pub(crate) fn op_type_column_index(num_columns: usize) -> usize {
    num_columns - 1
}
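
// These helpers mirror the fixed suffix of the flat format layout:
// `..., time index, __primary_key, __sequence, __op_type`. For the hypothetical
// 7-column layout `k0, k1, f0, ts, __primary_key, __sequence, __op_type`, they
// return 3 (time index), 4 (primary key), 5 (sequence), and 6 (op type).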

// TODO(yingwen): Add an option to skip reading internal columns if the region is
// append only and doesn't use sparse encoding (We need to check the table id under
// sparse encoding).
/// Helper for reading the flat SST format with projection.
///
/// It only supports the flat format that additionally stores the encoded primary key.
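///
/// A minimal read-path sketch (`metadata`, `ids`, and `batch` are hypothetical values):
/// ```ignore
/// let mut format = FlatReadFormat::new(metadata, ids.into_iter(), None, "path/to/sst", false)?;
/// format.set_override_sequence(Some(42));
/// let seq_array = format.new_override_sequence_array(batch.num_rows());
/// let converted = format.convert_batch(batch, seq_array.as_ref())?;
/// ```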
pub struct FlatReadFormat {
    /// Sequence number to override the sequence read from the SST.
    override_sequence: Option<SequenceNumber>,
    /// Parquet format adapter.
    parquet_adapter: ParquetAdapter,
}

impl FlatReadFormat {
    /// Creates a helper with existing `metadata` and `column_ids` to read.
    ///
    /// If `skip_auto_convert` is true, skips the automatic format conversion when the primary key encoding is sparse.
    pub fn new(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
        num_columns: Option<usize>,
        file_path: &str,
        skip_auto_convert: bool,
    ) -> Result<FlatReadFormat> {
        let is_legacy = match num_columns {
            Some(num) => Self::is_legacy_format(&metadata, num, file_path)?,
            None => metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse,
        };

        let parquet_adapter = if is_legacy {
            // Safety: is_legacy_format() ensures primary_key is not empty.
            // Only skips the auto conversion when the primary key encoding is sparse.
            let skip_auto_convert = skip_auto_convert
                && metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse;
            ParquetAdapter::PrimaryKeyToFlat(ParquetPrimaryKeyToFlat::new(
                metadata,
                column_ids,
                skip_auto_convert,
            ))
        } else {
            ParquetAdapter::Flat(ParquetFlat::new(metadata, column_ids))
        };

        Ok(FlatReadFormat {
            override_sequence: None,
            parquet_adapter,
        })
    }

    /// Sets the sequence number to override.
    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
        self.override_sequence = sequence;
    }

    /// Index of a column in the projected batch by its column id.
    pub fn projected_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
        self.format_projection()
            .column_id_to_projected_index
            .get(&column_id)
            .copied()
    }

    /// Returns min values of a specific column in row groups.
    pub fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => p.min_values(row_groups, column_id),
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.min_values(row_groups, column_id),
        }
    }

    /// Returns max values of a specific column in row groups.
    pub fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => p.max_values(row_groups, column_id),
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.max_values(row_groups, column_id),
        }
    }

    /// Returns null counts of a specific column in row groups.
    pub fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => p.null_counts(row_groups, column_id),
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.null_counts(row_groups, column_id),
        }
    }

    /// Gets the arrow schema of the SST file.
    ///
    /// This schema is computed from the region metadata but should be the same
    /// as the arrow schema decoded from the file metadata.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => &p.arrow_schema,
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.arrow_schema(),
        }
    }

    /// Gets the metadata of the SST.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => &p.metadata,
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.metadata(),
        }
    }

    /// Gets sorted projection indices to read from the SST file.
    pub(crate) fn projection_indices(&self) -> &[usize] {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => &p.format_projection.projection_indices,
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.projection_indices(),
        }
    }

    /// Gets the projection in the flat format.
    ///
    /// When `skip_auto_convert` is enabled (primary-key format read), this returns the
    /// primary-key format projection so filter/prune can resolve projected indices.
    pub(crate) fn format_projection(&self) -> &FormatProjection {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => &p.format_projection,
            ParquetAdapter::PrimaryKeyToFlat(p) => &p.format_projection,
        }
    }

    /// Creates a sequence array used to override the sequence column.
    pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
        self.override_sequence
            .map(|seq| Arc::new(UInt64Array::from_value(seq, length)) as ArrayRef)
    }

    /// Converts a record batch, applying the flat format conversion and the override
    /// sequence array.
    ///
    /// Returns a new RecordBatch with the flat format conversion applied first (if enabled),
    /// then the sequence column replaced by the override sequence array.
    pub(crate) fn convert_batch(
        &self,
        record_batch: RecordBatch,
        override_sequence_array: Option<&ArrayRef>,
    ) -> Result<RecordBatch> {
        // First, applies the flat format conversion.
        let batch = match &self.parquet_adapter {
            ParquetAdapter::Flat(_) => record_batch,
            ParquetAdapter::PrimaryKeyToFlat(p) => p.convert_batch(record_batch)?,
        };

        // Then applies the sequence override if provided.
        let Some(override_array) = override_sequence_array else {
            return Ok(batch);
        };

        let mut columns = batch.columns().to_vec();
        let sequence_column_idx = sequence_column_index(batch.num_columns());

        // Uses the provided override sequence array, slicing if necessary to match the batch length.
        let sequence_array = if override_array.len() > batch.num_rows() {
            override_array.slice(0, batch.num_rows())
        } else {
            override_array.clone()
        };

        columns[sequence_column_idx] = sequence_array;

        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
    }

    /// Checks whether the batch from the parquet file needs to be converted to match the flat format.
    ///
    /// * `metadata` is the region metadata (always assumes flat format).
    /// * `num_columns` is the number of columns in the parquet file.
    /// * `file_path` is the path to the parquet file, for error message.
    pub(crate) fn is_legacy_format(
        metadata: &RegionMetadata,
        num_columns: usize,
        file_path: &str,
    ) -> Result<bool> {
        if metadata.primary_key.is_empty() {
            return Ok(false);
        }

        // For flat format, compute expected column number:
        // all columns + internal columns (pk, sequence, op_type)
        let expected_columns = metadata.column_metadatas.len() + INTERNAL_COLUMN_NUM;

        if expected_columns == num_columns {
            // Same number of columns, no conversion needed
            Ok(false)
        } else {
            ensure!(
                expected_columns >= num_columns,
                InvalidParquetSnafu {
                    file: file_path,
                    reason: format!(
                        "Expected columns {} should be >= actual columns {}",
                        expected_columns, num_columns
                    )
                }
            );

            // Different number of columns, check if the difference matches primary key count
            let column_diff = expected_columns - num_columns;
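
            // Worked example (hypothetical region): with 2 tags, 1 field and a time
            // index, expected_columns = 4 + 3 = 7. A legacy primary-key format file
            // omits the 2 raw tag columns and stores 5 columns, so the difference
            // (2) equals the primary key count.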

            ensure!(
                column_diff == metadata.primary_key.len(),
                InvalidParquetSnafu {
                    file: file_path,
                    reason: format!(
                        "Column number difference {} does not match primary key count {}",
                        column_diff,
                        metadata.primary_key.len()
                    )
                }
            );

            Ok(true)
        }
    }
}

/// Wraps the parquet helper for different formats.
enum ParquetAdapter {
    Flat(ParquetFlat),
    PrimaryKeyToFlat(ParquetPrimaryKeyToFlat),
}

/// Helper that reads the parquet in primary key format and converts it into the flat format.
struct ParquetPrimaryKeyToFlat {
    /// The primary key format to read the parquet.
    format: PrimaryKeyReadFormat,
    /// Format converter for handling flat format conversion.
    convert_format: Option<FlatConvertFormat>,
    /// Projection computed for the flat format.
    format_projection: FormatProjection,
}

impl ParquetPrimaryKeyToFlat {
    /// Creates a helper with existing `metadata` and `column_ids` to read.
    fn new(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
        skip_auto_convert: bool,
    ) -> ParquetPrimaryKeyToFlat {
        // `skip_auto_convert` is only valid for the sparse primary key encoding.
        assert!(
            !skip_auto_convert || metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse
        );

        let column_ids: Vec<_> = column_ids.collect();

        // Creates a map to look up indices based on the new format.
        let id_to_index = sst_column_id_indices(&metadata);
        let sst_column_num =
            flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default());

        let codec = build_primary_key_codec(&metadata);
        let format = PrimaryKeyReadFormat::new(metadata.clone(), column_ids.iter().copied());
        let (convert_format, format_projection) = if skip_auto_convert {
            (
                None,
                FormatProjection {
                    projection_indices: format.projection_indices().to_vec(),
                    column_id_to_projected_index: format.field_id_to_projected_index().clone(),
                },
            )
        } else {
            // Computes the format projection for the new format.
            let format_projection = FormatProjection::compute_format_projection(
                &id_to_index,
                sst_column_num,
                column_ids.iter().copied(),
            );
            (
                FlatConvertFormat::new(Arc::clone(&metadata), &format_projection, codec),
                format_projection,
            )
        };

        Self {
            format,
            convert_format,
            format_projection,
        }
    }

    fn convert_batch(&self, record_batch: RecordBatch) -> Result<RecordBatch> {
        if let Some(convert_format) = &self.convert_format {
            convert_format.convert(record_batch)
        } else {
            Ok(record_batch)
        }
    }
}

/// Helper that reads the parquet in flat format directly.
struct ParquetFlat {
    /// The metadata stored in the SST.
    metadata: RegionMetadataRef,
    /// SST file schema.
    arrow_schema: SchemaRef,
    /// Projection computed for the flat format.
    format_projection: FormatProjection,
    /// Column id to index in SST.
    column_id_to_sst_index: HashMap<ColumnId, usize>,
}

impl ParquetFlat {
    /// Creates a helper with existing `metadata` and `column_ids` to read.
    fn new(metadata: RegionMetadataRef, column_ids: impl Iterator<Item = ColumnId>) -> ParquetFlat {
        // Creates a map to look up indices.
        let id_to_index = sst_column_id_indices(&metadata);
        let arrow_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let sst_column_num =
            flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default());
        let format_projection =
            FormatProjection::compute_format_projection(&id_to_index, sst_column_num, column_ids);

        Self {
            metadata,
            arrow_schema,
            format_projection,
            column_id_to_sst_index: id_to_index,
        }
    }

    /// Returns min values of a specific column in row groups.
    fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        self.get_stat_values(row_groups, column_id, true)
    }

    /// Returns max values of a specific column in row groups.
    fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        self.get_stat_values(row_groups, column_id, false)
    }

    /// Returns null counts of a specific column in row groups.
    fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(index) = self.column_id_to_sst_index.get(&column_id) else {
            // No such column in the SST.
            return StatValues::NoColumn;
        };

        let stats = ReadFormat::column_null_counts(row_groups, *index);
        StatValues::from_stats_opt(stats)
    }

    fn get_stat_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
        is_min: bool,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            // No such column in the SST.
            return StatValues::NoColumn;
        };
        // Safety: `column_id_to_sst_index` is built from `metadata`.
        let index = self.column_id_to_sst_index.get(&column_id).unwrap();

        let stats = ReadFormat::column_values(row_groups, column, *index, is_min);
        StatValues::from_stats_opt(stats)
    }
}

/// Returns a map from column id to the column's position in the SST.
///
/// It only supports SSTs with raw primary key columns.
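///
/// For the hypothetical layout above (tags `k0`, `k1`, field `f0`, time index `ts`),
/// the returned map is `{k0: 0, k1: 1, f0: 2, ts: 3}`; internal columns are not included.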
pub(crate) fn sst_column_id_indices(metadata: &RegionMetadata) -> HashMap<ColumnId, usize> {
    let mut id_to_index = HashMap::with_capacity(metadata.column_metadatas.len());
    let mut column_index = 0;
    // keys
    for pk_id in &metadata.primary_key {
        id_to_index.insert(*pk_id, column_index);
        column_index += 1;
    }
    // fields
    for column in &metadata.column_metadatas {
        if column.semantic_type == SemanticType::Field {
            id_to_index.insert(column.column_id, column_index);
            column_index += 1;
        }
    }
    // time index
    id_to_index.insert(metadata.time_index_column().column_id, column_index);

    id_to_index
}

/// Decodes primary keys from a batch and returns decoded primary key information.
///
/// The batch must contain a primary key column at the expected index.
pub(crate) fn decode_primary_keys(
    codec: &dyn PrimaryKeyCodec,
    batch: &RecordBatch,
) -> Result<DecodedPrimaryKeys> {
    let primary_key_index = primary_key_column_index(batch.num_columns());
    let pk_dict_array = batch
        .column(primary_key_index)
        .as_any()
        .downcast_ref::<PrimaryKeyArray>()
        .with_context(|| InvalidRecordBatchSnafu {
            reason: "Primary key column is not a dictionary array".to_string(),
        })?;
    let pk_values_array = pk_dict_array
        .values()
        .as_any()
        .downcast_ref::<BinaryArray>()
        .with_context(|| InvalidRecordBatchSnafu {
            reason: "Primary key values are not binary array".to_string(),
        })?;

    let keys = pk_dict_array.keys();

    // Decodes primary key values by iterating through keys, reusing decoded values for duplicate keys.
    // Maps original key index -> new decoded value index.
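    // Only consecutive duplicates are reused: e.g. keys [0, 0, 1, 1, 0] yield decoded
    // indices [0, 0, 1, 1, 2], so the value for key 0 is decoded twice. Equal keys are
    // typically adjacent because SST rows are sorted by primary key.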
    let mut key_to_decoded_index = Vec::with_capacity(keys.len());
    let mut decoded_pk_values = Vec::new();
    let mut prev_key: Option<u32> = None;

    // The parquet reader may read the whole dictionary page into the dictionary values, so
    // we may decode many primary keys not in this batch if we decode the values array directly.
    let pk_indices = keys.values();
    for &current_key in pk_indices.iter().take(keys.len()) {
        // Check if current key is the same as previous key
        if let Some(prev) = prev_key
            && prev == current_key
        {
            // Reuse the last decoded index
            key_to_decoded_index.push((decoded_pk_values.len() - 1) as u32);
            continue;
        }

        // New key, decodes the value
        let pk_bytes = pk_values_array.value(current_key as usize);
        let decoded_value = codec.decode(pk_bytes).context(DecodeSnafu)?;

        decoded_pk_values.push(decoded_value);
        key_to_decoded_index.push((decoded_pk_values.len() - 1) as u32);
        prev_key = Some(current_key);
    }

    // Create the keys array from key_to_decoded_index
    let keys_array = UInt32Array::from(key_to_decoded_index);

    Ok(DecodedPrimaryKeys {
        decoded_pk_values,
        keys_array,
    })
}

/// Holds decoded primary key values and their indices.
pub(crate) struct DecodedPrimaryKeys {
    /// Decoded primary key values for unique keys in the dictionary.
    decoded_pk_values: Vec<CompositeValues>,
    /// Prebuilt keys array for creating dictionary arrays.
    keys_array: UInt32Array,
}

impl DecodedPrimaryKeys {
    /// Gets a tag column array by column id and data type.
    ///
    /// For sparse encoding, uses `column_id` to look up values.
    /// For dense encoding, uses `pk_index` to get values.
    pub(crate) fn get_tag_column(
        &self,
        column_id: ColumnId,
        pk_index: Option<usize>,
        column_type: &ConcreteDataType,
    ) -> Result<ArrayRef> {
        // Gets values from the primary key.
        let mut builder = column_type.create_mutable_vector(self.decoded_pk_values.len());
        for decoded in &self.decoded_pk_values {
            match decoded {
                CompositeValues::Dense(dense) => {
                    let pk_idx = pk_index.expect("pk_index required for dense encoding");
                    if pk_idx < dense.len() {
                        builder.push_value_ref(&dense[pk_idx].1.as_value_ref());
                    } else {
                        builder.push_null();
                    }
                }
                CompositeValues::Sparse(sparse) => {
                    let value = sparse.get_or_null(column_id);
                    builder.push_value_ref(&value.as_value_ref());
                }
            };
        }

        let values_vector = builder.to_vector();
        let values_array = values_vector.to_arrow_array();

        // Only creates a dictionary array for string types, otherwise takes values by keys.
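        // For example (hypothetical values), decoded values ["a", "b"] with keys
        // [0, 0, 1] stay dictionary-encoded for strings, while take() materializes
        // the row-aligned array [v0, v0, v1] for non-string tags.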
        if column_type.is_string() {
            // Creates a dictionary array using the same keys for string types.
            // Note that the dictionary values may have nulls.
            let dict_array = DictionaryArray::new(self.keys_array.clone(), values_array);
            Ok(Arc::new(dict_array))
        } else {
            // For non-string types, takes values by keys indices to create a regular array.
            let taken_array =
                take(&values_array, &self.keys_array, None).context(ComputeArrowSnafu)?;
            Ok(taken_array)
        }
    }
}

/// Converts a batch without decoded primary key columns into a flat format batch
/// with the primary key columns decoded.
pub(crate) struct FlatConvertFormat {
    /// Metadata of the region.
    metadata: RegionMetadataRef,
    /// Primary key codec to decode primary keys.
    codec: Arc<dyn PrimaryKeyCodec>,
    /// Projected primary key column information: (column_id, pk_index, column_index in metadata).
    projected_primary_keys: Vec<(ColumnId, usize, usize)>,
}

impl FlatConvertFormat {
    /// Creates a new `FlatConvertFormat`.
    ///
    /// The `format_projection` is the projection computed in the [FlatReadFormat] with the `metadata`.
    /// The `codec` is the primary key codec of the `metadata`.
    ///
    /// Returns `None` if there is no primary key.
    pub(crate) fn new(
        metadata: RegionMetadataRef,
        format_projection: &FormatProjection,
        codec: Arc<dyn PrimaryKeyCodec>,
    ) -> Option<Self> {
        if metadata.primary_key.is_empty() {
            return None;
        }

        // Builds the projected primary keys list, maintaining the order of RegionMetadata::primary_key.
        let mut projected_primary_keys = Vec::new();
        for (pk_index, &column_id) in metadata.primary_key.iter().enumerate() {
            if format_projection
                .column_id_to_projected_index
                .contains_key(&column_id)
            {
                // We expect the format_projection is built from the metadata.
                let column_index = metadata.column_index_by_id(column_id).unwrap();
                projected_primary_keys.push((column_id, pk_index, column_index));
            }
        }

        Some(Self {
            metadata,
            codec,
            projected_primary_keys,
        })
    }

    /// Converts a batch to have decoded primary key columns in flat format.
    ///
    /// The primary key array in the batch is a dictionary array.
    pub(crate) fn convert(&self, batch: RecordBatch) -> Result<RecordBatch> {
        if self.projected_primary_keys.is_empty() {
            return Ok(batch);
        }

        let decoded_pks = decode_primary_keys(self.codec.as_ref(), &batch)?;

        // Builds decoded tag column arrays.
        let mut decoded_columns = Vec::new();
        for (column_id, pk_index, column_index) in &self.projected_primary_keys {
            let column_metadata = &self.metadata.column_metadatas[*column_index];
            let tag_column = decoded_pks.get_tag_column(
                *column_id,
                Some(*pk_index),
                &column_metadata.column_schema.data_type,
            )?;
            decoded_columns.push(tag_column);
        }

        // Builds new columns: decoded tag columns first, then the original columns.
        let mut new_columns = Vec::with_capacity(batch.num_columns() + decoded_columns.len());
        new_columns.extend(decoded_columns);
        new_columns.extend_from_slice(batch.columns());
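        // For the hypothetical projection above, an input batch
        // `f0, ts, __primary_key, __sequence, __op_type` with projected tags `k0`, `k1`
        // becomes `k0, k1, f0, ts, __primary_key, __sequence, __op_type`.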

        // Builds the new schema.
        let mut new_fields =
            Vec::with_capacity(batch.schema().fields().len() + self.projected_primary_keys.len());
        for (_, _, column_index) in &self.projected_primary_keys {
            let column_metadata = &self.metadata.column_metadatas[*column_index];
            let old_field = &self.metadata.schema.arrow_schema().fields()[*column_index];
            let field =
                tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, old_field);
            new_fields.push(field);
        }
        new_fields.extend(batch.schema().fields().iter().cloned());

        let new_schema = Arc::new(Schema::new(new_fields));
        RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)
    }
}

#[cfg(test)]
impl FlatReadFormat {
    /// Creates a helper with existing `metadata` and all columns.
    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> FlatReadFormat {
        Self::new(
            Arc::clone(&metadata),
            metadata.column_metadatas.iter().map(|c| c.column_id),
            None,
            "test",
            false,
        )
        .unwrap()
    }
}