Skip to main content

mito2/sst/parquet/
flat_format.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Format to store in parquet.
16//!
17//! It can store both encoded primary key and raw key columns.
18//!
//! We store three additional internal columns at the end:
20//! - `__primary_key`, the encoded primary key of the row (tags). Type: dictionary(uint32, binary)
21//! - `__sequence`, the sequence number of a row. Type: uint64
22//! - `__op_type`, the op type of the row. Type: uint8
23//!
24//! The format is
25//! ```text
//! primary key columns, field columns, time index, encoded primary key, __sequence, __op_type.
//! ```
27//!
28//! It stores field columns in the same order as [RegionMetadata::field_columns()](store_api::metadata::RegionMetadata::field_columns())
29//! and stores primary key columns in the same order as [RegionMetadata::primary_key].
30
31use std::borrow::Borrow;
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use api::v1::SemanticType;
36use datatypes::arrow::array::{
37    Array, ArrayRef, BinaryArray, DictionaryArray, UInt32Array, UInt64Array,
38};
39use datatypes::arrow::compute::kernels::take::take;
40use datatypes::arrow::datatypes::{Schema, SchemaRef};
41use datatypes::arrow::record_batch::RecordBatch;
42use datatypes::prelude::{ConcreteDataType, DataType};
43use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec, build_primary_key_codec};
44use parquet::file::metadata::RowGroupMetaData;
45use snafu::{OptionExt, ResultExt, ensure};
46use store_api::codec::PrimaryKeyEncoding;
47use store_api::metadata::{RegionMetadata, RegionMetadataRef};
48use store_api::storage::{ColumnId, SequenceNumber};
49
50use crate::error::{
51    ComputeArrowSnafu, DecodeSnafu, InvalidParquetSnafu, InvalidRecordBatchSnafu,
52    NewRecordBatchSnafu, Result,
53};
54use crate::read::read_columns::ReadColumns;
55use crate::sst::parquet::format::{
56    FIXED_POS_COLUMN_NUM, FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray,
57    PrimaryKeyReadFormat, StatValues, column_null_counts, column_values,
58};
59use crate::sst::parquet::read_columns::ParquetReadColumns;
60use crate::sst::{
61    FlatSchemaOptions, flat_sst_arrow_schema_column_num, tag_maybe_to_dictionary_field,
62    to_flat_sst_arrow_schema, with_field_id,
63};
64
65/// Helper for writing the SST format.
66pub(crate) struct FlatWriteFormat {
67    /// SST file schema.
68    arrow_schema: SchemaRef,
69    override_sequence: Option<SequenceNumber>,
70}
71
72impl FlatWriteFormat {
73    /// Creates a new helper.
74    pub(crate) fn new(metadata: RegionMetadataRef, options: &FlatSchemaOptions) -> FlatWriteFormat {
75        let arrow_schema = to_flat_sst_arrow_schema(&metadata, options);
76        FlatWriteFormat {
77            arrow_schema,
78            override_sequence: None,
79        }
80    }
81
82    /// Set override sequence.
83    pub(crate) fn with_override_sequence(
84        mut self,
85        override_sequence: Option<SequenceNumber>,
86    ) -> Self {
87        self.override_sequence = override_sequence;
88        self
89    }
90
91    /// Gets the arrow schema to store in parquet.
92    #[cfg(test)]
93    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
94        &self.arrow_schema
95    }
96
97    /// Convert `batch` to a arrow record batch to store in parquet.
98    pub(crate) fn convert_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
99        debug_assert_eq!(batch.num_columns(), self.arrow_schema.fields().len());
100
101        let Some(override_sequence) = self.override_sequence else {
102            return Ok(batch.clone());
103        };
104
105        let mut columns = batch.columns().to_vec();
106        let sequence_array = Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
107        columns[sequence_column_index(batch.num_columns())] = sequence_array;
108
109        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
110    }
111}
112
/// Returns the position of the sequence column.
///
/// The sequence column is the second-to-last column of a batch with `num_columns` columns.
pub(crate) fn sequence_column_index(num_columns: usize) -> usize {
    const COLUMNS_FROM_END: usize = 2;
    num_columns - COLUMNS_FROM_END
}
117
/// Returns the position of the time index column.
///
/// The time index is the fourth column counted from the end of a batch with
/// `num_columns` columns.
pub(crate) fn time_index_column_index(num_columns: usize) -> usize {
    const COLUMNS_FROM_END: usize = 4;
    num_columns - COLUMNS_FROM_END
}
122
/// Returns the position of the primary key column.
///
/// The encoded primary key is the third column counted from the end of a batch with
/// `num_columns` columns.
pub(crate) fn primary_key_column_index(num_columns: usize) -> usize {
    const COLUMNS_FROM_END: usize = 3;
    num_columns - COLUMNS_FROM_END
}
127
/// Returns the position of the op type column.
///
/// The op type is the last column of a batch with `num_columns` columns.
pub(crate) fn op_type_column_index(num_columns: usize) -> usize {
    const COLUMNS_FROM_END: usize = 1;
    num_columns - COLUMNS_FROM_END
}
132
133/// Returns the start index of field columns in a flat batch.
134///
135/// `num_columns` is the total number of columns in the flat batch schema,
136/// including tag columns (if present), field columns, and fixed position columns
137/// (time index, primary key, sequence, op type).
138///
139/// For Dense encoding (raw PK columns included): field_column_start = primary_key.len()
140/// For Sparse encoding (no raw PK columns): field_column_start = 0
141pub(crate) fn field_column_start(metadata: &RegionMetadata, num_columns: usize) -> usize {
142    // Calculates field column start: total columns - fixed columns - field columns
143    // Field column count = total metadata columns - time index column - primary key columns
144    let field_column_count = metadata.column_metadatas.len() - 1 - metadata.primary_key.len();
145    num_columns - FIXED_POS_COLUMN_NUM - field_column_count
146}
147
148// TODO(yingwen): Add an option to skip reading internal columns if the region is
149// append only and doesn't use sparse encoding (We need to check the table id under
150// sparse encoding).
151/// Helper for reading the flat SST format with projection.
152///
153/// It only supports flat format that stores primary keys additionally.
154pub struct FlatReadFormat {
155    /// Sequence number to override the sequence read from the SST.
156    override_sequence: Option<SequenceNumber>,
157    /// Parquet format adapter.
158    parquet_adapter: ParquetAdapter,
159}
160
161impl FlatReadFormat {
162    /// Creates a helper with existing `metadata` and `column_ids` to read.
163    ///
164    /// If `skip_auto_convert` is true, skips auto conversion of format when the encoding is sparse encoding.
165    pub fn new(
166        metadata: RegionMetadataRef,
167        read_cols: ReadColumns,
168        file_schema: Option<SchemaRef>,
169        file_path: &str,
170        skip_auto_convert: bool,
171    ) -> Result<FlatReadFormat> {
172        let num_columns = file_schema.as_ref().map(|x| x.fields().len());
173        let is_legacy = match num_columns {
174            Some(num) => Self::is_legacy_format(&metadata, num, file_path)?,
175            None => metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse,
176        };
177
178        let parquet_adapter = if is_legacy {
179            // Safety: is_legacy_format() ensures primary_key is not empty.
180            if metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse {
181                // Only skip auto convert when the primary key encoding is sparse.
182                ParquetAdapter::PrimaryKeyToFlat(ParquetPrimaryKeyToFlat::new(
183                    metadata,
184                    read_cols,
185                    skip_auto_convert,
186                ))
187            } else {
188                ParquetAdapter::PrimaryKeyToFlat(ParquetPrimaryKeyToFlat::new(
189                    metadata, read_cols, false,
190                ))
191            }
192        } else {
193            let file_schema = file_schema
194                .unwrap_or_else(|| to_flat_sst_arrow_schema(&metadata, &Default::default()));
195            ParquetAdapter::Flat(ParquetFlat::new(metadata, read_cols, file_schema))
196        };
197
198        Ok(FlatReadFormat {
199            override_sequence: None,
200            parquet_adapter,
201        })
202    }
203
204    /// Sets the sequence number to override.
205    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
206        self.override_sequence = sequence;
207    }
208
209    /// Index of a column in the projected batch by its column id.
210    pub fn projected_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
211        self.format_projection()
212            .column_id_to_projected_index
213            .get(&column_id)
214            .copied()
215    }
216
217    /// Returns min values of specific column in row groups.
218    pub fn min_values(
219        &self,
220        row_groups: &[impl Borrow<RowGroupMetaData>],
221        column_id: ColumnId,
222    ) -> StatValues {
223        match &self.parquet_adapter {
224            ParquetAdapter::Flat(p) => p.min_values(row_groups, column_id),
225            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.min_values(row_groups, column_id),
226        }
227    }
228
229    /// Returns max values of specific column in row groups.
230    pub fn max_values(
231        &self,
232        row_groups: &[impl Borrow<RowGroupMetaData>],
233        column_id: ColumnId,
234    ) -> StatValues {
235        match &self.parquet_adapter {
236            ParquetAdapter::Flat(p) => p.max_values(row_groups, column_id),
237            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.max_values(row_groups, column_id),
238        }
239    }
240
241    /// Returns null counts of specific column in row groups.
242    pub fn null_counts(
243        &self,
244        row_groups: &[impl Borrow<RowGroupMetaData>],
245        column_id: ColumnId,
246    ) -> StatValues {
247        match &self.parquet_adapter {
248            ParquetAdapter::Flat(p) => p.null_counts(row_groups, column_id),
249            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.null_counts(row_groups, column_id),
250        }
251    }
252
253    /// Gets the arrow schema of the SST file.
254    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
255        match &self.parquet_adapter {
256            ParquetAdapter::Flat(p) => &p.arrow_schema,
257            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.arrow_schema(),
258        }
259    }
260
261    /// Gets the projected output schema produced by parquet reading.
262    pub(crate) fn output_arrow_schema(&self) -> Result<SchemaRef> {
263        let projection = self.parquet_read_columns().root_indices();
264        let schema = self
265            .arrow_schema()
266            .project(projection)
267            .context(ComputeArrowSnafu)?;
268        Ok(Arc::new(schema))
269    }
270
271    /// Gets the metadata of the SST.
272    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
273        match &self.parquet_adapter {
274            ParquetAdapter::Flat(p) => &p.metadata,
275            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.metadata(),
276        }
277    }
278
279    /// Get the sorted read columns to read from the sst file.
280    pub(crate) fn parquet_read_columns(&self) -> &ParquetReadColumns {
281        match &self.parquet_adapter {
282            ParquetAdapter::Flat(p) => &p.format_projection.parquet_read_cols,
283            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.parquet_read_columns(),
284        }
285    }
286
287    /// Gets the projection in the flat format.
288    ///
289    /// When `skip_auto_convert` is enabled (primary-key format read), this returns the
290    /// primary-key format projection so filter/prune can resolve projected indices.
291    pub(crate) fn format_projection(&self) -> &FormatProjection {
292        match &self.parquet_adapter {
293            ParquetAdapter::Flat(p) => &p.format_projection,
294            ParquetAdapter::PrimaryKeyToFlat(p) => &p.format_projection,
295        }
296    }
297
298    /// Returns `true` if raw batches from parquet use the flat layout and
299    /// stores primary key columns as raw columns.
300    /// Returns `false` for the legacy primary-key-to-flat conversion path.
301    pub(crate) fn batch_has_raw_pk_columns(&self) -> bool {
302        matches!(&self.parquet_adapter, ParquetAdapter::Flat(_))
303    }
304
305    /// Creates a sequence array to override.
306    pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
307        self.override_sequence
308            .map(|seq| Arc::new(UInt64Array::from_value(seq, length)) as ArrayRef)
309    }
310
311    /// Convert a record batch to apply flat format conversion and override sequence array.
312    ///
313    /// Returns a new RecordBatch with flat format conversion applied first (if enabled),
314    /// then the sequence column replaced by the override sequence array.
315    pub(crate) fn convert_batch(
316        &self,
317        record_batch: RecordBatch,
318        override_sequence_array: Option<&ArrayRef>,
319    ) -> Result<RecordBatch> {
320        // First, apply flat format conversion.
321        let batch = match &self.parquet_adapter {
322            ParquetAdapter::Flat(_) => record_batch,
323            ParquetAdapter::PrimaryKeyToFlat(p) => p.convert_batch(record_batch)?,
324        };
325
326        // Then apply sequence override if provided
327        let Some(override_array) = override_sequence_array else {
328            return Ok(batch);
329        };
330
331        let mut columns = batch.columns().to_vec();
332        let sequence_column_idx = sequence_column_index(batch.num_columns());
333
334        // Use the provided override sequence array, slicing if necessary to match batch length
335        let sequence_array = if override_array.len() > batch.num_rows() {
336            override_array.slice(0, batch.num_rows())
337        } else {
338            override_array.clone()
339        };
340
341        columns[sequence_column_idx] = sequence_array;
342
343        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
344    }
345
346    /// Checks whether the batch from the parquet file needs to be converted to match the flat format.
347    ///
348    /// * `metadata` is the region metadata (always assumes flat format).
349    /// * `num_columns` is the number of columns in the parquet file.
350    /// * `file_path` is the path to the parquet file, for error message.
351    pub(crate) fn is_legacy_format(
352        metadata: &RegionMetadata,
353        num_columns: usize,
354        file_path: &str,
355    ) -> Result<bool> {
356        if metadata.primary_key.is_empty() {
357            return Ok(false);
358        }
359
360        // For flat format, compute expected column number:
361        // all columns + internal columns (pk, sequence, op_type)
362        let expected_columns = metadata.column_metadatas.len() + INTERNAL_COLUMN_NUM;
363
364        if expected_columns == num_columns {
365            // Same number of columns, no conversion needed
366            Ok(false)
367        } else {
368            ensure!(
369                expected_columns >= num_columns,
370                InvalidParquetSnafu {
371                    file: file_path,
372                    reason: format!(
373                        "Expected columns {} should be >= actual columns {}",
374                        expected_columns, num_columns
375                    )
376                }
377            );
378
379            // Different number of columns, check if the difference matches primary key count
380            let column_diff = expected_columns - num_columns;
381
382            ensure!(
383                column_diff == metadata.primary_key.len(),
384                InvalidParquetSnafu {
385                    file: file_path,
386                    reason: format!(
387                        "Column number difference {} does not match primary key count {}",
388                        column_diff,
389                        metadata.primary_key.len()
390                    )
391                }
392            );
393
394            Ok(true)
395        }
396    }
397}
398
399/// Wraps the parquet helper for different formats.
400enum ParquetAdapter {
401    Flat(ParquetFlat),
402    PrimaryKeyToFlat(ParquetPrimaryKeyToFlat),
403}
404
405/// Helper to reads the parquet from primary key format into the flat format.
406struct ParquetPrimaryKeyToFlat {
407    /// The primary key format to read the parquet.
408    format: PrimaryKeyReadFormat,
409    /// Format converter for handling flat format conversion.
410    convert_format: Option<FlatConvertFormat>,
411    /// Projection computed for the flat format.
412    format_projection: FormatProjection,
413}
414
415impl ParquetPrimaryKeyToFlat {
416    /// Creates a helper with existing `metadata` and `column_ids` to read.
417    fn new(
418        metadata: RegionMetadataRef,
419        read_cols: ReadColumns,
420        skip_auto_convert: bool,
421    ) -> ParquetPrimaryKeyToFlat {
422        assert!(if skip_auto_convert {
423            metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse
424        } else {
425            true
426        });
427
428        // Creates a map to lookup index based on the new format.
429        let id_to_index = sst_column_id_indices(&metadata);
430        let sst_column_num =
431            flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default());
432
433        let codec = build_primary_key_codec(&metadata);
434        let format = PrimaryKeyReadFormat::new(metadata.clone(), read_cols.clone());
435        let (convert_format, format_projection) = if skip_auto_convert {
436            (
437                None,
438                FormatProjection {
439                    parquet_read_cols: format.parquet_read_columns().clone(),
440                    column_id_to_projected_index: format.field_id_to_projected_index().clone(),
441                },
442            )
443        } else {
444            // Computes the format projection for the new format.
445            let format_projection = FormatProjection::compute_format_projection(
446                &id_to_index,
447                sst_column_num,
448                read_cols.clone(),
449            );
450            (
451                FlatConvertFormat::new(Arc::clone(&metadata), &format_projection, codec),
452                format_projection,
453            )
454        };
455
456        Self {
457            format,
458            convert_format,
459            format_projection,
460        }
461    }
462
463    fn convert_batch(&self, record_batch: RecordBatch) -> Result<RecordBatch> {
464        if let Some(convert_format) = &self.convert_format {
465            convert_format.convert(record_batch)
466        } else {
467            Ok(record_batch)
468        }
469    }
470}
471
472/// Helper to reads the parquet in flat format directly.
473struct ParquetFlat {
474    /// The metadata stored in the SST.
475    metadata: RegionMetadataRef,
476    /// SST file schema.
477    arrow_schema: SchemaRef,
478    /// Projection computed for the flat format.
479    format_projection: FormatProjection,
480    /// Column id to index in SST.
481    column_id_to_sst_index: HashMap<ColumnId, usize>,
482}
483
484impl ParquetFlat {
485    /// Creates a helper with existing `metadata` and `column_ids` to read.
486    fn new(
487        metadata: RegionMetadataRef,
488        read_cols: ReadColumns,
489        arrow_schema: SchemaRef,
490    ) -> ParquetFlat {
491        // Creates a map to lookup index.
492        let id_to_index = sst_column_id_indices(&metadata);
493        let sst_column_num =
494            flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default());
495        let format_projection =
496            FormatProjection::compute_format_projection(&id_to_index, sst_column_num, read_cols);
497
498        Self {
499            metadata,
500            arrow_schema,
501            format_projection,
502            column_id_to_sst_index: id_to_index,
503        }
504    }
505
506    /// Returns min values of specific column in row groups.
507    fn min_values(
508        &self,
509        row_groups: &[impl Borrow<RowGroupMetaData>],
510        column_id: ColumnId,
511    ) -> StatValues {
512        self.get_stat_values(row_groups, column_id, true)
513    }
514
515    /// Returns max values of specific column in row groups.
516    fn max_values(
517        &self,
518        row_groups: &[impl Borrow<RowGroupMetaData>],
519        column_id: ColumnId,
520    ) -> StatValues {
521        self.get_stat_values(row_groups, column_id, false)
522    }
523
524    /// Returns null counts of specific column in row groups.
525    fn null_counts(
526        &self,
527        row_groups: &[impl Borrow<RowGroupMetaData>],
528        column_id: ColumnId,
529    ) -> StatValues {
530        let Some(index) = self.column_id_to_sst_index.get(&column_id) else {
531            // No such column in the SST.
532            return StatValues::NoColumn;
533        };
534
535        let stats = column_null_counts(row_groups, *index);
536        StatValues::from_stats_opt(stats)
537    }
538
539    fn get_stat_values(
540        &self,
541        row_groups: &[impl Borrow<RowGroupMetaData>],
542        column_id: ColumnId,
543        is_min: bool,
544    ) -> StatValues {
545        let Some(column) = self.metadata.column_by_id(column_id) else {
546            // No such column in the SST.
547            return StatValues::NoColumn;
548        };
549        // Safety: `column_id_to_sst_index` is built from `metadata`.
550        let index = self.column_id_to_sst_index.get(&column_id).unwrap();
551
552        let stats = column_values(row_groups, column, *index, is_min);
553        StatValues::from_stats_opt(stats)
554    }
555}
556
557/// Returns a map that the key is the column id and the value is the column position
558/// in the SST.
559/// It only supports SSTs with raw primary key columns.
560pub(crate) fn sst_column_id_indices(metadata: &RegionMetadata) -> HashMap<ColumnId, usize> {
561    let mut id_to_index = HashMap::with_capacity(metadata.column_metadatas.len());
562    let mut column_index = 0;
563    // keys
564    for pk_id in &metadata.primary_key {
565        id_to_index.insert(*pk_id, column_index);
566        column_index += 1;
567    }
568    // fields
569    for column in &metadata.column_metadatas {
570        if column.semantic_type == SemanticType::Field {
571            id_to_index.insert(column.column_id, column_index);
572            column_index += 1;
573        }
574    }
575    // time index
576    id_to_index.insert(metadata.time_index_column().column_id, column_index);
577
578    id_to_index
579}
580
581/// Decodes primary keys from a batch and returns decoded primary key information.
582///
583/// The batch must contain a primary key column at the expected index.
584pub(crate) fn decode_primary_keys(
585    codec: &dyn PrimaryKeyCodec,
586    batch: &RecordBatch,
587) -> Result<DecodedPrimaryKeys> {
588    let primary_key_index = primary_key_column_index(batch.num_columns());
589    let pk_dict_array = batch
590        .column(primary_key_index)
591        .as_any()
592        .downcast_ref::<PrimaryKeyArray>()
593        .with_context(|| InvalidRecordBatchSnafu {
594            reason: "Primary key column is not a dictionary array".to_string(),
595        })?;
596    let pk_values_array = pk_dict_array
597        .values()
598        .as_any()
599        .downcast_ref::<BinaryArray>()
600        .with_context(|| InvalidRecordBatchSnafu {
601            reason: "Primary key values are not binary array".to_string(),
602        })?;
603
604    let keys = pk_dict_array.keys();
605
606    // Decodes primary key values by iterating through keys, reusing decoded values for duplicate keys.
607    // Maps original key index -> new decoded value index
608    let mut key_to_decoded_index = Vec::with_capacity(keys.len());
609    let mut decoded_pk_values = Vec::new();
610    let mut prev_key: Option<u32> = None;
611
612    // The parquet reader may read the whole dictionary page into the dictionary values, so
613    // we may decode many primary keys not in this batch if we decode the values array directly.
614    let pk_indices = keys.values();
615    for &current_key in pk_indices.iter().take(keys.len()) {
616        // Check if current key is the same as previous key
617        if let Some(prev) = prev_key
618            && prev == current_key
619        {
620            // Reuse the last decoded index
621            key_to_decoded_index.push((decoded_pk_values.len() - 1) as u32);
622            continue;
623        }
624
625        // New key, decodes the value
626        let pk_bytes = pk_values_array.value(current_key as usize);
627        let decoded_value = codec.decode(pk_bytes).context(DecodeSnafu)?;
628
629        decoded_pk_values.push(decoded_value);
630        key_to_decoded_index.push((decoded_pk_values.len() - 1) as u32);
631        prev_key = Some(current_key);
632    }
633
634    // Create the keys array from key_to_decoded_index
635    let keys_array = UInt32Array::from(key_to_decoded_index);
636
637    Ok(DecodedPrimaryKeys {
638        decoded_pk_values,
639        keys_array,
640    })
641}
642
643/// Holds decoded primary key values and their indices.
644pub(crate) struct DecodedPrimaryKeys {
645    /// Decoded primary key values for unique keys in the dictionary.
646    decoded_pk_values: Vec<CompositeValues>,
647    /// Prebuilt keys array for creating dictionary arrays.
648    keys_array: UInt32Array,
649}
650
651impl DecodedPrimaryKeys {
652    /// Gets a tag column array by column id and data type.
653    ///
654    /// For sparse encoding, uses column_id to lookup values.
655    /// For dense encoding, uses pk_index to get values.
656    pub(crate) fn get_tag_column(
657        &self,
658        column_id: ColumnId,
659        pk_index: Option<usize>,
660        column_type: &ConcreteDataType,
661    ) -> Result<ArrayRef> {
662        // Gets values from the primary key.
663        let mut builder = column_type.create_mutable_vector(self.decoded_pk_values.len());
664        for decoded in &self.decoded_pk_values {
665            match decoded {
666                CompositeValues::Dense(dense) => {
667                    let pk_idx = pk_index.expect("pk_index required for dense encoding");
668                    if pk_idx < dense.len() {
669                        builder.push_value_ref(&dense[pk_idx].1.as_value_ref());
670                    } else {
671                        builder.push_null();
672                    }
673                }
674                CompositeValues::Sparse(sparse) => {
675                    let value = sparse.get_or_null(column_id);
676                    builder.push_value_ref(&value.as_value_ref());
677                }
678            };
679        }
680
681        let values_vector = builder.to_vector();
682        let values_array = values_vector.to_arrow_array();
683
684        // Only creates dictionary array for string types, otherwise take values by keys
685        if column_type.is_string() {
686            // Creates dictionary array using the same keys for string types
687            // Note that the dictionary values may have nulls.
688            let dict_array = DictionaryArray::new(self.keys_array.clone(), values_array);
689            Ok(Arc::new(dict_array))
690        } else {
691            // For non-string types, takes values by keys indices to create a regular array
692            let taken_array =
693                take(&values_array, &self.keys_array, None).context(ComputeArrowSnafu)?;
694            Ok(taken_array)
695        }
696    }
697}
698
699/// Converts a batch that doesn't have decoded primary key columns into a batch that has decoded
700/// primary key columns in flat format.
701pub(crate) struct FlatConvertFormat {
702    /// Metadata of the region.
703    metadata: RegionMetadataRef,
704    /// Primary key codec to decode primary keys.
705    codec: Arc<dyn PrimaryKeyCodec>,
706    /// Projected primary key column information: (column_id, pk_index, column_index in metadata).
707    projected_primary_keys: Vec<(ColumnId, usize, usize)>,
708}
709
710impl FlatConvertFormat {
711    /// Creates a new `FlatConvertFormat`.
712    ///
713    /// The `format_projection` is the projection computed in the [FlatReadFormat] with the `metadata`.
714    /// The `codec` is the primary key codec of the `metadata`.
715    ///
716    /// Returns `None` if there is no primary key.
717    pub(crate) fn new(
718        metadata: RegionMetadataRef,
719        format_projection: &FormatProjection,
720        codec: Arc<dyn PrimaryKeyCodec>,
721    ) -> Option<Self> {
722        if metadata.primary_key.is_empty() {
723            return None;
724        }
725
726        // Builds projected primary keys list maintaining the order of RegionMetadata::primary_key
727        let mut projected_primary_keys = Vec::new();
728        for (pk_index, &column_id) in metadata.primary_key.iter().enumerate() {
729            if format_projection
730                .column_id_to_projected_index
731                .contains_key(&column_id)
732            {
733                // We expect the format_projection is built from the metadata.
734                let column_index = metadata.column_index_by_id(column_id).unwrap();
735                projected_primary_keys.push((column_id, pk_index, column_index));
736            }
737        }
738
739        Some(Self {
740            metadata,
741            codec,
742            projected_primary_keys,
743        })
744    }
745
746    /// Converts a batch to have decoded primary key columns in flat format.
747    ///
748    /// The primary key array in the batch is a dictionary array.
749    pub(crate) fn convert(&self, batch: RecordBatch) -> Result<RecordBatch> {
750        if self.projected_primary_keys.is_empty() {
751            return Ok(batch);
752        }
753
754        let decoded_pks = decode_primary_keys(self.codec.as_ref(), &batch)?;
755
756        // Builds decoded tag column arrays.
757        let mut decoded_columns = Vec::new();
758        for (column_id, pk_index, column_index) in &self.projected_primary_keys {
759            let column_metadata = &self.metadata.column_metadatas[*column_index];
760            let tag_column = decoded_pks.get_tag_column(
761                *column_id,
762                Some(*pk_index),
763                &column_metadata.column_schema.data_type,
764            )?;
765            decoded_columns.push(tag_column);
766        }
767
768        // Builds new columns: decoded tag columns first, then original columns
769        let mut new_columns = Vec::with_capacity(batch.num_columns() + decoded_columns.len());
770        new_columns.extend(decoded_columns);
771        new_columns.extend_from_slice(batch.columns());
772
773        // Builds new schema
774        let mut new_fields =
775            Vec::with_capacity(batch.schema().fields().len() + self.projected_primary_keys.len());
776        for (column_id, _, column_index) in &self.projected_primary_keys {
777            let column_metadata = &self.metadata.column_metadatas[*column_index];
778            let old_field = &self.metadata.schema.arrow_schema().fields()[*column_index];
779            let field =
780                tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, old_field);
781            new_fields.push(Arc::new(with_field_id((*field).clone(), *column_id)));
782        }
783        new_fields.extend(batch.schema().fields().iter().cloned());
784
785        let new_schema = Arc::new(Schema::new(new_fields));
786        RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)
787    }
788}
789
#[cfg(test)]
impl FlatReadFormat {
    /// Test-only helper that builds a [`FlatReadFormat`] reading every column of `metadata`.
    ///
    /// # Panics
    ///
    /// Panics if [`FlatReadFormat::new`] returns an error.
    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> FlatReadFormat {
        // Request each column id declared in the region metadata, in declaration order.
        let every_column_id = metadata.column_metadatas.iter().map(|c| c.column_id);
        let read_columns = ReadColumns::from_deduped_column_ids(every_column_id);
        Self::new(Arc::clone(&metadata), read_columns, None, "test", false).unwrap()
    }
}
806
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::{FlatReadFormat, field_column_start};
    use crate::read::read_columns::ReadColumns;
    use crate::sst::{
        FlatSchemaOptions, flat_sst_arrow_schema_column_num, to_flat_sst_arrow_schema,
    };

    /// Builds a `RegionMetadata` with `num_tags` string tags, `num_fields` uint64 fields and a
    /// millisecond time index named `ts`.
    ///
    /// Column ids are assigned densely in declaration order (tags, then fields, then `ts`),
    /// and all tags form the primary key.
    fn build_metadata(
        num_tags: usize,
        num_fields: usize,
        encoding: PrimaryKeyEncoding,
    ) -> RegionMetadata {
        let tags = (0..num_tags).map(|i| {
            let schema =
                ColumnSchema::new(format!("tag_{i}"), ConcreteDataType::string_datatype(), true);
            (schema, SemanticType::Tag)
        });
        let fields = (0..num_fields).map(|i| {
            let schema =
                ColumnSchema::new(format!("field_{i}"), ConcreteDataType::uint64_datatype(), true);
            (schema, SemanticType::Field)
        });
        let time_index = std::iter::once((
            ColumnSchema::new(
                "ts".to_string(),
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            SemanticType::Timestamp,
        ));

        let mut builder = RegionMetadataBuilder::new(RegionId::new(0, 0));
        for (column_id, (column_schema, semantic_type)) in
            (0u32..).zip(tags.chain(fields).chain(time_index))
        {
            builder.push_column_metadata(ColumnMetadata {
                column_schema,
                semantic_type,
                column_id,
            });
        }

        // Tags were given ids `0..num_tags`, so those ids make up the primary key.
        let primary_key: Vec<u32> = (0..num_tags as u32).collect();
        builder.primary_key(primary_key);
        builder.primary_key_encoding(encoding);
        builder.build().unwrap()
    }

    #[test]
    fn test_field_column_start() {
        // Each case: (num_tags, num_fields, encoding, expected start index of field columns).
        let cases = [
            (1, 1, PrimaryKeyEncoding::Dense, 1),
            (2, 2, PrimaryKeyEncoding::Dense, 2),
            (0, 2, PrimaryKeyEncoding::Dense, 0),
            // With sparse encoding the fields are expected at index 0 even though tags exist.
            (2, 2, PrimaryKeyEncoding::Sparse, 0),
        ];

        for (num_tags, num_fields, encoding, expected) in cases {
            let options = FlatSchemaOptions::from_encoding(encoding);
            let metadata = build_metadata(num_tags, num_fields, encoding);
            let actual = field_column_start(
                &metadata,
                flat_sst_arrow_schema_column_num(&metadata, &options),
            );
            assert_eq!(
                actual, expected,
                "num_tags={num_tags}, num_fields={num_fields}, encoding={encoding:?}"
            );
        }
    }

    #[test]
    fn test_output_arrow_schema_uses_projection() {
        // One tag (id 0) and two fields (ids 1 and 2); read the tag and the second field.
        let metadata = Arc::new(build_metadata(1, 2, PrimaryKeyEncoding::Dense));
        let read_format = FlatReadFormat::new(
            Arc::clone(&metadata),
            ReadColumns::from_deduped_column_ids([0_u32, 2_u32]),
            None,
            "test",
            false,
        )
        .unwrap();

        let output_schema = read_format.output_arrow_schema().unwrap();

        // The output schema must equal the full flat SST schema restricted to the read roots.
        let full_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let projected = full_schema
            .project(read_format.parquet_read_columns().root_indices())
            .unwrap();
        let expected = Arc::new(projected);

        assert_eq!(expected, output_schema);
    }
}